Commit 0a9dd7a0 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

sending messages (ping, header, event id, ...) with event source format

parent 6b436e61
...@@ -71,6 +71,7 @@ typedef struct { ...@@ -71,6 +71,7 @@ typedef struct {
ngx_msec_t buffer_cleanup_interval; ngx_msec_t buffer_cleanup_interval;
ngx_uint_t keepalive; ngx_uint_t keepalive;
ngx_uint_t publisher_admin; ngx_uint_t publisher_admin;
ngx_uint_t subscriber_eventsource;
} ngx_http_push_stream_loc_conf_t; } ngx_http_push_stream_loc_conf_t;
// shared memory segment name // shared memory segment name
...@@ -85,6 +86,7 @@ typedef struct { ...@@ -85,6 +86,7 @@ typedef struct {
ngx_int_t id; ngx_int_t id;
ngx_str_t *raw; ngx_str_t *raw;
ngx_str_t *event_id; ngx_str_t *event_id;
ngx_str_t *event_id_message;
ngx_str_t *formatted_messages; ngx_str_t *formatted_messages;
} ngx_http_push_stream_msg_t; } ngx_http_push_stream_msg_t;
......
...@@ -192,6 +192,12 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID = ngx_string ...@@ -192,6 +192,12 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID = ngx_string
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL = ngx_string("~channel~"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL = ngx_string("~channel~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX = ngx_string(": ");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_MESSAGE_PREFIX = ngx_string("data: ");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_ID_TEMPLATE = ngx_string("id: ~event-id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_CONTENT_TYPE = ngx_string("text/event-stream; charset=utf-8");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK = ngx_string("6" CRLF ": -1" CRLF CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_LAST_CHUNK = ngx_string("0" CRLF CRLF); static const ngx_str_t NGX_HTTP_PUSH_STREAM_LAST_CHUNK = ngx_string("0" CRLF CRLF);
ngx_event_t ngx_http_push_stream_ping_event; ngx_event_t ngx_http_push_stream_ping_event;
......
...@@ -319,7 +319,12 @@ ngx_http_push_stream_send_worker_ping_message(void) ...@@ -319,7 +319,12 @@ ngx_http_push_stream_send_worker_ping_message(void)
if ((ngx_http_push_stream_ping_msg != NULL) && (!ngx_queue_empty(&sentinel->queue))) { if ((ngx_http_push_stream_ping_msg != NULL) && (!ngx_queue_empty(&sentinel->queue))) {
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if (cur->request != NULL) { if (cur->request != NULL) {
ngx_http_push_stream_send_response_message(cur->request, NULL, ngx_http_push_stream_ping_msg); ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(cur->request, ngx_http_push_stream_module);
if (pslcf->subscriber_eventsource) {
ngx_http_push_stream_send_response_text(cur->request, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.len, 0);
} else {
ngx_http_push_stream_send_response_message(cur->request, NULL, ngx_http_push_stream_ping_msg);
}
} }
} }
} }
......
...@@ -170,6 +170,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = { ...@@ -170,6 +170,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, publisher_admin), offsetof(ngx_http_push_stream_loc_conf_t, publisher_admin),
NULL }, NULL },
{ ngx_string("push_stream_subscriber_eventsource"),
NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, subscriber_eventsource),
NULL },
ngx_null_command ngx_null_command
}; };
...@@ -397,6 +403,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf) ...@@ -397,6 +403,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->buffer_cleanup_interval = NGX_CONF_UNSET_MSEC; lcf->buffer_cleanup_interval = NGX_CONF_UNSET_MSEC;
lcf->keepalive = NGX_CONF_UNSET_UINT; lcf->keepalive = NGX_CONF_UNSET_UINT;
lcf->publisher_admin = NGX_CONF_UNSET_UINT; lcf->publisher_admin = NGX_CONF_UNSET_UINT;
lcf->subscriber_eventsource = NGX_CONF_UNSET_UINT;
return lcf; return lcf;
} }
...@@ -426,6 +433,54 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -426,6 +433,54 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_uint_value(conf->buffer_cleanup_interval, prev->buffer_cleanup_interval, NGX_CONF_UNSET_MSEC); ngx_conf_merge_uint_value(conf->buffer_cleanup_interval, prev->buffer_cleanup_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_uint_value(conf->keepalive, prev->keepalive, 0); ngx_conf_merge_uint_value(conf->keepalive, prev->keepalive, 0);
ngx_conf_merge_uint_value(conf->publisher_admin, prev->publisher_admin, 0); ngx_conf_merge_uint_value(conf->publisher_admin, prev->publisher_admin, 0);
ngx_conf_merge_uint_value(conf->subscriber_eventsource, prev->subscriber_eventsource, 0);
// changing properties for event source support
if (conf->subscriber_eventsource) {
conf->content_type.data = NGX_HTTP_PUSH_STREAM_EVENTSOURCE_CONTENT_TYPE.data;
conf->content_type.len = NGX_HTTP_PUSH_STREAM_EVENTSOURCE_CONTENT_TYPE.len;
// formatting header template
if (conf->header_template.len > 0) {
ngx_str_t *aux = ngx_http_push_stream_create_str(cf->pool, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX.len + conf->header_template.len);
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to append comment prefix to header template");
return NGX_CONF_ERROR;
}
u_char *last = ngx_copy(aux->data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX.len);
last = ngx_copy(last, conf->header_template.data, conf->header_template.len);
conf->header_template.data = aux->data;
conf->header_template.len = aux->len;
}
// formatting message template
ngx_str_t *aux = (conf->message_template.len > 0) ? &conf->message_template : (ngx_str_t *) &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT;
ngx_str_t *template = ngx_http_push_stream_create_str(cf->pool, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_MESSAGE_PREFIX.len + aux->len + sizeof(CRLF) -1);
if (template == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to append message prefix to message template");
return NGX_CONF_ERROR;
}
u_char *last = ngx_copy(template->data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_MESSAGE_PREFIX.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_MESSAGE_PREFIX.len);
last = ngx_copy(last, aux->data, aux->len);
ngx_memcpy(last, CRLF, 2);
conf->message_template.data = template->data;
conf->message_template.len = template->len;
// formatting footer template
if (conf->footer_template.len > 0) {
ngx_str_t *aux = ngx_http_push_stream_create_str(cf->pool, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX.len + conf->footer_template.len);
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to append comment prefix to footer template");
return NGX_CONF_ERROR;
}
u_char *last = ngx_copy(aux->data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX.len);
last = ngx_copy(last, conf->footer_template.data, conf->footer_template.len);
conf->footer_template.data = aux->data;
conf->footer_template.len = aux->len;
}
}
// sanity checks // sanity checks
......
...@@ -143,6 +143,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -143,6 +143,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
} }
msg->event_id = NULL; msg->event_id = NULL;
msg->event_id_message = NULL;
msg->formatted_messages = NULL; msg->formatted_messages = NULL;
if ((msg->raw = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + len + 1)) == NULL) { if ((msg->raw = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + len + 1)) == NULL) {
...@@ -166,6 +167,23 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -166,6 +167,23 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
msg->event_id->data = (u_char *) (msg->event_id + 1); msg->event_id->data = (u_char *) (msg->event_id + 1);
ngx_memset(msg->event_id->data, '\0', event_id->len + 1); ngx_memset(msg->event_id->data, '\0', event_id->len + 1);
ngx_memcpy(msg->event_id->data, event_id->data, event_id->len); ngx_memcpy(msg->event_id->data, event_id->data, event_id->len);
u_char *aux = ngx_http_push_stream_str_replace(NGX_HTTP_PUSH_STREAM_EVENTSOURCE_ID_TEMPLATE.data, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID.data, event_id->data, 0, temp_pool);
if (aux == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
ngx_str_t *chunk = ngx_http_push_stream_get_formatted_chunk(aux, ngx_strlen(aux), temp_pool);
if ((chunk == NULL) || (msg->event_id_message = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + chunk->len + 1)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
msg->event_id_message->len = chunk->len;
msg->event_id_message->data = (u_char *) (msg->event_id_message + 1);
ngx_memset(msg->event_id_message->data, '\0', msg->event_id_message->len + 1);
ngx_memcpy(msg->event_id_message->data, chunk->data, msg->event_id_message->len);
} }
msg->deleted = 0; msg->deleted = 0;
...@@ -292,6 +310,12 @@ ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_htt ...@@ -292,6 +310,12 @@ ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_htt
static void static void
ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg) ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg)
{ {
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
if (pslcf->subscriber_eventsource && (msg->event_id_message != NULL)) {
ngx_http_push_stream_send_response_text(r, msg->event_id_message->data, msg->event_id_message->len, 0);
}
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, msg, r->pool); ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, msg, r->pool);
if (str != NULL) { if (str != NULL) {
ngx_http_push_stream_send_response_text(r, str->data, str->len, 0); ngx_http_push_stream_send_response_text(r, str->data, str->len, 0);
...@@ -579,6 +603,7 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt ...@@ -579,6 +603,7 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt
if (msg->raw != NULL) ngx_slab_free_locked(shpool, msg->raw); if (msg->raw != NULL) ngx_slab_free_locked(shpool, msg->raw);
if (msg->event_id != NULL) ngx_slab_free_locked(shpool, msg->event_id); if (msg->event_id != NULL) ngx_slab_free_locked(shpool, msg->event_id);
if (msg->event_id_message != NULL) ngx_slab_free_locked(shpool, msg->event_id_message);
if (msg != NULL) ngx_slab_free_locked(shpool, msg); if (msg != NULL) ngx_slab_free_locked(shpool, msg);
} }
......
...@@ -152,6 +152,7 @@ module BaseTestCase ...@@ -152,6 +152,7 @@ module BaseTestCase
@publisher_admin = 'off' @publisher_admin = 'off'
@channel_deleted_message_text = nil @channel_deleted_message_text = nil
@ping_message_text = nil @ping_message_text = nil
@subscriber_eventsource = 'off'
self.send(:global_configuration) if self.respond_to?(:global_configuration) self.send(:global_configuration) if self.respond_to?(:global_configuration)
end end
...@@ -254,7 +255,7 @@ http { ...@@ -254,7 +255,7 @@ http {
# activate publisher mode for this location # activate publisher mode for this location
push_stream_publisher; push_stream_publisher;
# activate publisher mode for this location # activate publisher admin mode for this location
<%= "push_stream_publisher_admin #{@publisher_admin};" unless @publisher_admin.nil? %> <%= "push_stream_publisher_admin #{@publisher_admin};" unless @publisher_admin.nil? %>
# query string based channel id # query string based channel id
...@@ -284,6 +285,9 @@ http { ...@@ -284,6 +285,9 @@ http {
# activate subscriber mode for this location # activate subscriber mode for this location
push_stream_subscriber; push_stream_subscriber;
# activate eventsource support for this location
<%= "push_stream_subscriber_eventsource #{@subscriber_eventsource};" unless @subscriber_eventsource.nil? %>
# positional channel path # positional channel path
set $push_stream_channels_path $1; set $push_stream_channels_path $1;
<%= "push_stream_max_channel_id_length #{@max_channel_id_length};" unless @max_channel_id_length.nil? %> <%= "push_stream_max_channel_id_length #{@max_channel_id_length};" unless @max_channel_id_length.nil? %>
......
require File.expand_path('base_test_case', File.dirname(__FILE__))
class TestEventSource < Test::Unit::TestCase
include BaseTestCase
def global_configuration
@subscriber_eventsource = 'on'
@header_template = nil
@message_template = nil
end
def config_test_content_type_should_be_event_stream
@header_template = "header"
end
def test_content_type_should_be_event_stream
channel = 'ch_test_content_type_should_be_event_stream'
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
assert_equal("text/event-stream; charset=utf-8", sub.response_header["CONTENT_TYPE"], "wrong content-type")
EventMachine.stop
}
add_test_timeout
}
end
def test_default_message_template_without_event_id
headers = {'accept' => 'text/html'}
body = 'test message'
channel = 'ch_test_default_message_template_without_event_id'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response += chunk
if response.include?("\r\n\r\n")
assert_equal("data: #{body}\r\n\r\n", response, "The published message was not received correctly")
EventMachine.stop
end
}
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
add_test_timeout
}
end
def test_default_message_template_with_event_id
event_id = 'event_id_with_generic_text_01'
headers = {'accept' => 'text/html', 'Event-iD' => event_id }
body = 'test message'
channel = 'ch_test_default_message_template_with_event_id'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response += chunk
if response.include?("\r\n\r\n")
assert_equal("id: #{event_id}\r\ndata: #{body}\r\n\r\n", response, "The published message was not received correctly")
EventMachine.stop
end
}
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
add_test_timeout
}
end
def config_test_custom_message_template_without_event_id
@message_template = '{\"id\":\"~id~\", \"message\":\"~text~\"}'
end
def test_custom_message_template_without_event_id
headers = {'accept' => 'text/html'}
body = 'test message'
channel = 'ch_test_custom_message_template_without_event_id'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response += chunk
if response.include?("\r\n\r\n")
assert_equal(%(data: {"id":"1", "message":"#{body}"}\r\n\r\n), response, "The published message was not received correctly")
EventMachine.stop
end
}
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
add_test_timeout
}
end
def config_test_custom_message_template_with_event_id
@message_template = '{\"id\":\"~id~\", \"message\":\"~text~\"}'
end
def test_custom_message_template_with_event_id
event_id = 'event_id_with_generic_text_01'
headers = {'accept' => 'text/html', 'Event-iD' => event_id }
body = 'test message'
channel = 'ch_test_custom_message_template_with_event_id'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response += chunk
if response.include?("\r\n\r\n")
assert_equal(%(id: #{event_id}\r\ndata: {"id":"1", "message":"#{body}"}\r\n\r\n), response, "The published message was not received correctly")
EventMachine.stop
end
}
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
add_test_timeout
}
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment