Commit 3ddb9d05 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

finalizing failed connections on sending messages or ping to subscribers

parent 2f9d07fc
...@@ -319,10 +319,17 @@ ngx_http_push_stream_send_worker_ping_message(void) ...@@ -319,10 +319,17 @@ ngx_http_push_stream_send_worker_ping_message(void)
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if ((cur->request != NULL) && (!cur->longpolling)) { if ((cur->request != NULL) && (!cur->longpolling)) {
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(cur->request, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(cur->request, ngx_http_push_stream_module);
ngx_int_t rc;
if (pslcf->eventsource_support) { if (pslcf->eventsource_support) {
ngx_http_push_stream_send_response_text(cur->request, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.len, 0); rc = ngx_http_push_stream_send_response_text(cur->request, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.len, 0);
} else { } else {
ngx_http_push_stream_send_response_message(cur->request, NULL, ngx_http_push_stream_ping_msg); rc = ngx_http_push_stream_send_response_message(cur->request, NULL, ngx_http_push_stream_ping_msg);
}
if (rc == NGX_ERROR) {
ngx_http_push_stream_worker_subscriber_t *prev = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_prev(&cur->queue);
ngx_http_push_stream_send_response_finalize(cur->request);
cur = prev;
} }
} }
} }
...@@ -454,7 +461,7 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan ...@@ -454,7 +461,7 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan
// now let's respond to some requests! // now let's respond to some requests!
while ((cur = (ngx_http_push_stream_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if (cur->longpolling) { if (cur->longpolling) {
ngx_http_push_stream_subscriber_t *prev = (ngx_http_push_stream_subscriber_t *) ngx_queue_prev(&cur->queue); ngx_http_push_stream_subscriber_t *prev = (ngx_http_push_stream_subscriber_t *) ngx_queue_prev(&cur->queue);
ngx_http_push_stream_add_polling_headers(cur->request, msg->time, msg->tag, cur->request->pool); ngx_http_push_stream_add_polling_headers(cur->request, msg->time, msg->tag, cur->request->pool);
ngx_http_send_header(cur->request); ngx_http_send_header(cur->request);
...@@ -465,7 +472,11 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan ...@@ -465,7 +472,11 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan
cur = prev; cur = prev;
} else { } else {
ngx_http_push_stream_send_response_message(cur->request, channel, msg); if (ngx_http_push_stream_send_response_message(cur->request, channel, msg) == NGX_ERROR) {
ngx_http_push_stream_subscriber_t *prev = (ngx_http_push_stream_subscriber_t *) ngx_queue_prev(&cur->queue);
ngx_http_push_stream_send_response_finalize(cur->request);
cur = prev;
}
} }
} }
} }
......
...@@ -437,7 +437,7 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r) ...@@ -437,7 +437,7 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r)
} }
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1); ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1);
ngx_http_finalize_request(r, NGX_HTTP_OK); ngx_http_finalize_request(r, NGX_DONE);
} }
static void static void
...@@ -450,7 +450,7 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_ ...@@ -450,7 +450,7 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_
ngx_http_send_header(r); ngx_http_send_header(r);
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1); ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1);
ngx_http_finalize_request(r, NGX_HTTP_NOT_MODIFIED); ngx_http_finalize_request(r, NGX_DONE);
} }
static void static void
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment