Commit 37db1b56 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding a reference count to the message to avoid discard it before be processed for all workers

parent af02c91b
...@@ -99,6 +99,7 @@ typedef struct { ...@@ -99,6 +99,7 @@ typedef struct {
ngx_str_t *event_id; ngx_str_t *event_id;
ngx_str_t *event_id_message; ngx_str_t *event_id_message;
ngx_str_t *formatted_messages; ngx_str_t *formatted_messages;
ngx_int_t workers_ref_count;
} ngx_http_push_stream_msg_t; } ngx_http_push_stream_msg_t;
typedef struct ngx_http_push_stream_subscriber_cleanup_s ngx_http_push_stream_subscriber_cleanup_t; typedef struct ngx_http_push_stream_subscriber_cleanup_s ngx_http_push_stream_subscriber_cleanup_t;
......
...@@ -52,11 +52,11 @@ static ngx_int_t ngx_http_push_stream_register_worker_message_handler(ngx_cyc ...@@ -52,11 +52,11 @@ static ngx_int_t ngx_http_push_stream_register_worker_message_handler(ngx_cyc
static void ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_log_t *log); static void ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log, ngx_channel_t command); static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log, ngx_channel_t command);
#define ngx_http_push_stream_alert_worker_check_messages(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES); #define ngx_http_push_stream_alert_worker_check_messages(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES)
#define ngx_http_push_stream_alert_worker_census_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS); #define ngx_http_push_stream_alert_worker_census_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS)
#define ngx_http_push_stream_alert_worker_delete_channel(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL); #define ngx_http_push_stream_alert_worker_delete_channel(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL)
static ngx_int_t ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_queue_elem_t *subscribers_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_log_t *log); static ngx_int_t ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_queue_elem_t *subscribers_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers); static ngx_int_t ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers);
static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle); static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle);
......
...@@ -316,6 +316,10 @@ ngx_http_push_stream_process_worker_message(void) ...@@ -316,6 +316,10 @@ ngx_http_push_stream_process_worker_message(void)
// free worker_msg already sent // free worker_msg already sent
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
worker_msg->msg->workers_ref_count--;
if ((worker_msg->msg->workers_ref_count <= 0) && worker_msg->msg->deleted) {
worker_msg->msg->expires = ngx_time() + ngx_http_push_stream_module_main_conf->shm_cleanup_objects_ttl;
}
ngx_queue_remove(&worker_msg->queue); ngx_queue_remove(&worker_msg->queue);
ngx_slab_free_locked(shpool, worker_msg); ngx_slab_free_locked(shpool, worker_msg);
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
...@@ -324,30 +328,28 @@ ngx_http_push_stream_process_worker_message(void) ...@@ -324,30 +328,28 @@ ngx_http_push_stream_process_worker_message(void)
static ngx_int_t static ngx_int_t
ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_queue_elem_t *subscribers_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_log_t *log) ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_queue_elem_t *subscribers_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log)
{ {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc; ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + worker_slot; ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + worker_slot;
ngx_http_push_stream_worker_msg_t *newmessage; ngx_http_push_stream_worker_msg_t *newmessage;
ngx_shmtx_lock(&shpool->mutex);
if ((newmessage = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_msg_t))) == NULL) { if ((newmessage = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_msg_t))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex); ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate worker message, pid: %P, slot: %d", pid, worker_slot);
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate worker message");
return NGX_ERROR; return NGX_ERROR;
} }
msg->workers_ref_count++;
newmessage->msg = msg; newmessage->msg = msg;
newmessage->pid = pid; newmessage->pid = pid;
newmessage->subscribers_sentinel = subscribers_sentinel; newmessage->subscribers_sentinel = subscribers_sentinel;
newmessage->channel = channel; newmessage->channel = channel;
if (ngx_queue_empty(&thisworker_data->messages_queue->queue)) {
*queue_was_empty = 1;
}
ngx_queue_insert_tail(&thisworker_data->messages_queue->queue, &newmessage->queue); ngx_queue_insert_tail(&thisworker_data->messages_queue->queue, &newmessage->queue);
ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK; return NGX_OK;
} }
...@@ -360,16 +362,21 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http ...@@ -360,16 +362,21 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
ngx_http_push_stream_pid_queue_t *sentinel = &channel->workers_with_subscribers; ngx_http_push_stream_pid_queue_t *sentinel = &channel->workers_with_subscribers;
ngx_http_push_stream_pid_queue_t *cur = sentinel; ngx_http_push_stream_pid_queue_t *cur = sentinel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_flag_t queue_was_empty = 0;
ngx_shmtx_lock(&shpool->mutex);
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
pid_t worker_pid = cur->pid; ngx_http_push_stream_send_worker_message_locked(channel, &cur->subscribers_sentinel, cur->pid, cur->slot, msg, &queue_was_empty, log);
ngx_int_t worker_slot = cur->slot; }
ngx_shmtx_unlock(&shpool->mutex);
// interprocess communication breakdown if (queue_was_empty) {
if (ngx_http_push_stream_send_worker_message(channel, &cur->subscribers_sentinel, worker_pid, worker_slot, msg, log) != NGX_ERROR) { cur = sentinel;
ngx_http_push_stream_alert_worker_check_messages(worker_pid, worker_slot, log); while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
} else { // interprocess communication breakdown
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with some other worker process"); if (ngx_http_push_stream_alert_worker_check_messages(cur->pid, cur->slot, log) != NGX_OK) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with worker process, pid: %P, slot: %d", cur->pid, cur->slot);
}
} }
} }
......
...@@ -213,6 +213,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -213,6 +213,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
msg->queue.prev = NULL; msg->queue.prev = NULL;
msg->queue.next = NULL; msg->queue.next = NULL;
msg->id = id; msg->id = id;
msg->workers_ref_count = 0;
if ((msg->formatted_messages = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t)*ngx_http_push_stream_module_main_conf->qtd_templates)) == NULL) { if ((msg->formatted_messages = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t)*ngx_http_push_stream_module_main_conf->qtd_templates)) == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg); ngx_http_push_stream_free_message_memory_locked(shpool, msg);
...@@ -700,7 +701,7 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for ...@@ -700,7 +701,7 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
cur = &data->messages_to_delete; cur = &data->messages_to_delete;
while ((cur = (ngx_http_push_stream_msg_t *)ngx_queue_next(&cur->queue)) != &data->messages_to_delete) { while ((cur = (ngx_http_push_stream_msg_t *)ngx_queue_next(&cur->queue)) != &data->messages_to_delete) {
if ((ngx_time() > cur->expires) || force) { if (force || ((cur->workers_ref_count <= 0) && (ngx_time() > cur->expires))) {
prev = (ngx_http_push_stream_msg_t *)ngx_queue_prev(&cur->queue); prev = (ngx_http_push_stream_msg_t *)ngx_queue_prev(&cur->queue);
ngx_queue_remove(&cur->queue); ngx_queue_remove(&cur->queue);
ngx_http_push_stream_free_message_memory_locked(shpool, cur); ngx_http_push_stream_free_message_memory_locked(shpool, cur);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment