Commit aeff049c authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding directive push_stream_footer_template to send a text to subscriber...

adding directive push_stream_footer_template to send a text to subscriber before close connection (channel delete or subscriber timeout)
parent 7ff9a1ad
......@@ -81,6 +81,8 @@ h2(#basic-configuration). Basic Configuration
push_stream_header_template "<html><head><meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-store\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-cache\">\r\n<meta http-equiv=\"Pragma\" content=\"no-cache\">\r\n<meta http-equiv=\"Expires\" content=\"Thu, 1 Jan 1970 00:00:00 GMT\">\r\n<script type=\"text/javascript\">\r\nwindow.onError = null;\r\ndocument.domain = 'localhost';\r\nparent.PushStream.register(this);\r\n</script>\r\n</head>\r\n<body onload=\"try { parent.PushStream.reset(this) } catch (e) {}\">";
# message template
push_stream_message_template "<script>p(~id~,'~channel~','~text~');</script>";
# footer to be sent when finishing subscriber connection
push_stream_footer_template "</body></html>";
# content-type
push_stream_content_type "text/html; charset=utf-8";
# subscriber may create channels on demand or only authorized
......@@ -182,6 +184,7 @@ h3(#directives). Directives
|push_stream_max_message_buffer_length|unset|number|location|push_stream_publisher|
|push_stream_authorized_channels_only|off|on, off|location|push_stream_subscriber|
|push_stream_header_template|unset|any string|location|push_stream_subscriber|
|push_stream_footer_template|unset|any string|location|push_stream_subscriber|
|push_stream_content_type|text/plain|any valid content type|location|push_stream_subscriber|
|push_stream_ping_message_interval|unset|time constant|location|push_stream_subscriber|
|push_stream_subscriber_connection_timeout|unset|time constant|location|push_stream_subscriber|
......@@ -259,6 +262,15 @@ location: push_stream_subscriber
The text that will be sended to subscribers when they arrive.
h4(#push_stream_footer_template). push_stream_footer_template [ string ]
New in version 0.2.6
default: -
context: location
location: push_stream_subscriber
The text that will be sended to subscribers before connection is closed (channel deleted ou subscriber timeout).
h4(#push_stream_message_template). push_stream_message_template [ string ]
default: ==~text~==
......@@ -273,7 +285,7 @@ default: text/plain
context: location
location: push_stream_subscriber
The content type used on responses to subscribers. Must be complient with push_stream_header_template and push_stream_message_template.
The content type used on responses to subscribers. Must be complient with push_stream_header_template, push_stream_message_template and push_stream_footer_template.
h4(#push_stream_ping_message_interval). push_stream_ping_message_interval [ time ]
......
......@@ -59,6 +59,7 @@ typedef struct {
ngx_str_t header_template;
ngx_str_t message_template;
ngx_int_t message_template_index;
ngx_str_t footer_template;
ngx_str_t content_type;
ngx_msec_t ping_message_interval;
ngx_msec_t subscriber_disconnect_interval;
......
......@@ -38,6 +38,7 @@ static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT = 30; // 30 se
#define NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE "~text~"
#define NGX_HTTP_PUSH_STREAM_DEFAULT_FOOTER_TEMPLATE ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_CONTENT_TYPE "text/plain"
......
......@@ -77,6 +77,8 @@ http {
push_stream_header_template "<html><head><meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-store\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-cache\">\r\n<meta http-equiv=\"Pragma\" content=\"no-cache\">\r\n<meta http-equiv=\"Expires\" content=\"Thu, 1 Jan 1970 00:00:00 GMT\">\r\n<script type=\"text/javascript\">\r\nwindow.onError = null;\r\ndocument.domain = 'localhost';\r\nparent.PushStream.register(this);\r\n</script>\r\n</head>\r\n<body onload=\"try { parent.PushStream.reset(this) } catch (e) {}\">";
# message template
push_stream_message_template "<script>p(~id~,'~channel~','~text~');</script>";
# footer to be sent when finishing subscriber connection
push_stream_footer_template "</body></html>";
# content-type
push_stream_content_type "text/html; charset=utf-8";
# subscriber may create channels on demand or only authorized
......
......@@ -110,6 +110,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, message_template),
NULL },
{ ngx_string("push_stream_footer_template"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, footer_template),
NULL },
{ ngx_string("push_stream_content_type"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
......@@ -379,6 +385,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->message_template_index = -1;
lcf->message_template.data = NULL;
lcf->header_template.data = NULL;
lcf->footer_template.data = NULL;
lcf->ping_message_interval = NGX_CONF_UNSET_MSEC;
lcf->content_type.data = NULL;
lcf->subscriber_disconnect_interval = NGX_CONF_UNSET_MSEC;
......@@ -407,6 +414,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_uint_value(conf->max_channel_id_length, prev->max_channel_id_length, NGX_CONF_UNSET_UINT);
ngx_conf_merge_str_value(conf->header_template, prev->header_template, NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE);
ngx_conf_merge_str_value(conf->message_template, prev->message_template, NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE);
ngx_conf_merge_str_value(conf->footer_template, prev->footer_template, NGX_HTTP_PUSH_STREAM_DEFAULT_FOOTER_TEMPLATE);
ngx_conf_merge_msec_value(conf->ping_message_interval, prev->ping_message_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_str_value(conf->content_type, prev->content_type, NGX_HTTP_PUSH_STREAM_DEFAULT_CONTENT_TYPE);
ngx_conf_merge_msec_value(conf->subscriber_disconnect_interval, prev->subscriber_disconnect_interval, NGX_CONF_UNSET_MSEC);
......@@ -499,9 +507,9 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
return NGX_CONF_ERROR;
}
// append crlf to templates
// formatting header and footer template for chunk transfer
if (conf->header_template.len > 0) {
ngx_str_t * aux = ngx_http_push_stream_get_formatted_chunk(conf->header_template.data, conf->header_template.len, cf->pool);
ngx_str_t *aux = ngx_http_push_stream_get_formatted_chunk(conf->header_template.data, conf->header_template.len, cf->pool);
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to format header template");
return NGX_CONF_ERROR;
......@@ -510,6 +518,16 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
conf->header_template.len = aux->len;
}
if (conf->footer_template.len > 0) {
ngx_str_t *aux = ngx_http_push_stream_get_formatted_chunk(conf->footer_template.data, conf->footer_template.len, cf->pool);
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to format footer template");
return NGX_CONF_ERROR;
}
conf->footer_template.data = aux->data;
conf->footer_template.len = aux->len;
}
conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template);
// calc buffer cleanup interval
......
......@@ -283,6 +283,12 @@ ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *tex
static void
ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r)
{
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
if (pslcf->footer_template.len > 0) {
ngx_http_push_stream_send_response_text(r, pslcf->footer_template.data, pslcf->footer_template.len, 0);
}
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1);
ngx_http_finalize_request(r, NGX_HTTP_OK);
}
......
......@@ -136,6 +136,7 @@ module BaseTestCase
@broadcast_channel_prefix = 'broad_'
@content_type = 'text/html; charset=utf-8'
@header_template = %{<html><head><meta http-equiv=\\"Content-Type\\" content=\\"text/html; charset=utf-8\\">\\r\\n<meta http-equiv=\\"Cache-Control\\" content=\\"no-store\\">\\r\\n<meta http-equiv=\\"Cache-Control\\" content=\\"no-cache\\">\\r\\n<meta http-equiv=\\"Expires\\" content=\\"Thu, 1 Jan 1970 00:00:00 GMT\\">\\r\\n<script type=\\"text/javascript\\">\\r\\nwindow.onError = null;\\r\\ndocument.domain = \\'#{nginx_host}\\';\\r\\nparent.PushStream.register(this);\\r\\n</script>\\r\\n</head>\\r\\n<body onload=\\"try { parent.PushStream.reset(this) } catch (e) {}\\">}
@footer_template = %{</body></html>}
@max_channel_id_length = 200
@max_message_buffer_length = 20
@max_number_of_broadcast_channels = nil
......@@ -290,6 +291,8 @@ http {
<%= %{push_stream_header_template "#{@header_template}";} unless @header_template.nil? %>
# message template
<%= %{push_stream_message_template "#{@message_template}";} unless @message_template.nil? %>
# footer to be sent when finishing subscriber connection
<%= %{push_stream_footer_template "#{@footer_template}";} unless @footer_template.nil? %>
# content-type
<%= %{push_stream_content_type "#{@content_type}";} unless @content_type.nil? %>
# subscriber may create channels on demand or only authorized
......
......@@ -57,7 +57,7 @@ class TestPublishMessages < Test::Unit::TestCase
}
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
add_test_timeout(5)
add_test_timeout
}
bytes = []
......
......@@ -416,6 +416,7 @@ class TestPublisherAdmin < Test::Unit::TestCase
def config_test_delete_channel_whith_subscriber_in_one_channel
@header_template = " " # send a space as header to has a chunk received
@footer_template = nil
@ping_message_interval = nil
@message_template = '{\"id\":\"~id~\", \"channel\":\"~channel~\", \"text\":\"~text~\"}'
end
......@@ -477,6 +478,7 @@ class TestPublisherAdmin < Test::Unit::TestCase
def config_test_delete_channel_whith_subscriber_in_two_channels
@header_template = " " # send a space as header to has a chunk received
@footer_template = nil
@ping_message_interval = nil
@message_template = '{\"id\":\"~id~\", \"channel\":\"~channel~\", \"text\":\"~text~\"}'
end
......@@ -552,9 +554,105 @@ class TestPublisherAdmin < Test::Unit::TestCase
}
end
def config_test_receive_footer_template_when_channel_is_deleted
@header_template = "HEADER_TEMPLATE"
@footer_template = "FOOTER_TEMPLATE"
@ping_message_interval = nil
@message_template = '~text~'
end
def test_receive_footer_template_when_channel_is_deleted
headers = {'accept' => 'application/json'}
body = 'published message'
channel = 'ch_test_receive_footer_template_when_channel_is_deleted'
resp = ""
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
resp = resp + chunk
if resp == "#{@header_template}\r\n"
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).delete :head => headers, :timeout => 30
pub.callback {
assert_equal(200, pub.response_header.status, "Request was not received")
assert_equal(0, pub.response_header.content_length, "Should response only with headers")
assert_equal("Channel deleted.", pub.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'], "Didn't receive the right error message")
}
end
}
sub_1.callback {
assert_equal("#{@header_template}\r\nChannel deleted\r\n#{@footer_template}\r\n", resp, "Didn't receive complete message")
EventMachine.stop
}
add_test_timeout
}
end
def config_test_different_header_and_footer_template_by_location
@header_template = "HEADER_TEMPLATE"
@footer_template = "FOOTER_TEMPLATE"
@header_template2 = "<html><body>"
@footer_template2 = "</body></html>"
@ping_message_interval = nil
@message_template = '~text~'
@extra_location = %{
location ~ /sub2/(.*)? {
# activate subscriber mode for this location
push_stream_subscriber;
# positional channel path
set $push_stream_channels_path $1;
push_stream_header_template "#{@header_template2}";
push_stream_footer_template "#{@footer_template2}";
push_stream_message_template "|~text~|";
}
}
end
def test_different_header_and_footer_template_by_location
headers = {'accept' => 'application/json'}
body = 'published message'
channel = 'ch_test_different_header_and_footer_template_by_location'
resp = ""
resp2 = ""
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
resp = resp + chunk
}
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub2/' + channel.to_s).get :head => headers, :timeout => 30
sub_2.stream { |chunk|
resp2 = resp2 + chunk
}
EM.add_timer(1) do
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).delete :head => headers, :timeout => 30
pub.callback {
assert_equal(200, pub.response_header.status, "Request was not received")
assert_equal(0, pub.response_header.content_length, "Should response only with headers")
assert_equal("Channel deleted.", pub.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'], "Didn't receive the right error message")
}
end
EM.add_timer(2) do
assert_equal("#{@header_template}\r\nChannel deleted\r\n#{@footer_template}\r\n", resp, "Didn't receive complete message")
assert_equal("#{@header_template2}\r\n|Channel deleted|\r\n#{@footer_template2}\r\n", resp2, "Didn't receive complete message")
EventMachine.stop
end
add_test_timeout
}
end
def config_test_custom_channel_deleted_message_text
@channel_deleted_message_text = "Channel has gone away."
@header_template = " " # send a space as header to has a chunk received
@footer_template = nil
@ping_message_interval = nil
@message_template = '{\"id\":\"~id~\", \"channel\":\"~channel~\", \"text\":\"~text~\"}'
end
......
......@@ -6,6 +6,7 @@ class TestSubscriberConnectionCleanup < Test::Unit::TestCase
def config_test_subscriber_connection_timeout
@subscriber_connection_timeout = "37s"
@header_template = "HEADER_TEMPLATE"
@footer_template = "FOOTER_TEMPLATE"
@ping_message_interval = nil
end
......@@ -14,16 +15,19 @@ class TestSubscriberConnectionCleanup < Test::Unit::TestCase
headers = {'accept' => 'text/html'}
start = Time.now
receivedHeaderTemplate = false
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
sub.stream { |chunk|
assert(chunk.include?(@header_template), "Didn't received header template")
response += chunk
assert(response.include?(@header_template), "Didn't received header template")
}
sub.callback {
stop = Time.now
elapsed = time_diff_sec(start, stop)
assert(elapsed >= 38 && elapsed <= 39.5, "Disconnect was in #{elapsed} seconds")
assert(response.include?(@footer_template), "Didn't received footer template")
EventMachine.stop
}
}
......@@ -33,6 +37,7 @@ class TestSubscriberConnectionCleanup < Test::Unit::TestCase
@subscriber_connection_timeout = "37s"
@ping_message_interval = "5s"
@header_template = nil
@footer_template = nil
end
def test_subscriber_connection_timeout_with_ping_message
......@@ -41,6 +46,7 @@ class TestSubscriberConnectionCleanup < Test::Unit::TestCase
start = Time.now
chunksReceived = 0
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
sub.stream { |chunk|
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment