Commit 7215028a authored by Wandenberg's avatar Wandenberg

add support to send binary data

parent 883aea05
......@@ -236,7 +236,7 @@ static void ngx_http_push_stream_get_last_received_message_value
static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value);
static ngx_str_t * ngx_http_push_stream_get_header(ngx_http_request_t *r, const ngx_str_t *header_name);
static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
static u_char * ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx_uint_t offset, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_str_replace(const ngx_str_t *org, const ngx_str_t *find, const ngx_str_t *replace, off_t offset, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_websocket_frame(const u_char *opcode, off_t opcode_len, const u_char *text, off_t text_len, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool);
......
......@@ -48,11 +48,11 @@ describe "Publisher Publishing Messages" do
channel = 'ch_test_publish_messages_with_different_bytes'
nginx_run_server(config.merge(:client_max_body_size => '130k', :client_body_buffer_size => '130k', :subscriber_connection_ttl => "1s")) do |conf|
ranges = [1..255]
ranges = [0..255]
ranges.each do |range|
bytes = []
range.each do |i|
1.upto(255) do |j|
0.upto(255) do |j|
bytes << "%s%s" % [i.chr, j.chr]
end
end
......@@ -112,7 +112,7 @@ describe "Publisher Publishing Messages" do
end
it "should publish many messages in the same channel" do
body_prefix = 'published message '
body_prefix = 'published_message_'
channel = 'ch_test_publish_many_messages_in_the_same_channel'
messagens_to_publish = 1500
......@@ -134,7 +134,7 @@ describe "Publisher Publishing Messages" do
0.step(messagens_to_publish - 1, 500) do |i|
socket = open_socket(nginx_host, nginx_port)
1.upto(500) do |j|
resp_headers, body = post_in_socket("/pub?id=#{channel}", body_prefix + (i+j).to_s, socket, {:wait_for => "}\r\n"})
resp_headers, body = post_in_socket("/pub?id=#{channel}", "#{body_prefix}#{i+j}", socket, {:wait_for => "}\r\n"})
fail("Message was not published: " + body_prefix + (i+j).to_s) unless resp_headers.include?("HTTP/1.1 200 OK")
end
socket.close
......
......@@ -424,6 +424,39 @@ describe "Subscriber WebSocket" do
end
end
it "should accept messages with different bytes" do
nginx_run_server(config.merge(:client_max_body_size => '130k', :client_body_buffer_size => '130k', :subscriber_connection_ttl => "1s", :message_template => "~text~|")) do |conf|
ranges = [0..255]
ranges.each do |range|
bytes = []
range.each do |i|
0.upto(255) do |j|
bytes << "%s%s" % [i.chr, j.chr]
end
end
channel = "ch_test_publish_messages_with_different_bytes_#{range}"
body = bytes.join('')
response = ''
request = "GET /ws/#{channel} HTTP/1.0\r\nConnection: Upgrade\r\nSec-WebSocket-Key: /mQoZf6pRiv8+6o72GncLQ==\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 8\r\n"
socket = open_socket(nginx_host, nginx_port)
socket.print("#{request}\r\n")
EventMachine.run do
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s ).post :head => headers, :body => body
pub.callback do
headers, resp = read_response_on_socket(socket, '|')
resp.bytes.to_a.should eql("\x81\x7F\x00\x00\x00\x00\x00\x02\x00\x01#{body}|".bytes.to_a)
EventMachine.stop
end
end
end
end
end
it "should not cache the response" do
channel = 'ch_test_not_cache_the_response'
......
......@@ -905,7 +905,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
}
// create longpooling timeout message
if ((ngx_http_push_stream_longpooling_timeout_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked((u_char *)NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT, sizeof(NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT), NULL, NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
if ((ngx_http_push_stream_longpooling_timeout_msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked((u_char *)NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT, ngx_strlen(NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT), NULL, NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_ID, NULL, NULL, ngx_cycle->pool)) == NULL) {
return NGX_ERROR;
}
......
......@@ -184,19 +184,18 @@ ngx_http_push_stream_apply_text_template(ngx_str_t **dst_value, ngx_str_t **dst_
ngx_memcpy((*dst_value)->data, text->data, text->len);
(*dst_value)->data[(*dst_value)->len] = '\0';
u_char *aux = ngx_http_push_stream_str_replace(template->data, token->data, text->data, 0, temp_pool);
ngx_str_t *aux = ngx_http_push_stream_str_replace(template, token, text, 0, temp_pool);
if (aux == NULL) {
return NGX_ERROR;
}
if (((*dst_message) = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + ngx_strlen(aux) + 1)) == NULL) {
if (((*dst_message) = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + aux->len)) == NULL) {
return NGX_ERROR;
}
(*dst_message)->len = ngx_strlen(aux);
(*dst_message)->len = aux->len;
(*dst_message)->data = (u_char *) ((*dst_message) + 1);
ngx_memcpy((*dst_message)->data, aux, (*dst_message)->len);
(*dst_message)->data[(*dst_message)->len] = '\0';
ngx_memcpy((*dst_message)->data, aux->data, (*dst_message)->len);
}
return NGX_OK;
......@@ -291,14 +290,13 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
}
ngx_str_t *formmated = (msg->formatted_messages + i);
if ((text == NULL) || ((formmated->data = ngx_slab_alloc_locked(shpool, text->len + 1)) == NULL)) {
if ((text == NULL) || ((formmated->data = ngx_slab_alloc_locked(shpool, text->len)) == NULL)) {
ngx_http_push_stream_free_message_memory_locked(shpool, msg);
return NULL;
}
formmated->len = text->len;
ngx_memcpy(formmated->data, text->data, formmated->len);
formmated->data[formmated->len] = '\0';
i++;
}
......@@ -1035,35 +1033,30 @@ ngx_http_push_stream_buffer_timer_wake_handler(ngx_event_t *ev)
ngx_http_push_stream_timer_reset(NGX_HTTP_PUSH_STREAM_MESSAGE_BUFFER_CLEANUP_INTERVAL, &ngx_http_push_stream_buffer_cleanup_event);
}
static u_char *
ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx_uint_t offset, ngx_pool_t *pool)
static ngx_str_t *
ngx_http_push_stream_str_replace(const ngx_str_t *org, const ngx_str_t *find, const ngx_str_t *replace, off_t offset, ngx_pool_t *pool)
{
if (org == NULL) {
return NULL;
}
ngx_uint_t len_org = ngx_strlen(org);
ngx_uint_t len_find = ngx_strlen(find);
ngx_uint_t len_replace = ngx_strlen(replace);
u_char *result = org, *last;
ngx_str_t *result = (ngx_str_t *) org;
if (len_find > 0) {
u_char *ret = (u_char *) ngx_strstr(org + offset, find);
if (find->len > 0) {
u_char *ret = (u_char *) ngx_strnstr(org->data + offset, (char *) find->data, org->len - offset);
if (ret != NULL) {
u_char *tmp = ngx_pcalloc(pool, len_org + len_replace + len_find + 1);
ngx_str_t *tmp = ngx_http_push_stream_create_str(pool, org->len + replace->len - find->len);
if (tmp == NULL) {
ngx_log_error(NGX_LOG_ERR, pool->log, 0, "push stream module: unable to allocate memory to apply text replace");
return NULL;
}
u_int len_found = ret-org;
ngx_memcpy(tmp, org, len_found);
ngx_memcpy(tmp + len_found, replace, len_replace);
last = ngx_copy(tmp + len_found + len_replace, org + len_found + len_find, len_org - len_found - len_find);
*last = '\0';
off_t offset_found = ret - org->data;
ngx_memcpy(tmp->data, org->data, offset_found);
ngx_memcpy(tmp->data + offset_found, replace->data, replace->len);
ngx_memcpy(tmp->data + offset_found + replace->len, org->data + offset_found + find->len, org->len - offset_found - find->len);
result = ngx_http_push_stream_str_replace(tmp, find, replace, len_found + len_replace, pool);
result = ngx_http_push_stream_str_replace(tmp, find, replace, offset_found + replace->len, pool);
}
}
......@@ -1085,47 +1078,44 @@ ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_
static ngx_str_t *
ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool)
{
u_char *txt = NULL, *last;
ngx_str_t *str = NULL;
u_char *last;
ngx_str_t *txt = NULL;
u_char char_id[NGX_INT_T_LEN + 1];
u_char tag[NGX_INT_T_LEN + 1];
u_char time[NGX_HTTP_PUSH_STREAM_TIME_FMT_LEN];
ngx_str_t *char_id = ngx_http_push_stream_create_str(temp_pool, NGX_INT_T_LEN);
ngx_str_t *tag = ngx_http_push_stream_create_str(temp_pool, NGX_INT_T_LEN);
ngx_str_t *time = ngx_http_push_stream_create_str(temp_pool, NGX_HTTP_PUSH_STREAM_TIME_FMT_LEN);
if (char_id == NULL || tag == NULL || time == NULL) {
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to replace message values on template");
return NULL;
}
u_char *channel_id = (channel != NULL) ? channel->id.data : NGX_HTTP_PUSH_STREAM_EMPTY.data;
u_char *event_id = (message->event_id != NULL) ? message->event_id->data : NGX_HTTP_PUSH_STREAM_EMPTY.data;
u_char *event_type = (message->event_type != NULL) ? message->event_type->data : NGX_HTTP_PUSH_STREAM_EMPTY.data;
ngx_str_t *channel_id = (channel != NULL) ? &channel->id : &NGX_HTTP_PUSH_STREAM_EMPTY;
ngx_str_t *event_id = (message->event_id != NULL) ? message->event_id : &NGX_HTTP_PUSH_STREAM_EMPTY;
ngx_str_t *event_type = (message->event_type != NULL) ? message->event_type : &NGX_HTTP_PUSH_STREAM_EMPTY;
last = ngx_sprintf(char_id, "%d", message->id);
*last = '\0';
last = ngx_sprintf(char_id->data, "%d", message->id);
char_id->len = last - char_id->data;
last = ngx_http_time(time, message->time);
*last = '\0';
last = ngx_http_time(time->data, message->time);
time->len = last - time->data;
last = ngx_sprintf(tag, "%d", message->tag);
*last = '\0';
last = ngx_sprintf(tag->data, "%d", message->tag);
tag->len = last - tag->data;
txt = ngx_http_push_stream_str_replace(message_template->data, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID.data, char_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID.data, event_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE.data, event_type, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL.data, channel_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT.data, text->data, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME.data, time, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG.data, tag, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(message_template, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID, char_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID, event_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE, event_type, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL, channel_id, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT, text, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME, time, 0, temp_pool);
txt = ngx_http_push_stream_str_replace(txt, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG, tag, 0, temp_pool);
if (txt == NULL) {
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to replace message values on template");
return NULL;
}
if ((str = ngx_pcalloc(temp_pool, sizeof(ngx_str_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to return message applied to template");
return NULL;
}
str->data = txt;
str->len = ngx_strlen(txt);
return str;
return txt;
}
......@@ -1455,11 +1445,10 @@ ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_
if (lines != NULL) {
cur = lines;
while ((cur = (ngx_http_push_stream_line_t *) ngx_queue_next(&cur->queue)) != lines) {
cur->line->data = ngx_http_push_stream_str_replace(message_template->data, NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT.data, cur->line->data, 0, temp_pool);
if (cur->line->data == NULL) {
cur->line = ngx_http_push_stream_str_replace(message_template, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT, cur->line, 0, temp_pool);
if (cur->line == NULL) {
return NULL;
}
cur->line->len = ngx_strlen(cur->line->data);
}
result = ngx_http_push_stream_join_with_crlf(lines, temp_pool);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment