Commit a2dba6bc authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding functions ngx_http_push_stream_send_response_message and...

adding functions ngx_http_push_stream_send_response_message and ngx_http_push_stream_send_response_finalize to group the logics to send a message to subscriber and to finalize subscriber connection, respectively
parent b21eaab5
...@@ -208,7 +208,9 @@ static ngx_str_t * ngx_http_push_stream_get_formatted_chunk(const u_cha ...@@ -208,7 +208,9 @@ static ngx_str_t * ngx_http_push_stream_get_formatted_chunk(const u_cha
static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf); static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg);
static ngx_int_t ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer); static ngx_int_t ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer);
static void ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf); static ngx_int_t ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf);
static ngx_int_t ngx_http_push_stream_memory_cleanup(ngx_log_t *log, ngx_http_push_stream_main_conf_t *psmcf); static ngx_int_t ngx_http_push_stream_memory_cleanup(ngx_log_t *log, ngx_http_push_stream_main_conf_t *psmcf);
static ngx_int_t ngx_http_push_stream_buffer_cleanup(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf); static ngx_int_t ngx_http_push_stream_buffer_cleanup(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf);
......
...@@ -298,8 +298,7 @@ ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect) ...@@ -298,8 +298,7 @@ ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect)
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&sentinel->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
if ((cur->request != NULL) && (ngx_exiting || (force_disconnect == 1) || ((cur->expires != 0) && (now > cur->expires)))) { if ((cur->request != NULL) && (ngx_exiting || (force_disconnect == 1) || ((cur->expires != 0) && (now > cur->expires)))) {
ngx_http_push_stream_worker_subscriber_cleanup_locked(cur); ngx_http_push_stream_worker_subscriber_cleanup_locked(cur);
ngx_http_push_stream_send_response_text(cur->request, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1); ngx_http_push_stream_send_response_finalize(cur->request);
ngx_http_finalize_request(cur->request, NGX_HTTP_OK);
} else { } else {
break; break;
} }
...@@ -319,10 +318,7 @@ ngx_http_push_stream_send_worker_ping_message(void) ...@@ -319,10 +318,7 @@ ngx_http_push_stream_send_worker_ping_message(void)
if ((ngx_http_push_stream_ping_msg != NULL) && (!ngx_queue_empty(&sentinel->queue))) { if ((ngx_http_push_stream_ping_msg != NULL) && (!ngx_queue_empty(&sentinel->queue))) {
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) {
if (cur->request != NULL) { if (cur->request != NULL) {
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(cur->request, NULL, ngx_http_push_stream_ping_msg, cur->request->pool); ngx_http_push_stream_send_response_message(cur->request, NULL, ngx_http_push_stream_ping_msg);
if (str != NULL) {
ngx_http_push_stream_send_response_text(cur->request, str->data, str->len, 0);
}
} }
} }
} }
...@@ -452,10 +448,7 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan ...@@ -452,10 +448,7 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan
// now let's respond to some requests! // now let's respond to some requests!
while ((cur = (ngx_http_push_stream_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) { while ((cur = (ngx_http_push_stream_subscriber_t *) ngx_queue_next(&cur->queue)) != sentinel) {
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(cur->request, channel, msg, cur->request->pool); ngx_http_push_stream_send_response_message(cur->request, channel, msg);
if (str != NULL) {
ngx_http_push_stream_send_response_text(cur->request, str->data, str->len, 0);
}
} }
} }
......
...@@ -413,11 +413,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre ...@@ -413,11 +413,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
// positioning at first message, and send the others // positioning at first message, and send the others
while ((qtd > 0) && (!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) { while ((qtd > 0) && (!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (start == 0) { if (start == 0) {
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, message, r->pool); ngx_http_push_stream_send_response_message(r, channel, message);
if (str != NULL) {
ngx_http_push_stream_send_response_text(r, str->data, str->len, 0);
}
qtd--; qtd--;
} else { } else {
start--; start--;
...@@ -427,10 +423,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre ...@@ -427,10 +423,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
if ((backtrack == 0) && (if_modified_since != 0)) { if ((backtrack == 0) && (if_modified_since != 0)) {
while ((!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) { while ((!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (message->time > if_modified_since) { if (message->time > if_modified_since) {
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, message, r->pool); ngx_http_push_stream_send_response_message(r, channel, message);
if (str != NULL) {
ngx_http_push_stream_send_response_text(r, str->data, str->len, 0);
}
} }
} }
} }
......
...@@ -93,10 +93,7 @@ ngx_http_push_stream_delete_worker_channel(void) ...@@ -93,10 +93,7 @@ ngx_http_push_stream_delete_worker_channel(void)
// remove the reference from channel for subscriber // remove the reference from channel for subscriber
ngx_queue_remove(&cur->queue); ngx_queue_remove(&cur->queue);
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(cur->request, channel, channel->channel_deleted_message, cur->request->pool); ngx_http_push_stream_send_response_message(cur->request, channel, channel->channel_deleted_message);
if (str != NULL) {
ngx_http_push_stream_send_response_text(cur->request, str->data, str->len, 0);
}
break; break;
} }
...@@ -105,8 +102,7 @@ ngx_http_push_stream_delete_worker_channel(void) ...@@ -105,8 +102,7 @@ ngx_http_push_stream_delete_worker_channel(void)
// subscriber does not have any other subscription, the connection may be closed // subscriber does not have any other subscription, the connection may be closed
if (ngx_queue_empty(&worker_subscriber->subscriptions_sentinel.queue)) { if (ngx_queue_empty(&worker_subscriber->subscriptions_sentinel.queue)) {
ngx_http_push_stream_worker_subscriber_cleanup_locked(worker_subscriber); ngx_http_push_stream_worker_subscriber_cleanup_locked(worker_subscriber);
ngx_http_push_stream_send_response_text(worker_subscriber->request, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1); ngx_http_push_stream_send_response_finalize(worker_subscriber->request);
ngx_http_finalize_request(worker_subscriber->request, NGX_HTTP_OK);
} }
break; break;
...@@ -245,6 +241,15 @@ ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_htt ...@@ -245,6 +241,15 @@ ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_htt
return rc; return rc;
} }
static void
ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg)
{
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, msg, r->pool);
if (str != NULL) {
ngx_http_push_stream_send_response_text(r, str->data, str->len, 0);
}
}
static ngx_int_t static ngx_int_t
ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer) ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer)
{ {
...@@ -275,6 +280,13 @@ ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *tex ...@@ -275,6 +280,13 @@ ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *tex
return ngx_http_output_filter(r, out); return ngx_http_output_filter(r, out);
} }
static void
ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r)
{
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1);
ngx_http_finalize_request(r, NGX_HTTP_OK);
}
static ngx_int_t static ngx_int_t
ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf) ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment