Commit eef068c7 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding event-type feature to the message to be ok with Event Source specification

parent af255bf4
* Adding event type feature to Event Source support
* Adding tag and time available at message template, and make possible pass these values whithout set headers
* Adding a reference count to the message to avoid discard it before be processed for all workers
* Improvement on memory usage to reuse chains and buffers
......
......@@ -817,7 +817,7 @@ h2(#push_stream_message_template). push_stream_message_template
*context:* _location (push_stream_subscriber)_
The text template that will be used to format the message before be sended to subscribers. The template can contain any number of the reserved words: ==~id~, ~text~, ~channel~, ~time~, ~tag~ and ~event-id~, example: "<script>p(~id~,'~channel~','~text~', ~tag~, '~time~');</script>"==
The text template that will be used to format the message before be sended to subscribers. The template can contain any number of the reserved words: ==~id~, ~text~, ~channel~, ~time~, ~tag~, ~event-id~ and ~event-type~, example: "<script>p(~id~,'~channel~','~text~', ~tag~, '~time~');</script>"==
h2(#push_stream_footer_template). push_stream_footer_template
......@@ -879,7 +879,7 @@ h2(#push_stream_eventsource_support). push_stream_eventsource_support
*release version:* _0.3.0_
Enable "Event Source":eventsource support for subscribers.
Enable "Event Source":eventsource support for subscribers. Using headers Event-ID and Event-Type on publish is possible to set values to _id:_ and _event:_ attributes on message sent to subscribers.
h2(#push_stream_ping_message_interval). push_stream_ping_message_interval
......
......@@ -116,7 +116,9 @@ typedef struct {
ngx_str_t *raw;
ngx_int_t tag;
ngx_str_t *event_id;
ngx_str_t *event_type;
ngx_str_t *event_id_message;
ngx_str_t *event_type_message;
ngx_str_t *formatted_messages;
ngx_int_t workers_ref_count;
} ngx_http_push_stream_msg_t;
......@@ -256,6 +258,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_DATE_FORMAT_ISO_8601 = ngx_string("
// headers
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID = ngx_string("Event-Id");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_EVENT_TYPE = ngx_string("Event-Type");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_LAST_EVENT_ID = ngx_string("Last-Event-Id");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_ALLOW = ngx_string("Allow");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_EXPLAIN = ngx_string("X-Nginx-PushStream-Explain");
......
......@@ -194,6 +194,7 @@ static const ngx_int_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID = -2;
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID = ngx_string("~id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID = ngx_string("~event-id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE = ngx_string("~event-type~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL = ngx_string("~channel~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG = ngx_string("~tag~");
......@@ -203,6 +204,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_DEFAULT_HEADER_TEMPLATE
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_TEMPLATE = ngx_string(": ~text~\r\n");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_MESSAGE_PREFIX = ngx_string("data: ");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_ID_TEMPLATE = ngx_string("id: ~event-id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_EVENT_TEMPLATE = ngx_string("event: ~event-type~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_CONTENT_TYPE = ngx_string("text/event-stream; charset=utf-8");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK = ngx_string("6" CRLF ": -1" CRLF CRLF);
......@@ -216,7 +218,7 @@ ngx_event_t ngx_http_push_stream_buffer_cleanup_event;
ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL;
// general request handling
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool);
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_only_added_headers(ngx_http_request_t *r);
static void ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modified_time, ngx_int_t tag, ngx_pool_t *temp_pool);
static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value);
......@@ -241,7 +243,7 @@ ngx_chain_t * ngx_http_push_stream_get_buf(ngx_http_request_t *r);
static void ngx_http_push_stream_complex_value(ngx_http_request_t *r, ngx_http_complex_value_t *val, ngx_str_t *value);
ngx_http_push_stream_channel_t *ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *text, size_t len, ngx_str_t *event_id, ngx_pool_t *temp_pool);
ngx_http_push_stream_channel_t *ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev);
......
......@@ -137,7 +137,7 @@ static void
ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
{
ngx_str_t *id;
ngx_str_t *event_id;
ngx_str_t *event_id, *event_type;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_buf_t *buf = NULL;
ngx_chain_t *chain;
......@@ -196,8 +196,9 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
}
event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID);
event_type = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_TYPE);
channel = ngx_http_push_stream_add_msg_to_channel(r, id, buf->pos, ngx_buf_size(buf), event_id, temp_pool);
channel = ngx_http_push_stream_add_msg_to_channel(r, id, buf->pos, ngx_buf_size(buf), event_id, event_type, temp_pool);
if (channel == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
......
......@@ -917,7 +917,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
ngx_rbtree_init(&d->unrecoverable_channels, unrecoverable_sentinel, ngx_http_push_stream_rbtree_insert);
// create ping message
if ((ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->ping_message_text.data, ngx_http_push_stream_module_main_conf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, ngx_cycle->pool)) == NULL) {
if ((ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->ping_message_text.data, ngx_http_push_stream_module_main_conf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
return NGX_ERROR;
}
......
......@@ -152,9 +152,42 @@ ngx_http_push_stream_delete_worker_channel(void)
}
}
ngx_uint_t
ngx_http_push_stream_apply_text_template(ngx_str_t **dst_value, ngx_str_t **dst_message, ngx_str_t *text, const ngx_str_t *template, const ngx_str_t *token, ngx_slab_pool_t *shpool, ngx_pool_t *temp_pool)
{
u_char *last;
if (text != NULL) {
if ((*dst_value = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + text->len + 1)) == NULL) {
return NGX_ERROR;
}
(*dst_value)->len = text->len;
(*dst_value)->data = (u_char *) ((*dst_value) + 1);
last = ngx_copy((*dst_value)->data, text->data, text->len);
*last = '\0';
u_char *aux = ngx_http_push_stream_str_replace(template->data, token->data, text->data, 0, temp_pool);
if (aux == NULL) {
return NGX_ERROR;
}
ngx_str_t *chunk = ngx_http_push_stream_get_formatted_chunk(aux, ngx_strlen(aux), temp_pool);
if ((chunk == NULL) || ((*dst_message) = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + chunk->len + 1)) == NULL) {
return NGX_ERROR;
}
(*dst_message)->len = chunk->len;
(*dst_message)->data = (u_char *) ((*dst_message) + 1);
last = ngx_copy((*dst_message)->data, chunk->data, (*dst_message)->len);
*last = '\0';
}
return NGX_OK;
}
ngx_http_push_stream_msg_t *
ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool)
ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *shm_data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
......@@ -169,7 +202,9 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
}
msg->event_id = NULL;
msg->event_type = NULL;
msg->event_id_message = NULL;
msg->event_type_message = NULL;
msg->formatted_messages = NULL;
if ((msg->raw = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + len + 1)) == NULL) {
......@@ -183,34 +218,8 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
last = ngx_copy(msg->raw->data, data, len);
*last = '\0';
if (event_id != NULL) {
if ((msg->event_id = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + event_id->len + 1)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
msg->event_id->len = event_id->len;
msg->event_id->data = (u_char *) (msg->event_id + 1);
last = ngx_copy(msg->event_id->data, event_id->data, event_id->len);
*last = '\0';
u_char *aux = ngx_http_push_stream_str_replace(NGX_HTTP_PUSH_STREAM_EVENTSOURCE_ID_TEMPLATE.data, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID.data, event_id->data, 0, temp_pool);
if (aux == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
ngx_str_t *chunk = ngx_http_push_stream_get_formatted_chunk(aux, ngx_strlen(aux), temp_pool);
if ((chunk == NULL) || (msg->event_id_message = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + chunk->len + 1)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
msg->event_id_message->len = chunk->len;
msg->event_id_message->data = (u_char *) (msg->event_id_message + 1);
last = ngx_copy(msg->event_id_message->data, chunk->data, msg->event_id_message->len);
*last = '\0';
}
ngx_http_push_stream_apply_text_template(&msg->event_id, &msg->event_id_message, event_id, &NGX_HTTP_PUSH_STREAM_EVENTSOURCE_ID_TEMPLATE, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID, shpool, temp_pool);
ngx_http_push_stream_apply_text_template(&msg->event_type, &msg->event_type_message, event_type, &NGX_HTTP_PUSH_STREAM_EVENTSOURCE_EVENT_TEMPLATE, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE, shpool, temp_pool);
msg->deleted = 0;
msg->expires = 0;
......@@ -275,7 +284,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
ngx_http_push_stream_channel_t *
ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *text, size_t len, ngx_str_t *event_id, ngx_pool_t *temp_pool)
ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
......@@ -294,7 +303,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_
}
// create a buffer copy in shared mem
msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(text, len, channel, channel->last_message_id + 1, event_id, temp_pool);
msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(text, len, channel, channel->last_message_id + 1, event_id, event_type, temp_pool);
if (msg == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate message in shared memory");
......@@ -426,8 +435,14 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_int_t rc = NGX_OK;
if (pslcf->eventsource_support && (msg->event_id_message != NULL)) {
rc = ngx_http_push_stream_send_response_text(r, msg->event_id_message->data, msg->event_id_message->len, 0);
if (pslcf->eventsource_support) {
if (msg->event_id_message != NULL) {
rc = ngx_http_push_stream_send_response_text(r, msg->event_id_message->data, msg->event_id_message->len, 0);
}
if ((rc == NGX_OK) && (msg->event_type_message != NULL)) {
rc = ngx_http_push_stream_send_response_text(r, msg->event_type_message->data, msg->event_type_message->len, 0);
}
}
if (rc != NGX_ERROR) {
......@@ -673,7 +688,7 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, 0, 0);
// apply channel deleted message text to message template
if ((channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->channel_deleted_message_text.data, ngx_http_push_stream_module_main_conf->channel_deleted_message_text.len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, NULL, temp_pool)) == NULL) {
if ((channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->channel_deleted_message_text.data, ngx_http_push_stream_module_main_conf->channel_deleted_message_text.len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, NULL, NULL, temp_pool)) == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to channel deleted message");
return;
......@@ -893,7 +908,9 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt
if (msg->raw != NULL) ngx_slab_free_locked(shpool, msg->raw);
if (msg->event_id != NULL) ngx_slab_free_locked(shpool, msg->event_id);
if (msg->event_type != NULL) ngx_slab_free_locked(shpool, msg->event_type);
if (msg->event_id_message != NULL) ngx_slab_free_locked(shpool, msg->event_id_message);
if (msg->event_type_message != NULL) ngx_slab_free_locked(shpool, msg->event_type_message);
if (msg != NULL) ngx_slab_free_locked(shpool, msg);
}
......@@ -1066,6 +1083,7 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
u_char *channel_id = (channel != NULL) ? channel->id.data : NGX_HTTP_PUSH_STREAM_EMPTY.data;
u_char *event_id = (message->event_id != NULL) ? message->event_id->data : NGX_HTTP_PUSH_STREAM_EMPTY.data;
u_char *event_type = (message->event_type != NULL) ? message->event_type->data : NGX_HTTP_PUSH_STREAM_EMPTY.data;
last = ngx_sprintf(char_id, "%d", message->id);
*last = '\0';
......@@ -1078,6 +1096,7 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
txt = ngx_http_push_stream_str_replace(message_template->data, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID.data, char_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID.data, event_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE.data, event_type, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL.data, channel_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT.data, text->data, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME.data, time, 0, temp_pool);
......
......@@ -270,7 +270,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscription_t *subscription = (ngx_http_push_stream_subscription_t *)ngx_queue_head(&ctx->subscriber->subscriptions_sentinel.queue);
if (ngx_http_push_stream_add_msg_to_channel(r, &subscription->channel->id, frame.payload, frame.payload_len, NULL, temp_pool) == NULL) {
if (ngx_http_push_stream_add_msg_to_channel(r, &subscription->channel->id, frame.payload, frame.payload_len, NULL, NULL, temp_pool) == NULL) {
ngx_http_finalize_request(r, 0);
return;
}
......
......@@ -136,6 +136,35 @@ class TestPublishMessages < Test::Unit::TestCase
}
end
def config_test_set_an_event_type_to_the_message_through_header_parameter
@header_template = nil
@message_template = '{\"id\": \"~id~\", \"channel\": \"~channel~\", \"text\": \"~text~\", \"event_type\": \"~event-type~\"}'
end
def test_set_an_event_type_to_the_message_through_header_parameter
event_type = 'event_type_with_generic_text_01'
headers = {'accept' => 'text/html', 'Event-type' => event_type }
body = 'test message'
channel = 'ch_test_set_an_event_type_to_the_message_through_header_parameter'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response = JSON.parse(chunk)
assert_equal(1, response["id"].to_i, "Wrong data received")
assert_equal(channel, response["channel"], "Wrong data received")
assert_equal(body, response["text"], "Wrong data received")
assert_equal(event_type, response["event_type"], "Wrong data received")
EventMachine.stop
}
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
add_test_timeout
}
end
def config_test_ignore_event_id_header_parameter_with_not_match_exactly
@header_template = nil
@message_template = '{\"id\": \"~id~\", \"channel\": \"~channel~\", \"text\": \"~text~\", \"event_id\": \"~event-id~\"}'
......
......@@ -128,6 +128,28 @@ class TestSubscriberEventSource < Test::Unit::TestCase
}
end
def test_default_message_template_without_event_type
headers = {'accept' => 'text/html'}
body = 'test message'
channel = 'ch_test_default_message_template_without_event_type'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response += chunk
if response.include?("\r\n\r\n")
assert_equal(":\r\ndata: #{body}\r\n\r\n", response, "The published message was not received correctly")
EventMachine.stop
end
}
publish_message_inline(channel, headers, body)
add_test_timeout
}
end
def test_default_message_template_with_event_id
event_id = 'event_id_with_generic_text_01'
headers = {'accept' => 'text/html', 'Event-Id' => event_id }
......@@ -151,6 +173,29 @@ class TestSubscriberEventSource < Test::Unit::TestCase
}
end
def test_default_message_template_with_event_type
event_type = 'event_type_with_generic_text_01'
headers = {'accept' => 'text/html', 'Event-type' => event_type }
body = 'test message'
channel = 'ch_test_default_message_template_with_event_type'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response += chunk
if response.include?("\r\n\r\n")
assert_equal(":\r\nevent: #{event_type}\r\ndata: #{body}\r\n\r\n", response, "The published message was not received correctly")
EventMachine.stop
end
}
publish_message_inline(channel, headers, body)
add_test_timeout
}
end
def config_test_custom_message_template_without_event_id
@message_template = '{\"id\":\"~id~\", \"message\":\"~text~\"}'
end
......@@ -177,6 +222,32 @@ class TestSubscriberEventSource < Test::Unit::TestCase
}
end
def config_test_custom_message_template_without_event_type
@message_template = '{\"id\":\"~id~\", \"message\":\"~text~\"}'
end
def test_custom_message_template_without_event_type
headers = {'accept' => 'text/html'}
body = 'test message'
channel = 'ch_test_custom_message_template_without_event_type'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response += chunk
if response.include?("\r\n\r\n")
assert_equal(%(:\r\ndata: {"id":"1", "message":"#{body}"}\r\n\r\n), response, "The published message was not received correctly")
EventMachine.stop
end
}
publish_message_inline(channel, headers, body)
add_test_timeout
}
end
def config_test_custom_message_template_with_event_id
@message_template = '{\"id\":\"~id~\", \"message\":\"~text~\"}'
end
......@@ -204,6 +275,33 @@ class TestSubscriberEventSource < Test::Unit::TestCase
}
end
def config_test_custom_message_template_with_event_type
@message_template = '{\"id\":\"~id~\", \"message\":\"~text~\"}'
end
def test_custom_message_template_with_event_type
event_type = 'event_type_with_generic_text_01'
headers = {'accept' => 'text/html', 'Event-type' => event_type }
body = 'test message'
channel = 'ch_test_custom_message_template_with_event_type'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response += chunk
if response.include?("\r\n\r\n")
assert_equal(%(:\r\nevent: #{event_type}\r\ndata: {"id":"1", "message":"#{body}"}\r\n\r\n), response, "The published message was not received correctly")
EventMachine.stop
end
}
publish_message_inline(channel, headers, body)
add_test_timeout
}
end
def test_each_line_on_posted_message_should_be_applied_to_template
headers = {'accept' => 'text/html'}
body = "line 1\nline 2\rline 3\r\nline 4"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment