Commit a48ca949 authored by Wandenberg's avatar Wandenberg

refactor on ngx_http_push_stream_output_filter to deal with NGX_AGAIN while writing the response

parent 312f330a
...@@ -15,6 +15,7 @@ group :test do ...@@ -15,6 +15,7 @@ group :test do
gem 'json', '~> 1.8.1' gem 'json', '~> 1.8.1'
gem 'thin', '~> 1.5.1' gem 'thin', '~> 1.5.1'
gem 'net-http-persistent', '~> 2.9', :require => 'net/http/persistent' gem 'net-http-persistent', '~> 2.9', :require => 'net/http/persistent'
gem 'websocket-eventmachine-client'
gem 'byebug', '~> 1.3.1' gem 'byebug', '~> 1.3.1'
end end
......
...@@ -80,6 +80,13 @@ GEM ...@@ -80,6 +80,13 @@ GEM
rack (>= 1.0.0) rack (>= 1.0.0)
trollop (2.0) trollop (2.0)
websocket (1.0.7) websocket (1.0.7)
websocket-eventmachine-base (1.1.0)
eventmachine (~> 1.0)
websocket (~> 1.0)
websocket-native (~> 1.0)
websocket-eventmachine-client (1.1.0)
websocket-eventmachine-base (~> 1.0)
websocket-native (1.0.0)
PLATFORMS PLATFORMS
ruby ruby
...@@ -103,3 +110,4 @@ DEPENDENCIES ...@@ -103,3 +110,4 @@ DEPENDENCIES
rspec (~> 2.14.1) rspec (~> 2.14.1)
therubyracer therubyracer
thin (~> 1.5.1) thin (~> 1.5.1)
websocket-eventmachine-client
...@@ -7,15 +7,20 @@ daemon off; ...@@ -7,15 +7,20 @@ daemon off;
worker_rlimit_core 2500M; worker_rlimit_core 2500M;
working_directory /tmp; working_directory /tmp;
debug_points abort; debug_points abort;
env MOCKEAGAIN_VERBOSE;
env MOCKEAGAIN_WRITE_TIMEOUT_PATTERN;
#env MOCKEAGAIN;
env LD_PRELOAD;
worker_processes 2; worker_processes 2;
events { events {
worker_connections 1024; worker_connections 1024;
use epoll; use poll;
} }
http { http {
postpone_output 1; # only postpone a single byte, default 1460 bytes
access_log logs/nginx-http_access.log; access_log logs/nginx-http_access.log;
push_stream_shared_memory_size 100m; push_stream_shared_memory_size 100m;
...@@ -25,7 +30,7 @@ http { ...@@ -25,7 +30,7 @@ http {
# message ttl # message ttl
push_stream_message_ttl 5m; push_stream_message_ttl 5m;
# ping frequency # ping frequency
push_stream_ping_message_interval 1s; push_stream_ping_message_interval 30s;
# connection ttl to enable recycle # connection ttl to enable recycle
push_stream_subscriber_connection_ttl 15m; push_stream_subscriber_connection_ttl 15m;
# connection ttl for long polling # connection ttl for long polling
......
...@@ -79,34 +79,58 @@ describe "Publisher Publishing Messages" do ...@@ -79,34 +79,58 @@ describe "Publisher Publishing Messages" do
it "should receive large messages" do it "should receive large messages" do
channel = 'ch_test_publish_large_messages' channel = 'ch_test_publish_large_messages'
body = "|123456789" * 102400 + "|" small_message = "^|" + ("0123456789" * 1020) + "|$"
response = '' large_message = "^|" + ("0123456789" * 419430) + "|$"
response_sub = ''
response_sub_1 = ''
nginx_run_server(config.merge(:client_max_body_size => '2000k', :client_body_buffer_size => '2000k', :subscriber_connection_ttl => '2s'), :timeout => 15) do |conf| nginx_run_server(config.merge(client_max_body_size: '5m', client_body_buffer_size: '1m', subscriber_connection_ttl: '5s', shared_memory_size: '15m'), timeout: 10) do |conf|
EventMachine.run do EventMachine.run do
start = Time.now start = Time.now
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub.stream do |chunk| sub.stream do |chunk|
response += chunk response_sub += chunk
end
sub.callback do
(Time.now - start).should be < 2 #should be disconnect right after receive the large message
response.should eql(body)
response = '' if response_sub.include?('A')
start = Time.now response_sub.should eql(large_message + 'A')
response_sub = ''
# check if getting old messages works fine too
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + ".b1").get :head => headers sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + ".b1").get :head => headers
sub_1.stream do |chunk| sub_1.stream do |chunk_1|
response += chunk response_sub_1 += chunk_1
if response_sub_1.include?('A')
response_sub_1.should eql(large_message + 'A')
response_sub_1 = ''
publish_message_inline(channel, headers, small_message + 'B')
end end
end
sub_1.callback do sub_1.callback do
(Time.now - start).should be > 2 #should be disconnected only when timeout happens fail("should not disconnect the client")
response.should eql(body) end
end
end
sub.callback do
fail("should not disconnect the client")
end
EM.add_timer(3) do
if response_sub.include?('B') && response_sub_1.include?('B')
response_sub.should eql(small_message + 'B')
response_sub_1.should eql(small_message + 'B')
large_message.size.should eql(4194304) # 4mb
small_message.size.should eql(10204) # 10k
EventMachine.stop EventMachine.stop
end end
end end
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body publish_message_inline(channel, headers, large_message + 'A')
end end
end end
end end
......
...@@ -384,6 +384,57 @@ describe "Subscriber WebSocket" do ...@@ -384,6 +384,57 @@ describe "Subscriber WebSocket" do
end end
end end
it "should publish large message" do
channel = 'ch_test_publish_large_message'
configuration = config.merge({
:shared_memory_size => '15m',
:message_template => '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}',
:extra_location => %q{
location ~ /ws/(.*)? {
# activate websocket mode for this location
push_stream_subscriber websocket;
# positional channel path
push_stream_channels_path $1;
# allow subscriber to publish
push_stream_websocket_allow_publish on;
# store messages
push_stream_store_messages on;
}
}
})
small_message = "^|" + ("0123456789" * 1020) + "|$"
large_message = "^|" + ("0123456789" * 419430) + "|$"
received_messages = 0;
nginx_run_server(configuration, timeout: 10) do |conf|
EventMachine.run do
ws = WebSocket::EventMachine::Client.connect(:uri => "ws://#{nginx_host}:#{nginx_port}/ws/#{channel}")
ws.onmessage do |text, type|
received_messages += 1
msg = JSON.parse(text)
msg['channel'].should eql(channel)
if received_messages == 1
msg['message'].should eql(large_message)
msg['message'].size.should eql(4194304) # 4mb
ws.send small_message
elsif received_messages == 2
msg['message'].should eql(small_message)
msg['message'].size.should eql(10204) # 10kb
EventMachine.stop
end
end
EM.add_timer(1) do
ws.send large_message
end
end
end
end
it "should accept pong message" do it "should accept pong message" do
channel = 'ch_test_accept_pong_message' channel = 'ch_test_accept_pong_message'
frame = "%c%c%c%c%c%c" % [0x8A, 0x80, 0xBD, 0xD0, 0xE5, 0x2A] #send 'pong' frame frame = "%c%c%c%c%c%c" % [0x8A, 0x80, 0xBD, 0xD0, 0xE5, 0x2A] #send 'pong' frame
......
...@@ -33,6 +33,7 @@ void ngx_http_push_stream_delete_channels_data(ngx_http_push_s ...@@ -33,6 +33,7 @@ void ngx_http_push_stream_delete_channels_data(ngx_http_push_s
void ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force); void ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force);
void ngx_http_push_stream_free_memory_of_expired_messages_and_channels_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force); void ngx_http_push_stream_free_memory_of_expired_messages_and_channels_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force);
static ngx_inline void ngx_http_push_stream_cleanup_shutting_down_worker_data(ngx_http_push_stream_shm_data_t *data); static ngx_inline void ngx_http_push_stream_cleanup_shutting_down_worker_data(ngx_http_push_stream_shm_data_t *data);
static void ngx_http_push_stream_flush_pending_output(ngx_http_request_t *r);
static ngx_inline void static ngx_inline void
...@@ -575,19 +576,111 @@ ngx_http_push_stream_get_buf(ngx_http_request_t *r) ...@@ -575,19 +576,111 @@ ngx_http_push_stream_get_buf(ngx_http_request_t *r)
ngx_int_t ngx_int_t
ngx_http_push_stream_output_filter(ngx_http_request_t *r, ngx_chain_t *in) ngx_http_push_stream_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{ {
ngx_http_core_loc_conf_t *clcf;
ngx_http_push_stream_module_ctx_t *ctx = NULL; ngx_http_push_stream_module_ctx_t *ctx = NULL;
ngx_int_t rc; ngx_int_t rc;
rc = ngx_http_output_filter(r, in); rc = ngx_http_output_filter(r, in);
if (rc == NGX_AGAIN) {
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
r->write_event_handler = ngx_http_push_stream_flush_pending_output;
if (ngx_handle_write_event(r->connection->write, clcf->send_lowat) != NGX_OK) {
return NGX_ERROR;
}
return NGX_OK;
}
if (rc == NGX_OK) {
if ((ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module)) != NULL) { if ((ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module)) != NULL) {
ngx_chain_update_chains(r->pool, &ctx->free, &ctx->busy, &in, (ngx_buf_tag_t) &ngx_http_push_stream_module); ngx_chain_update_chains(r->pool, &ctx->free, &ctx->busy, &in, (ngx_buf_tag_t) &ngx_http_push_stream_module);
} }
}
return rc; return rc;
} }
static void
ngx_http_push_stream_flush_pending_output(ngx_http_request_t *r)
{
int rc;
ngx_event_t *wev;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
c = r->connection;
wev = c->write;
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, wev->log, 0, "push stream module http writer handler: \"%V?%V\"", &r->uri, &r->args);
clcf = ngx_http_get_module_loc_conf(r->main, ngx_http_core_module);
if (wev->timedout) {
if (!wev->delayed) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "push stream module: client timed out");
c->timedout = 1;
ngx_http_finalize_request(r, NGX_HTTP_REQUEST_TIME_OUT);
return;
}
wev->timedout = 0;
wev->delayed = 0;
if (!wev->ready) {
ngx_add_timer(wev, clcf->send_timeout);
if (ngx_handle_write_event(wev, clcf->send_lowat) != NGX_OK) {
ngx_http_finalize_request(r, 0);
}
return;
}
}
if (wev->delayed || r->aio) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, wev->log, 0, "push stream module http writer delayed");
if (ngx_handle_write_event(wev, clcf->send_lowat) != NGX_OK) {
ngx_http_finalize_request(r, 0);
}
return;
}
rc = ngx_http_push_stream_output_filter(r, NULL);
ngx_log_debug3(NGX_LOG_DEBUG_HTTP, c->log, 0, "push stream module http writer output filter: %d, \"%V?%V\"", rc, &r->uri, &r->args);
if (rc == NGX_ERROR) {
ngx_http_finalize_request(r, rc);
return;
}
if (r->buffered || r->postponed || (r == r->main && c->buffered)) {
if (!wev->delayed) {
ngx_add_timer(wev, clcf->send_timeout);
}
if (ngx_handle_write_event(wev, clcf->send_lowat) != NGX_OK) {
ngx_http_finalize_request(r, 0);
}
return;
}
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, wev->log, 0, "push stream module http writer done: \"%V?%V\"", &r->uri, &r->args);
r->write_event_handler = ngx_http_request_empty_handler;
}
static ngx_int_t static ngx_int_t
ngx_http_push_stream_send_response(ngx_http_request_t *r, ngx_str_t *text, const ngx_str_t *content_type, ngx_int_t status_code) ngx_http_push_stream_send_response(ngx_http_request_t *r, ngx_str_t *text, const ngx_str_t *content_type, ngx_int_t status_code)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment