Commit 486d49fa authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding push_stream_longpolling_connection_ttl directive and make...

adding push_stream_longpolling_connection_ttl directive and make push_stream_ping_message_interval and push_stream_subscriber_connection_ttl directives available in location context
parent a9a45321
......@@ -399,29 +399,6 @@ h2(#push_stream_max_channel_id_length). push_stream_max_channel_id_length
Maximum permissible channel id length (number of characters). Longer ids will receive an 400 Bad Request response. If you do not want to limit channel id length, just not set this directive.
h2(#push_stream_ping_message_interval). push_stream_ping_message_interval
*syntax:* _push_stream_ping_message_interval time_
*default:* _none_
*context:* _http_
The time interval in which a keepalive message is sent to subscribers. If you do not want to send ping messages, just not set this directive.
h2(#push_stream_subscriber_connection_ttl). push_stream_subscriber_connection_ttl
*syntax:* _push_stream_subscriber_connection_ttl time_
*default:* _none_
*context:* _http_
The length of time a subscriber will stay connected before it is considered expired and disconnected. If you do not want subscribers to be automatically disconnected, just not set this directive.
But, this operation is very important to help Nginx recycle memory consumed to send messages to susbscriber, allocated at pool request.
h2(#push_stream_max_number_of_channels). push_stream_max_number_of_channels
*syntax:* _push_stream_max_number_of_channels number_
......@@ -562,6 +539,43 @@ h2(#push_stream_eventsource_support). push_stream_eventsource_support
Enable "Event Source":eventsource support for subscribers.
h2(#push_stream_ping_message_interval). push_stream_ping_message_interval
*syntax:* _push_stream_ping_message_interval time_
*default:* _none_
*context:* _location (push_stream_subscriber)_
The time interval in which a keepalive message is sent to subscribers. If you do not want to send ping messages, just not set this directive.
h2(#push_stream_subscriber_connection_ttl). push_stream_subscriber_connection_ttl
*syntax:* _push_stream_subscriber_connection_ttl time_
*default:* _none_
*context:* _location (push_stream_subscriber)_
The length of time a subscriber will stay connected before it is considered expired and disconnected. If you do not want subscribers to be automatically disconnected, just not set this directive.
But, this operation is very important to help Nginx recycle memory consumed to send messages to susbscriber, allocated at pool request.
h2(#push_stream_longpolling_connection_ttl). push_stream_longpolling_connection_ttl
*syntax:* _push_stream_longpolling_connection_ttl time_
*default:* _value in push_stream_subscriber_connection_ttl_
*context:* _location (push_stream_subscriber)_
*release version:* _0.3.1_
The length of time a long polling subscriber will stay connected waiting for a message before it is disconnected. If you do not want subscribers to be automatically disconnected, just not set this directive and push_stream_longpolling_connection_ttl directive.
But, this operation is very important to help Nginx recycle memory consumed to send messages to susbscriber, allocated at pool request.
h1(#attention). Attention
This module controls everything needed to send the messages to subscribers.
......
......@@ -49,8 +49,6 @@ typedef struct {
ngx_str_t broadcast_channel_prefix;
ngx_uint_t max_number_of_channels;
ngx_uint_t max_number_of_broadcast_channels;
ngx_msec_t ping_message_interval;
ngx_msec_t subscriber_connection_ttl;
ngx_msec_t buffer_cleanup_interval;
time_t message_ttl;
ngx_uint_t max_subscribers_per_channel;
......@@ -74,6 +72,9 @@ typedef struct {
ngx_uint_t publisher_admin;
ngx_flag_t eventsource_support;
ngx_uint_t subscriber_mode;
ngx_msec_t ping_message_interval;
ngx_msec_t subscriber_connection_ttl;
ngx_msec_t longpolling_connection_ttl;
} ngx_http_push_stream_loc_conf_t;
// shared memory segment name
......
......@@ -94,18 +94,6 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, max_channel_id_length),
NULL },
{ ngx_string("push_stream_ping_message_interval"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, ping_message_interval),
NULL },
{ ngx_string("push_stream_subscriber_connection_ttl"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, subscriber_connection_ttl),
NULL },
{ ngx_string("push_stream_max_number_of_channels"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
......@@ -180,6 +168,24 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, eventsource_support),
NULL },
{ ngx_string("push_stream_ping_message_interval"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, ping_message_interval),
NULL },
{ ngx_string("push_stream_subscriber_connection_ttl"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, subscriber_connection_ttl),
NULL },
{ ngx_string("push_stream_longpolling_connection_ttl"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, longpolling_connection_ttl),
NULL },
ngx_null_command
};
......@@ -336,8 +342,6 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
mcf->buffer_cleanup_interval = NGX_CONF_UNSET_MSEC;
mcf->message_ttl = NGX_CONF_UNSET;
mcf->max_channel_id_length = NGX_CONF_UNSET_UINT;
mcf->ping_message_interval = NGX_CONF_UNSET_MSEC;
mcf->subscriber_connection_ttl = NGX_CONF_UNSET_MSEC;
mcf->max_subscribers_per_channel = NGX_CONF_UNSET;
mcf->max_messages_stored_per_channel = NGX_CONF_UNSET_UINT;
mcf->qtd_templates = 0;
......@@ -361,12 +365,6 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
ngx_conf_merge_str_value(conf->broadcast_channel_prefix, conf->broadcast_channel_prefix, NGX_HTTP_PUSH_STREAM_DEFAULT_BROADCAST_CHANNEL_PREFIX);
// sanity checks
// ping message interval cannot be zero
if ((conf->ping_message_interval != NGX_CONF_UNSET_MSEC) && (conf->ping_message_interval == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_ping_message_interval cannot be zero.");
return NGX_CONF_ERROR;
}
// memory cleanup objects ttl cannot't be small
if (conf->shm_cleanup_objects_ttl < NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "memory cleanup objects ttl cannot't be less than %d.", NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL);
......@@ -385,12 +383,6 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
return NGX_CONF_ERROR;
}
// subscriber connection ttl cannot be zero
if ((conf->subscriber_connection_ttl != NGX_CONF_UNSET_MSEC) && (conf->subscriber_connection_ttl == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_subscriber_connection_ttl cannot be zero.");
return NGX_CONF_ERROR;
}
// message ttl cannot be zero
if ((conf->message_ttl != NGX_CONF_UNSET) && (conf->message_ttl == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_message_ttl cannot be zero.");
......@@ -453,6 +445,9 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->publisher_admin = NGX_CONF_UNSET_UINT;
lcf->eventsource_support = NGX_CONF_UNSET_UINT;
lcf->subscriber_mode = NGX_CONF_UNSET_UINT;
lcf->ping_message_interval = NGX_CONF_UNSET_MSEC;
lcf->subscriber_connection_ttl = NGX_CONF_UNSET_MSEC;
lcf->longpolling_connection_ttl = NGX_CONF_UNSET_MSEC;
return lcf;
}
......@@ -473,6 +468,9 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_uint_value(conf->keepalive, prev->keepalive, 0);
ngx_conf_merge_uint_value(conf->publisher_admin, prev->publisher_admin, 0);
ngx_conf_merge_value(conf->eventsource_support, prev->eventsource_support, 0);
ngx_conf_merge_msec_value(conf->ping_message_interval, prev->ping_message_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_msec_value(conf->subscriber_connection_ttl, prev->subscriber_connection_ttl, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_msec_value(conf->longpolling_connection_ttl, prev->longpolling_connection_ttl, conf->subscriber_connection_ttl);
// changing properties for event source support
if (conf->eventsource_support) {
......@@ -519,6 +517,24 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
// sanity checks
// ping message interval cannot be zero
if ((conf->ping_message_interval != NGX_CONF_UNSET_MSEC) && (conf->ping_message_interval == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_ping_message_interval cannot be zero.");
return NGX_CONF_ERROR;
}
// subscriber connection ttl cannot be zero
if ((conf->subscriber_connection_ttl != NGX_CONF_UNSET_MSEC) && (conf->subscriber_connection_ttl == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_subscriber_connection_ttl cannot be zero.");
return NGX_CONF_ERROR;
}
// long polling connection ttl cannot be zero
if ((conf->longpolling_connection_ttl != NGX_CONF_UNSET_MSEC) && (conf->longpolling_connection_ttl == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_longpolling_connection_ttl cannot be zero.");
return NGX_CONF_ERROR;
}
// message template cannot be blank
if (conf->message_template.len == 0) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_message_template cannot be blank.");
......
......@@ -27,7 +27,7 @@
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_str_t *last_event_id, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool);
static ngx_http_push_stream_worker_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
static ngx_int_t ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
static ngx_flag_t ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log);
......@@ -165,7 +165,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
ngx_shmtx_lock(&shpool->mutex);
rc = ngx_http_push_stream_registry_subscriber_locked(worker_subscriber);
rc = ngx_http_push_stream_registry_subscriber_locked(r, worker_subscriber);
ngx_shmtx_unlock(&shpool->mutex);
if (rc == NGX_ERROR) {
......@@ -238,7 +238,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
}
worker_subscriber->longpolling = 1;
if (ngx_http_push_stream_registry_subscriber_locked(worker_subscriber) == NGX_ERROR) {
if (ngx_http_push_stream_registry_subscriber_locked(r, worker_subscriber) == NGX_ERROR) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
......@@ -483,30 +483,31 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
}
static ngx_int_t
ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber)
ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_worker_subscriber_t *worker_subscriber)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscriber_ctx_t *ctx;
ngx_msec_t connection_ttl = worker_subscriber->longpolling ? cf->longpolling_connection_ttl : cf->subscriber_connection_ttl;
// adding subscriber to woker list of subscribers
ngx_queue_insert_tail(&thisworker_data->worker_subscribers_sentinel->queue, &worker_subscriber->queue);
if ((ngx_http_push_stream_module_main_conf->subscriber_connection_ttl != NGX_CONF_UNSET_MSEC) ||
(ngx_http_push_stream_module_main_conf->ping_message_interval != NGX_CONF_UNSET_MSEC)) {
if ((connection_ttl != NGX_CONF_UNSET_MSEC) || (cf->ping_message_interval != NGX_CONF_UNSET_MSEC)) {
if ((ctx = ngx_pcalloc(worker_subscriber->request->pool, sizeof(ngx_http_push_stream_subscriber_ctx_t))) == NULL) {
return NGX_ERROR;
}
ctx->longpolling = worker_subscriber->longpolling;
if (ngx_http_push_stream_module_main_conf->subscriber_connection_ttl != NGX_CONF_UNSET_MSEC) {
if (connection_ttl != NGX_CONF_UNSET_MSEC) {
if ((ctx->disconnect_timer = ngx_pcalloc(worker_subscriber->request->pool, sizeof(ngx_event_t))) == NULL) {
return NGX_ERROR;
}
}
if ((!ctx->longpolling) && (ngx_http_push_stream_module_main_conf->ping_message_interval != NGX_CONF_UNSET_MSEC)) {
if ((!ctx->longpolling) && (cf->ping_message_interval != NGX_CONF_UNSET_MSEC)) {
if ((ctx->ping_timer = ngx_pcalloc(worker_subscriber->request->pool, sizeof(ngx_event_t))) == NULL) {
return NGX_ERROR;
}
......@@ -516,14 +517,14 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subs
ctx->disconnect_timer->handler = ngx_http_push_stream_disconnect_timer_wake_handler;
ctx->disconnect_timer->data = worker_subscriber->request;
ctx->disconnect_timer->log = worker_subscriber->request->connection->log;
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->subscriber_connection_ttl, ctx->disconnect_timer);
ngx_http_push_stream_timer_reset(connection_ttl, ctx->disconnect_timer);
}
if (ctx->ping_timer != NULL) {
ctx->ping_timer->handler = ngx_http_push_stream_ping_timer_wake_handler;
ctx->ping_timer->data = worker_subscriber->request;
ctx->ping_timer->log = worker_subscriber->request->connection->log;
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->ping_message_interval, ctx->ping_timer);
ngx_http_push_stream_timer_reset(cf->ping_message_interval, ctx->ping_timer);
}
ngx_http_set_ctx(worker_subscriber->request, ctx, ngx_http_push_stream_module);
......
......@@ -746,7 +746,7 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
ngx_http_push_stream_send_response_finalize(r);
} else {
ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->ping_message_interval, ctx->ping_timer);
ngx_http_push_stream_timer_reset(pslcf->ping_message_interval, ctx->ping_timer);
}
}
......
......@@ -150,6 +150,7 @@ module BaseTestCase
@ping_message_interval = '10s'
@store_messages = 'on'
@subscriber_connection_timeout = nil
@longpolling_connection_timeout = nil
@memory_cleanup_timeout = '5m'
@config_template = nil
@keepalive = 'off'
......@@ -276,6 +277,8 @@ http {
<%= "push_stream_ping_message_interval #{@ping_message_interval};" unless @ping_message_interval.nil? %>
# connection ttl to enable recycle
<%= "push_stream_subscriber_connection_ttl #{@subscriber_connection_timeout};" unless @subscriber_connection_timeout.nil? %>
# timeout for long polling connections
<%= "push_stream_longpolling_connection_ttl #{@longpolling_connection_ttl};" unless @longpolling_connection_ttl.nil? %>
server {
listen <%=nginx_port%>;
......
......@@ -34,6 +34,15 @@ class TestSetuParameters < Test::Unit::TestCase
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_longpolling_connection_ttl_cannot_be_zero
expected_error_message = "push_stream_longpolling_connection_ttl cannot be zero"
@longpolling_connection_ttl = 0
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_max_channel_id_length_cannot_be_zero
expected_error_message = "push_stream_max_channel_id_length cannot be zero"
@max_channel_id_length = 0
......
......@@ -329,7 +329,7 @@ class TestSubscriberLongPolling < Test::Unit::TestCase
sub.callback {
stop = Time.now
elapsed = time_diff_sec(start, stop)
assert(elapsed >= 15 && elapsed <= 20, "Disconnect was in #{elapsed} seconds")
assert(elapsed >= 15 && elapsed <= 15.5, "Disconnect was in #{elapsed} seconds")
assert_equal(304, sub.response_header.status, "Wrong status")
assert_equal(Time.now.utc.strftime("%a, %d %b %Y %T %Z"), sub.response_header['LAST_MODIFIED'].to_s, "Wrong header")
assert_equal("0", sub.response_header['ETAG'].to_s, "Wrong header")
......@@ -337,7 +337,60 @@ class TestSubscriberLongPolling < Test::Unit::TestCase
EventMachine.stop
}
add_test_timeout(30)
add_test_timeout(20)
}
end
def config_test_disconnect_long_polling_subscriber_when_longpolling_timeout_is_set
@subscriber_connection_timeout = "15s"
@longpolling_connection_ttl = "5s"
end
def test_disconnect_long_polling_subscriber_when_longpolling_timeout_is_set
channel = 'ch_test_disconnect_long_polling_subscriber_when_longpolling_timeout_is_set'
start = Time.now
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :timeout => 30
sub.callback {
stop = Time.now
elapsed = time_diff_sec(start, stop)
assert(elapsed >= 5 && elapsed <= 5.5, "Disconnect was in #{elapsed} seconds")
assert_equal(304, sub.response_header.status, "Wrong status")
assert_equal(Time.now.utc.strftime("%a, %d %b %Y %T %Z"), sub.response_header['LAST_MODIFIED'].to_s, "Wrong header")
assert_equal("0", sub.response_header['ETAG'].to_s, "Wrong header")
assert_equal("", sub.response, "Wrong header")
EventMachine.stop
}
add_test_timeout(20)
}
end
def config_test_disconnect_long_polling_subscriber_when_only_longpolling_timeout_is_set
@longpolling_connection_ttl = "3s"
end
def test_disconnect_long_polling_subscriber_when_only_longpolling_timeout_is_set
channel = 'ch_test_disconnect_long_polling_subscriber_when_only_longpolling_timeout_is_set'
start = Time.now
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :timeout => 30
sub.callback {
stop = Time.now
elapsed = time_diff_sec(start, stop)
assert(elapsed >= 3 && elapsed <= 3.5, "Disconnect was in #{elapsed} seconds")
assert_equal(304, sub.response_header.status, "Wrong status")
assert_equal(Time.now.utc.strftime("%a, %d %b %Y %T %Z"), sub.response_header['LAST_MODIFIED'].to_s, "Wrong header")
assert_equal("0", sub.response_header['ETAG'].to_s, "Wrong header")
assert_equal("", sub.response, "Wrong header")
EventMachine.stop
}
add_test_timeout(20)
}
end
......@@ -347,7 +400,7 @@ class TestSubscriberLongPolling < Test::Unit::TestCase
end
def test_not_receive_ping_message
channel = 'ch_test_disconnect_long_polling_subscriber_when_disconnect_timeout_is_set'
channel = 'ch_test_not_receive_ping_message'
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :timeout => 30
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment