Commit 8a387cea authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding support to retrieve old messages using If-Modified-Since header

parent 80fa531b
......@@ -80,6 +80,7 @@ typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_buf_t *buf;
time_t expires;
time_t time;
ngx_flag_t deleted;
ngx_int_t id;
ngx_str_t raw;
......
......@@ -37,7 +37,7 @@ ngx_http_push_stream_requested_channel_t * ngx_http_push_stream_parse_channels_i
static void ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_t *data);
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_SUBSCRIBER_H_ */
......@@ -204,6 +204,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
// put messages on the queue
if (cf->store_messages) {
msg->time = ngx_time();
// set message expiration time
msg->expires = (cf->buffer_timeout == NGX_CONF_UNSET ? 0 : (ngx_time() + cf->buffer_timeout));
ngx_queue_insert_tail(&channel->message_queue.queue, &msg->queue);
......
......@@ -40,6 +40,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_uint_t subscribed_broadcast_channels_qtd = 0;
ngx_flag_t is_broadcast_channel;
ngx_http_push_stream_channel_t *channel;
time_t if_modified_since;
// only accept GET method
if (!(r->method & NGX_HTTP_GET)) {
......@@ -116,6 +117,8 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
}
if_modified_since = (r->headers_in.if_modified_since != NULL) ? ngx_http_parse_time(r->headers_in.if_modified_since->value.data, r->headers_in.if_modified_since->value.len) : 0;
if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) {
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
......@@ -137,7 +140,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
// adding subscriber to channel(s) and send backtrack messages
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, &worker_subscriber->subscriptions_sentinel, temp_pool) != NGX_OK) {
if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, &worker_subscriber->subscriptions_sentinel, temp_pool) != NGX_OK) {
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
......@@ -152,7 +155,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
static ngx_int_t
ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool)
ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_pid_queue_t *sentinel, *cur, *found;
ngx_http_push_stream_channel_t *channel;
......@@ -222,9 +225,10 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
// send old messages to new subscriber
if (channel->stored_messages > 0) {
ngx_uint_t backtrack = requested_channel->backtrack_messages;
message_sentinel = &channel->message_queue;
message = message_sentinel;
ngx_uint_t qtd = (requested_channel->backtrack_messages > channel->stored_messages) ? channel->stored_messages : requested_channel->backtrack_messages;
ngx_uint_t qtd = (backtrack > channel->stored_messages) ? channel->stored_messages : backtrack;
ngx_uint_t start = channel->stored_messages - qtd;
// positioning at first message, and send the others
while ((qtd > 0) && (!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
......@@ -239,6 +243,17 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
start--;
}
}
if ((backtrack == 0) && (if_modified_since != 0)) {
while ((!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (message->time > if_modified_since) {
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, message, r->pool);
if (str != NULL) {
ngx_http_push_stream_send_response_text(r, str->data, str->len, 0);
}
}
}
}
}
ngx_shmtx_lock(&shpool->mutex);
......
......@@ -479,6 +479,150 @@ class TestPublisher < Test::Unit::TestCase
end
def config_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header
@header_template = 'HEADER'
@message_template = '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}'
end
def test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header
headers = {'accept' => 'application/json'}
channel_1 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_1'
channel_2 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_2'
channel_3 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_3'
body = 'body'
#create channels with some messages with progressive interval (2,4,6,10,14,18,24,30,36 seconds)
1.upto(3) do |i|
sleep(i * 2)
publish_message(channel_1, headers, body + i.to_s)
sleep(i * 2)
publish_message(channel_2, headers, body + i.to_s)
sleep(i * 2)
publish_message(channel_3, headers, body + i.to_s)
end
#get messages published less then 20 seconds ago
t = Time.now
t = t - 20
headers = headers.merge({'If-Modified-Since' => t.utc.strftime("%a, %e %b %Y %T %Z")})
response = ""
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_1.to_s + '/' + channel_2.to_s + '/' + channel_3.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
response += chunk
lines = response.split("\r\n")
if lines.length >= 5
assert_equal('HEADER', lines[0], "Header was not received")
line = JSON.parse(lines[1])
assert_equal(channel_1.to_s, line['channel'], "Wrong channel")
assert_equal('body3', line['message'], "Wrong message")
assert_equal(3, line['id'].to_i, "Wrong message")
line = JSON.parse(lines[2])
assert_equal(channel_2.to_s, line['channel'], "Wrong channel")
assert_equal('body3', line['message'], "Wrong message")
assert_equal(3, line['id'].to_i, "Wrong message")
line = JSON.parse(lines[3])
assert_equal(channel_3.to_s, line['channel'], "Wrong channel")
assert_equal('body2', line['message'], "Wrong message")
assert_equal(2, line['id'].to_i, "Wrong message")
line = JSON.parse(lines[4])
assert_equal(channel_3.to_s, line['channel'], "Wrong channel")
assert_equal('body3', line['message'], "Wrong message")
assert_equal(3, line['id'].to_i, "Wrong message")
EventMachine.stop
end
}
add_test_timeout
}
end
def config_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_and_backtrack_mixed
@header_template = 'HEADER'
@message_template = '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}'
end
def test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_and_backtrack_mixed
headers = {'accept' => 'application/json'}
channel_1 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_and_backtrack_mixed_1'
channel_2 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_and_backtrack_mixed_2'
channel_3 = 'ch_test_retreive_old_messages_in_multichannel_subscribe_using_if_modified_since_header_and_backtrack_mixed_3'
body = 'body'
#create channels with some messages with progressive interval (2,4,6,10,14,18,24,30,36 seconds)
1.upto(3) do |i|
sleep(i * 2)
publish_message(channel_1, headers, body + i.to_s)
sleep(i * 2)
publish_message(channel_2, headers, body + i.to_s)
sleep(i * 2)
publish_message(channel_3, headers, body + i.to_s)
end
#get messages published less then 20 seconds ago
t = Time.now
t = t - 20
headers = headers.merge({'If-Modified-Since' => t.utc.strftime("%a, %e %b %Y %T %Z")})
response = ""
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_1.to_s + '/' + channel_2.to_s + '.b5' + '/' + channel_3.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
response += chunk
lines = response.split("\r\n")
if lines.length >= 7
assert_equal('HEADER', lines[0], "Header was not received")
line = JSON.parse(lines[1])
assert_equal(channel_1.to_s, line['channel'], "Wrong channel")
assert_equal('body3', line['message'], "Wrong message")
assert_equal(3, line['id'].to_i, "Wrong message")
line = JSON.parse(lines[2])
assert_equal(channel_2.to_s, line['channel'], "Wrong channel")
assert_equal('body1', line['message'], "Wrong message")
assert_equal(1, line['id'].to_i, "Wrong message")
line = JSON.parse(lines[3])
assert_equal(channel_2.to_s, line['channel'], "Wrong channel")
assert_equal('body2', line['message'], "Wrong message")
assert_equal(2, line['id'].to_i, "Wrong message")
line = JSON.parse(lines[4])
assert_equal(channel_2.to_s, line['channel'], "Wrong channel")
assert_equal('body3', line['message'], "Wrong message")
assert_equal(3, line['id'].to_i, "Wrong message")
line = JSON.parse(lines[5])
assert_equal(channel_3.to_s, line['channel'], "Wrong channel")
assert_equal('body2', line['message'], "Wrong message")
assert_equal(2, line['id'].to_i, "Wrong message")
line = JSON.parse(lines[6])
assert_equal(channel_3.to_s, line['channel'], "Wrong channel")
assert_equal('body3', line['message'], "Wrong message")
assert_equal(3, line['id'].to_i, "Wrong message")
EventMachine.stop
end
}
add_test_timeout
}
end
def config_test_max_number_of_channels
@max_number_of_channels = 1
end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment