Commit 1fe628e7 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

reverting pool rotate, it shows inefficient

parent 0cadb974
...@@ -197,7 +197,6 @@ typedef struct { ...@@ -197,7 +197,6 @@ typedef struct {
typedef struct { typedef struct {
ngx_http_push_stream_worker_msg_t *messages_queue; ngx_http_push_stream_worker_msg_t *messages_queue;
ngx_http_push_stream_queue_elem_t *subscribers_sentinel; ngx_http_push_stream_queue_elem_t *subscribers_sentinel;
ngx_http_push_stream_queue_pool_t *pools_to_delete;
ngx_uint_t subscribers; // # of subscribers in the worker ngx_uint_t subscribers; // # of subscribers in the worker
time_t startup; time_t startup;
pid_t pid; pid_t pid;
......
...@@ -269,7 +269,6 @@ static void ngx_http_push_stream_collect_expired_messages_and_em ...@@ -269,7 +269,6 @@ static void ngx_http_push_stream_collect_expired_messages_and_em
static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg); static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_free_worker_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_worker_msg_t *worker_msg); static void ngx_http_push_stream_free_worker_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_worker_msg_t *worker_msg);
static ngx_int_t ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force); static ngx_int_t ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force);
static ngx_int_t ngx_http_push_stream_free_memory_of_expired_pools(ngx_flag_t force);
static ngx_inline void ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired); static ngx_inline void ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired);
static ngx_inline void ngx_http_push_stream_delete_worker_channel(void); static ngx_inline void ngx_http_push_stream_delete_worker_channel(void);
......
...@@ -27,6 +27,6 @@ ...@@ -27,6 +27,6 @@
#define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ #define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.2"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.2");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("89a82444dd622c108bdf58cdc5be4c96ec3b46af"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("a1161e8eacdd207760b46975feed4ae268e3f06f");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */
...@@ -88,11 +88,10 @@ ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_ ...@@ -88,11 +88,10 @@ ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_
{ {
ngx_str_t *text; ngx_str_t *text;
ngx_http_push_stream_content_subtype_t *subtype; ngx_http_push_stream_content_subtype_t *subtype;
ngx_pool_t *temp_pool = ngx_http_push_stream_get_temp_pool(r);
subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1); subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
text = ngx_http_push_stream_channel_info_formatted(temp_pool, subtype->format_item, &channel->id, channel->last_message_id, channel->stored_messages, channel->subscribers); text = ngx_http_push_stream_channel_info_formatted(r->pool, subtype->format_item, &channel->id, channel->last_message_id, channel->stored_messages, channel->subscribers);
if (text == NULL) { if (text == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer."); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer.");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
...@@ -111,11 +110,10 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request ...@@ -111,11 +110,10 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *worker_data; ngx_http_push_stream_worker_data_t *worker_data;
ngx_http_push_stream_content_subtype_t *subtype; ngx_http_push_stream_content_subtype_t *subtype;
ngx_pool_t *temp_pool = ngx_http_push_stream_get_temp_pool(r);
subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1); subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
currenttime = ngx_http_push_stream_get_formatted_current_time(temp_pool); currenttime = ngx_http_push_stream_get_formatted_current_time(r->pool);
hostname = ngx_http_push_stream_get_formatted_hostname(temp_pool); hostname = ngx_http_push_stream_get_formatted_hostname(r->pool);
used_slots = 0; used_slots = 0;
for(i = 0; i < NGX_MAX_PROCESSES; i++) { for(i = 0; i < NGX_MAX_PROCESSES; i++) {
...@@ -126,7 +124,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request ...@@ -126,7 +124,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
len = (subtype->format_summarized_worker_item->len > subtype->format_summarized_worker_last_item->len) ? subtype->format_summarized_worker_item->len : subtype->format_summarized_worker_last_item->len; len = (subtype->format_summarized_worker_item->len > subtype->format_summarized_worker_last_item->len) ? subtype->format_summarized_worker_item->len : subtype->format_summarized_worker_last_item->len;
len = used_slots * (3*NGX_INT_T_LEN + len - 8); //minus 8 sprintf len = used_slots * (3*NGX_INT_T_LEN + len - 8); //minus 8 sprintf
if ((subscribers_by_workers = ngx_pcalloc(temp_pool, len)) == NULL) { if ((subscribers_by_workers = ngx_pcalloc(r->pool, len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate memory to write workers statistics."); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate memory to write workers statistics.");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
...@@ -143,7 +141,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request ...@@ -143,7 +141,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
len = 4*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len + ngx_strlen(subscribers_by_workers) - 21;// minus 21 sprintf len = 4*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len + ngx_strlen(subscribers_by_workers) - 21;// minus 21 sprintf
if ((text = ngx_http_push_stream_create_str(temp_pool, len)) == NULL) { if ((text = ngx_http_push_stream_create_str(r->pool, len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer."); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer.");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
...@@ -199,7 +197,6 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -199,7 +197,6 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_content_subtype_t *subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1); ngx_http_push_stream_content_subtype_t *subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
ngx_pool_t *temp_pool = ngx_http_push_stream_get_temp_pool(r);
const ngx_str_t *format; const ngx_str_t *format;
const ngx_str_t *head = subtype->format_group_head; const ngx_str_t *head = subtype->format_group_head;
...@@ -208,7 +205,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -208,7 +205,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
ngx_queue_init(&queue_channel_info); ngx_queue_init(&queue_channel_info);
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_rbtree_walker_channel_info_locked(&data->tree, temp_pool, data->tree.root, &queue_channel_info, prefix); ngx_http_push_stream_rbtree_walker_channel_info_locked(&data->tree, r->pool, data->tree.root, &queue_channel_info, prefix);
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
// format content body // format content body
...@@ -222,7 +219,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -222,7 +219,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
} }
format = (next != &queue_channel_info) ? subtype->format_group_item : subtype->format_group_last_item; format = (next != &queue_channel_info) ? subtype->format_group_item : subtype->format_group_last_item;
if ((text = ngx_http_push_stream_channel_info_formatted(temp_pool, format, &channel_info->id, channel_info->published_messages, channel_info->stored_messages, channel_info->subscribers)) == NULL) { if ((text = ngx_http_push_stream_channel_info_formatted(r->pool, format, &channel_info->id, channel_info->published_messages, channel_info->stored_messages, channel_info->subscribers)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory to format channel info"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory to format channel info");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
...@@ -249,13 +246,13 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -249,13 +246,13 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
} }
// get formatted current time // get formatted current time
currenttime = ngx_http_push_stream_get_formatted_current_time(temp_pool); currenttime = ngx_http_push_stream_get_formatted_current_time(r->pool);
// get formatted hostname // get formatted hostname
hostname = ngx_http_push_stream_get_formatted_hostname(temp_pool); hostname = ngx_http_push_stream_get_formatted_hostname(r->pool);
// format content header // format content header
if ((header_response = ngx_http_push_stream_create_str(temp_pool, head->len + hostname->len + currenttime->len + NGX_INT_T_LEN)) == NULL) { if ((header_response = ngx_http_push_stream_create_str(r->pool, head->len + hostname->len + currenttime->len + NGX_INT_T_LEN)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for response channels info"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for response channels info");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
......
...@@ -140,16 +140,10 @@ ngx_http_push_stream_ipc_init_worker() ...@@ -140,16 +140,10 @@ ngx_http_push_stream_ipc_init_worker()
return NGX_ERROR; return NGX_ERROR;
} }
if ((data->ipc[ngx_process_slot].pools_to_delete == NULL) && ((data->ipc[ngx_process_slot].pools_to_delete = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_queue_pool_t))) == NULL)) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR;
}
data->ipc[ngx_process_slot].pid = ngx_pid; data->ipc[ngx_process_slot].pid = ngx_pid;
data->ipc[ngx_process_slot].startup = ngx_time(); data->ipc[ngx_process_slot].startup = ngx_time();
ngx_queue_init(&data->ipc[ngx_process_slot].messages_queue->queue); ngx_queue_init(&data->ipc[ngx_process_slot].messages_queue->queue);
ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_sentinel->queue); ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_sentinel->queue);
ngx_queue_init(&data->ipc[ngx_process_slot].pools_to_delete->queue);
data->subscribers = 0; data->subscribers = 0;
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_reset_channel_subscribers_count_locked); ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_reset_channel_subscribers_count_locked);
...@@ -186,10 +180,6 @@ ngx_http_push_stream_clean_worker_data() ...@@ -186,10 +180,6 @@ ngx_http_push_stream_clean_worker_data()
ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_sentinel->queue); ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_sentinel->queue);
} }
if (data->ipc[ngx_process_slot].subscribers_sentinel != NULL) {
ngx_queue_init(&data->ipc[ngx_process_slot].pools_to_delete->queue);
}
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
data->ipc[ngx_process_slot].pid = -1; data->ipc[ngx_process_slot].pid = -1;
......
...@@ -35,8 +35,6 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r) ...@@ -35,8 +35,6 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
ngx_http_push_stream_channel_t *channel = NULL; ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscriber_ctx_t *ctx;
r->keepalive = cf->keepalive; r->keepalive = cf->keepalive;
// only accept GET, POST and DELETE methods if enable publisher administration // only accept GET, POST and DELETE methods if enable publisher administration
...@@ -64,13 +62,6 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r) ...@@ -64,13 +62,6 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
if (r->keepalive) {
if ((ctx = ngx_http_push_stream_add_request_context(r)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to create request context");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
// search for a existing channel with this id // search for a existing channel with this id
channel = ngx_http_push_stream_find_channel(id, r->connection->log); channel = ngx_http_push_stream_find_channel(id, r->connection->log);
...@@ -84,7 +75,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r) ...@@ -84,7 +75,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
} }
if ((cf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN) && (r->method == NGX_HTTP_DELETE)) { if ((cf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN) && (r->method == NGX_HTTP_DELETE)) {
ngx_http_push_stream_delete_channel(id, ngx_http_push_stream_get_temp_pool(r)); ngx_http_push_stream_delete_channel(id, r->pool);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED); return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED);
} }
...@@ -144,7 +135,6 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -144,7 +135,6 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ssize_t n; ssize_t n;
off_t len; off_t len;
ngx_pool_t *temp_pool = ngx_http_push_stream_get_temp_pool(r);
// check if body message wasn't empty // check if body message wasn't empty
if (r->headers_in.content_length_n <= 0) { if (r->headers_in.content_length_n <= 0) {
...@@ -163,7 +153,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -163,7 +153,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler with channel id too large"); NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler with channel id too large");
// copy request body to a memory buffer // copy request body to a memory buffer
buf = ngx_create_temp_buf(temp_pool, r->headers_in.content_length_n + 1); buf = ngx_create_temp_buf(r->pool, r->headers_in.content_length_n + 1);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf, NULL, r, "push stream module: cannot allocate memory for read the message"); NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf, NULL, r, "push stream module: cannot allocate memory for read the message");
ngx_memset(buf->start, '\0', r->headers_in.content_length_n + 1); ngx_memset(buf->start, '\0', r->headers_in.content_length_n + 1);
...@@ -198,7 +188,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -198,7 +188,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID); event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID);
event_type = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_TYPE); event_type = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_TYPE);
channel = ngx_http_push_stream_add_msg_to_channel(r, id, buf->pos, ngx_buf_size(buf), event_id, event_type, temp_pool); channel = ngx_http_push_stream_add_msg_to_channel(r, id, buf->pos, ngx_buf_size(buf), event_id, event_type, r->pool);
if (channel == NULL) { if (channel == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return; return;
...@@ -217,8 +207,6 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r) ...@@ -217,8 +207,6 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
ngx_http_push_stream_channel_t *channel = NULL; ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscriber_ctx_t *ctx;
r->keepalive = cf->keepalive; r->keepalive = cf->keepalive;
// only accept GET method // only accept GET method
...@@ -239,13 +227,6 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r) ...@@ -239,13 +227,6 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
if (r->keepalive) {
if ((ctx = ngx_http_push_stream_add_request_context(r)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to create request context");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
// if not specify a channel id, get info about all channels in a resumed way // if not specify a channel id, get info about all channels in a resumed way
if (id == NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID) { if (id == NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID) {
return ngx_http_push_stream_send_response_all_channels_info_summarized(r); return ngx_http_push_stream_send_response_all_channels_info_summarized(r);
......
...@@ -893,7 +893,6 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -893,7 +893,6 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
d->ipc[i].subscribers = 0; d->ipc[i].subscribers = 0;
d->ipc[i].messages_queue = NULL; d->ipc[i].messages_queue = NULL;
d->ipc[i].subscribers_sentinel = NULL; d->ipc[i].subscribers_sentinel = NULL;
d->ipc[i].pools_to_delete = NULL;
} }
d->startup = ngx_time(); d->startup = ngx_time();
......
...@@ -474,43 +474,6 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_ ...@@ -474,43 +474,6 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_
} }
ngx_pool_t *
ngx_http_push_stream_get_temp_pool(ngx_http_request_t *r)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_http_push_stream_subscriber_ctx_t *ctx = NULL;
ngx_http_push_stream_queue_pool_t *pool_node;
ngx_pool_t *pool = r->pool, *aux_pool;
if (((ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module)) != NULL) && (ctx->temp_pool != NULL)) {
if ((ctx->temp_pool->d.next != NULL) || (ctx->temp_pool->large != NULL)) {
if ((aux_pool = ngx_create_pool(NGX_MAX_ALLOC_FROM_POOL, ngx_cycle->log)) != NULL) {
if ((pool_node = ngx_palloc(ctx->temp_pool, sizeof(ngx_http_push_stream_queue_pool_t))) != NULL) {
ngx_shmtx_lock(&shpool->mutex);
pool_node->pool = ctx->temp_pool;
pool_node->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
ngx_queue_insert_tail(&thisworker_data->pools_to_delete->queue, &pool_node->queue);
ctx->temp_pool = aux_pool;
ngx_shmtx_unlock(&shpool->mutex);
} else {
ngx_destroy_pool(aux_pool);
}
}
}
pool = ctx->temp_pool;
}
return pool;
}
ngx_chain_t * ngx_chain_t *
ngx_http_push_stream_get_buf(ngx_http_request_t *r) ngx_http_push_stream_get_buf(ngx_http_request_t *r)
{ {
...@@ -519,6 +482,9 @@ ngx_http_push_stream_get_buf(ngx_http_request_t *r) ...@@ -519,6 +482,9 @@ ngx_http_push_stream_get_buf(ngx_http_request_t *r)
if ((ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module)) != NULL) { if ((ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module)) != NULL) {
out = ngx_chain_get_free_buf(r->pool, &ctx->free); out = ngx_chain_get_free_buf(r->pool, &ctx->free);
if (out != NULL) {
out->buf->tag = (ngx_buf_tag_t) &ngx_http_push_stream_module;
}
} else { } else {
out = (ngx_chain_t *) ngx_pcalloc(r->pool, sizeof(ngx_chain_t)); out = (ngx_chain_t *) ngx_pcalloc(r->pool, sizeof(ngx_chain_t));
if (out == NULL) { if (out == NULL) {
...@@ -531,10 +497,6 @@ ngx_http_push_stream_get_buf(ngx_http_request_t *r) ...@@ -531,10 +497,6 @@ ngx_http_push_stream_get_buf(ngx_http_request_t *r)
} }
} }
if (out != NULL) {
out->buf->tag = (ngx_buf_tag_t) &ngx_http_push_stream_module;
}
return out; return out;
} }
...@@ -844,7 +806,6 @@ ngx_http_push_stream_memory_cleanup() ...@@ -844,7 +806,6 @@ ngx_http_push_stream_memory_cleanup()
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, data->tree.root, 0); ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, data->tree.root, 0);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0); ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0);
ngx_http_push_stream_free_memory_of_expired_pools(0);
return NGX_OK; return NGX_OK;
} }
...@@ -886,29 +847,6 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for ...@@ -886,29 +847,6 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
} }
static ngx_int_t
ngx_http_push_stream_free_memory_of_expired_pools(ngx_flag_t force)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_http_push_stream_queue_pool_t *cur;
ngx_shmtx_lock(&shpool->mutex);
while ((cur = (ngx_http_push_stream_queue_pool_t *)ngx_queue_next(&thisworker_data->pools_to_delete->queue)) != thisworker_data->pools_to_delete) {
if (force || (ngx_time() > cur->expires)) {
ngx_queue_remove(&cur->queue);
ngx_destroy_pool(cur->pool);
} else {
break;
}
}
ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK;
}
static void static void
ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg) ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg)
{ {
......
...@@ -254,7 +254,14 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -254,7 +254,14 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
} }
if (frame.payload_len > 0) { if (frame.payload_len > 0) {
temp_pool = ngx_http_push_stream_get_temp_pool(r); //create a temporary pool to allocate temporary elements
if ((temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, r->connection->log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for temporary pool");
ngx_http_finalize_request(r, NGX_OK);
ngx_destroy_pool(temp_pool);
return;
}
aux = ngx_http_push_stream_create_str(temp_pool, frame.payload_len); aux = ngx_http_push_stream_create_str(temp_pool, frame.payload_len);
if (ngx_http_push_stream_recv(c, rev, &err, aux->data, (ssize_t) frame.payload_len) == NGX_ERROR) { if (ngx_http_push_stream_recv(c, rev, &err, aux->data, (ssize_t) frame.payload_len) == NGX_ERROR) {
goto closed; goto closed;
...@@ -271,10 +278,13 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -271,10 +278,13 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module); ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscription_t *subscription = (ngx_http_push_stream_subscription_t *)ngx_queue_head(&ctx->subscriber->subscriptions_sentinel.queue); ngx_http_push_stream_subscription_t *subscription = (ngx_http_push_stream_subscription_t *)ngx_queue_head(&ctx->subscriber->subscriptions_sentinel.queue);
if (ngx_http_push_stream_add_msg_to_channel(r, &subscription->channel->id, frame.payload, frame.payload_len, NULL, NULL, temp_pool) == NULL) { if (ngx_http_push_stream_add_msg_to_channel(r, &subscription->channel->id, frame.payload, frame.payload_len, NULL, NULL, temp_pool) == NULL) {
ngx_http_finalize_request(r, 0); ngx_http_finalize_request(r, NGX_OK);
ngx_destroy_pool(temp_pool);
return; return;
} }
} }
ngx_destroy_pool(temp_pool);
} }
if (frame.opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_OPCODE) { if (frame.opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_OPCODE) {
...@@ -286,12 +296,15 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -286,12 +296,15 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && rev->active) { if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && rev->active) {
if (ngx_del_event(rev, NGX_READ_EVENT, 0) != NGX_OK) { if (ngx_del_event(rev, NGX_READ_EVENT, 0) != NGX_OK) {
ngx_http_finalize_request(r, 0); ngx_http_finalize_request(r, NGX_OK);
} }
} }
return; return;
closed: closed:
if (temp_pool != NULL) {
ngx_destroy_pool(temp_pool);
}
if (err) { if (err) {
rev->error = 1; rev->error = 1;
...@@ -299,5 +312,5 @@ closed: ...@@ -299,5 +312,5 @@ closed:
ngx_log_error(NGX_LOG_INFO, c->log, err, "client closed prematurely connection"); ngx_log_error(NGX_LOG_INFO, c->log, err, "client closed prematurely connection");
ngx_http_finalize_request(r, 0); ngx_http_finalize_request(r, NGX_OK);
} }
...@@ -3,7 +3,7 @@ require File.expand_path('base_test_case', File.dirname(__FILE__)) ...@@ -3,7 +3,7 @@ require File.expand_path('base_test_case', File.dirname(__FILE__))
class TestMeasureMemory < Test::Unit::TestCase class TestMeasureMemory < Test::Unit::TestCase
include BaseTestCase include BaseTestCase
@@message_estimate_size = 200 @@message_estimate_size = 199
@@channel_estimate_size = 536 @@channel_estimate_size = 536
@@subscriber_estimate_size = 230 @@subscriber_estimate_size = 230
@@subscriber_estimate_system_size = 7100 @@subscriber_estimate_system_size = 7100
...@@ -44,7 +44,7 @@ class TestMeasureMemory < Test::Unit::TestCase ...@@ -44,7 +44,7 @@ class TestMeasureMemory < Test::Unit::TestCase
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics") assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
published_messages = JSON.parse(pub_2.response)["published_messages"].to_i published_messages = JSON.parse(pub_2.response)["published_messages"].to_i
assert(((expected_message - 10) < published_messages) && (published_messages < (expected_message + 10)), "Message size is far from %d bytes (expected: %d, published: %d)" % ([@@message_estimate_size, expected_message, published_messages])) assert(((expected_message - 10) <= published_messages) && (published_messages <= (expected_message + 10)), "Message size is far from %d bytes (expected: %d, published: %d)" % ([@@message_estimate_size, expected_message, published_messages]))
EventMachine.stop EventMachine.stop
} }
...@@ -71,7 +71,7 @@ class TestMeasureMemory < Test::Unit::TestCase ...@@ -71,7 +71,7 @@ class TestMeasureMemory < Test::Unit::TestCase
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics") assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
created_channels = JSON.parse(pub_2.response)["channels"].to_i created_channels = JSON.parse(pub_2.response)["channels"].to_i
assert(((expected_channel - 10) < created_channels) && (created_channels < (expected_channel + 10)), "Channel size is far from %d bytes (expected: %d, created: %d)" % ([@@channel_estimate_size, expected_channel, created_channels])) assert(((expected_channel - 10) <= created_channels) && (created_channels <= (expected_channel + 10)), "Channel size is far from %d bytes (expected: %d, created: %d)" % ([@@channel_estimate_size, expected_channel, created_channels]))
EventMachine.stop EventMachine.stop
} }
add_test_timeout add_test_timeout
...@@ -98,7 +98,7 @@ class TestMeasureMemory < Test::Unit::TestCase ...@@ -98,7 +98,7 @@ class TestMeasureMemory < Test::Unit::TestCase
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics") assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
created_subscriber = JSON.parse(pub_2.response)["subscribers"].to_i created_subscriber = JSON.parse(pub_2.response)["subscribers"].to_i
assert(((expected_subscriber - 10) < created_subscriber) && (created_subscriber < (expected_subscriber + 10)), "Subscriber size is far from %d bytes (expected: %d, created: %d)" % ([@@subscriber_estimate_size, expected_subscriber, created_subscriber])) assert(((expected_subscriber - 20) <= created_subscriber) && (created_subscriber <= (expected_subscriber + 20)), "Subscriber size is far from %d bytes (expected: %d, created: %d)" % ([@@subscriber_estimate_size, expected_subscriber, created_subscriber]))
EventMachine.stop EventMachine.stop
} }
end end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment