Commit 4e0d8669 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

refactor on directives and docs

parent f0f2b472
h2. Version 0.3.0
* Adding Event Source support
* Adding Polling support
* Adding Long Polling support
* Moving some directives to be used on http context instead of location
** push_stream_min_message_buffer_timeout
** push_stream_max_message_buffer_length
** push_stream_max_channel_id_length
** push_stream_ping_message_interval
** push_stream_subscriber_connection_timeout
** push_stream_broadcast_channel_prefix
** push_stream_max_number_of_channels
** push_stream_max_number_of_broadcast_channels
* Renaming some directives
** push_stream_max_reserved_memory -> push_stream_shared_memory_size
** push_stream_memory_cleanup_timeout -> push_stream_shared_memory_cleanup_objects_ttl
** push_stream_min_message_buffer_timeout -> push_stream_message_ttl
** push_stream_max_message_buffer_length -> push_stream_max_messages_stored_per_channel
** push_stream_subscriber_connection_timeout -> push_stream_subscriber_connection_ttl
h2. Version 0.2.7
......
This diff is collapsed.
......@@ -42,37 +42,37 @@ typedef struct {
typedef struct {
size_t shm_size;
ngx_msec_t memory_cleanup_interval;
time_t memory_cleanup_timeout;
time_t shm_cleanup_objects_ttl;
ngx_str_t channel_deleted_message_text;
ngx_str_t ping_message_text;
ngx_uint_t qtd_templates;
ngx_str_t broadcast_channel_prefix;
ngx_uint_t max_number_of_channels;
ngx_uint_t max_number_of_broadcast_channels;
ngx_msec_t ping_message_interval;
ngx_msec_t subscriber_disconnect_interval;
time_t subscriber_connection_ttl;
ngx_msec_t buffer_cleanup_interval;
time_t message_ttl;
ngx_uint_t max_messages_stored_per_channel;
ngx_uint_t max_channel_id_length;
ngx_http_push_stream_template_queue_t msg_templates;
} ngx_http_push_stream_main_conf_t;
typedef struct {
ngx_int_t index_channel_id;
ngx_int_t index_channels_path;
time_t buffer_timeout;
ngx_uint_t max_messages;
ngx_uint_t authorized_channels_only;
ngx_uint_t store_messages;
ngx_uint_t max_channel_id_length;
ngx_flag_t store_messages;
ngx_str_t header_template;
ngx_str_t message_template;
ngx_int_t message_template_index;
ngx_str_t footer_template;
ngx_str_t content_type;
ngx_msec_t ping_message_interval;
ngx_msec_t subscriber_disconnect_interval;
time_t subscriber_connection_timeout;
ngx_str_t broadcast_channel_prefix;
ngx_uint_t broadcast_channel_max_qtd;
ngx_uint_t max_number_of_channels;
ngx_uint_t max_number_of_broadcast_channels;
ngx_msec_t buffer_cleanup_interval;
ngx_uint_t keepalive;
ngx_uint_t publisher_admin;
ngx_flag_t subscriber_eventsource;
ngx_flag_t eventsource_support;
ngx_uint_t subscriber_mode;
} ngx_http_push_stream_loc_conf_t;
......@@ -231,6 +231,8 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_ETAG = ngx_string("Etag");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_IF_NONE_MATCH = ngx_string("If-None-Match");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_VARY = ngx_string("Vary");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_NORMAL = ngx_string("normal");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN = ngx_string("admin");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_STREAMING = ngx_string("streaming");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_POLLING = ngx_string("polling");
......
......@@ -34,7 +34,7 @@
#include <ngx_http_push_stream_module_subscriber.h>
#define NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE 33554432 // 32 megs
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT = 30; // 30 seconds
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL = 30; // 30 seconds
#define NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE "~text~"
......
......@@ -187,10 +187,10 @@ static ngx_http_push_stream_content_subtype_t subtypes[] = {
};
static const ngx_int_t NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID = -1;
static const ngx_str_t NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT = ngx_string("");
#define NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT ""
static const ngx_int_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID = -2;
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT = ngx_string("Channel deleted");
#define NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT "Channel deleted"
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID = ngx_string("~id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID = ngx_string("~event-id~");
......@@ -228,21 +228,21 @@ static ngx_int_t ngx_http_push_stream_send_response_content_header(ng
static void ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg);
static ngx_int_t ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer);
static void ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf);
static ngx_int_t ngx_http_push_stream_memory_cleanup(ngx_log_t *log, ngx_http_push_stream_main_conf_t *psmcf);
static ngx_int_t ngx_http_push_stream_buffer_cleanup(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf);
static ngx_int_t ngx_http_push_stream_memory_cleanup();
static ngx_int_t ngx_http_push_stream_buffer_cleanup();
static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_ping_timer_set(ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_disconnect_timer_set(ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_memory_cleanup_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_memory_cleanup_timer_set(ngx_http_push_stream_main_conf_t *psmcf);
static void ngx_http_push_stream_buffer_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_buffer_cleanup_timer_set(ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_timer_set(ngx_msec_t timer_interval, ngx_event_t *event, ngx_event_handler_pt event_handler, ngx_flag_t start_timer);
static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_interval, ngx_event_t *timer_event);
#define ngx_http_push_stream_ping_timer_set() ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->ping_message_interval, &ngx_http_push_stream_ping_event, ngx_http_push_stream_ping_timer_wake_handler, 1);
#define ngx_http_push_stream_disconnect_timer_set() ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->subscriber_disconnect_interval, &ngx_http_push_stream_disconnect_event, ngx_http_push_stream_disconnect_timer_wake_handler, 1);
#define ngx_http_push_stream_memory_cleanup_timer_set() ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event, ngx_http_push_stream_memory_cleanup_timer_wake_handler, 1);
#define ngx_http_push_stream_buffer_cleanup_timer_set(pslcf) ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->buffer_cleanup_interval, &ngx_http_push_stream_buffer_cleanup_event, ngx_http_push_stream_buffer_timer_wake_handler, pslcf->store_messages);
static void ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len);
......
......@@ -30,8 +30,19 @@ http {
client_max_body_size 1k;
client_body_buffer_size 1k;
ignore_invalid_headers on;
client_body_in_single_buffer on;
push_stream_max_reserved_memory 10m;
push_stream_shared_memory_size 10m;
push_stream_max_channel_id_length 200;
# max messages to store in memory
push_stream_max_messages_stored_per_channel 20;
# message ttl
push_stream_message_ttl 5m;
# ping frequency
push_stream_ping_message_interval 10s;
# connection ttl to enable recycle
push_stream_subscriber_connection_ttl 15m;
# broadcast
push_stream_broadcast_channel_prefix "broad_";
server {
listen 80;
......@@ -46,18 +57,13 @@ http {
}
location /pub {
# activate publisher mode for this location
push_stream_publisher;
# activate publisher mode for this location, with admin support
push_stream_publisher admin;
# query string based channel id
set $push_stream_channel_id $arg_id;
push_stream_max_channel_id_length 200;
# store messages in memory
push_stream_store_messages on;
# max messages to store in memory
push_stream_max_message_buffer_length 20;
# message ttl
push_stream_min_message_buffer_timeout 5m;
# Message size limit
# client_max_body_size MUST be equal to client_body_buffer_size or
......@@ -72,7 +78,6 @@ http {
# positional channel path
set $push_stream_channels_path $1;
push_stream_max_channel_id_length 200;
# header to be sent when receiving new subscriber connection
push_stream_header_template "<html><head><meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-store\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-cache\">\r\n<meta http-equiv=\"Pragma\" content=\"no-cache\">\r\n<meta http-equiv=\"Expires\" content=\"Thu, 1 Jan 1970 00:00:00 GMT\">\r\n<script type=\"text/javascript\">\r\nwindow.onError = null;\r\ndocument.domain = 'localhost';\r\nparent.PushStream.register(this);\r\n</script>\r\n</head>\r\n<body onload=\"try { parent.PushStream.reset(this) } catch (e) {}\">";
# message template
......@@ -84,12 +89,6 @@ http {
# subscriber may create channels on demand or only authorized
# (publisher) may do it?
push_stream_authorized_channels_only off;
# ping frequency
push_stream_ping_message_interval 10s;
# connection ttl to enable recycle
push_stream_subscriber_connection_timeout 15m;
# broadcast
push_stream_broadcast_channel_prefix "broad_";
push_stream_broadcast_channel_max_qtd 3;
}
}
......
......@@ -42,7 +42,7 @@ ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_
}
// maximum length limiter for channel id
if ((cf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (vv->len > cf->max_channel_id_length)) {
if ((ngx_http_push_stream_module_main_conf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (vv->len > ngx_http_push_stream_module_main_conf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", vv->len);
return NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID;
}
......
......@@ -315,7 +315,7 @@ ngx_http_push_stream_send_worker_ping_message(void)
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if (cur->request != NULL) {
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(cur->request, ngx_http_push_stream_module);
if (pslcf->subscriber_eventsource) {
if (pslcf->eventsource_support) {
ngx_http_push_stream_send_response_text(cur->request, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.len, 0);
} else {
ngx_http_push_stream_send_response_message(cur->request, NULL, ngx_http_push_stream_ping_msg);
......
......@@ -211,12 +211,12 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
msg->time = ngx_time();
msg->tag = (msg->time == channel->last_message_time) ? (channel->last_message_tag + 1) : 0;
// set message expiration time
msg->expires = (cf->buffer_timeout == NGX_CONF_UNSET ? 0 : (ngx_time() + cf->buffer_timeout));
msg->expires = (ngx_http_push_stream_module_main_conf->message_ttl == NGX_CONF_UNSET ? 0 : (ngx_time() + ngx_http_push_stream_module_main_conf->message_ttl));
ngx_queue_insert_tail(&channel->message_queue.queue, &msg->queue);
channel->stored_messages++;
// now see if the queue is too big
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, cf->max_messages, 0);
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, ngx_http_push_stream_module_main_conf->max_messages_stored_per_channel, 0);
channel->last_message_time = msg->time;
channel->last_message_tag = msg->tag;
......
This diff is collapsed.
......@@ -51,6 +51,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_str_t *last_event_id;
ngx_str_t *push_mode;
ngx_flag_t polling, longpolling;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_push_stream_module_main_conf;
// only accept GET method
if (!(r->method & NGX_HTTP_GET)) {
......@@ -67,7 +68,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
//get channels ids and backtracks from path
channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, temp_pool);
if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the $push_stream_channel_path variable is required but is not set");
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the $push_stream_channels_path variable is required but is not set");
ngx_destroy_pool(temp_pool);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE);
}
......@@ -82,7 +83,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
// could not have a large size
if ((cf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (cur->id->len > cf->max_channel_id_length)) {
if ((mcf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (cur->id->len > mcf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", cur->id->len);
ngx_destroy_pool(temp_pool);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE);
......@@ -91,7 +92,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
// count subscribed channel and broadcasts
subscribed_channels_qtd++;
is_broadcast_channel = 0;
if ((cf->broadcast_channel_prefix.len > 0) && (ngx_strncmp(cur->id->data, cf->broadcast_channel_prefix.data, cf->broadcast_channel_prefix.len) == 0)) {
if ((mcf->broadcast_channel_prefix.len > 0) && (ngx_strncmp(cur->id->data, mcf->broadcast_channel_prefix.data, mcf->broadcast_channel_prefix.len) == 0)) {
is_broadcast_channel = 1;
subscribed_broadcast_channels_qtd++;
}
......@@ -170,8 +171,8 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
// setting disconnect and ping timer
ngx_http_push_stream_disconnect_timer_set(cf);
ngx_http_push_stream_ping_timer_set(cf);
ngx_http_push_stream_disconnect_timer_set();
ngx_http_push_stream_ping_timer_set();
ngx_destroy_pool(temp_pool);
return NGX_DONE;
......@@ -443,7 +444,7 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
worker_subscriber->request = r;
worker_subscriber->worker_subscribed_pid = ngx_pid;
worker_subscriber->expires = (cf->subscriber_connection_timeout == NGX_CONF_UNSET) ? 0 : (ngx_time() + cf->subscriber_connection_timeout);
worker_subscriber->expires = (ngx_http_push_stream_module_main_conf->subscriber_connection_ttl == NGX_CONF_UNSET) ? 0 : (ngx_time() + ngx_http_push_stream_module_main_conf->subscriber_connection_ttl);
ngx_queue_init(&worker_subscriber->queue);
ngx_queue_init(&worker_subscriber->subscriptions_sentinel.queue);
......
......@@ -32,6 +32,10 @@ static ngx_inline void
ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired) {
ngx_http_push_stream_msg_t *sentinel, *msg;
if (max_messages == NGX_CONF_UNSET_UINT) {
return;
}
sentinel = &channel->message_queue;
while (!ngx_queue_empty(&sentinel->queue) && ((channel->stored_messages > max_messages) || expired)) {
......@@ -359,7 +363,7 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_
{
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
if (pslcf->subscriber_eventsource && (msg->event_id_message != NULL)) {
if (pslcf->eventsource_support && (msg->event_id_message != NULL)) {
ngx_http_push_stream_send_response_text(r, msg->event_id_message->data, msg->event_id_message->len, 0);
}
......@@ -430,16 +434,6 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r)
ngx_http_finalize_request(r, NGX_HTTP_OK);
}
static ngx_int_t
ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf)
{
if (pslcf->message_template.len > 0) {
ngx_http_push_stream_alert_worker_send_ping(ngx_pid, ngx_process_slot, ngx_cycle->log);
}
return NGX_OK;
}
static void
ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool) {
......@@ -508,7 +502,7 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s
if ((channel->stored_messages == 0) && (channel->subscribers == 0)) {
channel->deleted = 1;
channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->memory_cleanup_timeout;
channel->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
(channel->broadcast) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->broadcast_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels);
// move the channel to trash tree
......@@ -596,7 +590,7 @@ nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_htt
static ngx_int_t
ngx_http_push_stream_memory_cleanup(ngx_log_t *log, ngx_http_push_stream_main_conf_t *psmcf)
ngx_http_push_stream_memory_cleanup()
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
......@@ -609,7 +603,7 @@ ngx_http_push_stream_memory_cleanup(ngx_log_t *log, ngx_http_push_stream_main_co
static ngx_int_t
ngx_http_push_stream_buffer_cleanup(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf)
ngx_http_push_stream_buffer_cleanup()
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
......@@ -673,82 +667,24 @@ ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *m
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
msg->deleted = 1;
msg->expires = ngx_time() + ngx_http_push_stream_module_main_conf->memory_cleanup_timeout;
msg->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
ngx_queue_insert_tail(&data->messages_to_delete.queue, &msg->queue);
}
static void
ngx_http_push_stream_ping_timer_set(ngx_http_push_stream_loc_conf_t *pslcf)
{
if (pslcf->ping_message_interval != NGX_CONF_UNSET_MSEC) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
if (ngx_http_push_stream_ping_event.handler == NULL) {
ngx_shmtx_lock(&shpool->mutex);
if (ngx_http_push_stream_ping_event.handler == NULL) {
ngx_http_push_stream_ping_event.handler = ngx_http_push_stream_ping_timer_wake_handler;
ngx_http_push_stream_ping_event.data = pslcf;
ngx_http_push_stream_ping_event.log = ngx_cycle->log;
ngx_http_push_stream_timer_reset(pslcf->ping_message_interval, &ngx_http_push_stream_ping_event);
}
ngx_shmtx_unlock(&shpool->mutex);
}
}
}
static void
ngx_http_push_stream_disconnect_timer_set(ngx_http_push_stream_loc_conf_t *pslcf)
ngx_http_push_stream_timer_set(ngx_msec_t timer_interval, ngx_event_t *event, ngx_event_handler_pt event_handler, ngx_flag_t start_timer)
{
if (pslcf->subscriber_disconnect_interval != NGX_CONF_UNSET_MSEC) {
if ((timer_interval != NGX_CONF_UNSET_MSEC) && start_timer) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
if (ngx_http_push_stream_disconnect_event.handler == NULL) {
if (event->handler == NULL) {
ngx_shmtx_lock(&shpool->mutex);
if (ngx_http_push_stream_disconnect_event.handler == NULL) {
ngx_http_push_stream_disconnect_event.handler = ngx_http_push_stream_disconnect_timer_wake_handler;
ngx_http_push_stream_disconnect_event.data = pslcf;
ngx_http_push_stream_disconnect_event.log = ngx_cycle->log;
ngx_http_push_stream_timer_reset(pslcf->subscriber_disconnect_interval, &ngx_http_push_stream_disconnect_event);
}
ngx_shmtx_unlock(&shpool->mutex);
}
}
}
static void
ngx_http_push_stream_memory_cleanup_timer_set(ngx_http_push_stream_main_conf_t *psmcf)
{
if (psmcf->memory_cleanup_interval != NGX_CONF_UNSET_MSEC) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
if (ngx_http_push_stream_memory_cleanup_event.handler == NULL) {
ngx_shmtx_lock(&shpool->mutex);
if (ngx_http_push_stream_memory_cleanup_event.handler == NULL) {
ngx_http_push_stream_memory_cleanup_event.handler = ngx_http_push_stream_memory_cleanup_timer_wake_handler;
ngx_http_push_stream_memory_cleanup_event.data = psmcf;
ngx_http_push_stream_memory_cleanup_event.log = ngx_cycle->log;
ngx_http_push_stream_timer_reset(psmcf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event);
}
ngx_shmtx_unlock(&shpool->mutex);
}
}
}
static void
ngx_http_push_stream_buffer_cleanup_timer_set(ngx_http_push_stream_loc_conf_t *pslcf)
{
if ((pslcf->buffer_cleanup_interval != NGX_CONF_UNSET_MSEC) && pslcf->store_messages) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
if (ngx_http_push_stream_buffer_cleanup_event.handler == NULL) {
ngx_shmtx_lock(&shpool->mutex);
if (ngx_http_push_stream_buffer_cleanup_event.handler == NULL) {
ngx_http_push_stream_buffer_cleanup_event.handler = ngx_http_push_stream_buffer_timer_wake_handler;
ngx_http_push_stream_buffer_cleanup_event.data = pslcf;
ngx_http_push_stream_buffer_cleanup_event.log = ngx_cycle->log;
ngx_http_push_stream_timer_reset(pslcf->buffer_cleanup_interval, &ngx_http_push_stream_buffer_cleanup_event);
if (event->handler == NULL) {
event->handler = event_handler;
event->data = NULL;
event->log = ngx_cycle->log;
ngx_http_push_stream_timer_reset(timer_interval, event);
}
ngx_shmtx_unlock(&shpool->mutex);
}
......@@ -775,38 +711,29 @@ ngx_http_push_stream_timer_reset(ngx_msec_t timer_interval, ngx_event_t *timer_e
static void
ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
{
ngx_http_push_stream_loc_conf_t *pslcf = ev->data;
ngx_http_push_stream_send_ping(ev->log, pslcf);
ngx_http_push_stream_timer_reset(pslcf->ping_message_interval, &ngx_http_push_stream_ping_event);
ngx_http_push_stream_alert_worker_send_ping(ngx_pid, ngx_process_slot, ngx_cycle->log);
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->ping_message_interval, &ngx_http_push_stream_ping_event);
}
static void
ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev)
{
ngx_http_push_stream_loc_conf_t *pslcf = ev->data;
ngx_http_push_stream_alert_worker_disconnect_subscribers(ngx_pid, ngx_process_slot, ngx_cycle->log);
ngx_http_push_stream_timer_reset(pslcf->subscriber_disconnect_interval, &ngx_http_push_stream_disconnect_event);
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->subscriber_disconnect_interval, &ngx_http_push_stream_disconnect_event);
}
static void
ngx_http_push_stream_memory_cleanup_timer_wake_handler(ngx_event_t *ev)
{
ngx_http_push_stream_main_conf_t *psmcf = ev->data;
ngx_http_push_stream_memory_cleanup(ev->log, psmcf);
ngx_http_push_stream_timer_reset(psmcf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event);
ngx_http_push_stream_memory_cleanup();
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event);
}
static void
ngx_http_push_stream_buffer_timer_wake_handler(ngx_event_t *ev)
{
ngx_http_push_stream_loc_conf_t *pslcf = ev->data;
ngx_http_push_stream_buffer_cleanup(ev->log, pslcf);
ngx_http_push_stream_timer_reset(pslcf->buffer_cleanup_interval, &ngx_http_push_stream_buffer_cleanup_event);
ngx_http_push_stream_buffer_cleanup();
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->buffer_cleanup_interval, &ngx_http_push_stream_buffer_cleanup_event);
}
static u_char *
......
......@@ -174,6 +174,7 @@ ngx_http_push_stream_find_channel_locked(ngx_str_t *id, ngx_log_t *log)
static ngx_http_push_stream_channel_t *
ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_loc_conf_t *cf)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_push_stream_module_main_conf;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_channel_t *channel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
......@@ -193,12 +194,12 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
return channel;
}
if ((cf->broadcast_channel_prefix.len > 0) && (ngx_strncmp(id->data, cf->broadcast_channel_prefix.data, cf->broadcast_channel_prefix.len) == 0)) {
if ((mcf->broadcast_channel_prefix.len > 0) && (ngx_strncmp(id->data, mcf->broadcast_channel_prefix.data, mcf->broadcast_channel_prefix.len) == 0)) {
is_broadcast_channel = 1;
}
if (((!is_broadcast_channel) && (cf->max_number_of_channels != NGX_CONF_UNSET_UINT) && (cf->max_number_of_channels == data->channels)) ||
((is_broadcast_channel) && (cf->max_number_of_broadcast_channels != NGX_CONF_UNSET_UINT) && (cf->max_number_of_broadcast_channels == data->broadcast_channels))) {
if (((!is_broadcast_channel) && (mcf->max_number_of_channels != NGX_CONF_UNSET_UINT) && (mcf->max_number_of_channels == data->channels)) ||
((is_broadcast_channel) && (mcf->max_number_of_broadcast_channels != NGX_CONF_UNSET_UINT) && (mcf->max_number_of_broadcast_channels == data->broadcast_channels))) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED;
}
......
......@@ -151,11 +151,11 @@ module BaseTestCase
@memory_cleanup_timeout = '5m'
@config_template = nil
@keepalive = 'off'
@publisher_admin = 'off'
@channel_deleted_message_text = nil
@ping_message_text = nil
@subscriber_eventsource = 'off'
@subscriber_mode = nil
@publisher_mode = nil
self.send(:global_configuration) if self.respond_to?(:global_configuration)
end
......@@ -234,10 +234,23 @@ http {
ignore_invalid_headers on;
client_body_in_single_buffer on;
client_body_temp_path <%= @client_body_temp %>;
<%= "push_stream_max_reserved_memory #{@max_reserved_memory};" unless @max_reserved_memory.nil? %>
<%= "push_stream_memory_cleanup_timeout #{@memory_cleanup_timeout};" unless @memory_cleanup_timeout.nil? %>
<%= "push_stream_shared_memory_size #{@max_reserved_memory};" unless @max_reserved_memory.nil? %>
<%= "push_stream_shared_memory_cleanup_objects_ttl #{@memory_cleanup_timeout};" unless @memory_cleanup_timeout.nil? %>
<%= %{push_stream_channel_deleted_message_text "#{@channel_deleted_message_text}";} unless @channel_deleted_message_text.nil? %>
<%= %{push_stream_ping_message_text "#{@ping_message_text}";} unless @ping_message_text.nil? %>
<%= %{push_stream_broadcast_channel_prefix "#{@broadcast_channel_prefix}";} unless @broadcast_channel_prefix.nil? %>
<%= "push_stream_max_number_of_channels #{@max_number_of_channels};" unless @max_number_of_channels.nil? %>
<%= "push_stream_max_number_of_broadcast_channels #{@max_number_of_broadcast_channels};" unless @max_number_of_broadcast_channels.nil? %>
# max messages to store in memory
<%= "push_stream_max_messages_stored_per_channel #{@max_message_buffer_length};" unless @max_message_buffer_length.nil? %>
# message ttl
<%= "push_stream_message_ttl #{@min_message_buffer_timeout};" unless @min_message_buffer_timeout.nil? %>
<%= "push_stream_max_channel_id_length #{@max_channel_id_length};" unless @max_channel_id_length.nil? %>
# ping frequency
<%= "push_stream_ping_message_interval #{@ping_message_interval};" unless @ping_message_interval.nil? %>
# connection ttl to enable recycle
<%= "push_stream_subscriber_connection_ttl #{@subscriber_connection_timeout};" unless @subscriber_connection_timeout.nil? %>
server {
listen <%=nginx_port%>;
......@@ -256,28 +269,15 @@ http {
location /pub {
# activate publisher mode for this location
push_stream_publisher;
# activate publisher admin mode for this location
<%= "push_stream_publisher_admin #{@publisher_admin};" unless @publisher_admin.nil? %>
push_stream_publisher <%= @publisher_mode unless @publisher_mode.nil? || @publisher_mode == "normal" %>;
# query string based channel id
set $push_stream_channel_id $arg_id;
# store messages
<%= "push_stream_store_messages #{@store_messages};" unless @store_messages.nil? %>
# max messages to store in memory
<%= "push_stream_max_message_buffer_length #{@max_message_buffer_length};" unless @max_message_buffer_length.nil? %>
# message ttl
<%= "push_stream_min_message_buffer_timeout #{@min_message_buffer_timeout};" unless @min_message_buffer_timeout.nil? %>
# keepalive
<%= "push_stream_keepalive #{@keepalive};" unless @keepalive.nil? %>
<%= "push_stream_max_channel_id_length #{@max_channel_id_length};" unless @max_channel_id_length.nil? %>
<%= %{push_stream_broadcast_channel_prefix "#{@broadcast_channel_prefix}";} unless @broadcast_channel_prefix.nil? %>
<%= "push_stream_broadcast_channel_max_qtd #{@broadcast_channel_max_qtd};" unless @broadcast_channel_max_qtd.nil? %>
<%= "push_stream_max_number_of_channels #{@max_number_of_channels};" unless @max_number_of_channels.nil? %>
<%= "push_stream_max_number_of_broadcast_channels #{@max_number_of_broadcast_channels};" unless @max_number_of_broadcast_channels.nil? %>
# client_max_body_size MUST be equal to client_body_buffer_size or
# you will be sorry.
client_max_body_size <%= @client_max_body_size.nil? ? '32k' : @client_max_body_size %>;
......@@ -289,11 +289,10 @@ http {
push_stream_subscriber <%= @subscriber_mode unless @subscriber_mode.nil? || @subscriber_mode == "streaming" %>;
# activate event source support for this location
<%= "push_stream_subscriber_eventsource #{@subscriber_eventsource};" unless @subscriber_eventsource.nil? %>
<%= "push_stream_eventsource_support #{@subscriber_eventsource};" unless @subscriber_eventsource.nil? %>
# positional channel path
set $push_stream_channels_path $1;
<%= "push_stream_max_channel_id_length #{@max_channel_id_length};" unless @max_channel_id_length.nil? %>
# header to be sent when receiving new subscriber connection
<%= %{push_stream_header_template "#{@header_template}";} unless @header_template.nil? %>
# message template
......@@ -305,15 +304,7 @@ http {
# subscriber may create channels on demand or only authorized
# (publisher) may do it?
<%= "push_stream_authorized_channels_only #{@authorized_channels_only};" unless @authorized_channels_only.nil? %>
# ping frequency
<%= "push_stream_ping_message_interval #{@ping_message_interval};" unless @ping_message_interval.nil? %>
# connection ttl to enable recycle
<%= "push_stream_subscriber_connection_timeout #{@subscriber_connection_timeout};" unless @subscriber_connection_timeout.nil? %>
<%= %{push_stream_broadcast_channel_prefix "#{@broadcast_channel_prefix}";} unless @broadcast_channel_prefix.nil? %>
<%= "push_stream_broadcast_channel_max_qtd #{@broadcast_channel_max_qtd};" unless @broadcast_channel_max_qtd.nil? %>
<%= "push_stream_max_number_of_channels #{@max_number_of_channels};" unless @max_number_of_channels.nil? %>
<%= "push_stream_max_number_of_broadcast_channels #{@max_number_of_broadcast_channels};" unless @max_number_of_broadcast_channels.nil? %>
}
<%= @extra_location %>
......
......@@ -4,7 +4,7 @@ class TestPublisherAdmin < Test::Unit::TestCase
include BaseTestCase
def global_configuration
@publisher_admin = 'on'
@publisher_mode = 'admin'
end
def test_admin_access_whithout_channel_id
......
......@@ -3,8 +3,7 @@ require File.expand_path('base_test_case', File.dirname(__FILE__))
class TestSetuParameters < Test::Unit::TestCase
include BaseTestCase
def initialize(opts)
super(opts)
def global_configuration
@disable_start_stop_server = true
end
......@@ -27,7 +26,7 @@ class TestSetuParameters < Test::Unit::TestCase
end
def test_subscriber_connection_timeout_cannot_be_zero
expected_error_message = "push_stream_subscriber_connection_timeout cannot be zero"
expected_error_message = "push_stream_subscriber_connection_ttl cannot be zero"
@subscriber_connection_timeout = 0
self.create_config_file
......@@ -45,7 +44,7 @@ class TestSetuParameters < Test::Unit::TestCase
end
def test_min_message_buffer_timeout_cannot_be_zero
expected_error_message = "push_stream_min_message_buffer_timeout cannot be zero"
expected_error_message = "push_stream_message_ttl cannot be zero"
@min_message_buffer_timeout = 0
self.create_config_file
......@@ -54,7 +53,7 @@ class TestSetuParameters < Test::Unit::TestCase
end
def test_max_message_buffer_length_cannot_be_zero
expected_error_message = "push_stream_max_message_buffer_length cannot be zero"
expected_error_message = "push_stream_max_messages_stored_per_channel cannot be zero"
@max_message_buffer_length = 0
self.create_config_file
......@@ -101,16 +100,6 @@ class TestSetuParameters < Test::Unit::TestCase
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_broadcast_channel_prefix_cannot_be_set_without_broadcast_channel_max_qtd
expected_error_message = "cannot set broadcast channel prefix if push_stream_broadcast_channel_max_qtd is not set"
@broadcast_channel_prefix = "broad_"
@broadcast_channel_max_qtd = nil
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_max_number_of_channels_cannot_be_zero
expected_error_message = "push_stream_max_number_of_channels cannot be zero"
@max_number_of_channels = 0
......@@ -141,7 +130,7 @@ class TestSetuParameters < Test::Unit::TestCase
end
def test_memory_cleanup_timeout
expected_error_message = "memory cleanup timeout cannot't be less than 30."
expected_error_message = "memory cleanup objects ttl cannot't be less than 30."
@memory_cleanup_timeout = '15s'
self.create_config_file
......@@ -221,4 +210,42 @@ class TestSetuParameters < Test::Unit::TestCase
self.stop_server
end
def test_invalid_publisher_mode
expected_error_message = "invalid push_stream_publisher mode value: unknown, accepted values (normal, admin)"
@publisher_mode = "unknown"
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_valid_publisher_mode
expected_error_message = "invalid push_stream_publisher mode value"
@publisher_mode = ""
self.create_config_file
stderr_msg = self.start_server
assert(!stderr_msg.include?(expected_error_message), "Message error founded: '#{ stderr_msg }'")
self.stop_server
@publisher_mode = "normal"
self.create_config_file
stderr_msg = self.start_server
assert(!stderr_msg.include?(expected_error_message), "Message error founded: '#{ stderr_msg }'")
self.stop_server
@publisher_mode = "admin"
self.create_config_file
stderr_msg = self.start_server
assert(!stderr_msg.include?(expected_error_message), "Message error founded: '#{ stderr_msg }'")
self.stop_server
end
end
......@@ -679,6 +679,7 @@ class TestSubscriber < Test::Unit::TestCase
def config_test_different_message_templates
@message_template = '{\"text\":\"~text~\"}'
@header_template = nil
@subscriber_connection_timeout = '1s'
@extra_location = %q{
location ~ /sub2/(.*)? {
# activate subscriber mode for this location
......@@ -688,7 +689,6 @@ class TestSubscriber < Test::Unit::TestCase
set $push_stream_channels_path $1;
# message template
push_stream_message_template "{\"msg\":\"~text~\"}";
push_stream_subscriber_connection_timeout 1s;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment