Commit be0ae92b authored by Wandenberg's avatar Wandenberg

change the way the message template is parsed to be faster when apllying it to a published message

parent e8a6de63
...@@ -38,6 +38,23 @@ typedef struct { ...@@ -38,6 +38,23 @@ typedef struct {
ngx_uint_t message_min_len; ngx_uint_t message_min_len;
} ngx_http_push_stream_padding_t; } ngx_http_push_stream_padding_t;
typedef enum {
PUSH_STREAM_TEMPLATE_PART_TYPE_ID = 0,
PUSH_STREAM_TEMPLATE_PART_TYPE_TAG,
PUSH_STREAM_TEMPLATE_PART_TYPE_TIME,
PUSH_STREAM_TEMPLATE_PART_TYPE_EVENT_ID,
PUSH_STREAM_TEMPLATE_PART_TYPE_EVENT_TYPE,
PUSH_STREAM_TEMPLATE_PART_TYPE_CHANNEL,
PUSH_STREAM_TEMPLATE_PART_TYPE_TEXT,
PUSH_STREAM_TEMPLATE_PART_TYPE_LITERAL
} ngx_http_push_stream_template_part_type;
typedef struct {
ngx_queue_t queue;
ngx_http_push_stream_template_part_type kind;
ngx_str_t text;
} ngx_http_push_stream_template_parts_t;
// template queue // template queue
typedef struct { typedef struct {
ngx_queue_t queue; ngx_queue_t queue;
...@@ -45,7 +62,16 @@ typedef struct { ...@@ -45,7 +62,16 @@ typedef struct {
ngx_uint_t index; ngx_uint_t index;
ngx_flag_t eventsource; ngx_flag_t eventsource;
ngx_flag_t websocket; ngx_flag_t websocket;
} ngx_http_push_stream_template_queue_t; ngx_queue_t parts;
ngx_uint_t qtd_message_id;
ngx_uint_t qtd_event_id;
ngx_uint_t qtd_event_type;
ngx_uint_t qtd_channel;
ngx_uint_t qtd_text;
ngx_uint_t qtd_tag;
ngx_uint_t qtd_time;
size_t literal_len;
} ngx_http_push_stream_template_t;
typedef struct ngx_http_push_stream_msg_s ngx_http_push_stream_msg_t; typedef struct ngx_http_push_stream_msg_s ngx_http_push_stream_msg_t;
typedef struct ngx_http_push_stream_shm_data_s ngx_http_push_stream_shm_data_t; typedef struct ngx_http_push_stream_shm_data_s ngx_http_push_stream_shm_data_t;
......
...@@ -236,8 +236,8 @@ static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_h ...@@ -236,8 +236,8 @@ static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_h
static ngx_int_t ngx_http_push_stream_send_only_header_response_and_finalize(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message); static ngx_int_t ngx_http_push_stream_send_only_header_response_and_finalize(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
static ngx_str_t * ngx_http_push_stream_str_replace(const ngx_str_t *org, const ngx_str_t *find, const ngx_str_t *replace, off_t offset, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_str_replace(const ngx_str_t *org, const ngx_str_t *find, const ngx_str_t *replace, off_t offset, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_websocket_frame(const u_char *opcode, off_t opcode_len, const u_char *text, off_t text_len, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_get_formatted_websocket_frame(const u_char *opcode, off_t opcode_len, const u_char *text, off_t text_len, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg);
static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_http_push_stream_template_t *template, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf); static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf);
static ngx_int_t ngx_http_push_stream_send_response(ngx_http_request_t *r, ngx_str_t *text, const ngx_str_t *content_type, ngx_int_t status_code); static ngx_int_t ngx_http_push_stream_send_response(ngx_http_request_t *r, ngx_str_t *text, const ngx_str_t *content_type, ngx_int_t status_code);
......
...@@ -136,7 +136,7 @@ describe "Comunication Properties" do ...@@ -136,7 +136,7 @@ describe "Comunication Properties" do
if lines.length >= 3 if lines.length >= 3
lines[0].should eql("#{conf.header_template}") lines[0].should eql("#{conf.header_template}")
lines[1].should eql("{\"channel\":\"ch_test_message_and_channel_with_same_pattern_of_the_template~channel~~channel~~channel~~channel~~channel~~channel~~text~~text~~text~~channel~~channel~~channel~~text~~text~~text~~channel~~channel~~channel~~text~~text~~text~\", \"message\":\"~channel~~channel~~channel~~text~~text~~text~\", \"message_id\":\"1\"}") lines[1].should eql("{\"channel\":\"ch_test_message_and_channel_with_same_pattern_of_the_template~channel~~channel~~channel~~text~~text~~text~\", \"message\":\"~channel~~channel~~channel~~text~~text~~text~\", \"message_id\":\"1\"}")
lines[2].should eql("{\"channel\":\"\", \"message\":\" \", \"message_id\":\"-1\"}") lines[2].should eql("{\"channel\":\"\", \"message\":\" \", \"message_id\":\"-1\"}")
EventMachine.stop EventMachine.stop
end end
......
...@@ -276,14 +276,52 @@ ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r, ...@@ -276,14 +276,52 @@ ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r,
} }
static ngx_int_t static ngx_int_t
ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource, ngx_flag_t websocket) { ngx_http_push_stream_check_and_parse_template_pattern(ngx_conf_t *cf, ngx_http_push_stream_template_t *template, u_char *last, u_char *start, const ngx_str_t *token, ngx_http_push_stream_template_part_type part_type)
{
ngx_http_push_stream_template_parts_t *part;
if (ngx_strncasecmp(start, token->data, token->len) == 0) {
if ((start - last) > 0) {
part = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_template_parts_t));
if (part == NULL) {
ngx_log_error(NGX_LOG_ERR, cf->log, 0, "push stream module: unable to allocate memory for add template part");
return NGX_ERROR;
}
part->kind = PUSH_STREAM_TEMPLATE_PART_TYPE_LITERAL;
part->text.data = last;
part->text.len = start - last;
template->literal_len += part->text.len;
ngx_queue_insert_tail(&template->parts, &part->queue);
}
part = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_template_parts_t));
if (part == NULL) {
ngx_log_error(NGX_LOG_ERR, cf->log, 0, "push stream module: unable to allocate memory for add template part");
return NGX_ERROR;
}
part->kind = part_type;
ngx_queue_insert_tail(&template->parts, &part->queue);
return NGX_OK;
}
return NGX_DECLINED;
}
static ngx_int_t
ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource, ngx_flag_t websocket)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module); ngx_http_push_stream_main_conf_t *mcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module);
ngx_queue_t *q; ngx_queue_t *q;
ngx_http_push_stream_template_queue_t *cur; ngx_http_push_stream_template_t *cur;
ngx_str_t *aux = NULL; ngx_str_t *aux = NULL;
u_char *start = NULL, *last = NULL;
size_t len = 0;
ngx_http_push_stream_template_parts_t *part;
ngx_int_t rc;
for (q = ngx_queue_head(&mcf->msg_templates); q != ngx_queue_sentinel(&mcf->msg_templates); q = ngx_queue_next(q)) { for (q = ngx_queue_head(&mcf->msg_templates); q != ngx_queue_sentinel(&mcf->msg_templates); q = ngx_queue_next(q)) {
cur = ngx_queue_data(q, ngx_http_push_stream_template_queue_t, queue); cur = ngx_queue_data(q, ngx_http_push_stream_template_t, queue);
if ((ngx_memn2cmp(cur->template->data, template.data, cur->template->len, template.len) == 0) && if ((ngx_memn2cmp(cur->template->data, template.data, cur->template->len, template.len) == 0) &&
(cur->eventsource == eventsource) && (cur->websocket == websocket)) { (cur->eventsource == eventsource) && (cur->websocket == websocket)) {
return cur->index; return cur->index;
...@@ -292,7 +330,7 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, n ...@@ -292,7 +330,7 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, n
mcf->qtd_templates++; mcf->qtd_templates++;
cur = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_template_queue_t)); cur = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_template_t));
aux = ngx_http_push_stream_create_str(cf->pool, template.len); aux = ngx_http_push_stream_create_str(cf->pool, template.len);
if ((cur == NULL) || (aux == NULL)) { if ((cur == NULL) || (aux == NULL)) {
ngx_log_error(NGX_LOG_ERR, cf->log, 0, "push stream module: unable to allocate memory for add template to main configuration"); ngx_log_error(NGX_LOG_ERR, cf->log, 0, "push stream module: unable to allocate memory for add template to main configuration");
...@@ -302,7 +340,70 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, n ...@@ -302,7 +340,70 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, n
cur->eventsource = eventsource; cur->eventsource = eventsource;
cur->websocket = websocket; cur->websocket = websocket;
cur->index = mcf->qtd_templates; cur->index = mcf->qtd_templates;
cur->qtd_message_id = 0;
cur->qtd_event_id = 0;
cur->qtd_event_type = 0;
cur->qtd_channel = 0;
cur->qtd_text = 0;
cur->qtd_tag = 0;
cur->qtd_time = 0;
cur->literal_len = 0;
ngx_queue_init(&cur->parts);
ngx_memcpy(cur->template->data, template.data, template.len); ngx_memcpy(cur->template->data, template.data, template.len);
ngx_queue_insert_tail(&mcf->msg_templates, &cur->queue); ngx_queue_insert_tail(&mcf->msg_templates, &cur->queue);
len = cur->template->len;
last = start = cur->template->data;
while ((start = ngx_strnstr(start, "~", len)) != NULL) {
if ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID, PUSH_STREAM_TEMPLATE_PART_TYPE_ID)) == NGX_OK) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID.len;
last = start;
cur->qtd_message_id++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID, PUSH_STREAM_TEMPLATE_PART_TYPE_EVENT_ID)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID.len;
last = start;
cur->qtd_event_id++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE, PUSH_STREAM_TEMPLATE_PART_TYPE_EVENT_TYPE)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE.len;
last = start;
cur->qtd_event_type++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL, PUSH_STREAM_TEMPLATE_PART_TYPE_CHANNEL)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL.len;
last = start;
cur->qtd_channel++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT, PUSH_STREAM_TEMPLATE_PART_TYPE_TEXT)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT.len;
last = start;
cur->qtd_text++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG, PUSH_STREAM_TEMPLATE_PART_TYPE_TAG)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG.len;
last = start;
cur->qtd_tag++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME, PUSH_STREAM_TEMPLATE_PART_TYPE_TIME)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME.len;
last = start;
cur->qtd_time++;
} else {
start += 1;
}
if (rc == NGX_ERROR) {
return -1;
}
}
if (last < (cur->template->data + cur->template->len)) {
part = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_template_parts_t));
if (part == NULL) {
ngx_log_error(NGX_LOG_ERR, cf->log, 0, "push stream module: unable to allocate memory for add template part");
return -1;
}
part->kind = PUSH_STREAM_TEMPLATE_PART_TYPE_LITERAL;
part->text.data = last;
part->text.len = (cur->template->data + cur->template->len) - last;
cur->literal_len += part->text.len;
ngx_queue_insert_tail(&cur->parts, &part->queue);
}
return cur->index; return cur->index;
} }
...@@ -748,7 +748,11 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -748,7 +748,11 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_STREAMING) || (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_STREAMING) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE) || (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET)) { (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET)) {
conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template, (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE), (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET)); if ((conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template, (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE), (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET))) < 0) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push stream module: unable to parse message template: %V", &conf->message_template);
return NGX_CONF_ERROR;
}
if (conf->padding_by_user_agent.len > 0) { if (conf->padding_by_user_agent.len > 0) {
if ((conf->paddings = ngx_http_push_stream_parse_paddings(cf, &conf->padding_by_user_agent)) == NULL) { if ((conf->paddings = ngx_http_push_stream_parse_paddings(cf, &conf->padding_by_user_agent)) == NULL) {
......
...@@ -304,7 +304,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_con ...@@ -304,7 +304,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_con
} }
for (q = ngx_queue_head(&mcf->msg_templates); q != ngx_queue_sentinel(&mcf->msg_templates); q = ngx_queue_next(q)) { for (q = ngx_queue_head(&mcf->msg_templates); q != ngx_queue_sentinel(&mcf->msg_templates); q = ngx_queue_next(q)) {
ngx_http_push_stream_template_queue_t *cur = ngx_queue_data(q, ngx_http_push_stream_template_queue_t, queue); ngx_http_push_stream_template_t *cur = ngx_queue_data(q, ngx_http_push_stream_template_t, queue);
ngx_str_t *aux = NULL; ngx_str_t *aux = NULL;
if (cur->eventsource) { if (cur->eventsource) {
ngx_http_push_stream_line_t *cur_line; ngx_http_push_stream_line_t *cur_line;
...@@ -317,7 +317,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_con ...@@ -317,7 +317,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_con
for (q_line = ngx_queue_head(lines); q_line != ngx_queue_sentinel(lines); q_line = ngx_queue_next(q_line )) { for (q_line = ngx_queue_head(lines); q_line != ngx_queue_sentinel(lines); q_line = ngx_queue_next(q_line )) {
cur_line = ngx_queue_data(q_line , ngx_http_push_stream_line_t, queue); cur_line = ngx_queue_data(q_line , ngx_http_push_stream_line_t, queue);
if ((cur_line->line = ngx_http_push_stream_format_message(channel, msg, cur_line->line, cur->template, temp_pool)) == NULL) { if ((cur_line->line = ngx_http_push_stream_format_message(channel, msg, cur_line->line, cur, temp_pool)) == NULL) {
break; break;
} }
} }
...@@ -327,7 +327,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_con ...@@ -327,7 +327,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_con
ngx_sprintf(aux->data, "%V\n", tmp); ngx_sprintf(aux->data, "%V\n", tmp);
} }
} else { } else {
aux = ngx_http_push_stream_format_message(channel, msg, &msg->raw, cur->template, temp_pool); aux = ngx_http_push_stream_format_message(channel, msg, &msg->raw, cur, temp_pool);
} }
if (aux == NULL) { if (aux == NULL) {
...@@ -535,7 +535,7 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_ ...@@ -535,7 +535,7 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_
} }
if (rc == NGX_OK) { if (rc == NGX_OK) {
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, msg, r->pool); ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, msg);
if (str != NULL) { if (str != NULL) {
if ((rc == NGX_OK) && use_jsonp && send_callback) { if ((rc == NGX_OK) && use_jsonp && send_callback) {
rc = ngx_http_push_stream_send_response_text(r, ctx->callback->data, ctx->callback->len, 0); rc = ngx_http_push_stream_send_response_text(r, ctx->callback->data, ctx->callback->len, 0);
...@@ -830,7 +830,7 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_ ...@@ -830,7 +830,7 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_
if (mcf->timeout_with_body && (mcf->longpooling_timeout_msg == NULL)) { if (mcf->timeout_with_body && (mcf->longpooling_timeout_msg == NULL)) {
// create longpooling timeout message // create longpooling timeout message
if ((mcf->longpooling_timeout_msg == NULL) && (mcf->longpooling_timeout_msg = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, (u_char *) NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT, ngx_strlen(NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT), NULL, NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) { if ((mcf->longpooling_timeout_msg == NULL) && (mcf->longpooling_timeout_msg = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, (u_char *) NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT, ngx_strlen(NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT), NULL, NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_ID, NULL, NULL, r->pool)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate long pooling timeout message in shared memory"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate long pooling timeout message in shared memory");
} }
} }
...@@ -1197,7 +1197,7 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev) ...@@ -1197,7 +1197,7 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
} else { } else {
if (mcf->ping_msg == NULL) { if (mcf->ping_msg == NULL) {
// create ping message // create ping message
if ((mcf->ping_msg == NULL) && (mcf->ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, mcf->ping_message_text.data, mcf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) { if ((mcf->ping_msg == NULL) && (mcf->ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, mcf->ping_message_text.data, mcf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, NULL, r->pool)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate ping message in shared memory"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate ping message in shared memory");
} }
} }
...@@ -1274,7 +1274,7 @@ ngx_http_push_stream_str_replace(const ngx_str_t *org, const ngx_str_t *find, co ...@@ -1274,7 +1274,7 @@ ngx_http_push_stream_str_replace(const ngx_str_t *org, const ngx_str_t *find, co
static ngx_str_t * static ngx_str_t *
ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_pool_t *pool) ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message)
{ {
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
if (pslcf->message_template_index > 0) { if (pslcf->message_template_index > 0) {
...@@ -1285,45 +1285,78 @@ ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_ ...@@ -1285,45 +1285,78 @@ ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_
static ngx_str_t * static ngx_str_t *
ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool) ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_http_push_stream_template_t *template, ngx_pool_t *temp_pool)
{ {
u_char *last; u_char *last;
ngx_str_t *txt = NULL; ngx_str_t *txt = NULL;
size_t len = 0;
ngx_str_t *char_id = ngx_http_push_stream_create_str(temp_pool, NGX_INT_T_LEN); ngx_queue_t *q;
ngx_str_t *tag = ngx_http_push_stream_create_str(temp_pool, NGX_INT_T_LEN); u_char id[NGX_INT_T_LEN + 1];
ngx_str_t *time = ngx_http_push_stream_create_str(temp_pool, NGX_HTTP_PUSH_STREAM_TIME_FMT_LEN); u_char tag[NGX_INT_T_LEN + 1];
if (char_id == NULL || tag == NULL || time == NULL) { u_char time[NGX_HTTP_PUSH_STREAM_TIME_FMT_LEN + 1];
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to replace message values on template"); size_t id_len, tag_len, time_len;
return NULL;
}
ngx_str_t *channel_id = (channel != NULL) ? &channel->id : &NGX_HTTP_PUSH_STREAM_EMPTY; ngx_str_t *channel_id = (channel != NULL) ? &channel->id : &NGX_HTTP_PUSH_STREAM_EMPTY;
ngx_str_t *event_id = (message->event_id != NULL) ? message->event_id : &NGX_HTTP_PUSH_STREAM_EMPTY; ngx_str_t *event_id = (message->event_id != NULL) ? message->event_id : &NGX_HTTP_PUSH_STREAM_EMPTY;
ngx_str_t *event_type = (message->event_type != NULL) ? message->event_type : &NGX_HTTP_PUSH_STREAM_EMPTY; ngx_str_t *event_type = (message->event_type != NULL) ? message->event_type : &NGX_HTTP_PUSH_STREAM_EMPTY;
last = ngx_sprintf(char_id->data, "%d", message->id); ngx_sprintf(id, "%d%Z", message->id);
char_id->len = last - char_id->data; id_len = ngx_strlen(id);
last = ngx_http_time(time->data, message->time); last = ngx_http_time(time, message->time);
time->len = last - time->data; time_len = last - time;
last = ngx_sprintf(tag->data, "%d", message->tag); ngx_sprintf(tag, "%d%Z", message->tag);
tag->len = last - tag->data; tag_len = ngx_strlen(tag);
txt = ngx_http_push_stream_str_replace(message_template, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID, char_id, 0, temp_pool); len += template->qtd_channel * channel_id->len;
txt = ngx_http_push_stream_str_replace(txt, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID, event_id, 0, temp_pool); len += template->qtd_event_id * event_id->len;
txt = ngx_http_push_stream_str_replace(txt, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE, event_type, 0, temp_pool); len += template->qtd_event_type * event_type->len;
txt = ngx_http_push_stream_str_replace(txt, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL, channel_id, 0, temp_pool); len += template->qtd_message_id * id_len;
txt = ngx_http_push_stream_str_replace(txt, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME, time, 0, temp_pool); len += template->qtd_time * time_len;
txt = ngx_http_push_stream_str_replace(txt, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG, tag, 0, temp_pool); len += template->qtd_tag * tag_len;
txt = ngx_http_push_stream_str_replace(txt, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT, text, 0, temp_pool); len += template->qtd_text * text->len;
len += template->literal_len;
txt = ngx_http_push_stream_create_str(temp_pool, len);
if (txt == NULL) { if (txt == NULL) {
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to replace message values on template"); ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to format message");
return NULL; return NULL;
} }
last = txt->data;
for (q = ngx_queue_head(&template->parts); q != ngx_queue_sentinel(&template->parts); q = ngx_queue_next(q)) {
ngx_http_push_stream_template_parts_t *cur = ngx_queue_data(q, ngx_http_push_stream_template_parts_t, queue);
switch (cur->kind) {
case PUSH_STREAM_TEMPLATE_PART_TYPE_CHANNEL:
last = ngx_cpymem(last, channel_id->data, channel_id->len);
break;
case PUSH_STREAM_TEMPLATE_PART_TYPE_EVENT_ID:
last = ngx_cpymem(last, event_id->data, event_id->len);
break;
case PUSH_STREAM_TEMPLATE_PART_TYPE_EVENT_TYPE:
last = ngx_cpymem(last, event_type->data, event_type->len);
break;
case PUSH_STREAM_TEMPLATE_PART_TYPE_ID:
last = ngx_cpymem(last, id, id_len);
break;
case PUSH_STREAM_TEMPLATE_PART_TYPE_LITERAL:
last = ngx_cpymem(last, cur->text.data, cur->text.len);
break;
case PUSH_STREAM_TEMPLATE_PART_TYPE_TAG:
last = ngx_cpymem(last, tag, tag_len);
break;
case PUSH_STREAM_TEMPLATE_PART_TYPE_TEXT:
last = ngx_cpymem(last, text->data, text->len);
break;
case PUSH_STREAM_TEMPLATE_PART_TYPE_TIME:
last = ngx_cpymem(last, time, time_len);
break;
default:
break;
}
}
return txt; return txt;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment