Commit 1745a5e6 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

setting default value to message_ttl as 30 minutes to avoid memory leak when...

setting default value to message_ttl as 30 minutes to avoid memory leak when stored is on and message_ttl was not configured
parent d2f4a9a4
h1(#changelog). Changelog
* Changed default value of push_stream_message_ttl to 30 minutes to avoid memory leak of a message which is never discarded
h2. Version 0.3.3
* Adding JSONP support to pushstream.js and dynamically callback parameter
......
......@@ -54,11 +54,11 @@ h2(#push_stream_message_ttl). push_stream_message_ttl <a name="push_stream_messa
*syntax:* _push_stream_message_ttl time_
*default:* _none_
*default:* _30m_
*context:* _http_
The length of time a message may be queued before it is considered expired. If you do not want messages to expire, just not set this directive.
The length of time a message may be queued before it is considered expired.
h2(#push_stream_max_subscribers_per_channel). push_stream_max_subscribers_per_channel <a name="push_stream_max_subscribers_per_channel" href="#">&nbsp;</a>
......
......@@ -44,7 +44,6 @@ h2(#push_stream_store_messages). push_stream_store_messages <a name="push_stream
*context:* _location (push_stream_publisher)_
Whether or not message queuing is enabled.
If store messages is "on" is needed to set at least one of these two directives push_stream_message_ttl or push_stream_max_messages_stored_per_channel.
h2(#push_stream_keepalive). push_stream_keepalive <a name="push_stream_keepalive" href="#">&nbsp;</a>
......
......@@ -70,7 +70,6 @@ typedef struct {
ngx_str_t broadcast_channel_prefix;
ngx_uint_t max_number_of_channels;
ngx_uint_t max_number_of_broadcast_channels;
ngx_msec_t buffer_cleanup_interval;
time_t message_ttl;
ngx_uint_t max_subscribers_per_channel;
ngx_uint_t max_messages_stored_per_channel;
......
......@@ -34,8 +34,10 @@
#include <ngx_http_push_stream_module_subscriber.h>
#include <ngx_http_push_stream_module_websocket.h>
#define NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE 33554432 // 32 megs
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL = 30; // 30 seconds
#define NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE 33554432 // 32 megs
#define NGX_HTTP_PUSH_STREAM_MESSAGE_BUFFER_CLEANUP_INTERVAL 5000 // 5 seconds
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL = 30; // 30 seconds
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TTL = 1800; // 30 minutes
#define NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE "~text~"
......
......@@ -258,7 +258,7 @@ static void ngx_http_push_stream_timer_set(ngx_msec_t timer_inte
static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_interval, ngx_event_t *timer_event);
#define ngx_http_push_stream_memory_cleanup_timer_set() ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event, ngx_http_push_stream_memory_cleanup_timer_wake_handler, 1);
#define ngx_http_push_stream_buffer_cleanup_timer_set(pslcf) ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->buffer_cleanup_interval, &ngx_http_push_stream_buffer_cleanup_event, ngx_http_push_stream_buffer_timer_wake_handler, pslcf->store_messages);
#define ngx_http_push_stream_buffer_cleanup_timer_set(pslcf) ngx_http_push_stream_timer_set(NGX_HTTP_PUSH_STREAM_MESSAGE_BUFFER_CLEANUP_INTERVAL, &ngx_http_push_stream_buffer_cleanup_event, ngx_http_push_stream_buffer_timer_wake_handler, pslcf->store_messages);
static void ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subscriber_t *worker_subscriber);
static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len);
......
......@@ -409,7 +409,6 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
mcf->broadcast_channel_prefix.data = NULL;
mcf->max_number_of_channels = NGX_CONF_UNSET_UINT;
mcf->max_number_of_broadcast_channels = NGX_CONF_UNSET_UINT;
mcf->buffer_cleanup_interval = NGX_CONF_UNSET_MSEC;
mcf->message_ttl = NGX_CONF_UNSET;
mcf->max_channel_id_length = NGX_CONF_UNSET_UINT;
mcf->max_subscribers_per_channel = NGX_CONF_UNSET;
......@@ -432,6 +431,7 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
return NGX_CONF_OK;
}
ngx_conf_init_value(conf->message_ttl, NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TTL);
ngx_conf_init_value(conf->shm_cleanup_objects_ttl, NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL);
ngx_conf_init_size_value(conf->shm_size, NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE);
ngx_conf_merge_str_value(conf->channel_deleted_message_text, conf->channel_deleted_message_text, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT);
......@@ -485,14 +485,6 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
ngx_uint_t interval = conf->shm_cleanup_objects_ttl / 10;
conf->memory_cleanup_interval = (interval * 1000) + 1000; // min 4 seconds (((30 / 10) * 1000) + 1000)
// calc buffer cleanup interval
if (conf->message_ttl != NGX_CONF_UNSET) {
ngx_uint_t interval = conf->message_ttl / 3;
conf->buffer_cleanup_interval = (interval > 1) ? (interval * 1000) + 1000 : 1000; // min 1 second
} else if (conf->buffer_cleanup_interval == NGX_CONF_UNSET_MSEC) {
conf->buffer_cleanup_interval = 1000; // 1 second
}
ngx_regex_compile_t *backtrack_parser = NULL;
u_char errstr[NGX_MAX_CONF_ERRSTR];
......@@ -687,12 +679,6 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
return NGX_CONF_ERROR;
}
// store messages cannot be set without buffer timeout or max messages
if (conf->store_messages && (ngx_http_push_stream_module_main_conf->message_ttl == NGX_CONF_UNSET) && (ngx_http_push_stream_module_main_conf->max_messages_stored_per_channel == NGX_CONF_UNSET_UINT)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_store_messages cannot be set without set max message buffer length or min message buffer timeout.");
return NGX_CONF_ERROR;
}
// formatting header and footer template for chunk transfer
if (conf->header_template.len > 0) {
ngx_str_t *aux = NULL;
......
......@@ -321,7 +321,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_
channel->last_message_time = data->last_message_time = msg->time;
channel->last_message_tag = data->last_message_tag = msg->tag;
// set message expiration time
msg->expires = (ngx_http_push_stream_module_main_conf->message_ttl == NGX_CONF_UNSET ? 0 : (ngx_time() + ngx_http_push_stream_module_main_conf->message_ttl));
msg->expires = msg->time + ngx_http_push_stream_module_main_conf->message_ttl;
// put messages on the queue
if (cf->store_messages) {
......@@ -986,7 +986,7 @@ static void
ngx_http_push_stream_buffer_timer_wake_handler(ngx_event_t *ev)
{
ngx_http_push_stream_buffer_cleanup();
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->buffer_cleanup_interval, &ngx_http_push_stream_buffer_cleanup_event);
ngx_http_push_stream_timer_reset(NGX_HTTP_PUSH_STREAM_MESSAGE_BUFFER_CLEANUP_INTERVAL, &ngx_http_push_stream_buffer_cleanup_event);
}
static u_char *
......
......@@ -4,10 +4,10 @@ class TestCreateManyChannels < Test::Unit::TestCase
include BaseTestCase
def config_test_message_cleanup
@min_message_buffer_timeout = '10s'
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '10s'
@max_message_buffer_length = 100
@memory_cleanup_timeout = '30s'
end
def test_message_cleanup
......@@ -37,7 +37,10 @@ class TestCreateManyChannels < Test::Unit::TestCase
pub_2.callback {
assert_equal(200, pub_2.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
stored_messages_setp_1 = JSON.parse(pub_2.response)["stored_messages"].to_i
result = JSON.parse(pub_2.response)
stored_messages_setp_1 = result["stored_messages"].to_i
assert_equal(@max_message_buffer_length, stored_messages_setp_1, "Don't limit stored messages")
fail("Don't reached the limit of stored messages") if result["published_messages"].to_i <= @max_message_buffer_length
fail("Don't create any message") if stored_messages_setp_1 == 0
i = 0
......@@ -64,55 +67,15 @@ class TestCreateManyChannels < Test::Unit::TestCase
}
end
def config_test_message_cleanup_without_set_message_ttl
@store_messages = 'on'
@min_message_buffer_timeout = nil
@max_message_buffer_length = 5
def config_test_discard_old_messages
@memory_cleanup_timeout = '30s'
end
def test_message_cleanup_without_set_message_ttl
channel = 'ch_test_message_cleanup_without_set_message_ttl'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
EventMachine.run {
10.times do |i|
publish_message_inline(channel, headers, body)
if i == 9
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_1.callback {
assert_equal(200, pub_1.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_1.response_header.content_length, "Don't received channels statistics")
assert_equal(@max_message_buffer_length, JSON.parse(pub_1.response)["stored_messages"].to_i, "Don't store messages")
sleep(15) # wait cleanup timer to be executed one time
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback {
assert_equal(200, pub_2.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
assert_equal(@max_message_buffer_length, JSON.parse(pub_2.response)["stored_messages"].to_i, "Don't store messages")
EventMachine.stop
}
}
end
end
add_test_timeout(20)
}
end
def config_test_message_cleanup_without_max_messages_stored_per_channel
@store_messages = 'on'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '10s'
@max_message_buffer_length = nil
@memory_cleanup_timeout = '30s'
end
def test_message_cleanup_without_max_messages_stored_per_channel
channel = 'ch_test_message_cleanup_without_max_messages_stored_per_channel'
def test_discard_old_messages
channel = 'ch_test_discard_old_messages'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
messages_to_publish = 10
......@@ -153,10 +116,74 @@ class TestCreateManyChannels < Test::Unit::TestCase
}
end
def config_test_channel_cleanup
@min_message_buffer_timeout = '10s'
def config_test_message_cleanup_without_max_messages_stored_per_channel
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '10s'
@max_message_buffer_length = nil
end
def test_message_cleanup_without_max_messages_stored_per_channel
channel = 'ch_test_message_cleanup_without_max_messages_stored_per_channel'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
EventMachine.run {
# ensure space for a subscriber after memory was full
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
EM.add_periodic_timer(0.001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_1.callback {
EventMachine.stop if pub_1.response_header.status == 500
}
end
}
EventMachine.run {
# ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
stored_messages_setp_1 = 0
stored_messages_setp_2 = 0
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback {
assert_equal(200, pub_2.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
stored_messages_setp_1 = JSON.parse(pub_2.response)["stored_messages"].to_i
fail("Limited the number of stored messages") if stored_messages_setp_1 <= 100
fail("Don't create any message") if stored_messages_setp_1 == 0
i = 0
EM.add_periodic_timer(1) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback {
assert_equal(200, pub_3.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_3.response_header.content_length, "Don't received channels statistics")
stored_messages_setp_2 = JSON.parse(pub_3.response)["stored_messages"].to_i
if (stored_messages_setp_1 > stored_messages_setp_2)
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_4.callback {
EventMachine.stop if (pub_4.response_header.status == 200)
}
end
fail("Don't free the memory in 60 seconds") if (i == 60)
i += 1
}
end
}
add_test_timeout(65)
}
end
def config_test_channel_cleanup
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '10s'
@max_message_buffer_length = nil
end
def test_channel_cleanup
......@@ -210,9 +237,11 @@ class TestCreateManyChannels < Test::Unit::TestCase
end
def config_test_message_cleanup_with_store_off_with_subscriber
@max_reserved_memory = "129k"
@store_messages = 'off'
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = nil
@max_message_buffer_length = nil
end
def test_message_cleanup_with_store_off_with_subscriber
......@@ -247,9 +276,11 @@ class TestCreateManyChannels < Test::Unit::TestCase
end
def config_test_message_cleanup_with_store_off_without_subscriber
@max_reserved_memory = "129k"
@store_messages = 'off'
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = nil
@max_message_buffer_length = nil
end
def test_message_cleanup_with_store_off_without_subscriber
......@@ -257,11 +288,13 @@ class TestCreateManyChannels < Test::Unit::TestCase
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
j = 0
EventMachine.run {
EM.add_periodic_timer(0.001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s + j.to_s).post :head => headers, :body => body, :timeout => 60
pub_1.callback {
EventMachine.stop if (pub_1.response_header.status == 500)
j += 1
}
end
}
......@@ -269,7 +302,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
i = 0
EventMachine.run {
EM.add_periodic_timer(1) do
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s + (j + 1).to_s).post :head => headers, :body => body, :timeout => 60
pub_2.callback {
fail("Don't free the memory in 60 seconds") if (i == 60)
EventMachine.stop if (pub_2.response_header.status == 200)
......
......@@ -82,7 +82,7 @@ class TestComunicationProperties < Test::Unit::TestCase
}
end
#message will be certainly expired at 15 seconds, (min_message_buffer_timeout / 3) + 1
#message will be certainly expired at 15 seconds
EM.add_timer(16) do
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b1').get :head => headers, :timeout => 60
sub_3.stream { |chunk|
......
......@@ -80,17 +80,6 @@ class TestSetuParameters < Test::Unit::TestCase
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_store_messages_cannot_be_set_without_set_max_message_buffer_length_or_min_message_buffer_timeout
expected_error_message = "push_stream_store_messages cannot be set without set max message buffer length or min message buffer timeout"
@store_messages = 'on'
@min_message_buffer_timeout = nil
@max_message_buffer_length = nil
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_broadcast_channel_max_qtd_cannot_be_zero
expected_error_message = "push_stream_broadcast_channel_max_qtd cannot be zero"
@broadcast_channel_max_qtd = 0
......
......@@ -316,7 +316,7 @@ class TestSubscriber < Test::Unit::TestCase
#create channel
publish_message(channel, headers, body)
sleep(2) #to ensure message was gone
sleep(5) #to ensure message was gone
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
......@@ -326,6 +326,7 @@ class TestSubscriber < Test::Unit::TestCase
assert_equal("Subscriber could not create channels.", sub_1.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'], "Didn't receive the right error message")
EventMachine.stop
}
add_test_timeout
}
end
......@@ -345,7 +346,7 @@ class TestSubscriber < Test::Unit::TestCase
#create channel
publish_message(channel, headers, body)
sleep(2) #to ensure message was gone
sleep(5) #to ensure message was gone
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '/' + broadcast_channel.to_s).get :head => headers, :timeout => 30
......@@ -355,6 +356,7 @@ class TestSubscriber < Test::Unit::TestCase
assert_equal("Subscriber could not create channels.", sub_1.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'], "Didn't receive the right error message")
EventMachine.stop
}
add_test_timeout
}
end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment