Commit 8147b89e authored by Wandenberg's avatar Wandenberg

fix on websocket connections when they have reading errors

parent 159c2749
...@@ -785,6 +785,10 @@ ngx_http_push_stream_run_cleanup_pool_handler(ngx_pool_t *p, ngx_pool_cleanup_pt ...@@ -785,6 +785,10 @@ ngx_http_push_stream_run_cleanup_pool_handler(ngx_pool_t *p, ngx_pool_cleanup_pt
{ {
ngx_pool_cleanup_t *c; ngx_pool_cleanup_t *c;
if (p == NULL) {
return;
}
for (c = p->cleanup; c; c = c->next) { for (c = p->cleanup; c; c = c->next) {
if ((c->handler == handler) && (c->data != NULL)) { if ((c->handler == handler) && (c->data != NULL)) {
c->handler(c->data); c->handler(c->data);
...@@ -1409,6 +1413,8 @@ ngx_http_push_stream_cleanup_request_context(ngx_http_request_t *r) ...@@ -1409,6 +1413,8 @@ ngx_http_push_stream_cleanup_request_context(ngx_http_request_t *r)
{ {
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module); ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
r->read_event_handler = ngx_http_request_empty_handler;
if (ctx != NULL) { if (ctx != NULL) {
if ((ctx->disconnect_timer != NULL) && ctx->disconnect_timer->timer_set) { if ((ctx->disconnect_timer != NULL) && ctx->disconnect_timer->timer_set) {
ngx_del_timer(ctx->disconnect_timer); ngx_del_timer(ctx->disconnect_timer);
......
...@@ -258,14 +258,12 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -258,14 +258,12 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
if (ctx->temp_pool == NULL) { if (ctx->temp_pool == NULL) {
if ((ctx->temp_pool = ngx_create_pool(4096, r->connection->log)) == NULL) { if ((ctx->temp_pool = ngx_create_pool(4096, r->connection->log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for temporary pool"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for temporary pool");
ngx_http_finalize_request(r, NGX_OK); goto finalize;
return;
} }
if ((ctx->frame->payload = ngx_pcalloc(ctx->temp_pool, ctx->frame->payload_len)) == NULL) { if ((ctx->frame->payload = ngx_pcalloc(ctx->temp_pool, ctx->frame->payload_len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for payload"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for payload");
ngx_http_finalize_request(r, NGX_OK); goto finalize;
return;
} }
ctx->frame->last = ctx->frame->payload; ctx->frame->last = ctx->frame->payload;
...@@ -287,8 +285,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -287,8 +285,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
for (q = ngx_queue_head(&ctx->subscriber->subscriptions); q != ngx_queue_sentinel(&ctx->subscriber->subscriptions); q = ngx_queue_next(q)) { for (q = ngx_queue_head(&ctx->subscriber->subscriptions); q != ngx_queue_sentinel(&ctx->subscriber->subscriptions); q = ngx_queue_next(q)) {
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(q, ngx_http_push_stream_subscription_t, queue); ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(q, ngx_http_push_stream_subscription_t, queue);
if (ngx_http_push_stream_add_msg_to_channel(r, subscription->channel, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, ctx->temp_pool) != NGX_OK) { if (ngx_http_push_stream_add_msg_to_channel(r, subscription->channel, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, ctx->temp_pool) != NGX_OK) {
ngx_http_finalize_request(r, NGX_OK); goto finalize;
return;
} }
} }
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE), 1); ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE), 1);
...@@ -309,35 +306,50 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -309,35 +306,50 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
return; return;
break; break;
default:
ngx_log_debug(NGX_LOG_DEBUG, c->log, 0, "push stream module: unknown websocket step (%d)", ctx->frame->step);
goto finalize;
break;
} }
ngx_http_push_stream_set_buffer(&buf, ctx->frame->header, NULL, 8); ngx_http_push_stream_set_buffer(&buf, ctx->frame->header, NULL, 8);
} }
exit: exit:
if (rc == NGX_AGAIN) { if (rc == NGX_AGAIN) {
ctx->frame->last = buf.last; ctx->frame->last = buf.last;
if (!c->read->ready) {
if (ngx_handle_read_event(c->read, 0) != NGX_OK) { if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_log_error(NGX_LOG_INFO, c->log, ngx_socket_errno, "push stream module: failed to restore read events"); ngx_log_error(NGX_LOG_INFO, c->log, ngx_socket_errno, "push stream module: failed to restore read events");
goto finalize;
ngx_http_finalize_request(r, NGX_OK); }
} }
return;
} }
if (rc == NGX_ERROR) { if (rc == NGX_ERROR) {
rev->eof = 1;
c->error = 1;
ngx_log_error(NGX_LOG_INFO, c->log, ngx_socket_errno, "push stream module: client closed prematurely connection"); ngx_log_error(NGX_LOG_INFO, c->log, ngx_socket_errno, "push stream module: client closed prematurely connection");
goto finalize;
}
ngx_http_finalize_request(r, NGX_OK);
return; return;
}
finalize:
ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_cleanup_request_context);
ngx_http_finalize_request(r, c->error ? NGX_HTTP_CLIENT_CLOSED_REQUEST : NGX_OK);
} }
ngx_int_t ngx_int_t
ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, ngx_buf_t *buf, ssize_t len) ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, ngx_buf_t *buf, ssize_t len)
{ {
if (c->error || c->timedout || c->close || c->destroyed || rev->closed || rev->eof) {
return NGX_ERROR;
}
ssize_t n = c->recv(c, buf->last, len); ssize_t n = c->recv(c, buf->last, len);
if (n == NGX_AGAIN) { if (n == NGX_AGAIN) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment