Commit 3bd35e64 authored by Wandenberg's avatar Wandenberg

fix stuck connections when sending only header responses

parent 51356e56
...@@ -233,6 +233,7 @@ static void ngx_http_push_stream_get_last_received_message_value ...@@ -233,6 +233,7 @@ static void ngx_http_push_stream_get_last_received_message_value
static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value); static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value);
static ngx_str_t * ngx_http_push_stream_get_header(ngx_http_request_t *r, const ngx_str_t *header_name); static ngx_str_t * ngx_http_push_stream_get_header(ngx_http_request_t *r, const ngx_str_t *header_name);
static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message); static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
static ngx_int_t ngx_http_push_stream_send_only_header_response_and_finalize(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
static ngx_str_t * ngx_http_push_stream_str_replace(const ngx_str_t *org, const ngx_str_t *find, const ngx_str_t *replace, off_t offset, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_str_replace(const ngx_str_t *org, const ngx_str_t *find, const ngx_str_t *replace, off_t offset, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_websocket_frame(const u_char *opcode, off_t opcode_len, const u_char *text, off_t text_len, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_get_formatted_websocket_frame(const u_char *opcode, off_t opcode_len, const u_char *text, off_t text_len, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool);
......
...@@ -13,6 +13,7 @@ group :test do ...@@ -13,6 +13,7 @@ group :test do
gem 'rb-inotify', '~> 0.8.8' gem 'rb-inotify', '~> 0.8.8'
gem 'json', '~> 1.7.6' gem 'json', '~> 1.7.6'
gem 'thin', '~> 1.5.1' gem 'thin', '~> 1.5.1'
gem 'net-http-persistent', '~> 2.9', :require => 'net/http/persistent'
gem 'debugger', '~> 1.3.1' gem 'debugger', '~> 1.3.1'
end end
......
...@@ -45,6 +45,7 @@ GEM ...@@ -45,6 +45,7 @@ GEM
libv8 (3.11.8.13) libv8 (3.11.8.13)
listen (0.7.2) listen (0.7.2)
multi_json (1.5.0) multi_json (1.5.0)
net-http-persistent (2.9.4)
nginx_test_helper (0.1.0) nginx_test_helper (0.1.0)
popen4 popen4
nokogiri (1.5.6) nokogiri (1.5.6)
...@@ -92,6 +93,7 @@ DEPENDENCIES ...@@ -92,6 +93,7 @@ DEPENDENCIES
jshintrb (~> 0.2.1) jshintrb (~> 0.2.1)
json (~> 1.7.6) json (~> 1.7.6)
listen (~> 0.7.2) listen (~> 0.7.2)
net-http-persistent (~> 2.9)
nginx_test_helper (~> 0.1.0) nginx_test_helper (~> 0.1.0)
nokogiri (~> 1.5.6) nokogiri (~> 1.5.6)
rake (~> 10.0.3) rake (~> 10.0.3)
......
...@@ -7,7 +7,8 @@ describe "Keepalive" do ...@@ -7,7 +7,8 @@ describe "Keepalive" do
:keepalive_requests => 500, :keepalive_requests => 500,
:header_template => '', :header_template => '',
:message_template => '~text~', :message_template => '~text~',
:footer_template => '' :footer_template => '',
:publisher_mode => 'admin'
} }
end end
...@@ -17,13 +18,46 @@ describe "Keepalive" do ...@@ -17,13 +18,46 @@ describe "Keepalive" do
channels_to_be_created = 4000 channels_to_be_created = 4000
nginx_run_server(config, :timeout => 25) do |conf| nginx_run_server(config, :timeout => 25) do |conf|
http_single = Net::HTTP::Persistent.new "single_channel"
http_double = Net::HTTP::Persistent.new "double_channel"
uri = URI.parse nginx_address
0.step(channels_to_be_created - 1, 500) do |i| 0.step(channels_to_be_created - 1, 500) do |i|
socket = open_socket(nginx_host, nginx_port)
1.upto(500) do |j| 1.upto(500) do |j|
headers, body = post_in_socket("/pub?id=#{channel}#{i + j}", body, socket, {:wait_for => "}\r\n"}) post_single = Net::HTTP::Post.new "/pub?id=#{channel}#{i + j}"
headers.should include("HTTP/1.1 200 OK") post_single.body = body
response_single = http_single.request(uri, post_single)
response_single.code.should eql("200")
response_single.body.should eql(%({"channel": "#{channel}#{i + j}", "published_messages": "1", "stored_messages": "1", "subscribers": "0"}\r\n))
post_double = Net::HTTP::Post.new "/pub?id=#{channel}#{i + j}/#{channel}#{i}_#{j}"
post_double.body = body
response_double = http_double.request(uri, post_double)
response_double.code.should eql("200")
response_double.body.should match_the_pattern(/"hostname": "[^"]*", "time": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}", "channels": "#{(i + j) * 2}", "wildcard_channels": "0", "uptime": "[0-9]*", "infos": \[\r\n/)
response_double.body.should match_the_pattern(/"channel": "#{channel}#{i + j}", "published_messages": "2", "stored_messages": "2", "subscribers": "0"},\r\n/)
response_double.body.should match_the_pattern(/"channel": "#{channel}#{i}_#{j}", "published_messages": "1", "stored_messages": "1", "subscribers": "0"}\r\n/)
end
end
end
end
it "should create many channels on the same socket without info on response" do
channel = 'ch_test_create_many_channels_'
body = 'channel started'
channels_to_be_created = 4000
nginx_run_server(config.merge({:channel_info_on_publish => "off"}), :timeout => 25) do |conf|
uri = URI.parse nginx_address
0.step(channels_to_be_created - 1, 500) do |i|
http = Net::HTTP::Persistent.new
1.upto(500) do |j|
post = Net::HTTP::Post.new "/pub?id=#{channel}#{i + j}"
post.body = body
response = http.request(uri, post)
response.code.should eql("200")
response.body.should eql("")
end end
socket.close
end end
end end
end end
...@@ -47,9 +81,30 @@ describe "Keepalive" do ...@@ -47,9 +81,30 @@ describe "Keepalive" do
body.should match_the_pattern(/"channels": "1", "wildcard_channels": "0", "published_messages": "1", "stored_messages": "1", "messages_in_trash": "0", "channels_in_trash": "0", "subscribers": "0", "uptime": "[0-9]*", "by_worker": \[\r\n/) body.should match_the_pattern(/"channels": "1", "wildcard_channels": "0", "published_messages": "1", "stored_messages": "1", "messages_in_trash": "0", "channels_in_trash": "0", "subscribers": "0", "uptime": "[0-9]*", "by_worker": \[\r\n/)
body.should match_the_pattern(/\{"pid": "[0-9]*", "subscribers": "0", "uptime": "[0-9]*"\}/) body.should match_the_pattern(/\{"pid": "[0-9]*", "subscribers": "0", "uptime": "[0-9]*"\}/)
socket.print("DELETE /pub?id=#{channel}_1 HTTP/1.1\r\nHost: test\r\n\r\n")
headers, body = read_response_on_socket(socket)
headers.should include("HTTP/1.1 404 Not Found")
headers, body = get_in_socket("/channels-stats?id=ALL", socket)
body.should match_the_pattern(/"hostname": "[^"]*", "time": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}", "channels": "1", "wildcard_channels": "0", "uptime": "[0-9]*", "infos": \[\r\n/)
body.should match_the_pattern(/"channel": "#{channel}", "published_messages": "1", "stored_messages": "1", "subscribers": "0"}\r\n/)
headers, body = get_in_socket("/pub?id=#{channel}", socket) headers, body = get_in_socket("/pub?id=#{channel}", socket)
body.should eql("{\"channel\": \"#{channel}\", \"published_messages\": \"1\", \"stored_messages\": \"1\", \"subscribers\": \"0\"}\r\n") body.should eql("{\"channel\": \"#{channel}\", \"published_messages\": \"1\", \"stored_messages\": \"1\", \"subscribers\": \"0\"}\r\n")
headers, body = post_in_socket("/pub?id=#{channel}/broad_#{channel}", content, socket, {:wait_for => "}\r\n"})
body.should match_the_pattern(/"hostname": "[^"]*", "time": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}", "channels": "1", "wildcard_channels": "1", "uptime": "[0-9]*", "infos": \[\r\n/)
body.should match_the_pattern(/"channel": "#{channel}", "published_messages": "2", "stored_messages": "2", "subscribers": "0"},\r\n/)
body.should match_the_pattern(/"channel": "broad_#{channel}", "published_messages": "1", "stored_messages": "1", "subscribers": "0"}\r\n/)
headers, body = get_in_socket("/channels-stats?id=#{channel}", socket)
body.should match_the_pattern(/{"channel": "#{channel}", "published_messages": "2", "stored_messages": "2", "subscribers": "0"}\r\n/)
socket.print("DELETE /pub?id=#{channel} HTTP/1.1\r\nHost: test\r\n\r\n")
headers, body = read_response_on_socket(socket)
headers.should include("X-Nginx-PushStream-Explain: Channel deleted.")
socket.close socket.close
end end
end end
......
...@@ -220,9 +220,9 @@ ngx_http_push_stream_publisher_delete_handler(ngx_http_request_t *r) ...@@ -220,9 +220,9 @@ ngx_http_push_stream_publisher_delete_handler(ngx_http_request_t *r)
} }
if (qtd_channels == 0) { if (qtd_channels == 0) {
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_FOUND, NULL); ngx_http_push_stream_send_only_header_response_and_finalize(r, NGX_HTTP_NOT_FOUND, NULL);
} else { } else {
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED); ngx_http_push_stream_send_only_header_response_and_finalize(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED);
} }
} }
...@@ -241,7 +241,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -241,7 +241,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
// check if body message wasn't empty // check if body message wasn't empty
if (r->headers_in.content_length_n <= 0) { if (r->headers_in.content_length_n <= 0) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: Post request was sent with no message"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: Post request was sent with no message");
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_EMPTY_POST_REQUEST_MESSAGE); ngx_http_push_stream_send_only_header_response_and_finalize(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_EMPTY_POST_REQUEST_MESSAGE);
return; return;
} }
...@@ -268,11 +268,10 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -268,11 +268,10 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
if (cf->channel_info_on_publish) { if (cf->channel_info_on_publish) {
ngx_http_push_stream_send_response_channels_info_detailed(r, ctx->requested_channels); ngx_http_push_stream_send_response_channels_info_detailed(r, ctx->requested_channels);
ngx_http_finalize_request(r, NGX_OK);
} else { } else {
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, NULL); ngx_http_push_stream_send_only_header_response_and_finalize(r, NGX_HTTP_OK, NULL);
} }
ngx_http_finalize_request(r, NGX_OK);
return;
} }
static ngx_int_t static ngx_int_t
......
...@@ -412,7 +412,15 @@ ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t ...@@ -412,7 +412,15 @@ ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
return rc; return NGX_DONE;
}
static ngx_int_t
ngx_http_push_stream_send_only_header_response_and_finalize(ngx_http_request_t *r, ngx_int_t status_code, const ngx_str_t *explain_error_message)
{
ngx_http_push_stream_send_only_header_response(r, status_code, explain_error_message);
ngx_http_finalize_request(r, NGX_OK);
return NGX_DONE;
} }
static ngx_table_elt_t * static ngx_table_elt_t *
...@@ -420,7 +428,6 @@ ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t ...@@ -420,7 +428,6 @@ ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t
{ {
ngx_table_elt_t *h = ngx_list_push(&r->headers_out.headers); ngx_table_elt_t *h = ngx_list_push(&r->headers_out.headers);
if (h == NULL) { if (h == NULL) {
return NULL; return NULL;
} }
...@@ -688,7 +695,7 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r) ...@@ -688,7 +695,7 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r)
} }
} }
ngx_http_finalize_request(r, (rc == NGX_ERROR) ? NGX_DONE : NGX_OK); ngx_http_finalize_request(r, rc);
} }
static void static void
...@@ -717,8 +724,7 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_ ...@@ -717,8 +724,7 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_
ngx_http_push_stream_send_response_message(r, NULL, mcf->longpooling_timeout_msg, 1, 0); ngx_http_push_stream_send_response_message(r, NULL, mcf->longpooling_timeout_msg, 1, 0);
ngx_http_push_stream_send_response_finalize(r); ngx_http_push_stream_send_response_finalize(r);
} else { } else {
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_MODIFIED, NULL); ngx_http_push_stream_send_only_header_response_and_finalize(r, NGX_HTTP_NOT_MODIFIED, NULL);
ngx_http_finalize_request(r, NGX_DONE);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment