Commit 25771076 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

fix bug when reload configuration file and increase minimun needed for shared memory

parent 199fde61
......@@ -168,7 +168,7 @@ h3(#directives). Directives
|push_stream_channels_statistics|-|-|location|-|
|push_stream_publisher|-|-|location|-|
|push_stream_subscriber|-|-|location|-|
|push_stream_max_reserved_memory|32M|size greater than 8 * ngx_pagesize|http|main nginx configuration|
|push_stream_max_reserved_memory|16 * ngx_pagesize|size greater than 16 * ngx_pagesize|http|main nginx configuration|
|push_stream_store_messages|off|on, off|location|push_stream_publisher|
|push_stream_min_message_buffer_timeout|unset|time constant|location|push_stream_publisher|
|push_stream_max_message_buffer_length|unset|number|location|push_stream_publisher|
......
......@@ -150,8 +150,8 @@ typedef struct {
} ngx_http_push_stream_worker_msg_t;
typedef struct {
ngx_http_push_stream_worker_msg_t messages_queue;
ngx_http_push_stream_worker_subscriber_t worker_subscribers_sentinel;
ngx_http_push_stream_worker_msg_t *messages_queue;
ngx_http_push_stream_worker_subscriber_t *worker_subscribers_sentinel;
ngx_uint_t subscribers; // # of subscribers in the worker
pid_t pid;
} ngx_http_push_stream_worker_data_t;
......@@ -165,7 +165,7 @@ typedef struct {
ngx_uint_t subscribers; // # of subscribers in all channels
ngx_http_push_stream_msg_t messages_to_delete;
ngx_rbtree_t channels_to_delete;
ngx_http_push_stream_worker_data_t *ipc; // interprocess stuff
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
} ngx_http_push_stream_shm_data_t;
ngx_int_t ngx_http_push_stream_worker_processes;
......
......@@ -146,8 +146,8 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
ngx_uint_t len;
ngx_str_t *currenttime, *hostname, *format;
u_char *subscribers_by_workers, *start;
int i;
ngx_http_push_stream_shm_data_t *shm_data;
int i, j, used_slots;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *worker_data;
ngx_http_push_stream_content_subtype_t *subtype;
......@@ -155,17 +155,25 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
currenttime = ngx_http_push_stream_get_formatted_current_time(r->pool);
hostname = ngx_http_push_stream_get_formatted_hostname(r->pool);
shm_data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
used_slots = 0;
for(i = 0; i < NGX_MAX_PROCESSES; i++) {
if (data->ipc[i].pid > 0) {
used_slots++;
}
}
len = (subtype->format_summarized_worker_item->len > subtype->format_summarized_worker_last_item->len) ? subtype->format_summarized_worker_item->len : subtype->format_summarized_worker_last_item->len;
len = ngx_http_push_stream_worker_processes * (2*NGX_INT_T_LEN + len - 5); //minus 5 sprintf
len = used_slots * (2*NGX_INT_T_LEN + len - 5); //minus 5 sprintf
subscribers_by_workers = ngx_pcalloc(r->pool, len);
ngx_memset(subscribers_by_workers, '\0', len);
start = subscribers_by_workers;
for (i = 0; i < ngx_http_push_stream_worker_processes; i++) {
format = (i < ngx_http_push_stream_worker_processes - 1) ? subtype->format_summarized_worker_item : subtype->format_summarized_worker_last_item;
worker_data = shm_data->ipc + i;
start = ngx_sprintf(start, (char *) format->data, worker_data->pid, worker_data->subscribers);
for (i = 0, j = 0; (i < used_slots) && (j < NGX_MAX_PROCESSES); j++) {
worker_data = data->ipc + j;
if (worker_data->pid > 0) {
format = (i < used_slots - 1) ? subtype->format_summarized_worker_item : subtype->format_summarized_worker_last_item;
start = ngx_sprintf(start, (char *) format->data, worker_data->pid, worker_data->subscribers);
i++;
}
}
len = 3*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len + ngx_strlen(subscribers_by_workers) - 18;// minus 18 sprintf
......@@ -176,7 +184,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
}
ngx_memset(b->start, '\0', len);
b->last = ngx_sprintf(b->start, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, shm_data->channels, shm_data->broadcast_channels, shm_data->published_messages, shm_data->subscribers, subscribers_by_workers);
b->last = ngx_sprintf(b->start, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->broadcast_channels, data->published_messages, data->subscribers, subscribers_by_workers);
return ngx_http_push_stream_send_buf_response(r, b, subtype->content_type, NGX_HTTP_OK);
}
......
......@@ -123,52 +123,36 @@ ngx_http_push_stream_init_ipc_shm(ngx_int_t workers)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *workers_data;
int i;
ngx_shmtx_lock(&shpool->mutex);
if (data->ipc != NULL) {
// already initialized... reset channel subscribers counters and census subscribers
ngx_http_push_stream_worker_data_t *worker_data = NULL;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *sentinel = &thisworker_data->worker_subscribers_sentinel;
ngx_queue_init(&sentinel->queue);
for(i=0; i<workers; i++) {
worker_data = data->ipc + i;
worker_data->subscribers = 0;
}
data->subscribers = 0;
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_reset_channel_subscribers_count_locked);
if ((data->ipc[ngx_process_slot].messages_queue = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_msg_t))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
for(i=0; i<workers; i++) {
ngx_http_push_stream_alert_worker_census_subscribers(ngx_pid, i, ngx_cycle->log);
}
return NGX_OK;
return NGX_ERROR;
}
// initialize worker message queues
if ((workers_data = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_data_t)*workers)) == NULL) {
if ((data->ipc[ngx_process_slot].worker_subscribers_sentinel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_subscriber_t))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR;
}
for(i=0; i<workers; i++) {
ngx_queue_init(&workers_data[i].messages_queue.queue);
ngx_queue_init(&workers_data[i].worker_subscribers_sentinel.queue);
}
data->ipc[ngx_process_slot].pid = ngx_pid;
ngx_queue_init(&data->ipc[ngx_process_slot].messages_queue->queue);
ngx_queue_init(&data->ipc[ngx_process_slot].worker_subscribers_sentinel->queue);
data->ipc = workers_data;
ngx_queue_init(&data->messages_to_delete.queue);
data->subscribers = 0;
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_reset_channel_subscribers_count_locked);
ngx_shmtx_unlock(&shpool->mutex);
for(i = 0; i < NGX_MAX_PROCESSES; i++) {
if (data->ipc[i].pid > 0) {
data->ipc[i].subscribers = 0;
ngx_http_push_stream_alert_worker_census_subscribers(ngx_pid, i, ngx_cycle->log);
}
}
return NGX_OK;
}
......@@ -251,7 +235,7 @@ ngx_http_push_stream_census_worker_subscribers(void)
ngx_shmtx_lock(&shpool->mutex);
sentinel = &thisworker_data->worker_subscribers_sentinel;
sentinel = thisworker_data->worker_subscribers_sentinel;
cur = sentinel;
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) {
sentinel_subscription = &cur->subscriptions_sentinel;
......@@ -272,14 +256,14 @@ ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect)
{
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *sentinel = &thisworker_data->worker_subscribers_sentinel;
ngx_http_push_stream_worker_subscriber_t *sentinel = thisworker_data->worker_subscribers_sentinel;
ngx_http_push_stream_worker_subscriber_t *cur = sentinel;
time_t now = ngx_time();
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
if ((cur->request != NULL) && ((force_disconnect == 1) || ((cur->expires != 0) && (now > cur->expires)))) {
if ((cur->request != NULL) && (ngx_exiting || (force_disconnect == 1) || ((cur->expires != 0) && (now > cur->expires)))) {
ngx_http_push_stream_worker_subscriber_cleanup(cur);
cur->request->keepalive = 0;
ngx_http_send_special(cur->request, NGX_HTTP_LAST | NGX_HTTP_FLUSH);
......@@ -296,7 +280,7 @@ ngx_http_push_stream_send_worker_ping_message(void)
{
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *sentinel = &thisworker_data->worker_subscribers_sentinel;
ngx_http_push_stream_worker_subscriber_t *sentinel = thisworker_data->worker_subscribers_sentinel;
ngx_http_push_stream_worker_subscriber_t *cur = sentinel;
if ((ngx_http_push_stream_ping_msg != NULL) && (!ngx_queue_empty(&sentinel->queue))) {
......@@ -321,7 +305,7 @@ ngx_http_push_stream_process_worker_message(void)
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
sentinel = (ngx_http_push_stream_worker_msg_t *) &thisworker_data->messages_queue;
sentinel = thisworker_data->messages_queue;
worker_msg = (ngx_http_push_stream_worker_msg_t *) ngx_queue_next(&sentinel->queue);
while (worker_msg != sentinel) {
if (worker_msg->pid == ngx_pid) {
......@@ -383,7 +367,7 @@ ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel
newmessage->pid = pid;
newmessage->subscriber_sentinel = subscriber_sentinel;
newmessage->channel = channel;
ngx_queue_insert_tail(&thisworker_data->messages_queue.queue, &newmessage->queue);
ngx_queue_insert_tail(&thisworker_data->messages_queue->queue, &newmessage->queue);
ngx_shmtx_unlock(&shpool->mutex);
......
......@@ -214,6 +214,10 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle)
static void
ngx_http_push_stream_exit_master(ngx_cycle_t *cycle)
{
if (ngx_http_push_stream_shm_zone == NULL) {
return;
}
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
......@@ -226,6 +230,12 @@ ngx_http_push_stream_exit_master(ngx_cycle_t *cycle)
static void
ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle)
{
if (ngx_http_push_stream_shm_zone == NULL) {
return;
}
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
// disconnect all subscribers (force_disconnect = 1)
ngx_http_push_stream_disconnect_worker_subscribers(1);
......@@ -241,7 +251,14 @@ ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle)
ngx_del_timer(&ngx_http_push_stream_memory_cleanup_event);
}
if (ngx_http_push_stream_buffer_cleanup_event.timer_set) {
ngx_del_timer(&ngx_http_push_stream_buffer_cleanup_event);
}
ngx_http_push_stream_ipc_exit_worker(cycle);
data->ipc[ngx_process_slot].pid = -1;
data->ipc[ngx_process_slot].subscribers = 0;
}
......@@ -253,9 +270,9 @@ ngx_http_push_stream_postconfig(ngx_conf_t *cf)
// initialize shared memory
shm_size = ngx_align(conf->shm_size, ngx_pagesize);
if (shm_size < 8 * ngx_pagesize) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "The push_stream_max_reserved_memory value must be at least %udKiB", (8 * ngx_pagesize) >> 10);
shm_size = 8 * ngx_pagesize;
if (shm_size < 16 * ngx_pagesize) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "The push_stream_max_reserved_memory value must be at least %udKiB", (16 * ngx_pagesize) >> 10);
shm_size = 16 * ngx_pagesize;
}
if (ngx_http_push_stream_shm_zone && ngx_http_push_stream_shm_zone->shm.size != shm_size) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "Cannot change memory area size without restart, ignoring change");
......@@ -272,7 +289,6 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_main_conf_t));
if (mcf == NULL) {
return NGX_CONF_ERROR;
}
......@@ -562,6 +578,8 @@ ngx_http_push_stream_set_up_shm(ngx_conf_t *cf, size_t shm_size)
static ngx_int_t
ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
{
int i;
if (data) { /* zone already initialized */
shm_zone->data = data;
return NGX_OK;
......@@ -575,7 +593,12 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
return NGX_ERROR;
}
shm_zone->data = d;
d->ipc = NULL;
ngx_queue_init(&d->messages_to_delete.queue);
for (i = 0; i < NGX_MAX_PROCESSES; i++) {
d->ipc[i].pid = -1;
d->ipc[i].subscribers = 0;
}
// initialize rbtree
if ((sentinel = ngx_slab_alloc(shpool, sizeof(*sentinel))) == NULL) {
return NGX_ERROR;
......
......@@ -160,7 +160,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_shmtx_lock(&shpool->mutex);
// adding subscriber to woker list of subscribers
ngx_queue_insert_tail(&thisworker_data->worker_subscribers_sentinel.queue, &worker_subscriber->queue);
ngx_queue_insert_tail(&thisworker_data->worker_subscribers_sentinel->queue, &worker_subscriber->queue);
// increment global subscribers count
data->subscribers++;
......
......@@ -491,7 +491,7 @@ ngx_http_push_stream_buffer_cleanup_timer_set(ngx_http_push_stream_loc_conf_t *p
static void
ngx_http_push_stream_timer_reset(ngx_msec_t timer_interval, ngx_event_t *timer_event)
{
if (timer_interval != NGX_CONF_UNSET_MSEC) {
if (!ngx_exiting && (timer_interval != NGX_CONF_UNSET_MSEC)) {
if (timer_event->timedout) {
#if defined nginx_version && nginx_version >= 7066
ngx_time_update();
......
......@@ -5,7 +5,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
def config_test_message_cleanup
@min_message_buffer_timeout = '10s'
@max_reserved_memory = "32k"
@max_reserved_memory = "64k"
@max_message_buffer_length = 100
@memory_cleanup_timeout = '30s'
end
......@@ -64,7 +64,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
def config_test_channel_cleanup
@min_message_buffer_timeout = '10s'
@max_reserved_memory = "32k"
@max_reserved_memory = "64k"
@memory_cleanup_timeout = '30s'
end
......@@ -117,7 +117,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
end
def config_test_message_cleanup_with_store_off_with_subscriber
@max_reserved_memory = "32k"
@max_reserved_memory = "64k"
@store_messages = 'off'
@memory_cleanup_timeout = '30s'
end
......@@ -153,7 +153,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
end
def config_test_message_cleanup_with_store_off_without_subscriber
@max_reserved_memory = "32k"
@max_reserved_memory = "64k"
@store_messages = 'off'
@memory_cleanup_timeout = '30s'
end
......
require File.expand_path('base_test_case', File.dirname(__FILE__))
class TestSendSignals < Test::Unit::TestCase
include BaseTestCase
def config_test_send_hup_signal
ENV['NGINX_WORKERS'] = '1'
@memory_cleanup_timeout = '2m'
@min_message_buffer_timeout = '2m'
@subscriber_connection_timeout = '2m'
@master_process = 'on'
@daemon = 'on'
@header_template = 'HEADER'
end
def test_send_hup_signal
headers = {'accept' => 'application/json'}
channel = 'ch_test_send_hup_signal'
body = 'body'
response = ''
response2 = ''
pid = 0
pid2 = 0
EventMachine.run {
# create subscriber
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
response = response + chunk
if response.strip == @header_template
# check statistics
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 30
pub_1.callback {
assert_equal(200, pub_1.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_1.response_header.content_length, "Don't received channels statistics")
begin
resp_1 = JSON.parse(pub_1.response)
assert(resp_1.has_key?("channels"), "Didn't received the correct answer with channels info")
assert_equal(1, resp_1["channels"].to_i, "Didn't create channel")
assert_equal(1, resp_1["by_worker"].count, "Didn't return infos by_worker")
pid = resp_1["by_worker"][0]['pid'].to_i
# send reload signal
POpen4::popen4("#{ nginx_executable } -c #{ config_filename } -s reload") do
# publish a message
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 30
pub_2.callback {
# add new subscriber
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b1').get :head => headers, :timeout => 30
sub_2.stream { |chunk|
response2 = response2 + chunk
if response2.strip == @header_template
# check statistics again
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 30
pub_3.callback {
resp_2 = JSON.parse(pub_3.response)
assert(resp_2.has_key?("channels"), "Didn't received the correct answer with channels info")
assert_equal(1, resp_2["channels"].to_i, "Didn't create channel")
assert_equal(1, resp_2["published_messages"].to_i, "Didn't create messages")
assert_equal(2, resp_2["subscribers"].to_i, "Didn't create subscribers")
assert_equal(2, resp_2["by_worker"].count, "Didn't return infos by_worker")
}
end
}
}
end
rescue JSON::ParserError
fail("Didn't receive a valid response")
EventMachine.stop
end
}
end
}
# wait some time to first worker die
EM.add_timer(45) do
# check statistics again
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 30
pub_4.callback {
resp_3 = JSON.parse(pub_4.response)
assert(resp_3.has_key?("channels"), "Didn't received the correct answer with channels info")
assert_equal(1, resp_3["channels"].to_i, "Didn't create channel")
assert_equal(1, resp_3["published_messages"].to_i, "Didn't create messages")
assert_equal(1, resp_3["subscribers"].to_i, "Didn't create subscribers")
assert_equal(1, resp_3["by_worker"].count, "Didn't return infos by_worker")
pid2 = resp_3["by_worker"][0]['pid'].to_i
assert_not_equal(pid, pid2, "Didn't recreate worker")
EventMachine.stop
}
end
}
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment