Commit 48c039f1 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

make last_message_time and last_message_tag global, so messages published in...

make last_message_time and last_message_tag global, so messages published in different channels on same second will receive sequential tags
parent 54a25d61
...@@ -190,6 +190,8 @@ typedef struct { ...@@ -190,6 +190,8 @@ typedef struct {
ngx_rbtree_t unrecoverable_channels; ngx_rbtree_t unrecoverable_channels;
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
time_t startup; time_t startup;
time_t last_message_time;
ngx_int_t last_message_tag;
} ngx_http_push_stream_shm_data_t; } ngx_http_push_stream_shm_data_t;
ngx_shm_zone_t *ngx_http_push_stream_shm_zone = NULL; ngx_shm_zone_t *ngx_http_push_stream_shm_zone = NULL;
......
...@@ -230,7 +230,7 @@ static void ngx_http_push_stream_send_response_finalize_for_long ...@@ -230,7 +230,7 @@ static void ngx_http_push_stream_send_response_finalize_for_long
static ngx_int_t ngx_http_push_stream_memory_cleanup(); static ngx_int_t ngx_http_push_stream_memory_cleanup();
static ngx_int_t ngx_http_push_stream_buffer_cleanup(); static ngx_int_t ngx_http_push_stream_buffer_cleanup();
ngx_http_push_stream_channel_t *ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *data, size_t len, ngx_str_t *event_id, ngx_pool_t *temp_pool); ngx_http_push_stream_channel_t *ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *text, size_t len, ngx_str_t *event_id, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev); static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev); static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev);
......
...@@ -743,6 +743,8 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -743,6 +743,8 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
} }
d->startup = ngx_time(); d->startup = ngx_time();
d->last_message_time = 0;
d->last_message_tag = 0;
// initialize rbtree // initialize rbtree
if ((sentinel = ngx_slab_alloc(shpool, sizeof(*sentinel))) == NULL) { if ((sentinel = ngx_slab_alloc(shpool, sizeof(*sentinel))) == NULL) {
......
...@@ -256,8 +256,9 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -256,8 +256,9 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
ngx_http_push_stream_channel_t * ngx_http_push_stream_channel_t *
ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *data, size_t len, ngx_str_t *event_id, ngx_pool_t *temp_pool) ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *text, size_t len, ngx_str_t *event_id, ngx_pool_t *temp_pool)
{ {
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
...@@ -274,7 +275,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_ ...@@ -274,7 +275,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_
} }
// create a buffer copy in shared mem // create a buffer copy in shared mem
msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(data, len, channel, channel->last_message_id + 1, event_id, temp_pool); msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(text, len, channel, channel->last_message_id + 1, event_id, temp_pool);
if (msg == NULL) { if (msg == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex); ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate message in shared memory"); ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate message in shared memory");
...@@ -282,13 +283,13 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_ ...@@ -282,13 +283,13 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_
} }
channel->last_message_id++; channel->last_message_id++;
((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->published_messages++; data->published_messages++;
// tag message with time stamp and a sequence tag // tag message with time stamp and a sequence tag
msg->time = ngx_time(); msg->time = ngx_time();
msg->tag = (msg->time == channel->last_message_time) ? (channel->last_message_tag + 1) : 0; msg->tag = (msg->time == data->last_message_time) ? (data->last_message_tag + 1) : 0;
channel->last_message_time = msg->time; channel->last_message_time = data->last_message_time = msg->time;
channel->last_message_tag = msg->tag; channel->last_message_tag = data->last_message_tag = msg->tag;
// set message expiration time // set message expiration time
msg->expires = (ngx_http_push_stream_module_main_conf->message_ttl == NGX_CONF_UNSET ? 0 : (ngx_time() + ngx_http_push_stream_module_main_conf->message_ttl)); msg->expires = (ngx_http_push_stream_module_main_conf->message_ttl == NGX_CONF_UNSET ? 0 : (ngx_time() + ngx_http_push_stream_module_main_conf->message_ttl));
......
...@@ -463,4 +463,41 @@ class TestSubscriberLongPolling < Test::Unit::TestCase ...@@ -463,4 +463,41 @@ class TestSubscriberLongPolling < Test::Unit::TestCase
add_test_timeout add_test_timeout
} }
end end
def config_test_receiving_messages_when_connected_in_more_then_one_channel
@store_messages = "on"
@message_template = '{\"id\":\"~id~\", \"message\":\"~text~\", \"channel\":\"~channel~\"}'
end
def test_receiving_messages_when_connected_in_more_then_one_channel
headers = {'accept' => 'application/json'}
body = 'published message'
channel_1 = 'ch_test_receiving_messages_when_connected_in_more_then_one_channel_1'
channel_2 = 'ch_test_receiving_messages_when_connected_in_more_then_one_channel_2'
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_1.to_s + '/' + channel_2.to_s).get :head => {'If-Modified-Since' => 'Thu, 1 Jan 1970 00:00:00 GMT', 'If-None-Match' => 0}, :timeout => 30
sub_1.callback {
assert_equal(200, sub_1.response_header.status, "Wrong status")
response = JSON.parse(sub_1.response)
assert_equal(channel_1, response["channel"], "Wrong channel")
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_1.to_s + '/' + channel_2.to_s).get :head => {'If-Modified-Since' => sub_1.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_1.response_header['ETAG']}, :timeout => 30
sub_2.callback {
assert_equal(200, sub_2.response_header.status, "Wrong status")
response = JSON.parse(sub_2.response)
assert_equal(channel_2, response["channel"], "Wrong channel")
assert_equal(sub_1.response_header['ETAG'].to_i + 1, sub_2.response_header['ETAG'].to_i)
EventMachine.stop
}
}
publish_message_inline(channel_1.to_s, headers, body)
publish_message_inline(channel_2.to_s, headers, body)
add_test_timeout
}
end
end end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment