Commit bae2c292 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

fix ping and disconnect timers behaviour when working with long polling

parent 1a9e8c8a
......@@ -148,6 +148,7 @@ typedef struct {
ngx_http_push_stream_subscriber_cleanup_t *clndata;
ngx_pid_t worker_subscribed_pid;
time_t expires;
ngx_flag_t longpolling;
} ngx_http_push_stream_worker_subscriber_t;
// cleaning supplies
......
......@@ -228,6 +228,7 @@ static ngx_int_t ngx_http_push_stream_send_response_content_header(ng
static void ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg);
static ngx_int_t ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer);
static void ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r);
static void ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_memory_cleanup();
static ngx_int_t ngx_http_push_stream_buffer_cleanup();
......
......@@ -295,7 +295,11 @@ ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect)
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
if ((cur->request != NULL) && (ngx_exiting || (force_disconnect == 1) || ((cur->expires != 0) && (now > cur->expires)))) {
ngx_http_push_stream_send_response_finalize(cur->request);
if (cur->longpolling) {
ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(cur->request);
} else {
ngx_http_push_stream_send_response_finalize(cur->request);
}
} else {
break;
}
......@@ -313,7 +317,7 @@ ngx_http_push_stream_send_worker_ping_message(void)
if ((ngx_http_push_stream_ping_msg != NULL) && (!ngx_queue_empty(&sentinel->queue))) {
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if (cur->request != NULL) {
if ((cur->request != NULL) && (!cur->longpolling)) {
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(cur->request, ngx_http_push_stream_module);
if (pslcf->eventsource_support) {
ngx_http_push_stream_send_response_text(cur->request, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.len, 0);
......
......@@ -224,6 +224,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
ngx_shmtx_unlock(&shpool->mutex);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
worker_subscriber->longpolling = 1;
ngx_http_push_stream_registry_subscriber_locked(worker_subscriber);
......@@ -438,6 +439,7 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
return NULL;
}
worker_subscriber->longpolling = 0;
worker_subscriber->request = r;
worker_subscriber->worker_subscribed_pid = ngx_pid;
worker_subscriber->expires = (ngx_http_push_stream_module_main_conf->subscriber_connection_ttl == NGX_CONF_UNSET) ? 0 : (ngx_time() + ngx_http_push_stream_module_main_conf->subscriber_connection_ttl);
......
......@@ -435,6 +435,18 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r)
ngx_http_finalize_request(r, NGX_HTTP_OK);
}
static void
ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_request_t *r)
{
ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_subscriber_cleanup);
ngx_http_push_stream_add_polling_headers(r, ngx_time(), 0, r->pool);
r->headers_out.status = NGX_HTTP_NOT_MODIFIED;
ngx_http_send_header(r);
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1);
ngx_http_finalize_request(r, NGX_HTTP_NOT_MODIFIED);
}
static void
ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
......
......@@ -314,4 +314,50 @@ class TestSubscriberLongPolling < Test::Unit::TestCase
add_test_timeout
}
end
def config_test_disconnect_long_polling_subscriber_when_disconnect_timeout_is_set
@subscriber_connection_timeout = "15s"
end
def test_disconnect_long_polling_subscriber_when_disconnect_timeout_is_set
channel = 'ch_test_disconnect_long_polling_subscriber_when_disconnect_timeout_is_set'
start = Time.now
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :timeout => 30
sub.callback {
stop = Time.now
elapsed = time_diff_sec(start, stop)
assert(elapsed >= 15 && elapsed <= 20, "Disconnect was in #{elapsed} seconds")
assert_equal(304, sub.response_header.status, "Wrong status")
assert_equal(Time.now.utc.strftime("%a, %d %b %Y %T %Z"), sub.response_header['LAST_MODIFIED'].to_s, "Wrong header")
assert_equal("0", sub.response_header['ETAG'].to_s, "Wrong header")
assert_equal("", sub.response, "Wrong header")
EventMachine.stop
}
add_test_timeout(30)
}
end
def config_test_not_receive_ping_message
@subscriber_connection_timeout = "5s"
@ping_message_interval = "1s"
end
def test_not_receive_ping_message
channel = 'ch_test_disconnect_long_polling_subscriber_when_disconnect_timeout_is_set'
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :timeout => 30
sub.callback {
assert_equal(304, sub.response_header.status, "Wrong status")
assert_equal("", sub.response, "Wrong header")
EventMachine.stop
}
add_test_timeout(10)
}
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment