Commit ea35acb7 authored by Wandenberg's avatar Wandenberg

start etag as 1 to avoid concurrency problems when subscriber connects on the...

start etag as 1 to avoid concurrency problems when subscriber connects on the same second of a message is published
parent 44856205
...@@ -55,7 +55,7 @@ describe "Subscriber Properties" do ...@@ -55,7 +55,7 @@ describe "Subscriber Properties" do
sub_1.callback do sub_1.callback do
sub_1.should be_http_status(200) sub_1.should be_http_status(200)
sub_1.response_header['LAST_MODIFIED'].to_s.should_not eql("") sub_1.response_header['LAST_MODIFIED'].to_s.should_not eql("")
sub_1.response_header['ETAG'].to_s.should eql("0") sub_1.response_header['ETAG'].to_s.should eql("1")
sub_1.response.should eql("#{body}") sub_1.response.should eql("#{body}")
EventMachine.stop EventMachine.stop
end end
......
...@@ -215,11 +215,11 @@ describe "Receive old messages" do ...@@ -215,11 +215,11 @@ describe "Receive old messages" do
publish_message(channel.to_s, headers, body_prefix + i.to_s) publish_message(channel.to_s, headers, body_prefix + i.to_s)
end end
sent_headers = headers.merge({'If-Modified-Since' => now.utc.strftime("%a, %d %b %Y %T %Z"), 'If-None-Match' => '5'}) sent_headers = headers.merge({'If-Modified-Since' => now.utc.strftime("%a, %d %b %Y %T %Z"), 'If-None-Match' => '6'})
get_content(nginx_address + '/sub/' + channel.to_s, 4, sent_headers) do |response, response_headers| get_content(nginx_address + '/sub/' + channel.to_s, 4, sent_headers) do |response, response_headers|
if ["long-polling", "polling"].include?(conf.subscriber_mode) if ["long-polling", "polling"].include?(conf.subscriber_mode)
response_headers['LAST_MODIFIED'].to_s.should_not eql("") response_headers['LAST_MODIFIED'].to_s.should_not eql("")
response_headers['ETAG'].to_s.should eql("9") response_headers['ETAG'].to_s.should eql("10")
end end
response.should eql("msg 6\r\nmsg 7\r\nmsg 8\r\nmsg 9\r\n") response.should eql("msg 6\r\nmsg 7\r\nmsg 8\r\nmsg 9\r\n")
...@@ -227,6 +227,26 @@ describe "Receive old messages" do ...@@ -227,6 +227,26 @@ describe "Receive old messages" do
end end
end end
it "should receive message published on same second a subscriber connect" do
channel = 'ch_test_receiving_messages_untie_by_etag'
body = 'msg 1'
nginx_run_server(config.merge(:message_template => '~text~')) do |conf|
now = Time.now
publish_message(channel.to_s, headers, body)
sent_headers = headers.merge({'If-Modified-Since' => now.utc.strftime("%a, %d %b %Y %T %Z"), 'If-None-Match' => '0'})
get_content(nginx_address + '/sub/' + channel.to_s, 1, sent_headers) do |response, response_headers|
if ["long-polling", "polling"].include?(conf.subscriber_mode)
response_headers['LAST_MODIFIED'].to_s.should_not eql("")
response_headers['ETAG'].to_s.should eql("1")
end
response.should eql("msg 1\r\n")
end
end
end
it "should accept modified since and none match values not using headers" do it "should accept modified since and none match values not using headers" do
channel = 'ch_test_send_modified_since_and_none_match_values_not_using_headers' channel = 'ch_test_send_modified_since_and_none_match_values_not_using_headers'
body_prefix = 'msg ' body_prefix = 'msg '
...@@ -239,11 +259,11 @@ describe "Receive old messages" do ...@@ -239,11 +259,11 @@ describe "Receive old messages" do
publish_message(channel.to_s, headers, body_prefix + i.to_s) publish_message(channel.to_s, headers, body_prefix + i.to_s)
end end
params = "time=#{URI.encode(now.utc.strftime("%a, %d %b %Y %T %Z"))}&tag=5" params = "time=#{URI.encode(now.utc.strftime("%a, %d %b %Y %T %Z"))}&tag=6"
get_content(nginx_address + '/sub/' + channel.to_s + '?' + params, 4, headers) do |response, response_headers| get_content(nginx_address + '/sub/' + channel.to_s + '?' + params, 4, headers) do |response, response_headers|
if ["long-polling", "polling"].include?(conf.subscriber_mode) if ["long-polling", "polling"].include?(conf.subscriber_mode)
response_headers['LAST_MODIFIED'].to_s.should_not eql("") response_headers['LAST_MODIFIED'].to_s.should_not eql("")
response_headers['ETAG'].to_s.should eql("9") response_headers['ETAG'].to_s.should eql("10")
end end
response.should eql("msg 6\r\nmsg 7\r\nmsg 8\r\nmsg 9\r\n") response.should eql("msg 6\r\nmsg 7\r\nmsg 8\r\nmsg 9\r\n")
......
...@@ -228,7 +228,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -228,7 +228,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
msg->id = id; msg->id = id;
msg->workers_ref_count = 0; msg->workers_ref_count = 0;
msg->time = (id == -1) ? 0 : ngx_time(); msg->time = (id == -1) ? 0 : ngx_time();
msg->tag = (msg->time == shm_data->last_message_time) ? (shm_data->last_message_tag + 1) : 0; msg->tag = (msg->time == shm_data->last_message_time) ? (shm_data->last_message_tag + 1) : 1;
if ((msg->raw.data = ngx_slab_alloc_locked(shpool, len + 1)) == NULL) { if ((msg->raw.data = ngx_slab_alloc_locked(shpool, len + 1)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg); ngx_http_push_stream_free_message_memory_locked(shpool, msg);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment