Commit 561f16aa authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

removing directive push_stream_subscriber_disconnect_interval, this value will...

removing directive push_stream_subscriber_disconnect_interval, this value will be based on push_stream_subscriber_connection_timeout directive
parent b8d36994
...@@ -85,8 +85,6 @@ http { ...@@ -85,8 +85,6 @@ http {
push_stream_authorized_channels_only off; push_stream_authorized_channels_only off;
# ping frequency # ping frequency
push_stream_ping_message_interval 10s; push_stream_ping_message_interval 10s;
# disconnection candidates test frequency
push_stream_subscriber_disconnect_interval 30s;
# connection ttl to enable recycle # connection ttl to enable recycle
push_stream_subscriber_connection_timeout 15m; push_stream_subscriber_connection_timeout 15m;
push_stream_broadcast_channel_prefix "broad_"; push_stream_broadcast_channel_prefix "broad_";
......
...@@ -86,12 +86,6 @@ static ngx_command_t ngx_http_push_stream_commands[] = { ...@@ -86,12 +86,6 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, ping_message_interval), offsetof(ngx_http_push_stream_loc_conf_t, ping_message_interval),
NULL }, NULL },
{ ngx_string("push_stream_subscriber_disconnect_interval"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, subscriber_disconnect_interval),
NULL },
{ ngx_string("push_stream_subscriber_connection_timeout"), { ngx_string("push_stream_subscriber_connection_timeout"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_sec_slot, ngx_conf_set_sec_slot,
...@@ -316,7 +310,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -316,7 +310,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_uint_value(conf->max_number_of_channels, prev->max_number_of_channels, NGX_CONF_UNSET_UINT); ngx_conf_merge_uint_value(conf->max_number_of_channels, prev->max_number_of_channels, NGX_CONF_UNSET_UINT);
ngx_conf_merge_uint_value(conf->max_number_of_broadcast_channels, prev->max_number_of_broadcast_channels, NGX_CONF_UNSET_UINT); ngx_conf_merge_uint_value(conf->max_number_of_broadcast_channels, prev->max_number_of_broadcast_channels, NGX_CONF_UNSET_UINT);
ngx_conf_merge_uint_value(conf->memory_cleanup_interval, prev->memory_cleanup_interval, NGX_CONF_UNSET_MSEC); ngx_conf_merge_uint_value(conf->memory_cleanup_interval, prev->memory_cleanup_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_sec_value(conf->memory_cleanup_timeout, prev->memory_cleanup_timeout, NGX_CONF_UNSET); ngx_conf_merge_sec_value(conf->memory_cleanup_timeout, prev->memory_cleanup_timeout, NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT);
// sanity checks // sanity checks
...@@ -332,30 +326,12 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -332,30 +326,12 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
} }
// subscriber disconnect interval cannot be zero
if ((conf->subscriber_disconnect_interval != NGX_CONF_UNSET_MSEC) && (conf->subscriber_disconnect_interval == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_subscriber_disconnect_interval cannot be zero.");
return NGX_CONF_ERROR;
}
// subscriber connection timeout cannot be zero // subscriber connection timeout cannot be zero
if ((conf->subscriber_connection_timeout != NGX_CONF_UNSET) && (conf->subscriber_connection_timeout == 0)) { if ((conf->subscriber_connection_timeout != NGX_CONF_UNSET) && (conf->subscriber_connection_timeout == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_subscriber_connection_timeout cannot be zero."); ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_subscriber_connection_timeout cannot be zero.");
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
} }
// subscriber disconnect interval cannot be set without a connection timeout
if ((conf->subscriber_disconnect_interval != NGX_CONF_UNSET_MSEC) && (conf->subscriber_disconnect_interval > 0) && (conf->subscriber_connection_timeout == NGX_CONF_UNSET)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "cannot set subscriber disconnect interval if push_stream_subscriber_connection_timeout is not set or zero.");
return NGX_CONF_ERROR;
}
// subscriber connection timeout cannot be set without a disconnect interval
if ((conf->subscriber_connection_timeout != NGX_CONF_UNSET) && (conf->subscriber_connection_timeout > 0) && (conf->subscriber_disconnect_interval == NGX_CONF_UNSET_MSEC)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "cannot set subscriber connection timeout if push_stream_subscriber_disconnect_interval is not set or zero.");
return NGX_CONF_ERROR;
}
// buffer timeout cannot be zero // buffer timeout cannot be zero
if ((conf->buffer_timeout != NGX_CONF_UNSET) && (conf->buffer_timeout == 0)) { if ((conf->buffer_timeout != NGX_CONF_UNSET) && (conf->buffer_timeout == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_min_message_buffer_timeout cannot be zero."); ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_min_message_buffer_timeout cannot be zero.");
...@@ -417,7 +393,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -417,7 +393,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
} }
// memory cleanup timeout cannot't be small // memory cleanup timeout cannot't be small
if ((conf->memory_cleanup_timeout != NGX_CONF_UNSET) && (conf->memory_cleanup_timeout < NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT)) { if (conf->memory_cleanup_timeout < NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "memory cleanup timeout cannot't be less than %d.", NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT); ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "memory cleanup timeout cannot't be less than %d.", NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT);
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
} }
...@@ -436,11 +412,17 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -436,11 +412,17 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
// calc memory cleanup interval // calc memory cleanup interval
if (conf->buffer_timeout != NGX_CONF_UNSET) { if (conf->buffer_timeout != NGX_CONF_UNSET) {
ngx_uint_t interval = conf->buffer_timeout / 3; ngx_uint_t interval = conf->buffer_timeout / 3;
conf->memory_cleanup_interval = (interval > 1) ? interval * 1000 : 1000; // min 1 second conf->memory_cleanup_interval = (interval > 1) ? (interval * 1000) + 1000 : 1000; // min 1 second
} else if (conf->memory_cleanup_interval == NGX_CONF_UNSET_MSEC) { } else if (conf->memory_cleanup_interval == NGX_CONF_UNSET_MSEC) {
conf->memory_cleanup_interval = 1000; // 1 second conf->memory_cleanup_interval = 1000; // 1 second
} }
// calc subscriber disconnect interval
if (conf->subscriber_connection_timeout != NGX_CONF_UNSET) {
ngx_uint_t interval = conf->subscriber_connection_timeout / 3;
conf->subscriber_disconnect_interval = (interval > 1) ? (interval * 1000) + 1000 : 1000; // min 1 second
}
// create ping message // create ping message
if ((conf->message_template.len > 0) && (ngx_http_push_stream_ping_buf == NULL)) { if ((conf->message_template.len > 0) && (ngx_http_push_stream_ping_buf == NULL)) {
if ((ngx_http_push_stream_ping_buf = ngx_http_push_stream_get_formatted_message(conf, NULL, NULL, cf->pool)) == NULL) { if ((ngx_http_push_stream_ping_buf = ngx_http_push_stream_get_formatted_message(conf, NULL, NULL, cf->pool)) == NULL) {
......
...@@ -144,7 +144,6 @@ module BaseTestCase ...@@ -144,7 +144,6 @@ module BaseTestCase
@ping_message_interval = '10s' @ping_message_interval = '10s'
@store_messages = 'on' @store_messages = 'on'
@subscriber_connection_timeout = nil @subscriber_connection_timeout = nil
@subscriber_disconnect_interval = nil
@memory_cleanup_timeout = '5m' @memory_cleanup_timeout = '5m'
end end
...@@ -269,8 +268,6 @@ http { ...@@ -269,8 +268,6 @@ http {
<%= "push_stream_authorized_channels_only #{@authorized_channels_only};" unless @authorized_channels_only.nil? %> <%= "push_stream_authorized_channels_only #{@authorized_channels_only};" unless @authorized_channels_only.nil? %>
# ping frequency # ping frequency
<%= "push_stream_ping_message_interval #{@ping_message_interval};" unless @ping_message_interval.nil? %> <%= "push_stream_ping_message_interval #{@ping_message_interval};" unless @ping_message_interval.nil? %>
# disconnection candidates test frequency
<%= "push_stream_subscriber_disconnect_interval #{@subscriber_disconnect_interval};" unless @subscriber_disconnect_interval.nil? %>
# connection ttl to enable recycle # connection ttl to enable recycle
<%= "push_stream_subscriber_connection_timeout #{@subscriber_connection_timeout};" unless @subscriber_connection_timeout.nil? %> <%= "push_stream_subscriber_connection_timeout #{@subscriber_connection_timeout};" unless @subscriber_connection_timeout.nil? %>
<%= %{push_stream_broadcast_channel_prefix "#{@broadcast_channel_prefix}";} unless @broadcast_channel_prefix.nil? %> <%= %{push_stream_broadcast_channel_prefix "#{@broadcast_channel_prefix}";} unless @broadcast_channel_prefix.nil? %>
......
...@@ -89,7 +89,8 @@ class TestComunicationProperties < Test::Unit::TestCase ...@@ -89,7 +89,8 @@ class TestComunicationProperties < Test::Unit::TestCase
fail_if_connecttion_error(sub_2) fail_if_connecttion_error(sub_2)
end end
EM.add_timer(13) do #message will be certainly expired at 15 seconds, (min_message_buffer_timeout / 3) + 1
EM.add_timer(15) do
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b1').get :head => headers, :timeout => 60 sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b1').get :head => headers, :timeout => 60
sub_3.stream { |chunk| sub_3.stream { |chunk|
response_3 += chunk response_3 += chunk
...@@ -98,7 +99,7 @@ class TestComunicationProperties < Test::Unit::TestCase ...@@ -98,7 +99,7 @@ class TestComunicationProperties < Test::Unit::TestCase
fail_if_connecttion_error(sub_3) fail_if_connecttion_error(sub_3)
end end
EM.add_timer(14) do EM.add_timer(16) do
assert_equal("#{@header_template}\r\n#{body}\r\n", response_1, "Didn't received header and message") assert_equal("#{@header_template}\r\n#{body}\r\n", response_1, "Didn't received header and message")
assert_equal("#{@header_template}\r\n#{body}\r\n", response_2, "Didn't received header and message") assert_equal("#{@header_template}\r\n#{body}\r\n", response_2, "Didn't received header and message")
assert_equal("#{@header_template}\r\n", response_3, "Didn't received header") assert_equal("#{@header_template}\r\n", response_3, "Didn't received header")
......
...@@ -37,15 +37,6 @@ class TestSetuParameters < Test::Unit::TestCase ...@@ -37,15 +37,6 @@ class TestSetuParameters < Test::Unit::TestCase
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'") assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end end
def test_subscriber_disconnect_interval_cannot_be_zero
expected_error_message = "push_stream_subscriber_disconnect_interval cannot be zero"
@subscriber_disconnect_interval = 0
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_subscriber_connection_timeout_cannot_be_zero def test_subscriber_connection_timeout_cannot_be_zero
expected_error_message = "push_stream_subscriber_connection_timeout cannot be zero" expected_error_message = "push_stream_subscriber_connection_timeout cannot be zero"
@subscriber_connection_timeout = 0 @subscriber_connection_timeout = 0
...@@ -55,24 +46,6 @@ class TestSetuParameters < Test::Unit::TestCase ...@@ -55,24 +46,6 @@ class TestSetuParameters < Test::Unit::TestCase
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'") assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end end
def test_subscriber_disconnect_interval_cannot_be_set_without_a_connection_timeout
expected_error_message = "cannot set subscriber disconnect interval if push_stream_subscriber_connection_timeout is not set or zero"
@subscriber_disconnect_interval = "1s"
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_subscriber_connection_timeout_cannot_be_set_without_a_disconnect_interval
expected_error_message = "cannot set subscriber connection timeout if push_stream_subscriber_disconnect_interval is not set or zero"
@subscriber_connection_timeout = "1s"
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_max_channel_id_length_cannot_be_zero def test_max_channel_id_length_cannot_be_zero
expected_error_message = "push_stream_max_channel_id_length cannot be zero" expected_error_message = "push_stream_max_channel_id_length cannot be zero"
@max_channel_id_length = 0 @max_channel_id_length = 0
......
...@@ -5,7 +5,6 @@ class TestPublisher < Test::Unit::TestCase ...@@ -5,7 +5,6 @@ class TestPublisher < Test::Unit::TestCase
def config_test_accepted_methods def config_test_accepted_methods
@subscriber_connection_timeout = '1s' @subscriber_connection_timeout = '1s'
@subscriber_disconnect_interval = '1s'
end end
def test_accepted_methods def test_accepted_methods
...@@ -63,7 +62,6 @@ class TestPublisher < Test::Unit::TestCase ...@@ -63,7 +62,6 @@ class TestPublisher < Test::Unit::TestCase
def config_test_multi_channels def config_test_multi_channels
@subscriber_connection_timeout = '1s' @subscriber_connection_timeout = '1s'
@subscriber_disconnect_interval = '1s'
end end
def test_multi_channels def test_multi_channels
...@@ -128,7 +126,6 @@ class TestPublisher < Test::Unit::TestCase ...@@ -128,7 +126,6 @@ class TestPublisher < Test::Unit::TestCase
def config_test_broadcast_channels_without_common_channel def config_test_broadcast_channels_without_common_channel
@subscriber_connection_timeout = '1s' @subscriber_connection_timeout = '1s'
@subscriber_disconnect_interval = '1s'
@broadcast_channel_prefix = "bd_" @broadcast_channel_prefix = "bd_"
end end
...@@ -170,7 +167,6 @@ class TestPublisher < Test::Unit::TestCase ...@@ -170,7 +167,6 @@ class TestPublisher < Test::Unit::TestCase
def config_test_broadcast_channels_with_common_channels def config_test_broadcast_channels_with_common_channels
@subscriber_connection_timeout = '1s' @subscriber_connection_timeout = '1s'
@subscriber_disconnect_interval = '1s'
@authorized_channels_only = "off" @authorized_channels_only = "off"
@broadcast_channel_prefix = "bd_" @broadcast_channel_prefix = "bd_"
@broadcast_channel_max_qtd = 2 @broadcast_channel_max_qtd = 2
...@@ -233,7 +229,6 @@ class TestPublisher < Test::Unit::TestCase ...@@ -233,7 +229,6 @@ class TestPublisher < Test::Unit::TestCase
def config_test_subscribe_an_existing_channel_with_authorized_only_on def config_test_subscribe_an_existing_channel_with_authorized_only_on
@authorized_channels_only = 'on' @authorized_channels_only = 'on'
@subscriber_connection_timeout = '1s' @subscriber_connection_timeout = '1s'
@subscriber_disconnect_interval = '1s'
end end
def test_subscribe_an_existing_channel_with_authorized_only_on def test_subscribe_an_existing_channel_with_authorized_only_on
...@@ -257,7 +252,6 @@ class TestPublisher < Test::Unit::TestCase ...@@ -257,7 +252,6 @@ class TestPublisher < Test::Unit::TestCase
def config_test_subscribe_an_existing_channel_and_absent_broadcast_channel_with_authorized_only_on def config_test_subscribe_an_existing_channel_and_absent_broadcast_channel_with_authorized_only_on
@authorized_channels_only = 'on' @authorized_channels_only = 'on'
@subscriber_connection_timeout = '1s' @subscriber_connection_timeout = '1s'
@subscriber_disconnect_interval = '1s'
@broadcast_channel_prefix = "bd_" @broadcast_channel_prefix = "bd_"
@broadcast_channel_max_qtd = 1 @broadcast_channel_max_qtd = 1
end end
......
...@@ -5,7 +5,6 @@ class TestSubscriberConnectionCleanup < Test::Unit::TestCase ...@@ -5,7 +5,6 @@ class TestSubscriberConnectionCleanup < Test::Unit::TestCase
def config_test_subscriber_connection_timeout def config_test_subscriber_connection_timeout
@subscriber_connection_timeout = "37s" @subscriber_connection_timeout = "37s"
@subscriber_disconnect_interval = "1s"
@header_template = "HEADER_TEMPLATE" @header_template = "HEADER_TEMPLATE"
@ping_message_interval = nil @ping_message_interval = nil
end end
...@@ -24,21 +23,20 @@ class TestSubscriberConnectionCleanup < Test::Unit::TestCase ...@@ -24,21 +23,20 @@ class TestSubscriberConnectionCleanup < Test::Unit::TestCase
sub.callback { sub.callback {
stop = Time.now stop = Time.now
elapsed = time_diff_sec(start, stop) elapsed = time_diff_sec(start, stop)
assert(elapsed >= 37 && elapsed <= 38.5, "Disconnect was in #{elapsed} seconds") assert(elapsed >= 38 && elapsed <= 39.5, "Disconnect was in #{elapsed} seconds")
EventMachine.stop EventMachine.stop
} }
fail_if_connecttion_error(sub) fail_if_connecttion_error(sub)
} }
end end
def config_test_subscriber_disconnect_interval def config_test_subscriber_connection_timeout_with_ping_message
@subscriber_connection_timeout = "37s" @subscriber_connection_timeout = "37s"
@ping_message_interval = "5s" @ping_message_interval = "5s"
@subscriber_disconnect_interval = "2s"
end end
def test_subscriber_disconnect_interval def test_subscriber_connection_timeout_with_ping_message
channel = 'ch_test_subscriber_disconnect_interval' channel = 'ch_test_subscriber_connection_timeout_with_ping_message'
headers = {'accept' => 'text/html'} headers = {'accept' => 'text/html'}
start = Time.now start = Time.now
......
...@@ -6,7 +6,6 @@ class TestSubscriberProperties < Test::Unit::TestCase ...@@ -6,7 +6,6 @@ class TestSubscriberProperties < Test::Unit::TestCase
def config_test_header_template def config_test_header_template
@header_template = "HEADER\r\nTEMPLATE\r\n1234\r\n" @header_template = "HEADER\r\nTEMPLATE\r\n1234\r\n"
@authorized_channels_only = "off" @authorized_channels_only = "off"
@subscriber_disconnect_interval = nil
end end
def test_header_template def test_header_template
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment