Commit 42ebbfb9 authored by Wandenberg's avatar Wandenberg

change push_stream_websocket directive to be a subtype of push_stream_subscriber directive

parent ff1afae7
h1(#changelog). Changelog
* Changed push_stream_websocket directive to be a subtype of push_stream_subscriber directive
* Changed push_stream_eventsource_support directive to be a subtype of push_stream_subscriber directive
* Fix to support gzip usage
* Added the feature to send a custom 'channel delete message' on the body of the DELETE request
......
......@@ -105,7 +105,6 @@ h1(#directives). Directives
| "push_stream_channels_statistics":push_stream_channels_statistics |   x |   - |   - |   - |   - |   - |
| "push_stream_publisher":push_stream_publisher |   x |   - |   - |   - |   - |   - |
| "push_stream_subscriber":push_stream_subscriber |   x |   - |   - |   - |   - |   - |
| "push_stream_websocket":push_stream_websocket |   x |   - |   - |   - |   - |   - |
| "push_stream_shared_memory_size":push_stream_shared_memory_size |   - |   x |   - |   - |   - |   - |
| "push_stream_shared_memory_cleanup_objects_ttl":push_stream_shared_memory_cleanup_objects_ttl |   - |   x |   - |   - |   - |   - |
| "push_stream_channel_deleted_message_text":push_stream_channel_deleted_message_text |   - |   x |   - |   - |   - |   - |
......@@ -212,7 +211,6 @@ h1(#contributors). Contributors
[push_stream_channels_statistics]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/channels_statistics.textile#push_stream_channels_statistics
[push_stream_publisher]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/publishers.textile#push_stream_publisher
[push_stream_subscriber]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_subscriber
[push_stream_websocket]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_websocket
[push_stream_shared_memory_size]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_shared_memory_size
[push_stream_shared_memory_cleanup_objects_ttl]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_shared_memory_cleanup_objects_ttl
[push_stream_channel_deleted_message_text]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_channel_deleted_message_text
......
......@@ -2,7 +2,7 @@ h1(#subscribers_configuration). Subscribers Configuration
h2(#push_stream_subscriber). push_stream_subscriber <a name="push_stream_subscriber" href="#">&nbsp;</a>
*syntax:* _push_stream_subscriber [streaming | polling | long-polling | eventsource]_
*syntax:* _push_stream_subscriber [streaming | polling | long-polling | eventsource | websocket]_
*default:* _streaming_
......@@ -13,6 +13,8 @@ This location only supports GET http method to receive published messages.
And has three possible values to set push mode: streaming, polling, long-polling. The default values is streaming.
The polling and long-polling modes could be set by the request header *X-Nginx-PushStream-Mode* overriding push_stream_subscriber directive value.
The eventsource mode enable "Event Source":eventsource_ref support for subscribers. Using headers Event-ID and Event-Type on publish is possible to set values to _id:_ and _event:_ attributes on message sent to subscribers.
The websocket mode enable subscriber to use WebSocket protocol.
<pre>
# streaming subscriber location
......@@ -53,26 +55,10 @@ The eventsource mode enable "Event Source":eventsource_ref support for subscribe
}
curl localhost/sub/ch1 #eventsource request
</pre>
h2(#push_stream_websocket). push_stream_websocket <a name="push_stream_websocket" href="#">&nbsp;</a>
*syntax:* _push_stream_websocket_
*default:* _none_
*context:* _location_
*release version:* _0.3.2_
Defines a location as a subscriber using WebSocket protocol. This location represents a subscriber's interface to a channel's message queue.
This location only supports GET http method to receive published messages.
<pre>
# websocket subscriber location
location /ws/(.*) {
push_stream_websocket;
# eventsource subscriber location
location /sub/(.*) {
push_stream_subscriber websocket;
# positional channel path
push_stream_channels_path $1;
}
......
......@@ -19,7 +19,7 @@ Create a html page with the content on **Client** part, access it from browser a
location ~ /ws/(.*) {
# activate websocket mode for this location
push_stream_websocket;
push_stream_subscriber websocket;
# positional channel path
push_stream_channels_path $1;
......@@ -86,7 +86,7 @@ If needed you can change this behavior changing the javascript usage, like the e
location /ws {
# activate websocket mode for this location
push_stream_websocket;
push_stream_subscriber websocket;
# positional channel path
push_stream_channels_path $arg_channels;
......
......@@ -290,16 +290,17 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_STREAMING = ngx_string("stre
static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_POLLING = ngx_string("polling");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING = ngx_string("long-polling");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_EVENTSOURCE = ngx_string("eventsource");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_WEBSOCKET = ngx_string("websocket");
#define NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_STREAMING 0
#define NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_POLLING 1
#define NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_LONGPOLLING 2
#define NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE 3
#define NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET 4
#define NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_NORMAL 4
#define NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN 5
#define NGX_HTTP_PUSH_STREAM_STATISTICS_MODE 6
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE 7
#define NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_NORMAL 5
#define NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN 6
#define NGX_HTTP_PUSH_STREAM_STATISTICS_MODE 7
#define NGX_HTTP_PUSH_STREAM_WEBSOCKET_VERSION_8 8
......
......@@ -60,9 +60,6 @@ static char * ngx_http_push_stream_publisher(ngx_conf_t *cf, ngx_command_t
// subscriber
static char * ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
// websockets
static char * ngx_http_push_stream_websocket(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
// setup
static char * ngx_http_push_stream_setup_handler(ngx_conf_t *cf, void *conf, ngx_int_t (*handler) (ngx_http_request_t *));
static ngx_int_t ngx_http_push_stream_init_module(ngx_cycle_t *cycle);
......
......@@ -129,7 +129,7 @@ http {
location ~ /ws/(.*) {
# activate websocket mode for this location
push_stream_websocket;
push_stream_subscriber websocket;
# positional channel path
push_stream_channels_path $1;
......
......@@ -78,7 +78,7 @@ describe "Setup Parameters" do
end
it "should not accept an invalid push mode" do
nginx_test_configuration({:subscriber_mode => "unknown"}).should include("invalid push_stream_subscriber mode value: unknown, accepted values (streaming, polling, long-polling, eventsource)")
nginx_test_configuration({:subscriber_mode => "unknown"}).should include("invalid push_stream_subscriber mode value: unknown, accepted values (streaming, polling, long-polling, eventsource, websocket)")
end
it "should accept the known push modes" do
......@@ -87,6 +87,7 @@ describe "Setup Parameters" do
nginx_test_configuration({:subscriber_mode => "polling"}).should_not include("invalid push_stream_subscriber mode value")
nginx_test_configuration({:subscriber_mode => "long-polling"}).should_not include("invalid push_stream_subscriber mode value")
nginx_test_configuration({:subscriber_mode => "eventsource"}).should_not include("invalid push_stream_subscriber mode value")
nginx_test_configuration({:subscriber_mode => "websocket"}).should_not include("invalid push_stream_subscriber mode value")
end
it "should not accept an invalid publisher mode" do
......
......@@ -9,7 +9,7 @@ describe "Subscriber WebSocket" do
:extra_location => %q{
location ~ /ws/(.*)? {
# activate websocket mode for this location
push_stream_websocket;
push_stream_subscriber websocket;
# positional channel path
push_stream_channels_path $1;
......@@ -284,7 +284,7 @@ describe "Subscriber WebSocket" do
:extra_location => %q{
location ~ /ws/(.*)? {
# activate websocket mode for this location
push_stream_websocket;
push_stream_subscriber websocket;
# positional channel path
push_stream_channels_path $1;
......
......@@ -46,12 +46,6 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, location_type),
NULL },
{ ngx_string("push_stream_websocket"),
NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS,
ngx_http_push_stream_websocket,
NGX_HTTP_LOC_CONF_OFFSET,
0,
NULL },
/* Main directives*/
{ ngx_string("push_stream_shared_memory_size"),
......@@ -724,7 +718,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
// formatting header and footer template for chunk transfer
if (conf->header_template.len > 0) {
ngx_str_t *aux = NULL;
if (conf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE) {
if (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) {
aux = ngx_http_push_stream_get_formatted_websocket_frame(conf->header_template.data, conf->header_template.len, cf->pool);
} else {
aux = ngx_http_push_stream_get_formatted_chunk(conf->header_template.data, conf->header_template.len, cf->pool);
......@@ -740,7 +734,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
if (conf->footer_template.len > 0) {
ngx_str_t *aux = NULL;
if (conf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE) {
if (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) {
aux = ngx_http_push_stream_get_formatted_websocket_frame(conf->footer_template.data, conf->footer_template.len, cf->pool);
} else {
aux = ngx_http_push_stream_get_formatted_chunk(conf->footer_template.data, conf->footer_template.len, cf->pool);
......@@ -758,8 +752,8 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_POLLING) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_STREAMING) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE)) {
conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template, (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE), (conf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE));
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET)) {
conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template, (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE), (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET));
if (conf->padding_by_user_agent.len > 0) {
if ((conf->paddings = ngx_http_push_stream_parse_paddings(cf, &conf->padding_by_user_agent)) == NULL) {
......@@ -851,31 +845,28 @@ ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
*field = NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_LONGPOLLING;
} else if ((value.len == NGX_HTTP_PUSH_STREAM_MODE_EVENTSOURCE.len) && (ngx_strncasecmp(value.data, NGX_HTTP_PUSH_STREAM_MODE_EVENTSOURCE.data, NGX_HTTP_PUSH_STREAM_MODE_EVENTSOURCE.len) == 0)) {
*field = NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE;
} else if ((value.len == NGX_HTTP_PUSH_STREAM_MODE_WEBSOCKET.len) && (ngx_strncasecmp(value.data, NGX_HTTP_PUSH_STREAM_MODE_WEBSOCKET.data, NGX_HTTP_PUSH_STREAM_MODE_WEBSOCKET.len) == 0)) {
*field = NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET;
} else {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid push_stream_subscriber mode value: %V, accepted values (%s, %s, %s, %s)", &value, NGX_HTTP_PUSH_STREAM_MODE_STREAMING.data, NGX_HTTP_PUSH_STREAM_MODE_POLLING.data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.data, NGX_HTTP_PUSH_STREAM_MODE_EVENTSOURCE.data);
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid push_stream_subscriber mode value: %V, accepted values (%V, %V, %V, %V, %V)", &value, &NGX_HTTP_PUSH_STREAM_MODE_STREAMING, &NGX_HTTP_PUSH_STREAM_MODE_POLLING, &NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING, &NGX_HTTP_PUSH_STREAM_MODE_EVENTSOURCE, &NGX_HTTP_PUSH_STREAM_MODE_WEBSOCKET);
return NGX_CONF_ERROR;
}
}
return ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_subscriber_handler);
}
static char *
ngx_http_push_stream_websocket(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
char *rc = ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_websocket_handler);
if (*field == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) {
char *rc = ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_websocket_handler);
#if (NGX_HAVE_SHA1)
if (rc == NGX_CONF_OK) {
ngx_http_push_stream_loc_conf_t *pslcf = conf;
pslcf->location_type = NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE;
}
if (rc == NGX_CONF_OK) {
ngx_http_push_stream_loc_conf_t *pslcf = conf;
pslcf->location_type = NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET;
}
#else
rc = NGX_CONF_ERROR;
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: sha1 support is needed to use WebSocket");
rc = NGX_CONF_ERROR;
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: sha1 support is needed to use WebSocket");
#endif
return rc;
return rc;
}
return ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_subscriber_handler);
}
......
......@@ -628,7 +628,7 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r)
}
if (rc == NGX_OK) {
if (pslcf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE) {
if (pslcf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_CLOSE_LAST_FRAME_BYTE), 1);
} else {
rc = ngx_http_send_special(r, NGX_HTTP_LAST | NGX_HTTP_FLUSH);
......@@ -943,7 +943,7 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
if (pslcf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK.len, 0);
} else if (pslcf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE) {
} else if (pslcf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE), 1);
} else {
rc = ngx_http_push_stream_send_response_message(r, NULL, ngx_http_push_stream_ping_msg, 1, 1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment