Commit 45828641 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

sending old messages by last event id header

parent 0a9dd7a0
......@@ -215,6 +215,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_DATE_FORMAT_ISO_8601 = ngx_string("
// headers
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID = ngx_string("Event-Id");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_LAST_EVENT_ID = ngx_string("Last-Event-Id");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_ALLOW = ngx_string("Allow");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_EXPLAIN = ngx_string("X-Nginx-PushStream-Explain");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING = ngx_string("Transfer-Encoding");
......
......@@ -37,7 +37,7 @@ ngx_http_push_stream_requested_channel_t * ngx_http_push_stream_parse_channels_i
static void ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_t *data);
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_str_t *last_event_id, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_SUBSCRIBER_H_ */
......@@ -27,7 +27,7 @@
static ngx_http_push_stream_worker_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r);
static void ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since);
static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_str_t *last_event_id);
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log);
static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel);
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_http_push_stream_pid_queue_t *worker_subscribers_sentinel, ngx_log_t *log);
......@@ -45,6 +45,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_flag_t is_broadcast_channel;
ngx_http_push_stream_channel_t *channel;
time_t if_modified_since;
ngx_str_t *last_event_id;
// only accept GET method
if (!(r->method & NGX_HTTP_GET)) {
......@@ -122,6 +123,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
if_modified_since = (r->headers_in.if_modified_since != NULL) ? ngx_http_parse_time(r->headers_in.if_modified_since->value.data, r->headers_in.if_modified_since->value.len) : 0;
last_event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_LAST_EVENT_ID);
if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) {
ngx_destroy_pool(temp_pool);
......@@ -137,6 +139,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_registry_subscriber_locked(worker_subscriber);
ngx_shmtx_unlock(&shpool->mutex);
......@@ -144,7 +147,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
// adding subscriber to channel(s) and send backtrack messages
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, &worker_subscriber->subscriptions_sentinel, temp_pool) != NGX_OK) {
if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, last_event_id, &worker_subscriber->subscriptions_sentinel, temp_pool) != NGX_OK) {
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
......@@ -159,7 +162,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
static ngx_int_t
ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool)
ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_str_t *last_event_id, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_pid_queue_t *cur, *worker_subscribers_sentinel = NULL;
ngx_http_push_stream_channel_t *channel;
......@@ -195,7 +198,7 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
}
// send old messages to new subscriber
ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since);
ngx_http_push_stream_send_old_messages(r, channel, requested_channel->backtrack_messages, if_modified_since, last_event_id);
ngx_shmtx_lock(&shpool->mutex);
result = ngx_http_push_stream_assing_subscription_to_channel_locked(requested_channel->id, subscription, subscriptions_sentinel, worker_subscribers_sentinel, r->connection->log);
......@@ -363,29 +366,43 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subs
}
static void
ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since)
ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_str_t *last_event_id)
{
ngx_http_push_stream_msg_t *message, *message_sentinel;
if (channel->stored_messages > 0) {
message_sentinel = &channel->message_queue;
message = message_sentinel;
ngx_uint_t qtd = (backtrack > channel->stored_messages) ? channel->stored_messages : backtrack;
ngx_uint_t start = channel->stored_messages - qtd;
// positioning at first message, and send the others
while ((qtd > 0) && (!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (start == 0) {
ngx_http_push_stream_send_response_message(r, channel, message);
qtd--;
} else {
start--;
}
}
if ((backtrack == 0) && (if_modified_since != 0)) {
if (last_event_id != NULL) {
ngx_flag_t found = 0;
while ((!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (message->time > if_modified_since) {
if ((!found) && (message->event_id != NULL) && (ngx_memn2cmp(message->event_id->data, last_event_id->data, message->event_id->len, last_event_id->len) == 0)) {
found = 1;
continue;
}
if (found) {
ngx_http_push_stream_send_response_message(r, channel, message);
}
}
} else {
ngx_uint_t qtd = (backtrack > channel->stored_messages) ? channel->stored_messages : backtrack;
ngx_uint_t start = channel->stored_messages - qtd;
// positioning at first message, and send the others
while ((qtd > 0) && (!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (start == 0) {
ngx_http_push_stream_send_response_message(r, channel, message);
qtd--;
} else {
start--;
}
}
if ((backtrack == 0) && (if_modified_since != 0)) {
while ((!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (message->time > if_modified_since) {
ngx_http_push_stream_send_response_message(r, channel, message);
}
}
}
}
......@@ -446,15 +463,15 @@ ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http
static ngx_int_t
ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_http_push_stream_pid_queue_t *worker_subscribers_sentinel, ngx_log_t *log)
{
ngx_http_push_stream_channel_t *channel;
// check if channel still exists
if ((channel = ngx_http_push_stream_find_channel_locked(channel_id, log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", channel_id->data);
return NGX_ERROR;
}
channel->subscribers++; // do this only when we know everything went okay
ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue);
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscriber_sentinel.queue, &subscription->subscriber->queue);
return NGX_OK;
ngx_http_push_stream_channel_t *channel;
// check if channel still exists
if ((channel = ngx_http_push_stream_find_channel_locked(channel_id, log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_subscriber_assign_channel without created channel %s", channel_id->data);
return NGX_ERROR;
}
channel->subscribers++; // do this only when we know everything went okay
ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue);
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscriber_sentinel.queue, &subscription->subscriber->queue);
return NGX_OK;
}
......@@ -7,6 +7,7 @@ class TestEventSource < Test::Unit::TestCase
@subscriber_eventsource = 'on'
@header_template = nil
@message_template = nil
@ping_message_interval = nil
end
def config_test_content_type_should_be_event_stream
......@@ -42,7 +43,7 @@ class TestEventSource < Test::Unit::TestCase
end
}
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
publish_message_inline(channel, headers, body)
add_test_timeout
}
......@@ -50,7 +51,7 @@ class TestEventSource < Test::Unit::TestCase
def test_default_message_template_with_event_id
event_id = 'event_id_with_generic_text_01'
headers = {'accept' => 'text/html', 'Event-iD' => event_id }
headers = {'accept' => 'text/html', 'Event-Id' => event_id }
body = 'test message'
channel = 'ch_test_default_message_template_with_event_id'
response = ''
......@@ -65,7 +66,7 @@ class TestEventSource < Test::Unit::TestCase
end
}
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
publish_message_inline(channel, headers, body)
add_test_timeout
}
......@@ -91,7 +92,7 @@ class TestEventSource < Test::Unit::TestCase
end
}
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
publish_message_inline(channel, headers, body)
add_test_timeout
}
......@@ -103,7 +104,7 @@ class TestEventSource < Test::Unit::TestCase
def test_custom_message_template_with_event_id
event_id = 'event_id_with_generic_text_01'
headers = {'accept' => 'text/html', 'Event-iD' => event_id }
headers = {'accept' => 'text/html', 'Event-Id' => event_id }
body = 'test message'
channel = 'ch_test_custom_message_template_with_event_id'
response = ''
......@@ -118,10 +119,80 @@ class TestEventSource < Test::Unit::TestCase
end
}
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body, :timeout => 30
publish_message_inline(channel, headers, body)
add_test_timeout
}
end
def config_test_ping_message_on_event_source
@ping_message_interval = '1s'
@message_template = '{\"id\":\"~id~\", \"message\":\"~text~\"}'
end
def test_ping_message_on_event_source
headers = {'accept' => 'text/html'}
channel = 'ch_test_ping_message_on_event_source'
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream { | chunk |
assert_equal(": -1\r\n", chunk, "Wrong ping message")
EventMachine.stop
}
add_test_timeout
}
end
def test_get_old_messages_by_last_event_id
channel = 'ch_test_get_old_messages_by_last_event_id'
response = ''
EventMachine.run {
publish_message_inline(channel, {'accept' => 'text/html', 'Event-Id' => 'event 1' }, 'msg 1')
publish_message_inline(channel, {'accept' => 'text/html', 'Event-Id' => 'event 2' }, 'msg 2')
publish_message_inline(channel, {'accept' => 'text/html' }, 'msg 3')
publish_message_inline(channel, {'accept' => 'text/html', 'Event-Id' => 'event 3' }, 'msg 4')
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => {'Last-Event-Id' => 'event 2' }
sub.stream { | chunk |
response += chunk
if response.include?("msg 4")
assert_equal("data: msg 3\r\n\r\nid: event 3\r\ndata: msg 4\r\n\r\n", response, "The published message was not received correctly")
EventMachine.stop
end
}
add_test_timeout
}
end
def config_test_get_old_messages_by_last_event_id_without_found_event
@ping_message_interval = '1s'
end
def test_get_old_messages_by_last_event_id_without_found_event
channel = 'ch_test_get_old_messages_by_last_event_id_without_found_event'
response = ''
EventMachine.run {
publish_message_inline(channel, {'accept' => 'text/html', 'Event-Id' => 'event 1' }, 'msg 1')
publish_message_inline(channel, {'accept' => 'text/html', 'Event-Id' => 'event 2' }, 'msg 2')
publish_message_inline(channel, {'accept' => 'text/html' }, 'msg 3')
publish_message_inline(channel, {'accept' => 'text/html', 'Event-Id' => 'event 3' }, 'msg 4')
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => {'Last-Event-Id' => 'event_not_found' }
sub.stream { | chunk |
assert_equal(": -1\r\n", chunk, "Received any other message instead of ping")
EventMachine.stop
}
add_test_timeout
}
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment