Commit af02c91b authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

refactor to simplify ngx_http_push_stream_process_worker_message function

parent 48e08f22
...@@ -280,15 +280,14 @@ ngx_http_push_stream_census_worker_subscribers(void) ...@@ -280,15 +280,14 @@ ngx_http_push_stream_census_worker_subscribers(void)
static ngx_inline void static ngx_inline void
ngx_http_push_stream_process_worker_message(void) ngx_http_push_stream_process_worker_message(void)
{ {
ngx_http_push_stream_worker_msg_t *prev_worker_msg, *worker_msg, *sentinel; ngx_http_push_stream_worker_msg_t *worker_msg, *sentinel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc; ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot; ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
sentinel = thisworker_data->messages_queue; sentinel = thisworker_data->messages_queue;
worker_msg = (ngx_http_push_stream_worker_msg_t *) ngx_queue_next(&sentinel->queue); while ((worker_msg = (ngx_http_push_stream_worker_msg_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
while (worker_msg != sentinel) {
if (worker_msg->pid == ngx_pid) { if (worker_msg->pid == ngx_pid) {
// everything is okay // everything is okay
ngx_http_push_stream_respond_to_subscribers(worker_msg->channel, worker_msg->subscribers_sentinel, worker_msg->msg); ngx_http_push_stream_respond_to_subscribers(worker_msg->channel, worker_msg->subscribers_sentinel, worker_msg->msg);
...@@ -315,13 +314,10 @@ ngx_http_push_stream_process_worker_message(void) ...@@ -315,13 +314,10 @@ ngx_http_push_stream_process_worker_message(void)
} }
} }
prev_worker_msg = worker_msg;
worker_msg = (ngx_http_push_stream_worker_msg_t *) ngx_queue_next(&worker_msg->queue);
// free worker_msg already sent // free worker_msg already sent
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
ngx_queue_remove(&prev_worker_msg->queue); ngx_queue_remove(&worker_msg->queue);
ngx_slab_free_locked(shpool, prev_worker_msg); ngx_slab_free_locked(shpool, worker_msg);
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment