Commit 7777234d authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

fix bug to accept different message templates

parent 69415086
...@@ -31,10 +31,19 @@ ...@@ -31,10 +31,19 @@
#include <ngx_http.h> #include <ngx_http.h>
#include <nginx.h> #include <nginx.h>
// template queue
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_str_t template;
ngx_uint_t index;
} ngx_http_push_stream_msg_template_t;
typedef struct { typedef struct {
size_t shm_size; size_t shm_size;
ngx_msec_t memory_cleanup_interval; ngx_msec_t memory_cleanup_interval;
time_t memory_cleanup_timeout; time_t memory_cleanup_timeout;
ngx_uint_t qtd_templates;
ngx_http_push_stream_msg_template_t msg_templates;
} ngx_http_push_stream_main_conf_t; } ngx_http_push_stream_main_conf_t;
typedef struct { typedef struct {
...@@ -47,6 +56,7 @@ typedef struct { ...@@ -47,6 +56,7 @@ typedef struct {
ngx_uint_t max_channel_id_length; ngx_uint_t max_channel_id_length;
ngx_str_t header_template; ngx_str_t header_template;
ngx_str_t message_template; ngx_str_t message_template;
ngx_int_t message_template_index;
ngx_str_t content_type; ngx_str_t content_type;
ngx_msec_t ping_message_interval; ngx_msec_t ping_message_interval;
ngx_msec_t subscriber_disconnect_interval; ngx_msec_t subscriber_disconnect_interval;
...@@ -67,6 +77,9 @@ typedef struct { ...@@ -67,6 +77,9 @@ typedef struct {
ngx_buf_t *buf; ngx_buf_t *buf;
time_t expires; time_t expires;
ngx_flag_t deleted; ngx_flag_t deleted;
ngx_int_t id;
ngx_str_t raw;
ngx_str_t *formatted_messages;
} ngx_http_push_stream_msg_t; } ngx_http_push_stream_msg_t;
typedef struct ngx_http_push_stream_subscriber_cleanup_s ngx_http_push_stream_subscriber_cleanup_t; typedef struct ngx_http_push_stream_subscriber_cleanup_s ngx_http_push_stream_subscriber_cleanup_t;
...@@ -166,6 +179,8 @@ static ngx_int_t ngx_http_push_stream_send_response_channel_info(ngx_http ...@@ -166,6 +179,8 @@ static ngx_int_t ngx_http_push_stream_send_response_channel_info(ngx_http
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request_t *r); static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r); static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID = ngx_string("ALL"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID = ngx_string("ALL");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE = ngx_string("No channel id provided."); static const ngx_str_t NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE = ngx_string("No channel id provided.");
......
...@@ -179,7 +179,7 @@ static ngx_http_push_stream_content_subtype_t subtypes[] = { ...@@ -179,7 +179,7 @@ static ngx_http_push_stream_content_subtype_t subtypes[] = {
&NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_YAML } &NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_YAML }
}; };
static const ngx_str_t NGX_PUSH_STREAM_PING_MESSAGE_ID = ngx_string("-1"); static const ngx_int_t NGX_PUSH_STREAM_PING_MESSAGE_ID = -1;
static const ngx_str_t NGX_PUSH_STREAM_PING_MESSAGE_TEXT = ngx_string(""); static const ngx_str_t NGX_PUSH_STREAM_PING_MESSAGE_TEXT = ngx_string("");
static const ngx_str_t NGX_PUSH_STREAM_PING_CHANNEL_ID = ngx_string(""); static const ngx_str_t NGX_PUSH_STREAM_PING_CHANNEL_ID = ngx_string("");
...@@ -192,14 +192,16 @@ ngx_event_t ngx_http_push_stream_disconnect_event; ...@@ -192,14 +192,16 @@ ngx_event_t ngx_http_push_stream_disconnect_event;
ngx_event_t ngx_http_push_stream_memory_cleanup_event; ngx_event_t ngx_http_push_stream_memory_cleanup_event;
ngx_event_t ngx_http_push_stream_buffer_cleanup_event; ngx_event_t ngx_http_push_stream_buffer_cleanup_event;
ngx_buf_t *ngx_http_push_stream_ping_buf = NULL; ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL;
// general request handling // general request handling
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf); ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_pool_t *temp_pool);
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_pool_t *temp_pool);
static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value); static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value);
static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message); static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
static u_char * ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx_uint_t offset, ngx_pool_t *temp_pool); static u_char * ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx_uint_t offset, ngx_pool_t *temp_pool);
static ngx_buf_t * ngx_http_push_stream_get_formatted_message(ngx_http_push_stream_loc_conf_t *pslcf, ngx_http_push_stream_channel_t *channel, ngx_buf_t *buf, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t message_template, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf); static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf);
static ngx_int_t ngx_http_push_stream_send_response_chunk(ngx_http_request_t *r, const u_char *chunk_text, uint chunk_len, ngx_flag_t last_buffer); static ngx_int_t ngx_http_push_stream_send_response_chunk(ngx_http_request_t *r, const u_char *chunk_text, uint chunk_len, ngx_flag_t last_buffer);
static ngx_int_t ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf); static ngx_int_t ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf);
......
...@@ -52,8 +52,6 @@ http { ...@@ -52,8 +52,6 @@ http {
# query string based channel id # query string based channel id
set $push_stream_channel_id $arg_id; set $push_stream_channel_id $arg_id;
push_stream_max_channel_id_length 200; push_stream_max_channel_id_length 200;
# message template
push_stream_message_template "<script>p(~id~,'~channel~','~text~');</script>";
# store messages in memory # store messages in memory
push_stream_store_messages on; push_stream_store_messages on;
# max messages to store in memory # max messages to store in memory
......
...@@ -296,3 +296,32 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -296,3 +296,32 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
// send content tail // send content tail
return ngx_http_push_stream_send_response_chunk(r, tail->data, tail->len, 1); return ngx_http_push_stream_send_response_chunk(r, tail->data, tail->len, 1);
} }
static ngx_int_t
ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template) {
ngx_http_push_stream_msg_template_t *sentinel = &ngx_http_push_stream_module_main_conf->msg_templates;
ngx_http_push_stream_msg_template_t *cur = sentinel;
u_char *aux = NULL;
while ((cur = (ngx_http_push_stream_msg_template_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if (ngx_strncmp(cur->template.data, template.data, cur->template.len) == 0) {
return cur->index;
}
}
ngx_http_push_stream_module_main_conf->qtd_templates++;
cur = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_msg_template_t));
aux = ngx_pcalloc(cf->pool, template.len + 1);
if ((cur == NULL) || (aux == NULL)) {
ngx_log_error(NGX_LOG_ERR, cf->log, 0, "push stream module: unable to allocate memory for add template to main configuration");
return -1;
}
cur->template.data = aux;
cur->index = ngx_http_push_stream_module_main_conf->qtd_templates;
cur->template.len = template.len;
ngx_memset(cur->template.data, '\0', template.len + 1);
ngx_memcpy(cur->template.data, template.data, template.len);
ngx_queue_insert_tail(&ngx_http_push_stream_module_main_conf->msg_templates.queue, &cur->queue);
return cur->index;
}
...@@ -299,11 +299,13 @@ ngx_http_push_stream_send_worker_ping_message(void) ...@@ -299,11 +299,13 @@ ngx_http_push_stream_send_worker_ping_message(void)
ngx_http_push_stream_worker_subscriber_t *sentinel = &thisworker_data->worker_subscribers_sentinel; ngx_http_push_stream_worker_subscriber_t *sentinel = &thisworker_data->worker_subscribers_sentinel;
ngx_http_push_stream_worker_subscriber_t *cur = sentinel; ngx_http_push_stream_worker_subscriber_t *cur = sentinel;
if ((ngx_http_push_stream_ping_buf != NULL) && (!ngx_queue_empty(&sentinel->queue))) { if ((ngx_http_push_stream_ping_msg != NULL) && (!ngx_queue_empty(&sentinel->queue))) {
uint len = ngx_buf_size(ngx_http_push_stream_ping_buf);
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if (cur->request != NULL) { if (cur->request != NULL) {
ngx_http_push_stream_send_response_chunk(cur->request, ngx_http_push_stream_ping_buf->pos, len, 0); ngx_str_t *str = ngx_http_push_stream_get_formatted_message(cur->request, NULL, ngx_http_push_stream_ping_msg, cur->request->pool);
if (str != NULL) {
ngx_http_push_stream_send_response_chunk(cur->request, str->data, str->len, 0);
}
} }
} }
} }
...@@ -430,11 +432,13 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan ...@@ -430,11 +432,13 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan
cur = sentinel; cur = sentinel;
if (msg != NULL) { if (msg != NULL) {
uint len = ngx_buf_size(msg->buf);
// now let's respond to some requests! // now let's respond to some requests!
while ((cur = (ngx_http_push_stream_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) {
ngx_http_push_stream_send_response_chunk(cur->request, msg->buf->pos, len, 0); ngx_str_t *str = ngx_http_push_stream_get_formatted_message(cur->request, channel, msg, cur->request->pool);
if (str != NULL) {
ngx_http_push_stream_send_response_chunk(cur->request, str->data, str->len, 0);
}
} }
} }
......
...@@ -108,7 +108,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -108,7 +108,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
ngx_str_t *id; ngx_str_t *id;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_buf_t *buf = NULL, *buf_msg = NULL; ngx_buf_t *buf = NULL;
ngx_chain_t *chain; ngx_chain_t *chain;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ssize_t n; ssize_t n;
...@@ -174,12 +174,8 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -174,12 +174,8 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
return; return;
} }
// format message
buf_msg = ngx_http_push_stream_get_formatted_message(cf, channel, buf, r->pool);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR_LOCKED(buf_msg, NULL, r, "push stream module: unable to format message");
// create a buffer copy in shared mem // create a buffer copy in shared mem
msg = ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(buf_msg); msg = ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(buf, channel, channel->last_message_id + 1, r->pool);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR_LOCKED(msg, NULL, r, "push stream module: unable to allocate message in shared memory"); NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR_LOCKED(msg, NULL, r, "push stream module: unable to allocate message in shared memory");
channel->last_message_id++; channel->last_message_id++;
......
...@@ -279,6 +279,8 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf) ...@@ -279,6 +279,8 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
mcf->shm_size = NGX_CONF_UNSET_SIZE; mcf->shm_size = NGX_CONF_UNSET_SIZE;
mcf->memory_cleanup_timeout = NGX_CONF_UNSET; mcf->memory_cleanup_timeout = NGX_CONF_UNSET;
mcf->qtd_templates = 0;
ngx_queue_init(&mcf->msg_templates.queue);
ngx_http_push_stream_module_main_conf = mcf; ngx_http_push_stream_module_main_conf = mcf;
...@@ -327,6 +329,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf) ...@@ -327,6 +329,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->authorized_channels_only = NGX_CONF_UNSET_UINT; lcf->authorized_channels_only = NGX_CONF_UNSET_UINT;
lcf->store_messages = NGX_CONF_UNSET_UINT; lcf->store_messages = NGX_CONF_UNSET_UINT;
lcf->max_channel_id_length = NGX_CONF_UNSET_UINT; lcf->max_channel_id_length = NGX_CONF_UNSET_UINT;
lcf->message_template_index = -1;
lcf->message_template.data = NULL; lcf->message_template.data = NULL;
lcf->header_template.data = NULL; lcf->header_template.data = NULL;
lcf->ping_message_interval = NGX_CONF_UNSET_MSEC; lcf->ping_message_interval = NGX_CONF_UNSET_MSEC;
...@@ -454,6 +457,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -454,6 +457,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
if (conf->message_template.len > 0) { if (conf->message_template.len > 0) {
conf->message_template.data = ngx_http_push_stream_append_crlf(&conf->message_template, cf->pool); conf->message_template.data = ngx_http_push_stream_append_crlf(&conf->message_template, cf->pool);
conf->message_template.len = ngx_strlen(conf->message_template.data); conf->message_template.len = ngx_strlen(conf->message_template.data);
conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template);
} }
// calc buffer cleanup interval // calc buffer cleanup interval
...@@ -470,14 +474,6 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -470,14 +474,6 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
conf->subscriber_disconnect_interval = (interval > 1) ? (interval * 1000) + 1000 : 1000; // min 1 second conf->subscriber_disconnect_interval = (interval > 1) ? (interval * 1000) + 1000 : 1000; // min 1 second
} }
// create ping message
if ((conf->message_template.len > 0) && (ngx_http_push_stream_ping_buf == NULL)) {
if ((ngx_http_push_stream_ping_buf = ngx_http_push_stream_get_formatted_message(conf, NULL, NULL, cf->pool)) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to format ping message.");
return NGX_CONF_ERROR;
}
}
return NGX_CONF_OK; return NGX_CONF_OK;
} }
...@@ -591,5 +587,11 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -591,5 +587,11 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
} }
ngx_rbtree_init(&d->channels_to_delete, remove_sentinel, ngx_http_push_stream_rbtree_insert); ngx_rbtree_init(&d->channels_to_delete, remove_sentinel, ngx_http_push_stream_rbtree_insert);
// create ping message
ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(NGX_PUSH_STREAM_PING_MESSAGE_TEXT.data, NGX_PUSH_STREAM_PING_MESSAGE_TEXT.len, NULL, NGX_PUSH_STREAM_PING_MESSAGE_ID, ngx_cycle->pool);
if (ngx_http_push_stream_ping_msg == NULL) {
return NGX_ERROR;
}
return NGX_OK; return NGX_OK;
} }
...@@ -269,7 +269,10 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http ...@@ -269,7 +269,10 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
// positioning at first message, and send the others // positioning at first message, and send the others
while ((qtd > 0) && (!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) { while ((qtd > 0) && (!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (start == 0) { if (start == 0) {
ngx_http_push_stream_send_response_chunk(r, message->buf->pos, ngx_buf_size(message->buf), 0); ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, message, r->pool);
if (str != NULL) {
ngx_http_push_stream_send_response_chunk(r, str->data, str->len, 0);
}
qtd--; qtd--;
} else { } else {
......
...@@ -47,13 +47,19 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_ ...@@ -47,13 +47,19 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_
ngx_http_push_stream_msg_t * ngx_http_push_stream_msg_t *
ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf) ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_pool_t *temp_pool)
{
return ngx_http_push_stream_convert_char_to_msg_on_shared_locked(buf->pos, ngx_buf_size(buf), channel, id, temp_pool);
}
ngx_http_push_stream_msg_t *
ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_pool_t *temp_pool)
{ {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_msg_template_t *sentinel = &ngx_http_push_stream_module_main_conf->msg_templates;
ngx_http_push_stream_msg_template_t *cur = sentinel;
ngx_http_push_stream_msg_t *msg; ngx_http_push_stream_msg_t *msg;
off_t len; int i = 0;
len = ngx_buf_size(buf);
msg = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_msg_t)); msg = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_msg_t));
if (msg == NULL) { if (msg == NULL) {
...@@ -75,7 +81,7 @@ ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf) ...@@ -75,7 +81,7 @@ ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf)
ngx_memset(msg->buf->start, '\0', len + 1); ngx_memset(msg->buf->start, '\0', len + 1);
// copy the message to shared memory // copy the message to shared memory
msg->buf->last = ngx_copy(msg->buf->start, buf->pos, len); msg->buf->last = ngx_copy(msg->buf->start, data, len);
msg->buf->pos = msg->buf->start; msg->buf->pos = msg->buf->start;
msg->buf->end = msg->buf->last + len; msg->buf->end = msg->buf->last + len;
...@@ -85,6 +91,27 @@ ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf) ...@@ -85,6 +91,27 @@ ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf)
msg->expires = 0; msg->expires = 0;
msg->queue.prev = NULL; msg->queue.prev = NULL;
msg->queue.next = NULL; msg->queue.next = NULL;
msg->id = id;
msg->raw.data = msg->buf->start;
msg->raw.len = len;
msg->formatted_messages = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t)*ngx_http_push_stream_module_main_conf->qtd_templates);
while ((cur = (ngx_http_push_stream_msg_template_t *) ngx_queue_next(&cur->queue)) != sentinel) {
ngx_str_t *aux = ngx_http_push_stream_format_message(channel, msg, cur->template, temp_pool);
ngx_str_t *formmated = (msg->formatted_messages + i);
formmated->data = ngx_slab_alloc_locked(shpool, aux->len + 1);
if (formmated->data == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
formmated->len = aux->len;
ngx_memset(formmated->data, '\0', formmated->len + 1);
ngx_memcpy(formmated->data, aux->data, formmated->len);
i++;
}
return msg; return msg;
} }
...@@ -345,6 +372,19 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for ...@@ -345,6 +372,19 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
static void static void
ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg) { ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg) {
u_int i;
for(i = 0; i < ngx_http_push_stream_module_main_conf->qtd_templates; i++) {
ngx_str_t *formmated = (msg->formatted_messages + i);
if ((formmated != NULL) && (formmated->data != NULL)) {
ngx_slab_free_locked(shpool, formmated->data);
}
}
if (msg->formatted_messages != NULL) {
ngx_slab_free_locked(shpool, msg->formatted_messages);
}
ngx_slab_free_locked(shpool, msg->buf->start); ngx_slab_free_locked(shpool, msg->buf->start);
ngx_slab_free_locked(shpool, msg->buf); ngx_slab_free_locked(shpool, msg->buf);
ngx_slab_free_locked(shpool, msg); ngx_slab_free_locked(shpool, msg);
...@@ -520,56 +560,65 @@ ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx ...@@ -520,56 +560,65 @@ ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx
} }
static ngx_buf_t * static ngx_str_t *
ngx_http_push_stream_get_formatted_message(ngx_http_push_stream_loc_conf_t *pslcf, ngx_http_push_stream_channel_t *channel, ngx_buf_t *buf, ngx_pool_t *pool) ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_pool_t *pool)
{
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
if (pslcf->message_template_index > 0) {
return message->formatted_messages + pslcf->message_template_index - 1;
}
return &message->raw;
}
static ngx_str_t *
ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t message_template, ngx_pool_t *pool)
{ {
ngx_uint_t len = 0; ngx_uint_t len = 0;
u_char *txt = NULL; u_char *txt = NULL;
ngx_str_t *str = NULL;
if (pslcf->message_template.len > 0) { if (message_template.len > 0) {
u_char template[pslcf->message_template.len + 1]; u_char template[message_template.len + 1];
ngx_memset(template, '\0', pslcf->message_template.len + 1); ngx_memset(template, '\0', message_template.len + 1);
ngx_memcpy(template, pslcf->message_template.data, pslcf->message_template.len); ngx_memcpy(template, message_template.data, message_template.len);
u_char char_id[NGX_INT_T_LEN]; u_char char_id[NGX_INT_T_LEN];
ngx_memset(char_id, '\0', NGX_INT_T_LEN); ngx_memset(char_id, '\0', NGX_INT_T_LEN);
u_char *msg = NGX_PUSH_STREAM_PING_MESSAGE_TEXT.data; u_char *msg = NGX_PUSH_STREAM_PING_MESSAGE_TEXT.data;
u_char *channel_id = NGX_PUSH_STREAM_PING_CHANNEL_ID.data; u_char *channel_id = NGX_PUSH_STREAM_PING_CHANNEL_ID.data;
ngx_int_t message_id = NGX_PUSH_STREAM_PING_MESSAGE_ID;
if (channel != NULL) {
channel_id = channel->id.data;
}
if ((channel != NULL) && (buf != NULL)) { if (message != NULL) {
ngx_sprintf(char_id, "%d", channel->last_message_id + 1); message_id = message->id;
len = ngx_buf_size(buf); len = ngx_buf_size(message->buf);
msg = ngx_pcalloc(pool, len + 1); msg = ngx_pcalloc(pool, len + 1);
ngx_memset(msg, '\0', len + 1); ngx_memset(msg, '\0', len + 1);
ngx_memcpy(msg, buf->pos, len); ngx_memcpy(msg, message->buf->pos, len);
channel_id = channel->id.data;
} else {
ngx_memcpy(char_id, NGX_PUSH_STREAM_PING_MESSAGE_ID.data, NGX_PUSH_STREAM_PING_MESSAGE_ID.len + 1);
} }
ngx_sprintf(char_id, "%d", message_id);
txt = ngx_http_push_stream_str_replace(template, NGX_PUSH_STREAM_TOKEN_MESSAGE_ID.data, char_id, 0, pool); txt = ngx_http_push_stream_str_replace(template, NGX_PUSH_STREAM_TOKEN_MESSAGE_ID.data, char_id, 0, pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL.data, channel_id, 0, pool); txt = ngx_http_push_stream_str_replace(txt, NGX_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL.data, channel_id, 0, pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_PUSH_STREAM_TOKEN_MESSAGE_TEXT.data, msg, 0, pool); txt = ngx_http_push_stream_str_replace(txt, NGX_PUSH_STREAM_TOKEN_MESSAGE_TEXT.data, msg, 0, pool);
len = ngx_strlen(txt); } else if (message != NULL) {
buf = ngx_calloc_buf(pool); ngx_str_t msg = ngx_string(message->buf->pos);
} else if (buf != NULL) { msg.len = ngx_buf_size(message->buf);
ngx_str_t msg = ngx_string(buf->pos);
msg.len = ngx_buf_size(buf);
txt = ngx_http_push_stream_append_crlf(&msg, pool); txt = ngx_http_push_stream_append_crlf(&msg, pool);
len = ngx_strlen(txt);
} }
// global adjusts if (txt != NULL) {
if (buf != NULL) { len = ngx_strlen(txt);
buf->pos = txt; str = ngx_pcalloc(pool, sizeof(ngx_str_t));
buf->last = buf->pos + len; str->data = txt;
buf->start = buf->pos; str->len = len;
buf->end = buf->last;
buf->temporary = 1;
buf->memory = 1;
} }
return buf; return str;
} }
......
...@@ -236,8 +236,6 @@ http { ...@@ -236,8 +236,6 @@ http {
# query string based channel id # query string based channel id
set $push_stream_channel_id $arg_id; set $push_stream_channel_id $arg_id;
# message template
<%= %{push_stream_message_template "#{@message_template}";} unless @message_template.nil? %>
# store messages # store messages
<%= "push_stream_store_messages #{@store_messages};" unless @store_messages.nil? %> <%= "push_stream_store_messages #{@store_messages};" unless @store_messages.nil? %>
# max messages to store in memory # max messages to store in memory
...@@ -283,6 +281,8 @@ http { ...@@ -283,6 +281,8 @@ http {
<%= "push_stream_max_number_of_channels #{@max_number_of_channels};" unless @max_number_of_channels.nil? %> <%= "push_stream_max_number_of_channels #{@max_number_of_channels};" unless @max_number_of_channels.nil? %>
<%= "push_stream_max_number_of_broadcast_channels #{@max_number_of_broadcast_channels};" unless @max_number_of_broadcast_channels.nil? %> <%= "push_stream_max_number_of_broadcast_channels #{@max_number_of_broadcast_channels};" unless @max_number_of_broadcast_channels.nil? %>
} }
<%= @extra_location %>
} }
} }
} }
......
...@@ -530,4 +530,79 @@ class TestPublisher < Test::Unit::TestCase ...@@ -530,4 +530,79 @@ class TestPublisher < Test::Unit::TestCase
fail_if_connecttion_error(sub_1) fail_if_connecttion_error(sub_1)
} }
end end
def config_test_different_message_templates
@message_template = '{\"text\":\"~text~\"}'
@header_template = nil
@extra_location = %q{
location ~ /sub2/(.*)? {
# activate subscriber mode for this location
push_stream_subscriber;
# positional channel path
set $push_stream_channels_path $1;
# message template
push_stream_message_template "{\"msg\":\"~text~\"}";
push_stream_subscriber_connection_timeout 1s;
}
}
end
def test_different_message_templates
headers = {'accept' => 'application/json'}
channel = 'ch_test_different_message_templates'
body = 'body'
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
response = JSON.parse(chunk)
assert_equal(true, response.has_key?('text'), "Wrong message template")
assert_equal(false, response.has_key?('msg'), "Wrong message template")
assert_equal(body, response['text'], "Wrong message")
EventMachine.stop
}
fail_if_connecttion_error(sub_1)
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub2/' + channel.to_s + '.b1').get :head => headers, :timeout => 30
sub_2.stream { |chunk|
response = JSON.parse(chunk)
assert_equal(false, response.has_key?('text'), "Wrong message template")
assert_equal(true, response.has_key?('msg'), "Wrong message template")
assert_equal(body, response['msg'], "Wrong message")
EventMachine.stop
}
fail_if_connecttion_error(sub_2)
#publish a message
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
fail_if_connecttion_error(pub_1)
}
EventMachine.run {
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b1').get :head => headers, :timeout => 30
sub_3.stream { |chunk|
response = JSON.parse(chunk)
assert_equal(true, response.has_key?('text'), "Wrong message template")
assert_equal(false, response.has_key?('msg'), "Wrong message template")
assert_equal(body, response['text'], "Wrong message")
EventMachine.stop
}
fail_if_connecttion_error(sub_3)
}
EventMachine.run {
sub_4 = EventMachine::HttpRequest.new(nginx_address + '/sub2/' + channel.to_s + '.b1').get :head => headers, :timeout => 30
sub_4.stream { |chunk|
response = JSON.parse(chunk)
assert_equal(false, response.has_key?('text'), "Wrong message template")
assert_equal(true, response.has_key?('msg'), "Wrong message template")
assert_equal(body, response['msg'], "Wrong message")
EventMachine.stop
}
fail_if_connecttion_error(sub_4)
}
end
end end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment