Commit dd12880f authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

including channels_statistics handler, adding some sanity check for...

including channels_statistics handler, adding some sanity check for configuration and removing unnecessary code for this version
parent 65a25ea5
/*
* ngx_http_push_stream_module_setup.h
*
* Created on: Oct 26, 2010
* Authors: Wandenberg Peixoto <wandenberg@gmail.com> & Rogério Schneider <stockrt@gmail.com>
*/
#ifndef NGX_HTTP_PUSH_STREAM_MODULE_SETUP_H_
#define NGX_HTTP_PUSH_STREAM_MODULE_SETUP_H_
#include <ngx_http_push_stream_module.h>
#include <ngx_http_push_stream_module_publisher.h>
#include <ngx_http_push_stream_module_subscriber.h>
#define NGX_HTTP_PUSH_STREAM_MAX_CHANNEL_ID_LENGTH 1024 // bytes
//
//#define NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE 33554432 // 32 megs
#define NGX_HTTP_PUSH_STREAM_DEFAULT_BUFFER_TIMEOUT 7200 // 2 hours
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MAX_MESSAGES 10
#define NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_CONTENT_TYPE "text/plain"
#define NGX_HTTP_PUSH_STREAM_DEFAULT_BROADCAST_CHANNEL_PREFIX ""
// variables
static ngx_str_t ngx_http_push_stream_channel_id = ngx_string("push_stream_channel_id");
static ngx_str_t ngx_http_push_stream_channels_path = ngx_string("push_stream_channels_path");
static char * push_stream_channels_statistics(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
// publisher
static char * ngx_http_push_stream_publisher(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
// subscriber
static char * ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
// setup
static char * ngx_http_push_stream_setup_handler(ngx_conf_t *cf, void *conf, ngx_int_t (*handler) (ngx_http_request_t *));
static ngx_int_t ngx_http_push_stream_init_module(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_init_worker(ngx_cycle_t *cycle);
static void ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle);
static void ngx_http_push_stream_exit_master(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_postconfig(ngx_conf_t *cf);
static void * ngx_http_push_stream_create_main_conf(ngx_conf_t *cf);
static void * ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf);
static char * ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);
static ngx_int_t ngx_http_push_stream_movezig_channel_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_SETUP_H_ */
#include <ngx_http_push_stream_module.h> /*
* ngx_http_push_stream_module_setup.c
*
* Created on: Oct 26, 2010
* Authors: Wandenberg Peixoto <wandenberg@gmail.com> & Rogério Schneider <stockrt@gmail.com>
*/
#include <ngx_http_push_stream_module_setup.h>
static ngx_command_t ngx_http_push_stream_commands[] = { static ngx_command_t ngx_http_push_stream_commands[] = {
{ ngx_string("push_stream_channels_statistics"),
NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS,
push_stream_channels_statistics,
NGX_HTTP_LOC_CONF_OFFSET,
0,
NULL },
{ ngx_string("push_stream_publisher"), { ngx_string("push_stream_publisher"),
NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS, NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS,
ngx_http_push_stream_publisher, ngx_http_push_stream_publisher,
...@@ -48,7 +60,7 @@ static ngx_command_t ngx_http_push_stream_commands[] = { ...@@ -48,7 +60,7 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot, ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, authorize_channel), offsetof(ngx_http_push_stream_loc_conf_t, authorized_channels_only),
NULL }, NULL },
{ ngx_string("push_stream_header_template"), { ngx_string("push_stream_header_template"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
...@@ -137,29 +149,6 @@ ngx_http_push_stream_init_module(ngx_cycle_t *cycle) ...@@ -137,29 +149,6 @@ ngx_http_push_stream_init_module(ngx_cycle_t *cycle)
ngx_http_push_stream_worker_processes = ccf->worker_processes; ngx_http_push_stream_worker_processes = ccf->worker_processes;
// initialize subscriber queues
// pool, please
if ((ngx_http_push_stream_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, cycle->log)) == NULL) { // I trust the cycle pool size to be a well-tuned one.
return NGX_ERROR;
}
if (ngx_http_push_stream_ping_msg == NULL) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_shmtx_lock(&shpool->mutex);
if (ngx_http_push_stream_ping_msg == NULL) {
if ((ngx_http_push_stream_ping_msg = ngx_http_push_stream_slab_alloc_locked(sizeof(ngx_http_push_stream_msg_t))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, cycle->log, 0, "push stream module: unable to allocate memory for ngx_http_push_stream_ping_msg");
return NGX_ERROR;
}
ngx_http_push_stream_ping_msg->expires = 0;
ngx_http_push_stream_ping_msg->persistent = 1;
}
ngx_shmtx_unlock(&shpool->mutex);
}
NGX_HTTP_PUSH_STREAM_MAKE_IN_MEMORY_CHAIN(ngx_http_push_stream_header_chain, ngx_http_push_stream_pool, "push stream module: unable to allocate chain to send header to new subscribers");
NGX_HTTP_PUSH_STREAM_MAKE_IN_MEMORY_CHAIN(ngx_http_push_stream_crlf_chain, ngx_http_push_stream_pool, "push stream module: unable to allocate chain to send crlf to subscribers, on flush");
// initialize our little IPC // initialize our little IPC
return ngx_http_push_stream_init_ipc(cycle, ngx_http_push_stream_worker_processes); return ngx_http_push_stream_init_ipc(cycle, ngx_http_push_stream_worker_processes);
...@@ -180,12 +169,11 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle) ...@@ -180,12 +169,11 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle)
static void static void
ngx_http_push_stream_exit_master(ngx_cycle_t *cycle) ngx_http_push_stream_exit_master(ngx_cycle_t *cycle)
{ {
ngx_pfree(ngx_http_push_stream_pool, ngx_http_push_stream_ping_buf); ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_pfree(ngx_http_push_stream_pool, ngx_http_push_stream_ping_msg);
ngx_free_chain(ngx_http_push_stream_pool, ngx_http_push_stream_header_chain);
ngx_free_chain(ngx_http_push_stream_pool, ngx_http_push_stream_crlf_chain);
// destroy channel tree in shared memory // destroy channel tree in shared memory
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_movezig_channel_locked); ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_movezig_channel_locked);
ngx_shmtx_unlock(&shpool->mutex);
} }
...@@ -199,6 +187,10 @@ ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle) ...@@ -199,6 +187,10 @@ ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle)
ngx_del_timer(&ngx_http_push_stream_ping_event); ngx_del_timer(&ngx_http_push_stream_ping_event);
} }
if (ngx_http_push_stream_disconnect_event.timer_set) {
ngx_del_timer(&ngx_http_push_stream_disconnect_event);
}
ngx_http_push_stream_ipc_exit_worker(cycle); ngx_http_push_stream_ipc_exit_worker(cycle);
} }
...@@ -209,7 +201,6 @@ ngx_http_push_stream_postconfig(ngx_conf_t *cf) ...@@ -209,7 +201,6 @@ ngx_http_push_stream_postconfig(ngx_conf_t *cf)
ngx_http_push_stream_main_conf_t *conf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module); ngx_http_push_stream_main_conf_t *conf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module);
size_t shm_size; size_t shm_size;
// initialize shared memory // initialize shared memory
if (conf->shm_size == NGX_CONF_UNSET_SIZE) { if (conf->shm_size == NGX_CONF_UNSET_SIZE) {
conf->shm_size = NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE; conf->shm_size = NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE;
...@@ -232,7 +223,7 @@ ngx_http_push_stream_postconfig(ngx_conf_t *cf) ...@@ -232,7 +223,7 @@ ngx_http_push_stream_postconfig(ngx_conf_t *cf)
static void * static void *
ngx_http_push_stream_create_main_conf(ngx_conf_t *cf) ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
{ {
ngx_http_push_stream_main_conf_t *mcf = ngx_pcalloc(cf->pool, sizeof(*mcf)); ngx_http_push_stream_main_conf_t *mcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_main_conf_t));
if (mcf == NULL) { if (mcf == NULL) {
...@@ -249,18 +240,17 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf) ...@@ -249,18 +240,17 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
static void * static void *
ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf) ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
{ {
ngx_http_push_stream_loc_conf_t *lcf = ngx_pcalloc(cf->pool, sizeof(*lcf)); ngx_http_push_stream_loc_conf_t *lcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_loc_conf_t));
if (lcf == NULL) { if (lcf == NULL) {
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
} }
lcf->buffer_timeout = NGX_CONF_UNSET; lcf->buffer_timeout = NGX_CONF_UNSET;
lcf->max_messages = NGX_CONF_UNSET; lcf->max_messages = NGX_CONF_UNSET_UINT;
lcf->authorize_channel = NGX_CONF_UNSET; lcf->authorized_channels_only = NGX_CONF_UNSET_UINT;
lcf->store_messages = NGX_CONF_UNSET; lcf->store_messages = NGX_CONF_UNSET_UINT;
lcf->max_channel_id_length = NGX_CONF_UNSET; lcf->max_channel_id_length = NGX_CONF_UNSET_UINT;
lcf->message_template.data = NULL; lcf->message_template.data = NULL;
lcf->header_template.data = NULL; lcf->header_template.data = NULL;
lcf->ping_message_interval = NGX_CONF_UNSET_MSEC; lcf->ping_message_interval = NGX_CONF_UNSET_MSEC;
...@@ -268,7 +258,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf) ...@@ -268,7 +258,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->subscriber_disconnect_interval = NGX_CONF_UNSET_MSEC; lcf->subscriber_disconnect_interval = NGX_CONF_UNSET_MSEC;
lcf->subscriber_connection_timeout = NGX_CONF_UNSET; lcf->subscriber_connection_timeout = NGX_CONF_UNSET;
lcf->broadcast_channel_prefix.data = NULL; lcf->broadcast_channel_prefix.data = NULL;
lcf->broadcast_channel_max_qtd = NGX_CONF_UNSET; lcf->broadcast_channel_max_qtd = NGX_CONF_UNSET_UINT;
return lcf; return lcf;
} }
...@@ -279,12 +269,11 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -279,12 +269,11 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{ {
ngx_http_push_stream_loc_conf_t *prev = parent, *conf = child; ngx_http_push_stream_loc_conf_t *prev = parent, *conf = child;
ngx_conf_merge_sec_value(conf->buffer_timeout, prev->buffer_timeout, NGX_CONF_UNSET);
ngx_conf_merge_sec_value(conf->buffer_timeout, prev->buffer_timeout, NGX_HTTP_PUSH_STREAM_DEFAULT_BUFFER_TIMEOUT); ngx_conf_merge_uint_value(conf->max_messages, prev->max_messages, NGX_CONF_UNSET_UINT);
ngx_conf_merge_value(conf->max_messages, prev->max_messages, NGX_HTTP_PUSH_STREAM_DEFAULT_MAX_MESSAGES); ngx_conf_merge_uint_value(conf->authorized_channels_only, prev->authorized_channels_only, 0);
ngx_conf_merge_value(conf->authorize_channel, prev->authorize_channel, 1); ngx_conf_merge_uint_value(conf->store_messages, prev->store_messages, 0);
ngx_conf_merge_value(conf->store_messages, prev->store_messages, 1); ngx_conf_merge_uint_value(conf->max_channel_id_length, prev->max_channel_id_length, NGX_CONF_UNSET_UINT);
ngx_conf_merge_value(conf->max_channel_id_length, prev->max_channel_id_length, NGX_HTTP_PUSH_STREAM_MAX_CHANNEL_ID_LENGTH);
ngx_conf_merge_str_value(conf->header_template, prev->header_template, NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE); ngx_conf_merge_str_value(conf->header_template, prev->header_template, NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE);
ngx_conf_merge_str_value(conf->message_template, prev->message_template, NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE); ngx_conf_merge_str_value(conf->message_template, prev->message_template, NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE);
ngx_conf_merge_msec_value(conf->ping_message_interval, prev->ping_message_interval, NGX_CONF_UNSET_MSEC); ngx_conf_merge_msec_value(conf->ping_message_interval, prev->ping_message_interval, NGX_CONF_UNSET_MSEC);
...@@ -292,16 +281,96 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -292,16 +281,96 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_msec_value(conf->subscriber_disconnect_interval, prev->subscriber_disconnect_interval, NGX_CONF_UNSET_MSEC); ngx_conf_merge_msec_value(conf->subscriber_disconnect_interval, prev->subscriber_disconnect_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_sec_value(conf->subscriber_connection_timeout, prev->subscriber_connection_timeout, NGX_CONF_UNSET); ngx_conf_merge_sec_value(conf->subscriber_connection_timeout, prev->subscriber_connection_timeout, NGX_CONF_UNSET);
ngx_conf_merge_str_value(conf->broadcast_channel_prefix, prev->broadcast_channel_prefix, NGX_HTTP_PUSH_STREAM_DEFAULT_BROADCAST_CHANNEL_PREFIX); ngx_conf_merge_str_value(conf->broadcast_channel_prefix, prev->broadcast_channel_prefix, NGX_HTTP_PUSH_STREAM_DEFAULT_BROADCAST_CHANNEL_PREFIX);
ngx_conf_merge_value(conf->broadcast_channel_max_qtd, prev->broadcast_channel_max_qtd, 1); ngx_conf_merge_uint_value(conf->broadcast_channel_max_qtd, prev->broadcast_channel_max_qtd, NGX_CONF_UNSET_UINT);
// sanity checks // sanity checks
// ping message interval cannot be zero
if ((conf->ping_message_interval != NGX_CONF_UNSET_MSEC) && (conf->ping_message_interval == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_ping_message_interval cannot be zero.");
return NGX_CONF_ERROR;
}
if (conf->ping_message_interval == 0) { // ping message interval cannot be set without a message template
conf->ping_message_interval = NGX_CONF_UNSET_MSEC; if ((conf->ping_message_interval != NGX_CONF_UNSET_MSEC) && (conf->ping_message_interval > 0) && (conf->message_template.len == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "cannot have ping message if push_stream_message_template is not set or blank.");
return NGX_CONF_ERROR;
}
// subscriber disconnect interval cannot be zero
if ((conf->subscriber_disconnect_interval != NGX_CONF_UNSET_MSEC) && (conf->subscriber_disconnect_interval == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_subscriber_disconnect_interval cannot be zero.");
return NGX_CONF_ERROR;
}
// subscriber connection timeout cannot be zero
if ((conf->subscriber_connection_timeout != NGX_CONF_UNSET) && (conf->subscriber_connection_timeout == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_subscriber_connection_timeout cannot be zero.");
return NGX_CONF_ERROR;
} }
if (conf->subscriber_disconnect_interval == 0) { // subscriber disconnect interval cannot be set without a connection timeout
conf->subscriber_disconnect_interval = NGX_CONF_UNSET_MSEC; if ((conf->subscriber_disconnect_interval != NGX_CONF_UNSET_MSEC) && (conf->subscriber_disconnect_interval > 0) && (conf->subscriber_connection_timeout == NGX_CONF_UNSET)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "cannot set subscriber disconnect interval if push_stream_subscriber_connection_timeout is not set or zero.");
return NGX_CONF_ERROR;
}
// subscriber connection timeout cannot be set without a disconnect interval
if ((conf->subscriber_connection_timeout != NGX_CONF_UNSET) && (conf->subscriber_connection_timeout > 0) && (conf->subscriber_disconnect_interval == NGX_CONF_UNSET_MSEC)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "cannot set subscriber connection timeout if push_stream_subscriber_disconnect_interval is not set or zero.");
return NGX_CONF_ERROR;
}
// buffer timeout cannot be zero
if ((conf->buffer_timeout != NGX_CONF_UNSET) && (conf->buffer_timeout == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_min_message_buffer_timeout cannot be zero.");
return NGX_CONF_ERROR;
}
// max buffer message cannot be zero
if ((conf->max_messages != NGX_CONF_UNSET_UINT) && (conf->max_messages == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_max_message_buffer_length cannot be zero.");
return NGX_CONF_ERROR;
}
// store messages cannot be set without buffer timeout or max messages
if ((conf->store_messages != NGX_CONF_UNSET_UINT) && (conf->store_messages) && (conf->buffer_timeout == NGX_CONF_UNSET) && (conf->max_messages == NGX_CONF_UNSET_UINT)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_store_messages cannot be set without set max message buffer length or min message buffer timeout.");
return NGX_CONF_ERROR;
}
// max channel id length cannot be zero
if ((conf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (conf->max_channel_id_length == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_max_channel_id_length cannot be zero.");
return NGX_CONF_ERROR;
}
// broadcast channel max qtd cannot be zero
if ((conf->broadcast_channel_max_qtd != NGX_CONF_UNSET_UINT) && (conf->broadcast_channel_max_qtd == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_broadcast_channel_max_qtd cannot be zero.");
return NGX_CONF_ERROR;
}
// broadcast channel max qtd cannot be set without a channel prefix
if ((conf->broadcast_channel_max_qtd != NGX_CONF_UNSET_UINT) && (conf->broadcast_channel_max_qtd > 0) && (conf->broadcast_channel_prefix.len == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "cannot set broadcast channel max qtd if push_stream_broadcast_channel_prefix is not set or blank.");
return NGX_CONF_ERROR;
}
// broadcast channel prefix cannot be set without a channel max qtd
if ((conf->broadcast_channel_prefix.len > 0) && (conf->broadcast_channel_max_qtd == NGX_CONF_UNSET_UINT)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "cannot set broadcast channel prefix if push_stream_broadcast_channel_max_qtd is not set.");
return NGX_CONF_ERROR;
}
// append crlf to templates
if (conf->header_template.len > 0) {
conf->header_template.data = ngx_http_push_stream_append_crlf(&conf->header_template, cf->pool);
conf->header_template.len = ngx_strlen(conf->header_template.data);
}
if (conf->message_template.len > 0) {
conf->message_template.data = ngx_http_push_stream_append_crlf(&conf->message_template, cf->pool);
conf->message_template.len = ngx_strlen(conf->message_template.data);
} }
return NGX_CONF_OK; return NGX_CONF_OK;
...@@ -312,25 +381,33 @@ static char * ...@@ -312,25 +381,33 @@ static char *
ngx_http_push_stream_setup_handler(ngx_conf_t *cf, void *conf, ngx_int_t (*handler) (ngx_http_request_t *)) ngx_http_push_stream_setup_handler(ngx_conf_t *cf, void *conf, ngx_int_t (*handler) (ngx_http_request_t *))
{ {
ngx_http_core_loc_conf_t *clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module); ngx_http_core_loc_conf_t *clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
ngx_http_push_stream_loc_conf_t *pslcf = conf;
clcf->handler = handler; clcf->handler = handler;
clcf->if_modified_since = NGX_HTTP_IMS_OFF; clcf->if_modified_since = NGX_HTTP_IMS_OFF;
pslcf->index_channel_id = ngx_http_get_variable_index(cf, &ngx_http_push_stream_channel_id);
if (pslcf->index_channel_id == NGX_ERROR) {
return NGX_CONF_ERROR;
}
return NGX_CONF_OK; return NGX_CONF_OK;
} }
static char *
push_stream_channels_statistics(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
return ngx_http_push_stream_setup_handler(cf, conf, &push_stream_channels_statistics_handler);
}
static char * static char *
ngx_http_push_stream_publisher(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) ngx_http_push_stream_publisher(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{ {
return ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_publisher_handler); char *rc = ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_publisher_handler);
if (rc == NGX_CONF_OK) {
ngx_http_push_stream_loc_conf_t *pslcf = conf;
pslcf->index_channel_id = ngx_http_get_variable_index(cf, &ngx_http_push_stream_channel_id);
if (pslcf->index_channel_id == NGX_ERROR) {
rc = NGX_CONF_ERROR;
}
}
return rc;
} }
...@@ -339,7 +416,6 @@ ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) ...@@ -339,7 +416,6 @@ ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{ {
char *rc = ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_subscriber_handler); char *rc = ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_subscriber_handler);
if (rc == NGX_CONF_OK) { if (rc == NGX_CONF_OK) {
ngx_http_push_stream_loc_conf_t *pslcf = conf; ngx_http_push_stream_loc_conf_t *pslcf = conf;
pslcf->index_channels_path = ngx_http_get_variable_index(cf, &ngx_http_push_stream_channels_path); pslcf->index_channels_path = ngx_http_get_variable_index(cf, &ngx_http_push_stream_channels_path);
...@@ -399,17 +475,12 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -399,17 +475,12 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
} }
// great justice appears to be at hand
static ngx_int_t static ngx_int_t
ngx_http_push_stream_movezig_channel_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool) ngx_http_push_stream_movezig_channel_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{ {
ngx_queue_t *sentinel = &channel->message_queue->queue; ngx_http_push_stream_msg_t *sentinel = &channel->message_queue;
ngx_http_push_stream_msg_t *msg = NULL; while (!ngx_queue_empty(&sentinel->queue)) {
ngx_http_push_stream_force_delete_message_locked(channel, (ngx_http_push_stream_msg_t *) ngx_queue_next(&sentinel->queue), shpool);
while (!ngx_queue_empty(sentinel)) {
msg = ngx_queue_data(ngx_queue_head(sentinel), ngx_http_push_stream_msg_t, queue);
ngx_http_push_stream_force_delete_message_locked(channel, msg, shpool);
} }
return NGX_OK; return NGX_OK;
......
#require 'rubygems' require File.expand_path('base_test_case', File.dirname(__FILE__))
#require 'popen4'
#require 'test/unit' class TestSetuParameters < Test::Unit::TestCase
#require File.expand_path('base_test_case', File.dirname(__FILE__)) include BaseTestCase
#
#class TestSetuParameters < Test::Unit::TestCase def test_ping_message_interval_cannot_be_zero
# include BaseTestCase expected_error_message = "push_stream_ping_message_interval cannot be zero"
# @test_config_file = "test_ping_message_interval_cannot_be_zero.conf"
# def test_min_buffer_messages_greater_them_max_buffer_messages @ping_message_interval = 0
# expected_error_message = "push_stream_max_message_buffer_length cannot be smaller than push_stream_min_message_buffer_length"
# @test_config_file = "test_min_buffer_messages_greater_them_max_buffer_messages.conf" self.create_config_file
# @max_message_buffer_length = 20 stderr_msg = self.start_server
# @min_message_buffer_length = 21 assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
# end
# self.create_config_file
# stderr_msg = self.start_server def test_ping_message_interval_cannot_be_set_without_a_message_template
# assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'") expected_error_message = "cannot have ping message if push_stream_message_template is not set or blank"
# end @test_config_file = "test_ping_message_interval_cannot_be_set_without_a_message_template.conf"
# @ping_message_interval = "1s"
#end
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_ping_message_interval_cannot_be_set_if_message_template_is_blank
expected_error_message = "cannot have ping message if push_stream_message_template is not set or blank"
@test_config_file = "test_ping_message_interval_cannot_be_set_if_message_template_is_blank.conf"
@ping_message_interval = "1s"
@message_template = ""
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_subscriber_disconnect_interval_cannot_be_zero
expected_error_message = "push_stream_subscriber_disconnect_interval cannot be zero"
@test_config_file = "test_subscriber_disconnect_interval_cannot_be_zero.conf"
@subscriber_disconnect_interval = 0
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_subscriber_connection_timeout_cannot_be_zero
expected_error_message = "push_stream_subscriber_connection_timeout cannot be zero"
@test_config_file = "test_subscriber_connection_timeout_cannot_be_zero.conf"
@subscriber_connection_timeout = 0
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_subscriber_disconnect_interval_cannot_be_set_without_a_connection_timeout
expected_error_message = "cannot set subscriber disconnect interval if push_stream_subscriber_connection_timeout is not set or zero"
@test_config_file = "test_subscriber_disconnect_interval_cannot_be_set_without_a_connection_timeout.conf"
@subscriber_disconnect_interval = "1s"
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_subscriber_connection_timeout_cannot_be_set_without_a_disconnect_interval
expected_error_message = "cannot set subscriber connection timeout if push_stream_subscriber_disconnect_interval is not set or zero"
@test_config_file = "test_subscriber_connection_timeout_cannot_be_set_without_a_disconnect_interval.conf"
@subscriber_connection_timeout = "1s"
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_max_channel_id_length_cannot_be_zero
expected_error_message = "push_stream_max_channel_id_length cannot be zero"
@test_config_file = "test_max_channel_id_length_cannot_be_zero.conf"
@max_channel_id_length = 0
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_min_message_buffer_timeout_cannot_be_zero
expected_error_message = "push_stream_min_message_buffer_timeout cannot be zero"
@test_config_file = "test_min_message_buffer_timeout_cannot_be_zero.conf"
@min_message_buffer_timeout = 0
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_max_message_buffer_length_cannot_be_zero
expected_error_message = "push_stream_max_message_buffer_length cannot be zero"
@test_config_file = "test_max_message_buffer_length_cannot_be_zero.conf"
@max_message_buffer_length = 0
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_store_messages_cannot_be_set_without_set_max_message_buffer_length_or_min_message_buffer_timeout
expected_error_message = "push_stream_store_messages cannot be set without set max message buffer length or min message buffer timeout"
@test_config_file = "test_store_messages_cannot_be_set_without_set_max_message_buffer_length_or_min_message_buffer_timeout.conf"
@store_messages = 'on'
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_broadcast_channel_max_qtd_cannot_be_zero
expected_error_message = "push_stream_broadcast_channel_max_qtd cannot be zero"
@test_config_file = "test_broadcast_channel_max_qtd_cannot_be_zero.conf"
@broadcast_channel_max_qtd = 0
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_broadcast_channel_max_qtd_cannot_be_set_without_broadcast_channel_prefix
expected_error_message = "cannot set broadcast channel max qtd if push_stream_broadcast_channel_prefix is not set or blank"
@test_config_file = "test_broadcast_channel_max_qtd_cannot_be_set_without_broadcast_channel_prefix.conf"
@broadcast_channel_max_qtd = 1
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_broadcast_channel_max_qtd_cannot_be_set_without_broadcast_channel_prefix
expected_error_message = "cannot set broadcast channel max qtd if push_stream_broadcast_channel_prefix is not set or blank"
@test_config_file = "test_broadcast_channel_max_qtd_cannot_be_set_without_broadcast_channel_prefix.conf"
@broadcast_channel_max_qtd = 1
@broadcast_channel_prefix = ""
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_broadcast_channel_prefix_cannot_be_set_without_broadcast_channel_max_qtd
expected_error_message = "cannot set broadcast channel prefix if push_stream_broadcast_channel_max_qtd is not set"
@test_config_file = "test_broadcast_channel_prefix_cannot_be_set_without_broadcast_channel_max_qtd.conf"
@broadcast_channel_prefix = "broad_"
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment