Commit 34d222ed authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

add push_stream_channel_inactivity_time directive

parent 3b89b897
...@@ -114,6 +114,7 @@ h1(#directives). Directives ...@@ -114,6 +114,7 @@ h1(#directives). Directives
| "push_stream_shared_memory_cleanup_objects_ttl":push_stream_shared_memory_cleanup_objects_ttl |   - |   x |   - |   - |   - |   - | | "push_stream_shared_memory_cleanup_objects_ttl":push_stream_shared_memory_cleanup_objects_ttl |   - |   x |   - |   - |   - |   - |
| "push_stream_channel_deleted_message_text":push_stream_channel_deleted_message_text |   - |   x |   - |   - |   - |   - | | "push_stream_channel_deleted_message_text":push_stream_channel_deleted_message_text |   - |   x |   - |   - |   - |   - |
| "push_stream_ping_message_text":push_stream_ping_message_text |   - |   x |   - |   - |   - |   - | | "push_stream_ping_message_text":push_stream_ping_message_text |   - |   x |   - |   - |   - |   - |
| "push_stream_channel_inactivity_time":push_stream_channel_inactivity_time |   - |   x |   - |   - |   - |   - |
| "push_stream_message_ttl":push_stream_message_ttl |   - |   x |   - |   - |   - |   - | | "push_stream_message_ttl":push_stream_message_ttl |   - |   x |   - |   - |   - |   - |
| "push_stream_max_subscribers_per_channel":push_stream_max_subscribers_per_channel |   - |   x |   - |   - |   - |   - | | "push_stream_max_subscribers_per_channel":push_stream_max_subscribers_per_channel |   - |   x |   - |   - |   - |   - |
| "push_stream_max_messages_stored_per_channel":push_stream_max_messages_stored_per_channel |   - |   x |   - |   - |   - |   - | | "push_stream_max_messages_stored_per_channel":push_stream_max_messages_stored_per_channel |   - |   x |   - |   - |   - |   - |
...@@ -225,6 +226,7 @@ h1(#contributors). Contributors ...@@ -225,6 +226,7 @@ h1(#contributors). Contributors
[push_stream_shared_memory_cleanup_objects_ttl]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_shared_memory_cleanup_objects_ttl [push_stream_shared_memory_cleanup_objects_ttl]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_shared_memory_cleanup_objects_ttl
[push_stream_channel_deleted_message_text]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_channel_deleted_message_text [push_stream_channel_deleted_message_text]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_channel_deleted_message_text
[push_stream_ping_message_text]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_ping_message_text [push_stream_ping_message_text]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_ping_message_text
[push_stream_channel_inactivity_time]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_channel_inactivity_time
[push_stream_message_ttl]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_message_ttl [push_stream_message_ttl]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_message_ttl
[push_stream_max_subscribers_per_channel]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_max_subscribers_per_channel [push_stream_max_subscribers_per_channel]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_max_subscribers_per_channel
[push_stream_max_messages_stored_per_channel]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_max_messages_stored_per_channel [push_stream_max_messages_stored_per_channel]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_max_messages_stored_per_channel
......
...@@ -50,6 +50,20 @@ h2(#push_stream_ping_message_text). push_stream_ping_message_text <a name="push_ ...@@ -50,6 +50,20 @@ h2(#push_stream_ping_message_text). push_stream_ping_message_text <a name="push_
The string used on ping message sent to subscribers. The string used on ping message sent to subscribers.
h2(#push_stream_channel_inactivity_time). push_stream_channel_inactivity_time <a name="push_stream_channel_inactivity_time" href="#">&nbsp;</a>
*syntax:* _push_stream_channel_inactivity_time time_
*default:* _30s_
*context:* _http_
*release version:* _0.3.5_
The length of time after what a channel will be considered inactive, counted after the last message was published on it or the last subscriber leave it.
After this time the channel will no longer be available and will be moved to the trash queue.
h2(#push_stream_message_ttl). push_stream_message_ttl <a name="push_stream_message_ttl" href="#">&nbsp;</a> h2(#push_stream_message_ttl). push_stream_message_ttl <a name="push_stream_message_ttl" href="#">&nbsp;</a>
*syntax:* _push_stream_message_ttl time_ *syntax:* _push_stream_message_ttl time_
......
...@@ -53,6 +53,7 @@ typedef struct { ...@@ -53,6 +53,7 @@ typedef struct {
ngx_msec_t memory_cleanup_interval; ngx_msec_t memory_cleanup_interval;
time_t shm_cleanup_objects_ttl; time_t shm_cleanup_objects_ttl;
ngx_str_t channel_deleted_message_text; ngx_str_t channel_deleted_message_text;
time_t channel_inactivity_time;
ngx_str_t ping_message_text; ngx_str_t ping_message_text;
ngx_uint_t qtd_templates; ngx_uint_t qtd_templates;
ngx_str_t broadcast_channel_prefix; ngx_str_t broadcast_channel_prefix;
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#define NGX_HTTP_PUSH_STREAM_MESSAGE_BUFFER_CLEANUP_INTERVAL 5000 // 5 seconds #define NGX_HTTP_PUSH_STREAM_MESSAGE_BUFFER_CLEANUP_INTERVAL 5000 // 5 seconds
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL = 10; // 10 seconds static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL = 10; // 10 seconds
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TTL = 1800; // 30 minutes static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TTL = 1800; // 30 minutes
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_CHANNEL_INACTIVITY_TIME = 30; // 30 seconds
#define NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE "" #define NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE "~text~" #define NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE "~text~"
......
...@@ -16,7 +16,7 @@ describe "Cleanup Memory" do ...@@ -16,7 +16,7 @@ describe "Cleanup Memory" do
shared_examples_for "executing on normal conditions" do shared_examples_for "executing on normal conditions" do
it "should cleanup memory used for published message" do it "should cleanup memory used for published message", :cleanup => true do
channel = 'ch_test_message_cleanup' channel = 'ch_test_message_cleanup'
body = 'message to create a channel' body = 'message to create a channel'
expected_time_for_clear = 45 expected_time_for_clear = 45
...@@ -96,7 +96,7 @@ describe "Cleanup Memory" do ...@@ -96,7 +96,7 @@ describe "Cleanup Memory" do
end end
end end
it "should discard old messages" do it "should discard old messages", :cleanup => true do
channel = 'ch_test_discard_old_messages' channel = 'ch_test_discard_old_messages'
body = 'message to create a channel' body = 'message to create a channel'
messages_to_publish = 10 messages_to_publish = 10
...@@ -137,7 +137,7 @@ describe "Cleanup Memory" do ...@@ -137,7 +137,7 @@ describe "Cleanup Memory" do
end end
end end
it "should cleanup message memory without max messages stored per channelXXX" do it "should cleanup message memory without max messages stored per channelXXX", :cleanup => true do
channel = 'ch_test_message_cleanup_without_max_messages_stored_per_chann' channel = 'ch_test_message_cleanup_without_max_messages_stored_per_chann'
body = 'message to create a channel' body = 'message to create a channel'
expected_time_for_clear = 45 expected_time_for_clear = 45
...@@ -211,7 +211,7 @@ describe "Cleanup Memory" do ...@@ -211,7 +211,7 @@ describe "Cleanup Memory" do
end end
end end
it "should cleanup memory used for create channels" do it "should cleanup memory used for create channels", :cleanup => true do
channel = 'ch_test_channel_cleanup_' channel = 'ch_test_channel_cleanup_'
body = 'message to create a channel' body = 'message to create a channel'
...@@ -282,7 +282,7 @@ describe "Cleanup Memory" do ...@@ -282,7 +282,7 @@ describe "Cleanup Memory" do
end end
end end
it "should cleanup memory used for publish messages with store 'off' and with subscriber" do it "should cleanup memory used for publish messages with store 'off' and with subscriber", :cleanup => true do
channel = 'ch_test_message_cleanup_with_store_off_with_subscriber' channel = 'ch_test_message_cleanup_with_store_off_with_subscriber'
body = 'message to create a channel' body = 'message to create a channel'
expected_time_for_clear = 35 expected_time_for_clear = 35
...@@ -351,7 +351,7 @@ describe "Cleanup Memory" do ...@@ -351,7 +351,7 @@ describe "Cleanup Memory" do
end end
end end
it "should cleanup memory used for publish messages with store 'off' and without subscriber" do it "should cleanup memory used for publish messages with store 'off' and without subscriber", :cleanup => true do
channel = 'ch_test_message_cleanup_with_store_off_without_subscriber' channel = 'ch_test_message_cleanup_with_store_off_without_subscriber'
body = 'message to create a channel' body = 'message to create a channel'
expected_time_for_clear = 65 expected_time_for_clear = 65
...@@ -419,7 +419,7 @@ describe "Cleanup Memory" do ...@@ -419,7 +419,7 @@ describe "Cleanup Memory" do
end end
end end
it "should cleanup memory used after delete created channels" do it "should cleanup memory used after delete created channels", :cleanup => true do
channel = 'ch_test_channel_cleanup_after_delete' channel = 'ch_test_channel_cleanup_after_delete'
body = 'message to create a channel' body = 'message to create a channel'
expected_time_for_clear = 35 expected_time_for_clear = 35
...@@ -475,7 +475,7 @@ describe "Cleanup Memory" do ...@@ -475,7 +475,7 @@ describe "Cleanup Memory" do
end end
end end
it "should cleanup memory used after delete created channels with same id" do it "should cleanup memory used after delete created channels with same id", :cleanup => true do
channel = 'ch_test_channel_cleanup_after_delete_same_id' channel = 'ch_test_channel_cleanup_after_delete_same_id'
body = 'message to create a channel' body = 'message to create a channel'
expected_time_for_clear = 35 expected_time_for_clear = 35
...@@ -575,6 +575,71 @@ describe "Cleanup Memory" do ...@@ -575,6 +575,71 @@ describe "Cleanup Memory" do
{'accept' => 'text/html'} {'accept' => 'text/html'}
end end
context "when moving inactive channels to trash" do
it "should wait 30s by default" do
channel = 'ch_move_inactive_channels'
body = 'body'
nginx_run_server(config.merge(:store_messages => "off"), :timeout => 40) do |conf|
EventMachine.run do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body
pub_1.callback do
pub_1.should be_http_status(200).with_body
start = Time.now
timer = EventMachine::PeriodicTimer.new(1) do
stats = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
stats.callback do
stats.should be_http_status(200).with_body
response = JSON.parse(stats.response)
if response["channels"].to_i != 1
stop = Time.now
time_diff_sec(start, stop).should be_within(5).of(30)
response["channels_in_trash"].to_i.should eql(1)
response["channels"].to_i.should eql(0)
EventMachine.stop
end
end
end
end
end
end
end
it "should be possible change the default value" do
channel = 'ch_move_inactive_channels_with_custom_value'
body = 'body'
nginx_run_server(config.merge(:store_messages => "off", :channel_inactivity_time => "5s"), :timeout => 10) do |conf|
EventMachine.run do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body
pub_1.callback do
pub_1.should be_http_status(200).with_body
start = Time.now
timer = EventMachine::PeriodicTimer.new(1) do
stats = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
stats.callback do
stats.should be_http_status(200).with_body
response = JSON.parse(stats.response)
if response["channels"].to_i != 1
stop = Time.now
time_diff_sec(start, stop).should be_within(3).of(5)
response["channels_in_trash"].to_i.should eql(1)
response["channels"].to_i.should eql(0)
EventMachine.stop
end
end
end
end
end
end
end
#after the last published message
end
context "when nothing strange occur" do context "when nothing strange occur" do
def execute_changes_on_environment(conf, &block) def execute_changes_on_environment(conf, &block)
#nothing strange happens #nothing strange happens
......
...@@ -51,6 +51,7 @@ module NginxConfiguration ...@@ -51,6 +51,7 @@ module NginxConfiguration
:client_body_buffer_size => '32k', :client_body_buffer_size => '32k',
:channel_info_on_publish => "on", :channel_info_on_publish => "on",
:channel_inactivity_time => nil,
:extra_location => '' :extra_location => ''
} }
...@@ -131,6 +132,7 @@ http { ...@@ -131,6 +132,7 @@ http {
<%= write_directive("push_stream_channel_deleted_message_text", channel_deleted_message_text) %> <%= write_directive("push_stream_channel_deleted_message_text", channel_deleted_message_text) %>
<%= write_directive("push_stream_ping_message_text", ping_message_text) %> <%= write_directive("push_stream_ping_message_text", ping_message_text) %>
<%= write_directive("push_stream_channel_inactivity_time", channel_inactivity_time) %>
server { server {
listen <%= nginx_port %>; listen <%= nginx_port %>;
......
...@@ -72,6 +72,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = { ...@@ -72,6 +72,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_MAIN_CONF_OFFSET, NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, channel_deleted_message_text), offsetof(ngx_http_push_stream_main_conf_t, channel_deleted_message_text),
NULL }, NULL },
{ ngx_string("push_stream_channel_inactivity_time"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_sec_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, channel_inactivity_time),
NULL },
{ ngx_string("push_stream_ping_message_text"), { ngx_string("push_stream_ping_message_text"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1, NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot, ngx_conf_set_str_slot,
...@@ -414,6 +420,7 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf) ...@@ -414,6 +420,7 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
mcf->memory_cleanup_interval = NGX_CONF_UNSET_MSEC; mcf->memory_cleanup_interval = NGX_CONF_UNSET_MSEC;
mcf->shm_cleanup_objects_ttl = NGX_CONF_UNSET; mcf->shm_cleanup_objects_ttl = NGX_CONF_UNSET;
mcf->channel_deleted_message_text.data = NULL; mcf->channel_deleted_message_text.data = NULL;
mcf->channel_inactivity_time = NGX_CONF_UNSET;
mcf->ping_message_text.data = NULL; mcf->ping_message_text.data = NULL;
mcf->broadcast_channel_prefix.data = NULL; mcf->broadcast_channel_prefix.data = NULL;
mcf->max_number_of_channels = NGX_CONF_UNSET_UINT; mcf->max_number_of_channels = NGX_CONF_UNSET_UINT;
...@@ -443,6 +450,7 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent) ...@@ -443,6 +450,7 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
ngx_conf_init_value(conf->message_ttl, NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TTL); ngx_conf_init_value(conf->message_ttl, NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TTL);
ngx_conf_init_value(conf->shm_cleanup_objects_ttl, NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL); ngx_conf_init_value(conf->shm_cleanup_objects_ttl, NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL);
ngx_conf_init_size_value(conf->shm_size, NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE); ngx_conf_init_size_value(conf->shm_size, NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE);
ngx_conf_init_value(conf->channel_inactivity_time, NGX_HTTP_PUSH_STREAM_DEFAULT_CHANNEL_INACTIVITY_TIME);
ngx_conf_merge_str_value(conf->channel_deleted_message_text, conf->channel_deleted_message_text, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT); ngx_conf_merge_str_value(conf->channel_deleted_message_text, conf->channel_deleted_message_text, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT);
ngx_conf_merge_str_value(conf->ping_message_text, conf->ping_message_text, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT); ngx_conf_merge_str_value(conf->ping_message_text, conf->ping_message_text, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT);
ngx_conf_merge_str_value(conf->broadcast_channel_prefix, conf->broadcast_channel_prefix, NGX_HTTP_PUSH_STREAM_DEFAULT_BROADCAST_CHANNEL_PREFIX); ngx_conf_merge_str_value(conf->broadcast_channel_prefix, conf->broadcast_channel_prefix, NGX_HTTP_PUSH_STREAM_DEFAULT_BROADCAST_CHANNEL_PREFIX);
......
...@@ -718,7 +718,8 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s ...@@ -718,7 +718,8 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s
continue; continue;
} }
if ((channel->stored_messages == 0) && (channel->subscribers == 0) && (channel->last_activity_time + 30 < ngx_time())) { if ((channel->stored_messages == 0) && (channel->subscribers == 0) &&
(channel->last_activity_time + ngx_http_push_stream_module_main_conf->channel_inactivity_time < ngx_time())) {
// go back one node on queue, since the current node will be removed // go back one node on queue, since the current node will be removed
cur = prev; cur = prev;
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment