Commit 9935347c authored by Wandenberg's avatar Wandenberg

refactor the loop on queues

parent c16c6bbe
......@@ -40,7 +40,7 @@ typedef struct {
// template queue
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_queue_t queue;
ngx_str_t *template;
ngx_uint_t index;
ngx_flag_t eventsource;
......@@ -64,7 +64,7 @@ typedef struct {
ngx_uint_t max_subscribers_per_channel;
ngx_uint_t max_messages_stored_per_channel;
ngx_uint_t max_channel_id_length;
ngx_http_push_stream_template_queue_t msg_templates;
ngx_queue_t msg_templates;
ngx_flag_t timeout_with_body;
ngx_regex_t *backtrack_parser_regex;
ngx_http_push_stream_msg_t *ping_msg;
......@@ -94,7 +94,7 @@ typedef struct {
ngx_http_complex_value_t *last_event_id;
ngx_http_complex_value_t *user_agent;
ngx_str_t padding_by_user_agent;
ngx_http_push_stream_padding_t *paddings;
ngx_queue_t *paddings;
ngx_http_complex_value_t *allowed_origins;
} ngx_http_push_stream_loc_conf_t;
......@@ -104,7 +104,7 @@ static ngx_str_t ngx_http_push_stream_global_shm_name = ngx_string("push_stre
// message queue
struct ngx_http_push_stream_msg_s {
ngx_queue_t queue; // this MUST be first
ngx_queue_t queue;
time_t expires;
time_t time;
ngx_flag_t deleted;
......@@ -126,13 +126,13 @@ typedef struct {
ngx_queue_t queue;
pid_t pid;
ngx_int_t slot;
ngx_queue_t subscriptions_queue;
ngx_queue_t subscriptions;
ngx_uint_t subscribers;
} ngx_http_push_stream_pid_queue_t;
// our typecast-friendly rbtree node (channel)
typedef struct {
ngx_rbtree_node_t node; // this MUST be first
ngx_rbtree_node_t node;
ngx_queue_t queue;
ngx_queue_t *queue_sentinel;
ngx_str_t id;
......@@ -168,14 +168,14 @@ typedef struct {
struct ngx_http_push_stream_subscriber_s {
ngx_http_request_t *request;
ngx_http_push_stream_subscription_t subscriptions_sentinel;
ngx_queue_t subscriptions;
ngx_pid_t worker_subscribed_pid;
ngx_flag_t longpolling;
ngx_queue_t worker_queue;
};
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_queue_t queue;
ngx_str_t *id;
ngx_uint_t backtrack_messages;
} ngx_http_push_stream_requested_channel_t;
......
......@@ -72,6 +72,6 @@ static ngx_inline void ngx_http_push_stream_process_worker_message(void);
static ngx_inline void ngx_http_push_stream_census_worker_subscribers(void);
static ngx_inline void ngx_http_push_stream_cleanup_shutting_down_worker(void);
static ngx_int_t ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_http_push_stream_msg_t *msg);
static ngx_int_t ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions, ngx_http_push_stream_msg_t *msg);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_IPC_H_ */
......@@ -281,12 +281,12 @@ static ngx_inline void ngx_http_push_stream_delete_worker_channel(void);
static ngx_http_push_stream_content_subtype_t * ngx_http_push_stream_match_channel_info_format_and_content_type(ngx_http_request_t *r, ngx_uint_t default_subtype);
static ngx_http_push_stream_line_t * ngx_http_push_stream_split_by_crlf(ngx_str_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_join_with_crlf(ngx_http_push_stream_line_t *lines, ngx_pool_t *temp_pool);
static ngx_queue_t * ngx_http_push_stream_split_by_crlf(ngx_str_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_join_with_crlf(ngx_queue_t *lines, ngx_pool_t *temp_pool);
static ngx_http_push_stream_module_ctx_t * ngx_http_push_stream_add_request_context(ngx_http_request_t *r);
static ngx_http_push_stream_padding_t * ngx_http_push_stream_parse_paddings(ngx_conf_t *cf, ngx_str_t *paddings_by_user_agent);
static ngx_queue_t * ngx_http_push_stream_parse_paddings(ngx_conf_t *cf, ngx_str_t *paddings_by_user_agent);
static ngx_str_t * ngx_http_push_stream_get_formatted_current_time(ngx_pool_t *pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_hostname(ngx_pool_t *pool);
......
......@@ -114,7 +114,7 @@ ngx_http_push_stream_send_response_channels_info(ngx_http_request_t *r, ngx_queu
ngx_int_t rc, content_len = 0;
ngx_chain_t *chain, *first = NULL, *last = NULL;
ngx_str_t *currenttime, *hostname, *text, *header_response;
ngx_queue_t *cur, *next;
ngx_queue_t *q;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_content_subtype_t *subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
......@@ -124,16 +124,14 @@ ngx_http_push_stream_send_response_channels_info(ngx_http_request_t *r, ngx_queu
const ngx_str_t *tail = subtype->format_group_tail;
// format content body
cur = ngx_queue_head(queue_channel_info);
while (cur != queue_channel_info) {
next = ngx_queue_next(cur);
ngx_http_push_stream_channel_info_t *channel_info = (ngx_http_push_stream_channel_info_t *) cur;
for (q = ngx_queue_head(queue_channel_info); q != ngx_queue_sentinel(queue_channel_info); q = ngx_queue_next(q)) {
ngx_http_push_stream_channel_info_t *channel_info = ngx_queue_data(q, ngx_http_push_stream_channel_info_t, queue);
if ((chain = ngx_http_push_stream_get_buf(r)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for response channels info");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
format = (next != queue_channel_info) ? subtype->format_group_item : subtype->format_group_last_item;
format = (q != ngx_queue_last(queue_channel_info)) ? subtype->format_group_item : subtype->format_group_last_item;
if ((text = ngx_http_push_stream_channel_info_formatted(r->pool, format, &channel_info->id, channel_info->published_messages, channel_info->stored_messages, channel_info->subscribers)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory to format channel info");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
......@@ -157,7 +155,6 @@ ngx_http_push_stream_send_response_channels_info(ngx_http_request_t *r, ngx_queu
}
last = chain;
cur = next;
}
// get formatted current time
......@@ -204,14 +201,14 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
ngx_queue_t queue_channel_info;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_queue_t *cur = &data->channels_queue;
ngx_queue_t *q;
ngx_http_push_stream_channel_t *channel;
ngx_queue_init(&queue_channel_info);
ngx_shmtx_lock(&shpool->mutex);
while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &data->channels_queue)) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
ngx_http_push_stream_channel_info_t *channel_info;
......@@ -244,14 +241,14 @@ ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r,
ngx_http_push_stream_channel_info_t *channel_info;
ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_requested_channel_t *requested_channel;
ngx_queue_t *cur = &requested_channels->queue;
ngx_queue_t *q;
ngx_uint_t qtd_channels = 0;
ngx_queue_init(&queue_channel_info);
ngx_shmtx_lock(&shpool->mutex);
while ((cur = ngx_queue_next(cur)) != &requested_channels->queue) {
requested_channel = ngx_queue_data(cur, ngx_http_push_stream_requested_channel_t, queue);
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
// search for a existing channel with this id
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
......@@ -289,11 +286,12 @@ ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r,
static ngx_int_t
ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource, ngx_flag_t websocket) {
ngx_http_push_stream_main_conf_t *mcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module);
ngx_http_push_stream_template_queue_t *sentinel = &mcf->msg_templates;
ngx_http_push_stream_template_queue_t *cur = sentinel;
ngx_queue_t *q;
ngx_http_push_stream_template_queue_t *cur;
ngx_str_t *aux = NULL;
while ((cur = (ngx_http_push_stream_template_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
for (q = ngx_queue_head(&mcf->msg_templates); q != ngx_queue_sentinel(&mcf->msg_templates); q = ngx_queue_next(q)) {
cur = ngx_queue_data(q, ngx_http_push_stream_template_queue_t, queue);
if ((ngx_memn2cmp(cur->template->data, template.data, cur->template->len, template.len) == 0) &&
(cur->eventsource == eventsource) && (cur->websocket == websocket)) {
return cur->index;
......@@ -313,6 +311,6 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, n
cur->websocket = websocket;
cur->index = mcf->qtd_templates;
ngx_memcpy(cur->template->data, template.data, template.len);
ngx_queue_insert_tail(&mcf->msg_templates.queue, &cur->queue);
ngx_queue_insert_tail(&mcf->msg_templates, &cur->queue);
return cur->index;
}
......@@ -119,13 +119,13 @@ ngx_http_push_stream_ipc_init_worker(void)
{
ngx_slab_pool_t *global_shpool = (ngx_slab_pool_t *) ngx_http_push_stream_global_shm_zone->shm.addr;
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
ngx_queue_t *q;
int i;
ngx_shmtx_lock(&global_shpool->mutex);
global_data->pid[ngx_process_slot] = ngx_pid;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_ipc_init_worker_data(data);
}
ngx_shmtx_unlock(&global_shpool->mutex);
......@@ -186,11 +186,10 @@ static ngx_int_t
ngx_http_push_stream_unsubscribe_worker_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker;
ngx_queue_t *q;
cur_worker = &channel->workers_with_subscribers;
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
if ((worker->pid == ngx_pid) || (worker->slot == ngx_process_slot)) {
ngx_queue_remove(&worker->queue);
ngx_slab_free_locked(shpool, worker);
......@@ -206,20 +205,21 @@ static void
ngx_http_push_stream_clean_worker_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_queue_t *cur = &data->channels_queue;
ngx_queue_t *cur, *q;
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *cur_msg;
ngx_http_push_stream_worker_msg_t *worker_msg;
ngx_shmtx_lock(&shpool->mutex);
while ((cur_msg = ngx_queue_next(&data->ipc[ngx_process_slot].messages_queue)) != &data->ipc[ngx_process_slot].messages_queue) {
ngx_http_push_stream_free_worker_message_memory_locked(shpool, ngx_queue_data(cur_msg, ngx_http_push_stream_worker_msg_t, queue));
while (!ngx_queue_empty(&data->ipc[ngx_process_slot].messages_queue)) {
cur = ngx_queue_head(&data->ipc[ngx_process_slot].messages_queue);
worker_msg = ngx_queue_data(cur, ngx_http_push_stream_worker_msg_t, queue);
ngx_http_push_stream_free_worker_message_memory_locked(shpool, worker_msg);
}
ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_queue);
while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &data->channels_queue)) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
ngx_http_push_stream_unsubscribe_worker_locked(channel, shpool);
}
......@@ -303,11 +303,11 @@ ngx_http_push_stream_census_worker_subscribers(void)
{
ngx_slab_pool_t *global_shpool = (ngx_slab_pool_t *) ngx_http_push_stream_global_shm_zone->shm.addr;
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
ngx_queue_t *q;
ngx_shmtx_lock(&global_shpool->mutex);
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_census_worker_subscribers_data(data);
}
ngx_shmtx_unlock(&global_shpool->mutex);
......@@ -319,7 +319,6 @@ ngx_http_push_stream_census_worker_subscribers_data(ngx_http_push_stream_shm_dat
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_worker_data_t *thisworker_data = &data->ipc[ngx_process_slot];
ngx_queue_t *q, *cur, *cur_worker;
ngx_http_push_stream_subscription_t *cur_subscription;
int i;
ngx_shmtx_lock(&shpool->mutex);
......@@ -336,13 +335,12 @@ ngx_http_push_stream_census_worker_subscribers_data(ngx_http_push_stream_shm_dat
}
}
cur = &thisworker_data->subscribers_queue;
while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &thisworker_data->subscribers_queue)) {
ngx_http_push_stream_subscriber_t *subscriber = ngx_queue_data(cur, ngx_http_push_stream_subscriber_t, worker_queue);
for (q = ngx_queue_head(&thisworker_data->subscribers_queue); q != ngx_queue_sentinel(&thisworker_data->subscribers_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_subscriber_t *subscriber = ngx_queue_data(q, ngx_http_push_stream_subscriber_t, worker_queue);
cur_subscription = &subscriber->subscriptions_sentinel;
while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != &subscriber->subscriptions_sentinel) {
cur_subscription->channel_worker_sentinel->subscribers++;
for (cur = ngx_queue_head(&subscriber->subscriptions); cur != ngx_queue_sentinel(&subscriber->subscriptions); cur = ngx_queue_next(cur)) {
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, queue);
subscription->channel_worker_sentinel->subscribers++;
}
thisworker_data->subscribers++;
}
......@@ -374,10 +372,10 @@ static ngx_inline void
ngx_http_push_stream_process_worker_message(void)
{
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
ngx_queue_t *q;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_process_worker_message_data(data);
}
}
......@@ -387,12 +385,13 @@ static ngx_inline void
ngx_http_push_stream_process_worker_message_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_http_push_stream_worker_msg_t *worker_msg;
ngx_queue_t *cur;
ngx_queue_t *cur, *q;
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
while ((cur = ngx_queue_head(&thisworker_data->messages_queue)) && (cur != NULL) && (cur != &thisworker_data->messages_queue)) {
while (!ngx_queue_empty(&thisworker_data->messages_queue)) {
cur = ngx_queue_head(&thisworker_data->messages_queue);
worker_msg = ngx_queue_data(cur, ngx_http_push_stream_worker_msg_t, queue);
if (worker_msg->pid == ngx_pid) {
// everything is okay
......@@ -401,15 +400,11 @@ ngx_http_push_stream_process_worker_message_data(ngx_http_push_stream_shm_data_t
// that's quite bad you see. a previous worker died with an undelivered message.
// but all its subscribers' connections presumably got canned, too. so it's not so bad after all.
ngx_queue_t *cur_worker;
ngx_http_push_stream_pid_queue_t *worker;
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: worker %i intercepted a message intended for another worker process (%i) that probably died", ngx_pid, worker_msg->pid);
// delete that invalid sucker
cur_worker = &worker_msg->channel->workers_with_subscribers;
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &worker_msg->channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
for (q = ngx_queue_head(&worker_msg->channel->workers_with_subscribers); q != ngx_queue_sentinel(&worker_msg->channel->workers_with_subscribers); q = ngx_queue_next(q)) {
ngx_http_push_stream_pid_queue_t *worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
if (worker->pid == worker_msg->pid) {
ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0, "push stream module: reference to worker %i will be removed", worker_msg->pid);
ngx_shmtx_lock(&shpool->mutex);
......@@ -460,21 +455,19 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
// subscribers are queued up in a local pool. Queue heads, however, are located
// in shared memory, identified by pid.
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker;
ngx_queue_t *q;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_flag_t queue_was_empty[NGX_MAX_PROCESSES];
ngx_shmtx_lock(&shpool->mutex);
cur_worker = &channel->workers_with_subscribers;
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
ngx_http_push_stream_send_worker_message_locked(channel, &worker->subscriptions_queue, worker->pid, worker->slot, msg, &queue_was_empty[worker->slot], log, mcf);
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
ngx_http_push_stream_send_worker_message_locked(channel, &worker->subscriptions, worker->pid, worker->slot, msg, &queue_was_empty[worker->slot], log, mcf);
}
ngx_shmtx_unlock(&shpool->mutex);
cur_worker = &channel->workers_with_subscribers;
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
// interprocess communication breakdown
if (queue_was_empty[worker->slot] && (ngx_http_push_stream_alert_worker_check_messages(worker->pid, worker->slot, log) != NGX_OK)) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with worker process, pid: %P, slot: %d", worker->pid, worker->slot);
......@@ -489,19 +482,20 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
}
static ngx_int_t
ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_http_push_stream_msg_t *msg)
ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions, ngx_http_push_stream_msg_t *msg)
{
ngx_queue_t *cur = subscriptions_sentinel, *prev = NULL;
ngx_queue_t *q;
if (subscriptions_sentinel == NULL) {
if (subscriptions == NULL) {
return NGX_ERROR;
}
if (msg != NULL) {
// now let's respond to some requests!
while (((cur = ngx_queue_next(cur)) != subscriptions_sentinel) && (prev = ngx_queue_prev(cur))) {
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, channel_worker_queue);
for (q = ngx_queue_head(subscriptions); q != ngx_queue_sentinel(subscriptions);) {
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(q, ngx_http_push_stream_subscription_t, channel_worker_queue);
q = ngx_queue_next(q);
ngx_http_push_stream_subscriber_t *subscriber = subscription->subscriber;
if (subscriber->longpolling) {
ngx_http_push_stream_add_polling_headers(subscriber->request, msg->time, msg->tag, subscriber->request->pool);
......@@ -510,12 +504,9 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan
ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module));
ngx_http_push_stream_send_response_message(subscriber->request, channel, msg, 1, 0);
ngx_http_push_stream_send_response_finalize(subscriber->request);
cur = prev;
} else {
if (ngx_http_push_stream_send_response_message(subscriber->request, channel, msg, 0, 0) != NGX_OK) {
ngx_http_push_stream_send_response_finalize(subscriber->request);
cur = prev;
} else {
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(subscriber->request, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module);
......
......@@ -36,8 +36,9 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_module_ctx_t *ctx;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_http_push_stream_requested_channel_t *requested_channels, *requested_channel;
ngx_str_t vv_allowed_origins = ngx_null_string;
ngx_queue_t *q;
ngx_http_push_stream_set_expires(r, NGX_HTTP_PUSH_STREAM_EXPIRES_EPOCH, 0);
......@@ -75,28 +76,29 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
}
//get channels ids
channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, r->pool);
if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) {
requested_channels = ngx_http_push_stream_parse_channels_ids_from_path(r, r->pool);
if ((requested_channels == NULL) || ngx_queue_empty(&requested_channels->queue)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the push_stream_channels_path is required but is not set");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE);
}
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
// check if channel id isn't equals to ALL or contain wildcard
if ((ngx_memn2cmp(cur->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, cur->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) || (ngx_strchr(cur->id->data, '*') != NULL)) {
if ((ngx_memn2cmp(requested_channel->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, requested_channel->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) || (ngx_strchr(requested_channel->id->data, '*') != NULL)) {
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE);
}
// could not have a large size
if ((mcf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (cur->id->len > mcf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", cur->id->len);
if ((mcf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (requested_channel->id->len > mcf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", requested_channel->id->len);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE);
}
if (r->method & (NGX_HTTP_POST|NGX_HTTP_PUT)) {
// create the channel if doesn't exist
channel = ngx_http_push_stream_get_channel(cur->id, r->connection->log, cf, mcf);
channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, cf, mcf);
if (channel == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for new channel");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL);
......@@ -109,7 +111,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
}
}
ctx->requested_channels = channels_ids;
ctx->requested_channels = requested_channels;
if (r->method & (NGX_HTTP_POST|NGX_HTTP_PUT)) {
return ngx_http_push_stream_publisher_handle_after_read_body(r, ngx_http_push_stream_publisher_body_handler);
......@@ -119,7 +121,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
return ngx_http_push_stream_publisher_handle_after_read_body(r, ngx_http_push_stream_publisher_delete_handler);
}
return ngx_http_push_stream_send_response_channels_info_detailed(r, channels_ids);
return ngx_http_push_stream_send_response_channels_info_detailed(r, requested_channels);
}
static ngx_int_t
......@@ -198,7 +200,7 @@ ngx_http_push_stream_publisher_delete_handler(ngx_http_request_t *r)
ngx_uint_t qtd_channels = 0;
ngx_http_push_stream_requested_channel_t *requested_channel;
ngx_queue_t *cur = &ctx->requested_channels->queue;
ngx_queue_t *q;
if (r->headers_in.content_length_n > 0) {
......@@ -212,8 +214,8 @@ ngx_http_push_stream_publisher_delete_handler(ngx_http_request_t *r)
len = ngx_buf_size(buf);
}
while ((cur = ngx_queue_next(cur)) != &ctx->requested_channels->queue) {
requested_channel = ngx_queue_data(cur, ngx_http_push_stream_requested_channel_t, queue);
for (q = ngx_queue_head(&ctx->requested_channels->queue); q != ngx_queue_sentinel(&ctx->requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
if (ngx_http_push_stream_delete_channel(mcf, requested_channel->id, text, len, r->pool)) {
qtd_channels++;
}
......@@ -236,7 +238,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_requested_channel_t *requested_channel;
ngx_queue_t *cur = &ctx->requested_channels->queue;
ngx_queue_t *q;
// check if body message wasn't empty
if (r->headers_in.content_length_n <= 0) {
......@@ -256,8 +258,8 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID);
event_type = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_TYPE);
while ((cur = ngx_queue_next(cur)) != &ctx->requested_channels->queue) {
requested_channel = ngx_queue_data(cur, ngx_http_push_stream_requested_channel_t, queue);
for (q = ngx_queue_head(&ctx->requested_channels->queue); q != ngx_queue_sentinel(&ctx->requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
channel = ngx_http_push_stream_add_msg_to_channel(r, requested_channel->id, buf->pos, ngx_buf_size(buf), event_id, event_type, r->pool);
if (channel == NULL) {
......@@ -280,7 +282,9 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
char *pos = NULL;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_http_push_stream_requested_channel_t *requested_channels, *requested_channel;
ngx_queue_t *q;
ngx_http_push_stream_set_expires(r, NGX_HTTP_PUSH_STREAM_EXPIRES_EPOCH, 0);
......@@ -294,37 +298,38 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_COMMIT, &NGX_HTTP_PUSH_STREAM_COMMIT);
//get channels ids
channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, r->pool);
requested_channels = ngx_http_push_stream_parse_channels_ids_from_path(r, r->pool);
// if not specify a channel id, get info about all channels in a resumed way
if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) {
if ((requested_channels == NULL) || ngx_queue_empty(&requested_channels->queue)) {
return ngx_http_push_stream_send_response_all_channels_info_summarized(r);
}
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
// could not have a large size
if ((mcf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (cur->id->len > mcf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", cur->id->len);
if ((mcf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (requested_channel->id->len > mcf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", requested_channel->id->len);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE);
}
if ((pos = ngx_strchr(cur->id->data, '*')) != NULL) {
if ((pos = ngx_strchr(requested_channel->id->data, '*')) != NULL) {
ngx_str_t *aux = NULL;
if (pos != (char *) cur->id->data) {
if (pos != (char *) requested_channel->id->data) {
*pos = '\0';
cur->id->len = ngx_strlen(cur->id->data);
aux = cur->id;
requested_channel->id->len = ngx_strlen(requested_channel->id->data);
aux = requested_channel->id;
}
return ngx_http_push_stream_send_response_all_channels_info_detailed(r, aux);
}
// if specify a channel id equals to ALL, get info about all channels in a detailed way
if (ngx_memn2cmp(cur->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, cur->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) {
if (ngx_memn2cmp(requested_channel->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, requested_channel->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) {
return ngx_http_push_stream_send_response_all_channels_info_detailed(r, NULL);
}
}
// if specify a channels ids != ALL, get info about specified channels if they exists
return ngx_http_push_stream_send_response_channels_info_detailed(r, channels_ids);
return ngx_http_push_stream_send_response_channels_info_detailed(r, requested_channels);
}
......@@ -452,7 +452,7 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
mcf->timeout_with_body = NGX_CONF_UNSET;
mcf->ping_msg = NULL;
mcf->longpooling_timeout_msg = NULL;
ngx_queue_init(&mcf->msg_templates.queue);
ngx_queue_init(&mcf->msg_templates);
return mcf;
}
......@@ -756,8 +756,9 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
return NGX_CONF_ERROR;
}
ngx_http_push_stream_padding_t *padding = conf->paddings;
while ((padding = (ngx_http_push_stream_padding_t *) ngx_queue_next(&padding->queue)) != conf->paddings) {
ngx_queue_t *q;
for (q = ngx_queue_head(conf->paddings); q != ngx_queue_sentinel(conf->paddings); q = ngx_queue_next(q)) {
ngx_http_push_stream_padding_t *padding = ngx_queue_data(q, ngx_http_push_stream_padding_t, queue);
ngx_http_push_stream_padding_max_len = ngx_max(ngx_http_push_stream_padding_max_len, padding->header_min_len);
ngx_http_push_stream_padding_max_len = ngx_max(ngx_http_push_stream_padding_max_len, padding->message_min_len);
}
......@@ -887,10 +888,10 @@ ngx_http_push_stream_set_shm_size_slot(ngx_conf_t *cf, ngx_command_t *cmd, void
name = (cf->args->nelts > 2) ? &value[2] : &ngx_http_push_stream_shm_name;
if ((ngx_http_push_stream_global_shm_zone != NULL) && (ngx_http_push_stream_global_shm_zone->data != NULL)) {
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
ngx_queue_t *q;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
if ((name->len == data->shm_zone->shm.name.len) &&
(ngx_strncmp(name->data, data->shm_zone->shm.name.data, name->len) == 0) &&
(data->shm_zone->shm.size != shm_size)) {
......
......@@ -32,7 +32,7 @@ static ngx_flag_t ngx_http_push_stream_has_old_me
static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber);
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_queue_t *subscriptions, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool);
static ngx_http_push_stream_padding_t *ngx_http_push_stream_get_padding_by_user_agent(ngx_http_request_t *r);
void ngx_http_push_stream_websocket_reading(ngx_http_request_t *r);
......@@ -44,7 +44,8 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_http_push_stream_requested_channel_t *requested_channels, *requested_channel;
ngx_queue_t *q;
ngx_http_push_stream_module_ctx_t *ctx;
ngx_int_t tag;
time_t if_modified_since;
......@@ -85,14 +86,14 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
//get channels ids and backtracks from path
channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, ctx->temp_pool);
if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) {
requested_channels = ngx_http_push_stream_parse_channels_ids_from_path(r, r->pool);
if ((requested_channels == NULL) || ngx_queue_empty(&requested_channels->queue)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the push_stream_channels_path is required but is not set");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE);
}
//validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on. check if channel is full of subscribers
if (ngx_http_push_stream_validate_channels(r, channels_ids, &status_code, &explain_error_message) == NGX_ERROR) {
if (ngx_http_push_stream_validate_channels(r, requested_channels, &status_code, &explain_error_message) == NGX_ERROR) {
return ngx_http_push_stream_send_only_header_response(r, status_code, explain_error_message);
}
......@@ -104,7 +105,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
longpolling = ((cf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_LONGPOLLING) || ((push_mode != NULL) && (push_mode->len == NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.len) && (ngx_strncasecmp(push_mode->data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.len) == 0)));
if (polling || longpolling) {
ngx_int_t result = ngx_http_push_stream_subscriber_polling_handler(r, channels_ids, if_modified_since, tag, last_event_id, longpolling, ctx->temp_pool);
ngx_int_t result = ngx_http_push_stream_subscriber_polling_handler(r, requested_channels, if_modified_since, tag, last_event_id, longpolling, ctx->temp_pool);
if (ctx->temp_pool != NULL) {
ngx_destroy_pool(ctx->temp_pool);
ctx->temp_pool = NULL;
......@@ -136,9 +137,10 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
// adding subscriber to channel(s) and send old messages
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if (ngx_http_push_stream_subscriber_assign_channel(mcf, cf, r, cur, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
if (ngx_http_push_stream_subscriber_assign_channel(mcf, cf, r, requested_channel, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
......@@ -151,13 +153,14 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
static ngx_int_t
ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool)
ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channels, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_requested_channel_t *cur;
ngx_http_push_stream_requested_channel_t *requested_channel;
ngx_queue_t *q;
ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_subscription_t *subscription;
......@@ -182,17 +185,17 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
ngx_shmtx_lock(&shpool->mutex);
// check if has any message to send
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log, mcf);
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
if (channel == NULL) {
// channel not found
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", cur->id->data);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", requested_channel->id->data);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if (ngx_http_push_stream_has_old_messages_to_send(channel, cur->backtrack_messages, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id)) {
if (ngx_http_push_stream_has_old_messages_to_send(channel, requested_channel->backtrack_messages, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id)) {
has_message_to_send = 1;
if (channel->last_message_time > greater_message_time) {
greater_message_time = channel->last_message_time;
......@@ -220,12 +223,12 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
}
// adding subscriber to channel(s)
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if ((channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log, mcf)) == NULL) {
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
if ((channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf)) == NULL) {
// channel not found
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", cur->id->data);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", requested_channel->id->data);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
......@@ -234,7 +237,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, cur->id, subscription, &worker_subscriber->subscriptions_sentinel, r->connection->log);
ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->id, subscription, &worker_subscriber->subscriptions, r->connection->log);
}
ngx_shmtx_unlock(&shpool->mutex);
......@@ -269,15 +272,15 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_CALLBACK_INIT_CHUNK.data, NGX_HTTP_PUSH_STREAM_CALLBACK_INIT_CHUNK.len, 0);
}
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log, mcf);
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
if (channel == NULL) {
// channel not found
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", cur->id->data);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", requested_channel->id->data);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_http_push_stream_send_old_messages(r, channel, cur->backtrack_messages, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id);
ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id);
}
if (ctx->callback != NULL) {
......@@ -315,7 +318,7 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_http_push_stream_main_conf_t
ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since, tag, 0, -1, last_event_id);
ngx_shmtx_lock(&shpool->mutex);
result = ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->id, subscription, &subscriber->subscriptions_sentinel, r->connection->log);
result = ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->id, subscription, &subscriber->subscriptions, r->connection->log);
ngx_shmtx_unlock(&shpool->mutex);
return result;
......@@ -323,27 +326,29 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_http_push_stream_main_conf_t
static ngx_int_t
ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, ngx_int_t *status_code, ngx_str_t **explain_error_message)
ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channels, ngx_int_t *status_code, ngx_str_t **explain_error_message)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_requested_channel_t *cur = channels_ids;
ngx_http_push_stream_requested_channel_t *requested_channel;
ngx_queue_t *q;
ngx_uint_t subscribed_channels_qtd = 0;
ngx_uint_t subscribed_wildcard_channels_qtd = 0;
ngx_flag_t is_wildcard_channel;
ngx_http_push_stream_channel_t *channel;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
// could not be ALL channel or contain wildcard
if ((ngx_memn2cmp(cur->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, cur->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) || (ngx_strchr(cur->id->data, '*') != NULL)) {
if ((ngx_memn2cmp(requested_channel->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, requested_channel->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) || (ngx_strchr(requested_channel->id->data, '*') != NULL)) {
*status_code = NGX_HTTP_FORBIDDEN;
*explain_error_message = (ngx_str_t *) &NGX_HTTP_PUSH_STREAM_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE;
return NGX_ERROR;
}
// could not have a large size
if ((mcf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (cur->id->len > mcf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", cur->id->len);
if ((mcf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (requested_channel->id->len > mcf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", requested_channel->id->len);
*status_code = NGX_HTTP_BAD_REQUEST;
*explain_error_message = (ngx_str_t *) &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE;
return NGX_ERROR;
......@@ -352,12 +357,12 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
// count subscribed normal and wildcard channels
subscribed_channels_qtd++;
is_wildcard_channel = 0;
if ((mcf->wildcard_channel_prefix.len > 0) && (ngx_strncmp(cur->id->data, mcf->wildcard_channel_prefix.data, mcf->wildcard_channel_prefix.len) == 0)) {
if ((mcf->wildcard_channel_prefix.len > 0) && (ngx_strncmp(requested_channel->id->data, mcf->wildcard_channel_prefix.data, mcf->wildcard_channel_prefix.len) == 0)) {
is_wildcard_channel = 1;
subscribed_wildcard_channels_qtd++;
}
channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log, mcf);
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
// check if channel exists when authorized_channels_only is on
if (cf->authorized_channels_only && !is_wildcard_channel && ((channel == NULL) || (channel->stored_messages == 0))) {
......@@ -383,9 +388,9 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
}
// create the channels in advance, if doesn't exist, to ensure max number of channels in the server
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
channel = ngx_http_push_stream_get_channel(cur->id, r->connection->log, cf, mcf);
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, cf, mcf);
if (channel == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for new channel");
*status_code = NGX_HTTP_INTERNAL_SERVER_ERROR;
......@@ -421,7 +426,7 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
worker_subscriber->longpolling = 0;
worker_subscriber->request = r;
worker_subscriber->worker_subscribed_pid = ngx_pid;
ngx_queue_init(&worker_subscriber->subscriptions_sentinel.queue);
ngx_queue_init(&worker_subscriber->subscriptions);
ctx->subscriber = worker_subscriber;
// increment request reference count to keep connection open
......@@ -501,7 +506,7 @@ ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *ch
{
ngx_flag_t old_messages = 0;
ngx_http_push_stream_msg_t *message;
ngx_queue_t *cur;
ngx_queue_t *q;
if (channel->stored_messages > 0) {
......@@ -509,9 +514,8 @@ ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *ch
old_messages = 1;
} else if ((last_event_id != NULL) || (if_modified_since >= 0)) {
ngx_flag_t found = 0;
cur = &channel->message_queue;
while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &channel->message_queue)) {
message = (ngx_http_push_stream_msg_t *) ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
for (q = ngx_queue_head(&channel->message_queue); q != ngx_queue_sentinel(&channel->message_queue); q = ngx_queue_next(q)) {
message = ngx_queue_data(q, ngx_http_push_stream_msg_t, queue);
if (message->deleted) {
break;
}
......@@ -542,16 +546,15 @@ static void
ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id)
{
ngx_http_push_stream_msg_t *message, *next_message;
ngx_queue_t *cur, *next;
ngx_queue_t *q, *next;
if (ngx_http_push_stream_has_old_messages_to_send(channel, backtrack, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id)) {
cur = &channel->message_queue;
if (backtrack > 0) {
ngx_uint_t qtd = (backtrack > channel->stored_messages) ? channel->stored_messages : backtrack;
ngx_uint_t start = channel->stored_messages - qtd;
// positioning at first message, and send the others
while ((qtd > 0) && (cur = ngx_queue_next(cur)) && (cur != &channel->message_queue)) {
message = (ngx_http_push_stream_msg_t *) ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
for (q = ngx_queue_head(&channel->message_queue); (qtd > 0) && q != ngx_queue_sentinel(&channel->message_queue); q = ngx_queue_next(q)) {
message = ngx_queue_data(q, ngx_http_push_stream_msg_t, queue);
if (message->deleted) {
break;
}
......@@ -565,8 +568,8 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
}
} else if ((last_event_id != NULL) || (if_modified_since >= 0)) {
ngx_flag_t found = 0;
while ((cur = ngx_queue_next(cur)) && (cur != &channel->message_queue)) {
message = (ngx_http_push_stream_msg_t *) ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
for (q = ngx_queue_head(&channel->message_queue); q != ngx_queue_sentinel(&channel->message_queue); q = ngx_queue_next(q)) {
message = ngx_queue_data(q, ngx_http_push_stream_msg_t, queue);
if (message->deleted) {
break;
}
......@@ -584,10 +587,10 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
}
if (found && (((greater_message_time == 0) && (greater_message_tag == -1)) || (greater_message_time > message->time) || ((greater_message_time == message->time) && (greater_message_tag >= message->tag)))) {
next = ngx_queue_next(cur);
next_message = (ngx_http_push_stream_msg_t *) ngx_queue_data(next, ngx_http_push_stream_msg_t, queue);
next = ngx_queue_next(q);
next_message = ngx_queue_data(next, ngx_http_push_stream_msg_t, queue);
ngx_flag_t send_separator = 1;
if ((next == &channel->message_queue) || ((greater_message_time > 0) &&
if ((q == ngx_queue_last(&channel->message_queue)) || ((greater_message_time > 0) &&
((next_message->time > greater_message_time) || ((next_message->time == greater_message_time) && (next_message->tag > greater_message_tag))))) {
send_separator = 0;
}
......@@ -622,7 +625,7 @@ ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_p
worker_sentinel->subscribers = 0;
worker_sentinel->pid = ngx_pid;
worker_sentinel->slot = ngx_process_slot;
ngx_queue_init(&worker_sentinel->subscriptions_queue);
ngx_queue_init(&worker_sentinel->subscriptions);
return worker_sentinel;
}
......@@ -647,10 +650,10 @@ ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http
}
static ngx_int_t
ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log)
ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_queue_t *subscriptions, ngx_log_t *log)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(subscription->subscriber->request, ngx_http_push_stream_module);
ngx_queue_t *cur_worker;
ngx_queue_t *q;
ngx_http_push_stream_pid_queue_t *worker, *worker_subscribers_sentinel = NULL;
ngx_http_push_stream_channel_t *channel;
......@@ -660,9 +663,8 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
return NGX_ERROR;
}
cur_worker = &channel->workers_with_subscribers;
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
if (worker->pid == ngx_pid) {
worker_subscribers_sentinel = worker;
break;
......@@ -679,8 +681,8 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
channel->subscribers++; // do this only when we know everything went okay
worker_subscribers_sentinel->subscribers++;
channel->expires = ngx_time() + mcf->channel_inactivity_time;
ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue);
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscriptions_queue, &subscription->channel_worker_queue);
ngx_queue_insert_tail(subscriptions, &subscription->queue);
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscriptions, &subscription->channel_worker_queue);
subscription->channel_worker_sentinel = worker_subscribers_sentinel;
return NGX_OK;
}
......@@ -690,7 +692,7 @@ static ngx_http_push_stream_padding_t *
ngx_http_push_stream_get_padding_by_user_agent(ngx_http_request_t *r)
{
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_padding_t *padding = cf->paddings;
ngx_queue_t *q;
ngx_str_t vv_user_agent = ngx_null_string;
if (cf->user_agent != NULL) {
......@@ -699,8 +701,9 @@ ngx_http_push_stream_get_padding_by_user_agent(ngx_http_request_t *r)
vv_user_agent = r->headers_in.user_agent->value;
}
if ((padding != NULL) && (vv_user_agent.len > 0)) {
while ((padding = (ngx_http_push_stream_padding_t *) ngx_queue_next(&padding->queue)) != cf->paddings) {
if ((cf->paddings != NULL) && (vv_user_agent.len > 0)) {
for (q = ngx_queue_head(cf->paddings); q != ngx_queue_sentinel(cf->paddings); q = ngx_queue_next(q)) {
ngx_http_push_stream_padding_t *padding = ngx_queue_data(q, ngx_http_push_stream_padding_t, queue);
if (ngx_regex_exec(padding->agent, &vv_user_agent, NULL, 0) >= 0) {
return padding;
}
......
......@@ -40,14 +40,15 @@ static ngx_inline void
ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_shm_data_t *data, ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired)
{
ngx_http_push_stream_msg_t *msg;
ngx_queue_t *cur;
ngx_queue_t *q;
if (max_messages == NGX_CONF_UNSET_UINT) {
return;
}
while ((cur = ngx_queue_head(&channel->message_queue)) && (cur != NULL) && (cur != &channel->message_queue) && ((channel->stored_messages > max_messages) || expired)) {
msg = (ngx_http_push_stream_msg_t *) ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
while (!ngx_queue_empty(&channel->message_queue) && ((channel->stored_messages > max_messages) || expired)) {
q = ngx_queue_head(&channel->message_queue);
msg = ngx_queue_data(q, ngx_http_push_stream_msg_t, queue);
if (expired && (msg->deleted || (msg->expires == 0) || (msg->expires > ngx_time()) || (msg->workers_ref_count > 0))) {
break;
......@@ -65,10 +66,10 @@ static void
ngx_http_push_stream_delete_channels(void)
{
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
ngx_queue_t *q;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_delete_channels_data(data);
}
}
......@@ -81,26 +82,25 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker, *cur;
ngx_queue_t *prev_channel, *cur_channel = &data->channels_to_delete;
ngx_queue_t *q;
while ((cur_channel = ngx_queue_next(cur_channel)) && (cur_channel != NULL) && (cur_channel != &data->channels_to_delete)) {
channel = ngx_queue_data(cur_channel, ngx_http_push_stream_channel_t, queue);
for (q = ngx_queue_head(&data->channels_to_delete); q != ngx_queue_sentinel(&data->channels_to_delete); q = ngx_queue_next(q)) {
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
if (channel->queue_sentinel != &data->channels_to_delete) {
cur_channel = &data->channels_to_delete;
q = &data->channels_to_delete;
continue;
}
// remove subscribers if any
if (channel->subscribers > 0) {
cur_worker = &channel->workers_with_subscribers;
// find the current worker
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
for (cur_worker = ngx_queue_head(&channel->workers_with_subscribers); cur_worker != ngx_queue_sentinel(&channel->workers_with_subscribers); cur_worker = ngx_queue_next(cur_worker)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
if (worker->pid == ngx_pid) {
// to each subscription of this channel in this worker
while ((cur = ngx_queue_head(&worker->subscriptions_queue)) != &worker->subscriptions_queue) {
while (!ngx_queue_empty(&worker->subscriptions)) {
cur = ngx_queue_head(&worker->subscriptions);
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, channel_worker_queue);
ngx_http_push_stream_subscriber_t *subscriber = subscription->subscriber;
......@@ -124,7 +124,7 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
// subscriber does not have any other subscription, the connection may be closed
if (subscriber->longpolling || ngx_queue_empty(&subscriber->subscriptions_sentinel.queue)) {
if (subscriber->longpolling || ngx_queue_empty(&subscriber->subscriptions)) {
ngx_http_push_stream_send_response_finalize(subscriber->request);
}
}
......@@ -134,14 +134,12 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
}
ngx_shmtx_lock(&shpool->mutex);
while (((cur_channel = ngx_queue_next(cur_channel)) != &data->channels_to_delete) && (prev_channel = ngx_queue_prev(cur_channel))) {
channel = ngx_queue_data(cur_channel, ngx_http_push_stream_channel_t, queue);
for (q = ngx_queue_head(&data->channels_to_delete); q != ngx_queue_sentinel(&data->channels_to_delete);) {
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
q = ngx_queue_next(q);
// channel has not subscribers and can be released
if (channel->subscribers == 0) {
// go back one node on queue, since the current node will be removed
cur_channel = prev_channel;
channel->expires = ngx_time() + NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL;
// move the channel to trash queue
......@@ -166,10 +164,10 @@ static ngx_inline void
ngx_http_push_stream_cleanup_shutting_down_worker(void)
{
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
ngx_queue_t *q;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_cleanup_shutting_down_worker_data(data);
}
global_data->pid[ngx_process_slot] = -1;
......@@ -180,9 +178,11 @@ static ngx_inline void
ngx_http_push_stream_cleanup_shutting_down_worker_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_queue_t *q;
while (!ngx_queue_empty(&thisworker_data->subscribers_queue)) {
ngx_http_push_stream_subscriber_t *subscriber = ngx_queue_data(ngx_queue_head(&thisworker_data->subscribers_queue), ngx_http_push_stream_subscriber_t, worker_queue);
q = ngx_queue_head(&thisworker_data->subscribers_queue);
ngx_http_push_stream_subscriber_t *subscriber = ngx_queue_data(q, ngx_http_push_stream_subscriber_t, worker_queue);
if (subscriber->longpolling) {
ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(subscriber->request);
} else {
......@@ -236,8 +236,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_m
{
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_shm_data_t *shm_data = mcf->shm_data;
ngx_http_push_stream_template_queue_t *sentinel = &mcf->msg_templates;
ngx_http_push_stream_template_queue_t *cur = sentinel;
ngx_queue_t *q;
ngx_http_push_stream_msg_t *msg;
int i = 0;
......@@ -285,17 +284,19 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_m
return NULL;
}
while ((cur = (ngx_http_push_stream_template_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
for (q = ngx_queue_head(&mcf->msg_templates); q != ngx_queue_sentinel(&mcf->msg_templates); q = ngx_queue_next(q)) {
ngx_http_push_stream_template_queue_t *cur = ngx_queue_data(q, ngx_http_push_stream_template_queue_t, queue);
ngx_str_t *aux = NULL;
if (cur->eventsource) {
ngx_http_push_stream_line_t *lines, *cur_line;
ngx_http_push_stream_line_t *cur_line;
ngx_queue_t *lines, *q_line;
if ((lines = ngx_http_push_stream_split_by_crlf(&msg->raw, temp_pool)) == NULL) {
return NULL;
}
cur_line = lines;
while ((cur_line = (ngx_http_push_stream_line_t *) ngx_queue_next(&cur_line->queue)) != lines) {
for (q_line = ngx_queue_head(lines); q_line != ngx_queue_sentinel(lines); q_line = ngx_queue_next(q_line )) {
cur_line = ngx_queue_data(q_line , ngx_http_push_stream_line_t, queue);
if ((cur_line->line = ngx_http_push_stream_format_message(channel, msg, cur_line->line, cur->template, temp_pool)) == NULL) {
break;
}
......@@ -850,7 +851,7 @@ ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_s
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker;
ngx_queue_t *q;
ngx_shmtx_lock(&shpool->mutex);
......@@ -878,13 +879,11 @@ ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_s
}
// send signal to each worker with subscriber to this channel
cur_worker = &channel->workers_with_subscribers;
if (ngx_queue_empty(&channel->workers_with_subscribers)) {
ngx_http_push_stream_alert_worker_delete_channel(ngx_pid, ngx_process_slot, ngx_cycle->log);
} else {
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
ngx_http_push_stream_alert_worker_delete_channel(worker->pid, worker->slot, ngx_cycle->log);
}
}
......@@ -899,10 +898,10 @@ static void
ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_flag_t force)
{
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
ngx_queue_t *q;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(data, force);
}
}
......@@ -913,20 +912,20 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(ngx_http_p
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *prev, *cur = &data->channels_queue;
ngx_queue_t *q;
ngx_http_push_stream_collect_expired_messages_data(data, force);
while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &data->channels_queue) && (prev = ngx_queue_prev(cur))) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
if (channel->queue_sentinel != &data->channels_queue) {
cur = &data->channels_queue;
q = &data->channels_queue;
continue;
}
if ((channel->stored_messages == 0) && (channel->subscribers == 0) && (channel->expires < ngx_time())) {
// go back one node on queue, since the current node will be removed
cur = prev;
q = ngx_queue_prev(q);
ngx_shmtx_lock(&shpool->mutex);
if (!channel->deleted) {
......@@ -953,12 +952,12 @@ ngx_http_push_stream_collect_expired_messages_data(ngx_http_push_stream_shm_data
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *cur = &data->channels_queue;
ngx_queue_t *q;
ngx_shmtx_lock(&shpool->mutex);
while ((cur = ngx_queue_next(cur)) != &data->channels_queue) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
ngx_http_push_stream_ensure_qtd_of_messages_locked(data, channel, (force) ? 0 : channel->stored_messages, 1);
}
......@@ -973,7 +972,8 @@ ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_http_push_stream
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *cur;
while ((cur = ngx_queue_head(&data->channels_trash)) != &data->channels_trash) {
while (!ngx_queue_empty(&data->channels_trash)) {
cur = ngx_queue_head(&data->channels_trash);
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
if ((ngx_time() > channel->expires) || force) {
......@@ -992,10 +992,11 @@ nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_htt
{
// delete the worker-subscriber queue
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker;
ngx_queue_t *cur;
while ((cur_worker = ngx_queue_head(&channel->workers_with_subscribers)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
while (!ngx_queue_empty(&channel->workers_with_subscribers)) {
cur = ngx_queue_head(&channel->workers_with_subscribers);
worker = ngx_queue_data(cur, ngx_http_push_stream_pid_queue_t, queue);
ngx_queue_remove(&worker->queue);
ngx_slab_free_locked(shpool, worker);
}
......@@ -1010,10 +1011,10 @@ static ngx_int_t
ngx_http_push_stream_memory_cleanup(void)
{
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
ngx_queue_t *q;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_delete_channels_data(data);
ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(data, 0);
......@@ -1028,10 +1029,10 @@ static ngx_int_t
ngx_http_push_stream_buffer_cleanup(void)
{
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
ngx_queue_t *q;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_collect_expired_messages_data(data, 0);
}
......@@ -1043,10 +1044,10 @@ static ngx_int_t
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force)
{
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
ngx_queue_t *q;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels_data(data, 0);
}
......@@ -1062,7 +1063,8 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels_data(ngx_http_
ngx_queue_t *cur;
ngx_shmtx_lock(&shpool->mutex);
while ((cur = ngx_queue_head(&data->messages_trash)) && (cur != NULL) && (cur != &data->messages_trash)) {
while (!ngx_queue_empty(&data->messages_trash)) {
cur = ngx_queue_head(&data->messages_trash);
message = ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
if (force || ((message->workers_ref_count <= 0) && (ngx_time() > message->expires))) {
......@@ -1388,18 +1390,18 @@ static void
ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subscriber_t *worker_subscriber)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(worker_subscriber->request, ngx_http_push_stream_module);
ngx_http_push_stream_subscription_t *cur, *sentinel;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_queue_t *cur;
sentinel = &worker_subscriber->subscriptions_sentinel;
while ((cur = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(cur->channel->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(cur->channel_worker_sentinel->subscribers);
ngx_queue_remove(&cur->channel_worker_queue);
ngx_queue_remove(&cur->queue);
while (!ngx_queue_empty(&worker_subscriber->subscriptions)) {
cur = ngx_queue_head(&worker_subscriber->subscriptions);
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, queue);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(subscription->channel->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(subscription->channel_worker_sentinel->subscribers);
ngx_queue_remove(&subscription->channel_worker_queue);
ngx_queue_remove(&subscription->queue);
}
ngx_queue_init(&sentinel->queue);
ngx_queue_remove(&worker_subscriber->worker_queue);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER((data->ipc + ngx_process_slot)->subscribers);
......@@ -1535,7 +1537,7 @@ ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len)
static ngx_http_push_stream_line_t *
ngx_http_push_stream_add_line_to_queue(ngx_http_push_stream_line_t *sentinel, u_char *text, u_int len, ngx_pool_t *temp_pool)
ngx_http_push_stream_add_line_to_queue(ngx_queue_t *lines, u_char *text, u_int len, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_line_t *cur = NULL;
ngx_str_t *line;
......@@ -1547,23 +1549,23 @@ ngx_http_push_stream_add_line_to_queue(ngx_http_push_stream_line_t *sentinel, u_
}
cur->line = line;
ngx_memcpy(cur->line->data, text, len);
ngx_queue_insert_tail(&sentinel->queue, &cur->queue);
ngx_queue_insert_tail(lines, &cur->queue);
}
return cur;
}
static ngx_http_push_stream_line_t *
static ngx_queue_t *
ngx_http_push_stream_split_by_crlf(ngx_str_t *msg, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_line_t *sentinel = NULL;
ngx_queue_t *lines = NULL;
u_char *pos = NULL, *start = NULL, *crlf_pos, *cr_pos, *lf_pos;
u_int step = 0, len = 0;
if ((sentinel = ngx_pcalloc(temp_pool, sizeof(ngx_http_push_stream_line_t))) == NULL) {
if ((lines = ngx_pcalloc(temp_pool, sizeof(ngx_queue_t))) == NULL) {
return NULL;
}
ngx_queue_init(&sentinel->queue);
ngx_queue_init(lines);
start = msg->data;
do {
......@@ -1585,7 +1587,7 @@ ngx_http_push_stream_split_by_crlf(ngx_str_t *msg, ngx_pool_t *temp_pool)
if (pos != NULL) {
len = pos - start;
if ((len > 0) && (ngx_http_push_stream_add_line_to_queue(sentinel, start, len, temp_pool) == NULL)) {
if ((len > 0) && (ngx_http_push_stream_add_line_to_queue(lines, start, len, temp_pool) == NULL)) {
return NULL;
}
start = pos + step;
......@@ -1594,26 +1596,28 @@ ngx_http_push_stream_split_by_crlf(ngx_str_t *msg, ngx_pool_t *temp_pool)
} while (pos != NULL);
len = (msg->data + msg->len) - start;
if ((len > 0) && (ngx_http_push_stream_add_line_to_queue(sentinel, start, len, temp_pool) == NULL)) {
if ((len > 0) && (ngx_http_push_stream_add_line_to_queue(lines, start, len, temp_pool) == NULL)) {
return NULL;
}
return sentinel;
return lines;
}
static ngx_str_t *
ngx_http_push_stream_join_with_crlf(ngx_http_push_stream_line_t *lines, ngx_pool_t *temp_pool)
ngx_http_push_stream_join_with_crlf(ngx_queue_t *lines, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_line_t *cur;
ngx_str_t *result = NULL, *tmp = &NGX_HTTP_PUSH_STREAM_EMPTY;
ngx_queue_t *q;
if (ngx_queue_empty(&lines->queue)) {
if (ngx_queue_empty(lines)) {
return &NGX_HTTP_PUSH_STREAM_EMPTY;
}
cur = lines;
while ((cur = (ngx_http_push_stream_line_t *) ngx_queue_next(&cur->queue)) != lines) {
for (q = ngx_queue_head(lines); q != ngx_queue_sentinel(lines); q = ngx_queue_next(q)) {
cur = ngx_queue_data(q, ngx_http_push_stream_line_t, queue);
if ((cur->line == NULL) || (result = ngx_http_push_stream_create_str(temp_pool, tmp->len + cur->line->len)) == NULL) {
return NULL;
}
......@@ -1631,13 +1635,14 @@ ngx_http_push_stream_join_with_crlf(ngx_http_push_stream_line_t *lines, ngx_pool
static ngx_str_t *
ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_line_t *lines, *cur;
ngx_http_push_stream_line_t *cur;
ngx_str_t *result = NULL;
ngx_queue_t *lines, *q;
lines = ngx_http_push_stream_split_by_crlf(text, temp_pool);
if (lines != NULL) {
cur = lines;
while ((cur = (ngx_http_push_stream_line_t *) ngx_queue_next(&cur->queue)) != lines) {
for (q = ngx_queue_head(lines); q != ngx_queue_sentinel(lines); q = ngx_queue_next(q)) {
cur = ngx_queue_data(q, ngx_http_push_stream_line_t, queue);
cur->line = ngx_http_push_stream_str_replace(message_template, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT, cur->line, 0, temp_pool);
if (cur->line == NULL) {
return NULL;
......@@ -1844,22 +1849,23 @@ ngx_http_push_stream_send_only_added_headers(ngx_http_request_t *r)
}
static ngx_http_push_stream_padding_t *
static ngx_queue_t *
ngx_http_push_stream_parse_paddings(ngx_conf_t *cf, ngx_str_t *paddings_by_user_agent)
{
ngx_int_t rc;
u_char errstr[NGX_MAX_CONF_ERRSTR];
ngx_regex_compile_t padding_rc, *agent_rc;
int captures[12];
ngx_http_push_stream_padding_t *sentinel, *padding;
ngx_queue_t *paddings;
ngx_http_push_stream_padding_t *padding;
ngx_str_t aux, *agent;
if ((sentinel = ngx_palloc(cf->pool, sizeof(ngx_http_push_stream_padding_t))) == NULL) {
if ((paddings = ngx_palloc(cf->pool, sizeof(ngx_queue_t))) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to save padding info");
return NULL;
}
ngx_queue_init(&sentinel->queue);
ngx_queue_init(paddings);
ngx_memzero(&padding_rc, sizeof(ngx_regex_compile_t));
......@@ -1923,7 +1929,7 @@ ngx_http_push_stream_parse_paddings(ngx_conf_t *cf, ngx_str_t *paddings_by_user
padding->header_min_len = ngx_atoi(aux.data + captures[4], captures[5] - captures[4]);
padding->message_min_len = ngx_atoi(aux.data + captures[6], captures[7] - captures[6]);
ngx_queue_insert_tail(&sentinel->queue, &padding->queue);
ngx_queue_insert_tail(paddings, &padding->queue);
ngx_conf_log_error(NGX_LOG_INFO, cf, 0, "push stream module: padding detected %V, header_min_len %d, message_min_len %d", &agent_rc->pattern, padding->header_min_len, padding->message_min_len);
......@@ -1932,7 +1938,7 @@ ngx_http_push_stream_parse_paddings(ngx_conf_t *cf, ngx_str_t *paddings_by_user
} while (aux.data < (paddings_by_user_agent->data + paddings_by_user_agent->len));
return sentinel;
return paddings;
}
......@@ -2082,7 +2088,7 @@ ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_poo
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_str_t vv_channels_path = ngx_null_string;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_http_push_stream_requested_channel_t *requested_channels, *requested_channel;
ngx_str_t aux;
int captures[15];
ngx_int_t n;
......@@ -2092,38 +2098,38 @@ ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_poo
return NULL;
}
if ((channels_ids = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_requested_channel_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channels_ids queue");
if ((requested_channels = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_requested_channel_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for requested_channels queue");
return NULL;
}
ngx_queue_init(&channels_ids->queue);
ngx_queue_init(&requested_channels->queue);
// doing the parser of given channel path
aux.data = vv_channels_path.data;
do {
aux.len = vv_channels_path.len - (aux.data - vv_channels_path.data);
if ((n = ngx_regex_exec(mcf->backtrack_parser_regex, &aux, captures, 15)) >= 0) {
if ((cur = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_requested_channel_t))) == NULL) {
if ((requested_channel = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_requested_channel_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channel_id item");
return NULL;
}
if ((cur->id = ngx_http_push_stream_create_str(pool, captures[0])) == NULL) {
if ((requested_channel->id = ngx_http_push_stream_create_str(pool, captures[0])) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channel_id string");
return NULL;
}
ngx_memcpy(cur->id->data, aux.data, captures[0]);
cur->backtrack_messages = 0;
ngx_memcpy(requested_channel->id->data, aux.data, captures[0]);
requested_channel->backtrack_messages = 0;
if (captures[7] > captures[6]) {
cur->backtrack_messages = ngx_atoi(aux.data + captures[6], captures[7] - captures[6]);
requested_channel->backtrack_messages = ngx_atoi(aux.data + captures[6], captures[7] - captures[6]);
}
ngx_queue_insert_tail(&channels_ids->queue, &cur->queue);
ngx_queue_insert_tail(&requested_channels->queue, &requested_channel->queue);
aux.data = aux.data + captures[1];
}
} while ((n != NGX_REGEX_NO_MATCHED) && (aux.data < (vv_channels_path.data + vv_channels_path.len)));
return channels_ids;
return requested_channels;
}
......@@ -40,7 +40,8 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_http_push_stream_requested_channel_t *requested_channels, *requested_channel;
ngx_queue_t *q;
ngx_http_push_stream_module_ctx_t *ctx;
ngx_int_t tag;
time_t if_modified_since;
......@@ -105,14 +106,14 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
ngx_http_push_stream_send_only_added_headers(r);
//get channels ids and backtracks from path
channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, ctx->temp_pool);
if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) {
requested_channels = ngx_http_push_stream_parse_channels_ids_from_path(r, ctx->temp_pool);
if ((requested_channels == NULL) || ngx_queue_empty(&requested_channels->queue)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the push_stream_channels_path is required but is not set");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE);
}
//validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on. check if channel is full of subscribers
if (ngx_http_push_stream_validate_channels(r, channels_ids, &status_code, &explain_error_message) == NGX_ERROR) {
if (ngx_http_push_stream_validate_channels(r, requested_channels, &status_code, &explain_error_message) == NGX_ERROR) {
return ngx_http_push_stream_send_websocket_close_frame(r, status_code, explain_error_message);
}
......@@ -139,9 +140,9 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
}
// adding subscriber to channel(s) and send backtrack messages
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if (ngx_http_push_stream_subscriber_assign_channel(mcf, cf, r, cur, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
if (ngx_http_push_stream_subscriber_assign_channel(mcf, cf, r, requested_channel, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
}
}
......@@ -191,8 +192,8 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
ngx_event_t *rev;
ngx_connection_t *c;
uint64_t i;
ngx_queue_t *cur = NULL;
ngx_buf_t buf;
ngx_queue_t *q;
ngx_http_push_stream_set_buffer(&buf, ctx->frame->header, ctx->frame->last, 8);
......@@ -289,9 +290,8 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
}
}
cur = &ctx->subscriber->subscriptions_sentinel.queue;
while ((cur = ngx_queue_next(cur)) != &ctx->subscriber->subscriptions_sentinel.queue) {
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, queue);
for (q = ngx_queue_head(&ctx->subscriber->subscriptions); q != ngx_queue_sentinel(&ctx->subscriber->subscriptions); q = ngx_queue_next(q)) {
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(q, ngx_http_push_stream_subscription_t, queue);
if (ngx_http_push_stream_add_msg_to_channel(r, &subscription->channel->id, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, ctx->temp_pool) == NULL) {
ngx_http_finalize_request(r, NGX_OK);
return;
......
......@@ -72,28 +72,6 @@ ngx_http_push_stream_find_channel_on_tree(ngx_str_t *id, ngx_log_t *log, ngx_rbt
return NULL;
}
static void
ngx_http_push_stream_initialize_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_channel_t *channel)
{
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
channel->channel_deleted_message = NULL;
channel->last_message_id = 0;
channel->last_message_time = 0;
channel->last_message_tag = 0;
channel->stored_messages = 0;
channel->subscribers = 0;
channel->deleted = 0;
channel->expires = ngx_time() + mcf->channel_inactivity_time;
ngx_queue_init(&channel->message_queue);
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len);
ngx_rbtree_insert(&data->tree, &channel->node);
ngx_queue_insert_tail(&data->channels_queue, &channel->queue);
channel->queue_sentinel = &data->channels_queue;
(channel->wildcard) ? data->wildcard_channels++ : data->channels++;
}
static ngx_http_push_stream_channel_t *
ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
......@@ -164,12 +142,24 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
channel->id.data[channel->id.len] = '\0';
channel->wildcard = is_wildcard_channel;
channel->channel_deleted_message = NULL;
channel->last_message_id = 0;
channel->last_message_time = 0;
channel->last_message_tag = 0;
channel->stored_messages = 0;
channel->subscribers = 0;
channel->deleted = 0;
channel->expires = ngx_time() + mcf->channel_inactivity_time;
ngx_http_push_stream_initialize_channel(mcf, channel);
// initialize workers_with_subscribers queues only when a channel is created
ngx_queue_init(&channel->message_queue);
ngx_queue_init(&channel->workers_with_subscribers);
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len);
ngx_rbtree_insert(&data->tree, &channel->node);
ngx_queue_insert_tail(&data->channels_queue, &channel->queue);
channel->queue_sentinel = &data->channels_queue;
(channel->wildcard) ? data->wildcard_channels++ : data->channels++;
ngx_shmtx_unlock(&shpool->mutex);
return channel;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment