Commit 5b058a21 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

using pool rotate and busy/free chains to decrease memory consumption

parent 30a770a9
......@@ -36,6 +36,12 @@ typedef struct {
void *value;
} ngx_http_push_stream_queue_elem_t;
typedef struct {
ngx_queue_t queue;
time_t expires;
ngx_pool_t *pool;
} ngx_http_push_stream_queue_pool_t;
// template queue
typedef struct {
ngx_queue_t queue; // this MUST be first
......@@ -158,6 +164,8 @@ typedef struct {
ngx_http_push_stream_subscriber_t *subscriber;
ngx_flag_t longpolling;
ngx_pool_t *temp_pool;
ngx_chain_t *free;
ngx_chain_t *busy;
} ngx_http_push_stream_subscriber_ctx_t;
// messages to worker processes
......@@ -172,6 +180,7 @@ typedef struct {
typedef struct {
ngx_http_push_stream_worker_msg_t *messages_queue;
ngx_http_push_stream_queue_elem_t *subscribers_sentinel;
ngx_http_push_stream_queue_pool_t *pools_to_delete;
ngx_uint_t subscribers; // # of subscribers in the worker
time_t startup;
pid_t pid;
......
......@@ -233,6 +233,9 @@ static void ngx_http_push_stream_send_response_finalize_for_long
static ngx_int_t ngx_http_push_stream_memory_cleanup();
static ngx_int_t ngx_http_push_stream_buffer_cleanup();
ngx_chain_t * ngx_http_push_stream_get_buf(ngx_http_request_t *r);
ngx_http_push_stream_channel_t *ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *text, size_t len, ngx_str_t *event_id, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);
......@@ -256,6 +259,7 @@ static void ngx_http_push_stream_collect_expired_messages_and_em
static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_free_worker_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_worker_msg_t *worker_msg);
static ngx_int_t ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force);
static ngx_int_t ngx_http_push_stream_free_memory_of_expired_pools(ngx_flag_t force);
static ngx_inline void ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired);
static ngx_inline void ngx_http_push_stream_delete_worker_channel(void);
......
......@@ -27,6 +27,6 @@
#define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.2");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("f7ea53db0f1122817cdf2586ae4e14ad12d34cfd");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("30a770a9ae5387e5b5fe972b6ed1527380767a9c");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */
......@@ -213,7 +213,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
while (cur != &queue_channel_info) {
next = ngx_queue_next(cur);
ngx_http_push_stream_channel_info_t *channel_info = (ngx_http_push_stream_channel_info_t *) cur;
if ((chain = ngx_pcalloc(r->pool, sizeof(ngx_chain_t))) == NULL) {
if ((chain = ngx_http_push_stream_get_buf(r)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for response channels info");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
......@@ -224,10 +224,6 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if ((chain->buf = ngx_calloc_buf(r->pool)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory to wrap channel info");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
chain->buf->last_buf = 0;
chain->buf->memory = 1;
chain->buf->pos = text->data;
......@@ -280,7 +276,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
ngx_http_push_stream_send_response_text(r, header_response->data, header_response->len,0);
// send content body
if (first != NULL) {
ngx_http_output_filter(r, first);
ngx_http_push_stream_output_filter(r, first);
}
// send content footer
return ngx_http_push_stream_send_response_text(r, tail->data, tail->len, 1);
......
......@@ -140,10 +140,16 @@ ngx_http_push_stream_ipc_init_worker()
return NGX_ERROR;
}
if ((data->ipc[ngx_process_slot].pools_to_delete == NULL) && ((data->ipc[ngx_process_slot].pools_to_delete = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_queue_pool_t))) == NULL)) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR;
}
data->ipc[ngx_process_slot].pid = ngx_pid;
data->ipc[ngx_process_slot].startup = ngx_time();
ngx_queue_init(&data->ipc[ngx_process_slot].messages_queue->queue);
ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_sentinel->queue);
ngx_queue_init(&data->ipc[ngx_process_slot].pools_to_delete->queue);
data->subscribers = 0;
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_reset_channel_subscribers_count_locked);
......@@ -179,6 +185,11 @@ ngx_http_push_stream_clean_worker_data()
if (data->ipc[ngx_process_slot].subscribers_sentinel != NULL) {
ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_sentinel->queue);
}
if (data->ipc[ngx_process_slot].subscribers_sentinel != NULL) {
ngx_queue_init(&data->ipc[ngx_process_slot].pools_to_delete->queue);
}
ngx_shmtx_unlock(&shpool->mutex);
data->ipc[ngx_process_slot].pid = -1;
......
......@@ -820,6 +820,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
d->ipc[i].subscribers = 0;
d->ipc[i].messages_queue = NULL;
d->ipc[i].subscribers_sentinel = NULL;
d->ipc[i].pools_to_delete = NULL;
}
d->startup = ngx_time();
......
......@@ -434,6 +434,88 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_
return rc;
}
ngx_pool_t *
ngx_http_push_stream_get_temp_pool(ngx_http_request_t *r)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_http_push_stream_subscriber_ctx_t *ctx = NULL;
ngx_http_push_stream_queue_pool_t *pool_node;
ngx_pool_t *pool = r->pool, *aux_pool;
if (((ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module)) != NULL) && (ctx->temp_pool != NULL)) {
if ((ctx->temp_pool->d.next != NULL) || (ctx->temp_pool->large != NULL)) {
if ((aux_pool = ngx_create_pool(NGX_MAX_ALLOC_FROM_POOL, ngx_cycle->log)) != NULL) {
if ((pool_node = ngx_palloc(ctx->temp_pool, sizeof(ngx_http_push_stream_queue_pool_t))) != NULL) {
ngx_shmtx_lock(&shpool->mutex);
pool_node->pool = ctx->temp_pool;
pool_node->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
ngx_queue_insert_tail(&thisworker_data->pools_to_delete->queue, &pool_node->queue);
ctx->temp_pool = aux_pool;
ngx_shmtx_unlock(&shpool->mutex);
} else {
ngx_destroy_pool(aux_pool);
}
}
}
pool = ctx->temp_pool;
}
return pool;
}
ngx_chain_t *
ngx_http_push_stream_get_buf(ngx_http_request_t *r)
{
ngx_http_push_stream_subscriber_ctx_t *ctx = NULL;
ngx_chain_t *out = NULL;
if ((ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module)) != NULL) {
out = ngx_chain_get_free_buf(r->pool, &ctx->free);
} else {
out = (ngx_chain_t *) ngx_pcalloc(r->pool, sizeof(ngx_chain_t));
if (out == NULL) {
return NULL;
}
out->buf = ngx_calloc_buf(r->pool);
if (out->buf == NULL) {
return NULL;
}
}
if (out != NULL) {
out->buf->tag = (ngx_buf_tag_t) &ngx_http_push_stream_module;
}
return out;
}
ngx_int_t
ngx_http_push_stream_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
ngx_http_push_stream_subscriber_ctx_t *ctx = NULL;
ngx_int_t rc;
rc = ngx_http_output_filter(r, in);
if ((ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module)) != NULL) {
ngx_chain_update_chains(&ctx->free, &ctx->busy, &in, (ngx_buf_tag_t) &ngx_http_push_stream_module);
}
return rc;
}
static ngx_int_t
ngx_http_push_stream_send_response(ngx_http_request_t *r, ngx_str_t *text, const ngx_str_t *content_type, ngx_int_t status_code)
{
......@@ -468,13 +550,15 @@ ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *tex
return NGX_ERROR;
}
out = (ngx_chain_t *) ngx_pcalloc(r->pool, sizeof(ngx_chain_t));
b = ngx_calloc_buf(r->pool);
if ((out == NULL) || (b == NULL)) {
out = ngx_http_push_stream_get_buf(r);
if (out == NULL) {
return NGX_ERROR;
}
b = out->buf;
b->last_buf = last_buffer;
b->last_in_chain = 1;
b->flush = 1;
b->memory = 1;
b->pos = (u_char *) text;
......@@ -482,10 +566,9 @@ ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *tex
b->end = b->pos + len;
b->last = b->end;
out->buf = b;
out->next = NULL;
return ngx_http_output_filter(r, out);
return ngx_http_push_stream_output_filter(r, out);
}
static void
......@@ -699,6 +782,7 @@ ngx_http_push_stream_memory_cleanup()
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, data->tree.root, 0);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0);
ngx_http_push_stream_free_memory_of_expired_pools(0);
return NGX_OK;
}
......@@ -740,6 +824,29 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
}
static ngx_int_t
ngx_http_push_stream_free_memory_of_expired_pools(ngx_flag_t force)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_http_push_stream_queue_pool_t *cur;
ngx_shmtx_lock(&shpool->mutex);
while ((cur = (ngx_http_push_stream_queue_pool_t *)ngx_queue_next(&thisworker_data->pools_to_delete->queue)) != thisworker_data->pools_to_delete) {
if (force || (ngx_time() > cur->expires)) {
ngx_queue_remove(&cur->queue);
ngx_destroy_pool(cur->pool);
} else {
break;
}
}
ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK;
}
static void
ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg)
{
......@@ -973,7 +1080,7 @@ ngx_http_push_stream_add_request_context(ngx_http_request_t *r)
return NULL;
}
if ((ctx->temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, ngx_cycle->log)) == NULL) {
if ((ctx->temp_pool = ngx_create_pool(NGX_MAX_ALLOC_FROM_POOL, ngx_cycle->log)) == NULL) {
return NULL;
}
......
......@@ -254,13 +254,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
}
if (frame.payload_len > 0) {
//create a temporary pool to allocate temporary elements
if ((temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, r->connection->log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for temporary pool");
ngx_http_finalize_request(r, 0);
return;
}
temp_pool = ngx_http_push_stream_get_temp_pool(r);
aux = ngx_http_push_stream_create_str(temp_pool, frame.payload_len);
if (ngx_http_push_stream_recv(c, rev, &err, aux->data, (ssize_t) frame.payload_len) == NGX_ERROR) {
goto closed;
......@@ -278,12 +272,9 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
ngx_http_push_stream_subscription_t *subscription = (ngx_http_push_stream_subscription_t *)ngx_queue_head(&ctx->subscriber->subscriptions_sentinel.queue);
if (ngx_http_push_stream_add_msg_to_channel(r, &subscription->channel->id, frame.payload, frame.payload_len, NULL, temp_pool) == NULL) {
ngx_http_finalize_request(r, 0);
ngx_destroy_pool(temp_pool);
return;
}
}
ngx_destroy_pool(temp_pool);
}
if (frame.opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_OPCODE) {
......@@ -301,9 +292,6 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
return;
closed:
if (temp_pool != NULL) {
ngx_destroy_pool(temp_pool);
}
if (err) {
rev->error = 1;
......
......@@ -5,7 +5,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
def config_test_message_cleanup
@min_message_buffer_timeout = '10s'
@max_reserved_memory = "65k"
@max_reserved_memory = "129k"
@max_message_buffer_length = 100
@memory_cleanup_timeout = '30s'
end
......@@ -155,7 +155,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
def config_test_channel_cleanup
@min_message_buffer_timeout = '10s'
@max_reserved_memory = "65k"
@max_reserved_memory = "129k"
@memory_cleanup_timeout = '30s'
end
......@@ -210,7 +210,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
end
def config_test_message_cleanup_with_store_off_with_subscriber
@max_reserved_memory = "65k"
@max_reserved_memory = "129k"
@store_messages = 'off'
@memory_cleanup_timeout = '30s'
end
......@@ -247,7 +247,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
end
def config_test_message_cleanup_with_store_off_without_subscriber
@max_reserved_memory = "65k"
@max_reserved_memory = "129k"
@store_messages = 'off'
@memory_cleanup_timeout = '30s'
end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment