Commit 78409d82 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding function ngx_http_push_stream_run_cleanup_pool_handler and refactor on...

adding function ngx_http_push_stream_run_cleanup_pool_handler and refactor on channel delete to avoid infinite loop
parent 3ac55889
...@@ -289,22 +289,17 @@ ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect) ...@@ -289,22 +289,17 @@ ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect)
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc; ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot; ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *sentinel = thisworker_data->worker_subscribers_sentinel; ngx_http_push_stream_worker_subscriber_t *sentinel = thisworker_data->worker_subscribers_sentinel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_subscriber_t *cur = sentinel; ngx_http_push_stream_worker_subscriber_t *cur = sentinel;
time_t now = ngx_time(); time_t now = ngx_time();
ngx_shmtx_lock(&shpool->mutex);
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&sentinel->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
if ((cur->request != NULL) && (ngx_exiting || (force_disconnect == 1) || ((cur->expires != 0) && (now > cur->expires)))) { if ((cur->request != NULL) && (ngx_exiting || (force_disconnect == 1) || ((cur->expires != 0) && (now > cur->expires)))) {
ngx_http_push_stream_worker_subscriber_cleanup_locked(cur);
ngx_http_push_stream_send_response_finalize(cur->request); ngx_http_push_stream_send_response_finalize(cur->request);
} else { } else {
break; break;
} }
} }
ngx_shmtx_unlock(&shpool->mutex);
} }
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include <ngx_http_push_stream_module_utils.h> #include <ngx_http_push_stream_module_utils.h>
static void nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel); static void nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel);
static void ngx_http_push_stream_run_cleanup_pool_handler(ngx_pool_t *p, ngx_pool_cleanup_pt handler);
static ngx_inline void static ngx_inline void
ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired) { ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired) {
...@@ -48,24 +49,31 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_ ...@@ -48,24 +49,31 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_
} }
static ngx_inline void static void
ngx_http_push_stream_delete_worker_channel(void) ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node)
{ {
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_pid_queue_t *cur_worker; ngx_http_push_stream_pid_queue_t *cur_worker;
ngx_http_push_stream_subscriber_t *cur; ngx_http_push_stream_subscriber_t *cur;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc; ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot; ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *worker_subscriber; ngx_http_push_stream_worker_subscriber_t *worker_subscriber;
ngx_http_push_stream_subscription_t *cur_subscription; ngx_http_push_stream_subscription_t *cur_subscription;
while (data->unrecoverable_channels.root != data->unrecoverable_channels.sentinel) { channel = (ngx_http_push_stream_channel_t *) node;
ngx_shmtx_lock(&shpool->mutex);
// try to delete the channel at the root of the tree if ((channel != NULL) && (channel->node.key != 0) && (&channel->node != data->tree.sentinel) && (&channel->node != data->channels_to_delete.sentinel) && (&channel->node != data->unrecoverable_channels.sentinel)) {
if (data->unrecoverable_channels.root != data->unrecoverable_channels.sentinel) {
channel = (ngx_http_push_stream_channel_t *) data->unrecoverable_channels.root; if ((channel != NULL) && (channel->node.key != 0) && (channel->node.left != NULL)) {
ngx_http_push_stream_delete_unrecoverable_channels(data, shpool, node->left);
}
if ((channel != NULL) && (channel->node.key != 0) && (channel->node.right != NULL)) {
ngx_http_push_stream_delete_unrecoverable_channels(data, shpool, node->right);
}
if ((channel != NULL) && (channel->node.key != 0)) {
// remove subscribers if any
if (channel->subscribers > 0) { if (channel->subscribers > 0) {
cur_worker = &channel->workers_with_subscribers; cur_worker = &channel->workers_with_subscribers;
...@@ -88,10 +96,12 @@ ngx_http_push_stream_delete_worker_channel(void) ...@@ -88,10 +96,12 @@ ngx_http_push_stream_delete_worker_channel(void)
if (cur_subscription->channel == channel) { if (cur_subscription->channel == channel) {
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->subscribers); NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->subscribers);
ngx_shmtx_lock(&shpool->mutex);
// remove the reference from subscription for channel // remove the reference from subscription for channel
ngx_queue_remove(&cur_subscription->queue); ngx_queue_remove(&cur_subscription->queue);
// remove the reference from channel for subscriber // remove the reference from channel for subscriber
ngx_queue_remove(&cur->queue); ngx_queue_remove(&cur->queue);
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_push_stream_send_response_message(cur->request, channel, channel->channel_deleted_message); ngx_http_push_stream_send_response_message(cur->request, channel, channel->channel_deleted_message);
...@@ -101,7 +111,6 @@ ngx_http_push_stream_delete_worker_channel(void) ...@@ -101,7 +111,6 @@ ngx_http_push_stream_delete_worker_channel(void)
// subscriber does not have any other subscription, the connection may be closed // subscriber does not have any other subscription, the connection may be closed
if (ngx_queue_empty(&worker_subscriber->subscriptions_sentinel.queue)) { if (ngx_queue_empty(&worker_subscriber->subscriptions_sentinel.queue)) {
ngx_http_push_stream_worker_subscriber_cleanup_locked(worker_subscriber);
ngx_http_push_stream_send_response_finalize(worker_subscriber->request); ngx_http_push_stream_send_response_finalize(worker_subscriber->request);
} }
...@@ -112,13 +121,32 @@ ngx_http_push_stream_delete_worker_channel(void) ...@@ -112,13 +121,32 @@ ngx_http_push_stream_delete_worker_channel(void)
} }
} }
} else { }
// channel has not subscribers and can be released
if (channel->subscribers == 0) {
ngx_shmtx_lock(&shpool->mutex);
// avoid more than one worker try to free channel memory
if ((channel != NULL) && (channel->node.key != 0) && (channel->node.left != NULL) && (channel->node.right != NULL)) {
ngx_rbtree_delete(&data->unrecoverable_channels, &channel->node); ngx_rbtree_delete(&data->unrecoverable_channels, &channel->node);
nxg_http_push_stream_free_channel_memory_locked(shpool, channel); nxg_http_push_stream_free_channel_memory_locked(shpool, channel);
} }
}
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
} }
}
}
}
static ngx_inline void
ngx_http_push_stream_delete_worker_channel(void)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
if (data->unrecoverable_channels.root != data->unrecoverable_channels.sentinel) {
ngx_http_push_stream_delete_unrecoverable_channels(data, shpool, data->unrecoverable_channels.root);
}
} }
...@@ -371,11 +399,29 @@ ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *tex ...@@ -371,11 +399,29 @@ ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *tex
return ngx_http_output_filter(r, out); return ngx_http_output_filter(r, out);
} }
static void
ngx_http_push_stream_run_cleanup_pool_handler(ngx_pool_t *p, ngx_pool_cleanup_pt handler)
{
ngx_pool_cleanup_t *c;
for (c = p->cleanup; c; c = c->next) {
if ((c->handler == handler) && (c->data != NULL)) {
c->handler(c->data);
return;
}
}
}
/**
* Should never be called inside a locked block
* */
static void static void
ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r) ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r)
{ {
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_subscriber_cleanup);
if (pslcf->footer_template.len > 0) { if (pslcf->footer_template.len > 0) {
ngx_http_push_stream_send_response_text(r, pslcf->footer_template.data, pslcf->footer_template.len, 0); ngx_http_push_stream_send_response_text(r, pslcf->footer_template.data, pslcf->footer_template.len, 0);
} }
...@@ -537,16 +583,11 @@ static void ...@@ -537,16 +583,11 @@ static void
nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel) nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel)
{ {
// delete the worker-subscriber queue // delete the worker-subscriber queue
ngx_http_push_stream_pid_queue_t *workers_sentinel, *cur, *next; ngx_http_push_stream_pid_queue_t *cur;
workers_sentinel = &channel->workers_with_subscribers;
cur = (ngx_http_push_stream_pid_queue_t *)ngx_queue_next(&workers_sentinel->queue);
while (cur != workers_sentinel) { while ((cur = (ngx_http_push_stream_pid_queue_t *)ngx_queue_next(&channel->workers_with_subscribers.queue)) != &channel->workers_with_subscribers) {
next = (ngx_http_push_stream_pid_queue_t *)ngx_queue_next(&cur->queue);
ngx_queue_remove(&cur->queue); ngx_queue_remove(&cur->queue);
ngx_slab_free_locked(shpool, cur); ngx_slab_free_locked(shpool, cur);
cur = next;
} }
ngx_slab_free_locked(shpool, channel->id.data); ngx_slab_free_locked(shpool, channel->id.data);
......
...@@ -490,6 +490,7 @@ class TestPublisherAdmin < Test::Unit::TestCase ...@@ -490,6 +490,7 @@ class TestPublisherAdmin < Test::Unit::TestCase
channel_1 = 'test_delete_channel_whith_subscriber_in_two_channels_1' channel_1 = 'test_delete_channel_whith_subscriber_in_two_channels_1'
channel_2 = 'test_delete_channel_whith_subscriber_in_two_channels_2' channel_2 = 'test_delete_channel_whith_subscriber_in_two_channels_2'
stage1_complete = false stage1_complete = false
stage2_complete = false
resp = "" resp = ""
EventMachine.run { EventMachine.run {
...@@ -539,12 +540,34 @@ class TestPublisherAdmin < Test::Unit::TestCase ...@@ -539,12 +540,34 @@ class TestPublisherAdmin < Test::Unit::TestCase
assert_equal(200, pub.response_header.status, "Request was not received") assert_equal(200, pub.response_header.status, "Request was not received")
} }
} }
else elsif !stage2_complete
stage2_complete = true
response = JSON.parse(resp.split("\r\n")[2]) response = JSON.parse(resp.split("\r\n")[2])
assert_equal(channel_2, response["channel"], "Wrong channel") assert_equal(channel_2, response["channel"], "Wrong channel")
assert_equal(1, response["id"].to_i, "Wrong message id") assert_equal(1, response["id"].to_i, "Wrong message id")
assert_equal(body, response["text"], "Wrong message id") assert_equal(body, response["text"], "Wrong message id")
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel_2.to_s).delete :head => headers, :timeout => 30
pub.callback {
assert_equal(200, pub.response_header.status, "Request was not received")
assert_equal(0, pub.response_header.content_length, "Should response only with headers")
assert_equal("Channel deleted.", pub.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'], "Didn't receive the right error message")
}
else
response = JSON.parse(resp.split("\r\n")[3])
assert_equal(channel_2, response["channel"], "Wrong channel")
assert_equal(-2, response["id"].to_i, "Wrong message id")
assert_equal("Channel deleted", response["text"], "Wrong message text")
stats = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => {'accept' => 'application/json'}, :timeout => 30
stats.callback {
assert_equal(200, stats.response_header.status, "Don't get channels statistics")
assert_not_equal(0, stats.response_header.content_length, "Don't received channels statistics")
response = JSON.parse(stats.response)
assert_equal(0, response["subscribers"].to_i, "Subscriber was not deleted")
assert_equal(0, response["channels"].to_i, "Channel was not deleted")
EventMachine.stop EventMachine.stop
}
end end
rescue JSON::ParserError rescue JSON::ParserError
EventMachine.stop EventMachine.stop
...@@ -552,6 +575,87 @@ class TestPublisherAdmin < Test::Unit::TestCase ...@@ -552,6 +575,87 @@ class TestPublisherAdmin < Test::Unit::TestCase
end end
end end
} }
add_test_timeout
}
end
def config_test_delete_channels_whith_subscribers
@header_template = nil
@footer_template = "FOOTER"
@ping_message_interval = nil
@memory_cleanup_timeout = nil
@message_template = '{\"id\":\"~id~\", \"channel\":\"~channel~\", \"text\":\"~text~\"}'
end
def test_delete_channels_whith_subscribers
headers = {'accept' => 'application/json'}
body = 'published message'
channel_1 = 'test_delete_channels_whith_subscribers_1'
channel_2 = 'test_delete_channels_whith_subscribers_2'
EventMachine.run {
resp_1 = ""
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_1.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
resp_1 += chunk
}
sub_1.callback {
assert_equal("{\"id\":\"-2\", \"channel\":\"test_delete_channels_whith_subscribers_1\", \"text\":\"Channel deleted\"}\r\nFOOTER\r\n", resp_1, "Subscriber was not created")
}
resp_2 = ""
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_2.to_s).get :head => headers, :timeout => 30
sub_2.stream { |chunk|
resp_2 += chunk
}
sub_2.callback {
assert_equal("{\"id\":\"-2\", \"channel\":\"test_delete_channels_whith_subscribers_2\", \"text\":\"Channel deleted\"}\r\nFOOTER\r\n", resp_2, "Subscriber was not created")
}
stats = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => {'accept' => 'application/json'}, :timeout => 30
stats.callback {
assert_equal(200, stats.response_header.status, "Don't get channels statistics")
assert_not_equal(0, stats.response_header.content_length, "Don't received channels statistics")
begin
response = JSON.parse(stats.response)
assert_equal(2, response["subscribers"].to_i, "Subscriber was not created")
assert_equal(2, response["channels"].to_i, "Channel was not created")
rescue JSON::ParserError
fail("Didn't receive a valid response")
end
}
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel_1.to_s).delete :head => headers, :timeout => 30
pub_1.callback {
assert_equal(200, pub_1.response_header.status, "Request was not received")
assert_equal(0, pub_1.response_header.content_length, "Should response only with headers")
assert_equal("Channel deleted.", pub_1.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'], "Didn't receive the right error message")
}
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel_2.to_s).delete :head => headers, :timeout => 30
pub_2.callback {
assert_equal(200, pub_2.response_header.status, "Request was not received")
assert_equal(0, pub_2.response_header.content_length, "Should response only with headers")
assert_equal("Channel deleted.", pub_2.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'], "Didn't receive the right error message")
}
EM.add_timer(5) {
stats_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => {'accept' => 'application/json'}, :timeout => 30
stats_2.callback {
assert_equal(200, stats_2.response_header.status, "Don't get channels statistics")
assert_not_equal(0, stats_2.response_header.content_length, "Don't received channels statistics")
begin
response = JSON.parse(stats_2.response)
assert_equal(0, response["subscribers"].to_i, "Subscriber was not created")
assert_equal(0, response["channels"].to_i, "Channel was not created")
rescue JSON::ParserError
fail("Didn't receive a valid response")
end
EventMachine.stop
}
}
add_test_timeout(10)
} }
end end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment