Commit 874079ea authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding subscribers count by each worker as part of statistics

parent e95689e0
......@@ -121,6 +121,8 @@ typedef struct {
typedef struct {
ngx_http_push_stream_worker_msg_t messages_queue;
ngx_http_push_stream_worker_subscriber_t worker_subscribers_sentinel;
ngx_uint_t subscribers; // # of subscribers in the worker
pid_t pid;
} ngx_http_push_stream_worker_data_t;
// shared memory
......
This diff is collapsed.
......@@ -119,8 +119,11 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
ngx_buf_t *b;
ngx_uint_t len;
ngx_str_t *currenttime, *hostname;
ngx_str_t *currenttime, *hostname, *format;
u_char *subscribers_by_workers, *start;
int i;
ngx_http_push_stream_shm_data_t *shm_data;
ngx_http_push_stream_worker_data_t *worker_data;
ngx_http_push_stream_content_subtype_t *subtype;
subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
......@@ -129,7 +132,17 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
shm_data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
len = 3*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len - 16;// minus 16 sprintf
len = (subtype->format_summarized_worker_item->len > subtype->format_summarized_worker_last_item->len) ? subtype->format_summarized_worker_item->len : subtype->format_summarized_worker_last_item->len;
len = ngx_http_push_stream_worker_processes * (2*NGX_INT_T_LEN + len - 5); //minus 5 sprintf
subscribers_by_workers = ngx_pcalloc(r->pool, len);
start = subscribers_by_workers;
for (i = 0; i < ngx_http_push_stream_worker_processes; i++) {
format = (i < ngx_http_push_stream_worker_processes - 1) ? subtype->format_summarized_worker_item : subtype->format_summarized_worker_last_item;
worker_data = shm_data->ipc + i;
start = ngx_sprintf(start, (char *) format->data, worker_data->pid, worker_data->subscribers);
}
len = 3*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len + ngx_strlen(subscribers_by_workers) - 18;// minus 18 sprintf
if ((b = ngx_create_temp_buf(r->pool, len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer.");
......@@ -137,7 +150,7 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
}
ngx_memset(b->start, '\0', len);
b->last = ngx_sprintf(b->start, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, shm_data->channels, shm_data->broadcast_channels, shm_data->published_messages, shm_data->subscribers);
b->last = ngx_sprintf(b->start, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, shm_data->channels, shm_data->broadcast_channels, shm_data->published_messages, shm_data->subscribers, subscribers_by_workers);
return ngx_http_push_stream_send_buf_response(r, b, subtype->content_type, NGX_HTTP_OK);
}
......
......@@ -99,12 +99,16 @@ ngx_http_push_stream_init_ipc_shm(ngx_int_t workers)
if (data->ipc != NULL) {
// already initialized... reset channel subscribers counters and census subscribers
ngx_http_push_stream_worker_data_t *workers_data = data->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_data_t *worker_data = NULL;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *sentinel = &thisworker_data->worker_subscribers_sentinel;
ngx_queue_init(&sentinel->queue);
for(i=0; i<workers; i++) {
worker_data = data->ipc + i;
worker_data->subscribers = 0;
}
data->subscribers = 0;
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_reset_channel_subscribers_count_locked);
......@@ -224,6 +228,7 @@ ngx_http_push_stream_census_worker_subscribers(void)
cur_subscription->channel->subscribers++;
}
data->subscribers++;
thisworker_data->subscribers++;
}
ngx_shmtx_unlock(&shpool->mutex);
......
......@@ -180,6 +180,10 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle)
return NGX_ERROR;
}
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
thisworker_data->pid = ngx_pid;
return ngx_http_push_stream_register_worker_message_handler(cycle);
}
......
......@@ -12,8 +12,8 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_pool_t *temp_pool;
ngx_uint_t subscribed_channels_qtd = 0;
ngx_uint_t subscribed_broadcast_channels_qtd = 0;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_flag_t is_broadcast_channel;
ngx_http_push_stream_channel_t *channel;
......@@ -140,7 +140,8 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_queue_insert_tail(&thisworker_data->worker_subscribers_sentinel.queue, &worker_subscriber->queue);
// increment global subscribers count
((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->subscribers++;
data->subscribers++;
thisworker_data->subscribers++;
ngx_shmtx_unlock(&shpool->mutex);
......
......@@ -471,6 +471,7 @@ ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_worker_subsc
{
ngx_http_push_stream_subscription_t *cur, *sentinel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_shmtx_lock(&shpool->mutex);
sentinel = &worker_subscriber->subscriptions_sentinel;
......@@ -484,7 +485,8 @@ ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_worker_subsc
ngx_queue_remove(&worker_subscriber->queue);
ngx_queue_init(&worker_subscriber->queue);
worker_subscriber->clndata->worker_subscriber = NULL;
((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->subscribers--;
data->subscribers--;
(data->ipc + ngx_process_slot)->subscribers--;
ngx_shmtx_unlock(&shpool->mutex);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment