Commit c5abed57 authored by Wandenberg's avatar Wandenberg

change the publish message action through a WebSocket connection to add the...

change the publish message action through a WebSocket connection to add the message to all subscribed channels
parent 67fc7354
h1(#changelog). Changelog h1(#changelog). Changelog
* Change the publish message action through a WebSocket connection to add the message to all subscribed channels
* Added support to get channels statistics, delete channels and publish message to some channels specifying their ids on push_stream_channels_path * Added support to get channels statistics, delete channels and publish message to some channels specifying their ids on push_stream_channels_path
* Avoid reapply formatter to header, message or footer template when inside an if on event source mode * Avoid reapply formatter to header, message or footer template when inside an if on event source mode
* Added support for OPTIONS method on publisher location * Added support for OPTIONS method on publisher location
......
...@@ -278,7 +278,7 @@ describe "Subscriber WebSocket" do ...@@ -278,7 +278,7 @@ describe "Subscriber WebSocket" do
end end
end end
it "should accept publish message on same stream" do it "should publish message to all subscribed channels using the same stream" do
configuration = config.merge({ configuration = config.merge({
:message_template => '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}', :message_template => '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}',
:extra_location => %q{ :extra_location => %q{
...@@ -297,10 +297,9 @@ describe "Subscriber WebSocket" do ...@@ -297,10 +297,9 @@ describe "Subscriber WebSocket" do
} }
}) })
channel = 'ch_test_publish_message_same_stream'
frame = "%c%c%c%c%c%c%c%c%c%c%c" % [0x81, 0x85, 0xBD, 0xD0, 0xE5, 0x2A, 0xD5, 0xB5, 0x89, 0x46, 0xD2] #send 'hello' text frame = "%c%c%c%c%c%c%c%c%c%c%c" % [0x81, 0x85, 0xBD, 0xD0, 0xE5, 0x2A, 0xD5, 0xB5, 0x89, 0x46, 0xD2] #send 'hello' text
request = "GET /ws/#{channel}.b1 HTTP/1.0\r\nConnection: Upgrade\r\nSec-WebSocket-Key: /mQoZf6pRiv8+6o72GncLQ==\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 8\r\n" request = "GET /ws/ch2/ch1 HTTP/1.0\r\nConnection: Upgrade\r\nSec-WebSocket-Key: /mQoZf6pRiv8+6o72GncLQ==\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 8\r\n"
nginx_run_server(configuration) do |conf| nginx_run_server(configuration) do |conf|
socket = open_socket(nginx_host, nginx_port) socket = open_socket(nginx_host, nginx_port)
...@@ -312,27 +311,42 @@ describe "Subscriber WebSocket" do ...@@ -312,27 +311,42 @@ describe "Subscriber WebSocket" do
body.should eql("\211\000") body.should eql("\211\000")
body, dummy = read_response_on_socket(socket) body, dummy = read_response_on_socket(socket)
body.should eql("\x81N{\"channel\":\"ch_test_publish_message_same_stream\", \"id\":\"1\", \"message\":\"hello\"}") body.should eql("\x81.{\"channel\":\"ch2\", \"id\":\"1\", \"message\":\"hello\"}\x81.{\"channel\":\"ch1\", \"id\":\"1\", \"message\":\"hello\"}")
EventMachine.run do EventMachine.run do
pub = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :timeout => 30 pub = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=ALL').get :timeout => 30
pub.callback do pub.callback do
pub.should be_http_status(200).with_body pub.should be_http_status(200).with_body
response = JSON.parse(pub.response) response = JSON.parse(pub.response)
response["channel"].to_s.should eql(channel) response["channels"].to_s.should_not be_empty
response["published_messages"].to_i.should eql(1) response["channels"].to_i.should eql(2)
response["stored_messages"].to_i.should eql(1) response["infos"][0]["channel"].should eql("ch2")
response["subscribers"].to_i.should eql(1) response["infos"][0]["published_messages"].should eql("1")
response["infos"][0]["stored_messages"].should eql("1")
response["infos"][1]["channel"].should eql("ch1")
response["infos"][1]["published_messages"].should eql("1")
response["infos"][1]["stored_messages"].should eql("1")
EventMachine.stop
end
end
EventMachine.run do
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/ch1.b1').get :timeout => 30
sub.stream do |chunk|
line = JSON.parse(chunk.split("\r\n")[0])
line['channel'].should eql("ch1")
line['message'].should eql('hello')
line['id'].to_i.should eql(1)
EventMachine.stop EventMachine.stop
end end
end end
EventMachine.run do EventMachine.run do
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b1').get :timeout => 30 sub = EventMachine::HttpRequest.new(nginx_address + '/sub/ch2.b1').get :timeout => 30
sub.stream do |chunk| sub.stream do |chunk|
line = JSON.parse(chunk.split("\r\n")[0]) line = JSON.parse(chunk.split("\r\n")[0])
line['channel'].should eql(channel.to_s) line['channel'].should eql("ch2")
line['message'].should eql('hello') line['message'].should eql('hello')
line['id'].to_i.should eql(1) line['id'].to_i.should eql(1)
EventMachine.stop EventMachine.stop
......
...@@ -206,6 +206,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -206,6 +206,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
ngx_str_t *aux; ngx_str_t *aux;
uint64_t i; uint64_t i;
ngx_pool_t *temp_pool = NULL; ngx_pool_t *temp_pool = NULL;
ngx_queue_t *cur = NULL;
c = r->connection; c = r->connection;
rev = c->read; rev = c->read;
...@@ -283,14 +284,16 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) ...@@ -283,14 +284,16 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
} }
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module); ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscription_t *subscription = (ngx_http_push_stream_subscription_t *)ngx_queue_head(&ctx->subscriber->subscriptions_sentinel.queue); cur = &ctx->subscriber->subscriptions_sentinel.queue;
if (ngx_http_push_stream_add_msg_to_channel(r, &subscription->channel->id, frame.payload, frame.payload_len, NULL, NULL, temp_pool) == NULL) { while ((cur = ngx_queue_next(cur)) != &ctx->subscriber->subscriptions_sentinel.queue) {
ngx_http_finalize_request(r, NGX_OK); ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, queue);
ngx_destroy_pool(temp_pool); if (ngx_http_push_stream_add_msg_to_channel(r, &subscription->channel->id, frame.payload, frame.payload_len, NULL, NULL, temp_pool) == NULL) {
return; ngx_http_finalize_request(r, NGX_OK);
} else { ngx_destroy_pool(temp_pool);
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE), 1); return;
}
} }
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE), 1);
} }
ngx_destroy_pool(temp_pool); ngx_destroy_pool(temp_pool);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment