Commit 653007db authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

fixing messages sent to subscribers to be truly a transfer encoding chunked connection

parent 8c855057
......@@ -204,6 +204,8 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_DATE_FORMAT_ISO_8601 = ngx_string("
//// headers
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_ALLOW = ngx_string("Allow");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_EXPLAIN = ngx_string("X-Nginx-PushStream-Explain");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING = ngx_string("Transfer-Encoding");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED = ngx_string("chunked");
// other stuff
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_METHODS = ngx_string("GET, POST");
......
......@@ -187,6 +187,8 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID = ngx_string("~id~
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL = ngx_string("~channel~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_LAST_CHUNK = ngx_string("0" CRLF CRLF);
ngx_event_t ngx_http_push_stream_ping_event;
ngx_event_t ngx_http_push_stream_disconnect_event;
ngx_event_t ngx_http_push_stream_memory_cleanup_event;
......@@ -200,6 +202,7 @@ ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_l
static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value);
static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
static u_char * ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx_uint_t offset, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_chunk(const u_char *text, uint len, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t message_template, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf);
......
......@@ -294,8 +294,7 @@ ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect)
while ((cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&sentinel->queue)) != sentinel) {
if ((cur->request != NULL) && (ngx_exiting || (force_disconnect == 1) || ((cur->expires != 0) && (now > cur->expires)))) {
ngx_http_push_stream_worker_subscriber_cleanup(cur);
cur->request->keepalive = 0;
ngx_http_send_special(cur->request, NGX_HTTP_LAST | NGX_HTTP_FLUSH);
ngx_http_push_stream_send_response_text(cur->request, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.data, NGX_HTTP_PUSH_STREAM_LAST_CHUNK.len, 1);
ngx_http_finalize_request(cur->request, NGX_HTTP_OK);
} else {
break;
......
......@@ -468,13 +468,12 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
// append crlf to templates
if (conf->header_template.len > 0) {
conf->header_template.data = ngx_http_push_stream_append_crlf(&conf->header_template, cf->pool);
conf->header_template.len = ngx_strlen(conf->header_template.data);
ngx_str_t * aux = ngx_http_push_stream_get_formatted_chunk(conf->header_template.data, conf->header_template.len, cf->pool);
conf->header_template.data = aux->data;
conf->header_template.len = aux->len;
}
if (conf->message_template.len > 0) {
conf->message_template.data = ngx_http_push_stream_append_crlf(&conf->message_template, cf->pool);
conf->message_template.len = ngx_strlen(conf->message_template.data);
conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template);
}
......
......@@ -149,6 +149,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
r->headers_out.status = NGX_HTTP_OK;
r->headers_out.content_length_n = -1;
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_TRANSFER_ENCODING, &NGX_HTTP_PUSH_STREAM_HEADER_CHUNCKED);
ngx_http_send_header(r);
// sending response content header
......
......@@ -100,17 +100,18 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
msg->formatted_messages = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t)*ngx_http_push_stream_module_main_conf->qtd_templates);
while ((cur = (ngx_http_push_stream_msg_template_t *) ngx_queue_next(&cur->queue)) != sentinel) {
ngx_str_t *aux = ngx_http_push_stream_format_message(channel, msg, cur->template, temp_pool);
ngx_str_t *chunk = ngx_http_push_stream_get_formatted_chunk(aux->data, aux->len, temp_pool);
ngx_str_t *formmated = (msg->formatted_messages + i);
formmated->data = ngx_slab_alloc_locked(shpool, aux->len + 1);
formmated->data = ngx_slab_alloc_locked(shpool, chunk->len + 1);
if (formmated->data == NULL) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
formmated->len = aux->len;
formmated->len = chunk->len;
ngx_memset(formmated->data, '\0', formmated->len + 1);
ngx_memcpy(formmated->data, aux->data, formmated->len);
ngx_memcpy(formmated->data, chunk->data, formmated->len);
i++;
}
......@@ -732,3 +733,22 @@ ngx_http_push_stream_get_formatted_hostname(ngx_pool_t *pool)
return hostname;
}
static ngx_str_t *
ngx_http_push_stream_get_formatted_chunk(const u_char *text, uint len, ngx_pool_t *temp_pool)
{
ngx_str_t *chunk;
u_int max_len;
/* the "0000000000000000" is 64-bit hexadimal string */
max_len = sizeof("0000000000000000" CRLF CRLF CRLF) + len;
chunk = (ngx_str_t *) ngx_pcalloc(temp_pool, sizeof(ngx_str_t) + max_len);
if (chunk != NULL) {
chunk->data = (u_char *) (chunk + 1);
ngx_memset(chunk->data, '\0', max_len);
ngx_sprintf(chunk->data, "%xO" CRLF "%s" CRLF CRLF, len + sizeof(CRLF) - 1, text);
chunk->len = ngx_strlen(chunk->data);
}
return chunk;
}
......@@ -571,4 +571,16 @@ class TestPublisher < Test::Unit::TestCase
}
end
def test_transfer_encoding_chuncked
headers = {'accept' => 'application/json'}
channel = 'ch_test_transfer_encoding_chuncked'
EventMachine.run {
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.stream { |chunk|
assert_equal("chunked", sub_1.response_header['TRANSFER_ENCODING'], "Didn't receive the right transfer encoding")
EventMachine.stop
}
}
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment