Commit 09827759 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

put the code responsable for send old messages to subscriber on function...

put the code responsable for send old messages to subscriber on function ngx_http_push_stream_send_old_messages
parent 8a387cea
......@@ -27,6 +27,7 @@
static ngx_http_push_stream_worker_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r);
static void ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since);
static ngx_int_t
ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
......@@ -161,7 +162,6 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_subscriber_t *subscriber;
ngx_http_push_stream_subscriber_t *subscriber_sentinel;
ngx_http_push_stream_msg_t *message, *message_sentinel;
ngx_http_push_stream_subscription_t *subscription;
channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log);
......@@ -224,37 +224,7 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
subscription->subscriber = subscriber;
// send old messages to new subscriber
if (channel->stored_messages > 0) {
ngx_uint_t backtrack = requested_channel->backtrack_messages;
message_sentinel = &channel->message_queue;
message = message_sentinel;
ngx_uint_t qtd = (backtrack > channel->stored_messages) ? channel->stored_messages : backtrack;
ngx_uint_t start = channel->stored_messages - qtd;
// positioning at first message, and send the others
while ((qtd > 0) && (!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (start == 0) {
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, message, r->pool);
if (str != NULL) {
ngx_http_push_stream_send_response_text(r, str->data, str->len, 0);
}
qtd--;
} else {
start--;
}
}
if ((backtrack == 0) && (if_modified_since != 0)) {
while ((!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (message->time > if_modified_since) {
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, message, r->pool);
if (str != NULL) {
ngx_http_push_stream_send_response_text(r, str->data, str->len, 0);
}
}
}
}
}
ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since);
ngx_shmtx_lock(&shpool->mutex);
// check if channel still exists
......@@ -429,3 +399,40 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subs
data->subscribers++;
thisworker_data->subscribers++;
}
static void
ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since)
{
ngx_http_push_stream_msg_t *message, *message_sentinel;
if (channel->stored_messages > 0) {
message_sentinel = &channel->message_queue;
message = message_sentinel;
ngx_uint_t qtd = (backtrack > channel->stored_messages) ? channel->stored_messages : backtrack;
ngx_uint_t start = channel->stored_messages - qtd;
// positioning at first message, and send the others
while ((qtd > 0) && (!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (start == 0) {
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, message, r->pool);
if (str != NULL) {
ngx_http_push_stream_send_response_text(r, str->data, str->len, 0);
}
qtd--;
} else {
start--;
}
}
if ((backtrack == 0) && (if_modified_since != 0)) {
while ((!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (message->time > if_modified_since) {
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, message, r->pool);
if (str != NULL) {
ngx_http_push_stream_send_response_text(r, str->data, str->len, 0);
}
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment