Commit 05abba2f authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

changes to avoid problems when accessing a channel wich may be removed by another worker

parent e615d904
...@@ -198,4 +198,12 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET = ngx_string("GET"); ...@@ -198,4 +198,12 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET = ngx_string("GET");
return; \ return; \
} }
#define NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR_LOCKED(val, fail, r, errormessage) \
if (val == fail) { \
ngx_shmtx_unlock(&(shpool)->mutex); \
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, errormessage); \
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); \
return; \
}
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_H_ */
...@@ -134,14 +134,6 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -134,14 +134,6 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without channel id"); NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without channel id");
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler with channel id too large"); NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler with channel id too large");
// just find the channel. if it's not there, NULL and return error.
channel = ngx_http_push_stream_find_channel(id, r->connection->log);
if (channel == NULL) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without created channel %s", id->data);
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// copy request body to a memory buffer // copy request body to a memory buffer
buf = ngx_create_temp_buf(r->pool, r->headers_in.content_length_n + 1); buf = ngx_create_temp_buf(r->pool, r->headers_in.content_length_n + 1);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf, NULL, r, "push stream module: cannot allocate memory for read the message"); NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf, NULL, r, "push stream module: cannot allocate memory for read the message");
...@@ -174,21 +166,25 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -174,21 +166,25 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
buf->start = buf->last; buf->start = buf->last;
} }
// format message
buf_msg = ngx_http_push_stream_get_formatted_message(cf, channel, buf, r->pool);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf_msg, NULL, r, "push stream module: unable to format message");
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
// create a buffer copy in shared mem // just find the channel. if it's not there, NULL and return error.
msg = ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(buf_msg); channel = ngx_http_push_stream_find_channel_locked(id, r->connection->log);
if (msg == NULL) { if (channel == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex); ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate message in shared memory"); ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without created channel %s", id->data);
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return; return;
} }
// format message
buf_msg = ngx_http_push_stream_get_formatted_message(cf, channel, buf, r->pool);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR_LOCKED(buf_msg, NULL, r, "push stream module: unable to format message");
// create a buffer copy in shared mem
msg = ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(buf_msg);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR_LOCKED(msg, NULL, r, "push stream module: unable to allocate message in shared memory");
channel->last_message_id++; channel->last_message_id++;
((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->published_messages++; ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->published_messages++;
...@@ -198,10 +194,10 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -198,10 +194,10 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
msg->expires = (cf->buffer_timeout == NGX_CONF_UNSET ? 0 : (ngx_time() + cf->buffer_timeout)); msg->expires = (cf->buffer_timeout == NGX_CONF_UNSET ? 0 : (ngx_time() + cf->buffer_timeout));
ngx_queue_insert_tail(&channel->message_queue.queue, &msg->queue); ngx_queue_insert_tail(&channel->message_queue.queue, &msg->queue);
channel->stored_messages++; channel->stored_messages++;
}
// now see if the queue is too big // now see if the queue is too big
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, cf->max_messages, 0, cf->memory_cleanup_timeout); ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, cf->max_messages, 0, cf->memory_cleanup_timeout);
}
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
......
...@@ -81,7 +81,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -81,7 +81,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
//validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on //validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
// could not be ALL channel // could not be ALL channel
if (ngx_memn2cmp(cur->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, cur->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) { if (ngx_memn2cmp(cur->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, cur->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) {
ngx_destroy_pool(temp_pool); ngx_destroy_pool(temp_pool);
...@@ -119,7 +119,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -119,7 +119,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
// create the channels in advance, if doesn't exist, to ensure max number of channels in the server // create the channels in advance, if doesn't exist, to ensure max number of channels in the server
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
channel = ngx_http_push_stream_get_channel(cur->id, r->connection->log, cf); channel = ngx_http_push_stream_get_channel(cur->id, r->connection->log, cf);
if (channel == NULL) { if (channel == NULL) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate memory for new channel"); ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate memory for new channel");
...@@ -224,6 +224,14 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http ...@@ -224,6 +224,14 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
if (found == NULL) { // found nothing if (found == NULL) { // found nothing
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
// check if channel still exists
channel = ngx_http_push_stream_find_channel_locked(requested_channel->id, r->connection->log);
if (channel == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", requested_channel->id->data);
return NGX_ERROR;
}
if ((found = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_pid_queue_t))) == NULL) { if ((found = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_pid_queue_t))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate worker subscriber queue marker in shared memory"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate worker subscriber queue marker in shared memory");
...@@ -274,6 +282,13 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http ...@@ -274,6 +282,13 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
} }
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
// check if channel still exists
channel = ngx_http_push_stream_find_channel_locked(requested_channel->id, r->connection->log);
if (channel == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", requested_channel->id->data);
return NGX_ERROR;
}
channel->subscribers++; // do this only when we know everything went okay channel->subscribers++; // do this only when we know everything went okay
ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue); ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue);
ngx_queue_insert_tail(&subscriber_sentinel->queue, &subscriber->queue); ngx_queue_insert_tail(&subscriber_sentinel->queue, &subscriber->queue);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment