Commit cbfa6b69 authored by Wandenberg's avatar Wandenberg

make cleanup test more stable and faster using a socket to publish messages

parent c5abed57
...@@ -30,67 +30,53 @@ describe "Cleanup Memory" do ...@@ -30,67 +30,53 @@ describe "Cleanup Memory" do
# ensure channel will not be cleaned up # ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do publish_messages_until_fill_the_memory(channel, body) do |status, content|
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
start = Time.now
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_2.callback do
pub_2.should be_http_status(200).with_body
result = JSON.parse(pub_2.response)
stored_messages_setp_1 = result["stored_messages"].to_i
published_messages_setp_1 = result["published_messages"].to_i
messages_in_trash = result["messages_in_trash"].to_i
stored_messages_setp_1.should eql(conf.max_messages_stored_per_channel) start = Time.now
published_messages_setp_1.should be > (conf.max_messages_stored_per_channel) pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
stored_messages_setp_1.should_not eql(0) pub_2.callback do
published_messages_setp_1.should eql(stored_messages_setp_1 + messages_in_trash) pub_2.should be_http_status(200).with_body
result = JSON.parse(pub_2.response)
stored_messages_setp_1 = result["stored_messages"].to_i
published_messages_setp_1 = result["published_messages"].to_i
messages_in_trash = result["messages_in_trash"].to_i
wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true}) do stored_messages_setp_1.should eql(conf.max_messages_stored_per_channel)
execute_changes_on_environment(conf) do published_messages_setp_1.should be > (conf.max_messages_stored_per_channel)
# connect a subscriber on new worker stored_messages_setp_1.should_not eql(0)
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers published_messages_setp_1.should eql(stored_messages_setp_1 + messages_in_trash)
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true}) do
publish_message_inline_with_callbacks(channel, headers, body, { execute_changes_on_environment(conf) do
:error => Proc.new do |status2, content2| # connect a subscriber on new worker
fill_memory_timer.cancel sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
start = Time.now
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers publish_messages_until_fill_the_memory(channel, body) do |status2, content2|
pub_2.callback do start = Time.now
pub_2.should be_http_status(200).with_body pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i pub_2.callback do
fail("Don't publish more messages") if published_messages_setp_1 == published_messages_setp_2 pub_2.should be_http_status(200).with_body
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true}) do fail("Don't publish more messages") if published_messages_setp_1 == published_messages_setp_2
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, { wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true}) do
:error => Proc.new do |status3, content3| publish_messages_until_fill_the_memory(channel, body) do |status3, content3|
fill_memory_timer.cancel pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers pub_4.callback do
pub_4.callback do pub_4.should be_http_status(200).with_body
pub_4.should be_http_status(200).with_body result = JSON.parse(pub_4.response)
result = JSON.parse(pub_4.response) result["stored_messages"].to_i.should eql(stored_messages_setp_1)
result["stored_messages"].to_i.should eql(stored_messages_setp_1) (result["published_messages"].to_i - published_messages_setp_2).should eql(published_messages_setp_1)
(result["published_messages"].to_i - published_messages_setp_2).should eql(published_messages_setp_1)
EventMachine.stop
EventMachine.stop
end
end
})
end
end
end
end end
}) end
end end
end end
end end
end end
end end
}) end
end end
end end
end end
...@@ -137,8 +123,8 @@ describe "Cleanup Memory" do ...@@ -137,8 +123,8 @@ describe "Cleanup Memory" do
end end
end end
it "should cleanup message memory without max messages stored per channelXXX", :cleanup => true do it "should cleanup message memory without max messages stored per channel", :cleanup => true do
channel = 'ch_test_message_cleanup_without_max_messages_stored_per_chann' channel = 'ch_test_message_cleanup_without_max_messages_stored_per_channel'
body = 'message to create a channel' body = 'message to create a channel'
expected_time_for_clear = 25 expected_time_for_clear = 25
...@@ -151,69 +137,54 @@ describe "Cleanup Memory" do ...@@ -151,69 +137,54 @@ describe "Cleanup Memory" do
# ensure channel will not be cleaned up # ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do publish_messages_until_fill_the_memory(channel, body) do |status, content|
publish_message_inline_with_callbacks(channel, headers, body, { start = Time.now
:error => Proc.new do |status, content| pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
fill_memory_timer.cancel pub_2.callback do
start = Time.now pub_2.should be_http_status(200).with_body
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers result = JSON.parse(pub_2.response)
pub_2.callback do stored_messages_setp_1 = result["stored_messages"].to_i
pub_2.should be_http_status(200).with_body published_messages_setp_1 = result["published_messages"].to_i
result = JSON.parse(pub_2.response) fail("Limited the number of stored messages") if stored_messages_setp_1 <= 100
stored_messages_setp_1 = result["stored_messages"].to_i fail("Don't create any message") if stored_messages_setp_1 == 0
published_messages_setp_1 = result["published_messages"].to_i
fail("Limited the number of stored messages") if stored_messages_setp_1 <= 100
fail("Don't create any message") if stored_messages_setp_1 == 0
execute_changes_on_environment(conf) do execute_changes_on_environment(conf) do
# connect a subscriber on new worker # connect a subscriber on new worker
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true}) do wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true}) do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do publish_messages_until_fill_the_memory(channel, body) do |status2, content2|
publish_message_inline_with_callbacks(channel, headers, body, { start = Time.now
:error => Proc.new do |status2, content2| pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers
fill_memory_timer.cancel pub_2.callback do
start = Time.now pub_2.should be_http_status(200).with_body
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
pub_2.callback do fail("Don't publish more messages") if published_messages_setp_1 == published_messages_setp_2
pub_2.should be_http_status(200).with_body
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true}) do
fail("Don't publish more messages") if published_messages_setp_1 == published_messages_setp_2 publish_messages_until_fill_the_memory(channel, body) do |status3, content3|
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers
wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true}) do pub_4.callback do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do pub_4.should be_http_status(200).with_body
publish_message_inline_with_callbacks(channel, headers, body, { result = JSON.parse(pub_4.response)
:error => Proc.new do |status3, content3| result["stored_messages"].to_i.should eql(stored_messages_setp_1)
fill_memory_timer.cancel (result["published_messages"].to_i - published_messages_setp_2).should eql(published_messages_setp_1)
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers EventMachine.stop
pub_4.callback do
pub_4.should be_http_status(200).with_body
result = JSON.parse(pub_4.response)
result["stored_messages"].to_i.should eql(stored_messages_setp_1)
(result["published_messages"].to_i - published_messages_setp_2).should eql(published_messages_setp_1)
EventMachine.stop
end
end
})
end
end
end
end end
}) end
end end
end end
end end
end end
end end
}) end
end end
end end
end end
end end
it "should cleanup memory used for create channels", :cleanup => true do it "should cleanup memory used for create channels", :cleanup => true do
channel = 'ch_test_channel_cleanup_' channel = 'ch_test_channel_cleanup_%d'
body = 'message to create a channel' body = 'message to create a channel'
nginx_run_server(config.merge(:message_ttl => '2s'), :timeout => test_timeout) do |conf| nginx_run_server(config.merge(:message_ttl => '2s'), :timeout => test_timeout) do |conf|
...@@ -223,61 +194,40 @@ describe "Cleanup Memory" do ...@@ -223,61 +194,40 @@ describe "Cleanup Memory" do
expected_time_for_clear = 45 expected_time_for_clear = 45
EventMachine.run do EventMachine.run do
i = 0 publish_messages_until_fill_the_memory(channel, body) do |status, content|
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do start = Time.now
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, { pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
:error => Proc.new do |status, content| pub_2.callback do
fill_memory_timer.cancel pub_2.should be_http_status(200).with_body
start = Time.now channels_setp_1 = JSON.parse(pub_2.response)["channels"].to_i
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers fail("Don't create any channel") if channels_setp_1 == 0
pub_2.callback do
pub_2.should be_http_status(200).with_body
channels_setp_1 = JSON.parse(pub_2.response)["channels"].to_i
fail("Don't create any channel") if channels_setp_1 == 0
execute_changes_on_environment(conf) do execute_changes_on_environment(conf) do
wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true, :check_channels => true}) do wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true, :check_channels => true}) do
j = 0 publish_messages_until_fill_the_memory(channel, body) do |status2, content2|
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do start = Time.now
publish_message_inline_with_callbacks(channel + j.to_s, headers, body, { pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
:error => Proc.new do |status2, content2| pub_2.callback do
fill_memory_timer.cancel pub_2.should be_http_status(200).with_body
start = Time.now fail("Don't create more channel") if published_messages_setp_1 == JSON.parse(pub_2.response)["published_messages"].to_i
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_2.callback do wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true, :check_channels => true}) do
pub_2.should be_http_status(200).with_body publish_messages_until_fill_the_memory(channel, body) do |status3, content3|
fail("Don't create more channel") if published_messages_setp_1 == JSON.parse(pub_2.response)["published_messages"].to_i pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_4.callback do
wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true, :check_channels => true}) do pub_4.should be_http_status(200).with_body
i = 0 channels_setp_2 = JSON.parse(pub_4.response)["channels"].to_i
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, { channels_setp_2.should eql(channels_setp_1)
:error => Proc.new do |status3, content3| EventMachine.stop
fill_memory_timer.cancel
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_4.callback do
pub_4.should be_http_status(200).with_body
channels_setp_2 = JSON.parse(pub_4.response)["channels"].to_i
channels_setp_2.should eql(channels_setp_1)
EventMachine.stop
end
end
})
i += 1
end
end
end
end end
}) end
j += 1
end end
end end
end end
end end
end end
}) end
i += 1
end end
end end
end end
...@@ -296,65 +246,52 @@ describe "Cleanup Memory" do ...@@ -296,65 +246,52 @@ describe "Cleanup Memory" do
# ensure channel will not be cleaned up # ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do publish_messages_until_fill_the_memory(channel, body) do |status, content|
publish_message_inline_with_callbacks(channel, headers, body, { start = Time.now
:error => Proc.new do |status, content| pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers
fill_memory_timer.cancel pub_2.callback do
start = Time.now pub_2.should be_http_status(200).with_body
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers result = JSON.parse(pub_2.response)
pub_2.callback do published_messages_setp_1 = result["published_messages"].to_i
pub_2.should be_http_status(200).with_body
result = JSON.parse(pub_2.response)
published_messages_setp_1 = result["published_messages"].to_i
execute_changes_on_environment(conf) do execute_changes_on_environment(conf) do
# connect a subscriber on new worker # connect a subscriber on new worker
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true}) do wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true}) do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, { publish_messages_until_fill_the_memory(channel, body) do |status2, content2|
:error => Proc.new do |status2, content2| start = Time.now
fill_memory_timer.cancel pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers
start = Time.now pub_2.callback do
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers pub_2.should be_http_status(200).with_body
pub_2.callback do published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
pub_2.should be_http_status(200).with_body published_messages_setp_2.should_not eql(published_messages_setp_1)
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
published_messages_setp_2.should_not eql(published_messages_setp_1) wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true}) do
wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true}) do publish_messages_until_fill_the_memory(channel, body) do |status3, content3|
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers
publish_message_inline_with_callbacks(channel, headers, body, { pub_4.callback do
:error => Proc.new do |status3, content3| pub_4.should be_http_status(200).with_body
fill_memory_timer.cancel result = JSON.parse(pub_4.response)
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers (result["published_messages"].to_i - published_messages_setp_2).should eql(published_messages_setp_1)
pub_4.callback do EventMachine.stop
pub_4.should be_http_status(200).with_body
result = JSON.parse(pub_4.response)
(result["published_messages"].to_i - published_messages_setp_2).should eql(published_messages_setp_1)
EventMachine.stop
end
end
})
end
end
end
end end
}) end
end end
end end
end end
end end
end end
}) end
end end
end end
end end
end end
it "should cleanup memory used for publish messages with store 'off' and without subscriber", :cleanup => true do it "should cleanup memory used for publish messages with store 'off' and without subscriber", :cleanup => true do
channel = 'ch_test_message_cleanup_with_store_off_without_subscriber' channel = 'ch_test_message_cleanup_with_store_off_without_subscriber %d'
body = 'message to create a channel' body = 'message to create a channel'
expected_time_for_clear = 45 expected_time_for_clear = 45
...@@ -363,60 +300,41 @@ describe "Cleanup Memory" do ...@@ -363,60 +300,41 @@ describe "Cleanup Memory" do
published_messages_setp_2 = 0 published_messages_setp_2 = 0
EventMachine.run do EventMachine.run do
i = 0 publish_messages_until_fill_the_memory(channel, body) do |status, content|
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
start = Time.now
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_2.callback do
pub_2.should be_http_status(200).with_body
result = JSON.parse(pub_2.response)
published_messages_setp_1 = result["published_messages"].to_i
execute_changes_on_environment(conf) do start = Time.now
wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true, :check_channels => true}) do pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
j = 0 pub_2.callback do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do pub_2.should be_http_status(200).with_body
publish_message_inline_with_callbacks(channel + j.to_s, headers, body, { result = JSON.parse(pub_2.response)
:error => Proc.new do |status2, content2| published_messages_setp_1 = result["published_messages"].to_i
fill_memory_timer.cancel
start = Time.now execute_changes_on_environment(conf) do
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true, :check_channels => true}) do
pub_2.callback do publish_messages_until_fill_the_memory(channel, body) do |status2, content2|
pub_2.should be_http_status(200).with_body start = Time.now
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
fail("Don't create more channel") if published_messages_setp_1 == published_messages_setp_2 pub_2.callback do
pub_2.should be_http_status(200).with_body
wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true, :check_channels => true}) do published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do fail("Don't create more channel") if published_messages_setp_1 == published_messages_setp_2
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status3, content3| wait_until_trash_is_empty(start, expected_time_for_clear, {:check_stored_messages => true, :check_channels => true}) do
fill_memory_timer.cancel publish_messages_until_fill_the_memory(channel, body) do |status3, content3|
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_4.callback do pub_4.callback do
pub_4.should be_http_status(200).with_body pub_4.should be_http_status(200).with_body
result = JSON.parse(pub_4.response) result = JSON.parse(pub_4.response)
(result["published_messages"].to_i - published_messages_setp_2).should eql(published_messages_setp_1) (result["published_messages"].to_i - published_messages_setp_2).should eql(published_messages_setp_1)
EventMachine.stop EventMachine.stop
end
end
})
i += 1
end
end
end
end end
}) end
j += 1
end end
end end
end end
end end
end end
}) end
i += 1
end end
end end
end end
...@@ -569,7 +487,8 @@ describe "Cleanup Memory" do ...@@ -569,7 +487,8 @@ describe "Cleanup Memory" do
:daemon => 'on', :daemon => 'on',
:shared_memory_size => "129k", :shared_memory_size => "129k",
:message_ttl => '10s', :message_ttl => '10s',
:max_messages_stored_per_channel => nil :max_messages_stored_per_channel => nil,
:keepalive_requests => 200
} }
end end
......
...@@ -61,14 +61,18 @@ def create_channel_by_subscribe(channel, headers, timeout=60, &block) ...@@ -61,14 +61,18 @@ def create_channel_by_subscribe(channel, headers, timeout=60, &block)
end end
end end
def publish_message_inline_with_callbacks(channel, headers, body, callbacks = {}) def publish_messages_until_fill_the_memory(channel, body, &block)
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body i = 0
pub.callback do resp_headers, resp_body = nil
if pub.response_header.status == 200 socket = open_socket(nginx_host, nginx_port)
callbacks[:success].call(pub.response_header.status, pub.response) unless callbacks[:success].nil? while (true) do
else socket.print("POST /pub?id=#{channel.to_s % (i)} HTTP/1.1\r\nHost: localhost\r\nContent-Length: #{body.size}\r\n\r\n#{body}")
callbacks[:error].call(pub.response_header.status, pub.response) unless callbacks[:error].nil? resp_headers, resp_body = read_response_on_socket(socket, {:wait_for => "}\r\n"})
end break unless resp_headers.match(/200 OK/)
i += 1
end end
pub socket.close
status = resp_headers.match(/HTTP[^ ]* ([^ ]*)/)[1]
block.call(status, resp_body) unless block.nil?
end end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment