Commit 2ea1ec09 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

refactoring the module: organizing code, improving treatment of memory and validations

parent aa417e95
/*
* ngx_http_push_stream_module.h
*
* Created on: Oct 26, 2010
* Authors: Wandenberg Peixoto <wandenberg@gmail.com> & Rogério Schneider <stockrt@gmail.com>
*/
#ifndef NGX_HTTP_PUSH_STREAM_MODULE_H_
#define NGX_HTTP_PUSH_STREAM_MODULE_H_
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <nginx.h>
typedef struct {
size_t shm_size;
} ngx_http_push_stream_main_conf_t;
typedef struct {
ngx_int_t index_channel_id;
ngx_int_t index_channels_path;
time_t buffer_timeout;
ngx_uint_t max_messages;
ngx_uint_t authorized_channels_only;
ngx_uint_t store_messages;
ngx_uint_t max_channel_id_length;
ngx_str_t header_template;
ngx_str_t message_template;
ngx_str_t content_type;
ngx_msec_t ping_message_interval;
ngx_msec_t subscriber_disconnect_interval;
time_t subscriber_connection_timeout;
ngx_str_t broadcast_channel_prefix;
ngx_uint_t broadcast_channel_max_qtd;
ngx_uint_t max_number_of_channels;
ngx_uint_t max_number_of_broadcast_channels;
ngx_msec_t memory_cleanup_interval;
time_t memory_cleanup_timeout;
} ngx_http_push_stream_loc_conf_t;
// shared memory segment name
static ngx_str_t ngx_push_stream_shm_name = ngx_string("push_stream_module");
// message queue
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_buf_t *buf;
time_t expires;
ngx_flag_t deleted;
} ngx_http_push_stream_msg_t;
typedef struct ngx_http_push_stream_subscriber_cleanup_s ngx_http_push_stream_subscriber_cleanup_t;
// subscriber request queue
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_http_request_t *request;
} ngx_http_push_stream_subscriber_t;
typedef struct {
ngx_queue_t queue;
pid_t pid;
ngx_int_t slot;
ngx_http_push_stream_subscriber_t subscriber_sentinel;
} ngx_http_push_stream_pid_queue_t;
// our typecast-friendly rbtree node (channel)
typedef struct {
ngx_rbtree_node_t node; // this MUST be first
ngx_str_t id;
ngx_uint_t last_message_id;
ngx_uint_t stored_messages;
ngx_uint_t subscribers;
ngx_http_push_stream_pid_queue_t workers_with_subscribers;
ngx_http_push_stream_msg_t message_queue;
time_t expires;
ngx_flag_t deleted;
ngx_flag_t broadcast;
} ngx_http_push_stream_channel_t;
typedef struct {
ngx_queue_t queue;
ngx_str_t id;
ngx_uint_t published_messages;
ngx_uint_t stored_messages;
ngx_uint_t subscribers;
} ngx_http_push_stream_channel_info_t;
typedef struct {
ngx_queue_t queue;
ngx_http_push_stream_subscriber_t *subscriber;
ngx_http_push_stream_channel_t *channel;
} ngx_http_push_stream_subscription_t;
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_http_request_t *request;
ngx_http_push_stream_subscription_t subscriptions_sentinel;
ngx_http_push_stream_subscriber_cleanup_t *clndata;
ngx_pid_t worker_subscribed_pid;
time_t expires;
} ngx_http_push_stream_worker_subscriber_t;
// cleaning supplies
struct ngx_http_push_stream_subscriber_cleanup_s {
ngx_http_push_stream_worker_subscriber_t *worker_subscriber;
};
// messages to worker processes
typedef struct {
ngx_queue_t queue;
ngx_http_push_stream_msg_t *msg; // ->shared memory
ngx_pid_t pid;
ngx_http_push_stream_channel_t *channel; // ->shared memory
ngx_http_push_stream_subscriber_t *subscriber_sentinel; // ->a worker's local pool
} ngx_http_push_stream_worker_msg_t;
typedef struct {
ngx_http_push_stream_worker_msg_t messages_queue;
ngx_http_push_stream_worker_subscriber_t worker_subscribers_sentinel;
} ngx_http_push_stream_worker_data_t;
// shared memory
typedef struct {
ngx_rbtree_t tree;
ngx_uint_t channels; // # of channels being used
ngx_uint_t broadcast_channels; // # of broadcast channels being used
ngx_uint_t published_messages; // # of published messagens in all channels
ngx_uint_t subscribers; // # of subscribers in all channels
ngx_http_push_stream_msg_t messages_to_delete;
ngx_rbtree_t channels_to_delete;
ngx_http_push_stream_worker_data_t *ipc; // interprocess stuff
} ngx_http_push_stream_shm_data_t;
ngx_int_t ngx_http_push_stream_worker_processes;
ngx_shm_zone_t *ngx_http_push_stream_shm_zone = NULL;
// channel
static ngx_str_t * ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *cf);
static ngx_int_t ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel);
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID = ngx_string("ALL");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE = ngx_string("No channel id provided.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE = ngx_string("Channel id not authorized for this method.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EMPTY_POST_REQUEST_MESSAGE = ngx_string("Empty post requests are not allowed.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE = ngx_string("Channel id is too large.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOO_MUCH_BROADCAST_CHANNELS = ngx_string("Subscribed too much broadcast channels.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CANNOT_CREATE_CHANNELS = ngx_string("Subscriber could not create channels.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE = ngx_string("Number of channels were exceeded.");
#define NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID (void *) -1
#define NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID (void *) -2
#define NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED (void *) -3
static ngx_str_t NGX_HTTP_PUSH_STREAM_EMPTY = ngx_string("");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_BACKTRACK_SEP = ngx_string(".b");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_SLASH = ngx_string("/");
static const ngx_str_t NGX_PUSH_STREAM_DATE_FORMAT_ISO_8601 = ngx_string("%4d-%02d-%02dT%02d:%02d:%02d");
//// headers
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_ALLOW = ngx_string("Allow");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_EXPLAIN = ngx_string("X-Nginx-PushStream-Explain");
// other stuff
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOWED_METHODS = ngx_string("GET, POST");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET = ngx_string("GET");
#define NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(val, fail, r, errormessage) \
if (val == fail) { \
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, errormessage); \
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); \
return; \
}
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_H_ */
/*
* ngx_http_push_stream_module_ipc.h
*
* Created on: Oct 26, 2010
* Authors: Wandenberg Peixoto <wandenberg@gmail.com> & Rogério Schneider <stockrt@gmail.com>
*/
#ifndef NGX_HTTP_PUSH_STREAM_MODULE_IPC_H_
#define NGX_HTTP_PUSH_STREAM_MODULE_IPC_H_
#include <ngx_http_push_stream_module.h>
#include <ngx_http_push_stream_module_subscriber.h>
#include <ngx_channel.h>
// constants
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES = {49, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_SEND_PING = {50, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_DISCONNECT_SUBSCRIBERS = {51, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS = {52, 0, 0, -1};
// worker processes of the world, unite.
ngx_socket_t ngx_http_push_stream_socketpairs[NGX_MAX_PROCESSES][2];
static void ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log, ngx_channel_t command);
#define ngx_http_push_stream_alert_worker_check_messages(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES);
#define ngx_http_push_stream_alert_worker_send_ping(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_SEND_PING);
#define ngx_http_push_stream_alert_worker_disconnect_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DISCONNECT_SUBSCRIBERS);
#define ngx_http_push_stream_alert_worker_census_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS);
static ngx_int_t ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers);
static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_init_ipc_shm(ngx_int_t workers);
static void ngx_http_push_stream_channel_handler(ngx_event_t *ev);
static ngx_inline void ngx_http_push_stream_process_worker_message(void);
static ngx_inline void ngx_http_push_stream_send_worker_ping_message(void);
static ngx_inline void ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect);
static ngx_inline void ngx_http_push_stream_census_worker_subscribers(void);
static ngx_int_t ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *sentinel, ngx_http_push_stream_msg_t *msg);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_IPC_H_ */
/*
* ngx_http_push_stream_module_publisher.h
*
* Created on: Oct 26, 2010
* Authors: Wandenberg Peixoto <wandenberg@gmail.com> & Rogério Schneider <stockrt@gmail.com>
*/
#ifndef NGX_HTTP_PUSH_STREAM_MODULE_PUBLISHER_H_
#define NGX_HTTP_PUSH_STREAM_MODULE_PUBLISHER_H_
#include <ngx_http_push_stream_module.h>
static ngx_int_t push_stream_channels_statistics_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_publisher_handler(ngx_http_request_t *r);
static void ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_PUBLISHER_H_ */
......@@ -12,7 +12,8 @@
#include <ngx_http_push_stream_module_publisher.h>
#include <ngx_http_push_stream_module_subscriber.h>
//#define NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE 33554432 // 32 megs
#define NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE 33554432 // 32 megs
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT = 30; // 30 seconds
#define NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE ""
......@@ -34,7 +35,7 @@ static char * ngx_http_push_stream_publisher(ngx_conf_t *cf, ngx_command_t
static char * ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
// setup
static char * ngx_http_push_stream_setup_handler(ngx_conf_t *cf, void *conf, ngx_int_t (*handler) (ngx_http_request_t *));
static char * ngx_http_push_stream_setup_handler(ngx_conf_t *cf, void *conf, ngx_int_t (*handler) (ngx_http_request_t *));
static ngx_int_t ngx_http_push_stream_init_module(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_init_worker(ngx_cycle_t *cycle);
static void ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle);
......@@ -44,6 +45,8 @@ static void * ngx_http_push_stream_create_main_conf(ngx_conf_t *cf);
static void * ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf);
static char * ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);
static ngx_int_t ngx_http_push_stream_movezig_channel_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool);
// shared memory
static ngx_int_t ngx_http_push_stream_set_up_shm(ngx_conf_t *cf, size_t shm_size);
static ngx_int_t ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_SETUP_H_ */
/*
* ngx_http_push_stream_module_subscriber.h
*
* Created on: Oct 26, 2010
* Authors: Wandenberg Peixoto <wandenberg@gmail.com> & Rogério Schneider <stockrt@gmail.com>
*/
#ifndef NGX_HTTP_PUSH_STREAM_MODULE_SUBSCRIBER_H_
#define NGX_HTTP_PUSH_STREAM_MODULE_SUBSCRIBER_H_
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_str_t *id;
ngx_uint_t backtrack_messages;
} ngx_http_push_stream_requested_channel_t;
static ngx_int_t ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r);
ngx_http_push_stream_requested_channel_t * ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_pool_t *pool);
static void ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_t *data);
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_SUBSCRIBER_H_ */
This diff is collapsed.
/*
* ngx_http_push_stream_rbtree_util.h
*
* Created on: Oct 26, 2010
* Authors: Wandenberg Peixoto <wandenberg@gmail.com> & Rogério Schneider <stockrt@gmail.com>
*/
#ifndef NGX_HTTP_PUSH_STREAM_RBTREE_UTIL_H_
#define NGX_HTTP_PUSH_STREAM_RBTREE_UTIL_H_
static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_loc_conf_t *cf);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log);
static void ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare) (const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right));
static void ngx_http_push_stream_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
static int ngx_http_push_stream_compare_rbtree_node(const ngx_rbtree_node_t *v_left, const ngx_rbtree_node_t *v_right);
#endif /* NGX_HTTP_PUSH_STREAM_RBTREE_UTIL_H_ */
pid logs/nginx.pid;
error_log logs/nginx-main_error.log;
error_log logs/nginx-main_error.log debug;
# Development Mode
master_process off;
daemon off;
worker_processes 2;
events {
worker_connections 1024;
use epoll;
}
http {
......@@ -11,7 +15,7 @@ http {
default_type application/octet-stream;
access_log logs/nginx-http_access.log;
error_log logs/nginx-http_error.log;
error_log logs/nginx-http_error.log debug;
tcp_nopush on;
tcp_nodelay on;
......@@ -26,11 +30,20 @@ http {
client_body_buffer_size 1k;
ignore_invalid_headers on;
client_body_in_single_buffer on;
push_stream_max_reserved_memory 10m;
server {
listen 80;
server_name localhost;
location /channels_stats {
# activate channels statistics mode for this location
push_stream_channels_statistics;
# query string based channel id
set $push_stream_channel_id $arg_id;
}
location /pub {
# activate publisher mode for this location
push_stream_publisher;
......@@ -39,11 +52,15 @@ http {
set $push_stream_channel_id $arg_id;
# message template
push_stream_message_template "<script>p(~id~,'~channel~','~text~');</script>";
# store messages
push_stream_store_messages on;
# max messages to store in memory
push_stream_max_message_buffer_length 20;
# message ttl
push_stream_min_message_buffer_timeout 5m;
push_stream_max_channel_id_length 200;
# client_max_body_size MUST be equal to client_body_buffer_size or
# you will be sorry.
client_max_body_size 32k;
......@@ -56,6 +73,7 @@ http {
# positional channel path
set $push_stream_channels_path $1;
push_stream_max_channel_id_length 200;
# header to be sent when receiving new subscriber connection
push_stream_header_template "<html><head><meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-store\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-cache\">\r\n<meta http-equiv=\"Pragma\" content=\"no-cache\">\r\n<meta http-equiv=\"Expires\" content=\"Thu, 1 Jan 1970 00:00:00 GMT\">\r\n<script type=\"text/javascript\">\r\nwindow.onError = null;\r\ndocument.domain = 'localhost';\r\nparent.PushStream.register(this);\r\n</script>\r\n</head>\r\n<body onload=\"try { parent.PushStream.reset(this) } catch (e) {}\">";
# message template
......@@ -71,6 +89,12 @@ http {
push_stream_subscriber_disconnect_interval 30s;
# connection ttl to enable recycle
push_stream_subscriber_connection_timeout 15m;
push_stream_broadcast_channel_prefix "broad_";
push_stream_broadcast_channel_max_qtd 3;
# solving some leakage problem with persitent connections in
# Nginx's chunked filter (ngx_http_chunked_filter_module.c)
chunked_transfer_encoding off;
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -122,6 +122,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, max_number_of_broadcast_channels),
NULL },
{ ngx_string("push_stream_memory_cleanup_timeout"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_sec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, memory_cleanup_timeout),
NULL },
ngx_null_command
};
......@@ -181,11 +187,12 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle)
static void
ngx_http_push_stream_exit_master(ngx_cycle_t *cycle)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
// destroy channel tree in shared memory
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_movezig_channel_locked);
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_push_stream_collect_expired_messages_and_empty_channels(&data->tree, shpool, data->tree.root, 1, 0);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(1);
}
......@@ -203,6 +210,10 @@ ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle)
ngx_del_timer(&ngx_http_push_stream_disconnect_event);
}
if (ngx_http_push_stream_memory_cleanup_event.timer_set) {
ngx_del_timer(&ngx_http_push_stream_memory_cleanup_event);
}
ngx_http_push_stream_ipc_exit_worker(cycle);
}
......@@ -273,6 +284,8 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->broadcast_channel_max_qtd = NGX_CONF_UNSET_UINT;
lcf->max_number_of_channels = NGX_CONF_UNSET_UINT;
lcf->max_number_of_broadcast_channels = NGX_CONF_UNSET_UINT;
lcf->memory_cleanup_interval = NGX_CONF_UNSET_MSEC;
lcf->memory_cleanup_timeout = NGX_CONF_UNSET;
return lcf;
}
......@@ -298,6 +311,9 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_uint_value(conf->broadcast_channel_max_qtd, prev->broadcast_channel_max_qtd, NGX_CONF_UNSET_UINT);
ngx_conf_merge_uint_value(conf->max_number_of_channels, prev->max_number_of_channels, NGX_CONF_UNSET_UINT);
ngx_conf_merge_uint_value(conf->max_number_of_broadcast_channels, prev->max_number_of_broadcast_channels, NGX_CONF_UNSET_UINT);
ngx_conf_merge_uint_value(conf->memory_cleanup_interval, prev->memory_cleanup_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_sec_value(conf->memory_cleanup_timeout, prev->memory_cleanup_timeout, NGX_CONF_UNSET);
// sanity checks
// ping message interval cannot be zero
......@@ -396,6 +412,12 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
return NGX_CONF_ERROR;
}
// memory cleanup timeout cannot't be small
if ((conf->memory_cleanup_timeout != NGX_CONF_UNSET) && (conf->memory_cleanup_timeout < NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "memory cleanup timeout cannot't be less than %d.", NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT);
return NGX_CONF_ERROR;
}
// append crlf to templates
if (conf->header_template.len > 0) {
conf->header_template.data = ngx_http_push_stream_append_crlf(&conf->header_template, cf->pool);
......@@ -407,6 +429,22 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
conf->message_template.len = ngx_strlen(conf->message_template.data);
}
// calc memory cleanup interval
if (conf->buffer_timeout != NGX_CONF_UNSET) {
ngx_uint_t interval = conf->buffer_timeout / 3;
conf->memory_cleanup_interval = (interval > 1) ? interval * 1000 : 1000; // min 1 second
} else if (conf->memory_cleanup_interval == NGX_CONF_UNSET_MSEC) {
conf->memory_cleanup_interval = 1000; // 1 second
}
// create ping message
if ((conf->message_template.len > 0) && (ngx_http_push_stream_ping_buf == NULL)) {
if ((ngx_http_push_stream_ping_buf = ngx_http_push_stream_get_formatted_message(conf, NULL, NULL, cf->pool)) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to format ping message.");
return NGX_CONF_ERROR;
}
}
return NGX_CONF_OK;
}
......@@ -499,7 +537,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
}
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;
ngx_rbtree_node_t *sentinel;
ngx_rbtree_node_t *sentinel, *remove_sentinel;
ngx_http_push_stream_shm_data_t *d;
if ((d = (ngx_http_push_stream_shm_data_t *) ngx_slab_alloc(shpool, sizeof(*d))) == NULL) { //shm_data plus an array.
......@@ -513,17 +551,10 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
}
ngx_rbtree_init(&d->tree, sentinel, ngx_http_push_stream_rbtree_insert);
return NGX_OK;
}
static ngx_int_t
ngx_http_push_stream_movezig_channel_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{
ngx_http_push_stream_msg_t *sentinel = &channel->message_queue;
while (!ngx_queue_empty(&sentinel->queue)) {
ngx_http_push_stream_force_delete_message_locked(channel, (ngx_http_push_stream_msg_t *) ngx_queue_next(&sentinel->queue), shpool);
if ((remove_sentinel = ngx_slab_alloc(shpool, sizeof(*remove_sentinel))) == NULL) {
return NGX_ERROR;
}
ngx_rbtree_init(&d->channels_to_delete, remove_sentinel, ngx_http_push_stream_rbtree_insert);
return NGX_OK;
}
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment