Commit 32d59def authored by Wandenberg's avatar Wandenberg

move some code to utils to be reused

parent ce06ce97
......@@ -163,6 +163,12 @@ struct ngx_http_push_stream_subscriber_s {
ngx_queue_t worker_queue;
};
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_str_t *id;
ngx_uint_t backtrack_messages;
} ngx_http_push_stream_requested_channel_t;
typedef struct {
ngx_event_t *disconnect_timer;
ngx_event_t *ping_timer;
......
......@@ -26,14 +26,7 @@
#ifndef NGX_HTTP_PUSH_STREAM_MODULE_SUBSCRIBER_H_
#define NGX_HTTP_PUSH_STREAM_MODULE_SUBSCRIBER_H_
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_str_t *id;
ngx_uint_t backtrack_messages;
} ngx_http_push_stream_requested_channel_t;
static ngx_int_t ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r);
ngx_http_push_stream_requested_channel_t *ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_pool_t *pool);
static ngx_int_t ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, ngx_int_t *status_code, ngx_str_t **explain_error_message);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_SUBSCRIBER_H_ */
......@@ -295,5 +295,6 @@ uint64_t ngx_http_push_stream_ntohll(uint64_t value);
static ngx_int_t ngx_http_push_stream_set_expires(ngx_http_request_t *r, ngx_http_push_stream_expires_t expires, time_t expires_time);
ngx_http_push_stream_requested_channel_t *ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_pool_t *pool);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_UTILS_H_ */
......@@ -311,58 +311,6 @@ ngx_http_push_stream_subscriber_assign_channel(ngx_slab_pool_t *shpool, ngx_http
}
ngx_http_push_stream_requested_channel_t *
ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_pool_t *pool) {
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_str_t vv_channels_path = ngx_null_string;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_str_t aux;
int captures[15];
ngx_int_t n;
ngx_http_push_stream_complex_value(r, cf->channels_path, &vv_channels_path);
if (vv_channels_path.len == 0) {
return NULL;
}
if ((channels_ids = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_requested_channel_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channels_ids queue");
return NULL;
}
ngx_queue_init(&channels_ids->queue);
// doing the parser of given channel path
aux.data = vv_channels_path.data;
do {
aux.len = vv_channels_path.len - (aux.data - vv_channels_path.data);
if ((n = ngx_regex_exec(mcf->backtrack_parser_regex, &aux, captures, 15)) >= 0) {
if ((cur = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_requested_channel_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channel_id item");
return NULL;
}
if ((cur->id = ngx_http_push_stream_create_str(pool, captures[0])) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channel_id string");
return NULL;
}
ngx_memcpy(cur->id->data, aux.data, captures[0]);
cur->backtrack_messages = 0;
if (captures[7] > captures[6]) {
cur->backtrack_messages = ngx_atoi(aux.data + captures[6], captures[7] - captures[6]);
}
ngx_queue_insert_tail(&channels_ids->queue, &cur->queue);
aux.data = aux.data + captures[1];
}
} while ((n != NGX_REGEX_NO_MATCHED) && (aux.data < (vv_channels_path.data + vv_channels_path.len)));
return channels_ids;
}
static ngx_int_t
ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, ngx_int_t *status_code, ngx_str_t **explain_error_message)
{
......
......@@ -1879,3 +1879,55 @@ ngx_http_push_stream_set_expires(ngx_http_request_t *r, ngx_http_push_stream_exp
return NGX_OK;
}
ngx_http_push_stream_requested_channel_t *
ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_pool_t *pool) {
ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_str_t vv_channels_path = ngx_null_string;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_str_t aux;
int captures[15];
ngx_int_t n;
ngx_http_push_stream_complex_value(r, cf->channels_path, &vv_channels_path);
if (vv_channels_path.len == 0) {
return NULL;
}
if ((channels_ids = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_requested_channel_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channels_ids queue");
return NULL;
}
ngx_queue_init(&channels_ids->queue);
// doing the parser of given channel path
aux.data = vv_channels_path.data;
do {
aux.len = vv_channels_path.len - (aux.data - vv_channels_path.data);
if ((n = ngx_regex_exec(mcf->backtrack_parser_regex, &aux, captures, 15)) >= 0) {
if ((cur = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_requested_channel_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channel_id item");
return NULL;
}
if ((cur->id = ngx_http_push_stream_create_str(pool, captures[0])) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channel_id string");
return NULL;
}
ngx_memcpy(cur->id->data, aux.data, captures[0]);
cur->backtrack_messages = 0;
if (captures[7] > captures[6]) {
cur->backtrack_messages = ngx_atoi(aux.data + captures[6], captures[7] - captures[6]);
}
ngx_queue_insert_tail(&channels_ids->queue, &cur->queue);
aux.data = aux.data + captures[1];
}
} while ((n != NGX_REGEX_NO_MATCHED) && (aux.data < (vv_channels_path.data + vv_channels_path.len)));
return channels_ids;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment