Commit 30a770a9 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

simplifying subscriber cleanup code

parent a59d8aac
...@@ -102,7 +102,6 @@ typedef struct { ...@@ -102,7 +102,6 @@ typedef struct {
ngx_int_t workers_ref_count; ngx_int_t workers_ref_count;
} ngx_http_push_stream_msg_t; } ngx_http_push_stream_msg_t;
typedef struct ngx_http_push_stream_subscriber_cleanup_s ngx_http_push_stream_subscriber_cleanup_t;
typedef struct ngx_http_push_stream_subscriber_s ngx_http_push_stream_subscriber_t; typedef struct ngx_http_push_stream_subscriber_s ngx_http_push_stream_subscriber_t;
typedef struct { typedef struct {
...@@ -148,7 +147,6 @@ typedef struct { ...@@ -148,7 +147,6 @@ typedef struct {
struct ngx_http_push_stream_subscriber_s { struct ngx_http_push_stream_subscriber_s {
ngx_http_request_t *request; ngx_http_request_t *request;
ngx_http_push_stream_subscription_t subscriptions_sentinel; ngx_http_push_stream_subscription_t subscriptions_sentinel;
ngx_http_push_stream_subscriber_cleanup_t *clndata;
ngx_pid_t worker_subscribed_pid; ngx_pid_t worker_subscribed_pid;
ngx_flag_t longpolling; ngx_flag_t longpolling;
ngx_http_push_stream_queue_elem_t *worker_subscriber_element_ref; ngx_http_push_stream_queue_elem_t *worker_subscriber_element_ref;
...@@ -159,13 +157,9 @@ typedef struct { ...@@ -159,13 +157,9 @@ typedef struct {
ngx_event_t *ping_timer; ngx_event_t *ping_timer;
ngx_http_push_stream_subscriber_t *subscriber; ngx_http_push_stream_subscriber_t *subscriber;
ngx_flag_t longpolling; ngx_flag_t longpolling;
ngx_pool_t *temp_pool;
} ngx_http_push_stream_subscriber_ctx_t; } ngx_http_push_stream_subscriber_ctx_t;
// cleaning supplies
struct ngx_http_push_stream_subscriber_cleanup_s {
ngx_http_push_stream_subscriber_t *worker_subscriber;
};
// messages to worker processes // messages to worker processes
typedef struct { typedef struct {
ngx_queue_t queue; ngx_queue_t queue;
......
...@@ -36,6 +36,4 @@ static ngx_int_t ngx_http_push_stream_subscriber_handler(ngx_http_request_t * ...@@ -36,6 +36,4 @@ static ngx_int_t ngx_http_push_stream_subscriber_handler(ngx_http_request_t *
ngx_http_push_stream_requested_channel_t *ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_pool_t *pool); ngx_http_push_stream_requested_channel_t *ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_pool_t *pool);
static ngx_int_t ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, ngx_int_t *status_code, ngx_str_t **explain_error_message); static ngx_int_t ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *channels_ids, ngx_int_t *status_code, ngx_str_t **explain_error_message);
static void ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_t *data);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_SUBSCRIBER_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_SUBSCRIBER_H_ */
...@@ -264,6 +264,8 @@ static ngx_http_push_stream_content_subtype_t * ngx_http_push_stream_match_c ...@@ -264,6 +264,8 @@ static ngx_http_push_stream_content_subtype_t * ngx_http_push_stream_match_c
static ngx_http_push_stream_line_t * ngx_http_push_stream_split_by_crlf(ngx_str_t *msg, ngx_pool_t *temp_pool); static ngx_http_push_stream_line_t * ngx_http_push_stream_split_by_crlf(ngx_str_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_join_with_crlf(ngx_http_push_stream_line_t *lines, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_join_with_crlf(ngx_http_push_stream_line_t *lines, ngx_pool_t *temp_pool);
static ngx_http_push_stream_subscriber_ctx_t * ngx_http_push_stream_add_request_context(ngx_http_request_t *r);
static ngx_str_t * ngx_http_push_stream_get_formatted_current_time(ngx_pool_t *pool); static ngx_str_t * ngx_http_push_stream_get_formatted_current_time(ngx_pool_t *pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_hostname(ngx_pool_t *pool); static ngx_str_t * ngx_http_push_stream_get_formatted_hostname(ngx_pool_t *pool);
......
...@@ -42,7 +42,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -42,7 +42,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscriber_t *worker_subscriber; ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur; ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_pool_t *temp_pool; ngx_http_push_stream_subscriber_ctx_t *ctx;
time_t if_modified_since; time_t if_modified_since;
ngx_str_t *last_event_id; ngx_str_t *last_event_id;
ngx_str_t *push_mode; ngx_str_t *push_mode;
...@@ -57,23 +57,20 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -57,23 +57,20 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_ALLOWED, NULL); return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_NOT_ALLOWED, NULL);
} }
//create a temporary pool to allocate temporary elements if ((ctx = ngx_http_push_stream_add_request_context(r)) == NULL) {
if ((temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, r->connection->log)) == NULL) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to create request context");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for temporary pool");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
//get channels ids and backtracks from path //get channels ids and backtracks from path
channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, temp_pool); channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, ctx->temp_pool);
if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) { if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the $push_stream_channels_path variable is required but is not set"); ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the $push_stream_channels_path variable is required but is not set");
ngx_destroy_pool(temp_pool);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE); return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE);
} }
//validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on. check if channel is full of subscribers //validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on. check if channel is full of subscribers
if (ngx_http_push_stream_validate_channels(r, channels_ids, &status_code, &explain_error_message) == NGX_ERROR) { if (ngx_http_push_stream_validate_channels(r, channels_ids, &status_code, &explain_error_message) == NGX_ERROR) {
ngx_destroy_pool(temp_pool);
return ngx_http_push_stream_send_only_header_response(r, status_code, explain_error_message); return ngx_http_push_stream_send_only_header_response(r, status_code, explain_error_message);
} }
...@@ -86,14 +83,16 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -86,14 +83,16 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
longpolling = ((cf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_LONGPOLLING) || ((push_mode != NULL) && (push_mode->len == NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.len) && (ngx_strncasecmp(push_mode->data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.len) == 0))); longpolling = ((cf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_LONGPOLLING) || ((push_mode != NULL) && (push_mode->len == NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.len) && (ngx_strncasecmp(push_mode->data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.data, NGX_HTTP_PUSH_STREAM_MODE_LONGPOLLING.len) == 0)));
if (polling || longpolling) { if (polling || longpolling) {
ngx_int_t result = ngx_http_push_stream_subscriber_polling_handler(r, channels_ids, if_modified_since, last_event_id, longpolling, temp_pool); ngx_int_t result = ngx_http_push_stream_subscriber_polling_handler(r, channels_ids, if_modified_since, last_event_id, longpolling, ctx->temp_pool);
ngx_destroy_pool(temp_pool); if (ctx->temp_pool != NULL) {
ngx_destroy_pool(ctx->temp_pool);
ctx->temp_pool = NULL;
}
return result; return result;
} }
// stream access // stream access
if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) { if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) {
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
...@@ -103,7 +102,6 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -103,7 +102,6 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
// sending response content header // sending response content header
if (ngx_http_push_stream_send_response_content_header(r, cf) == NGX_ERROR) { if (ngx_http_push_stream_send_response_content_header(r, cf) == NGX_ERROR) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: could not send content header to subscriber"); ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: could not send content header to subscriber");
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
...@@ -112,20 +110,21 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r) ...@@ -112,20 +110,21 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
if (rc == NGX_ERROR) { if (rc == NGX_ERROR) {
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
// adding subscriber to channel(s) and send backtrack messages // adding subscriber to channel(s) and send backtrack messages
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, last_event_id, worker_subscriber, temp_pool) != NGX_OK) { if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, if_modified_since, last_event_id, worker_subscriber, ctx->temp_pool) != NGX_OK) {
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
} }
ngx_destroy_pool(temp_pool); if (ctx->temp_pool != NULL) {
ngx_destroy_pool(ctx->temp_pool);
ctx->temp_pool = NULL;
}
return NGX_DONE; return NGX_DONE;
} }
...@@ -226,7 +225,6 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -226,7 +225,6 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
r->headers_out.content_length_n = -1; r->headers_out.content_length_n = -1;
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED); ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED);
ngx_http_send_header(r); ngx_http_send_header(r);
// sending response content header // sending response content header
...@@ -447,32 +445,14 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre ...@@ -447,32 +445,14 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
return NGX_OK; return NGX_OK;
} }
static void
ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_t *data)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
if (data->worker_subscriber != NULL) {
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_worker_subscriber_cleanup_locked(data->worker_subscriber);
ngx_shmtx_unlock(&shpool->mutex);
}
}
static ngx_http_push_stream_subscriber_t * static ngx_http_push_stream_subscriber_t *
ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r) ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r)
{ {
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_pool_cleanup_t *cln; ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscriber_cleanup_t *clndata;
ngx_http_push_stream_subscriber_t *worker_subscriber; ngx_http_push_stream_subscriber_t *worker_subscriber;
// attach a cleaner to remove the request from the channel
if ((cln = ngx_pool_cleanup_add(r->pool, sizeof(ngx_http_push_stream_subscriber_cleanup_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for cleanup");
return NULL;
}
if ((worker_subscriber = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_subscriber_t))) == NULL) { if ((worker_subscriber = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_subscriber_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate worker subscriber"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate worker subscriber");
return NULL; return NULL;
...@@ -482,12 +462,7 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque ...@@ -482,12 +462,7 @@ ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_reque
worker_subscriber->request = r; worker_subscriber->request = r;
worker_subscriber->worker_subscribed_pid = ngx_pid; worker_subscriber->worker_subscribed_pid = ngx_pid;
ngx_queue_init(&worker_subscriber->subscriptions_sentinel.queue); ngx_queue_init(&worker_subscriber->subscriptions_sentinel.queue);
ctx->subscriber = worker_subscriber;
// set a cleaner to subscriber
cln->handler = (ngx_pool_cleanup_pt) ngx_http_push_stream_subscriber_cleanup;
clndata = (ngx_http_push_stream_subscriber_cleanup_t *) cln->data;
clndata->worker_subscriber = worker_subscriber;
clndata->worker_subscriber->clndata = clndata;
// increment request reference count to keep connection open // increment request reference count to keep connection open
r->main->count++; r->main->count++;
...@@ -511,7 +486,7 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_ ...@@ -511,7 +486,7 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_msec_t connection_ttl = worker_subscriber->longpolling ? cf->longpolling_connection_ttl : cf->subscriber_connection_ttl; ngx_msec_t connection_ttl = worker_subscriber->longpolling ? cf->longpolling_connection_ttl : cf->subscriber_connection_ttl;
ngx_http_push_stream_queue_elem_t *element_subscriber; ngx_http_push_stream_queue_elem_t *element_subscriber;
ngx_http_push_stream_subscriber_ctx_t *ctx; ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
if ((element_subscriber = ngx_palloc(r->pool, sizeof(ngx_http_push_stream_queue_elem_t))) == NULL) { if ((element_subscriber = ngx_palloc(r->pool, sizeof(ngx_http_push_stream_queue_elem_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscriber reference"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscriber reference");
...@@ -520,12 +495,9 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_ ...@@ -520,12 +495,9 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_
element_subscriber->value = worker_subscriber; element_subscriber->value = worker_subscriber;
worker_subscriber->worker_subscriber_element_ref = element_subscriber; worker_subscriber->worker_subscriber_element_ref = element_subscriber;
// adding subscriber to woker list of subscribers // adding subscriber to worker list of subscribers
ngx_queue_insert_tail(&thisworker_data->subscribers_sentinel->queue, &element_subscriber->queue); ngx_queue_insert_tail(&thisworker_data->subscribers_sentinel->queue, &element_subscriber->queue);
if ((ctx = ngx_pcalloc(worker_subscriber->request->pool, sizeof(ngx_http_push_stream_subscriber_ctx_t))) == NULL) {
return NGX_ERROR;
}
ctx->longpolling = worker_subscriber->longpolling; ctx->longpolling = worker_subscriber->longpolling;
ctx->subscriber = worker_subscriber; ctx->subscriber = worker_subscriber;
...@@ -558,8 +530,6 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_ ...@@ -558,8 +530,6 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_
} }
} }
ngx_http_set_ctx(worker_subscriber->request, ctx, ngx_http_push_stream_module);
// increment global subscribers count // increment global subscribers count
data->subscribers++; data->subscribers++;
thisworker_data->subscribers++; thisworker_data->subscribers++;
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
static void nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel); static void nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel);
static void ngx_http_push_stream_run_cleanup_pool_handler(ngx_pool_t *p, ngx_pool_cleanup_pt handler); static void ngx_http_push_stream_run_cleanup_pool_handler(ngx_pool_t *p, ngx_pool_cleanup_pt handler);
static void ngx_http_push_stream_cleanup_request_context(ngx_http_request_t *r);
static ngx_inline void static ngx_inline void
ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired) ngx_http_push_stream_ensure_qtd_of_messages_locked(ngx_http_push_stream_channel_t *channel, ngx_uint_t max_messages, ngx_flag_t expired)
...@@ -508,7 +509,7 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r) ...@@ -508,7 +509,7 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r)
{ {
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_subscriber_cleanup); ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_cleanup_request_context);
if (pslcf->footer_template.len > 0) { if (pslcf->footer_template.len > 0) {
ngx_http_push_stream_send_response_text(r, pslcf->footer_template.data, pslcf->footer_template.len, 0); ngx_http_push_stream_send_response_text(r, pslcf->footer_template.data, pslcf->footer_template.len, 0);
...@@ -525,7 +526,7 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r) ...@@ -525,7 +526,7 @@ ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r)
static void static void
ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_request_t *r) ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_request_t *r)
{ {
ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_subscriber_cleanup); ngx_http_push_stream_run_cleanup_pool_handler(r->pool, (ngx_pool_cleanup_pt) ngx_http_push_stream_cleanup_request_context);
ngx_http_push_stream_add_polling_headers(r, ngx_time(), 0, r->pool); ngx_http_push_stream_add_polling_headers(r, ngx_time(), 0, r->pool);
r->headers_out.status = NGX_HTTP_NOT_MODIFIED; r->headers_out.status = NGX_HTTP_NOT_MODIFIED;
...@@ -953,12 +954,44 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx ...@@ -953,12 +954,44 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
} }
static ngx_http_push_stream_subscriber_ctx_t *
ngx_http_push_stream_add_request_context(ngx_http_request_t *r)
{
ngx_pool_cleanup_t *cln;
ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
if (ctx != NULL) {
return ctx;
}
if ((ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_push_stream_subscriber_ctx_t))) == NULL) {
return NULL;
}
if ((cln = ngx_pool_cleanup_add(r->pool, 0)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for cleanup");
return NULL;
}
if ((ctx->temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, ngx_cycle->log)) == NULL) {
return NULL;
}
// set a cleaner to request
cln->handler = (ngx_pool_cleanup_pt) ngx_http_push_stream_cleanup_request_context;
cln->data = r;
ngx_http_set_ctx(r, ctx, ngx_http_push_stream_module);
return ctx;
}
static void static void
ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subscriber_t *worker_subscriber) ngx_http_push_stream_cleanup_request_context(ngx_http_request_t *r)
{ {
ngx_http_push_stream_subscription_t *cur, *sentinel; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(worker_subscriber->request, ngx_http_push_stream_module);
if (ctx != NULL) { if (ctx != NULL) {
if ((ctx->disconnect_timer != NULL) && ctx->disconnect_timer->timer_set) { if ((ctx->disconnect_timer != NULL) && ctx->disconnect_timer->timer_set) {
...@@ -968,7 +1001,27 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subsc ...@@ -968,7 +1001,27 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subsc
if ((ctx->ping_timer != NULL) && ctx->ping_timer->timer_set) { if ((ctx->ping_timer != NULL) && ctx->ping_timer->timer_set) {
ngx_del_timer(ctx->ping_timer); ngx_del_timer(ctx->ping_timer);
} }
if (ctx->temp_pool != NULL) {
ngx_destroy_pool(ctx->temp_pool);
ctx->temp_pool = NULL;
}
if (ctx->subscriber != NULL) {
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_worker_subscriber_cleanup_locked(ctx->subscriber);
ctx->subscriber = NULL;
ngx_shmtx_unlock(&shpool->mutex);
} }
}
}
static void
ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subscriber_t *worker_subscriber)
{
ngx_http_push_stream_subscription_t *cur, *sentinel;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
sentinel = &worker_subscriber->subscriptions_sentinel; sentinel = &worker_subscriber->subscriptions_sentinel;
...@@ -982,7 +1035,6 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subsc ...@@ -982,7 +1035,6 @@ ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_subsc
ngx_queue_remove(&worker_subscriber->worker_subscriber_element_ref->queue); ngx_queue_remove(&worker_subscriber->worker_subscriber_element_ref->queue);
ngx_queue_init(&worker_subscriber->worker_subscriber_element_ref->queue); ngx_queue_init(&worker_subscriber->worker_subscriber_element_ref->queue);
} }
worker_subscriber->clndata->worker_subscriber = NULL;
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->subscribers); NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER(data->subscribers);
NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER((data->ipc + ngx_process_slot)->subscribers); NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER((data->ipc + ngx_process_slot)->subscribers);
} }
......
...@@ -39,7 +39,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ...@@ -39,7 +39,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscriber_t *worker_subscriber; ngx_http_push_stream_subscriber_t *worker_subscriber;
ngx_http_push_stream_requested_channel_t *channels_ids, *cur; ngx_http_push_stream_requested_channel_t *channels_ids, *cur;
ngx_pool_t *temp_pool; ngx_http_push_stream_subscriber_ctx_t *ctx;
ngx_int_t rc; ngx_int_t rc;
ngx_int_t status_code; ngx_int_t status_code;
ngx_str_t *explain_error_message; ngx_str_t *explain_error_message;
...@@ -69,35 +69,30 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ...@@ -69,35 +69,30 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_WRONG_WEBSOCKET_VERSION_MESSAGE); return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_WRONG_WEBSOCKET_VERSION_MESSAGE);
} }
//create a temporary pool to allocate temporary elements if ((ctx = ngx_http_push_stream_add_request_context(r)) == NULL) {
if ((temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, r->connection->log)) == NULL) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to create request context");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for temporary pool");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
//get channels ids and backtracks from path //get channels ids and backtracks from path
channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, temp_pool); channels_ids = ngx_http_push_stream_parse_channels_ids_from_path(r, ctx->temp_pool);
if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) { if ((channels_ids == NULL) || ngx_queue_empty(&channels_ids->queue)) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the $push_stream_channels_path variable is required but is not set"); ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the $push_stream_channels_path variable is required but is not set");
ngx_destroy_pool(temp_pool);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE); return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE);
} }
//validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on. check if channel is full of subscribers //validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on. check if channel is full of subscribers
if (ngx_http_push_stream_validate_channels(r, channels_ids, &status_code, &explain_error_message) == NGX_ERROR) { if (ngx_http_push_stream_validate_channels(r, channels_ids, &status_code, &explain_error_message) == NGX_ERROR) {
ngx_destroy_pool(temp_pool);
return ngx_http_push_stream_send_only_header_response(r, status_code, explain_error_message); return ngx_http_push_stream_send_only_header_response(r, status_code, explain_error_message);
} }
// stream access // stream access
if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) { if ((worker_subscriber = ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(r)) == NULL) {
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
if ((sec_accept_header = ngx_http_push_stream_generate_websocket_accept_value(r, sec_key_header, temp_pool)) == NULL) { if ((sec_accept_header = ngx_http_push_stream_generate_websocket_accept_value(r, sec_key_header, ctx->temp_pool)) == NULL) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: could not generate security accept heade value"); ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: could not generate security accept heade value");
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
...@@ -112,7 +107,6 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ...@@ -112,7 +107,6 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
// sending response content header // sending response content header
if (ngx_http_push_stream_send_response_content_header(r, cf) == NGX_ERROR) { if (ngx_http_push_stream_send_response_content_header(r, cf) == NGX_ERROR) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: could not send content header to subscriber"); ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: could not send content header to subscriber");
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
...@@ -121,20 +115,21 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r) ...@@ -121,20 +115,21 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
if (rc == NGX_ERROR) { if (rc == NGX_ERROR) {
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
// adding subscriber to channel(s) and send backtrack messages // adding subscriber to channel(s) and send backtrack messages
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, -1, NULL, worker_subscriber, temp_pool) != NGX_OK) { if (ngx_http_push_stream_subscriber_assign_channel(shpool, cf, r, cur, -1, NULL, worker_subscriber, ctx->temp_pool) != NGX_OK) {
ngx_destroy_pool(temp_pool);
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
} }
ngx_destroy_pool(temp_pool); if (ctx->temp_pool != NULL) {
ngx_destroy_pool(ctx->temp_pool);
ctx->temp_pool = NULL;
}
return NGX_DONE; return NGX_DONE;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment