Commit de3a1935 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

removing the feature of recover channel from trash instead of recreate it

parent 152326ab
...@@ -214,9 +214,8 @@ typedef struct { ...@@ -214,9 +214,8 @@ typedef struct {
ngx_uint_t published_messages; // # of published messagens in all channels ngx_uint_t published_messages; // # of published messagens in all channels
ngx_uint_t subscribers; // # of subscribers in all channels ngx_uint_t subscribers; // # of subscribers in all channels
ngx_http_push_stream_msg_t messages_to_delete; ngx_http_push_stream_msg_t messages_to_delete;
ngx_rbtree_t channels_to_delete;
ngx_queue_t channels_queue; ngx_queue_t channels_queue;
ngx_queue_t channels_to_delete_queue; ngx_queue_t channels_to_delete;
ngx_queue_t unrecoverable_channels; ngx_queue_t unrecoverable_channels;
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
time_t startup; time_t startup;
......
...@@ -36,7 +36,6 @@ ...@@ -36,7 +36,6 @@
static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_loc_conf_t *cf); static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_loc_conf_t *cf);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log); static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel_locked(ngx_str_t *id, ngx_log_t *log);
static void ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare) (const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right)); static void ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare) (const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right));
static void ngx_http_push_stream_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel); static void ngx_http_push_stream_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
......
...@@ -893,7 +893,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -893,7 +893,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
} }
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;
ngx_rbtree_node_t *sentinel, *remove_sentinel; ngx_rbtree_node_t *sentinel;
ngx_http_push_stream_shm_data_t *d; ngx_http_push_stream_shm_data_t *d;
if ((d = (ngx_http_push_stream_shm_data_t *) ngx_slab_alloc(shpool, sizeof(*d))) == NULL) { //shm_data plus an array. if ((d = (ngx_http_push_stream_shm_data_t *) ngx_slab_alloc(shpool, sizeof(*d))) == NULL) { //shm_data plus an array.
...@@ -919,13 +919,8 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -919,13 +919,8 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
} }
ngx_rbtree_init(&d->tree, sentinel, ngx_http_push_stream_rbtree_insert); ngx_rbtree_init(&d->tree, sentinel, ngx_http_push_stream_rbtree_insert);
if ((remove_sentinel = ngx_slab_alloc(shpool, sizeof(*remove_sentinel))) == NULL) {
return NGX_ERROR;
}
ngx_rbtree_init(&d->channels_to_delete, remove_sentinel, ngx_http_push_stream_rbtree_insert);
ngx_queue_init(&d->channels_queue); ngx_queue_init(&d->channels_queue);
ngx_queue_init(&d->channels_to_delete_queue); ngx_queue_init(&d->channels_to_delete);
ngx_queue_init(&d->unrecoverable_channels); ngx_queue_init(&d->unrecoverable_channels);
// create ping message // create ping message
......
...@@ -188,7 +188,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -188,7 +188,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
// check if has any message to send // check if has any message to send
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
channel = ngx_http_push_stream_find_channel_locked(cur->id, r->connection->log); channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log);
if (channel == NULL) { if (channel == NULL) {
// channel not found // channel not found
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
...@@ -226,7 +226,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -226,7 +226,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
// adding subscriber to channel(s) // adding subscriber to channel(s)
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if ((channel = ngx_http_push_stream_find_channel_locked(cur->id, r->connection->log)) == NULL) { if ((channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log)) == NULL) {
// channel not found // channel not found
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", cur->id->data); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", cur->id->data);
...@@ -277,7 +277,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -277,7 +277,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
channel = ngx_http_push_stream_find_channel_locked(cur->id, r->connection->log); channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log);
if (channel == NULL) { if (channel == NULL) {
// channel not found // channel not found
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", cur->id->data); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", cur->id->data);
...@@ -637,7 +637,7 @@ ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_p ...@@ -637,7 +637,7 @@ ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_p
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
// check if channel still exists // check if channel still exists
if ((channel = ngx_http_push_stream_find_channel_locked(channel_id, log)) == NULL) { if ((channel = ngx_http_push_stream_find_channel(channel_id, log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", channel_id->data); ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", channel_id->data);
return NULL; return NULL;
} }
...@@ -681,7 +681,7 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo ...@@ -681,7 +681,7 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
ngx_http_push_stream_queue_elem_t *element_subscriber; ngx_http_push_stream_queue_elem_t *element_subscriber;
// check if channel still exists // check if channel still exists
if ((channel = ngx_http_push_stream_find_channel_locked(channel_id, log)) == NULL) { if ((channel = ngx_http_push_stream_find_channel(channel_id, log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", channel_id->data); ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", channel_id->data);
return NGX_ERROR; return NGX_ERROR;
} }
......
...@@ -296,7 +296,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_ ...@@ -296,7 +296,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
// just find the channel. if it's not there, NULL and return error. // just find the channel. if it's not there, NULL and return error.
channel = ngx_http_push_stream_find_channel_locked(id, r->connection->log); channel = ngx_http_push_stream_find_channel(id, r->connection->log);
if (channel == NULL) { if (channel == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex); ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without created channel %s", id->data); ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without created channel %s", id->data);
...@@ -663,7 +663,7 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool) ...@@ -663,7 +663,7 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
channel = ngx_http_push_stream_find_channel_locked(id, ngx_cycle->log); channel = ngx_http_push_stream_find_channel(id, ngx_cycle->log);
if (channel != NULL) { if (channel != NULL) {
// remove channel from tree // remove channel from tree
channel->deleted = 1; channel->deleted = 1;
...@@ -726,10 +726,8 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s ...@@ -726,10 +726,8 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_s
// move the channel to trash tree // move the channel to trash tree
ngx_rbtree_delete(&data->tree, &channel->node); ngx_rbtree_delete(&data->tree, &channel->node);
ngx_queue_remove(&channel->queue); ngx_queue_remove(&channel->queue);
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len); ngx_queue_insert_tail(&data->channels_to_delete, &channel->queue);
ngx_rbtree_insert(&data->channels_to_delete, &channel->node); channel->queue_sentinel = &data->channels_to_delete;
ngx_queue_insert_tail(&data->channels_to_delete_queue, &channel->queue);
channel->queue_sentinel = &data->channels_to_delete_queue;
} }
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
...@@ -757,17 +755,15 @@ ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *d ...@@ -757,17 +755,15 @@ ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *d
static void static void
ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force) ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force)
{ {
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_queue_t *cur; ngx_queue_t *cur;
while ((cur = ngx_queue_head(&data->channels_to_delete_queue)) != &data->channels_to_delete_queue) { while ((cur = ngx_queue_head(&data->channels_to_delete)) != &data->channels_to_delete) {
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue); channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
if ((ngx_time() > channel->expires) || force) { if ((ngx_time() > channel->expires) || force) {
ngx_rbtree_delete(&data->channels_to_delete, &channel->node);
ngx_queue_remove(&channel->queue); ngx_queue_remove(&channel->queue);
nxg_http_push_stream_free_channel_memory_locked(shpool, channel); nxg_http_push_stream_free_channel_memory_locked(shpool, channel);
} else { } else {
...@@ -837,7 +833,7 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for ...@@ -837,7 +833,7 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
cur = prev; cur = prev;
} }
} }
ngx_http_push_stream_free_memory_of_expired_channels_locked(&data->channels_to_delete, shpool, data->channels_to_delete.root, force); ngx_http_push_stream_free_memory_of_expired_channels_locked(data, shpool, force);
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK; return NGX_OK;
......
...@@ -77,6 +77,7 @@ ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel) ...@@ -77,6 +77,7 @@ ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel)
{ {
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
channel->channel_deleted_message = NULL;
channel->last_message_id = 0; channel->last_message_id = 0;
channel->last_message_time = 0; channel->last_message_time = 0;
channel->last_message_tag = 0; channel->last_message_tag = 0;
...@@ -100,7 +101,6 @@ static ngx_http_push_stream_channel_t * ...@@ -100,7 +101,6 @@ static ngx_http_push_stream_channel_t *
ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log) ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log)
{ {
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_channel_t *channel = NULL; ngx_http_push_stream_channel_t *channel = NULL;
if (id == NULL) { if (id == NULL) {
...@@ -110,59 +110,9 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log) ...@@ -110,59 +110,9 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log)
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree); channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree);
if ((channel == NULL) || channel->deleted) { if ((channel == NULL) || channel->deleted) {
ngx_shmtx_lock(&shpool->mutex);
// check again to see if any other worker didn't recover the channel from trash or create the channel
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree);
if (channel != NULL) { // we found our channel
ngx_shmtx_unlock(&shpool->mutex);
return channel;
}
// check to see if it's on the trash
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->channels_to_delete);
if (channel != NULL) {
// move the channel back to main tree (recover from trash)
ngx_rbtree_delete(&data->channels_to_delete, &channel->node);
ngx_queue_remove(&channel->queue);
ngx_http_push_stream_initialize_channel(channel);
}
ngx_shmtx_unlock(&shpool->mutex);
}
return channel;
}
static ngx_http_push_stream_channel_t *
ngx_http_push_stream_find_channel_locked(ngx_str_t *id, ngx_log_t *log)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_channel_t *channel = NULL;
if (id == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: tried to find a channel with a null id");
return NULL; return NULL;
} }
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree);
if ((channel == NULL) || channel->deleted) {
// check again to see if any other worker didn't recover the channel from trash or create the channel
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree);
if (channel != NULL) { // we found our channel
return channel;
}
// check to see if it's on the trash
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->channels_to_delete);
if (channel != NULL) {
// move the channel back to main tree (recover from trash)
ngx_rbtree_delete(&data->channels_to_delete, &channel->node);
ngx_queue_remove(&channel->queue);
ngx_http_push_stream_initialize_channel(channel);
}
}
return channel; return channel;
} }
...@@ -185,7 +135,7 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st ...@@ -185,7 +135,7 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
// check again to see if any other worker didn't create the channel // check again to see if any other worker didn't create the channel
channel = ngx_http_push_stream_find_channel_locked(id, log); channel = ngx_http_push_stream_find_channel(id, log);
if (channel != NULL) { // we found our channel if (channel != NULL) { // we found our channel
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
return channel; return channel;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment