Commit 8899abcf authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

set \\0 in allocked memory before use, and set flag of discard request body

parent 77c136da
......@@ -22,13 +22,14 @@ ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_
return NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID;
}
if ((id = ngx_pcalloc(r->pool, sizeof(ngx_str_t) + vv->len)) == NULL) {
if ((id = ngx_pcalloc(r->pool, sizeof(ngx_str_t) + vv->len + 1)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for $push_stream_channel_id string");
return NULL;
}
id->data = (u_char *) (id + 1);
id->len = vv->len;
ngx_memset(id->data, '\0', vv->len + 1);
ngx_memcpy(id->data, vv->data, vv->len);
return id;
......@@ -87,6 +88,7 @@ ngx_http_push_stream_send_buf_response(ngx_http_request_t *r, ngx_buf_t *buf, co
r->headers_out.status = status_code;
ngx_http_discard_request_body(r);
r->discard_body = 1;
rc = ngx_http_send_header(r);
if (rc == NGX_ERROR || rc > NGX_OK || r->header_only) {
......@@ -135,6 +137,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
len = (subtype->format_summarized_worker_item->len > subtype->format_summarized_worker_last_item->len) ? subtype->format_summarized_worker_item->len : subtype->format_summarized_worker_last_item->len;
len = ngx_http_push_stream_worker_processes * (2*NGX_INT_T_LEN + len - 5); //minus 5 sprintf
subscribers_by_workers = ngx_pcalloc(r->pool, len);
ngx_memset(subscribers_by_workers, '\0', len);
start = subscribers_by_workers;
for (i = 0; i < ngx_http_push_stream_worker_processes; i++) {
format = (i < ngx_http_push_stream_worker_processes - 1) ? subtype->format_summarized_worker_item : subtype->format_summarized_worker_last_item;
......@@ -215,6 +218,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
r->headers_out.status = NGX_HTTP_OK;
ngx_http_discard_request_body(r);
r->discard_body = 1;
rc = ngx_http_send_header(r);
if (rc == NGX_ERROR || rc > NGX_OK || r->header_only) {
......@@ -242,6 +246,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_memset(header_response.data, '\0', head->len + hostname->len + currenttime->len + 1);
ngx_sprintf(header_response.data, (char *) head->data, hostname->data, currenttime->data, shm_data->channels, shm_data->broadcast_channels);
header_response.len = ngx_strlen(header_response.data);
ngx_http_push_stream_send_response_chunk(r, header_response.data, header_response.len,0);
......
......@@ -87,11 +87,11 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_buf_t *buf = NULL, *buf_msg = NULL;
ngx_chain_t *chain;
ngx_chain_t *chain;
ngx_http_push_stream_channel_t *channel;
ssize_t n;
off_t len;
ngx_http_push_stream_msg_t *msg;
off_t len;
ngx_http_push_stream_msg_t *msg;
// check if body message wasn't empty
if (r->headers_in.content_length_n <= 0) {
......@@ -111,11 +111,16 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
// just find the channel. if it's not there, NULL and return error.
channel = ngx_http_push_stream_find_channel(id, r->connection->log);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(channel, NULL, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without created channel");
if (channel == NULL) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without created channel %s", id->data);
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// copy request body to a memory buffer
buf = ngx_create_temp_buf(r->pool, r->headers_in.content_length_n);
buf = ngx_create_temp_buf(r->pool, r->headers_in.content_length_n + 1);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf, NULL, r, "push stream module: cannot allocate memory for read the message");
ngx_memset(buf->start, '\0', r->headers_in.content_length_n + 1);
chain = r->request_body->bufs;
while ((chain != NULL) && (chain->buf != NULL)) {
......@@ -134,7 +139,6 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
return;
}
buf->last = buf->last + len;
ngx_close_file(chain->buf->file->fd);
ngx_delete_file(chain->buf->file->name.data);
chain->buf->file->fd = NGX_INVALID_FILE;
} else {
......@@ -147,6 +151,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
// discard request body it is no longer needed
ngx_http_discard_request_body(r);
r->discard_body = 1;
// format message
buf_msg = ngx_http_push_stream_get_formatted_message(cf, channel, buf, r->pool);
......
......@@ -24,6 +24,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
ngx_http_discard_request_body(r);
r->discard_body = 1;
//create a temporary pool to allocate temporary elements
if ((temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, r->connection->log)) == NULL) {
......@@ -119,6 +120,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
// responding subscriber
r->read_event_handler = ngx_http_test_reading;
r->write_event_handler = ngx_http_request_empty_handler;
ngx_http_discard_request_body(r);
r->discard_body = 1;
r->headers_out.content_type = cf->content_type;
......@@ -285,6 +287,7 @@ ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_poo
channels_path->data = (u_char *) (channels_path + 1);
channels_path->len = vv_channels_path->len;
ngx_memset(channels_path->data, '\0', vv_channels_path->len + 1);
ngx_memcpy(channels_path->data, vv_channels_path->data, vv_channels_path->len);
ngx_queue_init(&channels_ids->queue);
......@@ -328,13 +331,13 @@ ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_poo
return NULL;
}
if ((cur->id = ngx_pcalloc(pool, sizeof(ngx_str_t) + len)) == NULL) {
if ((cur->id = ngx_pcalloc(pool, sizeof(ngx_str_t) + len + 1)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channel_id string");
return NULL;
}
cur->id->data = (u_char *) (cur->id + 1);
cur->id->len = len;
ngx_memset(cur->id->data, '\0', len + 1);
ngx_memcpy(cur->id->data, channel_pos, len);
cur->backtrack_messages = (backtrack_messages > 0) ? backtrack_messages : 0;
......
......@@ -44,13 +44,14 @@ ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf)
return NULL;
}
msg->buf->start = ngx_slab_alloc_locked(shpool, len);
msg->buf->start = ngx_slab_alloc_locked(shpool, len + 1);
if (msg->buf->start == NULL) {
ngx_slab_free_locked(shpool, msg->buf);
ngx_slab_free_locked(shpool, msg);
return NULL;
}
ngx_memset(msg->buf->start, '\0', len + 1);
// copy the message to shared memory
msg->buf->last = ngx_copy(msg->buf->start, buf->pos, len);
......@@ -71,7 +72,6 @@ ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t
ngx_int_t rc;
ngx_http_discard_request_body(r);
r->discard_body = 1;
r->keepalive = 0;
r->header_only = 1;
......@@ -121,7 +121,7 @@ ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_htt
}
static ngx_int_t
ngx_http_push_stream_send_response_chunk(ngx_http_request_t *r, const u_char *chunk_text, uint chunk_len, ngx_flag_t las_buffer)
ngx_http_push_stream_send_response_chunk(ngx_http_request_t *r, const u_char *chunk_text, uint chunk_len, ngx_flag_t last_buffer)
{
ngx_buf_t *b;
ngx_chain_t *out;
......@@ -136,7 +136,7 @@ ngx_http_push_stream_send_response_chunk(ngx_http_request_t *r, const u_char *ch
return NGX_ERROR;
}
b->last_buf = las_buffer;
b->last_buf = last_buffer;
b->flush = 1;
b->memory = 1;
b->pos = (u_char *)chunk_text;
......@@ -400,7 +400,8 @@ ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx
if (len_find > 0) {
u_char *ret = (u_char *) ngx_strstr(org, find);
if (ret != NULL) {
u_char *tmp = ngx_pcalloc(pool,len_org + len_replace + len_find);
u_char *tmp = ngx_pcalloc(pool, len_org + len_replace + len_find + 1);
ngx_memset(tmp, '\0', len_org + len_replace + len_find + 1);
u_int len_found = ret-org;
ngx_memcpy(tmp, org, len_found);
......@@ -423,18 +424,20 @@ ngx_http_push_stream_get_formatted_message(ngx_http_push_stream_loc_conf_t *pslc
if (pslcf->message_template.len > 0) {
u_char template[pslcf->message_template.len + 1];
ngx_memset(template, '\0', pslcf->message_template.len + 1);
ngx_memcpy(template, pslcf->message_template.data, pslcf->message_template.len);
template[pslcf->message_template.len] = '\0';
u_char char_id[NGX_INT_T_LEN];
ngx_memset(char_id, '\0', NGX_INT_T_LEN);
u_char *msg = NGX_PUSH_STREAM_PING_MESSAGE_TEXT.data;
u_char *channel_id = NGX_PUSH_STREAM_PING_CHANNEL_ID.data;
if ((channel != NULL) && (buf != NULL)) {
ngx_memzero(char_id, NGX_INT_T_LEN);
ngx_sprintf(char_id, "%d", channel->last_message_id + 1);
msg = ngx_pcalloc(pool, ngx_buf_size(buf) + 1);
ngx_memcpy(msg, buf->pos, ngx_buf_size(buf));
len = ngx_buf_size(buf);
msg = ngx_pcalloc(pool, len + 1);
ngx_memset(msg, '\0', len + 1);
ngx_memcpy(msg, buf->pos, len);
channel_id = channel->id.data;
} else {
ngx_memcpy(char_id, NGX_PUSH_STREAM_PING_MESSAGE_ID.data, NGX_PUSH_STREAM_PING_MESSAGE_ID.len + 1);
......@@ -496,6 +499,7 @@ ngx_http_push_stream_append_crlf(const ngx_str_t *str, ngx_pool_t *pool)
u_char *last, *result;
ngx_str_t crlf = ngx_string(CRLF);
result = ngx_pcalloc(pool, str->len + crlf.len + 1);
ngx_memset(result, '\0', str->len + crlf.len + 1);
last = ngx_copy(result, str->data, str->len);
last = ngx_copy(last, crlf.data, crlf.len);
......@@ -538,7 +542,8 @@ ngx_http_push_stream_get_formatted_current_time(ngx_pool_t *pool)
currenttime = (ngx_str_t *) ngx_pcalloc(pool, sizeof(ngx_str_t) + 20); //ISO 8601 pattern plus 1
if (currenttime != NULL) {
currenttime->data = (u_char *) currenttime + sizeof(ngx_str_t);
currenttime->data = (u_char *) (currenttime + 1);
ngx_memset(currenttime->data, '\0', 20);
ngx_gmtime(ngx_time(), &tm);
ngx_sprintf(currenttime->data, (char *) NGX_PUSH_STREAM_DATE_FORMAT_ISO_8601.data, tm.ngx_tm_year, tm.ngx_tm_mon, tm.ngx_tm_mday, tm.ngx_tm_hour, tm.ngx_tm_min, tm.ngx_tm_sec);
currenttime->len = ngx_strlen(currenttime->data);
......@@ -556,7 +561,8 @@ ngx_http_push_stream_get_formatted_hostname(ngx_pool_t *pool)
hostname = (ngx_str_t *) ngx_pcalloc(pool, sizeof(ngx_str_t) + ngx_cycle->hostname.len + 1); //hostname length plus 1
if (hostname != NULL) {
hostname->data = (u_char *) hostname + sizeof(ngx_str_t);
hostname->data = (u_char *) (hostname + 1);
ngx_memset(hostname->data, '\0', ngx_cycle->hostname.len + 1);
ngx_memcpy(hostname->data, ngx_cycle->hostname.data, ngx_cycle->hostname.len);
hostname->len = ngx_strlen(hostname->data);
} else {
......
......@@ -65,6 +65,10 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log)
channel->expires = 0;
(channel->broadcast) ? data->broadcast_channels++ : data->channels++;
// reinitialize queues
ngx_queue_init(&channel->message_queue.queue);
ngx_queue_init(&channel->workers_with_subscribers.queue);
ngx_rbtree_delete(&data->channels_to_delete, (ngx_rbtree_node_t *) channel);
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len);
ngx_rbtree_insert(&data->tree, (ngx_rbtree_node_t *) channel);
......@@ -106,10 +110,10 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
return NULL;
}
channel->id.data = (u_char *) (channel+1); // contiguous piggy
ngx_memset(channel, '\0', sizeof(ngx_http_push_stream_channel_t) + id->len + 1);
channel->id.data = (u_char *) (channel + 1);
channel->id.len = (u_char) id->len;
ngx_memzero(channel->id.data, channel->id.len + 1);
channel->id.len = id->len;
ngx_memcpy(channel->id.data, id->data, channel->id.len);
channel->node.key = ngx_crc32_short(id->data, id->len);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment