Commit 312f330a authored by Wandenberg's avatar Wandenberg

refactor on ngx_http_push_stream_websocket_reading to deal with NGX_AGAIN

parent 8e340def
......@@ -178,6 +178,21 @@ typedef struct {
ngx_uint_t backtrack_messages;
} ngx_http_push_stream_requested_channel_t;
typedef struct {
unsigned char fin:1;
unsigned char rsv1:1;
unsigned char rsv2:1;
unsigned char rsv3:1;
unsigned char opcode:4;
unsigned char mask:1;
unsigned char mask_key[4];
uint64_t payload_len;
u_char header[8];
u_char *payload;
u_char *last;
ngx_uint_t step;
} ngx_http_push_stream_frame_t;
typedef struct {
ngx_event_t *disconnect_timer;
ngx_event_t *ping_timer;
......@@ -189,6 +204,7 @@ typedef struct {
ngx_http_push_stream_padding_t *padding;
ngx_str_t *callback;
ngx_http_push_stream_requested_channel_t *requested_channels;
ngx_http_push_stream_frame_t *frame;
} ngx_http_push_stream_module_ctx_t;
// messages to worker processes
......
......@@ -33,18 +33,11 @@
#include <ngx_http_push_stream_module_utils.h>
#include <ngx_http_push_stream_module_subscriber.h>
typedef struct {
unsigned char fin:1;
unsigned char rsv1:1;
unsigned char rsv2:1;
unsigned char rsv3:1;
unsigned char opcode:4;
unsigned char mask:1;
unsigned char mask_key[4];
uint64_t payload_len;
u_char *payload;
} ngx_http_push_stream_frame_t;
static ngx_int_t ngx_http_push_stream_websocket_handler(ngx_http_request_t *r);
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_START_STEP 0
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_REAL_SIZE_STEP 1
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_MASK_KEY_STEP 2
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_PAYLOAD_STEP 3
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_WEBSOCKET_H_ */
......@@ -26,7 +26,8 @@
#include <ngx_http_push_stream_module_websocket.h>
ngx_str_t *ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_str_t *sec_key, ngx_pool_t *temp_pool);
ngx_int_t ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, u_char *buf, ssize_t len);
ngx_int_t ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, ngx_buf_t *buf, ssize_t len);
void ngx_http_push_stream_set_buffer(ngx_buf_t *buf, u_char *start, u_char *last, ssize_t len);
static ngx_int_t
ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
......@@ -83,6 +84,14 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if ((ctx->frame = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_frame_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to create frame structure");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_START_STEP;
ctx->frame->last = NULL;
ctx->frame->payload = NULL;
if ((sec_accept_header = ngx_http_push_stream_generate_websocket_accept_value(r, sec_key_header, ctx->temp_pool)) == NULL) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: could not generate security accept header value");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
......@@ -177,121 +186,190 @@ void
ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
{
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
u_char buf[8];
ngx_int_t rc;
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_int_t rc = NGX_OK;
ngx_event_t *rev;
ngx_connection_t *c;
ngx_http_push_stream_frame_t frame;
ngx_str_t *aux;
uint64_t i;
ngx_pool_t *temp_pool = NULL;
ngx_queue_t *cur = NULL;
ngx_buf_t buf;
ngx_http_push_stream_set_buffer(&buf, ctx->frame->header, ctx->frame->last, 8);
c = r->connection;
rev = c->read;
for (;;) {
switch (ctx->frame->step) {
case NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_START_STEP:
//reading frame header
if ((rc = ngx_http_push_stream_recv(c, rev, buf, 2)) != NGX_OK) {
goto closed;
if ((rc = ngx_http_push_stream_recv(c, rev, &buf, 2)) != NGX_OK) {
goto exit;
}
frame.fin = (buf[0] >> 7) & 1;
frame.rsv1 = (buf[0] >> 6) & 1;
frame.rsv2 = (buf[0] >> 5) & 1;
frame.rsv3 = (buf[0] >> 4) & 1;
frame.opcode = buf[0] & 0xf;
ctx->frame->fin = (ctx->frame->header[0] >> 7) & 1;
ctx->frame->rsv1 = (ctx->frame->header[0] >> 6) & 1;
ctx->frame->rsv2 = (ctx->frame->header[0] >> 5) & 1;
ctx->frame->rsv3 = (ctx->frame->header[0] >> 4) & 1;
ctx->frame->opcode = ctx->frame->header[0] & 0xf;
ctx->frame->mask = (ctx->frame->header[1] >> 7) & 1;
ctx->frame->payload_len = ctx->frame->header[1] & 0x7f;
frame.mask = (buf[1] >> 7) & 1;
frame.payload_len = buf[1] & 0x7f;
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_REAL_SIZE_STEP;
if (frame.payload_len == 126) {
if ((rc = ngx_http_push_stream_recv(c, rev, buf, 2)) != NGX_OK) {
goto closed;
break;
case NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_REAL_SIZE_STEP:
if (ctx->frame->payload_len == 126) {
if ((rc = ngx_http_push_stream_recv(c, rev, &buf, 2)) != NGX_OK) {
goto exit;
}
uint16_t len;
ngx_memcpy(&len, buf, 2);
frame.payload_len = ntohs(len);
} else if (frame.payload_len == 127) {
if ((rc = ngx_http_push_stream_recv(c, rev, buf, 8)) != NGX_OK) {
goto closed;
ngx_memcpy(&len, ctx->frame->header, 2);
ctx->frame->payload_len = ntohs(len);
} else if (ctx->frame->payload_len == 127) {
if ((rc = ngx_http_push_stream_recv(c, rev, &buf, 8)) != NGX_OK) {
goto exit;
}
uint64_t len;
ngx_memcpy(&len, buf, 8);
frame.payload_len = ngx_http_push_stream_ntohll(len);
ngx_memcpy(&len, ctx->frame->header, 8);
ctx->frame->payload_len = ngx_http_push_stream_ntohll(len);
}
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_MASK_KEY_STEP;
break;
case NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_MASK_KEY_STEP:
if (ctx->frame->mask) {
if ((rc = ngx_http_push_stream_recv(c, rev, &buf, 4)) != NGX_OK) {
goto exit;
}
if (frame.mask && ((rc = ngx_http_push_stream_recv(c, rev, frame.mask_key, 4)) != NGX_OK)) {
goto closed;
ngx_memcpy(ctx->frame->mask_key, buf.start, 4);
}
if (frame.payload_len > 0) {
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_PAYLOAD_STEP;
break;
case NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_PAYLOAD_STEP:
if (ctx->frame->payload_len > 0) {
//create a temporary pool to allocate temporary elements
if ((temp_pool = ngx_create_pool(4096, r->connection->log)) == NULL) {
if (ctx->temp_pool == NULL) {
if ((ctx->temp_pool = ngx_create_pool(4096, r->connection->log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for temporary pool");
ngx_http_finalize_request(r, NGX_OK);
return;
}
aux = ngx_http_push_stream_create_str(temp_pool, frame.payload_len);
if ((rc = ngx_http_push_stream_recv(c, rev, aux->data, (ssize_t) frame.payload_len)) != NGX_OK) {
goto closed;
if ((ctx->frame->payload = ngx_pcalloc(ctx->temp_pool, ctx->frame->payload_len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for payload");
ngx_http_finalize_request(r, NGX_OK);
return;
}
if (cf->websocket_allow_publish && (frame.opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_OPCODE)) {
frame.payload = aux->data;
if (frame.mask) {
for (i = 0; i < frame.payload_len; i++) {
frame.payload[i] = frame.payload[i] ^ frame.mask_key[i % 4];
ctx->frame->last = ctx->frame->payload;
}
ngx_http_push_stream_set_buffer(&buf, ctx->frame->payload, ctx->frame->last, ctx->frame->payload_len);
if ((rc = ngx_http_push_stream_recv(c, rev, &buf, (ssize_t) (buf.end - buf.last))) != NGX_OK) {
goto exit;
}
if (cf->websocket_allow_publish && (ctx->frame->opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_OPCODE)) {
if (ctx->frame->mask) {
for (i = 0; i < ctx->frame->payload_len; i++) {
ctx->frame->payload[i] = ctx->frame->payload[i] ^ ctx->frame->mask_key[i % 4];
}
}
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
cur = &ctx->subscriber->subscriptions_sentinel.queue;
while ((cur = ngx_queue_next(cur)) != &ctx->subscriber->subscriptions_sentinel.queue) {
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, queue);
if (ngx_http_push_stream_add_msg_to_channel(r, &subscription->channel->id, frame.payload, frame.payload_len, NULL, NULL, temp_pool) == NULL) {
if (ngx_http_push_stream_add_msg_to_channel(r, &subscription->channel->id, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, ctx->temp_pool) == NULL) {
ngx_http_finalize_request(r, NGX_OK);
ngx_destroy_pool(temp_pool);
return;
}
}
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE), 1);
}
ngx_destroy_pool(temp_pool);
if (ctx->temp_pool != NULL) {
ngx_destroy_pool(ctx->temp_pool);
ctx->temp_pool = NULL;
}
}
if (frame.opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_OPCODE) {
if (ctx->frame->opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_OPCODE) {
ngx_http_push_stream_send_response_finalize(r);
}
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_START_STEP;
ctx->frame->last = NULL;
return;
closed:
if (temp_pool != NULL) {
ngx_destroy_pool(temp_pool);
break;
}
ngx_http_push_stream_set_buffer(&buf, ctx->frame->header, NULL, 8);
}
exit:
if (rc == NGX_AGAIN) {
ctx->frame->last = buf.last;
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_log_error(NGX_LOG_INFO, c->log, ngx_socket_errno, "push stream module: failed to restore read events");
ngx_http_finalize_request(r, NGX_OK);
}
return;
}
if (rc == NGX_ERROR) {
ngx_log_error(NGX_LOG_INFO, c->log, ngx_socket_errno, "client closed prematurely connection");
ngx_log_error(NGX_LOG_INFO, c->log, ngx_socket_errno, "push stream module: client closed prematurely connection");
ngx_http_finalize_request(r, NGX_OK);
return;
}
}
ngx_int_t
ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, u_char *buf, ssize_t len)
ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, ngx_buf_t *buf, ssize_t len)
{
ssize_t n = c->recv(c, buf, len);
if (n == len) {
return NGX_OK;
}
ssize_t n = c->recv(c, buf->last, len);
if (n == NGX_AGAIN) {
return NGX_AGAIN;
}
if ((n == NGX_ERROR) || (n == 0)) {
return NGX_ERROR;
}
buf->last += n;
if (n < len) {
return NGX_AGAIN;
}
return NGX_OK;
}
void
ngx_http_push_stream_set_buffer(ngx_buf_t *buf, u_char *start, u_char *last, ssize_t len)
{
buf->start = start;
buf->pos = buf->start;
buf->last = (last != NULL) ? last : start;
buf->end = buf->start + len;
buf->temporary = 1;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment