Commit 73e503c8 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding tag and time available at message template, and make possible pass...

adding tag and time available at message template, and make possible pass these values whithout set headers
parent 0144420e
* Adding tag and time available at message template, and make possible pass these values whithout set headers
* Adding a reference count to the message to avoid discard it before be processed for all workers
* Improvement on memory usage to reuse chains and buffers
* Improvement on javascript message parser regexp
......
......@@ -817,7 +817,7 @@ h2(#push_stream_message_template). push_stream_message_template
*context:* _location (push_stream_subscriber)_
The text template that will be used to format the message before be sended to subscribers. The template can contain any number of the reserved words: ==~id~, ~text~, ~channel~ and ~event-id~, example: "<script>p(~id~,'~channel~','~text~');</script>"==
The text template that will be used to format the message before be sended to subscribers. The template can contain any number of the reserved words: ==~id~, ~text~, ~channel~, ~time~, ~tag~ and ~event-id~, example: "<script>p(~id~,'~channel~','~text~', ~tag~, '~time~');</script>"==
h2(#push_stream_footer_template). push_stream_footer_template
......@@ -932,6 +932,32 @@ h2(#push_stream_websocket_allow_publish). push_stream_websocket_allow_publish
Enable a WebSocket subscriber send messages to the channel it is connected (the first, if connected in more than one) through the same connection it is receiving the messages, using _send_ method from WebSocket interface.
h2(#push_stream_last_received_message_time). push_stream_last_received_message_time
*syntax:* _push_stream_last_received_message_time string_
*default:* _none_
*context:* _location_
*release version:* _0.3.3_
Set the time when last message was received to the server knows which messages has to be sent to subscriber. Is a replacement for If-Modified-Since header. Example, $arg_time indicate that the value will be take from time argument.
h2(#push_stream_last_received_message_tag). push_stream_last_received_message_tag
*syntax:* _push_stream_last_received_message_tag string_
*default:* _none_
*context:* _location_
*release version:* _0.3.3_
Set the tag of the last message received to the server knows which messages has to be sent to subscriber. Is a replacement for If-None-Match header. Example, $arg_tag indicate that the value will be take from tag argument.
h1(#attention). Attention
This module controls everything needed to send the messages to subscribers.
......
......@@ -88,6 +88,8 @@ typedef struct {
ngx_msec_t subscriber_connection_ttl;
ngx_msec_t longpolling_connection_ttl;
ngx_flag_t websocket_allow_publish;
ngx_http_complex_value_t *last_received_message_time;
ngx_http_complex_value_t *last_received_message_tag;
} ngx_http_push_stream_loc_conf_t;
// shared memory segment name
......@@ -324,4 +326,6 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET = ngx_string("GET");
#define NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(counter) \
(counter = (counter > 1) ? counter - 1 : 0)
#define NGX_HTTP_PUSH_STREAM_TIME_FMT_LEN 30 //sizeof("Mon, 28 Sep 1970 06:00:00 GMT")
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_H_ */
......@@ -196,6 +196,8 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID = ngx_string("~id~
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID = ngx_string("~event-id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL = ngx_string("~channel~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG = ngx_string("~tag~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME = ngx_string("~time~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_DEFAULT_HEADER_TEMPLATE = ngx_string(":");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_TEMPLATE = ngx_string(": ~text~\r\n");
......
......@@ -27,6 +27,6 @@
#define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.2");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("e1f3dd26fa6d52c97a77a55384894ccf49454d76");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("6796034c6ae6b627d0f31e613287a425f22dd97a");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */
......@@ -198,6 +198,18 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, websocket_allow_publish),
NULL },
{ ngx_string("push_stream_last_received_message_time"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_http_set_complex_value_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, last_received_message_time),
NULL },
{ ngx_string("push_stream_last_received_message_tag"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_http_set_complex_value_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, last_received_message_tag),
NULL },
ngx_null_command
};
......@@ -470,6 +482,8 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->subscriber_connection_ttl = NGX_CONF_UNSET_MSEC;
lcf->longpolling_connection_ttl = NGX_CONF_UNSET_MSEC;
lcf->websocket_allow_publish = NGX_CONF_UNSET_UINT;
lcf->last_received_message_time = NULL;
lcf->last_received_message_tag = NULL;
return lcf;
}
......@@ -480,10 +494,6 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_http_push_stream_loc_conf_t *prev = parent, *conf = child;
if ((ngx_http_push_stream_module_main_conf == NULL) || !ngx_http_push_stream_module_main_conf->enabled) {
return NGX_CONF_OK;
}
ngx_conf_merge_uint_value(conf->authorized_channels_only, prev->authorized_channels_only, 0);
ngx_conf_merge_value(conf->store_messages, prev->store_messages, 0);
ngx_conf_merge_str_value(conf->header_template, prev->header_template, NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE);
......@@ -498,6 +508,14 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_msec_value(conf->longpolling_connection_ttl, prev->longpolling_connection_ttl, conf->subscriber_connection_ttl);
ngx_conf_merge_value(conf->websocket_allow_publish, prev->websocket_allow_publish, 0);
if (conf->last_received_message_time == NULL) {
conf->last_received_message_time = prev->last_received_message_time;
}
if (conf->last_received_message_tag == NULL) {
conf->last_received_message_tag = prev->last_received_message_tag;
}
if (conf->location_type == NGX_CONF_UNSET_UINT) {
return NGX_CONF_OK;
}
......
......@@ -44,7 +44,8 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_http_push_stream_subscriber_ctx_t *ctx;
time_t if_modified_since;
ngx_str_t *last_event_id;
ngx_str_t *last_event_id, vv_time = ngx_null_string;
u_char *dst, *src;
ngx_str_t *push_mode;
ngx_flag_t polling, longpolling;
ngx_int_t rc;
......@@ -74,8 +75,23 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
return ngx_http_push_stream_send_only_header_response(r, status_code, explain_error_message);
}
if (cf->last_received_message_time != NULL) {
ngx_http_complex_value(r, cf->last_received_message_time, &vv_time);
if (vv_time.len) {
dst = vv_time.data;
src = vv_time.data;
ngx_unescape_uri(&dst, &src, vv_time.len, NGX_UNESCAPE_URI);
if (dst < src) {
*dst = '\0';
vv_time.len = dst - vv_time.data;
}
}
} else if (r->headers_in.if_modified_since != NULL) {
vv_time = r->headers_in.if_modified_since->value;
}
// get control headers
if_modified_since = (r->headers_in.if_modified_since != NULL) ? ngx_http_parse_time(r->headers_in.if_modified_since->value.data, r->headers_in.if_modified_since->value.len) : -1;
if_modified_since = vv_time.len ? ngx_http_parse_time(vv_time.data, vv_time.len) : -1;
last_event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_LAST_EVENT_ID);
push_mode = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_MODE);
......@@ -137,12 +153,22 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_subscription_t *subscription;
ngx_str_t *etag = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_IF_NONE_MATCH);
ngx_int_t tag = ((etag != NULL) && ((tag = ngx_atoi(etag->data, etag->len)) != NGX_ERROR)) ? ngx_abs(tag) : -1;
ngx_str_t *etag = NULL, vv_etag = ngx_null_string;
ngx_int_t tag;
time_t greater_message_time;
ngx_int_t greater_message_tag = tag;
ngx_int_t greater_message_tag;
ngx_flag_t has_message_to_send = 0;
if (cf->last_received_message_tag != NULL) {
ngx_http_complex_value(r, cf->last_received_message_tag, &vv_etag);
etag = vv_etag.len ? &vv_etag : NULL;
} else {
etag = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_IF_NONE_MATCH);
}
tag = ((etag != NULL) && ((tag = ngx_atoi(etag->data, etag->len)) != NGX_ERROR)) ? ngx_abs(tag) : -1;
greater_message_tag = tag;
greater_message_time = if_modified_since = (if_modified_since < 0) ? 0 : if_modified_since;
ngx_shmtx_lock(&shpool->mutex);
......
......@@ -156,6 +156,7 @@ ngx_http_push_stream_msg_t *
ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *shm_data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_template_queue_t *sentinel = &ngx_http_push_stream_module_main_conf->msg_templates;
ngx_http_push_stream_template_queue_t *cur = sentinel;
ngx_http_push_stream_msg_t *msg;
......@@ -216,6 +217,8 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
msg->queue.next = NULL;
msg->id = id;
msg->workers_ref_count = 0;
msg->time = (id == -1) ? 0 : ngx_time();
msg->tag = (msg->time == shm_data->last_message_time) ? (shm_data->last_message_tag + 1) : 0;
if ((msg->formatted_messages = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t)*ngx_http_push_stream_module_main_conf->qtd_templates)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
......@@ -301,8 +304,6 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_
data->published_messages++;
// tag message with time stamp and a sequence tag
msg->time = ngx_time();
msg->tag = (msg->time == data->last_message_time) ? (data->last_message_tag + 1) : 0;
channel->last_message_time = data->last_message_time = msg->time;
channel->last_message_tag = data->last_message_tag = msg->tag;
// set message expiration time
......@@ -1033,17 +1034,28 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
u_char *txt = NULL, *last;
ngx_str_t *str = NULL;
u_char char_id[NGX_INT_T_LEN];
u_char char_id[NGX_INT_T_LEN + 1];
u_char tag[NGX_INT_T_LEN + 1];
u_char time[NGX_HTTP_PUSH_STREAM_TIME_FMT_LEN];
u_char *channel_id = (channel != NULL) ? channel->id.data : NGX_HTTP_PUSH_STREAM_EMPTY.data;
u_char *event_id = (message->event_id != NULL) ? message->event_id->data : NGX_HTTP_PUSH_STREAM_EMPTY.data;
last = ngx_sprintf(char_id, "%d", message->id);
*last = '\0';
last = ngx_http_time(time, message->time);
*last = '\0';
last = ngx_sprintf(tag, "%d", message->tag);
*last = '\0';
txt = ngx_http_push_stream_str_replace(message_template->data, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID.data, char_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID.data, event_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL.data, channel_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT.data, text->data, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME.data, time, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG.data, tag, 0, temp_pool);
if (txt == NULL) {
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to replace message values on template");
......
......@@ -159,6 +159,8 @@ module BaseTestCase
@subscriber_eventsource = 'off'
@subscriber_mode = nil
@publisher_mode = nil
@last_received_message_time = nil
@last_received_message_tag = nil
self.send(:global_configuration) if self.respond_to?(:global_configuration)
end
......@@ -265,6 +267,9 @@ http {
<%= "push_stream_max_number_of_channels #{@max_number_of_channels};" unless @max_number_of_channels.nil? %>
<%= "push_stream_max_number_of_broadcast_channels #{@max_number_of_broadcast_channels};" unless @max_number_of_broadcast_channels.nil? %>
<%= "push_stream_last_received_message_time #{@last_received_message_time};" unless @last_received_message_time.nil? %>
<%= "push_stream_last_received_message_tag #{@last_received_message_tag};" unless @last_received_message_tag.nil? %>
# max subscribers per channel
<%= "push_stream_max_subscribers_per_channel #{@max_subscribers_per_channel};" unless @max_subscribers_per_channel.nil? %>
# max messages to store in memory
......
require File.expand_path('base_test_case', File.dirname(__FILE__))
require 'time'
class TestPublishMessages < Test::Unit::TestCase
include BaseTestCase
......@@ -180,4 +181,73 @@ class TestPublishMessages < Test::Unit::TestCase
}
end
def config_test_expose_message_publish_time_through_message_template
@header_template = nil
@message_template = '{\"id\": \"~id~\", \"channel\": \"~channel~\", \"text\": \"~text~\", \"publish_time\": \"~time~\"}'
end
def test_expose_message_publish_time_through_message_template
headers = {'accept' => 'text/html'}
body = 'test message'
channel = 'ch_test_expose_message_publish_time_through_message_template'
response = ''
now = nil
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response = JSON.parse(chunk)
assert_equal(1, response["id"].to_i, "Wrong data received")
assert_equal(channel, response["channel"], "Wrong data received")
assert_equal(body, response["text"], "Wrong data received")
assert_equal(29, response["publish_time"].size, "Wrong data received")
publish_time = Time.parse(response["publish_time"])
assert_equal(now.to_i, publish_time.to_i, "Didn't receive the correct publish time")
EventMachine.stop
}
now = Time.now
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
add_test_timeout
}
end
def config_test_expose_message_tag_through_message_template
@header_template = nil
@message_template = '{\"id\": \"~id~\", \"channel\": \"~channel~\", \"text\": \"~text~\", \"tag\": \"~tag~\"}'
end
def test_expose_message_tag_through_message_template
headers = {'accept' => 'text/html'}
body = 'test message'
channel = 'ch_test_expose_message_tag_through_message_template'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response += chunk
lines = response.split('\r\n')
if lines.size > 1
lines.each_with_index do |line, i|
resp = JSON.parse(line)
assert_equal(i + 1, resp["id"].to_i, "Wrong data received")
assert_equal(channel, resp["channel"], "Wrong data received")
assert_equal(body, resp["text"], "Wrong data received")
assert_equal(i, resp["tag"].to_i, "Wrong data received")
end
end
EventMachine.stop
}
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
add_test_timeout
}
end
end
......@@ -532,4 +532,46 @@ class TestSubscriberLongPolling < Test::Unit::TestCase
add_test_timeout
}
end
def config_test_send_modified_since_and_none_match_values_not_using_headers
@last_received_message_time = "$arg_time"
@last_received_message_tag = "$arg_tag"
end
def test_send_modified_since_and_none_match_values_not_using_headers
headers = {'accept' => 'application/json'}
channel = 'ch_test_send_modified_since_and_none_match_values_not_using_headers'
body = 'body'
response = ""
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
response += chunk
}
sub_1.callback { |chunk|
assert_equal("#{body}\r\n", response, "Wrong message")
time = sub_1.response_header['LAST_MODIFIED']
tag = sub_1.response_header['ETAG']
response = ""
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?time=' + time + '&tag=' + tag).get :head => headers, :timeout => 30
sub_2.stream { |chunk|
response += chunk
}
sub_2.callback { |chunk|
assert_equal("#{body} 1\r\n", response, "Wrong message")
EventMachine.stop
}
publish_message_inline(channel, {'accept' => 'text/html'}, body + " 1")
}
publish_message_inline(channel, {'accept' => 'text/html'}, body)
add_test_timeout
}
end
end
......@@ -544,4 +544,41 @@ class TestSubscriberPolling < Test::Unit::TestCase
add_test_timeout
}
end
def config_test_send_modified_since_and_none_match_values_not_using_headers_when_polling
@last_received_message_time = "$arg_time"
@last_received_message_tag = "$arg_tag"
end
def test_send_modified_since_and_none_match_values_not_using_headers_when_polling
headers = {'accept' => 'application/json'}
channel = 'ch_test_send_modified_since_and_none_match_values_not_using_headers_when_polling'
body = 'body'
response = ""
EventMachine.run {
publish_message_inline(channel, {'accept' => 'text/html'}, body)
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.callback {
assert_equal("#{body}\r\n", sub_1.response, "Wrong message")
time = sub_1.response_header['LAST_MODIFIED']
tag = sub_1.response_header['ETAG']
publish_message_inline(channel, {'accept' => 'text/html'}, body + " 1")
response = ""
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?time=' + time + '&tag=' + tag).get :head => headers, :timeout => 30
sub_2.callback {
assert_equal("#{body} 1\r\n", sub_2.response, "Wrong message")
EventMachine.stop
}
}
add_test_timeout
}
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment