Commit 47146111 authored by Wandenberg's avatar Wandenberg

replace some channels tree operations by actions on the channels list

parent 91057cbc
...@@ -107,40 +107,6 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request ...@@ -107,40 +107,6 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
return ngx_http_push_stream_send_response(r, text, subtype->content_type, NGX_HTTP_OK); return ngx_http_push_stream_send_response(r, text, subtype->content_type, NGX_HTTP_OK);
} }
static void
ngx_http_push_stream_rbtree_walker_channel_info_locked(ngx_rbtree_t *tree, ngx_pool_t *pool, ngx_rbtree_node_t *node, ngx_queue_t *queue_channel_info, ngx_str_t *prefix)
{
ngx_rbtree_node_t *sentinel = tree->sentinel;
if (node != sentinel) {
ngx_http_push_stream_channel_t *channel = (ngx_http_push_stream_channel_t *) node;
ngx_http_push_stream_channel_info_t *channel_info;
if(!prefix || (ngx_strncmp(channel->id.data, prefix->data, prefix->len) == 0)) {
if ((channel_info = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_channel_info_t))) == NULL) {
return;
}
channel_info->id.data = channel->id.data;
channel_info->id.len = channel->id.len;
channel_info->published_messages = channel->last_message_id;
channel_info->stored_messages = channel->stored_messages;
channel_info->subscribers = channel->subscribers;
ngx_queue_insert_tail(queue_channel_info, &channel_info->queue);
}
if (node->left != NULL) {
ngx_http_push_stream_rbtree_walker_channel_info_locked(tree, pool, node->left, queue_channel_info, prefix);
}
if (node->right != NULL) {
ngx_http_push_stream_rbtree_walker_channel_info_locked(tree, pool, node->right, queue_channel_info, prefix);
}
}
}
static ngx_int_t static ngx_int_t
ngx_http_push_stream_send_response_channels_info(ngx_http_request_t *r, ngx_queue_t *queue_channel_info) { ngx_http_push_stream_send_response_channels_info(ngx_http_request_t *r, ngx_queue_t *queue_channel_info) {
...@@ -234,11 +200,31 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -234,11 +200,31 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
ngx_queue_t queue_channel_info; ngx_queue_t queue_channel_info;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_queue_t *cur = &data->channels_queue;
ngx_http_push_stream_channel_t *channel;
ngx_queue_init(&queue_channel_info); ngx_queue_init(&queue_channel_info);
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_rbtree_walker_channel_info_locked(&data->tree, r->pool, data->tree.root, &queue_channel_info, prefix); while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &data->channels_queue)) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
ngx_http_push_stream_channel_info_t *channel_info;
if(!prefix || (ngx_strncmp(channel->id.data, prefix->data, prefix->len) == 0)) {
if ((channel_info = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_channel_info_t))) != NULL) {
channel_info->id.data = channel->id.data;
channel_info->id.len = channel->id.len;
channel_info->published_messages = channel->last_message_id;
channel_info->stored_messages = channel->stored_messages;
channel_info->subscribers = channel->subscribers;
ngx_queue_insert_tail(&queue_channel_info, &channel_info->queue);
}
}
}
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
return ngx_http_push_stream_send_response_channels_info(r, &queue_channel_info); return ngx_http_push_stream_send_response_channels_info(r, &queue_channel_info);
......
...@@ -108,15 +108,6 @@ ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle) ...@@ -108,15 +108,6 @@ ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle)
} }
static ngx_int_t
ngx_http_push_stream_reset_channel_subscribers_count_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{
channel->subscribers = 0;
return NGX_OK;
}
// will be called many times // will be called many times
static ngx_int_t static ngx_int_t
ngx_http_push_stream_ipc_init_worker() ngx_http_push_stream_ipc_init_worker()
...@@ -124,6 +115,8 @@ ngx_http_push_stream_ipc_init_worker() ...@@ -124,6 +115,8 @@ ngx_http_push_stream_ipc_init_worker()
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
int i; int i;
ngx_queue_t *cur = &data->channels_queue;
ngx_http_push_stream_channel_t *channel;
// cleanning old content if worker die and another one is set on same slot // cleanning old content if worker die and another one is set on same slot
ngx_http_push_stream_clean_worker_data(); ngx_http_push_stream_clean_worker_data();
...@@ -134,7 +127,10 @@ ngx_http_push_stream_ipc_init_worker() ...@@ -134,7 +127,10 @@ ngx_http_push_stream_ipc_init_worker()
data->ipc[ngx_process_slot].startup = ngx_time(); data->ipc[ngx_process_slot].startup = ngx_time();
data->subscribers = 0; data->subscribers = 0;
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_reset_channel_subscribers_count_locked); while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &data->channels_queue)) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
channel->subscribers = 0;
}
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
...@@ -188,6 +184,8 @@ ngx_http_push_stream_clean_worker_data() ...@@ -188,6 +184,8 @@ ngx_http_push_stream_clean_worker_data()
{ {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_queue_t *cur = &data->channels_queue;
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *cur_msg; ngx_queue_t *cur_msg;
...@@ -198,7 +196,10 @@ ngx_http_push_stream_clean_worker_data() ...@@ -198,7 +196,10 @@ ngx_http_push_stream_clean_worker_data()
ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_queue); ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_queue);
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_unsubscribe_worker_locked); while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &data->channels_queue)) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
ngx_http_push_stream_unsubscribe_worker_locked(channel, shpool);
}
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
......
...@@ -206,28 +206,6 @@ ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_ ...@@ -206,28 +206,6 @@ ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_
} }
#define ngx_http_push_stream_walk_rbtree(apply) \
ngx_http_push_stream_rbtree_walker(&((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->tree, (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr, apply, ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->tree.root)
static void
ngx_http_push_stream_rbtree_walker(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_int_t (*apply) (ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool), ngx_rbtree_node_t *node)
{
ngx_rbtree_node_t *sentinel = tree->sentinel;
if ((node != NULL) && (node != sentinel)) {
apply((ngx_http_push_stream_channel_t *) node, shpool);
if (node->left != NULL) {
ngx_http_push_stream_rbtree_walker(tree, shpool, apply, node->left);
}
if (node->right != NULL) {
ngx_http_push_stream_rbtree_walker(tree, shpool, apply, node->right);
}
}
}
static void static void
ngx_http_push_stream_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel) ngx_http_push_stream_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment