Commit 114d148e authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding uptime information for server and workers on statistics

parent 9666f079
...@@ -159,6 +159,7 @@ typedef struct { ...@@ -159,6 +159,7 @@ typedef struct {
ngx_http_push_stream_worker_msg_t *messages_queue; ngx_http_push_stream_worker_msg_t *messages_queue;
ngx_http_push_stream_worker_subscriber_t *worker_subscribers_sentinel; ngx_http_push_stream_worker_subscriber_t *worker_subscribers_sentinel;
ngx_uint_t subscribers; // # of subscribers in the worker ngx_uint_t subscribers; // # of subscribers in the worker
time_t startup;
pid_t pid; pid_t pid;
} ngx_http_push_stream_worker_data_t; } ngx_http_push_stream_worker_data_t;
...@@ -173,6 +174,7 @@ typedef struct { ...@@ -173,6 +174,7 @@ typedef struct {
ngx_rbtree_t channels_to_delete; ngx_rbtree_t channels_to_delete;
ngx_rbtree_t unrecoverable_channels; ngx_rbtree_t unrecoverable_channels;
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
time_t startup;
} ngx_http_push_stream_shm_data_t; } ngx_http_push_stream_shm_data_t;
ngx_shm_zone_t *ngx_http_push_stream_shm_zone = NULL; ngx_shm_zone_t *ngx_http_push_stream_shm_zone = NULL;
......
...@@ -45,39 +45,39 @@ typedef struct { ...@@ -45,39 +45,39 @@ typedef struct {
#define NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN_PATTERN "channel: %s" CRLF"published_messages: %ui" CRLF"stored_messages: %ui" CRLF"active_subscribers: %ui" #define NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN_PATTERN "channel: %s" CRLF"published_messages: %ui" CRLF"stored_messages: %ui" CRLF"active_subscribers: %ui"
#define NGX_HTTP_PUSH_STREAM_WORKER_INFO_PLAIN_PATTERN " pid: %d" CRLF" subscribers: %ui" #define NGX_HTTP_PUSH_STREAM_WORKER_INFO_PLAIN_PATTERN " pid: %d" CRLF" subscribers: %ui" CRLF" uptime: %ui"
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN_PATTERN CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN_PATTERN CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_PLAIN = ngx_string("hostname: %s, time: %s, channels: %ui, broadcast_channels: %ui, infos: " CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_PLAIN = ngx_string("hostname: %s, time: %s, channels: %ui, broadcast_channels: %ui, uptime: %ui, infos: " CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_PLAIN = ngx_string(CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_PLAIN = ngx_string(CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_ITEM_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN_PATTERN "," CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_ITEM_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN_PATTERN "," CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_LAST_ITEM_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN_PATTERN); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_LAST_ITEM_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN_PATTERN);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_PLAIN = ngx_string("hostname: %s" CRLF "time: %s" CRLF "channels: %ui" CRLF "broadcast_channels: %ui" CRLF "published_messages: %ui" CRLF "subscribers: %ui" CRLF "by_worker:"CRLF"%s" CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_PLAIN = ngx_string("hostname: %s" CRLF "time: %s" CRLF "channels: %ui" CRLF "broadcast_channels: %ui" CRLF "published_messages: %ui" CRLF "subscribers: %ui" CRLF "uptime: %ui" CRLF "by_worker:"CRLF"%s" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_ITEM_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_PLAIN_PATTERN "," CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_ITEM_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_PLAIN_PATTERN "," CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_PLAIN_PATTERN); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_PLAIN = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_PLAIN_PATTERN);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_PLAIN = ngx_string("text/plain"); static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_PLAIN = ngx_string("text/plain");
#define NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON_PATTERN "{\"channel\": \"%s\", \"published_messages\": \"%ui\", \"stored_messages\": \"%ui\", \"subscribers\": \"%ui\"}" #define NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON_PATTERN "{\"channel\": \"%s\", \"published_messages\": \"%ui\", \"stored_messages\": \"%ui\", \"subscribers\": \"%ui\"}"
#define NGX_HTTP_PUSH_STREAM_WORKER_INFO_JSON_PATTERN "{\"pid\": \"%d\", \"subscribers\": \"%ui\"}" #define NGX_HTTP_PUSH_STREAM_WORKER_INFO_JSON_PATTERN "{\"pid\": \"%d\", \"subscribers\": \"%ui\", \"uptime\": \"%ui\"}"
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON_PATTERN CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON_PATTERN CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_JSON = ngx_string("{\"hostname\": \"%s\", \"time\": \"%s\", \"channels\": \"%ui\", \"broadcast_channels\": \"%ui\", \"infos\": [" CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_JSON = ngx_string("{\"hostname\": \"%s\", \"time\": \"%s\", \"channels\": \"%ui\", \"broadcast_channels\": \"%ui\", \"uptime\": \"%ui\", \"infos\": [" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_JSON = ngx_string("]}" CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_JSON = ngx_string("]}" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_ITEM_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON_PATTERN "," CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_ITEM_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON_PATTERN "," CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_LAST_ITEM_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON_PATTERN CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_LAST_ITEM_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON_PATTERN CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_JSON = ngx_string("{\"hostname\": \"%s\", \"time\": \"%s\", \"channels\": \"%ui\", \"broadcast_channels\": \"%ui\", \"published_messages\": \"%ui\", \"subscribers\": \"%ui\", \"by_worker\": [" CRLF "%s" CRLF"]}" CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_JSON = ngx_string("{\"hostname\": \"%s\", \"time\": \"%s\", \"channels\": \"%ui\", \"broadcast_channels\": \"%ui\", \"published_messages\": \"%ui\", \"subscribers\": \"%ui\", \"uptime\": \"%ui\", \"by_worker\": [" CRLF "%s" CRLF"]}" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_ITEM_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_JSON_PATTERN "," CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_ITEM_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_JSON_PATTERN "," CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_JSON_PATTERN); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_JSON = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_JSON_PATTERN);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_JSON = ngx_string("application/json"); static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_JSON = ngx_string("application/json");
static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_X_JSON = ngx_string("text/x-json"); static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_X_JSON = ngx_string("text/x-json");
#define NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML_PATTERN " channel: %s" CRLF" published_messages: %ui" CRLF" stored_messages: %ui" CRLF" subscribers: %ui" #define NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML_PATTERN " channel: %s" CRLF" published_messages: %ui" CRLF" stored_messages: %ui" CRLF" subscribers: %ui"
#define NGX_HTTP_PUSH_STREAM_WORKER_INFO_YAML_PATTERN " pid: %d" CRLF" subscribers: %ui" #define NGX_HTTP_PUSH_STREAM_WORKER_INFO_YAML_PATTERN " pid: %d" CRLF" subscribers: %ui" CRLF" uptime: %ui"
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML_PATTERN CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML_PATTERN CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_YAML = ngx_string("hostname: %s" CRLF"time: %s" CRLF"channels: %ui" CRLF"broadcast_channels: %ui" CRLF"infos: "CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_YAML = ngx_string("hostname: %s" CRLF"time: %s" CRLF"channels: %ui" CRLF"broadcast_channels: %ui" CRLF"uptime: %ui" CRLF"infos: "CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_YAML = ngx_string(CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_YAML = ngx_string(CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_ITEM_YAML = ngx_string(" -" CRLF NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML_PATTERN CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_ITEM_YAML = ngx_string(" -" CRLF NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML_PATTERN CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_LAST_ITEM_YAML = ngx_string(" -" CRLF NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML_PATTERN); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_LAST_ITEM_YAML = ngx_string(" -" CRLF NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML_PATTERN);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_YAML = ngx_string(" hostname: %s" CRLF" time: %s" CRLF" channels: %ui" CRLF" broadcast_channels: %ui" CRLF" published_messages: %ui" CRLF" subscribers: %ui" CRLF " by_worker:"CRLF"%s" CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_YAML = ngx_string(" hostname: %s" CRLF" time: %s" CRLF" channels: %ui" CRLF" broadcast_channels: %ui" CRLF" published_messages: %ui" CRLF" subscribers: %ui" CRLF" uptime: %ui" CRLF" by_worker:"CRLF"%s" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_ITEM_YAML = ngx_string(" -" CRLF NGX_HTTP_PUSH_STREAM_WORKER_INFO_YAML_PATTERN CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_ITEM_YAML = ngx_string(" -" CRLF NGX_HTTP_PUSH_STREAM_WORKER_INFO_YAML_PATTERN CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_YAML = ngx_string(" -" CRLF NGX_HTTP_PUSH_STREAM_WORKER_INFO_YAML_PATTERN); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_YAML = ngx_string(" -" CRLF NGX_HTTP_PUSH_STREAM_WORKER_INFO_YAML_PATTERN);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_YAML = ngx_string("application/yaml"); static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_YAML = ngx_string("application/yaml");
...@@ -95,9 +95,10 @@ static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_X_YAML = ngx_string("text/x- ...@@ -95,9 +95,10 @@ static ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_X_YAML = ngx_string("text/x-
"<worker>" CRLF \ "<worker>" CRLF \
" <pid>%d</pid>" CRLF \ " <pid>%d</pid>" CRLF \
" <subscribers>%ui</subscribers>" CRLF \ " <subscribers>%ui</subscribers>" CRLF \
" <uptime>%ui</uptime>" CRLF \
"</worker>" CRLF "</worker>" CRLF
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_XML = ngx_string("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" CRLF NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_XML_PATTERN CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_XML = ngx_string("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" CRLF NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_XML_PATTERN CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_XML = ngx_string("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" CRLF "<root>" CRLF" <hostname>%s</hostname>" CRLF" <time>%s</time>" CRLF" <channels>%ui</channels>" CRLF" <broadcast_channels>%ui</broadcast_channels>" CRLF" <infos>" CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_XML = ngx_string("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" CRLF "<root>" CRLF" <hostname>%s</hostname>" CRLF" <time>%s</time>" CRLF" <channels>%ui</channels>" CRLF" <broadcast_channels>%ui</broadcast_channels>" CRLF" <uptime>%ui</uptime>" CRLF" <infos>" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_XML = ngx_string(" </infos>" CRLF"</root>" CRLF); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_XML = ngx_string(" </infos>" CRLF"</root>" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_ITEM_XML = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_XML_PATTERN); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_ITEM_XML = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_XML_PATTERN);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_LAST_ITEM_XML = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_XML_PATTERN); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_LAST_ITEM_XML = ngx_string(NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_XML_PATTERN);
...@@ -109,8 +110,9 @@ static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_XML = ngx_string ...@@ -109,8 +110,9 @@ static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_XML = ngx_string
" <channels>%ui</channels>" CRLF \ " <channels>%ui</channels>" CRLF \
" <broadcast_channels>%ui</broadcast_channels>" CRLF \ " <broadcast_channels>%ui</broadcast_channels>" CRLF \
" <published_messages>%ui</published_messages>" CRLF \ " <published_messages>%ui</published_messages>" CRLF \
" <subscribers>%ui</subscribers>" CRLF\ " <subscribers>%ui</subscribers>" CRLF \
" <by_worker>%s</by_worker>" CRLF\ " <uptime>%ui</uptime>" CRLF \
" <by_worker>%s</by_worker>" CRLF \
"</infos>" CRLF); "</infos>" CRLF);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_ITEM_XML = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_XML_PATTERN); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_ITEM_XML = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_XML_PATTERN);
static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_XML = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_XML_PATTERN); static ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNELS_INFO_SUMMARIZED_WORKER_LAST_ITEM_XML = ngx_string(NGX_HTTP_PUSH_STREAM_WORKER_INFO_XML_PATTERN);
......
...@@ -159,7 +159,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request ...@@ -159,7 +159,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
} }
len = (subtype->format_summarized_worker_item->len > subtype->format_summarized_worker_last_item->len) ? subtype->format_summarized_worker_item->len : subtype->format_summarized_worker_last_item->len; len = (subtype->format_summarized_worker_item->len > subtype->format_summarized_worker_last_item->len) ? subtype->format_summarized_worker_item->len : subtype->format_summarized_worker_last_item->len;
len = used_slots * (2*NGX_INT_T_LEN + len - 5); //minus 5 sprintf len = used_slots * (3*NGX_INT_T_LEN + len - 8); //minus 8 sprintf
if ((subscribers_by_workers = ngx_pcalloc(r->pool, len)) == NULL) { if ((subscribers_by_workers = ngx_pcalloc(r->pool, len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate memory to write workers statistics."); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate memory to write workers statistics.");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
...@@ -170,12 +170,12 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request ...@@ -170,12 +170,12 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
worker_data = data->ipc + j; worker_data = data->ipc + j;
if (worker_data->pid > 0) { if (worker_data->pid > 0) {
format = (i < used_slots - 1) ? subtype->format_summarized_worker_item : subtype->format_summarized_worker_last_item; format = (i < used_slots - 1) ? subtype->format_summarized_worker_item : subtype->format_summarized_worker_last_item;
start = ngx_sprintf(start, (char *) format->data, worker_data->pid, worker_data->subscribers); start = ngx_sprintf(start, (char *) format->data, worker_data->pid, worker_data->subscribers, ngx_time() - worker_data->startup);
i++; i++;
} }
} }
len = 3*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len + ngx_strlen(subscribers_by_workers) - 18;// minus 18 sprintf len = 4*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len + ngx_strlen(subscribers_by_workers) - 21;// minus 21 sprintf
if ((b = ngx_create_temp_buf(r->pool, len)) == NULL) { if ((b = ngx_create_temp_buf(r->pool, len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer."); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer.");
...@@ -183,7 +183,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request ...@@ -183,7 +183,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
} }
ngx_memset(b->start, '\0', len); ngx_memset(b->start, '\0', len);
b->last = ngx_sprintf(b->start, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->broadcast_channels, data->published_messages, data->subscribers, subscribers_by_workers); b->last = ngx_sprintf(b->start, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->broadcast_channels, data->published_messages, data->subscribers, ngx_time() - data->startup, subscribers_by_workers);
return ngx_http_push_stream_send_buf_response(r, b, subtype->content_type, NGX_HTTP_OK); return ngx_http_push_stream_send_buf_response(r, b, subtype->content_type, NGX_HTTP_OK);
} }
...@@ -283,13 +283,13 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -283,13 +283,13 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
hostname = ngx_http_push_stream_get_formatted_hostname(r->pool); hostname = ngx_http_push_stream_get_formatted_hostname(r->pool);
// format content header // format content header
if ((header_response.data = ngx_pcalloc(r->pool, head->len + hostname->len + currenttime->len + 1)) == NULL) { if ((header_response.data = ngx_pcalloc(r->pool, head->len + hostname->len + currenttime->len + NGX_INT_T_LEN + 1)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for response channels info"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for response channels info");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
ngx_memset(header_response.data, '\0', head->len + hostname->len + currenttime->len + 1); ngx_memset(header_response.data, '\0', head->len + hostname->len + currenttime->len + NGX_INT_T_LEN + 1);
ngx_sprintf(header_response.data, (char *) head->data, hostname->data, currenttime->data, data->channels, data->broadcast_channels); ngx_sprintf(header_response.data, (char *) head->data, hostname->data, currenttime->data, data->channels, data->broadcast_channels, ngx_time() - data->startup);
header_response.len = ngx_strlen(header_response.data); header_response.len = ngx_strlen(header_response.data);
content_len += header_response.len + tail->len; content_len += header_response.len + tail->len;
......
...@@ -141,6 +141,7 @@ ngx_http_push_stream_ipc_init_worker() ...@@ -141,6 +141,7 @@ ngx_http_push_stream_ipc_init_worker()
} }
data->ipc[ngx_process_slot].pid = ngx_pid; data->ipc[ngx_process_slot].pid = ngx_pid;
data->ipc[ngx_process_slot].startup = ngx_time();
ngx_queue_init(&data->ipc[ngx_process_slot].messages_queue->queue); ngx_queue_init(&data->ipc[ngx_process_slot].messages_queue->queue);
ngx_queue_init(&data->ipc[ngx_process_slot].worker_subscribers_sentinel->queue); ngx_queue_init(&data->ipc[ngx_process_slot].worker_subscribers_sentinel->queue);
......
...@@ -652,11 +652,14 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -652,11 +652,14 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
ngx_queue_init(&d->messages_to_delete.queue); ngx_queue_init(&d->messages_to_delete.queue);
for (i = 0; i < NGX_MAX_PROCESSES; i++) { for (i = 0; i < NGX_MAX_PROCESSES; i++) {
d->ipc[i].pid = -1; d->ipc[i].pid = -1;
d->ipc[i].startup = 0;
d->ipc[i].subscribers = 0; d->ipc[i].subscribers = 0;
d->ipc[i].messages_queue = NULL; d->ipc[i].messages_queue = NULL;
d->ipc[i].worker_subscribers_sentinel = NULL; d->ipc[i].worker_subscribers_sentinel = NULL;
} }
d->startup = ngx_time();
// initialize rbtree // initialize rbtree
if ((sentinel = ngx_slab_alloc(shpool, sizeof(*sentinel))) == NULL) { if ((sentinel = ngx_slab_alloc(shpool, sizeof(*sentinel))) == NULL) {
return NGX_ERROR; return NGX_ERROR;
......
...@@ -545,4 +545,79 @@ class TestChannelStatistics < Test::Unit::TestCase ...@@ -545,4 +545,79 @@ class TestChannelStatistics < Test::Unit::TestCase
} }
} }
end end
def test_get_uptime_in_detailed_channels_statistics
headers = {'accept' => 'application/json'}
channel = 'ch_test_get_uptime_in_detailed_channels_statistics'
body = 'body'
#create channel
publish_message(channel, headers, body)
EventMachine.run {
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=ALL').get :head => headers, :timeout => 30
pub_2.callback {
assert_equal(200, pub_2.response_header.status, "Request was not accepted")
assert_not_equal(0, pub_2.response_header.content_length, "Empty response was received")
response = JSON.parse(pub_2.response)
assert(response.has_key?("hostname") && !response["hostname"].empty?, "Hasn't a key hostname")
assert(response.has_key?("time") && !response["time"].empty?, "Hasn't a key time")
assert(response.has_key?("channels") && !response["channels"].empty?, "Hasn't a key channels")
assert(response.has_key?("broadcast_channels") && !response["broadcast_channels"].empty?, "Hasn't a key broadcast_channels")
assert(response.has_key?("uptime") && !response["uptime"].empty?, "Hasn't a key uptime")
assert(response.has_key?("infos") && !response["infos"].empty?, "Hasn't a key infos")
sleep (2)
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=ALL').get :head => headers, :timeout => 30
pub_3.callback {
assert_equal(200, pub_3.response_header.status, "Request was not accepted")
assert_not_equal(0, pub_3.response_header.content_length, "Empty response was received")
response = JSON.parse(pub_3.response)
assert(response["uptime"].to_i >= 2, "Don't get server uptime")
EventMachine.stop
}
}
}
end
def test_get_uptime_in_summarized_channels_statistics
headers = {'accept' => 'application/json'}
channel = 'ch_test_get_uptime_in_summarized_channels_statistics'
body = 'body'
#create channel
publish_message(channel, headers, body)
EventMachine.run {
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 30
pub_2.callback {
assert_equal(200, pub_2.response_header.status, "Request was not accepted")
assert_not_equal(0, pub_2.response_header.content_length, "Empty response was received")
response = JSON.parse(pub_2.response)
assert(response.has_key?("hostname") && !response["hostname"].empty?, "Hasn't a key hostname")
assert(response.has_key?("time") && !response["time"].empty?, "Hasn't a key time")
assert(response.has_key?("channels") && !response["channels"].empty?, "Hasn't a key channels")
assert(response.has_key?("broadcast_channels") && !response["broadcast_channels"].empty?, "Hasn't a key broadcast_channels")
assert(response.has_key?("published_messages") && !response["published_messages"].empty?, "Hasn't a key published_messages")
assert(response.has_key?("subscribers") && !response["subscribers"].empty?, "Hasn't a key subscribers")
assert(response.has_key?("uptime") && !response["uptime"].empty?, "Hasn't a key uptime")
assert(response.has_key?("by_worker") && !response["by_worker"].empty?, "Hasn't a key by_worker")
assert(response["by_worker"][0].has_key?("pid") && !response["by_worker"][0]["pid"].empty?, "Hasn't a key pid on worker info")
assert(response["by_worker"][0].has_key?("subscribers") && !response["by_worker"][0]["subscribers"].empty?, "Hasn't a key subscribers on worker info")
assert(response["by_worker"][0].has_key?("uptime") && !response["by_worker"][0]["uptime"].empty?, "Hasn't a key uptime on worker info")
sleep (2)
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 30
pub_3.callback {
assert_equal(200, pub_3.response_header.status, "Request was not accepted")
assert_not_equal(0, pub_3.response_header.content_length, "Empty response was received")
response = JSON.parse(pub_3.response)
assert(response["uptime"].to_i >= 2, "Don't get server uptime")
assert(response["by_worker"][0]["uptime"].to_i >= 2, "Don't get worker uptime")
EventMachine.stop
}
}
}
end
end end
...@@ -5,7 +5,7 @@ class TestCreateManyChannels < Test::Unit::TestCase ...@@ -5,7 +5,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
def config_test_message_cleanup def config_test_message_cleanup
@min_message_buffer_timeout = '10s' @min_message_buffer_timeout = '10s'
@max_reserved_memory = "64k" @max_reserved_memory = "65k"
@max_message_buffer_length = 100 @max_message_buffer_length = 100
@memory_cleanup_timeout = '30s' @memory_cleanup_timeout = '30s'
end end
...@@ -38,6 +38,7 @@ class TestCreateManyChannels < Test::Unit::TestCase ...@@ -38,6 +38,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
assert_equal(200, pub_2.response_header.status, "Don't get channels statistics") assert_equal(200, pub_2.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics") assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
stored_messages_setp_1 = JSON.parse(pub_2.response)["stored_messages"].to_i stored_messages_setp_1 = JSON.parse(pub_2.response)["stored_messages"].to_i
fail("Don't create any message") if stored_messages_setp_1 == 0
i = 0 i = 0
EM.add_periodic_timer(1) do EM.add_periodic_timer(1) do
...@@ -64,7 +65,7 @@ class TestCreateManyChannels < Test::Unit::TestCase ...@@ -64,7 +65,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
def config_test_channel_cleanup def config_test_channel_cleanup
@min_message_buffer_timeout = '10s' @min_message_buffer_timeout = '10s'
@max_reserved_memory = "64k" @max_reserved_memory = "65k"
@memory_cleanup_timeout = '30s' @memory_cleanup_timeout = '30s'
end end
...@@ -92,6 +93,7 @@ class TestCreateManyChannels < Test::Unit::TestCase ...@@ -92,6 +93,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
assert_equal(200, pub_2.response_header.status, "Don't get channels statistics") assert_equal(200, pub_2.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics") assert_not_equal(0, pub_2.response_header.content_length, "Don't received channels statistics")
channels_setp_1 = JSON.parse(pub_2.response)["channels"].to_i channels_setp_1 = JSON.parse(pub_2.response)["channels"].to_i
fail("Don't create any channel") if channels_setp_1 == 0
i = 0 i = 0
EM.add_periodic_timer(1) do EM.add_periodic_timer(1) do
...@@ -117,7 +119,7 @@ class TestCreateManyChannels < Test::Unit::TestCase ...@@ -117,7 +119,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
end end
def config_test_message_cleanup_with_store_off_with_subscriber def config_test_message_cleanup_with_store_off_with_subscriber
@max_reserved_memory = "64k" @max_reserved_memory = "65k"
@store_messages = 'off' @store_messages = 'off'
@memory_cleanup_timeout = '30s' @memory_cleanup_timeout = '30s'
end end
...@@ -153,7 +155,7 @@ class TestCreateManyChannels < Test::Unit::TestCase ...@@ -153,7 +155,7 @@ class TestCreateManyChannels < Test::Unit::TestCase
end end
def config_test_message_cleanup_with_store_off_without_subscriber def config_test_message_cleanup_with_store_off_without_subscriber
@max_reserved_memory = "64k" @max_reserved_memory = "65k"
@store_messages = 'off' @store_messages = 'off'
@memory_cleanup_timeout = '30s' @memory_cleanup_timeout = '30s'
end end
......
...@@ -22,7 +22,7 @@ class TestKeepalive < Test::Unit::TestCase ...@@ -22,7 +22,7 @@ class TestKeepalive < Test::Unit::TestCase
socket.print(get_without_channel_id) socket.print(get_without_channel_id)
headers, body = read_response(socket) headers, body = read_response(socket)
assert_equal("", body, "Wrong response") assert_equal("", body, "Wrong response")
assert(headers.index('No channel id provided.') > 0, "Didn't receive error message") assert(headers.match(/No channel id provided\./), "Didn't receive error message")
socket.print(post_channel_message) socket.print(post_channel_message)
headers, body = read_response(socket) headers, body = read_response(socket)
...@@ -30,8 +30,8 @@ class TestKeepalive < Test::Unit::TestCase ...@@ -30,8 +30,8 @@ class TestKeepalive < Test::Unit::TestCase
socket.print(get_channels_stats) socket.print(get_channels_stats)
headers, body = read_response(socket) headers, body = read_response(socket)
assert(body.index("\"channels\": \"1\", \"broadcast_channels\": \"0\", \"published_messages\": \"1\", \"subscribers\": \"0\", \"by_worker\": [\r\n") > 0, "Didn't receive message") assert(body.match(/"channels": "1", "broadcast_channels": "0", "published_messages": "1", "subscribers": "0", "uptime": "[0-9]*", "by_worker": \[\r\n/), "Didn't receive message")
assert(body.index("\"subscribers\": \"0\"}") > 0, "Didn't receive message") assert(body.match(/\{"pid": "[0-9]*", "subscribers": "0", "uptime": "[0-9]*"\}/), "Didn't receive message")
socket.print(get_channel_stats) socket.print(get_channel_stats)
headers, body = read_response(socket) headers, body = read_response(socket)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment