Commit c9511c49 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding directives to change text for channel deleted and ping messages

parent 8c6f1070
* Fixing bug which removed default message template
* Adding directives to change text for channel deleted and ping messages
* Adding feature to get channels statistics by prefix (Suggested by Alexey Vdovin)
* Adding publisher administrator feature to delete channels (Suggested by Alexey Vdovin)
* Removing support for versions 0.7.x
* Fixing bug which removed default message template
* Fixing messages sent to subscribers to be a truly transfer encoding chunked connection
* Removing support for versions 0.7.x
* Removing hack to keep connection open (Thanks _Maxim Dounin_)
h2. Version 0.2.4
......
......@@ -174,6 +174,9 @@ h3(#directives). Directives
|push_stream_publisher|-|-|location|-|
|push_stream_subscriber|-|-|location|-|
|push_stream_max_reserved_memory|16 * ngx_pagesize|size greater than 16 * ngx_pagesize|http|main nginx configuration|
|push_stream_memory_cleanup_timeout|30 seconds|time constant|http|main nginx configuration|
|push_stream_channel_deleted_message_text|"Channel deleted"|any string|http|main nginx configuration|
|push_stream_ping_message_text|""|any string|http|main nginx configuration|
|push_stream_store_messages|off|on, off|location|push_stream_publisher|
|push_stream_min_message_buffer_timeout|unset|time constant|location|push_stream_publisher|
|push_stream_max_message_buffer_length|unset|number|location|push_stream_publisher|
......@@ -188,7 +191,6 @@ h3(#directives). Directives
|push_stream_message_template|==~text~==|any string|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration|
|push_stream_max_number_of_channels|unset|number|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration|
|push_stream_max_number_of_broadcast_channels|unset|number|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration|
|push_stream_memory_cleanup_timeout|30 seconds|time constant|http|main nginx configuration|
|push_stream_keepalive|off|on, off|http, location|(push_stream_publisher and push_stream_channels_statistics) or main nginx configuration|
|push_stream_publisher_admin|off|on, off|location|push_stream_publisher|
......@@ -291,6 +293,33 @@ context: http
The size of the memory chunk this module will use to store published messages, channels and other shared structures.
When this memory is full any new request for publish a message or subscribe a channel will receive an 500 Internal Server Error response.
h4(#push_stream_memory_cleanup_timeout). push_stream_memory_cleanup_timeout [ time ]
default: 30 seconds
context: http
location: main nginx configuration
The length of time a message or a channel will stay on garbage collection area before it is completly discarded, freeing the shared memory. The minimum length is 30 seconds to ensure that no one is using these elements.
This operation is very important to help Nginx recycle memory consumed to create messages and channels, so do not use a large time.
h4(#push_stream_channel_deleted_message_text). push_stream_channel_deleted_message_text [ string ]
New in version 0.2.5
default: "Channel deleted"
context: http
location: main nginx configuration
The string used on channel deleted message sent to subscribers when the channel is deleted by a publisher.
h4(#push_stream_ping_message_text). push_stream_ping_message_text [ string ]
New in version 0.2.5
default: ""
context: http
location: main nginx configuration
The string used on ping message sent to subscribers.
h4(#push_stream_store_messages). push_stream_store_messages [ on | off ]
default: off
......@@ -377,15 +406,6 @@ location: (push_stream_subscriber and push_stream_publisher) or main nginx confi
The maximum number of concurrent broadcats channels on the server. If you do not want to limit the number of broadcast channels, just not set this directive.
h4(#push_stream_memory_cleanup_timeout). push_stream_memory_cleanup_timeout [ time ]
default: 30 seconds
context: location
location: (push_stream_subscriber and push_stream_publisher) or main nginx configuration
The length of time a message or a channel will stay on garbage collection area before it is completly discarded, freeing the shared memory. The minimum length is 30 seconds to ensure that no one is using these elements.
This operation is very important to help Nginx recycle memory consumed to create messages and channels, so do not use a large time.
h4(#push_stream_keepalive). push_stream_keepalive [ on | off ]
New in version 0.2.4
......
......@@ -42,6 +42,8 @@ typedef struct {
size_t shm_size;
ngx_msec_t memory_cleanup_interval;
time_t memory_cleanup_timeout;
ngx_str_t channel_deleted_message_text;
ngx_str_t ping_message_text;
ngx_uint_t qtd_templates;
ngx_http_push_stream_msg_template_t msg_templates;
} ngx_http_push_stream_main_conf_t;
......
......@@ -181,7 +181,6 @@ static ngx_http_push_stream_content_subtype_t subtypes[] = {
static const ngx_int_t NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID = -1;
static const ngx_str_t NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT = ngx_string("");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_PING_CHANNEL_ID = ngx_string("");
static const ngx_int_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID = -2;
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT = ngx_string("Channel deleted");
......@@ -230,7 +229,7 @@ static void ngx_http_push_stream_worker_subscriber_cleanup_locke
u_char * ngx_http_push_stream_append_crlf(const ngx_str_t *str, ngx_pool_t *pool);
static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_delete_channel(ngx_str_t *id);
static void ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force);
static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg);
......
......@@ -74,7 +74,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
}
if (cf->publisher_admin && (r->method == NGX_HTTP_DELETE)) {
ngx_http_push_stream_delete_channel(id);
ngx_http_push_stream_delete_channel(id, r->pool);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED);
}
......
......@@ -50,6 +50,24 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, shm_size),
NULL },
{ ngx_string("push_stream_memory_cleanup_timeout"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_sec_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, memory_cleanup_timeout),
NULL },
{ ngx_string("push_stream_channel_deleted_message_text"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, channel_deleted_message_text),
NULL },
{ ngx_string("push_stream_ping_message_text"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, ping_message_text),
NULL },
{ ngx_string("push_stream_store_messages"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
......@@ -134,12 +152,6 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, max_number_of_broadcast_channels),
NULL },
{ ngx_string("push_stream_memory_cleanup_timeout"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_sec_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, memory_cleanup_timeout),
NULL },
{ ngx_string("push_stream_keepalive"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
......@@ -301,6 +313,8 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
mcf->shm_size = NGX_CONF_UNSET_SIZE;
mcf->memory_cleanup_timeout = NGX_CONF_UNSET;
mcf->channel_deleted_message_text.data = NULL;
mcf->ping_message_text.data = NULL;
mcf->qtd_templates = 0;
ngx_queue_init(&mcf->msg_templates.queue);
......@@ -323,6 +337,16 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
conf->shm_size = NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE;
}
if (conf->channel_deleted_message_text.data == NULL) {
conf->channel_deleted_message_text.data = NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT.data;
conf->channel_deleted_message_text.len = NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT.len;
}
if (conf->ping_message_text.data == NULL) {
conf->ping_message_text.data = NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT.data;
conf->ping_message_text.len = NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT.len;
}
// memory cleanup timeout cannot't be small
if (conf->memory_cleanup_timeout < NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "memory cleanup timeout cannot't be less than %d.", NGX_HTTP_PUSH_STREAM_DEFAULT_MEMORY_CLEANUP_TIMEOUT);
......@@ -628,7 +652,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
ngx_rbtree_init(&d->unrecoverable_channels, unrecoverable_sentinel, ngx_http_push_stream_rbtree_insert);
// create ping message
ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT.data, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, ngx_cycle->pool);
ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->ping_message_text.data, ngx_http_push_stream_module_main_conf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, ngx_cycle->pool);
if (ngx_http_push_stream_ping_msg == NULL) {
return NGX_ERROR;
}
......
......@@ -292,7 +292,7 @@ ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *
static void
ngx_http_push_stream_delete_channel(ngx_str_t *id) {
ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool) {
ngx_http_push_stream_channel_t *channel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
......@@ -315,6 +315,9 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id) {
// remove all messages
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, 0, 0);
// apply channel deleted message text to message template
channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->channel_deleted_message_text.data, ngx_http_push_stream_module_main_conf->channel_deleted_message_text.len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, temp_pool);
// send signal to each worker with subscriber to this channel
cur = &channel->workers_with_subscribers;
......@@ -707,9 +710,9 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
u_char char_id[NGX_INT_T_LEN];
ngx_memset(char_id, '\0', NGX_INT_T_LEN);
u_char *msg = NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT.data;
u_char *channel_id = NGX_HTTP_PUSH_STREAM_PING_CHANNEL_ID.data;
ngx_int_t message_id = NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID;
u_char *msg = NGX_HTTP_PUSH_STREAM_EMPTY.data;
u_char *channel_id = NGX_HTTP_PUSH_STREAM_EMPTY.data;
ngx_int_t message_id = 0;
if (channel != NULL) {
channel_id = channel->id.data;
......
......@@ -174,7 +174,6 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_channel_t *channel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_pool_t *temp_pool;
ngx_flag_t is_broadcast_channel = 0;
channel = ngx_http_push_stream_find_channel(id, log);
......@@ -201,13 +200,6 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
return NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED;
}
//create a temporary pool to allocate temporary elements
if ((temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate memory for temporary pool");
ngx_shmtx_unlock(&shpool->mutex);
return NULL;
}
if ((channel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_channel_t))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
return NULL;
......@@ -228,8 +220,6 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
ngx_http_push_stream_initialize_channel(channel);
channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT.data, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT.len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, temp_pool);
ngx_rbtree_insert(&data->tree, (ngx_rbtree_node_t *) channel);
(is_broadcast_channel) ? data->broadcast_channels++ : data->channels++;
......
......@@ -148,6 +148,8 @@ module BaseTestCase
@config_template = nil
@keepalive = 'off'
@publisher_admin = 'off'
@channel_deleted_message_text = nil
@ping_message_text = nil
self.send(:global_configuration) if self.respond_to?(:global_configuration)
end
......@@ -185,6 +187,13 @@ module BaseTestCase
}
end
def add_test_timeout(timeout=5)
EM.add_timer(timeout) do
fail("Test timeout reached")
EventMachine.stop
end
end
@@config_template = %q{
pid <%= @pid_file %>;
error_log <%= @main_error_log %> debug;
......@@ -221,6 +230,8 @@ http {
client_body_temp_path <%= @client_body_temp %>;
<%= "push_stream_max_reserved_memory #{@max_reserved_memory};" unless @max_reserved_memory.nil? %>
<%= "push_stream_memory_cleanup_timeout #{@memory_cleanup_timeout};" unless @memory_cleanup_timeout.nil? %>
<%= %{push_stream_channel_deleted_message_text "#{@channel_deleted_message_text}";} unless @channel_deleted_message_text.nil? %>
<%= %{push_stream_ping_message_text "#{@ping_message_text}";} unless @ping_message_text.nil? %>
server {
listen <%=nginx_port%>;
......
......@@ -454,6 +454,7 @@ class TestPublisherAdmin < Test::Unit::TestCase
response = JSON.parse(resp)
assert_equal(channel, response["channel"], "Wrong channel")
assert_equal(-2, response["id"].to_i, "Wrong message id")
assert_equal("Channel deleted", response["text"], "Wrong message text")
stats = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => {'accept' => 'application/json'}, :timeout => 30
stats.callback {
......@@ -518,6 +519,7 @@ class TestPublisherAdmin < Test::Unit::TestCase
response = JSON.parse(resp)
assert_equal(channel_1, response["channel"], "Wrong channel")
assert_equal(-2, response["id"].to_i, "Wrong message id")
assert_equal("Channel deleted", response["text"], "Wrong message text")
stats = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => {'accept' => 'application/json'}, :timeout => 30
stats.callback {
......@@ -548,4 +550,46 @@ class TestPublisherAdmin < Test::Unit::TestCase
}
end
def config_test_custom_channel_deleted_message_text
@channel_deleted_message_text = "Channel has gone away."
@header_template = " " # send a space as header to has a chunk received
@ping_message_interval = nil
@message_template = '{\"id\":\"~id~\", \"channel\":\"~channel~\", \"text\":\"~text~\"}'
end
def test_custom_channel_deleted_message_text
headers = {'accept' => 'application/json'}
body = 'published message'
channel = 'test_custom_channel_deleted_message_text'
resp = ""
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
resp = resp + chunk
if resp.strip.empty?
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).delete :head => headers, :timeout => 30
pub.callback {
assert_equal(200, pub.response_header.status, "Request was not received")
assert_equal(0, pub.response_header.content_length, "Should response only with headers")
assert_equal("Channel deleted.", pub.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'], "Didn't receive the right error message")
}
else
begin
response = JSON.parse(resp)
assert_equal(channel, response["channel"], "Wrong channel")
assert_equal(-2, response["id"].to_i, "Wrong message id")
assert_equal(@channel_deleted_message_text, response["text"], "Wrong message text")
rescue JSON::ParserError
fail("Didn't receive a valid response")
end
EventMachine.stop
end
}
add_test_timeout
}
end
end
......@@ -641,10 +641,7 @@ class TestPublisher < Test::Unit::TestCase
EventMachine.stop
}
EM.add_timer(5) do
fail("Test timeout reached")
EventMachine.stop
end
add_test_timeout
}
end
......@@ -660,4 +657,96 @@ class TestPublisher < Test::Unit::TestCase
}
end
def config_test_default_ping_message_with_default_message_template
@header_template = nil
@message_template = nil
@ping_message_text = nil
@ping_message_interval = '1s'
end
def test_default_ping_message_with_default_message_template
headers = {'accept' => 'application/json'}
channel = 'ch_test_default_ping_message_with_default_message_template'
body = 'body'
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
assert_equal("\r\n", chunk, "Wrong message")
EventMachine.stop
}
add_test_timeout
}
end
def config_test_custom_ping_message_with_default_message_template
@header_template = nil
@message_template = nil
@ping_message_text = "pinging you!!!"
@ping_message_interval = '1s'
end
def test_custom_ping_message_with_default_message_template
headers = {'accept' => 'application/json'}
channel = 'ch_test_custom_ping_message_with_default_message_template'
body = 'body'
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
assert_equal("#{@ping_message_text}\r\n", chunk, "Wrong message")
EventMachine.stop
}
add_test_timeout
}
end
def config_test_default_ping_message_with_custom_message_template
@header_template = nil
@message_template = "~id~:~text~"
@ping_message_text = nil
@ping_message_interval = '1s'
end
def test_default_ping_message_with_custom_message_template
headers = {'accept' => 'application/json'}
channel = 'ch_test_default_ping_message_with_custom_message_template'
body = 'body'
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
assert_equal("-1:\r\n", chunk, "Wrong message")
EventMachine.stop
}
add_test_timeout
}
end
def config_test_custom_ping_message_with_default_message_template
@header_template = nil
@message_template = "~id~:~text~"
@ping_message_text = "pinging you!!!"
@ping_message_interval = '1s'
end
def test_custom_ping_message_with_default_message_template
headers = {'accept' => 'application/json'}
channel = 'ch_test_custom_ping_message_with_default_message_template'
body = 'body'
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
assert_equal("-1:#{@ping_message_text}\r\n", chunk, "Wrong message")
EventMachine.stop
}
add_test_timeout
}
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment