Commit 64f0dc1a authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

unifying types: ngx_http_push_stream_worker_subscriber_t and...

unifying types: ngx_http_push_stream_worker_subscriber_t and ngx_http_push_stream_subscriber_t to reduce memory consumption
parent 486d49fa
...@@ -31,6 +31,11 @@ ...@@ -31,6 +31,11 @@
#include <ngx_http.h> #include <ngx_http.h>
#include <nginx.h> #include <nginx.h>
typedef struct {
ngx_queue_t queue;
void *value;
} ngx_http_push_stream_queue_elem_t;
// template queue // template queue
typedef struct { typedef struct {
ngx_queue_t queue; // this MUST be first ngx_queue_t queue; // this MUST be first
...@@ -95,19 +100,13 @@ typedef struct { ...@@ -95,19 +100,13 @@ typedef struct {
} ngx_http_push_stream_msg_t; } ngx_http_push_stream_msg_t;
typedef struct ngx_http_push_stream_subscriber_cleanup_s ngx_http_push_stream_subscriber_cleanup_t; typedef struct ngx_http_push_stream_subscriber_cleanup_s ngx_http_push_stream_subscriber_cleanup_t;
typedef struct ngx_http_push_stream_subscriber_s ngx_http_push_stream_subscriber_t;
// subscriber request queue
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_http_request_t *request;
ngx_flag_t longpolling;
} ngx_http_push_stream_subscriber_t;
typedef struct { typedef struct {
ngx_queue_t queue; ngx_queue_t queue;
pid_t pid; pid_t pid;
ngx_int_t slot; ngx_int_t slot;
ngx_http_push_stream_subscriber_t subscriber_sentinel; ngx_http_push_stream_queue_elem_t subscribers_sentinel;
} ngx_http_push_stream_pid_queue_t; } ngx_http_push_stream_pid_queue_t;
// our typecast-friendly rbtree node (channel) // our typecast-friendly rbtree node (channel)
...@@ -140,16 +139,17 @@ typedef struct { ...@@ -140,16 +139,17 @@ typedef struct {
ngx_queue_t queue; ngx_queue_t queue;
ngx_http_push_stream_subscriber_t *subscriber; ngx_http_push_stream_subscriber_t *subscriber;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_queue_elem_t *channel_subscriber_element_ref;
} ngx_http_push_stream_subscription_t; } ngx_http_push_stream_subscription_t;
typedef struct { struct ngx_http_push_stream_subscriber_s {
ngx_queue_t queue; // this MUST be first
ngx_http_request_t *request; ngx_http_request_t *request;
ngx_http_push_stream_subscription_t subscriptions_sentinel; ngx_http_push_stream_subscription_t subscriptions_sentinel;
ngx_http_push_stream_subscriber_cleanup_t *clndata; ngx_http_push_stream_subscriber_cleanup_t *clndata;
ngx_pid_t worker_subscribed_pid; ngx_pid_t worker_subscribed_pid;
ngx_flag_t longpolling; ngx_flag_t longpolling;
} ngx_http_push_stream_worker_subscriber_t; ngx_http_push_stream_queue_elem_t *worker_subscriber_element_ref;
};
typedef struct { typedef struct {
ngx_event_t *disconnect_timer; ngx_event_t *disconnect_timer;
...@@ -159,7 +159,7 @@ typedef struct { ...@@ -159,7 +159,7 @@ typedef struct {
// cleaning supplies // cleaning supplies
struct ngx_http_push_stream_subscriber_cleanup_s { struct ngx_http_push_stream_subscriber_cleanup_s {
ngx_http_push_stream_worker_subscriber_t *worker_subscriber; ngx_http_push_stream_subscriber_t *worker_subscriber;
}; };
// messages to worker processes // messages to worker processes
...@@ -168,12 +168,12 @@ typedef struct { ...@@ -168,12 +168,12 @@ typedef struct {
ngx_http_push_stream_msg_t *msg; // ->shared memory ngx_http_push_stream_msg_t *msg; // ->shared memory
ngx_pid_t pid; ngx_pid_t pid;
ngx_http_push_stream_channel_t *channel; // ->shared memory ngx_http_push_stream_channel_t *channel; // ->shared memory
ngx_http_push_stream_subscriber_t *subscriber_sentinel; // ->a worker's local pool ngx_http_push_stream_queue_elem_t *subscribers_sentinel; // ->a worker's local pool
} ngx_http_push_stream_worker_msg_t; } ngx_http_push_stream_worker_msg_t;
typedef struct { typedef struct {
ngx_http_push_stream_worker_msg_t *messages_queue; ngx_http_push_stream_worker_msg_t *messages_queue;
ngx_http_push_stream_worker_subscriber_t *worker_subscribers_sentinel; ngx_http_push_stream_queue_elem_t *subscribers_sentinel;
ngx_uint_t subscribers; // # of subscribers in the worker ngx_uint_t subscribers; // # of subscribers in the worker
time_t startup; time_t startup;
pid_t pid; pid_t pid;
......
...@@ -56,7 +56,7 @@ static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int ...@@ -56,7 +56,7 @@ static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int
#define ngx_http_push_stream_alert_worker_census_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS); #define ngx_http_push_stream_alert_worker_census_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS);
#define ngx_http_push_stream_alert_worker_delete_channel(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL); #define ngx_http_push_stream_alert_worker_delete_channel(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL);
static ngx_int_t ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_log_t *log); static ngx_int_t ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_queue_elem_t *subscribers_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers); static ngx_int_t ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers);
static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle); static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle);
...@@ -67,6 +67,6 @@ static void ngx_http_push_stream_channel_handler(ngx_event_t *ev); ...@@ -67,6 +67,6 @@ static void ngx_http_push_stream_channel_handler(ngx_event_t *ev);
static ngx_inline void ngx_http_push_stream_process_worker_message(void); static ngx_inline void ngx_http_push_stream_process_worker_message(void);
static ngx_inline void ngx_http_push_stream_census_worker_subscribers(void); static ngx_inline void ngx_http_push_stream_census_worker_subscribers(void);
static ngx_int_t ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *sentinel, ngx_http_push_stream_msg_t *msg); static ngx_int_t ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_queue_elem_t *subscribers_sentinel, ngx_http_push_stream_msg_t *msg);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_IPC_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_IPC_H_ */
...@@ -241,7 +241,7 @@ static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_in ...@@ -241,7 +241,7 @@ static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_in
#define ngx_http_push_stream_memory_cleanup_timer_set() ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event, ngx_http_push_stream_memory_cleanup_timer_wake_handler, 1); #define ngx_http_push_stream_memory_cleanup_timer_set() ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event, ngx_http_push_stream_memory_cleanup_timer_wake_handler, 1);
#define ngx_http_push_stream_buffer_cleanup_timer_set(pslcf) ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->buffer_cleanup_interval, &ngx_http_push_stream_buffer_cleanup_event, ngx_http_push_stream_buffer_timer_wake_handler, pslcf->store_messages); #define ngx_http_push_stream_buffer_cleanup_timer_set(pslcf) ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->buffer_cleanup_interval, &ngx_http_push_stream_buffer_cleanup_event, ngx_http_push_stream_buffer_timer_wake_handler, pslcf->store_messages);
static void ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber); static void ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subscriber_t *worker_subscriber);
static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len); static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len);
static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg); static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg);
......
...@@ -135,7 +135,7 @@ ngx_http_push_stream_ipc_init_worker() ...@@ -135,7 +135,7 @@ ngx_http_push_stream_ipc_init_worker()
return NGX_ERROR; return NGX_ERROR;
} }
if ((data->ipc[ngx_process_slot].worker_subscribers_sentinel == NULL) && ((data->ipc[ngx_process_slot].worker_subscribers_sentinel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_subscriber_t))) == NULL)) { if ((data->ipc[ngx_process_slot].subscribers_sentinel == NULL) && ((data->ipc[ngx_process_slot].subscribers_sentinel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_queue_elem_t))) == NULL)) {
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR; return NGX_ERROR;
} }
...@@ -143,7 +143,7 @@ ngx_http_push_stream_ipc_init_worker() ...@@ -143,7 +143,7 @@ ngx_http_push_stream_ipc_init_worker()
data->ipc[ngx_process_slot].pid = ngx_pid; data->ipc[ngx_process_slot].pid = ngx_pid;
data->ipc[ngx_process_slot].startup = ngx_time(); data->ipc[ngx_process_slot].startup = ngx_time();
ngx_queue_init(&data->ipc[ngx_process_slot].messages_queue->queue); ngx_queue_init(&data->ipc[ngx_process_slot].messages_queue->queue);
ngx_queue_init(&data->ipc[ngx_process_slot].worker_subscribers_sentinel->queue); ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_sentinel->queue);
data->subscribers = 0; data->subscribers = 0;
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_reset_channel_subscribers_count_locked); ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_reset_channel_subscribers_count_locked);
...@@ -167,18 +167,18 @@ ngx_http_push_stream_clean_worker_data() ...@@ -167,18 +167,18 @@ ngx_http_push_stream_clean_worker_data()
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_msg_t *cur_msg; ngx_http_push_stream_worker_msg_t *cur_msg;
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
if (data->ipc[ngx_process_slot].messages_queue != NULL) { if (data->ipc[ngx_process_slot].messages_queue != NULL) {
while ((cur_msg = (ngx_http_push_stream_worker_msg_t *) ngx_queue_next(&data->ipc[ngx_process_slot].messages_queue->queue)) != data->ipc[ngx_process_slot].messages_queue) { while ((cur_msg = (ngx_http_push_stream_worker_msg_t *) ngx_queue_next(&data->ipc[ngx_process_slot].messages_queue->queue)) != data->ipc[ngx_process_slot].messages_queue) {
ngx_queue_remove(&cur_msg->queue); ngx_queue_remove(&cur_msg->queue);
ngx_slab_free_locked(shpool, cur_msg); ngx_slab_free_locked(shpool, cur_msg);
} }
} }
if (data->ipc[ngx_process_slot].worker_subscribers_sentinel != NULL) { if (data->ipc[ngx_process_slot].subscribers_sentinel != NULL) {
ngx_queue_init(&data->ipc[ngx_process_slot].worker_subscribers_sentinel->queue); ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_sentinel->queue);
} }
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
...@@ -257,17 +257,16 @@ ngx_http_push_stream_census_worker_subscribers(void) ...@@ -257,17 +257,16 @@ ngx_http_push_stream_census_worker_subscribers(void)
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *workers_data = data->ipc; ngx_http_push_stream_worker_data_t *workers_data = data->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot; ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *cur, *sentinel; ngx_http_push_stream_queue_elem_t *cur;
ngx_http_push_stream_subscription_t *cur_subscription, *sentinel_subscription; ngx_http_push_stream_subscription_t *cur_subscription;
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
sentinel = thisworker_data->worker_subscribers_sentinel; cur = thisworker_data->subscribers_sentinel;
cur = sentinel; while ((cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur->queue)) != thisworker_data->subscribers_sentinel) {
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) { ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value;
sentinel_subscription = &cur->subscriptions_sentinel; cur_subscription = &subscriber->subscriptions_sentinel;
cur_subscription = sentinel_subscription; while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != &subscriber->subscriptions_sentinel) {
while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != sentinel_subscription) {
cur_subscription->channel->subscribers++; cur_subscription->channel->subscribers++;
} }
data->subscribers++; data->subscribers++;
...@@ -292,7 +291,7 @@ ngx_http_push_stream_process_worker_message(void) ...@@ -292,7 +291,7 @@ ngx_http_push_stream_process_worker_message(void)
while (worker_msg != sentinel) { while (worker_msg != sentinel) {
if (worker_msg->pid == ngx_pid) { if (worker_msg->pid == ngx_pid) {
// everything is okay // everything is okay
ngx_http_push_stream_respond_to_subscribers(worker_msg->channel, worker_msg->subscriber_sentinel, worker_msg->msg); ngx_http_push_stream_respond_to_subscribers(worker_msg->channel, worker_msg->subscribers_sentinel, worker_msg->msg);
} else { } else {
// that's quite bad you see. a previous worker died with an undelivered message. // that's quite bad you see. a previous worker died with an undelivered message.
// but all its subscribers' connections presumably got canned, too. so it's not so bad after all. // but all its subscribers' connections presumably got canned, too. so it's not so bad after all.
...@@ -329,7 +328,7 @@ ngx_http_push_stream_process_worker_message(void) ...@@ -329,7 +328,7 @@ ngx_http_push_stream_process_worker_message(void)
static ngx_int_t static ngx_int_t
ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_log_t *log) ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_queue_elem_t *subscribers_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_log_t *log)
{ {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc; ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
...@@ -347,7 +346,7 @@ ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel ...@@ -347,7 +346,7 @@ ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel
newmessage->msg = msg; newmessage->msg = msg;
newmessage->pid = pid; newmessage->pid = pid;
newmessage->subscriber_sentinel = subscriber_sentinel; newmessage->subscribers_sentinel = subscribers_sentinel;
newmessage->channel = channel; newmessage->channel = channel;
ngx_queue_insert_tail(&thisworker_data->messages_queue->queue, &newmessage->queue); ngx_queue_insert_tail(&thisworker_data->messages_queue->queue, &newmessage->queue);
...@@ -369,10 +368,9 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http ...@@ -369,10 +368,9 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
pid_t worker_pid = cur->pid; pid_t worker_pid = cur->pid;
ngx_int_t worker_slot = cur->slot; ngx_int_t worker_slot = cur->slot;
ngx_http_push_stream_subscriber_t *subscriber_sentinel = &cur->subscriber_sentinel;
// interprocess communication breakdown // interprocess communication breakdown
if (ngx_http_push_stream_send_worker_message(channel, subscriber_sentinel, worker_pid, worker_slot, msg, log) != NGX_ERROR) { if (ngx_http_push_stream_send_worker_message(channel, &cur->subscribers_sentinel, worker_pid, worker_slot, msg, log) != NGX_ERROR) {
ngx_http_push_stream_alert_worker_check_messages(worker_pid, worker_slot, log); ngx_http_push_stream_alert_worker_check_messages(worker_pid, worker_slot, log);
} else { } else {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with some other worker process"); ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with some other worker process");
...@@ -387,35 +385,34 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http ...@@ -387,35 +385,34 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
} }
static ngx_int_t static ngx_int_t
ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *sentinel, ngx_http_push_stream_msg_t *msg) ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_queue_elem_t *subscribers_sentinel, ngx_http_push_stream_msg_t *msg)
{ {
ngx_http_push_stream_subscriber_t *cur; ngx_http_push_stream_queue_elem_t *cur = subscribers_sentinel;
if (sentinel == NULL) { if (subscribers_sentinel == NULL) {
return NGX_ERROR; return NGX_ERROR;
} }
cur = sentinel;
if (msg != NULL) { if (msg != NULL) {
// now let's respond to some requests! // now let's respond to some requests!
while ((cur = (ngx_http_push_stream_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur->queue)) != subscribers_sentinel) {
if (cur->longpolling) { ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value;
ngx_http_push_stream_subscriber_t *prev = (ngx_http_push_stream_subscriber_t *) ngx_queue_prev(&cur->queue); if (subscriber->longpolling) {
ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&cur->queue);
ngx_http_push_stream_add_polling_headers(cur->request, msg->time, msg->tag, cur->request->pool); ngx_http_push_stream_add_polling_headers(subscriber->request, msg->time, msg->tag, subscriber->request->pool);
ngx_http_send_header(cur->request); ngx_http_send_header(subscriber->request);
ngx_http_push_stream_send_response_content_header(cur->request, ngx_http_get_module_loc_conf(cur->request, ngx_http_push_stream_module)); ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module));
ngx_http_push_stream_send_response_message(cur->request, channel, msg); ngx_http_push_stream_send_response_message(subscriber->request, channel, msg);
ngx_http_push_stream_send_response_finalize(cur->request); ngx_http_push_stream_send_response_finalize(subscriber->request);
cur = prev; cur = prev;
} else { } else {
if (ngx_http_push_stream_send_response_message(cur->request, channel, msg) == NGX_ERROR) { if (ngx_http_push_stream_send_response_message(subscriber->request, channel, msg) == NGX_ERROR) {
ngx_http_push_stream_subscriber_t *prev = (ngx_http_push_stream_subscriber_t *) ngx_queue_prev(&cur->queue); ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&cur->queue);
ngx_http_push_stream_send_response_finalize(cur->request); ngx_http_push_stream_send_response_finalize(subscriber->request);
cur = prev; cur = prev;
} }
} }
......
...@@ -737,7 +737,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -737,7 +737,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
d->ipc[i].startup = 0; d->ipc[i].startup = 0;
d->ipc[i].subscribers = 0; d->ipc[i].subscribers = 0;
d->ipc[i].messages_queue = NULL; d->ipc[i].messages_queue = NULL;
d->ipc[i].worker_subscribers_sentinel = NULL; d->ipc[i].subscribers_sentinel = NULL;
} }
d->startup = ngx_time(); d->startup = ngx_time();
......
...@@ -25,13 +25,13 @@ ...@@ -25,13 +25,13 @@
#include <ngx_http_push_stream_module_subscriber.h> #include <ngx_http_push_stream_module_subscriber.h>
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_str_t *last_event_id, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool); static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool);
static ngx_http_push_stream_worker_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r); static ngx_http_push_stream_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_worker_subscriber_t *worker_subscriber); static ngx_int_t ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_subscriber_t *worker_subscriber);
static ngx_flag_t ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id); static ngx_flag_t ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id); static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log); static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log);
static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_flag_t longpolling); static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber);
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log); static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool); static ngx_int_t ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool);
...@@ -40,7 +40,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -40,7 +40,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
{ {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_worker_subscriber_t *worker_subscriber; ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur; ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_pool_t *temp_pool; ngx_pool_t *temp_pool;
ngx_uint_t subscribed_channels_qtd = 0; ngx_uint_t subscribed_channels_qtd = 0;
...@@ -176,7 +176,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -176,7 +176,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
// adding subscriber to channel(s) and send backtrack messages // adding subscriber to channel(s) and send backtrack messages
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, last_event_id, &worker_subscriber->subscriptions_sentinel, temp_pool) != NGX_OK) { if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, last_event_id, worker_subscriber, temp_pool) != NGX_OK) {
ngx_destroy_pool(temp_pool); ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
...@@ -192,7 +192,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -192,7 +192,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_requested_channel_t *cur; ngx_http_push_stream_requested_channel_t *cur;
ngx_http_push_stream_worker_subscriber_t *worker_subscriber; ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_subscription_t *subscription; ngx_http_push_stream_subscription_t *subscription;
ngx_str_t *etag = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_IF_NONE_MATCH); ngx_str_t *etag = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_IF_NONE_MATCH);
...@@ -253,7 +253,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -253,7 +253,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
if ((subscription = ngx_http_push_stream_create_channel_subscription(r, channel, longpolling)) == NULL) { if ((subscription = ngx_http_push_stream_create_channel_subscription(r, channel, worker_subscriber)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
...@@ -312,7 +312,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -312,7 +312,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
} }
static ngx_int_t static ngx_int_t
ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_str_t *last_event_id, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool) ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool)
{ {
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_subscription_t *subscription; ngx_http_push_stream_subscription_t *subscription;
...@@ -324,7 +324,7 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http ...@@ -324,7 +324,7 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
return NGX_ERROR; return NGX_ERROR;
} }
if ((subscription = ngx_http_push_stream_create_channel_subscription(r, channel, 0)) == NULL) { if ((subscription = ngx_http_push_stream_create_channel_subscription(r, channel, subscriber)) == NULL) {
return NGX_ERROR; return NGX_ERROR;
} }
...@@ -332,7 +332,7 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http ...@@ -332,7 +332,7 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since, 0, 0, -1, last_event_id); ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since, 0, 0, -1, last_event_id);
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
result = ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->id, subscription, subscriptions_sentinel, r->connection->log); result = ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->id, subscription, &subscriber->subscriptions_sentinel, r->connection->log);
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
return result; return result;
...@@ -435,13 +435,13 @@ ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_ ...@@ -435,13 +435,13 @@ ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_
} }
} }
static ngx_http_push_stream_worker_subscriber_t * static ngx_http_push_stream_subscriber_t *
ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r) ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r)
{ {
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_pool_cleanup_t *cln; ngx_pool_cleanup_t *cln;
ngx_http_push_stream_subscriber_cleanup_t *clndata; ngx_http_push_stream_subscriber_cleanup_t *clndata;
ngx_http_push_stream_worker_subscriber_t *worker_subscriber; ngx_http_push_stream_subscriber_t *worker_subscriber;
// attach a cleaner to remove the request from the channel // attach a cleaner to remove the request from the channel
if ((cln = ngx_pool_cleanup_add(r->pool, sizeof(ngx_http_push_stream_subscriber_cleanup_t))) == NULL) { if ((cln = ngx_pool_cleanup_add(r->pool, sizeof(ngx_http_push_stream_subscriber_cleanup_t))) == NULL) {
...@@ -449,7 +449,7 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque ...@@ -449,7 +449,7 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
return NULL; return NULL;
} }
if ((worker_subscriber = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_worker_subscriber_t))) == NULL) { if ((worker_subscriber = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_subscriber_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate worker subscriber"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate worker subscriber");
return NULL; return NULL;
} }
...@@ -457,7 +457,6 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque ...@@ -457,7 +457,6 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
worker_subscriber->longpolling = 0; worker_subscriber->longpolling = 0;
worker_subscriber->request = r; worker_subscriber->request = r;
worker_subscriber->worker_subscribed_pid = ngx_pid; worker_subscriber->worker_subscribed_pid = ngx_pid;
ngx_queue_init(&worker_subscriber->queue);
ngx_queue_init(&worker_subscriber->subscriptions_sentinel.queue); ngx_queue_init(&worker_subscriber->subscriptions_sentinel.queue);
// set a cleaner to subscriber // set a cleaner to subscriber
...@@ -483,16 +482,24 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque ...@@ -483,16 +482,24 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
} }
static ngx_int_t static ngx_int_t
ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_worker_subscriber_t *worker_subscriber) ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_subscriber_t *worker_subscriber)
{ {
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot; ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscriber_ctx_t *ctx;
ngx_msec_t connection_ttl = worker_subscriber->longpolling ? cf->longpolling_connection_ttl : cf->subscriber_connection_ttl; ngx_msec_t connection_ttl = worker_subscriber->longpolling ? cf->longpolling_connection_ttl : cf->subscriber_connection_ttl;
ngx_http_push_stream_queue_elem_t *element_subscriber;
ngx_http_push_stream_subscriber_ctx_t *ctx;
if ((element_subscriber = ngx_palloc(r->pool, sizeof(ngx_http_push_stream_queue_elem_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscriber reference");
return NGX_ERROR;
}
element_subscriber->value = worker_subscriber;
worker_subscriber->worker_subscriber_element_ref = element_subscriber;
// adding subscriber to woker list of subscribers // adding subscriber to woker list of subscribers
ngx_queue_insert_tail(&thisworker_data->worker_subscribers_sentinel->queue, &worker_subscriber->queue); ngx_queue_insert_tail(&thisworker_data->subscribers_sentinel->queue, &element_subscriber->queue);
if ((connection_ttl != NGX_CONF_UNSET_MSEC) || (cf->ping_message_interval != NGX_CONF_UNSET_MSEC)) { if ((connection_ttl != NGX_CONF_UNSET_MSEC) || (cf->ping_message_interval != NGX_CONF_UNSET_MSEC)) {
...@@ -639,30 +646,21 @@ ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_p ...@@ -639,30 +646,21 @@ ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_p
worker_sentinel->pid = ngx_pid; worker_sentinel->pid = ngx_pid;
worker_sentinel->slot = ngx_process_slot; worker_sentinel->slot = ngx_process_slot;
ngx_queue_init(&worker_sentinel->subscriber_sentinel.queue); ngx_queue_init(&worker_sentinel->subscribers_sentinel.queue);
return worker_sentinel; return worker_sentinel;
} }
static ngx_http_push_stream_subscription_t * static ngx_http_push_stream_subscription_t *
ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_flag_t longpolling) ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber)
{ {
ngx_http_push_stream_subscription_t *subscription; ngx_http_push_stream_subscription_t *subscription;
ngx_http_push_stream_subscriber_t *subscriber;
if ((subscription = ngx_palloc(r->pool, sizeof(ngx_http_push_stream_subscription_t))) == NULL) { if ((subscription = ngx_palloc(r->pool, sizeof(ngx_http_push_stream_subscription_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscribed channel reference"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscribed channel reference");
return NULL; return NULL;
} }
if ((subscriber = ngx_palloc(r->pool, sizeof(ngx_http_push_stream_subscriber_t))) == NULL) { // unable to allocate request queue element
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscribed channel reference");
return NULL;
}
subscriber->request = r;
subscriber->longpolling = longpolling;
subscription->channel = channel; subscription->channel = channel;
subscription->subscriber = subscriber; subscription->subscriber = subscriber;
...@@ -673,7 +671,8 @@ static ngx_int_t ...@@ -673,7 +671,8 @@ static ngx_int_t
ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log) ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log)
{ {
ngx_http_push_stream_pid_queue_t *cur, *worker_subscribers_sentinel = NULL; ngx_http_push_stream_pid_queue_t *cur, *worker_subscribers_sentinel = NULL;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_queue_elem_t *element_subscriber;
// check if channel still exists // check if channel still exists
if ((channel = ngx_http_push_stream_find_channel_locked(channel_id, log)) == NULL) { if ((channel = ngx_http_push_stream_find_channel_locked(channel_id, log)) == NULL) {
...@@ -696,8 +695,15 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo ...@@ -696,8 +695,15 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
} }
} }
if ((element_subscriber = ngx_palloc(subscription->subscriber->request->pool, sizeof(ngx_http_push_stream_queue_elem_t))) == NULL) { // unable to allocate request queue element
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate subscriber reference");
return NGX_ERROR;
}
element_subscriber->value = subscription->subscriber;
subscription->channel_subscriber_element_ref = element_subscriber;
channel->subscribers++; // do this only when we know everything went okay channel->subscribers++; // do this only when we know everything went okay
ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue); ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue);
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscriber_sentinel.queue, &subscription->subscriber->queue); ngx_queue_insert_tail(&worker_subscribers_sentinel->subscribers_sentinel.queue, &element_subscriber->queue);
return NGX_OK; return NGX_OK;
} }
...@@ -59,10 +59,10 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data ...@@ -59,10 +59,10 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data
{ {
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_pid_queue_t *cur_worker; ngx_http_push_stream_pid_queue_t *cur_worker;
ngx_http_push_stream_subscriber_t *cur; ngx_http_push_stream_queue_elem_t *cur;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc; ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot; ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *worker_subscriber; ngx_http_push_stream_queue_elem_t *cur_subscriber;
ngx_http_push_stream_subscription_t *cur_subscription; ngx_http_push_stream_subscription_t *cur_subscription;
channel = (ngx_http_push_stream_channel_t *) node; channel = (ngx_http_push_stream_channel_t *) node;
...@@ -87,13 +87,15 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data ...@@ -87,13 +87,15 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data
if (cur_worker->slot == ngx_process_slot) { if (cur_worker->slot == ngx_process_slot) {
// to each subscriber of this channel in this worker // to each subscriber of this channel in this worker
while(!ngx_queue_empty(&cur_worker->subscriber_sentinel.queue)) { while(!ngx_queue_empty(&cur_worker->subscribers_sentinel.queue)) {
cur = (ngx_http_push_stream_subscriber_t *) ngx_queue_next(&cur_worker->subscriber_sentinel.queue); cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur_worker->subscribers_sentinel.queue);
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value;
// find the subscriber subscriptions on the worker // find the subscriber subscriptions on the worker
worker_subscriber = thisworker_data->worker_subscribers_sentinel; cur_subscriber = thisworker_data->subscribers_sentinel;
while ((worker_subscriber = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&worker_subscriber->queue)) != thisworker_data->worker_subscribers_sentinel) { while ((cur_subscriber = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur_subscriber->queue)) != thisworker_data->subscribers_sentinel) {
if (worker_subscriber->request == cur->request) { ngx_http_push_stream_subscriber_t *worker_subscriber = (ngx_http_push_stream_subscriber_t *) cur_subscriber->value;
if (worker_subscriber->request == subscriber->request) {
// find the subscription for the channel being deleted // find the subscription for the channel being deleted
cur_subscription = &worker_subscriber->subscriptions_sentinel; cur_subscription = &worker_subscriber->subscriptions_sentinel;
...@@ -108,7 +110,7 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data ...@@ -108,7 +110,7 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data
ngx_queue_remove(&cur->queue); ngx_queue_remove(&cur->queue);
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
ngx_http_push_stream_send_response_message(cur->request, channel, channel->channel_deleted_message); ngx_http_push_stream_send_response_message(subscriber->request, channel, channel->channel_deleted_message);
break; break;
} }
...@@ -638,19 +640,17 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for ...@@ -638,19 +640,17 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
{ {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_msg_t *sentinel, *cur, *next; ngx_http_push_stream_msg_t *cur, *prev;
sentinel = &data->messages_to_delete;
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
cur = (ngx_http_push_stream_msg_t *)ngx_queue_next(&sentinel->queue); cur = &data->messages_to_delete;
while (cur != sentinel) { while ((cur = (ngx_http_push_stream_msg_t *)ngx_queue_next(&cur->queue)) != &data->messages_to_delete) {
next = (ngx_http_push_stream_msg_t *)ngx_queue_next(&cur->queue);
if ((ngx_time() > cur->expires) || force) { if ((ngx_time() > cur->expires) || force) {
prev = (ngx_http_push_stream_msg_t *)ngx_queue_prev(&cur->queue);
ngx_queue_remove(&cur->queue); ngx_queue_remove(&cur->queue);
ngx_http_push_stream_free_message_memory_locked(shpool, cur); ngx_http_push_stream_free_message_memory_locked(shpool, cur);
cur = prev;
} }
cur = next;
} }
ngx_http_push_stream_free_memory_of_expired_channels_locked(&data->channels_to_delete, shpool, data->channels_to_delete.root, force); ngx_http_push_stream_free_memory_of_expired_channels_locked(&data->channels_to_delete, shpool, data->channels_to_delete.root, force);
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
...@@ -665,7 +665,7 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt ...@@ -665,7 +665,7 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt
u_int i; u_int i;
if (msg->formatted_messages != NULL) { if (msg->formatted_messages != NULL) {
for(i = 0; i < ngx_http_push_stream_module_main_conf->qtd_templates; i++) { for (i = 0; i < ngx_http_push_stream_module_main_conf->qtd_templates; i++) {
ngx_str_t *formmated = (msg->formatted_messages + i); ngx_str_t *formmated = (msg->formatted_messages + i);
if ((formmated != NULL) && (formmated->data != NULL)) { if ((formmated != NULL) && (formmated->data != NULL)) {
ngx_slab_free_locked(shpool, formmated->data); ngx_slab_free_locked(shpool, formmated->data);
...@@ -860,7 +860,7 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx ...@@ -860,7 +860,7 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
static void static void
ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber) ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subscriber_t *worker_subscriber)
{ {
ngx_http_push_stream_subscription_t *cur, *sentinel; ngx_http_push_stream_subscription_t *cur, *sentinel;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
...@@ -880,12 +880,12 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worke ...@@ -880,12 +880,12 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worke
while ((cur = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&sentinel->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(cur->channel->subscribers); NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(cur->channel->subscribers);
ngx_queue_remove(&cur->subscriber->queue); ngx_queue_remove(&cur->channel_subscriber_element_ref->queue);
ngx_queue_remove(&cur->queue); ngx_queue_remove(&cur->queue);
} }
ngx_queue_init(&sentinel->queue); ngx_queue_init(&sentinel->queue);
ngx_queue_remove(&worker_subscriber->queue); ngx_queue_remove(&worker_subscriber->worker_subscriber_element_ref->queue);
ngx_queue_init(&worker_subscriber->queue); ngx_queue_init(&worker_subscriber->worker_subscriber_element_ref->queue);
worker_subscriber->clndata->worker_subscriber = NULL; worker_subscriber->clndata->worker_subscriber = NULL;
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->subscribers); NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER((data->ipc + ngx_process_slot)->subscribers); NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER((data->ipc + ngx_process_slot)->subscribers);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment