Commit fcef9938 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding directive to limit the number of subscribers per channel

parent ee1c38b3
......@@ -197,6 +197,7 @@ h3(#directives). Directives
|push_stream_ping_message_text|""|any string|http|main nginx configuration|
|push_stream_ping_message_interval|unset|time constant|http|main nginx configuration|
|push_stream_message_ttl|unset|time constant|http|main nginx configuration|
|push_stream_max_subscribers_per_channel|unset|number|http|main nginx configuration|
|push_stream_max_messages_stored_per_channel|unset|number|http|main nginx configuration|
|push_stream_max_channel_id_length|unset|number|http|main nginx configuration|
|push_stream_subscriber_connection_ttl|unset|time constant|http|main nginx configuration|
......@@ -365,6 +366,14 @@ location: main nginx configuration
The length of time a message may be queued before it is considered expired. If you do not want messages to expire, just not set this directive.
h4(#push_stream_max_subscribers_per_channel). push_stream_max_subscribers_per_channel [ number ]
default: -
context: http
location: main nginx configuration
The maximum number of subscribers accepted per channel. If you do not want to limit number of subscribers access to channels, just not set this directive.
h4(#push_stream_max_messages_stored_per_channel). push_stream_max_messages_stored_per_channel [ number ]
default: -
......
......@@ -54,6 +54,7 @@ typedef struct {
time_t subscriber_connection_ttl;
ngx_msec_t buffer_cleanup_interval;
time_t message_ttl;
ngx_uint_t max_subscribers_per_channel;
ngx_uint_t max_messages_stored_per_channel;
ngx_uint_t max_channel_id_length;
ngx_http_push_stream_template_queue_t msg_templates;
......@@ -206,6 +207,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EMPTY_POST_REQUEST_MESSAGE = ngx_string("Empty post requests are not allowed.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE = ngx_string("Channel id is too large.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOO_MUCH_BROADCAST_CHANNELS = ngx_string("Subscribed too much broadcast channels.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOO_SUBSCRIBERS_PER_CHANNEL = ngx_string("Subscribers limit per channel has been exceeded.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CANNOT_CREATE_CHANNELS = ngx_string("Subscriber could not create channels.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE = ngx_string("Number of channels were exceeded.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED = ngx_string("Channel deleted.");
......
......@@ -76,6 +76,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, message_ttl),
NULL },
{ ngx_string("push_stream_max_subscribers_per_channel"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, max_subscribers_per_channel),
NULL },
{ ngx_string("push_stream_max_messages_stored_per_channel"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
......@@ -347,6 +353,7 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
mcf->ping_message_interval = NGX_CONF_UNSET_MSEC;
mcf->subscriber_disconnect_interval = NGX_CONF_UNSET_MSEC;
mcf->subscriber_connection_ttl = NGX_CONF_UNSET;
mcf->max_subscribers_per_channel = NGX_CONF_UNSET;
mcf->max_messages_stored_per_channel = NGX_CONF_UNSET_UINT;
mcf->qtd_templates = 0;
ngx_queue_init(&mcf->msg_templates.queue);
......@@ -405,6 +412,12 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
return NGX_CONF_ERROR;
}
// max subscriber per channel cannot be zero
if ((conf->max_subscribers_per_channel != NGX_CONF_UNSET_UINT) && (conf->max_subscribers_per_channel == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_max_subscribers_per_channel cannot be zero.");
return NGX_CONF_ERROR;
}
// max messages stored per channel cannot be zero
if ((conf->max_messages_stored_per_channel != NGX_CONF_UNSET_UINT) && (conf->max_messages_stored_per_channel == 0)) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_max_messages_stored_per_channel cannot be zero.");
......
......@@ -73,7 +73,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE);
}
//validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on
//validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on. check if channel is full of subscribers
cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
// could not be ALL channel or contain wildcard
......@@ -102,6 +102,12 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_destroy_pool(temp_pool);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_CANNOT_CREATE_CHANNELS);
}
// check if channel is full of subscribers
if ((mcf->max_subscribers_per_channel != NGX_CONF_UNSET_UINT) && (((channel = ngx_http_push_stream_find_channel(cur->id, r->connection->log)) != NULL) && (channel->subscribers >= mcf->max_subscribers_per_channel))) {
ngx_destroy_pool(temp_pool);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_TOO_SUBSCRIBERS_PER_CHANNEL);
}
}
// check if number of subscribed broadcast channels is acceptable
......
......@@ -142,6 +142,7 @@ module BaseTestCase
@footer_template = %{</body></html>}
@max_channel_id_length = 200
@max_message_buffer_length = 20
@max_subscribers_per_channel = nil
@max_number_of_broadcast_channels = nil
@max_number_of_channels = nil
@message_template = %{<script>p(~id~,\\'~channel~\\',\\'~text~\\');</script>}
......@@ -264,6 +265,8 @@ http {
<%= "push_stream_max_number_of_channels #{@max_number_of_channels};" unless @max_number_of_channels.nil? %>
<%= "push_stream_max_number_of_broadcast_channels #{@max_number_of_broadcast_channels};" unless @max_number_of_broadcast_channels.nil? %>
# max subscribers per channel
<%= "push_stream_max_subscribers_per_channel #{@max_subscribers_per_channel};" unless @max_subscribers_per_channel.nil? %>
# max messages to store in memory
<%= "push_stream_max_messages_stored_per_channel #{@max_message_buffer_length};" unless @max_message_buffer_length.nil? %>
# message ttl
......
......@@ -52,7 +52,17 @@ class TestSetuParameters < Test::Unit::TestCase
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_max_message_buffer_length_cannot_be_zero
def test_max_subscribers_per_channel_cannot_be_zero
expected_error_message = "push_stream_max_subscribers_per_channel cannot be zero"
@max_subscribers_per_channel = 0
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_max_messages_stored_per_channel_cannot_be_zero
expected_error_message = "push_stream_max_messages_stored_per_channel cannot be zero"
@max_message_buffer_length = 0
......
......@@ -892,4 +892,34 @@ class TestSubscriber < Test::Unit::TestCase
add_test_timeout
}
end
def config_test_cannot_add_more_subscriber_to_one_channel_than_allowed
@max_subscribers_per_channel = 3
@subscriber_connection_timeout = "3s"
end
def test_cannot_add_more_subscriber_to_one_channel_than_allowed
headers = {'accept' => 'application/json'}
channel = 'ch_test_cannot_add_more_subscriber_to_one_channel_than_allowed'
other_channel = 'ch_test_cannot_add_more_subscriber_to_one_channel_than_allowed_2'
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_4 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_4.callback {
assert_equal(403, sub_4.response_header.status, "Channel was created")
assert_equal(0, sub_4.response_header.content_length, "Received response for exceed subscriber limit")
assert_equal("Subscribers limit per channel has been exceeded.", sub_4.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'], "Didn't receive the right error message")
}
sub_5 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + other_channel.to_s).get :head => headers, :timeout => 30
sub_5.callback {
assert_equal(200, sub_5.response_header.status, "Channel was not created")
EventMachine.stop
}
add_test_timeout
}
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment