Commit 9b9dee58 authored by Wandenberg's avatar Wandenberg

using distributed lock with different responsibilities

parent a61b7a0c
......@@ -134,7 +134,6 @@ typedef struct {
typedef struct {
ngx_rbtree_node_t node;
ngx_queue_t queue;
ngx_queue_t *queue_sentinel;
ngx_str_t id;
ngx_uint_t last_message_id;
time_t last_message_time;
......@@ -147,6 +146,7 @@ typedef struct {
ngx_flag_t deleted;
ngx_flag_t wildcard;
ngx_http_push_stream_msg_t *channel_deleted_message;
ngx_shmtx_t *mutex;
} ngx_http_push_stream_channel_t;
typedef struct {
......@@ -242,9 +242,17 @@ struct ngx_http_push_stream_shm_data_s {
ngx_uint_t stored_messages; // # of messages being stored
ngx_uint_t subscribers; // # of subscribers in all channels
ngx_queue_t messages_trash;
ngx_shmtx_t messages_trash_mutex;
ngx_shmtx_sh_t messages_trash_lock;
ngx_queue_t channels_queue;
ngx_shmtx_t channels_queue_mutex;
ngx_shmtx_sh_t channels_queue_lock;
ngx_queue_t channels_trash;
ngx_shmtx_t channels_trash_mutex;
ngx_shmtx_sh_t channels_trash_lock;
ngx_queue_t channels_to_delete;
ngx_shmtx_t channels_to_delete_mutex;
ngx_shmtx_sh_t channels_to_delete_lock;
ngx_uint_t channels_in_trash; // # of channels in trash queue
ngx_uint_t messages_in_trash; // # of messages in trash queue
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
......@@ -256,6 +264,11 @@ struct ngx_http_push_stream_shm_data_s {
ngx_shm_zone_t *shm_zone;
ngx_slab_pool_t *shpool;
ngx_uint_t slots_for_census;
ngx_uint_t mutex_round_robin;
ngx_shmtx_t channels_mutex[10];
ngx_shmtx_sh_t channels_lock[10];
ngx_shmtx_t cleanup_mutex;
ngx_shmtx_sh_t cleanup_lock;
};
ngx_shm_zone_t *ngx_http_push_stream_global_shm_zone = NULL;
......@@ -389,6 +402,9 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOWED_HEADERS = ngx_string("If-Mo
#define NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(counter) \
(counter = (counter > 1) ? counter - 1 : 0)
#define NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(counter, qtd) \
(counter = (counter > qtd) ? counter - qtd : 0)
#define NGX_HTTP_PUSH_STREAM_TIME_FMT_LEN 30 //sizeof("Mon, 28 Sep 1970 06:00:00 GMT")
......
......@@ -58,7 +58,7 @@ static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int
#define ngx_http_push_stream_alert_worker_delete_channel(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL)
#define ngx_http_push_stream_alert_worker_shutting_down_cleanup(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CLEANUP_SHUTTING_DOWN)
static ngx_int_t ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
static ngx_int_t ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
static ngx_int_t ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers);
static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle);
......
......@@ -226,7 +226,7 @@ ngx_event_t ngx_http_push_stream_memory_cleanup_event;
ngx_event_t ngx_http_push_stream_buffer_cleanup_event;
// general request handling
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_main_conf_t *mcf, u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool);
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_conf_t *mcf, u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_only_added_headers(ngx_http_request_t *r);
static void ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modified_time, ngx_int_t tag, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_get_last_received_message_values(ngx_http_request_t *r, time_t *if_modified_since, ngx_int_t *tag, ngx_str_t **last_event_id);
......@@ -266,17 +266,17 @@ static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_in
#define ngx_http_push_stream_memory_cleanup_timer_set(void) ngx_http_push_stream_timer_set(NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_INTERVAL, &ngx_http_push_stream_memory_cleanup_event, ngx_http_push_stream_memory_cleanup_timer_wake_handler, 1);
#define ngx_http_push_stream_buffer_cleanup_timer_set(pslcf) ngx_http_push_stream_timer_set(NGX_HTTP_PUSH_STREAM_MESSAGE_BUFFER_CLEANUP_INTERVAL, &ngx_http_push_stream_buffer_cleanup_event, ngx_http_push_stream_buffer_timer_wake_handler, pslcf->store_messages);
static void ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subscriber_t *worker_subscriber);
static void ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_subscriber_t *worker_subscriber);
static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len);
static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg, ngx_http_push_stream_shm_data_t *data);
static void ngx_http_push_stream_throw_the_message_away(ngx_http_push_stream_msg_t *msg, ngx_http_push_stream_shm_data_t *data);
static ngx_flag_t ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_collect_expired_messages_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_flag_t force);
static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_free_worker_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_worker_msg_t *worker_msg);
static void ngx_http_push_stream_free_message_memory(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_free_worker_message_memory(ngx_slab_pool_t *shpool, ngx_http_push_stream_worker_msg_t *worker_msg);
static ngx_int_t ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force);
static ngx_inline void ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_shm_data_t *data, ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired);
ngx_uint_t ngx_http_push_stream_ensure_qtd_of_messages(ngx_http_push_stream_shm_data_t *data, ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired);
static ngx_inline void ngx_http_push_stream_delete_worker_channel(void);
static ngx_http_push_stream_content_subtype_t * ngx_http_push_stream_match_channel_info_format_and_content_type(ngx_http_request_t *r, ngx_uint_t default_subtype);
......@@ -298,4 +298,6 @@ static ngx_int_t ngx_http_push_stream_set_expires(ngx_http_request_t
ngx_http_push_stream_requested_channel_t *ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_pool_t *pool);
ngx_int_t ngx_http_push_stream_create_shmtx(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_UTILS_H_ */
......@@ -648,7 +648,7 @@ shared_examples_for "statistics location" do
pub.callback do
pub.should be_http_status(200).without_body
sleep(2)
sleep(5)
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_3.callback do
......
......@@ -12,7 +12,7 @@ Signal.trap("CLD", "IGNORE")
RSpec.configure do |config|
config.after(:each) do
non_time_wait_connections = `netstat -an | grep ":#{nginx_port} " | grep -v TIME_WAIT | grep -v LISTEN`.chomp.split("\n")
non_time_wait_connections = `netstat -an | grep ":#{nginx_port} " | grep -v TIME_WAIT | grep -v LISTEN | grep -v ESTABLISHED`.chomp.split("\n")
abort "There are sockects on non time wait state: #{non_time_wait_connections.join("\n")}" if non_time_wait_connections.count > 0
NginxTestHelper::Config.delete_config_and_log_files(config_id) if has_passed?
end
......
......@@ -200,13 +200,12 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_queue_t queue_channel_info;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_queue_t *q;
ngx_http_push_stream_channel_t *channel;
ngx_queue_init(&queue_channel_info);
ngx_shmtx_lock(&shpool->mutex);
ngx_shmtx_lock(&data->channels_queue_mutex);
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
......@@ -226,7 +225,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
}
}
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_unlock(&data->channels_queue_mutex);
return ngx_http_push_stream_send_response_channels_info(r, &queue_channel_info);
}
......@@ -235,8 +234,6 @@ static ngx_int_t
ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channels) {
ngx_str_t *text;
ngx_queue_t queue_channel_info;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_content_subtype_t *subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
ngx_http_push_stream_channel_info_t *channel_info;
ngx_http_push_stream_requested_channel_t *requested_channel;
......@@ -245,7 +242,6 @@ ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r,
ngx_queue_init(&queue_channel_info);
ngx_shmtx_lock(&shpool->mutex);
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
......@@ -260,7 +256,6 @@ ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r,
qtd_channels++;
}
}
ngx_shmtx_unlock(&shpool->mutex);
if (qtd_channels == 0) {
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_FOUND, NULL);
......
......@@ -183,19 +183,21 @@ ngx_http_push_stream_alert_shutting_down_workers(void)
static ngx_int_t
ngx_http_push_stream_unsubscribe_worker_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
ngx_http_push_stream_unsubscribe_worker(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *q;
ngx_shmtx_lock(channel->mutex);
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
if ((worker->pid == ngx_pid) || (worker->slot == ngx_process_slot)) {
ngx_queue_remove(&worker->queue);
ngx_slab_free_locked(shpool, worker);
ngx_slab_free(shpool, worker);
break;
}
}
ngx_shmtx_unlock(channel->mutex);
return NGX_OK;
}
......@@ -209,21 +211,20 @@ ngx_http_push_stream_clean_worker_data(ngx_http_push_stream_shm_data_t *data)
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_worker_msg_t *worker_msg;
ngx_shmtx_lock(&shpool->mutex);
while (!ngx_queue_empty(&data->ipc[ngx_process_slot].messages_queue)) {
cur = ngx_queue_head(&data->ipc[ngx_process_slot].messages_queue);
worker_msg = ngx_queue_data(cur, ngx_http_push_stream_worker_msg_t, queue);
ngx_http_push_stream_free_worker_message_memory_locked(shpool, worker_msg);
ngx_http_push_stream_free_worker_message_memory(shpool, worker_msg);
}
ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_queue);
ngx_shmtx_lock(&data->channels_queue_mutex);
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
ngx_http_push_stream_unsubscribe_worker_locked(channel, shpool);
ngx_http_push_stream_unsubscribe_worker(channel, shpool);
}
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_unlock(&data->channels_queue_mutex);
data->ipc[ngx_process_slot].pid = NGX_INVALID_FILE;
data->ipc[ngx_process_slot].subscribers = 0;
......@@ -321,19 +322,22 @@ ngx_http_push_stream_census_worker_subscribers_data(ngx_http_push_stream_shm_dat
ngx_queue_t *q, *cur, *cur_worker;
int i;
ngx_shmtx_lock(&shpool->mutex);
thisworker_data->subscribers = 0;
ngx_shmtx_lock(&data->channels_queue_mutex);
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_channel_t *channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
ngx_shmtx_lock(channel->mutex);
for (cur_worker = ngx_queue_head(&channel->workers_with_subscribers); cur_worker != ngx_queue_sentinel(&channel->workers_with_subscribers); cur_worker = ngx_queue_next(cur_worker)) {
ngx_http_push_stream_pid_queue_t *worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
if (worker->pid == ngx_pid) {
worker->subscribers = 0;
}
}
ngx_shmtx_unlock(channel->mutex);
}
ngx_shmtx_unlock(&data->channels_queue_mutex);
for (q = ngx_queue_head(&thisworker_data->subscribers_queue); q != ngx_queue_sentinel(&thisworker_data->subscribers_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_subscriber_t *subscriber = ngx_queue_data(q, ngx_http_push_stream_subscriber_t, worker_queue);
......@@ -345,26 +349,35 @@ ngx_http_push_stream_census_worker_subscribers_data(ngx_http_push_stream_shm_dat
thisworker_data->subscribers++;
}
ngx_shmtx_lock(&shpool->mutex);
data->slots_for_census--;
ngx_shmtx_unlock(&shpool->mutex);
if (data->slots_for_census == 0) {
ngx_shmtx_lock(&shpool->mutex);
data->subscribers = 0;
for (i = 0; i < NGX_MAX_PROCESSES; i++) {
if (data->ipc[i].pid > 0) {
data->subscribers += data->ipc[i].subscribers;
}
}
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_lock(&data->channels_queue_mutex);
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_channel_t *channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
ngx_shmtx_lock(channel->mutex);
channel->subscribers = 0;
for (cur_worker = ngx_queue_head(&channel->workers_with_subscribers); cur_worker != ngx_queue_sentinel(&channel->workers_with_subscribers); cur_worker = ngx_queue_next(cur_worker)) {
ngx_http_push_stream_pid_queue_t *worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
channel->subscribers += worker->subscribers;
}
ngx_shmtx_unlock(channel->mutex);
}
ngx_shmtx_unlock(&data->channels_queue_mutex);
}
ngx_shmtx_unlock(&shpool->mutex);
}
......@@ -400,38 +413,37 @@ ngx_http_push_stream_process_worker_message_data(ngx_http_push_stream_shm_data_t
// that's quite bad you see. a previous worker died with an undelivered message.
// but all its subscribers' connections presumably got canned, too. so it's not so bad after all.
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: worker %i intercepted a message intended for another worker process (%i) that probably died", ngx_pid, worker_msg->pid);
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: worker %i intercepted a message intended for another worker process (%i) that probably died and will remove the reference to the old worker", ngx_pid, worker_msg->pid);
// delete that invalid sucker
ngx_shmtx_lock(worker_msg->channel->mutex);
for (q = ngx_queue_head(&worker_msg->channel->workers_with_subscribers); q != ngx_queue_sentinel(&worker_msg->channel->workers_with_subscribers); q = ngx_queue_next(q)) {
ngx_http_push_stream_pid_queue_t *worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
if (worker->pid == worker_msg->pid) {
ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0, "push stream module: reference to worker %i will be removed", worker_msg->pid);
ngx_shmtx_lock(&shpool->mutex);
ngx_queue_remove(&worker->queue);
ngx_slab_free_locked(shpool, worker);
ngx_shmtx_unlock(&shpool->mutex);
ngx_slab_free(shpool, worker);
break;
}
}
ngx_shmtx_unlock(worker_msg->channel->mutex);
}
// free worker_msg already sent
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_free_worker_message_memory_locked(shpool, worker_msg);
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_push_stream_free_worker_message_memory(shpool, worker_msg);
}
}
static ngx_int_t
ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
{
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_worker_data_t *thisworker_data = mcf->shm_data->ipc + worker_slot;
ngx_http_push_stream_worker_msg_t *newmessage;
ngx_shmtx_lock(&shpool->mutex);
if ((newmessage = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_msg_t))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate worker message, pid: %P, slot: %d", pid, worker_slot);
return NGX_ERROR;
}
......@@ -444,6 +456,7 @@ ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *
newmessage->mcf = mcf;
*queue_was_empty = ngx_queue_empty(&thisworker_data->messages_queue);
ngx_queue_insert_tail(&thisworker_data->messages_queue, &newmessage->queue);
ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK;
}
......@@ -456,15 +469,14 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
// in shared memory, identified by pid.
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *q;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_flag_t queue_was_empty[NGX_MAX_PROCESSES];
ngx_shmtx_lock(&shpool->mutex);
ngx_shmtx_lock(channel->mutex);
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
ngx_http_push_stream_send_worker_message_locked(channel, &worker->subscriptions, worker->pid, worker->slot, msg, &queue_was_empty[worker->slot], log, mcf);
ngx_http_push_stream_send_worker_message(channel, &worker->subscriptions, worker->pid, worker->slot, msg, &queue_was_empty[worker->slot], log, mcf);
}
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_unlock(channel->mutex);
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
......@@ -475,9 +487,7 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
}
if (ngx_queue_empty(&msg->queue)) {
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_mark_message_to_delete_locked(msg, mcf->shm_data);
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_push_stream_throw_the_message_away(msg, mcf->shm_data);
}
}
......
......@@ -99,7 +99,6 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
// create the channel if doesn't exist
requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, cf, mcf);
if (requested_channel->channel == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for new channel");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL);
}
......
......@@ -1077,5 +1077,35 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
ngx_queue_insert_tail(&global_shm_data->shm_datas_queue, &d->shm_data_queue);
if (ngx_http_push_stream_create_shmtx(&d->messages_trash_mutex, &d->messages_trash_lock, (u_char *) "push_stream_messages_trash") != NGX_OK) {
return NGX_ERROR;
}
if (ngx_http_push_stream_create_shmtx(&d->channels_queue_mutex, &d->channels_queue_lock, (u_char *) "push_stream_channels_queue") != NGX_OK) {
return NGX_ERROR;
}
if (ngx_http_push_stream_create_shmtx(&d->channels_to_delete_mutex, &d->channels_to_delete_lock, (u_char *) "push_stream_channels_to_delete") != NGX_OK) {
return NGX_ERROR;
}
if (ngx_http_push_stream_create_shmtx(&d->channels_trash_mutex, &d->channels_trash_lock, (u_char *) "push_stream_channels_trash") != NGX_OK) {
return NGX_ERROR;
}
if (ngx_http_push_stream_create_shmtx(&d->cleanup_mutex, &d->cleanup_lock, (u_char *) "push_stream_cleanup") != NGX_OK) {
return NGX_ERROR;
}
u_char lock_name[25];
for (i = 0; i < 10; i++) {
ngx_sprintf(lock_name, "push_stream_channels_%d", i);
if (ngx_http_push_stream_create_shmtx(&d->channels_mutex[i], &d->channels_lock[i], lock_name) != NGX_OK) {
return NGX_ERROR;
}
}
d->mutex_round_robin = 0;
return NGX_OK;
}
......@@ -27,12 +27,12 @@
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool);
static ngx_http_push_stream_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_subscriber_t *worker_subscriber);
static ngx_int_t ngx_http_push_stream_registry_subscriber(ngx_http_request_t *r, ngx_http_push_stream_subscriber_t *worker_subscriber);
static ngx_flag_t ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static void ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id);
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_get_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_log_t *log);
static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber);
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscription_t *subscription, ngx_queue_t *subscriptions, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscription_t *subscription, ngx_queue_t *subscriptions, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool);
static ngx_http_push_stream_padding_t *ngx_http_push_stream_get_padding_by_user_agent(ngx_http_request_t *r);
void ngx_http_push_stream_websocket_reading(ngx_http_request_t *r);
......@@ -42,7 +42,6 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_requested_channel_t *requested_channels, *requested_channel;
ngx_queue_t *q;
......@@ -52,7 +51,6 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_str_t *last_event_id = NULL;
ngx_str_t *push_mode;
ngx_flag_t polling, longpolling;
ngx_int_t rc;
ngx_int_t status_code;
ngx_str_t *explain_error_message;
ngx_str_t vv_allowed_origins = ngx_null_string;
......@@ -128,11 +126,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_shmtx_lock(&shpool->mutex);
rc = ngx_http_push_stream_registry_subscriber_locked(r, worker_subscriber);
ngx_shmtx_unlock(&shpool->mutex);
if (rc == NGX_ERROR) {
if (ngx_http_push_stream_registry_subscriber(r, worker_subscriber) == NGX_ERROR) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
......@@ -181,8 +175,6 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
greater_message_tag = tag;
greater_message_time = (if_modified_since < 0) ? 0 : if_modified_since;
ngx_shmtx_lock(&shpool->mutex);
// check if has any message to send
for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
......@@ -204,13 +196,11 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
if (longpolling && !has_message_to_send) {
// long polling mode without messages
if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
worker_subscriber->longpolling = 1;
if (ngx_http_push_stream_registry_subscriber_locked(r, worker_subscriber) == NGX_ERROR) {
ngx_shmtx_unlock(&shpool->mutex);
if (ngx_http_push_stream_registry_subscriber(r, worker_subscriber) == NGX_ERROR) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
......@@ -219,19 +209,15 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
if ((subscription = ngx_http_push_stream_create_channel_subscription(r, requested_channel->channel, worker_subscriber)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->channel, subscription, &worker_subscriber->subscriptions, r->connection->log);
ngx_http_push_stream_assing_subscription_to_channel(shpool, requested_channel->channel, subscription, &worker_subscriber->subscriptions, r->connection->log);
}
ngx_shmtx_unlock(&shpool->mutex);
return NGX_DONE;
}
ngx_shmtx_unlock(&shpool->mutex);
// polling or long polling with messages to send
ngx_http_push_stream_add_polling_headers(r, greater_message_time, greater_message_tag, temp_pool);
......@@ -280,7 +266,6 @@ static ngx_int_t
ngx_http_push_stream_subscriber_assign_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_subscription_t *subscription;
ngx_int_t result;
ngx_slab_pool_t *shpool = mcf->shpool;
if ((subscription = ngx_http_push_stream_create_channel_subscription(r, requested_channel->channel, subscriber)) == NULL) {
......@@ -290,11 +275,7 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_http_push_stream_main_conf_t
// send old messages to new subscriber
ngx_http_push_stream_send_old_messages(r, requested_channel->channel, requested_channel->backtrack_messages, if_modified_since, tag, 0, -1, last_event_id);
ngx_shmtx_lock(&shpool->mutex);
result = ngx_http_push_stream_assing_subscription_to_channel_locked(shpool, requested_channel->channel, subscription, &subscriber->subscriptions, r->connection->log);
ngx_shmtx_unlock(&shpool->mutex);
return result;
return ngx_http_push_stream_assing_subscription_to_channel(shpool, requested_channel->channel, subscription, &subscriber->subscriptions, r->connection->log);
}
......@@ -398,10 +379,10 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
return NULL;
}
ngx_queue_init(&worker_subscriber->worker_queue);
worker_subscriber->longpolling = 0;
worker_subscriber->request = r;
worker_subscriber->worker_subscribed_pid = ngx_pid;
ngx_queue_init(&worker_subscriber->worker_queue);
ngx_queue_init(&worker_subscriber->subscriptions);
ctx->subscriber = worker_subscriber;
......@@ -426,7 +407,7 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
}
static ngx_int_t
ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_push_stream_subscriber_t *worker_subscriber)
ngx_http_push_stream_registry_subscriber(ngx_http_request_t *r, ngx_http_push_stream_subscriber_t *worker_subscriber)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
......@@ -434,6 +415,7 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_
ngx_http_push_stream_worker_data_t *thisworker_data = &data->ipc[ngx_process_slot];
ngx_msec_t connection_ttl = worker_subscriber->longpolling ? cf->longpolling_connection_ttl : cf->subscriber_connection_ttl;
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
// adding subscriber to worker list of subscribers
ngx_queue_insert_tail(&thisworker_data->subscribers_queue, &worker_subscriber->worker_queue);
......@@ -471,7 +453,9 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_
}
// increment global subscribers count
ngx_shmtx_lock(&shpool->mutex);
data->subscribers++;
ngx_shmtx_unlock(&shpool->mutex);
thisworker_data->subscribers++;
return NGX_OK;
......@@ -490,6 +474,7 @@ ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *ch
old_messages = 1;
} else if ((last_event_id != NULL) || (if_modified_since >= 0)) {
ngx_flag_t found = 0;
ngx_shmtx_lock(channel->mutex);
for (q = ngx_queue_head(&channel->message_queue); q != ngx_queue_sentinel(&channel->message_queue); q = ngx_queue_next(q)) {
message = ngx_queue_data(q, ngx_http_push_stream_msg_t, queue);
if (message->deleted) {
......@@ -513,6 +498,7 @@ ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *ch
break;
}
}
ngx_shmtx_unlock(channel->mutex);
}
}
return old_messages;
......@@ -528,6 +514,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
if (backtrack > 0) {
ngx_uint_t qtd = (backtrack > channel->stored_messages) ? channel->stored_messages : backtrack;
ngx_uint_t start = channel->stored_messages - qtd;
ngx_shmtx_lock(channel->mutex);
// positioning at first message, and send the others
for (q = ngx_queue_head(&channel->message_queue); (qtd > 0) && q != ngx_queue_sentinel(&channel->message_queue); q = ngx_queue_next(q)) {
message = ngx_queue_data(q, ngx_http_push_stream_msg_t, queue);
......@@ -542,8 +529,10 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
start--;
}
}
ngx_shmtx_unlock(channel->mutex);
} else if ((last_event_id != NULL) || (if_modified_since >= 0)) {
ngx_flag_t found = 0;
ngx_shmtx_lock(channel->mutex);
for (q = ngx_queue_head(&channel->message_queue); q != ngx_queue_sentinel(&channel->message_queue); q = ngx_queue_next(q)) {
message = ngx_queue_data(q, ngx_http_push_stream_msg_t, queue);
if (message->deleted) {
......@@ -574,16 +563,25 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
ngx_http_push_stream_send_response_message(r, channel, message, 0, send_separator);
}
}
ngx_shmtx_unlock(channel->mutex);
}
}
}
static ngx_http_push_stream_pid_queue_t *
ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
ngx_http_push_stream_get_worker_subscriber_channel_sentinel_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_log_t *log)
{
ngx_http_push_stream_pid_queue_t *worker_sentinel;
ngx_queue_t *q;
if ((worker_sentinel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_pid_queue_t))) == NULL) {
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
worker_sentinel = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
if (worker_sentinel->pid == ngx_pid) {
return worker_sentinel;
}
}
if ((worker_sentinel = ngx_slab_alloc(shpool, sizeof(ngx_http_push_stream_pid_queue_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate worker subscriber queue marker in shared memory");
return NULL;
}
......@@ -619,26 +617,16 @@ ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http
}
static ngx_int_t
ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscription_t *subscription, ngx_queue_t *subscriptions, ngx_log_t *log)
ngx_http_push_stream_assing_subscription_to_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscription_t *subscription, ngx_queue_t *subscriptions, ngx_log_t *log)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(subscription->subscriber->request, ngx_http_push_stream_module);
ngx_queue_t *q;
ngx_http_push_stream_pid_queue_t *worker, *worker_subscribers_sentinel = NULL;
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
if (worker->pid == ngx_pid) {
worker_subscribers_sentinel = worker;
break;
}
}
ngx_http_push_stream_pid_queue_t *worker_subscribers_sentinel;
if (worker_subscribers_sentinel == NULL) { // found nothing
worker_subscribers_sentinel = ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(shpool, channel, log, mcf);
if (worker_subscribers_sentinel == NULL) {
ngx_shmtx_lock(channel->mutex);
if ((worker_subscribers_sentinel = ngx_http_push_stream_get_worker_subscriber_channel_sentinel_locked(shpool, channel, log)) == NULL) {
ngx_shmtx_unlock(channel->mutex);
return NGX_ERROR;
}
}
channel->subscribers++; // do this only when we know everything went okay
worker_subscribers_sentinel->subscribers++;
......@@ -646,6 +634,7 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
ngx_queue_insert_tail(subscriptions, &subscription->queue);
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscriptions, &subscription->channel_worker_queue);
subscription->channel_worker_sentinel = worker_subscribers_sentinel;
ngx_shmtx_unlock(channel->mutex);
return NGX_OK;
}
......
......@@ -25,7 +25,7 @@
#include <ngx_http_push_stream_module_utils.h>
static void nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel);
static void nxg_http_push_stream_free_channel_memory(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel);
static void ngx_http_push_stream_run_cleanup_pool_handler(ngx_pool_t *p, ngx_pool_cleanup_pt handler);
static void ngx_http_push_stream_cleanup_request_context(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_response_padding(ngx_http_request_t *r, size_t len, ngx_flag_t sending_header);
......@@ -36,16 +36,18 @@ static ngx_inline void ngx_http_push_stream_cleanup_shutting_down_worker_data(ng
static void ngx_http_push_stream_flush_pending_output(ngx_http_request_t *r);
static ngx_inline void
ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_shm_data_t *data, ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired)
ngx_uint_t
ngx_http_push_stream_ensure_qtd_of_messages(ngx_http_push_stream_shm_data_t *data, ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired)
{
ngx_http_push_stream_msg_t *msg;
ngx_queue_t *q;
ngx_uint_t qtd_removed = 0;
if (max_messages == NGX_CONF_UNSET_UINT) {
return;
return qtd_removed;
}
ngx_shmtx_lock(channel->mutex);
while (!ngx_queue_empty(&channel->message_queue) && ((channel->stored_messages > max_messages) || expired)) {
q = ngx_queue_head(&channel->message_queue);
msg = ngx_queue_data(q, ngx_http_push_stream_msg_t, queue);
......@@ -54,11 +56,14 @@ ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_shm_data
break;
}
qtd_removed++;
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->stored_messages);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->stored_messages);
ngx_queue_remove(&msg->queue);
ngx_http_push_stream_mark_message_to_delete_locked(msg, data);
ngx_http_push_stream_throw_the_message_away(msg, data);
}
ngx_shmtx_unlock(channel->mutex);
return qtd_removed;
}
......@@ -77,20 +82,17 @@ ngx_http_push_stream_delete_channels(void)
void
ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker, *cur;
ngx_queue_t *q;
ngx_shmtx_lock(&data->channels_to_delete_mutex);
for (q = ngx_queue_head(&data->channels_to_delete); q != ngx_queue_sentinel(&data->channels_to_delete); q = ngx_queue_next(q)) {
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
if (channel->queue_sentinel != &data->channels_to_delete) {
q = &data->channels_to_delete;
continue;
}
ngx_shmtx_lock(channel->mutex);
// remove subscribers if any
if (channel->subscribers > 0) {
// find the current worker
......@@ -104,14 +106,12 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, channel_worker_queue);
ngx_http_push_stream_subscriber_t *subscriber = subscription->subscriber;
ngx_shmtx_lock(&shpool->mutex);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(channel->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(worker->subscribers);
// remove the subscription for the channel from subscriber
ngx_queue_remove(&subscription->queue);
// remove the subscription for the channel from worker
ngx_queue_remove(&subscription->channel_worker_queue);
ngx_shmtx_unlock(&shpool->mutex);
if (subscriber->longpolling) {
ngx_http_push_stream_add_polling_headers(subscriber->request, ngx_time(), 0, subscriber->request->pool);
......@@ -131,25 +131,44 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
}
}
}
ngx_shmtx_unlock(channel->mutex);
}
ngx_shmtx_unlock(&data->channels_to_delete_mutex);
}
ngx_shmtx_lock(&shpool->mutex);
void
ngx_http_push_stream_collect_deleted_channels_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *q;
ngx_uint_t qtd_removed;
ngx_shmtx_lock(&data->channels_to_delete_mutex);
for (q = ngx_queue_head(&data->channels_to_delete); q != ngx_queue_sentinel(&data->channels_to_delete);) {
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
q = ngx_queue_next(q);
// channel has not subscribers and can be released
// remove all messages
qtd_removed = ngx_http_push_stream_ensure_qtd_of_messages(data, channel, 0, 0);
if (qtd_removed > 0) {
ngx_shmtx_lock(&data->channels_queue_mutex);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->stored_messages, qtd_removed);
ngx_shmtx_unlock(&data->channels_queue_mutex);
}
// channel has no subscribers and can be released
if (channel->subscribers == 0) {
channel->expires = ngx_time() + NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL;
// move the channel to trash queue
ngx_queue_remove(&channel->queue);
ngx_shmtx_lock(&data->channels_trash_mutex);
ngx_queue_insert_tail(&data->channels_trash, &channel->queue);
channel->queue_sentinel = &data->channels_trash;
data->channels_in_trash++;
ngx_shmtx_unlock(&data->channels_trash_mutex);
}
}
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_unlock(&data->channels_to_delete_mutex);
}
......@@ -205,7 +224,7 @@ ngx_uint_t
ngx_http_push_stream_apply_text_template(ngx_str_t **dst_value, ngx_str_t **dst_message, ngx_str_t *text, const ngx_str_t *template, const ngx_str_t *token, ngx_slab_pool_t *shpool, ngx_pool_t *temp_pool)
{
if (text != NULL) {
if ((*dst_value = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + text->len + 1)) == NULL) {
if ((*dst_value = ngx_slab_alloc(shpool, sizeof(ngx_str_t) + text->len + 1)) == NULL) {
return NGX_ERROR;
}
......@@ -219,7 +238,7 @@ ngx_http_push_stream_apply_text_template(ngx_str_t **dst_value, ngx_str_t **dst_
return NGX_ERROR;
}
if (((*dst_message) = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + aux->len)) == NULL) {
if (((*dst_message) = ngx_slab_alloc(shpool, sizeof(ngx_str_t) + aux->len)) == NULL) {
return NGX_ERROR;
}
......@@ -232,7 +251,7 @@ ngx_http_push_stream_apply_text_template(ngx_str_t **dst_value, ngx_str_t **dst_
}
ngx_http_push_stream_msg_t *
ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_main_conf_t *mcf, u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool)
ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_conf_t *mcf, u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool)
{
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_shm_data_t *shm_data = mcf->shm_data;
......@@ -240,7 +259,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_m
ngx_http_push_stream_msg_t *msg;
int i = 0;
if ((msg = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_msg_t))) == NULL) {
if ((msg = ngx_slab_alloc(shpool, sizeof(ngx_http_push_stream_msg_t))) == NULL) {
return NULL;
}
......@@ -258,8 +277,8 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_m
msg->qtd_templates = mcf->qtd_templates;
ngx_queue_init(&msg->queue);
if ((msg->raw.data = ngx_slab_alloc_locked(shpool, len + 1)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
if ((msg->raw.data = ngx_slab_alloc(shpool, len + 1)) == NULL) {
ngx_http_push_stream_free_message_memory(shpool, msg);
return NULL;
}
......@@ -270,17 +289,17 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_m
if (ngx_http_push_stream_apply_text_template(&msg->event_id, &msg->event_id_message, event_id, &NGX_HTTP_PUSH_STREAM_EVENTSOURCE_ID_TEMPLATE, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID, shpool, temp_pool) != NGX_OK) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
ngx_http_push_stream_free_message_memory(shpool, msg);
return NULL;
}
if (ngx_http_push_stream_apply_text_template(&msg->event_type, &msg->event_type_message, event_type, &NGX_HTTP_PUSH_STREAM_EVENTSOURCE_EVENT_TEMPLATE, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE, shpool, temp_pool) != NGX_OK) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
ngx_http_push_stream_free_message_memory(shpool, msg);
return NULL;
}
if ((msg->formatted_messages = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) * msg->qtd_templates)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
if ((msg->formatted_messages = ngx_slab_alloc(shpool, sizeof(ngx_str_t) * msg->qtd_templates)) == NULL) {
ngx_http_push_stream_free_message_memory(shpool, msg);
return NULL;
}
......@@ -292,6 +311,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_m
ngx_queue_t *lines, *q_line;
if ((lines = ngx_http_push_stream_split_by_crlf(&msg->raw, temp_pool)) == NULL) {
ngx_http_push_stream_free_message_memory(shpool, msg);
return NULL;
}
......@@ -311,7 +331,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_m
}
if (aux == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
ngx_http_push_stream_free_message_memory(shpool, msg);
return NULL;
}
......@@ -321,8 +341,8 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_m
}
ngx_str_t *formmated = (msg->formatted_messages + i);
if ((text == NULL) || ((formmated->data = ngx_slab_alloc_locked(shpool, text->len)) == NULL)) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
if ((text == NULL) || ((formmated->data = ngx_slab_alloc(shpool, text->len)) == NULL)) {
ngx_http_push_stream_free_message_memory(shpool, msg);
return NULL;
}
......@@ -342,24 +362,22 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_http_push_str
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_msg_t *msg;
ngx_uint_t qtd_removed;
ngx_shmtx_lock(&shpool->mutex);
// create a buffer copy in shared mem
msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(mcf, text, len, channel, channel->last_message_id + 1, event_id, event_type, temp_pool);
msg = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, text, len, channel, channel->last_message_id + 1, event_id, event_type, temp_pool);
if (msg == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate message in shared memory");
return NGX_ERROR;
}
ngx_shmtx_lock(channel->mutex);
channel->last_message_id++;
data->published_messages++;
// tag message with time stamp and a sequence tag
channel->last_message_time = data->last_message_time = msg->time;
channel->last_message_tag = data->last_message_tag = msg->tag;
channel->last_message_time = msg->time;
channel->last_message_tag = msg->tag;
// set message expiration time
msg->expires = msg->time + mcf->message_ttl;
channel->expires = ngx_time() + mcf->channel_inactivity_time;
......@@ -368,13 +386,26 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_http_push_str
if (cf->store_messages) {
ngx_queue_insert_tail(&channel->message_queue, &msg->queue);
channel->stored_messages++;
data->stored_messages++;
}
ngx_shmtx_unlock(channel->mutex);
// now see if the queue is too big
ngx_http_push_stream_ensure_qtd_of_messages_locked(data, channel, mcf->max_messages_stored_per_channel, 0);
qtd_removed = ngx_http_push_stream_ensure_qtd_of_messages(data, channel, mcf->max_messages_stored_per_channel, 0);
ngx_shmtx_lock(&data->channels_queue_mutex);
data->published_messages++;
if (msg->time >= data->last_message_time) {
data->last_message_time = msg->time;
data->last_message_tag = msg->tag;
}
ngx_shmtx_unlock(&shpool->mutex);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->stored_messages, qtd_removed);
if (cf->store_messages) {
data->stored_messages++;
}
ngx_shmtx_unlock(&data->channels_queue_mutex);
// send an alert to workers
ngx_http_push_stream_broadcast(channel, msg, r->connection->log, mcf);
......@@ -792,7 +823,6 @@ static void
ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_request_t *r)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_cleanup_request_context);
......@@ -800,11 +830,9 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_
if (mcf->timeout_with_body && (mcf->longpooling_timeout_msg == NULL)) {
// create longpooling timeout message
ngx_shmtx_lock(&shpool->mutex);
if ((mcf->longpooling_timeout_msg == NULL) && (mcf->longpooling_timeout_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(mcf, (u_char *)NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT, ngx_strlen(NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT), NULL, NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
if ((mcf->longpooling_timeout_msg == NULL) && (mcf->longpooling_timeout_msg = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, (u_char *) NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT, ngx_strlen(NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT), NULL, NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate long pooling timeout message in shared memory");
}
ngx_shmtx_unlock(&shpool->mutex);
}
if (mcf->timeout_with_body && (mcf->longpooling_timeout_msg != NULL)) {
......@@ -837,30 +865,29 @@ ngx_http_push_stream_send_websocket_close_frame(ngx_http_request_t *r, ngx_uint_
static ngx_flag_t
ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_pool_t *temp_pool)
{
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *q;
ngx_flag_t deleted = 0;
ngx_shmtx_lock(&shpool->mutex);
ngx_shmtx_lock(&data->channels_queue_mutex);
if ((channel != NULL) && !channel->deleted) {
deleted = 1;
channel->deleted = 1;
(channel->wildcard) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->wildcard_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels);
// move the channel to unrecoverable queue
// remove channel from tree
ngx_rbtree_delete(&data->tree, &channel->node);
// move the channel to unrecoverable queue
ngx_queue_remove(&channel->queue);
ngx_queue_insert_tail(&data->channels_to_delete, &channel->queue);
channel->queue_sentinel = &data->channels_to_delete;
// remove all messages
ngx_http_push_stream_ensure_qtd_of_messages_locked(data, channel, 0, 0);
ngx_shmtx_lock(&data->channels_to_delete_mutex);
ngx_queue_insert_tail(&data->channels_to_delete, &channel->queue);
ngx_shmtx_unlock(&data->channels_to_delete_mutex);
// apply channel deleted message text to message template
if ((channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(mcf, text, len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, NULL, NULL, temp_pool)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
if ((channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, text, len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, NULL, NULL, temp_pool)) == NULL) {
ngx_shmtx_unlock(&data->channels_queue_mutex);
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to channel deleted message");
return 0;
}
......@@ -876,7 +903,7 @@ ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_h
}
}
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_unlock(&data->channels_queue_mutex);
return deleted;
}
......@@ -897,25 +924,18 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_flag_t forc
void
ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force)
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *q;
ngx_http_push_stream_collect_expired_messages_data(data, force);
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
ngx_shmtx_lock(&data->channels_queue_mutex);
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue);) {
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
if (channel->queue_sentinel != &data->channels_queue) {
q = &data->channels_queue;
continue;
}
q = ngx_queue_next(q);
if ((channel->stored_messages == 0) && (channel->subscribers == 0) && (channel->expires < ngx_time())) {
// go back one node on queue, since the current node will be removed
q = ngx_queue_prev(q);
ngx_shmtx_lock(&shpool->mutex);
if (!channel->deleted) {
channel->deleted = 1;
channel->expires = ngx_time() + NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL;
(channel->wildcard) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->wildcard_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels);
......@@ -923,74 +943,79 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(ngx_http_p
// move the channel to trash queue
ngx_rbtree_delete(&data->tree, &channel->node);
ngx_queue_remove(&channel->queue);
ngx_shmtx_lock(&data->channels_trash_mutex);
ngx_queue_insert_tail(&data->channels_trash, &channel->queue);
channel->queue_sentinel = &data->channels_trash;
data->channels_in_trash++;
}
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_unlock(&data->channels_trash_mutex);
}
}
ngx_shmtx_unlock(&data->channels_queue_mutex);
}
static void
ngx_http_push_stream_collect_expired_messages_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force)
{
ngx_slab_pool_t *shpool = data->shpool;
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *q;
ngx_uint_t qtd_removed;
ngx_shmtx_lock(&shpool->mutex);
ngx_shmtx_lock(&data->channels_queue_mutex);
for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
ngx_http_push_stream_ensure_qtd_of_messages_locked(data, channel, (force) ? 0 : channel->stored_messages, 1);
qtd_removed = ngx_http_push_stream_ensure_qtd_of_messages(data, channel, (force) ? 0 : channel->stored_messages, 1);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->stored_messages, qtd_removed);
}
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_unlock(&data->channels_queue_mutex);
}
static void
ngx_http_push_stream_free_memory_of_expired_channels_locked(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force)
ngx_http_push_stream_free_memory_of_expired_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force)
{
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *cur;
ngx_shmtx_lock(&data->channels_trash_mutex);
while (!ngx_queue_empty(&data->channels_trash)) {
cur = ngx_queue_head(&data->channels_trash);
channel = ngx_queue_data(cur, ngx_http_push_stream_channel_t, queue);
if ((ngx_time() > channel->expires) || force) {
ngx_queue_remove(&channel->queue);
nxg_http_push_stream_free_channel_memory_locked(shpool, channel);
nxg_http_push_stream_free_channel_memory(shpool, channel);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels_in_trash);
} else {
break;
}
}
ngx_shmtx_unlock(&data->channels_trash_mutex);
}
static void
nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel)
nxg_http_push_stream_free_channel_memory(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel)
{
// delete the worker-subscriber queue
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur;
ngx_shmtx_t *mutex = channel->mutex;
if (channel->channel_deleted_message != NULL) ngx_http_push_stream_free_message_memory(shpool, channel->channel_deleted_message);
ngx_shmtx_lock(mutex);
while (!ngx_queue_empty(&channel->workers_with_subscribers)) {
cur = ngx_queue_head(&channel->workers_with_subscribers);
worker = ngx_queue_data(cur, ngx_http_push_stream_pid_queue_t, queue);
ngx_queue_remove(&worker->queue);
ngx_slab_free_locked(shpool, worker);
ngx_slab_free(shpool, worker);
}
if (channel->channel_deleted_message != NULL) ngx_http_push_stream_free_message_memory_locked(shpool, channel->channel_deleted_message);
ngx_slab_free_locked(shpool, channel->id.data);
ngx_slab_free_locked(shpool, channel);
ngx_slab_free(shpool, channel->id.data);
ngx_slab_free(shpool, channel);
ngx_shmtx_unlock(mutex);
}
......@@ -1002,10 +1027,13 @@ ngx_http_push_stream_memory_cleanup(void)
for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
ngx_http_push_stream_delete_channels_data(data);
if (ngx_shmtx_trylock(&data->cleanup_mutex)) {
ngx_http_push_stream_collect_deleted_channels_data(data);
ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(data, 0);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels(0);
ngx_http_push_stream_free_memory_of_expired_messages_and_channels_data(data, 0);
ngx_shmtx_unlock(&data->cleanup_mutex);
}
}
return NGX_OK;
......@@ -1018,9 +1046,13 @@ ngx_http_push_stream_buffer_cleanup(void)
ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
ngx_queue_t *q;
for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
if (ngx_shmtx_trylock(&data->cleanup_mutex)) {
ngx_http_push_stream_collect_expired_messages_data(data, 0);
ngx_shmtx_unlock(&data->cleanup_mutex);
}
}
return NGX_OK;
......@@ -1049,26 +1081,26 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels_data(ngx_http_
ngx_http_push_stream_msg_t *message;
ngx_queue_t *cur;
ngx_shmtx_lock(&shpool->mutex);
ngx_shmtx_lock(&data->messages_trash_mutex);
while (!ngx_queue_empty(&data->messages_trash)) {
cur = ngx_queue_head(&data->messages_trash);
message = ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
if (force || ((message->workers_ref_count <= 0) && (ngx_time() > message->expires))) {
ngx_queue_remove(&message->queue);
ngx_http_push_stream_free_message_memory_locked(shpool, message);
ngx_http_push_stream_free_message_memory(shpool, message);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->messages_in_trash);
} else {
break;
}
}
ngx_http_push_stream_free_memory_of_expired_channels_locked(data, shpool, force);
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_unlock(&data->messages_trash_mutex);
ngx_http_push_stream_free_memory_of_expired_channels(data, shpool, force);
}
static void
ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg)
ngx_http_push_stream_free_message_memory(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg)
{
u_int i;
......@@ -1076,6 +1108,7 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt
return;
}
ngx_shmtx_lock(&shpool->mutex);
if (msg->formatted_messages != NULL) {
for (i = 0; i < msg->qtd_templates; i++) {
ngx_str_t *formmated = (msg->formatted_messages + i);
......@@ -1093,28 +1126,33 @@ ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_htt
if (msg->event_id_message != NULL) ngx_slab_free_locked(shpool, msg->event_id_message);
if (msg->event_type_message != NULL) ngx_slab_free_locked(shpool, msg->event_type_message);
ngx_slab_free_locked(shpool, msg);
ngx_shmtx_unlock(&shpool->mutex);
}
static void
ngx_http_push_stream_free_worker_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_worker_msg_t *worker_msg)
ngx_http_push_stream_free_worker_message_memory(ngx_slab_pool_t *shpool, ngx_http_push_stream_worker_msg_t *worker_msg)
{
ngx_shmtx_lock(&shpool->mutex);
worker_msg->msg->workers_ref_count--;
if ((worker_msg->msg->workers_ref_count <= 0) && worker_msg->msg->deleted) {
worker_msg->msg->expires = ngx_time() + NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL;
}
ngx_queue_remove(&worker_msg->queue);
ngx_slab_free_locked(shpool, worker_msg);
ngx_shmtx_unlock(&shpool->mutex);
}
static void
ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg, ngx_http_push_stream_shm_data_t *data)
ngx_http_push_stream_throw_the_message_away(ngx_http_push_stream_msg_t *msg, ngx_http_push_stream_shm_data_t *data)
{
ngx_shmtx_lock(&data->channels_trash_mutex);
msg->deleted = 1;
msg->expires = ngx_time() + NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL;
ngx_queue_insert_tail(&data->messages_trash, &msg->queue);
data->messages_in_trash++;
ngx_shmtx_unlock(&data->channels_trash_mutex);
}
......@@ -1122,18 +1160,12 @@ static void
ngx_http_push_stream_timer_set(ngx_msec_t timer_interval, ngx_event_t *event, ngx_event_handler_pt event_handler, ngx_flag_t start_timer)
{
if ((timer_interval != NGX_CONF_UNSET_MSEC) && start_timer) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_global_shm_zone->shm.addr;
if (event->handler == NULL) {
ngx_shmtx_lock(&shpool->mutex);
if (event->handler == NULL) {
event->handler = event_handler;
event->data = event; //set event as data to avoid error when running on debug mode (on log event)
event->log = ngx_cycle->log;
ngx_http_push_stream_timer_reset(timer_interval, event);
}
ngx_shmtx_unlock(&shpool->mutex);
}
}
}
......@@ -1156,7 +1188,6 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
ngx_http_request_t *r = (ngx_http_request_t *) ev->data;
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_int_t rc = NGX_OK;
if (pslcf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE) {
......@@ -1165,12 +1196,10 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE), 1);
} else {
if (mcf->ping_msg == NULL) {
ngx_shmtx_lock(&shpool->mutex);
// create ping message
if ((mcf->ping_msg == NULL) && (mcf->ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(mcf, mcf->ping_message_text.data, mcf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
if ((mcf->ping_msg == NULL) && (mcf->ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, mcf->ping_message_text.data, mcf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate ping message in shared memory");
}
ngx_shmtx_unlock(&shpool->mutex);
}
if (mcf->ping_msg != NULL) {
......@@ -1345,11 +1374,8 @@ ngx_http_push_stream_add_request_context(ngx_http_request_t *r)
static void
ngx_http_push_stream_cleanup_request_context(ngx_http_request_t *r)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_shmtx_lock(&shpool->mutex);
if (ctx != NULL) {
if ((ctx->disconnect_timer != NULL) && ctx->disconnect_timer->timer_set) {
ngx_del_timer(ctx->disconnect_timer);
......@@ -1365,33 +1391,37 @@ ngx_http_push_stream_cleanup_request_context(ngx_http_request_t *r)
}
if (ctx->subscriber != NULL) {
ngx_http_push_stream_worker_subscriber_cleanup_locked(ctx->subscriber);
ngx_http_push_stream_worker_subscriber_cleanup(ctx->subscriber);
ctx->subscriber = NULL;
}
}
ngx_shmtx_unlock(&shpool->mutex);
}
static void
ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subscriber_t *worker_subscriber)
ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_subscriber_t *worker_subscriber)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(worker_subscriber->request, ngx_http_push_stream_module);
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_queue_t *cur;
while (!ngx_queue_empty(&worker_subscriber->subscriptions)) {
cur = ngx_queue_head(&worker_subscriber->subscriptions);
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, queue);
ngx_shmtx_lock(subscription->channel->mutex);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(subscription->channel->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(subscription->channel_worker_sentinel->subscribers);
ngx_queue_remove(&subscription->channel_worker_queue);
ngx_queue_remove(&subscription->queue);
ngx_shmtx_unlock(subscription->channel->mutex);
}
ngx_shmtx_lock(&shpool->mutex);
ngx_queue_remove(&worker_subscriber->worker_queue);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER((data->ipc + ngx_process_slot)->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->ipc[ngx_process_slot].subscribers);
ngx_shmtx_unlock(&shpool->mutex);
}
......@@ -2120,3 +2150,37 @@ ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_poo
return requested_channels;
}
ngx_int_t
ngx_http_push_stream_create_shmtx(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
u_char *file;
#if (NGX_HAVE_ATOMIC_OPS)
file = NULL;
#else
ngx_str_t logs_dir = ngx_string("logs/");
if (ngx_conf_full_name((ngx_cycle_t *) ngx_cycle, &logs_dir, 0) != NGX_OK) {
return NGX_ERROR;
}
file = ngx_pnalloc(ngx_cycle->pool, logs_dir.len + ngx_strlen(name));
if (file == NULL) {
return NGX_ERROR;
}
(void) ngx_sprintf(file, "%V%s%Z", &logs_dir, name);
#endif
if (ngx_shmtx_create(mtx, addr, file) != NGX_OK) {
return NGX_ERROR;
}
return NGX_OK;
}
......@@ -38,7 +38,6 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
#endif
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_requested_channel_t *requested_channels, *requested_channel;
ngx_queue_t *q;
......@@ -46,7 +45,6 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
ngx_int_t tag;
time_t if_modified_since;
ngx_str_t *last_event_id = NULL;
ngx_int_t rc;
ngx_int_t status_code;
ngx_str_t *explain_error_message;
ngx_str_t *upgrade_header, *connection_header, *sec_key_header, *sec_version_header, *sec_accept_header;
......@@ -131,11 +129,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
}
ngx_shmtx_lock(&shpool->mutex);
rc = ngx_http_push_stream_registry_subscriber_locked(r, worker_subscriber);
ngx_shmtx_unlock(&shpool->mutex);
if (rc == NGX_ERROR) {
if (ngx_http_push_stream_registry_subscriber(r, worker_subscriber) == NGX_ERROR) {
return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
}
......
......@@ -76,7 +76,6 @@ ngx_http_push_stream_find_channel_on_tree(ngx_str_t *id, ngx_log_t *log, ngx_rbt
static ngx_http_push_stream_channel_t *
ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
{
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_channel_t *channel = NULL;
......@@ -85,12 +84,9 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_s
return NULL;
}
ngx_shmtx_lock(&shpool->mutex);
ngx_shmtx_lock(&data->channels_queue_mutex);
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree);
ngx_shmtx_unlock(&shpool->mutex);
if ((channel == NULL) || channel->deleted) {
return NULL;
}
ngx_shmtx_unlock(&data->channels_queue_mutex);
return channel;
}
......@@ -110,12 +106,12 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
return NULL;
}
ngx_shmtx_lock(&shpool->mutex);
ngx_shmtx_lock(&data->channels_queue_mutex);
// check again to see if any other worker didn't create the channel
channel = ngx_http_push_stream_find_channel_on_tree(id, log, &data->tree);
if (channel != NULL) { // we found our channel
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_unlock(&data->channels_queue_mutex);
return channel;
}
......@@ -125,18 +121,21 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
if (((!is_wildcard_channel) && (mcf->max_number_of_channels != NGX_CONF_UNSET_UINT) && (mcf->max_number_of_channels == data->channels)) ||
((is_wildcard_channel) && (mcf->max_number_of_wildcard_channels != NGX_CONF_UNSET_UINT) && (mcf->max_number_of_wildcard_channels == data->wildcard_channels))) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_shmtx_unlock(&data->channels_queue_mutex);
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: number of channels were exceeded");
return NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED;
}
if ((channel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_channel_t))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
if ((channel = ngx_slab_alloc(shpool, sizeof(ngx_http_push_stream_channel_t))) == NULL) {
ngx_shmtx_unlock(&data->channels_queue_mutex);
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate memory for new channel");
return NULL;
}
if ((channel->id.data = ngx_slab_alloc_locked(shpool, id->len + 1)) == NULL) {
ngx_slab_free_locked(shpool, channel);
ngx_shmtx_unlock(&shpool->mutex);
if ((channel->id.data = ngx_slab_alloc(shpool, id->len + 1)) == NULL) {
ngx_slab_free(shpool, channel);
ngx_shmtx_unlock(&data->channels_queue_mutex);
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate memory for new channel id");
return NULL;
}
......@@ -160,10 +159,11 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
channel->node.key = ngx_crc32_short(channel->id.data, channel->id.len);
ngx_rbtree_insert(&data->tree, &channel->node);
ngx_queue_insert_tail(&data->channels_queue, &channel->queue);
channel->queue_sentinel = &data->channels_queue;
(channel->wildcard) ? data->wildcard_channels++ : data->channels++;
ngx_shmtx_unlock(&shpool->mutex);
channel->mutex = &data->channels_mutex[data->mutex_round_robin++ % 9];
ngx_shmtx_unlock(&data->channels_queue_mutex);
return channel;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment