Commit 6b436e61 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

setting an event id to the message through header parameter

parent 48f04d14
......@@ -84,6 +84,7 @@ typedef struct {
ngx_flag_t deleted;
ngx_int_t id;
ngx_str_t *raw;
ngx_str_t *event_id;
ngx_str_t *formatted_messages;
} ngx_http_push_stream_msg_t;
......@@ -210,7 +211,8 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_SLASH = ngx_string("/");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_DATE_FORMAT_ISO_8601 = ngx_string("%4d-%02d-%02dT%02d:%02d:%02d");
//// headers
// headers
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID = ngx_string("Event-Id");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_ALLOW = ngx_string("Allow");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_EXPLAIN = ngx_string("X-Nginx-PushStream-Explain");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING = ngx_string("Transfer-Encoding");
......
......@@ -188,6 +188,7 @@ static const ngx_int_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID = -2;
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT = ngx_string("Channel deleted");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID = ngx_string("~id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID = ngx_string("~event-id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL = ngx_string("~channel~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~");
......@@ -201,9 +202,10 @@ ngx_event_t ngx_http_push_stream_buffer_cleanup_event;
ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL;
// general request handling
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_pool_t *temp_pool);
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_pool_t *temp_pool);
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool);
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool);
static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value);
static ngx_str_t * ngx_http_push_stream_get_header(ngx_http_request_t *r, const ngx_str_t *header_name);
static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
static u_char * ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx_uint_t offset, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_chunk(const u_char *text, off_t len, ngx_pool_t *temp_pool);
......
......@@ -127,6 +127,7 @@ static void
ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
{
ngx_str_t *id;
ngx_str_t *event_id;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_buf_t *buf = NULL;
......@@ -184,6 +185,8 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
buf->start = buf->last;
}
event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID);
ngx_shmtx_lock(&shpool->mutex);
// just find the channel. if it's not there, NULL and return error.
......@@ -196,7 +199,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
}
// create a buffer copy in shared mem
msg = ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(buf, channel, channel->last_message_id + 1, r->pool);
msg = ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(buf, channel, channel->last_message_id + 1, event_id, r->pool);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR_LOCKED(msg, NULL, r, "push stream module: unable to allocate message in shared memory");
channel->last_message_id++;
......
......@@ -677,7 +677,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
ngx_rbtree_init(&d->unrecoverable_channels, unrecoverable_sentinel, ngx_http_push_stream_rbtree_insert);
// create ping message
ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->ping_message_text.data, ngx_http_push_stream_module_main_conf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, ngx_cycle->pool);
ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->ping_message_text.data, ngx_http_push_stream_module_main_conf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, ngx_cycle->pool);
if (ngx_http_push_stream_ping_msg == NULL) {
return NGX_ERROR;
}
......
......@@ -123,14 +123,14 @@ ngx_http_push_stream_delete_worker_channel(void)
ngx_http_push_stream_msg_t *
ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_pool_t *temp_pool)
ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool)
{
return ngx_http_push_stream_convert_char_to_msg_on_shared_locked(buf->pos, ngx_buf_size(buf), channel, id, temp_pool);
return ngx_http_push_stream_convert_char_to_msg_on_shared_locked(buf->pos, ngx_buf_size(buf), channel, id, event_id, temp_pool);
}
ngx_http_push_stream_msg_t *
ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_pool_t *temp_pool)
ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_template_queue_t *sentinel = &ngx_http_push_stream_module_main_conf->msg_templates;
......@@ -142,6 +142,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
return NULL;
}
msg->event_id = NULL;
msg->formatted_messages = NULL;
if ((msg->raw = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + len + 1)) == NULL) {
......@@ -155,6 +156,18 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
// copy the message to shared memory
ngx_memcpy(msg->raw->data, data, len);
if (event_id != NULL) {
if ((msg->event_id = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + event_id->len + 1)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
msg->event_id->len = event_id->len;
msg->event_id->data = (u_char *) (msg->event_id + 1);
ngx_memset(msg->event_id->data, '\0', event_id->len + 1);
ngx_memcpy(msg->event_id->data, event_id->data, event_id->len);
}
msg->deleted = 0;
msg->expires = 0;
msg->queue.prev = NULL;
......@@ -229,6 +242,41 @@ ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t
return h;
}
static ngx_str_t *
ngx_http_push_stream_get_header(ngx_http_request_t *r, const ngx_str_t *header_name)
{
ngx_table_elt_t *h;
ngx_list_part_t *part;
ngx_uint_t i;
ngx_str_t *aux = NULL;
part = &r->headers_in.headers.part;
h = part->elts;
for (i = 0; /* void */; i++) {
if (i >= part->nelts) {
if (part->next == NULL) {
break;
}
part = part->next;
h = part->elts;
i = 0;
}
if ((h[i].key.len == header_name->len) && (ngx_strncasecmp(h[i].key.data, header_name->data, header_name->len) == 0)) {
aux = ngx_http_push_stream_create_str(r->pool, h[i].value.len);
if (aux != NULL) {
ngx_memcpy(aux->data, h[i].value.data, h[i].value.len);
}
break;
}
}
return aux;
}
static ngx_int_t
ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf)
{
......@@ -329,7 +377,7 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool) {
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, 0, 0);
// apply channel deleted message text to message template
if ((channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->channel_deleted_message_text.data, ngx_http_push_stream_module_main_conf->channel_deleted_message_text.len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, temp_pool)) == NULL) {
if ((channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->channel_deleted_message_text.data, ngx_http_push_stream_module_main_conf->channel_deleted_message_text.len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, NULL, temp_pool)) == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to channel deleted message");
return;
......@@ -530,6 +578,7 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt
}
if (msg->raw != NULL) ngx_slab_free_locked(shpool, msg->raw);
if (msg->event_id != NULL) ngx_slab_free_locked(shpool, msg->event_id);
if (msg != NULL) ngx_slab_free_locked(shpool, msg);
}
......@@ -722,6 +771,7 @@ ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_
return message->raw;
}
static ngx_str_t *
ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool)
{
......@@ -731,10 +781,12 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
u_char char_id[NGX_INT_T_LEN];
ngx_memset(char_id, '\0', NGX_INT_T_LEN);
u_char *channel_id = (channel != NULL) ? channel->id.data : NGX_HTTP_PUSH_STREAM_EMPTY.data;
u_char *event_id = (message->event_id != NULL) ? message->event_id->data : NGX_HTTP_PUSH_STREAM_EMPTY.data;
ngx_sprintf(char_id, "%d", message->id);
txt = ngx_http_push_stream_str_replace(message_template->data, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID.data, char_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID.data, event_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL.data, channel_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT.data, text->data, 0, temp_pool);
......@@ -747,6 +799,7 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to return message applied to template");
return NULL;
}
str->data = txt;
str->len = ngx_strlen(txt);
return str;
......
......@@ -124,4 +124,34 @@ class TestPublishMessages < Test::Unit::TestCase
end
}
end
def config_test_set_an_event_id_to_the_message_through_header_parameter
@header_template = nil
@message_template = '{\"id\": \"~id~\", \"channel\": \"~channel~\", \"text\": \"~text~\", \"event_id\": \"~event-id~\"}'
end
def test_set_an_event_id_to_the_message_through_header_parameter
event_id = 'event_id_with_generic_text_01'
headers = {'accept' => 'text/html', 'Event-Id' => event_id }
body = 'test message'
channel = 'ch_test_set_an_event_id_to_the_message_through_header_parameter'
response = ''
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
response = JSON.parse(chunk)
assert_equal(1, response["id"].to_i, "Wrong data received")
assert_equal(channel, response["channel"], "Wrong data received")
assert_equal(body, response["text"], "Wrong data received")
assert_equal(event_id, response["event_id"], "Wrong data received")
EventMachine.stop
}
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
add_test_timeout
}
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment