/*
 * Copyright (C) 2010-2015 Wandenberg Peixoto <wandenberg@gmail.com>, Rogério Carvalho Schneider <stockrt@gmail.com>
 *
 * This file is part of Nginx Push Stream Module.
 *
 * Nginx Push Stream Module is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Nginx Push Stream Module is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Nginx Push Stream Module.  If not, see <http://www.gnu.org/licenses/>.
 *
 *
 * ngx_http_push_stream_module_websocket.c
 *
 * Created: Oct 20, 2011
 * Authors: Wandenberg Peixoto <wandenberg@gmail.com>, Rogério Carvalho Schneider <stockrt@gmail.com>
 */

#include <ngx_http_push_stream_module_websocket.h>

ngx_str_t *ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_str_t *sec_key, ngx_pool_t *temp_pool);
ngx_int_t  ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, ngx_buf_t *buf, ssize_t len);
void       ngx_http_push_stream_set_buffer(ngx_buf_t *buf, u_char *start, u_char *last, ssize_t len);

static ngx_int_t
ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
{
#if !(NGX_HAVE_SHA1)
    ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: sha1 support is needed to use WebSocket");
    return NGX_OK;
#endif
    ngx_http_push_stream_main_conf_t               *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
    ngx_http_push_stream_loc_conf_t                *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
    ngx_http_push_stream_subscriber_t              *worker_subscriber;
    ngx_http_push_stream_requested_channel_t       *requested_channels, *requested_channel;
    ngx_queue_t                                    *q;
    ngx_http_push_stream_module_ctx_t              *ctx;
    ngx_int_t                                       tag;
    time_t                                          if_modified_since;
    ngx_str_t                                      *last_event_id = NULL;
    ngx_int_t                                       status_code;
    ngx_str_t                                      *explain_error_message;
    ngx_str_t                                      *upgrade_header, *connection_header, *sec_key_header, *sec_version_header, *sec_accept_header;
    ngx_int_t                                       version;

    // WebSocket connections must not use keepalive
    r->keepalive = 0;

    // only accept GET method
    if (!(r->method & NGX_HTTP_GET)) {
        ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ALLOW, &NGX_HTTP_PUSH_STREAM_ALLOW_GET);
        return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_ALLOWED, NULL);
    }

    ngx_http_push_stream_set_expires(r, NGX_HTTP_PUSH_STREAM_EXPIRES_EPOCH, 0);

    upgrade_header = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_UPGRADE);
    connection_header = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_CONNECTION);
    sec_key_header = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_SEC_WEBSOCKET_KEY);
    sec_version_header = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_SEC_WEBSOCKET_VERSION);

    if ((upgrade_header == NULL) || (connection_header == NULL) || (sec_key_header == NULL) || (sec_version_header == NULL)) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: %V", &NGX_HTTP_PUSH_STREAM_NO_MANDATORY_HEADERS_MESSAGE);
        return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_MANDATORY_HEADERS_MESSAGE);
    }

    version = ngx_atoi(sec_version_header->data, sec_version_header->len);
    if ((version != NGX_HTTP_PUSH_STREAM_WEBSOCKET_VERSION_8) && (version != NGX_HTTP_PUSH_STREAM_WEBSOCKET_VERSION_13)) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: version: %d %V", version, &NGX_HTTP_PUSH_STREAM_WRONG_WEBSOCKET_VERSION_MESSAGE);
        ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_SEC_WEBSOCKET_VERSION, &NGX_HTTP_PUSH_STREAM_WEBSOCKET_SUPPORTED_VERSIONS);
        return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_WRONG_WEBSOCKET_VERSION_MESSAGE);
    }

    if ((ctx = ngx_http_push_stream_add_request_context(r)) == NULL) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to create request context");
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    if ((ctx->frame = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_frame_t))) == NULL) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to create frame structure");
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_START_STEP;
    ctx->frame->last = NULL;
    ctx->frame->payload = NULL;

    if ((sec_accept_header = ngx_http_push_stream_generate_websocket_accept_value(r, sec_key_header, ctx->temp_pool)) == NULL) {
        ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: could not generate security accept header value");
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_UPGRADE, &NGX_HTTP_PUSH_STREAM_WEBSOCKET_UPGRADE);
    ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_CONNECTION, &NGX_HTTP_PUSH_STREAM_WEBSOCKET_CONNECTION);
    ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_SEC_WEBSOCKET_ACCEPT, sec_accept_header);
    r->headers_out.status_line = NGX_HTTP_PUSH_STREAM_101_STATUS_LINE;

    ngx_http_push_stream_send_only_added_headers(r);

    //get channels ids and backtracks from path
    requested_channels = ngx_http_push_stream_parse_channels_ids_from_path(r, ctx->temp_pool);
    if ((requested_channels == NULL) || ngx_queue_empty(&requested_channels->queue)) {
        ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the push_stream_channels_path is required but is not set");
        return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE);
    }

    //validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on. check if channel is full of subscribers
    if (ngx_http_push_stream_validate_channels(r, requested_channels, &status_code, &explain_error_message) == NGX_ERROR) {
        return ngx_http_push_stream_send_websocket_close_frame(r, status_code, explain_error_message);
    }

    // get control values
    ngx_http_push_stream_get_last_received_message_values(r, &if_modified_since, &tag, &last_event_id);

    // stream access
    if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) {
        return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
    }

    // sending response content header
    if (ngx_http_push_stream_send_response_content_header(r, cf) == NGX_ERROR) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: could not send content header to subscriber");
        return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
    }

    if (ngx_http_push_stream_registry_subscriber(r, worker_subscriber) == NGX_ERROR) {
        return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
    }

    // adding subscriber to channel(s) and send backtrack messages
    for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
        requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
        if (ngx_http_push_stream_subscriber_assign_channel(mcf, cf, r, requested_channel, if_modified_since, tag, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
            return ngx_http_push_stream_send_websocket_close_frame(r, NGX_HTTP_INTERNAL_SERVER_ERROR, &NGX_HTTP_PUSH_STREAM_EMPTY);
        }
    }

    if (ctx->temp_pool != NULL) {
        ngx_destroy_pool(ctx->temp_pool);
        ctx->temp_pool = NULL;
    }
    return NGX_DONE;
}


ngx_str_t *
ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_str_t *sec_key, ngx_pool_t *temp_pool)
{
#if (NGX_HAVE_SHA1)
    ngx_str_t    *sha1_signed, *accept_value;
    ngx_sha1_t   sha1;

    sha1_signed = ngx_http_push_stream_create_str(temp_pool, NGX_HTTP_PUSH_STREAM_WEBSOCKET_SHA1_SIGNED_HASH_LENGTH);
    accept_value = ngx_http_push_stream_create_str(r->pool, ngx_base64_encoded_length(NGX_HTTP_PUSH_STREAM_WEBSOCKET_SHA1_SIGNED_HASH_LENGTH));

    if ((sha1_signed == NULL) || (accept_value == NULL)) {
        return NULL;
    }

    ngx_sha1_init(&sha1);
    ngx_sha1_update(&sha1, sec_key->data, sec_key->len);
    ngx_sha1_update(&sha1, NGX_HTTP_PUSH_STREAM_WEBSOCKET_SIGN_KEY.data, NGX_HTTP_PUSH_STREAM_WEBSOCKET_SIGN_KEY.len);
    ngx_sha1_final(sha1_signed->data, &sha1);

    ngx_encode_base64(accept_value, sha1_signed);

    return accept_value;
#else
    return NULL;
#endif
}


void
ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
{
    ngx_http_push_stream_main_conf_t  *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
    ngx_http_push_stream_loc_conf_t   *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
    ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
    ngx_int_t                          rc = NGX_OK;
    ngx_event_t                       *rev;
    ngx_connection_t                  *c;
    uint64_t                           i;
    ngx_buf_t                          buf;
    ngx_queue_t                       *q;

    ngx_http_push_stream_set_buffer(&buf, ctx->frame->header, ctx->frame->last, 8);

    c = r->connection;
    rev = c->read;

    for (;;) {
        if (c->error || c->timedout || c->close || c->destroyed || rev->closed || rev->eof) {
            goto finalize;
        }

        switch (ctx->frame->step) {
            case NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_START_STEP:
                //reading frame header
                if ((rc = ngx_http_push_stream_recv(c, rev, &buf, 2)) != NGX_OK) {
                    goto exit;
                }

                ctx->frame->fin  = (ctx->frame->header[0] >> 7) & 1;
                ctx->frame->rsv1 = (ctx->frame->header[0] >> 6) & 1;
                ctx->frame->rsv2 = (ctx->frame->header[0] >> 5) & 1;
                ctx->frame->rsv3 = (ctx->frame->header[0] >> 4) & 1;
                ctx->frame->opcode = ctx->frame->header[0] & 0xf;

                ctx->frame->mask = (ctx->frame->header[1] >> 7) & 1;
                ctx->frame->payload_len = ctx->frame->header[1] & 0x7f;

                ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_REAL_SIZE_STEP;

                break;

            case NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_REAL_SIZE_STEP:

                if (ctx->frame->payload_len == 126) {
                    if ((rc = ngx_http_push_stream_recv(c, rev, &buf, 2)) != NGX_OK) {
                        goto exit;
                    }
                    uint16_t len;
                    ngx_memcpy(&len, ctx->frame->header, 2);
                    ctx->frame->payload_len = ntohs(len);
                } else if (ctx->frame->payload_len == 127) {
                    if ((rc = ngx_http_push_stream_recv(c, rev, &buf, 8)) != NGX_OK) {
                        goto exit;
                    }
                    uint64_t len;
                    ngx_memcpy(&len, ctx->frame->header, 8);
                    ctx->frame->payload_len = ngx_http_push_stream_ntohll(len);
                }

                ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_MASK_KEY_STEP;

                break;

            case NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_MASK_KEY_STEP:

                if (ctx->frame->mask) {
                    if ((rc = ngx_http_push_stream_recv(c, rev, &buf, 4)) != NGX_OK) {
                        goto exit;
                    }

                    ngx_memcpy(ctx->frame->mask_key, buf.start, 4);
                }

                ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_PAYLOAD_STEP;

                break;

            case NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_GET_PAYLOAD_STEP:

                if (ctx->frame->payload_len > 0) {
                    //create a temporary pool to allocate temporary elements
                    if (ctx->temp_pool == NULL) {
                        if ((ctx->temp_pool = ngx_create_pool(4096, r->connection->log)) == NULL) {
                            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for temporary pool");
                            goto finalize;
                        }

                        if ((ctx->frame->payload = ngx_pcalloc(ctx->temp_pool, ctx->frame->payload_len)) == NULL) {
                            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for payload");
                            goto finalize;
                        }

                        ctx->frame->last = ctx->frame->payload;
                    }

                    ngx_http_push_stream_set_buffer(&buf, ctx->frame->payload, ctx->frame->last, ctx->frame->payload_len);

                    if ((rc = ngx_http_push_stream_recv(c, rev, &buf, ctx->frame->payload_len)) != NGX_OK) {
                        goto exit;
                    }

                    if (cf->websocket_allow_publish && (ctx->frame->opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_OPCODE)) {
                        if (ctx->frame->mask) {
                            for (i = 0; i < ctx->frame->payload_len; i++) {
                                ctx->frame->payload[i] = ctx->frame->payload[i] ^ ctx->frame->mask_key[i % 4];
                            }
                        }

                        if (!ngx_http_push_stream_is_utf8(ctx->frame->payload, ctx->frame->payload_len)) {
                            goto finalize;
                        }

                        for (q = ngx_queue_head(&ctx->subscriber->subscriptions); q != ngx_queue_sentinel(&ctx->subscriber->subscriptions); q = ngx_queue_next(q)) {
                            ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(q, ngx_http_push_stream_subscription_t, queue);
                            if (subscription->channel->for_events) {
                                // skip events channel on publish by websocket connections
                                continue;
                            }

                            if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, subscription->channel, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, cf->store_messages, ctx->temp_pool) != NGX_OK) {
                                goto finalize;
                            }
                        }
                        ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE), 1);
                    }

                    if (ctx->temp_pool != NULL) {
                        ngx_destroy_pool(ctx->temp_pool);
                        ctx->temp_pool = NULL;
                    }
                }

                if (ctx->frame->opcode == NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_OPCODE) {
                    ngx_http_push_stream_send_response_finalize(r);
                } else {
                    ctx->frame->step = NGX_HTTP_PUSH_STREAM_WEBSOCKET_READ_START_STEP;
                    ctx->frame->last = NULL;
                }
                return;

                break;

            default:
                ngx_log_debug(NGX_LOG_DEBUG, c->log, 0, "push stream module: unknown websocket step (%d)", ctx->frame->step);
                goto finalize;
                break;
        }

        ngx_http_push_stream_set_buffer(&buf, ctx->frame->header, NULL, 8);
    }

exit:
    if (rc == NGX_AGAIN) {
        ctx->frame->last = buf.last;
        if (!c->read->ready) {
            if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                ngx_log_error(NGX_LOG_INFO, c->log, ngx_socket_errno, "push stream module: failed to restore read events");
                goto finalize;
            }
        }
    }

    if (rc == NGX_ERROR) {
        rev->eof = 1;
        c->error = 1;
        ngx_log_error(NGX_LOG_INFO, c->log, ngx_socket_errno, "push stream module: client closed prematurely connection");
        goto finalize;
    }

    return;

finalize:
    ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_cleanup_request_context);
    ngx_http_finalize_request(r, c->error ? NGX_HTTP_CLIENT_CLOSED_REQUEST : NGX_OK);
}


ngx_int_t
ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, ngx_buf_t *buf, ssize_t len)
{
    ssize_t n = c->recv(c, buf->last, (ssize_t) len - (buf->last - buf->start));

    if (n == NGX_AGAIN) {
        return NGX_AGAIN;
    }

    if ((n == NGX_ERROR) || (n == 0)) {
        return NGX_ERROR;
    }

    buf->last += n;

    if ((buf->last - buf->start) < len) {
        return NGX_AGAIN;
    }

    return NGX_OK;
}


void
ngx_http_push_stream_set_buffer(ngx_buf_t *buf, u_char *start, u_char *last, ssize_t len)
{
    buf->start = start;
    buf->pos = buf->start;
    buf->last = (last != NULL) ? last : start;
    buf->end = buf->start + len;
    buf->temporary = 0;
    buf->memory = 1;
}