/*
 * Copyright (C) 2010-2015 Wandenberg Peixoto <wandenberg@gmail.com>, Rogério Carvalho Schneider <stockrt@gmail.com>
 *
 * This file is part of Nginx Push Stream Module.
 *
 * Nginx Push Stream Module is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Nginx Push Stream Module is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Nginx Push Stream Module.  If not, see <http://www.gnu.org/licenses/>.
 *
 *
 * ngx_http_push_stream_module_publisher.c
 *
 * Created: Oct 26, 2010
 * Authors: Wandenberg Peixoto <wandenberg@gmail.com>, Rogério Carvalho Schneider <stockrt@gmail.com>
 */

#include <ngx_http_push_stream_module_publisher.h>
#include <ngx_http_push_stream_module_version.h>

/* Forward declaration: starts reading the request body and hands it to post_handler. */
static ngx_int_t    ngx_http_push_stream_publisher_handle_after_read_body(ngx_http_request_t *r, ngx_http_client_body_handler_pt post_handler);

/*
 * Request handler for publisher locations.
 *
 * Flow, in order:
 *   1. add the CORS response headers when an allowed-origins value is configured;
 *   2. answer OPTIONS immediately with 200;
 *   3. reject methods outside GET/POST/PUT (plus DELETE in admin mode) with 405;
 *   4. parse the channel ids from the path and validate each one;
 *   5. POST/PUT  -> read the body, then run the publish body handler;
 *      DELETE    -> (admin mode only) read the body, then run the delete handler;
 *      otherwise -> reply with detailed info about the requested channels.
 *
 * Returns the ngx_int_t produced by whichever response helper or body-reading
 * step ends the request.
 */
static ngx_int_t
ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
{
    ngx_http_push_stream_main_conf_t   *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
    ngx_http_push_stream_loc_conf_t    *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
    ngx_http_push_stream_module_ctx_t  *ctx;

    ngx_http_push_stream_requested_channel_t       *requested_channels, *requested_channel;
    ngx_str_t                                       vv_allowed_origins = ngx_null_string;
    ngx_queue_t                                     *q;

    ngx_http_push_stream_set_expires(r, NGX_HTTP_PUSH_STREAM_EXPIRES_EPOCH, 0);

    if (cf->allowed_origins != NULL) {
        ngx_http_push_stream_complex_value(r, cf->allowed_origins, &vv_allowed_origins);
    }

    // CORS headers are only sent when the configured value evaluated to something non-empty
    if (vv_allowed_origins.len > 0) {
        ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN, &vv_allowed_origins);
        const ngx_str_t *header_value = (cf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN) ? &NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_PUT_DELETE_METHODS : &NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_PUT_METHODS;
        ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ACCESS_CONTROL_ALLOW_METHODS, header_value);
        ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ACCESS_CONTROL_ALLOW_HEADERS, &NGX_HTTP_PUSH_STREAM_ALLOWED_HEADERS);
    }

    if (r->method & NGX_HTTP_OPTIONS) {
        return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, NULL);
    }

    // only accept GET, POST, PUT and DELETE methods if enable publisher administration
    if ((cf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN) && !(r->method & (NGX_HTTP_GET|NGX_HTTP_POST|NGX_HTTP_PUT|NGX_HTTP_DELETE))) {
        ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ALLOW, &NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_PUT_DELETE_METHODS);
        return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_ALLOWED, NULL);
    }

    // only accept GET, POST and PUT methods if NOT enable publisher administration
    if ((cf->location_type != NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN) && !(r->method & (NGX_HTTP_GET|NGX_HTTP_POST|NGX_HTTP_PUT))) {
        ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ALLOW, &NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_PUT_METHODS);
        return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_ALLOWED, NULL);
    }

    if ((ctx = ngx_http_push_stream_add_request_context(r)) == NULL) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to create request context");
        return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL);
    }

    //get channels ids
    requested_channels = ngx_http_push_stream_parse_channels_ids_from_path(r, r->pool);
    if ((requested_channels == NULL) || ngx_queue_empty(&requested_channels->queue)) {
        ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the push_stream_channels_path is required but is not set");
        return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE);
    }

    for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
        requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);

        // check if channel id isn't equals to ALL or contain wildcard
        // NOTE(review): ngx_strchr needs a NUL-terminated id->data - confirm the path parser guarantees it
        if ((ngx_memn2cmp(requested_channel->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, requested_channel->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) || (ngx_strchr(requested_channel->id->data, '*') != NULL)) {
            return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE);
        }

        // could not have a large size
        if ((mcf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (requested_channel->id->len > mcf->max_channel_id_length)) {
            // id->len is a size_t, which the nginx formatter prints with %uz (not %d)
            ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %uz", requested_channel->id->len);
            return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE);
        }

        if (r->method & (NGX_HTTP_POST|NGX_HTTP_PUT)) {
            // create the channel if doesn't exist
            requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf);
            if (requested_channel->channel == NULL) {
                return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL);
            }

            if (requested_channel->channel == NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED) {
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: number of channels were exceeded");
                return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE);
            }
        } else {
            // GET and DELETE only look the channel up; it may legitimately stay NULL
            requested_channel->channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
        }

        // channels flagged for_events may be read (GET) but not modified through this handler
        if ((r->method != NGX_HTTP_GET) && (requested_channel->channel != NULL) && requested_channel->channel->for_events) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: only internal routines can change events channel");
            return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_INTERNAL_ONLY_EVENTS_CHANNEL_MESSAGE);
        }
    }

    // the body handlers retrieve the validated channel list from the request context
    ctx->requested_channels = requested_channels;

    if (r->method & (NGX_HTTP_POST|NGX_HTTP_PUT)) {
        return ngx_http_push_stream_publisher_handle_after_read_body(r, ngx_http_push_stream_publisher_body_handler);
    }

    if ((cf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN) && (r->method == NGX_HTTP_DELETE)) {
        return ngx_http_push_stream_publisher_handle_after_read_body(r, ngx_http_push_stream_publisher_delete_handler);
    }

    return ngx_http_push_stream_send_response_channels_info_detailed(r, requested_channels);
}

static ngx_int_t
133
ngx_http_push_stream_publisher_handle_after_read_body(ngx_http_request_t *r, ngx_http_client_body_handler_pt post_handler)
134 135 136 137 138 139 140 141 142 143 144 145 146
{
    ngx_int_t                           rc;

    /*
     * Instruct ngx_http_read_subscriber_request_body to store the request
     * body entirely in a memory buffer or in a file.
     */
    r->request_body_in_single_buf = 0;
    r->request_body_in_persistent_file = 1;
    r->request_body_in_clean_file = 0;
    r->request_body_file_log_level = 0;

    // parse the body message and return
147
    rc = ngx_http_read_client_request_body(r, post_handler);
148 149 150 151 152
    if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
        return rc;
    }

    return NGX_DONE;
153
}
Wandenberg Peixoto's avatar
Wandenberg Peixoto committed
154

155 156 157 158 159 160 161 162 163 164
/*
 * Copies the request body (r->request_body->bufs) into one contiguous,
 * zero-initialised memory buffer allocated from r->pool.
 *
 * The buffer is sized content_length_n + 1, so the copied data is always
 * followed by at least one '\0' byte. On return buf->pos points at the first
 * body byte and buf->last just past the last copied byte.
 *
 * File-backed chain links are read with ngx_read_file; the temporary file is
 * then deleted and its descriptor field set to NGX_INVALID_FILE.
 *
 * Returns NULL when the buffer cannot be allocated or a file read fails.
 * Callers in this file only invoke it when content_length_n > 0.
 */
static ngx_buf_t *
ngx_http_push_stream_read_request_body_to_buffer(ngx_http_request_t *r)
{
    ngx_buf_t                              *buf = NULL;
    ngx_chain_t                            *chain;
    u_char                                 *dst;
    ssize_t                                 n;
    off_t                                   len, room;

    buf = ngx_create_temp_buf(r->pool, r->headers_in.content_length_n + 1);
    if (buf == NULL) {
        return NULL;
    }

    buf->memory = 1;
    buf->temporary = 0;
    ngx_memset(buf->start, '\0', r->headers_in.content_length_n + 1);

    // write cursor; kept local so buf->start keeps pointing at the allocation start
    dst = buf->pos;

    for (chain = r->request_body->bufs; (chain != NULL) && (chain->buf != NULL); chain = chain->next) {
        len = ngx_buf_size(chain->buf);

        // if buffer is equal to content length all the content is in this buffer
        if (len >= r->headers_in.content_length_n) {
            dst = buf->pos;
            buf->last = buf->pos;
            len = r->headers_in.content_length_n;
        }

        // never write past the allocation; the final byte is reserved for the '\0' terminator
        room = (off_t) (buf->end - dst) - 1;
        if (len > room) {
            len = room;
        }
        if (len < 0) {
            len = 0;
        }

        if (chain->buf->in_file) {
            // read from where this link's data begins in the file, not unconditionally from offset 0
            n = ngx_read_file(chain->buf->file, dst, (size_t) len, chain->buf->file_pos);
            if (n == NGX_FILE_ERROR) {
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: cannot read file with request body");
                return NULL;
            }
            // advance by the bytes actually read so a short read does not count padding as data
            buf->last = dst + n;
            ngx_delete_file(chain->buf->file->name.data);
            chain->buf->file->fd = NGX_INVALID_FILE;
        } else {
            buf->last = ngx_copy(dst, chain->buf->pos, (size_t) len);
        }

        dst = buf->last;
    }

    return buf;
}

/*
 * Body handler for DELETE requests on admin publisher locations.
 *
 * Calls ngx_http_push_stream_delete_channel for every channel stored in the
 * request context, passing along a text: the request body when one was sent
 * (content_length_n > 0), otherwise the configured
 * channel_deleted_message_text.
 *
 * Finalizes the request with 404 when no delete call reported success, or
 * with 200 and NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED otherwise.
 */
static void
ngx_http_push_stream_publisher_delete_handler(ngx_http_request_t *r)
{
    ngx_http_push_stream_main_conf_t       *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
    ngx_http_push_stream_module_ctx_t      *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
    ngx_buf_t                              *buf = NULL;
    u_char                                 *text = mcf->channel_deleted_message_text.data;
    size_t                                  len = mcf->channel_deleted_message_text.len;
    ngx_uint_t                              qtd_channels = 0;

    ngx_http_push_stream_requested_channel_t       *requested_channel;
    ngx_queue_t                                    *q;

    if (r->headers_in.content_length_n > 0) {

        // get and check if has access to request body
        NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(r->request_body->bufs, NULL, r, "push stream module: unexpected publisher message request body buffer location. please report this to the push stream module developers.");

        buf = ngx_http_push_stream_read_request_body_to_buffer(r);
        NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf, NULL, r, "push stream module: cannot allocate memory for read the message");

        // a body sent by the client replaces the configured default text
        text = buf->pos;
        len = ngx_buf_size(buf);
    }

    for (q = ngx_queue_head(&ctx->requested_channels->queue); q != ngx_queue_sentinel(&ctx->requested_channels->queue); q = ngx_queue_next(q)) {
        requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
        // NOTE(review): channel may be NULL here when the lookup in the main handler found nothing;
        // presumably ngx_http_push_stream_delete_channel tolerates that - confirm
        if (ngx_http_push_stream_delete_channel(mcf, requested_channel->channel, text, len, r->pool)) {
            qtd_channels++;
        }
    }

    if (qtd_channels == 0) {
        ngx_http_push_stream_send_only_header_response_and_finalize(r, NGX_HTTP_NOT_FOUND, NULL);
    } else {
        ngx_http_push_stream_send_only_header_response_and_finalize(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED);
    }
}

/*
 * Body handler for POST/PUT requests: publishes the request body as a
 * message to every channel stored in the request context.
 *
 * An empty body (content_length_n <= 0) is answered with 400. If adding the
 * message to any channel fails, the request is finalized with 500 and the
 * remaining channels are skipped. On success the reply is either the
 * detailed channel info (when channel_info_on_publish is set) or a bare 200.
 */
static void
ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
{
    ngx_str_t                              *event_id, *event_type;
    ngx_http_push_stream_module_ctx_t      *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
    ngx_http_push_stream_main_conf_t       *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
    ngx_http_push_stream_loc_conf_t        *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
    ngx_buf_t                              *buf = NULL;

    ngx_http_push_stream_requested_channel_t       *requested_channel;
    ngx_queue_t                                    *q;

    // check if body message wasn't empty
    if (r->headers_in.content_length_n <= 0) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: Post request was sent with no message");
        ngx_http_push_stream_send_only_header_response_and_finalize(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_EMPTY_POST_REQUEST_MESSAGE);
        return;
    }

    // get and check if has access to request body
    NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(r->request_body->bufs, NULL, r, "push stream module: unexpected publisher message request body buffer location. please report this to the push stream module developers.");

    // copy request body to a memory buffer
    buf = ngx_http_push_stream_read_request_body_to_buffer(r);
    NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf, NULL, r, "push stream module: cannot allocate memory for read the message");

    // optional event id / event type taken from the request headers and attached to the message
    event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID);
    event_type = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_TYPE);

    for (q = ngx_queue_head(&ctx->requested_channels->queue); q != ngx_queue_sentinel(&ctx->requested_channels->queue); q = ngx_queue_next(q)) {
        requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);

        if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, requested_channel->channel, buf->pos, ngx_buf_size(buf), event_id, event_type, cf->store_messages, r->pool) != NGX_OK) {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    if (cf->channel_info_on_publish) {
        ngx_http_push_stream_send_response_channels_info_detailed(r, ctx->requested_channels);
        ngx_http_finalize_request(r, NGX_OK);
    } else {
        ngx_http_push_stream_send_only_header_response_and_finalize(r, NGX_HTTP_OK, NULL);
    }
}

static ngx_int_t
286
ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r)
287
{
288
    ngx_http_push_stream_main_conf_t   *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
289
    char                               *pos = NULL;
290

291 292 293
    ngx_http_push_stream_requested_channel_t       *requested_channels, *requested_channel;
    ngx_queue_t                                     *q;

294

295
    ngx_http_push_stream_set_expires(r, NGX_HTTP_PUSH_STREAM_EXPIRES_EPOCH, 0);
296

297 298 299 300 301
    // only accept GET method
    if (!(r->method & NGX_HTTP_GET)) {
        ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ALLOW, &NGX_HTTP_PUSH_STREAM_ALLOW_GET);
        return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_ALLOWED, NULL);
    }
Wandenberg Peixoto's avatar
Wandenberg Peixoto committed
302

303 304 305
    ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_TAG, &NGX_HTTP_PUSH_STREAM_TAG);
    ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_COMMIT, &NGX_HTTP_PUSH_STREAM_COMMIT);

306
    //get channels ids
307
    requested_channels = ngx_http_push_stream_parse_channels_ids_from_path(r, r->pool);
Wandenberg Peixoto's avatar
Wandenberg Peixoto committed
308

309
    // if not specify a channel id, get info about all channels in a resumed way
310
    if ((requested_channels == NULL) || ngx_queue_empty(&requested_channels->queue)) {
311 312
        return ngx_http_push_stream_send_response_all_channels_info_summarized(r);
    }
Wandenberg Peixoto's avatar
Wandenberg Peixoto committed
313

314 315 316
    for (q = ngx_queue_head(&requested_channels->queue); q != ngx_queue_sentinel(&requested_channels->queue); q = ngx_queue_next(q)) {
        requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);

317
        // could not have a large size
318 319
        if ((mcf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (requested_channel->id->len > mcf->max_channel_id_length)) {
            ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", requested_channel->id->len);
320
            return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE);
321
        }
Wandenberg Peixoto's avatar
Wandenberg Peixoto committed
322

323
        if ((pos = ngx_strchr(requested_channel->id->data, '*')) != NULL) {
324
            ngx_str_t *aux = NULL;
325
            if (pos != (char *) requested_channel->id->data) {
326
                *pos = '\0';
327 328
                requested_channel->id->len  = ngx_strlen(requested_channel->id->data);
                aux = requested_channel->id;
329 330 331
            }
            return ngx_http_push_stream_send_response_all_channels_info_detailed(r, aux);
        }
Wandenberg Peixoto's avatar
Wandenberg Peixoto committed
332

333
        // if specify a channel id equals to ALL, get info about all channels in a detailed way
334
        if (ngx_memn2cmp(requested_channel->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, requested_channel->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) {
335 336
            return ngx_http_push_stream_send_response_all_channels_info_detailed(r, NULL);
        }
337 338

        requested_channel->channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
Wandenberg Peixoto's avatar
Wandenberg Peixoto committed
339
    }
340

341
    // if specify a channels ids != ALL, get info about specified channels if they exists
342
    return ngx_http_push_stream_send_response_channels_info_detailed(r, requested_channels);
Wandenberg Peixoto's avatar
Wandenberg Peixoto committed
343
}