Commit 45ba8f5c authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

changing channels ids parser for a smarter version, using regular expression

parent b4ad6863
...@@ -76,6 +76,7 @@ typedef struct { ...@@ -76,6 +76,7 @@ typedef struct {
ngx_uint_t max_messages_stored_per_channel; ngx_uint_t max_messages_stored_per_channel;
ngx_uint_t max_channel_id_length; ngx_uint_t max_channel_id_length;
ngx_http_push_stream_template_queue_t msg_templates; ngx_http_push_stream_template_queue_t msg_templates;
ngx_regex_t *backtrack_parser_regex;
} ngx_http_push_stream_main_conf_t; } ngx_http_push_stream_main_conf_t;
typedef struct { typedef struct {
...@@ -251,8 +252,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED = ngx_string("Channe ...@@ -251,8 +252,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED = ngx_string("Channe
#define NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED (void *) -3 #define NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED (void *) -3
static ngx_str_t NGX_HTTP_PUSH_STREAM_EMPTY = ngx_string(""); static ngx_str_t NGX_HTTP_PUSH_STREAM_EMPTY = ngx_string("");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_BACKTRACK_SEP = ngx_string(".b"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_BACKTRACK_PATTERN = ngx_string("((\\.b([0-9]+))?(/|$))");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_SLASH = ngx_string("/");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK = ngx_string("callback"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK = ngx_string("callback");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_DATE_FORMAT_ISO_8601 = ngx_string("%4d-%02d-%02dT%02d:%02d:%02d"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_DATE_FORMAT_ISO_8601 = ngx_string("%4d-%02d-%02dT%02d:%02d:%02d");
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
#ifndef NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ #ifndef NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
#define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ #define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.2"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.3");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("04f0721170654f29ae8d4f0bfb4e718cab61cf1c"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("b4ad686386ea8c0738071f525207077db4bdb1de");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */
...@@ -487,6 +487,26 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent) ...@@ -487,6 +487,26 @@ ngx_http_push_stream_init_main_conf(ngx_conf_t *cf, void *parent)
conf->buffer_cleanup_interval = 1000; // 1 second conf->buffer_cleanup_interval = 1000; // 1 second
} }
ngx_regex_compile_t *backtrack_parser = NULL;
u_char errstr[NGX_MAX_CONF_ERRSTR];
if ((backtrack_parser = ngx_pcalloc(cf->pool, sizeof(ngx_regex_compile_t))) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to compile backtrack parser");
return NGX_CONF_ERROR;
}
backtrack_parser->pattern = NGX_HTTP_PUSH_STREAM_BACKTRACK_PATTERN;
backtrack_parser->pool = cf->pool;
backtrack_parser->err.len = NGX_MAX_CONF_ERRSTR;
backtrack_parser->err.data = errstr;
if (ngx_regex_compile(backtrack_parser) != NGX_OK) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to compile backtrack parser pattern %V", &NGX_HTTP_PUSH_STREAM_BACKTRACK_PATTERN);
return NGX_CONF_ERROR;
}
conf->backtrack_parser_regex = backtrack_parser->regex;
return NGX_CONF_OK; return NGX_CONF_OK;
} }
......
...@@ -309,91 +309,58 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http ...@@ -309,91 +309,58 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
return result; return result;
} }
ngx_http_push_stream_requested_channel_t * ngx_http_push_stream_requested_channel_t *
ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_pool_t *pool) { ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_pool_t *pool) {
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_variable_value_t *vv_channels_path = ngx_http_get_indexed_variable(r, cf->index_channels_path); ngx_http_variable_value_t *vv_channels_path = ngx_http_get_indexed_variable(r, cf->index_channels_path);
ngx_http_push_stream_requested_channel_t *channels_ids, *cur; ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
u_char *channel_pos, *slash_pos, *backtrack_pos; ngx_str_t aux;
ngx_uint_t len, backtrack_messages; int captures[15];
ngx_str_t *channels_path; ngx_int_t n;
if (vv_channels_path == NULL || vv_channels_path->not_found || vv_channels_path->len == 0) { if (vv_channels_path == NULL || vv_channels_path->not_found || vv_channels_path->len == 0) {
return NULL; return NULL;
} }
if ((channels_path = ngx_http_push_stream_create_str(pool, vv_channels_path->len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channels_path string");
return NULL;
}
if ((channels_ids = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_requested_channel_t))) == NULL) { if ((channels_ids = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_requested_channel_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channels_ids queue"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channels_ids queue");
return NULL; return NULL;
} }
ngx_memcpy(channels_path->data, vv_channels_path->data, vv_channels_path->len);
ngx_queue_init(&channels_ids->queue); ngx_queue_init(&channels_ids->queue);
channel_pos = channels_path->data;
// doing the parser of given channel path // doing the parser of given channel path
while (channel_pos != NULL) { aux.data = vv_channels_path->data;
backtrack_messages = 0; do {
len = 0; aux.len = vv_channels_path->len - (aux.data - vv_channels_path->data);
if ((n = ngx_regex_exec(mcf->backtrack_parser_regex, &aux, captures, 15)) >= 0) {
backtrack_pos = (u_char *) ngx_strstr(channel_pos, NGX_HTTP_PUSH_STREAM_BACKTRACK_SEP.data);
slash_pos = (u_char *) ngx_strstr(channel_pos, NGX_HTTP_PUSH_STREAM_SLASH.data);
if ((backtrack_pos != NULL) && (slash_pos != NULL)) {
if (slash_pos > backtrack_pos) {
len = backtrack_pos - channel_pos;
backtrack_pos = backtrack_pos + NGX_HTTP_PUSH_STREAM_BACKTRACK_SEP.len;
if (slash_pos > backtrack_pos) {
backtrack_messages = ngx_atoi(backtrack_pos, slash_pos - backtrack_pos);
}
} else {
len = slash_pos - channel_pos;
}
} else if (backtrack_pos != NULL) {
len = backtrack_pos - channel_pos;
backtrack_pos = backtrack_pos + NGX_HTTP_PUSH_STREAM_BACKTRACK_SEP.len;
if ((channels_path->data + channels_path->len) > backtrack_pos) {
backtrack_messages = ngx_atoi(backtrack_pos, (channels_path->data + channels_path->len) - backtrack_pos);
}
} else if (slash_pos != NULL) {
len = slash_pos - channel_pos;
} else {
len = channels_path->data + channels_path->len - channel_pos;
}
if (len > 0) {
if ((cur = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_requested_channel_t))) == NULL) { if ((cur = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_requested_channel_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channel_id item"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channel_id item");
return NULL; return NULL;
} }
if ((cur->id = ngx_http_push_stream_create_str(pool, len)) == NULL) { if ((cur->id = ngx_http_push_stream_create_str(pool, captures[0])) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channel_id string"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channel_id string");
return NULL; return NULL;
} }
ngx_memcpy(cur->id->data, channel_pos, len); ngx_memcpy(cur->id->data, aux.data, captures[0]);
cur->backtrack_messages = (backtrack_messages > 0) ? backtrack_messages : 0; cur->backtrack_messages = 0;
if (captures[7] > captures[6]) {
cur->backtrack_messages = ngx_atoi(aux.data + captures[6], captures[7] - captures[6]);
}
ngx_queue_insert_tail(&channels_ids->queue, &cur->queue); ngx_queue_insert_tail(&channels_ids->queue, &cur->queue);
}
channel_pos = NULL; aux.data = aux.data + captures[1];
if (slash_pos != NULL) {
channel_pos = slash_pos + NGX_HTTP_PUSH_STREAM_SLASH.len;
} }
} } while ((n != NGX_REGEX_NO_MATCHED) && (aux.data < (vv_channels_path->data + vv_channels_path->len)));
return channels_ids; return channels_ids;
} }
static ngx_int_t static ngx_int_t
ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, ngx_int_t *status_code, ngx_str_t **explain_error_message) ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, ngx_int_t *status_code, ngx_str_t **explain_error_message)
{ {
......
...@@ -673,6 +673,7 @@ class TestSubscriber < Test::Unit::TestCase ...@@ -673,6 +673,7 @@ class TestSubscriber < Test::Unit::TestCase
EventMachine.stop EventMachine.stop
} }
} }
add_test_timeout
} }
end end
...@@ -922,4 +923,47 @@ class TestSubscriber < Test::Unit::TestCase ...@@ -922,4 +923,47 @@ class TestSubscriber < Test::Unit::TestCase
add_test_timeout add_test_timeout
} }
end end
def config_test_accept_channels_name_with_dot_b
@subscriber_connection_timeout = "1s"
@ping_message_interval = nil
@header_template = nil
@footer_template = nil
@message_template = nil
end
def test_accept_channels_name_with_dot_b
channel = 'room.b18.beautiful'
response = ''
EventMachine.run {
publish_message_inline(channel, {'accept' => 'text/html'}, 'msg 1')
publish_message_inline(channel, {'accept' => 'text/html'}, 'msg 2')
publish_message_inline(channel, {'accept' => 'text/html'}, 'msg 3')
publish_message_inline(channel, {'accept' => 'text/html'}, 'msg 4')
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b3').get
sub.stream { | chunk |
response += chunk
}
sub.callback {
assert_equal("msg 2\r\nmsg 3\r\nmsg 4\r\n", response, "The published message was not received correctly")
response = ''
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub_1.stream { | chunk |
response += chunk
}
sub_1.callback { |chunk|
assert_equal("msg 5\r\n", response, "The published message was not received correctly")
EventMachine.stop
}
publish_message_inline(channel, {'accept' => 'text/html'}, 'msg 5')
}
add_test_timeout
}
end
end end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment