Commit a61b7a0c authored by Wandenberg's avatar Wandenberg

reduce number of calls to ngx_http_push_stream_find_channel function

parent 9935347c
......@@ -178,6 +178,7 @@ typedef struct {
ngx_queue_t queue;
ngx_str_t *id;
ngx_uint_t backtrack_messages;
ngx_http_push_stream_channel_t *channel;
} ngx_http_push_stream_requested_channel_t;
typedef struct {
......
......@@ -253,7 +253,7 @@ static void ngx_http_push_stream_unescape_uri(ngx_str_t *value);
static void ngx_http_push_stream_complex_value(ngx_http_request_t *r, ngx_http_complex_value_t *val, ngx_str_t *value);
ngx_http_push_stream_channel_t *ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool);
ngx_int_t ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev);
......@@ -270,7 +270,7 @@ static void ngx_http_push_stream_worker_subscriber_cleanup_locke
static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len);
static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg, ngx_http_push_stream_shm_data_t *data);
static ngx_flag_t ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool);
static ngx_flag_t ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_collect_expired_messages_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_flag_t force);
static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg);
......
......@@ -239,7 +239,6 @@ ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r,
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_content_subtype_t *subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
ngx_http_push_stream_channel_info_t *channel_info;
ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_requested_channel_t *requested_channel;
ngx_queue_t *q;
ngx_uint_t qtd_channels = 0;
......@@ -250,14 +249,12 @@ ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r,
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
// search for a existing channel with this id
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
if ((channel != NULL) && ((channel_info = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_channel_info_t))) != NULL)) {
channel_info->id.data = channel->id.data;
channel_info->id.len = channel->id.len;
channel_info->published_messages = channel->last_message_id;
channel_info->stored_messages = channel->stored_messages;
channel_info->subscribers = channel->subscribers;
if ((requested_channel->channel != NULL) && ((channel_info = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_channel_info_t))) != NULL)) {
channel_info->id.data = requested_channel->channel->id.data;
channel_info->id.len = requested_channel->channel->id.len;
channel_info->published_messages = requested_channel->channel->last_message_id;
channel_info->stored_messages = requested_channel->channel->stored_messages;
channel_info->subscribers = requested_channel->channel->subscribers;
ngx_queue_insert_tail(&queue_channel_info, &channel_info->queue);
qtd_channels++;
......
......@@ -31,7 +31,6 @@ static ngx_int_t ngx_http_push_stream_publisher_handle_after_read_body(ngx_ht
static ngx_int_t
ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
{
ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_module_ctx_t *ctx;
......@@ -98,16 +97,19 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
if (r->method & (NGX_HTTP_POST|NGX_HTTP_PUT)) {
// create the channel if doesn't exist
channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, cf, mcf);
if (channel == NULL) {
requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, cf, mcf);
if (requested_channel->channel == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for new channel");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL);
}
if (channel == NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED) {
if (requested_channel->channel == NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: number of channels were exceeded");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE);
}
} else {
requested_channel->channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
}
}
......@@ -216,7 +218,7 @@ ngx_http_push_stream_publisher_delete_handler(ngx_http_request_t *r)
for (q = ngx_queue_head(&ctx->requested_channels->queue); q != ngx_queue_sentinel(&ctx->requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
if (ngx_http_push_stream_delete_channel(mcf, requested_channel->id, text, len, r->pool)) {
if (ngx_http_push_stream_delete_channel(mcf, requested_channel->channel, text, len, r->pool)) {
qtd_channels++;
}
}
......@@ -235,7 +237,6 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_buf_t *buf = NULL;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_requested_channel_t *requested_channel;
ngx_queue_t *q;
......@@ -261,8 +262,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
for (q = ngx_queue_head(&ctx->requested_channels->queue); q != ngx_queue_sentinel(&ctx->requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
channel = ngx_http_push_stream_add_msg_to_channel(r, requested_channel->id, buf->pos, ngx_buf_size(buf), event_id, event_type, r->pool);
if (channel == NULL) {
if (ngx_http_push_stream_add_msg_to_channel(r, requested_channel->channel, buf->pos, ngx_buf_size(buf), event_id, event_type, r->pool) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
......@@ -328,6 +328,8 @@ ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
if (ngx_memn2cmp(requested_channel->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, requested_channel->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) {
return ngx_http_push_stream_send_response_all_channels_info_detailed(r, NULL);
}
requested_channel->channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
}
// if specify a channels ids != ALL, get info about specified channels if they exists
......
......@@ -30,9 +30,9 @@ static ngx_http_push_stream_subscriber_t *ngx_http_push_stream_subscriber
static ngx_int_t ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_subscriber_t *worker_subscriber);
static ngx_flag_t ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber);
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_queue_t *subscriptions, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscription_t *subscription, ngx_queue_t *subscriptions, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool);
static ngx_http_push_stream_padding_t *ngx_http_push_stream_get_padding_by_user_agent(ngx_http_request_t *r);
void ngx_http_push_stream_websocket_reading(ngx_http_request_t *r);
......@@ -162,7 +162,6 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
ngx_http_push_stream_requested_channel_t *requested_channel;
ngx_queue_t *q;
ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_subscription_t *subscription;
time_t greater_message_time;
ngx_int_t greater_message_tag;
......@@ -187,22 +186,15 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
// check if has any message to send
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
if (channel == NULL) {
// channel not found
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", requested_channel->id->data);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if (ngx_http_push_stream_has_old_messages_to_send(channel, requested_channel->backtrack_messages, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id)) {
if (ngx_http_push_stream_has_old_messages_to_send(requested_channel->channel, requested_channel->backtrack_messages, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id)) {
has_message_to_send = 1;
if (channel->last_message_time > greater_message_time) {
greater_message_time = channel->last_message_time;
greater_message_tag = channel->last_message_tag;
if (requested_channel->channel->last_message_time > greater_message_time) {
greater_message_time = requested_channel->channel->last_message_time;
greater_message_tag = requested_channel->channel->last_message_tag;
} else {
if ((channel->last_message_time == greater_message_time) && (channel->last_message_tag > greater_message_tag) ) {
greater_message_tag = channel->last_message_tag;
if ((requested_channel->channel->last_message_time == greater_message_time) && (requested_channel->channel->last_message_tag > greater_message_tag) ) {
greater_message_tag = requested_channel->channel->last_message_tag;
}
}
}
......@@ -225,19 +217,13 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
// adding subscriber to channel(s)
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
if ((channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf)) == NULL) {
// channel not found
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", requested_channel->id->data);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if ((subscription = ngx_http_push_stream_create_channel_subscription(r, channel, worker_subscriber)) == NULL) {
if ((subscription = ngx_http_push_stream_create_channel_subscription(r, requested_channel->channel, worker_subscriber)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->id, subscription, &worker_subscriber->subscriptions, r->connection->log);
ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->channel, subscription, &worker_subscriber->subscriptions, r->connection->log);
}
ngx_shmtx_unlock(&shpool->mutex);
......@@ -274,13 +260,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
if (channel == NULL) {
// channel not found
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", requested_channel->id->data);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id);
ngx_http_push_stream_send_old_messages(r, requested_channel->channel, requested_channel->backtrack_messages, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id);
}
if (ctx->callback != NULL) {
......@@ -299,26 +279,19 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
static ngx_int_t
ngx_http_push_stream_subscriber_assign_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_subscription_t *subscription;
ngx_int_t result;
ngx_slab_pool_t *shpool = mcf->shpool;
if ((channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf)) == NULL) {
// channel not found
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", requested_channel->id->data);
return NGX_ERROR;
}
if ((subscription = ngx_http_push_stream_create_channel_subscription(r, channel, subscriber)) == NULL) {
if ((subscription = ngx_http_push_stream_create_channel_subscription(r, requested_channel->channel, subscriber)) == NULL) {
return NGX_ERROR;
}
// send old messages to new subscriber
ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since, tag, 0, -1, last_event_id);
ngx_http_push_stream_send_old_messages(r, requested_channel->channel, requested_channel->backtrack_messages, if_modified_since, tag, 0, -1, last_event_id);
ngx_shmtx_lock(&shpool->mutex);
result = ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->id, subscription, &subscriber->subscriptions, r->connection->log);
result = ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->channel, subscription, &subscriber->subscriptions, r->connection->log);
ngx_shmtx_unlock(&shpool->mutex);
return result;
......@@ -335,7 +308,6 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
ngx_uint_t subscribed_channels_qtd = 0;
ngx_uint_t subscribed_wildcard_channels_qtd = 0;
ngx_flag_t is_wildcard_channel;
ngx_http_push_stream_channel_t *channel;
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
......@@ -362,17 +334,17 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
subscribed_wildcard_channels_qtd++;
}
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
requested_channel->channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
// check if channel exists when authorized_channels_only is on
if (cf->authorized_channels_only && !is_wildcard_channel && ((channel == NULL) || (channel->stored_messages == 0))) {
if (cf->authorized_channels_only && !is_wildcard_channel && ((requested_channel->channel == NULL) || (requested_channel->channel->stored_messages == 0))) {
*status_code = NGX_HTTP_FORBIDDEN;
*explain_error_message = (ngx_str_t *) &NGX_HTTP_PUSH_STREAM_CANNOT_CREATE_CHANNELS;
return NGX_ERROR;
}
// check if channel is full of subscribers
if ((mcf->max_subscribers_per_channel != NGX_CONF_UNSET_UINT) && ((channel != NULL) && (channel->subscribers >= mcf->max_subscribers_per_channel))) {
if ((mcf->max_subscribers_per_channel != NGX_CONF_UNSET_UINT) && ((requested_channel->channel != NULL) && (requested_channel->channel->subscribers >= mcf->max_subscribers_per_channel))) {
*status_code = NGX_HTTP_FORBIDDEN;
*explain_error_message = (ngx_str_t *) &NGX_HTTP_PUSH_STREAM_TOO_SUBSCRIBERS_PER_CHANNEL;
return NGX_ERROR;
......@@ -390,15 +362,19 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
// create the channels in advance, if doesn't exist, to ensure max number of channels in the server
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, cf, mcf);
if (channel == NULL) {
if (requested_channel->channel != NULL) {
continue;
}
requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, cf, mcf);
if (requested_channel->channel == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for new channel");
*status_code = NGX_HTTP_INTERNAL_SERVER_ERROR;
*explain_error_message = NULL;
return NGX_ERROR;
}
if (channel == NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED) {
if (requested_channel->channel == NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: number of channels were exceeded");
*status_code = NGX_HTTP_FORBIDDEN;
*explain_error_message = (ngx_str_t *) &NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE;
......@@ -603,16 +579,9 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
}
static ngx_http_push_stream_pid_queue_t *
ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
{
ngx_http_push_stream_pid_queue_t *worker_sentinel;
ngx_http_push_stream_channel_t *channel;
// check if channel still exists
if ((channel = ngx_http_push_stream_find_channel(channel_id, log, mcf)) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", channel_id->data);
return NULL;
}
if ((worker_sentinel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_pid_queue_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate worker subscriber queue marker in shared memory");
......@@ -650,18 +619,11 @@ ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http
}
static ngx_int_t
ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_queue_t *subscriptions, ngx_log_t *log)
ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscription_t *subscription, ngx_queue_t *subscriptions, ngx_log_t *log)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(subscription->subscriber->request, ngx_http_push_stream_module);
ngx_queue_t *q;
ngx_http_push_stream_pid_queue_t *worker, *worker_subscribers_sentinel = NULL;
ngx_http_push_stream_channel_t *channel;
// check if channel still exists
if ((channel = ngx_http_push_stream_find_channel(channel_id, log, mcf)) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", channel_id->data);
return NGX_ERROR;
}
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
......@@ -672,7 +634,7 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
}
if (worker_subscribers_sentinel == NULL) { // found nothing
worker_subscribers_sentinel = ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(shpool, channel_id, log, mcf);
worker_subscribers_sentinel = ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(shpool, channel, log, mcf);
if (worker_subscribers_sentinel == NULL) {
return NGX_ERROR;
}
......
......@@ -336,32 +336,22 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_m
}
ngx_http_push_stream_channel_t *
ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool)
ngx_int_t
ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_msg_t *msg;
ngx_shmtx_lock(&shpool->mutex);
// just find the channel. if it's not there, NULL and return error.
channel = ngx_http_push_stream_find_channel(id, r->connection->log, mcf);
if (channel == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without created channel %s", id->data);
return NULL;
}
// create a buffer copy in shared mem
msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(mcf, text, len, channel, channel->last_message_id + 1, event_id, event_type, temp_pool);
if (msg == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate message in shared memory");
return NULL;
return NGX_ERROR;
}
channel->last_message_id++;
......@@ -392,7 +382,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_
// turn on timer to cleanup buffer of old messages
ngx_http_push_stream_buffer_cleanup_timer_set(cf);
return channel;
return NGX_OK;
}
......@@ -845,19 +835,17 @@ ngx_http_push_stream_send_websocket_close_frame(ngx_http_request_t *r, ngx_uint_
}
static ngx_flag_t
ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool)
ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_channel_t *channel;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *q;
ngx_flag_t deleted = 0;
ngx_shmtx_lock(&shpool->mutex);
channel = ngx_http_push_stream_find_channel(id, ngx_cycle->log, mcf);
if (channel != NULL) {
// remove channel from tree
if ((channel != NULL) && !channel->deleted) {
deleted = 1;
channel->deleted = 1;
(channel->wildcard) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->wildcard_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels);
......@@ -867,13 +855,12 @@ ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_s
ngx_queue_insert_tail(&data->channels_to_delete, &channel->queue);
channel->queue_sentinel = &data->channels_to_delete;
// remove all messages
ngx_http_push_stream_ensure_qtd_of_messages_locked(data, channel, 0, 0);
// apply channel deleted message text to message template
if ((channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(mcf, text, len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, NULL, NULL, temp_pool)) == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to channel deleted message");
return 0;
}
......@@ -889,8 +876,8 @@ ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_s
}
}
ngx_shmtx_unlock(&(shpool)->mutex);
return (channel != NULL);
ngx_shmtx_unlock(&shpool->mutex);
return deleted;
}
......
......@@ -292,7 +292,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
for (q = ngx_queue_head(&ctx->subscriber->subscriptions); q != ngx_queue_sentinel(&ctx->subscriber->subscriptions); q = ngx_queue_next(q)) {
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(q, ngx_http_push_stream_subscription_t, queue);
if (ngx_http_push_stream_add_msg_to_channel(r, &subscription->channel->id, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, ctx->temp_pool) == NULL) {
if (ngx_http_push_stream_add_msg_to_channel(r, subscription->channel, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, ctx->temp_pool) != NGX_OK) {
ngx_http_finalize_request(r, NGX_OK);
return;
}
......
......@@ -76,6 +76,7 @@ ngx_http_push_stream_find_channel_on_tree(ngx_str_t *id, ngx_log_t *log, ngx_rbt
static ngx_http_push_stream_channel_t *
ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
{
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_channel_t *channel = NULL;
......@@ -84,7 +85,9 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_s
return NULL;
}
ngx_shmtx_lock(&shpool->mutex);
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree);
ngx_shmtx_unlock(&shpool->mutex);
if ((channel == NULL) || channel->deleted) {
return NULL;
}
......@@ -102,15 +105,15 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_flag_t is_wildcard_channel = 0;
channel = ngx_http_push_stream_find_channel(id, log, mcf);
if (channel != NULL) { // we found our channel
return channel;
if (id == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: tried to create a channel with a null id");
return NULL;
}
ngx_shmtx_lock(&shpool->mutex);
// check again to see if any other worker didn't create the channel
channel = ngx_http_push_stream_find_channel(id, log, mcf);
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree);
if (channel != NULL) { // we found our channel
ngx_shmtx_unlock(&shpool->mutex);
return channel;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment