Commit 79eb641f authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

renaming some structure members and functions to be adequate with their use

parent de3a1935
...@@ -215,8 +215,8 @@ typedef struct { ...@@ -215,8 +215,8 @@ typedef struct {
ngx_uint_t subscribers; // # of subscribers in all channels ngx_uint_t subscribers; // # of subscribers in all channels
ngx_http_push_stream_msg_t messages_to_delete; ngx_http_push_stream_msg_t messages_to_delete;
ngx_queue_t channels_queue; ngx_queue_t channels_queue;
ngx_queue_t channels_trash;
ngx_queue_t channels_to_delete; ngx_queue_t channels_to_delete;
ngx_queue_t unrecoverable_channels;
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
time_t startup; time_t startup;
time_t last_message_time; time_t last_message_time;
......
...@@ -921,7 +921,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -921,7 +921,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
ngx_queue_init(&d->channels_queue); ngx_queue_init(&d->channels_queue);
ngx_queue_init(&d->channels_to_delete); ngx_queue_init(&d->channels_to_delete);
ngx_queue_init(&d->unrecoverable_channels); ngx_queue_init(&d->channels_trash);
// create ping message // create ping message
if ((ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->ping_message_text.data, ngx_http_push_stream_module_main_conf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) { if ((ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->ping_message_text.data, ngx_http_push_stream_module_main_conf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
......
...@@ -58,16 +58,16 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_ ...@@ -58,16 +58,16 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_
static void static void
ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool) ngx_http_push_stream_delete_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool)
{ {
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_pid_queue_t *cur_worker; ngx_http_push_stream_pid_queue_t *cur_worker;
ngx_http_push_stream_queue_elem_t *cur; ngx_http_push_stream_queue_elem_t *cur;
ngx_http_push_stream_subscription_t *cur_subscription; ngx_http_push_stream_subscription_t *cur_subscription;
ngx_queue_t *prev_channel, *cur_channel = &data->unrecoverable_channels; ngx_queue_t *prev_channel, *cur_channel = &data->channels_to_delete;
while ((cur_channel = ngx_queue_next(cur_channel)) != &data->unrecoverable_channels) { while ((cur_channel = ngx_queue_next(cur_channel)) != &data->channels_to_delete) {
channel = ngx_queue_data(cur_channel, ngx_http_push_stream_channel_t, queue); channel = ngx_queue_data(cur_channel, ngx_http_push_stream_channel_t, queue);
// remove subscribers if any // remove subscribers if any
...@@ -121,19 +121,20 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data ...@@ -121,19 +121,20 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data
} }
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
while (((cur_channel = ngx_queue_next(cur_channel)) != &data->unrecoverable_channels) && (prev_channel = ngx_queue_prev(cur_channel))) { while (((cur_channel = ngx_queue_next(cur_channel)) != &data->channels_to_delete) && (prev_channel = ngx_queue_prev(cur_channel))) {
channel = ngx_queue_data(cur_channel, ngx_http_push_stream_channel_t, queue); channel = ngx_queue_data(cur_channel, ngx_http_push_stream_channel_t, queue);
// channel has not subscribers and can be released // channel has not subscribers and can be released
if (channel->subscribers == 0) { if (channel->subscribers == 0) {
if (channel->expires == 0) {
channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
} else if (ngx_time() > channel->expires) {
// go back one node on queue, since the current node will be removed // go back one node on queue, since the current node will be removed
cur_channel = prev_channel; cur_channel = prev_channel;
channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
// move the channel to trash queue
ngx_queue_remove(&channel->queue); ngx_queue_remove(&channel->queue);
nxg_http_push_stream_free_channel_memory_locked(shpool, channel); ngx_queue_insert_tail(&data->channels_trash, &channel->queue);
} channel->queue_sentinel = &data->channels_trash;
} }
} }
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
...@@ -146,7 +147,7 @@ ngx_http_push_stream_delete_worker_channel(void) ...@@ -146,7 +147,7 @@ ngx_http_push_stream_delete_worker_channel(void)
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_delete_unrecoverable_channels(data, shpool); ngx_http_push_stream_delete_channels(data, shpool);
} }
ngx_uint_t ngx_uint_t
...@@ -672,8 +673,8 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool) ...@@ -672,8 +673,8 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
// move the channel to unrecoverable queue // move the channel to unrecoverable queue
ngx_rbtree_delete(&data->tree, &channel->node); ngx_rbtree_delete(&data->tree, &channel->node);
ngx_queue_remove(&channel->queue); ngx_queue_remove(&channel->queue);
ngx_queue_insert_tail(&data->unrecoverable_channels, &channel->queue); ngx_queue_insert_tail(&data->channels_to_delete, &channel->queue);
channel->queue_sentinel = &data->unrecoverable_channels; channel->queue_sentinel = &data->channels_to_delete;
// remove all messages // remove all messages
...@@ -723,11 +724,11 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s ...@@ -723,11 +724,11 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s
channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl; channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
(channel->broadcast) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->broadcast_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels); (channel->broadcast) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->broadcast_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels);
// move the channel to trash tree // move the channel to trash queue
ngx_rbtree_delete(&data->tree, &channel->node); ngx_rbtree_delete(&data->tree, &channel->node);
ngx_queue_remove(&channel->queue); ngx_queue_remove(&channel->queue);
ngx_queue_insert_tail(&data->channels_to_delete, &channel->queue); ngx_queue_insert_tail(&data->channels_trash, &channel->queue);
channel->queue_sentinel = &data->channels_to_delete; channel->queue_sentinel = &data->channels_trash;
} }
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
...@@ -760,7 +761,7 @@ ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_http_push_stream ...@@ -760,7 +761,7 @@ ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_http_push_stream
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_queue_t *cur; ngx_queue_t *cur;
while ((cur = ngx_queue_head(&data->channels_to_delete)) != &data->channels_to_delete) { while ((cur = ngx_queue_head(&data->channels_trash)) != &data->channels_trash) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue); channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
if ((ngx_time() > channel->expires) || force) { if ((ngx_time() > channel->expires) || force) {
...@@ -796,7 +797,7 @@ ngx_http_push_stream_memory_cleanup() ...@@ -796,7 +797,7 @@ ngx_http_push_stream_memory_cleanup()
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_delete_unrecoverable_channels(data, shpool); ngx_http_push_stream_delete_channels(data, shpool);
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, 0); ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, 0);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0); ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment