Commit 3209d995 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

fix memory leak which happens after a worker dies or the server receive the SIGHUP (reload) signal

parent 566fe038
...@@ -27,6 +27,6 @@ ...@@ -27,6 +27,6 @@
#define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ #define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.4"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.4");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("ab5f17888393a57a3191f0f4ea631be717e6a30b"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("7e70cc1cd5f5070d37ca23d7961b4afafebe3668");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */
...@@ -161,6 +161,24 @@ ngx_http_push_stream_ipc_init_worker() ...@@ -161,6 +161,24 @@ ngx_http_push_stream_ipc_init_worker()
} }
static ngx_int_t
ngx_http_push_stream_unsubscribe_worker_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{
ngx_http_push_stream_pid_queue_t *sentinel = &channel->workers_with_subscribers;
ngx_http_push_stream_pid_queue_t *cur = sentinel;
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if ((cur->pid == ngx_pid) || (cur->slot == ngx_process_slot)) {
ngx_queue_remove(&cur->queue);
ngx_slab_free_locked(shpool, cur);
break;
}
}
return NGX_OK;
}
static void static void
ngx_http_push_stream_clean_worker_data() ngx_http_push_stream_clean_worker_data()
{ {
...@@ -180,6 +198,8 @@ ngx_http_push_stream_clean_worker_data() ...@@ -180,6 +198,8 @@ ngx_http_push_stream_clean_worker_data()
ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_sentinel->queue); ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_sentinel->queue);
} }
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_unsubscribe_worker_locked);
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
data->ipc[ngx_process_slot].pid = -1; data->ipc[ngx_process_slot].pid = -1;
......
...@@ -4,6 +4,7 @@ require 'erb' ...@@ -4,6 +4,7 @@ require 'erb'
require 'fileutils' require 'fileutils'
require 'ruby-debug' require 'ruby-debug'
require 'test/unit' require 'test/unit'
require 'eventmachine'
require 'em-http' require 'em-http'
require 'json' require 'json'
require 'socket' require 'socket'
...@@ -175,6 +176,18 @@ module BaseTestCase ...@@ -175,6 +176,18 @@ module BaseTestCase
self.send(:global_configuration) if self.respond_to?(:global_configuration) self.send(:global_configuration) if self.respond_to?(:global_configuration)
end end
def publish_message_inline_with_callbacks(channel, headers, body, callbacks = {})
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 30
pub.callback do
if pub.response_header.status == 200
callbacks[:success].call(pub.response_header.status, pub.response) unless callbacks[:success].nil?
else
callbacks[:error].call(pub.response_header.status, pub.response) unless callbacks[:error].nil?
end
end
pub
end
def publish_message(channel, headers, body) def publish_message(channel, headers, body)
EventMachine.run { EventMachine.run {
pub = publish_message_inline(channel, headers, body) do pub = publish_message_inline(channel, headers, body) do
...@@ -183,6 +196,7 @@ module BaseTestCase ...@@ -183,6 +196,7 @@ module BaseTestCase
assert_equal(channel, response["channel"].to_s, "Channel was not recognized") assert_equal(channel, response["channel"].to_s, "Channel was not recognized")
EventMachine.stop EventMachine.stop
end end
add_test_timeout
} }
end end
......
require File.expand_path('base_test_case', File.dirname(__FILE__)) require File.expand_path('base_test_case', File.dirname(__FILE__))
class TestCreateManyChannels < Test::Unit::TestCase class TestCleanupMemory < Test::Unit::TestCase
include BaseTestCase include BaseTestCase
def global_configuration
ENV['NGINX_WORKERS'] = '1'
@disable_ignore_childs = true
@master_process = 'on'
@daemon = 'on'
end
def config_test_message_cleanup def config_test_message_cleanup
@memory_cleanup_timeout = '30s' @memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k" @max_reserved_memory = "129k"
...@@ -15,56 +22,55 @@ class TestCreateManyChannels < Test::Unit::TestCase ...@@ -15,56 +22,55 @@ class TestCreateManyChannels < Test::Unit::TestCase
headers = {'accept' => 'text/html'} headers = {'accept' => 'text/html'}
body = 'message to create a channel' body = 'message to create a channel'
EventMachine.run { stored_messages_setp_1 = 0
# ensure space for a subscriber after memory was full published_messages_setp_1 = 0
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
EM.add_periodic_timer(0.001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_1.callback {
EventMachine.stop if pub_1.response_header.status == 500
}
end
}
EventMachine.run { EventMachine.run do
# ensure channel will not be cleaned up # ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60 sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
stored_messages_setp_1 = 0 fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
stored_messages_setp_2 = 0 publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60 pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback { pub_2.callback do
assert_equal(200, pub_2.response_header.status, "Don't get channels statistics") fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
result = JSON.parse(pub_2.response) result = JSON.parse(pub_2.response)
stored_messages_setp_1 = result["stored_messages"].to_i stored_messages_setp_1 = result["stored_messages"].to_i
published_messages_setp_1 = result["published_messages"].to_i
assert_equal(@max_message_buffer_length, stored_messages_setp_1, "Don't limit stored messages") assert_equal(@max_message_buffer_length, stored_messages_setp_1, "Don't limit stored messages")
fail("Don't reached the limit of stored messages") if result["published_messages"].to_i <= @max_message_buffer_length fail("Don't reached the limit of stored messages") if result["published_messages"].to_i <= @max_message_buffer_length
fail("Don't create any message") if stored_messages_setp_1 == 0 fail("Don't create any message") if stored_messages_setp_1 == 0
end
end
})
end
i = 0 EM.add_timer(50) do
EM.add_periodic_timer(1) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60 pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback { pub_3.callback do
assert_equal(200, pub_3.response_header.status, "Don't get channels statistics") fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
assert_not_equal(0, pub_3.response_header.content_length, "Don't received channels statistics") assert_equal(0, JSON.parse(pub_3.response)["stored_messages"].to_i, "Don't cleaned all messages")
stored_messages_setp_2 = JSON.parse(pub_3.response)["stored_messages"].to_i
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
if (stored_messages_setp_1 > stored_messages_setp_2) publish_message_inline_with_callbacks(channel, headers, body, {
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60 :error => Proc.new do |status, content|
pub_4.callback { pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
EventMachine.stop if (pub_4.response_header.status == 200) pub_4.callback do
} fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
result = JSON.parse(pub_4.response)
assert_equal(stored_messages_setp_1, result["stored_messages"].to_i, "Don't cleaned all messages")
assert_equal(published_messages_setp_1, result["published_messages"].to_i / 2, "Don't cleaned all memory")
EventMachine.stop
end
end
})
end
end end
fail("Don't free the memory in 60 seconds") if (i == 60)
i += 1
}
end end
} add_test_timeout(60)
add_test_timeout(65) end
}
end end
def config_test_discard_old_messages def config_test_discard_old_messages
...@@ -83,37 +89,36 @@ class TestCreateManyChannels < Test::Unit::TestCase ...@@ -83,37 +89,36 @@ class TestCreateManyChannels < Test::Unit::TestCase
count = 0 count = 0
stored_messages_setp_1 = 0 stored_messages_setp_1 = 0
EventMachine.run { EventMachine.run do
EM.add_periodic_timer(messages_to_publish / 12.to_f) do # publish messages before cleanup timer be executed fill_memory_timer = EventMachine::PeriodicTimer.new(messages_to_publish / 12.to_f) do # publish messages before cleanup timer be executed
if (count < messages_to_publish) if (count < messages_to_publish)
publish_message_inline(channel, headers, body) publish_message_inline(channel, headers, body)
elsif (count == messages_to_publish) elsif (count == messages_to_publish)
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60 pub_1 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_1.callback { pub_1.callback do
assert_equal(200, pub_1.response_header.status, "Don't get channels statistics") fill_memory_timer.cancel
assert_not_equal(0, pub_1.response_header.content_length, "Don't received channels statistics") fail("Don't received the stats") if (pub_1.response_header.status != 200) || (pub_1.response_header.content_length == 0)
stored_messages_setp_1 = JSON.parse(pub_1.response)["stored_messages"].to_i stored_messages_setp_1 = JSON.parse(pub_1.response)["stored_messages"].to_i
assert_equal(messages_to_publish, stored_messages_setp_1, "Don't store messages") assert_equal(messages_to_publish, stored_messages_setp_1, "Don't store messages")
} end
end end
count += 1 count += 1
end end
EM.add_timer(15) do # wait cleanup timer to be executed one time EM.add_timer(15) do # wait cleanup timer to be executed one time
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60 pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback { pub_2.callback do
assert_equal(200, pub_2.response_header.status, "Don't get channels statistics") fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
stored_messages_setp_2 = JSON.parse(pub_2.response)["stored_messages"].to_i stored_messages_setp_2 = JSON.parse(pub_2.response)["stored_messages"].to_i
assert(stored_messages_setp_1 > stored_messages_setp_2, "Don't clear messages") assert(stored_messages_setp_1 > stored_messages_setp_2, "Don't clear messages")
assert(stored_messages_setp_2 >= (messages_to_publish / 2), "Cleared all messages") assert(stored_messages_setp_2 >= (messages_to_publish / 2), "Cleared all messages")
EventMachine.stop EventMachine.stop
} end
end end
add_test_timeout(20) add_test_timeout(20)
} end
end end
def config_test_message_cleanup_without_max_messages_stored_per_channel def config_test_message_cleanup_without_max_messages_stored_per_channel
...@@ -124,65 +129,64 @@ class TestCreateManyChannels < Test::Unit::TestCase ...@@ -124,65 +129,64 @@ class TestCreateManyChannels < Test::Unit::TestCase
end end
def test_message_cleanup_without_max_messages_stored_per_channel def test_message_cleanup_without_max_messages_stored_per_channel
channel = 'ch_test_message_cleanup_without_max_messages_stored_per_channel' channel = 'ch_test_message_cleanup_without_max_messages_stored_per_chann'
headers = {'accept' => 'text/html'} headers = {'accept' => 'text/html'}
body = 'message to create a channel' body = 'message to create a channel'
EventMachine.run { stored_messages_setp_1 = 0
# ensure space for a subscriber after memory was full published_messages_setp_1 = 0
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
EM.add_periodic_timer(0.001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_1.callback {
EventMachine.stop if pub_1.response_header.status == 500
}
end
}
EventMachine.run { EventMachine.run do
# ensure channel will not be cleaned up # ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60 sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
stored_messages_setp_1 = 0 fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
stored_messages_setp_2 = 0 publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60 pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback { pub_2.callback do
assert_equal(200, pub_2.response_header.status, "Don't get channels statistics") fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics") result = JSON.parse(pub_2.response)
stored_messages_setp_1 = JSON.parse(pub_2.response)["stored_messages"].to_i stored_messages_setp_1 = result["stored_messages"].to_i
published_messages_setp_1 = result["published_messages"].to_i
fail("Limited the number of stored messages") if stored_messages_setp_1 <= 100 fail("Limited the number of stored messages") if stored_messages_setp_1 <= 100
fail("Don't create any message") if stored_messages_setp_1 == 0 fail("Don't create any message") if stored_messages_setp_1 == 0
end
end
})
end
i = 0 EM.add_timer(45) do
EM.add_periodic_timer(1) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60 pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback { pub_3.callback do
assert_equal(200, pub_3.response_header.status, "Don't get channels statistics") fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
assert_not_equal(0, pub_3.response_header.content_length, "Don't received channels statistics") assert_equal(0, JSON.parse(pub_3.response)["stored_messages"].to_i, "Don't cleaned all messages")
stored_messages_setp_2 = JSON.parse(pub_3.response)["stored_messages"].to_i
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
if (stored_messages_setp_1 > stored_messages_setp_2) publish_message_inline_with_callbacks(channel, headers, body, {
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60 :error => Proc.new do |status, content|
pub_4.callback { pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
EventMachine.stop if (pub_4.response_header.status == 200) pub_4.callback do
} fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
result = JSON.parse(pub_4.response)
assert_equal(stored_messages_setp_1, result["stored_messages"].to_i, "Don't cleaned all messages")
assert_equal(published_messages_setp_1, result["published_messages"].to_i / 2, "Don't cleaned all memory")
EventMachine.stop
end end
fail("Don't free the memory in 60 seconds") if (i == 60)
i += 1
}
end end
} })
add_test_timeout(65) end
} end
end
add_test_timeout(50)
end
end end
def config_test_channel_cleanup def config_test_channel_cleanup
@memory_cleanup_timeout = '30s' @memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k" @max_reserved_memory = "129k"
@min_message_buffer_timeout = '10s' @min_message_buffer_timeout = '2s'
@max_message_buffer_length = nil @max_message_buffer_length = nil
end end
...@@ -191,49 +195,58 @@ class TestCreateManyChannels < Test::Unit::TestCase ...@@ -191,49 +195,58 @@ class TestCreateManyChannels < Test::Unit::TestCase
headers = {'accept' => 'text/html'} headers = {'accept' => 'text/html'}
body = 'message to create a channel' body = 'message to create a channel'
EventMachine.run {
i = 0
EM.add_periodic_timer(0.001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s + i.to_s).post :head => headers, :body => body, :timeout => 60
pub_1.callback {
EventMachine.stop if pub_1.response_header.status == 500
i += 1
}
end
}
EventMachine.run {
channels_setp_1 = 0 channels_setp_1 = 0
channels_setp_2 = 0 channels_setp_2 = 0
EventMachine.run do
i = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60 pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_2.callback { pub_2.callback do
assert_equal(200, pub_2.response_header.status, "Don't get channels statistics") fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
channels_setp_1 = JSON.parse(pub_2.response)["channels"].to_i channels_setp_1 = JSON.parse(pub_2.response)["channels"].to_i
fail("Don't create any channel") if channels_setp_1 == 0 fail("Don't create any channel") if channels_setp_1 == 0
end
end
})
i += 1
end
i = 0 EM.add_timer(60) do
EM.add_periodic_timer(1) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60 pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_3.callback { pub_3.callback do
assert_equal(200, pub_3.response_header.status, "Don't get channels statistics") fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
assert_not_equal(0, pub_3.response_header.content_length, "Don't received channels statistics") channels = JSON.parse(pub_3.response)["channels"].to_i
channels_setp_2 = JSON.parse(pub_3.response)["channels"].to_i
if (channels_setp_1 > channels_setp_2) assert_equal(0, channels, "Don't removed all channels")
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s + (i + 1).to_s).post :head => headers, :body => body, :timeout => 60 EventMachine.stop unless (channels == 0)
pub_4.callback {
EventMachine.stop if (pub_4.response_header.status == 200)
}
end
fail("Don't free the memory in 60 seconds") if (i == 60) EM.add_timer(35) do
i = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
channels_setp_2 = JSON.parse(pub_4.response)["channels"].to_i
assert_equal(channels_setp_1, channels_setp_2, "Don't released all memory")
EventMachine.stop
end
end
})
i += 1 i += 1
}
end end
} end
add_test_timeout(65) end
} end
add_test_timeout(110)
end
end end
def config_test_message_cleanup_with_store_off_with_subscriber def config_test_message_cleanup_with_store_off_with_subscriber
...@@ -249,30 +262,49 @@ class TestCreateManyChannels < Test::Unit::TestCase ...@@ -249,30 +262,49 @@ class TestCreateManyChannels < Test::Unit::TestCase
headers = {'accept' => 'text/html'} headers = {'accept' => 'text/html'}
body = 'message to create a channel' body = 'message to create a channel'
EventMachine.run { published_messages_setp_1 = 0
# ensure space for a subscriber after memory was full
EventMachine.run do
# ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60 sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
EM.add_periodic_timer(0.001) do fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60 publish_message_inline_with_callbacks(channel, headers, body, {
pub_1.callback { :error => Proc.new do |status, content|
EventMachine.stop if (pub_1.response_header.status == 500) fill_memory_timer.cancel
} pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
published_messages_setp_1 = result["published_messages"].to_i
end
end
})
end end
}
i = 0 EM.add_timer(45) do
EventMachine.run { pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
EM.add_periodic_timer(1) do pub_3.callback do
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60 fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
pub_2.callback { assert_equal(0, JSON.parse(pub_3.response)["channels"].to_i, "Don't cleaned all messages/channels")
fail("Don't free the memory in 60 seconds") if (i == 60)
EventMachine.stop if (pub_2.response_header.status == 200) fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
i += 1 publish_message_inline_with_callbacks(channel, headers, body, {
} :error => Proc.new do |status, content|
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
result = JSON.parse(pub_4.response)
assert_equal(published_messages_setp_1, result["published_messages"].to_i / 2, "Don't cleaned all memory")
EventMachine.stop
end
end
})
end
end
end
add_test_timeout(50)
end end
add_test_timeout(65)
}
end end
def config_test_message_cleanup_with_store_off_without_subscriber def config_test_message_cleanup_with_store_off_without_subscriber
...@@ -288,29 +320,51 @@ class TestCreateManyChannels < Test::Unit::TestCase ...@@ -288,29 +320,51 @@ class TestCreateManyChannels < Test::Unit::TestCase
headers = {'accept' => 'text/html'} headers = {'accept' => 'text/html'}
body = 'message to create a channel' body = 'message to create a channel'
j = 0 published_messages_setp_1 = 0
EventMachine.run {
EM.add_periodic_timer(0.001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s + j.to_s).post :head => headers, :body => body, :timeout => 60
pub_1.callback {
EventMachine.stop if (pub_1.response_header.status == 500)
j += 1
}
end
}
EventMachine.run do
i = 0 i = 0
EventMachine.run { fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
EM.add_periodic_timer(1) do publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s + (j + 1).to_s).post :head => headers, :body => body, :timeout => 60 :error => Proc.new do |status, content|
pub_2.callback { fill_memory_timer.cancel
fail("Don't free the memory in 60 seconds") if (i == 60) pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
EventMachine.stop if (pub_2.response_header.status == 200) pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
published_messages_setp_1 = result["published_messages"].to_i
end
end
})
i += 1 i += 1
}
end end
add_test_timeout(65)
} EM.add_timer(60) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
assert_equal(0, JSON.parse(pub_3.response)["channels"].to_i, "Don't cleaned all messages/channels")
EM.add_timer(35) do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status, content|
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
result = JSON.parse(pub_4.response)
assert_equal(published_messages_setp_1, result["published_messages"].to_i / 2, "Don't cleaned all memory")
EventMachine.stop
end
end
})
i += 1
end
end
end
end
add_test_timeout(110)
end
end end
end end
require File.expand_path('base_test_case', File.dirname(__FILE__))
class TestCleanupMemoryAfterKill < Test::Unit::TestCase
include BaseTestCase
@@second_step_timer = 120
@@timeout = 130
def global_configuration
ENV['NGINX_WORKERS'] = '1'
@disable_ignore_childs = true
@master_process = 'on'
@daemon = 'on'
end
def kill_worker(&block)
pub = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :timeout => 30
pub.callback do
fail("Don't received the stats") if (pub.response_header.status != 200) || (pub.response_header.content_length == 0)
resp_1 = JSON.parse(pub.response)
assert_equal(1, resp_1["by_worker"].count, "Didn't return infos by_worker")
pid = resp_1["by_worker"][0]['pid'].to_i
sleep(1)
# send kill signal
`kill -9 #{ pid } > /dev/null 2>&1`
block.call unless block.nil?
end
end
def config_test_message_cleanup_after_kill
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '10s'
@max_message_buffer_length = 100
end
def test_message_cleanup_after_kill
channel = 'ch_test_message_cleanup_after_kill'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
stored_messages_setp_1 = 0
published_messages_setp_1 = 0
published_messages_setp_2 = 0
EventMachine.run do
# ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
stored_messages_setp_1 = result["stored_messages"].to_i
published_messages_setp_1 = result["published_messages"].to_i
assert_equal(@max_message_buffer_length, stored_messages_setp_1, "Don't limit stored messages")
fail("Don't reached the limit of stored messages") if result["published_messages"].to_i <= @max_message_buffer_length
fail("Don't create any message") if stored_messages_setp_1 == 0
kill_worker do
sleep(1)
# connect a subscriber on new worker
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
end
end
end
})
end
EM.add_timer(40) do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
fail("Don't publish more messages") if published_messages_setp_1 == published_messages_setp_2
end
end
})
end
end
EM.add_timer(@@second_step_timer) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
assert_equal(0, JSON.parse(pub_3.response)["stored_messages"].to_i, "Don't cleaned all messages")
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
result = JSON.parse(pub_4.response)
assert_equal(stored_messages_setp_1, result["stored_messages"].to_i, "Don't cleaned all messages")
assert_equal(published_messages_setp_1, (result["published_messages"].to_i - published_messages_setp_2), "Don't cleaned all memory")
EventMachine.stop
end
end
})
end
end
end
add_test_timeout(@@timeout)
end
end
def config_test_discard_old_messages_after_kill
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '10s'
@max_message_buffer_length = nil
end
def test_discard_old_messages_after_kill
channel = 'ch_test_discard_old_messages_after_kill'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
messages_to_publish = 10
count = 0
stored_messages_setp_1 = 0
EventMachine.run do
fill_memory_timer = EventMachine::PeriodicTimer.new(messages_to_publish / 12.to_f) do # publish messages before cleanup timer be executed
if (count < messages_to_publish)
publish_message_inline(channel, headers, body)
elsif (count == messages_to_publish)
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_1.callback do
fill_memory_timer.cancel
fail("Don't received the stats") if (pub_1.response_header.status != 200) || (pub_1.response_header.content_length == 0)
stored_messages_setp_1 = JSON.parse(pub_1.response)["stored_messages"].to_i
assert_equal(messages_to_publish, stored_messages_setp_1, "Don't store messages")
kill_worker
end
end
count += 1
end
EM.add_timer(14) do # wait cleanup timer to be executed one time
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
stored_messages_setp_2 = JSON.parse(pub_2.response)["stored_messages"].to_i
assert(stored_messages_setp_1 > stored_messages_setp_2, "Don't clear messages")
assert(stored_messages_setp_2 >= (messages_to_publish / 2), "Cleared all messages")
EventMachine.stop
end
end
add_test_timeout(20)
end
end
def config_test_message_cleanup_without_max_messages_stored_per_channel_after_kill
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '10s'
@max_message_buffer_length = nil
end
def test_message_cleanup_without_max_messages_stored_per_channel_after_kill
channel = 'ch_test_message_cleanup_without_max_messages_stored_after_kill'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
stored_messages_setp_1 = 0
published_messages_setp_1 = 0
published_messages_setp_2 = 0
EventMachine.run do
# ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
stored_messages_setp_1 = result["stored_messages"].to_i
published_messages_setp_1 = result["published_messages"].to_i
fail("Limited the number of stored messages") if stored_messages_setp_1 <= 100
fail("Don't create any message") if stored_messages_setp_1 == 0
kill_worker do
sleep(1)
# connect a subscriber on new worker
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
end
end
end
})
end
EM.add_timer(45) do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
fail("Don't publish more messages") if published_messages_setp_1 == published_messages_setp_2
end
end
})
end
end
EM.add_timer(@@second_step_timer) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
assert_equal(0, JSON.parse(pub_3.response)["stored_messages"].to_i, "Don't cleaned all messages")
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
result = JSON.parse(pub_4.response)
assert_equal(stored_messages_setp_1, result["stored_messages"].to_i, "Don't cleaned all messages")
assert_equal(published_messages_setp_1, (result["published_messages"].to_i - published_messages_setp_2), "Don't cleaned all memory")
EventMachine.stop
end
end
})
end
end
end
add_test_timeout(@@timeout)
end
end
def config_test_channel_cleanup_after_kill
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '2s'
@max_message_buffer_length = nil
end
def test_channel_cleanup_after_kill
channel = 'ch_test_channel_cleanup_after_kill'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
channels_setp_1 = 0
channels_setp_2 = 0
published_messages_setp_1 = 0
EventMachine.run do
i = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
channels_setp_1 = result["channels"].to_i
published_messages_setp_1 = result["published_messages"].to_i
fail("Don't create any channel") if channels_setp_1 == 0
kill_worker
end
end
})
i += 1
end
EM.add_timer(40) do
j = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + j.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
fail("Don't create more channel") if published_messages_setp_1 == JSON.parse(pub_2.response)["published_messages"].to_i
end
end
})
j += 1
end
end
EM.add_timer(@@second_step_timer) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
channels = JSON.parse(pub_3.response)["channels"].to_i
assert_equal(0, channels, "Don't removed all channels")
EventMachine.stop unless (channels == 0)
EM.add_timer(35) do
i = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
channels_setp_2 = JSON.parse(pub_4.response)["channels"].to_i
assert_equal(channels_setp_1, channels_setp_2, "Don't released all memory")
EventMachine.stop
end
end
})
i += 1
end
end
end
end
add_test_timeout(@@timeout + 35)
end
end
def config_test_message_cleanup_with_store_off_with_subscriber_after_kill
@store_messages = 'off'
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = nil
@max_message_buffer_length = nil
end
def test_message_cleanup_with_store_off_with_subscriber_after_kill
channel = 'ch_test_message_cleanup_with_store_off_with_subscriber_after_kill'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
published_messages_setp_1 = 0
published_messages_setp_2 = 0
EventMachine.run do
# ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
published_messages_setp_1 = result["published_messages"].to_i
kill_worker do
sleep(1)
# connect a subscriber on new worker
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
end
end
end
})
end
EM.add_timer(40) do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
fail("Don't publish more messages") if published_messages_setp_1 == published_messages_setp_2
end
end
})
end
end
EM.add_timer(@@second_step_timer) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
assert_equal(0, JSON.parse(pub_3.response)["channels"].to_i, "Don't cleaned all messages/channels")
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
result = JSON.parse(pub_4.response)
assert_equal(published_messages_setp_1, (result["published_messages"].to_i - published_messages_setp_2), "Don't cleaned all memory")
EventMachine.stop
end
end
})
end
end
end
add_test_timeout(@@timeout)
end
end
def config_test_message_cleanup_with_store_off_without_subscriber_after_kill
@store_messages = 'off'
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = nil
@max_message_buffer_length = nil
end
def test_message_cleanup_with_store_off_without_subscriber_after_kill
channel = 'ch_test_message_cleanup_with_store_off_without_subscriber_after_kill'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
published_messages_setp_1 = 0
published_messages_setp_2 = 0
EventMachine.run do
i = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
published_messages_setp_1 = result["published_messages"].to_i
kill_worker
end
end
})
i += 1
end
EM.add_timer(40) do
j = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + j.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
fail("Don't create more channel") if published_messages_setp_1 == published_messages_setp_2
end
end
})
j += 1
end
end
EM.add_timer(@@second_step_timer) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
assert_equal(0, JSON.parse(pub_3.response)["channels"].to_i, "Don't cleaned all messages/channels")
EM.add_timer(35) do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status, content|
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
result = JSON.parse(pub_4.response)
assert_equal(published_messages_setp_1, (result["published_messages"].to_i - published_messages_setp_2), "Don't cleaned all memory")
EventMachine.stop
end
end
})
i += 1
end
end
end
end
add_test_timeout(@@timeout + 35)
end
end
end
require File.expand_path('base_test_case', File.dirname(__FILE__))
class TestCleanupMemoryAfterReload < Test::Unit::TestCase
include BaseTestCase
@@second_step_timer = 120
@@timeout = 130
def global_configuration
ENV['NGINX_WORKERS'] = '1'
@disable_ignore_childs = true
@master_process = 'on'
@daemon = 'on'
end
def reload_worker
sleep(1)
# send reload signal
`#{ nginx_executable } -c #{ config_filename } -s reload > /dev/null 2>&1`
end
def config_test_message_cleanup_after_reload
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '10s'
@max_message_buffer_length = 100
end
def test_message_cleanup_after_reload
channel = 'ch_test_message_cleanup_after_reload'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
stored_messages_setp_1 = 0
published_messages_setp_1 = 0
published_messages_setp_2 = 0
EventMachine.run do
# ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
stored_messages_setp_1 = result["stored_messages"].to_i
published_messages_setp_1 = result["published_messages"].to_i
assert_equal(@max_message_buffer_length, stored_messages_setp_1, "Don't limit stored messages")
fail("Don't reached the limit of stored messages") if result["published_messages"].to_i <= @max_message_buffer_length
fail("Don't create any message") if stored_messages_setp_1 == 0
reload_worker
sleep(1)
# connect a subscriber on new worker
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
end
end
})
end
EM.add_timer(40) do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
fail("Don't publish more messages") if published_messages_setp_1 == published_messages_setp_2
end
end
})
end
end
EM.add_timer(@@second_step_timer) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
assert_equal(0, JSON.parse(pub_3.response)["stored_messages"].to_i, "Don't cleaned all messages")
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
result = JSON.parse(pub_4.response)
assert_equal(stored_messages_setp_1, result["stored_messages"].to_i, "Don't cleaned all messages")
assert_equal(published_messages_setp_1, (result["published_messages"].to_i - published_messages_setp_2), "Don't cleaned all memory")
EventMachine.stop
end
end
})
end
end
end
add_test_timeout(@@timeout)
end
end
def config_test_discard_old_messages_after_reload
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '10s'
@max_message_buffer_length = nil
end
def test_discard_old_messages_after_reload
channel = 'ch_test_discard_old_messages_after_reload'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
messages_to_publish = 10
count = 0
stored_messages_setp_1 = 0
EventMachine.run do
fill_memory_timer = EventMachine::PeriodicTimer.new(messages_to_publish / 12.to_f) do # publish messages before cleanup timer be executed
if (count < messages_to_publish)
publish_message_inline(channel, headers, body)
elsif (count == messages_to_publish)
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_1.callback do
fill_memory_timer.cancel
fail("Don't received the stats") if (pub_1.response_header.status != 200) || (pub_1.response_header.content_length == 0)
stored_messages_setp_1 = JSON.parse(pub_1.response)["stored_messages"].to_i
assert_equal(messages_to_publish, stored_messages_setp_1, "Don't store messages")
reload_worker
end
end
count += 1
end
EM.add_timer(14) do # wait cleanup timer to be executed one time
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
stored_messages_setp_2 = JSON.parse(pub_2.response)["stored_messages"].to_i
assert(stored_messages_setp_1 > stored_messages_setp_2, "Don't clear messages")
assert(stored_messages_setp_2 >= (messages_to_publish / 2), "Cleared all messages")
EventMachine.stop
end
end
add_test_timeout(20)
end
end
def config_test_message_cleanup_without_max_messages_stored_per_channel_after_reload
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '10s'
@max_message_buffer_length = nil
end
def test_message_cleanup_without_max_messages_stored_per_channel_after_reload
channel = 'ch_test_message_cleanup_without_max_messages_stored_per_channel_after_reload'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
stored_messages_setp_1 = 0
published_messages_setp_1 = 0
published_messages_setp_2 = 0
EventMachine.run do
# ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
stored_messages_setp_1 = result["stored_messages"].to_i
published_messages_setp_1 = result["published_messages"].to_i
fail("Limited the number of stored messages") if stored_messages_setp_1 <= 100
fail("Don't create any message") if stored_messages_setp_1 == 0
reload_worker
sleep(1)
# connect a subscriber on new worker
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
end
end
})
end
EM.add_timer(45) do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
fail("Don't publish more messages") if published_messages_setp_1 == published_messages_setp_2
end
end
})
end
end
EM.add_timer(@@second_step_timer) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
assert_equal(0, JSON.parse(pub_3.response)["stored_messages"].to_i, "Don't cleaned all messages")
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
result = JSON.parse(pub_4.response)
assert_equal(stored_messages_setp_1, result["stored_messages"].to_i, "Don't cleaned all messages")
assert_equal(published_messages_setp_1, (result["published_messages"].to_i - published_messages_setp_2), "Don't cleaned all memory")
EventMachine.stop
end
end
})
end
end
end
add_test_timeout(@@timeout)
end
end
def config_test_channel_cleanup_after_reload
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = '2s'
@max_message_buffer_length = nil
end
def test_channel_cleanup_after_reload
channel = 'ch_test_channel_cleanup_after_reload'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
channels_setp_1 = 0
channels_setp_2 = 0
published_messages_setp_1 = 0
EventMachine.run do
i = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
channels_setp_1 = result["channels"].to_i
published_messages_setp_1 = result["published_messages"].to_i
fail("Don't create any channel") if channels_setp_1 == 0
reload_worker
end
end
})
i += 1
end
EM.add_timer(40) do
j = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + j.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
fail("Don't create more channel") if published_messages_setp_1 == JSON.parse(pub_2.response)["published_messages"].to_i
end
end
})
j += 1
end
end
EM.add_timer(@@second_step_timer) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
channels = JSON.parse(pub_3.response)["channels"].to_i
assert_equal(0, channels, "Don't removed all channels")
EventMachine.stop unless (channels == 0)
EM.add_timer(35) do
i = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
channels_setp_2 = JSON.parse(pub_4.response)["channels"].to_i
assert_equal(channels_setp_1, channels_setp_2, "Don't released all memory")
EventMachine.stop
end
end
})
i += 1
end
end
end
end
add_test_timeout(@@timeout + 35)
end
end
def config_test_message_cleanup_with_store_off_with_subscriber_after_reload
@store_messages = 'off'
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = nil
@max_message_buffer_length = nil
end
def test_message_cleanup_with_store_off_with_subscriber_after_reload
channel = 'ch_test_message_cleanup_with_store_off_with_subscriber_after_reload'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
published_messages_setp_1 = 0
published_messages_setp_2 = 0
EventMachine.run do
# ensure channel will not be cleaned up
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
published_messages_setp_1 = result["published_messages"].to_i
reload_worker
sleep(1)
# connect a subscriber on new worker
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 60
end
end
})
end
EM.add_timer(40) do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
fail("Don't publish more messages") if published_messages_setp_1 == published_messages_setp_2
end
end
})
end
end
EM.add_timer(@@second_step_timer) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
assert_equal(0, JSON.parse(pub_3.response)["channels"].to_i, "Don't cleaned all messages/channels")
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel, headers, body, {
:error => Proc.new do |status, content|
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
result = JSON.parse(pub_4.response)
assert_equal(published_messages_setp_1, (result["published_messages"].to_i - published_messages_setp_2), "Don't cleaned all memory")
EventMachine.stop
end
end
})
end
end
end
add_test_timeout(@@timeout)
end
end
def config_test_message_cleanup_with_store_off_without_subscriber_after_reload
@store_messages = 'off'
@memory_cleanup_timeout = '30s'
@max_reserved_memory = "129k"
@min_message_buffer_timeout = nil
@max_message_buffer_length = nil
end
def test_message_cleanup_with_store_off_without_subscriber_after_reload
channel = 'ch_test_message_cleanup_with_store_off_without_subscriber_after_reload'
headers = {'accept' => 'text/html'}
body = 'message to create a channel'
published_messages_setp_1 = 0
published_messages_setp_2 = 0
EventMachine.run do
i = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
result = JSON.parse(pub_2.response)
published_messages_setp_1 = result["published_messages"].to_i
reload_worker
end
end
})
i += 1
end
EM.add_timer(40) do
j = 0
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + j.to_s, headers, body, {
:error => Proc.new do |status, content|
fill_memory_timer.cancel
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_2.callback do
fail("Don't received the stats") if (pub_2.response_header.status != 200) || (pub_2.response_header.content_length == 0)
published_messages_setp_2 = JSON.parse(pub_2.response)["published_messages"].to_i
fail("Don't create more channel") if published_messages_setp_1 == published_messages_setp_2
end
end
})
j += 1
end
end
EM.add_timer(@@second_step_timer) do
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_3.callback do
fail("Don't received the stats") if (pub_3.response_header.status != 200) || (pub_3.response_header.content_length == 0)
assert_equal(0, JSON.parse(pub_3.response)["channels"].to_i, "Don't cleaned all messages/channels")
EM.add_timer(35) do
fill_memory_timer = EventMachine::PeriodicTimer.new(0.001) do
publish_message_inline_with_callbacks(channel + i.to_s, headers, body, {
:error => Proc.new do |status, content|
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 60
pub_4.callback do
fail("Don't received the stats") if (pub_4.response_header.status != 200) || (pub_4.response_header.content_length == 0)
result = JSON.parse(pub_4.response)
assert_equal(published_messages_setp_1, (result["published_messages"].to_i - published_messages_setp_2), "Don't cleaned all memory")
EventMachine.stop
end
end
})
i += 1
end
end
end
end
add_test_timeout(@@timeout + 35)
end
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment