Commit 81204dbd authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

refactors to use function ngx_http_push_stream_send_response_text

parent 18717aaf
...@@ -225,6 +225,7 @@ static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_ ...@@ -225,6 +225,7 @@ static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_
static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf); static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf);
static ngx_int_t ngx_http_push_stream_send_response(ngx_http_request_t *r, ngx_str_t *text, const ngx_str_t *content_type, ngx_int_t status_code);
static ngx_int_t ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg); static ngx_int_t ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg);
static ngx_int_t ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer); static ngx_int_t ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer);
static void ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r); static void ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r);
......
...@@ -59,10 +59,10 @@ ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_ ...@@ -59,10 +59,10 @@ ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_
} }
static ngx_buf_t * static ngx_str_t *
ngx_http_push_stream_channel_info_formatted(ngx_pool_t *pool, const ngx_str_t *format, ngx_str_t *id, ngx_uint_t published_messages, ngx_uint_t stored_messages, ngx_uint_t subscribers) ngx_http_push_stream_channel_info_formatted(ngx_pool_t *pool, const ngx_str_t *format, ngx_str_t *id, ngx_uint_t published_messages, ngx_uint_t stored_messages, ngx_uint_t subscribers)
{ {
ngx_buf_t *b; ngx_str_t *text;
ngx_uint_t len; ngx_uint_t len;
if ((format == NULL) || (id == NULL)) { if ((format == NULL) || (id == NULL)) {
...@@ -71,77 +71,40 @@ ngx_http_push_stream_channel_info_formatted(ngx_pool_t *pool, const ngx_str_t *f ...@@ -71,77 +71,40 @@ ngx_http_push_stream_channel_info_formatted(ngx_pool_t *pool, const ngx_str_t *f
len = 3*NGX_INT_T_LEN + format->len + id->len - 11;// minus 11 sprintf len = 3*NGX_INT_T_LEN + format->len + id->len - 11;// minus 11 sprintf
if ((b = ngx_create_temp_buf(pool, len)) == NULL) { if ((text = ngx_http_push_stream_create_str(pool, len)) == NULL) {
return NULL; return NULL;
} }
ngx_memset(b->start, '\0', len); ngx_sprintf(text->data, (char *) format->data, id->data, published_messages, stored_messages, subscribers);
b->last = ngx_sprintf(b->start, (char *) format->data, id->data, published_messages, stored_messages, subscribers); text->len = ngx_strlen(text->data);
b->memory = 1;
return b; return text;
} }
static ngx_int_t
ngx_http_push_stream_send_buf_response(ngx_http_request_t *r, ngx_buf_t *buf, const ngx_str_t *content_type, ngx_int_t status_code)
{
ngx_chain_t *chain;
ngx_int_t rc;
if ((r == NULL) || (buf == NULL) || (content_type == NULL)) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
r->headers_out.content_type.len = content_type->len;
r->headers_out.content_type.data = content_type->data;
r->headers_out.content_length_n = ngx_buf_size(buf);
if ((chain = ngx_pcalloc(r->pool, sizeof(ngx_chain_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for send buf response");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
chain->buf = buf;
chain->next = NULL;
buf->memory = 1;
buf->last_buf = 1;
r->headers_out.status = status_code;
rc = ngx_http_send_header(r);
if (rc == NGX_ERROR || rc > NGX_OK || r->header_only) {
return rc;
}
rc = ngx_http_output_filter(r, chain);
return rc;
}
// print information about a channel // print information about a channel
static ngx_int_t static ngx_int_t
ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel) ngx_http_push_stream_send_response_channel_info(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel)
{ {
ngx_buf_t *b; ngx_str_t *text;
ngx_http_push_stream_content_subtype_t *subtype; ngx_http_push_stream_content_subtype_t *subtype;
subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1); subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1);
b = ngx_http_push_stream_channel_info_formatted(r->pool, subtype->format_item, &channel->id, channel->last_message_id, channel->stored_messages, channel->subscribers); text = ngx_http_push_stream_channel_info_formatted(r->pool, subtype->format_item, &channel->id, channel->last_message_id, channel->stored_messages, channel->subscribers);
if (b == NULL) { if (text == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer."); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer.");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
return ngx_http_push_stream_send_buf_response(r, b, subtype->content_type, NGX_HTTP_OK); return ngx_http_push_stream_send_response(r, text, subtype->content_type, NGX_HTTP_OK);
} }
static ngx_int_t static ngx_int_t
ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request_t *r) { ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request_t *r)
{
ngx_buf_t *b;
ngx_uint_t len; ngx_uint_t len;
ngx_str_t *currenttime, *hostname, *format; ngx_str_t *currenttime, *hostname, *format, *text;
u_char *subscribers_by_workers, *start; u_char *subscribers_by_workers, *start;
int i, j, used_slots; int i, j, used_slots;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
...@@ -178,15 +141,15 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request ...@@ -178,15 +141,15 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
len = 4*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len + ngx_strlen(subscribers_by_workers) - 21;// minus 21 sprintf len = 4*NGX_INT_T_LEN + subtype->format_summarized->len + hostname->len + currenttime->len + ngx_strlen(subscribers_by_workers) - 21;// minus 21 sprintf
if ((b = ngx_create_temp_buf(r->pool, len)) == NULL) { if ((text = ngx_http_push_stream_create_str(r->pool, len)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer."); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "Failed to allocate response buffer.");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
ngx_memset(b->start, '\0', len); ngx_sprintf(text->data, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->broadcast_channels, data->published_messages, data->subscribers, ngx_time() - data->startup, subscribers_by_workers);
b->last = ngx_sprintf(b->start, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->broadcast_channels, data->published_messages, data->subscribers, ngx_time() - data->startup, subscribers_by_workers); text->len = ngx_strlen(text->data);
return ngx_http_push_stream_send_buf_response(r, b, subtype->content_type, NGX_HTTP_OK); return ngx_http_push_stream_send_response(r, text, subtype->content_type, NGX_HTTP_OK);
} }
static void static void
...@@ -228,8 +191,7 @@ static ngx_int_t ...@@ -228,8 +191,7 @@ static ngx_int_t
ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r, ngx_str_t *prefix) { ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t *r, ngx_str_t *prefix) {
ngx_int_t rc, content_len = 0; ngx_int_t rc, content_len = 0;
ngx_chain_t *chain, *first = NULL, *last = NULL; ngx_chain_t *chain, *first = NULL, *last = NULL;
ngx_str_t *currenttime, *hostname; ngx_str_t *currenttime, *hostname, *text, *header_response;
ngx_str_t header_response;
ngx_queue_t queue_channel_info; ngx_queue_t queue_channel_info;
ngx_queue_t *cur, *next; ngx_queue_t *cur, *next;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data; ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
...@@ -257,13 +219,23 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -257,13 +219,23 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
} }
format = (next != &queue_channel_info) ? subtype->format_group_item : subtype->format_group_last_item; format = (next != &queue_channel_info) ? subtype->format_group_item : subtype->format_group_last_item;
if ((chain->buf = ngx_http_push_stream_channel_info_formatted(r->pool, format, &channel_info->id, channel_info->published_messages, channel_info->stored_messages, channel_info->subscribers)) == NULL) { if ((text = ngx_http_push_stream_channel_info_formatted(r->pool, format, &channel_info->id, channel_info->published_messages, channel_info->stored_messages, channel_info->subscribers)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory to format channel info"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory to format channel info");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
if ((chain->buf = ngx_calloc_buf(r->pool)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory to wrap channel info");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
chain->buf->last_buf = 0; chain->buf->last_buf = 0;
chain->buf->memory = 1;
chain->buf->pos = text->data;
chain->buf->last = text->data + text->len;
chain->buf->start = chain->buf->pos;
chain->buf->end = chain->buf->last;
content_len += ngx_buf_size(chain->buf); content_len += text->len;
if (first == NULL) { if (first == NULL) {
first = chain; first = chain;
...@@ -284,16 +256,15 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -284,16 +256,15 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
hostname = ngx_http_push_stream_get_formatted_hostname(r->pool); hostname = ngx_http_push_stream_get_formatted_hostname(r->pool);
// format content header // format content header
if ((header_response.data = ngx_pcalloc(r->pool, head->len + hostname->len + currenttime->len + NGX_INT_T_LEN + 1)) == NULL) { if ((header_response = ngx_http_push_stream_create_str(r->pool, head->len + hostname->len + currenttime->len + NGX_INT_T_LEN)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for response channels info"); ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for response channels info");
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
ngx_memset(header_response.data, '\0', head->len + hostname->len + currenttime->len + NGX_INT_T_LEN + 1); ngx_sprintf(header_response->data, (char *) head->data, hostname->data, currenttime->data, data->channels, data->broadcast_channels, ngx_time() - data->startup);
ngx_sprintf(header_response.data, (char *) head->data, hostname->data, currenttime->data, data->channels, data->broadcast_channels, ngx_time() - data->startup); header_response->len = ngx_strlen(header_response->data);
header_response.len = ngx_strlen(header_response.data);
content_len += header_response.len + tail->len; content_len += header_response->len + tail->len;
r->headers_out.content_type.len = subtype->content_type->len; r->headers_out.content_type.len = subtype->content_type->len;
r->headers_out.content_type.data = subtype->content_type->data; r->headers_out.content_type.data = subtype->content_type->data;
...@@ -306,7 +277,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t ...@@ -306,7 +277,7 @@ ngx_http_push_stream_send_response_all_channels_info_detailed(ngx_http_request_t
} }
// send content header // send content header
ngx_http_push_stream_send_response_text(r, header_response.data, header_response.len,0); ngx_http_push_stream_send_response_text(r, header_response->data, header_response->len,0);
// send content body // send content body
if (first != NULL) { if (first != NULL) {
ngx_http_output_filter(r, first); ngx_http_output_filter(r, first);
......
...@@ -433,6 +433,30 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_ ...@@ -433,6 +433,30 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_
return rc; return rc;
} }
static ngx_int_t
ngx_http_push_stream_send_response(ngx_http_request_t *r, ngx_str_t *text, const ngx_str_t *content_type, ngx_int_t status_code)
{
ngx_int_t rc;
if ((r == NULL) || (text == NULL) || (content_type == NULL)) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
r->headers_out.content_type.len = content_type->len;
r->headers_out.content_type.data = content_type->data;
r->headers_out.content_length_n = text->len;
r->headers_out.status = status_code;
rc = ngx_http_send_header(r);
if (rc == NGX_ERROR || rc > NGX_OK || r->header_only) {
return rc;
}
return ngx_http_push_stream_send_response_text(r, text->data, text->len, 1);
}
static ngx_int_t static ngx_int_t
ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer) ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment