Commit 17fc99c1 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

applying pool rotate and busy/free chains to publisher

parent 5b058a21
...@@ -27,6 +27,6 @@ ...@@ -27,6 +27,6 @@
#define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ #define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.2"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.2");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("30a770a9ae5387e5b5fe972b6ed1527380767a9c"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("5b058a215e2e094b5bbc95bfbe56382b97eca7bb");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */
...@@ -88,10 +88,11 @@ ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_ ...@@ -88,10 +88,11 @@ ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_
{ {
ngx_str_t *text; ngx_str_t *text;
ngx_http_push_stream_content_subtype_t *subtype; ngx_http_push_stream_content_subtype_t *subtype;
ngx_pool_t *temp_pool = ngx_http_push_stream_get_temp_pool(r);
subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1); subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
text = ngx_http_push_stream_channel_info_formatted(r->pool, subtype->format_item, &channel->id, channel->last_message_id, channel->stored_messages, channel->subscribers); text = ngx_http_push_stream_channel_info_formatted(temp_pool, subtype->format_item, &channel->id, channel->last_message_id, channel->stored_messages, channel->subscribers);
if (text == NULL) { if (text == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer."); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer.");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
...@@ -110,10 +111,11 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request ...@@ -110,10 +111,11 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *worker_data; ngx_http_push_stream_worker_data_t *worker_data;
ngx_http_push_stream_content_subtype_t *subtype; ngx_http_push_stream_content_subtype_t *subtype;
ngx_pool_t *temp_pool = ngx_http_push_stream_get_temp_pool(r);
subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1); subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
currenttime = ngx_http_push_stream_get_formatted_current_time(r->pool); currenttime = ngx_http_push_stream_get_formatted_current_time(temp_pool);
hostname = ngx_http_push_stream_get_formatted_hostname(r->pool); hostname = ngx_http_push_stream_get_formatted_hostname(temp_pool);
used_slots = 0; used_slots = 0;
for(i = 0; i < NGX_MAX_PROCESSES; i++) { for(i = 0; i < NGX_MAX_PROCESSES; i++) {
...@@ -124,7 +126,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request ...@@ -124,7 +126,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
len = (subtype->format_summarized_worker_item->len > subtype->format_summarized_worker_last_item->len) ? subtype->format_summarized_worker_item->len : subtype->format_summarized_worker_last_item->len; len = (subtype->format_summarized_worker_item->len > subtype->format_summarized_worker_last_item->len) ? subtype->format_summarized_worker_item->len : subtype->format_summarized_worker_last_item->len;
len = used_slots * (3*NGX_INT_T_LEN + len - 8); //minus 8 sprintf len = used_slots * (3*NGX_INT_T_LEN + len - 8); //minus 8 sprintf
if ((subscribers_by_workers = ngx_pcalloc(r->pool, len)) == NULL) { if ((subscribers_by_workers = ngx_pcalloc(temp_pool, len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate memory to write workers statistics."); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate memory to write workers statistics.");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
...@@ -141,7 +143,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request ...@@ -141,7 +143,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
len = 4*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len + ngx_strlen(subscribers_by_workers) - 21;// minus 21 sprintf len = 4*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len + ngx_strlen(subscribers_by_workers) - 21;// minus 21 sprintf
if ((text = ngx_http_push_stream_create_str(r->pool, len)) == NULL) { if ((text = ngx_http_push_stream_create_str(temp_pool, len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer."); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer.");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
...@@ -197,6 +199,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -197,6 +199,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_content_subtype_t *subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1); ngx_http_push_stream_content_subtype_t *subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
ngx_pool_t *temp_pool = ngx_http_push_stream_get_temp_pool(r);
const ngx_str_t *format; const ngx_str_t *format;
const ngx_str_t *head = subtype->format_group_head; const ngx_str_t *head = subtype->format_group_head;
...@@ -205,7 +208,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -205,7 +208,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
ngx_queue_init(&queue_channel_info); ngx_queue_init(&queue_channel_info);
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_rbtree_walker_channel_info_locked(&data->tree, r->pool, data->tree.root, &queue_channel_info, prefix); ngx_http_push_stream_rbtree_walker_channel_info_locked(&data->tree, temp_pool, data->tree.root, &queue_channel_info, prefix);
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
// format content body // format content body
...@@ -219,7 +222,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -219,7 +222,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
} }
format = (next != &queue_channel_info) ? subtype->format_group_item : subtype->format_group_last_item; format = (next != &queue_channel_info) ? subtype->format_group_item : subtype->format_group_last_item;
if ((text = ngx_http_push_stream_channel_info_formatted(r->pool, format, &channel_info->id, channel_info->published_messages, channel_info->stored_messages, channel_info->subscribers)) == NULL) { if ((text = ngx_http_push_stream_channel_info_formatted(temp_pool, format, &channel_info->id, channel_info->published_messages, channel_info->stored_messages, channel_info->subscribers)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory to format channel info"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory to format channel info");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
...@@ -246,13 +249,13 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -246,13 +249,13 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
} }
// get formatted current time // get formatted current time
currenttime = ngx_http_push_stream_get_formatted_current_time(r->pool); currenttime = ngx_http_push_stream_get_formatted_current_time(temp_pool);
// get formatted hostname // get formatted hostname
hostname = ngx_http_push_stream_get_formatted_hostname(r->pool); hostname = ngx_http_push_stream_get_formatted_hostname(temp_pool);
// format content header // format content header
if ((header_response = ngx_http_push_stream_create_str(r->pool, head->len + hostname->len + currenttime->len + NGX_INT_T_LEN)) == NULL) { if ((header_response = ngx_http_push_stream_create_str(temp_pool, head->len + hostname->len + currenttime->len + NGX_INT_T_LEN)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for response channels info"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for response channels info");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
......
...@@ -35,6 +35,8 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r) ...@@ -35,6 +35,8 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
ngx_http_push_stream_channel_t *channel = NULL; ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscriber_ctx_t *ctx;
r->keepalive = cf->keepalive; r->keepalive = cf->keepalive;
// only accept GET, POST and DELETE methods if enable publisher administration // only accept GET, POST and DELETE methods if enable publisher administration
...@@ -62,6 +64,13 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r) ...@@ -62,6 +64,13 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
if (r->keepalive) {
if ((ctx = ngx_http_push_stream_add_request_context(r)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to create request context");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
// search for a existing channel with this id // search for a existing channel with this id
channel = ngx_http_push_stream_find_channel(id, r->connection->log); channel = ngx_http_push_stream_find_channel(id, r->connection->log);
...@@ -75,7 +84,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r) ...@@ -75,7 +84,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
} }
if ((cf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN) && (r->method == NGX_HTTP_DELETE)) { if ((cf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN) && (r->method == NGX_HTTP_DELETE)) {
ngx_http_push_stream_delete_channel(id, r->pool); ngx_http_push_stream_delete_channel(id, ngx_http_push_stream_get_temp_pool(r));
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED); return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED);
} }
...@@ -135,6 +144,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -135,6 +144,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ssize_t n; ssize_t n;
off_t len; off_t len;
ngx_pool_t *temp_pool = ngx_http_push_stream_get_temp_pool(r);
// check if body message wasn't empty // check if body message wasn't empty
if (r->headers_in.content_length_n <= 0) { if (r->headers_in.content_length_n <= 0) {
...@@ -153,7 +163,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -153,7 +163,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler with channel id too large"); NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler with channel id too large");
// copy request body to a memory buffer // copy request body to a memory buffer
buf = ngx_create_temp_buf(r->pool, r->headers_in.content_length_n + 1); buf = ngx_create_temp_buf(temp_pool, r->headers_in.content_length_n + 1);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf, NULL, r, "push stream module: cannot allocate memory for read the message"); NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf, NULL, r, "push stream module: cannot allocate memory for read the message");
ngx_memset(buf->start, '\0', r->headers_in.content_length_n + 1); ngx_memset(buf->start, '\0', r->headers_in.content_length_n + 1);
...@@ -187,7 +197,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -187,7 +197,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID); event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID);
channel = ngx_http_push_stream_add_msg_to_channel(r, id, buf->pos, ngx_buf_size(buf), event_id, r->pool); channel = ngx_http_push_stream_add_msg_to_channel(r, id, buf->pos, ngx_buf_size(buf), event_id, temp_pool);
if (channel == NULL) { if (channel == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return; return;
...@@ -206,10 +216,9 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r) ...@@ -206,10 +216,9 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
ngx_http_push_stream_channel_t *channel = NULL; ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
r->keepalive = cf->keepalive; ngx_http_push_stream_subscriber_ctx_t *ctx;
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_TAG, &NGX_HTTP_PUSH_STREAM_TAG); r->keepalive = cf->keepalive;
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_COMMIT, &NGX_HTTP_PUSH_STREAM_COMMIT);
// only accept GET method // only accept GET method
if (!(r->method & NGX_HTTP_GET)) { if (!(r->method & NGX_HTTP_GET)) {
...@@ -217,6 +226,9 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r) ...@@ -217,6 +226,9 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_ALLOWED, NULL); return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_ALLOWED, NULL);
} }
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_TAG, &NGX_HTTP_PUSH_STREAM_TAG);
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_COMMIT, &NGX_HTTP_PUSH_STREAM_COMMIT);
// get and check channel id value // get and check channel id value
id = ngx_http_push_stream_get_channel_id(r, cf); id = ngx_http_push_stream_get_channel_id(r, cf);
if ((id == NULL) || (id == NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID)) { if ((id == NULL) || (id == NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID)) {
...@@ -226,6 +238,13 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r) ...@@ -226,6 +238,13 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
if (r->keepalive) {
if ((ctx = ngx_http_push_stream_add_request_context(r)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to create request context");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
// if not specify a channel id, get info about all channels in a resumed way // if not specify a channel id, get info about all channels in a resumed way
if (id == NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID) { if (id == NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID) {
return ngx_http_push_stream_send_response_all_channels_info_summarized(r); return ngx_http_push_stream_send_response_all_channels_info_summarized(r);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment