Commit b55c1349 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding support for websocket

parent dce9048a
......@@ -306,6 +306,86 @@ Create a html page with the content on **Client** part, access it from browser a
* _push_stream_message_template_ should be exactly like as the example to be used with PushStream class
* EventSource and Forever iFrame may be combined setting _/sub_ and _/ev_ locations on same server and setting *modes: "eventsource|stream"* on client. With that if the browser supports Event Source, it will use it, if not it will use iFrame.
h2(#websocket). WebSocket
Using WebSocket to receive the messages.
*This example uses the PushStream class present in _misc/js/pushstream.js_ file, copy it to your server htdocs.*
Configure your server like suggested bellow. You should complete this configuration with other directives according with target application.
Create a html page with the content on **Client** part, access it from browser and try with the command *curl http://localhost/pub?id=ch1 -d "Some Text"*.
*Server:*
<pre>
location /pub {
# activate publisher (admin) mode for this location
push_stream_publisher admin;
# query string based channel id
set $push_stream_channel_id $arg_id;
}
location ~ /ws/(.*) {
# activate websocket mode for this location
push_stream_websocket;
# positional channel path
set $push_stream_channels_path $1;
# message template
push_stream_message_template "{\"id\":~id~,\"channel\":\"~channel~\",\"text\":\"~text~\"}";
# store messages in memory
push_stream_store_messages on;
push_stream_websocket_allow_publish on;
# ping frequency
push_stream_ping_message_interval 10s;
}
</pre>
*Client:*
<pre>
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN"
"http://www.w3.org/TR/html4/strict.dtd">
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
<title>WebSocket Example</title>
</head>
<body>
<p>Messages:</p>
<div id="messages" style="width:800px;height:300px;overflow:scroll;"></div>
<script src="/js/pushstream.js" type="text/javascript" language="javascript" charset="utf-8"></script>
<script type="text/javascript" language="javascript" charset="utf-8">
// <![CDATA[
function messageReceived(text, id, channel) {
document.getElementById('messages').innerHTML += id + ': ' + text + '<br>';
};
var pushstream = new PushStream({
host: window.location.hostname,
port: window.location.port,
modes: "websocket"
});
pushstream.onmessage = messageReceived;
pushstream.addChannel('ch1');
pushstream.connect();
// ]]>
</script>
</body>
</html>
</pre>
*Observations:*
* _push_stream_message_template_ should be exactly like as the example to be used with PushStream class
* WebSocket, EventSource and Forever iFrame may be combined setting _/ws_, _/sub_ and _/ev_ locations on same server and setting *modes: "websocket|eventsource|stream"* on client. With that if the browser supports Websocket or Event Source, it will use it, if not it will use iFrame, following the order on _modes_ attribute.
h2(#long-polling). Long Polling
Using jQuery to receive the messages.
......@@ -533,6 +613,29 @@ The polling and long-polling modes could be set by the request header *X-Nginx-P
</pre>
h2(#push_stream_websocket). push_stream_websocket
*syntax:* _push_stream_websocket_
*default:* _none_
*context:* _location_
*release version:* _0.3.2_
Defines a location as a subscriber using WebSocket protocol. This location represents a subscriber's interface to a channel's message queue.
This location only supports GET http method to receive published messages.
<pre>
# websocket subscriber location
location /ws/(.*) {
push_stream_websocket;
# positional channel path
set $push_stream_channels_path $1;
}
</pre>
h2(#push_stream_shared_memory_size). push_stream_shared_memory_size
*syntax:* _push_stream_shared_memory_size size_
......@@ -806,6 +909,19 @@ The length of time a long polling subscriber will stay connected waiting for a m
But, this operation is very important to help Nginx recycle memory consumed to send messages to susbscriber, allocated at pool request.
h2(#push_stream_websocket_allow_publish). push_stream_websocket_allow_publish
*syntax:* _push_stream_websocket_allow_publish on | off_
*default:* _off_
*context:* _location_
*release version:* _0.3.2_
Enable a WebSocket subscriber send messages to the channel it is connected (the first, if connected in more than one) through the same connection it is receiving the messages, using _send_ method from WebSocket interface.
h1(#attention). Attention
This module controls everything needed to send the messages to subscribers.
......
......@@ -42,6 +42,7 @@ typedef struct {
ngx_str_t *template;
ngx_uint_t index;
ngx_flag_t eventsource;
ngx_flag_t websocket;
} ngx_http_push_stream_template_queue_t;
typedef struct {
......@@ -79,6 +80,7 @@ typedef struct {
ngx_msec_t ping_message_interval;
ngx_msec_t subscriber_connection_ttl;
ngx_msec_t longpolling_connection_ttl;
ngx_flag_t websocket_allow_publish;
} ngx_http_push_stream_loc_conf_t;
// shared memory segment name
......@@ -153,6 +155,7 @@ struct ngx_http_push_stream_subscriber_s {
typedef struct {
ngx_event_t *disconnect_timer;
ngx_event_t *ping_timer;
ngx_http_push_stream_subscriber_t *subscriber;
ngx_flag_t longpolling;
} ngx_http_push_stream_subscriber_ctx_t;
......@@ -204,7 +207,7 @@ static ngx_int_t ngx_http_push_stream_send_response_channel_info(ngx_http
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r, ngx_str_t *prefix);
static ngx_int_t ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource);
static ngx_int_t ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource, ngx_flag_t websocket);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID = ngx_string("ALL");
......@@ -216,6 +219,8 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOO_MUCH_BROADCAST_CHANNELS = ngx_st
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOO_SUBSCRIBERS_PER_CHANNEL = ngx_string("Subscribers limit per channel has been exceeded.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CANNOT_CREATE_CHANNELS = ngx_string("Subscriber could not create channels.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE = ngx_string("Number of channels were exceeded.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_NO_MANDATORY_HEADERS_MESSAGE = ngx_string("Don't have at least one of the mandatory headers: Connection, Upgrade, Sec-WebSocket-Key and Sec-WebSocket-Version");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_WRONG_WEBSOCKET_VERSION_MESSAGE = ngx_string("Version not supported. Supported versions: 8, 13");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED = ngx_string("Channel deleted.");
#define NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID (void *) -1
......@@ -239,6 +244,19 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED = ngx_string("chunk
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_ETAG = ngx_string("Etag");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_IF_NONE_MATCH = ngx_string("If-None-Match");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_VARY = ngx_string("Vary");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_UPGRADE = ngx_string("Upgrade");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_CONNECTION = ngx_string("Connection");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_SEC_WEBSOCKET_KEY = ngx_string("Sec-WebSocket-Key");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_SEC_WEBSOCKET_VERSION = ngx_string("Sec-WebSocket-Version");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_SEC_WEBSOCKET_ACCEPT = ngx_string("Sec-WebSocket-Accept");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_WEBSOCKET_UPGRADE = ngx_string("WebSocket");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_WEBSOCKET_CONNECTION = ngx_string("Upgrade");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_WEBSOCKET_SIGN_KEY = ngx_string("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_WEBSOCKET_SUPPORTED_VERSIONS = ngx_string("8, 13");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_101_STATUS_LINE = ngx_string("101 Switching Protocols");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_NORMAL = ngx_string("normal");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_ADMIN = ngx_string("admin");
......@@ -253,6 +271,27 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING = ngx_string("long
#define NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_NORMAL 3
#define NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN 4
#define NGX_HTTP_PUSH_STREAM_STATISTICS_MODE 5
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE 6
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_VERSION_8 8
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_VERSION_13 13
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_SHA1_SIGNED_HASH_LENGTH 20
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_FRAME_HEADER_MAX_LENGTH 144
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_LAST_FRAME 0x8
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_OPCODE 0x1
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_OPCODE 0x8
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_OPCODE 0x9
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_P0NG_OPCODE 0xA
static const u_char NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_LAST_FRAME_BYTE = NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_OPCODE | (NGX_HTTP_PUSH_STREAM_WEBSOCKET_LAST_FRAME << 4);
static const u_char NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE[] = {NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_OPCODE | (NGX_HTTP_PUSH_STREAM_WEBSOCKET_LAST_FRAME << 4), 0x00};
static const u_char NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE[] = {NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_OPCODE | (NGX_HTTP_PUSH_STREAM_WEBSOCKET_LAST_FRAME << 4), 0x00};
static const u_char NGX_HTTP_PUSH_STREAM_WEBSOCKET_PAYLOAD_LEN_16_BYTE = 126;
static const u_char NGX_HTTP_PUSH_STREAM_WEBSOCKET_PAYLOAD_LEN_64_BYTE = 127;
// other stuff
......
......@@ -32,6 +32,7 @@
#include <ngx_http_push_stream_module_ipc.h>
#include <ngx_http_push_stream_module_publisher.h>
#include <ngx_http_push_stream_module_subscriber.h>
#include <ngx_http_push_stream_module_websocket.h>
#define NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE 33554432 // 32 megs
static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL = 30; // 30 seconds
......@@ -56,6 +57,9 @@ static char * ngx_http_push_stream_publisher(ngx_conf_t *cf, ngx_command_t
// subscriber
static char * ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
// websockets
static char * ngx_http_push_stream_websocket(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
// setup
static char * ngx_http_push_stream_setup_handler(ngx_conf_t *cf, void *conf, ngx_int_t (*handler) (ngx_http_request_t *));
static ngx_int_t ngx_http_push_stream_init_module(ngx_cycle_t *cycle);
......
......@@ -213,12 +213,14 @@ ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL;
// general request handling
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_only_added_headers(ngx_http_request_t *r);
static void ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modified_time, ngx_int_t tag, ngx_pool_t *temp_pool);
static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value);
static ngx_str_t * ngx_http_push_stream_get_header(ngx_http_request_t *r, const ngx_str_t *header_name);
static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
static u_char * ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx_uint_t offset, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_chunk(const u_char *text, off_t len, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_websocket_frame(const u_char *text, off_t len, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool);
......@@ -264,4 +266,7 @@ static ngx_str_t * ngx_http_push_stream_join_wi
static ngx_str_t * ngx_http_push_stream_get_formatted_current_time(ngx_pool_t *pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_hostname(ngx_pool_t *pool);
uint64_t ngx_http_push_stream_htonll(uint64_t value);
uint64_t ngx_http_push_stream_ntohll(uint64_t value);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_UTILS_H_ */
/*
* Copyright (C) 2010-2011 Wandenberg Peixoto <wandenberg@gmail.com>, Rogério Carvalho Schneider <stockrt@gmail.com>
*
* This file is part of Nginx Push Stream Module.
*
* Nginx Push Stream Module is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Nginx Push Stream Module is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Nginx Push Stream Module. If not, see <http://www.gnu.org/licenses/>.
*
*
* ngx_http_push_stream_module_websocket.h
*
* Created: Oct 20, 2011
* Authors: Wandenberg Peixoto <wandenberg@gmail.com>, Rogério Carvalho Schneider <stockrt@gmail.com>
*/
#ifndef NGX_HTTP_PUSH_STREAM_MODULE_WEBSOCKET_H_
#define NGX_HTTP_PUSH_STREAM_MODULE_WEBSOCKET_H_
#if (NGX_HAVE_SHA1)
#include <ngx_sha1.h>
#endif
#include <ngx_http_push_stream_module_utils.h>
#include <ngx_http_push_stream_module_subscriber.h>
typedef struct {
unsigned char fin:1;
unsigned char rsv1:1;
unsigned char rsv2:1;
unsigned char rsv3:1;
unsigned char opcode:4;
unsigned char mask:1;
unsigned char mask_key[4];
uint64_t payload_len;
u_char *payload;
} ngx_http_push_stream_frame_t;
static ngx_int_t ngx_http_push_stream_websocket_handler(ngx_http_request_t *r);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_WEBSOCKET_H_ */
......@@ -45,7 +45,7 @@
var pushstream = new PushStream({
host: window.location.hostname,
port: window.location.port,
modes: "eventsource|stream"
modes: "websocket|eventsource|stream"
});
pushstream.onmessage = _manageEvent;
pushstream.onstatuschange = _statuschanged;
......
......@@ -118,8 +118,8 @@
return path;
};
var getSubscriberUrl = function(pushstream, prefix) {
var url = (pushstream.useSSL) ? "https://" : "http://";
var getSubscriberUrl = function(pushstream, prefix, websocket) {
var url = (websocket) ? ((pushstream.useSSL) ? "wss://" : "ws://") : ((pushstream.useSSL) ? "https://" : "http://");
url += pushstream.host;
url += ((pushstream.port != 80) && (pushstream.port != 443)) ? (":" + pushstream.port) : "";
url += prefix;
......@@ -182,6 +182,47 @@
/* wrappers */
var WebSocketWrapper = function(pushstream) {
if (!window.WebSocket && !window.MozWebSocket) throw "WebSocket not supported";
this.type = WebSocketWrapper.TYPE;
this.pushstream = pushstream;
this.connection = null;
};
WebSocketWrapper.TYPE = "WebSocket";
WebSocketWrapper.prototype = {
connect: function() {
this._closeCurrentConnection();
var url = getSubscriberUrl(this.pushstream, this.pushstream.urlPrefixWebsocket, true);
this.connection = (window.WebSocket) ? new window.WebSocket(url) : new window.MozWebSocket(url);
this.connection.onerror = linker(onerrorCallback, this);
this.connection.onclose = linker(onerrorCallback, this);
this.connection.onopen = linker(onopenCallback, this);
this.connection.onmessage = linker(onmessageCallback, this);
Log4js.info("[WebSocket] connecting to:", url);
},
disconnect: function() {
if (this.connection) {
Log4js.debug("[WebSocket] closing connection to:", this.connection.URL);
this._closeCurrentConnection();
this.pushstream._onclose();
}
},
_closeCurrentConnection: function() {
if (this.connection) {
try { this.connection.close(); } catch (e) { /* ignore error on closing */ }
this.connection = null;
}
},
sendMessage: function(message) {
if (this.connection) { this.connection.send(message); }
}
};
var EventSourceWrapper = function(pushstream) {
if (!window.EventSource) throw "EventSource not supported";
this.type = EventSourceWrapper.TYPE;
......@@ -440,16 +481,16 @@
this.urlPrefixStream = settings.urlPrefixStream || '/sub';
this.urlPrefixEventsource = settings.urlPrefixEventsource || '/ev';
this.urlPrefixLongpolling = settings.urlPrefixLongpolling || '/lp';
//this.urlPrefixWebsocket = settings.urlPrefixWebsocket || '/ws';
this.urlPrefixWebsocket = settings.urlPrefixWebsocket || '/ws';
this.modes = (settings.modes || 'eventsource|stream|longpolling').split('|');
//this.modes = (settings.modes || 'eventsource|websocket|stream|longpolling').split('|');
this.modes = (settings.modes || 'eventsource|websocket|stream|longpolling').split('|');
this.wrappers = [];
for ( var i = 0; i < this.modes.length; i++) {
try {
var wrapper = null;
switch (this.modes[i]) {
case "websocket" : wrapper = new WebSocketWrapper(this); break;
case "eventsource": wrapper = new EventSourceWrapper(this); break;
case "longpolling": wrapper = new LongPollingWrapper(this); break;
case "stream" : wrapper = new StreamWrapper(this); break;
......
......@@ -126,5 +126,18 @@ http {
# positional channel path
set $push_stream_channels_path $1;
}
location ~ /ws/(.*) {
# activate websocket mode for this location
push_stream_websocket;
# positional channel path
set $push_stream_channels_path $1;
# store messages in memory
push_stream_store_messages on;
push_stream_websocket_allow_publish on;
}
}
}
......@@ -30,6 +30,7 @@
#include <ngx_http_push_stream_module_ipc.c>
#include <ngx_http_push_stream_module_publisher.c>
#include <ngx_http_push_stream_module_subscriber.c>
#include <ngx_http_push_stream_module_websocket.c>
static ngx_str_t *
ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *cf)
......@@ -315,13 +316,14 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
}
static ngx_int_t
ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource) {
ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource, ngx_flag_t websocket) {
ngx_http_push_stream_template_queue_t *sentinel = &ngx_http_push_stream_module_main_conf->msg_templates;
ngx_http_push_stream_template_queue_t *cur = sentinel;
ngx_str_t *aux = NULL;
while ((cur = (ngx_http_push_stream_template_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if (ngx_memn2cmp(cur->template->data, template.data, cur->template->len, template.len) == 0) {
if ((ngx_memn2cmp(cur->template->data, template.data, cur->template->len, template.len) == 0) &&
(cur->eventsource == eventsource) && (cur->websocket == websocket)) {
return cur->index;
}
}
......@@ -336,6 +338,7 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, n
}
cur->template = aux;
cur->eventsource = eventsource;
cur->websocket = websocket;
cur->index = ngx_http_push_stream_module_main_conf->qtd_templates;
ngx_memcpy(cur->template->data, template.data, template.len);
ngx_queue_insert_tail(&ngx_http_push_stream_module_main_conf->msg_templates.queue, &cur->queue);
......
......@@ -44,6 +44,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, location_type),
NULL },
{ ngx_string("push_stream_websocket"),
NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS,
ngx_http_push_stream_websocket,
NGX_HTTP_LOC_CONF_OFFSET,
0,
NULL },
/* Main directives*/
{ ngx_string("push_stream_shared_memory_size"),
......@@ -186,6 +192,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, longpolling_connection_ttl),
NULL },
{ ngx_string("push_stream_websocket_allow_publish"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, websocket_allow_publish),
NULL },
ngx_null_command
};
......@@ -447,6 +459,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->ping_message_interval = NGX_CONF_UNSET_MSEC;
lcf->subscriber_connection_ttl = NGX_CONF_UNSET_MSEC;
lcf->longpolling_connection_ttl = NGX_CONF_UNSET_MSEC;
lcf->websocket_allow_publish = NGX_CONF_UNSET_UINT;
return lcf;
}
......@@ -469,9 +482,21 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_msec_value(conf->ping_message_interval, prev->ping_message_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_msec_value(conf->subscriber_connection_ttl, prev->subscriber_connection_ttl, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_msec_value(conf->longpolling_connection_ttl, prev->longpolling_connection_ttl, conf->subscriber_connection_ttl);
ngx_conf_merge_value(conf->websocket_allow_publish, prev->websocket_allow_publish, 0);
if (conf->location_type == NGX_CONF_UNSET_UINT) {
return NGX_CONF_OK;
}
// changing properties for event source support
if (conf->eventsource_support) {
if ((conf->location_type != NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_LONGPOLLING) &&
(conf->location_type != NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_POLLING) &&
(conf->location_type != NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_STREAMING)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: event source support is only available on subscriber location");
return NGX_CONF_ERROR;
}
conf->content_type.data = NGX_HTTP_PUSH_STREAM_EVENTSOURCE_CONTENT_TYPE.data;
conf->content_type.len = NGX_HTTP_PUSH_STREAM_EVENTSOURCE_CONTENT_TYPE.len;
......@@ -568,7 +593,13 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
// formatting header and footer template for chunk transfer
if (conf->header_template.len > 0) {
ngx_str_t *aux = ngx_http_push_stream_get_formatted_chunk(conf->header_template.data, conf->header_template.len, cf->pool);
ngx_str_t *aux = NULL;
if (conf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE) {
aux = ngx_http_push_stream_get_formatted_websocket_frame(conf->header_template.data, conf->header_template.len, cf->pool);
} else {
aux = ngx_http_push_stream_get_formatted_chunk(conf->header_template.data, conf->header_template.len, cf->pool);
}
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to format header template");
return NGX_CONF_ERROR;
......@@ -578,7 +609,13 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
}
if (conf->footer_template.len > 0) {
ngx_str_t *aux = ngx_http_push_stream_get_formatted_chunk(conf->footer_template.data, conf->footer_template.len, cf->pool);
ngx_str_t *aux = NULL;
if (conf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE) {
aux = ngx_http_push_stream_get_formatted_websocket_frame(conf->footer_template.data, conf->footer_template.len, cf->pool);
} else {
aux = ngx_http_push_stream_get_formatted_chunk(conf->footer_template.data, conf->footer_template.len, cf->pool);
}
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to format footer template");
return NGX_CONF_ERROR;
......@@ -587,7 +624,12 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
conf->footer_template.len = aux->len;
}
conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template, conf->eventsource_support);
if ((conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_LONGPOLLING) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_POLLING) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_STREAMING) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE)) {
conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template, conf->eventsource_support, (conf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE));
}
return NGX_CONF_OK;
}
......@@ -697,6 +739,28 @@ ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
}
static char *
ngx_http_push_stream_websocket(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
char *rc = ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_websocket_handler);
#if (NGX_HAVE_SHA1)
if (rc == NGX_CONF_OK) {
ngx_http_push_stream_loc_conf_t *pslcf = conf;
pslcf->location_type = NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE;
pslcf->index_channels_path = ngx_http_get_variable_index(cf, &ngx_http_push_stream_channels_path);
if (pslcf->index_channels_path == NGX_ERROR) {
rc = NGX_CONF_ERROR;
}
}
#else
rc = NGX_CONF_ERROR;
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: sha1 support is needed to use WebSocket");
#endif
return rc;
}
// shared memory
static ngx_int_t
ngx_http_push_stream_set_up_shm(ngx_conf_t *cf, size_t shm_size)
......
......@@ -97,6 +97,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED);
ngx_http_send_header(r);
// sending response content header
......@@ -179,6 +180,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
worker_subscriber->longpolling = 1;
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED);
if (ngx_http_push_stream_registry_subscriber_locked(r, worker_subscriber) == NGX_ERROR) {
ngx_shmtx_unlock(&shpool->mutex);
......@@ -498,8 +500,6 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
r->headers_out.status = NGX_HTTP_OK;
r->headers_out.content_length_n = -1;
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED);
return worker_subscriber;
}
......@@ -523,12 +523,13 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_
// adding subscriber to woker list of subscribers
ngx_queue_insert_tail(&thisworker_data->subscribers_sentinel->queue, &element_subscriber->queue);
if ((connection_ttl != NGX_CONF_UNSET_MSEC) || (cf->ping_message_interval != NGX_CONF_UNSET_MSEC)) {
if ((ctx = ngx_pcalloc(worker_subscriber->request->pool, sizeof(ngx_http_push_stream_subscriber_ctx_t))) == NULL) {
return NGX_ERROR;
}
ctx->longpolling = worker_subscriber->longpolling;
ctx->subscriber = worker_subscriber;
if ((ctx = ngx_pcalloc(worker_subscriber->request->pool, sizeof(ngx_http_push_stream_subscriber_ctx_t))) == NULL) {
return NGX_ERROR;
}
ctx->longpolling = worker_subscriber->longpolling;
if ((connection_ttl != NGX_CONF_UNSET_MSEC) || (cf->ping_message_interval != NGX_CONF_UNSET_MSEC)) {
if (connection_ttl != NGX_CONF_UNSET_MSEC) {
if ((ctx->disconnect_timer = ngx_pcalloc(worker_subscriber->request->pool, sizeof(ngx_event_t))) == NULL) {
......@@ -555,10 +556,10 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_
ctx->ping_timer->log = worker_subscriber->request->connection->log;
ngx_http_push_stream_timer_reset(cf->ping_message_interval, ctx->ping_timer);
}
ngx_http_set_ctx(worker_subscriber->request, ctx, ngx_http_push_stream_module);
}
ngx_http_set_ctx(worker_subscriber->request, ctx, ngx_http_push_stream_module);
// increment global subscribers count
data->subscribers++;
thisworker_data->subscribers++;
......
......@@ -243,17 +243,22 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
return NULL;
}
ngx_str_t *chunk = ngx_http_push_stream_get_formatted_chunk(aux->data, aux->len, temp_pool);
ngx_str_t *text = NULL;
if (cur->websocket) {
text = ngx_http_push_stream_get_formatted_websocket_frame(aux->data, aux->len, temp_pool);
} else {
text = ngx_http_push_stream_get_formatted_chunk(aux->data, aux->len, temp_pool);
}
ngx_str_t *formmated = (msg->formatted_messages + i);
if ((chunk == NULL) || ((formmated->data = ngx_slab_alloc_locked(shpool, chunk->len + 1)) == NULL)) {
if ((text == NULL) || ((formmated->data = ngx_slab_alloc_locked(shpool, text->len + 1)) == NULL)) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
formmated->len = chunk->len;
formmated->len = text->len;
ngx_memset(formmated->data, '\0', formmated->len + 1);
ngx_memcpy(formmated->data, chunk->data, formmated->len);
ngx_memcpy(formmated->data, text->data, formmated->len);
i++;
}
......@@ -484,7 +489,11 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r)
ngx_http_push_stream_send_response_text(r, pslcf->footer_template.data, pslcf->footer_template.len, 0);
}
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1);
if (pslcf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE) {
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE), 1);
} else {
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1);
}
ngx_http_finalize_request(r, NGX_DONE);
}
......@@ -784,6 +793,8 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
if (pslcf->eventsource_support) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.len, 0);
} else if (pslcf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE), 1);
} else {
rc = ngx_http_push_stream_send_response_message(r, NULL, ngx_http_push_stream_ping_msg);
}
......@@ -930,8 +941,10 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subsc
ngx_queue_remove(&cur->queue);
}
ngx_queue_init(&sentinel->queue);
ngx_queue_remove(&worker_subscriber->worker_subscriber_element_ref->queue);
ngx_queue_init(&worker_subscriber->worker_subscriber_element_ref->queue);
if (worker_subscriber->worker_subscriber_element_ref != NULL) {
ngx_queue_remove(&worker_subscriber->worker_subscriber_element_ref->queue);
ngx_queue_init(&worker_subscriber->worker_subscriber_element_ref->queue);
}
worker_subscriber->clndata->worker_subscriber = NULL;
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER((data->ipc + ngx_process_slot)->subscribers);
......@@ -1014,6 +1027,60 @@ ngx_http_push_stream_get_formatted_chunk(const u_char *text, off_t len, ngx_pool
}
uint64_t
ngx_http_push_stream_htonll(uint64_t value) {
int num = 42;
if (*(char *)&num == 42) {
uint32_t high_part = htonl((uint32_t)(value >> 32));
uint32_t low_part = htonl((uint32_t)(value & 0xFFFFFFFFLL));
return (((uint64_t)low_part) << 32) | high_part;
} else {
return value;
}
}
uint64_t
ngx_http_push_stream_ntohll(uint64_t value) {
int num = 42;
if (*(char *)&num == 42) {
uint32_t high_part = ntohl((uint32_t)(value >> 32));
uint32_t low_part = ntohl((uint32_t)(value & 0xFFFFFFFFLL));
return (((uint64_t)low_part) << 32) | high_part;
} else {
return value;
}
}
static ngx_str_t *
ngx_http_push_stream_get_formatted_websocket_frame(const u_char *text, off_t len, ngx_pool_t *temp_pool)
{
ngx_str_t *frame;
u_char *last;
frame = ngx_http_push_stream_create_str(temp_pool, NGX_HTTP_PUSH_STREAM_WEBSOCKET_FRAME_HEADER_MAX_LENGTH + len);
if (frame != NULL) {
last = ngx_copy(frame->data, &NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_TEXT_LAST_FRAME_BYTE));
if (len <= 125) {
last = ngx_copy(last, &len, 1);
} else if (len < (1 << 16)) {
last = ngx_copy(last, &NGX_HTTP_PUSH_STREAM_WEBSOCKET_PAYLOAD_LEN_16_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PAYLOAD_LEN_16_BYTE));
uint16_t len_net = htons(len);
last = ngx_copy(last, &len_net, 2);
} else {
last = ngx_copy(last, &NGX_HTTP_PUSH_STREAM_WEBSOCKET_PAYLOAD_LEN_64_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PAYLOAD_LEN_64_BYTE));
uint64_t len_net = ngx_http_push_stream_htonll(len);
last = ngx_copy(last, &len_net, 8);
}
last = ngx_copy(last, text, len);
frame->len = last - frame->data;
}
return frame;
}
static ngx_str_t *
ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len)
{
......@@ -1159,3 +1226,134 @@ ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modi
}
}
}
/**
* Copied from nginx code to only send headers added on this module code
* */
static ngx_int_t
ngx_http_push_stream_send_only_added_headers(ngx_http_request_t *r)
{
size_t len;
ngx_str_t *status_line = NULL;
ngx_buf_t *b;
ngx_uint_t i;
ngx_chain_t out;
ngx_list_part_t *part;
ngx_table_elt_t *header;
if (r->header_sent) {
return NGX_OK;
}
r->header_sent = 1;
if (r != r->main) {
return NGX_OK;
}
if (r->http_version < NGX_HTTP_VERSION_10) {
return NGX_OK;
}
if (r->method == NGX_HTTP_HEAD) {
r->header_only = 1;
}
if (r->headers_out.last_modified_time != -1) {
if (r->headers_out.status != NGX_HTTP_OK
&& r->headers_out.status != NGX_HTTP_PARTIAL_CONTENT
&& r->headers_out.status != NGX_HTTP_NOT_MODIFIED)
{
r->headers_out.last_modified_time = -1;
r->headers_out.last_modified = NULL;
}
}
len = sizeof("HTTP/1.x ") - 1 + sizeof(CRLF) - 1
/* the end of the header */
+ sizeof(CRLF) - 1;
/* status line */
if (r->headers_out.status_line.len) {
len += r->headers_out.status_line.len;
status_line = &r->headers_out.status_line;
}
part = &r->headers_out.headers.part;
header = part->elts;
for (i = 0; /* void */; i++) {
if (i >= part->nelts) {
if (part->next == NULL) {
break;
}
part = part->next;
header = part->elts;
i = 0;
}
if (header[i].hash == 0) {
continue;
}
len += header[i].key.len + sizeof(": ") - 1 + header[i].value.len + sizeof(CRLF) - 1;
}
b = ngx_create_temp_buf(r->pool, len);
if (b == NULL) {
return NGX_ERROR;
}
/* "HTTP/1.x " */
b->last = ngx_cpymem(b->last, "HTTP/1.1 ", sizeof("HTTP/1.x ") - 1);
/* status line */
if (status_line) {
b->last = ngx_copy(b->last, status_line->data, status_line->len);
}
*b->last++ = CR; *b->last++ = LF;
part = &r->headers_out.headers.part;
header = part->elts;
for (i = 0; /* void */; i++) {
if (i >= part->nelts) {
if (part->next == NULL) {
break;
}
part = part->next;
header = part->elts;
i = 0;
}
if (header[i].hash == 0) {
continue;
}
b->last = ngx_copy(b->last, header[i].key.data, header[i].key.len);
*b->last++ = ':'; *b->last++ = ' ';
b->last = ngx_copy(b->last, header[i].value.data, header[i].value.len);
*b->last++ = CR; *b->last++ = LF;
}
/* the end of HTTP header */
*b->last++ = CR; *b->last++ = LF;
r->header_size = b->last - b->pos;
if (r->header_only) {
b->last_buf = 1;
}
out.buf = b;
out.next = NULL;
b->flush = 1;
return ngx_http_write_filter(r, &out);
}
This diff is collapsed.
......@@ -149,6 +149,7 @@ module BaseTestCase
@min_message_buffer_timeout = '50m'
@ping_message_interval = '10s'
@store_messages = 'on'
@websocket_allow_publish = nil
@subscriber_connection_timeout = nil
@longpolling_connection_timeout = nil
@memory_cleanup_timeout = '5m'
......@@ -334,6 +335,27 @@ http {
<%= "push_stream_broadcast_channel_max_qtd #{@broadcast_channel_max_qtd};" unless @broadcast_channel_max_qtd.nil? %>
}
location ~ /ws/(.*)? {
# activate websocket mode for this location
push_stream_websocket;
# positional channel path
set $push_stream_channels_path $1;
# store messages
<%= "push_stream_store_messages #{@store_messages};" unless @store_messages.nil? %>
# allow subscriber to publish
<%= "push_stream_websocket_allow_publish #{@websocket_allow_publish};" unless @websocket_allow_publish.nil? %>
# header to be sent when receiving new subscriber connection
<%= %{push_stream_header_template "#{@header_template}";} unless @header_template.nil? %>
# message template
<%= %{push_stream_message_template "#{@message_template}";} unless @message_template.nil? %>
# footer to be sent when finishing subscriber connection
<%= %{push_stream_footer_template "#{@footer_template}";} unless @footer_template.nil? %>
}
<%= @extra_location %>
}
}
......
......@@ -57,8 +57,12 @@ describe("PushStream", function() {
expect(pushstream.urlPrefixLongpolling).toBe('/lp');
});
it("should use '/ws' as url prefix for websocket", function() {
expect(pushstream.urlPrefixWebsocket).toBe('/ws');
});
it("should has all modes availables", function() {
expect(pushstream.modes).toEqual(['eventsource', 'stream', 'longpolling']);
expect(pushstream.modes).toEqual(['eventsource', 'websocket', 'stream', 'longpolling']);
});
it("should define callbacks attributes", function() {
......
......@@ -265,6 +265,43 @@ class TestSetuParameters < Test::Unit::TestCase
assert(!stderr_msg.include?(expected_error_message), "Message error founded: '#{ stderr_msg }'")
self.stop_server
end
def test_event_source_not_available_on_publisher_statistics_and_websocket_locations
expected_error_message = "push stream module: event source support is only available on subscriber location"
@extra_location = %q{
location ~ /test/ {
push_stream_websocket;
push_stream_eventsource_support on;
}
}
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
@extra_location = %q{
location ~ /test/ {
push_stream_publisher;
push_stream_eventsource_support on;
}
}
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
@extra_location = %q{
location ~ /test/ {
push_stream_channels_statistics;
push_stream_eventsource_support on;
}
}
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
end
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment