Commit 152326ab authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding the channel to a queue to be easier, faster and stable go through all...

adding the channel to a queue to be easier, faster and stable go through all channels when looking for those ready to be discarded
parent baed4ca7
......@@ -137,6 +137,7 @@ typedef struct {
typedef struct {
ngx_rbtree_node_t node; // this MUST be first
ngx_queue_t queue;
ngx_queue_t *queue_sentinel;
ngx_str_t id;
ngx_uint_t last_message_id;
time_t last_message_time;
......@@ -214,6 +215,8 @@ typedef struct {
ngx_uint_t subscribers; // # of subscribers in all channels
ngx_http_push_stream_msg_t messages_to_delete;
ngx_rbtree_t channels_to_delete;
ngx_queue_t channels_queue;
ngx_queue_t channels_to_delete_queue;
ngx_queue_t unrecoverable_channels;
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
time_t startup;
......
......@@ -265,8 +265,8 @@ static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, ui
static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force);
static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_free_worker_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_worker_msg_t *worker_msg);
static ngx_int_t ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force);
......
......@@ -27,6 +27,6 @@
#define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.4");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("7e70cc1cd5f5070d37ca23d7961b4afafebe3668");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("baed4ca743e3da679f2ad90bdf02ac4444d72643");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */
......@@ -314,7 +314,7 @@ ngx_http_push_stream_exit_master(ngx_cycle_t *cycle)
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
// destroy channel tree in shared memory
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, data->tree.root, 1);
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, 1);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(1);
}
......@@ -924,6 +924,8 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
}
ngx_rbtree_init(&d->channels_to_delete, remove_sentinel, ngx_http_push_stream_rbtree_insert);
ngx_queue_init(&d->channels_queue);
ngx_queue_init(&d->channels_to_delete_queue);
ngx_queue_init(&d->unrecoverable_channels);
// create ping message
......
......@@ -129,6 +129,7 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data
if (channel->expires == 0) {
channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
} else if (ngx_time() > channel->expires) {
// go back one node on queue, since the current node will be removed
cur_channel = prev_channel;
ngx_queue_remove(&channel->queue);
nxg_http_push_stream_free_channel_memory_locked(shpool, channel);
......@@ -233,6 +234,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
while ((cur = (ngx_http_push_stream_template_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
ngx_str_t *aux = NULL;
if (cur->eventsource) {
......@@ -669,7 +671,9 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
// move the channel to unrecoverable queue
ngx_rbtree_delete(&data->tree, &channel->node);
ngx_queue_remove(&channel->queue);
ngx_queue_insert_tail(&data->unrecoverable_channels, &channel->queue);
channel->queue_sentinel = &data->unrecoverable_channels;
// remove all messages
......@@ -695,95 +699,79 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
static void
ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force)
ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force)
{
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *prev, *cur = &data->channels_queue;
channel = (ngx_http_push_stream_channel_t *) node;
if ((channel != NULL) && (channel->deleted == 0) && (&channel->node != data->tree.sentinel) && (&channel->node != data->channels_to_delete.sentinel)) {
ngx_http_push_stream_collect_expired_messages(data, shpool, force);
if ((channel != NULL) && (channel->deleted == 0) && (channel->node.left != NULL)) {
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, node->left, force);
}
while (((cur = ngx_queue_next(cur)) != &data->channels_queue) && (prev = ngx_queue_prev(cur))) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
if ((channel != NULL) && (channel->deleted == 0) && (channel->node.right != NULL)) {
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, node->right, force);
if (channel->queue_sentinel != &data->channels_queue) {
break;
}
ngx_shmtx_lock(&shpool->mutex);
if ((channel != NULL) && (channel->deleted == 0)) {
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1);
if ((channel->stored_messages == 0) && (channel->subscribers == 0) && (channel->last_activity_time + 30 < ngx_time())) {
// go back one node on queue, since the current node will be removed
cur = prev;
ngx_shmtx_lock(&shpool->mutex);
if ((channel->stored_messages == 0) && (channel->subscribers == 0) && (channel->last_activity_time + 30 < ngx_time())) {
if (!channel->deleted) {
channel->deleted = 1;
channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
(channel->broadcast) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->broadcast_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels);
// move the channel to trash tree
ngx_rbtree_delete(&data->tree, &channel->node);
ngx_queue_remove(&channel->queue);
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len);
ngx_rbtree_insert(&data->channels_to_delete, &channel->node);
ngx_queue_insert_tail(&data->channels_to_delete_queue, &channel->queue);
channel->queue_sentinel = &data->channels_to_delete_queue;
}
}
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_unlock(&shpool->mutex);
}
}
}
static void
ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force)
ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force)
{
ngx_http_push_stream_channel_t *channel;
channel = (ngx_http_push_stream_channel_t *) node;
if ((channel != NULL) && (channel->deleted == 0) && (&channel->node != data->tree.sentinel) && (&channel->node != data->channels_to_delete.sentinel)) {
if ((channel != NULL) && (channel->deleted == 0) && (channel->node.left != NULL)) {
ngx_http_push_stream_collect_expired_messages(data, shpool, node->left, force);
}
if ((channel != NULL) && (channel->deleted == 0) && (channel->node.right != NULL)) {
ngx_http_push_stream_collect_expired_messages(data, shpool, node->right, force);
}
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *cur = &data->channels_queue;
ngx_shmtx_lock(&shpool->mutex);
ngx_shmtx_lock(&shpool->mutex);
if ((channel != NULL) && (channel->deleted == 0)) {
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1);
}
while ((cur = ngx_queue_next(cur)) != &data->channels_queue) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1);
}
ngx_shmtx_unlock(&shpool->mutex);
}
static void
ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force)
{
ngx_rbtree_node_t *sentinel;
ngx_http_push_stream_channel_t *channel;
sentinel = tree->sentinel;
if (node != sentinel) {
if (node->left != NULL) {
ngx_http_push_stream_free_memory_of_expired_channels_locked(tree, shpool, node->left, force);
}
if (node->right != NULL) {
ngx_http_push_stream_free_memory_of_expired_channels_locked(tree, shpool, node->right, force);
}
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_queue_t *cur;
channel = (ngx_http_push_stream_channel_t *) node;
while ((cur = ngx_queue_head(&data->channels_to_delete_queue)) != &data->channels_to_delete_queue) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
if ((ngx_time() > channel->expires) || force) {
ngx_rbtree_delete(tree, node);
ngx_rbtree_delete(&data->channels_to_delete, &channel->node);
ngx_queue_remove(&channel->queue);
nxg_http_push_stream_free_channel_memory_locked(shpool, channel);
} else {
break;
}
}
}
......@@ -813,7 +801,7 @@ ngx_http_push_stream_memory_cleanup()
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_delete_unrecoverable_channels(data, shpool);
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, data->tree.root, 0);
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, 0);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0);
return NGX_OK;
......@@ -826,7 +814,7 @@ ngx_http_push_stream_buffer_cleanup()
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_collect_expired_messages(data, shpool, data->tree.root, 0);
ngx_http_push_stream_collect_expired_messages(data, shpool, 0);
return NGX_OK;
}
......
......@@ -91,6 +91,8 @@ ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel)
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len);
ngx_rbtree_insert(&data->tree, &channel->node);
ngx_queue_insert_tail(&data->channels_queue, &channel->queue);
channel->queue_sentinel = &data->channels_queue;
(channel->broadcast) ? data->broadcast_channels++ : data->channels++;
}
......@@ -122,6 +124,7 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log)
if (channel != NULL) {
// move the channel back to main tree (recover from trash)
ngx_rbtree_delete(&data->channels_to_delete, &channel->node);
ngx_queue_remove(&channel->queue);
ngx_http_push_stream_initialize_channel(channel);
}
......@@ -155,6 +158,7 @@ ngx_http_push_stream_find_channel_locked(ngx_str_t *id, ngx_log_t *log)
if (channel != NULL) {
// move the channel back to main tree (recover from trash)
ngx_rbtree_delete(&data->channels_to_delete, &channel->node);
ngx_queue_remove(&channel->queue);
ngx_http_push_stream_initialize_channel(channel);
}
}
......
......@@ -118,7 +118,7 @@ class TestCleanupMemory < Test::Unit::TestCase
end
end
EM.add_timer(50) do
EM.add_timer(40) do
create_and_delete_channel_in_loop(channel, body, headers)
EM.add_timer(5) do
......@@ -131,7 +131,7 @@ class TestCleanupMemory < Test::Unit::TestCase
end
end
end
add_test_timeout(60)
add_test_timeout(50)
end
end
......@@ -282,7 +282,7 @@ class TestCleanupMemory < Test::Unit::TestCase
})
end
EM.add_timer(45) do
EM.add_timer(50) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
......@@ -304,7 +304,7 @@ class TestCleanupMemory < Test::Unit::TestCase
end
end
end
add_test_timeout(50)
add_test_timeout(60)
end
end
......@@ -340,7 +340,7 @@ class TestCleanupMemory < Test::Unit::TestCase
i += 1
end
EM.add_timer(60) do
EM.add_timer(40) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
......@@ -407,7 +407,7 @@ class TestCleanupMemory < Test::Unit::TestCase
})
end
EM.add_timer(45) do
EM.add_timer(40) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
......@@ -464,7 +464,7 @@ class TestCleanupMemory < Test::Unit::TestCase
i += 1
end
EM.add_timer(60) do
EM.add_timer(40) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
......
......@@ -2,7 +2,7 @@ require File.expand_path('base_test_case', File.dirname(__FILE__))
class TestCleanupMemoryAfterKill < Test::Unit::TestCase
include BaseTestCase
@@second_step_timer = 120
@@second_step_timer = 90
@@timeout = 130
def global_configuration
......
......@@ -2,7 +2,7 @@ require File.expand_path('base_test_case', File.dirname(__FILE__))
class TestCleanupMemoryAfterReload < Test::Unit::TestCase
include BaseTestCase
@@second_step_timer = 120
@@second_step_timer = 90
@@timeout = 130
def global_configuration
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment