Commit 80fa531b authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

split parts of the code in other functions and fix memory leak of temp pool when an error occur

parent a81c81d3
...@@ -25,20 +25,19 @@ ...@@ -25,20 +25,19 @@
#include <ngx_http_push_stream_module_subscriber.h> #include <ngx_http_push_stream_module_subscriber.h>
static ngx_http_push_stream_worker_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r);
static void ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
static ngx_int_t static ngx_int_t
ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
{ {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_pool_cleanup_t *cln;
ngx_http_push_stream_subscriber_cleanup_t *clndata;
ngx_http_push_stream_worker_subscriber_t *worker_subscriber; ngx_http_push_stream_worker_subscriber_t *worker_subscriber;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur; ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_pool_t *temp_pool; ngx_pool_t *temp_pool;
ngx_uint_t subscribed_channels_qtd = 0; ngx_uint_t subscribed_channels_qtd = 0;
ngx_uint_t subscribed_broadcast_channels_qtd = 0; ngx_uint_t subscribed_broadcast_channels_qtd = 0;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_flag_t is_broadcast_channel; ngx_flag_t is_broadcast_channel;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
...@@ -54,23 +53,6 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -54,23 +53,6 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
//attach a cleaner to remove the request from the channel
if ((cln = ngx_pool_cleanup_add(r->pool, sizeof(ngx_http_push_stream_subscriber_cleanup_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for cleanup");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if ((worker_subscriber = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_worker_subscriber_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate worker subscriber");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
worker_subscriber->request = r;
worker_subscriber->worker_subscribed_pid = ngx_pid;
worker_subscriber->expires = (cf->subscriber_connection_timeout == NGX_CONF_UNSET) ? 0 : (ngx_time() + cf->subscriber_connection_timeout);
ngx_queue_init(&worker_subscriber->queue);
ngx_queue_init(&worker_subscriber->subscriptions_sentinel.queue);
//get channels ids and backtracks from path //get channels ids and backtracks from path
channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, temp_pool); channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, temp_pool);
if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) { if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) {
...@@ -95,7 +77,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -95,7 +77,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE); return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE);
} }
// count subscribed channel and boradcasts // count subscribed channel and broadcasts
subscribed_channels_qtd++; subscribed_channels_qtd++;
is_broadcast_channel = 0; is_broadcast_channel = 0;
if ((cf->broadcast_channel_prefix.len > 0) && (ngx_strncmp(cur->id->data, cf->broadcast_channel_prefix.data, cf->broadcast_channel_prefix.len) == 0)) { if ((cf->broadcast_channel_prefix.len > 0) && (ngx_strncmp(cur->id->data, cf->broadcast_channel_prefix.data, cf->broadcast_channel_prefix.len) == 0)) {
...@@ -123,33 +105,22 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -123,33 +105,22 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
channel = ngx_http_push_stream_get_channel(cur->id, r->connection->log, cf); channel = ngx_http_push_stream_get_channel(cur->id, r->connection->log, cf);
if (channel == NULL) { if (channel == NULL) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate memory for new channel"); ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate memory for new channel");
ngx_destroy_pool(temp_pool);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL); return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL);
} }
if (channel == NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED) { if (channel == NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: number of channels were exceeded"); ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: number of channels were exceeded");
ngx_destroy_pool(temp_pool);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE); return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE);
} }
} }
// set a cleaner to subscriber if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) {
cln->handler = (ngx_pool_cleanup_pt) ngx_http_push_stream_subscriber_cleanup; ngx_destroy_pool(temp_pool);
clndata = (ngx_http_push_stream_subscriber_cleanup_t *) cln->data; return NGX_HTTP_INTERNAL_SERVER_ERROR;
clndata->worker_subscriber = worker_subscriber; }
clndata->worker_subscriber->clndata = clndata;
// increment request reference count to keep connection open
r->main->count++;
// responding subscriber
r->read_event_handler = ngx_http_test_reading;
r->write_event_handler = ngx_http_request_empty_handler;
r->headers_out.content_type = cf->content_type;
r->headers_out.status = NGX_HTTP_OK;
r->headers_out.content_length_n = -1;
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED);
ngx_http_send_header(r); ngx_http_send_header(r);
// sending response content header // sending response content header
...@@ -160,14 +131,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -160,14 +131,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
} }
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_registry_subscriber_locked(worker_subscriber);
// adding subscriber to woker list of subscribers
ngx_queue_insert_tail(&thisworker_data->worker_subscribers_sentinel->queue, &worker_subscriber->queue);
// increment global subscribers count
data->subscribers++;
thisworker_data->subscribers++;
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
// adding subscriber to channel(s) and send backtrack messages // adding subscriber to channel(s) and send backtrack messages
...@@ -197,19 +161,13 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http ...@@ -197,19 +161,13 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
ngx_http_push_stream_msg_t *message, *message_sentinel; ngx_http_push_stream_msg_t *message, *message_sentinel;
ngx_http_push_stream_subscription_t *subscription; ngx_http_push_stream_subscription_t *subscription;
channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, cf); channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log);
if (channel == NULL) { if (channel == NULL) {
// unable to allocate channel OR channel not found // channel not found
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", requested_channel->id->data); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", requested_channel->id->data);
return NGX_ERROR; return NGX_ERROR;
} }
if (channel == NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: number of channels were exceeded");
return NGX_ERROR;
}
sentinel = &channel->workers_with_subscribers; sentinel = &channel->workers_with_subscribers;
cur = sentinel; cur = sentinel;
...@@ -395,3 +353,64 @@ ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_ ...@@ -395,3 +353,64 @@ ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
} }
} }
static ngx_http_push_stream_worker_subscriber_t *
ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r)
{
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_pool_cleanup_t *cln;
ngx_http_push_stream_subscriber_cleanup_t *clndata;
ngx_http_push_stream_worker_subscriber_t *worker_subscriber;
// attach a cleaner to remove the request from the channel
if ((cln = ngx_pool_cleanup_add(r->pool, sizeof(ngx_http_push_stream_subscriber_cleanup_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for cleanup");
return NULL;
}
if ((worker_subscriber = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_worker_subscriber_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate worker subscriber");
return NULL;
}
worker_subscriber->request = r;
worker_subscriber->worker_subscribed_pid = ngx_pid;
worker_subscriber->expires = (cf->subscriber_connection_timeout == NGX_CONF_UNSET) ? 0 : (ngx_time() + cf->subscriber_connection_timeout);
ngx_queue_init(&worker_subscriber->queue);
ngx_queue_init(&worker_subscriber->subscriptions_sentinel.queue);
// set a cleaner to subscriber
cln->handler = (ngx_pool_cleanup_pt) ngx_http_push_stream_subscriber_cleanup;
clndata = (ngx_http_push_stream_subscriber_cleanup_t *) cln->data;
clndata->worker_subscriber = worker_subscriber;
clndata->worker_subscriber->clndata = clndata;
// increment request reference count to keep connection open
r->main->count++;
// responding subscriber
r->read_event_handler = ngx_http_test_reading;
r->write_event_handler = ngx_http_request_empty_handler;
r->headers_out.content_type = cf->content_type;
r->headers_out.status = NGX_HTTP_OK;
r->headers_out.content_length_n = -1;
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED);
return worker_subscriber;
}
static void
ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
// adding subscriber to woker list of subscribers
ngx_queue_insert_tail(&thisworker_data->worker_subscribers_sentinel->queue, &worker_subscriber->queue);
// increment global subscribers count
data->subscribers++;
thisworker_data->subscribers++;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment