Commit 71c511d4 authored by Wandenberg's avatar Wandenberg

fix send messages through websocket connection when the server is using kqueue

parent 800ef8ac
# encoding: ascii
require 'spec_helper' require 'spec_helper'
describe "Subscriber WebSocket" do describe "Subscriber WebSocket" do
...@@ -247,7 +248,7 @@ describe "Subscriber WebSocket" do ...@@ -247,7 +248,7 @@ describe "Subscriber WebSocket" do
socket = open_socket(nginx_host, nginx_port) socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n") socket.print("#{request}\r\n")
headers, body = read_response_on_socket(socket, "aaa") headers, body = read_response_on_socket(socket, "aaa")
body.should match_the_pattern(/^\201~\377\377aaa/) body.should match_the_pattern(/^\201\176\377\377aaa/)
end end
end end
...@@ -319,11 +320,8 @@ describe "Subscriber WebSocket" do ...@@ -319,11 +320,8 @@ describe "Subscriber WebSocket" do
headers, body = read_response_on_socket(socket) headers, body = read_response_on_socket(socket)
socket.print(frame) socket.print(frame)
body, dummy = read_response_on_socket(socket) body, dummy = read_response_on_socket(socket, "ch1")
body.should eql("\211\000") body.should eql("\211\000\x81.{\"channel\":\"ch2\", \"id\":\"1\", \"message\":\"hello\"}\x81.{\"channel\":\"ch1\", \"id\":\"1\", \"message\":\"hello\"}")
body, dummy = read_response_on_socket(socket)
body.should eql("\x81.{\"channel\":\"ch2\", \"id\":\"1\", \"message\":\"hello\"}\x81.{\"channel\":\"ch1\", \"id\":\"1\", \"message\":\"hello\"}")
EventMachine.run do EventMachine.run do
......
...@@ -25,7 +25,8 @@ ...@@ -25,7 +25,8 @@
#include <ngx_http_push_stream_module_websocket.h> #include <ngx_http_push_stream_module_websocket.h>
static ngx_str_t *ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_str_t *sec_key, ngx_pool_t *temp_pool); ngx_str_t *ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_str_t *sec_key, ngx_pool_t *temp_pool);
ngx_int_t ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, u_char *buf, ssize_t len);
static ngx_int_t static ngx_int_t
ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
...@@ -144,7 +145,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ...@@ -144,7 +145,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
} }
static ngx_str_t * ngx_str_t *
ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_str_t *sec_key, ngx_pool_t *temp_pool) ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_str_t *sec_key, ngx_pool_t *temp_pool)
{ {
#if (NGX_HAVE_SHA1) #if (NGX_HAVE_SHA1)
...@@ -171,37 +172,13 @@ ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_ ...@@ -171,37 +172,13 @@ ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_
#endif #endif
} }
ngx_int_t
ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, ngx_err_t *err, u_char *buf, ssize_t len)
{
ssize_t n = c->recv(c, buf, len);
if (n == -1) {
*err = ngx_socket_errno;
if (*err != NGX_EAGAIN) {
rev->eof = 1;
c->error = 1;
return NGX_ERROR;
}
} else if ((n == 0) || (n != len)) {
rev->eof = 1;
c->error = 1;
*err = 0;
return NGX_ERROR;
}
return NGX_OK;
}
void void
ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
{ {
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
u_char buf[8]; u_char buf[8];
ngx_err_t err; ngx_int_t rc;
ngx_event_t *rev; ngx_event_t *rev;
ngx_connection_t *c; ngx_connection_t *c;
ngx_http_push_stream_frame_t frame; ngx_http_push_stream_frame_t frame;
...@@ -213,25 +190,8 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -213,25 +190,8 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
c = r->connection; c = r->connection;
rev = c->read; rev = c->read;
#if (NGX_HAVE_KQUEUE)
if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
if (!rev->pending_eof) {
return;
}
rev->eof = 1;
c->error = 1;
err = rev->kq_errno;
goto closed;
}
#endif
//reading frame header //reading frame header
if (ngx_http_push_stream_recv(c, rev, &err, buf, 2) == NGX_ERROR) { if ((rc = ngx_http_push_stream_recv(c, rev, buf, 2)) != NGX_OK) {
goto closed; goto closed;
} }
...@@ -245,14 +205,14 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -245,14 +205,14 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
frame.payload_len = buf[1] & 0x7f; frame.payload_len = buf[1] & 0x7f;
if (frame.payload_len == 126) { if (frame.payload_len == 126) {
if (ngx_http_push_stream_recv(c, rev, &err, buf, 2) == NGX_ERROR) { if ((rc = ngx_http_push_stream_recv(c, rev, buf, 2)) != NGX_OK) {
goto closed; goto closed;
} }
uint16_t len; uint16_t len;
ngx_memcpy(&len, buf, 2); ngx_memcpy(&len, buf, 2);
frame.payload_len = ntohs(len); frame.payload_len = ntohs(len);
} else if (frame.payload_len == 127) { } else if (frame.payload_len == 127) {
if (ngx_http_push_stream_recv(c, rev, &err, buf, 8) == NGX_ERROR) { if ((rc = ngx_http_push_stream_recv(c, rev, buf, 8)) != NGX_OK) {
goto closed; goto closed;
} }
uint64_t len; uint64_t len;
...@@ -260,7 +220,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -260,7 +220,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
frame.payload_len = ngx_http_push_stream_ntohll(len); frame.payload_len = ngx_http_push_stream_ntohll(len);
} }
if (frame.mask && (ngx_http_push_stream_recv(c, rev, &err, frame.mask_key, 4) == NGX_ERROR)) { if (frame.mask && ((rc = ngx_http_push_stream_recv(c, rev, frame.mask_key, 4)) != NGX_OK)) {
goto closed; goto closed;
} }
...@@ -273,7 +233,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -273,7 +233,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
} }
aux = ngx_http_push_stream_create_str(temp_pool, frame.payload_len); aux = ngx_http_push_stream_create_str(temp_pool, frame.payload_len);
if (ngx_http_push_stream_recv(c, rev, &err, aux->data, (ssize_t) frame.payload_len) == NGX_ERROR) { if ((rc = ngx_http_push_stream_recv(c, rev, aux->data, (ssize_t) frame.payload_len)) != NGX_OK) {
goto closed; goto closed;
} }
...@@ -305,14 +265,6 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -305,14 +265,6 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
ngx_http_push_stream_send_response_finalize(r); ngx_http_push_stream_send_response_finalize(r);
} }
/* aio does not call this handler */
if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && rev->active) {
if (ngx_del_event(rev, NGX_READ_EVENT, 0) != NGX_OK) {
ngx_http_finalize_request(r, NGX_OK);
}
}
return; return;
closed: closed:
...@@ -320,11 +272,26 @@ closed: ...@@ -320,11 +272,26 @@ closed:
ngx_destroy_pool(temp_pool); ngx_destroy_pool(temp_pool);
} }
if (err) { if (rc == NGX_ERROR) {
rev->error = 1; ngx_log_error(NGX_LOG_INFO, c->log, ngx_socket_errno, "client closed prematurely connection");
ngx_http_finalize_request(r, NGX_OK);
} }
}
ngx_log_error(NGX_LOG_INFO, c->log, err, "client closed prematurely connection");
ngx_http_finalize_request(r, NGX_OK); ngx_int_t
ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, u_char *buf, ssize_t len)
{
ssize_t n = c->recv(c, buf, len);
if (n == len) {
return NGX_OK;
}
if (n == NGX_AGAIN) {
return NGX_AGAIN;
}
return NGX_ERROR;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment