Commit 9b272cf3 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

publisher admin implementation to delete channels

parent f7210a24
* Fixing bug which removed default message template
* Adding publisher administrator feature to delete channels (Suggested by Alexey Vdovin)
* Removing support for versions 0.7.x
* Fixing messages sent to subscribers to be a truly transfer encoding chunked connection
* Removing hack to keep connection open (Thanks _Maxim Dounin_)
......
......@@ -185,6 +185,7 @@ h3(#directives). Directives
|push_stream_max_number_of_broadcast_channels|unset|number|http, location|(push_stream_subscriber and push_stream_publisher) or main nginx configuration|
|push_stream_memory_cleanup_timeout|30 seconds|time constant|http|main nginx configuration|
|push_stream_keepalive|off|on, off|http, location|(push_stream_publisher and push_stream_channels_statistics) or main nginx configuration|
|push_stream_publisher_admin|off|on, off|location|push_stream_publisher|
h4(#push_stream_channels_statistics). push_stream_channels_statistics
......@@ -382,11 +383,39 @@ h4(#push_stream_keepalive). push_stream_keepalive [ on | off ]
New in version 0.2.4
default: off
context: location
context: http, location
location: (push_stream_publisher and push_stream_channels_statistics) or main nginx configuration
Enable keepalive connections, on publisher or channels statistics locations.
h4(#push_stream_publisher_admin). push_stream_publisher_admin [ on | off ]
New in version 0.2.5
default: off
context: location
location: push_stream_publisher
Enable admin features for publishers.
They can delete channels removing any existent subscribers using DELETE http method.
<pre>
<code>
# Pub create channel 1
curl -s -v -X POST "http://localhost/pub?id=my_channel_1" -d "Hello World 1!"
# Pub create channel 2
curl -s -v -X POST "http://localhost/pub?id=my_channel_2" -d "Hello World 2!"
# Sub on both channels
curl -s -v "http://localhost/sub/my_channel_1.b1/my_channel_2.b1"
# Pub delete channel 2
curl -s -v -X DELETE "http://localhost/pub?id=my_channel_2"
</code>
</pre>
h2(#attention). Attention
This module controls everything needed to send the messages to subscribers.
......
......@@ -67,6 +67,7 @@ typedef struct {
ngx_uint_t max_number_of_broadcast_channels;
ngx_msec_t buffer_cleanup_interval;
ngx_uint_t keepalive;
ngx_uint_t publisher_admin;
} ngx_http_push_stream_loc_conf_t;
// shared memory segment name
......@@ -110,6 +111,7 @@ typedef struct {
time_t expires;
ngx_flag_t deleted;
ngx_flag_t broadcast;
ngx_http_push_stream_msg_t *channel_deleted_message;
} ngx_http_push_stream_channel_t;
typedef struct {
......@@ -166,6 +168,7 @@ typedef struct {
ngx_uint_t subscribers; // # of subscribers in all channels
ngx_http_push_stream_msg_t messages_to_delete;
ngx_rbtree_t channels_to_delete;
ngx_rbtree_t unrecoverable_channels;
ngx_http_push_stream_worker_data_t ipc[NGX_MAX_PROCESSES]; // interprocess stuff
} ngx_http_push_stream_shm_data_t;
......@@ -190,6 +193,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE = ngx_s
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOO_MUCH_BROADCAST_CHANNELS = ngx_string("Subscribed too much broadcast channels.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CANNOT_CREATE_CHANNELS = ngx_string("Subscriber could not create channels.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE = ngx_string("Number of channels were exceeded.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED = ngx_string("Channel deleted.");
#define NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID (void *) -1
#define NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID (void *) -2
......@@ -208,6 +212,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING = ngx_stri
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED = ngx_string("chunked");
// other stuff
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_DELETE_METHODS = ngx_string("GET, POST, DELETE");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_METHODS = ngx_string("GET, POST");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET = ngx_string("GET");
......
......@@ -44,6 +44,7 @@ static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES = {49, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_SEND_PING = {50, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_DISCONNECT_SUBSCRIBERS = {51, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS = {52, 0, 0, -1};
static ngx_channel_t NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL = {53, 0, 0, -1};
// worker processes of the world, unite.
ngx_socket_t ngx_http_push_stream_socketpairs[NGX_MAX_PROCESSES][2];
......@@ -57,6 +58,7 @@ static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int
#define ngx_http_push_stream_alert_worker_send_ping(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_SEND_PING);
#define ngx_http_push_stream_alert_worker_disconnect_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DISCONNECT_SUBSCRIBERS);
#define ngx_http_push_stream_alert_worker_census_subscribers(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS);
#define ngx_http_push_stream_alert_worker_delete_channel(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL);
static ngx_int_t ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_log_t *log);
......
......@@ -183,6 +183,9 @@ static const ngx_int_t NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID = -1;
static const ngx_str_t NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT = ngx_string("");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_PING_CHANNEL_ID = ngx_string("");
static const ngx_int_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID = -2;
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT = ngx_string("Channel deleted");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID = ngx_string("~id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL = ngx_string("~channel~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~");
......@@ -223,15 +226,17 @@ static void ngx_http_push_stream_buffer_cleanup_timer_set(ngx_ht
static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_interval, ngx_event_t *timer_event);
static void ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
static void ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
u_char * ngx_http_push_stream_append_crlf(const ngx_str_t *str, ngx_pool_t *pool);
static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_delete_channel(ngx_str_t *id);
static void ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_rbtree_node_t *node, ngx_flag_t force);
static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg);
static ngx_int_t ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t force);
static ngx_inline void ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired);
static ngx_inline void ngx_http_push_stream_delete_worker_channel(void);
static ngx_http_push_stream_content_subtype_t * ngx_http_push_stream_match_channel_info_format_and_content_type(ngx_http_request_t *r, ngx_uint_t default_subtype);
......
......@@ -240,6 +240,8 @@ ngx_http_push_stream_channel_handler(ngx_event_t *ev)
ngx_http_push_stream_disconnect_worker_subscribers(0);
} else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS.command) {
ngx_http_push_stream_census_worker_subscribers();
} else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_DELETE_CHANNEL.command) {
ngx_http_push_stream_delete_worker_channel();
}
}
}
......@@ -286,20 +288,23 @@ ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect)
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *sentinel = thisworker_data->worker_subscribers_sentinel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_subscriber_t *cur = sentinel;
time_t now = ngx_time();
ngx_shmtx_lock(&shpool->mutex);
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
if ((cur->request != NULL) && (ngx_exiting || (force_disconnect == 1) || ((cur->expires != 0) && (now > cur->expires)))) {
ngx_http_push_stream_worker_subscriber_cleanup(cur);
ngx_http_push_stream_worker_subscriber_cleanup_locked(cur);
ngx_http_push_stream_send_response_text(cur->request, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1);
ngx_http_finalize_request(cur->request, NGX_HTTP_OK);
} else {
break;
}
}
ngx_shmtx_unlock(&shpool->mutex);
}
......
......@@ -36,8 +36,14 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
r->keepalive = cf->keepalive;
// only accept GET and POST methods
if (!(r->method & (NGX_HTTP_GET|NGX_HTTP_POST))) {
// only accept GET, POST and DELETE methods if enable publisher administration
if (cf->publisher_admin && !(r->method & (NGX_HTTP_GET|NGX_HTTP_POST|NGX_HTTP_DELETE))) {
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ALLOW, &NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_DELETE_METHODS);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_ALLOWED, NULL);
}
// only accept GET and POST methods if NOT enable publisher administration
if (!cf->publisher_admin && !(r->method & (NGX_HTTP_GET|NGX_HTTP_POST))) {
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ALLOW, &NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_METHODS);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_ALLOWED, NULL);
}
......@@ -62,14 +68,20 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
return ngx_http_push_stream_publisher_handle_post(cf, r, id);
}
// GET only make sense with a previous existing channel
// GET or DELETE only make sense with a previous existing channel
if (channel == NULL) {
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_FOUND, NULL);
}
if (cf->publisher_admin && (r->method == NGX_HTTP_DELETE)) {
ngx_http_push_stream_delete_channel(id);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED);
}
return ngx_http_push_stream_send_response_channel_info(r, channel);
}
static ngx_int_t
ngx_http_push_stream_publisher_handle_post(ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_str_t *id)
{
......
......@@ -146,6 +146,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, keepalive),
NULL },
{ ngx_string("push_stream_publisher_admin"),
NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, publisher_admin),
NULL },
ngx_null_command
};
......@@ -303,6 +309,7 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
return mcf;
}
static char *
ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
{
......@@ -358,6 +365,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->max_number_of_broadcast_channels = NGX_CONF_UNSET_UINT;
lcf->buffer_cleanup_interval = NGX_CONF_UNSET_MSEC;
lcf->keepalive = NGX_CONF_UNSET_UINT;
lcf->publisher_admin = NGX_CONF_UNSET_UINT;
return lcf;
}
......@@ -385,6 +393,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_uint_value(conf->max_number_of_broadcast_channels, prev->max_number_of_broadcast_channels, NGX_CONF_UNSET_UINT);
ngx_conf_merge_uint_value(conf->buffer_cleanup_interval, prev->buffer_cleanup_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_uint_value(conf->keepalive, prev->keepalive, 0);
ngx_conf_merge_uint_value(conf->publisher_admin, prev->publisher_admin, 0);
// sanity checks
......@@ -506,6 +515,7 @@ ngx_http_push_stream_setup_handler(ngx_conf_t *cf, void *conf, ngx_int_t (*handl
return NGX_CONF_OK;
}
static char *
ngx_http_push_stream_channels_statistics(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
......@@ -522,6 +532,7 @@ ngx_http_push_stream_channels_statistics(ngx_conf_t *cf, ngx_command_t *cmd, voi
return rc;
}
static char *
ngx_http_push_stream_publisher(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
......@@ -585,7 +596,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
}
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;
ngx_rbtree_node_t *sentinel, *remove_sentinel;
ngx_rbtree_node_t *sentinel, *remove_sentinel, *unrecoverable_sentinel;
ngx_http_push_stream_shm_data_t *d;
if ((d = (ngx_http_push_stream_shm_data_t *) ngx_slab_alloc(shpool, sizeof(*d))) == NULL) { //shm_data plus an array.
......@@ -611,6 +622,11 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
}
ngx_rbtree_init(&d->channels_to_delete, remove_sentinel, ngx_http_push_stream_rbtree_insert);
if ((unrecoverable_sentinel = ngx_slab_alloc(shpool, sizeof(*unrecoverable_sentinel))) == NULL) {
return NGX_ERROR;
}
ngx_rbtree_init(&d->unrecoverable_channels, unrecoverable_sentinel, ngx_http_push_stream_rbtree_insert);
// create ping message
ngx_http_push_stream_ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT.data, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, ngx_cycle->pool);
if (ngx_http_push_stream_ping_msg == NULL) {
......
......@@ -394,7 +394,11 @@ ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_poo
static void
ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_t *data)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
if (data->worker_subscriber != NULL) {
ngx_http_push_stream_worker_subscriber_cleanup(data->worker_subscriber);
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_worker_subscriber_cleanup_locked(data->worker_subscriber);
ngx_shmtx_unlock(&shpool->mutex);
}
}
This diff is collapsed.
......@@ -174,6 +174,7 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_channel_t *channel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_pool_t *temp_pool;
ngx_flag_t is_broadcast_channel = 0;
channel = ngx_http_push_stream_find_channel(id, log);
......@@ -200,6 +201,13 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
return NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED;
}
//create a temporary pool to allocate temporary elements
if ((temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate memory for temporary pool");
ngx_shmtx_unlock(&shpool->mutex);
return NULL;
}
if ((channel = ngx_slab_alloc_locked(shpool, sizeof(ngx_http_push_stream_channel_t))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
return NULL;
......@@ -220,6 +228,8 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
ngx_http_push_stream_initialize_channel(channel);
channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT.data, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT.len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, temp_pool);
ngx_rbtree_insert(&data->tree, (ngx_rbtree_node_t *) channel);
(is_broadcast_channel) ? data->broadcast_channels++ : data->channels++;
......
......@@ -147,6 +147,9 @@ module BaseTestCase
@memory_cleanup_timeout = '5m'
@config_template = nil
@keepalive = 'off'
@publisher_admin = 'off'
self.send(:global_configuration) if self.respond_to?(:global_configuration)
end
def publish_message(channel, headers, body)
......@@ -238,6 +241,9 @@ http {
# activate publisher mode for this location
push_stream_publisher;
# activate publisher mode for this location
<%= "push_stream_publisher_admin #{@publisher_admin};" unless @publisher_admin.nil? %>
# query string based channel id
set $push_stream_channel_id $arg_id;
# store messages
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment