Commit 44856205 authored by Wandenberg's avatar Wandenberg

send a reason on close frame in websocket connection indicating the error

parent c7071475
...@@ -324,6 +324,8 @@ static const u_char NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE[] = {NG ...@@ -324,6 +324,8 @@ static const u_char NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE[] = {NG
static const u_char NGX_HTTP_PUSH_STREAM_WEBSOCKET_PAYLOAD_LEN_16_BYTE = 126; static const u_char NGX_HTTP_PUSH_STREAM_WEBSOCKET_PAYLOAD_LEN_16_BYTE = 126;
static const u_char NGX_HTTP_PUSH_STREAM_WEBSOCKET_PAYLOAD_LEN_64_BYTE = 127; static const u_char NGX_HTTP_PUSH_STREAM_WEBSOCKET_PAYLOAD_LEN_64_BYTE = 127;
static const ngx_str_t NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_REASON = ngx_string("\x03\xF0{\"http_status\": %d, \"explain\":\"%V\"}");
// other stuff // other stuff
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_PUT_DELETE_METHODS = ngx_string("GET, POST, PUT, DELETE"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_PUT_DELETE_METHODS = ngx_string("GET, POST, PUT, DELETE");
......
...@@ -233,7 +233,7 @@ static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_re ...@@ -233,7 +233,7 @@ static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_re
static ngx_str_t * ngx_http_push_stream_get_header(ngx_http_request_t *r, const ngx_str_t *header_name); static ngx_str_t * ngx_http_push_stream_get_header(ngx_http_request_t *r, const ngx_str_t *header_name);
static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message); static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
static u_char * ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx_uint_t offset, ngx_pool_t *temp_pool); static u_char * ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx_uint_t offset, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_websocket_frame(const u_char *text, off_t len, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_get_formatted_websocket_frame(const u_char *opcode, off_t opcode_len, const u_char *text, off_t text_len, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool);
...@@ -243,6 +243,7 @@ static ngx_int_t ngx_http_push_stream_send_response_message(ngx_http_ ...@@ -243,6 +243,7 @@ static ngx_int_t ngx_http_push_stream_send_response_message(ngx_http_
static ngx_int_t ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer); static ngx_int_t ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer);
static void ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r); static void ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r);
static void ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_request_t *r); static void ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_websocket_close_frame(ngx_http_request_t *r, ngx_uint_t http_status, const ngx_str_t *reason);
static ngx_int_t ngx_http_push_stream_memory_cleanup(); static ngx_int_t ngx_http_push_stream_memory_cleanup();
static ngx_int_t ngx_http_push_stream_buffer_cleanup(); static ngx_int_t ngx_http_push_stream_buffer_cleanup();
......
...@@ -191,6 +191,21 @@ describe "Subscriber WebSocket" do ...@@ -191,6 +191,21 @@ describe "Subscriber WebSocket" do
end end
end end
it "should receive explain message on close frame" do
channel = 'ch_test_receive_explain_message_close_frame'
request = "GET /ws/#{channel}.b1 HTTP/1.0\r\nConnection: Upgrade\r\nSec-WebSocket-Key: /mQoZf6pRiv8+6o72GncLQ==\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 8\r\n"
nginx_run_server(config.merge(:subscriber_connection_ttl => '1s', :authorized_channels_only => 'on')) do |conf|
socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n")
headers, body = read_response_on_socket(socket)
#wait for disconnect
sleep(1)
body, dummy = read_response_on_socket(socket, "\"}")
body.should eql("\x88I\x03\xF0{\"http_status\": 403, \"explain\":\"Subscriber could not create channels.\"}")
end
end
it "should receive footer template" do it "should receive footer template" do
channel = 'ch_test_receive_footer_template' channel = 'ch_test_receive_footer_template'
request = "GET /ws/#{channel}.b1 HTTP/1.0\r\nConnection: Upgrade\r\nSec-WebSocket-Key: /mQoZf6pRiv8+6o72GncLQ==\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 8\r\n" request = "GET /ws/#{channel}.b1 HTTP/1.0\r\nConnection: Upgrade\r\nSec-WebSocket-Key: /mQoZf6pRiv8+6o72GncLQ==\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 8\r\n"
......
...@@ -640,7 +640,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -640,7 +640,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
} else if (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) { } else if (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) {
// formatting header and footer template for chunk transfer // formatting header and footer template for chunk transfer
if (conf->header_template.len > 0) { if (conf->header_template.len > 0) {
ngx_str_t *aux = ngx_http_push_stream_get_formatted_websocket_frame(conf->header_template.data, conf->header_template.len, cf->pool); ngx_str_t *aux = ngx_http_push_stream_get_formatted_websocket_frame(&NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_LAST_FRAME_BYTE), conf->header_template.data, conf->header_template.len, cf->pool);
if (aux == NULL) { if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to format header template"); ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to format header template");
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
...@@ -650,7 +650,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -650,7 +650,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
} }
if (conf->footer_template.len > 0) { if (conf->footer_template.len > 0) {
ngx_str_t *aux = ngx_http_push_stream_get_formatted_websocket_frame(conf->footer_template.data, conf->footer_template.len, cf->pool); ngx_str_t *aux = ngx_http_push_stream_get_formatted_websocket_frame(&NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_LAST_FRAME_BYTE), conf->footer_template.data, conf->footer_template.len, cf->pool);
if (aux == NULL) { if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to format footer template"); ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to format footer template");
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
......
...@@ -35,6 +35,7 @@ static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_cha ...@@ -35,6 +35,7 @@ static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_cha
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log); static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool); static ngx_int_t ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool);
static ngx_http_push_stream_padding_t *ngx_http_push_stream_get_padding_by_user_agent(ngx_http_request_t *r); static ngx_http_push_stream_padding_t *ngx_http_push_stream_get_padding_by_user_agent(ngx_http_request_t *r);
void ngx_http_push_stream_websocket_reading(ngx_http_request_t *r);
static ngx_int_t static ngx_int_t
ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
...@@ -416,7 +417,7 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque ...@@ -416,7 +417,7 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
r->main->count++; r->main->count++;
// responding subscriber // responding subscriber
r->read_event_handler = ngx_http_test_reading; r->read_event_handler = (cf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) ? ngx_http_push_stream_websocket_reading : ngx_http_test_reading;
r->write_event_handler = ngx_http_request_empty_handler; r->write_event_handler = ngx_http_request_empty_handler;
if (cf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE) { if (cf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE) {
......
...@@ -287,7 +287,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -287,7 +287,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
ngx_str_t *text = aux; ngx_str_t *text = aux;
if (cur->websocket) { if (cur->websocket) {
text = ngx_http_push_stream_get_formatted_websocket_frame(aux->data, aux->len, temp_pool); text = ngx_http_push_stream_get_formatted_websocket_frame(&NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_LAST_FRAME_BYTE), aux->data, aux->len, temp_pool);
} }
ngx_str_t *formmated = (msg->formatted_messages + i); ngx_str_t *formmated = (msg->formatted_messages + i);
...@@ -673,6 +673,22 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_ ...@@ -673,6 +673,22 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_
ngx_http_finalize_request(r, NGX_DONE); ngx_http_finalize_request(r, NGX_DONE);
} }
static ngx_int_t
ngx_http_push_stream_send_websocket_close_frame(ngx_http_request_t *r, ngx_uint_t http_status, const ngx_str_t *reason)
{
ngx_int_t rc;
ngx_str_t *text = ngx_http_push_stream_create_str(r->pool, reason->len + NGX_INT_T_LEN + NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_REASON.len);
if (text == NULL) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE), 1);
} else {
u_char *last = ngx_sprintf(text->data, (char *) NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_REASON.data, http_status, reason);
text->len = last - text->data;
ngx_str_t *frame = ngx_http_push_stream_get_formatted_websocket_frame(NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE, 1, text->data, text->len, r->pool);
rc = ngx_http_push_stream_send_response_text(r, (const u_char *) frame->data, frame->len, 1);
}
return (rc == NGX_ERROR) ? NGX_DONE : NGX_OK;
}
static ngx_flag_t static ngx_flag_t
ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool) ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool)
{ {
...@@ -1286,14 +1302,14 @@ ngx_http_push_stream_ntohll(uint64_t value) { ...@@ -1286,14 +1302,14 @@ ngx_http_push_stream_ntohll(uint64_t value) {
static ngx_str_t * static ngx_str_t *
ngx_http_push_stream_get_formatted_websocket_frame(const u_char *text, off_t len, ngx_pool_t *temp_pool) ngx_http_push_stream_get_formatted_websocket_frame(const u_char *opcode, off_t opcode_len, const u_char *text, off_t len, ngx_pool_t *temp_pool)
{ {
ngx_str_t *frame; ngx_str_t *frame;
u_char *last; u_char *last;
frame = ngx_http_push_stream_create_str(temp_pool, NGX_HTTP_PUSH_STREAM_WEBSOCKET_FRAME_HEADER_MAX_LENGTH + len); frame = ngx_http_push_stream_create_str(temp_pool, NGX_HTTP_PUSH_STREAM_WEBSOCKET_FRAME_HEADER_MAX_LENGTH + len);
if (frame != NULL) { if (frame != NULL) {
last = ngx_copy(frame->data, &NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_LAST_FRAME_BYTE)); last = ngx_copy(frame->data, opcode, opcode_len);
if (len <= 125) { if (len <= 125) {
last = ngx_copy(last, &len, 1); last = ngx_copy(last, &len, 1);
......
...@@ -26,7 +26,6 @@ ...@@ -26,7 +26,6 @@
#include <ngx_http_push_stream_module_websocket.h> #include <ngx_http_push_stream_module_websocket.h>
static ngx_str_t *ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_str_t *sec_key, ngx_pool_t *temp_pool); static ngx_str_t *ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_str_t *sec_key, ngx_pool_t *temp_pool);
void ngx_http_push_stream_websocket_reading(ngx_http_request_t *r);
static ngx_int_t static ngx_int_t
ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
...@@ -79,6 +78,18 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ...@@ -79,6 +78,18 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
if ((sec_accept_header = ngx_http_push_stream_generate_websocket_accept_value(r, sec_key_header, ctx->temp_pool)) == NULL) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: could not generate security accept header value");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_UPGRADE, &NGX_HTTP_PUSH_STREAM_WEBSOCKET_UPGRADE);
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_CONNECTION, &NGX_HTTP_PUSH_STREAM_WEBSOCKET_CONNECTION);
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_SEC_WEBSOCKET_ACCEPT, sec_accept_header);
r->headers_out.status_line = NGX_HTTP_PUSH_STREAM_101_STATUS_LINE;
ngx_http_push_stream_send_only_added_headers(r);
//get channels ids and backtracks from path //get channels ids and backtracks from path
channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, ctx->temp_pool); channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, ctx->temp_pool);
if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) { if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) {
...@@ -88,7 +99,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ...@@ -88,7 +99,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
//validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on. check if channel is full of subscribers //validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on. check if channel is full of subscribers
if (ngx_http_push_stream_validate_channels(r, channels_ids, &status_code, &explain_error_message) == NGX_ERROR) { if (ngx_http_push_stream_validate_channels(r, channels_ids, &status_code, &explain_error_message) == NGX_ERROR) {
return ngx_http_push_stream_send_only_header_response(r, status_code, explain_error_message); return ngx_http_push_stream_send_websocket_close_frame(r, status_code, explain_error_message);
} }
// get control values // get control values
...@@ -96,26 +107,13 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ...@@ -96,26 +107,13 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
// stream access // stream access
if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) { if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR; return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
}
if ((sec_accept_header = ngx_http_push_stream_generate_websocket_accept_value(r, sec_key_header, ctx->temp_pool)) == NULL) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: could not generate security accept heade value");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_UPGRADE, &NGX_HTTP_PUSH_STREAM_WEBSOCKET_UPGRADE);
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_CONNECTION, &NGX_HTTP_PUSH_STREAM_WEBSOCKET_CONNECTION);
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_SEC_WEBSOCKET_ACCEPT, sec_accept_header);
r->headers_out.status_line = NGX_HTTP_PUSH_STREAM_101_STATUS_LINE;
r->read_event_handler = ngx_http_push_stream_websocket_reading;
ngx_http_push_stream_send_only_added_headers(r);
// sending response content header // sending response content header
if (ngx_http_push_stream_send_response_content_header(r, cf) == NGX_ERROR) { if (ngx_http_push_stream_send_response_content_header(r, cf) == NGX_ERROR) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: could not send content header to subscriber"); ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: could not send content header to subscriber");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
} }
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
...@@ -123,14 +121,14 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ...@@ -123,14 +121,14 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
if (rc == NGX_ERROR) { if (rc == NGX_ERROR) {
return NGX_HTTP_INTERNAL_SERVER_ERROR; return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
} }
// adding subscriber to channel(s) and send backtrack messages // adding subscriber to channel(s) and send backtrack messages
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) { if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR; return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment