Commit 0b864402 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

change the message_queue type on the ngx_http_push_stream_channel_t structure

parent 56e64d54
...@@ -138,7 +138,7 @@ typedef struct { ...@@ -138,7 +138,7 @@ typedef struct {
ngx_uint_t stored_messages; ngx_uint_t stored_messages;
ngx_uint_t subscribers; ngx_uint_t subscribers;
ngx_http_push_stream_pid_queue_t workers_with_subscribers; ngx_http_push_stream_pid_queue_t workers_with_subscribers;
ngx_http_push_stream_msg_t message_queue; ngx_queue_t message_queue;
time_t last_activity_time; time_t last_activity_time;
time_t expires; time_t expires;
ngx_flag_t deleted; ngx_flag_t deleted;
......
...@@ -555,17 +555,22 @@ static ngx_flag_t ...@@ -555,17 +555,22 @@ static ngx_flag_t
ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id) ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id)
{ {
ngx_flag_t old_messages = 0; ngx_flag_t old_messages = 0;
ngx_http_push_stream_msg_t *message, *message_sentinel; ngx_http_push_stream_msg_t *message;
ngx_queue_t *cur;
message_sentinel = &channel->message_queue;
message = message_sentinel;
if (channel->stored_messages > 0) { if (channel->stored_messages > 0) {
if (backtrack > 0) { if (backtrack > 0) {
old_messages = 1; old_messages = 1;
} else if ((last_event_id != NULL) || (if_modified_since >= 0)) { } else if ((last_event_id != NULL) || (if_modified_since >= 0)) {
ngx_flag_t found = 0; ngx_flag_t found = 0;
while ((!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) { cur = &channel->message_queue;
while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &channel->message_queue)) {
message = (ngx_http_push_stream_msg_t *) ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
if (message->deleted) {
break;
}
if ((!found) && (last_event_id != NULL) && (message->event_id != NULL) && (ngx_memn2cmp(message->event_id->data, last_event_id->data, message->event_id->len, last_event_id->len) == 0)) { if ((!found) && (last_event_id != NULL) && (message->event_id != NULL) && (ngx_memn2cmp(message->event_id->data, last_event_id->data, message->event_id->len, last_event_id->len) == 0)) {
found = 1; found = 1;
continue; continue;
...@@ -591,16 +596,21 @@ ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *ch ...@@ -591,16 +596,21 @@ ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *ch
static void static void
ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id) ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id)
{ {
ngx_http_push_stream_msg_t *message, *message_sentinel; ngx_http_push_stream_msg_t *message;
ngx_queue_t *cur;
if (ngx_http_push_stream_has_old_messages_to_send(channel, backtrack, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id)) { if (ngx_http_push_stream_has_old_messages_to_send(channel, backtrack, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id)) {
message_sentinel = &channel->message_queue; cur = &channel->message_queue;
message = message_sentinel;
if (backtrack > 0) { if (backtrack > 0) {
ngx_uint_t qtd = (backtrack > channel->stored_messages) ? channel->stored_messages : backtrack; ngx_uint_t qtd = (backtrack > channel->stored_messages) ? channel->stored_messages : backtrack;
ngx_uint_t start = channel->stored_messages - qtd; ngx_uint_t start = channel->stored_messages - qtd;
// positioning at first message, and send the others // positioning at first message, and send the others
while ((qtd > 0) && (!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) { while ((qtd > 0) && (cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &channel->message_queue)) {
message = (ngx_http_push_stream_msg_t *) ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
if (message->deleted) {
break;
}
if (start == 0) { if (start == 0) {
ngx_http_push_stream_send_response_message(r, channel, message, 0, 1); ngx_http_push_stream_send_response_message(r, channel, message, 0, 1);
qtd--; qtd--;
...@@ -610,7 +620,12 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre ...@@ -610,7 +620,12 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
} }
} else if ((last_event_id != NULL) || (if_modified_since >= 0)) { } else if ((last_event_id != NULL) || (if_modified_since >= 0)) {
ngx_flag_t found = 0; ngx_flag_t found = 0;
while ((!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) { while ((cur = ngx_queue_next(cur)) && (cur != NULL) && (cur != &channel->message_queue)) {
message = (ngx_http_push_stream_msg_t *) ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
if (message->deleted) {
break;
}
if ((!found) && (last_event_id != NULL) && (message->event_id != NULL) && (ngx_memn2cmp(message->event_id->data, last_event_id->data, message->event_id->len, last_event_id->len) == 0)) { if ((!found) && (last_event_id != NULL) && (message->event_id != NULL) && (ngx_memn2cmp(message->event_id->data, last_event_id->data, message->event_id->len, last_event_id->len) == 0)) {
found = 1; found = 1;
continue; continue;
......
...@@ -34,18 +34,17 @@ static ngx_inline void ...@@ -34,18 +34,17 @@ static ngx_inline void
ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired) ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired)
{ {
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_msg_t *sentinel, *msg; ngx_http_push_stream_msg_t *msg;
ngx_queue_t *cur;
if (max_messages == NGX_CONF_UNSET_UINT) { if (max_messages == NGX_CONF_UNSET_UINT) {
return; return;
} }
sentinel = &channel->message_queue; while ((cur = ngx_queue_head(&channel->message_queue)) && (cur != NULL) && (cur != &channel->message_queue) && ((channel->stored_messages > max_messages) || expired)) {
msg = (ngx_http_push_stream_msg_t *) ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
while (!ngx_queue_empty(&sentinel->queue) && ((channel->stored_messages > max_messages) || expired)) { if (expired && (msg->deleted || (msg->expires == 0) || (msg->expires > ngx_time()) || (msg->workers_ref_count > 0))) {
msg = (ngx_http_push_stream_msg_t *)ngx_queue_next(&sentinel->queue);
if (expired && ((msg->expires == 0) || (msg->expires > ngx_time()) || (msg->workers_ref_count > 0))) {
break; break;
} }
...@@ -55,7 +54,6 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_ ...@@ -55,7 +54,6 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_
ngx_queue_remove(&msg->queue); ngx_queue_remove(&msg->queue);
ngx_http_push_stream_mark_message_to_delete_locked(msg); ngx_http_push_stream_mark_message_to_delete_locked(msg);
} }
} }
...@@ -327,7 +325,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_ ...@@ -327,7 +325,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_
// put messages on the queue // put messages on the queue
if (cf->store_messages) { if (cf->store_messages) {
ngx_queue_insert_tail(&channel->message_queue.queue, &msg->queue); ngx_queue_insert_tail(&channel->message_queue, &msg->queue);
channel->stored_messages++; channel->stored_messages++;
data->stored_messages++; data->stored_messages++;
......
...@@ -87,8 +87,7 @@ ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel) ...@@ -87,8 +87,7 @@ ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel)
channel->expires = 0; channel->expires = 0;
channel->last_activity_time = ngx_time(); channel->last_activity_time = ngx_time();
ngx_queue_init(&channel->message_queue.queue); ngx_queue_init(&channel->message_queue);
channel->message_queue.deleted = 0;
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len); channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len);
ngx_rbtree_insert(&data->tree, &channel->node); ngx_rbtree_insert(&data->tree, &channel->node);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment