Commit 3693c9c6 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

change the workers_with_subscribers type on the ngx_http_push_stream_channel_t structure

parent 4809c1e0
......@@ -137,7 +137,7 @@ typedef struct {
ngx_int_t last_message_tag;
ngx_uint_t stored_messages;
ngx_uint_t subscribers;
ngx_http_push_stream_pid_queue_t workers_with_subscribers;
ngx_queue_t workers_with_subscribers;
ngx_queue_t message_queue;
time_t last_activity_time;
time_t expires;
......
......@@ -152,13 +152,15 @@ ngx_http_push_stream_ipc_init_worker()
static ngx_int_t
ngx_http_push_stream_unsubscribe_worker_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{
ngx_http_push_stream_pid_queue_t *sentinel = &channel->workers_with_subscribers;
ngx_http_push_stream_pid_queue_t *cur = sentinel;
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if ((cur->pid == ngx_pid) || (cur->slot == ngx_process_slot)) {
ngx_queue_remove(&cur->queue);
ngx_slab_free_locked(shpool, cur);
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker;
cur_worker = &channel->workers_with_subscribers;
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
if ((worker->pid == ngx_pid) || (worker->slot == ngx_process_slot)) {
ngx_queue_remove(&worker->queue);
ngx_slab_free_locked(shpool, worker);
break;
}
}
......@@ -304,20 +306,21 @@ ngx_http_push_stream_process_worker_message(void)
// that's quite bad you see. a previous worker died with an undelivered message.
// but all its subscribers' connections presumably got canned, too. so it's not so bad after all.
ngx_http_push_stream_pid_queue_t *channel_worker_sentinel = &worker_msg->channel->workers_with_subscribers;
ngx_http_push_stream_pid_queue_t *channel_worker_cur = channel_worker_sentinel;
ngx_queue_t *cur_worker;
ngx_http_push_stream_pid_queue_t *worker;
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: worker %i intercepted a message intended for another worker process (%i) that probably died", ngx_pid, worker_msg->pid);
// delete that invalid sucker
while ((channel_worker_cur != NULL) && (channel_worker_cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&channel_worker_cur->queue)) != channel_worker_sentinel) {
if (channel_worker_cur->pid == worker_msg->pid) {
cur_worker = &worker_msg->channel->workers_with_subscribers;
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &worker_msg->channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
if (worker->pid == worker_msg->pid) {
ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0, "push stream module: reference to worker %i will be removed", worker_msg->pid);
ngx_shmtx_lock(&shpool->mutex);
ngx_queue_remove(&channel_worker_cur->queue);
ngx_slab_free_locked(shpool, channel_worker_cur);
ngx_queue_remove(&worker->queue);
ngx_slab_free_locked(shpool, worker);
ngx_shmtx_unlock(&shpool->mutex);
channel_worker_cur = NULL;
break;
}
}
......@@ -361,22 +364,25 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
{
// subscribers are queued up in a local pool. Queue heads, however, are located
// in shared memory, identified by pid.
ngx_http_push_stream_pid_queue_t *sentinel = &channel->workers_with_subscribers;
ngx_http_push_stream_pid_queue_t *cur = sentinel;
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_flag_t queue_was_empty[NGX_MAX_PROCESSES];
ngx_shmtx_lock(&shpool->mutex);
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
ngx_http_push_stream_send_worker_message_locked(channel, &cur->subscribers_sentinel, cur->pid, cur->slot, msg, &queue_was_empty[cur->slot], log);
cur_worker = &channel->workers_with_subscribers;
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
ngx_http_push_stream_send_worker_message_locked(channel, &worker->subscribers_sentinel, worker->pid, worker->slot, msg, &queue_was_empty[worker->slot], log);
}
ngx_shmtx_unlock(&shpool->mutex);
cur = sentinel;
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
cur_worker = &channel->workers_with_subscribers;
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
// interprocess communication breakdown
if (queue_was_empty[cur->slot] && (ngx_http_push_stream_alert_worker_check_messages(cur->pid, cur->slot, log) != NGX_OK)) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with worker process, pid: %P, slot: %d", cur->pid, cur->slot);
if (queue_was_empty[worker->slot] && (ngx_http_push_stream_alert_worker_check_messages(worker->pid, worker->slot, log) != NGX_OK)) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with worker process, pid: %P, slot: %d", worker->pid, worker->slot);
}
}
......
......@@ -664,7 +664,7 @@ ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_p
}
// initialize
ngx_queue_insert_tail(&channel->workers_with_subscribers.queue, &worker_sentinel->queue);
ngx_queue_insert_tail(&channel->workers_with_subscribers, &worker_sentinel->queue);
worker_sentinel->pid = ngx_pid;
worker_sentinel->slot = ngx_process_slot;
......@@ -692,7 +692,8 @@ ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http
static ngx_int_t
ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log)
{
ngx_http_push_stream_pid_queue_t *cur, *worker_subscribers_sentinel = NULL;
ngx_queue_t *cur_worker;
ngx_http_push_stream_pid_queue_t *worker, *worker_subscribers_sentinel = NULL;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_queue_elem_t *element_subscriber;
......@@ -702,10 +703,11 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
return NGX_ERROR;
}
cur = &channel->workers_with_subscribers;
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != &channel->workers_with_subscribers) {
if (cur->pid == ngx_pid) {
worker_subscribers_sentinel = cur;
cur_worker = &channel->workers_with_subscribers;
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
if (worker->pid == ngx_pid) {
worker_subscribers_sentinel = worker;
break;
}
}
......
......@@ -61,13 +61,14 @@ static void
ngx_http_push_stream_delete_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool)
{
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_pid_queue_t *cur_worker;
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker;
ngx_http_push_stream_queue_elem_t *cur;
ngx_http_push_stream_subscription_t *cur_subscription;
ngx_queue_t *prev_channel, *cur_channel = &data->channels_to_delete;
while ((cur_channel = ngx_queue_next(cur_channel)) != &data->channels_to_delete) {
while ((cur_channel = ngx_queue_next(cur_channel)) && (cur_channel != NULL) && (cur_channel != &data->channels_to_delete)) {
channel = ngx_queue_data(cur_channel, ngx_http_push_stream_channel_t, queue);
// remove subscribers if any
......@@ -75,12 +76,13 @@ ngx_http_push_stream_delete_channels(ngx_http_push_stream_shm_data_t *data, ngx_
cur_worker = &channel->workers_with_subscribers;
// find the current worker
while ((cur_worker = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur_worker->queue)) != &channel->workers_with_subscribers) {
if (cur_worker->pid == ngx_pid) {
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
if (worker->pid == ngx_pid) {
// to each subscriber of this channel in this worker
while (!ngx_queue_empty(&cur_worker->subscribers_sentinel.queue)) {
cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur_worker->subscribers_sentinel.queue);
while (!ngx_queue_empty(&worker->subscribers_sentinel.queue)) {
cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&worker->subscribers_sentinel.queue);
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value;
// find the subscription for the channel being deleted
......@@ -662,7 +664,8 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
ngx_http_push_stream_channel_t *channel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_pid_queue_t *cur;
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker;
ngx_shmtx_lock(&shpool->mutex);
......@@ -690,13 +693,14 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
}
// send signal to each worker with subscriber to this channel
cur = &channel->workers_with_subscribers;
cur_worker = &channel->workers_with_subscribers;
if (ngx_queue_empty(&channel->workers_with_subscribers.queue)) {
if (ngx_queue_empty(&channel->workers_with_subscribers)) {
ngx_http_push_stream_alert_worker_delete_channel(ngx_pid, ngx_process_slot, ngx_cycle->log);
} else {
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != &channel->workers_with_subscribers) {
ngx_http_push_stream_alert_worker_delete_channel(cur->pid, cur->slot, ngx_cycle->log);
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
ngx_http_push_stream_alert_worker_delete_channel(worker->pid, worker->slot, ngx_cycle->log);
}
}
}
......@@ -786,11 +790,13 @@ static void
nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel)
{
// delete the worker-subscriber queue
ngx_http_push_stream_pid_queue_t *cur;
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker;
while ((cur = (ngx_http_push_stream_pid_queue_t *)ngx_queue_next(&channel->workers_with_subscribers.queue)) != &channel->workers_with_subscribers) {
ngx_queue_remove(&cur->queue);
ngx_slab_free_locked(shpool, cur);
while ((cur_worker = ngx_queue_head(&channel->workers_with_subscribers)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
ngx_queue_remove(&worker->queue);
ngx_slab_free_locked(shpool, worker);
}
if (channel->channel_deleted_message != NULL) ngx_http_push_stream_free_message_memory_locked(shpool, channel->channel_deleted_message);
......@@ -834,7 +840,7 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
ngx_queue_t *cur;
ngx_shmtx_lock(&shpool->mutex);
while ((cur = ngx_queue_head(&data->messages_trash)) != &data->messages_trash) {
while ((cur = ngx_queue_head(&data->messages_trash)) && (cur != NULL) && (cur != &data->messages_trash)) {
message = ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
if (force || ((message->workers_ref_count <= 0) && (ngx_time() > message->expires))) {
......
......@@ -170,7 +170,7 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
ngx_http_push_stream_initialize_channel(channel);
// initialize workers_with_subscribers queues only when a channel is created
ngx_queue_init(&channel->workers_with_subscribers.queue);
ngx_queue_init(&channel->workers_with_subscribers);
ngx_shmtx_unlock(&shpool->mutex);
return channel;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment