Commit 67fc7354 authored by Wandenberg's avatar Wandenberg

make possible post messages in some channels or delete them at once specifying...

make possible post messages in some channels or delete them at once specifying their ids on push_stream_channels_path diretive separated by a slash
parent 65098bb9
h1(#changelog). Changelog
* Added support to get channels statistics, delete channels and publish message to some channels specifying their ids on push_stream_channels_path
* Avoid reapply formatter to header, message or footer template when inside an if on event source mode
* Added support for OPTIONS method on publisher location
* Unified longPollingTimeout and timeout configurations on javascript client
......@@ -28,7 +29,7 @@ h1(#changelog). Changelog
* Changed push_stream_eventsource_support directive to be a subtype of push_stream_subscriber directive
* Fix to support gzip usage
* Added the feature to send a custom 'channel delete message' on the body of the DELETE request
* Changed push_stream_channel_id variable to directive, and make possible set it inside an if block
* Removed push_stream_channel_id variable, use the push_stream_channels_path instead of it
* Changed push_stream_channels_path variable to directive, and make possible set it inside an if block
* Back to use Nginx chunked filter
......
......@@ -31,7 +31,7 @@ h1(#basic-configuration). Basic Configuration
push_stream_channels_statistics;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location /pub {
......@@ -39,7 +39,7 @@ h1(#basic-configuration). Basic Configuration
push_stream_publisher admin;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location ~ /sub/(.*) {
......@@ -120,8 +120,7 @@ h1(#directives). Directives
| "push_stream_max_number_of_channels":push_stream_max_number_of_channels |   - |   x |   - |   - |   - |   - |
| "push_stream_max_number_of_wildcard_channels":push_stream_max_number_of_wildcard_channels |   - |   x |   - |   - |   - |   - |
| "push_stream_wildcard_channel_prefix":push_stream_wildcard_channel_prefix |   - |   x |   - |   - |   - |   - |
| "push_stream_channel_id":push_stream_channel_id |   - |   - |   - |   - |   - |   x |
| "push_stream_channels_path":push_stream_channels_path |   - |   - |   x |   x |   x |   - |
| "push_stream_channels_path":push_stream_channels_path |   - |   - |   x |   x |   x |   x |
| "push_stream_authorized_channels_only":push_stream_authorized_channels_only |   - |   - |   x |   - |   - |   x |
| "push_stream_header_template":push_stream_header_template |   - |   - |   x |   - |   - |   x |
| "push_stream_message_template":push_stream_message_template |   - |   - |   x |   - |   - |   x |
......@@ -225,7 +224,6 @@ h1(#contributors). Contributors
[push_stream_max_number_of_channels]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_max_number_of_channels
[push_stream_max_number_of_wildcard_channels]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_max_number_of_wildcard_channels
[push_stream_wildcard_channel_prefix]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/main.textile#push_stream_wildcard_channel_prefix
[push_stream_channel_id]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/publishers.textile#push_stream_channel_id
[push_stream_channels_path]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_channels_path
[push_stream_authorized_channels_only]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_authorized_channels_only
[push_stream_header_template]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_header_template
......
......@@ -9,17 +9,17 @@ h2(#push_stream_channels_statistics). push_stream_channels_statistics <a name="p
*release version:* _0.2.0_
Defines a location as a source of statistics. You can use this location to get statistics about a specific, group or all channels, in a resumed ou summarized way.
To get statistics about all channels in a summarized way you have to make a GET in this location without specify a name in the push_stream_channel_id directive.
To get statistics about all channels in a detailed way you have to specify "ALL" in the push_stream_channel_id.
To get statistics about prefixed channels in a detailed way you have to specify "_prefix_*" in the push_stream_channel_id.
To get statistics about a channel you have to specify the name in the push_stream_channel_id.
To get statistics about all channels in a summarized way you have to make a GET in this location without specify a name in the push_stream_channels_path directive.
To get statistics about all channels in a detailed way you have to specify "ALL" in the push_stream_channels_path.
To get statistics about prefixed channels in a detailed way you have to specify "_prefix_*" in the push_stream_channels_path.
To get statistics about a channel you have to specify the name in the push_stream_channels_path.
You can get statistics in the formats plain, xml, yaml and json. The default is json, to change this behavior you can use *Accept* header parameter passing values like "text/plain", "application/xml", "application/yaml" and "application/json" respectivelly.
<pre>
location /channels-stats {
push_stream_channels_statistics;
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
# /channels-stats -> get statistics about all channels in a summarized way
......
......@@ -18,7 +18,7 @@ DELETE, remove any existent stored messages, disconnect any subscriber, and dele
# normal publisher location
location /pub {
push_stream_publisher;
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
# GET /pub?id=channel_id -> get statistics about a channel
......@@ -27,7 +27,7 @@ DELETE, remove any existent stored messages, disconnect any subscriber, and dele
# admin publisher location
location /pub_admin {
push_stream_publisher admin;
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
# GET /pub_admin?id=channel_id -> get statistics about a channel
......@@ -36,7 +36,7 @@ DELETE, remove any existent stored messages, disconnect any subscriber, and dele
</pre>
h2(#push_stream_channel_id). push_stream_channel_id <a name="push_stream_channel_id" href="#">&nbsp;</a>
h2(#push_stream_channels_path). push_stream_channels_path <a name="push_stream_channels_path" href="#">&nbsp;</a>
*values:* _channel id_
......@@ -45,7 +45,7 @@ h2(#push_stream_channel_id). push_stream_channel_id <a name="push_stream_channel
A string to uniquely identify a communication channel. Must be present on location of the push_stream_publisher and push_stream_channels_statistics.
<pre>
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
#channel id is now the url query string parameter "id"
#(/pub?id=channel_id_string or /channels-stats?id=channel_id_string)
</pre>
......
......@@ -77,6 +77,8 @@ Backtrack isn't needed, you can only sign channels without get old messages, or
More accepted examples: _/channel1_ , _/channel1/channel2_ , _/channel1.b5/channel2_ , _/channel1/channel2.b6_ , ...
Must be present on location of the push_stream_subscriber.
"*How can it be used on a publisher location?*":push_stream_channels_path
<pre>
location /sub/(.*) {
push_stream_channels_path $1;
......@@ -275,3 +277,4 @@ Set the value used on the Access-Control-Allow-Origin header to allow cross doma
[eventsource_ref]http://dev.w3.org/html5/eventsource/
[push_stream_authorized_channels_only]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/subscribers.textile#push_stream_authorized_channels_only
[push_stream_channels_path]https://github.com/wandenberg/nginx-push-stream-module/blob/master/docs/directives/publishers.textile#push_stream_channels_path
......@@ -14,7 +14,7 @@ Create a html page with the content on **Client** part, access it from browser a
push_stream_publisher admin;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location ~ /ev/(.*) {
......@@ -79,7 +79,7 @@ If needed you can change this behavior changing the javascript usage, like the e
push_stream_publisher admin;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location /ev {
......
......@@ -14,7 +14,7 @@ Create a html page with the content on **Client** part, access it from browser a
push_stream_publisher admin;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location ~ /sub/(.*) {
......@@ -86,7 +86,7 @@ If needed you can change this behavior changing the javascript usage, like the e
push_stream_publisher admin;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location /sub {
......
......@@ -14,7 +14,7 @@ Create a html page with the content on **Client** part, access it from browser a
push_stream_publisher admin;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location ~ /lp/(.*) {
......@@ -80,7 +80,7 @@ If needed you can change this behavior changing the javascript usage, like the e
push_stream_publisher admin;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location /lp {
......@@ -148,7 +148,7 @@ If needed you can change this behavior using some additional directives and chan
push_stream_publisher admin;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location ~ /lp/(.*) {
......@@ -227,7 +227,7 @@ _The configuration in the example is the same used on long polling, just forcing
push_stream_publisher admin;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location ~ /lp/(.*) {
......
......@@ -14,7 +14,7 @@ Create a html page with the content on **Client** part, access it from browser a
push_stream_publisher admin;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location ~ /ws/(.*) {
......@@ -81,7 +81,7 @@ If needed you can change this behavior changing the javascript usage, like the e
push_stream_publisher admin;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location /ws {
......
......@@ -66,7 +66,6 @@ typedef struct {
} ngx_http_push_stream_main_conf_t;
typedef struct {
ngx_http_complex_value_t *channel_id;
ngx_http_complex_value_t *channels_path;
ngx_uint_t authorized_channels_only;
ngx_flag_t store_messages;
......@@ -227,8 +226,6 @@ ngx_http_push_stream_main_conf_t *ngx_http_push_stream_module_main_conf = NULL;
ngx_str_t **ngx_http_push_stream_module_paddings_chunks = NULL;
// channel
static ngx_str_t * ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *cf);
static ngx_int_t ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel);
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r, ngx_str_t *prefix);
static ngx_int_t ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channels);
......
......@@ -269,7 +269,7 @@ static void ngx_http_push_stream_worker_subscriber_cleanup_locke
static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len);
static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool);
static ngx_flag_t ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force);
static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg);
......
......@@ -70,7 +70,7 @@ http {
push_stream_channels_statistics;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location /pub {
......@@ -78,7 +78,7 @@ http {
push_stream_publisher admin;
# query string based channel id
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
# store messages in memory
push_stream_store_messages off;
......
......@@ -160,8 +160,7 @@ http {
# activate publisher mode for this location
push_stream_publisher <%= publisher_mode unless publisher_mode.nil? || publisher_mode == "normal" %>;
# query string based channel id
<%= write_directive("push_stream_channel_id", channel_id) %>
<%= write_directive("push_stream_channels_path", channels_path_for_pub) %>
<%= write_directive("push_stream_store_messages", store_messages, "store messages") %>
<%= write_directive("push_stream_channel_info_on_publish", channel_info_on_publish, "channel_info_on_publish") %>
......
......@@ -406,9 +406,9 @@ describe "Publisher Properties" do
location /pub2 {
push_stream_publisher #{config[:publisher_mode]};
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
if ($arg_test) {
push_stream_channel_id test_$arg_id;
push_stream_channels_path test_$arg_id;
}
}
}
......@@ -457,7 +457,7 @@ describe "Publisher Properties" do
:extra_location => %{
location /pub2 {
push_stream_publisher #{config[:publisher_mode]};
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
push_stream_store_messages off;
if ($arg_test) {
......@@ -562,6 +562,38 @@ describe "Publisher Properties" do
end
end
end
it "should published message on different channels on same post" do
body = 'body'
channel = 'ch_test_publish_message_on_different_channels_on_same_post'
messages = 0
nginx_run_server(config.merge({:message_template => "~text~|~channel~", :header_template => nil})) do |conf|
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + "_1").get :head => headers
sub_1.stream do |chunk|
chunk.should eql("#{body}|#{channel.to_s + "_1"}\r\n")
messages += 1
end
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + "_2").get :head => headers
sub_2.stream do |chunk|
chunk.should eql("#{body}|#{channel.to_s + "_2"}\r\n")
messages += 1
end
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + "_3").get :head => headers
sub_3.stream do |chunk|
chunk.should eql("#{body}|#{channel.to_s + "_3"}\r\n")
messages += 1
end
EM.add_periodic_timer(0.5) { EventMachine.stop if messages >= 3 }
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s + '_1/' + channel.to_s + '_2/' + channel.to_s + '_3').post :head => headers, :body => body
end
end
end
end
context "when is on normal mode" do
......@@ -1052,5 +1084,60 @@ describe "Publisher Properties" do
end
end
it "should delete channels on same request" do
body = 'published message'
nginx_run_server(config) do |conf|
publish_message("ch1", headers, body)
publish_message("ch2", headers, body)
EventMachine.run do
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=ch1/ch2').delete :head => headers
pub.callback do
pub.should be_http_status(200).without_body
pub.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'].should eql("Channel deleted.")
stats = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
stats.callback do
stats.should be_http_status(200).with_body
response = JSON.parse(stats.response)
response["channels"].to_s.should_not be_empty
response["channels"].to_i.should eql(0)
EventMachine.stop
end
end
end
end
end
it "should delete channels on same request even when one of them does not exists" do
body = 'published message'
nginx_run_server(config) do |conf|
publish_message("ch1", headers, body)
publish_message("ch2", headers, body)
publish_message("ch3", headers, body)
EventMachine.run do
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=ch3/ch4/ch1').delete :head => headers
pub.callback do
pub.should be_http_status(200).without_body
pub.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'].should eql("Channel deleted.")
stats = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=ALL').get :head => headers
stats.callback do
stats.should be_http_status(200).with_body
response = JSON.parse(stats.response)
response["channels"].to_s.should_not be_empty
response["channels"].to_i.should eql(1)
response["infos"][0]["channel"].should eql("ch2")
response["infos"][0]["published_messages"].should eql("1")
response["infos"][0]["stored_messages"].should eql("1")
EventMachine.stop
end
end
end
end
end
end
end
......@@ -60,12 +60,12 @@ http {
location /channels-stats {
push_stream_channels_statistics;
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
}
location /pub {
push_stream_publisher admin;
push_stream_channel_id $arg_id;
push_stream_channels_path $arg_id;
push_stream_store_messages off;
}
......
......@@ -32,34 +32,6 @@
#include <ngx_http_push_stream_module_subscriber.c>
#include <ngx_http_push_stream_module_websocket.c>
static ngx_str_t *
ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *cf)
{
ngx_str_t vv = ngx_null_string;
ngx_str_t *id;
ngx_http_push_stream_complex_value(r, cf->channel_id, &vv);
if (vv.len == 0) {
return NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID;
}
// maximum length limiter for channel id
if ((ngx_http_push_stream_module_main_conf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (vv.len > ngx_http_push_stream_module_main_conf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", vv.len);
return NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID;
}
if ((id = ngx_http_push_stream_create_str(r->pool, vv.len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for $push_stream_channel_id string");
return NULL;
}
ngx_memcpy(id->data, vv.data, vv.len);
return id;
}
static ngx_str_t *
ngx_http_push_stream_channel_info_formatted(ngx_pool_t *pool, const ngx_str_t *format, ngx_str_t *id, ngx_uint_t published_messages, ngx_uint_t stored_messages, ngx_uint_t subscribers)
{
......@@ -83,24 +55,6 @@ ngx_http_push_stream_channel_info_formatted(ngx_pool_t *pool, const ngx_str_t *f
}
// print information about a channel
static ngx_int_t
ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel)
{
ngx_str_t *text;
ngx_http_push_stream_content_subtype_t *subtype;
subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
text = ngx_http_push_stream_channel_info_formatted(r->pool, subtype->format_item, &channel->id, channel->last_message_id, channel->stored_messages, channel->subscribers);
if (text == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer.");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
return ngx_http_push_stream_send_response(r, text, subtype->content_type, NGX_HTTP_OK);
}
static ngx_int_t
ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request_t *r)
{
......
......@@ -31,9 +31,11 @@ static ngx_int_t ngx_http_push_stream_publisher_handle_after_read_body(ngx_ht
static ngx_int_t
ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
{
ngx_str_t *id = NULL;
ngx_http_push_stream_channel_t *channel = NULL;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_module_ctx_t *ctx;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_http_push_stream_set_expires(r, NGX_HTTP_PUSH_STREAM_EXPIRES_EPOCH, 0);
......@@ -59,53 +61,57 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_ALLOWED, NULL);
}
// channel id is required
id = ngx_http_push_stream_get_channel_id(r, cf);
if ((id == NULL) || (id == NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID) || (id == NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID)) {
if (id == NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: the push_stream_channel_id is required but is not set");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE);
}
if (id == NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID) {
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE);
}
return NGX_HTTP_INTERNAL_SERVER_ERROR;
if ((ctx = ngx_http_push_stream_add_request_context(r)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to create request context");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL);
}
if (r->method & (NGX_HTTP_POST|NGX_HTTP_PUT)) {
//get channels ids
channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, r->pool);
if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the push_stream_channels_path is required but is not set");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE);
}
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
// check if channel id isn't equals to ALL or contain wildcard
if ((ngx_memn2cmp(id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) || (ngx_strchr(id->data, '*') != NULL)) {
if ((ngx_memn2cmp(cur->id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data, cur->id->len, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.len) == 0) || (ngx_strchr(cur->id->data, '*') != NULL)) {
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE);
}
// create the channel if doesn't exist
channel = ngx_http_push_stream_get_channel(id, r->connection->log, cf);
if (channel == NULL) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate memory for new channel");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL);
// could not have a large size
if ((ngx_http_push_stream_module_main_conf->max_channel_id_length != NGX_CONF_UNSET_UINT) && (cur->id->len > ngx_http_push_stream_module_main_conf->max_channel_id_length)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: channel id is larger than allowed %d", cur->id->len);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE);
}
if (channel == NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: number of channels were exceeded");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE);
}
if (r->method & (NGX_HTTP_POST|NGX_HTTP_PUT)) {
// create the channel if doesn't exist
channel = ngx_http_push_stream_get_channel(cur->id, r->connection->log, cf);
if (channel == NULL) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate memory for new channel");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL);
}
return ngx_http_push_stream_publisher_handle_after_read_body(r, ngx_http_push_stream_publisher_body_handler);
if (channel == NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: number of channels were exceeded");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE);
}
}
}
// search for a existing channel with this id
channel = ngx_http_push_stream_find_channel(id, r->connection->log);
ctx->requested_channels = channels_ids;
// GET or DELETE only make sense with a previous existing channel
if (channel == NULL) {
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_FOUND, NULL);
if (r->method & (NGX_HTTP_POST|NGX_HTTP_PUT)) {
return ngx_http_push_stream_publisher_handle_after_read_body(r, ngx_http_push_stream_publisher_body_handler);
}
if ((cf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN) && (r->method == NGX_HTTP_DELETE)) {
return ngx_http_push_stream_publisher_handle_after_read_body(r, ngx_http_push_stream_publisher_delete_handler);
}
return ngx_http_push_stream_send_response_channel_info(r, channel);
return ngx_http_push_stream_send_response_channels_info_detailed(r, channels_ids);
}
static ngx_int_t
......@@ -176,11 +182,14 @@ ngx_http_push_stream_read_request_body_to_buffer(ngx_http_request_t *r)
static void
ngx_http_push_stream_publisher_delete_handler(ngx_http_request_t *r)
{
ngx_str_t *id;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_buf_t *buf = NULL;
u_char *text = ngx_http_push_stream_module_main_conf->channel_deleted_message_text.data;
size_t len = ngx_http_push_stream_module_main_conf->channel_deleted_message_text.len;
ngx_uint_t qtd_channels = 0;
ngx_http_push_stream_requested_channel_t *requested_channel;
ngx_queue_t *cur = &ctx->requested_channels->queue;
if (r->headers_in.content_length_n > 0) {
......@@ -194,25 +203,32 @@ ngx_http_push_stream_publisher_delete_handler(ngx_http_request_t *r)
len = ngx_buf_size(buf);
}
// get and check channel id value
id = ngx_http_push_stream_get_channel_id(r, cf);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NULL, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without channel id");
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without channel id");
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler with channel id too large");
while ((cur = ngx_queue_next(cur)) != &ctx->requested_channels->queue) {
requested_channel = ngx_queue_data(cur, ngx_http_push_stream_requested_channel_t, queue);
if (ngx_http_push_stream_delete_channel(requested_channel->id, text, len, r->pool)) {
qtd_channels++;
}
}
ngx_http_push_stream_delete_channel(id, text, len, r->pool);
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED);
if (qtd_channels == 0) {
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_FOUND, NULL);
} else {
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED);
}
}
static void
ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
{
ngx_str_t *id;
ngx_str_t *event_id, *event_type;
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_buf_t *buf = NULL;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_requested_channel_t *requested_channel;
ngx_queue_t *cur = &ctx->requested_channels->queue;
// check if body message wasn't empty
if (r->headers_in.content_length_n <= 0) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: Post request was sent with no message");
......@@ -223,11 +239,6 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
// get and check if has access to request body
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(r->request_body->bufs, NULL, r, "push stream module: unexpected publisher message request body buffer location. please report this to the push stream module developers.");
// get and check channel id value
id = ngx_http_push_stream_get_channel_id(r, cf);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NULL, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without channel id");
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without channel id");
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler with channel id too large");
// copy request body to a memory buffer
buf = ngx_http_push_stream_read_request_body_to_buffer(r);
......@@ -236,14 +247,18 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID);
event_type = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_TYPE);
channel = ngx_http_push_stream_add_msg_to_channel(r, id, buf->pos, ngx_buf_size(buf), event_id, event_type, r->pool);
if (channel == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
while ((cur = ngx_queue_next(cur)) != &ctx->requested_channels->queue) {
requested_channel = ngx_queue_data(cur, ngx_http_push_stream_requested_channel_t, queue);
channel = ngx_http_push_stream_add_msg_to_channel(r, requested_channel->id, buf->pos, ngx_buf_size(buf), event_id, event_type, r->pool);
if (channel == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
if (cf->channel_info_on_publish) {
ngx_http_push_stream_send_response_channel_info(r, channel);
ngx_http_push_stream_send_response_channels_info_detailed(r, ctx->requested_channels);
} else {
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, NULL);
}
......
......@@ -116,12 +116,6 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NULL },
/* Location directives */
{ ngx_string("push_stream_channel_id"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1,
ngx_http_set_complex_value_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, channel_id),
NULL },
{ ngx_string("push_stream_channels_path"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1,
ngx_http_set_complex_value_slot,
......@@ -509,7 +503,6 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
return NGX_CONF_ERROR;
}
lcf->channel_id = NULL;
lcf->channels_path = NULL;
lcf->authorized_channels_only = NGX_CONF_UNSET_UINT;
lcf->store_messages = NGX_CONF_UNSET_UINT;
......@@ -556,10 +549,6 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_str_value(conf->allowed_origins, prev->allowed_origins, NGX_HTTP_PUSH_STREAM_DEFAULT_ALLOWED_ORIGINS);
ngx_conf_merge_uint_value(conf->location_type, prev->location_type, NGX_CONF_UNSET_UINT);
if (conf->channel_id == NULL) {
conf->channel_id = prev->channel_id;
}
if (conf->channels_path == NULL) {
conf->channels_path = prev->channels_path;
}
......@@ -584,18 +573,9 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
return NGX_CONF_OK;
}
if ((conf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_NORMAL) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN)) {
if (conf->channel_id == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_channel_id must be set on statistics and publisher location");
return NGX_CONF_ERROR;
}
} else {
if (conf->channels_path == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_channels_path must be set on statistics and publisher location");
return NGX_CONF_ERROR;
}
if (conf->channels_path == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_channels_path must be set on statistics and publisher location");
return NGX_CONF_ERROR;
}
// changing properties for event source support
......
......@@ -672,7 +672,7 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_
ngx_http_finalize_request(r, NGX_DONE);
}
static void
static ngx_flag_t
ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_channel_t *channel;
......@@ -703,7 +703,7 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx
if ((channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(text, len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, NULL, NULL, temp_pool)) == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to channel deleted message");
return;
return 0;
}
// send signal to each worker with subscriber to this channel
......@@ -720,6 +720,7 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx
}
ngx_shmtx_unlock(&(shpool)->mutex);
return (channel != NULL);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment