Commit 7eb716b5 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

split lines before apply event source message pattern

parent 45828641
......@@ -36,6 +36,7 @@ typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_str_t *template;
ngx_uint_t index;
ngx_flag_t eventsource;
} ngx_http_push_stream_template_queue_t;
typedef struct {
......@@ -71,7 +72,7 @@ typedef struct {
ngx_msec_t buffer_cleanup_interval;
ngx_uint_t keepalive;
ngx_uint_t publisher_admin;
ngx_uint_t subscriber_eventsource;
ngx_flag_t subscriber_eventsource;
} ngx_http_push_stream_loc_conf_t;
// shared memory segment name
......@@ -190,7 +191,7 @@ static ngx_int_t ngx_http_push_stream_send_response_channel_info(ngx_http
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r, ngx_str_t *prefix);
static ngx_int_t ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template);
static ngx_int_t ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID = ngx_string("ALL");
......
......@@ -29,6 +29,11 @@
#include <ngx_http_push_stream_module.h>
#include <ngx_http_push_stream_module_ipc.h>
typedef struct {
ngx_queue_t queue;
ngx_str_t *line;
} ngx_http_push_stream_line_t;
typedef struct {
char *subtype;
size_t len;
......@@ -192,7 +197,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID = ngx_string
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL = ngx_string("~channel~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX = ngx_string(": ");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_TEMPLATE = ngx_string(": ~text~\r\n");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_MESSAGE_PREFIX = ngx_string("data: ");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_ID_TEMPLATE = ngx_string("id: ~event-id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_CONTENT_TYPE = ngx_string("text/event-stream; charset=utf-8");
......@@ -217,6 +222,7 @@ static u_char * ngx_http_push_stream_str_replace(u_char *org, u_char
static ngx_str_t * ngx_http_push_stream_get_formatted_chunk(const u_char *text, off_t len, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg);
static ngx_int_t ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer);
......@@ -251,6 +257,10 @@ static ngx_inline void ngx_http_push_stream_delete_worker_channel(void);
static ngx_http_push_stream_content_subtype_t * ngx_http_push_stream_match_channel_info_format_and_content_type(ngx_http_request_t *r, ngx_uint_t default_subtype);
static ngx_http_push_stream_line_t * ngx_http_push_stream_split_by_crlf(ngx_str_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_join_with_crlf(ngx_http_push_stream_line_t *lines, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_current_time(ngx_pool_t *pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_hostname(ngx_pool_t *pool);
......
......@@ -315,7 +315,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
}
static ngx_int_t
ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template) {
ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource) {
ngx_http_push_stream_template_queue_t *sentinel = &ngx_http_push_stream_module_main_conf->msg_templates;
ngx_http_push_stream_template_queue_t *cur = sentinel;
ngx_str_t *aux = NULL;
......@@ -335,6 +335,7 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template) {
return -1;
}
cur->template = aux;
cur->eventsource = eventsource;
cur->index = ngx_http_push_stream_module_main_conf->qtd_templates;
ngx_memcpy(cur->template->data, template.data, template.len);
ngx_queue_insert_tail(&ngx_http_push_stream_module_main_conf->msg_templates.queue, &cur->queue);
......
......@@ -433,7 +433,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_uint_value(conf->buffer_cleanup_interval, prev->buffer_cleanup_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_uint_value(conf->keepalive, prev->keepalive, 0);
ngx_conf_merge_uint_value(conf->publisher_admin, prev->publisher_admin, 0);
ngx_conf_merge_uint_value(conf->subscriber_eventsource, prev->subscriber_eventsource, 0);
ngx_conf_merge_value(conf->subscriber_eventsource, prev->subscriber_eventsource, 0);
// changing properties for event source support
if (conf->subscriber_eventsource) {
......@@ -442,13 +442,11 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
// formatting header template
if (conf->header_template.len > 0) {
ngx_str_t *aux = ngx_http_push_stream_create_str(cf->pool, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX.len + conf->header_template.len);
ngx_str_t *aux = ngx_http_push_stream_apply_template_to_each_line(&conf->header_template, &NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_TEMPLATE, cf->pool);
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to append comment prefix to header template");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_message_module failed to apply template to header message.");
return NGX_CONF_ERROR;
}
u_char *last = ngx_copy(aux->data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX.len);
last = ngx_copy(last, conf->header_template.data, conf->header_template.len);
conf->header_template.data = aux->data;
conf->header_template.len = aux->len;
}
......@@ -469,13 +467,11 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
// formatting footer template
if (conf->footer_template.len > 0) {
ngx_str_t *aux = ngx_http_push_stream_create_str(cf->pool, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX.len + conf->footer_template.len);
ngx_str_t *aux = ngx_http_push_stream_apply_template_to_each_line(&conf->footer_template, &NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_TEMPLATE, cf->pool);
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to append comment prefix to footer template");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_message_module failed to apply template to footer message.");
return NGX_CONF_ERROR;
}
u_char *last = ngx_copy(aux->data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX.len);
last = ngx_copy(last, conf->footer_template.data, conf->footer_template.len);
conf->footer_template.data = aux->data;
conf->footer_template.len = aux->len;
......@@ -583,7 +579,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
conf->footer_template.len = aux->len;
}
conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template);
conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template, conf->subscriber_eventsource);
// calc buffer cleanup interval
if (conf->buffer_timeout != NGX_CONF_UNSET) {
......
......@@ -197,11 +197,30 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
return NULL;
}
while ((cur = (ngx_http_push_stream_template_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
ngx_str_t *aux = ngx_http_push_stream_format_message(channel, msg, msg->raw, cur->template, temp_pool);
ngx_str_t *aux = NULL;
if (cur->eventsource) {
ngx_http_push_stream_line_t *lines, *cur_line;
if ((lines = ngx_http_push_stream_split_by_crlf(msg->raw, temp_pool)) == NULL) {
return NULL;
}
cur_line = lines;
while ((cur_line = (ngx_http_push_stream_line_t *) ngx_queue_next(&cur_line->queue)) != lines) {
if ((cur_line->line = ngx_http_push_stream_format_message(channel, msg, cur_line->line, cur->template, temp_pool)) == NULL) {
break;
}
}
aux = ngx_http_push_stream_join_with_crlf(lines, temp_pool);
} else {
aux = ngx_http_push_stream_format_message(channel, msg, msg->raw, cur->template, temp_pool);
}
if (aux == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
ngx_str_t *chunk = ngx_http_push_stream_get_formatted_chunk(aux->data, aux->len, temp_pool);
ngx_str_t *formmated = (msg->formatted_messages + i);
......@@ -940,3 +959,116 @@ ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len)
}
return aux;
}
static ngx_http_push_stream_line_t *
ngx_http_push_stream_add_line_to_queue(ngx_http_push_stream_line_t *sentinel, u_char *text, u_int len, ngx_pool_t *temp_pool) {
ngx_http_push_stream_line_t *cur = NULL;
ngx_str_t *line;
if (len > 0) {
cur = ngx_pcalloc(temp_pool, sizeof(ngx_http_push_stream_line_t));
line = ngx_http_push_stream_create_str(temp_pool, len);
if ((cur == NULL) || (line == NULL)) {
return NULL;
}
cur->line = line;
ngx_memcpy(cur->line->data, text, len);
ngx_queue_insert_tail(&sentinel->queue, &cur->queue);
}
return cur;
}
static ngx_http_push_stream_line_t *
ngx_http_push_stream_split_by_crlf(ngx_str_t *msg, ngx_pool_t *temp_pool) {
ngx_http_push_stream_line_t *sentinel = NULL;
u_char *pos = NULL, *start = NULL, *crlf_pos, *cr_pos, *lf_pos;
u_int step = 0, len = 0;
if ((sentinel = ngx_pcalloc(temp_pool, sizeof(ngx_http_push_stream_line_t))) == NULL) {
return NULL;
}
ngx_queue_init(&sentinel->queue);
start = msg->data;
do {
crlf_pos = (u_char *) ngx_strstr(start, CRLF);
cr_pos = (u_char *) ngx_strstr(start, "\r");
lf_pos = (u_char *) ngx_strstr(start, "\n");
pos = crlf_pos;
step = 2;
if ((pos == NULL) || (cr_pos < pos)) {
pos = cr_pos;
step = 1;
}
if ((pos == NULL) || (lf_pos < pos)) {
pos = lf_pos;
step = 1;
}
if (pos != NULL) {
len = pos - start;
if ((len > 0) && (ngx_http_push_stream_add_line_to_queue(sentinel, start, len, temp_pool) == NULL)) {
return NULL;
}
start = pos + step;
}
} while (pos != NULL);
len = (msg->data + msg->len) - start;
if ((len > 0) && (ngx_http_push_stream_add_line_to_queue(sentinel, start, len, temp_pool) == NULL)) {
return NULL;
}
return sentinel;
}
static ngx_str_t *
ngx_http_push_stream_join_with_crlf(ngx_http_push_stream_line_t *lines, ngx_pool_t *temp_pool) {
ngx_http_push_stream_line_t *cur;
ngx_str_t *result = NULL, *tmp = &NGX_HTTP_PUSH_STREAM_EMPTY;
if (ngx_queue_empty(&lines->queue)) {
return &NGX_HTTP_PUSH_STREAM_EMPTY;
}
cur = lines;
while ((cur = (ngx_http_push_stream_line_t *) ngx_queue_next(&cur->queue)) != lines) {
if ((cur->line == NULL) || (result = ngx_http_push_stream_create_str(temp_pool, tmp->len + cur->line->len)) == NULL) {
return NULL;
}
ngx_memcpy(result->data, tmp->data, tmp->len);
ngx_memcpy((result->data + tmp->len), cur->line->data, cur->line->len);
tmp = result;
}
return result;
}
static ngx_str_t *
ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool){
ngx_http_push_stream_line_t *lines, *cur;
ngx_str_t *result = NULL;
lines = ngx_http_push_stream_split_by_crlf(text, temp_pool);
if (lines != NULL) {
cur = lines;
while ((cur = (ngx_http_push_stream_line_t *) ngx_queue_next(&cur->queue)) != lines) {
cur->line->data = ngx_http_push_stream_str_replace(message_template->data, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT.data, cur->line->data, 0, temp_pool);
if (cur->line->data == NULL) {
return NULL;
}
cur->line->len = ngx_strlen(cur->line->data);
}
result = ngx_http_push_stream_join_with_crlf(lines, temp_pool);
}
return result;
}
......@@ -7,6 +7,7 @@ class TestEventSource < Test::Unit::TestCase
@subscriber_eventsource = 'on'
@header_template = nil
@message_template = nil
@footer_template = nil
@ping_message_interval = nil
end
......@@ -27,6 +28,84 @@ class TestEventSource < Test::Unit::TestCase
}
end
def config_test_each_line_on_header_template_should_be_prefixed_by_a_colon
@header_template = "header line 1\nheader line 2\rheader line 3\r\nheader line 4"
end
def test_each_line_on_header_template_should_be_prefixed_by_a_colon
channel = 'ch_test_each_line_on_header_template_should_be_prefixed_by_a_colon'
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
assert_equal(": header line 1\r\n: header line 2\r\n: header line 3\r\n: header line 4\r\n\r\n", chunk, "Wrong header")
EventMachine.stop
}
add_test_timeout
}
end
def config_test_escaped_new_lines_on_header_template_should_be_treated_as_single_line
@header_template = "header line 1\\\\nheader line 2"
end
def test_escaped_new_lines_on_header_template_should_be_treated_as_single_line
channel = 'ch_test_escaped_new_lines_on_header_template_should_be_treated_as_single_line'
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
assert_equal(": header line 1\\nheader line 2\r\n\r\n", chunk, "Wrong header")
EventMachine.stop
}
add_test_timeout
}
end
def config_test_each_line_on_footer_template_should_be_prefixed_by_a_colon
@footer_template = "footer line 1\nfooter line 2\rfooter line 3\r\nfooter line 4"
@subscriber_connection_timeout = '1s'
end
def test_each_line_on_footer_template_should_be_prefixed_by_a_colon
channel = 'ch_test_each_line_on_footer_template_should_be_prefixed_by_a_colon'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response += chunk
}
sub.callback {
assert_equal(": footer line 1\r\n: footer line 2\r\n: footer line 3\r\n: footer line 4\r\n\r\n", response, "Wrong footer")
EventMachine.stop
}
add_test_timeout
}
end
def config_test_escaped_new_lines_on_footer_template_should_be_treated_as_single_line
@footer_template = "footer line 1\\\\nfooter line 2"
@subscriber_connection_timeout = '1s'
end
def test_escaped_new_lines_on_footer_template_should_be_treated_as_single_line
channel = 'ch_test_escaped_new_lines_on_footer_template_should_be_treated_as_single_line'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response += chunk
}
sub.callback {
assert_equal(": footer line 1\\nfooter line 2\r\n\r\n", response, "Wrong footer")
EventMachine.stop
}
add_test_timeout
}
end
def test_default_message_template_without_event_id
headers = {'accept' => 'text/html'}
body = 'test message'
......@@ -125,6 +204,43 @@ class TestEventSource < Test::Unit::TestCase
}
end
def test_each_line_on_posted_message_should_be_applied_to_template
headers = {'accept' => 'text/html'}
body = "line 1\nline 2\rline 3\r\nline 4"
channel = 'ch_test_each_line_on_posted_message_should_be_applied_to_template'
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
assert_equal("data: line 1\r\ndata: line 2\r\ndata: line 3\r\ndata: line 4\r\n\r\n", chunk, "Wrong data message")
EventMachine.stop
}
publish_message_inline(channel, headers, body)
add_test_timeout
}
end
def test_escaped_new_lines_on_posted_message_should_be_treated_as_single_line
headers = {'accept' => 'text/html'}
body = "line 1\\nline 2"
channel = 'ch_test_escaped_new_lines_on_posted_message_should_be_treated_as_single_line'
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
assert_equal("data: line 1\\nline 2\r\n\r\n", chunk, "Wrong data message")
EventMachine.stop
}
publish_message_inline(channel, headers, body)
add_test_timeout
}
end
def config_test_ping_message_on_event_source
@ping_message_interval = '1s'
@message_template = '{\"id\":\"~id~\", \"message\":\"~text~\"}'
......@@ -194,5 +310,4 @@ class TestEventSource < Test::Unit::TestCase
}
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment