Commit c16c6bbe authored by Wandenberg's avatar Wandenberg

fix count of subscribers on worker start

parent f517ae57
......@@ -127,6 +127,7 @@ typedef struct {
pid_t pid;
ngx_int_t slot;
ngx_queue_t subscriptions_queue;
ngx_uint_t subscribers;
} ngx_http_push_stream_pid_queue_t;
// our typecast-friendly rbtree node (channel)
......@@ -162,6 +163,7 @@ typedef struct {
ngx_queue_t channel_worker_queue;
ngx_http_push_stream_subscriber_t *subscriber;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_pid_queue_t *channel_worker_sentinel;
} ngx_http_push_stream_subscription_t;
struct ngx_http_push_stream_subscriber_s {
......@@ -227,8 +229,7 @@ typedef struct {
// shared memory
struct ngx_http_push_stream_global_shm_data_s {
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
time_t startup;
pid_t pid[NGX_MAX_PROCESSES];
ngx_queue_t shm_datas_queue;
};
......@@ -253,6 +254,7 @@ struct ngx_http_push_stream_shm_data_s {
ngx_http_push_stream_main_conf_t *mcf;
ngx_shm_zone_t *shm_zone;
ngx_slab_pool_t *shpool;
ngx_uint_t slots_for_census;
};
ngx_shm_zone_t *ngx_http_push_stream_global_shm_zone = NULL;
......
......@@ -4,7 +4,6 @@ require 'spec_helper'
describe "Subscriber WebSocket" do
let(:config) do
{
:workers => 1,
:header_template => nil,
:message_template => nil,
:footer_template => nil,
......@@ -150,12 +149,12 @@ describe "Subscriber WebSocket" do
socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n")
headers, body = read_response_on_socket(socket)
socket.close
body.should eql("")
headers.should match_the_pattern(/HTTP\/1\.1 101 Switching Protocols/)
headers.should match_the_pattern(/Sec-WebSocket-Accept: RaIOIcQ6CBoc74B9EKdH0avYZnw=/)
headers.should match_the_pattern(/Upgrade: WebSocket/)
headers.should match_the_pattern(/Connection: Upgrade/)
socket.close
end
end
......@@ -166,7 +165,6 @@ describe "Subscriber WebSocket" do
nginx_run_server(config.merge(:header_template => "HEADER_TEMPLATE")) do |conf|
socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n")
sleep(1)
headers, body = read_response_on_socket(socket, 'TEMPLATE')
body.should eql("\201\017HEADER_TEMPLATE")
socket.close
......@@ -181,9 +179,7 @@ describe "Subscriber WebSocket" do
socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n")
headers, body = read_response_on_socket(socket)
#wait for ping message
sleep(1)
body, dummy = read_response_on_socket(socket)
body, dummy = read_response_on_socket(socket, "\211\000")
body.should eql("\211\000")
socket.close
end
......@@ -197,8 +193,6 @@ describe "Subscriber WebSocket" do
socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n")
headers, body = read_response_on_socket(socket)
#wait for disconnect
sleep(1)
body, dummy = read_response_on_socket(socket, "\210\000")
body.should eql("\210\000")
socket.close
......@@ -226,8 +220,6 @@ describe "Subscriber WebSocket" do
socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n")
headers, body = read_response_on_socket(socket)
#wait for disconnect
sleep(1)
body, dummy = read_response_on_socket(socket, "\210\000")
body.should eql("\201\017FOOTER_TEMPLATE\210\000")
socket.close
......@@ -448,11 +440,10 @@ describe "Subscriber WebSocket" do
headers, body = read_response_on_socket(socket)
socket.print(frame)
sleep(1)
EventMachine.run do
pub = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :timeout => 30
pub.callback do
socket.close
pub.should be_http_status(200).with_body
response = JSON.parse(pub.response)
response["channel"].to_s.should eql(channel)
......@@ -460,7 +451,6 @@ describe "Subscriber WebSocket" do
response["stored_messages"].to_i.should eql(0)
response["subscribers"].to_i.should eql(1)
EventMachine.stop
socket.close
end
end
end
......@@ -477,12 +467,13 @@ describe "Subscriber WebSocket" do
socket.print("#{request}\r\n")
headers, body = read_response_on_socket(socket)
socket.print(frame)
sleep(1)
body, dummy = read_response_on_socket(socket, "\210\000")
body.should eql("\210\000")
EventMachine.run do
pub = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :timeout => 30
pub.callback do
socket.close
pub.should be_http_status(200).with_body
response = JSON.parse(pub.response)
response["channel"].to_s.should eql(channel)
......@@ -490,7 +481,6 @@ describe "Subscriber WebSocket" do
response["stored_messages"].to_i.should eql(0)
response["subscribers"].to_i.should eql(0)
EventMachine.stop
socket.close
end
end
end
......@@ -567,8 +557,6 @@ describe "Subscriber WebSocket" do
socket.print("WRITE SOMETHING UNKNOWN\r\n")
sleep(1)
error_log = File.read(conf.error_log)
error_log.should_not include("client sent invalid")
socket.close
......@@ -606,10 +594,9 @@ describe "Subscriber WebSocket" do
socket.print("WRITE SOMETHING UNKNOWN\r\n")
sleep(1)
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get
pub_2.callback do
socket.close
pub_2.should be_http_status(200).with_body
resp_2 = JSON.parse(pub_2.response)
resp_2.has_key?("channels").should be_true
......@@ -620,7 +607,6 @@ describe "Subscriber WebSocket" do
(error_log_pos - error_log_pre).join.should_not include("client sent invalid")
EventMachine.stop
socket.close
end
end
end
......
......@@ -123,23 +123,16 @@ ngx_http_push_stream_ipc_init_worker(void)
int i;
ngx_shmtx_lock(&global_shpool->mutex);
global_data->ipc[ngx_process_slot].pid = ngx_pid;
global_data->ipc[ngx_process_slot].startup = ngx_time();
global_data->pid[ngx_process_slot] = ngx_pid;
while ((cur = ngx_queue_next(cur)) != &global_data->shm_datas_queue) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_ipc_init_worker_data(data);
}
for(i = 0; i < NGX_MAX_PROCESSES; i++) {
if (global_data->ipc[i].pid > 0) {
global_data->ipc[i].subscribers = 0;
}
}
ngx_shmtx_unlock(&global_shpool->mutex);
for(i = 0; i < NGX_MAX_PROCESSES; i++) {
if (global_data->ipc[i].pid > 0) {
ngx_http_push_stream_alert_worker_census_subscribers(global_data->ipc[i].pid, i, ngx_cycle->log);
if (global_data->pid[i] > 0) {
ngx_http_push_stream_alert_worker_census_subscribers(global_data->pid[i], i, ngx_cycle->log);
}
}
......@@ -151,8 +144,6 @@ void
ngx_http_push_stream_ipc_init_worker_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_queue_t *cur = &data->channels_queue;
ngx_http_push_stream_channel_t *channel;
int i;
// cleanning old content if worker die and another one is set on same slot
......@@ -163,15 +154,10 @@ ngx_http_push_stream_ipc_init_worker_data(ngx_http_push_stream_shm_data_t *data)
data->ipc[ngx_process_slot].pid = ngx_pid;
data->ipc[ngx_process_slot].startup = ngx_time();
data->subscribers = 0;
while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &data->channels_queue)) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
channel->subscribers = 0;
}
data->slots_for_census = 0;
for(i = 0; i < NGX_MAX_PROCESSES; i++) {
if (data->ipc[i].pid > 0) {
data->ipc[i].subscribers = 0;
data->slots_for_census++;
}
}
......@@ -186,8 +172,8 @@ ngx_http_push_stream_alert_shutting_down_workers(void)
int i;
for(i = 0; i < NGX_MAX_PROCESSES; i++) {
if (global_data->ipc[i].pid > 0) {
ngx_http_push_stream_alert_worker_shutting_down_cleanup(global_data->ipc[i].pid, i, ngx_cycle->log);
if (global_data->pid[i] > 0) {
ngx_http_push_stream_alert_worker_shutting_down_cleanup(global_data->pid[i], i, ngx_cycle->log);
ngx_close_channel((ngx_socket_t *) ngx_http_push_stream_socketpairs[i], ngx_cycle->log);
ngx_http_push_stream_socketpairs[i][0] = NGX_INVALID_FILE;
ngx_http_push_stream_socketpairs[i][1] = NGX_INVALID_FILE;
......@@ -331,25 +317,55 @@ static ngx_inline void
ngx_http_push_stream_census_worker_subscribers_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_worker_data_t *workers_data = data->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_queue_t *cur;
ngx_http_push_stream_worker_data_t *thisworker_data = &data->ipc[ngx_process_slot];
ngx_queue_t *q, *cur, *cur_worker;
ngx_http_push_stream_subscription_t *cur_subscription;
int i;
ngx_shmtx_lock(&shpool->mutex);
thisworker_data->subscribers = 0;
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_channel_t *channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
for (cur_worker = ngx_queue_head(&channel->workers_with_subscribers); cur_worker != ngx_queue_sentinel(&channel->workers_with_subscribers); cur_worker = ngx_queue_next(cur_worker)) {
ngx_http_push_stream_pid_queue_t *worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
if (worker->pid == ngx_pid) {
worker->subscribers = 0;
}
}
}
cur = &thisworker_data->subscribers_queue;
while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &thisworker_data->subscribers_queue)) {
ngx_http_push_stream_subscriber_t *subscriber = ngx_queue_data(cur, ngx_http_push_stream_subscriber_t, worker_queue);
cur_subscription = &subscriber->subscriptions_sentinel;
while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != &subscriber->subscriptions_sentinel) {
cur_subscription->channel->subscribers++;
cur_subscription->channel_worker_sentinel->subscribers++;
}
data->subscribers++;
thisworker_data->subscribers++;
}
data->slots_for_census--;
if (data->slots_for_census == 0) {
data->subscribers = 0;
for (i = 0; i < NGX_MAX_PROCESSES; i++) {
if (data->ipc[i].pid > 0) {
data->subscribers += data->ipc[i].subscribers;
}
}
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_channel_t *channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
channel->subscribers = 0;
for (cur_worker = ngx_queue_head(&channel->workers_with_subscribers); cur_worker != ngx_queue_sentinel(&channel->workers_with_subscribers); cur_worker = ngx_queue_next(cur_worker)) {
ngx_http_push_stream_pid_queue_t *worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
channel->subscribers += worker->subscribers;
}
}
}
ngx_shmtx_unlock(&shpool->mutex);
}
......
......@@ -1000,15 +1000,9 @@ ngx_http_push_stream_init_global_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
}
shm_zone->data = d;
for (i = 0; i < NGX_MAX_PROCESSES; i++) {
d->ipc[i].pid = -1;
d->ipc[i].startup = 0;
d->ipc[i].subscribers = 0;
ngx_queue_init(&d->ipc[i].messages_queue);
ngx_queue_init(&d->ipc[i].subscribers_queue);
d->pid[i] = -1;
}
d->startup = ngx_time();
ngx_queue_init(&d->shm_datas_queue);
ngx_http_push_stream_global_shm_zone = shm_zone;
......@@ -1067,6 +1061,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
d->last_message_tag = 0;
d->shm_zone = shm_zone;
d->shpool = mcf->shpool;
d->slots_for_census = 0;
// initialize rbtree
if ((sentinel = ngx_slab_alloc(mcf->shpool, sizeof(*sentinel))) == NULL) {
......
......@@ -450,7 +450,7 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_http_push_stream_worker_data_t *thisworker_data = &data->ipc[ngx_process_slot];
ngx_msec_t connection_ttl = worker_subscriber->longpolling ? cf->longpolling_connection_ttl : cf->subscriber_connection_ttl;
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
......@@ -619,6 +619,7 @@ ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_p
// initialize
ngx_queue_insert_tail(&channel->workers_with_subscribers, &worker_sentinel->queue);
worker_sentinel->subscribers = 0;
worker_sentinel->pid = ngx_pid;
worker_sentinel->slot = ngx_process_slot;
ngx_queue_init(&worker_sentinel->subscriptions_queue);
......@@ -636,6 +637,7 @@ ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http
return NULL;
}
subscription->channel_worker_sentinel = NULL;
subscription->channel = channel;
subscription->subscriber = subscriber;
ngx_queue_init(&subscription->queue);
......@@ -675,9 +677,11 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
}
channel->subscribers++; // do this only when we know everything went okay
worker_subscribers_sentinel->subscribers++;
channel->expires = ngx_time() + mcf->channel_inactivity_time;
ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue);
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscriptions_queue, &subscription->channel_worker_queue);
subscription->channel_worker_sentinel = worker_subscribers_sentinel;
return NGX_OK;
}
......
......@@ -106,6 +106,7 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
ngx_shmtx_lock(&shpool->mutex);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(worker->subscribers);
// remove the subscription for the channel from subscriber
ngx_queue_remove(&subscription->queue);
// remove the subscription for the channel from worker
......@@ -171,8 +172,7 @@ ngx_http_push_stream_cleanup_shutting_down_worker(void)
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(cur, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_cleanup_shutting_down_worker_data(data);
}
global_data->ipc[ngx_process_slot].pid = -1;
global_data->ipc[ngx_process_slot].subscribers = 0;
global_data->pid[ngx_process_slot] = -1;
}
......@@ -1395,6 +1395,7 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subsc
while ((cur = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(cur->channel->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(cur->channel_worker_sentinel->subscribers);
ngx_queue_remove(&cur->channel_worker_queue);
ngx_queue_remove(&cur->queue);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment