Commit 7ff9a1ad authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding functions create_worker_subscriber_channel_sentinel_locked,...

adding functions create_worker_subscriber_channel_sentinel_locked, create_channel_subscription and assing_subscription_to_channel_locked to group the logics responsable to add a subscriber to a channel
parent 91af3d05
...@@ -28,6 +28,9 @@ ...@@ -28,6 +28,9 @@
static ngx_http_push_stream_worker_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r); static ngx_http_push_stream_worker_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r);
static void ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber); static void ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since); static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since);
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log);
static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel);
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_http_push_stream_pid_queue_t *worker_subscribers_sentinel, ngx_log_t *log);
static ngx_int_t static ngx_int_t
ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
...@@ -158,88 +161,47 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -158,88 +161,47 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
static ngx_int_t static ngx_int_t
ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool) ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool)
{ {
ngx_http_push_stream_pid_queue_t *sentinel, *cur, *found; ngx_http_push_stream_pid_queue_t *cur, *worker_subscribers_sentinel = NULL;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_subscriber_t *subscriber;
ngx_http_push_stream_subscriber_t *subscriber_sentinel;
ngx_http_push_stream_subscription_t *subscription; ngx_http_push_stream_subscription_t *subscription;
ngx_int_t result;
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log); if ((channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log)) == NULL) {
if (channel == NULL) {
// channel not found // channel not found
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", requested_channel->id->data); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", requested_channel->id->data);
return NGX_ERROR; return NGX_ERROR;
} }
sentinel = &channel->workers_with_subscribers; cur = &channel->workers_with_subscribers;
cur = sentinel;
found = NULL; while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != &channel->workers_with_subscribers) {
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if (cur->pid == ngx_pid) { if (cur->pid == ngx_pid) {
found = cur; worker_subscribers_sentinel = cur;
break; break;
} }
} }
if (found == NULL) { // found nothing if (worker_subscribers_sentinel == NULL) { // found nothing
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
// check if channel still exists worker_subscribers_sentinel = ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(shpool, requested_channel->id, r->connection->log);
channel = ngx_http_push_stream_find_channel_locked(requested_channel->id, r->connection->log); ngx_shmtx_unlock(&shpool->mutex);
if (channel == NULL) { if (worker_subscribers_sentinel == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", requested_channel->id->data);
return NGX_ERROR;
}
if ((found = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_pid_queue_t))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate worker subscriber queue marker in shared memory");
return NGX_ERROR; return NGX_ERROR;
} }
// initialize
ngx_queue_insert_tail(&sentinel->queue, &found->queue);
found->pid = ngx_pid;
found->slot = ngx_process_slot;
ngx_queue_init(&found->subscriber_sentinel.queue);
ngx_shmtx_unlock(&shpool->mutex);
} }
if ((subscription = ngx_palloc(r->pool, sizeof(ngx_http_push_stream_subscription_t))) == NULL) { if ((subscription = ngx_http_push_stream_create_channel_subscription(r, channel)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscribed channel reference");
return NGX_ERROR;
}
if ((subscriber = ngx_palloc(r->pool, sizeof(ngx_http_push_stream_subscriber_t))) == NULL) { // unable to allocate request queue element
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscribed channel reference");
return NGX_ERROR; return NGX_ERROR;
} }
subscriber_sentinel = &found->subscriber_sentinel;
subscriber->request = r;
subscription->channel = channel;
subscription->subscriber = subscriber;
// send old messages to new subscriber // send old messages to new subscriber
ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since); ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since);
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
// check if channel still exists result = ngx_http_push_stream_assing_subscription_to_channel_locked(requested_channel->id, subscription, subscriptions_sentinel, worker_subscribers_sentinel, r->connection->log);
channel = ngx_http_push_stream_find_channel_locked(requested_channel->id, r->connection->log);
if (channel == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", requested_channel->id->data);
return NGX_ERROR;
}
channel->subscribers++; // do this only when we know everything went okay
ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue);
ngx_queue_insert_tail(&subscriber_sentinel->queue, &subscriber->queue);
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK; return result;
} }
ngx_http_push_stream_requested_channel_t * ngx_http_push_stream_requested_channel_t *
...@@ -429,3 +391,70 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre ...@@ -429,3 +391,70 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
} }
} }
} }
static ngx_http_push_stream_pid_queue_t *
ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log)
{
ngx_http_push_stream_pid_queue_t *worker_sentinel;
ngx_http_push_stream_channel_t *channel;
// check if channel still exists
if ((channel = ngx_http_push_stream_find_channel_locked(channel_id, log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", channel_id->data);
return NULL;
}
if ((worker_sentinel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_pid_queue_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate worker subscriber queue marker in shared memory");
return NULL;
}
// initialize
ngx_queue_insert_tail(&channel->workers_with_subscribers.queue, &worker_sentinel->queue);
worker_sentinel->pid = ngx_pid;
worker_sentinel->slot = ngx_process_slot;
ngx_queue_init(&worker_sentinel->subscriber_sentinel.queue);
return worker_sentinel;
}
static ngx_http_push_stream_subscription_t *
ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel)
{
ngx_http_push_stream_subscription_t *subscription;
ngx_http_push_stream_subscriber_t *subscriber;
if ((subscription = ngx_palloc(r->pool, sizeof(ngx_http_push_stream_subscription_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscribed channel reference");
return NULL;
}
if ((subscriber = ngx_palloc(r->pool, sizeof(ngx_http_push_stream_subscriber_t))) == NULL) { // unable to allocate request queue element
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscribed channel reference");
return NULL;
}
subscriber->request = r;
subscription->channel = channel;
subscription->subscriber = subscriber;
return subscription;
}
static ngx_int_t
ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_http_push_stream_pid_queue_t *worker_subscribers_sentinel, ngx_log_t *log)
{
ngx_http_push_stream_channel_t *channel;
// check if channel still exists
if ((channel = ngx_http_push_stream_find_channel_locked(channel_id, log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", channel_id->data);
return NGX_ERROR;
}
channel->subscribers++; // do this only when we know everything went okay
ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue);
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscriber_sentinel.queue, &subscription->subscriber->queue);
return NGX_OK;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment