Commit 3fb374ed authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding last_activity_time to channel, to only delete it after is without...

adding last_activity_time to channel, to only delete it after is without messages and subscribers, and after 30 seconds of inactivity
parent a1755792
...@@ -144,6 +144,7 @@ typedef struct { ...@@ -144,6 +144,7 @@ typedef struct {
ngx_uint_t subscribers; ngx_uint_t subscribers;
ngx_http_push_stream_pid_queue_t workers_with_subscribers; ngx_http_push_stream_pid_queue_t workers_with_subscribers;
ngx_http_push_stream_msg_t message_queue; ngx_http_push_stream_msg_t message_queue;
time_t last_activity_time;
time_t expires; time_t expires;
ngx_flag_t deleted; ngx_flag_t deleted;
ngx_flag_t broadcast; ngx_flag_t broadcast;
......
...@@ -27,6 +27,6 @@ ...@@ -27,6 +27,6 @@
#define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ #define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.3"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.3");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("1745a5e65a0c1a6b53cfab1ace68911c6031b778"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("a17557925f1d0ed8d6d7ebf7ad8d21d20640587a");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */
...@@ -709,6 +709,7 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo ...@@ -709,6 +709,7 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
subscription->channel_subscriber_element_ref = element_subscriber; subscription->channel_subscriber_element_ref = element_subscriber;
channel->subscribers++; // do this only when we know everything went okay channel->subscribers++; // do this only when we know everything went okay
channel->last_activity_time = ngx_time();
ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue); ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue);
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscribers_sentinel.queue, &element_subscriber->queue); ngx_queue_insert_tail(&worker_subscribers_sentinel->subscribers_sentinel.queue, &element_subscriber->queue);
return NGX_OK; return NGX_OK;
......
...@@ -49,6 +49,7 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_ ...@@ -49,6 +49,7 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_
} }
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->stored_messages); NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->stored_messages);
channel->last_activity_time = ngx_time();
ngx_queue_remove(&msg->queue); ngx_queue_remove(&msg->queue);
ngx_http_push_stream_mark_message_to_delete_locked(msg); ngx_http_push_stream_mark_message_to_delete_locked(msg);
} }
...@@ -322,6 +323,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_ ...@@ -322,6 +323,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_
channel->last_message_tag = data->last_message_tag = msg->tag; channel->last_message_tag = data->last_message_tag = msg->tag;
// set message expiration time // set message expiration time
msg->expires = msg->time + ngx_http_push_stream_module_main_conf->message_ttl; msg->expires = msg->time + ngx_http_push_stream_module_main_conf->message_ttl;
channel->last_activity_time = ngx_time();
// put messages on the queue // put messages on the queue
if (cf->store_messages) { if (cf->store_messages) {
...@@ -717,7 +719,7 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s ...@@ -717,7 +719,7 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1); ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1);
if ((channel->stored_messages == 0) && (channel->subscribers == 0)) { if ((channel->stored_messages == 0) && (channel->subscribers == 0) && (channel->last_activity_time + 30 < ngx_time())) {
channel->deleted = 1; channel->deleted = 1;
channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl; channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
(channel->broadcast) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->broadcast_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels); (channel->broadcast) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->broadcast_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels);
...@@ -1165,6 +1167,7 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subsc ...@@ -1165,6 +1167,7 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subsc
while ((cur = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&sentinel->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(cur->channel->subscribers); NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(cur->channel->subscribers);
cur->channel->last_activity_time = ngx_time();
ngx_queue_remove(&cur->channel_subscriber_element_ref->queue); ngx_queue_remove(&cur->channel_subscriber_element_ref->queue);
ngx_queue_remove(&cur->queue); ngx_queue_remove(&cur->queue);
} }
......
...@@ -84,6 +84,7 @@ ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel) ...@@ -84,6 +84,7 @@ ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel)
channel->subscribers = 0; channel->subscribers = 0;
channel->deleted = 0; channel->deleted = 0;
channel->expires = 0; channel->expires = 0;
channel->last_activity_time = ngx_time();
ngx_queue_init(&channel->message_queue.queue); ngx_queue_init(&channel->message_queue.queue);
channel->message_queue.deleted = 0; channel->message_queue.deleted = 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment