Commit 2e663203 authored by Wandenberg's avatar Wandenberg

send alert messages to an events channel when a channel is created or...

send alert messages to an events channel when a channel is created or destroyed and when a client subscribe or unsubscribe a channel
parent 6010ef69
......@@ -152,6 +152,7 @@ h1(#directives). Directives
| "push_stream_max_number_of_channels":push_stream_max_number_of_channels |   - |   x |   - |   - |   - |   - |
| "push_stream_max_number_of_wildcard_channels":push_stream_max_number_of_wildcard_channels |   - |   x |   - |   - |   - |   - |
| "push_stream_wildcard_channel_prefix":push_stream_wildcard_channel_prefix |   - |   x |   - |   - |   - |   - |
| "push_stream_events_channel_id":push_stream_events_channel_id |   - |   x |   - |   - |   - |   - |
| "push_stream_channels_path":push_stream_channels_path |   - |   - |   x |   x |   x |   x |
| "push_stream_store_messages":push_stream_store_messages |   - |   - |   - |   x |   - |   x |
| "push_stream_channel_info_on_publish":push_stream_channel_info_on_publish |   - |   - |   - |   x |   - |   - |
......@@ -171,6 +172,7 @@ h1(#directives). Directives
| "push_stream_user_agent":push_stream_user_agent |   - |   - |   x |   - |   - |   - |
| "push_stream_padding_by_user_agent":push_stream_padding_by_user_agent |   - |   - |   x |   - |   - |   - |
| "push_stream_allowed_origins":push_stream_allowed_origins |   - |   - |   x |   - |   - |   - |
| "push_stream_allow_connections_to_events_channel":push_stream_allow_connections_to_events_channel |   - |   - |   x |   - |   - |   x |
h1(#installation). Installation <a name="installation" href="#">&nbsp;</a>
......@@ -258,6 +260,7 @@ h1(#contributors). Contributors
[push_stream_max_number_of_channels]docs/directives/main.textile#push_stream_max_number_of_channels
[push_stream_max_number_of_wildcard_channels]docs/directives/main.textile#push_stream_max_number_of_wildcard_channels
[push_stream_wildcard_channel_prefix]docs/directives/main.textile#push_stream_wildcard_channel_prefix
[push_stream_events_channel_id]docs/directives/main.textile#push_stream_events_channel_id
[push_stream_channels_path]docs/directives/subscribers.textile#push_stream_channels_path
[push_stream_authorized_channels_only]docs/directives/subscribers.textile#push_stream_authorized_channels_only
[push_stream_header_template_file]docs/directives/subscribers.textile#push_stream_header_template_file
......@@ -278,5 +281,6 @@ h1(#contributors). Contributors
[push_stream_channel_info_on_publish]docs/directives/publishers.textile#push_stream_channel_info_on_publish
[push_stream_allowed_origins]docs/directives/subscribers.textile#push_stream_allowed_origins
[push_stream_websocket_allow_publish]docs/directives/subscribers.textile#push_stream_websocket_allow_publish
[push_stream_allow_connections_to_events_channel]docs/directives/subscribers.textile#push_stream_allow_connections_to_events_channel
[wiki]https://github.com/wandenberg/nginx-push-stream-module/wiki/_pages
[nginx_debugging]http://wiki.nginx.org/Debugging
......@@ -134,4 +134,25 @@ The string prefix used to identify a wildcard channel, example: when you set thi
A wildcard channel is technically equals to a normal one. It is intended to be used when the "push_stream_authorized_channels_only":push_stream_authorized_channels_only is set to on.
h2(#push_stream_events_channel_id). push_stream_events_channel_id <a name="push_stream_events_channel_id" href="#">&nbsp;</a>
*syntax:* _push_stream_events_channel_id string_
*default:* _none_
*context:* _http_
*release version:* _0.6.0_
The string identify an events channel where some control messages will be published.
Examples:
{"type": "channel_created", "channel": "CHANNEL_ID"}
{"type": "channel_destroyed", "channel": "CHANNEL_ID"}
{"type": "client_subscribed", "channel": "CHANNEL_ID"}
{"type": "client_unsubscribed", "channel": "CHANNEL_ID"}
By default this channel is not available to subscription. To allow subscriptions to it is necessary set "push_stream_allow_connections_to_events_channel":push_stream_allow_connections_to_events_channel to on.
[push_stream_authorized_channels_only]subscribers.textile#push_stream_authorized_channels_only
[push_stream_allow_connections_to_events_channel]subscribers.textile#push_stream_allow_connections_to_events_channel
......@@ -221,6 +221,19 @@ h2(#push_stream_websocket_allow_publish). push_stream_websocket_allow_publish <a
Enable a WebSocket subscriber send messages to the channel(s) it is connected through the same connection it is receiving the messages, using _send_ method from WebSocket interface.
h2(#push_stream_allow_connections_to_events_channel). push_stream_allow_connections_to_events_channel <a name="push_stream_allow_connections_to_events_channel" href="#">&nbsp;</a>
*syntax:* _push_stream_allow_connections_to_events_channel on | off_
*default:* _off_
*context:* _location_
*release version:* _0.6.0_
Enable subscriptions to events channel.
h2(#push_stream_last_received_message_time). push_stream_last_received_message_time <a name="push_stream_last_received_message_time" href="#">&nbsp;</a>
*syntax:* _push_stream_last_received_message_time string_
......
......@@ -76,6 +76,7 @@ typedef struct {
typedef struct ngx_http_push_stream_msg_s ngx_http_push_stream_msg_t;
typedef struct ngx_http_push_stream_shm_data_s ngx_http_push_stream_shm_data_t;
typedef struct ngx_http_push_stream_global_shm_data_s ngx_http_push_stream_global_shm_data_t;
typedef struct ngx_http_push_stream_channel_s ngx_http_push_stream_channel_t;
typedef struct {
ngx_flag_t enabled;
......@@ -92,6 +93,8 @@ typedef struct {
ngx_uint_t max_channel_id_length;
ngx_queue_t msg_templates;
ngx_flag_t timeout_with_body;
ngx_str_t events_channel_id;
ngx_http_push_stream_channel_t *events_channel;
ngx_regex_t *backtrack_parser_regex;
ngx_http_push_stream_msg_t *ping_msg;
ngx_http_push_stream_msg_t *longpooling_timeout_msg;
......@@ -115,6 +118,7 @@ typedef struct {
ngx_msec_t longpolling_connection_ttl;
ngx_flag_t websocket_allow_publish;
ngx_flag_t channel_info_on_publish;
ngx_flag_t allow_connections_to_events_channel;
ngx_http_complex_value_t *last_received_message_time;
ngx_http_complex_value_t *last_received_message_tag;
ngx_http_complex_value_t *last_event_id;
......@@ -156,8 +160,7 @@ typedef struct {
ngx_uint_t subscribers;
} ngx_http_push_stream_pid_queue_t;
// our typecast-friendly rbtree node (channel)
typedef struct {
struct ngx_http_push_stream_channel_s {
ngx_rbtree_node_t node;
ngx_queue_t queue;
ngx_str_t id;
......@@ -171,9 +174,10 @@ typedef struct {
time_t expires;
ngx_flag_t deleted;
ngx_flag_t wildcard;
char for_events;
ngx_http_push_stream_msg_t *channel_deleted_message;
ngx_shmtx_t *mutex;
} ngx_http_push_stream_channel_t;
};
typedef struct {
ngx_queue_t queue;
......@@ -295,6 +299,8 @@ struct ngx_http_push_stream_shm_data_s {
ngx_shmtx_sh_t channels_lock[10];
ngx_shmtx_t cleanup_mutex;
ngx_shmtx_sh_t cleanup_lock;
ngx_shmtx_t events_channel_mutex;
ngx_shmtx_sh_t events_channel_lock;
};
ngx_shm_zone_t *ngx_http_push_stream_global_shm_zone = NULL;
......@@ -319,6 +325,8 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOO_MUCH_WILDCARD_CHANNELS = ngx_str
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOO_SUBSCRIBERS_PER_CHANNEL = ngx_string("Subscribers limit per channel has been exceeded.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CANNOT_CREATE_CHANNELS = ngx_string("Subscriber could not create channels.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE = ngx_string("Number of channels were exceeded.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_INTERNAL_ONLY_EVENTS_CHANNEL_MESSAGE = ngx_string("Only internal routines can change events channel.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_SUBSCRIPTION_EVENTS_CHANNEL_FORBIDDEN_MESSAGE = ngx_string("Subscription to events channel is not allowed.");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_NO_MANDATORY_HEADERS_MESSAGE = ngx_string("Don't have at least one of the mandatory headers: Connection, Upgrade, Sec-WebSocket-Key and Sec-WebSocket-Version");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_WRONG_WEBSOCKET_VERSION_MESSAGE = ngx_string("Version not supported. Supported versions: 8, 13");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED = ngx_string("Channel deleted.");
......
......@@ -50,6 +50,8 @@ static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_CHANNEL_INACTIVITY_TIME = 30;
#define NGX_HTTP_PUSH_STREAM_DEFAULT_WILDCARD_CHANNEL_PREFIX ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_EVENTS_CHANNEL_ID ""
static char * ngx_http_push_stream_channels_statistics(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
// publisher
......
......@@ -222,6 +222,13 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK_CONTENT_TYPE = ngx_string(
static const ngx_str_t NGX_HTTP_PUSH_STREAM_PADDING_BY_USER_AGENT_PATTERN = ngx_string("([^:]+),(\\d+),(\\d+)");
#define NGX_HTTP_PUSH_STREAM_EVENT_TEMPLATE "{\"type\": \"%V\", \"channel\": \"%V\"}%Z"
static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_CREATED = ngx_string("channel_created");
static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED = ngx_string("channel_destroyed");
static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_SUBSCRIBED = ngx_string("client_subscribed");
static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED = ngx_string("client_unsubscribed");
ngx_event_t ngx_http_push_stream_memory_cleanup_event;
ngx_event_t ngx_http_push_stream_buffer_cleanup_event;
......@@ -253,7 +260,8 @@ static void ngx_http_push_stream_unescape_uri(ngx_str_t *value);
static void ngx_http_push_stream_complex_value(ngx_http_request_t *r, ngx_http_complex_value_t *val, ngx_str_t *value);
ngx_int_t ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool);
ngx_int_t ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool);
ngx_int_t ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_id, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev);
......@@ -264,7 +272,7 @@ static void ngx_http_push_stream_timer_set(ngx_msec_t timer_inte
static void ngx_http_push_stream_timer_reset(ngx_msec_t timer_interval, ngx_event_t *timer_event);
#define ngx_http_push_stream_memory_cleanup_timer_set(void) ngx_http_push_stream_timer_set(NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_INTERVAL, &ngx_http_push_stream_memory_cleanup_event, ngx_http_push_stream_memory_cleanup_timer_wake_handler, 1);
#define ngx_http_push_stream_buffer_cleanup_timer_set(pslcf) ngx_http_push_stream_timer_set(NGX_HTTP_PUSH_STREAM_MESSAGE_BUFFER_CLEANUP_INTERVAL, &ngx_http_push_stream_buffer_cleanup_event, ngx_http_push_stream_buffer_timer_wake_handler, pslcf->store_messages);
#define ngx_http_push_stream_buffer_cleanup_timer_set(void) ngx_http_push_stream_timer_set(NGX_HTTP_PUSH_STREAM_MESSAGE_BUFFER_CLEANUP_INTERVAL, &ngx_http_push_stream_buffer_cleanup_event, ngx_http_push_stream_buffer_timer_wake_handler, 1);
static void ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_subscriber_t *worker_subscriber);
static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len);
......
This diff is collapsed.
......@@ -59,6 +59,9 @@ module NginxConfiguration
:channels_path_for_pub => '$arg_id',
:channels_path => '$1',
:events_channel_id => nil,
:allow_connections_to_events_channel => nil,
:extra_location => '',
:extra_configuration => ''
}
......@@ -152,6 +155,9 @@ http {
<%= write_directive("push_stream_ping_message_text", ping_message_text) %>
<%= write_directive("push_stream_channel_inactivity_time", channel_inactivity_time) %>
<%= write_directive("push_stream_events_channel_id", events_channel_id) %>
<%= write_directive("push_stream_allow_connections_to_events_channel", allow_connections_to_events_channel) %>
server {
listen <%= nginx_port %>;
server_name <%= nginx_host %>;
......
......@@ -106,10 +106,14 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: number of channels were exceeded");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE);
}
} else {
requested_channel->channel = ngx_http_push_stream_find_channel(requested_channel->id, r->connection->log, mcf);
}
if ((r->method != NGX_HTTP_GET) && (requested_channel->channel != NULL) && requested_channel->channel->for_events) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: only internal routines can change events channel");
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_FORBIDDEN, &NGX_HTTP_PUSH_STREAM_INTERNAL_ONLY_EVENTS_CHANNEL_MESSAGE);
}
}
ctx->requested_channels = requested_channels;
......@@ -236,6 +240,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
{
ngx_str_t *event_id, *event_type;
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_buf_t *buf = NULL;
......@@ -263,7 +268,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
for (q = ngx_queue_head(&ctx->requested_channels->queue); q != ngx_queue_sentinel(&ctx->requested_channels->queue); q = ngx_queue_next(q)) {
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
if (ngx_http_push_stream_add_msg_to_channel(r, requested_channel->channel, buf->pos, ngx_buf_size(buf), event_id, event_type, r->pool) != NGX_OK) {
if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, requested_channel->channel, buf->pos, ngx_buf_size(buf), event_id, event_type, cf->store_messages, r->pool) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
......
......@@ -121,6 +121,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, wildcard_channel_prefix),
NULL },
{ ngx_string("push_stream_events_channel_id"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, events_channel_id),
NULL },
/* Location directives */
{ ngx_string("push_stream_channels_path"),
......@@ -237,6 +243,13 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, allowed_origins),
NULL },
{ ngx_string("push_stream_allow_connections_to_events_channel"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, allow_connections_to_events_channel),
NULL },
ngx_null_command
};
......@@ -450,6 +463,8 @@ ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
mcf->max_messages_stored_per_channel = NGX_CONF_UNSET_UINT;
mcf->qtd_templates = 0;
mcf->timeout_with_body = NGX_CONF_UNSET;
ngx_str_null(&mcf->events_channel_id);
mcf->events_channel = NULL;
mcf->ping_msg = NULL;
mcf->longpooling_timeout_msg = NULL;
ngx_queue_init(&mcf->msg_templates);
......@@ -472,6 +487,7 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
ngx_conf_merge_str_value(conf->channel_deleted_message_text, conf->channel_deleted_message_text, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT);
ngx_conf_merge_str_value(conf->ping_message_text, conf->ping_message_text, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT);
ngx_conf_merge_str_value(conf->wildcard_channel_prefix, conf->wildcard_channel_prefix, NGX_HTTP_PUSH_STREAM_DEFAULT_WILDCARD_CHANNEL_PREFIX);
ngx_conf_merge_str_value(conf->events_channel_id, conf->events_channel_id, NGX_HTTP_PUSH_STREAM_DEFAULT_EVENTS_CHANNEL_ID);
ngx_conf_init_value(conf->timeout_with_body, 0);
// sanity checks
......@@ -565,6 +581,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->longpolling_connection_ttl = NGX_CONF_UNSET_MSEC;
lcf->websocket_allow_publish = NGX_CONF_UNSET_UINT;
lcf->channel_info_on_publish = NGX_CONF_UNSET_UINT;
lcf->allow_connections_to_events_channel = NGX_CONF_UNSET_UINT;
lcf->last_received_message_time = NULL;
lcf->last_received_message_tag = NULL;
lcf->last_event_id = NULL;
......@@ -594,6 +611,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_msec_value(conf->longpolling_connection_ttl, prev->longpolling_connection_ttl, conf->subscriber_connection_ttl);
ngx_conf_merge_value(conf->websocket_allow_publish, prev->websocket_allow_publish, 0);
ngx_conf_merge_value(conf->channel_info_on_publish, prev->channel_info_on_publish, 1);
ngx_conf_merge_value(conf->allow_connections_to_events_channel, prev->allow_connections_to_events_channel, 0);
ngx_conf_merge_str_value(conf->padding_by_user_agent, prev->padding_by_user_agent, NGX_HTTP_PUSH_STREAM_DEFAULT_PADDING_BY_USER_AGENT);
ngx_conf_merge_uint_value(conf->location_type, prev->location_type, NGX_CONF_UNSET_UINT);
......@@ -1111,5 +1129,18 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
d->mutex_round_robin = 0;
if (mcf->events_channel_id.len > 0) {
if ((mcf->events_channel = ngx_http_push_stream_get_channel(&mcf->events_channel_id, ngx_cycle->log, mcf)) == NULL) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: unable to create events channel");
return NGX_ERROR;
}
if (ngx_http_push_stream_create_shmtx(&d->events_channel_mutex, &d->events_channel_lock, (u_char *) "push_stream_events_channel") != NGX_OK) {
return NGX_ERROR;
}
mcf->events_channel->mutex = &d->events_channel_mutex;
}
return NGX_OK;
}
......@@ -330,6 +330,14 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
*explain_error_message = (ngx_str_t *) &NGX_HTTP_PUSH_STREAM_TOO_SUBSCRIBERS_PER_CHANNEL;
return NGX_ERROR;
}
// check if is allowed to connect to events channel
if (!cf->allow_connections_to_events_channel && (requested_channel->channel != NULL) && requested_channel->channel->for_events) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: subscription to events channel is not allowed");
*status_code = NGX_HTTP_FORBIDDEN;
*explain_error_message = (ngx_str_t *) &NGX_HTTP_PUSH_STREAM_SUBSCRIPTION_EVENTS_CHANNEL_FORBIDDEN_MESSAGE;
return NGX_ERROR;
}
}
// check if number of subscribed wildcard channels is acceptable
......@@ -635,6 +643,9 @@ ngx_http_push_stream_assing_subscription_to_channel(ngx_slab_pool_t *shpool, ngx
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscriptions, &subscription->channel_worker_queue);
subscription->channel_worker_sentinel = worker_subscribers_sentinel;
ngx_shmtx_unlock(channel->mutex);
ngx_http_push_stream_send_event(mcf, log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_SUBSCRIBED, NULL);
return NGX_OK;
}
......
......@@ -82,6 +82,7 @@ ngx_http_push_stream_delete_channels(void)
void
ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_http_push_stream_main_conf_t *mcf = data->mcf;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_pid_queue_t *worker;
ngx_queue_t *cur_worker, *cur;
......@@ -113,6 +114,8 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
// remove the subscription for the channel from worker
ngx_queue_remove(&subscription->channel_worker_queue);
ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, subscriber->request->pool);
if (subscriber->longpolling) {
ngx_http_push_stream_add_polling_headers(subscriber->request, ngx_time(), 0, subscriber->request->pool);
ngx_http_send_header(subscriber->request);
......@@ -139,9 +142,15 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
void
ngx_http_push_stream_collect_deleted_channels_data(ngx_http_push_stream_shm_data_t *data)
{
ngx_http_push_stream_main_conf_t *mcf = data->mcf;
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *q;
ngx_uint_t qtd_removed;
ngx_pool_t *temp_pool = NULL;
if (mcf->events_channel_id.len > 0) {
temp_pool = ngx_create_pool(4096, ngx_cycle->log);
}
ngx_shmtx_lock(&data->channels_to_delete_mutex);
for (q = ngx_queue_head(&data->channels_to_delete); q != ngx_queue_sentinel(&data->channels_to_delete);) {
......@@ -166,9 +175,15 @@ ngx_http_push_stream_collect_deleted_channels_data(ngx_http_push_stream_shm_data
ngx_queue_insert_tail(&data->channels_trash, &channel->queue);
data->channels_in_trash++;
ngx_shmtx_unlock(&data->channels_trash_mutex);
ngx_http_push_stream_send_event(mcf, ngx_cycle->log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED, temp_pool);
}
}
ngx_shmtx_unlock(&data->channels_to_delete_mutex);
if (temp_pool != NULL) {
ngx_destroy_pool(temp_pool);
}
}
......@@ -357,10 +372,8 @@ ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_con
ngx_int_t
ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_pool_t *temp_pool)
ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_msg_t *msg;
ngx_uint_t qtd_removed;
......@@ -368,7 +381,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_http_push_str
// create a buffer copy in shared mem
msg = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, text, len, channel, channel->last_message_id + 1, event_id, event_type, temp_pool);
if (msg == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate message in shared memory");
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate message in shared memory");
return NGX_ERROR;
}
......@@ -383,7 +396,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_http_push_str
channel->expires = ngx_time() + mcf->channel_inactivity_time;
// put messages on the queue
if (cf->store_messages) {
if (store_messages) {
ngx_queue_insert_tail(&channel->message_queue, &msg->queue);
channel->stored_messages++;
}
......@@ -392,26 +405,54 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_http_push_str
// now see if the queue is too big
qtd_removed = ngx_http_push_stream_ensure_qtd_of_messages(data, channel, mcf->max_messages_stored_per_channel, 0);
ngx_shmtx_lock(&data->channels_queue_mutex);
data->published_messages++;
if (!channel->for_events) {
ngx_shmtx_lock(&data->channels_queue_mutex);
data->published_messages++;
if (msg->time >= data->last_message_time) {
data->last_message_time = msg->time;
data->last_message_tag = msg->tag;
}
if (msg->time >= data->last_message_time) {
data->last_message_time = msg->time;
data->last_message_tag = msg->tag;
}
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->stored_messages, qtd_removed);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->stored_messages, qtd_removed);
if (cf->store_messages) {
data->stored_messages++;
if (store_messages) {
data->stored_messages++;
}
ngx_shmtx_unlock(&data->channels_queue_mutex);
}
ngx_shmtx_unlock(&data->channels_queue_mutex);
// send an alert to workers
ngx_http_push_stream_broadcast(channel, msg, r->connection->log, mcf);
ngx_http_push_stream_broadcast(channel, msg, log, mcf);
// turn on timer to cleanup buffer of old messages
ngx_http_push_stream_buffer_cleanup_timer_set(cf);
ngx_http_push_stream_buffer_cleanup_timer_set();
return NGX_OK;
}
ngx_int_t
ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_type, ngx_pool_t *received_temp_pool)
{
ngx_pool_t *temp_pool = received_temp_pool;
if ((temp_pool == NULL) && ((temp_pool = ngx_create_pool(4096, log)) == NULL)) {
return NGX_ERROR;
}
if ((mcf->events_channel_id.len > 0) && !channel->for_events) {
size_t len = ngx_strlen(NGX_HTTP_PUSH_STREAM_EVENT_TEMPLATE) + event_type->len + channel->id.len;
ngx_str_t *event = ngx_http_push_stream_create_str(temp_pool, len);
if (event != NULL) {
ngx_sprintf(event->data, NGX_HTTP_PUSH_STREAM_EVENT_TEMPLATE, event_type, &channel->id);
ngx_http_push_stream_add_msg_to_channel(mcf, log, mcf->events_channel, event->data, ngx_strlen(event->data), NULL, event_type, 1, temp_pool);
}
}
if ((received_temp_pool == NULL) && (temp_pool != NULL)) {
ngx_destroy_pool(temp_pool);
}
return NGX_OK;
}
......@@ -942,8 +983,17 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_flag_t forc
void
ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force)
{
ngx_http_push_stream_main_conf_t *mcf = data->mcf;
ngx_http_push_stream_channel_t *channel;
ngx_queue_t *q;
ngx_pool_t *temp_pool = NULL;
if (mcf->events_channel_id.len > 0) {
if ((temp_pool = ngx_create_pool(4096, ngx_cycle->log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: unable to allocate memory to temporary pool");
return;
}
}
ngx_http_push_stream_collect_expired_messages_data(data, force);
......@@ -952,8 +1002,7 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(ngx_http_p
channel = ngx_queue_data(q, ngx_http_push_stream_channel_t, queue);
q = ngx_queue_next(q);
if ((channel->stored_messages == 0) && (channel->subscribers == 0) && (channel->expires < ngx_time())) {
if ((channel->stored_messages == 0) && (channel->subscribers == 0) && (channel->expires < ngx_time()) && !channel->for_events) {
channel->deleted = 1;
channel->expires = ngx_time() + NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL;
(channel->wildcard) ? NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->wildcard_channels) : NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->channels);
......@@ -965,9 +1014,15 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(ngx_http_p
ngx_queue_insert_tail(&data->channels_trash, &channel->queue);
data->channels_in_trash++;
ngx_shmtx_unlock(&data->channels_trash_mutex);
ngx_http_push_stream_send_event(mcf, ngx_cycle->log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED, temp_pool);
}
}
ngx_shmtx_unlock(&data->channels_queue_mutex);
if (temp_pool != NULL) {
ngx_destroy_pool(temp_pool);
}
}
......@@ -1475,6 +1530,8 @@ ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_subscriber_t
ngx_queue_remove(&subscription->channel_worker_queue);
ngx_queue_remove(&subscription->queue);
ngx_shmtx_unlock(subscription->channel->mutex);
ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, worker_subscriber->request->pool);
}
ngx_shmtx_lock(&shpool->mutex);
......
......@@ -180,6 +180,7 @@ ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_
void
ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_int_t rc = NGX_OK;
......@@ -288,7 +289,12 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
for (q = ngx_queue_head(&ctx->subscriber->subscriptions); q != ngx_queue_sentinel(&ctx->subscriber->subscriptions); q = ngx_queue_next(q)) {
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(q, ngx_http_push_stream_subscription_t, queue);
if (ngx_http_push_stream_add_msg_to_channel(r, subscription->channel, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, ctx->temp_pool) != NGX_OK) {
if (subscription->channel->for_events) {
// skip events channel on publish by websocket connections
continue;
}
if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, subscription->channel, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, cf->store_messages, ctx->temp_pool) != NGX_OK) {
goto finalize;
}
}
......
......@@ -151,6 +151,7 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
channel->stored_messages = 0;
channel->subscribers = 0;
channel->deleted = 0;
channel->for_events = ((mcf->events_channel_id.len > 0) && (channel->id.len == mcf->events_channel_id.len) && (ngx_strncmp(channel->id.data, mcf->events_channel_id.data, mcf->events_channel_id.len) == 0));
channel->expires = ngx_time() + mcf->channel_inactivity_time;
ngx_queue_init(&channel->message_queue);
......@@ -164,6 +165,9 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
channel->mutex = &data->channels_mutex[data->mutex_round_robin++ % 9];
ngx_shmtx_unlock(&data->channels_queue_mutex);
ngx_http_push_stream_send_event(mcf, log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_CREATED, NULL);
return channel;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment