Commit d3c644bf authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

fix memory leak of not stored messages in channel

parent c7e67fda
......@@ -92,10 +92,10 @@ static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_X_YAML = ngx_string("text/x-
" <subscribers>%ui</subscribers>" CRLF \
"</channel>" CRLF
#define NGX_HTTP_PUSH_STREAM_WORKER_INFO_XML_PATTERN \
"<worker>" CRLF \
" <pid>%d</pid>" CRLF \
" <subscribers>%ui</subscribers>" CRLF \
"</worker>" CRLF
"<worker>" CRLF \
" <pid>%d</pid>" CRLF \
" <subscribers>%ui</subscribers>" CRLF \
"</worker>" CRLF
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_XML = ngx_string("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" CRLF NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_XML_PATTERN CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_XML = ngx_string("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" CRLF "<root>" CRLF" <hostname>%s</hostname>" CRLF" <time>%s</time>" CRLF" <channels>%ui</channels>" CRLF" <broadcast_channels>%ui</broadcast_channels>" CRLF" <infos>" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_XML = ngx_string(" </infos>" CRLF"</root>" CRLF);
......@@ -221,8 +221,10 @@ static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_in
static void ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
u_char * ngx_http_push_stream_append_crlf(const ngx_str_t *str, ngx_pool_t *pool);
static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force);
static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg);
static ngx_int_t ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force);
static ngx_inline void ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired);
......
......@@ -396,6 +396,7 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
// in shared memory, identified by pid.
ngx_http_push_stream_pid_queue_t *sentinel = &channel->workers_with_subscribers;
ngx_http_push_stream_pid_queue_t *cur = sentinel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
pid_t worker_pid = cur->pid;
......@@ -409,6 +410,12 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with some other worker process");
}
}
if ((msg->queue.prev == NULL) && (msg->queue.next == NULL)) {
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_mark_message_to_delete_locked(msg);
ngx_shmtx_unlock(&shpool->mutex);
}
}
static ngx_int_t
......
......@@ -27,7 +27,6 @@
static ngx_inline void
ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired) {
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_msg_t *sentinel, *msg;
sentinel = &channel->message_queue;
......@@ -39,15 +38,14 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_
break;
}
msg->deleted = 1;
msg->expires = ngx_time() + ngx_http_push_stream_module_main_conf->memory_cleanup_timeout;
channel->stored_messages--;
ngx_queue_remove(&msg->queue);
ngx_queue_insert_tail(&data->messages_to_delete.queue, &msg->queue);
ngx_http_push_stream_mark_message_to_delete_locked(msg);
}
}
ngx_http_push_stream_msg_t *
ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf)
{
......@@ -85,6 +83,8 @@ ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf)
msg->buf->memory = 1;
msg->deleted = 0;
msg->expires = 0;
msg->queue.prev = NULL;
msg->queue.next = NULL;
return msg;
}
......@@ -230,7 +230,7 @@ ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *d
ngx_http_push_stream_channel_t *channel;
channel = (ngx_http_push_stream_channel_t *) node;
if ((channel != NULL) && (channel->deleted == 0) && (&channel->node != data->tree.sentinel) && (&channel->node != data->channels_to_delete.sentinel)) {
if ((channel != NULL) && (channel->deleted == 0) && (&channel->node != data->tree.sentinel) && (&channel->node != data->channels_to_delete.sentinel)) {
if ((channel != NULL) && (channel->deleted == 0) && (channel->node.left != NULL)) {
ngx_http_push_stream_collect_expired_messages(data, shpool, node->left, force);
......@@ -334,9 +334,7 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
next = (ngx_http_push_stream_msg_t *)ngx_queue_next(&cur->queue);
if ((ngx_time() > cur->expires) || force) {
ngx_queue_remove(&cur->queue);
ngx_slab_free_locked(shpool, cur->buf->start);
ngx_slab_free_locked(shpool, cur->buf);
ngx_slab_free_locked(shpool, cur);
ngx_http_push_stream_free_message_memory_locked(shpool, cur);
}
cur = next;
}
......@@ -347,6 +345,24 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
}
static void
ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg) {
ngx_slab_free_locked(shpool, msg->buf->start);
ngx_slab_free_locked(shpool, msg->buf);
ngx_slab_free_locked(shpool, msg);
}
static void
ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg) {
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
msg->deleted = 1;
msg->expires = ngx_time() + ngx_http_push_stream_module_main_conf->memory_cleanup_timeout;
ngx_queue_insert_tail(&data->messages_to_delete.queue, &msg->queue);
}
static void
ngx_http_push_stream_ping_timer_set(ngx_http_push_stream_loc_conf_t *pslcf)
{
......
......@@ -18,27 +18,18 @@ class TestCreateManyChannels < Test::Unit::TestCase
EventMachine.run {
# ensure space for a subscriber after memory was full
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
fail_if_connecttion_error(sub_1)
i = 0
EM.add_periodic_timer(0.05) do
EM.add_periodic_timer(0.001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_1.callback {
if pub_1.response_header.status == 500
EventMachine.stop
else
i += 1
end
EventMachine.stop if pub_1.response_header.status == 500
}
fail_if_connecttion_error(pub_1)
end
}
EventMachine.run {
# ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
fail_if_connecttion_error(sub_1)
stored_messages_setp_1 = 0
stored_messages_setp_2 = 0
......@@ -48,27 +39,26 @@ class TestCreateManyChannels < Test::Unit::TestCase
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
stored_messages_setp_1 = JSON.parse(pub_2.response)["stored_messages"].to_i
sleep(40) #wait for message timeout and for cleanup timer
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback {
assert_equal(200, pub_3.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_3.response_header.content_length, "Don't received channels statistics")
stored_messages_setp_2 = JSON.parse(pub_3.response)["stored_messages"].to_i
assert(stored_messages_setp_1 > stored_messages_setp_2, "Messages weren't clean up: #{stored_messages_setp_1} <= #{stored_messages_setp_2}")
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_4.callback {
assert_equal(200, pub_4.response_header.status, "Don't get channels statistics")
assert_equal(stored_messages_setp_2 + 1, JSON.parse(pub_4.response)["stored_messages"].to_i, "Don't get channels statistics")
EventMachine.stop
i = 0
EM.add_periodic_timer(1) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback {
assert_equal(200, pub_3.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_3.response_header.content_length, "Don't received channels statistics")
stored_messages_setp_2 = JSON.parse(pub_3.response)["stored_messages"].to_i
if (stored_messages_setp_1 > stored_messages_setp_2)
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_4.callback {
EventMachine.stop if (pub_4.response_header.status == 200)
}
end
fail("Don't free the memory in 60 seconds") if (i == 60)
i += 1
}
fail_if_connecttion_error(pub_4)
}
fail_if_connecttion_error(pub_3)
end
}
fail_if_connecttion_error(pub_2)
}
end
......@@ -83,18 +73,14 @@ class TestCreateManyChannels < Test::Unit::TestCase
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
i = 0
EventMachine.run {
EM.add_periodic_timer(0.05) do
i = 0
EM.add_periodic_timer(0.001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s + i.to_s).post :head => headers, :body => body, :timeout => 60
pub_1.callback {
if pub_1.response_header.status == 500
EventMachine.stop
else
i += 1
end
EventMachine.stop if pub_1.response_header.status == 500
i += 1
}
fail_if_connecttion_error(pub_1)
end
}
......@@ -106,37 +92,96 @@ class TestCreateManyChannels < Test::Unit::TestCase
assert_equal(200, pub_2.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
channels_setp_1 = JSON.parse(pub_2.response)["channels"].to_i
#depends in wich step memory was full
assert((channels_setp_1 == i) || (channels_setp_1 == i.next), "Channels were not here anymore")
sleep(45) #wait for message timeout and for cleanup timer
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_3.callback {
assert_equal(200, pub_3.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_3.response_header.content_length, "Don't received channels statistics")
channels_setp_2 = JSON.parse(pub_3.response)["channels"].to_i
assert(channels_setp_1 > channels_setp_2, "Channels weren't clean up: #{channels_setp_1} <= #{channels_setp_2}")
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s + (i + 1).to_s).post :head => headers, :body => body, :timeout => 60
pub_4.callback {
assert_equal(200, pub_4.response_header.status, "Don't get channels statistics")
pub_5 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_5.callback {
assert_equal(200, pub_5.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_5.response_header.content_length, "Don't received channels statistics")
assert_equal(channels_setp_2 + 1, JSON.parse(pub_5.response)["channels"].to_i, "Don't get channels statistics")
EventMachine.stop
}
fail_if_connecttion_error(pub_5)
i = 0
EM.add_periodic_timer(1) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_3.callback {
assert_equal(200, pub_3.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_3.response_header.content_length, "Don't received channels statistics")
channels_setp_2 = JSON.parse(pub_3.response)["channels"].to_i
if (channels_setp_1 > channels_setp_2)
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s + (i + 1).to_s).post :head => headers, :body => body, :timeout => 60
pub_4.callback {
EventMachine.stop if (pub_4.response_header.status == 200)
}
end
fail("Don't free the memory in 60 seconds") if (i == 60)
i += 1
}
fail_if_connecttion_error(pub_4)
}
fail_if_connecttion_error(pub_3)
end
}
fail_if_connecttion_error(pub_2)
}
end
def config_test_message_cleanup_with_store_off_with_subscriber
@max_reserved_memory = "32k"
@store_messages = 'off'
@memory_cleanup_timeout = '30s'
end
def test_message_cleanup_with_store_off_with_subscriber
channel = 'ch_test_message_cleanup_with_store_off_with_subscriber'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
EventMachine.run {
# ensure space for a subscriber after memory was full
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
EM.add_periodic_timer(0.001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_1.callback {
EventMachine.stop if (pub_1.response_header.status == 500)
}
end
}
i = 0
EventMachine.run {
EM.add_periodic_timer(1) do
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_2.callback {
fail("Don't free the memory in 60 seconds") if (i == 60)
EventMachine.stop if (pub_2.response_header.status == 200)
i += 1
}
end
}
end
def config_test_message_cleanup_with_store_off_without_subscriber
@max_reserved_memory = "32k"
@store_messages = 'off'
@memory_cleanup_timeout = '30s'
end
def test_message_cleanup_with_store_off_without_subscriber
channel = 'ch_test_message_cleanup_with_store_off_without_subscriber'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
EventMachine.run {
EM.add_periodic_timer(0.001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_1.callback {
EventMachine.stop if (pub_1.response_header.status == 500)
}
end
}
i = 0
EventMachine.run {
EM.add_periodic_timer(1) do
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_2.callback {
fail("Don't free the memory in 60 seconds") if (i == 60)
EventMachine.stop if (pub_2.response_header.status == 200)
i += 1
}
end
}
end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment