Commit e4e53de4 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

- change directive push_stream_memory_cleanup_timeout to be part of main configuration only

- creating different timers to clear the memory, one for expired messages and another for empty channels
parent 03b911e6
...@@ -183,7 +183,7 @@ h3(#directives). Directives ...@@ -183,7 +183,7 @@ h3(#directives). Directives
|push_stream_message_template|unset|any string|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration| |push_stream_message_template|unset|any string|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration|
|push_stream_max_number_of_channels|unset|number|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration| |push_stream_max_number_of_channels|unset|number|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration|
|push_stream_max_number_of_broadcast_channels|unset|number|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration| |push_stream_max_number_of_broadcast_channels|unset|number|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration|
|push_stream_memory_cleanup_timeout|30 seconds|time constant|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration| |push_stream_memory_cleanup_timeout|30 seconds|time constant|http|main nginx configuration|
h4(#push_stream_channels_statistics). push_stream_channels_statistics h4(#push_stream_channels_statistics). push_stream_channels_statistics
......
...@@ -33,6 +33,8 @@ ...@@ -33,6 +33,8 @@
typedef struct { typedef struct {
size_t shm_size; size_t shm_size;
ngx_msec_t memory_cleanup_interval;
time_t memory_cleanup_timeout;
} ngx_http_push_stream_main_conf_t; } ngx_http_push_stream_main_conf_t;
typedef struct { typedef struct {
...@@ -53,8 +55,7 @@ typedef struct { ...@@ -53,8 +55,7 @@ typedef struct {
ngx_uint_t broadcast_channel_max_qtd; ngx_uint_t broadcast_channel_max_qtd;
ngx_uint_t max_number_of_channels; ngx_uint_t max_number_of_channels;
ngx_uint_t max_number_of_broadcast_channels; ngx_uint_t max_number_of_broadcast_channels;
ngx_msec_t memory_cleanup_interval; ngx_msec_t buffer_cleanup_interval;
time_t memory_cleanup_timeout;
} ngx_http_push_stream_loc_conf_t; } ngx_http_push_stream_loc_conf_t;
// shared memory segment name // shared memory segment name
...@@ -157,6 +158,8 @@ typedef struct { ...@@ -157,6 +158,8 @@ typedef struct {
ngx_int_t ngx_http_push_stream_worker_processes; ngx_int_t ngx_http_push_stream_worker_processes;
ngx_shm_zone_t *ngx_http_push_stream_shm_zone = NULL; ngx_shm_zone_t *ngx_http_push_stream_shm_zone = NULL;
ngx_http_push_stream_main_conf_t *ngx_http_push_stream_module_main_conf = NULL;
// channel // channel
static ngx_str_t * ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *cf); static ngx_str_t * ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *cf);
static ngx_int_t ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel); static ngx_int_t ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel);
......
...@@ -60,6 +60,7 @@ static void ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle); ...@@ -60,6 +60,7 @@ static void ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle);
static void ngx_http_push_stream_exit_master(ngx_cycle_t *cycle); static void ngx_http_push_stream_exit_master(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_postconfig(ngx_conf_t *cf); static ngx_int_t ngx_http_push_stream_postconfig(ngx_conf_t *cf);
static void * ngx_http_push_stream_create_main_conf(ngx_conf_t *cf); static void * ngx_http_push_stream_create_main_conf(ngx_conf_t *cf);
static char * ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent);
static void * ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf); static void * ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf);
static char * ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child); static char * ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);
......
...@@ -190,6 +190,7 @@ static const ngx_str_t NGX_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~" ...@@ -190,6 +190,7 @@ static const ngx_str_t NGX_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~"
ngx_event_t ngx_http_push_stream_ping_event; ngx_event_t ngx_http_push_stream_ping_event;
ngx_event_t ngx_http_push_stream_disconnect_event; ngx_event_t ngx_http_push_stream_disconnect_event;
ngx_event_t ngx_http_push_stream_memory_cleanup_event; ngx_event_t ngx_http_push_stream_memory_cleanup_event;
ngx_event_t ngx_http_push_stream_buffer_cleanup_event;
ngx_buf_t *ngx_http_push_stream_ping_buf = NULL; ngx_buf_t *ngx_http_push_stream_ping_buf = NULL;
...@@ -202,14 +203,17 @@ static ngx_buf_t * ngx_http_push_stream_get_formatted_message(ngx_http_ ...@@ -202,14 +203,17 @@ static ngx_buf_t * ngx_http_push_stream_get_formatted_message(ngx_http_
static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf); static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf);
static ngx_int_t ngx_http_push_stream_send_response_chunk(ngx_http_request_t *r, const u_char *chunk_text, uint chunk_len, ngx_flag_t last_buffer); static ngx_int_t ngx_http_push_stream_send_response_chunk(ngx_http_request_t *r, const u_char *chunk_text, uint chunk_len, ngx_flag_t last_buffer);
static ngx_int_t ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf); static ngx_int_t ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf);
static ngx_int_t ngx_http_push_stream_memory_cleanup(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf); static ngx_int_t ngx_http_push_stream_memory_cleanup(ngx_log_t *log, ngx_http_push_stream_main_conf_t *psmcf);
static ngx_int_t ngx_http_push_stream_buffer_cleanup(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev); static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_ping_timer_set(ngx_http_push_stream_loc_conf_t *pslcf); static void ngx_http_push_stream_ping_timer_set(ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev); static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_disconnect_timer_set(ngx_http_push_stream_loc_conf_t *pslcf); static void ngx_http_push_stream_disconnect_timer_set(ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_memory_cleanup_timer_wake_handler(ngx_event_t *ev); static void ngx_http_push_stream_memory_cleanup_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_memory_cleanup_timer_set(ngx_http_push_stream_loc_conf_t *pslcf); static void ngx_http_push_stream_memory_cleanup_timer_set(ngx_http_push_stream_main_conf_t *psmcf);
static void ngx_http_push_stream_buffer_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_buffer_cleanup_timer_set(ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_interval, ngx_event_t *timer_event); static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_interval, ngx_event_t *timer_event);
...@@ -217,9 +221,10 @@ static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_in ...@@ -217,9 +221,10 @@ static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_in
static void ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_worker_subscriber_t *worker_subscriber); static void ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
u_char * ngx_http_push_stream_append_crlf(const ngx_str_t *str, ngx_pool_t *pool); u_char * ngx_http_push_stream_append_crlf(const ngx_str_t *str, ngx_pool_t *pool);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force, time_t memory_cleanup_timeout); static void ngx_http_push_stream_collect_expired_messages(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force);
static ngx_int_t ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force); static ngx_int_t ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force);
static ngx_inline void ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired, time_t memory_cleanup_timeout); static ngx_inline void ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired);
static ngx_http_push_stream_content_subtype_t * ngx_http_push_stream_match_channel_info_format_and_content_type(ngx_http_request_t *r, ngx_uint_t default_subtype); static ngx_http_push_stream_content_subtype_t * ngx_http_push_stream_match_channel_info_format_and_content_type(ngx_http_request_t *r, ngx_uint_t default_subtype);
......
...@@ -91,9 +91,6 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r) ...@@ -91,9 +91,6 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
return rc; return rc;
} }
// turn on timer to cleanup memory of old messages and channels
ngx_http_push_stream_memory_cleanup_timer_set(cf);
return NGX_DONE; return NGX_DONE;
} }
...@@ -196,7 +193,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -196,7 +193,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
channel->stored_messages++; channel->stored_messages++;
// now see if the queue is too big // now see if the queue is too big
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, cf->max_messages, 0, cf->memory_cleanup_timeout); ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, cf->max_messages, 0);
} }
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
...@@ -204,8 +201,8 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -204,8 +201,8 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
// send an alert to workers // send an alert to workers
ngx_http_push_stream_broadcast(channel, msg, r->connection->log); ngx_http_push_stream_broadcast(channel, msg, r->connection->log);
// turn on timer to cleanup memory of old messages an channels // turn on timer to cleanup buffer of old messages
ngx_http_push_stream_memory_cleanup_timer_set(cf); ngx_http_push_stream_buffer_cleanup_timer_set(cf);
ngx_http_push_stream_send_response_channel_info(r, channel); ngx_http_push_stream_send_response_channel_info(r, channel);
return; return;
......
...@@ -135,10 +135,10 @@ static ngx_command_t ngx_http_push_stream_commands[] = { ...@@ -135,10 +135,10 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
offsetof(ngx_http_push_stream_loc_conf_t, max_number_of_broadcast_channels), offsetof(ngx_http_push_stream_loc_conf_t, max_number_of_broadcast_channels),
NULL }, NULL },
{ ngx_string("push_stream_memory_cleanup_timeout"), { ngx_string("push_stream_memory_cleanup_timeout"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_sec_slot, ngx_conf_set_sec_slot,
NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, memory_cleanup_timeout), offsetof(ngx_http_push_stream_main_conf_t, memory_cleanup_timeout),
NULL }, NULL },
ngx_null_command ngx_null_command
}; };
...@@ -148,7 +148,7 @@ static ngx_http_module_t ngx_http_push_stream_module_ctx = { ...@@ -148,7 +148,7 @@ static ngx_http_module_t ngx_http_push_stream_module_ctx = {
NULL, /* preconfiguration */ NULL, /* preconfiguration */
ngx_http_push_stream_postconfig, /* postconfiguration */ ngx_http_push_stream_postconfig, /* postconfiguration */
ngx_http_push_stream_create_main_conf, /* create main configuration */ ngx_http_push_stream_create_main_conf, /* create main configuration */
NULL, /* init main configuration */ ngx_http_push_stream_init_main_conf, /* init main configuration */
NULL, /* create server configuration */ NULL, /* create server configuration */
NULL, /* merge server configuration */ NULL, /* merge server configuration */
ngx_http_push_stream_create_loc_conf, /* create location configuration */ ngx_http_push_stream_create_loc_conf, /* create location configuration */
...@@ -196,6 +196,9 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle) ...@@ -196,6 +196,9 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle)
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot; ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
thisworker_data->pid = ngx_pid; thisworker_data->pid = ngx_pid;
// turn on timer to cleanup memory of old messages and channels
ngx_http_push_stream_memory_cleanup_timer_set(ngx_http_push_stream_module_main_conf);
return ngx_http_push_stream_register_worker_message_handler(cycle); return ngx_http_push_stream_register_worker_message_handler(cycle);
} }
...@@ -207,7 +210,7 @@ ngx_http_push_stream_exit_master(ngx_cycle_t *cycle) ...@@ -207,7 +210,7 @@ ngx_http_push_stream_exit_master(ngx_cycle_t *cycle)
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
// destroy channel tree in shared memory // destroy channel tree in shared memory
ngx_http_push_stream_collect_expired_messages_and_empty_channels(&data->tree, shpool, data->tree.root, 1, 0); ngx_http_push_stream_collect_expired_messages_and_empty_channels(&data->tree, shpool, data->tree.root, 1);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(1); ngx_http_push_stream_free_memory_of_expired_messages_and_channels(1);
} }
...@@ -241,9 +244,6 @@ ngx_http_push_stream_postconfig(ngx_conf_t *cf) ...@@ -241,9 +244,6 @@ ngx_http_push_stream_postconfig(ngx_conf_t *cf)
size_t shm_size; size_t shm_size;
// initialize shared memory // initialize shared memory
if (conf->shm_size == NGX_CONF_UNSET_SIZE) {
conf->shm_size = NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE;
}
shm_size = ngx_align(conf->shm_size, ngx_pagesize); shm_size = ngx_align(conf->shm_size, ngx_pagesize);
if (shm_size < 8 * ngx_pagesize) { if (shm_size < 8 * ngx_pagesize) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "The push_stream_max_reserved_memory value must be at least %udKiB", (8 * ngx_pagesize) >> 10); ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "The push_stream_max_reserved_memory value must be at least %udKiB", (8 * ngx_pagesize) >> 10);
...@@ -270,10 +270,39 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf) ...@@ -270,10 +270,39 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
} }
mcf->shm_size = NGX_CONF_UNSET_SIZE; mcf->shm_size = NGX_CONF_UNSET_SIZE;
mcf->memory_cleanup_timeout = NGX_CONF_UNSET;
ngx_http_push_stream_module_main_conf = mcf;
return mcf; return mcf;
} }
static char *
ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
{
ngx_http_push_stream_main_conf_t *conf = parent;
if (conf->memory_cleanup_timeout == NGX_CONF_UNSET) {
conf->memory_cleanup_timeout = NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT;
}
if (conf->shm_size == NGX_CONF_UNSET_SIZE) {
conf->shm_size = NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE;
}
// memory cleanup timeout cannot't be small
if (conf->memory_cleanup_timeout < NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "memory cleanup timeout cannot't be less than %d.", NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT);
return NGX_CONF_ERROR;
}
// calc memory cleanup interval
ngx_uint_t interval = conf->memory_cleanup_timeout / 3;
conf->memory_cleanup_interval = (interval * 1000) + 1000; // min 11 seconds (((30 / 3) * 1000) + 1000)
return NGX_CONF_OK;
}
// location config stuff // location config stuff
static void * static void *
...@@ -300,8 +329,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf) ...@@ -300,8 +329,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->broadcast_channel_max_qtd = NGX_CONF_UNSET_UINT; lcf->broadcast_channel_max_qtd = NGX_CONF_UNSET_UINT;
lcf->max_number_of_channels = NGX_CONF_UNSET_UINT; lcf->max_number_of_channels = NGX_CONF_UNSET_UINT;
lcf->max_number_of_broadcast_channels = NGX_CONF_UNSET_UINT; lcf->max_number_of_broadcast_channels = NGX_CONF_UNSET_UINT;
lcf->memory_cleanup_interval = NGX_CONF_UNSET_MSEC; lcf->buffer_cleanup_interval = NGX_CONF_UNSET_MSEC;
lcf->memory_cleanup_timeout = NGX_CONF_UNSET;
return lcf; return lcf;
} }
...@@ -327,8 +355,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -327,8 +355,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_uint_value(conf->broadcast_channel_max_qtd, prev->broadcast_channel_max_qtd, NGX_CONF_UNSET_UINT); ngx_conf_merge_uint_value(conf->broadcast_channel_max_qtd, prev->broadcast_channel_max_qtd, NGX_CONF_UNSET_UINT);
ngx_conf_merge_uint_value(conf->max_number_of_channels, prev->max_number_of_channels, NGX_CONF_UNSET_UINT); ngx_conf_merge_uint_value(conf->max_number_of_channels, prev->max_number_of_channels, NGX_CONF_UNSET_UINT);
ngx_conf_merge_uint_value(conf->max_number_of_broadcast_channels, prev->max_number_of_broadcast_channels, NGX_CONF_UNSET_UINT); ngx_conf_merge_uint_value(conf->max_number_of_broadcast_channels, prev->max_number_of_broadcast_channels, NGX_CONF_UNSET_UINT);
ngx_conf_merge_uint_value(conf->memory_cleanup_interval, prev->memory_cleanup_interval, NGX_CONF_UNSET_MSEC); ngx_conf_merge_uint_value(conf->buffer_cleanup_interval, prev->buffer_cleanup_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_sec_value(conf->memory_cleanup_timeout, prev->memory_cleanup_timeout, NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT);
// sanity checks // sanity checks
...@@ -410,12 +437,6 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -410,12 +437,6 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
} }
// memory cleanup timeout cannot't be small
if (conf->memory_cleanup_timeout < NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "memory cleanup timeout cannot't be less than %d.", NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT);
return NGX_CONF_ERROR;
}
// append crlf to templates // append crlf to templates
if (conf->header_template.len > 0) { if (conf->header_template.len > 0) {
conf->header_template.data = ngx_http_push_stream_append_crlf(&conf->header_template, cf->pool); conf->header_template.data = ngx_http_push_stream_append_crlf(&conf->header_template, cf->pool);
...@@ -427,12 +448,12 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -427,12 +448,12 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
conf->message_template.len = ngx_strlen(conf->message_template.data); conf->message_template.len = ngx_strlen(conf->message_template.data);
} }
// calc memory cleanup interval // calc buffer cleanup interval
if (conf->buffer_timeout != NGX_CONF_UNSET) { if (conf->buffer_timeout != NGX_CONF_UNSET) {
ngx_uint_t interval = conf->buffer_timeout / 3; ngx_uint_t interval = conf->buffer_timeout / 3;
conf->memory_cleanup_interval = (interval > 1) ? (interval * 1000) + 1000 : 1000; // min 1 second conf->buffer_cleanup_interval = (interval > 1) ? (interval * 1000) + 1000 : 1000; // min 1 second
} else if (conf->memory_cleanup_interval == NGX_CONF_UNSET_MSEC) { } else if (conf->buffer_cleanup_interval == NGX_CONF_UNSET_MSEC) {
conf->memory_cleanup_interval = 1000; // 1 second conf->buffer_cleanup_interval = 1000; // 1 second
} }
// calc subscriber disconnect interval // calc subscriber disconnect interval
......
...@@ -181,9 +181,6 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -181,9 +181,6 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_http_push_stream_disconnect_timer_set(cf); ngx_http_push_stream_disconnect_timer_set(cf);
ngx_http_push_stream_ping_timer_set(cf); ngx_http_push_stream_ping_timer_set(cf);
// turn on timer to cleanup memory of old messages and channels
ngx_http_push_stream_memory_cleanup_timer_set(cf);
ngx_destroy_pool(temp_pool); ngx_destroy_pool(temp_pool);
return NGX_DONE; return NGX_DONE;
} }
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
#include <ngx_http_push_stream_module_utils.h> #include <ngx_http_push_stream_module_utils.h>
static ngx_inline void static ngx_inline void
ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired, time_t memory_cleanup_timeout) { ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired) {
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_msg_t *sentinel, *msg; ngx_http_push_stream_msg_t *sentinel, *msg;
...@@ -40,7 +40,7 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_ ...@@ -40,7 +40,7 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_
} }
msg->deleted = 1; msg->deleted = 1;
msg->expires = ngx_time() + memory_cleanup_timeout; msg->expires = ngx_time() + ngx_http_push_stream_module_main_conf->memory_cleanup_timeout;
channel->stored_messages--; channel->stored_messages--;
ngx_queue_remove(&msg->queue); ngx_queue_remove(&msg->queue);
ngx_queue_insert_tail(&data->messages_to_delete.queue, &msg->queue); ngx_queue_insert_tail(&data->messages_to_delete.queue, &msg->queue);
...@@ -186,7 +186,7 @@ ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t * ...@@ -186,7 +186,7 @@ ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *
static void static void
ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force, time_t memory_cleanup_timeout) ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force)
{ {
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_rbtree_node_t *sentinel; ngx_rbtree_node_t *sentinel;
...@@ -198,22 +198,22 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *t ...@@ -198,22 +198,22 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *t
if ((!channel->deleted) && (&channel->node != sentinel)) { if ((!channel->deleted) && (&channel->node != sentinel)) {
if ((!channel->deleted) && (channel->node.left != NULL)) { if ((!channel->deleted) && (channel->node.left != NULL)) {
ngx_http_push_stream_collect_expired_messages_and_empty_channels(tree, shpool, node->left, force, memory_cleanup_timeout); ngx_http_push_stream_collect_expired_messages_and_empty_channels(tree, shpool, node->left, force);
} }
if ((!channel->deleted) && (channel->node.right != NULL)) { if ((!channel->deleted) && (channel->node.right != NULL)) {
ngx_http_push_stream_collect_expired_messages_and_empty_channels(tree, shpool, node->right, force, memory_cleanup_timeout); ngx_http_push_stream_collect_expired_messages_and_empty_channels(tree, shpool, node->right, force);
} }
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
if ((channel != NULL) && (!channel->deleted)) { if ((channel != NULL) && (!channel->deleted)) {
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1, memory_cleanup_timeout); ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1);
if ((channel->stored_messages == 0) && (channel->subscribers == 0)) { if ((channel->stored_messages == 0) && (channel->subscribers == 0)) {
channel->deleted = 1; channel->deleted = 1;
channel->expires = ngx_time() + memory_cleanup_timeout; channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->memory_cleanup_timeout;
(channel->broadcast) ? data->broadcast_channels-- : data->channels--; (channel->broadcast) ? data->broadcast_channels-- : data->channels--;
// move the channel to trash tree // move the channel to trash tree
...@@ -228,6 +228,36 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *t ...@@ -228,6 +228,36 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *t
} }
static void
ngx_http_push_stream_collect_expired_messages(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force)
{
ngx_rbtree_node_t *sentinel;
ngx_http_push_stream_channel_t *channel;
sentinel = tree->sentinel;
channel = (ngx_http_push_stream_channel_t *) node;
if ((!channel->deleted) && (&channel->node != sentinel)) {
if ((!channel->deleted) && (channel->node.left != NULL)) {
ngx_http_push_stream_collect_expired_messages(tree, shpool, node->left, force);
}
if ((!channel->deleted) && (channel->node.right != NULL)) {
ngx_http_push_stream_collect_expired_messages(tree, shpool, node->right, force);
}
ngx_shmtx_lock(&shpool->mutex);
if ((channel != NULL) && (!channel->deleted)) {
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1);
}
ngx_shmtx_unlock(&shpool->mutex);
}
}
static void static void
ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force) ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force)
{ {
...@@ -272,18 +302,30 @@ ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_rbtree_t *tree, ...@@ -272,18 +302,30 @@ ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_rbtree_t *tree,
static ngx_int_t static ngx_int_t
ngx_http_push_stream_memory_cleanup(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf) ngx_http_push_stream_memory_cleanup(ngx_log_t *log, ngx_http_push_stream_main_conf_t *psmcf)
{ {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_collect_expired_messages_and_empty_channels(&data->tree, shpool, data->tree.root, 0, pslcf->memory_cleanup_timeout); ngx_http_push_stream_collect_expired_messages_and_empty_channels(&data->tree, shpool, data->tree.root, 0);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0); ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0);
return NGX_OK; return NGX_OK;
} }
static ngx_int_t
ngx_http_push_stream_buffer_cleanup(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_collect_expired_messages(&data->tree, shpool, data->tree.root, 0);
return NGX_OK;
}
static ngx_int_t static ngx_int_t
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force) ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force)
{ {
...@@ -352,18 +394,37 @@ ngx_http_push_stream_disconnect_timer_set(ngx_http_push_stream_loc_conf_t *pslcf ...@@ -352,18 +394,37 @@ ngx_http_push_stream_disconnect_timer_set(ngx_http_push_stream_loc_conf_t *pslcf
static void static void
ngx_http_push_stream_memory_cleanup_timer_set(ngx_http_push_stream_loc_conf_t *pslcf) ngx_http_push_stream_memory_cleanup_timer_set(ngx_http_push_stream_main_conf_t *psmcf)
{ {
if (pslcf->memory_cleanup_interval != NGX_CONF_UNSET_MSEC) { if (psmcf->memory_cleanup_interval != NGX_CONF_UNSET_MSEC) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
if (ngx_http_push_stream_memory_cleanup_event.handler == NULL) { if (ngx_http_push_stream_memory_cleanup_event.handler == NULL) {
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
if (ngx_http_push_stream_memory_cleanup_event.handler == NULL) { if (ngx_http_push_stream_memory_cleanup_event.handler == NULL) {
ngx_http_push_stream_memory_cleanup_event.handler = ngx_http_push_stream_memory_cleanup_timer_wake_handler; ngx_http_push_stream_memory_cleanup_event.handler = ngx_http_push_stream_memory_cleanup_timer_wake_handler;
ngx_http_push_stream_memory_cleanup_event.data = pslcf; ngx_http_push_stream_memory_cleanup_event.data = psmcf;
ngx_http_push_stream_memory_cleanup_event.log = ngx_cycle->log; ngx_http_push_stream_memory_cleanup_event.log = ngx_cycle->log;
ngx_http_push_stream_timer_reset(pslcf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event); ngx_http_push_stream_timer_reset(psmcf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event);
}
ngx_shmtx_unlock(&shpool->mutex);
}
}
}
static void
ngx_http_push_stream_buffer_cleanup_timer_set(ngx_http_push_stream_loc_conf_t *pslcf)
{
if ((pslcf->buffer_cleanup_interval != NGX_CONF_UNSET_MSEC) && pslcf->store_messages) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
if (ngx_http_push_stream_buffer_cleanup_event.handler == NULL) {
ngx_shmtx_lock(&shpool->mutex);
if (ngx_http_push_stream_buffer_cleanup_event.handler == NULL) {
ngx_http_push_stream_buffer_cleanup_event.handler = ngx_http_push_stream_buffer_timer_wake_handler;
ngx_http_push_stream_buffer_cleanup_event.data = pslcf;
ngx_http_push_stream_buffer_cleanup_event.log = ngx_cycle->log;
ngx_http_push_stream_timer_reset(pslcf->buffer_cleanup_interval, &ngx_http_push_stream_buffer_cleanup_event);
} }
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
} }
...@@ -409,12 +470,20 @@ ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev) ...@@ -409,12 +470,20 @@ ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev)
static void static void
ngx_http_push_stream_memory_cleanup_timer_wake_handler(ngx_event_t *ev) ngx_http_push_stream_memory_cleanup_timer_wake_handler(ngx_event_t *ev)
{ {
ngx_http_push_stream_loc_conf_t *pslcf = ev->data; ngx_http_push_stream_main_conf_t *psmcf = ev->data;
ngx_http_push_stream_memory_cleanup(ev->log, pslcf); ngx_http_push_stream_memory_cleanup(ev->log, psmcf);
ngx_http_push_stream_timer_reset(pslcf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event); ngx_http_push_stream_timer_reset(psmcf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event);
} }
static void
ngx_http_push_stream_buffer_timer_wake_handler(ngx_event_t *ev)
{
ngx_http_push_stream_loc_conf_t *pslcf = ev->data;
ngx_http_push_stream_buffer_cleanup(ev->log, pslcf);
ngx_http_push_stream_timer_reset(pslcf->buffer_cleanup_interval, &ngx_http_push_stream_buffer_cleanup_event);
}
static u_char * static u_char *
ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx_pool_t *pool) ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx_pool_t *pool)
......
...@@ -211,6 +211,7 @@ http { ...@@ -211,6 +211,7 @@ http {
client_body_in_single_buffer on; client_body_in_single_buffer on;
client_body_temp_path <%= @client_body_temp %>; client_body_temp_path <%= @client_body_temp %>;
<%= "push_stream_max_reserved_memory #{@max_reserved_memory};" unless @max_reserved_memory.nil? %> <%= "push_stream_max_reserved_memory #{@max_reserved_memory};" unless @max_reserved_memory.nil? %>
<%= "push_stream_memory_cleanup_timeout #{@memory_cleanup_timeout};" unless @memory_cleanup_timeout.nil? %>
server { server {
listen <%=nginx_port%>; listen <%=nginx_port%>;
...@@ -245,8 +246,6 @@ http { ...@@ -245,8 +246,6 @@ http {
<%= "push_stream_max_number_of_channels #{@max_number_of_channels};" unless @max_number_of_channels.nil? %> <%= "push_stream_max_number_of_channels #{@max_number_of_channels};" unless @max_number_of_channels.nil? %>
<%= "push_stream_max_number_of_broadcast_channels #{@max_number_of_broadcast_channels};" unless @max_number_of_broadcast_channels.nil? %> <%= "push_stream_max_number_of_broadcast_channels #{@max_number_of_broadcast_channels};" unless @max_number_of_broadcast_channels.nil? %>
<%= "push_stream_memory_cleanup_timeout #{@memory_cleanup_timeout};" unless @memory_cleanup_timeout.nil? %>
# client_max_body_size MUST be equal to client_body_buffer_size or # client_max_body_size MUST be equal to client_body_buffer_size or
# you will be sorry. # you will be sorry.
client_max_body_size <%= @client_max_body_size.nil? ? '32k' : @client_max_body_size %>; client_max_body_size <%= @client_max_body_size.nil? ? '32k' : @client_max_body_size %>;
...@@ -278,8 +277,6 @@ http { ...@@ -278,8 +277,6 @@ http {
<%= "push_stream_max_number_of_channels #{@max_number_of_channels};" unless @max_number_of_channels.nil? %> <%= "push_stream_max_number_of_channels #{@max_number_of_channels};" unless @max_number_of_channels.nil? %>
<%= "push_stream_max_number_of_broadcast_channels #{@max_number_of_broadcast_channels};" unless @max_number_of_broadcast_channels.nil? %> <%= "push_stream_max_number_of_broadcast_channels #{@max_number_of_broadcast_channels};" unless @max_number_of_broadcast_channels.nil? %>
<%= "push_stream_memory_cleanup_timeout #{@memory_cleanup_timeout};" unless @memory_cleanup_timeout.nil? %>
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment