Commit 8a23634a authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

add the number of stored messages and the number of channels and messages in...

add the number of stored messages and the number of channels and messages in the trash to summarized channels statistics
parent 79f5f224
......@@ -205,11 +205,14 @@ typedef struct {
ngx_uint_t channels; // # of channels being used
ngx_uint_t broadcast_channels; // # of broadcast channels being used
ngx_uint_t published_messages; // # of published messagens in all channels
ngx_uint_t stored_messages; // # of messages being stored
ngx_uint_t subscribers; // # of subscribers in all channels
ngx_queue_t messages_trash;
ngx_queue_t channels_queue;
ngx_queue_t channels_trash;
ngx_queue_t channels_to_delete;
ngx_uint_t channels_in_trash; // # of channels in trash queue
ngx_uint_t messages_in_trash; // # of messages in trash queue
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
time_t startup;
time_t last_message_time;
......
......@@ -56,7 +56,7 @@ static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_PLAIN = ngx_strin
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_PLAIN = ngx_string(CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_ITEM_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN_PATTERN "," CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_LAST_ITEM_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN_PATTERN);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_PLAIN = ngx_string("hostname: %s" CRLF "time: %s" CRLF "channels: %ui" CRLF "broadcast_channels: %ui" CRLF "published_messages: %ui" CRLF "subscribers: %ui" CRLF "uptime: %ui" CRLF "by_worker:"CRLF"%s" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_PLAIN = ngx_string("hostname: %s" CRLF "time: %s" CRLF "channels: %ui" CRLF "broadcast_channels: %ui" CRLF "published_messages: %ui" CRLF "stored_messages: %ui" CRLF "messages_in_trash: %ui" CRLF "channels_in_trash: %ui" CRLF "subscribers: %ui" CRLF "uptime: %ui" CRLF "by_worker:"CRLF"%s" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_ITEM_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_PLAIN_PATTERN "," CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_PLAIN_PATTERN);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_PLAIN = ngx_string("text/plain");
......@@ -69,7 +69,7 @@ static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_JSON = ngx_string
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_JSON = ngx_string("]}" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_ITEM_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON_PATTERN "," CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_LAST_ITEM_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON_PATTERN CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_JSON = ngx_string("{\"hostname\": \"%s\", \"time\": \"%s\", \"channels\": \"%ui\", \"broadcast_channels\": \"%ui\", \"published_messages\": \"%ui\", \"subscribers\": \"%ui\", \"uptime\": \"%ui\", \"by_worker\": [" CRLF "%s" CRLF"]}" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_JSON = ngx_string("{\"hostname\": \"%s\", \"time\": \"%s\", \"channels\": \"%ui\", \"broadcast_channels\": \"%ui\", \"published_messages\": \"%ui\", \"stored_messages\": \"%ui\", \"messages_in_trash\": \"%ui\", \"channels_in_trash\": \"%ui\", \"subscribers\": \"%ui\", \"uptime\": \"%ui\", \"by_worker\": [" CRLF "%s" CRLF"]}" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_ITEM_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_JSON_PATTERN "," CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_JSON_PATTERN);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_JSON = ngx_string("application/json");
......@@ -82,7 +82,7 @@ static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_YAML = ngx_string
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_YAML = ngx_string(CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_ITEM_YAML = ngx_string(" -" CRLF NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML_PATTERN CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_LAST_ITEM_YAML = ngx_string(" -" CRLF NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML_PATTERN);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_YAML = ngx_string(" hostname: %s" CRLF" time: %s" CRLF" channels: %ui" CRLF" broadcast_channels: %ui" CRLF" published_messages: %ui" CRLF" subscribers: %ui" CRLF" uptime: %ui" CRLF" by_worker:"CRLF"%s" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_YAML = ngx_string(" hostname: %s" CRLF" time: %s" CRLF" channels: %ui" CRLF" broadcast_channels: %ui" CRLF" published_messages: %ui" CRLF" stored_messages: %ui" CRLF" messages_in_trash: %ui" CRLF" channels_in_trash: %ui" CRLF" subscribers: %ui" CRLF" uptime: %ui" CRLF" by_worker:"CRLF"%s" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_ITEM_YAML = ngx_string(" -" CRLF NGX_HTTP_PUSH_STREAM_WORKER_INFO_YAML_PATTERN CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_YAML = ngx_string(" -" CRLF NGX_HTTP_PUSH_STREAM_WORKER_INFO_YAML_PATTERN);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_YAML = ngx_string("application/yaml");
......@@ -115,6 +115,9 @@ static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_XML = ngx_string
" <channels>%ui</channels>" CRLF \
" <broadcast_channels>%ui</broadcast_channels>" CRLF \
" <published_messages>%ui</published_messages>" CRLF \
" <stored_messages>%ui</stored_messages>" CRLF \
" <messages_in_trash>%ui</messages_in_trash>" CRLF \
" <channels_in_trash>%ui</channels_in_trash>" CRLF \
" <subscribers>%ui</subscribers>" CRLF \
" <uptime>%ui</uptime>" CRLF \
" <by_worker>%s</by_worker>" CRLF \
......
......@@ -563,4 +563,74 @@ describe "Channel Statistics" do
end
end
end
it "should return the number of messages in the trash in summarized channels statistics" do
channel = 'ch_test_get_messages_in_trash_in_summarized_channels_statistics'
body = 'body'
nginx_run_server(config.merge(:message_ttl => '1s'), :timeout => 15) do |conf|
#create channel
publish_message(channel, headers, body)
EventMachine.run do
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_2.callback do
pub_2.should be_http_status(200).with_body
response = JSON.parse(pub_2.response)
response["stored_messages"].to_i.should eql(1)
response["messages_in_trash"].to_i.should eql(0)
sleep(5)
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_3.callback do
pub_3.should be_http_status(200).with_body
response = JSON.parse(pub_3.response)
response["stored_messages"].to_i.should eql(0)
response["messages_in_trash"].to_i.should eql(1)
EventMachine.stop
end
end
end
end
end
it "should return the number of channels in the trash in summarized channels statistics" do
channel = 'ch_test_get_channels_in_trash_in_summarized_channels_statistics'
body = 'body'
nginx_run_server(config.merge(:publisher_mode => 'admin', :broadcast_channel_prefix => 'bd_', :broadcast_channel_max_qtd => 1), :timeout => 55) do |conf|
#create channel
publish_message(channel, headers, body)
publish_message("#{channel}_1", headers, body)
publish_message("bd_#{channel}_1", headers, body)
EventMachine.run do
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_2.callback do
pub_2.should be_http_status(200).with_body
response = JSON.parse(pub_2.response)
response["channels"].to_i.should eql(2)
response["broadcast_channels"].to_i.should eql(1)
response["channels_in_trash"].to_i.should eql(0)
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).delete :head => headers
pub.callback do
pub.should be_http_status(200).without_body
sleep(2)
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_3.callback do
pub_3.should be_http_status(200).with_body
response = JSON.parse(pub_3.response)
response["channels"].to_i.should eql(1)
response["broadcast_channels"].to_i.should eql(1)
response["channels_in_trash"].to_i.should eql(1)
EventMachine.stop
end
end
end
end
end
end
end
......@@ -44,7 +44,7 @@ describe "Keepalive" do
headers, body = get_in_socket("/channels-stats", socket)
body.should match_the_pattern(/"channels": "1", "broadcast_channels": "0", "published_messages": "1", "subscribers": "0", "uptime": "[0-9]*", "by_worker": \[\r\n/)
body.should match_the_pattern(/"channels": "1", "broadcast_channels": "0", "published_messages": "1", "stored_messages": "1", "messages_in_trash": "0", "channels_in_trash": "0", "subscribers": "0", "uptime": "[0-9]*", "by_worker": \[\r\n/)
body.should match_the_pattern(/\{"pid": "[0-9]*", "subscribers": "0", "uptime": "[0-9]*"\}/)
headers, body = get_in_socket("/pub?id=#{channel}", socket)
......
......@@ -139,14 +139,14 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
}
*start = '\0';
len = 4*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len + ngx_strlen(subscribers_by_workers) - 21;// minus 21 sprintf
len = 7*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len + ngx_strlen(subscribers_by_workers) - 21;// minus 21 sprintf
if ((text = ngx_http_push_stream_create_str(r->pool, len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer.");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_sprintf(text->data, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->broadcast_channels, data->published_messages, data->subscribers, ngx_time() - data->startup, subscribers_by_workers);
ngx_sprintf(text->data, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->broadcast_channels, data->published_messages, data->stored_messages, data->messages_in_trash, data->channels_in_trash, data->subscribers, ngx_time() - data->startup, subscribers_by_workers);
text->len = ngx_strlen(text->data);
return ngx_http_push_stream_send_response(r, text, subtype->content_type, NGX_HTTP_OK);
......
......@@ -903,7 +903,6 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
return NGX_ERROR;
}
shm_zone->data = d;
ngx_queue_init(&d->messages_trash);
for (i = 0; i < NGX_MAX_PROCESSES; i++) {
d->ipc[i].pid = -1;
d->ipc[i].startup = 0;
......@@ -912,6 +911,13 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
d->ipc[i].subscribers_sentinel = NULL;
}
d->channels = 0;
d->broadcast_channels = 0;
d->published_messages = 0;
d->stored_messages = 0;
d->subscribers = 0;
d->channels_in_trash = 0;
d->messages_in_trash = 0;
d->startup = ngx_time();
d->last_message_time = 0;
d->last_message_tag = 0;
......@@ -922,6 +928,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
}
ngx_rbtree_init(&d->tree, sentinel, ngx_http_push_stream_rbtree_insert);
ngx_queue_init(&d->messages_trash);
ngx_queue_init(&d->channels_queue);
ngx_queue_init(&d->channels_to_delete);
ngx_queue_init(&d->channels_trash);
......
......@@ -33,6 +33,7 @@ static ngx_int_t ngx_http_push_stream_send_response_padding(ngx_http_reque
static ngx_inline void
ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_msg_t *sentinel, *msg;
if (max_messages == NGX_CONF_UNSET_UINT) {
......@@ -49,6 +50,7 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_
}
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->stored_messages);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->stored_messages);
channel->last_activity_time = ngx_time();
ngx_queue_remove(&msg->queue);
ngx_http_push_stream_mark_message_to_delete_locked(msg);
......@@ -135,6 +137,7 @@ ngx_http_push_stream_delete_channels(ngx_http_push_stream_shm_data_t *data, ngx_
ngx_queue_remove(&channel->queue);
ngx_queue_insert_tail(&data->channels_trash, &channel->queue);
channel->queue_sentinel = &data->channels_trash;
data->channels_in_trash++;
}
}
ngx_shmtx_unlock(&shpool->mutex);
......@@ -326,6 +329,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_
if (cf->store_messages) {
ngx_queue_insert_tail(&channel->message_queue.queue, &msg->queue);
channel->stored_messages++;
data->stored_messages++;
// now see if the queue is too big
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, ngx_http_push_stream_module_main_conf->max_messages_stored_per_channel, 0);
......@@ -690,8 +694,12 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
// send signal to each worker with subscriber to this channel
cur = &channel->workers_with_subscribers;
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != &channel->workers_with_subscribers) {
ngx_http_push_stream_alert_worker_delete_channel(cur->pid, cur->slot, ngx_cycle->log);
if (ngx_queue_empty(&channel->workers_with_subscribers.queue)) {
ngx_http_push_stream_alert_worker_delete_channel(ngx_pid, ngx_process_slot, ngx_cycle->log);
} else {
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != &channel->workers_with_subscribers) {
ngx_http_push_stream_alert_worker_delete_channel(cur->pid, cur->slot, ngx_cycle->log);
}
}
}
......@@ -729,6 +737,7 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s
ngx_queue_remove(&channel->queue);
ngx_queue_insert_tail(&data->channels_trash, &channel->queue);
channel->queue_sentinel = &data->channels_trash;
data->channels_in_trash++;
}
ngx_shmtx_unlock(&shpool->mutex);
......@@ -767,6 +776,7 @@ ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_http_push_stream
if ((ngx_time() > channel->expires) || force) {
ngx_queue_remove(&channel->queue);
nxg_http_push_stream_free_channel_memory_locked(shpool, channel);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels_in_trash);
} else {
break;
}
......@@ -832,6 +842,7 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
if (force || ((message->workers_ref_count <= 0) && (ngx_time() > message->expires))) {
ngx_queue_remove(&message->queue);
ngx_http_push_stream_free_message_memory_locked(shpool, message);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->messages_in_trash);
} else {
break;
}
......@@ -892,6 +903,7 @@ ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *m
msg->deleted = 1;
msg->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
ngx_queue_insert_tail(&data->messages_trash, &msg->queue);
data->messages_in_trash++;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment