Commit af255bf4 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

padding messages based on user agent

parent 842d5095
......@@ -95,7 +95,7 @@ Just as information is listed below the minimum amount of memory used for each o
* channel on shared = 536 bytes
* subscriber
** on shared = 230 bytes
** on system = 6500 bytes
** on system = 7100 bytes
h1(#basic-configuration). Basic Configuration
......@@ -958,6 +958,32 @@ h2(#push_stream_last_received_message_tag). push_stream_last_received_message_ta
Set the tag of the last message received to the server knows which messages has to be sent to subscriber. Is a replacement for If-None-Match header. Example, $arg_tag indicate that the value will be take from tag argument.
h2(#push_stream_user_agent). push_stream_user_agent
*syntax:* _push_stream_user_agent string_
*default:* _http user-agent header_
*context:* _location_
*release version:* _0.3.3_
Set from where the user agent will be get to be used on validation for the need of padding. Is a replacement for User-Agent header. Example, $arg_ua indicate that the value will be take from ua argument.
h2(#push_stream_padding_by_user_agent). push_stream_padding_by_user_agent
*syntax:* _push_stream_padding_by_user_agent string_
*default:* _[A|a]ndroid 2,4097,4097:[S|s]afari,1025,0_
*context:* _location_
*release version:* _0.3.3_
Set the minimum header size and minimum message size to each user agent who match the given expression. The value may be compound for many groups on the format _user-agent-regexp,header_min_size,message_min_size_ separate by a colon (_:_) .
h1(#attention). Attention
This module controls everything needed to send the messages to subscribers.
......
......@@ -42,6 +42,14 @@ typedef struct {
ngx_pool_t *pool;
} ngx_http_push_stream_queue_pool_t;
typedef struct {
ngx_queue_t queue;
ngx_regex_t *agent;
ngx_uint_t header_min_len;
ngx_uint_t message_min_len;
} ngx_http_push_stream_padding_t;
// template queue
typedef struct {
ngx_queue_t queue; // this MUST be first
......@@ -90,6 +98,9 @@ typedef struct {
ngx_flag_t websocket_allow_publish;
ngx_http_complex_value_t *last_received_message_time;
ngx_http_complex_value_t *last_received_message_tag;
ngx_http_complex_value_t *user_agent;
ngx_str_t padding_by_user_agent;
ngx_http_push_stream_padding_t *paddings;
} ngx_http_push_stream_loc_conf_t;
// shared memory segment name
......@@ -168,6 +179,7 @@ typedef struct {
ngx_pool_t *temp_pool;
ngx_chain_t *free;
ngx_chain_t *busy;
ngx_http_push_stream_padding_t *padding;
} ngx_http_push_stream_subscriber_ctx_t;
// messages to worker processes
......@@ -208,6 +220,8 @@ ngx_shm_zone_t *ngx_http_push_stream_shm_zone = NULL;
ngx_http_push_stream_main_conf_t *ngx_http_push_stream_module_main_conf = NULL;
ngx_str_t **ngx_http_push_stream_module_paddings_chunks = NULL;
// channel
static ngx_str_t * ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *cf);
static ngx_int_t ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel);
......
......@@ -41,6 +41,8 @@ static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL = 30;
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE "~text~"
#define NGX_HTTP_PUSH_STREAM_DEFAULT_FOOTER_TEMPLATE ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_PADDING_BY_USER_AGENT "[A|a]ndroid 2,4097,4097:[S|s]afari,1025,0"
#define NGX_HTTP_PUSH_STREAM_DEFAULT_CONTENT_TYPE "text/plain"
#define NGX_HTTP_PUSH_STREAM_DEFAULT_BROADCAST_CHANNEL_PREFIX ""
......
......@@ -208,6 +208,8 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK = ng
static const ngx_str_t NGX_HTTP_PUSH_STREAM_LAST_CHUNK = ngx_string("0" CRLF CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_PADDING_BY_USER_AGENT_PATTERN = ngx_string("([^:]+),(\\d+),(\\d+)");
ngx_event_t ngx_http_push_stream_memory_cleanup_event;
ngx_event_t ngx_http_push_stream_buffer_cleanup_event;
......@@ -236,6 +238,7 @@ static ngx_int_t ngx_http_push_stream_memory_cleanup();
static ngx_int_t ngx_http_push_stream_buffer_cleanup();
ngx_chain_t * ngx_http_push_stream_get_buf(ngx_http_request_t *r);
static void ngx_http_push_stream_complex_value(ngx_http_request_t *r, ngx_http_complex_value_t *val, ngx_str_t *value);
ngx_http_push_stream_channel_t *ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *text, size_t len, ngx_str_t *event_id, ngx_pool_t *temp_pool);
......@@ -272,6 +275,8 @@ static ngx_str_t * ngx_http_push_stream_join_wi
static ngx_http_push_stream_subscriber_ctx_t * ngx_http_push_stream_add_request_context(ngx_http_request_t *r);
static ngx_http_push_stream_padding_t * ngx_http_push_stream_parse_paddings(ngx_conf_t *cf, ngx_str_t *paddings_by_user_agent);
static ngx_str_t * ngx_http_push_stream_get_formatted_current_time(ngx_pool_t *pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_hostname(ngx_pool_t *pool);
......
......@@ -27,6 +27,6 @@
#define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.2");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("1ee1e3af467be23b909d7ddf7ca493fe33e79c24");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("842d509527c19bb844b1454788d518dec1321877");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */
......@@ -25,6 +25,8 @@
#include <ngx_http_push_stream_module_setup.h>
ngx_uint_t ngx_http_push_stream_padding_max_len = 0;
static ngx_command_t ngx_http_push_stream_commands[] = {
{ ngx_string("push_stream_channels_statistics"),
NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS,
......@@ -210,6 +212,18 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, last_received_message_tag),
NULL },
{ ngx_string("push_stream_user_agent"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_http_set_complex_value_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, user_agent),
NULL },
{ ngx_string("push_stream_padding_by_user_agent"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, padding_by_user_agent),
NULL },
ngx_null_command
};
......@@ -346,6 +360,26 @@ ngx_http_push_stream_postconfig(ngx_conf_t *cf)
}
ngx_conf_log_error(NGX_LOG_INFO, cf, 0, "Using %udKiB of shared memory for push stream module", shm_size >> 10);
ngx_uint_t steps = ngx_http_push_stream_padding_max_len / 100;
if ((ngx_http_push_stream_module_paddings_chunks = ngx_palloc(cf->pool, sizeof(ngx_str_t) * (steps + 1))) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to create padding messages");
return NGX_ERROR;
}
u_char aux[ngx_http_push_stream_padding_max_len + 1];
ngx_memset(aux, ' ', ngx_http_push_stream_padding_max_len);
aux[ngx_http_push_stream_padding_max_len] = '\0';
ngx_int_t i, len = ngx_http_push_stream_padding_max_len;
for (i = steps; i >= 0; i--) {
if ((*(ngx_http_push_stream_module_paddings_chunks + i) = ngx_http_push_stream_get_formatted_chunk(aux, len, cf->pool)) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to create padding messages");
return NGX_ERROR;
}
len = i * 100;
*(aux + len) = '\0';
}
return ngx_http_push_stream_set_up_shm(cf, shm_size);
}
......@@ -484,6 +518,9 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
lcf->websocket_allow_publish = NGX_CONF_UNSET_UINT;
lcf->last_received_message_time = NULL;
lcf->last_received_message_tag = NULL;
lcf->user_agent = NULL;
lcf->padding_by_user_agent.data = NULL;
lcf->paddings = NULL;
return lcf;
}
......@@ -507,6 +544,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_msec_value(conf->subscriber_connection_ttl, prev->subscriber_connection_ttl, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_msec_value(conf->longpolling_connection_ttl, prev->longpolling_connection_ttl, conf->subscriber_connection_ttl);
ngx_conf_merge_value(conf->websocket_allow_publish, prev->websocket_allow_publish, 0);
ngx_conf_merge_str_value(conf->padding_by_user_agent, prev->padding_by_user_agent, NGX_HTTP_PUSH_STREAM_DEFAULT_PADDING_BY_USER_AGENT);
if (conf->last_received_message_time == NULL) {
conf->last_received_message_time = prev->last_received_message_time;
......@@ -516,6 +554,10 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
conf->last_received_message_tag = prev->last_received_message_tag;
}
if (conf->user_agent == NULL) {
conf->user_agent = prev->user_agent;
}
if (conf->location_type == NGX_CONF_UNSET_UINT) {
return NGX_CONF_OK;
}
......@@ -661,6 +703,19 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_STREAMING) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE)) {
conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template, conf->eventsource_support, (conf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE));
if (conf->padding_by_user_agent.len > 0) {
if ((conf->paddings = ngx_http_push_stream_parse_paddings(cf, &conf->padding_by_user_agent)) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to parse paddings by user agent");
return NGX_CONF_ERROR;
}
ngx_http_push_stream_padding_t *padding = conf->paddings;
while ((padding = (ngx_http_push_stream_padding_t *) ngx_queue_next(&padding->queue)) != conf->paddings) {
ngx_http_push_stream_padding_max_len = ngx_max(ngx_http_push_stream_padding_max_len, padding->header_min_len);
ngx_http_push_stream_padding_max_len = ngx_max(ngx_http_push_stream_padding_max_len, padding->message_min_len);
}
}
}
return NGX_CONF_OK;
......
......@@ -34,6 +34,7 @@ static ngx_http_push_stream_pid_queue_t *ngx_http_push_stream_create_wor
static ngx_http_push_stream_subscription_t *ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber);
static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, time_t if_modified_since, ngx_str_t *last_event_id, ngx_flag_t longpolling, ngx_pool_t *temp_pool);
static ngx_http_push_stream_padding_t *ngx_http_push_stream_get_padding_by_user_agent(ngx_http_request_t *r);
static ngx_int_t
ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
......@@ -45,7 +46,6 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_http_push_stream_subscriber_ctx_t *ctx;
time_t if_modified_since;
ngx_str_t *last_event_id, vv_time = ngx_null_string;
u_char *dst, *src;
ngx_str_t *push_mode;
ngx_flag_t polling, longpolling;
ngx_int_t rc;
......@@ -76,16 +76,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
if (cf->last_received_message_time != NULL) {
ngx_http_complex_value(r, cf->last_received_message_time, &vv_time);
if (vv_time.len) {
dst = vv_time.data;
src = vv_time.data;
ngx_unescape_uri(&dst, &src, vv_time.len, NGX_UNESCAPE_URI);
if (dst < src) {
*dst = '\0';
vv_time.len = dst - vv_time.data;
}
}
ngx_http_push_stream_complex_value(r, cf->last_received_message_time, &vv_time);
} else if (r->headers_in.if_modified_since != NULL) {
vv_time = r->headers_in.if_modified_since->value;
}
......@@ -107,6 +98,8 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
return result;
}
ctx->padding = ngx_http_push_stream_get_padding_by_user_agent(r);
// stream access
if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
......@@ -160,7 +153,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
ngx_flag_t has_message_to_send = 0;
if (cf->last_received_message_tag != NULL) {
ngx_http_complex_value(r, cf->last_received_message_tag, &vv_etag);
ngx_http_push_stream_complex_value(r, cf->last_received_message_tag, &vv_etag);
etag = vv_etag.len ? &vv_etag : NULL;
} else {
etag = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_IF_NONE_MATCH);
......@@ -726,3 +719,28 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
ngx_queue_insert_tail(&worker_subscribers_sentinel->subscribers_sentinel.queue, &element_subscriber->queue);
return NGX_OK;
}
static ngx_http_push_stream_padding_t *
ngx_http_push_stream_get_padding_by_user_agent(ngx_http_request_t *r)
{
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_padding_t *padding = cf->paddings;
ngx_str_t vv_user_agent = ngx_null_string;
if (cf->user_agent != NULL) {
ngx_http_push_stream_complex_value(r, cf->user_agent, &vv_user_agent);
} else if (r->headers_in.user_agent != NULL) {
vv_user_agent = r->headers_in.user_agent->value;
}
if ((padding != NULL) && (vv_user_agent.len > 0)) {
while ((padding = (ngx_http_push_stream_padding_t *) ngx_queue_next(&padding->queue)) != cf->paddings) {
if (ngx_regex_exec(padding->agent, &vv_user_agent, NULL, 0) >= 0) {
return padding;
}
}
}
return NULL;
}
......@@ -28,6 +28,7 @@
static void nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel);
static void ngx_http_push_stream_run_cleanup_pool_handler(ngx_pool_t *p, ngx_pool_cleanup_pt handler);
static void ngx_http_push_stream_cleanup_request_context(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_response_padding(ngx_http_request_t *r, size_t len, ngx_flag_t sending_header);
static ngx_inline void
ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired)
......@@ -411,6 +412,9 @@ ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_htt
if (pslcf->header_template.len > 0) {
rc = ngx_http_push_stream_send_response_text(r, pslcf->header_template.data, pslcf->header_template.len, 0);
if (rc == NGX_OK) {
rc = ngx_http_push_stream_send_response_padding(r, pslcf->header_template.len, 1);
}
}
return rc;
......@@ -430,6 +434,9 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, msg, r->pool);
if (str != NULL) {
rc = ngx_http_push_stream_send_response_text(r, str->data, str->len, 0);
if (rc == NGX_OK) {
rc = ngx_http_push_stream_send_response_padding(r, str->len, 0);
}
}
}
......@@ -573,6 +580,25 @@ ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *tex
return ngx_http_push_stream_output_filter(r, out);
}
static ngx_int_t
ngx_http_push_stream_send_response_padding(ngx_http_request_t *r, size_t len, ngx_flag_t sending_header)
{
ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
if (ctx->padding != NULL) {
ngx_int_t diff = ((sending_header) ? ctx->padding->header_min_len : ctx->padding->message_min_len) - len;
if (diff > 0) {
ngx_str_t *padding = *(ngx_http_push_stream_module_paddings_chunks + diff / 100);
ngx_http_push_stream_send_response_text(r, padding->data, padding->len, 0);
}
}
return NGX_OK;
}
static void
ngx_http_push_stream_run_cleanup_pool_handler(ngx_pool_t *p, ngx_pool_cleanup_pt handler)
{
......@@ -1565,3 +1591,113 @@ ngx_http_push_stream_send_only_added_headers(ngx_http_request_t *r)
return ngx_http_write_filter(r, &out);
}
static ngx_http_push_stream_padding_t *
ngx_http_push_stream_parse_paddings(ngx_conf_t *cf, ngx_str_t *paddings_by_user_agent)
{
ngx_int_t rc;
u_char errstr[NGX_MAX_CONF_ERRSTR];
ngx_regex_compile_t padding_rc, *agent_rc;
int captures[12];
ngx_http_push_stream_padding_t *sentinel, *padding;
ngx_str_t aux, *agent;
if ((sentinel = ngx_palloc(cf->pool, sizeof(ngx_http_push_stream_padding_t))) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to save padding info");
return NULL;
}
ngx_queue_init(&sentinel->queue);
ngx_memzero(&padding_rc, sizeof(ngx_regex_compile_t));
padding_rc.pattern = NGX_HTTP_PUSH_STREAM_PADDING_BY_USER_AGENT_PATTERN;
padding_rc.pool = cf->pool;
padding_rc.err.len = NGX_MAX_CONF_ERRSTR;
padding_rc.err.data = errstr;
if (ngx_regex_compile(&padding_rc) != NGX_OK) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to compile padding pattern %V", &NGX_HTTP_PUSH_STREAM_PADDING_BY_USER_AGENT_PATTERN);
return NULL;
}
aux.data = paddings_by_user_agent->data;
aux.len = paddings_by_user_agent->len;
do {
rc = ngx_regex_exec(padding_rc.regex, &aux, captures, 12);
if (rc == NGX_REGEX_NO_MATCHED) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: padding pattern not match the value %V", &aux);
return NULL;
}
if (rc < 0) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: error applying padding pattern to %V", &aux);
return NULL;
}
if (captures[0] != 0) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: error applying padding pattern to %V", &aux);
return NULL;
}
if ((agent = ngx_http_push_stream_create_str(cf->pool, captures[3] - captures[2])) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "video security module: unable to allocate memory to copy agent pattern");
return NGX_CONF_ERROR;
}
ngx_memcpy(agent->data, aux.data + captures[2], agent->len);
if ((agent_rc = ngx_pcalloc(cf->pool, sizeof(ngx_regex_compile_t))) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "video security module: unable to allocate memory to compile agent patterns");
return NGX_CONF_ERROR;
}
agent_rc->pattern = *agent;
agent_rc->pool = cf->pool;
agent_rc->err.len = NGX_MAX_CONF_ERRSTR;
agent_rc->err.data = errstr;
if (ngx_regex_compile(agent_rc) != NGX_OK) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to compile agent pattern %V", &agent);
return NULL;
}
if ((padding = ngx_palloc(cf->pool, sizeof(ngx_http_push_stream_padding_t))) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to save padding info");
return NULL;
}
padding->agent = agent_rc->regex;
padding->header_min_len = ngx_atoi(aux.data + captures[4], captures[5] - captures[4]);
padding->message_min_len = ngx_atoi(aux.data + captures[6], captures[7] - captures[6]);
ngx_queue_insert_tail(&sentinel->queue, &padding->queue);
ngx_conf_log_error(NGX_LOG_INFO, cf, 0, "push stream module: padding detected %V, header_min_len %d, message_min_len %d", &agent_rc->pattern, padding->header_min_len, padding->message_min_len);
aux.data = aux.data + (captures[1] - captures[0] + 1);
aux.len = aux.len - (captures[1] - captures[0] + 1);
} while (aux.data < (paddings_by_user_agent->data + paddings_by_user_agent->len));
return sentinel;
}
static void
ngx_http_push_stream_complex_value(ngx_http_request_t *r, ngx_http_complex_value_t *val, ngx_str_t *value)
{
u_char *dst, *src;
ngx_http_complex_value(r, val, value);
if (value->len) {
dst = value->data;
src = value->data;
ngx_unescape_uri(&dst, &src, value->len, NGX_UNESCAPE_URI);
if (dst < src) {
*dst = '\0';
value->len = dst - value->data;
}
}
}
......@@ -161,6 +161,8 @@ module BaseTestCase
@publisher_mode = nil
@last_received_message_time = nil
@last_received_message_tag = nil
@user_agent = nil
@padding_by_user_agent = nil
self.send(:global_configuration) if self.respond_to?(:global_configuration)
end
......@@ -270,6 +272,9 @@ http {
<%= "push_stream_last_received_message_time #{@last_received_message_time};" unless @last_received_message_time.nil? %>
<%= "push_stream_last_received_message_tag #{@last_received_message_tag};" unless @last_received_message_tag.nil? %>
<%= "push_stream_user_agent #{@user_agent};" unless @user_agent.nil? %>
<%= "push_stream_padding_by_user_agent '#{@padding_by_user_agent}';" unless @padding_by_user_agent.nil? %>
# max subscribers per channel
<%= "push_stream_max_subscribers_per_channel #{@max_subscribers_per_channel};" unless @max_subscribers_per_channel.nil? %>
# max messages to store in memory
......
......@@ -6,7 +6,7 @@ class TestMeasureMemory < Test::Unit::TestCase
@@message_estimate_size = 200
@@channel_estimate_size = 536
@@subscriber_estimate_size = 230
@@subscriber_estimate_system_size = 6500
@@subscriber_estimate_system_size = 7100
def global_configuration
@max_reserved_memory = "2m"
......@@ -28,14 +28,14 @@ class TestMeasureMemory < Test::Unit::TestCase
shared_size = @max_reserved_memory.to_i * 1024 * 1024
expected_message = shared_size / (@@message_estimate_size + body.size)
EventMachine.run {
EM.add_periodic_timer(0.0001) do
pub_1 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 60
pub_1.callback {
EventMachine.stop if pub_1.response_header.status == 500
}
end
}
post_channel_message = "POST /pub?id=#{channel} HTTP/1.0\r\nContent-Length: #{body.size}\r\n\r\n#{body}"
socket = TCPSocket.open(nginx_host, nginx_port)
while (true) do
socket.print(post_channel_message)
headers, body = read_response(socket)
break unless headers.match(/200 OK/)
end
EventMachine.run {
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats?id=' + channel.to_s).get :head => headers, :timeout => 60
......@@ -47,6 +47,8 @@ class TestMeasureMemory < Test::Unit::TestCase
assert(((expected_message - 10) < published_messages) && (published_messages < (expected_message + 10)), "Message size is far from %d bytes (expected: %d, published: %d)" % ([@@message_estimate_size, expected_message, published_messages]))
EventMachine.stop
}
add_test_timeout
}
end
......@@ -59,6 +61,7 @@ class TestMeasureMemory < Test::Unit::TestCase
EventMachine.run {
publish_message_in_loop(1000, headers, body)
add_test_timeout(20)
}
EventMachine.run {
......@@ -71,6 +74,7 @@ class TestMeasureMemory < Test::Unit::TestCase
assert(((expected_channel - 10) < created_channels) && (created_channels < (expected_channel + 10)), "Channel size is far from %d bytes (expected: %d, created: %d)" % ([@@channel_estimate_size, expected_channel, created_channels]))
EventMachine.stop
}
add_test_timeout
}
end
......@@ -98,6 +102,7 @@ class TestMeasureMemory < Test::Unit::TestCase
EventMachine.stop
}
end
add_test_timeout(30)
}
end
......@@ -108,13 +113,20 @@ class TestMeasureMemory < Test::Unit::TestCase
def test_subscriber_system_size
headers = {'accept' => 'text/html'}
body = '1'
channel = 'ch_test_subscriber_system_size'
shared_size = @max_reserved_memory.to_i * 1024
expected_subscriber = shared_size / (@@subscriber_estimate_size + @@channel_estimate_size + 4)
#warming up
EventMachine.run {
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_i.to_s).get :head => headers, :body => body, :timeout => 30
sub.stream { |chunk|
EventMachine.stop
}
add_test_timeout
}
EventMachine.run {
memory_1 = `ps -eo vsz,cmd | grep -E 'ngin[xX] -c '`.split(' ')[0].to_i
subscriber_in_loop_with_limit(1000, headers, body, 1199) do
subscriber_in_loop_with_limit(channel, headers, body, 1000, 1199) do
memory_2 = `ps -eo vsz,cmd | grep -E 'ngin[xX] -c '`.split(' ')[0].to_i
per_subscriber = ((memory_2 - memory_1).to_f / 200) * 1000
......@@ -123,6 +135,7 @@ class TestMeasureMemory < Test::Unit::TestCase
EventMachine.stop
end
add_test_timeout
}
end
......@@ -138,14 +151,14 @@ class TestMeasureMemory < Test::Unit::TestCase
}
end
def subscriber_in_loop_with_limit(channel, headers, body, limit, &block)
def subscriber_in_loop_with_limit(channel, headers, body, start, limit, &block)
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_i.to_s).get :head => headers, :body => body, :timeout => 30
sub.stream { |chunk|
if channel == limit
if start == limit
block.call
EventMachine.stop
end
subscriber_in_loop_with_limit(channel.to_i + 1, headers, body, limit) do
subscriber_in_loop_with_limit(channel, headers, body, start + 1, limit) do
yield block
end
}
......
require File.expand_path('base_test_case', File.dirname(__FILE__))
class TestPaddingByUserAgent < Test::Unit::TestCase
include BaseTestCase
def global_configuration
@padding_by_user_agent = "[T|t]est 1,1024,512:[T|t]est 2,4097,0"
@user_agent = nil
@subscriber_connection_timeout = '1s'
@header_template = nil
@message_template = nil
@footer_template = nil
end
def config_test_header_padding
@header_template = "0123456789"
end
def test_header_padding
headers = {'accept' => 'application/json'}
channel = 'ch_test_header_padding'
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1"), :timeout => 30
sub_1.callback {
assert_equal(200, sub_1.response_header.status, "Channel was founded")
assert_equal(1100 + @header_template.size + 4, sub_1.response.size, "Didn't received headder with padding")
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 2"), :timeout => 30
sub_2.callback {
assert_equal(200, sub_2.response_header.status, "Channel was founded")
assert_equal(4097 + @header_template.size + 4, sub_2.response.size, "Didn't received headder with padding")
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 3"), :timeout => 30
sub_3.callback {
assert_equal(200, sub_3.response_header.status, "Channel was founded")
assert_equal(@header_template.size + 2, sub_3.response.size, "Didn't received headder with padding")
EventMachine.stop
}
}
}
add_test_timeout
}
end
def test_message_padding
headers = {'accept' => 'application/json'}
channel = 'ch_test_message_padding'
body = "0123456789"
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1"), :timeout => 30
sub_1.callback {
assert_equal(200, sub_1.response_header.status, "Channel was founded")
assert_equal(500 + body.size + 4, sub_1.response.size, "Didn't received headder with padding")
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 2"), :timeout => 30
sub_2.callback {
assert_equal(200, sub_2.response_header.status, "Channel was founded")
assert_equal(body.size + 2, sub_2.response.size, "Didn't received headder with padding")
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 3"), :timeout => 30
sub_3.callback {
assert_equal(200, sub_3.response_header.status, "Channel was founded")
assert_equal(body.size + 2, sub_3.response.size, "Didn't received headder with padding")
EventMachine.stop
}
publish_message_inline(channel, headers, body)
}
publish_message_inline(channel, headers, body)
}
publish_message_inline(channel, headers, body)
add_test_timeout
}
end
def config_test_message_padding_with_different_sizes
@padding_by_user_agent = "[T|t]est 1,0,545"
end
def test_message_padding_with_different_sizes
headers = {'accept' => 'application/json'}
channel = 'ch_test_message_padding_with_different_sizes'
EventMachine.run {
i = 1
expected_padding = 545
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1"), :timeout => 30
sub_1.callback {
assert_equal(200, sub_1.response_header.status, "Channel was founded")
assert_equal(expected_padding + i + 4, sub_1.response.size, "Didn't received headder with padding")
i = 105
expected_padding = 600 - ((i/100).to_i * 100)
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1"), :timeout => 30
sub_1.callback {
assert_equal(200, sub_1.response_header.status, "Channel was founded")
assert_equal(expected_padding + i + 4, sub_1.response.size, "Didn't received headder with padding")
i = 221
expected_padding = 600 - ((i/100).to_i * 100)
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1"), :timeout => 30
sub_1.callback {
assert_equal(200, sub_1.response_header.status, "Channel was founded")
assert_equal(expected_padding + i + 4, sub_1.response.size, "Didn't received headder with padding")
i = 331
expected_padding = 600 - ((i/100).to_i * 100)
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1"), :timeout => 30
sub_1.callback {
assert_equal(200, sub_1.response_header.status, "Channel was founded")
assert_equal(expected_padding + i + 4, sub_1.response.size, "Didn't received headder with padding")
i = 435
expected_padding = 600 - ((i/100).to_i * 100)
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1"), :timeout => 30
sub_1.callback {
assert_equal(200, sub_1.response_header.status, "Channel was founded")
assert_equal(expected_padding + i + 4, sub_1.response.size, "Didn't received headder with padding")
i = 502
expected_padding = 600 - ((i/100).to_i * 100)
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1"), :timeout => 30
sub_1.callback {
assert_equal(200, sub_1.response_header.status, "Channel was founded")
assert_equal(expected_padding + i + 4, sub_1.response.size, "Didn't received headder with padding")
i = 550
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1"), :timeout => 30
sub_1.callback {
assert_equal(200, sub_1.response_header.status, "Channel was founded")
assert_equal(i + 2, sub_1.response.size, "Didn't received headder with padding")
EventMachine.stop
}
publish_message_inline(channel, headers, "_" * i)
}
publish_message_inline(channel, headers, "_" * i)
}
publish_message_inline(channel, headers, "_" * i)
}
publish_message_inline(channel, headers, "_" * i)
}
publish_message_inline(channel, headers, "_" * i)
}
publish_message_inline(channel, headers, "_" * i)
}
publish_message_inline(channel, headers, "_" * i)
add_test_timeout(10)
}
end
def config_test_user_agent_by_complex_value
@padding_by_user_agent = "[T|t]est 1,1024,512"
@user_agent = "$arg_ua"
@header_template = "0123456789"
end
def test_user_agent_by_complex_value
headers = {'accept' => 'application/json'}
channel = 'ch_test_user_agent_by_complex_value'
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?ua=test 1').get :head => headers, :timeout => 30
sub_1.callback {
assert_equal(200, sub_1.response_header.status, "Channel was founded")
assert_equal(1024 + @header_template.size + 4, sub_1.response.size, "Didn't received headder with padding")
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?ua=test 2').get :head => headers, :timeout => 30
sub_2.callback {
assert_equal(200, sub_2.response_header.status, "Channel was founded")
assert_equal(@header_template.size + 2, sub_2.response.size, "Didn't received headder with padding")
EventMachine.stop
}
}
add_test_timeout
}
end
end
......@@ -304,4 +304,31 @@ class TestSetuParameters < Test::Unit::TestCase
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message), "Message error not founded: '#{ expected_error_message }' recieved '#{ stderr_msg }'")
end
def test_padding_by_user_agent_parser
expected_error_message = "push stream module: padding pattern not match the value "
@padding_by_user_agent = "user_agent,as,df"
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message + "user_agent,as,df"), "Message error not founded: '#{ expected_error_message + "user_agent,as,df" }' recieved '#{ stderr_msg }'")
@padding_by_user_agent = "user_agent;10;0"
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message + "user_agent;10;0"), "Message error not founded: '#{ expected_error_message + "user_agent;10;0" }' recieved '#{ stderr_msg }'")
expected_error_message = "error applying padding pattern to "
@padding_by_user_agent = "user_agent,10,0:other_user_agent;20;0:another_user_agent,30,0"
self.create_config_file
stderr_msg = self.start_server
assert(stderr_msg.include?(expected_error_message + "other_user_agent;20;0:another_user_agent,30,0"), "Message error not founded: '#{ expected_error_message + "other_user_agent;20;0:another_user_agent,30,0" }' recieved '#{ stderr_msg }'")
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment