Commit 80f8ba54 authored by Wandenberg's avatar Wandenberg

remove global references to main module conf, ping and long polling timeout...

remove global references to main module conf, ping and long polling timeout messages, and shared memory zones
parent e871c8ca
......@@ -47,9 +47,12 @@ typedef struct {
ngx_flag_t websocket;
} ngx_http_push_stream_template_queue_t;
typedef struct ngx_http_push_stream_msg_s ngx_http_push_stream_msg_t;
typedef struct ngx_http_push_stream_shm_data_s ngx_http_push_stream_shm_data_t;
typedef struct ngx_http_push_stream_global_shm_data_s ngx_http_push_stream_global_shm_data_t;
typedef struct {
ngx_flag_t enabled;
size_t shm_size;
ngx_str_t channel_deleted_message_text;
time_t channel_inactivity_time;
ngx_str_t ping_message_text;
......@@ -64,6 +67,11 @@ typedef struct {
ngx_http_push_stream_template_queue_t msg_templates;
ngx_flag_t timeout_with_body;
ngx_regex_t *backtrack_parser_regex;
ngx_http_push_stream_msg_t *ping_msg;
ngx_http_push_stream_msg_t *longpooling_timeout_msg;
ngx_shm_zone_t *shm_zone;
ngx_slab_pool_t *shpool;
ngx_http_push_stream_shm_data_t*shm_data;
} ngx_http_push_stream_main_conf_t;
typedef struct {
......@@ -92,9 +100,10 @@ typedef struct {
// shared memory segment name
static ngx_str_t ngx_http_push_stream_shm_name = ngx_string("push_stream_module");
static ngx_str_t ngx_http_push_stream_global_shm_name = ngx_string("push_stream_module_global");
// message queue
typedef struct {
struct ngx_http_push_stream_msg_s {
ngx_queue_t queue; // this MUST be first
time_t expires;
time_t time;
......@@ -108,7 +117,8 @@ typedef struct {
ngx_str_t *event_type_message;
ngx_str_t *formatted_messages;
ngx_int_t workers_ref_count;
} ngx_http_push_stream_msg_t;
ngx_uint_t qtd_templates;
};
typedef struct ngx_http_push_stream_subscriber_s ngx_http_push_stream_subscriber_t;
......@@ -132,7 +142,6 @@ typedef struct {
ngx_uint_t subscribers;
ngx_queue_t workers_with_subscribers;
ngx_queue_t message_queue;
time_t last_activity_time;
time_t expires;
ngx_flag_t deleted;
ngx_flag_t wildcard;
......@@ -189,6 +198,7 @@ typedef struct {
ngx_pid_t pid;
ngx_http_push_stream_channel_t *channel; // ->shared memory
ngx_queue_t *subscriptions_sentinel; // ->a worker's local pool
ngx_http_push_stream_main_conf_t *mcf;
} ngx_http_push_stream_worker_msg_t;
typedef struct {
......@@ -200,7 +210,13 @@ typedef struct {
} ngx_http_push_stream_worker_data_t;
// shared memory
typedef struct {
struct ngx_http_push_stream_global_shm_data_s {
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
time_t startup;
ngx_queue_t shm_datas_queue;
};
struct ngx_http_push_stream_shm_data_s {
ngx_rbtree_t tree;
ngx_uint_t channels; // # of channels being used
ngx_uint_t wildcard_channels; // # of wildcard channels being used
......@@ -217,12 +233,13 @@ typedef struct {
time_t startup;
time_t last_message_time;
ngx_int_t last_message_tag;
} ngx_http_push_stream_shm_data_t;
ngx_uint_t ngx_http_push_stream_shm_size;
ngx_shm_zone_t *ngx_http_push_stream_shm_zone = NULL;
ngx_queue_t shm_data_queue;
ngx_http_push_stream_main_conf_t *mcf;
ngx_shm_zone_t *shm_zone;
ngx_slab_pool_t *shpool;
};
ngx_http_push_stream_main_conf_t *ngx_http_push_stream_module_main_conf = NULL;
ngx_shm_zone_t *ngx_http_push_stream_global_shm_zone = NULL;
ngx_str_t **ngx_http_push_stream_module_paddings_chunks = NULL;
......@@ -336,7 +353,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOWED_HEADERS = ngx_string("If-Mo
#define NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(val, fail, r, errormessage) \
if (val == fail) { \
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, errormessage); \
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, errormessage); \
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); \
return; \
}
......@@ -344,7 +361,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOWED_HEADERS = ngx_string("If-Mo
#define NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR_LOCKED(val, fail, r, errormessage) \
if (val == fail) { \
ngx_shmtx_unlock(&(shpool)->mutex); \
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, errormessage); \
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, errormessage); \
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); \
return; \
}
......
......@@ -50,7 +50,7 @@ ngx_socket_t ngx_http_push_stream_socketpairs[NGX_MAX_PROCESSES][2];
static ngx_int_t ngx_http_push_stream_register_worker_message_handler(ngx_cycle_t *cycle);
static void ngx_http_push_stream_wildcard(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_log_t *log);
static void ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log, ngx_channel_t command);
#define ngx_http_push_stream_alert_worker_check_messages(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES)
......@@ -58,12 +58,12 @@ static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int
#define ngx_http_push_stream_alert_worker_delete_channel(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL)
#define ngx_http_push_stream_alert_worker_shutting_down_cleanup(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CLEANUP_SHUTTING_DOWN)
static ngx_int_t ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
static ngx_int_t ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers);
static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_ipc_init_worker();
static void ngx_http_push_stream_clean_worker_data();
static void ngx_http_push_stream_clean_worker_data(ngx_http_push_stream_shm_data_t *data);
static void ngx_http_push_stream_channel_handler(ngx_event_t *ev);
static void ngx_http_push_stream_alert_shutting_down_workers(void);
......
......@@ -34,7 +34,6 @@
#include <ngx_http_push_stream_module_subscriber.h>
#include <ngx_http_push_stream_module_websocket.h>
#define NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE 33554432 // 32 megs
#define NGX_HTTP_PUSH_STREAM_MESSAGE_BUFFER_CLEANUP_INTERVAL 5000 // 5 seconds
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL = 10; // 10 seconds
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_INTERVAL = 4000; // 4 seconds
......@@ -65,6 +64,7 @@ static ngx_int_t ngx_http_push_stream_init_module(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_init_worker(ngx_cycle_t *cycle);
static void ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle);
static void ngx_http_push_stream_exit_master(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_preconfig(ngx_conf_t *cf);
static ngx_int_t ngx_http_push_stream_postconfig(ngx_conf_t *cf);
static void * ngx_http_push_stream_create_main_conf(ngx_conf_t *cf);
static char * ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent);
......@@ -72,7 +72,9 @@ static void * ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf);
static char * ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);
// shared memory
static ngx_int_t ngx_http_push_stream_set_up_shm(ngx_conf_t *cf, size_t shm_size);
static ngx_int_t ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data);
char * ngx_http_push_stream_set_shm_size_slot(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
ngx_int_t ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data);
ngx_int_t ngx_http_push_stream_init_global_shm_zone(ngx_shm_zone_t *shm_zone, void *data);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_SETUP_H_ */
......@@ -225,11 +225,8 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_PADDING_BY_USER_AGENT_PATTERN = ngx
ngx_event_t ngx_http_push_stream_memory_cleanup_event;
ngx_event_t ngx_http_push_stream_buffer_cleanup_event;
ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL;
ngx_http_push_stream_msg_t *ngx_http_push_stream_longpooling_timeout_msg = NULL;
// general request handling
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool);
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_main_conf_t *mcf, u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_only_added_headers(ngx_http_request_t *r);
static void ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modified_time, ngx_int_t tag, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_get_last_received_message_values(ngx_http_request_t *r, time_t *if_modified_since, ngx_int_t *tag, ngx_str_t **last_event_id);
......@@ -249,7 +246,6 @@ static void ngx_http_push_stream_send_response_finalize(ngx_http
static void ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_websocket_close_frame(ngx_http_request_t *r, ngx_uint_t http_status, const ngx_str_t *reason);
static ngx_int_t ngx_http_push_stream_memory_cleanup();
static ngx_int_t ngx_http_push_stream_buffer_cleanup();
ngx_chain_t * ngx_http_push_stream_get_buf(ngx_http_request_t *r);
static void ngx_http_push_stream_unescape_uri(ngx_str_t *value);
......@@ -272,14 +268,14 @@ static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_in
static void ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subscriber_t *worker_subscriber);
static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len);
static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg);
static ngx_flag_t ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force);
static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg, ngx_http_push_stream_shm_data_t *data);
static ngx_flag_t ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_collect_expired_messages_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_flag_t force);
static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_free_worker_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_worker_msg_t *worker_msg);
static ngx_int_t ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force);
static ngx_inline void ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired);
static ngx_inline void ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_shm_data_t *data, ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired);
static ngx_inline void ngx_http_push_stream_delete_worker_channel(void);
static ngx_http_push_stream_content_subtype_t * ngx_http_push_stream_match_channel_info_format_and_content_type(ngx_http_request_t *r, ngx_uint_t default_subtype);
......
......@@ -34,8 +34,8 @@
#ifndef NGX_HTTP_PUSH_STREAM_RBTREE_UTIL_H_
#define NGX_HTTP_PUSH_STREAM_RBTREE_UTIL_H_
static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_loc_conf_t *cf);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_loc_conf_t *cf, ngx_http_push_stream_main_conf_t *mcf);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
static void ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare) (const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right));
static void ngx_http_push_stream_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
......
......@@ -437,16 +437,16 @@ describe "Cleanup Memory" do
def create_and_delete_channel(channel, body, headers, &block)
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :body => body, :head => headers
pub_1.callback do
if pub_1.response_header.status == 200
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).delete :head => headers
pub.callback do
if pub_1.response_header.status == 200
block.call((pub.response_header.status == 200) ? :success : :error)
end
else
block.call(:error)
end
end
end
end
def create_and_delete_channel_in_loop(channel, body, headers, &block)
create_and_delete_channel(channel, body, headers) do |status|
......
......@@ -16,8 +16,8 @@ describe "Measure Memory" do
message_estimate_size = 168
channel_estimate_size = 270
subscriber_estimate_size = 160
subscriber_estimate_system_size = 7000
subscriber_estimate_size = 154
subscriber_estimate_system_size = 6528
it "should check message size" do
channel = 'ch_test_message_size'
......
......@@ -101,4 +101,17 @@ describe "Setup Parameters" do
nginx_test_configuration({:padding_by_user_agent => "user_agent;10;0"}).should include("padding pattern not match the value user_agent;10;0")
nginx_test_configuration({:padding_by_user_agent => "user_agent,10,0:other_user_agent;20;0:another_user_agent,30,0"}).should include("error applying padding pattern to other_user_agent;20;0:another_user_agent,30,0")
end
it "should not accept an invalid shared memory size" do
nginx_test_configuration({:shared_memory_size => nil}).should include("push_stream_shared_memory_size must be set.")
end
it "should not accept a small shared memory size" do
nginx_test_configuration({:shared_memory_size => "100k"}).should include("The push_stream_shared_memory_size value must be at least")
end
it "should not accept an invalid channels path value" do
nginx_test_configuration({:channels_path => nil}).should include("push stream module: push_stream_channels_path must be set.")
nginx_test_configuration({:channels_path_for_pub => nil}).should include("push stream module: push_stream_channels_path must be set.")
end
end
......@@ -57,7 +57,8 @@ module NginxConfiguration
:channels_path_for_pub => '$arg_id',
:channels_path => '$1',
:extra_location => ''
:extra_location => '',
:extra_configuration => ''
}
end
......@@ -184,6 +185,8 @@ http {
<%= extra_location %>
}
}
<%= extra_configuration %>
)
end
end
......@@ -811,4 +811,100 @@ describe "Subscriber Properties" do
end
end
end
it "should accept a configuration with more than one http block" do
extra_config = {
:subscriber_connection_ttl => '1s',
:content_type => "text/html",
:extra_configuration => %(
http {
server {
listen #{nginx_port.to_i + 1};
location / {
return 200 "extra server configuration";
}
}
}
)
}
channel = 'ch_test_extra_http'
body = 'body'
actual_response = ''
nginx_run_server(config.merge(extra_config)) do |conf|
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub_1.stream do |chunk|
actual_response += chunk
end
sub_1.callback do
sub_1.should be_http_status(200)
actual_response.should eql("HEADER\r\nTEMPLATE\r\n1234\r\n<script>p(1,'ch_test_extra_http','body');</script></body></html>")
req = EventMachine::HttpRequest.new("http://#{nginx_host}:#{nginx_port.to_i + 1}/").get
req.callback do
req.response.should eql("extra server configuration")
EventMachine.stop
end
end
publish_message_inline(channel, {}, body)
end
end
end
it "should accept a configuration with two shared memory zones without mix messages" do
extra_config = {
:subscriber_connection_ttl => '1s',
:content_type => "text/html",
:extra_configuration => %(
http {
push_stream_shared_memory_size 10m second;
push_stream_subscriber_connection_ttl 1s;
server {
listen #{nginx_port.to_i + 1};
location /pub {
push_stream_publisher;
push_stream_channels_path $arg_id;
}
location ~ /sub/(.*) {
push_stream_subscriber;
push_stream_channels_path $1;
}
}
}
)
}
channel = 'ch_test_extra_http'
body = 'body'
actual_response_1 = ''
actual_response_2 = ''
nginx_run_server(config.merge(extra_config)) do |conf|
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new("http://#{nginx_host}:#{nginx_port.to_i}/sub/" + channel.to_s).get
sub_1.stream do |chunk|
actual_response_1 += chunk
end
sub_2 = EventMachine::HttpRequest.new("http://#{nginx_host}:#{nginx_port.to_i + 1}/sub/" + channel.to_s).get
sub_2.stream do |chunk|
actual_response_2 += chunk
end
sub_2.callback do
sub_1.should be_http_status(200)
sub_2.should be_http_status(200)
actual_response_1.should eql("HEADER\r\nTEMPLATE\r\n1234\r\n<script>p(1,'ch_test_extra_http','body_1');</script></body></html>")
actual_response_2.should eql("body_2")
EventMachine.stop
end
EventMachine::HttpRequest.new("http://#{nginx_host}:#{nginx_port.to_i}/pub/?id=" + channel.to_s).post :body => "#{body}_1"
EventMachine::HttpRequest.new("http://#{nginx_host}:#{nginx_port.to_i + 1}/pub/?id=" + channel.to_s).post :body => "#{body}_2"
end
end
end
end
......@@ -62,7 +62,8 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
ngx_str_t *currenttime, *hostname, *format, *text;
u_char *subscribers_by_workers, *start;
int i, j, used_slots;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_worker_data_t *worker_data;
ngx_http_push_stream_content_subtype_t *subtype;
......@@ -114,7 +115,8 @@ ngx_http_push_stream_send_response_channels_info(ngx_http_request_t *r, ngx_queu
ngx_chain_t *chain, *first = NULL, *last = NULL;
ngx_str_t *currenttime, *hostname, *text, *header_response;
ngx_queue_t *cur, *next;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_content_subtype_t *subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
const ngx_str_t *format;
......@@ -196,10 +198,12 @@ ngx_http_push_stream_send_response_channels_info(ngx_http_request_t *r, ngx_queu
}
static ngx_int_t
ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r, ngx_str_t *prefix) {
ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r, ngx_str_t *prefix)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_queue_t queue_channel_info;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_queue_t *cur = &data->channels_queue;
ngx_http_push_stream_channel_t *channel;
......@@ -234,7 +238,8 @@ static ngx_int_t
ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channels) {
ngx_str_t *text;
ngx_queue_t queue_channel_info;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_content_subtype_t *subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
ngx_http_push_stream_channel_info_t *channel_info;
ngx_http_push_stream_channel_t *channel = NULL;
......@@ -249,7 +254,7 @@ ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r,
requested_channel = ngx_queue_data(cur, ngx_http_push_stream_requested_channel_t, queue);
// search for a existing channel with this id
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log);
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
if ((channel != NULL) && ((channel_info = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_channel_info_t))) != NULL)) {
channel_info->id.data = channel->id.data;
channel_info->id.len = channel->id.len;
......@@ -283,7 +288,8 @@ ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r,
static ngx_int_t
ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource, ngx_flag_t websocket) {
ngx_http_push_stream_template_queue_t *sentinel = &ngx_http_push_stream_module_main_conf->msg_templates;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module);
ngx_http_push_stream_template_queue_t *sentinel = &mcf->msg_templates;
ngx_http_push_stream_template_queue_t *cur = sentinel;
ngx_str_t *aux = NULL;
......@@ -294,7 +300,7 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, n
}
}
ngx_http_push_stream_module_main_conf->qtd_templates++;
mcf->qtd_templates++;
cur = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_template_queue_t));
aux = ngx_http_push_stream_create_str(cf->pool, template.len);
......@@ -305,8 +311,8 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, n
cur->template = aux;
cur->eventsource = eventsource;
cur->websocket = websocket;
cur->index = ngx_http_push_stream_module_main_conf->qtd_templates;
cur->index = mcf->qtd_templates;
ngx_memcpy(cur->template->data, template.data, template.len);
ngx_queue_insert_tail(&ngx_http_push_stream_module_main_conf->msg_templates.queue, &cur->queue);
ngx_queue_insert_tail(&mcf->msg_templates.queue, &cur->queue);
return cur->index;
}
......@@ -33,6 +33,11 @@
#include <ngx_http_push_stream_module_ipc.h>
void ngx_http_push_stream_ipc_init_worker_data(ngx_http_push_stream_shm_data_t *data);
ngx_inline void ngx_http_push_stream_census_worker_subscribers_data(ngx_http_push_stream_shm_data_t *data);
ngx_inline void ngx_http_push_stream_process_worker_message_data(ngx_http_push_stream_shm_data_t *data);
static ngx_int_t
ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers)
{
......@@ -112,14 +117,46 @@ ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle)
static ngx_int_t
ngx_http_push_stream_ipc_init_worker()
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *global_shpool = (ngx_slab_pool_t *) ngx_http_push_stream_global_shm_zone->shm.addr;
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
int i;
ngx_shmtx_lock(&global_shpool->mutex);
global_data->ipc[ngx_process_slot].pid = ngx_pid;
global_data->ipc[ngx_process_slot].startup = ngx_time();
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_ipc_init_worker_data(data);
}
for(i = 0; i < NGX_MAX_PROCESSES; i++) {
if (global_data->ipc[i].pid > 0) {
global_data->ipc[i].subscribers = 0;
}
}
ngx_shmtx_unlock(&global_shpool->mutex);
for(i = 0; i < NGX_MAX_PROCESSES; i++) {
if (global_data->ipc[i].pid > 0) {
ngx_http_push_stream_alert_worker_census_subscribers(global_data->ipc[i].pid, i, ngx_cycle->log);
}
}
return NGX_OK;
}
void
ngx_http_push_stream_ipc_init_worker_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_queue_t *cur = &data->channels_queue;
ngx_http_push_stream_channel_t *channel;
int i;
// cleanning old content if worker die and another one is set on same slot
ngx_http_push_stream_clean_worker_data();
ngx_http_push_stream_clean_worker_data(data);
ngx_shmtx_lock(&shpool->mutex);
......@@ -132,28 +169,25 @@ ngx_http_push_stream_ipc_init_worker()
channel->subscribers = 0;
}
ngx_shmtx_unlock(&shpool->mutex);
for(i = 0; i < NGX_MAX_PROCESSES; i++) {
if (data->ipc[i].pid > 0) {
data->ipc[i].subscribers = 0;
ngx_http_push_stream_alert_worker_census_subscribers(ngx_pid, i, ngx_cycle->log);
}
}
return NGX_OK;
ngx_shmtx_unlock(&shpool->mutex);
}
static void
ngx_http_push_stream_alert_shutting_down_workers(void)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
int i;
for(i = 0; i < NGX_MAX_PROCESSES; i++) {
if (data->ipc[i].pid > 0) {
ngx_http_push_stream_alert_worker_shutting_down_cleanup(ngx_pid, i, ngx_cycle->log);
if (global_data->ipc[i].pid > 0) {
ngx_http_push_stream_alert_worker_shutting_down_cleanup(global_data->ipc[i].pid, i, ngx_cycle->log);
}
}
}
......@@ -180,10 +214,9 @@ ngx_http_push_stream_unsubscribe_worker_locked(ngx_http_push_stream_channel_t *c
static void
ngx_http_push_stream_clean_worker_data()
ngx_http_push_stream_clean_worker_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *shpool = data->shpool;
ngx_queue_t *cur = &data->channels_queue;
ngx_http_push_stream_channel_t *channel;
......@@ -276,8 +309,22 @@ ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log,
static ngx_inline void
ngx_http_push_stream_census_worker_subscribers(void)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *global_shpool = (ngx_slab_pool_t *) ngx_http_push_stream_global_shm_zone->shm.addr;
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
ngx_shmtx_lock(&global_shpool->mutex);
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_census_worker_subscribers_data(data);
}
ngx_shmtx_unlock(&global_shpool->mutex);
}
ngx_inline void
ngx_http_push_stream_census_worker_subscribers_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_worker_data_t *workers_data = data->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_queue_t *cur;
......@@ -303,12 +350,24 @@ ngx_http_push_stream_census_worker_subscribers(void)
static ngx_inline void
ngx_http_push_stream_process_worker_message(void)
{
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_process_worker_message_data(data);
}
}
ngx_inline void
ngx_http_push_stream_process_worker_message_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_http_push_stream_worker_msg_t *worker_msg;
ngx_queue_t *cur;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
while ((cur = ngx_queue_head(&thisworker_data->messages_queue)) && (cur != NULL) && (cur != &thisworker_data->messages_queue)) {
......@@ -349,11 +408,10 @@ ngx_http_push_stream_process_worker_message(void)
static ngx_int_t
ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log)
ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + worker_slot;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_worker_data_t *thisworker_data = mcf->shm_data->ipc + worker_slot;
ngx_http_push_stream_worker_msg_t *newmessage;
if ((newmessage = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_msg_t))) == NULL) {
......@@ -366,6 +424,7 @@ ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *
newmessage->pid = pid;
newmessage->subscriptions_sentinel = subscriptions_sentinel;
newmessage->channel = channel;
newmessage->mcf = mcf;
*queue_was_empty = ngx_queue_empty(&thisworker_data->messages_queue);
ngx_queue_insert_tail(&thisworker_data->messages_queue, &newmessage->queue);
......@@ -374,20 +433,20 @@ ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *
static void
ngx_http_push_stream_wildcard(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_log_t *log)
ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
{
// subscribers are queued up in a local pool. Queue heads, however, are located
// in shared memory, identified by pid.
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_flag_t queue_was_empty[NGX_MAX_PROCESSES];
ngx_shmtx_lock(&shpool->mutex);
cur_worker = &channel->workers_with_subscribers;
while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
ngx_http_push_stream_send_worker_message_locked(channel, &worker->subscriptions_queue, worker->pid, worker->slot, msg, &queue_was_empty[worker->slot], log);
ngx_http_push_stream_send_worker_message_locked(channel, &worker->subscriptions_queue, worker->pid, worker->slot, msg, &queue_was_empty[worker->slot], log, mcf);
}
ngx_shmtx_unlock(&shpool->mutex);
......@@ -402,7 +461,7 @@ ngx_http_push_stream_wildcard(ngx_http_push_stream_channel_t *channel, ngx_http_
if ((msg->queue.prev == NULL) && (msg->queue.next == NULL)) {
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_mark_message_to_delete_locked(msg);
ngx_http_push_stream_mark_message_to_delete_locked(msg, mcf->shm_data);
ngx_shmtx_unlock(&shpool->mutex);
}
}
......
......@@ -32,6 +32,7 @@ static ngx_int_t
ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
{
ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_module_ctx_t *ctx;
......@@ -82,21 +83,21 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
}
// could not have a large size
if ((ngx_http_push_stream_module_main_conf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (cur->id->len > ngx_http_push_stream_module_main_conf->max_channel_id_length)) {
if ((mcf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (cur->id->len > mcf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", cur->id->len);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE);
}
if (r->method & (NGX_HTTP_POST|NGX_HTTP_PUT)) {
// create the channel if doesn't exist
channel = ngx_http_push_stream_get_channel(cur->id, r->connection->log, cf);
channel = ngx_http_push_stream_get_channel(cur->id, r->connection->log, cf, mcf);
if (channel == NULL) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate memory for new channel");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for new channel");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL);
}
if (channel == NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: number of channels were exceeded");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: number of channels were exceeded");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE);
}
}
......@@ -163,7 +164,7 @@ ngx_http_push_stream_read_request_body_to_buffer(ngx_http_request_t *r)
if (chain->buf->in_file) {
n = ngx_read_file(chain->buf->file, buf->start, len, 0);
if (n == NGX_FILE_ERROR) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: cannot read file with request body");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: cannot read file with request body");
return NULL;
}
buf->last = buf->last + len;
......@@ -183,10 +184,11 @@ ngx_http_push_stream_read_request_body_to_buffer(ngx_http_request_t *r)
static void
ngx_http_push_stream_publisher_delete_handler(ngx_http_request_t *r)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_buf_t *buf = NULL;
u_char *text = ngx_http_push_stream_module_main_conf->channel_deleted_message_text.data;
size_t len = ngx_http_push_stream_module_main_conf->channel_deleted_message_text.len;
u_char *text = mcf->channel_deleted_message_text.data;
size_t len = mcf->channel_deleted_message_text.len;
ngx_uint_t qtd_channels = 0;
ngx_http_push_stream_requested_channel_t *requested_channel;
......@@ -206,7 +208,7 @@ ngx_http_push_stream_publisher_delete_handler(ngx_http_request_t *r)
while ((cur = ngx_queue_next(cur)) != &ctx->requested_channels->queue) {
requested_channel = ngx_queue_data(cur, ngx_http_push_stream_requested_channel_t, queue);
if (ngx_http_push_stream_delete_channel(requested_channel->id, text, len, r->pool)) {
if (ngx_http_push_stream_delete_channel(mcf, requested_channel->id, text, len, r->pool)) {
qtd_channels++;
}
}
......@@ -232,7 +234,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
// check if body message wasn't empty
if (r->headers_in.content_length_n <= 0) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: Post request was sent with no message");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: Post request was sent with no message");
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_EMPTY_POST_REQUEST_MESSAGE);
return;
}
......@@ -270,6 +272,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
static ngx_int_t
ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
char *pos = NULL;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
......@@ -296,7 +299,7 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
// could not have a large size
if ((ngx_http_push_stream_module_main_conf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (cur->id->len > ngx_http_push_stream_module_main_conf->max_channel_id_length)) {
if ((mcf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (cur->id->len > mcf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", cur->id->len);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE);
}
......
......@@ -26,6 +26,7 @@
#include <ngx_http_push_stream_module_setup.h>
ngx_uint_t ngx_http_push_stream_padding_max_len = 0;
ngx_flag_t ngx_http_push_stream_enabled = 0;
static ngx_command_t ngx_http_push_stream_commands[] = {
{ ngx_string("push_stream_channels_statistics"),
......@@ -49,10 +50,10 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
/* Main directives*/
{ ngx_string("push_stream_shared_memory_size"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE12,
ngx_http_push_stream_set_shm_size_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, shm_size),
0,
NULL },
{ ngx_string("push_stream_channel_deleted_message_text"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
......@@ -235,7 +236,7 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
static ngx_http_module_t ngx_http_push_stream_module_ctx = {
NULL, /* preconfiguration */
ngx_http_push_stream_preconfig, /* preconfiguration */
ngx_http_push_stream_postconfig, /* postconfiguration */
ngx_http_push_stream_create_main_conf, /* create main configuration */
ngx_http_push_stream_init_main_conf, /* init main configuration */
......@@ -267,7 +268,7 @@ ngx_http_push_stream_init_module(ngx_cycle_t *cycle)
{
ngx_core_conf_t *ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
if ((ngx_http_push_stream_module_main_conf == NULL) || !ngx_http_push_stream_module_main_conf->enabled) {
if (!ngx_http_push_stream_enabled) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "ngx_http_push_stream_module will not be used with this configuration.");
return NGX_OK;
}
......@@ -284,7 +285,7 @@ ngx_http_push_stream_init_module(ngx_cycle_t *cycle)
static ngx_int_t
ngx_http_push_stream_init_worker(ngx_cycle_t *cycle)
{
if ((ngx_http_push_stream_module_main_conf == NULL) || !ngx_http_push_stream_module_main_conf->enabled) {
if (!ngx_http_push_stream_enabled) {
return NGX_OK;
}
......@@ -296,10 +297,6 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle)
return NGX_ERROR;
}
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
thisworker_data->pid = ngx_pid;
// turn on timer to cleanup memory of old messages and channels
ngx_http_push_stream_memory_cleanup_timer_set();
......@@ -310,15 +307,12 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle)
static void
ngx_http_push_stream_exit_master(ngx_cycle_t *cycle)
{
if ((ngx_http_push_stream_module_main_conf == NULL) || !ngx_http_push_stream_module_main_conf->enabled) {
if (!ngx_http_push_stream_enabled) {
return;
}
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
// destroy channel tree in shared memory
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, 1);
ngx_http_push_stream_collect_expired_messages_and_empty_channels(1);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(1);
}
......@@ -326,7 +320,7 @@ ngx_http_push_stream_exit_master(ngx_cycle_t *cycle)
static void
ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle)
{
if ((ngx_http_push_stream_module_main_conf == NULL) || !ngx_http_push_stream_module_main_conf->enabled) {
if (!ngx_http_push_stream_enabled) {
return;
}
......@@ -336,38 +330,31 @@ ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle)
ngx_http_push_stream_cleanup_shutting_down_worker();
ngx_http_push_stream_clean_worker_data();
ngx_http_push_stream_ipc_exit_worker(cycle);
}
static ngx_int_t
ngx_http_push_stream_postconfig(ngx_conf_t *cf)
ngx_http_push_stream_preconfig(ngx_conf_t *cf)
{
ngx_http_push_stream_main_conf_t *conf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module);
size_t shm_size;
size_t shm_size_limit = 32 * ngx_pagesize;
size_t size = ngx_align(2 * sizeof(ngx_http_push_stream_global_shm_data_t), ngx_pagesize);
ngx_shm_zone_t *shm_zone = ngx_shared_memory_add(cf, &ngx_http_push_stream_global_shm_name, size, &ngx_http_push_stream_module);
if (!conf->enabled) {
return NGX_OK;
if (shm_zone == NULL) {
return NGX_ERROR;
}
// initialize shared memory
shm_size = ngx_align(conf->shm_size, ngx_pagesize);
if (shm_size < shm_size_limit) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "The push_stream_shared_memory_size value must be at least %udKiB", shm_size_limit >> 10);
shm_size = shm_size_limit;
}
shm_zone->init = ngx_http_push_stream_init_global_shm_zone;
shm_zone->data = (void *) 1;
return NGX_OK;
}
if (ngx_http_push_stream_shm_size && ngx_http_push_stream_shm_size != shm_size) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "Cannot change memory area size without restart, ignoring change");
} else {
ngx_http_push_stream_shm_size = shm_size;
}
ngx_conf_log_error(NGX_LOG_INFO, cf, 0, "Using %udKiB of shared memory for push stream module", shm_size >> 10);
if (ngx_http_push_stream_padding_max_len > 0) {
static ngx_int_t
ngx_http_push_stream_postconfig(ngx_conf_t *cf)
{
if ((ngx_http_push_stream_padding_max_len > 0) && (ngx_http_push_stream_module_paddings_chunks == NULL)) {
ngx_uint_t steps = ngx_http_push_stream_padding_max_len / 100;
if ((ngx_http_push_stream_module_paddings_chunks = ngx_palloc(cf->pool, sizeof(ngx_str_t) * (steps + 1))) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to create padding messages");
......@@ -399,7 +386,7 @@ ngx_http_push_stream_postconfig(ngx_conf_t *cf)
}
}
return ngx_http_push_stream_set_up_shm(cf, ngx_http_push_stream_shm_size);
return NGX_OK;
}
......@@ -414,7 +401,6 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
}
mcf->enabled = 0;
mcf->shm_size = NGX_CONF_UNSET_SIZE;
mcf->channel_deleted_message_text.data = NULL;
mcf->channel_inactivity_time = NGX_CONF_UNSET;
mcf->ping_message_text.data = NULL;
......@@ -427,10 +413,10 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
mcf->max_messages_stored_per_channel = NGX_CONF_UNSET_UINT;
mcf->qtd_templates = 0;
mcf->timeout_with_body = NGX_CONF_UNSET;
mcf->ping_msg = NULL;
mcf->longpooling_timeout_msg = NULL;
ngx_queue_init(&mcf->msg_templates.queue);
ngx_http_push_stream_module_main_conf = mcf;
return mcf;
}
......@@ -445,7 +431,6 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
}
ngx_conf_init_value(conf->message_ttl, NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TTL);
ngx_conf_init_size_value(conf->shm_size, NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE);
ngx_conf_init_value(conf->channel_inactivity_time, NGX_HTTP_PUSH_STREAM_DEFAULT_CHANNEL_INACTIVITY_TIME);
ngx_conf_merge_str_value(conf->channel_deleted_message_text, conf->channel_deleted_message_text, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT);
ngx_conf_merge_str_value(conf->ping_message_text, conf->ping_message_text, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT);
......@@ -453,39 +438,45 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
ngx_conf_init_value(conf->timeout_with_body, 0);
// sanity checks
// shm size should be set
if (conf->shm_zone == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_shared_memory_size must be set.");
return NGX_CONF_ERROR;
}
// max number of channels cannot be zero
if ((conf->max_number_of_channels != NGX_CONF_UNSET_UINT) && (conf->max_number_of_channels == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_max_number_of_channels cannot be zero.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_max_number_of_channels cannot be zero.");
return NGX_CONF_ERROR;
}
// max number of wildcard channels cannot be zero
if ((conf->max_number_of_wildcard_channels != NGX_CONF_UNSET_UINT) && (conf->max_number_of_wildcard_channels == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_max_number_of_wildcard_channels cannot be zero.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_max_number_of_wildcard_channels cannot be zero.");
return NGX_CONF_ERROR;
}
// message ttl cannot be zero
if ((conf->message_ttl != NGX_CONF_UNSET) && (conf->message_ttl == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_message_ttl cannot be zero.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_message_ttl cannot be zero.");
return NGX_CONF_ERROR;
}
// max subscriber per channel cannot be zero
if ((conf->max_subscribers_per_channel != NGX_CONF_UNSET_UINT) && (conf->max_subscribers_per_channel == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_max_subscribers_per_channel cannot be zero.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_max_subscribers_per_channel cannot be zero.");
return NGX_CONF_ERROR;
}
// max messages stored per channel cannot be zero
if ((conf->max_messages_stored_per_channel != NGX_CONF_UNSET_UINT) && (conf->max_messages_stored_per_channel == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_max_messages_stored_per_channel cannot be zero.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_max_messages_stored_per_channel cannot be zero.");
return NGX_CONF_ERROR;
}
// max channel id length cannot be zero
if ((conf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (conf->max_channel_id_length == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_max_channel_id_length cannot be zero.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_max_channel_id_length cannot be zero.");
return NGX_CONF_ERROR;
}
......@@ -493,7 +484,7 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
u_char errstr[NGX_MAX_CONF_ERRSTR];
if ((backtrack_parser = ngx_pcalloc(cf->pool, sizeof(ngx_regex_compile_t))) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to compile backtrack parser");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push stream module: unable to allocate memory to compile backtrack parser");
return NGX_CONF_ERROR;
}
......@@ -552,6 +543,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
static char *
ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *prev = parent, *conf = child;
ngx_conf_merge_uint_value(conf->authorized_channels_only, prev->authorized_channels_only, 0);
......@@ -559,7 +551,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_str_value(conf->header_template, prev->header_template, NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE);
ngx_conf_merge_str_value(conf->message_template, prev->message_template, NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE);
ngx_conf_merge_str_value(conf->footer_template, prev->footer_template, NGX_HTTP_PUSH_STREAM_DEFAULT_FOOTER_TEMPLATE);
ngx_conf_merge_uint_value(conf->wildcard_channel_max_qtd, prev->wildcard_channel_max_qtd, ngx_http_push_stream_module_main_conf->max_number_of_wildcard_channels);
ngx_conf_merge_uint_value(conf->wildcard_channel_max_qtd, prev->wildcard_channel_max_qtd, mcf->max_number_of_wildcard_channels);
ngx_conf_merge_msec_value(conf->ping_message_interval, prev->ping_message_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_msec_value(conf->subscriber_connection_ttl, prev->subscriber_connection_ttl, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_msec_value(conf->longpolling_connection_ttl, prev->longpolling_connection_ttl, conf->subscriber_connection_ttl);
......@@ -594,7 +586,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
}
if (conf->channels_path == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_channels_path must be set on statistics and publisher location");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_channels_path must be set.");
return NGX_CONF_ERROR;
}
......@@ -605,7 +597,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
if (conf->header_template.len > 0) {
ngx_str_t *aux = ngx_http_push_stream_apply_template_to_each_line(&conf->header_template, &NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_TEMPLATE, cf->pool);
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_message_module failed to apply template to header message.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_message_module failed to apply template to header message.");
return NGX_CONF_ERROR;
}
conf->header_template.data = aux->data;
......@@ -637,7 +629,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
if (conf->footer_template.len > 0) {
ngx_str_t *aux = ngx_http_push_stream_apply_template_to_each_line(&conf->footer_template, &NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_TEMPLATE, cf->pool);
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_message_module failed to apply template to footer message.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_message_module failed to apply template to footer message.");
return NGX_CONF_ERROR;
}
......@@ -671,43 +663,43 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
// sanity checks
// ping message interval cannot be zero
if ((conf->ping_message_interval != NGX_CONF_UNSET_MSEC) && (conf->ping_message_interval == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_ping_message_interval cannot be zero.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_ping_message_interval cannot be zero.");
return NGX_CONF_ERROR;
}
// subscriber connection ttl cannot be zero
if ((conf->subscriber_connection_ttl != NGX_CONF_UNSET_MSEC) && (conf->subscriber_connection_ttl == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_subscriber_connection_ttl cannot be zero.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_subscriber_connection_ttl cannot be zero.");
return NGX_CONF_ERROR;
}
// long polling connection ttl cannot be zero
if ((conf->longpolling_connection_ttl != NGX_CONF_UNSET_MSEC) && (conf->longpolling_connection_ttl == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_longpolling_connection_ttl cannot be zero.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_longpolling_connection_ttl cannot be zero.");
return NGX_CONF_ERROR;
}
// message template cannot be blank
if (conf->message_template.len == 0) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_message_template cannot be blank.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_message_template cannot be blank.");
return NGX_CONF_ERROR;
}
// wildcard channel max qtd cannot be zero
if ((conf->wildcard_channel_max_qtd != NGX_CONF_UNSET_UINT) && (conf->wildcard_channel_max_qtd == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_wildcard_channel_max_qtd cannot be zero.");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_wildcard_channel_max_qtd cannot be zero.");
return NGX_CONF_ERROR;
}
// wildcard channel max qtd cannot be set without a channel prefix
if ((conf->wildcard_channel_max_qtd != NGX_CONF_UNSET_UINT) && (conf->wildcard_channel_max_qtd > 0) && (ngx_http_push_stream_module_main_conf->wildcard_channel_prefix.len == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "cannot set wildcard channel max qtd if push_stream_wildcard_channel_prefix is not set or blank.");
if ((conf->wildcard_channel_max_qtd != NGX_CONF_UNSET_UINT) && (conf->wildcard_channel_max_qtd > 0) && (mcf->wildcard_channel_prefix.len == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: cannot set wildcard channel max qtd if push_stream_wildcard_channel_prefix is not set or blank.");
return NGX_CONF_ERROR;
}
// max number of wildcard channels cannot be smaller than value in wildcard channel max qtd
if ((ngx_http_push_stream_module_main_conf->max_number_of_wildcard_channels != NGX_CONF_UNSET_UINT) && (conf->wildcard_channel_max_qtd != NGX_CONF_UNSET_UINT) && (ngx_http_push_stream_module_main_conf->max_number_of_wildcard_channels < conf->wildcard_channel_max_qtd)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "max number of wildcard channels cannot be smaller than value in push_stream_wildcard_channel_max_qtd.");
if ((mcf->max_number_of_wildcard_channels != NGX_CONF_UNSET_UINT) && (conf->wildcard_channel_max_qtd != NGX_CONF_UNSET_UINT) && (mcf->max_number_of_wildcard_channels < conf->wildcard_channel_max_qtd)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: max number of wildcard channels cannot be smaller than value in push_stream_wildcard_channel_max_qtd.");
return NGX_CONF_ERROR;
}
......@@ -720,7 +712,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
if (conf->padding_by_user_agent.len > 0) {
if ((conf->paddings = ngx_http_push_stream_parse_paddings(cf, &conf->padding_by_user_agent)) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to parse paddings by user agent");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push stream module: unable to parse paddings by user agent");
return NGX_CONF_ERROR;
}
......@@ -740,9 +732,10 @@ static char *
ngx_http_push_stream_setup_handler(ngx_conf_t *cf, void *conf, ngx_int_t (*handler) (ngx_http_request_t *))
{
ngx_http_core_loc_conf_t *clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
ngx_http_push_stream_main_conf_t *psmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module);
ngx_http_push_stream_main_conf_t *mcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module);
psmcf->enabled = 1;
ngx_http_push_stream_enabled = 1;
mcf->enabled = 1;
clcf->handler = handler;
clcf->if_modified_since = NGX_HTTP_IMS_OFF;
......@@ -780,7 +773,7 @@ ngx_http_push_stream_publisher(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
} else if ((value.len == NGX_HTTP_PUSH_STREAM_MODE_ADMIN.len) && (ngx_strncasecmp(value.data, NGX_HTTP_PUSH_STREAM_MODE_ADMIN.data, NGX_HTTP_PUSH_STREAM_MODE_ADMIN.len) == 0)) {
*field = NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN;
} else {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid push_stream_publisher mode value: %V, accepted values (%s, %s)", &value, NGX_HTTP_PUSH_STREAM_MODE_NORMAL.data, NGX_HTTP_PUSH_STREAM_MODE_ADMIN.data);
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: invalid push_stream_publisher mode value: %V, accepted values (%s, %s)", &value, NGX_HTTP_PUSH_STREAM_MODE_NORMAL.data, NGX_HTTP_PUSH_STREAM_MODE_ADMIN.data);
return NGX_CONF_ERROR;
}
}
......@@ -811,7 +804,7 @@ ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
} else if ((value.len == NGX_HTTP_PUSH_STREAM_MODE_WEBSOCKET.len) && (ngx_strncasecmp(value.data, NGX_HTTP_PUSH_STREAM_MODE_WEBSOCKET.data, NGX_HTTP_PUSH_STREAM_MODE_WEBSOCKET.len) == 0)) {
*field = NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET;
} else {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid push_stream_subscriber mode value: %V, accepted values (%V, %V, %V, %V, %V)", &value, &NGX_HTTP_PUSH_STREAM_MODE_STREAMING, &NGX_HTTP_PUSH_STREAM_MODE_POLLING, &NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING, &NGX_HTTP_PUSH_STREAM_MODE_EVENTSOURCE, &NGX_HTTP_PUSH_STREAM_MODE_WEBSOCKET);
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: invalid push_stream_subscriber mode value: %V, accepted values (%V, %V, %V, %V, %V)", &value, &NGX_HTTP_PUSH_STREAM_MODE_STREAMING, &NGX_HTTP_PUSH_STREAM_MODE_POLLING, &NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING, &NGX_HTTP_PUSH_STREAM_MODE_EVENTSOURCE, &NGX_HTTP_PUSH_STREAM_MODE_WEBSOCKET);
return NGX_CONF_ERROR;
}
}
......@@ -825,7 +818,7 @@ ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
}
#else
rc = NGX_CONF_ERROR;
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: sha1 support is needed to use WebSocket");
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push stream module: sha1 support is needed to use WebSocket");
#endif
return rc;
}
......@@ -834,40 +827,124 @@ ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
// shared memory
static ngx_int_t
ngx_http_push_stream_set_up_shm(ngx_conf_t *cf, size_t shm_size)
char *
ngx_http_push_stream_set_shm_size_slot(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_push_stream_shm_zone = ngx_shared_memory_add(cf, &ngx_http_push_stream_shm_name, shm_size, &ngx_http_push_stream_module);
ngx_http_push_stream_main_conf_t *mcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module);
size_t shm_size;
size_t shm_size_limit = 32 * ngx_pagesize;
ngx_str_t *value;
ngx_str_t *name;
value = cf->args->elts;
if (ngx_http_push_stream_shm_zone == NULL) {
shm_size = ngx_align(ngx_parse_size(&value[1]), ngx_pagesize);
if (shm_size < shm_size_limit) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "The push_stream_shared_memory_size value must be at least %ulKiB", shm_size_limit >> 10);
return NGX_CONF_ERROR;
}
name = (cf->args->nelts > 2) ? &value[2] : &ngx_http_push_stream_shm_name;
if ((ngx_http_push_stream_global_shm_zone != NULL) && (ngx_http_push_stream_global_shm_zone->data != NULL)) {
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
if ((name->len == data->shm_zone->shm.name.len) &&
(ngx_strncmp(name->data, data->shm_zone->shm.name.data, name->len) == 0) &&
(data->shm_zone->shm.size != shm_size)) {
shm_size = data->shm_zone->shm.size;
ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "Cannot change memory area size without restart, ignoring change on zone: %V", name);
}
}
}
ngx_conf_log_error(NGX_LOG_INFO, cf, 0, "Using %udKiB of shared memory for push stream module on zone: %V", shm_size >> 10, name);
mcf->shm_zone = ngx_shared_memory_add(cf, name, shm_size, &ngx_http_push_stream_module);
if (mcf->shm_zone == NULL) {
return NGX_CONF_ERROR;
}
if (mcf->shm_zone->data) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "duplicate zone \"%V\"", name);
return NGX_CONF_ERROR;
}
mcf->shm_zone->init = ngx_http_push_stream_init_shm_zone;
mcf->shm_zone->data = mcf;
return NGX_CONF_OK;
}
// shared memory zone initializer
ngx_int_t
ngx_http_push_stream_init_global_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;
ngx_http_push_stream_global_shm_data_t *d;
int i;
if (data) { /* zone already initialized */
shm_zone->data = data;
ngx_queue_init(&((ngx_http_push_stream_global_shm_data_t *) data)->shm_datas_queue);
ngx_http_push_stream_global_shm_zone = shm_zone;
return NGX_OK;
}
if ((d = (ngx_http_push_stream_global_shm_data_t *) ngx_slab_alloc(shpool, sizeof(*d))) == NULL) { //shm_data plus an array.
return NGX_ERROR;
}
shm_zone->data = d;
for (i = 0; i < NGX_MAX_PROCESSES; i++) {
d->ipc[i].pid = -1;
d->ipc[i].startup = 0;
d->ipc[i].subscribers = 0;
ngx_queue_init(&d->ipc[i].messages_queue);
ngx_queue_init(&d->ipc[i].subscribers_queue);
}
d->startup = ngx_time();
ngx_http_push_stream_shm_zone->init = ngx_http_push_stream_init_shm_zone;
ngx_http_push_stream_shm_zone->data = (void *) 1;
ngx_queue_init(&d->shm_datas_queue);
ngx_http_push_stream_global_shm_zone = shm_zone;
return NGX_OK;
}
// shared memory zone initializer
static ngx_int_t
ngx_int_t
ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
{
ngx_http_push_stream_global_shm_data_t *global_shm_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_http_push_stream_main_conf_t *mcf = shm_zone->data;
ngx_http_push_stream_shm_data_t *d;
int i;
mcf->shm_zone = shm_zone;
mcf->shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;
if (data) { /* zone already initialized */
shm_zone->data = data;
d = (ngx_http_push_stream_shm_data_t *) data;
d->mcf = mcf;
d->shm_zone = shm_zone;
d->shpool = mcf->shpool;
mcf->shm_data = data;
ngx_queue_insert_tail(&global_shm_data->shm_datas_queue, &d->shm_data_queue);
return NGX_OK;
}
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;
ngx_rbtree_node_t *sentinel;
ngx_http_push_stream_shm_data_t *d;
if ((d = (ngx_http_push_stream_shm_data_t *) ngx_slab_alloc(shpool, sizeof(*d))) == NULL) { //shm_data plus an array.
if ((d = (ngx_http_push_stream_shm_data_t *) ngx_slab_alloc(mcf->shpool, sizeof(*d))) == NULL) { //shm_data plus an array.
return NGX_ERROR;
}
d->mcf = mcf;
mcf->shm_data = d;
shm_zone->data = d;
for (i = 0; i < NGX_MAX_PROCESSES; i++) {
d->ipc[i].pid = -1;
......@@ -887,9 +964,11 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
d->startup = ngx_time();
d->last_message_time = 0;
d->last_message_tag = 0;
d->shm_zone = shm_zone;
d->shpool = mcf->shpool;
// initialize rbtree
if ((sentinel = ngx_slab_alloc(shpool, sizeof(*sentinel))) == NULL) {
if ((sentinel = ngx_slab_alloc(mcf->shpool, sizeof(*sentinel))) == NULL) {
return NGX_ERROR;
}
ngx_rbtree_init(&d->tree, sentinel, ngx_http_push_stream_rbtree_insert);
......@@ -899,15 +978,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
ngx_queue_init(&d->channels_to_delete);
ngx_queue_init(&d->channels_trash);
// create ping message
if ((ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->ping_message_text.data, ngx_http_push_stream_module_main_conf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
return NGX_ERROR;
}
// create longpooling timeout message
if ((ngx_http_push_stream_longpooling_timeout_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked((u_char *)NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT, ngx_strlen(NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT), NULL, NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
return NGX_ERROR;
}
ngx_queue_insert_tail(&global_shm_data->shm_datas_queue, &d->shm_data_queue);
return NGX_OK;
}
......@@ -25,12 +25,12 @@
#include <ngx_http_push_stream_module_subscriber.h>
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool);
static ngx_http_push_stream_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_subscriber_t *worker_subscriber);
static ngx_flag_t ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log);
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber);
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool);
......@@ -40,8 +40,9 @@ void ngx_http_push_stream_websocket_
static ngx_int_t
ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_http_push_stream_module_ctx_t *ctx;
......@@ -117,7 +118,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
// sending response content header
if (ngx_http_push_stream_send_response_content_header(r, cf) == NGX_ERROR) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: could not send content header to subscriber");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: could not send content header to subscriber");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
......@@ -132,7 +133,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
// adding subscriber to channel(s) and send old messages
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
if (ngx_http_push_stream_subscriber_assign_channel(mcf, cf, r, cur, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
......@@ -147,8 +148,9 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
static ngx_int_t
ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_stream_shm_zone->shm.addr;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_requested_channel_t *cur;
ngx_http_push_stream_subscriber_t *worker_subscriber;
......@@ -177,7 +179,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
// check if has any message to send
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log);
channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log, mcf);
if (channel == NULL) {
// channel not found
ngx_shmtx_unlock(&shpool->mutex);
......@@ -215,7 +217,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
// adding subscriber to channel(s)
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if ((channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log)) == NULL) {
if ((channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log, mcf)) == NULL) {
// channel not found
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", cur->id->data);
......@@ -253,7 +255,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
// sending response content header
if (ngx_http_push_stream_send_response_content_header(r, cf) == NGX_ERROR) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: could not send content header to subscriber");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: could not send content header to subscriber");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
......@@ -264,7 +266,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log);
channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log, mcf);
if (channel == NULL) {
// channel not found
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", cur->id->data);
......@@ -287,13 +289,14 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
}
static ngx_int_t
ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool)
ngx_http_push_stream_subscriber_assign_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_subscription_t *subscription;
ngx_int_t result;
ngx_slab_pool_t *shpool = mcf->shpool;
if ((channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log)) == NULL) {
if ((channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf)) == NULL) {
// channel not found
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", requested_channel->id->data);
return NGX_ERROR;
......@@ -317,7 +320,7 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
static ngx_int_t
ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, ngx_int_t *status_code, ngx_str_t **explain_error_message)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_push_stream_module_main_conf;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_requested_channel_t *cur = channels_ids;
ngx_uint_t subscribed_channels_qtd = 0;
......@@ -349,15 +352,17 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
subscribed_wildcard_channels_qtd++;
}
channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log, mcf);
// check if channel exists when authorized_channels_only is on
if (cf->authorized_channels_only && !is_wildcard_channel && (((channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log)) == NULL) || (channel->stored_messages == 0))) {
if (cf->authorized_channels_only && !is_wildcard_channel && ((channel == NULL) || (channel->stored_messages == 0))) {
*status_code = NGX_HTTP_FORBIDDEN;
*explain_error_message = (ngx_str_t *) &NGX_HTTP_PUSH_STREAM_CANNOT_CREATE_CHANNELS;
return NGX_ERROR;
}
// check if channel is full of subscribers
if ((mcf->max_subscribers_per_channel != NGX_CONF_UNSET_UINT) && (((channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log)) != NULL) && (channel->subscribers >= mcf->max_subscribers_per_channel))) {
if ((mcf->max_subscribers_per_channel != NGX_CONF_UNSET_UINT) && ((channel != NULL) && (channel->subscribers >= mcf->max_subscribers_per_channel))) {
*status_code = NGX_HTTP_FORBIDDEN;
*explain_error_message = (ngx_str_t *) &NGX_HTTP_PUSH_STREAM_TOO_SUBSCRIBERS_PER_CHANNEL;
return NGX_ERROR;
......@@ -375,16 +380,16 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
// create the channels in advance, if doesn't exist, to ensure max number of channels in the server
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
channel = ngx_http_push_stream_get_channel(cur->id, r->connection->log, cf);
channel = ngx_http_push_stream_get_channel(cur->id, r->connection->log, cf, mcf);
if (channel == NULL) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate memory for new channel");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for new channel");
*status_code = NGX_HTTP_INTERNAL_SERVER_ERROR;
*explain_error_message = NULL;
return NGX_ERROR;
}
if (channel == NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: number of channels were exceeded");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: number of channels were exceeded");
*status_code = NGX_HTTP_FORBIDDEN;
*explain_error_message = (ngx_str_t *) &NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE;
return NGX_ERROR;
......@@ -436,9 +441,10 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
static ngx_int_t
ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_subscriber_t *worker_subscriber)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_msec_t connection_ttl = worker_subscriber->longpolling ? cf->longpolling_connection_ttl : cf->subscriber_connection_ttl;
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
......@@ -580,13 +586,13 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
}
static ngx_http_push_stream_pid_queue_t *
ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log)
ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
{
ngx_http_push_stream_pid_queue_t *worker_sentinel;
ngx_http_push_stream_channel_t *channel;
// check if channel still exists
if ((channel = ngx_http_push_stream_find_channel(channel_id, log)) == NULL) {
if ((channel = ngx_http_push_stream_find_channel(channel_id, log, mcf)) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", channel_id->data);
return NULL;
}
......@@ -625,12 +631,13 @@ ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http
static ngx_int_t
ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(subscription->subscriber->request, ngx_http_push_stream_module);
ngx_queue_t *cur_worker;
ngx_http_push_stream_pid_queue_t *worker, *worker_subscribers_sentinel = NULL;
ngx_http_push_stream_channel_t *channel;
// check if channel still exists
if ((channel = ngx_http_push_stream_find_channel(channel_id, log)) == NULL) {
if ((channel = ngx_http_push_stream_find_channel(channel_id, log, mcf)) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", channel_id->data);
return NGX_ERROR;
}
......@@ -645,14 +652,14 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
}
if (worker_subscribers_sentinel == NULL) { // found nothing
worker_subscribers_sentinel = ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(shpool, channel_id, log);
worker_subscribers_sentinel = ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(shpool, channel_id, log, mcf);
if (worker_subscribers_sentinel == NULL) {
return NGX_ERROR;
}
}
channel->subscribers++; // do this only when we know everything went okay
channel->last_activity_time = ngx_time();
channel->expires = ngx_time() + mcf->channel_inactivity_time;
ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue);
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscriptions_queue, &subscription->channel_worker_queue);
return NGX_OK;
......
......@@ -29,11 +29,15 @@ static void nxg_http_push_stream_free_channel_memory_locked(ngx_slab_
static void ngx_http_push_stream_run_cleanup_pool_handler(ngx_pool_t *p, ngx_pool_cleanup_pt handler);
static void ngx_http_push_stream_cleanup_request_context(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_response_padding(ngx_http_request_t *r, size_t len, ngx_flag_t sending_header);
void ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data);
void ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force);
void ngx_http_push_stream_free_memory_of_expired_messages_and_channels_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force);
ngx_inline void ngx_http_push_stream_cleanup_shutting_down_worker_data(ngx_http_push_stream_shm_data_t *data);
static ngx_inline void
ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired)
ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_shm_data_t *data, ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_msg_t *msg;
ngx_queue_t *cur;
......@@ -50,16 +54,28 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->stored_messages);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->stored_messages);
channel->last_activity_time = ngx_time();
ngx_queue_remove(&msg->queue);
ngx_http_push_stream_mark_message_to_delete_locked(msg);
ngx_http_push_stream_mark_message_to_delete_locked(msg, data);
}
}
static void
ngx_http_push_stream_delete_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool)
ngx_http_push_stream_delete_channels()
{
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_delete_channels_data(data);
}
}
void
ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker, *cur;
......@@ -140,18 +156,29 @@ ngx_http_push_stream_delete_channels(ngx_http_push_stream_shm_data_t *data, ngx_
static ngx_inline void
ngx_http_push_stream_delete_worker_channel(void)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_delete_channels(data, shpool);
ngx_http_push_stream_delete_channels();
}
static ngx_inline void
ngx_http_push_stream_cleanup_shutting_down_worker(void)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *workers_data = data->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_cleanup_shutting_down_worker_data(data);
}
global_data->ipc[ngx_process_slot].pid = -1;
global_data->ipc[ngx_process_slot].subscribers = 0;
}
ngx_inline void
ngx_http_push_stream_cleanup_shutting_down_worker_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
while (!ngx_queue_empty(&thisworker_data->subscribers_queue)) {
ngx_http_push_stream_subscriber_t *subscriber = ngx_queue_data(ngx_queue_head(&thisworker_data->subscribers_queue), ngx_http_push_stream_subscriber_t, worker_queue);
......@@ -170,6 +197,8 @@ ngx_http_push_stream_cleanup_shutting_down_worker(void)
if (ngx_http_push_stream_buffer_cleanup_event.timer_set) {
ngx_del_timer(&ngx_http_push_stream_buffer_cleanup_event);
}
ngx_http_push_stream_clean_worker_data(data);
}
ngx_uint_t
......@@ -203,11 +232,11 @@ ngx_http_push_stream_apply_text_template(ngx_str_t **dst_value, ngx_str_t **dst_
}
ngx_http_push_stream_msg_t *
ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool)
ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_main_conf_t *mcf, u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *shm_data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_template_queue_t *sentinel = &ngx_http_push_stream_module_main_conf->msg_templates;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_shm_data_t *shm_data = mcf->shm_data;
ngx_http_push_stream_template_queue_t *sentinel = &mcf->msg_templates;
ngx_http_push_stream_template_queue_t *cur = sentinel;
ngx_http_push_stream_msg_t *msg;
int i = 0;
......@@ -229,6 +258,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
msg->workers_ref_count = 0;
msg->time = (id < 0) ? 0 : ngx_time();
msg->tag = (id < 0) ? 0 : ((msg->time == shm_data->last_message_time) ? (shm_data->last_message_tag + 1) : 1);
msg->qtd_templates = mcf->qtd_templates;
if ((msg->raw.data = ngx_slab_alloc_locked(shpool, len + 1)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
......@@ -251,7 +281,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
return NULL;
}
if ((msg->formatted_messages = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t)*ngx_http_push_stream_module_main_conf->qtd_templates)) == NULL) {
if ((msg->formatted_messages = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) * msg->qtd_templates)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
......@@ -309,27 +339,28 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
ngx_http_push_stream_channel_t *
ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_msg_t *msg;
ngx_shmtx_lock(&shpool->mutex);
// just find the channel. if it's not there, NULL and return error.
channel = ngx_http_push_stream_find_channel(id, r->connection->log);
channel = ngx_http_push_stream_find_channel(id, r->connection->log, mcf);
if (channel == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without created channel %s", id->data);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without created channel %s", id->data);
return NULL;
}
// create a buffer copy in shared mem
msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(text, len, channel, channel->last_message_id + 1, event_id, event_type, temp_pool);
msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(mcf, text, len, channel, channel->last_message_id + 1, event_id, event_type, temp_pool);
if (msg == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate message in shared memory");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate message in shared memory");
return NULL;
}
......@@ -340,8 +371,8 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_
channel->last_message_time = data->last_message_time = msg->time;
channel->last_message_tag = data->last_message_tag = msg->tag;
// set message expiration time
msg->expires = msg->time + ngx_http_push_stream_module_main_conf->message_ttl;
channel->last_activity_time = ngx_time();
msg->expires = msg->time + mcf->message_ttl;
channel->expires = ngx_time() + mcf->channel_inactivity_time;
// put messages on the queue
if (cf->store_messages) {
......@@ -350,13 +381,13 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_
data->stored_messages++;
// now see if the queue is too big
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, ngx_http_push_stream_module_main_conf->max_messages_stored_per_channel, 0);
ngx_http_push_stream_ensure_qtd_of_messages_locked(data, channel, mcf->max_messages_stored_per_channel, 0);
}
ngx_shmtx_unlock(&shpool->mutex);
// send an alert to workers
ngx_http_push_stream_wildcard(channel, msg, r->connection->log);
ngx_http_push_stream_broadcast(channel, msg, r->connection->log, mcf);
// turn on timer to cleanup buffer of old messages
ngx_http_push_stream_buffer_cleanup_timer_set(cf);
......@@ -665,15 +696,27 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r)
static void
ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_request_t *r)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_cleanup_request_context);
ngx_http_push_stream_add_polling_headers(r, ngx_time(), 0, r->pool);
if (ngx_http_push_stream_module_main_conf->timeout_with_body) {
if (mcf->timeout_with_body && (mcf->longpooling_timeout_msg == NULL)) {
// create longpooling timeout message
ngx_shmtx_lock(&shpool->mutex);
if ((mcf->longpooling_timeout_msg == NULL) && (mcf->longpooling_timeout_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(mcf, (u_char *)NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT, ngx_strlen(NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT), NULL, NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate long pooling timeout message in shared memory");
}
ngx_shmtx_unlock(&shpool->mutex);
}
if (mcf->timeout_with_body && (mcf->longpooling_timeout_msg != NULL)) {
ngx_http_send_header(r);
ngx_http_push_stream_send_response_content_header(r, ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module));
ngx_http_push_stream_send_response_message(r, NULL, ngx_http_push_stream_longpooling_timeout_msg, 1, 0);
ngx_http_push_stream_send_response_message(r, NULL, mcf->longpooling_timeout_msg, 1, 0);
ngx_http_push_stream_send_response_finalize(r);
} else {
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_MODIFIED, NULL);
......@@ -698,17 +741,17 @@ ngx_http_push_stream_send_websocket_close_frame(ngx_http_request_t *r, ngx_uint_
}
static ngx_flag_t
ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool)
ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_channel_t *channel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker;
ngx_shmtx_lock(&shpool->mutex);
channel = ngx_http_push_stream_find_channel(id, ngx_cycle->log);
channel = ngx_http_push_stream_find_channel(id, ngx_cycle->log, mcf);
if (channel != NULL) {
// remove channel from tree
channel->deleted = 1;
......@@ -722,10 +765,10 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx
// remove all messages
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, 0, 0);
ngx_http_push_stream_ensure_qtd_of_messages_locked(data, channel, 0, 0);
// apply channel deleted message text to message template
if ((channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(text, len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, NULL, NULL, temp_pool)) == NULL) {
if ((channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(mcf, text, len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, NULL, NULL, temp_pool)) == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to channel deleted message");
return 0;
......@@ -750,12 +793,26 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx
static void
ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force)
ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_flag_t force)
{
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(data, force);
}
}
void
ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force)
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *prev, *cur = &data->channels_queue;
ngx_http_push_stream_collect_expired_messages(data, shpool, force);
ngx_http_push_stream_collect_expired_messages_data(data, force);
while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &data->channels_queue) && (prev = ngx_queue_prev(cur))) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
......@@ -764,8 +821,7 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s
continue;
}
if ((channel->stored_messages == 0) && (channel->subscribers == 0) &&
(channel->last_activity_time + ngx_http_push_stream_module_main_conf->channel_inactivity_time < ngx_time())) {
if ((channel->stored_messages == 0) && (channel->subscribers == 0) && (channel->expires < ngx_time())) {
// go back one node on queue, since the current node will be removed
cur = prev;
ngx_shmtx_lock(&shpool->mutex);
......@@ -790,8 +846,9 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s
static void
ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force)
ngx_http_push_stream_collect_expired_messages_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force)
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *cur = &data->channels_queue;
......@@ -800,7 +857,7 @@ ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *d
while ((cur = ngx_queue_next(cur)) != &data->channels_queue) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1);
ngx_http_push_stream_ensure_qtd_of_messages_locked(data, channel, (force) ? 0 : channel->stored_messages, 1);
}
ngx_shmtx_unlock(&shpool->mutex);
......@@ -849,12 +906,17 @@ nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_htt
static ngx_int_t
ngx_http_push_stream_memory_cleanup()
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_delete_channels(data, shpool);
ngx_http_push_stream_collect_expired_messages_and_empty_channels(data, shpool, 0);
ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(data, 0);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0);
}
return NGX_OK;
}
......@@ -863,10 +925,13 @@ ngx_http_push_stream_memory_cleanup()
static ngx_int_t
ngx_http_push_stream_buffer_cleanup()
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
ngx_http_push_stream_collect_expired_messages(data, shpool, 0);
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_collect_expired_messages_data(data, 0);
}
return NGX_OK;
}
......@@ -875,8 +940,22 @@ ngx_http_push_stream_buffer_cleanup()
static ngx_int_t
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *cur = &global_data->shm_datas_queue;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels_data(data, 0);
}
return NGX_OK;
}
void
ngx_http_push_stream_free_memory_of_expired_messages_and_channels_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force)
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_msg_t *message;
ngx_queue_t *cur;
......@@ -894,8 +973,6 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
}
ngx_http_push_stream_free_memory_of_expired_channels_locked(data, shpool, force);
ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK;
}
......@@ -909,7 +986,7 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt
}
if (msg->formatted_messages != NULL) {
for (i = 0; i < ngx_http_push_stream_module_main_conf->qtd_templates; i++) {
for (i = 0; i < msg->qtd_templates; i++) {
ngx_str_t *formmated = (msg->formatted_messages + i);
if ((formmated != NULL) && (formmated->data != NULL)) {
ngx_slab_free_locked(shpool, formmated->data);
......@@ -941,10 +1018,8 @@ ngx_http_push_stream_free_worker_message_memory_locked(ngx_slab_pool_t *shpool,
static void
ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg)
ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg, ngx_http_push_stream_shm_data_t *data)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
msg->deleted = 1;
msg->expires = ngx_time() + NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL;
ngx_queue_insert_tail(&data->messages_trash, &msg->queue);
......@@ -956,7 +1031,7 @@ static void
ngx_http_push_stream_timer_set(ngx_msec_t timer_interval, ngx_event_t *event, ngx_event_handler_pt event_handler, ngx_flag_t start_timer)
{
if ((timer_interval != NGX_CONF_UNSET_MSEC) && start_timer) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_global_shm_zone->shm.addr;
if (event->handler == NULL) {
ngx_shmtx_lock(&shpool->mutex);
......@@ -988,15 +1063,28 @@ static void
ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
{
ngx_http_request_t *r = (ngx_http_request_t *) ev->data;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_int_t rc;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_int_t rc = NGX_OK;
if (pslcf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.len, 0);
} else if (pslcf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE), 1);
} else {
rc = ngx_http_push_stream_send_response_message(r, NULL, ngx_http_push_stream_ping_msg, 1, 1);
if (mcf->ping_msg == NULL) {
ngx_shmtx_lock(&shpool->mutex);
// create ping message
if ((mcf->ping_msg == NULL) && (mcf->ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(mcf, mcf->ping_message_text.data, mcf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate ping message in shared memory");
}
ngx_shmtx_unlock(&shpool->mutex);
}
if (mcf->ping_msg != NULL) {
rc = ngx_http_push_stream_send_response_message(r, NULL, mcf->ping_msg, 1, 1);
}
}
if (rc != NGX_OK) {
......@@ -1166,7 +1254,8 @@ ngx_http_push_stream_add_request_context(ngx_http_request_t *r)
static void
ngx_http_push_stream_cleanup_request_context(ngx_http_request_t *r)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_shmtx_lock(&shpool->mutex);
......@@ -1196,14 +1285,14 @@ ngx_http_push_stream_cleanup_request_context(ngx_http_request_t *r)
static void
ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subscriber_t *worker_subscriber)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(worker_subscriber->request, ngx_http_push_stream_module);
ngx_http_push_stream_subscription_t *cur, *sentinel;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
sentinel = &worker_subscriber->subscriptions_sentinel;
while ((cur = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(cur->channel->subscribers);
cur->channel->last_activity_time = ngx_time();
ngx_queue_remove(&cur->channel_worker_queue);
ngx_queue_remove(&cur->queue);
}
......
......@@ -34,8 +34,9 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: sha1 support is needed to use WebSocket");
return NGX_OK;
#endif
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_http_push_stream_module_ctx_t *ctx;
......@@ -115,7 +116,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
// sending response content header
if (ngx_http_push_stream_send_response_content_header(r, cf) == NGX_ERROR) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: could not send content header to subscriber");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: could not send content header to subscriber");
return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
}
......@@ -130,7 +131,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
// adding subscriber to channel(s) and send backtrack messages
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
if (ngx_http_push_stream_subscriber_assign_channel(mcf, cf, r, cur, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
}
}
......
......@@ -73,9 +73,9 @@ ngx_http_push_stream_find_channel_on_tree(ngx_str_t *id, ngx_log_t *log, ngx_rbt
}
static void
ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel)
ngx_http_push_stream_initialize_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_channel_t *channel)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
channel->channel_deleted_message = NULL;
channel->last_message_id = 0;
......@@ -84,8 +84,7 @@ ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel)
channel->stored_messages = 0;
channel->subscribers = 0;
channel->deleted = 0;
channel->expires = 0;
channel->last_activity_time = ngx_time();
channel->expires = ngx_time() + mcf->channel_inactivity_time;
ngx_queue_init(&channel->message_queue);
......@@ -97,9 +96,9 @@ ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel)
}
static ngx_http_push_stream_channel_t *
ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log)
ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_channel_t *channel = NULL;
if (id == NULL) {
......@@ -118,15 +117,14 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log)
// find a channel by id. if channel not found, make one, insert it, and return that.
static ngx_http_push_stream_channel_t *
ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_loc_conf_t *cf)
ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_loc_conf_t *cf, ngx_http_push_stream_main_conf_t *mcf)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_push_stream_module_main_conf;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_channel_t *channel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_flag_t is_wildcard_channel = 0;
channel = ngx_http_push_stream_find_channel(id, log);
channel = ngx_http_push_stream_find_channel(id, log, mcf);
if (channel != NULL) { // we found our channel
return channel;
}
......@@ -134,7 +132,7 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
ngx_shmtx_lock(&shpool->mutex);
// check again to see if any other worker didn't create the channel
channel = ngx_http_push_stream_find_channel(id, log);
channel = ngx_http_push_stream_find_channel(id, log, mcf);
if (channel != NULL) { // we found our channel
ngx_shmtx_unlock(&shpool->mutex);
return channel;
......@@ -167,7 +165,7 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
channel->wildcard = is_wildcard_channel;
ngx_http_push_stream_initialize_channel(channel);
ngx_http_push_stream_initialize_channel(mcf, channel);
// initialize workers_with_subscribers queues only when a channel is created
ngx_queue_init(&channel->workers_with_subscribers);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment