Commit 6e0e7946 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

double check if channel is available

parent 87170726
...@@ -221,8 +221,8 @@ static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_in ...@@ -221,8 +221,8 @@ static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_in
static void ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_worker_subscriber_t *worker_subscriber); static void ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
u_char * ngx_http_push_stream_append_crlf(const ngx_str_t *str, ngx_pool_t *pool); u_char * ngx_http_push_stream_append_crlf(const ngx_str_t *str, ngx_pool_t *pool);
static void ngx_http_push_stream_collect_expired_messages(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force); static void ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force); static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force);
static ngx_int_t ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force); static ngx_int_t ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force);
static ngx_inline void ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired); static ngx_inline void ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired);
......
...@@ -210,7 +210,7 @@ ngx_http_push_stream_exit_master(ngx_cycle_t *cycle) ...@@ -210,7 +210,7 @@ ngx_http_push_stream_exit_master(ngx_cycle_t *cycle)
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
// destroy channel tree in shared memory // destroy channel tree in shared memory
ngx_http_push_stream_collect_expired_messages_and_empty_channels(&data->tree, shpool, data->tree.root, 1); ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, data->tree.root, 1);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(1); ngx_http_push_stream_free_memory_of_expired_messages_and_channels(1);
} }
......
...@@ -186,28 +186,24 @@ ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t * ...@@ -186,28 +186,24 @@ ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *
static void static void
ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force) ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force)
{ {
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_rbtree_node_t *sentinel;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
sentinel = tree->sentinel;
channel = (ngx_http_push_stream_channel_t *) node; channel = (ngx_http_push_stream_channel_t *) node;
if ((!channel->deleted) && (&channel->node != sentinel)) { if ((channel != NULL) && (channel->deleted == 0) && (&channel->node != data->tree.sentinel) && (&channel->node != data->channels_to_delete.sentinel)) {
if ((!channel->deleted) && (channel->node.left != NULL)) { if ((channel != NULL) && (channel->deleted == 0) && (channel->node.left != NULL)) {
ngx_http_push_stream_collect_expired_messages_and_empty_channels(tree, shpool, node->left, force); ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, node->left, force);
} }
if ((!channel->deleted) && (channel->node.right != NULL)) { if ((channel != NULL) && (channel->deleted == 0) && (channel->node.right != NULL)) {
ngx_http_push_stream_collect_expired_messages_and_empty_channels(tree, shpool, node->right, force); ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, node->right, force);
} }
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
if ((channel != NULL) && (!channel->deleted)) { if ((channel != NULL) && (channel->deleted == 0)) {
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1); ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1);
...@@ -229,27 +225,24 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *t ...@@ -229,27 +225,24 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *t
static void static void
ngx_http_push_stream_collect_expired_messages(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force) ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force)
{ {
ngx_rbtree_node_t *sentinel;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
sentinel = tree->sentinel;
channel = (ngx_http_push_stream_channel_t *) node; channel = (ngx_http_push_stream_channel_t *) node;
if ((!channel->deleted) && (&channel->node != sentinel)) { if ((channel != NULL) && (channel->deleted == 0) && (&channel->node != data->tree.sentinel) && (&channel->node != data->channels_to_delete.sentinel)) {
if ((!channel->deleted) && (channel->node.left != NULL)) { if ((channel != NULL) && (channel->deleted == 0) && (channel->node.left != NULL)) {
ngx_http_push_stream_collect_expired_messages(tree, shpool, node->left, force); ngx_http_push_stream_collect_expired_messages(data, shpool, node->left, force);
} }
if ((!channel->deleted) && (channel->node.right != NULL)) { if ((channel != NULL) && (channel->deleted == 0) && (channel->node.right != NULL)) {
ngx_http_push_stream_collect_expired_messages(tree, shpool, node->right, force); ngx_http_push_stream_collect_expired_messages(data, shpool, node->right, force);
} }
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
if ((channel != NULL) && (!channel->deleted)) { if ((channel != NULL) && (channel->deleted == 0)) {
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1); ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1);
} }
...@@ -307,7 +300,7 @@ ngx_http_push_stream_memory_cleanup(ngx_log_t *log, ngx_http_push_stream_main_co ...@@ -307,7 +300,7 @@ ngx_http_push_stream_memory_cleanup(ngx_log_t *log, ngx_http_push_stream_main_co
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_collect_expired_messages_and_empty_channels(&data->tree, shpool, data->tree.root, 0); ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, data->tree.root, 0);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0); ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0);
return NGX_OK; return NGX_OK;
...@@ -320,7 +313,7 @@ ngx_http_push_stream_buffer_cleanup(ngx_log_t *log, ngx_http_push_stream_loc_con ...@@ -320,7 +313,7 @@ ngx_http_push_stream_buffer_cleanup(ngx_log_t *log, ngx_http_push_stream_loc_con
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_collect_expired_messages(&data->tree, shpool, data->tree.root, 0); ngx_http_push_stream_collect_expired_messages(data, shpool, data->tree.root, 0);
return NGX_OK; return NGX_OK;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment