Commit 571f222f authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

simplifying the relation between subscriber and worker structures

parent 82e685e3
......@@ -167,7 +167,7 @@ struct ngx_http_push_stream_subscriber_s {
ngx_http_push_stream_subscription_t subscriptions_sentinel;
ngx_pid_t worker_subscribed_pid;
ngx_flag_t longpolling;
ngx_http_push_stream_queue_elem_t *worker_subscriber_element_ref;
ngx_queue_t worker_queue;
};
typedef struct {
......
......@@ -263,7 +263,6 @@ ngx_http_push_stream_census_worker_subscribers(void)
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *workers_data = data->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_queue_elem_t *elem;
ngx_queue_t *cur;
ngx_http_push_stream_subscription_t *cur_subscription;
......@@ -271,9 +270,8 @@ ngx_http_push_stream_census_worker_subscribers(void)
cur = &thisworker_data->subscribers_queue;
while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &thisworker_data->subscribers_queue)) {
elem = ngx_queue_data(cur, ngx_http_push_stream_queue_elem_t, queue);
ngx_http_push_stream_subscriber_t *subscriber = ngx_queue_data(cur, ngx_http_push_stream_subscriber_t, worker_queue);
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) elem->value;
cur_subscription = &subscriber->subscriptions_sentinel;
while ((cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue)) != &subscriber->subscriptions_sentinel) {
cur_subscription->channel->subscribers++;
......
......@@ -499,18 +499,10 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_msec_t connection_ttl = worker_subscriber->longpolling ? cf->longpolling_connection_ttl : cf->subscriber_connection_ttl;
ngx_http_push_stream_queue_elem_t *element_subscriber;
ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
if ((element_subscriber = ngx_palloc(r->pool, sizeof(ngx_http_push_stream_queue_elem_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscriber reference");
return NGX_ERROR;
}
element_subscriber->value = worker_subscriber;
worker_subscriber->worker_subscriber_element_ref = element_subscriber;
// adding subscriber to worker list of subscribers
ngx_queue_insert_tail(&thisworker_data->subscribers_queue, &element_subscriber->queue);
ngx_queue_insert_tail(&thisworker_data->subscribers_queue, &worker_subscriber->worker_queue);
ctx->longpolling = worker_subscriber->longpolling;
ctx->subscriber = worker_subscriber;
......
......@@ -1178,10 +1178,7 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subsc
ngx_queue_remove(&cur->queue);
}
ngx_queue_init(&sentinel->queue);
if (worker_subscriber->worker_subscriber_element_ref != NULL) {
ngx_queue_remove(&worker_subscriber->worker_subscriber_element_ref->queue);
ngx_queue_init(&worker_subscriber->worker_subscriber_element_ref->queue);
}
ngx_queue_remove(&worker_subscriber->worker_queue);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER((data->ipc + ngx_process_slot)->subscribers);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment