Commit 04f07211 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

optimizing message creation and check return of function...

optimizing message creation and check return of function ngx_http_push_stream_apply_text_template calls for errors
parent c3f6fdd8
...@@ -113,7 +113,7 @@ typedef struct { ...@@ -113,7 +113,7 @@ typedef struct {
time_t time; time_t time;
ngx_flag_t deleted; ngx_flag_t deleted;
ngx_int_t id; ngx_int_t id;
ngx_str_t *raw; ngx_str_t raw;
ngx_int_t tag; ngx_int_t tag;
ngx_str_t *event_id; ngx_str_t *event_id;
ngx_str_t *event_type; ngx_str_t *event_type;
......
...@@ -27,6 +27,6 @@ ...@@ -27,6 +27,6 @@
#define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ #define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.2"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.2");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("a690c2e0d32cf4a18de72aac6e9ac385e3148675"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("c3f6fdd8c1a9d2da63552e9949bf82f430aeb939");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */
...@@ -195,7 +195,6 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -195,7 +195,6 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
ngx_http_push_stream_template_queue_t *cur = sentinel; ngx_http_push_stream_template_queue_t *cur = sentinel;
ngx_http_push_stream_msg_t *msg; ngx_http_push_stream_msg_t *msg;
int i = 0; int i = 0;
u_char *last;
if ((msg = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_msg_t))) == NULL) { if ((msg = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_msg_t))) == NULL) {
return NULL; return NULL;
...@@ -206,21 +205,6 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -206,21 +205,6 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
msg->event_id_message = NULL; msg->event_id_message = NULL;
msg->event_type_message = NULL; msg->event_type_message = NULL;
msg->formatted_messages = NULL; msg->formatted_messages = NULL;
if ((msg->raw = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + len + 1)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
msg->raw->len = len;
msg->raw->data = (u_char *) (msg->raw + 1);
// copy the message to shared memory
last = ngx_copy(msg->raw->data, data, len);
*last = '\0';
ngx_http_push_stream_apply_text_template(&msg->event_id, &msg->event_id_message, event_id, &NGX_HTTP_PUSH_STREAM_EVENTSOURCE_ID_TEMPLATE, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID, shpool, temp_pool);
ngx_http_push_stream_apply_text_template(&msg->event_type, &msg->event_type_message, event_type, &NGX_HTTP_PUSH_STREAM_EVENTSOURCE_EVENT_TEMPLATE, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE, shpool, temp_pool);
msg->deleted = 0; msg->deleted = 0;
msg->expires = 0; msg->expires = 0;
msg->queue.prev = NULL; msg->queue.prev = NULL;
...@@ -230,6 +214,27 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -230,6 +214,27 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
msg->time = (id == -1) ? 0 : ngx_time(); msg->time = (id == -1) ? 0 : ngx_time();
msg->tag = (msg->time == shm_data->last_message_time) ? (shm_data->last_message_tag + 1) : 0; msg->tag = (msg->time == shm_data->last_message_time) ? (shm_data->last_message_tag + 1) : 0;
if ((msg->raw.data = ngx_slab_alloc_locked(shpool, len + 1)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
msg->raw.len = len;
// copy the message to shared memory
ngx_memcpy(msg->raw.data, data, len);
msg->raw.data[msg->raw.len] = '\0';
if (ngx_http_push_stream_apply_text_template(&msg->event_id, &msg->event_id_message, event_id, &NGX_HTTP_PUSH_STREAM_EVENTSOURCE_ID_TEMPLATE, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID, shpool, temp_pool) != NGX_OK) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
if (ngx_http_push_stream_apply_text_template(&msg->event_type, &msg->event_type_message, event_type, &NGX_HTTP_PUSH_STREAM_EVENTSOURCE_EVENT_TEMPLATE, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE, shpool, temp_pool) != NGX_OK) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
if ((msg->formatted_messages = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t)*ngx_http_push_stream_module_main_conf->qtd_templates)) == NULL) { if ((msg->formatted_messages = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t)*ngx_http_push_stream_module_main_conf->qtd_templates)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg); ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL; return NULL;
...@@ -239,7 +244,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -239,7 +244,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
if (cur->eventsource) { if (cur->eventsource) {
ngx_http_push_stream_line_t *lines, *cur_line; ngx_http_push_stream_line_t *lines, *cur_line;
if ((lines = ngx_http_push_stream_split_by_crlf(msg->raw, temp_pool)) == NULL) { if ((lines = ngx_http_push_stream_split_by_crlf(&msg->raw, temp_pool)) == NULL) {
return NULL; return NULL;
} }
...@@ -251,7 +256,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -251,7 +256,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
} }
aux = ngx_http_push_stream_join_with_crlf(lines, temp_pool); aux = ngx_http_push_stream_join_with_crlf(lines, temp_pool);
} else { } else {
aux = ngx_http_push_stream_format_message(channel, msg, msg->raw, cur->template, temp_pool); aux = ngx_http_push_stream_format_message(channel, msg, &msg->raw, cur->template, temp_pool);
} }
if (aux == NULL) { if (aux == NULL) {
...@@ -273,8 +278,8 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -273,8 +278,8 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
} }
formmated->len = text->len; formmated->len = text->len;
last = ngx_copy(formmated->data, text->data, formmated->len); ngx_memcpy(formmated->data, text->data, formmated->len);
*last = '\0'; formmated->data[formmated->len] = '\0';
i++; i++;
} }
...@@ -852,6 +857,10 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt ...@@ -852,6 +857,10 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt
{ {
u_int i; u_int i;
if (msg == NULL) {
return;
}
if (msg->formatted_messages != NULL) { if (msg->formatted_messages != NULL) {
for (i = 0; i < ngx_http_push_stream_module_main_conf->qtd_templates; i++) { for (i = 0; i < ngx_http_push_stream_module_main_conf->qtd_templates; i++) {
ngx_str_t *formmated = (msg->formatted_messages + i); ngx_str_t *formmated = (msg->formatted_messages + i);
...@@ -863,12 +872,12 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt ...@@ -863,12 +872,12 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt
ngx_slab_free_locked(shpool, msg->formatted_messages); ngx_slab_free_locked(shpool, msg->formatted_messages);
} }
if (msg->raw != NULL) ngx_slab_free_locked(shpool, msg->raw); if (msg->raw.data != NULL) ngx_slab_free_locked(shpool, msg->raw.data);
if (msg->event_id != NULL) ngx_slab_free_locked(shpool, msg->event_id); if (msg->event_id != NULL) ngx_slab_free_locked(shpool, msg->event_id);
if (msg->event_type != NULL) ngx_slab_free_locked(shpool, msg->event_type); if (msg->event_type != NULL) ngx_slab_free_locked(shpool, msg->event_type);
if (msg->event_id_message != NULL) ngx_slab_free_locked(shpool, msg->event_id_message); if (msg->event_id_message != NULL) ngx_slab_free_locked(shpool, msg->event_id_message);
if (msg->event_type_message != NULL) ngx_slab_free_locked(shpool, msg->event_type_message); if (msg->event_type_message != NULL) ngx_slab_free_locked(shpool, msg->event_type_message);
if (msg != NULL) ngx_slab_free_locked(shpool, msg); ngx_slab_free_locked(shpool, msg);
} }
...@@ -1020,7 +1029,7 @@ ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_ ...@@ -1020,7 +1029,7 @@ ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_
if (pslcf->message_template_index > 0) { if (pslcf->message_template_index > 0) {
return message->formatted_messages + pslcf->message_template_index - 1; return message->formatted_messages + pslcf->message_template_index - 1;
} }
return message->raw; return &message->raw;
} }
......
...@@ -3,10 +3,10 @@ require File.expand_path('base_test_case', File.dirname(__FILE__)) ...@@ -3,10 +3,10 @@ require File.expand_path('base_test_case', File.dirname(__FILE__))
class TestMeasureMemory < Test::Unit::TestCase class TestMeasureMemory < Test::Unit::TestCase
include BaseTestCase include BaseTestCase
@@message_estimate_size = 199 @@message_estimate_size = 174
@@channel_estimate_size = 536 @@channel_estimate_size = 536
@@subscriber_estimate_size = 230 @@subscriber_estimate_size = 230
@@subscriber_estimate_system_size = 7100 @@subscriber_estimate_system_size = 6780
def global_configuration def global_configuration
@max_reserved_memory = "2m" @max_reserved_memory = "2m"
...@@ -44,7 +44,7 @@ class TestMeasureMemory < Test::Unit::TestCase ...@@ -44,7 +44,7 @@ class TestMeasureMemory < Test::Unit::TestCase
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics") assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
published_messages = JSON.parse(pub_2.response)["published_messages"].to_i published_messages = JSON.parse(pub_2.response)["published_messages"].to_i
assert(((expected_message - 10) <= published_messages) && (published_messages <= (expected_message + 10)), "Message size is far from %d bytes (expected: %d, published: %d)" % ([@@message_estimate_size, expected_message, published_messages])) assert(((expected_message - 20) <= published_messages) && (published_messages <= (expected_message + 20)), "Message size is far from %d bytes (expected: %d, published: %d)" % ([@@message_estimate_size, expected_message, published_messages]))
EventMachine.stop EventMachine.stop
} }
...@@ -125,13 +125,13 @@ class TestMeasureMemory < Test::Unit::TestCase ...@@ -125,13 +125,13 @@ class TestMeasureMemory < Test::Unit::TestCase
} }
EventMachine.run { EventMachine.run {
memory_1 = `ps -eo vsz,cmd | grep -E 'ngin[xX] -c '`.split(' ')[0].to_i memory_1 = `ps -eo rss,cmd | grep -E 'ngin[xX] -c '`.split(' ')[0].to_i
subscriber_in_loop_with_limit(channel, headers, body, 1000, 1199) do subscriber_in_loop_with_limit(channel, headers, body, 1000, 1199) do
memory_2 = `ps -eo vsz,cmd | grep -E 'ngin[xX] -c '`.split(' ')[0].to_i memory_2 = `ps -eo rss,cmd | grep -E 'ngin[xX] -c '`.split(' ')[0].to_i
per_subscriber = ((memory_2 - memory_1).to_f / 200) * 1000 per_subscriber = ((memory_2 - memory_1).to_f / 200) * 1000
assert(((@@subscriber_estimate_system_size - 100) < per_subscriber) && (per_subscriber < (@@subscriber_estimate_system_size + 100)), "Subscriber system size is far from %d bytes (measured: %d)" % ([@@subscriber_estimate_system_size, per_subscriber])) assert(((@@subscriber_estimate_system_size - 10) < per_subscriber) && (per_subscriber < (@@subscriber_estimate_system_size + 10)), "Subscriber system size is far from %d bytes (measured: %d)" % ([@@subscriber_estimate_system_size, per_subscriber]))
EventMachine.stop EventMachine.stop
end end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment