Commit f8e7af9d authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

refactor on ngx_http_push_stream_delete_unrecoverable_channels removing unecessary loop

parent f9572d6a
......@@ -60,9 +60,6 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_pid_queue_t *cur_worker;
ngx_http_push_stream_queue_elem_t *cur;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_queue_elem_t *cur_subscriber;
ngx_http_push_stream_subscription_t *cur_subscription;
channel = (ngx_http_push_stream_channel_t *) node;
......@@ -84,46 +81,36 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data
// find the current work
while ((cur_worker = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur_worker->queue)) != &channel->workers_with_subscribers) {
if (cur_worker->slot == ngx_process_slot) {
if (cur_worker->pid == ngx_pid) {
// to each subscriber of this channel in this worker
while(!ngx_queue_empty(&cur_worker->subscribers_sentinel.queue)) {
while (!ngx_queue_empty(&cur_worker->subscribers_sentinel.queue)) {
cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur_worker->subscribers_sentinel.queue);
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value;
// find the subscriber subscriptions on the worker
cur_subscriber = thisworker_data->subscribers_sentinel;
while ((cur_subscriber = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur_subscriber->queue)) != thisworker_data->subscribers_sentinel) {
ngx_http_push_stream_subscriber_t *worker_subscriber = (ngx_http_push_stream_subscriber_t *) cur_subscriber->value;
if (worker_subscriber->request == subscriber->request) {
// find the subscription for the channel being deleted
cur_subscription = &subscriber->subscriptions_sentinel;
while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != &subscriber->subscriptions_sentinel) {
if (cur_subscription->channel == channel) {
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->subscribers);
// find the subscription for the channel being deleted
cur_subscription = &worker_subscriber->subscriptions_sentinel;
while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != &worker_subscriber->subscriptions_sentinel) {
if (cur_subscription->channel == channel) {
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->subscribers);
ngx_shmtx_lock(&shpool->mutex);
// remove the reference from subscription for channel
ngx_queue_remove(&cur_subscription->queue);
// remove the reference from channel for subscriber
ngx_queue_remove(&cur->queue);
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_lock(&shpool->mutex);
// remove the reference from subscription for channel
ngx_queue_remove(&cur_subscription->queue);
// remove the reference from channel for subscriber
ngx_queue_remove(&cur->queue);
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_push_stream_send_response_message(subscriber->request, channel, channel->channel_deleted_message);
break;
}
}
// subscriber does not have any other subscription, the connection may be closed
if (ngx_queue_empty(&worker_subscriber->subscriptions_sentinel.queue)) {
ngx_http_push_stream_send_response_finalize(worker_subscriber->request);
}
ngx_http_push_stream_send_response_message(subscriber->request, channel, channel->channel_deleted_message);
break;
}
}
// subscriber does not have any other subscription, the connection may be closed
if (ngx_queue_empty(&subscriber->subscriptions_sentinel.queue)) {
ngx_http_push_stream_send_response_finalize(subscriber->request);
}
}
}
}
......@@ -902,7 +889,7 @@ ngx_http_push_stream_match_channel_info_format_and_content_type(ngx_http_request
u_char *cur = r->headers_in.accept->value.data;
size_t rem = 0;
while((cur != NULL) && (cur = ngx_strnstr(cur, "/", r->headers_in.accept->value.len)) != NULL) {
while ((cur != NULL) && (cur = ngx_strnstr(cur, "/", r->headers_in.accept->value.len)) != NULL) {
cur = cur + 1;
rem = r->headers_in.accept->value.len - (r->headers_in.accept->value.data - cur);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment