Commit 47ec083e authored by Wandenberg's avatar Wandenberg

fix calls to recv when the buffer was partially filled on websocket connections

parent fe22345a
......@@ -428,6 +428,63 @@ describe "Subscriber WebSocket" do
end
end
it "should publish message with a low bitrate" do
channel = 'ch_test_publish_message_low_bitrate'
configuration = config.merge({
:shared_memory_size => '15m',
:message_template => '{\"channel\":\"~channel~\", \"message\":\"~text~\"}',
:extra_location => %q{
location ~ /ws/(.*)? {
# activate websocket mode for this location
push_stream_subscriber websocket;
# positional channel path
push_stream_channels_path $1;
# allow subscriber to publish
push_stream_websocket_allow_publish on;
# store messages
push_stream_store_messages on;
}
}
})
count = 0
nginx_run_server(configuration, timeout: 60) do |conf|
EventMachine.run do
frame = "%c%c%c%c%c%c%c%c%c%c%c" % [0x81, 0x85, 0x37, 0xfa, 0x21, 0x3d, 0x7f, 0x9f, 0x4d, 0x51, 0x58] #send 'hello' frame
request = "GET /ws/#{channel} HTTP/1.0\r\nConnection: Upgrade\r\nSec-WebSocket-Key: /mQoZf6pRiv8+6o72GncLQ==\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 8\r\n"
socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n")
headers, body = read_response_on_socket(socket)
EM.add_periodic_timer(2) do
socket.print(frame[count])
count += 1
end
EM.add_timer(frame.size * 3) do
pub = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :timeout => 30
pub.callback do
body, dummy = read_response_on_socket(socket, "llo")
expect(body).to include(%[{"channel":"ch_test_publish_message_low_bitrate", "message":"Hello"}])
socket.close
expect(pub).to be_http_status(200).with_body
response = JSON.parse(pub.response)
expect(response["channel"].to_s).to eql(channel)
expect(response["published_messages"].to_i).to eql(1)
expect(response["stored_messages"].to_i).to eql(1)
expect(response["subscribers"].to_i).to eql(1)
EventMachine.stop
end
end
end
end
end
it "should accept pong message" do
channel = 'ch_test_accept_pong_message'
frame = "%c%c%c%c%c%c" % [0x8A, 0x80, 0xBD, 0xD0, 0xE5, 0x2A] #send 'pong' frame
......
......@@ -276,7 +276,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
ngx_http_push_stream_set_buffer(&buf, ctx->frame->payload, ctx->frame->last, ctx->frame->payload_len);
if ((rc = ngx_http_push_stream_recv(c, rev, &buf, (ssize_t) (buf.end - buf.last))) != NGX_OK) {
if ((rc = ngx_http_push_stream_recv(c, rev, &buf, ctx->frame->payload_len)) != NGX_OK) {
goto exit;
}
......@@ -355,7 +355,7 @@ finalize:
ngx_int_t
ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, ngx_buf_t *buf, ssize_t len)
{
ssize_t n = c->recv(c, buf->last, len);
ssize_t n = c->recv(c, buf->last, (ssize_t) len - (buf->last - buf->start));
if (n == NGX_AGAIN) {
return NGX_AGAIN;
......@@ -367,7 +367,7 @@ ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, ngx_buf_t *buf,
buf->last += n;
if (n < len) {
if ((buf->last - buf->start) < len) {
return NGX_AGAIN;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment