Commit 5b8b865c authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

simplifying the relation between subscription (subscriber <-> channel) and worker structures

parent 571f222f
...@@ -123,7 +123,7 @@ typedef struct { ...@@ -123,7 +123,7 @@ typedef struct {
ngx_queue_t queue; ngx_queue_t queue;
pid_t pid; pid_t pid;
ngx_int_t slot; ngx_int_t slot;
ngx_http_push_stream_queue_elem_t subscribers_sentinel; ngx_queue_t subscriptions_queue;
} ngx_http_push_stream_pid_queue_t; } ngx_http_push_stream_pid_queue_t;
// our typecast-friendly rbtree node (channel) // our typecast-friendly rbtree node (channel)
...@@ -157,9 +157,9 @@ typedef struct { ...@@ -157,9 +157,9 @@ typedef struct {
typedef struct { typedef struct {
ngx_queue_t queue; ngx_queue_t queue;
ngx_queue_t channel_worker_queue;
ngx_http_push_stream_subscriber_t *subscriber; ngx_http_push_stream_subscriber_t *subscriber;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_queue_elem_t *channel_subscriber_element_ref;
} ngx_http_push_stream_subscription_t; } ngx_http_push_stream_subscription_t;
struct ngx_http_push_stream_subscriber_s { struct ngx_http_push_stream_subscriber_s {
...@@ -188,7 +188,7 @@ typedef struct { ...@@ -188,7 +188,7 @@ typedef struct {
ngx_http_push_stream_msg_t *msg; // ->shared memory ngx_http_push_stream_msg_t *msg; // ->shared memory
ngx_pid_t pid; ngx_pid_t pid;
ngx_http_push_stream_channel_t *channel; // ->shared memory ngx_http_push_stream_channel_t *channel; // ->shared memory
ngx_http_push_stream_queue_elem_t *subscribers_sentinel; // ->a worker's local pool ngx_queue_t *subscriptions_sentinel; // ->a worker's local pool
} ngx_http_push_stream_worker_msg_t; } ngx_http_push_stream_worker_msg_t;
typedef struct { typedef struct {
......
...@@ -56,7 +56,7 @@ static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int ...@@ -56,7 +56,7 @@ static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int
#define ngx_http_push_stream_alert_worker_census_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS) #define ngx_http_push_stream_alert_worker_census_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS)
#define ngx_http_push_stream_alert_worker_delete_channel(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL) #define ngx_http_push_stream_alert_worker_delete_channel(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL)
static ngx_int_t ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_queue_elem_t *subscribers_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log); static ngx_int_t ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers); static ngx_int_t ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers);
static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle); static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle);
...@@ -67,6 +67,6 @@ static void ngx_http_push_stream_channel_handler(ngx_event_t *ev); ...@@ -67,6 +67,6 @@ static void ngx_http_push_stream_channel_handler(ngx_event_t *ev);
static ngx_inline void ngx_http_push_stream_process_worker_message(void); static ngx_inline void ngx_http_push_stream_process_worker_message(void);
static ngx_inline void ngx_http_push_stream_census_worker_subscribers(void); static ngx_inline void ngx_http_push_stream_census_worker_subscribers(void);
static ngx_int_t ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_queue_elem_t *subscribers_sentinel, ngx_http_push_stream_msg_t *msg); static ngx_int_t ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_http_push_stream_msg_t *msg);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_IPC_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_IPC_H_ */
...@@ -298,7 +298,7 @@ ngx_http_push_stream_process_worker_message(void) ...@@ -298,7 +298,7 @@ ngx_http_push_stream_process_worker_message(void)
worker_msg = ngx_queue_data(cur, ngx_http_push_stream_worker_msg_t, queue); worker_msg = ngx_queue_data(cur, ngx_http_push_stream_worker_msg_t, queue);
if (worker_msg->pid == ngx_pid) { if (worker_msg->pid == ngx_pid) {
// everything is okay // everything is okay
ngx_http_push_stream_respond_to_subscribers(worker_msg->channel, worker_msg->subscribers_sentinel, worker_msg->msg); ngx_http_push_stream_respond_to_subscribers(worker_msg->channel, worker_msg->subscriptions_sentinel, worker_msg->msg);
} else { } else {
// that's quite bad you see. a previous worker died with an undelivered message. // that's quite bad you see. a previous worker died with an undelivered message.
// but all its subscribers' connections presumably got canned, too. so it's not so bad after all. // but all its subscribers' connections presumably got canned, too. so it's not so bad after all.
...@@ -332,7 +332,7 @@ ngx_http_push_stream_process_worker_message(void) ...@@ -332,7 +332,7 @@ ngx_http_push_stream_process_worker_message(void)
static ngx_int_t static ngx_int_t
ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_queue_elem_t *subscribers_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log) ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log)
{ {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc; ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
...@@ -347,7 +347,7 @@ ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t * ...@@ -347,7 +347,7 @@ ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *
msg->workers_ref_count++; msg->workers_ref_count++;
newmessage->msg = msg; newmessage->msg = msg;
newmessage->pid = pid; newmessage->pid = pid;
newmessage->subscribers_sentinel = subscribers_sentinel; newmessage->subscriptions_sentinel = subscriptions_sentinel;
newmessage->channel = channel; newmessage->channel = channel;
*queue_was_empty = ngx_queue_empty(&thisworker_data->messages_queue); *queue_was_empty = ngx_queue_empty(&thisworker_data->messages_queue);
ngx_queue_insert_tail(&thisworker_data->messages_queue, &newmessage->queue); ngx_queue_insert_tail(&thisworker_data->messages_queue, &newmessage->queue);
...@@ -370,7 +370,7 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http ...@@ -370,7 +370,7 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
cur_worker = &channel->workers_with_subscribers; cur_worker = &channel->workers_with_subscribers;
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) { while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue); worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
ngx_http_push_stream_send_worker_message_locked(channel, &worker->subscribers_sentinel, worker->pid, worker->slot, msg, &queue_was_empty[worker->slot], log); ngx_http_push_stream_send_worker_message_locked(channel, &worker->subscriptions_queue, worker->pid, worker->slot, msg, &queue_was_empty[worker->slot], log);
} }
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
...@@ -391,22 +391,21 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http ...@@ -391,22 +391,21 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
} }
static ngx_int_t static ngx_int_t
ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_queue_elem_t *subscribers_sentinel, ngx_http_push_stream_msg_t *msg) ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_http_push_stream_msg_t *msg)
{ {
ngx_http_push_stream_queue_elem_t *cur = subscribers_sentinel; ngx_queue_t *cur = subscriptions_sentinel, *prev = NULL;
if (subscribers_sentinel == NULL) { if (subscriptions_sentinel == NULL) {
return NGX_ERROR; return NGX_ERROR;
} }
if (msg != NULL) { if (msg != NULL) {
// now let's respond to some requests! // now let's respond to some requests!
while ((cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur->queue)) != subscribers_sentinel) { while (((cur = ngx_queue_next(cur)) != subscriptions_sentinel) && (prev = ngx_queue_prev(cur))) {
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value; ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, channel_worker_queue);
ngx_http_push_stream_subscriber_t *subscriber = subscription->subscriber;
if (subscriber->longpolling) { if (subscriber->longpolling) {
ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&cur->queue);
ngx_http_push_stream_add_response_header(subscriber->request, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED); ngx_http_push_stream_add_response_header(subscriber->request, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED);
ngx_http_push_stream_add_polling_headers(subscriber->request, msg->time, msg->tag, subscriber->request->pool); ngx_http_push_stream_add_polling_headers(subscriber->request, msg->time, msg->tag, subscriber->request->pool);
ngx_http_send_header(subscriber->request); ngx_http_send_header(subscriber->request);
...@@ -418,7 +417,6 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan ...@@ -418,7 +417,6 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan
cur = prev; cur = prev;
} else { } else {
if (ngx_http_push_stream_send_response_message(subscriber->request, channel, msg, 0, 0) != NGX_OK) { if (ngx_http_push_stream_send_response_message(subscriber->request, channel, msg, 0, 0) != NGX_OK) {
ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&cur->queue);
ngx_http_push_stream_send_response_finalize(subscriber->request); ngx_http_push_stream_send_response_finalize(subscriber->request);
cur = prev; cur = prev;
} else { } else {
......
...@@ -660,7 +660,7 @@ ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_p ...@@ -660,7 +660,7 @@ ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_p
worker_sentinel->pid = ngx_pid; worker_sentinel->pid = ngx_pid;
worker_sentinel->slot = ngx_process_slot; worker_sentinel->slot = ngx_process_slot;
ngx_queue_init(&worker_sentinel->subscribers_sentinel.queue); ngx_queue_init(&worker_sentinel->subscriptions_queue);
return worker_sentinel; return worker_sentinel;
} }
...@@ -687,7 +687,6 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo ...@@ -687,7 +687,6 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
ngx_queue_t *cur_worker; ngx_queue_t *cur_worker;
ngx_http_push_stream_pid_queue_t *worker, *worker_subscribers_sentinel = NULL; ngx_http_push_stream_pid_queue_t *worker, *worker_subscribers_sentinel = NULL;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_queue_elem_t *element_subscriber;
// check if channel still exists // check if channel still exists
if ((channel = ngx_http_push_stream_find_channel(channel_id, log)) == NULL) { if ((channel = ngx_http_push_stream_find_channel(channel_id, log)) == NULL) {
...@@ -711,17 +710,10 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo ...@@ -711,17 +710,10 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
} }
} }
if ((element_subscriber = ngx_palloc(subscription->subscriber->request->pool, sizeof(ngx_http_push_stream_queue_elem_t))) == NULL) { // unable to allocate request queue element
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate subscriber reference");
return NGX_ERROR;
}
element_subscriber->value = subscription->subscriber;
subscription->channel_subscriber_element_ref = element_subscriber;
channel->subscribers++; // do this only when we know everything went okay channel->subscribers++; // do this only when we know everything went okay
channel->last_activity_time = ngx_time(); channel->last_activity_time = ngx_time();
ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue); ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue);
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscribers_sentinel.queue, &element_subscriber->queue); ngx_queue_insert_tail(&worker_subscribers_sentinel->subscriptions_queue, &subscription->channel_worker_queue);
return NGX_OK; return NGX_OK;
} }
......
...@@ -62,9 +62,7 @@ ngx_http_push_stream_delete_channels(ngx_http_push_stream_shm_data_t *data, ngx_ ...@@ -62,9 +62,7 @@ ngx_http_push_stream_delete_channels(ngx_http_push_stream_shm_data_t *data, ngx_
{ {
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_pid_queue_t *worker; ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker; ngx_queue_t *cur_worker, *cur;
ngx_http_push_stream_queue_elem_t *cur;
ngx_http_push_stream_subscription_t *cur_subscription;
ngx_queue_t *prev_channel, *cur_channel = &data->channels_to_delete; ngx_queue_t *prev_channel, *cur_channel = &data->channels_to_delete;
...@@ -84,37 +82,29 @@ ngx_http_push_stream_delete_channels(ngx_http_push_stream_shm_data_t *data, ngx_ ...@@ -84,37 +82,29 @@ ngx_http_push_stream_delete_channels(ngx_http_push_stream_shm_data_t *data, ngx_
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue); worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
if (worker->pid == ngx_pid) { if (worker->pid == ngx_pid) {
// to each subscriber of this channel in this worker // to each subscription of this channel in this worker
while (!ngx_queue_empty(&worker->subscribers_sentinel.queue)) { while ((cur = ngx_queue_head(&worker->subscriptions_queue)) != &worker->subscriptions_queue) {
cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&worker->subscribers_sentinel.queue); ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, channel_worker_queue);
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value; ngx_http_push_stream_subscriber_t *subscriber = subscription->subscriber;
// find the subscription for the channel being deleted ngx_shmtx_lock(&shpool->mutex);
cur_subscription = &subscriber->subscriptions_sentinel; NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->subscribers);
while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != &subscriber->subscriptions_sentinel) { // remove the subscription for the channel from subscriber
if (cur_subscription->channel == channel) { ngx_queue_remove(&subscription->queue);
// remove the subscription for the channel from worker
ngx_shmtx_lock(&shpool->mutex); ngx_queue_remove(&subscription->channel_worker_queue);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->subscribers); ngx_shmtx_unlock(&shpool->mutex);
// remove the reference from subscription for channel
ngx_queue_remove(&cur_subscription->queue); if (subscriber->longpolling) {
// remove the reference from channel for subscriber ngx_http_push_stream_add_response_header(subscriber->request, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED);
ngx_queue_remove(&cur->queue); ngx_http_push_stream_add_polling_headers(subscriber->request, ngx_time(), 0, subscriber->request->pool);
ngx_shmtx_unlock(&shpool->mutex); ngx_http_send_header(subscriber->request);
if (subscriber->longpolling) { ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module));
ngx_http_push_stream_add_response_header(subscriber->request, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED); }
ngx_http_push_stream_add_polling_headers(subscriber->request, ngx_time(), 0, subscriber->request->pool);
ngx_http_send_header(subscriber->request);
ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module));
}
ngx_http_push_stream_send_response_message(subscriber->request, channel, channel->channel_deleted_message, 1, 1); ngx_http_push_stream_send_response_message(subscriber->request, channel, channel->channel_deleted_message, 1, 1);
break;
}
}
// subscriber does not have any other subscription, the connection may be closed // subscriber does not have any other subscription, the connection may be closed
if (subscriber->longpolling || ngx_queue_empty(&subscriber->subscriptions_sentinel.queue)) { if (subscriber->longpolling || ngx_queue_empty(&subscriber->subscriptions_sentinel.queue)) {
...@@ -1174,7 +1164,7 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subsc ...@@ -1174,7 +1164,7 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subsc
while ((cur = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&sentinel->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(cur->channel->subscribers); NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(cur->channel->subscribers);
cur->channel->last_activity_time = ngx_time(); cur->channel->last_activity_time = ngx_time();
ngx_queue_remove(&cur->channel_subscriber_element_ref->queue); ngx_queue_remove(&cur->channel_worker_queue);
ngx_queue_remove(&cur->queue); ngx_queue_remove(&cur->queue);
} }
ngx_queue_init(&sentinel->queue); ngx_queue_init(&sentinel->queue);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment