Commit a3bf3446 authored by Wandenberg's avatar Wandenberg

normalize use of backtrack, last_event_id and if_modified_since/if_none_match...

normalize use of backtrack, last_event_id and if_modified_since/if_none_match values to get old messages on all subscriber modes
parent ff405e3c
h1(#changelog). Changelog h1(#changelog). Changelog
* Normalize use of backtrack, last_event_id and if_modified_since/if_none_match values to get old messages on all subscriber modes
* Added push_stream_last_event_id directive to make possible pass the Last-Event-Id value without set header
* Changed push_stream_store_messages directive to make possible set it inside an if block * Changed push_stream_store_messages directive to make possible set it inside an if block
* Renamed broadcast feature to wildcard, more adequate with the way it works * Renamed broadcast feature to wildcard, more adequate with the way it works
** push_stream_broadcast_channel_prefix -> push_stream_wildcard_channel_prefix ** push_stream_broadcast_channel_prefix -> push_stream_wildcard_channel_prefix
......
...@@ -128,6 +128,7 @@ h1(#directives). Directives ...@@ -128,6 +128,7 @@ h1(#directives). Directives
| "push_stream_longpolling_connection_ttl":push_stream_longpolling_connection_ttl |   - |   - |   x |   - |   - |   - | | "push_stream_longpolling_connection_ttl":push_stream_longpolling_connection_ttl |   - |   - |   x |   - |   - |   - |
| "push_stream_last_received_message_time":push_stream_last_received_message_time |   - |   - |   x |   - |   - |   - | | "push_stream_last_received_message_time":push_stream_last_received_message_time |   - |   - |   x |   - |   - |   - |
| "push_stream_last_received_message_tag":push_stream_last_received_message_tag |   - |   - |   x |   - |   - |   - | | "push_stream_last_received_message_tag":push_stream_last_received_message_tag |   - |   - |   x |   - |   - |   - |
| "push_stream_last_event_id":push_stream_last_event_id |   - |   - |   x |   - |   - |   - |
| "push_stream_user_agent":push_stream_user_agent |   - |   - |   x |   - |   - |   - | | "push_stream_user_agent":push_stream_user_agent |   - |   - |   x |   - |   - |   - |
| "push_stream_padding_by_user_agent":push_stream_padding_by_user_agent |   - |   - |   x |   - |   - |   - | | "push_stream_padding_by_user_agent":push_stream_padding_by_user_agent |   - |   - |   x |   - |   - |   - |
| "push_stream_allowed_origins":push_stream_allowed_origins |   - |   - |   x |   - |   - |   - | | "push_stream_allowed_origins":push_stream_allowed_origins |   - |   - |   x |   - |   - |   - |
...@@ -231,6 +232,7 @@ h1(#contributors). Contributors ...@@ -231,6 +232,7 @@ h1(#contributors). Contributors
[push_stream_longpolling_connection_ttl]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_longpolling_connection_ttl [push_stream_longpolling_connection_ttl]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_longpolling_connection_ttl
[push_stream_last_received_message_time]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_last_received_message_time [push_stream_last_received_message_time]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_last_received_message_time
[push_stream_last_received_message_tag]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_last_received_message_tag [push_stream_last_received_message_tag]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_last_received_message_tag
[push_stream_last_event_id]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_last_event_id
[push_stream_user_agent]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_user_agent [push_stream_user_agent]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_user_agent
[push_stream_padding_by_user_agent]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_padding_by_user_agent [push_stream_padding_by_user_agent]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_padding_by_user_agent
[push_stream_store_messages]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/publishers.textile#push_stream_store_messages [push_stream_store_messages]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/publishers.textile#push_stream_store_messages
......
...@@ -222,6 +222,19 @@ h2(#push_stream_last_received_message_tag). push_stream_last_received_message_ta ...@@ -222,6 +222,19 @@ h2(#push_stream_last_received_message_tag). push_stream_last_received_message_ta
Set the tag of the last message received to the server knows which messages has to be sent to subscriber. Is a replacement for If-None-Match header. Example, $arg_tag indicate that the value will be take from tag argument. Set the tag of the last message received to the server knows which messages has to be sent to subscriber. Is a replacement for If-None-Match header. Example, $arg_tag indicate that the value will be take from tag argument.
h2(#push_stream_last_event_id). push_stream_last_event_id <a name="push_stream_last_event_id" href="#">&nbsp;</a>
*syntax:* _push_stream_last_event_id string_
*default:* _none_
*context:* _location_
*release version:* _0.4.0_
Set the last event id of a message to the server knows which messages has to be sent to subscriber. Is a replacement for Last-Event-Id header. Example, $arg_last_event indicate that the value will be take from last_event argument.
h2(#push_stream_user_agent). push_stream_user_agent <a name="push_stream_user_agent" href="#">&nbsp;</a> h2(#push_stream_user_agent). push_stream_user_agent <a name="push_stream_user_agent" href="#">&nbsp;</a>
*syntax:* _push_stream_user_agent string_ *syntax:* _push_stream_user_agent string_
......
...@@ -83,6 +83,7 @@ typedef struct { ...@@ -83,6 +83,7 @@ typedef struct {
ngx_flag_t channel_info_on_publish; ngx_flag_t channel_info_on_publish;
ngx_http_complex_value_t *last_received_message_time; ngx_http_complex_value_t *last_received_message_time;
ngx_http_complex_value_t *last_received_message_tag; ngx_http_complex_value_t *last_received_message_tag;
ngx_http_complex_value_t *last_event_id;
ngx_http_complex_value_t *user_agent; ngx_http_complex_value_t *user_agent;
ngx_str_t padding_by_user_agent; ngx_str_t padding_by_user_agent;
ngx_http_push_stream_padding_t *paddings; ngx_http_push_stream_padding_t *paddings;
......
...@@ -227,6 +227,7 @@ ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL; ...@@ -227,6 +227,7 @@ ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL;
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool); ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_only_added_headers(ngx_http_request_t *r); static ngx_int_t ngx_http_push_stream_send_only_added_headers(ngx_http_request_t *r);
static void ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modified_time, ngx_int_t tag, ngx_pool_t *temp_pool); static void ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modified_time, ngx_int_t tag, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_get_last_received_message_values(ngx_http_request_t *r, time_t *if_modified_since, ngx_int_t *tag, ngx_str_t **last_event_id);
static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value); static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value);
static ngx_str_t * ngx_http_push_stream_get_header(ngx_http_request_t *r, const ngx_str_t *header_name); static ngx_str_t * ngx_http_push_stream_get_header(ngx_http_request_t *r, const ngx_str_t *header_name);
static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message); static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
......
...@@ -40,6 +40,7 @@ module NginxConfiguration ...@@ -40,6 +40,7 @@ module NginxConfiguration
:ping_message_text => nil, :ping_message_text => nil,
:last_received_message_time => nil, :last_received_message_time => nil,
:last_received_message_tag => nil, :last_received_message_tag => nil,
:last_event_id => nil,
:user_agent => nil, :user_agent => nil,
:authorized_channels_only => 'off', :authorized_channels_only => 'off',
...@@ -136,6 +137,7 @@ http { ...@@ -136,6 +137,7 @@ http {
<%= write_directive("push_stream_last_received_message_time", last_received_message_time) %> <%= write_directive("push_stream_last_received_message_time", last_received_message_time) %>
<%= write_directive("push_stream_last_received_message_tag", last_received_message_tag) %> <%= write_directive("push_stream_last_received_message_tag", last_received_message_tag) %>
<%= write_directive("push_stream_last_event_id", last_event_id) %>
<%= write_directive("push_stream_channel_deleted_message_text", channel_deleted_message_text) %> <%= write_directive("push_stream_channel_deleted_message_text", channel_deleted_message_text) %>
......
...@@ -314,49 +314,4 @@ describe "Subscriber Event Source" do ...@@ -314,49 +314,4 @@ describe "Subscriber Event Source" do
end end
end end
end end
it "should get old messages by last event id" do
channel = 'ch_test_get_old_messages_by_last_event_id'
response = ''
nginx_run_server(config) do |conf|
EventMachine.run do
publish_message_inline(channel, headers.merge({'Event-Id' => 'event 1'}), 'msg 1')
publish_message_inline(channel, headers.merge({'Event-Id' => 'event 2'}), 'msg 2')
publish_message_inline(channel, headers, 'msg 3')
publish_message_inline(channel, headers.merge({'Event-Id' => 'event 3'}), 'msg 4')
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => {'Last-Event-Id' => 'event 2' }
sub.stream do |chunk|
response += chunk
if response.include?("msg 4")
response.should eql("data: msg 3\r\n\r\nid: event 3\r\ndata: msg 4\r\n\r\n")
EventMachine.stop
end
end
end
end
end
it "should get old messages by last event id without found an event" do
channel = 'ch_test_get_old_messages_by_last_event_id_without_found_event'
response = ''
nginx_run_server(config.merge(:ping_message_interval => '1s')) do |conf|
EventMachine.run do
publish_message_inline(channel, headers.merge({'Event-Id' => 'event 1'}), 'msg 1')
publish_message_inline(channel, headers.merge({'Event-Id' => 'event 2'}), 'msg 2')
publish_message_inline(channel, headers, 'msg 3')
publish_message_inline(channel, headers.merge({'Event-Id' => 'event 3'}), 'msg 4')
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => {'Last-Event-Id' => 'event_not_found' }
sub.stream do |chunk|
if chunk.include?("-1")
chunk.should eql(": -1\r\n")
EventMachine.stop
end
end
end
end
end
end end
...@@ -37,42 +37,7 @@ describe "Subscriber Properties" do ...@@ -37,42 +37,7 @@ describe "Subscriber Properties" do
end end
end end
it "should disconnect after receive old messages by backtrack" do it "should disconnect after receive old messages" do
channel = 'ch_test_disconnect_after_receive_old_messages_by_backtrack_when_longpolling_is_on'
response = ""
nginx_run_server(config) do |conf|
EventMachine.run do
publish_message_inline(channel, {}, 'msg 1')
publish_message_inline(channel, {}, 'msg 2')
publish_message_inline(channel, {}, 'msg 3')
publish_message_inline(channel, {}, 'msg 4')
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b2').get :head => headers
sub.stream do |chunk|
response += chunk
end
sub.callback do |chunk|
response.should eql("msg 3\r\nmsg 4\r\n")
response = ''
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge({'If-Modified-Since' => sub.response_header['LAST_MODIFIED'], 'If-None-Match' => sub.response_header['ETAG']})
sub_1.stream do |chunk2|
response += chunk2
end
sub_1.callback do
response.should eql("msg 5\r\n")
EventMachine.stop
end
publish_message_inline(channel, {}, 'msg 5')
end
end
end
end
it "should disconnect after receive old messages by 'last_event_id'" do
channel = 'ch_test_disconnect_after_receive_old_messages_by_last_event_id_when_longpolling_is_on' channel = 'ch_test_disconnect_after_receive_old_messages_by_last_event_id_when_longpolling_is_on'
response = "" response = ""
...@@ -95,54 +60,6 @@ describe "Subscriber Properties" do ...@@ -95,54 +60,6 @@ describe "Subscriber Properties" do
end end
end end
it "should disconnect after receive old messages from different channels" do
channel_1 = 'ch_test_receive_old_messages_from_different_channels_1'
channel_2 = 'ch_test_receive_old_messages_from_different_channels_2'
body = 'body'
response = ""
nginx_run_server(config) do |conf|
EventMachine.run do
publish_message_inline(channel_1, {}, body + "_1")
publish_message_inline(channel_2, {}, body + "_2")
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_2.to_s + '/' + channel_1.to_s).get :head => headers
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response_header['LAST_MODIFIED'].to_s.should_not eql("")
sub_1.response_header['ETAG'].to_s.should_not eql("")
sub_1.response.should eql("#{body}_2\r\n#{body}_1\r\n")
sent_headers = headers.merge({'If-Modified-Since' => sub_1.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_1.response_header['ETAG']})
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_2.to_s + '/' + channel_1.to_s).get :head => sent_headers
sub_2.callback do
sub_2.should be_http_status(200)
sub_2.response_header['LAST_MODIFIED'].to_s.should_not eql(sub_1.response_header['LAST_MODIFIED'])
sub_2.response_header['ETAG'].to_s.should eql("0")
sub_2.response.should eql("#{body}1_1\r\n")
sent_headers = headers.merge({'If-Modified-Since' => sub_2.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_2.response_header['ETAG']})
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_2.to_s + '/' + channel_1.to_s).get :head => sent_headers
sub_3.callback do
sub_3.should be_http_status(200)
sub_3.response_header['LAST_MODIFIED'].to_s.should_not eql(sub_2.response_header['LAST_MODIFIED'])
sub_3.response_header['ETAG'].to_s.should eql("0")
sub_3.response.should eql("#{body}1_2\r\n")
EventMachine.stop
end
sleep(1) # to publish the second message in a different second from the first
publish_message_inline(channel_2, {}, body + "1_2")
end
sleep(1) # to publish the second message in a different second from the first
publish_message_inline(channel_1, {}, body + "1_1")
end
end
end
end
it "should disconnect after timeout is reached" do it "should disconnect after timeout is reached" do
channel = 'ch_test_disconnect_long_polling_subscriber_when_longpolling_timeout_is_set' channel = 'ch_test_disconnect_long_polling_subscriber_when_longpolling_timeout_is_set'
...@@ -217,49 +134,6 @@ describe "Subscriber Properties" do ...@@ -217,49 +134,6 @@ describe "Subscriber Properties" do
end end
end end
it "should receive messages with etag greather than recent message" do
channel = 'ch_test_receiving_messages_with_etag_greather_than_recent_message'
body_prefix = 'published message '
messagens_to_publish = 10
nginx_run_server(config.merge(:store_messages => "on", :message_template => '{\"id\":\"~id~\", \"message\":\"~text~\"}')) do |conf|
EventMachine.run do
i = 0
stored_messages = 0
EM.add_periodic_timer(0.001) do
if i < messagens_to_publish
i += 1
publish_message_inline(channel.to_s, headers, body_prefix + i.to_s)
else
end
end
EM.add_timer(1) do
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body_prefix + i.to_s
pub.callback do
response = JSON.parse(pub.response)
stored_messages = response["stored_messages"].to_i
end
end
EM.add_timer(2) do
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge({'If-Modified-Since' => 'Thu, 1 Jan 1970 00:00:00 GMT', 'If-None-Match' => 0})
sub.callback do
sub.should be_http_status(200)
stored_messages.should eql(messagens_to_publish + 1)
messages = sub.response.split("\r\n")
messages.count.should eql(messagens_to_publish + 1)
messages.each_with_index do |content, index|
message = JSON.parse(content)
message["id"].to_i.should eql(index + 1)
end
EventMachine.stop
end
end
end
end
end
it "should receive messages when connected in more than one channel" do it "should receive messages when connected in more than one channel" do
channel_1 = 'ch_test_receiving_messages_when_connected_in_more_then_one_channel_1' channel_1 = 'ch_test_receiving_messages_when_connected_in_more_then_one_channel_1'
channel_2 = 'ch_test_receiving_messages_when_connected_in_more_then_one_channel_2' channel_2 = 'ch_test_receiving_messages_when_connected_in_more_then_one_channel_2'
...@@ -315,41 +189,6 @@ describe "Subscriber Properties" do ...@@ -315,41 +189,6 @@ describe "Subscriber Properties" do
end end
end end
it "should accept send modified since and none match values without using header" do
channel = 'ch_test_send_modified_since_and_none_match_values_not_using_headers'
body = 'body'
response = ""
nginx_run_server(config.merge(:last_received_message_time => "$arg_time", :last_received_message_tag => "$arg_tag")) do |conf|
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub_1.stream do |chunk|
response += chunk
end
sub_1.callback do |chunk|
response.should eql("#{body}\r\n")
time = sub_1.response_header['LAST_MODIFIED']
tag = sub_1.response_header['ETAG']
response = ""
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?time=' + time + '&tag=' + tag).get :head => headers
sub_2.stream do |chunk2|
response += chunk2
end
sub_2.callback do
response.should eql("#{body} 1\r\n")
EventMachine.stop
end
publish_message_inline(channel, {}, body + " 1")
end
publish_message_inline(channel, {}, body)
end
end
end
it "should accept a callback parameter to be used with JSONP" do it "should accept a callback parameter to be used with JSONP" do
channel = 'ch_test_return_message_using_function_name_specified_in_callback_parameter' channel = 'ch_test_return_message_using_function_name_specified_in_callback_parameter'
body = 'body' body = 'body'
...@@ -423,7 +262,7 @@ describe "Subscriber Properties" do ...@@ -423,7 +262,7 @@ describe "Subscriber Properties" do
end end
end end
it "should accpet return content gzipped" do it "should accept return content gzipped" do
channel = 'ch_test_get_content_gzipped' channel = 'ch_test_get_content_gzipped'
body = 'body' body = 'body'
actual_response = '' actual_response = ''
......
...@@ -51,7 +51,7 @@ describe "Subscriber Properties" do ...@@ -51,7 +51,7 @@ describe "Subscriber Properties" do
EventMachine.run do EventMachine.run do
publish_message_inline(channel, {}, body) publish_message_inline(channel, {}, body)
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge({'If-Modified-Since' => Time.at(0).utc.strftime("%a, %d %b %Y %T %Z")})
sub_1.callback do sub_1.callback do
sub_1.should be_http_status(200) sub_1.should be_http_status(200)
sub_1.response_header['LAST_MODIFIED'].to_s.should_not eql("") sub_1.response_header['LAST_MODIFIED'].to_s.should_not eql("")
...@@ -63,225 +63,6 @@ describe "Subscriber Properties" do ...@@ -63,225 +63,6 @@ describe "Subscriber Properties" do
end end
end end
it "should receive old messages by if_modified_since header" do
channel = 'ch_test_getting_messages_by_if_modified_since_header'
body = 'body'
nginx_run_server(config) do |conf|
EventMachine.run do
publish_message_inline(channel, {}, body)
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response_header['LAST_MODIFIED'].to_s.should_not eql("")
sub_1.response_header['ETAG'].to_s.should_not eql("")
sub_1.response.should eql("#{body}\r\n")
sent_headers = headers.merge({'If-Modified-Since' => sub_1.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_1.response_header['ETAG']})
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => sent_headers
sub_2.callback do
sub_2.should be_http_status(304).without_body
sub_2.response_header['LAST_MODIFIED'].to_s.should eql(sub_1.response_header['LAST_MODIFIED'])
sub_2.response_header['ETAG'].to_s.should eql(sub_1.response_header['ETAG'])
sleep(1) # to publish the second message in a different second from the first
publish_message_inline(channel, {}, body + "1")
sent_headers = headers.merge({'If-Modified-Since' => sub_2.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_2.response_header['ETAG']})
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => sent_headers
sub_3.callback do
sub_3.should be_http_status(200)
sub_3.response_header['LAST_MODIFIED'].to_s.should_not eql(sub_2.response_header['LAST_MODIFIED'])
sub_3.response_header['ETAG'].to_s.should eql("0")
sub_3.response.should eql("#{body}1\r\n")
EventMachine.stop
end
end
end
end
end
end
it "should receive old messages by backtrack" do
channel = 'ch_test_getting_messages_by_backtrack'
body = 'body'
nginx_run_server(config) do |conf|
EventMachine.run do
publish_message_inline(channel, {}, body)
publish_message_inline(channel, {}, body + "1")
publish_message_inline(channel, {}, body + "2")
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b1').get :head => headers
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response_header['LAST_MODIFIED'].to_s.should_not eql("")
sub_1.response_header['ETAG'].to_s.should eql("2")
sub_1.response.should eql("#{body}2\r\n")
sent_headers = headers.merge({'If-Modified-Since' => sub_1.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_1.response_header['ETAG']})
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => sent_headers
sub_2.callback do
sub_2.should be_http_status(304).without_body
sub_2.response_header['LAST_MODIFIED'].to_s.should eql(sub_1.response_header['LAST_MODIFIED'])
sub_2.response_header['ETAG'].to_s.should eql(sub_1.response_header['ETAG'])
sleep(1) # to publish the second message in a different second from the first
publish_message_inline(channel, {}, body + "3")
sent_headers = headers.merge({'If-Modified-Since' => sub_2.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_2.response_header['ETAG']})
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => sent_headers
sub_3.callback do
sub_3.should be_http_status(200)
sub_3.response_header['LAST_MODIFIED'].to_s.should_not eql(sub_2.response_header['LAST_MODIFIED'])
sub_3.response_header['ETAG'].to_s.should eql("0")
sub_3.response.should eql("#{body}3\r\n")
EventMachine.stop
end
end
end
end
end
end
it "should receive old messages by last_event_id header" do
channel = 'ch_test_getting_messages_by_last_event_id_header'
body = 'body'
nginx_run_server(config) do |conf|
EventMachine.run do
publish_message_inline(channel, {'Event-Id' => 'event 1'}, 'msg 1')
publish_message_inline(channel, {'Event-Id' => 'event 2'}, 'msg 2')
publish_message_inline(channel, {}, 'msg 3')
publish_message_inline(channel, {'Event-Id' => 'event 3'}, 'msg 4')
sent_headers = headers.merge({'Last-Event-Id' => 'event 2'})
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => sent_headers
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response_header['LAST_MODIFIED'].to_s.should_not eql("")
sub_1.response_header['ETAG'].to_s.should eql("3")
sub_1.response.should eql("msg 3\r\nmsg 4\r\n")
sent_headers = headers.merge({'If-Modified-Since' => sub_1.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_1.response_header['ETAG']})
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => sent_headers
sub_2.callback do
sub_2.should be_http_status(304).without_body
sub_2.response_header['LAST_MODIFIED'].to_s.should eql(sub_1.response_header['LAST_MODIFIED'])
sub_2.response_header['ETAG'].to_s.should eql(sub_1.response_header['ETAG'])
sleep(1) # to publish the second message in a different second from the first
publish_message_inline(channel, {}, body + "3")
sent_headers = headers.merge({'If-Modified-Since' => sub_2.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_2.response_header['ETAG']})
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => sent_headers
sub_3.callback do
sub_3.should be_http_status(200)
sub_3.response_header['LAST_MODIFIED'].to_s.should_not eql(sub_2.response_header['LAST_MODIFIED'])
sub_3.response_header['ETAG'].to_s.should eql("0")
sub_3.response.should eql("#{body}3\r\n")
EventMachine.stop
end
end
end
end
end
end
it "should receive old messages from different channels" do
channel_1 = 'ch_test_receive_old_messages_from_different_channels_1'
channel_2 = 'ch_test_receive_old_messages_from_different_channels_2'
body = 'body'
nginx_run_server(config) do |conf|
EventMachine.run do
publish_message_inline(channel_1, {}, body + "_1")
publish_message_inline(channel_2, {}, body + "_2")
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_2.to_s + '/' + channel_1.to_s).get :head => headers
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response_header['LAST_MODIFIED'].to_s.should_not eql("")
sub_1.response_header['ETAG'].to_s.should_not eql("")
sub_1.response.should eql("#{body}_2\r\n#{body}_1\r\n")
sent_headers = headers.merge({'If-Modified-Since' => sub_1.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_1.response_header['ETAG']})
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_2.to_s + '/' + channel_1.to_s).get :head => sent_headers
sub_2.callback do
sub_2.should be_http_status(304).without_body
sub_2.response_header['LAST_MODIFIED'].to_s.should eql(sub_1.response_header['LAST_MODIFIED'])
sub_2.response_header['ETAG'].to_s.should eql(sub_1.response_header['ETAG'])
sleep(1) # to publish the second message in a different second from the first
publish_message_inline(channel_1, {}, body + "1_1")
sent_headers = headers.merge({'If-Modified-Since' => sub_2.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_2.response_header['ETAG']})
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_2.to_s + '/' + channel_1.to_s).get :head => sent_headers
sub_3.callback do
sub_3.should be_http_status(200)
sub_3.response_header['LAST_MODIFIED'].to_s.should_not eql(sub_2.response_header['LAST_MODIFIED'])
sub_3.response_header['ETAG'].to_s.should eql("0")
sub_3.response.should eql("#{body}1_1\r\n")
sent_headers = headers.merge({'If-Modified-Since' => sub_3.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_3.response_header['ETAG']})
sub_4 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_2.to_s + '/' + channel_1.to_s).get :head => sent_headers
sub_4.callback do
sub_4.should be_http_status(304).without_body
sub_4.response_header['LAST_MODIFIED'].to_s.should eql(sub_3.response_header['LAST_MODIFIED'])
sub_4.response_header['ETAG'].to_s.should eql(sub_3.response_header['ETAG'])
sleep(1) # to publish the second message in a different second from the first
publish_message_inline(channel_2, {}, body + "1_2")
sent_headers = headers.merge({'If-Modified-Since' => sub_4.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_4.response_header['ETAG']})
sub_5 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_2.to_s + '/' + channel_1.to_s).get :head => sent_headers
sub_5.callback do
sub_5.should be_http_status(200)
sub_5.response_header['LAST_MODIFIED'].to_s.should_not eql(sub_4.response_header['LAST_MODIFIED'])
sub_5.response_header['ETAG'].to_s.should eql("0")
sub_5.response.should eql("#{body}1_2\r\n")
EventMachine.stop
end
end
end
end
end
end
end
end
it "should accept modified since and none match values not using headers when polling" do
channel = 'ch_test_send_modified_since_and_none_match_values_not_using_headers_when_polling'
body = 'body'
nginx_run_server(config.merge(:last_received_message_time => "$arg_time", :last_received_message_tag => "$arg_tag")) do |conf|
EventMachine.run do
publish_message_inline(channel, {}, body)
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub_1.callback do
sub_1.response.should eql("#{body}\r\n")
time = sub_1.response_header['LAST_MODIFIED']
tag = sub_1.response_header['ETAG']
publish_message_inline(channel, {}, body + " 1")
response = ""
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?time=' + time + '&tag=' + tag).get :head => headers
sub_2.callback do
sub_2.response.should eql("#{body} 1\r\n")
EventMachine.stop
end
end
end
end
end
it "should accept a callback parameter to works with JSONP" do it "should accept a callback parameter to works with JSONP" do
channel = 'ch_test_return_message_using_function_name_specified_in_callback_parameter_when_polling' channel = 'ch_test_return_message_using_function_name_specified_in_callback_parameter_when_polling'
body = 'body' body = 'body'
...@@ -291,7 +72,7 @@ describe "Subscriber Properties" do ...@@ -291,7 +72,7 @@ describe "Subscriber Properties" do
nginx_run_server(config) do |conf| nginx_run_server(config) do |conf|
EventMachine.run do EventMachine.run do
publish_message_inline(channel, {}, body) publish_message_inline(channel, {}, body)
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?callback=' + callback_function_name).get :head => headers sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?callback=' + callback_function_name).get :head => headers.merge({'If-Modified-Since' => Time.at(0).utc.strftime("%a, %d %b %Y %T %Z")})
sub_1.callback do sub_1.callback do
sub_1.response.should eql("#{callback_function_name}\r\n([#{body}\r\n,]);\r\n") sub_1.response.should eql("#{callback_function_name}\r\n([#{body}\r\n,]);\r\n")
EventMachine.stop EventMachine.stop
...@@ -339,7 +120,7 @@ describe "Subscriber Properties" do ...@@ -339,7 +120,7 @@ describe "Subscriber Properties" do
end end
end end
it "should accpet return content gzipped" do it "should accept return content gzipped" do
channel = 'ch_test_get_content_gzipped' channel = 'ch_test_get_content_gzipped'
body = 'body' body = 'body'
actual_response = '' actual_response = ''
...@@ -348,7 +129,7 @@ describe "Subscriber Properties" do ...@@ -348,7 +129,7 @@ describe "Subscriber Properties" do
EventMachine.run do EventMachine.run do
publish_message_inline(channel, {}, body) publish_message_inline(channel, {}, body)
sent_headers = headers.merge({'accept-encoding' => 'gzip, compressed'}) sent_headers = headers.merge({'accept-encoding' => 'gzip, compressed', 'If-Modified-Since' => Time.at(0).utc.strftime("%a, %d %b %Y %T %Z")})
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => sent_headers, :decoding => false sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => sent_headers, :decoding => false
sub_1.stream do |chunk| sub_1.stream do |chunk|
actual_response << chunk actual_response << chunk
......
...@@ -311,62 +311,6 @@ describe "Subscriber Properties" do ...@@ -311,62 +311,6 @@ describe "Subscriber Properties" do
end end
end end
it "should receive old messages in multi channel subscriber" do
channel_1 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_1'
channel_2 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_2'
channel_3 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_3'
body = 'body'
response = ""
nginx_run_server(config.merge(:header_template => 'HEADER', :message_template => '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}')) do |conf|
#create channels with some messages
1.upto(3) do |i|
publish_message(channel_1, headers, body + i.to_s)
publish_message(channel_2, headers, body + i.to_s)
publish_message(channel_3, headers, body + i.to_s)
end
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_1.to_s + '/' + channel_2.to_s + '.b5' + '/' + channel_3.to_s + '.b2').get :head => headers
sub_1.stream do |chunk|
response += chunk
lines = response.split("\r\n")
if lines.length >= 6
lines[0].should eql('HEADER')
line = JSON.parse(lines[1])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body1')
line['id'].to_i.should eql(1)
line = JSON.parse(lines[2])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body2')
line['id'].to_i.should eql(2)
line = JSON.parse(lines[3])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
line = JSON.parse(lines[4])
line['channel'].should eql(channel_3.to_s)
line['message'].should eql('body2')
line['id'].to_i.should eql(2)
line = JSON.parse(lines[5])
line['channel'].should eql(channel_3.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
EventMachine.stop
end
end
end
end
end
it "should receive new messages in a multi channel subscriber" do it "should receive new messages in a multi channel subscriber" do
channel_1 = 'test_retreive_new_messages_in_multichannel_subscribe_1' channel_1 = 'test_retreive_new_messages_in_multichannel_subscribe_1'
channel_2 = 'test_retreive_new_messages_in_multich_subscribe_2' channel_2 = 'test_retreive_new_messages_in_multich_subscribe_2'
...@@ -430,138 +374,6 @@ describe "Subscriber Properties" do ...@@ -430,138 +374,6 @@ describe "Subscriber Properties" do
end end
end end
it "should receive old messages in a multi channel subscriber using 'if_modified_since' header" do
channel_1 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_1'
channel_2 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_2'
channel_3 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_3'
body = 'body'
nginx_run_server(config.merge(:header_template => 'HEADER', :message_template => '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}'), :timeout => 40) do |conf|
#create channels with some messages with progressive interval (2,4,6,10,14,18,24,30,36 seconds)
1.upto(3) do |i|
sleep(i * 2)
publish_message(channel_1, headers, body + i.to_s)
sleep(i * 2)
publish_message(channel_2, headers, body + i.to_s)
sleep(i * 2)
publish_message(channel_3, headers, body + i.to_s)
end
#get messages published less then 20 seconds ago
t = Time.now
t = t - 20
sent_headers = headers.merge({'If-Modified-Since' => t.utc.strftime("%a, %d %b %Y %T %Z")})
response = ""
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_1.to_s + '/' + channel_2.to_s + '/' + channel_3.to_s).get :head => sent_headers
sub_1.stream do |chunk|
response += chunk
lines = response.split("\r\n")
if lines.length >= 5
lines[0].should eql('HEADER')
line = JSON.parse(lines[1])
line['channel'].should eql(channel_1.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
line = JSON.parse(lines[2])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
line = JSON.parse(lines[3])
line['channel'].should eql(channel_3.to_s)
line['message'].should eql('body2')
line['id'].to_i.should eql(2)
line = JSON.parse(lines[4])
line['channel'].should eql(channel_3.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
EventMachine.stop
end
end
end
end
end
it "should receive old messages in a multi channel subscriber using 'if_modified_since' header and backtrack mixed" do
channel_1 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_and_backtrack_mixed_1'
channel_2 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_and_backtrack_mixed_2'
channel_3 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_and_backtrack_mixed_3'
body = 'body'
nginx_run_server(config.merge(:header_template => 'HEADER', :message_template => '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}'), :timeout => 40) do |conf|
#create channels with some messages with progressive interval (2,4,6,10,14,18,24,30,36 seconds)
1.upto(3) do |i|
sleep(i * 2)
publish_message(channel_1, headers, body + i.to_s)
sleep(i * 2)
publish_message(channel_2, headers, body + i.to_s)
sleep(i * 2)
publish_message(channel_3, headers, body + i.to_s)
end
#get messages published less then 20 seconds ago
t = Time.now
t = t - 20
sent_headers = headers.merge({'If-Modified-Since' => t.utc.strftime("%a, %d %b %Y %T %Z")})
response = ""
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_1.to_s + '/' + channel_2.to_s + '.b5' + '/' + channel_3.to_s).get :head => sent_headers
sub_1.stream do |chunk|
response += chunk
lines = response.split("\r\n")
if lines.length >= 7
lines[0].should eql('HEADER')
line = JSON.parse(lines[1])
line['channel'].should eql(channel_1.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
line = JSON.parse(lines[2])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body1')
line['id'].to_i.should eql(1)
line = JSON.parse(lines[3])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body2')
line['id'].to_i.should eql(2)
line = JSON.parse(lines[4])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
line = JSON.parse(lines[5])
line['channel'].should eql(channel_3.to_s)
line['message'].should eql('body2')
line['id'].to_i.should eql(2)
line = JSON.parse(lines[6])
line['channel'].should eql(channel_3.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
EventMachine.stop
end
end
end
end
end
it "should limit the number of channels" do it "should limit the number of channels" do
channel = 'ch_test_max_number_of_channels_' channel = 'ch_test_max_number_of_channels_'
...@@ -974,7 +786,7 @@ describe "Subscriber Properties" do ...@@ -974,7 +786,7 @@ describe "Subscriber Properties" do
end end
end end
it "should accpet return content gzipped" do it "should accept return content gzipped" do
channel = 'ch_test_get_content_gzipped' channel = 'ch_test_get_content_gzipped'
body = 'body' body = 'body'
actual_response = '' actual_response = ''
......
require 'spec_helper'
describe "Receive old messages" do
let(:config) do
{
:header_template => nil,
:footer_template => nil,
:message_template => '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}',
:subscriber_mode => subscriber_mode
}
end
shared_examples_for "can receive old messages" do
it "should receive old messages in a multi channel subscriber using backtrack" do
channel_1 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_1'
channel_2 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_2'
channel_3 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_3'
body = 'body'
nginx_run_server(config.merge(:header_template => 'HEADER')) do |conf|
#create channels with some messages
1.upto(3) do |i|
publish_message(channel_1, headers, body + i.to_s)
publish_message(channel_2, headers, body + i.to_s)
publish_message(channel_3, headers, body + i.to_s)
end
get_content(nginx_address + '/sub/' + channel_1.to_s + '/' + channel_2.to_s + '.b5' + '/' + channel_3.to_s + '.b2', 6, headers) do |response, response_headers|
if ["long-polling", "polling"].include?(conf.subscriber_mode)
response_headers['LAST_MODIFIED'].to_s.should_not eql("")
response_headers['ETAG'].to_s.should_not eql("")
end
lines = response.split("\r\n")
lines[0].should eql('HEADER')
line = JSON.parse(lines[1])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body1')
line['id'].to_i.should eql(1)
line = JSON.parse(lines[2])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body2')
line['id'].to_i.should eql(2)
line = JSON.parse(lines[3])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
line = JSON.parse(lines[4])
line['channel'].should eql(channel_3.to_s)
line['message'].should eql('body2')
line['id'].to_i.should eql(2)
line = JSON.parse(lines[5])
line['channel'].should eql(channel_3.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
end
end
end
it "should receive old messages in a multi channel subscriber using 'if_modified_since' header" do
channel_1 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_1'
channel_2 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_2'
channel_3 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_3'
body = 'body'
nginx_run_server(config.merge(:header_template => 'HEADER'), :timeout => 45) do |conf|
#create channels with some messages with progressive interval (1,2,3,5,7,9,12,15,18 seconds)
1.upto(3) do |i|
sleep(i)
publish_message(channel_1, headers, body + i.to_s)
sleep(i)
publish_message(channel_2, headers, body + i.to_s)
sleep(i)
publish_message(channel_3, headers, body + i.to_s)
end
#get messages published less then 10 seconds ago
t = Time.now - 10
sent_headers = headers.merge({'If-Modified-Since' => t.utc.strftime("%a, %d %b %Y %T %Z")})
get_content(nginx_address + '/sub/' + channel_1.to_s + '/' + channel_2.to_s + '/' + channel_3.to_s, 5, sent_headers) do |response, response_headers|
if ["long-polling", "polling"].include?(conf.subscriber_mode)
response_headers['LAST_MODIFIED'].to_s.should_not eql("")
response_headers['ETAG'].to_s.should_not eql("")
end
lines = response.split("\r\n")
lines[0].should eql('HEADER')
line = JSON.parse(lines[1])
line['channel'].should eql(channel_1.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
line = JSON.parse(lines[2])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
line = JSON.parse(lines[3])
line['channel'].should eql(channel_3.to_s)
line['message'].should eql('body2')
line['id'].to_i.should eql(2)
line = JSON.parse(lines[4])
line['channel'].should eql(channel_3.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
end
end
end
it "should receive old messages in a multi channel subscriber using 'if_modified_since' header and backtrack mixed" do
channel_1 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_and_backtrack_mixed_1'
channel_2 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_and_backtrack_mixed_2'
channel_3 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_and_backtrack_mixed_3'
body = 'body'
nginx_run_server(config.merge(:header_template => 'HEADER'), :timeout => 45) do |conf|
#create channels with some messages with progressive interval (1,2,3,5,7,9,12,15,18 seconds)
1.upto(3) do |i|
sleep(i)
publish_message(channel_1, headers, body + i.to_s)
sleep(i)
publish_message(channel_2, headers, body + i.to_s)
sleep(i)
publish_message(channel_3, headers, body + i.to_s)
end
#get messages published less then 10 seconds ago
t = Time.now - 10
sent_headers = headers.merge({'If-Modified-Since' => t.utc.strftime("%a, %d %b %Y %T %Z")})
get_content(nginx_address + '/sub/' + channel_1.to_s + '/' + channel_2.to_s + '.b5' + '/' + channel_3.to_s, 7, sent_headers) do |response, response_headers|
if ["long-polling", "polling"].include?(conf.subscriber_mode)
response_headers['LAST_MODIFIED'].to_s.should_not eql("")
response_headers['ETAG'].to_s.should_not eql("")
end
lines = response.split("\r\n")
lines[0].should eql('HEADER')
line = JSON.parse(lines[1])
line['channel'].should eql(channel_1.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
line = JSON.parse(lines[2])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body1')
line['id'].to_i.should eql(1)
line = JSON.parse(lines[3])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body2')
line['id'].to_i.should eql(2)
line = JSON.parse(lines[4])
line['channel'].should eql(channel_2.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
line = JSON.parse(lines[5])
line['channel'].should eql(channel_3.to_s)
line['message'].should eql('body2')
line['id'].to_i.should eql(2)
line = JSON.parse(lines[6])
line['channel'].should eql(channel_3.to_s)
line['message'].should eql('body3')
line['id'].to_i.should eql(3)
end
end
end
it "should receive old messages by 'last_event_id'" do
channel = 'ch_test_disconnect_after_receive_old_messages_by_last_event_id_when_longpolling_is_on'
nginx_run_server(config.merge(:message_template => '~text~')) do |conf|
publish_message(channel, {'Event-Id' => 'event 1'}, 'msg 1')
publish_message(channel, {'Event-Id' => 'event 2'}, 'msg 2')
publish_message(channel, {}, 'msg 3')
publish_message(channel, {'Event-Id' => 'event 3'}, 'msg 4')
sent_headers = headers.merge({'Last-Event-Id' => 'event 2'})
get_content(nginx_address + '/sub/' + channel.to_s, 2, sent_headers) do |response, response_headers|
if ["long-polling", "polling"].include?(conf.subscriber_mode)
response_headers['LAST_MODIFIED'].to_s.should_not eql("")
response_headers['ETAG'].to_s.should_not eql("")
end
response.should eql("msg 3\r\nmsg 4\r\n")
end
end
end
it "should receive old messages with equals 'if_modified_since' header untie them by the 'if_none_match' header" do
channel = 'ch_test_receiving_messages_untie_by_etag'
body_prefix = 'msg '
messages_to_publish = 10
now = nil
nginx_run_server(config.merge(:message_template => '~text~')) do |conf|
messages_to_publish.times do |i|
now = Time.now if i == 5
publish_message(channel.to_s, headers, body_prefix + i.to_s)
end
sent_headers = headers.merge({'If-Modified-Since' => now.utc.strftime("%a, %d %b %Y %T %Z"), 'If-None-Match' => '5'})
get_content(nginx_address + '/sub/' + channel.to_s, 4, sent_headers) do |response, response_headers|
if ["long-polling", "polling"].include?(conf.subscriber_mode)
response_headers['LAST_MODIFIED'].to_s.should_not eql("")
response_headers['ETAG'].to_s.should eql("9")
end
response.should eql("msg 6\r\nmsg 7\r\nmsg 8\r\nmsg 9\r\n")
end
end
end
it "should accept modified since and none match values not using headers" do
channel = 'ch_test_send_modified_since_and_none_match_values_not_using_headers'
body_prefix = 'msg '
messages_to_publish = 10
now = nil
nginx_run_server(config.merge(:last_received_message_time => "$arg_time", :last_received_message_tag => "$arg_tag", :message_template => '~text~')) do |conf|
messages_to_publish.times do |i|
now = Time.now if i == 5
publish_message(channel.to_s, headers, body_prefix + i.to_s)
end
params = "time=#{URI.encode(now.utc.strftime("%a, %d %b %Y %T %Z"))}&tag=5"
get_content(nginx_address + '/sub/' + channel.to_s + '?' + params, 4, headers) do |response, response_headers|
if ["long-polling", "polling"].include?(conf.subscriber_mode)
response_headers['LAST_MODIFIED'].to_s.should_not eql("")
response_headers['ETAG'].to_s.should eql("9")
end
response.should eql("msg 6\r\nmsg 7\r\nmsg 8\r\nmsg 9\r\n")
end
end
end
it "should accept event id value not using headers" do
channel = 'ch_test_send_event_id_value_not_using_headers'
body_prefix = 'msg '
messages_to_publish = 10
now = nil
nginx_run_server(config.merge(:last_event_id => "$arg_event_id", :message_template => '~text~')) do |conf|
publish_message(channel, {'Event-Id' => 'event 1'}, 'msg 1')
publish_message(channel, {'Event-Id' => 'event 2'}, 'msg 2')
publish_message(channel, {}, 'msg 3')
publish_message(channel, {'Event-Id' => 'event 3'}, 'msg 4')
params = "event_id=#{URI.escape("event 2")}"
get_content(nginx_address + '/sub/' + channel.to_s + '?' + params, 2, headers) do |response, response_headers|
if ["long-polling", "polling"].include?(conf.subscriber_mode)
response_headers['LAST_MODIFIED'].to_s.should_not eql("")
response_headers['ETAG'].to_s.should_not eql("")
end
response.should eql("msg 3\r\nmsg 4\r\n")
end
end
end
end
def get_content(url, number_expected_lines, request_headers, &block)
response = ''
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(url).get :head => request_headers
sub_1.stream do |chunk|
response += chunk
lines = response.split("\r\n").map {|line| line.gsub(/^: /, "").gsub(/^data: /, "").gsub(/^id: .*/, "") }.delete_if{|line| line.empty?}.compact
if lines.length >= number_expected_lines
EventMachine.stop
block.call("#{lines.join("\r\n")}\r\n", sub_1.response_header) unless block.nil?
end
end
end
end
context "in stream mode" do
let(:subscriber_mode) { "streaming" }
it_should_behave_like "can receive old messages"
end
context "in pooling mode" do
let(:subscriber_mode) { "polling" }
it_should_behave_like "can receive old messages"
end
context "in long-pooling mode" do
let(:subscriber_mode) { "long-polling" }
it_should_behave_like "can receive old messages"
end
context "in event source mode" do
let(:subscriber_mode) { "eventsource" }
it_should_behave_like "can receive old messages"
end
context "in websocket mode" do
let(:subscriber_mode) { "websocket" }
def get_content(url, number_expected_lines, request_headers, &block)
uri = URI.parse url
request_headers = request_headers.empty? ? "" : "#{request_headers.each_key.map{|k| "#{k}: #{request_headers[k]}"}.join("\r\n")}\r\n"
request = "GET #{uri.request_uri} HTTP/1.0\r\nConnection: Upgrade\r\nSec-WebSocket-Key: /mQoZf6pRiv8+6o72GncLQ==\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 8\r\n#{request_headers}"
socket = open_socket(uri.host, uri.port)
socket.print("#{request}\r\n")
resp_headers, body = read_response_on_socket(socket, "\x89\x00")
resp_headers = resp_headers.split("\r\n").inject({}) do |hash_headers, header|
parts = header.split(":")
hash_headers[parts[0]] = parts[1] if parts.count == 2
hash_headers
end
lines = body.gsub(/[^\w{:,}" ]/, "\n").gsub("d{", "{").split("\n").delete_if{|line| line.empty?}.compact
lines.length.should be >= number_expected_lines
if lines.length >= number_expected_lines
block.call("#{lines.join("\r\n")}\r\n", resp_headers) unless block.nil?
end
end
it_should_behave_like "can receive old messages"
end
end
...@@ -206,6 +206,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = { ...@@ -206,6 +206,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, last_received_message_tag), offsetof(ngx_http_push_stream_loc_conf_t, last_received_message_tag),
NULL }, NULL },
{ ngx_string("push_stream_last_event_id"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_http_set_complex_value_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, last_event_id),
NULL },
{ ngx_string("push_stream_user_agent"), { ngx_string("push_stream_user_agent"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_http_set_complex_value_slot, ngx_http_set_complex_value_slot,
...@@ -522,6 +528,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf) ...@@ -522,6 +528,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->channel_info_on_publish = NGX_CONF_UNSET_UINT; lcf->channel_info_on_publish = NGX_CONF_UNSET_UINT;
lcf->last_received_message_time = NULL; lcf->last_received_message_time = NULL;
lcf->last_received_message_tag = NULL; lcf->last_received_message_tag = NULL;
lcf->last_event_id = NULL;
lcf->user_agent = NULL; lcf->user_agent = NULL;
lcf->padding_by_user_agent.data = NULL; lcf->padding_by_user_agent.data = NULL;
lcf->paddings = NULL; lcf->paddings = NULL;
...@@ -567,6 +574,10 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ...@@ -567,6 +574,10 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
conf->last_received_message_tag = prev->last_received_message_tag; conf->last_received_message_tag = prev->last_received_message_tag;
} }
if (conf->last_event_id == NULL) {
conf->last_event_id = prev->last_event_id;
}
if (conf->user_agent == NULL) { if (conf->user_agent == NULL) {
conf->user_agent = prev->user_agent; conf->user_agent = prev->user_agent;
} }
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#include <ngx_http_push_stream_module_subscriber.h> #include <ngx_http_push_stream_module_subscriber.h>
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool); static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool);
static ngx_http_push_stream_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r); static ngx_http_push_stream_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_subscriber_t *worker_subscriber); static ngx_int_t ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_subscriber_t *worker_subscriber);
static ngx_flag_t ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id); static ngx_flag_t ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
...@@ -33,7 +33,7 @@ static void ngx_http_push_stream_send_old_m ...@@ -33,7 +33,7 @@ static void ngx_http_push_stream_send_old_m
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log); static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log);
static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber); static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber);
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log); static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool); static ngx_int_t ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool);
static ngx_http_push_stream_padding_t *ngx_http_push_stream_get_padding_by_user_agent(ngx_http_request_t *r); static ngx_http_push_stream_padding_t *ngx_http_push_stream_get_padding_by_user_agent(ngx_http_request_t *r);
static ngx_int_t static ngx_int_t
...@@ -44,8 +44,9 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -44,8 +44,9 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_http_push_stream_subscriber_t *worker_subscriber; ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur; ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_http_push_stream_subscriber_ctx_t *ctx; ngx_http_push_stream_subscriber_ctx_t *ctx;
ngx_int_t tag;
time_t if_modified_since; time_t if_modified_since;
ngx_str_t *last_event_id, vv_time = ngx_null_string; ngx_str_t *last_event_id = NULL;
ngx_str_t *push_mode; ngx_str_t *push_mode;
ngx_flag_t polling, longpolling; ngx_flag_t polling, longpolling;
ngx_int_t rc; ngx_int_t rc;
...@@ -88,22 +89,15 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -88,22 +89,15 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
return ngx_http_push_stream_send_only_header_response(r, status_code, explain_error_message); return ngx_http_push_stream_send_only_header_response(r, status_code, explain_error_message);
} }
if (cf->last_received_message_time != NULL) { // get control values
ngx_http_push_stream_complex_value(r, cf->last_received_message_time, &vv_time); ngx_http_push_stream_get_last_received_message_values(r, &if_modified_since, &tag, &last_event_id);
} else if (r->headers_in.if_modified_since != NULL) {
vv_time = r->headers_in.if_modified_since->value;
}
// get control headers
if_modified_since = vv_time.len ? ngx_http_parse_time(vv_time.data, vv_time.len) : -1;
last_event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_LAST_EVENT_ID);
push_mode = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_MODE); push_mode = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_MODE);
polling = ((cf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_POLLING) || ((push_mode != NULL) && (push_mode->len == NGX_HTTP_PUSH_STREAM_MODE_POLLING.len) && (ngx_strncasecmp(push_mode->data, NGX_HTTP_PUSH_STREAM_MODE_POLLING.data, NGX_HTTP_PUSH_STREAM_MODE_POLLING.len) == 0))); polling = ((cf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_POLLING) || ((push_mode != NULL) && (push_mode->len == NGX_HTTP_PUSH_STREAM_MODE_POLLING.len) && (ngx_strncasecmp(push_mode->data, NGX_HTTP_PUSH_STREAM_MODE_POLLING.data, NGX_HTTP_PUSH_STREAM_MODE_POLLING.len) == 0)));
longpolling = ((cf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_LONGPOLLING) || ((push_mode != NULL) && (push_mode->len == NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.len) && (ngx_strncasecmp(push_mode->data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.len) == 0))); longpolling = ((cf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_LONGPOLLING) || ((push_mode != NULL) && (push_mode->len == NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.len) && (ngx_strncasecmp(push_mode->data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.len) == 0)));
if (polling || longpolling) { if (polling || longpolling) {
ngx_int_t result = ngx_http_push_stream_subscriber_polling_handler(r, channels_ids, if_modified_since, last_event_id, longpolling, ctx->temp_pool); ngx_int_t result = ngx_http_push_stream_subscriber_polling_handler(r, channels_ids, if_modified_since, tag, last_event_id, longpolling, ctx->temp_pool);
if (ctx->temp_pool != NULL) { if (ctx->temp_pool != NULL) {
ngx_destroy_pool(ctx->temp_pool); ngx_destroy_pool(ctx->temp_pool);
ctx->temp_pool = NULL; ctx->temp_pool = NULL;
...@@ -134,10 +128,10 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -134,10 +128,10 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
// adding subscriber to channel(s) and send backtrack messages // adding subscriber to channel(s) and send old messages
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) { if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
} }
...@@ -150,7 +144,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -150,7 +144,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
} }
static ngx_int_t static ngx_int_t
ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool) ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool)
{ {
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_stream_shm_zone->shm.addr;
...@@ -159,20 +153,11 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -159,20 +153,11 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
ngx_http_push_stream_subscriber_t *worker_subscriber; ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_subscription_t *subscription; ngx_http_push_stream_subscription_t *subscription;
ngx_str_t *etag = NULL, vv_etag = ngx_null_string;
ngx_int_t tag;
time_t greater_message_time; time_t greater_message_time;
ngx_int_t greater_message_tag; ngx_int_t greater_message_tag;
ngx_flag_t has_message_to_send = 0; ngx_flag_t has_message_to_send = 0;
ngx_str_t callback_function_name; ngx_str_t callback_function_name;
if (cf->last_received_message_tag != NULL) {
ngx_http_push_stream_complex_value(r, cf->last_received_message_tag, &vv_etag);
etag = vv_etag.len ? &vv_etag : NULL;
} else {
etag = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_IF_NONE_MATCH);
}
if (ngx_http_arg(r, NGX_HTTP_PUSH_STREAM_CALLBACK.data, NGX_HTTP_PUSH_STREAM_CALLBACK.len, &callback_function_name) == NGX_OK) { if (ngx_http_arg(r, NGX_HTTP_PUSH_STREAM_CALLBACK.data, NGX_HTTP_PUSH_STREAM_CALLBACK.len, &callback_function_name) == NGX_OK) {
ngx_http_push_stream_unescape_uri(&callback_function_name); ngx_http_push_stream_unescape_uri(&callback_function_name);
if ((ctx->callback = ngx_http_push_stream_get_formatted_chunk(callback_function_name.data, callback_function_name.len, r->pool)) == NULL) { if ((ctx->callback = ngx_http_push_stream_get_formatted_chunk(callback_function_name.data, callback_function_name.len, r->pool)) == NULL) {
...@@ -181,10 +166,8 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -181,10 +166,8 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
} }
} }
tag = ((etag != NULL) && ((tag = ngx_atoi(etag->data, etag->len)) != NGX_ERROR)) ? ngx_abs(tag) : -1;
greater_message_tag = tag; greater_message_tag = tag;
greater_message_time = if_modified_since = (if_modified_since < 0) ? 0 : if_modified_since; greater_message_time = (if_modified_since < 0) ? 0 : if_modified_since;
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
...@@ -250,7 +233,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -250,7 +233,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
// polling or long polling without messages to send // polling or long polling with messages to send
ngx_http_push_stream_add_polling_headers(r, greater_message_time, greater_message_tag, temp_pool); ngx_http_push_stream_add_polling_headers(r, greater_message_time, greater_message_tag, temp_pool);
...@@ -301,7 +284,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -301,7 +284,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
} }
static ngx_int_t static ngx_int_t
ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool) ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool)
{ {
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_subscription_t *subscription; ngx_http_push_stream_subscription_t *subscription;
...@@ -318,7 +301,7 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http ...@@ -318,7 +301,7 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
} }
// send old messages to new subscriber // send old messages to new subscriber
ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since, 0, 0, -1, last_event_id); ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since, tag, 0, -1, last_event_id);
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
result = ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->id, subscription, &subscriber->subscriptions_sentinel, r->connection->log); result = ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->id, subscription, &subscriber->subscriptions_sentinel, r->connection->log);
...@@ -575,7 +558,7 @@ ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *ch ...@@ -575,7 +558,7 @@ ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *ch
continue; continue;
} }
if ((!found) && (last_event_id == NULL) && (if_modified_since >= 0) && ((message->time > if_modified_since) || ((message->time == if_modified_since) && (tag >= 0) && (message->tag >= tag)))) { if ((!found) && (if_modified_since >= 0) && ((message->time > if_modified_since) || ((message->time == if_modified_since) && (tag >= 0) && (message->tag >= tag)))) {
found = 1; found = 1;
if ((message->time == if_modified_since) && (message->tag == tag)) { if ((message->time == if_modified_since) && (message->tag == tag)) {
continue; continue;
...@@ -630,7 +613,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre ...@@ -630,7 +613,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
continue; continue;
} }
if ((!found) && (last_event_id == NULL) && (if_modified_since >= 0) && ((message->time > if_modified_since) || ((message->time == if_modified_since) && (tag >= 0) && (message->tag >= tag)))) { if ((!found) && (if_modified_since >= 0) && ((message->time > if_modified_since) || ((message->time == if_modified_since) && (tag >= 0) && (message->tag >= tag)))) {
found = 1; found = 1;
if ((message->time == if_modified_since) && (message->tag == tag)) { if ((message->time == if_modified_since) && (message->tag == tag)) {
continue; continue;
......
...@@ -1454,6 +1454,43 @@ ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modi ...@@ -1454,6 +1454,43 @@ ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modi
} }
} }
static void
ngx_http_push_stream_get_last_received_message_values(ngx_http_request_t *r, time_t *if_modified_since, ngx_int_t *tag, ngx_str_t **last_event_id)
{
ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_str_t *etag = NULL, vv_etag = ngx_null_string;
ngx_str_t vv_event_id = ngx_null_string, vv_time = ngx_null_string;
if (cf->last_received_message_time != NULL) {
ngx_http_push_stream_complex_value(r, cf->last_received_message_time, &vv_time);
} else if (r->headers_in.if_modified_since != NULL) {
vv_time = r->headers_in.if_modified_since->value;
}
if (cf->last_received_message_tag != NULL) {
ngx_http_push_stream_complex_value(r, cf->last_received_message_tag, &vv_etag);
etag = vv_etag.len ? &vv_etag : NULL;
} else {
etag = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_IF_NONE_MATCH);
}
if (cf->last_event_id != NULL) {
ngx_http_push_stream_complex_value(r, cf->last_event_id, &vv_event_id);
if (vv_event_id.len) {
*last_event_id = ngx_http_push_stream_create_str(ctx->temp_pool, vv_event_id.len);
ngx_memcpy(((ngx_str_t *)*last_event_id)->data, vv_event_id.data, vv_event_id.len);
}
} else {
*last_event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_LAST_EVENT_ID);
}
*if_modified_since = vv_time.len ? ngx_http_parse_time(vv_time.data, vv_time.len) : -1;
*tag = ((etag != NULL) && ((*tag = ngx_atoi(etag->data, etag->len)) != NGX_ERROR)) ? ngx_abs(*tag) : -1;
}
/** /**
* Copied from nginx code to only send headers added on this module code * Copied from nginx code to only send headers added on this module code
* */ * */
......
...@@ -40,6 +40,9 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ...@@ -40,6 +40,9 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
ngx_http_push_stream_subscriber_t *worker_subscriber; ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur; ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_http_push_stream_subscriber_ctx_t *ctx; ngx_http_push_stream_subscriber_ctx_t *ctx;
ngx_int_t tag;
time_t if_modified_since;
ngx_str_t *last_event_id = NULL;
ngx_int_t rc; ngx_int_t rc;
ngx_int_t status_code; ngx_int_t status_code;
ngx_str_t *explain_error_message; ngx_str_t *explain_error_message;
...@@ -88,6 +91,9 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ...@@ -88,6 +91,9 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
return ngx_http_push_stream_send_only_header_response(r, status_code, explain_error_message); return ngx_http_push_stream_send_only_header_response(r, status_code, explain_error_message);
} }
// get control values
ngx_http_push_stream_get_last_received_message_values(r, &if_modified_since, &tag, &last_event_id);
// stream access // stream access
if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) { if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
...@@ -123,7 +129,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ...@@ -123,7 +129,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
// adding subscriber to channel(s) and send backtrack messages // adding subscriber to channel(s) and send backtrack messages
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, -1, NULL, worker_subscriber, ctx->temp_pool) != NGX_OK) { if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment