Commit de6be3d0 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

only send alert worker check messages signal to worker which was with empty queue

parent e72b0828
......@@ -27,6 +27,6 @@
#define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.2");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("a025280b90abdab219dd1718c61fcc6a0ad56c3d");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("e72b082850848b16264d4096641fc8f1a639d330");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */
......@@ -345,9 +345,7 @@ ngx_http_push_stream_send_worker_message_locked(ngx_http_push_stream_channel_t *
newmessage->pid = pid;
newmessage->subscribers_sentinel = subscribers_sentinel;
newmessage->channel = channel;
if (ngx_queue_empty(&thisworker_data->messages_queue->queue)) {
*queue_was_empty = 1;
}
*queue_was_empty = ngx_queue_empty(&thisworker_data->messages_queue->queue);
ngx_queue_insert_tail(&thisworker_data->messages_queue->queue, &newmessage->queue);
return NGX_OK;
......@@ -362,21 +360,19 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
ngx_http_push_stream_pid_queue_t *sentinel = &channel->workers_with_subscribers;
ngx_http_push_stream_pid_queue_t *cur = sentinel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_flag_t queue_was_empty = 0;
ngx_flag_t queue_was_empty[NGX_MAX_PROCESSES];
ngx_shmtx_lock(&shpool->mutex);
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
ngx_http_push_stream_send_worker_message_locked(channel, &cur->subscribers_sentinel, cur->pid, cur->slot, msg, &queue_was_empty, log);
ngx_http_push_stream_send_worker_message_locked(channel, &cur->subscribers_sentinel, cur->pid, cur->slot, msg, &queue_was_empty[cur->slot], log);
}
ngx_shmtx_unlock(&shpool->mutex);
if (queue_was_empty) {
cur = sentinel;
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
// interprocess communication breakdown
if (ngx_http_push_stream_alert_worker_check_messages(cur->pid, cur->slot, log) != NGX_OK) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with worker process, pid: %P, slot: %d", cur->pid, cur->slot);
}
cur = sentinel;
while (((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) && queue_was_empty[cur->slot]) {
// interprocess communication breakdown
if (ngx_http_push_stream_alert_worker_check_messages(cur->pid, cur->slot, log) != NGX_OK) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with worker process, pid: %P, slot: %d", cur->pid, cur->slot);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment