/*
 * This file is distributed under the MIT License.
 *
 * Copyright (c) 2009 Leo Ponomarev
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use,
 * copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the
 * Software is furnished to do so, subject to the following
 * conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
 * OTHER DEALINGS IN THE SOFTWARE.
 *
 *
 * ngx_http_push_stream_module_ipc.c
 *
 * Modified: Oct 26, 2010
 * Modifications by: Wandenberg Peixoto <wandenberg@gmail.com>, Rogério Carvalho Schneider <stockrt@gmail.com>
 */

#include <ngx_http_push_stream_module_ipc.h>

/*
 * Forward declarations for helpers defined further down in this file. Each
 * one operates on a single ngx_http_push_stream_shm_data_t entry.
 */

/* Registers the current worker (ngx_process_slot / ngx_pid) inside `data`. */
void ngx_http_push_stream_ipc_init_worker_data(ngx_http_push_stream_shm_data_t *data);
/* Recounts the subscribers held by the current worker for `data`. */
static ngx_inline void ngx_http_push_stream_census_worker_subscribers_data(ngx_http_push_stream_shm_data_t *data);
/* Drains the current worker's pending message queue stored in `data`. */
static ngx_inline void ngx_http_push_stream_process_worker_message_data(ngx_http_push_stream_shm_data_t *data);


/*
 * Pre-creates and configures one socketpair for every worker slot that is
 * expected to be used by the next `workers` spawned processes.
 *
 * cycle:   cycle whose log receives the error messages.
 * workers: number of worker slots to prepare.
 *
 * Returns NGX_OK when every pair was created and configured. Returns
 * NGX_ERROR on the first failure; the pair being configured at that moment is
 * closed before returning. Also returns NGX_ERROR when no process slot below
 * NGX_MAX_PROCESSES is left, instead of indexing past the socketpairs table.
 */
static ngx_int_t
ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers)
{
    int         i, s = 0, on = 1;
    ngx_int_t   last_expected_process = ngx_last_process;


    /*
     * here's the deal: we have no control over fork()ing, nginx's internal
     * socketpairs are unusable for our purposes (as of nginx 0.8 -- check the
     * code to see why), and the module initialization callbacks occur before
     * any workers are spawned. Rather than futzing around with existing
     * socketpairs, we populate our own socketpairs array.
     * Trouble is, ngx_spawn_process() creates them one-by-one, and we need to
     * do it all at once. So we must guess all the workers' ngx_process_slots in
     * advance. Meaning the spawning logic must be copied to the T.
     */

    for (i = 0; i < workers; i++) {
        while (s < last_expected_process && ngx_processes[s].pid != NGX_INVALID_FILE) {
            // find empty existing slot
            s++;
        }

        // the slot is used as an index below; never step outside the table
        if (s >= NGX_MAX_PROCESSES) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "no more than %d processes can be spawned while initializing push stream module", NGX_MAX_PROCESSES);
            return NGX_ERROR;
        }

        // copypaste from os/unix/ngx_process.c (ngx_spawn_process)
        ngx_socket_t    *socks = ngx_http_push_stream_socketpairs[s];
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "socketpair() failed on socketpair while initializing push stream module");
            return NGX_ERROR;
        }
        if (ngx_nonblocking(socks[0]) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed on socketpair while initializing push stream module");
            ngx_close_channel(socks, cycle->log);
            return NGX_ERROR;
        }
        if (ngx_nonblocking(socks[1]) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed on socketpair while initializing push stream module");
            ngx_close_channel(socks, cycle->log);
            return NGX_ERROR;
        }
        if (ioctl(socks[0], FIOASYNC, &on) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "ioctl(FIOASYNC) failed on socketpair while initializing push stream module");
            ngx_close_channel(socks, cycle->log);
            return NGX_ERROR;
        }
        if (fcntl(socks[0], F_SETOWN, ngx_pid) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(F_SETOWN) failed on socketpair while initializing push stream module");
            ngx_close_channel(socks, cycle->log);
            return NGX_ERROR;
        }
        if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed on socketpair while initializing push stream module");
            ngx_close_channel(socks, cycle->log);
            return NGX_ERROR;
        }
        if (fcntl(socks[1], F_SETFD, FD_CLOEXEC) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while initializing push stream module");
            ngx_close_channel(socks, cycle->log);
            return NGX_ERROR;
        }

        s++; // NEXT!!
    }

    return NGX_OK;
}


/*
 * Worker exit hook: hands the socketpair that belongs to this worker's
 * process slot to ngx_close_channel().
 */
static void
ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle)
{
    ngx_socket_t *own_pair = (ngx_socket_t *) ngx_http_push_stream_socketpairs[ngx_process_slot];

    ngx_close_channel(own_pair, cycle->log);
}


// will be called many times
static ngx_int_t
ngx_http_push_stream_ipc_init_worker(void)
{
    ngx_slab_pool_t                        *global_shpool = (ngx_slab_pool_t *) ngx_http_push_stream_global_shm_zone->shm.addr;
    ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
    ngx_queue_t                            *q;
    int                                     i;

    ngx_shmtx_lock(&global_shpool->mutex);
    global_data->pid[ngx_process_slot] = ngx_pid;
    for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
        ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
        ngx_http_push_stream_ipc_init_worker_data(data);
    }
    ngx_shmtx_unlock(&global_shpool->mutex);

    for(i = 0; i < NGX_MAX_PROCESSES; i++) {
        if (global_data->pid[i] > 0) {
            ngx_http_push_stream_alert_worker_census_subscribers(global_data->pid[i], i, ngx_cycle->log);
        }
    }

    return NGX_OK;
}


/*
 * Registers the current worker inside one shm data entry: wipes whatever a
 * previous occupant of the slot left behind, records pid and startup time,
 * and recomputes how many slots take part in the subscribers census.
 */
void
ngx_http_push_stream_ipc_init_worker_data(ngx_http_push_stream_shm_data_t *data)
{
    ngx_slab_pool_t                        *pool = data->shpool;
    int                                     slot;

    // a worker may have died and a new one may be reusing its slot:
    // drop the stale content first
    ngx_http_push_stream_clean_worker_data(data);

    ngx_shmtx_lock(&pool->mutex);

    data->ipc[ngx_process_slot].pid = ngx_pid;
    data->ipc[ngx_process_slot].startup = ngx_time();

    // one census participant per slot that holds a pid
    data->slots_for_census = 0;
    for (slot = 0; slot < NGX_MAX_PROCESSES; slot++) {
        if (data->ipc[slot].pid <= 0) {
            continue;
        }

        data->slots_for_census++;
    }

    ngx_shmtx_unlock(&pool->mutex);
}


/*
 * Sends the "shutting down cleanup" alert to every slot that has a pid
 * recorded in the global shared data, then closes that slot's socketpair and
 * marks both descriptors as invalid.
 */
static void
ngx_http_push_stream_alert_shutting_down_workers(void)
{
    ngx_http_push_stream_global_shm_data_t *shared = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
    int                                     slot;

    for (slot = 0; slot < NGX_MAX_PROCESSES; slot++) {
        if (shared->pid[slot] <= 0) {
            continue;
        }

        ngx_http_push_stream_alert_worker_shutting_down_cleanup(shared->pid[slot], slot, ngx_cycle->log);

        // the pair is no longer usable after the alert went out
        ngx_close_channel((ngx_socket_t *) ngx_http_push_stream_socketpairs[slot], ngx_cycle->log);
        ngx_http_push_stream_socketpairs[slot][0] = NGX_INVALID_FILE;
        ngx_http_push_stream_socketpairs[slot][1] = NGX_INVALID_FILE;
    }
}


/*
 * Removes, under the channel mutex, the first entry of the channel's
 * workers_with_subscribers list that matches the current worker either by
 * pid or by process slot, and returns its memory to `shpool`.
 *
 * Always returns NGX_OK, whether or not an entry was found.
 */
static ngx_int_t
ngx_http_push_stream_unsubscribe_worker(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{
    ngx_queue_t                             *node;
    ngx_http_push_stream_pid_queue_t        *entry;

    ngx_shmtx_lock(channel->mutex);

    node = ngx_queue_head(&channel->workers_with_subscribers);
    while (node != ngx_queue_sentinel(&channel->workers_with_subscribers)) {
        entry = ngx_queue_data(node, ngx_http_push_stream_pid_queue_t, queue);

        if ((entry->pid != ngx_pid) && (entry->slot != ngx_process_slot)) {
            // not ours, keep looking
            node = ngx_queue_next(node);
            continue;
        }

        ngx_queue_remove(&entry->queue);
        ngx_slab_free(shpool, entry);
        break;
    }

    ngx_shmtx_unlock(channel->mutex);

    return NGX_OK;
}


/*
 * Resets everything stored for the current process slot in one shm data
 * entry: pending worker messages are released, the subscribers queue is
 * re-initialised, the worker is unsubscribed from every channel, and the
 * slot's pid and subscribers counter are cleared.
 */
static void
ngx_http_push_stream_clean_worker_data(ngx_http_push_stream_shm_data_t *data)
{
    ngx_slab_pool_t                        *pool = data->shpool;
    ngx_queue_t                            *head, *node;
    ngx_http_push_stream_worker_msg_t      *pending;
    ngx_http_push_stream_channel_t         *channel;

    // release every message still queued for this slot; the free helper is
    // relied upon to unlink the message, otherwise this would never end
    for ( ;; ) {
        if (ngx_queue_empty(&data->ipc[ngx_process_slot].messages_queue)) {
            break;
        }

        head = ngx_queue_head(&data->ipc[ngx_process_slot].messages_queue);
        pending = ngx_queue_data(head, ngx_http_push_stream_worker_msg_t, queue);
        ngx_http_push_stream_free_worker_message_memory(pool, pending);
    }

    ngx_queue_init(&data->ipc[ngx_process_slot].subscribers_queue);

    // drop this worker from every channel's workers list
    ngx_shmtx_lock(&data->channels_queue_mutex);

    node = ngx_queue_head(&data->channels_queue);
    while (node != ngx_queue_sentinel(&data->channels_queue)) {
        channel = ngx_queue_data(node, ngx_http_push_stream_channel_t, queue);
        ngx_http_push_stream_unsubscribe_worker(channel, pool);
        node = ngx_queue_next(node);
    }

    ngx_shmtx_unlock(&data->channels_queue_mutex);

    data->ipc[ngx_process_slot].pid = NGX_INVALID_FILE;
    data->ipc[ngx_process_slot].subscribers = 0;
}


/*
 * Registers ngx_http_push_stream_channel_handler as the read-event handler
 * for descriptor [1] of this worker's socketpair.
 *
 * Returns NGX_OK on success; logs an alert and returns NGX_ERROR when
 * ngx_add_channel_event() reports NGX_ERROR.
 */
static ngx_int_t
ngx_http_push_stream_register_worker_message_handler(ngx_cycle_t *cycle)
{
    ngx_int_t rc;

    rc = ngx_add_channel_event(cycle, ngx_http_push_stream_socketpairs[ngx_process_slot][1], NGX_READ_EVENT, ngx_http_push_stream_channel_handler);

    if (rc != NGX_ERROR) {
        return NGX_OK;
    }

    ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "failed to register channel handler while initializing push stream module worker");

    return NGX_ERROR;
}


/*
 * Read-event handler for this worker's IPC channel. Reads commands from the
 * channel until it would block and dispatches each recognised command to the
 * matching worker routine. On a read error the connection is closed.
 *
 * Derived from ngx_channel_handler in os/unix/ngx_process_cycle.c.
 */
static void
ngx_http_push_stream_channel_handler(ngx_event_t *ev)
{
    ngx_connection_t   *conn;
    ngx_channel_t       cmd;
    ngx_int_t           rc;

    // a timeout carries no command: just clear the flag
    if (ev->timedout) {
        ev->timedout = 0;
        return;
    }

    conn = ev->data;

    for ( ;; ) {
        rc = ngx_read_channel(conn->fd, &cmd, sizeof(cmd), ev->log);

        if (rc == NGX_ERROR) {
            if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
                ngx_del_conn(conn, 0);
            }

            ngx_close_connection(conn);
            return;
        }

        // with event ports the read event is added again after each read
        if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) {
            if (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR) {
                return;
            }
        }

        // nothing more to read for now
        if (rc == NGX_AGAIN) {
            return;
        }

        if (cmd.command == NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES.command) {
            ngx_http_push_stream_process_worker_message();

        } else if (cmd.command == NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS.command) {
            ngx_http_push_stream_census_worker_subscribers();

        } else if (cmd.command == NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL.command) {
            ngx_http_push_stream_delete_worker_channel();

        } else if (cmd.command == NGX_CMD_HTTP_PUSH_STREAM_CLEANUP_SHUTTING_DOWN.command) {
            ngx_http_push_stream_cleanup_shutting_down_worker();
        }
    }
}


/*
 * Writes `command` to descriptor [0] of the socketpair that belongs to
 * `slot`.
 *
 * pid:     pid of the target worker (not used by this function).
 * slot:    process slot whose socketpair is written to.
 * log:     log passed on to ngx_write_channel().
 * command: channel command to deliver.
 *
 * Returns the result of ngx_write_channel(), or NGX_OK without writing
 * anything when the slot's descriptor is NGX_INVALID_FILE.
 */
static ngx_int_t
ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log, ngx_channel_t command)
{
    ngx_socket_t target = ngx_http_push_stream_socketpairs[slot][0];

    if (target == NGX_INVALID_FILE) {
        return NGX_OK;
    }

    return ngx_write_channel(target, &command, sizeof(ngx_channel_t), log);
}


/*
 * Runs the subscribers census of the current worker on every shm data entry
 * linked in the global shared data, holding the global slab pool mutex for
 * the whole walk.
 */
static ngx_inline void
ngx_http_push_stream_census_worker_subscribers(void)
{
    ngx_http_push_stream_global_shm_data_t *shared = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
    ngx_slab_pool_t                        *pool = (ngx_slab_pool_t *) ngx_http_push_stream_global_shm_zone->shm.addr;
    ngx_queue_t                            *node;

    ngx_shmtx_lock(&pool->mutex);

    node = ngx_queue_head(&shared->shm_datas_queue);
    while (node != ngx_queue_sentinel(&shared->shm_datas_queue)) {
        ngx_http_push_stream_shm_data_t *entry = ngx_queue_data(node, ngx_http_push_stream_shm_data_t, shm_data_queue);

        ngx_http_push_stream_census_worker_subscribers_data(entry);
        node = ngx_queue_next(node);
    }

    ngx_shmtx_unlock(&pool->mutex);
}

/*
 * Recounts the subscribers of the current worker inside one shm data entry.
 *
 * Steps, in order:
 *   1. zero this worker's total and its per-channel counters;
 *   2. walk this worker's subscribers and count them again, per channel
 *      (through each subscription's channel_worker_sentinel) and in total;
 *   3. decrement data->slots_for_census; when it is seen as zero, rebuild
 *      data->subscribers from every slot with a pid and rebuild every
 *      channel's subscribers total from its workers list.
 */
static ngx_inline void
ngx_http_push_stream_census_worker_subscribers_data(ngx_http_push_stream_shm_data_t *data)
{
    ngx_slab_pool_t                             *shpool = data->shpool;
    ngx_http_push_stream_worker_data_t          *thisworker_data = &data->ipc[ngx_process_slot];
    ngx_queue_t                                 *q, *cur, *cur_worker;
    int                                          i;


    thisworker_data->subscribers = 0;

    // step 1: reset the per-channel counters that belong to this pid
    ngx_shmtx_lock(&data->channels_queue_mutex);
    for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
        ngx_http_push_stream_channel_t *channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
        ngx_shmtx_lock(channel->mutex);
        for (cur_worker = ngx_queue_head(&channel->workers_with_subscribers); cur_worker != ngx_queue_sentinel(&channel->workers_with_subscribers); cur_worker = ngx_queue_next(cur_worker)) {
            ngx_http_push_stream_pid_queue_t *worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
            if (worker->pid == ngx_pid) {
                worker->subscribers = 0;
            }
        }
        ngx_shmtx_unlock(channel->mutex);
    }
    ngx_shmtx_unlock(&data->channels_queue_mutex);

    // step 2: count again; this walk takes no mutex
    for (q = ngx_queue_head(&thisworker_data->subscribers_queue); q != ngx_queue_sentinel(&thisworker_data->subscribers_queue); q = ngx_queue_next(q)) {
        ngx_http_push_stream_subscriber_t *subscriber = ngx_queue_data(q, ngx_http_push_stream_subscriber_t, worker_queue);

        for (cur = ngx_queue_head(&subscriber->subscriptions); cur != ngx_queue_sentinel(&subscriber->subscriptions); cur = ngx_queue_next(cur)) {
            ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, queue);
            subscription->channel_worker_sentinel->subscribers++;
        }
        thisworker_data->subscribers++;
    }

    // step 3: this slot has finished its part of the census
    ngx_shmtx_lock(&shpool->mutex);
    data->slots_for_census--;
    ngx_shmtx_unlock(&shpool->mutex);

    // NOTE(review): the counter is read here after the mutex was released, so
    // more than one worker may observe zero and run the aggregation below
    if (data->slots_for_census == 0) {
        // total subscribers across every slot that holds a pid
        ngx_shmtx_lock(&shpool->mutex);
        data->subscribers = 0;
        for (i = 0; i < NGX_MAX_PROCESSES; i++) {
            if (data->ipc[i].pid > 0) {
                data->subscribers += data->ipc[i].subscribers;
            }
        }
        ngx_shmtx_unlock(&shpool->mutex);

        // per-channel totals, summed over each channel's workers list
        ngx_shmtx_lock(&data->channels_queue_mutex);
        for (q = ngx_queue_head(&data->channels_queue); q != ngx_queue_sentinel(&data->channels_queue); q = ngx_queue_next(q)) {
            ngx_http_push_stream_channel_t *channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
            ngx_shmtx_lock(channel->mutex);
            channel->subscribers = 0;
            for (cur_worker = ngx_queue_head(&channel->workers_with_subscribers); cur_worker != ngx_queue_sentinel(&channel->workers_with_subscribers); cur_worker = ngx_queue_next(cur_worker)) {
                ngx_http_push_stream_pid_queue_t *worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
                channel->subscribers += worker->subscribers;
            }
            ngx_shmtx_unlock(channel->mutex);
        }
        ngx_shmtx_unlock(&data->channels_queue_mutex);
    }


}


/*
 * Processes the pending worker messages of the current worker on every shm
 * data entry linked in the global shared data.
 */
static ngx_inline void
ngx_http_push_stream_process_worker_message(void)
{
    ngx_http_push_stream_global_shm_data_t *shared = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
    ngx_queue_t                            *node = ngx_queue_head(&shared->shm_datas_queue);

    while (node != ngx_queue_sentinel(&shared->shm_datas_queue)) {
        ngx_http_push_stream_shm_data_t *entry = ngx_queue_data(node, ngx_http_push_stream_shm_data_t, shm_data_queue);

        ngx_http_push_stream_process_worker_message_data(entry);
        node = ngx_queue_next(node);
    }
}


/*
 * Drains the message queue of the current process slot in one shm data
 * entry.
 *
 * A message addressed to this worker's pid is delivered to its subscribers.
 * A message addressed to a different pid means a previous worker used this
 * slot; in that case the stale worker entry is removed from the message's
 * channel. Either way the message is released afterwards.
 */
static ngx_inline void
ngx_http_push_stream_process_worker_message_data(ngx_http_push_stream_shm_data_t *data)
{
    ngx_http_push_stream_worker_msg_t      *worker_msg;
    ngx_queue_t                            *cur, *q;
    ngx_slab_pool_t                        *shpool = data->shpool;
    ngx_http_push_stream_worker_data_t     *thisworker_data = data->ipc + ngx_process_slot;


    // the loop relies on the free helper below to unlink the head message
    while (!ngx_queue_empty(&thisworker_data->messages_queue)) {
        cur = ngx_queue_head(&thisworker_data->messages_queue);
        worker_msg = ngx_queue_data(cur, ngx_http_push_stream_worker_msg_t, queue);
        if (worker_msg->pid == ngx_pid) {
            // everything is okay
            ngx_http_push_stream_respond_to_subscribers(worker_msg->channel, worker_msg->subscriptions_sentinel, worker_msg->msg);
        } else {
            // that's quite bad you see. a previous worker died with an undelivered message.
            // but all its subscribers' connections presumably got canned, too. so it's not so bad after all.

            // pids are logged with %P (ngx_pid_t); %i would read an ngx_int_t
            // from the argument list. The cast keeps the argument type in step
            // with the specifier whatever the field's declared type is.
            ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: worker %P intercepted a message intended for another worker process (%P) that probably died and will remove the reference to the old worker", ngx_pid, (ngx_pid_t) worker_msg->pid);

            // delete that invalid sucker
            ngx_shmtx_lock(worker_msg->channel->mutex);
            for (q = ngx_queue_head(&worker_msg->channel->workers_with_subscribers); q != ngx_queue_sentinel(&worker_msg->channel->workers_with_subscribers); q = ngx_queue_next(q)) {
                ngx_http_push_stream_pid_queue_t *worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
                if (worker->pid == worker_msg->pid) {
                    ngx_queue_remove(&worker->queue);
                    ngx_slab_free(shpool, worker);
                    break;
                }
            }
            ngx_shmtx_unlock(worker_msg->channel->mutex);
        }

        // free worker_msg already sent
        ngx_http_push_stream_free_worker_message_memory(shpool, worker_msg);
    }
}


/*
 * Queues `msg` for the worker that owns `worker_slot`.
 *
 * channel:                channel the message belongs to.
 * subscriptions_sentinel: sentinel of the target worker's subscriptions.
 * pid:                    pid of the target worker, stored in the new entry.
 * worker_slot:            process slot whose message queue receives the entry.
 * msg:                    message to deliver; its workers_ref_count is
 *                         incremented.
 * queue_was_empty:        out: whether the slot's queue was empty before the
 *                         insertion. NOT written when the allocation fails.
 * log:                    log used for the allocation failure message.
 * mcf:                    main conf providing the slab pool and shm data.
 *
 * Returns NGX_OK on success, NGX_ERROR when the worker message could not be
 * allocated from shared memory.
 */
static ngx_int_t
ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_flag_t *queue_was_empty, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
{
    ngx_slab_pool_t                         *shpool = mcf->shpool;
    ngx_http_push_stream_worker_data_t      *thisworker_data = mcf->shm_data->ipc + worker_slot;
    ngx_http_push_stream_worker_msg_t       *newmessage;

    ngx_shmtx_lock(&shpool->mutex);
    if ((newmessage = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_worker_msg_t))) == NULL) {
        ngx_shmtx_unlock(&shpool->mutex);
        // worker_slot is an ngx_int_t, which nginx formats with %i (%d reads an int)
        ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate worker message, pid: %P, slot: %i", pid, worker_slot);
        return NGX_ERROR;
    }

    msg->workers_ref_count++;
    newmessage->msg = msg;
    newmessage->pid = pid;
    newmessage->subscriptions_sentinel = subscriptions_sentinel;
    newmessage->channel = channel;
    newmessage->mcf = mcf;
    // sample emptiness before inserting, so the caller knows whether the
    // queue went from empty to non-empty
    *queue_was_empty = ngx_queue_empty(&thisworker_data->messages_queue);
    ngx_queue_insert_tail(&thisworker_data->messages_queue, &newmessage->queue);
    ngx_shmtx_unlock(&shpool->mutex);

    return NGX_OK;
}


/*
 * Hands `msg` to every worker that has subscribers on `channel`.
 *
 * First, under the channel mutex, a worker message is queued for each entry
 * of channel->workers_with_subscribers. Then every worker whose queue was
 * empty before the insertion is alerted to check its messages. Finally, if
 * the message is not linked in any queue (msg->queue is empty), it is thrown
 * away.
 *
 * channel: channel being published to.
 * msg:     message to distribute.
 * log:     log for error reporting.
 * mcf:     main conf providing the shared memory structures.
 */
static void
ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
{
    // subscribers are queued up in a local pool. Queue heads, however, are located
    // in shared memory, identified by pid.
    ngx_http_push_stream_pid_queue_t        *worker;
    ngx_queue_t                             *q;
    ngx_flag_t                               queue_was_empty[NGX_MAX_PROCESSES];

    ngx_shmtx_lock(channel->mutex);
    for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
        worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
        // the callee leaves the flag untouched when it fails to allocate;
        // preset it so the alert loop below never reads an indeterminate value
        queue_was_empty[worker->slot] = 0;
        ngx_http_push_stream_send_worker_message(channel, &worker->subscriptions, worker->pid, worker->slot, msg, &queue_was_empty[worker->slot], log, mcf);
    }
    ngx_shmtx_unlock(channel->mutex);

    // NOTE(review): this second walk over the workers list runs without the
    // channel mutex -- confirm that is intended
    for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
        worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
        // interprocess communication breakdown
        if (queue_was_empty[worker->slot] && (ngx_http_push_stream_alert_worker_check_messages(worker->pid, worker->slot, log) != NGX_OK)) {
            ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with worker process, pid: %P, slot: %d", worker->pid, worker->slot);
        }
    }

    if (ngx_queue_empty(&msg->queue)) {
        ngx_http_push_stream_throw_the_message_away(msg, mcf->shm_data);
    }
}

/*
 * Delivers `msg` to every subscription linked in `subscriptions`.
 *
 * Long-polling subscribers get headers, the content header, the message and
 * are then finalized. Other subscribers get the message only: on a send
 * failure the request is finalized, otherwise the ping timer is reset.
 *
 * channel:       channel the message was published on.
 * subscriptions: sentinel of the subscriptions list to walk.
 * msg:           message to send; when NULL nothing is sent.
 *
 * Returns NGX_ERROR when `subscriptions` is NULL, NGX_OK otherwise.
 */
static ngx_int_t
ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_queue_t *subscriptions, ngx_http_push_stream_msg_t *msg)
{
    ngx_queue_t                             *node;
    ngx_http_push_stream_subscription_t     *subscription;
    ngx_http_push_stream_subscriber_t       *subscriber;

    if (subscriptions == NULL) {
        return NGX_ERROR;
    }

    if (msg == NULL) {
        return NGX_OK;
    }

    // now let's respond to some requests!
    node = ngx_queue_head(subscriptions);

    while (node != ngx_queue_sentinel(subscriptions)) {
        subscription = ngx_queue_data(node, ngx_http_push_stream_subscription_t, channel_worker_queue);

        // step to the next node before the current subscription is handled
        node = ngx_queue_next(node);

        subscriber = subscription->subscriber;

        if (subscriber->longpolling) {
            ngx_http_push_stream_add_polling_headers(subscriber->request, msg->time, msg->tag, subscriber->request->pool);
            ngx_http_send_header(subscriber->request);

            ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module));
            ngx_http_push_stream_send_response_message(subscriber->request, channel, msg, 1, 0);
            ngx_http_push_stream_send_response_finalize(subscriber->request);
            continue;
        }

        if (ngx_http_push_stream_send_response_message(subscriber->request, channel, msg, 0, 0) != NGX_OK) {
            ngx_http_push_stream_send_response_finalize(subscriber->request);
            continue;
        }

        // message went out: push the next ping further away
        {
            ngx_http_push_stream_module_ctx_t     *ctx = ngx_http_get_module_ctx(subscriber->request, ngx_http_push_stream_module);
            ngx_http_push_stream_loc_conf_t       *pslcf = ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module);

            ngx_http_push_stream_timer_reset(pslcf->ping_message_interval, ctx->ping_timer);
        }
    }

    return NGX_OK;
}