Commit 84739f70 authored by Wandenberg's avatar Wandenberg

add support to fragmented frames on websocket connection

parent 6b52c936
......@@ -224,8 +224,11 @@ typedef struct {
uint64_t payload_len;
u_char header[8];
u_char *payload;
u_char *last;
ngx_uint_t step;
ngx_buf_t buf;
ngx_str_t consolidated;
unsigned char fragmented:1;
unsigned char last_fragment:1;
} ngx_http_push_stream_frame_t;
typedef struct {
......
......@@ -400,22 +400,8 @@ describe "Subscriber WebSocket" do
channel = 'ch_test_publish_message_low_bitrate'
configuration = config.merge({
:shared_memory_size => '15m',
:message_template => '{\"channel\":\"~channel~\", \"message\":\"~text~\"}',
:extra_location => %q{
location ~ /ws/(.*)? {
# activate websocket mode for this location
push_stream_subscriber websocket;
# positional channel path
push_stream_channels_path $1;
# allow subscriber to publish
push_stream_websocket_allow_publish on;
# store messages
push_stream_store_messages on;
}
}
shared_memory_size: '15m',
message_template: '{\"channel\":\"~channel~\", \"message\":\"~text~\"}',
})
count = 0
......@@ -734,4 +720,169 @@ describe "Subscriber WebSocket" do
end
end
end
it "should accept unmasked frames" do
channel = 'ch_test_publish_unmasked_frames'
configuration = config.merge({
shared_memory_size: '15m',
message_template: '{\"channel\":\"~channel~\", \"message\":\"~text~\"}',
subscriber_mode: 'long-polling',
})
nginx_run_server(configuration, timeout: 60) do |conf|
EventMachine.run do
frame = "%c%c%c%c%c%c%c" % [0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f] #send 'hello' frame
request = "GET /ws/#{channel} HTTP/1.0\r\nConnection: Upgrade\r\nSec-WebSocket-Key: /mQoZf6pRiv8+6o72GncLQ==\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 8\r\n"
socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n")
headers, body = read_response_on_socket(socket)
socket.print(frame)
body, dummy = read_response_on_socket(socket, "llo")
expect(body).to include(%[{"channel":"ch_test_publish_unmasked_frames", "message":"Hello"}])
socket.close
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b1').get :timeout => 30
sub.callback do
expect(sub).to be_http_status(200)
response = JSON.parse(sub.response)
expect(response["channel"].to_s).to eql(channel)
expect(response["message"]).to eql("Hello")
EventMachine.stop
end
end
end
end
it "should accept masked frames" do
channel = 'ch_test_publish_masked_frames'
configuration = config.merge({
shared_memory_size: '15m',
message_template: '{\"channel\":\"~channel~\", \"message\":\"~text~\"}',
subscriber_mode: 'long-polling',
})
nginx_run_server(configuration, timeout: 60) do |conf|
EventMachine.run do
frame = "%c%c%c%c%c%c%c%c%c%c%c" % [0x81, 0x85, 0x37, 0xfa, 0x21, 0x3d, 0x7f, 0x9f, 0x4d, 0x51, 0x58] #send 'hello' frame
request = "GET /ws/#{channel} HTTP/1.0\r\nConnection: Upgrade\r\nSec-WebSocket-Key: /mQoZf6pRiv8+6o72GncLQ==\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 8\r\n"
socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n")
headers, body = read_response_on_socket(socket)
socket.print(frame)
body, dummy = read_response_on_socket(socket, "llo")
expect(body).to include(%[{"channel":"ch_test_publish_masked_frames", "message":"Hello"}])
socket.close
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b1').get :timeout => 30
sub.callback do
expect(sub).to be_http_status(200)
response = JSON.parse(sub.response)
expect(response["channel"].to_s).to eql(channel)
expect(response["message"]).to eql("Hello")
EventMachine.stop
end
end
end
end
it "should accept fragmented unmasked frames" do
channel = 'ch_test_publish_fragmented_unmasked_frames'
configuration = config.merge({
shared_memory_size: '15m',
message_template: '{\"channel\":\"~channel~\", \"message\":\"~text~\"}',
subscriber_mode: 'long-polling',
})
nginx_run_server(configuration, timeout: 60) do |conf|
EventMachine.run do
frame_part1 = "%c%c%c%c%c" % [0x01, 0x03, 0x48, 0x65, 0x6c] #send 'Hel' frame
frame_part2 = "%c%c%c%c" % [0x80, 0x02, 0x6c, 0x6f] #send 'lo' frame
request = "GET /ws/#{channel} HTTP/1.0\r\nConnection: Upgrade\r\nSec-WebSocket-Key: /mQoZf6pRiv8+6o72GncLQ==\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 8\r\n"
socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n")
headers, body = read_response_on_socket(socket)
socket.print(frame_part1)
socket.print(frame_part2)
body, dummy = read_response_on_socket(socket, "llo")
expect(body).to include(%[{"channel":"ch_test_publish_fragmented_unmasked_frames", "message":"Hello"}])
socket.close
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b1').get :timeout => 30
sub.callback do
expect(sub).to be_http_status(200)
response = JSON.parse(sub.response)
expect(response["channel"].to_s).to eql(channel)
expect(response["message"]).to eql("Hello")
EventMachine.stop
end
end
end
end
it "should accept all kinds of frames mixed" do
channel = 'ch_test_publish_frames_mixed'
configuration = config.merge({
shared_memory_size: '15m',
message_template: '{\"channel\":\"~channel~\", \"message\":\"~text~\"}',
})
nginx_run_server(configuration, timeout: 60) do |conf|
frame_unmasked = "%c%c%c%c%c%c%c" % [0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f] #send 'hello' frame
frame_masked = "%c%c%c%c%c%c%c%c%c%c%c" % [0x81, 0x85, 0x37, 0xfa, 0x21, 0x3d, 0x7f, 0x9f, 0x4d, 0x51, 0x58] #send 'hello' frame
frame_part1 = "%c%c%c%c%c" % [0x01, 0x03, 0x48, 0x65, 0x6c] #send 'Hel' frame
frame_part2 = "%c%c%c%c" % [0x80, 0x02, 0x6c, 0x6f] #send 'lo' frame
request = "GET /ws/#{channel} HTTP/1.0\r\nConnection: Upgrade\r\nSec-WebSocket-Key: /mQoZf6pRiv8+6o72GncLQ==\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 8\r\n"
socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n")
headers, body = read_response_on_socket(socket)
socket.print(frame_unmasked)
body, dummy = read_response_on_socket(socket, "llo")
expect(body).to include(%[{"channel":"ch_test_publish_frames_mixed", "message":"Hello"}])
socket.print(frame_masked)
body, dummy = read_response_on_socket(socket, "llo")
expect(body).to include(%[{"channel":"ch_test_publish_frames_mixed", "message":"Hello"}])
socket.print(frame_part1)
socket.print(frame_part2)
body, dummy = read_response_on_socket(socket, "llo")
expect(body).to include(%[{"channel":"ch_test_publish_frames_mixed", "message":"Hello"}])
socket.print(frame_masked)
body, dummy = read_response_on_socket(socket, "llo")
expect(body).to include(%[{"channel":"ch_test_publish_frames_mixed", "message":"Hello"}])
socket.print(frame_unmasked)
body, dummy = read_response_on_socket(socket, "llo")
expect(body).to include(%[{"channel":"ch_test_publish_frames_mixed", "message":"Hello"}])
socket.print(frame_part1)
socket.print(frame_part2)
body, dummy = read_response_on_socket(socket, "llo")
expect(body).to include(%[{"channel":"ch_test_publish_frames_mixed", "message":"Hello"}])
socket.print(frame_unmasked)
body, dummy = read_response_on_socket(socket, "llo")
expect(body).to include(%[{"channel":"ch_test_publish_frames_mixed", "message":"Hello"}])
socket.print(frame_masked)
body, dummy = read_response_on_socket(socket, "llo")
expect(body).to include(%[{"channel":"ch_test_publish_frames_mixed", "message":"Hello"}])
socket.close
end
end
end
......@@ -88,8 +88,11 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_START_STEP;
ctx->frame->last = NULL;
ctx->frame->payload = NULL;
ctx->frame->last_fragment = 0;
ctx->frame->fragmented = 0;
ngx_str_set(&ctx->frame->consolidated, "");
ngx_http_push_stream_set_buffer(&ctx->frame->buf, ctx->frame->header, NULL, 8);
if ((sec_accept_header = ngx_http_push_stream_generate_websocket_accept_value(r, sec_key_header, ctx->temp_pool)) == NULL) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: could not generate security accept header value");
......@@ -187,10 +190,11 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
ngx_event_t *rev;
ngx_connection_t *c;
uint64_t i;
ngx_buf_t buf;
ngx_queue_t *q;
u_char *aux, *last;
unsigned char opcode;
ngx_http_push_stream_set_buffer(&buf, ctx->frame->header, ctx->frame->last, 8);
ngx_http_push_stream_set_buffer(&ctx->frame->buf, ctx->frame->buf.start, ctx->frame->buf.last, 0);
c = r->connection;
rev = c->read;
......@@ -203,7 +207,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
switch (ctx->frame->step) {
case NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_START_STEP:
//reading frame header
if ((rc = ngx_http_push_stream_recv(c, rev, &buf, 2)) != NGX_OK) {
if ((rc = ngx_http_push_stream_recv(c, rev, &ctx->frame->buf, 2)) != NGX_OK) {
goto exit;
}
......@@ -211,26 +215,65 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
ctx->frame->rsv1 = (ctx->frame->header[0] >> 6) & 1;
ctx->frame->rsv2 = (ctx->frame->header[0] >> 5) & 1;
ctx->frame->rsv3 = (ctx->frame->header[0] >> 4) & 1;
ctx->frame->opcode = ctx->frame->header[0] & 0xf;
opcode = ctx->frame->header[0] & 0xf;
ctx->frame->mask = (ctx->frame->header[1] >> 7) & 1;
ctx->frame->payload_len = ctx->frame->header[1] & 0x7f;
if (ctx->frame->fin == 0) {
if (opcode == 0) {
if (!ctx->frame->fragmented) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: wrong websocket frames sequence");
goto close;
}
} else {
if (!ctx->frame->fragmented) {
ctx->frame->fragmented = 1;
ctx->frame->opcode = opcode;
}
}
} else {
if (opcode == 0) {
if (ctx->frame->fragmented) {
ctx->frame->last_fragment = 1;
} else {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: wrong websocket frames sequence");
goto close;
}
} else {
if (ctx->frame->fragmented) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: wrong websocket frames sequence");
goto close;
} else {
ctx->frame->last_fragment = 1;
ctx->frame->opcode = opcode;
}
}
}
if ((ctx->frame->payload_len == 126) || (ctx->frame->payload_len == 127)) {
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_REAL_SIZE_STEP;
ngx_http_push_stream_set_buffer(&ctx->frame->buf, ctx->frame->header, NULL, 8);
} else if (ctx->frame->mask) {
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_MASK_KEY_STEP;
ngx_http_push_stream_set_buffer(&ctx->frame->buf, ctx->frame->mask_key, NULL, 4);
} else {
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_PAYLOAD_STEP;
}
break;
case NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_REAL_SIZE_STEP:
if (ctx->frame->payload_len == 126) {
if ((rc = ngx_http_push_stream_recv(c, rev, &buf, 2)) != NGX_OK) {
if ((rc = ngx_http_push_stream_recv(c, rev, &ctx->frame->buf, 2)) != NGX_OK) {
goto exit;
}
uint16_t len;
ngx_memcpy(&len, ctx->frame->header, 2);
ctx->frame->payload_len = ntohs(len);
} else if (ctx->frame->payload_len == 127) {
if ((rc = ngx_http_push_stream_recv(c, rev, &buf, 8)) != NGX_OK) {
if ((rc = ngx_http_push_stream_recv(c, rev, &ctx->frame->buf, 8)) != NGX_OK) {
goto exit;
}
uint64_t len;
......@@ -238,20 +281,21 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
ctx->frame->payload_len = ngx_http_push_stream_ntohll(len);
}
if (ctx->frame->mask) {
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_MASK_KEY_STEP;
ngx_http_push_stream_set_buffer(&ctx->frame->buf, ctx->frame->mask_key, NULL, 4);
} else {
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_PAYLOAD_STEP;
}
break;
case NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_MASK_KEY_STEP:
if (ctx->frame->mask) {
if ((rc = ngx_http_push_stream_recv(c, rev, &buf, 4)) != NGX_OK) {
if ((rc = ngx_http_push_stream_recv(c, rev, &ctx->frame->buf, 4)) != NGX_OK) {
goto exit;
}
ngx_memcpy(ctx->frame->mask_key, buf.start, 4);
}
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_PAYLOAD_STEP;
break;
......@@ -263,8 +307,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
(ctx->frame->opcode != NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_OPCODE) &&
(ctx->frame->opcode != NGX_HTTP_PUSH_STREAM_WEBSOCKET_PONG_OPCODE)
) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE), 1);
goto finalize;
goto close;
}
if (ctx->frame->payload_len > 0) {
......@@ -275,17 +318,18 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
goto finalize;
}
}
if (ctx->frame->payload == NULL) {
if ((ctx->frame->payload = ngx_pcalloc(ctx->temp_pool, ctx->frame->payload_len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for payload");
goto finalize;
}
ctx->frame->last = ctx->frame->payload;
ngx_http_push_stream_set_buffer(&ctx->frame->buf, ctx->frame->payload, NULL, ctx->frame->payload_len);
}
ngx_http_push_stream_set_buffer(&buf, ctx->frame->payload, ctx->frame->last, ctx->frame->payload_len);
if ((rc = ngx_http_push_stream_recv(c, rev, &buf, ctx->frame->payload_len)) != NGX_OK) {
if ((rc = ngx_http_push_stream_recv(c, rev, &ctx->frame->buf, ctx->frame->payload_len)) != NGX_OK) {
goto exit;
}
......@@ -299,7 +343,28 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
goto finalize;
}
if (cf->websocket_allow_publish && (ctx->frame->opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_OPCODE)) {
if (ctx->frame->fragmented) {
if (ctx->frame->consolidated.len == 0) {
ctx->frame->consolidated.data = ctx->frame->payload;
ctx->frame->consolidated.len = ctx->frame->payload_len;
} else {
if ((aux = ngx_pcalloc(ctx->temp_pool, ctx->frame->payload_len + ctx->frame->consolidated.len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for consolidated payload for %ui bytes", ctx->frame->payload_len + ctx->frame->consolidated.len);
goto finalize;
}
last = ngx_cpymem(aux, ctx->frame->consolidated.data, ctx->frame->consolidated.len);
ngx_memcpy(last, ctx->frame->payload, ctx->frame->payload_len);
ctx->frame->consolidated.data = aux;
ctx->frame->consolidated.len = ctx->frame->payload_len + ctx->frame->consolidated.len;
}
if (ctx->frame->last_fragment) {
ctx->frame->payload = ctx->frame->consolidated.data;
ctx->frame->payload_len = ctx->frame->consolidated.len;
}
}
if (cf->websocket_allow_publish && ctx->frame->last_fragment && (ctx->frame->opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_OPCODE)) {
for (q = ngx_queue_head(&ctx->subscriber->subscriptions); q != ngx_queue_sentinel(&ctx->subscriber->subscriptions); q = ngx_queue_next(q)) {
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(q, ngx_http_push_stream_subscription_t, queue);
if (subscription->channel->for_events) {
......@@ -312,23 +377,28 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
}
}
}
}
if (ctx->frame->last_fragment) {
ctx->frame->last_fragment = 0;
ctx->frame->fragmented = 0;
ngx_str_set(&ctx->frame->consolidated, "");
if (ctx->temp_pool != NULL) {
ngx_destroy_pool(ctx->temp_pool);
ctx->temp_pool = NULL;
}
}
ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_START_STEP;
ctx->frame->last = NULL;
ctx->frame->payload = NULL;
ngx_http_push_stream_set_buffer(&ctx->frame->buf, ctx->frame->header, NULL, 8);
if (ctx->frame->opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_OPCODE) {
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PONG_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PONG_LAST_FRAME_BYTE), 1);
}
if (ctx->frame->opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_OPCODE) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE), 1);
goto finalize;
goto close;
}
return;
......@@ -339,13 +409,10 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
goto finalize;
break;
}
ngx_http_push_stream_set_buffer(&buf, ctx->frame->header, NULL, 8);
}
exit:
if (rc == NGX_AGAIN) {
ctx->frame->last = buf.last;
if (!c->read->ready) {
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_log_error(NGX_LOG_INFO, c->log, ngx_socket_errno, "push stream module: failed to restore read events");
......@@ -363,6 +430,9 @@ exit:
return;
close:
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE), 1);
finalize:
ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_cleanup_request_context);
ngx_http_finalize_request(r, c->error ? NGX_HTTP_CLIENT_CLOSED_REQUEST : NGX_OK);
......@@ -372,7 +442,12 @@ finalize:
ngx_int_t
ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, ngx_buf_t *buf, ssize_t len)
{
ssize_t n = c->recv(c, buf->last, (ssize_t) len - (buf->last - buf->start));
ssize_t size = len - (buf->last - buf->start);
if (size == 0) {
return NGX_OK;
}
ssize_t n = c->recv(c, buf->last, size);
if (n == NGX_AGAIN) {
return NGX_AGAIN;
......@@ -398,7 +473,7 @@ ngx_http_push_stream_set_buffer(ngx_buf_t *buf, u_char *start, u_char *last, ssi
buf->start = start;
buf->pos = buf->start;
buf->last = (last != NULL) ? last : start;
buf->end = buf->start + len;
buf->end = len ? buf->start + len : buf->end;
buf->temporary = 0;
buf->memory = 1;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment