Commit 65098bb9 authored by Wandenberg's avatar Wandenberg

make possible get statistics for some channels at once specifying their ids

parent ae39453b
......@@ -179,6 +179,7 @@ typedef struct {
ngx_chain_t *busy;
ngx_http_push_stream_padding_t *padding;
ngx_str_t *callback;
ngx_http_push_stream_requested_channel_t *requested_channels;
} ngx_http_push_stream_module_ctx_t;
// messages to worker processes
......@@ -230,6 +231,7 @@ static ngx_str_t * ngx_http_push_stream_get_channel_id(ngx_http_request_t *
static ngx_int_t ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel);
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r, ngx_str_t *prefix);
static ngx_int_t ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channels);
static ngx_int_t ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource, ngx_flag_t websocket);
......
......@@ -679,6 +679,88 @@ shared_examples_for "statistics location" do
end
end
end
context "when sending multiple channels ids" do
it "should return detailed channels statistics for existent channels" do
body = 'body'
actual_response = ''
nginx_run_server(config) do |conf|
#create channels
publish_message("ch1", headers, body)
publish_message("ch2", headers, body)
publish_message("ch3", headers, body)
EventMachine.run do
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=ch3/ch1').get :head => headers, :decoding => false
pub_2.stream do |chunk|
actual_response << chunk
end
pub_2.callback do
pub_2.should be_http_status(200)
if (conf.gzip == "on")
pub_2.response_header["CONTENT_ENCODING"].should eql("gzip")
actual_response = Zlib::GzipReader.new(StringIO.new(actual_response)).read
end
response = JSON.parse(actual_response)
response["infos"].length.should eql(2)
response["infos"][0]["channel"].to_s.should eql("ch3")
response["infos"][0]["published_messages"].to_i.should eql(1)
response["infos"][0]["stored_messages"].to_i.should eql(1)
response["infos"][0]["subscribers"].to_i.should eql(0)
response["infos"][1]["channel"].to_s.should eql("ch1")
response["infos"][1]["published_messages"].to_i.should eql(1)
response["infos"][1]["stored_messages"].to_i.should eql(1)
response["infos"][1]["subscribers"].to_i.should eql(0)
EventMachine.stop
end
end
end
end
it "should return detailed channels statistics for unexistent channel" do
body = 'body'
actual_response = ''
nginx_run_server(config) do |conf|
#create channels
publish_message("ch1", headers, body)
publish_message("ch2", headers, body)
publish_message("ch3", headers, body)
EventMachine.run do
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=ch3/ch4/ch2').get :head => headers, :decoding => false
pub_2.stream do |chunk|
actual_response << chunk
end
pub_2.callback do
pub_2.should be_http_status(200)
if (conf.gzip == "on")
pub_2.response_header["CONTENT_ENCODING"].should eql("gzip")
actual_response = Zlib::GzipReader.new(StringIO.new(actual_response)).read
end
response = JSON.parse(actual_response)
response["infos"].length.should eql(2)
response["infos"][0]["channel"].to_s.should eql("ch3")
response["infos"][0]["published_messages"].to_i.should eql(1)
response["infos"][0]["stored_messages"].to_i.should eql(1)
response["infos"][0]["subscribers"].to_i.should eql(0)
response["infos"][1]["channel"].to_s.should eql("ch2")
response["infos"][1]["published_messages"].to_i.should eql(1)
response["infos"][1]["stored_messages"].to_i.should eql(1)
response["infos"][1]["subscribers"].to_i.should eql(0)
EventMachine.stop
end
end
end
end
end
end
context "when getting statistics" do
......
......@@ -53,6 +53,7 @@ module NginxConfiguration
:channel_inactivity_time => nil,
:channel_id => '$arg_id',
:channels_path_for_pub => '$arg_id',
:channels_path => '$1',
:extra_location => ''
......@@ -152,8 +153,7 @@ http {
# activate channels statistics mode for this location
push_stream_channels_statistics;
# query string based channel id
<%= write_directive("push_stream_channel_id", channel_id) %>
<%= write_directive("push_stream_channels_path", channels_path_for_pub) %>
}
location /pub {
......
......@@ -189,29 +189,21 @@ ngx_http_push_stream_rbtree_walker_channel_info_locked(ngx_rbtree_t *tree, ngx_p
}
static ngx_int_t
ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r, ngx_str_t *prefix) {
ngx_http_push_stream_send_response_channels_info(ngx_http_request_t *r, ngx_queue_t *queue_channel_info) {
ngx_int_t rc, content_len = 0;
ngx_chain_t *chain, *first = NULL, *last = NULL;
ngx_str_t *currenttime, *hostname, *text, *header_response;
ngx_queue_t queue_channel_info;
ngx_queue_t *cur, *next;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_content_subtype_t *subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
const ngx_str_t *format;
const ngx_str_t *head = subtype->format_group_head;
const ngx_str_t *tail = subtype->format_group_tail;
ngx_queue_init(&queue_channel_info);
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_rbtree_walker_channel_info_locked(&data->tree, r->pool, data->tree.root, &queue_channel_info, prefix);
ngx_shmtx_unlock(&shpool->mutex);
// format content body
cur = ngx_queue_head(&queue_channel_info);
while (cur != &queue_channel_info) {
cur = ngx_queue_head(queue_channel_info);
while (cur != queue_channel_info) {
next = ngx_queue_next(cur);
ngx_http_push_stream_channel_info_t *channel_info = (ngx_http_push_stream_channel_info_t *) cur;
if ((chain = ngx_http_push_stream_get_buf(r)) == NULL) {
......@@ -219,7 +211,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
format = (next != &queue_channel_info) ? subtype->format_group_item : subtype->format_group_last_item;
format = (next != queue_channel_info) ? subtype->format_group_item : subtype->format_group_last_item;
if ((text = ngx_http_push_stream_channel_info_formatted(r->pool, format, &channel_info->id, channel_info->published_messages, channel_info->stored_messages, channel_info->subscribers)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory to format channel info");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
......@@ -283,6 +275,72 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
return ngx_http_push_stream_send_response_text(r, tail->data, tail->len, 1);
}
static ngx_int_t
ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r, ngx_str_t *prefix) {
ngx_queue_t queue_channel_info;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_queue_init(&queue_channel_info);
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_rbtree_walker_channel_info_locked(&data->tree, r->pool, data->tree.root, &queue_channel_info, prefix);
ngx_shmtx_unlock(&shpool->mutex);
return ngx_http_push_stream_send_response_channels_info(r, &queue_channel_info);
}
static ngx_int_t
ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channels) {
ngx_str_t *text;
ngx_queue_t queue_channel_info;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_content_subtype_t *subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
ngx_http_push_stream_channel_info_t *channel_info;
ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_requested_channel_t *requested_channel;
ngx_queue_t *cur = &requested_channels->queue;
ngx_uint_t qtd_channels = 0;
ngx_queue_init(&queue_channel_info);
ngx_shmtx_lock(&shpool->mutex);
while ((cur = ngx_queue_next(cur)) != &requested_channels->queue) {
requested_channel = ngx_queue_data(cur, ngx_http_push_stream_requested_channel_t, queue);
// search for a existing channel with this id
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log);
if ((channel != NULL) && ((channel_info = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_channel_info_t))) != NULL)) {
channel_info->id.data = channel->id.data;
channel_info->id.len = channel->id.len;
channel_info->published_messages = channel->last_message_id;
channel_info->stored_messages = channel->stored_messages;
channel_info->subscribers = channel->subscribers;
ngx_queue_insert_tail(&queue_channel_info, &channel_info->queue);
qtd_channels++;
}
}
ngx_shmtx_unlock(&shpool->mutex);
if (qtd_channels == 0) {
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_FOUND, NULL);
}
if (qtd_channels == 1) {
channel_info = ngx_queue_data(ngx_queue_head(&queue_channel_info), ngx_http_push_stream_channel_info_t, queue);
text = ngx_http_push_stream_channel_info_formatted(r->pool, subtype->format_item, &channel_info->id, channel_info->published_messages, channel_info->stored_messages, channel_info->subscribers);
if (text == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer.");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
return ngx_http_push_stream_send_response(r, text, subtype->content_type, NGX_HTTP_OK);
}
return ngx_http_push_stream_send_response_channels_info(r, &queue_channel_info);
}
static ngx_int_t
ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource, ngx_flag_t websocket) {
ngx_http_push_stream_template_queue_t *sentinel = &ngx_http_push_stream_module_main_conf->msg_templates;
......
......@@ -255,9 +255,8 @@ static ngx_int_t
ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
{
char *pos = NULL;
ngx_str_t *id = NULL;
ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_http_push_stream_set_expires(r, NGX_HTTP_PUSH_STREAM_EXPIRES_EPOCH, 0);
......@@ -270,42 +269,38 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_TAG, &NGX_HTTP_PUSH_STREAM_TAG);
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_COMMIT, &NGX_HTTP_PUSH_STREAM_COMMIT);
// get and check channel id value
id = ngx_http_push_stream_get_channel_id(r, cf);
if ((id == NULL) || (id == NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID)) {
if (id == NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID) {
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE);
}
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
//get channels ids
channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, r->pool);
// if not specify a channel id, get info about all channels in a resumed way
if (id == NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID) {
if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) {
return ngx_http_push_stream_send_response_all_channels_info_summarized(r);
}
if ((pos = ngx_strchr(id->data, '*')) != NULL) {
ngx_str_t *aux = NULL;
if (pos != (char *) id->data) {
*pos = '\0';
id->len = ngx_strlen(id->data);
aux = id;
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
// could not have a large size
if ((ngx_http_push_stream_module_main_conf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (cur->id->len > ngx_http_push_stream_module_main_conf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", cur->id->len);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE);
}
return ngx_http_push_stream_send_response_all_channels_info_detailed(r, aux);
}
// if specify a channel id equals to ALL, get info about all channels in a detailed way
if (ngx_memn2cmp(id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) {
return ngx_http_push_stream_send_response_all_channels_info_detailed(r, NULL);
}
// if specify a channel id != ALL, get info about specified channel if it exists
// search for a existing channel with this id
channel = ngx_http_push_stream_find_channel(id, r->connection->log);
if ((pos = ngx_strchr(cur->id->data, '*')) != NULL) {
ngx_str_t *aux = NULL;
if (pos != (char *) cur->id->data) {
*pos = '\0';
cur->id->len = ngx_strlen(cur->id->data);
aux = cur->id;
}
return ngx_http_push_stream_send_response_all_channels_info_detailed(r, aux);
}
if (channel == NULL) {
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_FOUND, NULL);
// if specify a channel id equals to ALL, get info about all channels in a detailed way
if (ngx_memn2cmp(cur->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, cur->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) {
return ngx_http_push_stream_send_response_all_channels_info_detailed(r, NULL);
}
}
return ngx_http_push_stream_send_response_channel_info(r, channel);
// if specify a channels ids != ALL, get info about specified channels if they exists
return ngx_http_push_stream_send_response_channels_info_detailed(r, channels_ids);
}
......@@ -584,8 +584,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
return NGX_CONF_OK;
}
if ((conf->location_type == NGX_HTTP_PUSH_STREAM_STATISTICS_MODE) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_NORMAL) ||
if ((conf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_NORMAL) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN)) {
if (conf->channel_id == NULL) {
......
......@@ -1133,6 +1133,7 @@ ngx_http_push_stream_add_request_context(ngx_http_request_t *r)
ctx->longpolling = 0;
ctx->padding = NULL;
ctx->callback = NULL;
ctx->requested_channels = NULL;
// set a cleaner to request
cln->handler = (ngx_pool_cleanup_pt) ngx_http_push_stream_cleanup_request_context;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment