Commit f0f2b472 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding polling and long-polling support

parent 78409d82
* Adding Polling support
* Adding Long Polling support
h2. Version 0.2.7
* Adding uptime information for server and workers on statistics
......
......@@ -174,7 +174,7 @@ h3(#directives). Directives
(head). | directive | default value | values | context | location |
|push_stream_channels_statistics|-|-|location|-|
|push_stream_publisher|-|-|location|-|
|push_stream_subscriber|-|-|location|-|
|push_stream_subscriber|streaming|streaming, polling, long-polling|location|-|
|push_stream_max_reserved_memory|16 * ngx_pagesize|size greater than 16 * ngx_pagesize|http|main nginx configuration|
|push_stream_memory_cleanup_timeout|30 seconds|time constant|http|main nginx configuration|
|push_stream_channel_deleted_message_text|"Channel deleted"|any string|http|main nginx configuration|
......@@ -247,10 +247,45 @@ POST, publish a message to the channel
h4(#push_stream_subscriber). push_stream_subscriber
default: streaming
context: location
values: streaming, polling, long-polling
Defines a location as a subscriber. This location represents a subscriber's interface to a channel's message queue.
This location only supports GET http method to receive published messages in a stream.
This location only supports GET http method to receive published messages.
And has three possible values to set push mode: streaming, polling, long-polling. The default values is streaming.
The polling and long-polling modes could be set by the request header *X-Nginx-PushStream-Mode* overriding push_stream_subscriber directive.
<pre>
<code>
location /sub/(.*) {
push_stream_subscriber;
# positional channel path
set $push_stream_channels_path $1;
}
curl localhost/sub/ch1 -H 'X-Nginx-PushStream-Mode:polling' #polling request on a streaming location
curl localhost/sub/ch1 -H 'X-Nginx-PushStream-Mode:long-polling' #long-polling request on a streaming location
location /sub/(.*) {
push_stream_subscriber polling;
# positional channel path
set $push_stream_channels_path $1;
}
curl localhost/sub/ch1 #polling request
curl localhost/sub/ch1 -H 'X-Nginx-PushStream-Mode:long-polling' #long-polling request on a polling location
location /sub/(.*) {
push_stream_subscriber long-polling;
# positional channel path
set $push_stream_channels_path $1;
}
curl localhost/sub/ch1 #long-polling request
curl localhost/sub/ch1 -H 'X-Nginx-PushStream-Mode:polling' #polling request on a logn-polling location
</code>
</pre>
h3(#functionality). Functionality
......
......@@ -73,6 +73,7 @@ typedef struct {
ngx_uint_t keepalive;
ngx_uint_t publisher_admin;
ngx_flag_t subscriber_eventsource;
ngx_uint_t subscriber_mode;
} ngx_http_push_stream_loc_conf_t;
// shared memory segment name
......@@ -86,6 +87,7 @@ typedef struct {
ngx_flag_t deleted;
ngx_int_t id;
ngx_str_t *raw;
ngx_int_t tag;
ngx_str_t *event_id;
ngx_str_t *event_id_message;
ngx_str_t *formatted_messages;
......@@ -97,6 +99,7 @@ typedef struct ngx_http_push_stream_subscriber_cleanup_s ngx_http_push_stream_su
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_http_request_t *request;
ngx_flag_t longpolling;
} ngx_http_push_stream_subscriber_t;
typedef struct {
......@@ -111,6 +114,8 @@ typedef struct {
ngx_rbtree_node_t node; // this MUST be first
ngx_str_t id;
ngx_uint_t last_message_id;
time_t last_message_time;
ngx_int_t last_message_tag;
ngx_uint_t stored_messages;
ngx_uint_t subscribers;
ngx_http_push_stream_pid_queue_t workers_with_subscribers;
......@@ -219,8 +224,22 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID = ngx_string("Event
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_LAST_EVENT_ID = ngx_string("Last-Event-Id");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_ALLOW = ngx_string("Allow");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_EXPLAIN = ngx_string("X-Nginx-PushStream-Explain");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_MODE = ngx_string("X-Nginx-PushStream-Mode");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING = ngx_string("Transfer-Encoding");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED = ngx_string("chunked");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_ETAG = ngx_string("Etag");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_IF_NONE_MATCH = ngx_string("If-None-Match");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_VARY = ngx_string("Vary");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_STREAMING = ngx_string("streaming");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_POLLING = ngx_string("polling");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING = ngx_string("long-polling");
#define NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_STREAMING 0
#define NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_POLLING 1
#define NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_LONGPOLLING 2
// other stuff
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_DELETE_METHODS = ngx_string("GET, POST, DELETE");
......
......@@ -37,7 +37,4 @@ ngx_http_push_stream_requested_channel_t * ngx_http_push_stream_parse_channels_i
static void ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_t *data);
static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_str_t *last_event_id, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_SUBSCRIBER_H_ */
......@@ -30,8 +30,8 @@
#include <ngx_http_push_stream_module_ipc.h>
typedef struct {
ngx_queue_t queue;
ngx_str_t *line;
ngx_queue_t queue;
ngx_str_t *line;
} ngx_http_push_stream_line_t;
typedef struct {
......@@ -215,6 +215,7 @@ ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL;
// general request handling
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool);
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modified_time, ngx_int_t tag, ngx_pool_t *temp_pool);
static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value);
static ngx_str_t * ngx_http_push_stream_get_header(ngx_http_request_t *r, const ngx_str_t *header_name);
static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
......
......@@ -449,7 +449,20 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan
// now let's respond to some requests!
while ((cur = (ngx_http_push_stream_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) {
ngx_http_push_stream_send_response_message(cur->request, channel, msg);
if (cur->longpolling) {
ngx_http_push_stream_subscriber_t *prev = (ngx_http_push_stream_subscriber_t *) ngx_queue_prev(&cur->queue);
ngx_http_push_stream_add_polling_headers(cur->request, msg->time, msg->tag, cur->request->pool);
ngx_http_send_header(cur->request);
ngx_http_push_stream_send_response_content_header(cur->request, ngx_http_get_module_loc_conf(cur->request, ngx_http_push_stream_module));
ngx_http_push_stream_send_response_message(cur->request, channel, msg);
ngx_http_push_stream_send_response_finalize(cur->request);
cur = prev;
} else {
ngx_http_push_stream_send_response_message(cur->request, channel, msg);
}
}
}
......
......@@ -207,7 +207,9 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
// put messages on the queue
if (cf->store_messages) {
// tag message with time stamp and a sequence tag
msg->time = ngx_time();
msg->tag = (msg->time == channel->last_message_time) ? (channel->last_message_tag + 1) : 0;
// set message expiration time
msg->expires = (cf->buffer_timeout == NGX_CONF_UNSET ? 0 : (ngx_time() + cf->buffer_timeout));
ngx_queue_insert_tail(&channel->message_queue.queue, &msg->queue);
......@@ -215,6 +217,9 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
// now see if the queue is too big
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, cf->max_messages, 0);
channel->last_message_time = msg->time;
channel->last_message_tag = msg->tag;
}
ngx_shmtx_unlock(&shpool->mutex);
......
......@@ -39,10 +39,10 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
0,
NULL },
{ ngx_string("push_stream_subscriber"),
NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS,
NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS|NGX_CONF_TAKE1,
ngx_http_push_stream_subscriber,
NGX_HTTP_LOC_CONF_OFFSET,
0,
offsetof(ngx_http_push_stream_loc_conf_t, subscriber_mode),
NULL },
{ ngx_string("push_stream_max_reserved_memory"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
......@@ -404,6 +404,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->keepalive = NGX_CONF_UNSET_UINT;
lcf->publisher_admin = NGX_CONF_UNSET_UINT;
lcf->subscriber_eventsource = NGX_CONF_UNSET_UINT;
lcf->subscriber_mode = NGX_CONF_UNSET_UINT;
return lcf;
}
......@@ -650,6 +651,26 @@ ngx_http_push_stream_publisher(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
static char *
ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_int_t *field = (ngx_int_t *) ((char *) conf + cmd->offset);
if (*field != NGX_CONF_UNSET) {
return "is duplicate";
}
*field = NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_STREAMING; //default
if(cf->args->nelts > 1) {
ngx_str_t value = (((ngx_str_t *) cf->args->elts)[1]);
if ((value.len == NGX_HTTP_PUSH_STREAM_MODE_STREAMING.len) && (ngx_strncasecmp(value.data, NGX_HTTP_PUSH_STREAM_MODE_STREAMING.data, NGX_HTTP_PUSH_STREAM_MODE_STREAMING.len) == 0)) {
*field = NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_STREAMING;
} else if ((value.len == NGX_HTTP_PUSH_STREAM_MODE_POLLING.len) && (ngx_strncasecmp(value.data, NGX_HTTP_PUSH_STREAM_MODE_POLLING.data, NGX_HTTP_PUSH_STREAM_MODE_POLLING.len) == 0)) {
*field = NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_POLLING;
} else if ((value.len == NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.len) && (ngx_strncasecmp(value.data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.len) == 0)) {
*field = NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_LONGPOLLING;
} else {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid push_stream_subscriber mode value: %V, accepted values (%s, %s, %s)", &value, NGX_HTTP_PUSH_STREAM_MODE_STREAMING.data, NGX_HTTP_PUSH_STREAM_MODE_POLLING.data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.data);
return NGX_CONF_ERROR;
}
}
char *rc = ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_subscriber_handler);
if (rc == NGX_CONF_OK) {
......
......@@ -1113,3 +1113,20 @@ ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_
return result;
}
static void
ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modified_time, ngx_int_t tag, ngx_pool_t *temp_pool)
{
if (last_modified_time > 0) {
r->headers_out.last_modified_time = last_modified_time;
}
if (tag >= 0) {
ngx_str_t *etag = ngx_http_push_stream_create_str(temp_pool, NGX_INT_T_LEN);
if (etag != NULL) {
ngx_sprintf(etag->data, "%ui", tag);
etag->len = ngx_strlen(etag->data);
r->headers_out.etag = ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ETAG, etag);
}
}
}
......@@ -83,6 +83,8 @@ ngx_http_push_stream_initialize_channel(ngx_http_push_stream_channel_t *channel)
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
channel->last_message_id = 0;
channel->last_message_time = 0;
channel->last_message_tag = 0;
channel->stored_messages = 0;
channel->subscribers = 0;
channel->deleted = 0;
......
......@@ -155,6 +155,7 @@ module BaseTestCase
@channel_deleted_message_text = nil
@ping_message_text = nil
@subscriber_eventsource = 'off'
@subscriber_mode = nil
self.send(:global_configuration) if self.respond_to?(:global_configuration)
end
......@@ -285,7 +286,7 @@ http {
location ~ /sub/(.*)? {
# activate subscriber mode for this location
push_stream_subscriber;
push_stream_subscriber <%= @subscriber_mode unless @subscriber_mode.nil? || @subscriber_mode == "streaming" %>;
# activate event source support for this location
<%= "push_stream_subscriber_eventsource #{@subscriber_eventsource};" unless @subscriber_eventsource.nil? %>
......
......@@ -176,4 +176,49 @@ class TestSetuParameters < Test::Unit::TestCase
ensure
self.stop_server
end
def test_invalid_push_mode
expected_error_message = "invalid push_stream_subscriber mode value: unknown, accepted values (streaming, polling, long-polling)"
@subscriber_mode = "unknown"
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_valid_push_mode
expected_error_message = "invalid push_stream_subscriber mode value"
@subscriber_mode = ""
self.create_config_file
stderr_msg = self.start_server
assert(!stderr_msg.include?(expected_error_message), "Message error founded: '#{ stderr_msg }'")
self.stop_server
@subscriber_mode = "streaming"
self.create_config_file
stderr_msg = self.start_server
assert(!stderr_msg.include?(expected_error_message), "Message error founded: '#{ stderr_msg }'")
self.stop_server
@subscriber_mode = "polling"
self.create_config_file
stderr_msg = self.start_server
assert(!stderr_msg.include?(expected_error_message), "Message error founded: '#{ stderr_msg }'")
self.stop_server
@subscriber_mode = "long-polling"
self.create_config_file
stderr_msg = self.start_server
assert(!stderr_msg.include?(expected_error_message), "Message error founded: '#{ stderr_msg }'")
self.stop_server
end
end
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment