Commit baed4ca7 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

replacing unrecoverable_channels tree by a queue to be easier go through all channels on it

parent 57e0e153
......@@ -136,6 +136,7 @@ typedef struct {
// our typecast-friendly rbtree node (channel)
typedef struct {
ngx_rbtree_node_t node; // this MUST be first
ngx_queue_t queue;
ngx_str_t id;
ngx_uint_t last_message_id;
time_t last_message_time;
......@@ -213,7 +214,7 @@ typedef struct {
ngx_uint_t subscribers; // # of subscribers in all channels
ngx_http_push_stream_msg_t messages_to_delete;
ngx_rbtree_t channels_to_delete;
ngx_rbtree_t unrecoverable_channels;
ngx_queue_t unrecoverable_channels;
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
time_t startup;
time_t last_message_time;
......
......@@ -893,7 +893,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
}
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;
ngx_rbtree_node_t *sentinel, *remove_sentinel, *unrecoverable_sentinel;
ngx_rbtree_node_t *sentinel, *remove_sentinel;
ngx_http_push_stream_shm_data_t *d;
if ((d = (ngx_http_push_stream_shm_data_t *) ngx_slab_alloc(shpool, sizeof(*d))) == NULL) { //shm_data plus an array.
......@@ -924,10 +924,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
}
ngx_rbtree_init(&d->channels_to_delete, remove_sentinel, ngx_http_push_stream_rbtree_insert);
if ((unrecoverable_sentinel = ngx_slab_alloc(shpool, sizeof(*unrecoverable_sentinel))) == NULL) {
return NGX_ERROR;
}
ngx_rbtree_init(&d->unrecoverable_channels, unrecoverable_sentinel, ngx_http_push_stream_rbtree_insert);
ngx_queue_init(&d->unrecoverable_channels);
// create ping message
if ((ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->ping_message_text.data, ngx_http_push_stream_module_main_conf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
......
......@@ -58,88 +58,84 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_
static void
ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node)
ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool)
{
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_pid_queue_t *cur_worker;
ngx_http_push_stream_queue_elem_t *cur;
ngx_http_push_stream_subscription_t *cur_subscription;
channel = (ngx_http_push_stream_channel_t *) node;
if ((channel != NULL) && (channel->node.key != 0) && (&channel->node != data->tree.sentinel) && (&channel->node != data->channels_to_delete.sentinel) && (&channel->node != data->unrecoverable_channels.sentinel)) {
ngx_queue_t *prev_channel, *cur_channel = &data->unrecoverable_channels;
if ((channel != NULL) && (channel->node.key != 0) && (channel->node.left != NULL)) {
ngx_http_push_stream_delete_unrecoverable_channels(data, shpool, node->left);
}
if ((channel != NULL) && (channel->node.key != 0) && (channel->node.right != NULL)) {
ngx_http_push_stream_delete_unrecoverable_channels(data, shpool, node->right);
}
while ((cur_channel = ngx_queue_next(cur_channel)) != &data->unrecoverable_channels) {
channel = ngx_queue_data(cur_channel, ngx_http_push_stream_channel_t, queue);
if ((channel != NULL) && (channel->node.key != 0)) {
// remove subscribers if any
if (channel->subscribers > 0) {
cur_worker = &channel->workers_with_subscribers;
// remove subscribers if any
if (channel->subscribers > 0) {
cur_worker = &channel->workers_with_subscribers;
// find the current work
while ((cur_worker = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur_worker->queue)) != &channel->workers_with_subscribers) {
if (cur_worker->pid == ngx_pid) {
// find the current worker
while ((cur_worker = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur_worker->queue)) != &channel->workers_with_subscribers) {
if (cur_worker->pid == ngx_pid) {
// to each subscriber of this channel in this worker
while (!ngx_queue_empty(&cur_worker->subscribers_sentinel.queue)) {
cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur_worker->subscribers_sentinel.queue);
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value;
// to each subscriber of this channel in this worker
while (!ngx_queue_empty(&cur_worker->subscribers_sentinel.queue)) {
cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur_worker->subscribers_sentinel.queue);
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value;
// find the subscription for the channel being deleted
cur_subscription = &subscriber->subscriptions_sentinel;
while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != &subscriber->subscriptions_sentinel) {
if (cur_subscription->channel == channel) {
// find the subscription for the channel being deleted
cur_subscription = &subscriber->subscriptions_sentinel;
while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != &subscriber->subscriptions_sentinel) {
if (cur_subscription->channel == channel) {
ngx_shmtx_lock(&shpool->mutex);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->subscribers);
// remove the reference from subscription for channel
ngx_queue_remove(&cur_subscription->queue);
// remove the reference from channel for subscriber
ngx_queue_remove(&cur->queue);
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_lock(&shpool->mutex);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->subscribers);
// remove the reference from subscription for channel
ngx_queue_remove(&cur_subscription->queue);
// remove the reference from channel for subscriber
ngx_queue_remove(&cur->queue);
ngx_shmtx_unlock(&shpool->mutex);
if (subscriber->longpolling) {
ngx_http_push_stream_add_response_header(subscriber->request, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED);
ngx_http_push_stream_add_polling_headers(subscriber->request, ngx_time(), 0, subscriber->request->pool);
ngx_http_send_header(subscriber->request);
if (subscriber->longpolling) {
ngx_http_push_stream_add_response_header(subscriber->request, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED);
ngx_http_push_stream_add_polling_headers(subscriber->request, ngx_time(), 0, subscriber->request->pool);
ngx_http_send_header(subscriber->request);
ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module));
}
ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module));
}
ngx_http_push_stream_send_response_message(subscriber->request, channel, channel->channel_deleted_message, 1, 1);
ngx_http_push_stream_send_response_message(subscriber->request, channel, channel->channel_deleted_message, 1, 1);
break;
}
break;
}
}
// subscriber does not have any other subscription, the connection may be closed
if (subscriber->longpolling || ngx_queue_empty(&subscriber->subscriptions_sentinel.queue)) {
ngx_http_push_stream_send_response_finalize(subscriber->request);
}
// subscriber does not have any other subscription, the connection may be closed
if (subscriber->longpolling || ngx_queue_empty(&subscriber->subscriptions_sentinel.queue)) {
ngx_http_push_stream_send_response_finalize(subscriber->request);
}
}
}
}
}
}
// channel has not subscribers and can be released
if (channel->subscribers == 0) {
ngx_shmtx_lock(&shpool->mutex);
// avoid more than one worker try to free channel memory
if ((channel != NULL) && (channel->node.key != 0) && (channel->node.left != NULL) && (channel->node.right != NULL)) {
ngx_rbtree_delete(&data->unrecoverable_channels, &channel->node);
nxg_http_push_stream_free_channel_memory_locked(shpool, channel);
}
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_lock(&shpool->mutex);
while (((cur_channel = ngx_queue_next(cur_channel)) != &data->unrecoverable_channels) && (prev_channel = ngx_queue_prev(cur_channel))) {
channel = ngx_queue_data(cur_channel, ngx_http_push_stream_channel_t, queue);
// channel has not subscribers and can be released
if (channel->subscribers == 0) {
if (channel->expires == 0) {
channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
} else if (ngx_time() > channel->expires) {
cur_channel = prev_channel;
ngx_queue_remove(&channel->queue);
nxg_http_push_stream_free_channel_memory_locked(shpool, channel);
}
}
}
ngx_shmtx_unlock(&shpool->mutex);
}
......@@ -149,9 +145,7 @@ ngx_http_push_stream_delete_worker_channel(void)
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
if (data->unrecoverable_channels.root != data->unrecoverable_channels.sentinel) {
ngx_http_push_stream_delete_unrecoverable_channels(data, shpool, data->unrecoverable_channels.root);
}
ngx_http_push_stream_delete_unrecoverable_channels(data, shpool);
}
ngx_uint_t
......@@ -673,10 +667,9 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
channel->deleted = 1;
(channel->broadcast) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->broadcast_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels);
// move the channel to unrecoverable tree
// move the channel to unrecoverable queue
ngx_rbtree_delete(&data->tree, &channel->node);
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len);
ngx_rbtree_insert(&data->unrecoverable_channels, &channel->node);
ngx_queue_insert_tail(&data->unrecoverable_channels, &channel->queue);
// remove all messages
......@@ -707,7 +700,7 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s
ngx_http_push_stream_channel_t *channel;
channel = (ngx_http_push_stream_channel_t *) node;
if ((channel != NULL) && (channel->deleted == 0) && (&channel->node != data->tree.sentinel) && (&channel->node != data->channels_to_delete.sentinel) && (&channel->node != data->unrecoverable_channels.sentinel)) {
if ((channel != NULL) && (channel->deleted == 0) && (&channel->node != data->tree.sentinel) && (&channel->node != data->channels_to_delete.sentinel)) {
if ((channel != NULL) && (channel->deleted == 0) && (channel->node.left != NULL)) {
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, node->left, force);
......@@ -746,7 +739,7 @@ ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *d
ngx_http_push_stream_channel_t *channel;
channel = (ngx_http_push_stream_channel_t *) node;
if ((channel != NULL) && (channel->deleted == 0) && (&channel->node != data->tree.sentinel) && (&channel->node != data->channels_to_delete.sentinel) && (&channel->node != data->unrecoverable_channels.sentinel)) {
if ((channel != NULL) && (channel->deleted == 0) && (&channel->node != data->tree.sentinel) && (&channel->node != data->channels_to_delete.sentinel)) {
if ((channel != NULL) && (channel->deleted == 0) && (channel->node.left != NULL)) {
ngx_http_push_stream_collect_expired_messages(data, shpool, node->left, force);
......@@ -819,7 +812,7 @@ ngx_http_push_stream_memory_cleanup()
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_delete_unrecoverable_channels(data, shpool, data->unrecoverable_channels.root);
ngx_http_push_stream_delete_unrecoverable_channels(data, shpool);
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, data->tree.root, 0);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0);
......
......@@ -47,7 +47,7 @@ class TestCleanupMemory < Test::Unit::TestCase
i += 1
end
EM.add_timer(55) do
EM.add_timer(40) do
i = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s + i.to_s).post :body => body, :head => headers, :timeout => 30
......@@ -66,6 +66,71 @@ class TestCleanupMemory < Test::Unit::TestCase
i += 1
end
end
add_test_timeout(45)
end
end
def create_and_delete_channel(channel, body, headers, &block)
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :body => body, :head => headers, :timeout => 30
pub_1.callback do
if pub_1.response_header.status == 200
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).delete :head => headers, :timeout => 30
pub.callback do
block.call((pub.response_header.status == 200) ? :success : :error)
end
else
block.call(:error)
end
end
end
def create_and_delete_channel_in_loop(channel, body, headers)
create_and_delete_channel(channel, body, headers) do |status|
create_and_delete_channel_in_loop(channel, body, headers) if status == :success
end
end
def config_test_channel_cleanup_after_delete_same_id
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '10s'
@max_message_buffer_length = nil
@publisher_mode = 'admin'
end
def test_channel_cleanup_after_delete_same_id
channel = 'ch_test_channel_cleanup'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
published_messages_setp_1 = 0
EventMachine.run do
create_and_delete_channel_in_loop(channel, body, headers)
EM.add_timer(5) do
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
published_messages_setp_1 = result["published_messages"].to_i
fail("Don't create any message") if published_messages_setp_1 == 0
end
end
EM.add_timer(50) do
create_and_delete_channel_in_loop(channel, body, headers)
EM.add_timer(5) do
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
assert_equal(published_messages_setp_1, result["published_messages"].to_i / 2, "Don't cleaned all memory")
EventMachine.stop
end
end
end
add_test_timeout(60)
end
end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment