Commit 813aab64 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

removing push_stream_delete_oldest_received_message and...

removing push_stream_delete_oldest_received_message and push_stream_min_message_buffer_length configurations
parent 347a76ad
...@@ -322,7 +322,7 @@ ngx_http_push_stream_release_message_locked(ngx_http_push_stream_channel_t *chan ...@@ -322,7 +322,7 @@ ngx_http_push_stream_release_message_locked(ngx_http_push_stream_channel_t *chan
// message had been dequeued and nobody needs it anymore // message had been dequeued and nobody needs it anymore
ngx_http_push_stream_free_message_locked(msg, ngx_http_push_stream_shpool); ngx_http_push_stream_free_message_locked(msg, ngx_http_push_stream_shpool);
} }
if (channel->stored_messages > msg->delete_oldest_received_min_messages && ngx_http_push_stream_get_oldest_message_locked(channel) == msg) { if (ngx_http_push_stream_get_oldest_message_locked(channel) == msg) {
ngx_http_push_stream_delete_message_locked(channel, msg, ngx_http_push_stream_shpool); ngx_http_push_stream_delete_message_locked(channel, msg, ngx_http_push_stream_shpool);
} }
} }
......
...@@ -17,11 +17,9 @@ typedef struct { ...@@ -17,11 +17,9 @@ typedef struct {
ngx_int_t index_channel_id; ngx_int_t index_channel_id;
ngx_int_t index_channels_path; ngx_int_t index_channels_path;
time_t buffer_timeout; time_t buffer_timeout;
ngx_int_t min_messages;
ngx_int_t max_messages; ngx_int_t max_messages;
ngx_int_t authorize_channel; ngx_int_t authorize_channel;
ngx_int_t store_messages; ngx_int_t store_messages;
ngx_int_t delete_oldest_received_message;
ngx_int_t max_channel_id_length; ngx_int_t max_channel_id_length;
ngx_str_t header_template; ngx_str_t header_template;
ngx_str_t message_template; ngx_str_t message_template;
...@@ -45,7 +43,6 @@ typedef struct { ...@@ -45,7 +43,6 @@ typedef struct {
ngx_queue_t queue; // this MUST be first ngx_queue_t queue; // this MUST be first
ngx_buf_t *buf; ngx_buf_t *buf;
time_t expires; time_t expires;
ngx_uint_t delete_oldest_received_min_messages; // NGX_MAX_UINT32_VALUE for 'never'
ngx_int_t refcount; ngx_int_t refcount;
ngx_flag_t persistent; ngx_flag_t persistent;
} ngx_http_push_stream_msg_t; } ngx_http_push_stream_msg_t;
...@@ -253,7 +250,6 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID = ngx_string(" ...@@ -253,7 +250,6 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID = ngx_string("
#define NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE 33554432 // 32 megs #define NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE 33554432 // 32 megs
#define NGX_HTTP_PUSH_STREAM_DEFAULT_BUFFER_TIMEOUT 7200 #define NGX_HTTP_PUSH_STREAM_DEFAULT_BUFFER_TIMEOUT 7200
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MIN_MESSAGES 1
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MAX_MESSAGES 10 #define NGX_HTTP_PUSH_STREAM_DEFAULT_MAX_MESSAGES 10
#define NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE "" #define NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE ""
......
...@@ -131,9 +131,6 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -131,9 +131,6 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
msg->expires = (message_timeout == 0 ? 0 : (ngx_time() + message_timeout)); msg->expires = (message_timeout == 0 ? 0 : (ngx_time() + message_timeout));
msg->persistent = (message_timeout == 0 ? 1 : 0); msg->persistent = (message_timeout == 0 ? 1 : 0);
msg->delete_oldest_received_min_messages = cf->delete_oldest_received_message ? (ngx_uint_t) cf->min_messages : NGX_MAX_UINT32_VALUE;
// NGX_MAX_UINT32_VALUE to disable, otherwise = min_message_buffer_size of the publisher location from whence the message came
// FMI (For My Information): shm is still locked. // FMI (For My Information): shm is still locked.
switch (ngx_http_push_stream_broadcast_message_locked(channel, msg, r->connection->log, shpool)) { switch (ngx_http_push_stream_broadcast_message_locked(channel, msg, r->connection->log, shpool)) {
case NGX_HTTP_PUSH_STREAM_MESSAGE_QUEUED: case NGX_HTTP_PUSH_STREAM_MESSAGE_QUEUED:
...@@ -185,11 +182,6 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -185,11 +182,6 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
// exceeeds max queue size. force-delete oldest message // exceeeds max queue size. force-delete oldest message
ngx_http_push_stream_force_delete_message_locked(channel, ngx_http_push_stream_get_oldest_message_locked(channel), shpool); ngx_http_push_stream_force_delete_message_locked(channel, ngx_http_push_stream_get_oldest_message_locked(channel), shpool);
} }
if (channel->stored_messages > (ngx_uint_t) cf->min_messages) {
// exceeeds min queue size. maybe delete the oldest message
ngx_http_push_stream_msg_t *oldest_msg = ngx_http_push_stream_get_oldest_message_locked(channel);
NGX_HTTP_PUSH_STREAM_PUBLISHER_CHECK_LOCKED(oldest_msg, NULL, r, "push stream module: oldest message not found", shpool);
}
published_messages = channel->last_message_id; published_messages = channel->last_message_id;
stored_messages = channel->stored_messages; stored_messages = channel->stored_messages;
......
...@@ -26,24 +26,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = { ...@@ -26,24 +26,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, store_messages), offsetof(ngx_http_push_stream_loc_conf_t, store_messages),
NULL }, NULL },
{ ngx_string("push_stream_delete_oldest_received_message"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, delete_oldest_received_message),
NULL },
{ ngx_string("push_stream_min_message_buffer_timeout"), { ngx_string("push_stream_min_message_buffer_timeout"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_sec_slot, ngx_conf_set_sec_slot,
NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, buffer_timeout), offsetof(ngx_http_push_stream_loc_conf_t, buffer_timeout),
NULL }, NULL },
{ ngx_string("push_stream_min_message_buffer_length"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, min_messages),
NULL },
{ ngx_string("push_stream_max_message_buffer_length"), { ngx_string("push_stream_max_message_buffer_length"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot, ngx_conf_set_num_slot,
...@@ -165,7 +153,6 @@ ngx_http_push_stream_init_module(ngx_cycle_t *cycle) ...@@ -165,7 +153,6 @@ ngx_http_push_stream_init_module(ngx_cycle_t *cycle)
return NGX_ERROR; return NGX_ERROR;
} }
ngx_http_push_stream_ping_msg->expires = 0; ngx_http_push_stream_ping_msg->expires = 0;
ngx_http_push_stream_ping_msg->delete_oldest_received_min_messages = NGX_MAX_UINT32_VALUE;
ngx_http_push_stream_ping_msg->persistent = 1; ngx_http_push_stream_ping_msg->persistent = 1;
} }
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
...@@ -271,10 +258,8 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf) ...@@ -271,10 +258,8 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->buffer_timeout = NGX_CONF_UNSET; lcf->buffer_timeout = NGX_CONF_UNSET;
lcf->max_messages = NGX_CONF_UNSET; lcf->max_messages = NGX_CONF_UNSET;
lcf->min_messages = NGX_CONF_UNSET;
lcf->authorize_channel = NGX_CONF_UNSET; lcf->authorize_channel = NGX_CONF_UNSET;
lcf->store_messages = NGX_CONF_UNSET; lcf->store_messages = NGX_CONF_UNSET;
lcf->delete_oldest_received_message = NGX_CONF_UNSET;
lcf->max_channel_id_length = NGX_CONF_UNSET; lcf->max_channel_id_length = NGX_CONF_UNSET;
lcf->message_template.data = NULL; lcf->message_template.data = NULL;
lcf->header_template.data = NULL; lcf->header_template.data = NULL;
...@@ -297,10 +282,8 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -297,10 +282,8 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_sec_value(conf->buffer_timeout, prev->buffer_timeout, NGX_HTTP_PUSH_STREAM_DEFAULT_BUFFER_TIMEOUT); ngx_conf_merge_sec_value(conf->buffer_timeout, prev->buffer_timeout, NGX_HTTP_PUSH_STREAM_DEFAULT_BUFFER_TIMEOUT);
ngx_conf_merge_value(conf->max_messages, prev->max_messages, NGX_HTTP_PUSH_STREAM_DEFAULT_MAX_MESSAGES); ngx_conf_merge_value(conf->max_messages, prev->max_messages, NGX_HTTP_PUSH_STREAM_DEFAULT_MAX_MESSAGES);
ngx_conf_merge_value(conf->min_messages, prev->min_messages, NGX_HTTP_PUSH_STREAM_DEFAULT_MIN_MESSAGES);
ngx_conf_merge_value(conf->authorize_channel, prev->authorize_channel, 1); ngx_conf_merge_value(conf->authorize_channel, prev->authorize_channel, 1);
ngx_conf_merge_value(conf->store_messages, prev->store_messages, 1); ngx_conf_merge_value(conf->store_messages, prev->store_messages, 1);
ngx_conf_merge_value(conf->delete_oldest_received_message, prev->delete_oldest_received_message, 0);
ngx_conf_merge_value(conf->max_channel_id_length, prev->max_channel_id_length, NGX_HTTP_PUSH_STREAM_MAX_CHANNEL_ID_LENGTH); ngx_conf_merge_value(conf->max_channel_id_length, prev->max_channel_id_length, NGX_HTTP_PUSH_STREAM_MAX_CHANNEL_ID_LENGTH);
ngx_conf_merge_str_value(conf->header_template, prev->header_template, NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE); ngx_conf_merge_str_value(conf->header_template, prev->header_template, NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE);
ngx_conf_merge_str_value(conf->message_template, prev->message_template, NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE); ngx_conf_merge_str_value(conf->message_template, prev->message_template, NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE);
...@@ -312,11 +295,6 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -312,11 +295,6 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_value(conf->broadcast_channel_max_qtd, prev->broadcast_channel_max_qtd, 1); ngx_conf_merge_value(conf->broadcast_channel_max_qtd, prev->broadcast_channel_max_qtd, 1);
// sanity checks // sanity checks
if (conf->max_messages < conf->min_messages) {
// min/max buffer size makes sense?
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_max_message_buffer_length cannot be smaller than push_stream_min_message_buffer_length.");
return NGX_CONF_ERROR;
}
if (conf->ping_message_interval == 0) { if (conf->ping_message_interval == 0) {
conf->ping_message_interval = NGX_CONF_UNSET_MSEC; conf->ping_message_interval = NGX_CONF_UNSET_MSEC;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment