Commit 43d397f7 authored by Wandenberg's avatar Wandenberg

remove the unecessary CRLF at the end of messages

parent cbfa6b69
h1(#changelog). Changelog
* Removed default value from push_stream_padding_by_user_agent directive, it was "[A|a]ndroid 2,4097,4097:[S|s]afari,1025,0"
* Change the publish message action through a WebSocket connection to add the message to all subscribed channels
* Added support to get channels statistics, delete channels and publish message to some channels specifying their ids on push_stream_channels_path
* Avoid reapply formatter to header, message or footer template when inside an if on event source mode
......
......@@ -254,7 +254,7 @@ h2(#push_stream_padding_by_user_agent). push_stream_padding_by_user_agent <a nam
*syntax:* _push_stream_padding_by_user_agent string_
*default:* _[A|a]ndroid 2,4097,4097:[S|s]afari,1025,0_
*default:* _none_
*context:* _location_
......
......@@ -47,7 +47,7 @@ static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_CHANNEL_INACTIVITY_TIME = 30;
#define NGX_HTTP_PUSH_STREAM_DEFAULT_ALLOWED_ORIGINS ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_PADDING_BY_USER_AGENT "[A|a]ndroid 2,4097,4097:[S|s]afari,1025,0"
#define NGX_HTTP_PUSH_STREAM_DEFAULT_PADDING_BY_USER_AGENT ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_WILDCARD_CHANNEL_PREFIX ""
......
......@@ -190,7 +190,7 @@ static ngx_http_push_stream_content_subtype_t subtypes[] = {
};
static const ngx_int_t NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID = -1;
#define NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT ""
#define NGX_HTTP_PUSH_STREAM_PING_MESSAGE_TEXT " "
static const ngx_int_t NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID = -2;
#define NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_TEXT "Channel deleted"
......@@ -204,17 +204,17 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG = ngx_string("~ta
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME = ngx_string("~time~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_PREFIX = ngx_string(": ");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_DEFAULT_HEADER_TEMPLATE = ngx_string(": \r\n");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_TEMPLATE = ngx_string(": ~text~\r\n");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_DEFAULT_HEADER_TEMPLATE = ngx_string(": " CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_COMMENT_TEMPLATE = ngx_string(": ~text~" CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_MESSAGE_PREFIX = ngx_string("data: ");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_ID_TEMPLATE = ngx_string("id: ~event-id~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_EVENT_TEMPLATE = ngx_string("event: ~event-type~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_ID_TEMPLATE = ngx_string("id: ~event-id~" CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_EVENT_TEMPLATE = ngx_string("event: ~event-type~" CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_CONTENT_TYPE = ngx_string("text/event-stream; charset=utf-8");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK = ngx_string(": -1" CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK_INIT_CHUNK = ngx_string("([");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK_MID_CHUNK = ngx_string(",");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK = ngx_string("]);" CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK = ngx_string("]);");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK_CONTENT_TYPE = ngx_string("application/javascript");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_PADDING_BY_USER_AGENT_PATTERN = ngx_string("([^:]+),(\\d+),(\\d+)");
......@@ -233,7 +233,6 @@ static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_re
static ngx_str_t * ngx_http_push_stream_get_header(ngx_http_request_t *r, const ngx_str_t *header_name);
static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
static u_char * ngx_http_push_stream_str_replace(u_char *org, u_char *find, u_char *replace, ngx_uint_t offset, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_chunk(const u_char *text, off_t len, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_websocket_frame(const u_char *text, off_t len, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool);
......
......@@ -67,7 +67,7 @@ describe "Keepalive" do
socket.print(get_messages)
post_in_socket("/pub?id=#{channel}", "#{body_prefix} #{j.to_s.rjust(3, '0')}", socket_pub, {:wait_for => "}\r\n"})
headers, body = read_response_on_socket(socket, "\r\n0\r\n\r\n")
body.should eql("18\r\nmessage to be sent #{j.to_s.rjust(3, '0')}\r\n\r\n0\r\n\r\n")
body.should eql("16\r\nmessage to be sent #{j.to_s.rjust(3, '0')}\r\n0\r\n\r\n")
end
socket.close
......
......@@ -26,7 +26,7 @@ describe "Wildcard Properties" do
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '/' + channel_broad).get :head => headers
sub_2.stream do |chunk2|
chunk2.should eql("#{conf.header_template}\r\n")
chunk2.should eql(conf.header_template)
EventMachine.stop
end
end
......
......@@ -431,8 +431,8 @@ describe "Publisher Properties" do
resp_2 += chunk
end
sub_2.callback do
resp_1.should eql("<script>p(1,'channel_id_inside_if_block','published message');</script>\r\n")
resp_2.should eql("<script>p(1,'test_channel_id_inside_if_block','published message');</script>\r\n")
resp_1.should eql("<script>p(1,'channel_id_inside_if_block','published message');</script>")
resp_2.should eql("<script>p(1,'test_channel_id_inside_if_block','published message');</script>")
EventMachine.stop
end
......@@ -572,19 +572,19 @@ describe "Publisher Properties" do
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + "_1").get :head => headers
sub_1.stream do |chunk|
chunk.should eql("#{body}|#{channel.to_s + "_1"}\r\n")
chunk.should eql("#{body}|#{channel.to_s + "_1"}")
messages += 1
end
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + "_2").get :head => headers
sub_2.stream do |chunk|
chunk.should eql("#{body}|#{channel.to_s + "_2"}\r\n")
chunk.should eql("#{body}|#{channel.to_s + "_2"}")
messages += 1
end
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + "_3").get :head => headers
sub_3.stream do |chunk|
chunk.should eql("#{body}|#{channel.to_s + "_3"}\r\n")
chunk.should eql("#{body}|#{channel.to_s + "_3"}")
messages += 1
end
......@@ -816,7 +816,7 @@ describe "Publisher Properties" do
:header_template => " ", # send a space as header to has a chunk received
:footer_template => nil,
:ping_message_interval => nil,
:message_template => '{\"id\":\"~id~\", \"channel\":\"~channel~\", \"text\":\"~text~\"}'
:message_template => '{\"id\":\"~id~\", \"channel\":\"~channel~\", \"text\":\"~text~\"}|'
})
resp = ""
......@@ -843,7 +843,7 @@ describe "Publisher Properties" do
else
if !stage1_complete
stage1_complete = true
response = JSON.parse(resp)
response = JSON.parse(resp.split("|")[0])
response["channel"].should eql(channel_1)
response["id"].to_i.should eql(-2)
response["text"].should eql("Channel deleted")
......@@ -862,7 +862,7 @@ describe "Publisher Properties" do
end
elsif !stage2_complete
stage2_complete = true
response = JSON.parse(resp.split("\r\n")[2])
response = JSON.parse(resp.split("|")[1])
response["channel"].should eql(channel_2)
response["id"].to_i.should eql(1)
response["text"].should eql(body)
......@@ -873,7 +873,7 @@ describe "Publisher Properties" do
pub.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'].should eql("Channel deleted.")
end
else
response = JSON.parse(resp.split("\r\n")[3])
response = JSON.parse(resp.split("|")[2])
response["channel"].should eql(channel_2)
response["id"].to_i.should eql(-2)
response["text"].should eql("Channel deleted")
......@@ -913,7 +913,7 @@ describe "Publisher Properties" do
resp_1 += chunk
end
sub_1.callback do
resp_1.should eql("{\"id\":\"-2\", \"channel\":\"test_delete_channels_whith_subscribers_1\", \"text\":\"Channel deleted\"}\r\nFOOTER\r\n")
resp_1.should eql("{\"id\":\"-2\", \"channel\":\"test_delete_channels_whith_subscribers_1\", \"text\":\"Channel deleted\"}FOOTER")
end
resp_2 = ""
......@@ -922,7 +922,7 @@ describe "Publisher Properties" do
resp_2 += chunk
end
sub_2.callback do
resp_2.should eql("{\"id\":\"-2\", \"channel\":\"test_delete_channels_whith_subscribers_2\", \"text\":\"Channel deleted\"}\r\nFOOTER\r\n")
resp_2.should eql("{\"id\":\"-2\", \"channel\":\"test_delete_channels_whith_subscribers_2\", \"text\":\"Channel deleted\"}FOOTER")
end
stats = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => {'accept' => 'application/json'}
......@@ -977,7 +977,7 @@ describe "Publisher Properties" do
sub_1.stream do |chunk|
resp = resp + chunk
if resp == "#{conf.header_template}\r\n"
if resp == conf.header_template
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).delete :head => headers
pub.callback do
pub.should be_http_status(200).without_body
......@@ -986,7 +986,7 @@ describe "Publisher Properties" do
end
end
sub_1.callback do
resp.should eql("#{conf.header_template}\r\nChannel deleted\r\n#{conf.footer_template}\r\n")
resp.should eql("#{conf.header_template}Channel deleted#{conf.footer_template}")
EventMachine.stop
end
end
......@@ -1039,8 +1039,8 @@ describe "Publisher Properties" do
end
EM.add_timer(2) do
resp.should eql("#{conf.header_template}\r\nChannel deleted\r\n#{conf.footer_template}\r\n")
resp2.should eql("<html><body>\r\n|Channel deleted|\r\n</body></html>\r\n")
resp.should eql("#{conf.header_template}Channel deleted#{conf.footer_template}")
resp2.should eql("<html><body>|Channel deleted|</body></html>")
EventMachine.stop
end
end
......
......@@ -18,7 +18,7 @@ describe "Publisher Publishing Messages" do
EventMachine.run do
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub.stream do |chunk|
chunk.should eql(body + "\r\n")
chunk.should eql(body)
EventMachine.stop
end
......@@ -35,7 +35,7 @@ describe "Publisher Publishing Messages" do
EventMachine.run do
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub.stream do |chunk|
chunk.should eql(body + "\r\n")
chunk.should eql(body)
EventMachine.stop
end
......@@ -67,7 +67,7 @@ describe "Publisher Publishing Messages" do
end
sub.callback do
response.bytes.to_a.should eql("#{body}\r\n".bytes.to_a)
response.bytes.to_a.should eql(body.bytes.to_a)
EventMachine.stop
end
......@@ -91,7 +91,7 @@ describe "Publisher Publishing Messages" do
end
sub.callback do
(Time.now - start).should be < 2 #should be disconnect right after receive the large message
response.should eql(body + "\r\n")
response.should eql(body)
response = ''
start = Time.now
......@@ -101,7 +101,7 @@ describe "Publisher Publishing Messages" do
end
sub_1.callback do
(Time.now - start).should be > 2 #should be disconnected only when timeout happens
response.should eql(body + "\r\n")
response.should eql(body)
EventMachine.stop
end
end
......@@ -117,12 +117,12 @@ describe "Publisher Publishing Messages" do
messagens_to_publish = 1500
response = ""
nginx_run_server(config.merge(:max_reserved_memory => "256m", :keepalive_requests => 500)) do |conf|
nginx_run_server(config.merge(:max_reserved_memory => "256m", :keepalive_requests => 500, :message_template => "~text~|")) do |conf|
EventMachine.run do
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub.stream do |chunk|
response += chunk
recieved_messages = response.split("\r\n")
recieved_messages = response.split("|")
if recieved_messages.length == messagens_to_publish
recieved_messages.last.should eql(body_prefix + messagens_to_publish.to_s)
......
......@@ -18,7 +18,7 @@ describe "Comunication Properties" do
EventMachine.run do
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub.stream do |chunk|
chunk.should eql("#{conf.header_template}\r\n")
chunk.should eql(conf.header_template)
EventMachine.stop
end
end
......@@ -40,7 +40,7 @@ describe "Comunication Properties" do
pub.callback do
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub_2.stream do |chunk2|
chunk2.should eql("#{conf.header_template}\r\n")
chunk2.should eql(conf.header_template)
EventMachine.stop
end
end
......@@ -84,9 +84,9 @@ describe "Comunication Properties" do
end
EM.add_timer(17) do
response_1.should eql("#{conf.header_template}\r\n#{body}\r\n")
response_2.should eql("#{conf.header_template}\r\n#{body}\r\n")
response_3.should eql("#{conf.header_template}\r\n")
response_1.should eql("#{conf.header_template}#{body}")
response_2.should eql("#{conf.header_template}#{body}")
response_3.should eql("#{conf.header_template}")
EventMachine.stop
end
end
......@@ -98,7 +98,7 @@ describe "Comunication Properties" do
body = 'message to create a channel'
response = ""
nginx_run_server(config.merge(:message_template => '{\"duplicated\":\"~channel~\", \"channel\":\"~channel~\", \"message\":\"~text~\", \"message_id\":\"~id~\"}')) do |conf|
nginx_run_server(config.merge(:message_template => '|{\"duplicated\":\"~channel~\", \"channel\":\"~channel~\", \"message\":\"~text~\", \"message_id\":\"~id~\"}')) do |conf|
publish_message(channel, headers, body)
EventMachine.run do
......@@ -106,12 +106,12 @@ describe "Comunication Properties" do
sub.stream do |chunk|
response += chunk
lines = response.split("\r\n")
lines = response.split("|")
if lines.length >= 3
lines[0].should eql("#{conf.header_template}")
lines[1].should eql("{\"duplicated\":\"#{channel}\", \"channel\":\"#{channel}\", \"message\":\"#{body}\", \"message_id\":\"1\"}")
lines[2].should eql("{\"duplicated\":\"\", \"channel\":\"\", \"message\":\"\", \"message_id\":\"-1\"}")
lines[2].should eql("{\"duplicated\":\"\", \"channel\":\"\", \"message\":\" \", \"message_id\":\"-1\"}")
EventMachine.stop
end
end
......@@ -124,7 +124,7 @@ describe "Comunication Properties" do
body = '~channel~~channel~~channel~~text~~text~~text~'
response = ""
nginx_run_server(config.merge(:message_template => '{\"channel\":\"~channel~\", \"message\":\"~text~\", \"message_id\":\"~id~\"}')) do |conf|
nginx_run_server(config.merge(:message_template => '|{\"channel\":\"~channel~\", \"message\":\"~text~\", \"message_id\":\"~id~\"}')) do |conf|
publish_message(channel, headers, body)
EventMachine.run do
......@@ -132,12 +132,12 @@ describe "Comunication Properties" do
sub.stream do |chunk|
response += chunk
lines = response.split("\r\n")
lines = response.split("|")
if lines.length >= 3
lines[0].should eql("#{conf.header_template}")
lines[1].should eql("{\"channel\":\"ch_test_message_and_channel_with_same_pattern_of_the_template~channel~~channel~~channel~~channel~~channel~~channel~~text~~text~~text~~channel~~channel~~channel~~text~~text~~text~~channel~~channel~~channel~~text~~text~~text~\", \"message\":\"~channel~~channel~~channel~~text~~text~~text~\", \"message_id\":\"1\"}")
lines[2].should eql("{\"channel\":\"\", \"message\":\"\", \"message_id\":\"-1\"}")
lines[2].should eql("{\"channel\":\"\", \"message\":\" \", \"message_id\":\"-1\"}")
EventMachine.stop
end
end
......
......@@ -16,7 +16,7 @@ describe "Subscriber Properties" do
response += chunk
end
sub_1.callback do |chunk|
response.should eql("#{body}\r\n")
response.should eql("#{body}")
sent_headers = headers.merge({'If-Modified-Since' => sub_1.response_header['LAST_MODIFIED'], 'If-None-Match' => sub_1.response_header['ETAG']})
response = ""
......@@ -25,7 +25,7 @@ describe "Subscriber Properties" do
response += chunk2
end
sub_2.callback do
response.should eql("#{body} 1\r\n")
response.should eql("#{body} 1")
EventMachine.stop
end
......@@ -53,7 +53,7 @@ describe "Subscriber Properties" do
response += chunk
end
sub.callback do |chunk|
response.should eql("msg 3\r\nmsg 4\r\n")
response.should eql("msg 3msg 4")
EventMachine.stop
end
end
......@@ -199,7 +199,7 @@ describe "Subscriber Properties" do
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?callback=' + callback_function_name).get :head => headers
sub_1.callback do
sub_1.response.should eql("#{callback_function_name}\r\n([#{body}\r\n]);\r\n")
sub_1.response.should eql("#{callback_function_name}([#{body}]);")
EventMachine.stop
end
......@@ -221,7 +221,7 @@ describe "Subscriber Properties" do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b2' + '?callback=' + callback_function_name).get :head => headers
sub_1.callback do
sub_1.response.should eql("#{callback_function_name}\r\n([#{body}\r\n,#{body + "1"}\r\n,]);\r\n")
sub_1.response.should eql("#{callback_function_name}([#{body},#{body + "1"},]);")
EventMachine.stop
end
end
......@@ -280,7 +280,7 @@ describe "Subscriber Properties" do
sub_1.response_header["CONTENT_ENCODING"].should eql("gzip")
actual_response = Zlib::GzipReader.new(StringIO.new(actual_response)).read
actual_response.should eql("#{body}\r\n")
actual_response.should eql("#{body}")
EventMachine.stop
end
publish_message_inline(channel, {}, body)
......
......@@ -20,17 +20,17 @@ describe "Subscriber Padding by user agent" do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1")
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response.size.should eql(1100 + conf.header_template.size + 4)
sub_1.response.size.should eql(1100 + conf.header_template.size)
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 2")
sub_2.callback do
sub_2.should be_http_status(200)
sub_2.response.size.should eql(4097 + conf.header_template.size + 4)
sub_2.response.size.should eql(4097 + conf.header_template.size)
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 3")
sub_3.callback do
sub_3.should be_http_status(200)
sub_3.response.size.should eql(conf.header_template.size + 2)
sub_3.response.size.should eql(conf.header_template.size)
EventMachine.stop
end
......@@ -50,17 +50,17 @@ describe "Subscriber Padding by user agent" do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1")
sub_1.callback {
sub_1.should be_http_status(200)
sub_1.response.size.should eql(500 + body.size + 4)
sub_1.response.size.should eql(500 + body.size)
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 2")
sub_2.callback {
sub_2.should be_http_status(200)
sub_2.response.size.should eql(body.size + 2)
sub_2.response.size.should eql(body.size)
sub_3 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 3")
sub_3.callback {
sub_3.should be_http_status(200)
sub_3.response.size.should eql(body.size + 2)
sub_3.response.size.should eql(body.size)
EventMachine.stop
}
......@@ -84,7 +84,7 @@ describe "Subscriber Padding by user agent" do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1")
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response.size.should eql(expected_padding + i + 4)
sub_1.response.size.should eql(expected_padding + i)
i = 105
expected_padding = 600 - ((i/100).to_i * 100)
......@@ -92,7 +92,7 @@ describe "Subscriber Padding by user agent" do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1")
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response.size.should eql(expected_padding + i + 4)
sub_1.response.size.should eql(expected_padding + i)
i = 221
expected_padding = 600 - ((i/100).to_i * 100)
......@@ -100,7 +100,7 @@ describe "Subscriber Padding by user agent" do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1")
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response.size.should eql(expected_padding + i + 4)
sub_1.response.size.should eql(expected_padding + i)
i = 331
expected_padding = 600 - ((i/100).to_i * 100)
......@@ -108,7 +108,7 @@ describe "Subscriber Padding by user agent" do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1")
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response.size.should eql(expected_padding + i + 4)
sub_1.response.size.should eql(expected_padding + i)
i = 435
expected_padding = 600 - ((i/100).to_i * 100)
......@@ -116,7 +116,7 @@ describe "Subscriber Padding by user agent" do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1")
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response.size.should eql(expected_padding + i + 4)
sub_1.response.size.should eql(expected_padding + i)
i = 502
expected_padding = 600 - ((i/100).to_i * 100)
......@@ -124,14 +124,14 @@ describe "Subscriber Padding by user agent" do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1")
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response.size.should eql(expected_padding + i + 4)
sub_1.response.size.should eql(expected_padding + i)
i = 550
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers.merge("User-Agent" => "Test 1")
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response.size.should eql(i + 2)
sub_1.response.size.should eql(i)
EventMachine.stop
end
......@@ -160,12 +160,12 @@ describe "Subscriber Padding by user agent" do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?ua=test 1').get :head => headers
sub_1.callback do
sub_1.should be_http_status(200)
sub_1.response.size.should eql(1024 + conf.header_template.size + 4)
sub_1.response.size.should eql(1024 + conf.header_template.size)
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?ua=test 2').get :head => headers
sub_2.callback do
sub_2.should be_http_status(200)
sub_2.response.size.should eql(conf.header_template.size + 2)
sub_2.response.size.should eql(conf.header_template.size)
EventMachine.stop
end
......
......@@ -56,7 +56,7 @@ describe "Subscriber Properties" do
sub_1.should be_http_status(200)
sub_1.response_header['LAST_MODIFIED'].to_s.should_not eql("")
sub_1.response_header['ETAG'].to_s.should eql("0")
sub_1.response.should eql("#{body}\r\n")
sub_1.response.should eql("#{body}")
EventMachine.stop
end
end
......@@ -74,7 +74,7 @@ describe "Subscriber Properties" do
publish_message_inline(channel, {}, body)
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?callback=' + callback_function_name).get :head => headers.merge({'If-Modified-Since' => Time.at(0).utc.strftime("%a, %d %b %Y %T %Z")})
sub_1.callback do
sub_1.response.should eql("#{callback_function_name}\r\n([#{body}\r\n,]);\r\n")
sub_1.response.should eql("#{callback_function_name}([#{body},]);")
EventMachine.stop
end
end
......@@ -94,7 +94,7 @@ describe "Subscriber Properties" do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b2' + '?callback=' + callback_function_name).get :head => headers
sub_1.callback do
sub_1.response.should eql("#{callback_function_name}\r\n([#{body}\r\n,#{body + "1"}\r\n,]);\r\n")
sub_1.response.should eql("#{callback_function_name}([#{body},#{body + "1"},]);")
EventMachine.stop
end
end
......@@ -140,7 +140,7 @@ describe "Subscriber Properties" do
sub_1.response_header["CONTENT_ENCODING"].should eql("gzip")
actual_response = Zlib::GzipReader.new(StringIO.new(actual_response)).read
actual_response.should eql("#{body}\r\n")
actual_response.should eql("#{body}")
EventMachine.stop
end
end
......
......@@ -322,12 +322,12 @@ describe "Subscriber Properties" do
body = 'body'
response = ""
nginx_run_server(config.merge(:header_template => nil, :message_template => '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}')) do |conf|
nginx_run_server(config.merge(:header_template => nil, :message_template => '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}|')) do |conf|
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_1.to_s + '/' + channel_2.to_s + '/' + channel_3.to_s + '/' + channel_4.to_s + '/' + channel_5.to_s + '/' + channel_6.to_s).get :head => headers
sub_1.stream do |chunk|
response += chunk
lines = response.split("\r\n")
lines = response.split("|")
if lines.length >= 6
line = JSON.parse(lines[0])
......@@ -487,7 +487,7 @@ describe "Subscriber Properties" do
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub_1.stream do |chunk|
chunk.should eql("#{body}\r\n")
chunk.should eql("#{body}")
EventMachine.stop
end
......@@ -505,7 +505,7 @@ describe "Subscriber Properties" do
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub_1.stream do |chunk|
chunk.should eql("\r\n")
chunk.should eql(" ")
EventMachine.stop
end
end
......@@ -520,7 +520,7 @@ describe "Subscriber Properties" do
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub_1.stream do |chunk|
chunk.should eql("#{conf.ping_message_text}\r\n")
chunk.should eql(conf.ping_message_text)
EventMachine.stop
end
end
......@@ -535,7 +535,7 @@ describe "Subscriber Properties" do
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub_1.stream do |chunk|
chunk.should eql("-1:\r\n")
chunk.should eql("-1: ")
EventMachine.stop
end
end
......@@ -550,7 +550,7 @@ describe "Subscriber Properties" do
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub_1.stream do |chunk|
chunk.should eql("-1:#{conf.ping_message_text}\r\n")
chunk.should eql("-1:#{conf.ping_message_text}")
EventMachine.stop
end
end
......@@ -611,7 +611,7 @@ describe "Subscriber Properties" do
response += chunk
end
sub.callback do
response.should eql("msg 2\r\nmsg 3\r\nmsg 4\r\n")
response.should eql("msg 2msg 3msg 4")
response = ''
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
......@@ -619,7 +619,7 @@ describe "Subscriber Properties" do
response += chunk
end
sub_1.callback do
response.should eql("msg 5\r\n")
response.should eql("msg 5")
EventMachine.stop
end
......@@ -673,7 +673,7 @@ describe "Subscriber Properties" do
EventMachine.run do
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers
sub.stream do |chunk|
chunk.should eql("#{conf.header_template}\r\n")
chunk.should eql("#{conf.header_template}")
EventMachine.stop
end
end
......@@ -768,8 +768,8 @@ describe "Subscriber Properties" do
resp_2 += chunk
end
sub_2.callback do
resp_1.should eql("<script>p(1,'channels_path_inside_if_block','published message');</script>\r\n")
resp_2.should eql("<script>p(1,'test_channels_path_inside_if_block','published message');</script>\r\n")
resp_1.should eql("<script>p(1,'channels_path_inside_if_block','published message');</script>")
resp_2.should eql("<script>p(1,'test_channels_path_inside_if_block','published message');</script>")
EventMachine.stop
end
......@@ -804,7 +804,7 @@ describe "Subscriber Properties" do
sub_1.response_header["CONTENT_ENCODING"].should eql("gzip")
actual_response = Zlib::GzipReader.new(StringIO.new(actual_response)).read
actual_response.should eql("HEADER\r\nTEMPLATE\r\n1234\r\n\r\n<script>p(1,'ch_test_get_content_gzipped','body');</script>\r\n</body></html>\r\n")
actual_response.should eql("HEADER\r\nTEMPLATE\r\n1234\r\n<script>p(1,'ch_test_get_content_gzipped','body');</script></body></html>")
EventMachine.stop
end
publish_message_inline(channel, {}, body)
......
......@@ -5,7 +5,7 @@ describe "Receive old messages" do
{
:header_template => nil,
:footer_template => nil,
:message_template => '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}',
:message_template => '{\"channel\":\"~channel~\", \"id\":\"~id~\", \"message\":\"~text~\"}\r\n',
:subscriber_mode => subscriber_mode
}
end
......@@ -18,7 +18,7 @@ describe "Receive old messages" do
body = 'body'
nginx_run_server(config.merge(:header_template => 'HEADER')) do |conf|
nginx_run_server(config.merge(:header_template => 'HEADER\r\n')) do |conf|
#create channels with some messages
1.upto(3) do |i|
publish_message(channel_1, headers, body + i.to_s)
......@@ -69,7 +69,7 @@ describe "Receive old messages" do
body = 'body'
nginx_run_server(config.merge(:header_template => 'HEADER'), :timeout => 45) do |conf|
nginx_run_server(config.merge(:header_template => 'HEADER\r\n'), :timeout => 45) do |conf|
#create channels with some messages with progressive interval (1,2,3,5,7,9,12,15,18 seconds)
1.upto(3) do |i|
sleep(i)
......@@ -124,7 +124,7 @@ describe "Receive old messages" do
body = 'body'
nginx_run_server(config.merge(:header_template => 'HEADER'), :timeout => 45) do |conf|
nginx_run_server(config.merge(:header_template => 'HEADER\r\n'), :timeout => 45) do |conf|
#create channels with some messages with progressive interval (1,2,3,5,7,9,12,15,18 seconds)
1.upto(3) do |i|
sleep(i)
......@@ -185,7 +185,7 @@ describe "Receive old messages" do
it "should receive old messages by 'last_event_id'" do
channel = 'ch_test_disconnect_after_receive_old_messages_by_last_event_id_when_longpolling_is_on'
nginx_run_server(config.merge(:message_template => '~text~')) do |conf|
nginx_run_server(config.merge(:message_template => '~text~\r\n')) do |conf|
publish_message(channel, {'Event-Id' => 'event 1'}, 'msg 1')
publish_message(channel, {'Event-Id' => 'event 2'}, 'msg 2')
publish_message(channel, {}, 'msg 3')
......@@ -209,7 +209,7 @@ describe "Receive old messages" do
messages_to_publish = 10
now = nil
nginx_run_server(config.merge(:message_template => '~text~')) do |conf|
nginx_run_server(config.merge(:message_template => '~text~\r\n')) do |conf|
messages_to_publish.times do |i|
now = Time.now if i == 5
publish_message(channel.to_s, headers, body_prefix + i.to_s)
......@@ -233,7 +233,7 @@ describe "Receive old messages" do
messages_to_publish = 10
now = nil
nginx_run_server(config.merge(:last_received_message_time => "$arg_time", :last_received_message_tag => "$arg_tag", :message_template => '~text~')) do |conf|
nginx_run_server(config.merge(:last_received_message_time => "$arg_time", :last_received_message_tag => "$arg_tag", :message_template => '~text~\r\n')) do |conf|
messages_to_publish.times do |i|
now = Time.now if i == 5
publish_message(channel.to_s, headers, body_prefix + i.to_s)
......@@ -257,7 +257,7 @@ describe "Receive old messages" do
messages_to_publish = 10
now = nil
nginx_run_server(config.merge(:last_event_id => "$arg_event_id", :message_template => '~text~')) do |conf|
nginx_run_server(config.merge(:last_event_id => "$arg_event_id", :message_template => '~text~\r\n')) do |conf|
publish_message(channel, {'Event-Id' => 'event 1'}, 'msg 1')
publish_message(channel, {'Event-Id' => 'event 2'}, 'msg 2')
publish_message(channel, {}, 'msg 3')
......@@ -335,7 +335,7 @@ describe "Receive old messages" do
hash_headers
end
lines = body.gsub(/[^\w{:,}" ]/, "\n").gsub("d{", "{").split("\n").delete_if{|line| line.empty?}.compact
lines = body.gsub(/[^\w{:,}" ]/, "\n").gsub("f{", "{").split("\n").delete_if{|line| line.empty?}.compact
lines.length.should be >= number_expected_lines
......
......@@ -274,7 +274,7 @@ describe "Subscriber WebSocket" do
socket_2 = open_socket(nginx_host, nginx_port)
socket_2.print("#{request_2}\r\n")
headers_2, body_2 = read_response_on_socket(socket_2, '}')
body_2.should eql("{\"text\":\"#{body}\"}\r\n")
body_2.should eql("{\"text\":\"#{body}\"}")
end
end
......
......@@ -361,24 +361,36 @@ ngx_http_push_stream_postconfig(ngx_conf_t *cf)
}
ngx_conf_log_error(NGX_LOG_INFO, cf, 0, "Using %udKiB of shared memory for push stream module", shm_size >> 10);
ngx_uint_t steps = ngx_http_push_stream_padding_max_len / 100;
if ((ngx_http_push_stream_module_paddings_chunks = ngx_palloc(cf->pool, sizeof(ngx_str_t) * (steps + 1))) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to create padding messages");
return NGX_ERROR;
}
u_char aux[ngx_http_push_stream_padding_max_len + 1];
ngx_memset(aux, ' ', ngx_http_push_stream_padding_max_len);
aux[ngx_http_push_stream_padding_max_len] = '\0';
ngx_int_t i, len = ngx_http_push_stream_padding_max_len;
for (i = steps; i >= 0; i--) {
if ((*(ngx_http_push_stream_module_paddings_chunks + i) = ngx_http_push_stream_get_formatted_chunk(aux, len, cf->pool)) == NULL) {
if (ngx_http_push_stream_padding_max_len > 0) {
ngx_uint_t steps = ngx_http_push_stream_padding_max_len / 100;
if ((ngx_http_push_stream_module_paddings_chunks = ngx_palloc(cf->pool, sizeof(ngx_str_t) * (steps + 1))) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to create padding messages");
return NGX_ERROR;
}
len = i * 100;
*(aux + len) = '\0';
u_int padding_max_len = ngx_http_push_stream_padding_max_len + ((ngx_http_push_stream_padding_max_len % 2) ? 1 : 0);
ngx_str_t *aux = ngx_http_push_stream_create_str(cf->pool, padding_max_len);
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to create padding messages value");
return NGX_ERROR;
}
while (padding_max_len > 0) {
padding_max_len -= 2;
ngx_memcpy(aux->data + padding_max_len, CRLF, 2);
}
ngx_int_t i, len = ngx_http_push_stream_padding_max_len;
for (i = steps; i >= 0; i--) {
ngx_str_t *padding = ngx_pcalloc(cf->pool, sizeof(ngx_str_t));
if ((*(ngx_http_push_stream_module_paddings_chunks + i) = padding) == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to create padding messages");
return NGX_ERROR;
}
padding->data = aux->data;
padding->len = len;
len = i * 100;
}
}
return ngx_http_push_stream_set_up_shm(cf, ngx_http_push_stream_shm_size);
......@@ -599,7 +611,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
// formatting message template
if (ngx_strncmp(conf->message_template.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_MESSAGE_PREFIX.data, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_MESSAGE_PREFIX.len) != 0) {
ngx_str_t *aux = (conf->message_template.len > 0) ? &conf->message_template : (ngx_str_t *) &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT;
ngx_str_t *template = ngx_http_push_stream_create_str(cf->pool, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_MESSAGE_PREFIX.len + aux->len + sizeof(CRLF) -1);
ngx_str_t *template = ngx_http_push_stream_create_str(cf->pool, NGX_HTTP_PUSH_STREAM_EVENTSOURCE_MESSAGE_PREFIX.len + aux->len + ngx_strlen(CRLF));
if (template == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to append message prefix to message template");
return NGX_CONF_ERROR;
......@@ -625,16 +637,10 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
conf->footer_template.len = aux->len;
}
}
} else {
} else if (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) {
// formatting header and footer template for chunk transfer
if (conf->header_template.len > 0) {
ngx_str_t *aux = NULL;
if (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) {
aux = ngx_http_push_stream_get_formatted_websocket_frame(conf->header_template.data, conf->header_template.len, cf->pool);
} else {
aux = ngx_http_push_stream_get_formatted_chunk(conf->header_template.data, conf->header_template.len, cf->pool);
}
ngx_str_t *aux = ngx_http_push_stream_get_formatted_websocket_frame(conf->header_template.data, conf->header_template.len, cf->pool);
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to format header template");
return NGX_CONF_ERROR;
......@@ -644,13 +650,7 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
}
if (conf->footer_template.len > 0) {
ngx_str_t *aux = NULL;
if (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) {
aux = ngx_http_push_stream_get_formatted_websocket_frame(conf->footer_template.data, conf->footer_template.len, cf->pool);
} else {
aux = ngx_http_push_stream_get_formatted_chunk(conf->footer_template.data, conf->footer_template.len, cf->pool);
}
ngx_str_t *aux = ngx_http_push_stream_get_formatted_websocket_frame(conf->footer_template.data, conf->footer_template.len, cf->pool);
if (aux == NULL) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to format footer template");
return NGX_CONF_ERROR;
......
......@@ -160,10 +160,12 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
if (ngx_http_arg(r, NGX_HTTP_PUSH_STREAM_CALLBACK.data, NGX_HTTP_PUSH_STREAM_CALLBACK.len, &callback_function_name) == NGX_OK) {
ngx_http_push_stream_unescape_uri(&callback_function_name);
if ((ctx->callback = ngx_http_push_stream_get_formatted_chunk(callback_function_name.data, callback_function_name.len, r->pool)) == NULL) {
if ((ctx->callback = ngx_pcalloc(r->pool, sizeof(ngx_str_t))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for callback function name");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ctx->callback->data = callback_function_name.data;
ctx->callback->len = callback_function_name.len;
}
greater_message_tag = tag;
......
......@@ -189,14 +189,13 @@ ngx_http_push_stream_apply_text_template(ngx_str_t **dst_value, ngx_str_t **dst_
return NGX_ERROR;
}
ngx_str_t *chunk = ngx_http_push_stream_get_formatted_chunk(aux, ngx_strlen(aux), temp_pool);
if ((chunk == NULL) || ((*dst_message) = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + chunk->len + 1)) == NULL) {
if (((*dst_message) = ngx_slab_alloc_locked(shpool, sizeof(ngx_str_t) + ngx_strlen(aux) + 1)) == NULL) {
return NGX_ERROR;
}
(*dst_message)->len = chunk->len;
(*dst_message)->len = ngx_strlen(aux);
(*dst_message)->data = (u_char *) ((*dst_message) + 1);
ngx_memcpy((*dst_message)->data, chunk->data, (*dst_message)->len);
ngx_memcpy((*dst_message)->data, aux, (*dst_message)->len);
(*dst_message)->data[(*dst_message)->len] = '\0';
}
......@@ -272,7 +271,11 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
break;
}
}
aux = ngx_http_push_stream_join_with_crlf(lines, temp_pool);
ngx_str_t *tmp = ngx_http_push_stream_join_with_crlf(lines, temp_pool);
if ((aux = ngx_http_push_stream_create_str(temp_pool, tmp->len + 2)) != NULL) {
ngx_sprintf(aux->data, "%V" CRLF, tmp);
}
} else {
aux = ngx_http_push_stream_format_message(channel, msg, &msg->raw, cur->template, temp_pool);
}
......@@ -282,11 +285,9 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
return NULL;
}
ngx_str_t *text = NULL;
ngx_str_t *text = aux;
if (cur->websocket) {
text = ngx_http_push_stream_get_formatted_websocket_frame(aux->data, aux->len, temp_pool);
} else {
text = ngx_http_push_stream_get_formatted_chunk(aux->data, aux->len, temp_pool);
}
ngx_str_t *formmated = (msg->formatted_messages + i);
......@@ -1258,20 +1259,6 @@ ngx_http_push_stream_get_formatted_hostname(ngx_pool_t *pool)
}
static ngx_str_t *
ngx_http_push_stream_get_formatted_chunk(const u_char *text, off_t len, ngx_pool_t *temp_pool)
{
ngx_str_t *chunk;
chunk = ngx_http_push_stream_create_str(temp_pool, sizeof(CRLF) + len);
if (chunk != NULL) {
ngx_sprintf(chunk->data, "%*s" CRLF, (size_t) len, text);
chunk->len = ngx_strlen(chunk->data);
}
return chunk;
}
uint64_t
ngx_http_push_stream_htonll(uint64_t value) {
int num = 42;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment