Commit 49d9345e authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

renaming the queue of messages to be removed from the memory and changing the...

renaming the queue of messages to be removed from the memory and changing the way the queue is cleared
parent f98bd08a
...@@ -206,7 +206,7 @@ typedef struct { ...@@ -206,7 +206,7 @@ typedef struct {
ngx_uint_t broadcast_channels; // # of broadcast channels being used ngx_uint_t broadcast_channels; // # of broadcast channels being used
ngx_uint_t published_messages; // # of published messagens in all channels ngx_uint_t published_messages; // # of published messagens in all channels
ngx_uint_t subscribers; // # of subscribers in all channels ngx_uint_t subscribers; // # of subscribers in all channels
ngx_http_push_stream_msg_t messages_to_delete; ngx_queue_t messages_trash;
ngx_queue_t channels_queue; ngx_queue_t channels_queue;
ngx_queue_t channels_trash; ngx_queue_t channels_trash;
ngx_queue_t channels_to_delete; ngx_queue_t channels_to_delete;
......
...@@ -903,7 +903,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -903,7 +903,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
return NGX_ERROR; return NGX_ERROR;
} }
shm_zone->data = d; shm_zone->data = d;
ngx_queue_init(&d->messages_to_delete.queue); ngx_queue_init(&d->messages_trash);
for (i = 0; i < NGX_MAX_PROCESSES; i++) { for (i = 0; i < NGX_MAX_PROCESSES; i++) {
d->ipc[i].pid = -1; d->ipc[i].pid = -1;
d->ipc[i].startup = 0; d->ipc[i].startup = 0;
......
...@@ -822,16 +822,18 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for ...@@ -822,16 +822,18 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
{ {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_msg_t *cur, *prev; ngx_http_push_stream_msg_t *message;
ngx_queue_t *cur;
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
cur = &data->messages_to_delete; while ((cur = ngx_queue_head(&data->messages_trash)) != &data->messages_trash) {
while ((cur = (ngx_http_push_stream_msg_t *)ngx_queue_next(&cur->queue)) != &data->messages_to_delete) { message = ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
if (force || ((cur->workers_ref_count <= 0) && (ngx_time() > cur->expires))) {
prev = (ngx_http_push_stream_msg_t *)ngx_queue_prev(&cur->queue); if (force || ((message->workers_ref_count <= 0) && (ngx_time() > message->expires))) {
ngx_queue_remove(&cur->queue); ngx_queue_remove(&message->queue);
ngx_http_push_stream_free_message_memory_locked(shpool, cur); ngx_http_push_stream_free_message_memory_locked(shpool, message);
cur = prev; } else {
break;
} }
} }
ngx_http_push_stream_free_memory_of_expired_channels_locked(data, shpool, force); ngx_http_push_stream_free_memory_of_expired_channels_locked(data, shpool, force);
...@@ -889,7 +891,7 @@ ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *m ...@@ -889,7 +891,7 @@ ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *m
msg->deleted = 1; msg->deleted = 1;
msg->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl; msg->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
ngx_queue_insert_tail(&data->messages_to_delete.queue, &msg->queue); ngx_queue_insert_tail(&data->messages_trash, &msg->queue);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment