Commit 8c6f1070 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding feature to get channels statistics by prefix

parent 9b272cf3
* Fixing bug which removed default message template
* Adding feature to get channels statistics by prefix (Suggested by Alexey Vdovin)
* Adding publisher administrator feature to delete channels (Suggested by Alexey Vdovin)
* Removing support for versions 0.7.x
* Fixing messages sent to subscribers to be a truly transfer encoding chunked connection
......
......@@ -105,22 +105,27 @@ one terminal and start playing http pubsub:
<pre>
<code>
# Pub
curl -s -v -X POST "http://localhost/pub?id=my_channel_1" -d "Hello World!"
curl -s -v -X POST 'http://localhost/pub?id=my_channel_1' -d 'Hello World!'
curl -s -v -X POST 'http://localhost/pub?id=your_channel_1' -d 'Hi everybody!'
curl -s -v -X POST 'http://localhost/pub?id=your_channel_2' -d 'Goodbye!'
# Sub
curl -s -v "http://localhost/sub/my_channel_1.b20"
curl -s -v 'http://localhost/sub/my_channel_1.b20'
# Channel Stats for publisher (json format)
curl -s -v "http://localhost/pub?id=my_channel_1"
curl -s -v 'http://localhost/pub?id=my_channel_1'
# All Channels Stats summarized (json format)
curl -s -v "http://localhost/channels-stats"
curl -s -v 'http://localhost/channels-stats'
# All Channels Stats detailed (json format)
curl -s -v "http://localhost/channels-stats?id=ALL"
curl -s -v 'http://localhost/channels-stats?id=ALL'
# Prefixed Channels Stats detailed (json format)
curl -s -v 'http://localhost/channels-stats?id=your_channel_*'
# Channels Stats (json format)
curl -s -v "http://localhost/channels-stats?id=my_channel_1"
curl -s -v 'http://localhost/channels-stats?id=my_channel_1'
</code>
</pre>
......@@ -195,6 +200,7 @@ context: location
Defines a location as a source of statistics. You can use this location to get statistics about a specific channel, or about all channels, in a resumed ou summarized way.
To get statistics about all channels in a summarized way you have to make a GET in this location without specify a name in the push_stream_channel_id variable.
To get statistics about all channels in a detailed way you have to specify "ALL" in the push_stream_channel_id.
To get statistics about prefixed channels in a detailed way you have to specify "_prefix_*" in the push_stream_channel_id.
To get statistics about a channel you have to specify the name in the push_stream_channel_id.
You can get statistics in the formats plain, xml, yaml and json. The default is json, to change this behavior you can use *Accept* header parameter passing values like "text/plain", "application/xml", "application/yaml" and "application/json" respectivelly.
......@@ -208,6 +214,7 @@ You can get statistics in the formats plain, xml, yaml and json. The default is
# /channels-stats -> get statistics about all channels in a summarized way
# /channels-stats?id=ALL -> get statistics about all channels in a detailed way
# /channels-stats?id=channel_* -> get statistics about all channels which starts with 'channel_'
# /channels-stats?id=channel_id -> get statistics about a channel
</code>
</pre>
......
......@@ -180,7 +180,7 @@ ngx_http_push_stream_main_conf_t *ngx_http_push_stream_module_main_conf = NULL;
static ngx_str_t * ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *cf);
static ngx_int_t ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel);
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r, ngx_str_t *prefix);
static ngx_int_t ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template);
......
......@@ -189,7 +189,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
}
static void
ngx_http_push_stream_rbtree_walker_channel_info_locked(ngx_rbtree_t *tree, ngx_pool_t *pool, ngx_rbtree_node_t *node, ngx_queue_t *queue_channel_info)
ngx_http_push_stream_rbtree_walker_channel_info_locked(ngx_rbtree_t *tree, ngx_pool_t *pool, ngx_rbtree_node_t *node, ngx_queue_t *queue_channel_info, ngx_str_t *prefix)
{
ngx_rbtree_node_t *sentinel = tree->sentinel;
......@@ -198,6 +198,8 @@ ngx_http_push_stream_rbtree_walker_channel_info_locked(ngx_rbtree_t *tree, ngx_p
ngx_http_push_stream_channel_t *channel = (ngx_http_push_stream_channel_t *) node;
ngx_http_push_stream_channel_info_t *channel_info;
if(!prefix || (ngx_strncmp(channel->id.data, prefix->data, prefix->len) == 0)) {
if ((channel_info = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_channel_info_t))) == NULL) {
return;
}
......@@ -209,19 +211,20 @@ ngx_http_push_stream_rbtree_walker_channel_info_locked(ngx_rbtree_t *tree, ngx_p
channel_info->subscribers = channel->subscribers;
ngx_queue_insert_tail(queue_channel_info, &channel_info->queue);
}
if (node->left != NULL) {
ngx_http_push_stream_rbtree_walker_channel_info_locked(tree, pool, node->left, queue_channel_info);
ngx_http_push_stream_rbtree_walker_channel_info_locked(tree, pool, node->left, queue_channel_info, prefix);
}
if (node->right != NULL) {
ngx_http_push_stream_rbtree_walker_channel_info_locked(tree, pool, node->right, queue_channel_info);
ngx_http_push_stream_rbtree_walker_channel_info_locked(tree, pool, node->right, queue_channel_info, prefix);
}
}
}
static ngx_int_t
ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r) {
ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r, ngx_str_t *prefix) {
ngx_int_t rc, content_len = 0;
ngx_chain_t *chain, *first = NULL, *last = NULL;
ngx_str_t *currenttime, *hostname;
......@@ -239,7 +242,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
ngx_queue_init(&queue_channel_info);
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_rbtree_walker_channel_info_locked(&data->tree, r->pool, data->tree.root, &queue_channel_info);
ngx_http_push_stream_rbtree_walker_channel_info_locked(&data->tree, r->pool, data->tree.root, &queue_channel_info, prefix);
ngx_shmtx_unlock(&shpool->mutex);
// format content body
......
......@@ -88,8 +88,8 @@ ngx_http_push_stream_publisher_handle_post(ngx_http_push_stream_loc_conf_t *cf,
ngx_int_t rc;
ngx_http_push_stream_channel_t *channel = NULL;
// check if channel id isn't equals to ALL
if (ngx_memn2cmp(id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) {
// check if channel id isn't equals to ALL or contain wildcard
if ((ngx_memn2cmp(id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) || (ngx_strchr(id->data, '*') != NULL)) {
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE);
}
......@@ -229,6 +229,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
static ngx_int_t
ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
{
char *pos = NULL;
ngx_str_t *id = NULL;
ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
......@@ -255,9 +256,19 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
return ngx_http_push_stream_send_response_all_channels_info_summarized(r);
}
if ((pos = ngx_strchr(id->data, '*')) != NULL) {
ngx_str_t *aux = NULL;
if (pos != (char *) id->data) {
*pos = '\0';
id->len = ngx_strlen(id->data);
aux = id;
}
return ngx_http_push_stream_send_response_all_channels_info_detailed(r, aux);
}
// if specify a channel id equals to ALL, get info about all channels in a detailed way
if (ngx_memn2cmp(id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) {
return ngx_http_push_stream_send_response_all_channels_info_detailed(r);
return ngx_http_push_stream_send_response_all_channels_info_detailed(r, NULL);
}
// if specify a channel id != ALL, get info about specified channel if it exists
......
......@@ -82,8 +82,8 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
//validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
// could not be ALL channel
if (ngx_memn2cmp(cur->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, cur->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) {
// could not be ALL channel or contain wildcard
if ((ngx_memn2cmp(cur->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, cur->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) || (ngx_strchr(cur->id->data, '*') != NULL)) {
ngx_destroy_pool(temp_pool);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE);
}
......
......@@ -385,4 +385,164 @@ class TestChannelStatistics < Test::Unit::TestCase
}
end
def test_get_detailed_channels_statistics_whithout_created_channels_using_prefix
headers = {'accept' => 'application/json'}
EventMachine.run {
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=prefix_*').get :head => headers, :timeout => 30
pub_2.callback {
assert_equal(200, pub_2.response_header.status, "Request was not accepted")
assert_not_equal(0, pub_2.response_header.content_length, "Empty response was received")
response = JSON.parse(pub_2.response)
assert_equal(0, response["infos"].length, "Received info whithout_created_channels")
EventMachine.stop
}
}
end
def test_get_detailed_channels_statistics_to_existing_channel_using_prefix
headers = {'accept' => 'application/json'}
channel = 'ch_test_get_detailed_channels_statistics_to_existing_channel_using_prefix'
channel_1 = 'another_ch_test_get_detailed_channels_statistics_to_existing_channel_using_prefix'
body = 'body'
#create channels
publish_message(channel, headers, body)
publish_message(channel_1, headers, body)
EventMachine.run {
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=ch_test_*').get :head => headers, :timeout => 30
pub_2.callback {
assert_equal(200, pub_2.response_header.status, "Request was not accepted")
assert_not_equal(0, pub_2.response_header.content_length, "Empty response was received")
response = JSON.parse(pub_2.response)
assert_equal(1, response["infos"].length, "Didn't received info about the only created channel")
assert_equal(channel, response["infos"][0]["channel"].to_s, "Channel was not recognized")
assert_equal(1, response["infos"][0]["published_messages"].to_i, "Message was not published")
assert_equal(1, response["infos"][0]["stored_messages"].to_i, "Message was not stored")
assert_equal(0, response["infos"][0]["subscribers"].to_i, "Wrong number for subscribers")
EventMachine.stop
}
}
end
def test_get_detailed_channels_statistics_using_prefix_as_same_behavior_ALL
headers = {'accept' => 'application/json'}
channel = 'ch_test_get_detailed_channels_statistics_using_prefix_as_same_behavior_ALL'
channel_1 = 'another_ch_test_get_detailed_channels_statistics_using_prefix_as_same_behavior_ALL'
body = 'body'
#create channels
publish_message(channel, headers, body)
publish_message(channel_1, headers, body)
EventMachine.run {
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=*').get :head => headers, :timeout => 30
pub_2.callback {
assert_equal(200, pub_2.response_header.status, "Request was not accepted")
assert_not_equal(0, pub_2.response_header.content_length, "Empty response was received")
response = JSON.parse(pub_2.response)
assert_equal(2, response["infos"].length, "Didn't received info about the only created channel")
assert_equal(channel, response["infos"][0]["channel"].to_s, "Channel was not recognized")
assert_equal(1, response["infos"][0]["published_messages"].to_i, "Message was not published")
assert_equal(1, response["infos"][0]["stored_messages"].to_i, "Message was not stored")
assert_equal(0, response["infos"][0]["subscribers"].to_i, "Wrong number for subscribers")
assert_equal(channel_1, response["infos"][1]["channel"].to_s, "Channel was not recognized")
assert_equal(1, response["infos"][1]["published_messages"].to_i, "Message was not published")
assert_equal(1, response["infos"][1]["stored_messages"].to_i, "Message was not stored")
assert_equal(0, response["infos"][1]["subscribers"].to_i, "Wrong number for subscribers")
EventMachine.stop
}
}
end
def config_test_get_detailed_channels_statistics_to_existing_broadcast_channel_using_prefix
@broadcast_channel_prefix = 'bd_'
@broadcast_channel_max_qtd = 1
end
def test_get_detailed_channels_statistics_to_existing_broadcast_channel_using_prefix
headers = {'accept' => 'application/json'}
channel = 'bd_test_get_detailed_channels_statistics_to_existing_broadcast_channel_using_prefix'
body = 'body'
#create channel
publish_message(channel, headers, body)
EventMachine.run {
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=bd_test_*').get :head => headers, :timeout => 30
pub_2.callback {
assert_equal(200, pub_2.response_header.status, "Request was not accepted")
assert_not_equal(0, pub_2.response_header.content_length, "Empty response was received")
response = JSON.parse(pub_2.response)
assert_equal(1, response["infos"].length, "Didn't received info about the only created channel")
assert_equal(0, response["channels"].to_i, "Channel was not recognized")
assert_equal(1, response["broadcast_channels"].to_i, "Channel was not recognized")
assert_equal(channel, response["infos"][0]["channel"].to_s, "Channel was not recognized")
assert_equal(1, response["infos"][0]["published_messages"].to_i, "Message was not published")
assert_equal(1, response["infos"][0]["stored_messages"].to_i, "Message was not stored")
assert_equal(0, response["infos"][0]["subscribers"].to_i, "Wrong number for subscribers")
EventMachine.stop
}
}
end
def test_detailed_channels_statistics_to_existing_channel_with_subscriber_using_prefix
headers = {'accept' => 'application/json'}
channel = 'ch_test_detailed_channels_statistics_to_existing_channel_with_subscriber_using_prefix'
body = 'body'
create_channel_by_subscribe(channel, headers) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=ch_test_*').get :head => headers, :timeout => 30
pub_1.callback {
assert_equal(200, pub_1.response_header.status, "Request was not accepted")
assert_not_equal(0, pub_1.response_header.content_length, "Empty response was received")
response = JSON.parse(pub_1.response)
assert_equal(1, response["infos"].length, "Didn't received info about the only created channel")
assert_equal(channel, response["infos"][0]["channel"].to_s, "Channel was not recognized")
assert_equal(0, response["infos"][0]["published_messages"].to_i, "Wrong number for published messages")
assert_equal(0, response["infos"][0]["stored_messages"].to_i, "Wrong number for stored messages")
assert_equal(1, response["infos"][0]["subscribers"].to_i, "Wrong number for subscribers")
EventMachine.stop
}
end
end
def config_test_get_detailed_channels_statistics_to_many_channels_using_prefix
@max_reserved_memory = '200m'
end
def test_get_detailed_channels_statistics_to_many_channels_using_prefix
headers = {'accept' => 'application/json'}
channel = 'ch_test_get_detailed_channels_statistics_to_many_channels_using_prefix_'
body = 'body'
number_of_channels = 20000
#create channels
0.step(number_of_channels - 1, 10) do |i|
EventMachine.run {
publish_message_inline("#{channel}#{i + 1}", headers, body)
publish_message_inline("#{channel}#{i + 2}", headers, body)
publish_message_inline("#{channel}#{i + 3}", headers, body)
publish_message_inline("#{channel}#{i + 4}", headers, body)
publish_message_inline("#{channel}#{i + 5}", headers, body)
publish_message_inline("#{channel}#{i + 6}", headers, body)
publish_message_inline("#{channel}#{i + 7}", headers, body)
publish_message_inline("#{channel}#{i + 8}", headers, body)
publish_message_inline("#{channel}#{i + 9}", headers, body)
publish_message_inline("#{channel}#{i + 10}", headers, body) { EventMachine.stop }
}
end
EventMachine.run {
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=ch_test_*').get :head => headers, :timeout => 30
pub_2.callback {
assert_equal(200, pub_2.response_header.status, "Request was not accepted")
assert_not_equal(0, pub_2.response_header.content_length, "Empty response was received")
response = JSON.parse(pub_2.response)
assert_equal(number_of_channels, response["infos"].length, "Didn't received info about the created channels")
EventMachine.stop
}
}
end
end
......@@ -125,6 +125,32 @@ class TestPublisher < Test::Unit::TestCase
}
end
def test_cannot_create_a_channel_with_id_containing_wildcard
headers = {'accept' => 'application/json'}
body = 'body'
channel_1 = 'abcd*efgh'
channel_2 = '*abcdefgh'
channel_3 = 'abcdefgh*'
EventMachine.run {
multi = EventMachine::MultiRequest.new
multi.add(EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel_1).post :head => headers, :body => body, :timeout => 30)
multi.add(EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel_2).post :head => headers, :body => body, :timeout => 30)
multi.add(EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel_3).post :head => headers, :body => body, :timeout => 30)
multi.callback {
assert_equal(3, multi.responses[:succeeded].length)
0.upto(2) do |i|
assert_equal(403, multi.responses[:succeeded][i].response_header.status, "Channel was created")
assert_equal(0, multi.responses[:succeeded][i].response_header.content_length, "Received response for creating channel with id containing wildcard")
assert_equal("Channel id not authorized for this method.", multi.responses[:succeeded][i].response_header['X_NGINX_PUSHSTREAM_EXPLAIN'], "Didn't receive the right error message")
end
EventMachine.stop
}
}
end
def config_test_post_message_larger_than_max_body_size_should_be_rejected
@client_max_body_size = '2k'
@client_body_buffer_size = '1k'
......
......@@ -128,6 +128,32 @@ class TestPublisherAdmin < Test::Unit::TestCase
}
end
def test_cannot_create_a_channel_with_id_containing_wildcard
headers = {'accept' => 'application/json'}
body = 'body'
channel_1 = 'abcd*efgh'
channel_2 = '*abcdefgh'
channel_3 = 'abcdefgh*'
EventMachine.run {
multi = EventMachine::MultiRequest.new
multi.add(EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel_1).post :head => headers, :body => body, :timeout => 30)
multi.add(EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel_2).post :head => headers, :body => body, :timeout => 30)
multi.add(EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel_3).post :head => headers, :body => body, :timeout => 30)
multi.callback {
assert_equal(3, multi.responses[:succeeded].length)
0.upto(2) do |i|
assert_equal(403, multi.responses[:succeeded][i].response_header.status, "Channel was created")
assert_equal(0, multi.responses[:succeeded][i].response_header.content_length, "Received response for creating channel with id containing wildcard")
assert_equal("Channel id not authorized for this method.", multi.responses[:succeeded][i].response_header['X_NGINX_PUSHSTREAM_EXPLAIN'], "Didn't receive the right error message")
end
EventMachine.stop
}
}
end
def config_test_admin_post_message_larger_than_max_body_size_should_be_rejected
@client_max_body_size = '2k'
@client_body_buffer_size = '1k'
......
......@@ -119,6 +119,36 @@ class TestPublisher < Test::Unit::TestCase
}
end
def test_cannot_access_a_channel_with_id_containing_wildcard
headers = {'accept' => 'application/json'}
channel_1 = 'abcd*efgh'
channel_2 = '*abcdefgh'
channel_3 = 'abcdefgh*'
EventMachine.run {
multi = EventMachine::MultiRequest.new
multi.add(EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_1).get :head => headers, :timeout => 30)
multi.add(EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_2).get :head => headers, :timeout => 30)
multi.add(EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_3).get :head => headers, :timeout => 30)
multi.callback {
assert_equal(3, multi.responses[:succeeded].length)
0.upto(2) do |i|
assert_equal(403, multi.responses[:succeeded][i].response_header.status, "Channel was created")
assert_equal(0, multi.responses[:succeeded][i].response_header.content_length, "Received response for creating channel with id containing wildcard")
assert_equal("Channel id not authorized for this method.", multi.responses[:succeeded][i].response_header['X_NGINX_PUSHSTREAM_EXPLAIN'], "Didn't receive the right error message")
end
EventMachine.stop
}
EM.add_timer(5) do
fail("Subscribers didn't disconnect")
EventMachine.stop
end
}
end
def config_test_broadcast_channels_without_common_channel
@subscriber_connection_timeout = '1s'
@broadcast_channel_prefix = "bd_"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment