Commit 1a79b025 authored by Wandenberg's avatar Wandenberg

fix socket leak on reload process for sockets used in inter process communication and connections

parent 3d3a2041
...@@ -59,6 +59,7 @@ describe "Send Signals" do ...@@ -59,6 +59,7 @@ describe "Send Signals" do
body = 'body' body = 'body'
response = response2 = '' response = response2 = ''
pid = pid2 = 0 pid = pid2 = 0
open_sockets_1 = 0
nginx_run_server(config, :timeout => 60) do |conf| nginx_run_server(config, :timeout => 60) do |conf|
EventMachine.run do EventMachine.run do
...@@ -77,6 +78,11 @@ describe "Send Signals" do ...@@ -77,6 +78,11 @@ describe "Send Signals" do
resp_1["by_worker"].count.should eql(1) resp_1["by_worker"].count.should eql(1)
pid = resp_1["by_worker"][0]['pid'].to_i pid = resp_1["by_worker"][0]['pid'].to_i
open_sockets_1 = `lsof -p #{Process.getpgid pid} | grep socket | wc -l`.strip
socket = open_socket(nginx_host, nginx_port)
socket.print "GET /sub/#{channel} HTTP/1.1\r\nHost: test\r\nX-Nginx-PushStream-Mode: long-polling\r\n\r\n"
# send reload signal # send reload signal
`#{ nginx_executable } -c #{ conf.configuration_filename } -s reload > /dev/null 2>&1` `#{ nginx_executable } -c #{ conf.configuration_filename } -s reload > /dev/null 2>&1`
end end
...@@ -84,7 +90,7 @@ describe "Send Signals" do ...@@ -84,7 +90,7 @@ describe "Send Signals" do
end end
# check if first worker die # check if first worker die
EM.add_periodic_timer(0.5) do timer = EM.add_periodic_timer(0.5) do
# check statistics again # check statistics again
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
...@@ -92,7 +98,10 @@ describe "Send Signals" do ...@@ -92,7 +98,10 @@ describe "Send Signals" do
resp_3 = JSON.parse(pub_4.response) resp_3 = JSON.parse(pub_4.response)
resp_3.has_key?("by_worker").should be_true resp_3.has_key?("by_worker").should be_true
if (resp_3["by_worker"].count == 1) && (pid != resp_3["by_worker"][0]['pid'].to_i) old_process_running = Process.getpgid(pid) rescue false
if !old_process_running && (resp_3["by_worker"].count == 1) && (pid != resp_3["by_worker"][0]['pid'].to_i)
timer.cancel
# publish a message # publish a message
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body
pub_2.callback do pub_2.callback do
...@@ -111,7 +120,15 @@ describe "Send Signals" do ...@@ -111,7 +120,15 @@ describe "Send Signals" do
resp_2["published_messages"].to_i.should eql(1) resp_2["published_messages"].to_i.should eql(1)
resp_2["subscribers"].to_i.should eql(1) resp_2["subscribers"].to_i.should eql(1)
open_sockets_2 = `lsof -p #{Process.getpgid resp_3["by_worker"][0]['pid'].to_i} | grep socket | wc -l`.strip
open_sockets_2.should eql(open_sockets_1)
EventMachine.stop EventMachine.stop
# send stop signal
`#{ nginx_executable } -c #{ conf.configuration_filename } -s stop > /dev/null 2>&1`
error_log = File.read(conf.error_log)
error_log.should_not include("open socket")
end end
end end
end end
......
...@@ -57,7 +57,7 @@ ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers) ...@@ -57,7 +57,7 @@ ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers)
*/ */
for(i=0; i<workers; i++) { for(i=0; i<workers; i++) {
while (s < last_expected_process && ngx_processes[s].pid != -1) { while (s < last_expected_process && ngx_processes[s].pid != NGX_INVALID_FILE) {
// find empty existing slot // find empty existing slot
s++; s++;
} }
...@@ -188,6 +188,9 @@ ngx_http_push_stream_alert_shutting_down_workers(void) ...@@ -188,6 +188,9 @@ ngx_http_push_stream_alert_shutting_down_workers(void)
for(i = 0; i < NGX_MAX_PROCESSES; i++) { for(i = 0; i < NGX_MAX_PROCESSES; i++) {
if (global_data->ipc[i].pid > 0) { if (global_data->ipc[i].pid > 0) {
ngx_http_push_stream_alert_worker_shutting_down_cleanup(global_data->ipc[i].pid, i, ngx_cycle->log); ngx_http_push_stream_alert_worker_shutting_down_cleanup(global_data->ipc[i].pid, i, ngx_cycle->log);
ngx_close_channel((ngx_socket_t *) ngx_http_push_stream_socketpairs[i], ngx_cycle->log);
ngx_http_push_stream_socketpairs[i][0] = NGX_INVALID_FILE;
ngx_http_push_stream_socketpairs[i][1] = NGX_INVALID_FILE;
} }
} }
} }
...@@ -236,7 +239,7 @@ ngx_http_push_stream_clean_worker_data(ngx_http_push_stream_shm_data_t *data) ...@@ -236,7 +239,7 @@ ngx_http_push_stream_clean_worker_data(ngx_http_push_stream_shm_data_t *data)
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
data->ipc[ngx_process_slot].pid = -1; data->ipc[ngx_process_slot].pid = NGX_INVALID_FILE;
data->ipc[ngx_process_slot].subscribers = 0; data->ipc[ngx_process_slot].subscribers = 0;
} }
...@@ -302,7 +305,10 @@ ngx_http_push_stream_channel_handler(ngx_event_t *ev) ...@@ -302,7 +305,10 @@ ngx_http_push_stream_channel_handler(ngx_event_t *ev)
static ngx_int_t static ngx_int_t
ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log, ngx_channel_t command) ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log, ngx_channel_t command)
{ {
return ngx_write_channel(ngx_http_push_stream_socketpairs[slot][0], &command, sizeof(ngx_channel_t), log); if (ngx_http_push_stream_socketpairs[slot][0] != NGX_INVALID_FILE) {
return ngx_write_channel(ngx_http_push_stream_socketpairs[slot][0], &command, sizeof(ngx_channel_t), log);
}
return NGX_OK;
} }
......
...@@ -182,7 +182,6 @@ ngx_http_push_stream_cleanup_shutting_down_worker_data(ngx_http_push_stream_shm_ ...@@ -182,7 +182,6 @@ ngx_http_push_stream_cleanup_shutting_down_worker_data(ngx_http_push_stream_shm_
while (!ngx_queue_empty(&thisworker_data->subscribers_queue)) { while (!ngx_queue_empty(&thisworker_data->subscribers_queue)) {
ngx_http_push_stream_subscriber_t *subscriber = ngx_queue_data(ngx_queue_head(&thisworker_data->subscribers_queue), ngx_http_push_stream_subscriber_t, worker_queue); ngx_http_push_stream_subscriber_t *subscriber = ngx_queue_data(ngx_queue_head(&thisworker_data->subscribers_queue), ngx_http_push_stream_subscriber_t, worker_queue);
subscriber->request->keepalive = 0;
if (subscriber->longpolling) { if (subscriber->longpolling) {
ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(subscriber->request); ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(subscriber->request);
} else { } else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment