Commit 4809c1e0 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

change the message_queue and subscribers_queue type on the...

change the message_queue and subscribers_queue type on the ngx_http_push_stream_worker_data_t structure
parent 0b864402
......@@ -192,8 +192,8 @@ typedef struct {
} ngx_http_push_stream_worker_msg_t;
typedef struct {
ngx_http_push_stream_worker_msg_t *messages_queue;
ngx_http_push_stream_queue_elem_t *subscribers_sentinel;
ngx_queue_t messages_queue;
ngx_queue_t subscribers_queue;
ngx_uint_t subscribers; // # of subscribers in the worker
time_t startup;
pid_t pid;
......
......@@ -130,20 +130,8 @@ ngx_http_push_stream_ipc_init_worker()
ngx_shmtx_lock(&shpool->mutex);
if ((data->ipc[ngx_process_slot].messages_queue == NULL) && ((data->ipc[ngx_process_slot].messages_queue = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_msg_t))) == NULL)) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR;
}
if ((data->ipc[ngx_process_slot].subscribers_sentinel == NULL) && ((data->ipc[ngx_process_slot].subscribers_sentinel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_queue_elem_t))) == NULL)) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR;
}
data->ipc[ngx_process_slot].pid = ngx_pid;
data->ipc[ngx_process_slot].startup = ngx_time();
ngx_queue_init(&data->ipc[ngx_process_slot].messages_queue->queue);
ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_sentinel->queue);
data->subscribers = 0;
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_reset_channel_subscribers_count_locked);
......@@ -185,18 +173,14 @@ ngx_http_push_stream_clean_worker_data()
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_msg_t *cur_msg;
ngx_queue_t *cur_msg;
ngx_shmtx_lock(&shpool->mutex);
if (data->ipc[ngx_process_slot].messages_queue != NULL) {
while ((cur_msg = (ngx_http_push_stream_worker_msg_t *) ngx_queue_next(&data->ipc[ngx_process_slot].messages_queue->queue)) != data->ipc[ngx_process_slot].messages_queue) {
ngx_http_push_stream_free_worker_message_memory_locked(shpool, cur_msg);
}
while ((cur_msg = ngx_queue_next(&data->ipc[ngx_process_slot].messages_queue)) != &data->ipc[ngx_process_slot].messages_queue) {
ngx_http_push_stream_free_worker_message_memory_locked(shpool, ngx_queue_data(cur_msg, ngx_http_push_stream_worker_msg_t, queue));
}
if (data->ipc[ngx_process_slot].subscribers_sentinel != NULL) {
ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_sentinel->queue);
}
ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_queue);
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_unsubscribe_worker_locked);
......@@ -277,14 +261,17 @@ ngx_http_push_stream_census_worker_subscribers(void)
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *workers_data = data->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_queue_elem_t *cur;
ngx_http_push_stream_queue_elem_t *elem;
ngx_queue_t *cur;
ngx_http_push_stream_subscription_t *cur_subscription;
ngx_shmtx_lock(&shpool->mutex);
cur = thisworker_data->subscribers_sentinel;
while ((cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur->queue)) != thisworker_data->subscribers_sentinel) {
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value;
cur = &thisworker_data->subscribers_queue;
while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &thisworker_data->subscribers_queue)) {
elem = ngx_queue_data(cur, ngx_http_push_stream_queue_elem_t, queue);
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) elem->value;
cur_subscription = &subscriber->subscriptions_sentinel;
while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != &subscriber->subscriptions_sentinel) {
cur_subscription->channel->subscribers++;
......@@ -300,14 +287,16 @@ ngx_http_push_stream_census_worker_subscribers(void)
static ngx_inline void
ngx_http_push_stream_process_worker_message(void)
{
ngx_http_push_stream_worker_msg_t *worker_msg, *sentinel;
ngx_http_push_stream_worker_msg_t *worker_msg;
ngx_queue_t *cur;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
sentinel = thisworker_data->messages_queue;
while ((worker_msg = (ngx_http_push_stream_worker_msg_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
cur = &thisworker_data->messages_queue;
while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &thisworker_data->messages_queue)) {
worker_msg = ngx_queue_data(cur, ngx_http_push_stream_worker_msg_t, queue);
if (worker_msg->pid == ngx_pid) {
// everything is okay
ngx_http_push_stream_respond_to_subscribers(worker_msg->channel, worker_msg->subscribers_sentinel, worker_msg->msg);
......@@ -360,8 +349,8 @@ ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *
newmessage->pid = pid;
newmessage->subscribers_sentinel = subscribers_sentinel;
newmessage->channel = channel;
*queue_was_empty = ngx_queue_empty(&thisworker_data->messages_queue->queue);
ngx_queue_insert_tail(&thisworker_data->messages_queue->queue, &newmessage->queue);
*queue_was_empty = ngx_queue_empty(&thisworker_data->messages_queue);
ngx_queue_insert_tail(&thisworker_data->messages_queue, &newmessage->queue);
return NGX_OK;
}
......
......@@ -907,8 +907,8 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
d->ipc[i].pid = -1;
d->ipc[i].startup = 0;
d->ipc[i].subscribers = 0;
d->ipc[i].messages_queue = NULL;
d->ipc[i].subscribers_sentinel = NULL;
ngx_queue_init(&d->ipc[i].messages_queue);
ngx_queue_init(&d->ipc[i].subscribers_queue);
}
d->channels = 0;
......
......@@ -510,7 +510,7 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_
worker_subscriber->worker_subscriber_element_ref = element_subscriber;
// adding subscriber to worker list of subscribers
ngx_queue_insert_tail(&thisworker_data->subscribers_sentinel->queue, &element_subscriber->queue);
ngx_queue_insert_tail(&thisworker_data->subscribers_queue, &element_subscriber->queue);
ctx->longpolling = worker_subscriber->longpolling;
ctx->subscriber = worker_subscriber;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment