Commit b434a517 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding keepalive support

parent 1ec157da
h2. Version 0.2.4
* Adding keepalive support
* Fixing bug when reloading nginx configuration file (Thanks _Rob Mueller_ for bug report)
h2. Version 0.2.3 h2. Version 0.2.3
* Fixing bug to accept different message templates on different subscriber locations (Now you CAN remove push_stream_message_template directive from publisher location, it will not be used there) * Fixing bug to accept different message templates on different subscriber locations (Now you CAN remove push_stream_message_template directive from publisher location, it will not be used there)
......
...@@ -184,6 +184,7 @@ h3(#directives). Directives ...@@ -184,6 +184,7 @@ h3(#directives). Directives
|push_stream_max_number_of_channels|unset|number|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration| |push_stream_max_number_of_channels|unset|number|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration|
|push_stream_max_number_of_broadcast_channels|unset|number|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration| |push_stream_max_number_of_broadcast_channels|unset|number|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration|
|push_stream_memory_cleanup_timeout|30 seconds|time constant|http|main nginx configuration| |push_stream_memory_cleanup_timeout|30 seconds|time constant|http|main nginx configuration|
|push_stream_keepalive|off|on, off|http, location|(push_stream_publisher and push_stream_channels_statistics) or main nginx configuration|
h4(#push_stream_channels_statistics). push_stream_channels_statistics h4(#push_stream_channels_statistics). push_stream_channels_statistics
...@@ -377,6 +378,15 @@ location: (push_stream_subscriber and push_stream_publisher) or main nginx confi ...@@ -377,6 +378,15 @@ location: (push_stream_subscriber and push_stream_publisher) or main nginx confi
The length of time a message or a channel will stay on garbage collection area before it is completly discarded, freeing the shared memory. The minimum length is 30 seconds to ensure that no one is using these elements. The length of time a message or a channel will stay on garbage collection area before it is completly discarded, freeing the shared memory. The minimum length is 30 seconds to ensure that no one is using these elements.
This operation is very important to help Nginx recycle memory consumed to create messages and channels, so do not use a large time. This operation is very important to help Nginx recycle memory consumed to create messages and channels, so do not use a large time.
h4(#push_stream_keepalive). push_stream_keepalive [ on | off ]
New in version 0.2.4
default: off
context: location
location: (push_stream_publisher and push_stream_channels_statistics) or main nginx configuration
Enable keepalive connections, on publisher or channels statistics locations.
h2(#attention). Attention h2(#attention). Attention
This module controls everything needed to send the messages to subscribers. This module controls everything needed to send the messages to subscribers.
......
...@@ -66,6 +66,7 @@ typedef struct { ...@@ -66,6 +66,7 @@ typedef struct {
ngx_uint_t max_number_of_channels; ngx_uint_t max_number_of_channels;
ngx_uint_t max_number_of_broadcast_channels; ngx_uint_t max_number_of_broadcast_channels;
ngx_msec_t buffer_cleanup_interval; ngx_msec_t buffer_cleanup_interval;
ngx_uint_t keepalive;
} ngx_http_push_stream_loc_conf_t; } ngx_http_push_stream_loc_conf_t;
// shared memory segment name // shared memory segment name
......
...@@ -109,7 +109,6 @@ ngx_http_push_stream_send_buf_response(ngx_http_request_t *r, ngx_buf_t *buf, co ...@@ -109,7 +109,6 @@ ngx_http_push_stream_send_buf_response(ngx_http_request_t *r, ngx_buf_t *buf, co
buf->memory = 1; buf->memory = 1;
buf->last_buf = 1; buf->last_buf = 1;
r->keepalive = 0;
r->headers_out.status = status_code; r->headers_out.status = status_code;
rc = ngx_http_send_header(r); rc = ngx_http_send_header(r);
......
...@@ -34,8 +34,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r) ...@@ -34,8 +34,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
ngx_http_push_stream_channel_t *channel = NULL; ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
// Publisher never do a keep alive connection r->keepalive = cf->keepalive;
r->keepalive = 0;
// only accept GET and POST methods // only accept GET and POST methods
if (!(r->method & (NGX_HTTP_GET|NGX_HTTP_POST))) { if (!(r->method & (NGX_HTTP_GET|NGX_HTTP_POST))) {
...@@ -222,6 +221,8 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r) ...@@ -222,6 +221,8 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
ngx_http_push_stream_channel_t *channel = NULL; ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
r->keepalive = cf->keepalive;
// only accept GET method // only accept GET method
if (!(r->method & NGX_HTTP_GET)) { if (!(r->method & NGX_HTTP_GET)) {
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ALLOW, &NGX_HTTP_PUSH_STREAM_ALLOW_GET); ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ALLOW, &NGX_HTTP_PUSH_STREAM_ALLOW_GET);
......
...@@ -140,6 +140,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = { ...@@ -140,6 +140,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_MAIN_CONF_OFFSET, NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, memory_cleanup_timeout), offsetof(ngx_http_push_stream_main_conf_t, memory_cleanup_timeout),
NULL }, NULL },
{ ngx_string("push_stream_keepalive"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, keepalive),
NULL },
ngx_null_command ngx_null_command
}; };
...@@ -351,6 +357,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf) ...@@ -351,6 +357,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->max_number_of_channels = NGX_CONF_UNSET_UINT; lcf->max_number_of_channels = NGX_CONF_UNSET_UINT;
lcf->max_number_of_broadcast_channels = NGX_CONF_UNSET_UINT; lcf->max_number_of_broadcast_channels = NGX_CONF_UNSET_UINT;
lcf->buffer_cleanup_interval = NGX_CONF_UNSET_MSEC; lcf->buffer_cleanup_interval = NGX_CONF_UNSET_MSEC;
lcf->keepalive = NGX_CONF_UNSET_UINT;
return lcf; return lcf;
} }
...@@ -377,6 +384,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -377,6 +384,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_uint_value(conf->max_number_of_channels, prev->max_number_of_channels, NGX_CONF_UNSET_UINT); ngx_conf_merge_uint_value(conf->max_number_of_channels, prev->max_number_of_channels, NGX_CONF_UNSET_UINT);
ngx_conf_merge_uint_value(conf->max_number_of_broadcast_channels, prev->max_number_of_broadcast_channels, NGX_CONF_UNSET_UINT); ngx_conf_merge_uint_value(conf->max_number_of_broadcast_channels, prev->max_number_of_broadcast_channels, NGX_CONF_UNSET_UINT);
ngx_conf_merge_uint_value(conf->buffer_cleanup_interval, prev->buffer_cleanup_interval, NGX_CONF_UNSET_MSEC); ngx_conf_merge_uint_value(conf->buffer_cleanup_interval, prev->buffer_cleanup_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_uint_value(conf->keepalive, prev->keepalive, 0);
// sanity checks // sanity checks
......
...@@ -124,7 +124,6 @@ ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t ...@@ -124,7 +124,6 @@ ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t
{ {
ngx_int_t rc; ngx_int_t rc;
r->keepalive = 0;
r->header_only = 1; r->header_only = 1;
r->headers_out.content_length_n = 0; r->headers_out.content_length_n = 0;
r->headers_out.status = status_code; r->headers_out.status = status_code;
......
...@@ -146,6 +146,7 @@ module BaseTestCase ...@@ -146,6 +146,7 @@ module BaseTestCase
@subscriber_connection_timeout = nil @subscriber_connection_timeout = nil
@memory_cleanup_timeout = '5m' @memory_cleanup_timeout = '5m'
@config_template = nil @config_template = nil
@keepalive = 'off'
end end
def publish_message(channel, headers, body) def publish_message(channel, headers, body)
...@@ -228,6 +229,9 @@ http { ...@@ -228,6 +229,9 @@ http {
# query string based channel id # query string based channel id
set $push_stream_channel_id $arg_id; set $push_stream_channel_id $arg_id;
# keepalive
<%= "push_stream_keepalive #{@keepalive};" unless @keepalive.nil? %>
} }
location /pub { location /pub {
...@@ -242,6 +246,8 @@ http { ...@@ -242,6 +246,8 @@ http {
<%= "push_stream_max_message_buffer_length #{@max_message_buffer_length};" unless @max_message_buffer_length.nil? %> <%= "push_stream_max_message_buffer_length #{@max_message_buffer_length};" unless @max_message_buffer_length.nil? %>
# message ttl # message ttl
<%= "push_stream_min_message_buffer_timeout #{@min_message_buffer_timeout};" unless @min_message_buffer_timeout.nil? %> <%= "push_stream_min_message_buffer_timeout #{@min_message_buffer_timeout};" unless @min_message_buffer_timeout.nil? %>
# keepalive
<%= "push_stream_keepalive #{@keepalive};" unless @keepalive.nil? %>
<%= "push_stream_max_channel_id_length #{@max_channel_id_length};" unless @max_channel_id_length.nil? %> <%= "push_stream_max_channel_id_length #{@max_channel_id_length};" unless @max_channel_id_length.nil? %>
<%= %{push_stream_broadcast_channel_prefix "#{@broadcast_channel_prefix}";} unless @broadcast_channel_prefix.nil? %> <%= %{push_stream_broadcast_channel_prefix "#{@broadcast_channel_prefix}";} unless @broadcast_channel_prefix.nil? %>
......
require File.expand_path('base_test_case', File.dirname(__FILE__))
require 'socket'
class TestKeepalive < Test::Unit::TestCase
include BaseTestCase
def config_test_different_operation_with_keepalive
@keepalive = 'on'
end
def test_different_operation_with_keepalive
channel = 'ch_test_different_operation_with_keepalive'
body = 'message to be sent'
get_without_channel_id = "GET /pub HTTP/1.0\r\n\r\n"
post_channel_message = "POST /pub?id=#{channel} HTTP/1.0\r\nContent-Length: #{body.size}\r\n\r\n#{body}"
get_channels_stats = "GET /channels-stats HTTP/1.0\r\n\r\n"
get_channel_stats = "GET /pub?id=#{channel} HTTP/1.0\r\n\r\n"
socket = TCPSocket.open(nginx_host, nginx_port)
socket.print(get_without_channel_id)
headers, body = read_response(socket)
assert_equal("", body, "Wrong response")
assert(headers.index('No channel id provided.') > 0, "Didn't receive error message")
socket.print(post_channel_message)
headers, body = read_response(socket)
assert_equal("{\"channel\": \"#{channel}\", \"published_messages\": \"1\", \"stored_messages\": \"1\", \"subscribers\": \"0\"}\r\n", body, "Wrong response")
socket.print(get_channels_stats)
headers, body = read_response(socket)
assert(body.index("\"channels\": \"1\", \"broadcast_channels\": \"0\", \"published_messages\": \"1\", \"subscribers\": \"0\", \"by_worker\": [\r\n") > 0, "Didn't receive message")
assert(body.index("\"subscribers\": \"0\"}") > 0, "Didn't receive message")
socket.print(get_channel_stats)
headers, body = read_response(socket)
assert_equal("{\"channel\": \"#{channel}\", \"published_messages\": \"1\", \"stored_messages\": \"1\", \"subscribers\": \"0\"}\r\n", body, "Wrong response")
end
def read_response(socket)
response = socket.readpartial(1)
while (tmp = socket.read_nonblock(256))
response += tmp
end
ensure
fail("Any response") if response.nil?
headers, body = response.split("\r\n\r\n", 2)
return headers, body
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment