Commit 03b911e6 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

fix bug on function...

fix bug on function ngx_http_push_stream_collect_expired_messages_and_empty_channels caused by high concurrency
parent 05abba2f
......@@ -32,7 +32,6 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_
sentinel = &channel->message_queue;
while (!ngx_queue_empty(&sentinel->queue) && ((channel->stored_messages > max_messages) || expired)) {
msg = (ngx_http_push_stream_msg_t *)ngx_queue_next(&sentinel->queue);
......@@ -195,29 +194,33 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_rbtree_t *t
sentinel = tree->sentinel;
if (node != sentinel) {
channel = (ngx_http_push_stream_channel_t *) node;
if ((!channel->deleted) && (&channel->node != sentinel)) {
if (node->left != NULL) {
if ((!channel->deleted) && (channel->node.left != NULL)) {
ngx_http_push_stream_collect_expired_messages_and_empty_channels(tree, shpool, node->left, force, memory_cleanup_timeout);
}
if (node->right != NULL) {
if ((!channel->deleted) && (channel->node.right != NULL)) {
ngx_http_push_stream_collect_expired_messages_and_empty_channels(tree, shpool, node->right, force, memory_cleanup_timeout);
}
ngx_shmtx_lock(&shpool->mutex);
channel = (ngx_http_push_stream_channel_t *) node;
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1, memory_cleanup_timeout);
if ((channel != NULL) && (!channel->deleted)) {
if ((channel->stored_messages == 0) && (channel->subscribers == 0)) {
channel->deleted = 1;
channel->expires = ngx_time() + memory_cleanup_timeout;
(channel->broadcast) ? data->broadcast_channels-- : data->channels--;
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, (force) ? 0 : channel->stored_messages, 1, memory_cleanup_timeout);
ngx_rbtree_delete(&data->tree, (ngx_rbtree_node_t *) channel);
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len);
ngx_rbtree_insert(&data->channels_to_delete, (ngx_rbtree_node_t *) channel);
if ((channel->stored_messages == 0) && (channel->subscribers == 0)) {
channel->deleted = 1;
channel->expires = ngx_time() + memory_cleanup_timeout;
(channel->broadcast) ? data->broadcast_channels-- : data->channels--;
// move the channel to trash tree
ngx_rbtree_delete(&data->tree, (ngx_rbtree_node_t *) channel);
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len);
ngx_rbtree_insert(&data->channels_to_delete, (ngx_rbtree_node_t *) channel);
}
}
ngx_shmtx_unlock(&shpool->mutex);
......@@ -261,7 +264,8 @@ ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_rbtree_t *tree,
cur = next;
}
ngx_slab_free_locked(shpool, node);
ngx_slab_free_locked(shpool, channel->id.data);
ngx_slab_free_locked(shpool, channel);
}
}
}
......@@ -288,10 +292,9 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
ngx_http_push_stream_msg_t *sentinel, *cur, *next;
sentinel = &data->messages_to_delete;
cur = (ngx_http_push_stream_msg_t *)ngx_queue_next(&sentinel->queue);
ngx_shmtx_lock(&shpool->mutex);
cur = (ngx_http_push_stream_msg_t *)ngx_queue_next(&sentinel->queue);
while (cur != sentinel) {
next = (ngx_http_push_stream_msg_t *)ngx_queue_next(&cur->queue);
if ((ngx_time() > cur->expires) || force) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment