Commit e615d904 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

- add function ngx_http_push_stream_find_channel_locked to search for a...

- add function ngx_http_push_stream_find_channel_locked to search for a channel in a locked memory area
- fix memory allocation for channel id
parent 3477fe92
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_loc_conf_t *cf); static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_loc_conf_t *cf);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log); static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel_locked(ngx_str_t *id, ngx_log_t *log);
static void ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare) (const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right)); static void ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare) (const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right));
static void ngx_http_push_stream_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel); static void ngx_http_push_stream_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
......
...@@ -77,6 +77,23 @@ ngx_http_push_stream_find_channel_on_tree(ngx_str_t *id, ngx_log_t *log, ngx_rbt ...@@ -77,6 +77,23 @@ ngx_http_push_stream_find_channel_on_tree(ngx_str_t *id, ngx_log_t *log, ngx_rbt
return NULL; return NULL;
} }
static void
ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel)
{
channel->stored_messages = 0;
channel->subscribers = 0;
channel->deleted = 0;
channel->expires = 0;
// initialize queues
ngx_queue_init(&channel->message_queue.queue);
ngx_queue_init(&channel->workers_with_subscribers.queue);
channel->message_queue.deleted = 0;
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len);
}
static ngx_http_push_stream_channel_t * static ngx_http_push_stream_channel_t *
ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log) ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log)
{ {
...@@ -92,21 +109,58 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log) ...@@ -92,21 +109,58 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log)
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree); channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree);
if ((channel == NULL) || channel->deleted) { if ((channel == NULL) || channel->deleted) {
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
// check again to see if any other worker didn't recover the channel from trash or create the channel
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree);
if (channel != NULL) { // we found our channel
ngx_shmtx_unlock(&shpool->mutex);
return channel;
}
// check to see if it's on the trash
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->channels_to_delete); channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->channels_to_delete);
if (channel != NULL) { if (channel != NULL) {
channel->deleted = 0; // move the channel back to main tree (recover from trash)
channel->expires = 0; ngx_rbtree_delete(&data->channels_to_delete, (ngx_rbtree_node_t *) channel);
ngx_http_push_stream_initialize_channel(channel);
ngx_rbtree_insert(&data->tree, (ngx_rbtree_node_t *) channel);
(channel->broadcast) ? data->broadcast_channels++ : data->channels++; (channel->broadcast) ? data->broadcast_channels++ : data->channels++;
}
// reinitialize queues ngx_shmtx_unlock(&shpool->mutex);
ngx_queue_init(&channel->message_queue.queue); }
ngx_queue_init(&channel->workers_with_subscribers.queue);
return channel;
}
static ngx_http_push_stream_channel_t *
ngx_http_push_stream_find_channel_locked(ngx_str_t *id, ngx_log_t *log)
{
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_channel_t *channel = NULL;
if (id == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: tried to find a channel with a null id");
return NULL;
}
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree);
if ((channel == NULL) || channel->deleted) {
// check again to see if any other worker didn't recover the channel from trash or create the channel
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree);
if (channel != NULL) { // we found our channel
return channel;
}
// check to see if it's on the trash
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->channels_to_delete);
if (channel != NULL) {
// move the channel back to main tree (recover from trash)
ngx_rbtree_delete(&data->channels_to_delete, (ngx_rbtree_node_t *) channel); ngx_rbtree_delete(&data->channels_to_delete, (ngx_rbtree_node_t *) channel);
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len); ngx_http_push_stream_initialize_channel(channel);
ngx_rbtree_insert(&data->tree, (ngx_rbtree_node_t *) channel); ngx_rbtree_insert(&data->tree, (ngx_rbtree_node_t *) channel);
(channel->broadcast) ? data->broadcast_channels++ : data->channels++;
} }
ngx_shmtx_unlock(&shpool->mutex);
} }
return channel; return channel;
...@@ -128,6 +182,14 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st ...@@ -128,6 +182,14 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
} }
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
// check again to see if any other worker didn't create the channel
channel = ngx_http_push_stream_find_channel_locked(id, log);
if (channel != NULL) { // we found our channel
ngx_shmtx_unlock(&shpool->mutex);
return channel;
}
if ((cf->broadcast_channel_prefix.len > 0) && (ngx_strncmp(id->data, cf->broadcast_channel_prefix.data, cf->broadcast_channel_prefix.len) == 0)) { if ((cf->broadcast_channel_prefix.len > 0) && (ngx_strncmp(id->data, cf->broadcast_channel_prefix.data, cf->broadcast_channel_prefix.len) == 0)) {
is_broadcast_channel = 1; is_broadcast_channel = 1;
} }
...@@ -138,30 +200,25 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st ...@@ -138,30 +200,25 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
return NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED; return NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED;
} }
if ((channel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_channel_t) + id->len + 1)) == NULL) { if ((channel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_channel_t))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
return NULL; return NULL;
} }
ngx_memset(channel, '\0', sizeof(ngx_http_push_stream_channel_t) + id->len + 1); if ((channel->id.data = ngx_slab_alloc_locked(shpool, id->len + 1)) == NULL) {
channel->id.data = (u_char *) (channel + 1); ngx_slab_free_locked(shpool, channel);
ngx_shmtx_unlock(&shpool->mutex);
return NULL;
}
ngx_memset(channel->id.data, '\0', id->len + 1);
channel->id.len = id->len; channel->id.len = id->len;
ngx_memcpy(channel->id.data, id->data, channel->id.len); ngx_memcpy(channel->id.data, id->data, channel->id.len);
channel->node.key = ngx_crc32_short(id->data, id->len);
channel->last_message_id = 0; channel->last_message_id = 0;
channel->stored_messages = 0;
channel->subscribers = 0;
channel->broadcast = is_broadcast_channel; channel->broadcast = is_broadcast_channel;
channel->message_queue.deleted = 0; ngx_http_push_stream_initialize_channel(channel);
channel->deleted = 0;
// initialize queues
ngx_queue_init(&channel->message_queue.queue);
ngx_queue_init(&channel->workers_with_subscribers.queue);
ngx_rbtree_insert(&data->tree, (ngx_rbtree_node_t *) channel); ngx_rbtree_insert(&data->tree, (ngx_rbtree_node_t *) channel);
(is_broadcast_channel) ? data->broadcast_channels++ : data->channels++; (is_broadcast_channel) ? data->broadcast_channels++ : data->channels++;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment