Commit f291db1a authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

replace ping and disconnect routines by individual timers in each request

parent 3ddb9d05
...@@ -50,8 +50,7 @@ typedef struct { ...@@ -50,8 +50,7 @@ typedef struct {
ngx_uint_t max_number_of_channels; ngx_uint_t max_number_of_channels;
ngx_uint_t max_number_of_broadcast_channels; ngx_uint_t max_number_of_broadcast_channels;
ngx_msec_t ping_message_interval; ngx_msec_t ping_message_interval;
ngx_msec_t subscriber_disconnect_interval; ngx_msec_t subscriber_connection_ttl;
time_t subscriber_connection_ttl;
ngx_msec_t buffer_cleanup_interval; ngx_msec_t buffer_cleanup_interval;
time_t message_ttl; time_t message_ttl;
ngx_uint_t max_subscribers_per_channel; ngx_uint_t max_subscribers_per_channel;
...@@ -152,6 +151,12 @@ typedef struct { ...@@ -152,6 +151,12 @@ typedef struct {
ngx_flag_t longpolling; ngx_flag_t longpolling;
} ngx_http_push_stream_worker_subscriber_t; } ngx_http_push_stream_worker_subscriber_t;
typedef struct {
ngx_event_t *disconnect_timer;
ngx_event_t *ping_timer;
ngx_flag_t longpolling;
} ngx_http_push_stream_subscriber_ctx_t;
// cleaning supplies // cleaning supplies
struct ngx_http_push_stream_subscriber_cleanup_s { struct ngx_http_push_stream_subscriber_cleanup_s {
ngx_http_push_stream_worker_subscriber_t *worker_subscriber; ngx_http_push_stream_worker_subscriber_t *worker_subscriber;
......
...@@ -41,10 +41,8 @@ ...@@ -41,10 +41,8 @@
// constants // constants
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES = {49, 0, 0, -1}; static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES = {49, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_SEND_PING = {50, 0, 0, -1}; static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS = {50, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_DISCONNECT_SUBSCRIBERS = {51, 0, 0, -1}; static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL = {51, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS = {52, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL = {53, 0, 0, -1};
// worker processes of the world, unite. // worker processes of the world, unite.
ngx_socket_t ngx_http_push_stream_socketpairs[NGX_MAX_PROCESSES][2]; ngx_socket_t ngx_http_push_stream_socketpairs[NGX_MAX_PROCESSES][2];
...@@ -55,8 +53,6 @@ static void ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *ch ...@@ -55,8 +53,6 @@ static void ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *ch
static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log, ngx_channel_t command); static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log, ngx_channel_t command);
#define ngx_http_push_stream_alert_worker_check_messages(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES); #define ngx_http_push_stream_alert_worker_check_messages(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES);
#define ngx_http_push_stream_alert_worker_send_ping(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_SEND_PING);
#define ngx_http_push_stream_alert_worker_disconnect_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DISCONNECT_SUBSCRIBERS);
#define ngx_http_push_stream_alert_worker_census_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS); #define ngx_http_push_stream_alert_worker_census_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS);
#define ngx_http_push_stream_alert_worker_delete_channel(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL); #define ngx_http_push_stream_alert_worker_delete_channel(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL);
...@@ -69,8 +65,6 @@ static void ngx_http_push_stream_clean_worker_data(); ...@@ -69,8 +65,6 @@ static void ngx_http_push_stream_clean_worker_data();
static void ngx_http_push_stream_channel_handler(ngx_event_t *ev); static void ngx_http_push_stream_channel_handler(ngx_event_t *ev);
static ngx_inline void ngx_http_push_stream_process_worker_message(void); static ngx_inline void ngx_http_push_stream_process_worker_message(void);
static ngx_inline void ngx_http_push_stream_send_worker_ping_message(void);
static ngx_inline void ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect);
static ngx_inline void ngx_http_push_stream_census_worker_subscribers(void); static ngx_inline void ngx_http_push_stream_census_worker_subscribers(void);
static ngx_int_t ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *sentinel, ngx_http_push_stream_msg_t *msg); static ngx_int_t ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *sentinel, ngx_http_push_stream_msg_t *msg);
......
...@@ -205,8 +205,6 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK = ng ...@@ -205,8 +205,6 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK = ng
static const ngx_str_t NGX_HTTP_PUSH_STREAM_LAST_CHUNK = ngx_string("0" CRLF CRLF); static const ngx_str_t NGX_HTTP_PUSH_STREAM_LAST_CHUNK = ngx_string("0" CRLF CRLF);
ngx_event_t ngx_http_push_stream_ping_event;
ngx_event_t ngx_http_push_stream_disconnect_event;
ngx_event_t ngx_http_push_stream_memory_cleanup_event; ngx_event_t ngx_http_push_stream_memory_cleanup_event;
ngx_event_t ngx_http_push_stream_buffer_cleanup_event; ngx_event_t ngx_http_push_stream_buffer_cleanup_event;
...@@ -240,8 +238,6 @@ static void ngx_http_push_stream_buffer_timer_wake_handler(ngx_e ...@@ -240,8 +238,6 @@ static void ngx_http_push_stream_buffer_timer_wake_handler(ngx_e
static void ngx_http_push_stream_timer_set(ngx_msec_t timer_interval, ngx_event_t *event, ngx_event_handler_pt event_handler, ngx_flag_t start_timer); static void ngx_http_push_stream_timer_set(ngx_msec_t timer_interval, ngx_event_t *event, ngx_event_handler_pt event_handler, ngx_flag_t start_timer);
static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_interval, ngx_event_t *timer_event); static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_interval, ngx_event_t *timer_event);
#define ngx_http_push_stream_ping_timer_set() ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->ping_message_interval, &ngx_http_push_stream_ping_event, ngx_http_push_stream_ping_timer_wake_handler, 1);
#define ngx_http_push_stream_disconnect_timer_set() ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->subscriber_disconnect_interval, &ngx_http_push_stream_disconnect_event, ngx_http_push_stream_disconnect_timer_wake_handler, 1);
#define ngx_http_push_stream_memory_cleanup_timer_set() ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event, ngx_http_push_stream_memory_cleanup_timer_wake_handler, 1); #define ngx_http_push_stream_memory_cleanup_timer_set() ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->memory_cleanup_interval, &ngx_http_push_stream_memory_cleanup_event, ngx_http_push_stream_memory_cleanup_timer_wake_handler, 1);
#define ngx_http_push_stream_buffer_cleanup_timer_set(pslcf) ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->buffer_cleanup_interval, &ngx_http_push_stream_buffer_cleanup_event, ngx_http_push_stream_buffer_timer_wake_handler, pslcf->store_messages); #define ngx_http_push_stream_buffer_cleanup_timer_set(pslcf) ngx_http_push_stream_timer_set(ngx_http_push_stream_module_main_conf->buffer_cleanup_interval, &ngx_http_push_stream_buffer_cleanup_event, ngx_http_push_stream_buffer_timer_wake_handler, pslcf->store_messages);
......
...@@ -234,11 +234,6 @@ ngx_http_push_stream_channel_handler(ngx_event_t *ev) ...@@ -234,11 +234,6 @@ ngx_http_push_stream_channel_handler(ngx_event_t *ev)
if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES.command) { if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES.command) {
ngx_http_push_stream_process_worker_message(); ngx_http_push_stream_process_worker_message();
} else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_SEND_PING.command) {
ngx_http_push_stream_send_worker_ping_message();
} else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_DISCONNECT_SUBSCRIBERS.command) {
// disconnect only expired subscribers (force_disconnect = 0)
ngx_http_push_stream_disconnect_worker_subscribers(0);
} else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS.command) { } else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS.command) {
ngx_http_push_stream_census_worker_subscribers(); ngx_http_push_stream_census_worker_subscribers();
} else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL.command) { } else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL.command) {
...@@ -283,60 +278,6 @@ ngx_http_push_stream_census_worker_subscribers(void) ...@@ -283,60 +278,6 @@ ngx_http_push_stream_census_worker_subscribers(void)
} }
static ngx_inline void
ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect)
{
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *sentinel = thisworker_data->worker_subscribers_sentinel;
ngx_http_push_stream_worker_subscriber_t *cur = sentinel;
time_t now = ngx_time();
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
if ((cur->request != NULL) && (ngx_exiting || (force_disconnect == 1) || ((cur->expires != 0) && (now > cur->expires)))) {
if (cur->longpolling) {
ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(cur->request);
} else {
ngx_http_push_stream_send_response_finalize(cur->request);
}
} else {
break;
}
}
}
static ngx_inline void
ngx_http_push_stream_send_worker_ping_message(void)
{
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *sentinel = thisworker_data->worker_subscribers_sentinel;
ngx_http_push_stream_worker_subscriber_t *cur = sentinel;
if ((ngx_http_push_stream_ping_msg != NULL) && (!ngx_queue_empty(&sentinel->queue))) {
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if ((cur->request != NULL) && (!cur->longpolling)) {
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(cur->request, ngx_http_push_stream_module);
ngx_int_t rc;
if (pslcf->eventsource_support) {
rc = ngx_http_push_stream_send_response_text(cur->request, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.len, 0);
} else {
rc = ngx_http_push_stream_send_response_message(cur->request, NULL, ngx_http_push_stream_ping_msg);
}
if (rc == NGX_ERROR) {
ngx_http_push_stream_worker_subscriber_t *prev = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_prev(&cur->queue);
ngx_http_push_stream_send_response_finalize(cur->request);
cur = prev;
}
}
}
}
}
static ngx_inline void static ngx_inline void
ngx_http_push_stream_process_worker_message(void) ngx_http_push_stream_process_worker_message(void)
{ {
......
...@@ -102,7 +102,7 @@ static ngx_command_t ngx_http_push_stream_commands[] = { ...@@ -102,7 +102,7 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NULL }, NULL },
{ ngx_string("push_stream_subscriber_connection_ttl"), { ngx_string("push_stream_subscriber_connection_ttl"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1, NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_sec_slot, ngx_conf_set_msec_slot,
NGX_HTTP_MAIN_CONF_OFFSET, NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, subscriber_connection_ttl), offsetof(ngx_http_push_stream_main_conf_t, subscriber_connection_ttl),
NULL }, NULL },
...@@ -249,10 +249,6 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle) ...@@ -249,10 +249,6 @@ ngx_http_push_stream_init_worker(ngx_cycle_t *cycle)
// turn on timer to cleanup memory of old messages and channels // turn on timer to cleanup memory of old messages and channels
ngx_http_push_stream_memory_cleanup_timer_set(); ngx_http_push_stream_memory_cleanup_timer_set();
// setting disconnect and ping timer
ngx_http_push_stream_disconnect_timer_set();
ngx_http_push_stream_ping_timer_set();
return ngx_http_push_stream_register_worker_message_handler(cycle); return ngx_http_push_stream_register_worker_message_handler(cycle);
} }
...@@ -284,18 +280,8 @@ ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle) ...@@ -284,18 +280,8 @@ ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle)
return; return;
} }
// disconnect all subscribers (force_disconnect = 1)
ngx_http_push_stream_disconnect_worker_subscribers(1);
ngx_http_push_stream_clean_worker_data(); ngx_http_push_stream_clean_worker_data();
if (ngx_http_push_stream_ping_event.timer_set) {
ngx_del_timer(&ngx_http_push_stream_ping_event);
}
if (ngx_http_push_stream_disconnect_event.timer_set) {
ngx_del_timer(&ngx_http_push_stream_disconnect_event);
}
if (ngx_http_push_stream_memory_cleanup_event.timer_set) { if (ngx_http_push_stream_memory_cleanup_event.timer_set) {
ngx_del_timer(&ngx_http_push_stream_memory_cleanup_event); ngx_del_timer(&ngx_http_push_stream_memory_cleanup_event);
} }
...@@ -351,8 +337,7 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf) ...@@ -351,8 +337,7 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
mcf->message_ttl = NGX_CONF_UNSET; mcf->message_ttl = NGX_CONF_UNSET;
mcf->max_channel_id_length = NGX_CONF_UNSET_UINT; mcf->max_channel_id_length = NGX_CONF_UNSET_UINT;
mcf->ping_message_interval = NGX_CONF_UNSET_MSEC; mcf->ping_message_interval = NGX_CONF_UNSET_MSEC;
mcf->subscriber_disconnect_interval = NGX_CONF_UNSET_MSEC; mcf->subscriber_connection_ttl = NGX_CONF_UNSET_MSEC;
mcf->subscriber_connection_ttl = NGX_CONF_UNSET;
mcf->max_subscribers_per_channel = NGX_CONF_UNSET; mcf->max_subscribers_per_channel = NGX_CONF_UNSET;
mcf->max_messages_stored_per_channel = NGX_CONF_UNSET_UINT; mcf->max_messages_stored_per_channel = NGX_CONF_UNSET_UINT;
mcf->qtd_templates = 0; mcf->qtd_templates = 0;
...@@ -401,7 +386,7 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent) ...@@ -401,7 +386,7 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
} }
// subscriber connection ttl cannot be zero // subscriber connection ttl cannot be zero
if ((conf->subscriber_connection_ttl != NGX_CONF_UNSET) && (conf->subscriber_connection_ttl == 0)) { if ((conf->subscriber_connection_ttl != NGX_CONF_UNSET_MSEC) && (conf->subscriber_connection_ttl == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_subscriber_connection_ttl cannot be zero."); ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_subscriber_connection_ttl cannot be zero.");
return NGX_CONF_ERROR; return NGX_CONF_ERROR;
} }
...@@ -442,12 +427,6 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent) ...@@ -442,12 +427,6 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
conf->buffer_cleanup_interval = 1000; // 1 second conf->buffer_cleanup_interval = 1000; // 1 second
} }
// calc subscriber disconnect interval
if (conf->subscriber_connection_ttl != NGX_CONF_UNSET) {
ngx_uint_t interval = conf->subscriber_connection_ttl / 3;
conf->subscriber_disconnect_interval = (interval > 1) ? (interval * 1000) + 1000 : 1000; // min 1 second
}
return NGX_CONF_OK; return NGX_CONF_OK;
} }
...@@ -764,8 +743,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) ...@@ -764,8 +743,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
ngx_rbtree_init(&d->unrecoverable_channels, unrecoverable_sentinel, ngx_http_push_stream_rbtree_insert); ngx_rbtree_init(&d->unrecoverable_channels, unrecoverable_sentinel, ngx_http_push_stream_rbtree_insert);
// create ping message // create ping message
ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->ping_message_text.data, ngx_http_push_stream_module_main_conf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, ngx_cycle->pool); if ((ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->ping_message_text.data, ngx_http_push_stream_module_main_conf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, ngx_cycle->pool)) == NULL) {
if (ngx_http_push_stream_ping_msg == NULL) {
return NGX_ERROR; return NGX_ERROR;
} }
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_str_t *last_event_id, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool); static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_str_t *last_event_id, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool);
static ngx_http_push_stream_worker_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r); static ngx_http_push_stream_worker_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r);
static void ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber); static ngx_int_t ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
static ngx_flag_t ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id); static ngx_flag_t ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id); static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log); static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_log_t *log);
...@@ -52,6 +52,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -52,6 +52,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_str_t *push_mode; ngx_str_t *push_mode;
ngx_flag_t polling, longpolling; ngx_flag_t polling, longpolling;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_push_stream_module_main_conf; ngx_http_push_stream_main_conf_t *mcf = ngx_http_push_stream_module_main_conf;
ngx_int_t rc;
// only accept GET method // only accept GET method
if (!(r->method & NGX_HTTP_GET)) { if (!(r->method & NGX_HTTP_GET)) {
...@@ -164,9 +165,14 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -164,9 +165,14 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
} }
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_registry_subscriber_locked(worker_subscriber); rc = ngx_http_push_stream_registry_subscriber_locked(worker_subscriber);
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
if (rc == NGX_ERROR) {
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
// adding subscriber to channel(s) and send backtrack messages // adding subscriber to channel(s) and send backtrack messages
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
...@@ -232,7 +238,10 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -232,7 +238,10 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
} }
worker_subscriber->longpolling = 1; worker_subscriber->longpolling = 1;
ngx_http_push_stream_registry_subscriber_locked(worker_subscriber); if (ngx_http_push_stream_registry_subscriber_locked(worker_subscriber) == NGX_ERROR) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
// adding subscriber to channel(s) // adding subscriber to channel(s)
cur = channels_ids; cur = channels_ids;
...@@ -241,12 +250,12 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -241,12 +250,12 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
// channel not found // channel not found
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", cur->id->data); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", cur->id->data);
return NGX_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
if ((subscription = ngx_http_push_stream_create_channel_subscription(r, channel, longpolling)) == NULL) { if ((subscription = ngx_http_push_stream_create_channel_subscription(r, channel, longpolling)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, cur->id, subscription, &worker_subscriber->subscriptions_sentinel, r->connection->log); ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, cur->id, subscription, &worker_subscriber->subscriptions_sentinel, r->connection->log);
...@@ -448,7 +457,7 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque ...@@ -448,7 +457,7 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
worker_subscriber->longpolling = 0; worker_subscriber->longpolling = 0;
worker_subscriber->request = r; worker_subscriber->request = r;
worker_subscriber->worker_subscribed_pid = ngx_pid; worker_subscriber->worker_subscribed_pid = ngx_pid;
worker_subscriber->expires = (ngx_http_push_stream_module_main_conf->subscriber_connection_ttl == NGX_CONF_UNSET) ? 0 : (ngx_time() + ngx_http_push_stream_module_main_conf->subscriber_connection_ttl); worker_subscriber->expires = (ngx_http_push_stream_module_main_conf->subscriber_connection_ttl == NGX_CONF_UNSET_MSEC) ? 0 : (ngx_time() + (ngx_http_push_stream_module_main_conf->subscriber_connection_ttl / 1000));
ngx_queue_init(&worker_subscriber->queue); ngx_queue_init(&worker_subscriber->queue);
ngx_queue_init(&worker_subscriber->subscriptions_sentinel.queue); ngx_queue_init(&worker_subscriber->subscriptions_sentinel.queue);
...@@ -474,18 +483,58 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque ...@@ -474,18 +483,58 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
return worker_subscriber; return worker_subscriber;
} }
static void static ngx_int_t
ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber) ngx_http_push_stream_registry_subscriber_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber)
{ {
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot; ngx_http_push_stream_worker_data_t *thisworker_data = data->ipc + ngx_process_slot;
ngx_http_push_stream_subscriber_ctx_t *ctx;
// adding subscriber to woker list of subscribers // adding subscriber to woker list of subscribers
ngx_queue_insert_tail(&thisworker_data->worker_subscribers_sentinel->queue, &worker_subscriber->queue); ngx_queue_insert_tail(&thisworker_data->worker_subscribers_sentinel->queue, &worker_subscriber->queue);
if ((ngx_http_push_stream_module_main_conf->subscriber_connection_ttl != NGX_CONF_UNSET_MSEC) ||
(ngx_http_push_stream_module_main_conf->ping_message_interval != NGX_CONF_UNSET_MSEC)) {
if ((ctx = ngx_pcalloc(worker_subscriber->request->pool, sizeof(ngx_http_push_stream_subscriber_ctx_t))) == NULL) {
return NGX_ERROR;
}
ctx->longpolling = worker_subscriber->longpolling;
if (ngx_http_push_stream_module_main_conf->subscriber_connection_ttl != NGX_CONF_UNSET_MSEC) {
if ((ctx->disconnect_timer = ngx_pcalloc(worker_subscriber->request->pool, sizeof(ngx_event_t))) == NULL) {
return NGX_ERROR;
}
}
if ((!ctx->longpolling) && (ngx_http_push_stream_module_main_conf->ping_message_interval != NGX_CONF_UNSET_MSEC)) {
if ((ctx->ping_timer = ngx_pcalloc(worker_subscriber->request->pool, sizeof(ngx_event_t))) == NULL) {
return NGX_ERROR;
}
}
if (ctx->disconnect_timer != NULL) {
ctx->disconnect_timer->handler = ngx_http_push_stream_disconnect_timer_wake_handler;
ctx->disconnect_timer->data = worker_subscriber->request;
ctx->disconnect_timer->log = worker_subscriber->request->connection->log;
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->subscriber_connection_ttl, ctx->disconnect_timer);
}
if (ctx->ping_timer != NULL) {
ctx->ping_timer->handler = ngx_http_push_stream_ping_timer_wake_handler;
ctx->ping_timer->data = worker_subscriber->request;
ctx->ping_timer->log = worker_subscriber->request->connection->log;
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->ping_message_interval, ctx->ping_timer);
}
ngx_http_set_ctx(worker_subscriber->request, ctx, ngx_http_push_stream_module);
}
// increment global subscribers count // increment global subscribers count
data->subscribers++; data->subscribers++;
thisworker_data->subscribers++; thisworker_data->subscribers++;
return NGX_OK;
} }
static ngx_flag_t static ngx_flag_t
......
...@@ -732,15 +732,35 @@ ngx_http_push_stream_timer_reset(ngx_msec_t timer_interval, ngx_event_t *timer_e ...@@ -732,15 +732,35 @@ ngx_http_push_stream_timer_reset(ngx_msec_t timer_interval, ngx_event_t *timer_e
static void static void
ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev) ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
{ {
ngx_http_push_stream_alert_worker_send_ping(ngx_pid, ngx_process_slot, ngx_cycle->log); ngx_http_request_t *r = (ngx_http_request_t *) ev->data;
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->ping_message_interval, &ngx_http_push_stream_ping_event); ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_int_t rc;
if (pslcf->eventsource_support) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.len, 0);
} else {
rc = ngx_http_push_stream_send_response_message(r, NULL, ngx_http_push_stream_ping_msg);
}
if (rc == NGX_ERROR) {
ngx_http_push_stream_send_response_finalize(r);
} else {
ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->ping_message_interval, ctx->ping_timer);
}
} }
static void static void
ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev) ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev)
{ {
ngx_http_push_stream_alert_worker_disconnect_subscribers(ngx_pid, ngx_process_slot, ngx_cycle->log); ngx_http_request_t *r = (ngx_http_request_t *) ev->data;
ngx_http_push_stream_timer_reset(ngx_http_push_stream_module_main_conf->subscriber_disconnect_interval, &ngx_http_push_stream_disconnect_event); ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
if (ctx->longpolling) {
ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(r);
} else {
ngx_http_push_stream_send_response_finalize(r);
}
} }
static void static void
...@@ -844,6 +864,17 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worke ...@@ -844,6 +864,17 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worke
{ {
ngx_http_push_stream_subscription_t *cur, *sentinel; ngx_http_push_stream_subscription_t *cur, *sentinel;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(worker_subscriber->request, ngx_http_push_stream_module);
if (ctx != NULL) {
if ((ctx->disconnect_timer != NULL) && ctx->disconnect_timer->timer_set) {
ngx_del_timer(ctx->disconnect_timer);
}
if ((ctx->ping_timer != NULL) && ctx->ping_timer->timer_set) {
ngx_del_timer(ctx->ping_timer);
}
}
sentinel = &worker_subscriber->subscriptions_sentinel; sentinel = &worker_subscriber->subscriptions_sentinel;
......
...@@ -26,7 +26,7 @@ class TestSubscriberConnectionCleanup < Test::Unit::TestCase ...@@ -26,7 +26,7 @@ class TestSubscriberConnectionCleanup < Test::Unit::TestCase
sub.callback { sub.callback {
stop = Time.now stop = Time.now
elapsed = time_diff_sec(start, stop) elapsed = time_diff_sec(start, stop)
assert(elapsed >= 38 && elapsed <= 39.5, "Disconnect was in #{elapsed} seconds") assert(elapsed >= 37 && elapsed <= 37.5, "Disconnect was in #{elapsed} seconds")
assert(response.include?(@footer_template), "Didn't received footer template") assert(response.include?(@footer_template), "Didn't received footer template")
EventMachine.stop EventMachine.stop
} }
...@@ -57,7 +57,7 @@ class TestSubscriberConnectionCleanup < Test::Unit::TestCase ...@@ -57,7 +57,7 @@ class TestSubscriberConnectionCleanup < Test::Unit::TestCase
sub.callback { sub.callback {
stop = Time.now stop = Time.now
elapsed = time_diff_sec(start, stop) elapsed = time_diff_sec(start, stop)
assert(elapsed >= 38 && elapsed <= 39.5, "Disconnect was in #{elapsed} seconds") assert(elapsed >= 37 && elapsed <= 37.5, "Disconnect was in #{elapsed} seconds")
assert_equal(7, chunksReceived, "Received #{chunksReceived} chunks") assert_equal(7, chunksReceived, "Received #{chunksReceived} chunks")
EventMachine.stop EventMachine.stop
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment