Commit 9666f079 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

avoiding counters to overlap on decrement

parent 4ab86b78
...@@ -234,4 +234,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET = ngx_string("GET"); ...@@ -234,4 +234,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET = ngx_string("GET");
return; \ return; \
} }
#define NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(counter) \
(counter = (counter > 1) ? counter - 1 : 0)
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_H_ */
...@@ -40,7 +40,7 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_ ...@@ -40,7 +40,7 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_
break; break;
} }
channel->stored_messages--; NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->stored_messages);
ngx_queue_remove(&msg->queue); ngx_queue_remove(&msg->queue);
ngx_http_push_stream_mark_message_to_delete_locked(msg); ngx_http_push_stream_mark_message_to_delete_locked(msg);
} }
...@@ -86,7 +86,7 @@ ngx_http_push_stream_delete_worker_channel(void) ...@@ -86,7 +86,7 @@ ngx_http_push_stream_delete_worker_channel(void)
cur_subscription = &worker_subscriber->subscriptions_sentinel; cur_subscription = &worker_subscriber->subscriptions_sentinel;
while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != &worker_subscriber->subscriptions_sentinel) { while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != &worker_subscriber->subscriptions_sentinel) {
if (cur_subscription->channel == channel) { if (cur_subscription->channel == channel) {
channel->subscribers--; NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->subscribers);
// remove the reference from subscription for channel // remove the reference from subscription for channel
ngx_queue_remove(&cur_subscription->queue); ngx_queue_remove(&cur_subscription->queue);
...@@ -317,7 +317,7 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool) { ...@@ -317,7 +317,7 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool) {
if (channel != NULL) { if (channel != NULL) {
// remove channel from tree // remove channel from tree
channel->deleted = 1; channel->deleted = 1;
(channel->broadcast) ? data->broadcast_channels-- : data->channels--; (channel->broadcast) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->broadcast_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels);
// move the channel to unrecoverable tree // move the channel to unrecoverable tree
ngx_rbtree_delete(&data->tree, (ngx_rbtree_node_t *) channel); ngx_rbtree_delete(&data->tree, (ngx_rbtree_node_t *) channel);
...@@ -372,7 +372,7 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s ...@@ -372,7 +372,7 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s
if ((channel->stored_messages == 0) && (channel->subscribers == 0)) { if ((channel->stored_messages == 0) && (channel->subscribers == 0)) {
channel->deleted = 1; channel->deleted = 1;
channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->memory_cleanup_timeout; channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->memory_cleanup_timeout;
(channel->broadcast) ? data->broadcast_channels-- : data->channels--; (channel->broadcast) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->broadcast_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels);
// move the channel to trash tree // move the channel to trash tree
ngx_rbtree_delete(&data->tree, (ngx_rbtree_node_t *) channel); ngx_rbtree_delete(&data->tree, (ngx_rbtree_node_t *) channel);
...@@ -762,7 +762,7 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worke ...@@ -762,7 +762,7 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worke
sentinel = &worker_subscriber->subscriptions_sentinel; sentinel = &worker_subscriber->subscriptions_sentinel;
while ((cur = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&sentinel->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
cur->channel->subscribers--; NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(cur->channel->subscribers);
ngx_queue_remove(&cur->subscriber->queue); ngx_queue_remove(&cur->subscriber->queue);
ngx_queue_remove(&cur->queue); ngx_queue_remove(&cur->queue);
} }
...@@ -770,8 +770,8 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worke ...@@ -770,8 +770,8 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worke
ngx_queue_remove(&worker_subscriber->queue); ngx_queue_remove(&worker_subscriber->queue);
ngx_queue_init(&worker_subscriber->queue); ngx_queue_init(&worker_subscriber->queue);
worker_subscriber->clndata->worker_subscriber = NULL; worker_subscriber->clndata->worker_subscriber = NULL;
data->subscribers--; NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->subscribers);
(data->ipc + ngx_process_slot)->subscribers--; NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER((data->ipc + ngx_process_slot)->subscribers);
} }
......
...@@ -80,6 +80,9 @@ ngx_http_push_stream_find_channel_on_tree(ngx_str_t *id, ngx_log_t *log, ngx_rbt ...@@ -80,6 +80,9 @@ ngx_http_push_stream_find_channel_on_tree(ngx_str_t *id, ngx_log_t *log, ngx_rbt
static void static void
ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel) ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel)
{ {
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
channel->last_message_id = 0;
channel->stored_messages = 0; channel->stored_messages = 0;
channel->subscribers = 0; channel->subscribers = 0;
channel->deleted = 0; channel->deleted = 0;
...@@ -92,6 +95,8 @@ ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel) ...@@ -92,6 +95,8 @@ ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel)
channel->message_queue.deleted = 0; channel->message_queue.deleted = 0;
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len); channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len);
ngx_rbtree_insert(&data->tree, (ngx_rbtree_node_t *) channel);
(channel->broadcast) ? data->broadcast_channels++ : data->channels++;
} }
static ngx_http_push_stream_channel_t * static ngx_http_push_stream_channel_t *
...@@ -123,8 +128,6 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log) ...@@ -123,8 +128,6 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log)
// move the channel back to main tree (recover from trash) // move the channel back to main tree (recover from trash)
ngx_rbtree_delete(&data->channels_to_delete, (ngx_rbtree_node_t *) channel); ngx_rbtree_delete(&data->channels_to_delete, (ngx_rbtree_node_t *) channel);
ngx_http_push_stream_initialize_channel(channel); ngx_http_push_stream_initialize_channel(channel);
ngx_rbtree_insert(&data->tree, (ngx_rbtree_node_t *) channel);
(channel->broadcast) ? data->broadcast_channels++ : data->channels++;
} }
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
...@@ -158,8 +161,6 @@ ngx_http_push_stream_find_channel_locked(ngx_str_t *id, ngx_log_t *log) ...@@ -158,8 +161,6 @@ ngx_http_push_stream_find_channel_locked(ngx_str_t *id, ngx_log_t *log)
// move the channel back to main tree (recover from trash) // move the channel back to main tree (recover from trash)
ngx_rbtree_delete(&data->channels_to_delete, (ngx_rbtree_node_t *) channel); ngx_rbtree_delete(&data->channels_to_delete, (ngx_rbtree_node_t *) channel);
ngx_http_push_stream_initialize_channel(channel); ngx_http_push_stream_initialize_channel(channel);
ngx_rbtree_insert(&data->tree, (ngx_rbtree_node_t *) channel);
(channel->broadcast) ? data->broadcast_channels++ : data->channels++;
} }
} }
...@@ -215,14 +216,10 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st ...@@ -215,14 +216,10 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
channel->id.len = id->len; channel->id.len = id->len;
ngx_memcpy(channel->id.data, id->data, channel->id.len); ngx_memcpy(channel->id.data, id->data, channel->id.len);
channel->last_message_id = 0;
channel->broadcast = is_broadcast_channel; channel->broadcast = is_broadcast_channel;
ngx_http_push_stream_initialize_channel(channel); ngx_http_push_stream_initialize_channel(channel);
ngx_rbtree_insert(&data->tree, (ngx_rbtree_node_t *) channel);
(is_broadcast_channel) ? data->broadcast_channels++ : data->channels++;
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
return channel; return channel;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment