Commit 4e7f828a authored by Wandenberg's avatar Wandenberg

fix lazy reload

parent a3bf3446
h1(#changelog). Changelog
* Fix lazy reload
* Normalize use of backtrack, last_event_id and if_modified_since/if_none_match values to get old messages on all subscriber modes
* Added push_stream_last_event_id directive to make it possible to pass the Last-Event-Id value without setting a header
* Changed push_stream_store_messages directive to make it possible to set it inside an if block
......
......@@ -43,6 +43,7 @@
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES = {49, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS = {50, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL = {51, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_CLEANUP_SHUTTING_DOWN = {52, 0, 0, -1};
// worker processes of the world, unite.
ngx_socket_t ngx_http_push_stream_socketpairs[NGX_MAX_PROCESSES][2];
......@@ -55,6 +56,7 @@ static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int
#define ngx_http_push_stream_alert_worker_check_messages(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES)
#define ngx_http_push_stream_alert_worker_census_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS)
#define ngx_http_push_stream_alert_worker_delete_channel(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL)
#define ngx_http_push_stream_alert_worker_shutting_down_cleanup(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CLEANUP_SHUTTING_DOWN)
static ngx_int_t ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log);
......@@ -63,9 +65,12 @@ static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle)
static ngx_int_t ngx_http_push_stream_ipc_init_worker();
static void ngx_http_push_stream_clean_worker_data();
static void ngx_http_push_stream_channel_handler(ngx_event_t *ev);
static void ngx_http_push_stream_alert_shutting_down_workers(void);
static ngx_inline void ngx_http_push_stream_process_worker_message(void);
static ngx_inline void ngx_http_push_stream_census_worker_subscribers(void);
static ngx_inline void ngx_http_push_stream_cleanup_shutting_down_worker(void);
static ngx_int_t ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_http_push_stream_msg_t *msg);
......
......@@ -37,7 +37,7 @@
#define NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE 33554432 // 32 megs
#define NGX_HTTP_PUSH_STREAM_MESSAGE_BUFFER_CLEANUP_INTERVAL 5000 // 5 seconds
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL = 10; // 10 seconds
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_INTERVAL = 4000; // 2 seconds
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_INTERVAL = 4000; // 4 seconds
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TTL = 1800; // 30 minutes
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_CHANNEL_INACTIVITY_TIME = 30; // 30 seconds
......
......@@ -19,11 +19,41 @@ describe "Send Signals" do
:master_process => 'on',
:daemon => 'on',
:header_template => 'HEADER',
:footer_template => 'FOOTER',
:message_ttl => '60s',
:subscriber_connection_ttl => '65s'
}
end
it "should disconnect subscribers when receives TERM signal" do
channel = 'ch_test_send_term_signal'
body = 'body'
response = ''
nginx_run_server(config, :timeout => 5) do |conf|
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge('X-Nginx-PushStream-Mode' => 'long-polling')
sub_1.callback do
sub_1.should be_http_status(304).without_body
Time.parse(sub_1.response_header['LAST_MODIFIED'].to_s).utc.to_i.should be_in_the_interval(Time.now.utc.to_i-1, Time.now.utc.to_i)
sub_1.response_header['ETAG'].to_s.should eql("0")
end
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub_2.stream do |chunk|
# send stop signal
`#{ nginx_executable } -c #{ conf.configuration_filename } -s stop > /dev/null 2>&1`
response += chunk
end
sub_2.callback do
response.should include("FOOTER")
EventMachine.stop
end
end
end
end
it "should reload normaly when receives HUP signal" do
channel = 'ch_test_send_hup_signal'
body = 'body'
......@@ -53,8 +83,6 @@ describe "Send Signals" do
end
end
conectted_after_reloaded = false
i = 0
# check if first worker die
EM.add_periodic_timer(0.5) do
......@@ -64,9 +92,7 @@ describe "Send Signals" do
resp_3 = JSON.parse(pub_4.response)
resp_3.has_key?("by_worker").should be_true
if resp_3["by_worker"].count == 2 && !conectted_after_reloaded
conectted_after_reloaded = true
if (resp_3["by_worker"].count == 1) && (pid != resp_3["by_worker"][0]['pid'].to_i)
# publish a message
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body
pub_2.callback do
......@@ -83,29 +109,65 @@ describe "Send Signals" do
resp_2.has_key?("channels").should be_true
resp_2["channels"].to_i.should eql(1)
resp_2["published_messages"].to_i.should eql(1)
resp_2["subscribers"].to_i.should eql(2)
resp_2["by_worker"].count.should eql(2)
resp_2["subscribers"].to_i.should eql(1)
EventMachine.stop
end
end
end
end
end
end
end
end
end
end
if resp_3["by_worker"].count == 1 && conectted_after_reloaded
resp_3["channels"].to_i.should eql(1)
resp_3["published_messages"].to_i.should eql(1)
resp_3["subscribers"].to_i.should eql(1)
resp_3["by_worker"].count.should eql(1)
pid2 = resp_3["by_worker"][0]['pid'].to_i
shared_examples_for "reload server" do
it "should reload fast" do
channel = 'ch_test_send_hup_signal'
pid = pid2 = 0
pid.should_not eql(pid2)
EventMachine.stop
end
nginx_run_server(config.merge(custom_config), :timeout => 5) do |conf|
EventMachine.run do
# create subscriber
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub_1.stream do |chunk|
end
i = i + 1
if i == 120
fail("Worker didn't die in 60 seconds")
EventMachine.stop
EM.add_timer(1) do
# check statistics
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_1.callback do
pub_1.should be_http_status(200).with_body
resp_1 = JSON.parse(pub_1.response)
resp_1["subscribers"].to_i.should eql(1)
resp_1["channels"].to_i.should eql(1)
resp_1["by_worker"].count.should eql(1)
pid = resp_1["by_worker"][0]['pid'].to_i
# send reload signal
`#{ nginx_executable } -c #{ conf.configuration_filename } -s reload > /dev/null 2>&1`
# check if first worker die
EM.add_periodic_timer(1) do
# check statistics
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_4.callback do
resp_3 = JSON.parse(pub_4.response)
resp_3.has_key?("by_worker").should be_true
if resp_3["by_worker"].count == 1
resp_3["subscribers"].to_i.should eql(0)
resp_3["channels"].to_i.should eql(1)
pid2 = resp_3["by_worker"][0]['pid'].to_i
pid.should_not eql(pid2)
EventMachine.stop
end
end
end
end
end
end
......@@ -113,6 +175,28 @@ describe "Send Signals" do
end
end
context "with a big ping message interval" do
let(:custom_config) do
{
:ping_message_interval => "10m",
:subscriber_connection_ttl => '10s'
}
end
it_should_behave_like "reload server"
end
context "with a big subscriber connection ttl" do
let(:custom_config) do
{
:ping_message_interval => "1s",
:subscriber_connection_ttl => '10m'
}
end
it_should_behave_like "reload server"
end
it "should ignore changes on shared memory size when doing a reload" do
channel = 'ch_test_reload_with_different_shared_memory_size'
body = 'body'
......
......@@ -149,6 +149,20 @@ ngx_http_push_stream_ipc_init_worker()
}
static void
ngx_http_push_stream_alert_shutting_down_workers(void)
{
    ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
    int                              slot;

    // Walk every possible worker slot in the shared IPC table and send the
    // "cleanup: shutting down" command to each slot that holds a live pid.
    for (slot = 0; slot < NGX_MAX_PROCESSES; slot++) {
        if (data->ipc[slot].pid <= 0) {
            continue;  // slot unused or worker already gone
        }
        ngx_http_push_stream_alert_worker_shutting_down_cleanup(ngx_pid, slot, ngx_cycle->log);
    }
}
static ngx_int_t
ngx_http_push_stream_unsubscribe_worker_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{
......@@ -244,6 +258,8 @@ ngx_http_push_stream_channel_handler(ngx_event_t *ev)
ngx_http_push_stream_census_worker_subscribers();
} else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL.command) {
ngx_http_push_stream_delete_worker_channel();
} else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_CLEANUP_SHUTTING_DOWN.command) {
ngx_http_push_stream_cleanup_shutting_down_worker();
}
}
}
......
......@@ -273,7 +273,11 @@ ngx_http_push_stream_init_module(ngx_cycle_t *cycle)
}
// initialize our little IPC
return ngx_http_push_stream_init_ipc(cycle, ccf->worker_processes);
ngx_int_t rc;
if ((rc = ngx_http_push_stream_init_ipc(cycle, ccf->worker_processes)) == NGX_OK) {
ngx_http_push_stream_alert_shutting_down_workers();
}
return rc;
}
......@@ -330,15 +334,9 @@ ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle)
return;
}
ngx_http_push_stream_clean_worker_data();
ngx_http_push_stream_cleanup_shutting_down_worker();
if (ngx_http_push_stream_memory_cleanup_event.timer_set) {
ngx_del_timer(&ngx_http_push_stream_memory_cleanup_event);
}
if (ngx_http_push_stream_buffer_cleanup_event.timer_set) {
ngx_del_timer(&ngx_http_push_stream_buffer_cleanup_event);
}
ngx_http_push_stream_clean_worker_data();
ngx_http_push_stream_ipc_exit_worker(cycle);
}
......
......@@ -146,6 +146,31 @@ ngx_http_push_stream_delete_worker_channel(void)
ngx_http_push_stream_delete_channels(data, shpool);
}
static ngx_inline void
ngx_http_push_stream_cleanup_shutting_down_worker(void)
{
    ngx_http_push_stream_shm_data_t    *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
    ngx_http_push_stream_worker_data_t *thisworker_data = &data->ipc[ngx_process_slot];
    ngx_http_push_stream_subscriber_t  *subscriber;

    // Finalize every subscriber still attached to this worker's queue.
    // NOTE(review): this loop assumes each finalize call unlinks the head
    // subscriber from subscribers_queue (otherwise it would not terminate)
    // — confirm against the finalize implementations.
    while (!ngx_queue_empty(&thisworker_data->subscribers_queue)) {
        subscriber = ngx_queue_data(ngx_queue_head(&thisworker_data->subscribers_queue), ngx_http_push_stream_subscriber_t, worker_queue);
        if (subscriber->longpolling) {
            ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(subscriber->request);
        } else {
            ngx_http_push_stream_send_response_finalize(subscriber->request);
        }
    }

    // Cancel this worker's periodic cleanup timers, if armed.
    if (ngx_http_push_stream_memory_cleanup_event.timer_set) {
        ngx_del_timer(&ngx_http_push_stream_memory_cleanup_event);
    }

    if (ngx_http_push_stream_buffer_cleanup_event.timer_set) {
        ngx_del_timer(&ngx_http_push_stream_buffer_cleanup_event);
    }
}
ngx_uint_t
ngx_http_push_stream_apply_text_template(ngx_str_t **dst_value, ngx_str_t **dst_message, ngx_str_t *text, const ngx_str_t *template, const ngx_str_t *token, ngx_slab_pool_t *shpool, ngx_pool_t *temp_pool)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment