Commit e2d5d8d3 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

preventing memory leak after the bug fix on reload nginx configuration file

parent 6f2d144c
...@@ -168,7 +168,6 @@ typedef struct { ...@@ -168,7 +168,6 @@ typedef struct {
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
} ngx_http_push_stream_shm_data_t; } ngx_http_push_stream_shm_data_t;
ngx_int_t ngx_http_push_stream_worker_processes;
ngx_shm_zone_t *ngx_http_push_stream_shm_zone = NULL; ngx_shm_zone_t *ngx_http_push_stream_shm_zone = NULL;
ngx_http_push_stream_main_conf_t *ngx_http_push_stream_module_main_conf = NULL; ngx_http_push_stream_main_conf_t *ngx_http_push_stream_module_main_conf = NULL;
......
...@@ -62,7 +62,8 @@ static ngx_int_t ngx_http_push_stream_send_worker_message(ngx_http_push_s ...@@ -62,7 +62,8 @@ static ngx_int_t ngx_http_push_stream_send_worker_message(ngx_http_push_s
static ngx_int_t ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers); static ngx_int_t ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers);
static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle); static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_init_ipc_shm(ngx_int_t workers); static ngx_int_t ngx_http_push_stream_ipc_init_worker();
static void ngx_http_push_stream_clean_worker_data();
static void ngx_http_push_stream_channel_handler(ngx_event_t *ev); static void ngx_http_push_stream_channel_handler(ngx_event_t *ev);
static ngx_inline void ngx_http_push_stream_process_worker_message(void); static ngx_inline void ngx_http_push_stream_process_worker_message(void);
......
...@@ -119,20 +119,23 @@ ngx_http_push_stream_reset_channel_subscribers_count_locked(ngx_http_push_stream ...@@ -119,20 +119,23 @@ ngx_http_push_stream_reset_channel_subscribers_count_locked(ngx_http_push_stream
// will be called many times // will be called many times
static ngx_int_t static ngx_int_t
ngx_http_push_stream_init_ipc_shm(ngx_int_t workers) ngx_http_push_stream_ipc_init_worker()
{ {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
int i; int i;
// cleanning old content if worker die and another one is set on same slot
ngx_http_push_stream_clean_worker_data();
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
if ((data->ipc[ngx_process_slot].messages_queue = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_msg_t))) == NULL) { if ((data->ipc[ngx_process_slot].messages_queue == NULL) && ((data->ipc[ngx_process_slot].messages_queue = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_msg_t))) == NULL)) {
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR; return NGX_ERROR;
} }
if ((data->ipc[ngx_process_slot].worker_subscribers_sentinel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_subscriber_t))) == NULL) { if ((data->ipc[ngx_process_slot].worker_subscribers_sentinel == NULL) && ((data->ipc[ngx_process_slot].worker_subscribers_sentinel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_subscriber_t))) == NULL)) {
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR; return NGX_ERROR;
} }
...@@ -157,6 +160,32 @@ ngx_http_push_stream_init_ipc_shm(ngx_int_t workers) ...@@ -157,6 +160,32 @@ ngx_http_push_stream_init_ipc_shm(ngx_int_t workers)
} }
static void
ngx_http_push_stream_clean_worker_data()
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_msg_t *cur_msg;
ngx_shmtx_lock(&shpool->mutex);
if (data->ipc[ngx_process_slot].messages_queue != NULL) {
while ((cur_msg = (ngx_http_push_stream_worker_msg_t *) ngx_queue_next(&data->ipc[ngx_process_slot].messages_queue->queue)) != data->ipc[ngx_process_slot].messages_queue) {
ngx_queue_remove(&cur_msg->queue);
ngx_slab_free_locked(shpool, cur_msg);
}
}
if (data->ipc[ngx_process_slot].worker_subscribers_sentinel != NULL) {
ngx_queue_init(&data->ipc[ngx_process_slot].worker_subscribers_sentinel->queue);
}
ngx_shmtx_unlock(&shpool->mutex);
data->ipc[ngx_process_slot].pid = -1;
data->ipc[ngx_process_slot].subscribers = 0;
}
static ngx_int_t static ngx_int_t
ngx_http_push_stream_register_worker_message_handler(ngx_cycle_t *cycle) ngx_http_push_stream_register_worker_message_handler(ngx_cycle_t *cycle)
{ {
......
...@@ -182,10 +182,8 @@ ngx_http_push_stream_init_module(ngx_cycle_t *cycle) ...@@ -182,10 +182,8 @@ ngx_http_push_stream_init_module(ngx_cycle_t *cycle)
return NGX_OK; return NGX_OK;
} }
ngx_http_push_stream_worker_processes = ccf->worker_processes;
// initialize our little IPC // initialize our little IPC
return ngx_http_push_stream_init_ipc(cycle, ngx_http_push_stream_worker_processes); return ngx_http_push_stream_init_ipc(cycle, ccf->worker_processes);
} }
...@@ -196,7 +194,7 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle) ...@@ -196,7 +194,7 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle)
return NGX_OK; return NGX_OK;
} }
if ((ngx_http_push_stream_init_ipc_shm(ngx_http_push_stream_worker_processes)) != NGX_OK) { if ((ngx_http_push_stream_ipc_init_worker()) != NGX_OK) {
return NGX_ERROR; return NGX_ERROR;
} }
...@@ -234,10 +232,9 @@ ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle) ...@@ -234,10 +232,9 @@ ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle)
return; return;
} }
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
// disconnect all subscribers (force_disconnect = 1) // disconnect all subscribers (force_disconnect = 1)
ngx_http_push_stream_disconnect_worker_subscribers(1); ngx_http_push_stream_disconnect_worker_subscribers(1);
ngx_http_push_stream_clean_worker_data();
if (ngx_http_push_stream_ping_event.timer_set) { if (ngx_http_push_stream_ping_event.timer_set) {
ngx_del_timer(&ngx_http_push_stream_ping_event); ngx_del_timer(&ngx_http_push_stream_ping_event);
...@@ -256,9 +253,6 @@ ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle) ...@@ -256,9 +253,6 @@ ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle)
} }
ngx_http_push_stream_ipc_exit_worker(cycle); ngx_http_push_stream_ipc_exit_worker(cycle);
data->ipc[ngx_process_slot].pid = -1;
data->ipc[ngx_process_slot].subscribers = 0;
} }
...@@ -597,6 +591,8 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -597,6 +591,8 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
for (i = 0; i < NGX_MAX_PROCESSES; i++) { for (i = 0; i < NGX_MAX_PROCESSES; i++) {
d->ipc[i].pid = -1; d->ipc[i].pid = -1;
d->ipc[i].subscribers = 0; d->ipc[i].subscribers = 0;
d->ipc[i].messages_queue = NULL;
d->ipc[i].worker_subscribers_sentinel = NULL;
} }
// initialize rbtree // initialize rbtree
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment