Commit cc6a0d89 authored by Wandenberg's avatar Wandenberg

add push_stream_timeout_with_body directive to indicate whether send a full...

add push_stream_timeout_with_body directive to indicate whether send a full message on timed out long polling connections, or not
parent ea35acb7
h1(#changelog). Changelog h1(#changelog). Changelog
* Added push_stream_timeout_with_body directive to indicate whether send a full message on timed out long polling connections, or not
* Removed default value from push_stream_padding_by_user_agent directive, it was "[A|a]ndroid 2,4097,4097:[S|s]afari,1025,0" * Removed default value from push_stream_padding_by_user_agent directive, it was "[A|a]ndroid 2,4097,4097:[S|s]afari,1025,0"
* Change the publish message action through a WebSocket connection to add the message to all subscribed channels * Change the publish message action through a WebSocket connection to add the message to all subscribed channels
* Added support to get channels statistics, delete channels and publish message to some channels specifying their ids on push_stream_channels_path * Added support to get channels statistics, delete channels and publish message to some channels specifying their ids on push_stream_channels_path
......
...@@ -129,6 +129,7 @@ h1(#directives). Directives ...@@ -129,6 +129,7 @@ h1(#directives). Directives
| "push_stream_ping_message_interval":push_stream_ping_message_interval |   - |   - |   x |   - |   - |   x | | "push_stream_ping_message_interval":push_stream_ping_message_interval |   - |   - |   x |   - |   - |   x |
| "push_stream_subscriber_connection_ttl":push_stream_subscriber_connection_ttl |   - |   - |   x |   - |   - |   x | | "push_stream_subscriber_connection_ttl":push_stream_subscriber_connection_ttl |   - |   - |   x |   - |   - |   x |
| "push_stream_longpolling_connection_ttl":push_stream_longpolling_connection_ttl |   - |   - |   x |   - |   - |   - | | "push_stream_longpolling_connection_ttl":push_stream_longpolling_connection_ttl |   - |   - |   x |   - |   - |   - |
| "push_stream_timeout_with_body":push_stream_timeout_with_body |   - |   x |   - |   - |   - |   - |
| "push_stream_last_received_message_time":push_stream_last_received_message_time |   - |   - |   x |   - |   - |   - | | "push_stream_last_received_message_time":push_stream_last_received_message_time |   - |   - |   x |   - |   - |   - |
| "push_stream_last_received_message_tag":push_stream_last_received_message_tag |   - |   - |   x |   - |   - |   - | | "push_stream_last_received_message_tag":push_stream_last_received_message_tag |   - |   - |   x |   - |   - |   - |
| "push_stream_last_event_id":push_stream_last_event_id |   - |   - |   x |   - |   - |   - | | "push_stream_last_event_id":push_stream_last_event_id |   - |   - |   x |   - |   - |   - |
...@@ -233,6 +234,7 @@ h1(#contributors). Contributors ...@@ -233,6 +234,7 @@ h1(#contributors). Contributors
[push_stream_ping_message_interval]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_ping_message_interval [push_stream_ping_message_interval]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_ping_message_interval
[push_stream_subscriber_connection_ttl]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_subscriber_connection_ttl [push_stream_subscriber_connection_ttl]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_subscriber_connection_ttl
[push_stream_longpolling_connection_ttl]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_longpolling_connection_ttl [push_stream_longpolling_connection_ttl]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_longpolling_connection_ttl
[push_stream_timeout_with_body]docs/directives/subscribers.textile#push_stream_timeout_with_body
[push_stream_last_received_message_time]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_last_received_message_time [push_stream_last_received_message_time]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_last_received_message_time
[push_stream_last_received_message_tag]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_last_received_message_tag [push_stream_last_received_message_tag]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_last_received_message_tag
[push_stream_last_event_id]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_last_event_id [push_stream_last_event_id]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_last_event_id
......
...@@ -185,6 +185,19 @@ The length of time a long polling subscriber will stay connected waiting for a m ...@@ -185,6 +185,19 @@ The length of time a long polling subscriber will stay connected waiting for a m
But, this operation is very important to help Nginx recycle memory consumed to send messages to susbscriber, allocated at pool request. But, this operation is very important to help Nginx recycle memory consumed to send messages to susbscriber, allocated at pool request.
h2(#push_stream_timeout_with_body). push_stream_timeout_with_body <a name="push_stream_timeout_with_body" href="#">&nbsp;</a>
*syntax:* _push_stream_timeout_with_body on | off_
*default:* _off_
*context:* _location (push_stream_subscriber)_
*release version:* _0.4.0_
When set to on will send a http 200 message indicating that a timeout happens on long polling connections instead of send only a http 304 header.
h2(#push_stream_websocket_allow_publish). push_stream_websocket_allow_publish <a name="push_stream_websocket_allow_publish" href="#">&nbsp;</a> h2(#push_stream_websocket_allow_publish). push_stream_websocket_allow_publish <a name="push_stream_websocket_allow_publish" href="#">&nbsp;</a>
*syntax:* _push_stream_websocket_allow_publish on | off_ *syntax:* _push_stream_websocket_allow_publish on | off_
......
...@@ -62,6 +62,7 @@ typedef struct { ...@@ -62,6 +62,7 @@ typedef struct {
ngx_uint_t max_messages_stored_per_channel; ngx_uint_t max_messages_stored_per_channel;
ngx_uint_t max_channel_id_length; ngx_uint_t max_channel_id_length;
ngx_http_push_stream_template_queue_t msg_templates; ngx_http_push_stream_template_queue_t msg_templates;
ngx_flag_t timeout_with_body;
ngx_regex_t *backtrack_parser_regex; ngx_regex_t *backtrack_parser_regex;
} ngx_http_push_stream_main_conf_t; } ngx_http_push_stream_main_conf_t;
......
...@@ -195,6 +195,9 @@ static const ngx_int_t NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID = -1; ...@@ -195,6 +195,9 @@ static const ngx_int_t NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID = -1;
static const ngx_int_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID = -2; static const ngx_int_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID = -2;
#define NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT "Channel deleted" #define NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT "Channel deleted"
static const ngx_int_t NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_ID = -3;
#define NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT "Timed out"
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID = ngx_string("~id~"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID = ngx_string("~id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID = ngx_string("~event-id~"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID = ngx_string("~event-id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE = ngx_string("~event-type~"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE = ngx_string("~event-type~");
...@@ -223,6 +226,7 @@ ngx_event_t ngx_http_push_stream_memory_cleanup_event; ...@@ -223,6 +226,7 @@ ngx_event_t ngx_http_push_stream_memory_cleanup_event;
ngx_event_t ngx_http_push_stream_buffer_cleanup_event; ngx_event_t ngx_http_push_stream_buffer_cleanup_event;
ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL; ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL;
ngx_http_push_stream_msg_t *ngx_http_push_stream_longpooling_timeout_msg = NULL;
// general request handling // general request handling
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool); ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool);
......
...@@ -45,6 +45,7 @@ http { ...@@ -45,6 +45,7 @@ http {
push_stream_subscriber_connection_ttl 15m; push_stream_subscriber_connection_ttl 15m;
# connection ttl for long polling # connection ttl for long polling
push_stream_longpolling_connection_ttl 30s; push_stream_longpolling_connection_ttl 30s;
push_stream_timeout_with_body off;
# wildcard # wildcard
push_stream_wildcard_channel_prefix "broad_"; push_stream_wildcard_channel_prefix "broad_";
......
...@@ -19,6 +19,7 @@ module NginxConfiguration ...@@ -19,6 +19,7 @@ module NginxConfiguration
:subscriber_connection_ttl => nil, :subscriber_connection_ttl => nil,
:longpolling_connection_ttl => nil, :longpolling_connection_ttl => nil,
:timeout_with_body => 'off',
:message_ttl => '50m', :message_ttl => '50m',
:max_channel_id_length => 200, :max_channel_id_length => 200,
...@@ -113,6 +114,7 @@ http { ...@@ -113,6 +114,7 @@ http {
<%= write_directive("push_stream_subscriber_connection_ttl", subscriber_connection_ttl, "timeout for subscriber connections") %> <%= write_directive("push_stream_subscriber_connection_ttl", subscriber_connection_ttl, "timeout for subscriber connections") %>
<%= write_directive("push_stream_longpolling_connection_ttl", longpolling_connection_ttl, "timeout for long polling connections") %> <%= write_directive("push_stream_longpolling_connection_ttl", longpolling_connection_ttl, "timeout for long polling connections") %>
<%= write_directive("push_stream_timeout_with_body", timeout_with_body) %>
<%= write_directive("push_stream_header_template", header_template, "header to be sent when receiving new subscriber connection") %> <%= write_directive("push_stream_header_template", header_template, "header to be sent when receiving new subscriber connection") %>
<%= write_directive("push_stream_message_ttl", message_ttl, "message ttl") %> <%= write_directive("push_stream_message_ttl", message_ttl, "message ttl") %>
<%= write_directive("push_stream_footer_template", footer_template, "footer to be sent when finishing subscriber connection") %> <%= write_directive("push_stream_footer_template", footer_template, "footer to be sent when finishing subscriber connection") %>
......
...@@ -134,6 +134,31 @@ describe "Subscriber Properties" do ...@@ -134,6 +134,31 @@ describe "Subscriber Properties" do
end end
end end
it "should receive a timed out message when timeout_with_body is on" do
channel = 'ch_test_disconnect_long_polling_subscriber_when_longpolling_timeout_is_set'
start = Time.now
nginx_run_server(config.merge(:subscriber_connection_ttl => "1s", :timeout_with_body => 'on', :message_template => '{\"id\":\"~id~\", \"message\":\"~text~\", \"channel\":\"~channel~\", \"tag\":\"~tag~\", \"time\":\"~time~\"}'), :timeout => 30) do |conf|
EventMachine.run do
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub.callback do
stop = Time.now
time_diff_sec(start, stop).should be_in_the_interval(1, 1.5)
sub.should be_http_status(200)
response = JSON.parse(sub.response)
response["id"].should eql("-3")
response["message"].should eql("Timed out")
response["channel"].should eql("")
response["tag"].should eql("0")
response["time"].should eql("Thu, 01 Jan 1970 00:00:00 GMT")
Time.parse(sub.response_header['LAST_MODIFIED'].to_s).utc.to_i.should be_in_the_interval(Time.now.utc.to_i-1, Time.now.utc.to_i)
sub.response_header['ETAG'].to_s.should eql("0")
EventMachine.stop
end
end
end
end
it "should receive messages when connected in more than one channel" do it "should receive messages when connected in more than one channel" do
channel_1 = 'ch_test_receiving_messages_when_connected_in_more_then_one_channel_1' channel_1 = 'ch_test_receiving_messages_when_connected_in_more_then_one_channel_1'
channel_2 = 'ch_test_receiving_messages_when_connected_in_more_then_one_channel_2' channel_2 = 'ch_test_receiving_messages_when_connected_in_more_then_one_channel_2'
......
...@@ -72,6 +72,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = { ...@@ -72,6 +72,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_MAIN_CONF_OFFSET, NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, ping_message_text), offsetof(ngx_http_push_stream_main_conf_t, ping_message_text),
NULL }, NULL },
{ ngx_string("push_stream_timeout_with_body"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, timeout_with_body),
NULL },
{ ngx_string("push_stream_message_ttl"), { ngx_string("push_stream_message_ttl"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1, NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_sec_slot, ngx_conf_set_sec_slot,
...@@ -420,6 +426,7 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf) ...@@ -420,6 +426,7 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
mcf->max_subscribers_per_channel = NGX_CONF_UNSET; mcf->max_subscribers_per_channel = NGX_CONF_UNSET;
mcf->max_messages_stored_per_channel = NGX_CONF_UNSET_UINT; mcf->max_messages_stored_per_channel = NGX_CONF_UNSET_UINT;
mcf->qtd_templates = 0; mcf->qtd_templates = 0;
mcf->timeout_with_body = NGX_CONF_UNSET;
ngx_queue_init(&mcf->msg_templates.queue); ngx_queue_init(&mcf->msg_templates.queue);
ngx_http_push_stream_module_main_conf = mcf; ngx_http_push_stream_module_main_conf = mcf;
...@@ -443,6 +450,7 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent) ...@@ -443,6 +450,7 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
ngx_conf_merge_str_value(conf->channel_deleted_message_text, conf->channel_deleted_message_text, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT); ngx_conf_merge_str_value(conf->channel_deleted_message_text, conf->channel_deleted_message_text, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT);
ngx_conf_merge_str_value(conf->ping_message_text, conf->ping_message_text, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT); ngx_conf_merge_str_value(conf->ping_message_text, conf->ping_message_text, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT);
ngx_conf_merge_str_value(conf->wildcard_channel_prefix, conf->wildcard_channel_prefix, NGX_HTTP_PUSH_STREAM_DEFAULT_WILDCARD_CHANNEL_PREFIX); ngx_conf_merge_str_value(conf->wildcard_channel_prefix, conf->wildcard_channel_prefix, NGX_HTTP_PUSH_STREAM_DEFAULT_WILDCARD_CHANNEL_PREFIX);
ngx_conf_init_value(conf->timeout_with_body, 0);
// sanity checks // sanity checks
// max number of channels cannot be zero // max number of channels cannot be zero
...@@ -896,5 +904,10 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -896,5 +904,10 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
return NGX_ERROR; return NGX_ERROR;
} }
// create longpooling timeout message
if ((ngx_http_push_stream_longpooling_timeout_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked((u_char *)NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT, sizeof(NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT), NULL, NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
return NGX_ERROR;
}
return NGX_OK; return NGX_OK;
} }
...@@ -227,8 +227,8 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -227,8 +227,8 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
msg->queue.next = NULL; msg->queue.next = NULL;
msg->id = id; msg->id = id;
msg->workers_ref_count = 0; msg->workers_ref_count = 0;
msg->time = (id == -1) ? 0 : ngx_time(); msg->time = (id < 0) ? 0 : ngx_time();
msg->tag = (msg->time == shm_data->last_message_time) ? (shm_data->last_message_tag + 1) : 1; msg->tag = (id < 0) ? 0 : ((msg->time == shm_data->last_message_time) ? (shm_data->last_message_tag + 1) : 1);
if ((msg->raw.data = ngx_slab_alloc_locked(shpool, len + 1)) == NULL) { if ((msg->raw.data = ngx_slab_alloc_locked(shpool, len + 1)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg); ngx_http_push_stream_free_message_memory_locked(shpool, msg);
...@@ -669,8 +669,17 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_ ...@@ -669,8 +669,17 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_
ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_cleanup_request_context); ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_cleanup_request_context);
ngx_http_push_stream_add_polling_headers(r, ngx_time(), 0, r->pool); ngx_http_push_stream_add_polling_headers(r, ngx_time(), 0, r->pool);
if (ngx_http_push_stream_module_main_conf->timeout_with_body) {
ngx_http_send_header(r);
ngx_http_push_stream_send_response_content_header(r, ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module));
ngx_http_push_stream_send_response_message(r, NULL, ngx_http_push_stream_longpooling_timeout_msg, 1, 0);
ngx_http_push_stream_send_response_finalize(r);
} else {
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_MODIFIED, NULL); ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_MODIFIED, NULL);
ngx_http_finalize_request(r, NGX_DONE); ngx_http_finalize_request(r, NGX_DONE);
}
} }
static ngx_int_t static ngx_int_t
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment