Commit 1ad4b2a1 authored by plamot's avatar plamot Committed by Wandenberg

add ~size~ directive to print ~text~ size in message templates

parent fbcb6222
...@@ -105,6 +105,7 @@ h1(#examples). Some Examples <a name="examples" href="#">&nbsp;</a> ...@@ -105,6 +105,7 @@ h1(#examples). Some Examples <a name="examples" href="#">&nbsp;</a>
* "WebSocket":websocket * "WebSocket":websocket
* "Long Polling":long_polling * "Long Polling":long_polling
* "JSONP":jsonp * "JSONP":jsonp
* "M-JPEG":m-jpeg
* "Other examples":wiki * "Other examples":wiki
...@@ -245,6 +246,7 @@ h1(#contributors). Contributors ...@@ -245,6 +246,7 @@ h1(#contributors). Contributors
[websocket]docs/examples/websocket.textile#websocket [websocket]docs/examples/websocket.textile#websocket
[long_polling]docs/examples/long_polling.textile#long_polling [long_polling]docs/examples/long_polling.textile#long_polling
[jsonp]docs/examples/long_polling.textile#jsonp [jsonp]docs/examples/long_polling.textile#jsonp
[m-jpeg]docs/examples/m_jpeg.textile#m_jpeg
[tests]docs/server_tests.textile [tests]docs/server_tests.textile
[push_stream_channels_statistics]docs/directives/channels_statistics.textile#push_stream_channels_statistics [push_stream_channels_statistics]docs/directives/channels_statistics.textile#push_stream_channels_statistics
[push_stream_publisher]docs/directives/publishers.textile#push_stream_publisher [push_stream_publisher]docs/directives/publishers.textile#push_stream_publisher
......
...@@ -132,7 +132,7 @@ h2(#push_stream_message_template). push_stream_message_template <a name="push_st ...@@ -132,7 +132,7 @@ h2(#push_stream_message_template). push_stream_message_template <a name="push_st
*context:* _location (push_stream_subscriber)_ *context:* _location (push_stream_subscriber)_
The text template that will be used to format the message before be sent to subscribers. The template can contain any number of the reserved words: ==~id~, ~text~, ~channel~, ~time~, ~tag~, ~event-id~ and ~event-type~, example: "&lt;script&gt;p(~id~,'~channel~','~text~', ~tag~, '~time~');&lt;/script&gt;"== The text template that will be used to format the message before be sent to subscribers. The template can contain any number of the reserved words: ==~id~, ~text~, ~size~, ~channel~, ~time~, ~tag~, ~event-id~ and ~event-type~, example: "&lt;script&gt;p(~id~,'~channel~','~text~', ~tag~, '~time~');&lt;/script&gt;"==
h2(#push_stream_footer_template). push_stream_footer_template <a name="push_stream_footer_template" href="#">&nbsp;</a> h2(#push_stream_footer_template). push_stream_footer_template <a name="push_stream_footer_template" href="#">&nbsp;</a>
......
h1(#m_jpeg). M-JPEG example <a name="m_jpeg" href="#">&nbsp;</a>
Using the module to stream images over HTTP "(wiki)":wiki.
Configure your server like suggested bellow. You should complete this configuration with other directives according to the target application.
*Server:*
<pre>
location /pub {
client_max_body_size 1m;
client_body_buffer_size 1m;
# activate publisher (admin) mode for this location
push_stream_publisher admin;
# query string based channel id
push_stream_channels_path $arg_id;
}
location ~ /sub/(.*) {
default_type "multipart/x-mixed-replace; boundary=endofsection";
push_stream_subscriber;
# positional channel path
push_stream_channels_path $1;
# message template
push_stream_message_template "--endofsection\nX-Timestamp: ~time~\nContent-Type: image/jpg\nContent-Length: ~size~\n\n~text~";
}
location / {
default_type "text/html";
return 200 "<html><head><title>M-JPEG example</title></head><body><img src='/sub/ch1' /></body></html>";
}
</pre>
Open any browser and point it to your server, like "http://localhost/"
Post jpeg images.
<pre>
curl -s -v -X POST 'http://localhost:9080/pub?id=ch1' --data-binary @image1.jpg
curl -s -v -X POST 'http://localhost:9080/pub?id=ch1' --data-binary @image2.jpg
curl -s -v -X POST 'http://localhost:9080/pub?id=ch1' --data-binary @image3.jpg
...
</pre>
[wiki]https://en.wikipedia.org/wiki/Motion_JPEG#M-JPEG_over_HTTP
\ No newline at end of file
...@@ -46,6 +46,7 @@ typedef enum { ...@@ -46,6 +46,7 @@ typedef enum {
PUSH_STREAM_TEMPLATE_PART_TYPE_EVENT_TYPE, PUSH_STREAM_TEMPLATE_PART_TYPE_EVENT_TYPE,
PUSH_STREAM_TEMPLATE_PART_TYPE_CHANNEL, PUSH_STREAM_TEMPLATE_PART_TYPE_CHANNEL,
PUSH_STREAM_TEMPLATE_PART_TYPE_TEXT, PUSH_STREAM_TEMPLATE_PART_TYPE_TEXT,
PUSH_STREAM_TEMPLATE_PART_TYPE_SIZE,
PUSH_STREAM_TEMPLATE_PART_TYPE_LITERAL PUSH_STREAM_TEMPLATE_PART_TYPE_LITERAL
} ngx_http_push_stream_template_part_type; } ngx_http_push_stream_template_part_type;
...@@ -68,6 +69,7 @@ typedef struct { ...@@ -68,6 +69,7 @@ typedef struct {
ngx_uint_t qtd_event_type; ngx_uint_t qtd_event_type;
ngx_uint_t qtd_channel; ngx_uint_t qtd_channel;
ngx_uint_t qtd_text; ngx_uint_t qtd_text;
ngx_uint_t qtd_size;
ngx_uint_t qtd_tag; ngx_uint_t qtd_tag;
ngx_uint_t qtd_time; ngx_uint_t qtd_time;
size_t literal_len; size_t literal_len;
......
...@@ -203,6 +203,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID = ngx_string ...@@ -203,6 +203,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID = ngx_string
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE = ngx_string("~event-type~"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE = ngx_string("~event-type~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL = ngx_string("~channel~"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL = ngx_string("~channel~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_SIZE = ngx_string("~size~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG = ngx_string("~tag~"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG = ngx_string("~tag~");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME = ngx_string("~time~"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME = ngx_string("~time~");
......
...@@ -306,23 +306,22 @@ describe "Publisher Publishing Messages" do ...@@ -306,23 +306,22 @@ describe "Publisher Publishing Messages" do
channel = 'ch_test_expose_message_tag_through_message_template' channel = 'ch_test_expose_message_tag_through_message_template'
response = '' response = ''
nginx_run_server(config.merge(:message_template => '{\"id\": \"~id~\", \"channel\": \"~channel~\", \"text\": \"~text~\", \"tag\": \"~tag~\"}')) do |conf| nginx_run_server(config.merge(:message_template => '{\"id\": \"~id~\", \"channel\": \"~channel~\", \"text\": \"~text~\", \"tag\": \"~tag~\"}\r\n')) do |conf|
EventMachine.run do EventMachine.run do
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream do |chunk| sub.stream do |chunk|
response += chunk response += chunk
lines = response.split('\r\n') lines = response.split("\r\n")
if lines.size > 1 if lines.size > 1
lines.each_with_index do |line, i| lines.each_with_index do |line, i|
resp = JSON.parse(line) resp = JSON.parse(line)
expect(resp["id"].to_i).to eql(i + 1) expect(resp["id"].to_i).to eql(i + 1)
expect(resp["channel"]).to eql(channel) expect(resp["channel"]).to eql(channel)
expect(resp["text"]).to eql(body) expect(resp["text"]).to eql(body)
expect(resp["tag"].to_i).to eql(i) expect(resp["tag"].to_i).to eql(i + 1)
end end
EventMachine.stop
end end
EventMachine.stop
end end
publish_message_inline(channel, headers, body) publish_message_inline(channel, headers, body)
...@@ -330,4 +329,33 @@ describe "Publisher Publishing Messages" do ...@@ -330,4 +329,33 @@ describe "Publisher Publishing Messages" do
end end
end end
end end
it "should expose message size through message template" do
body = 'test message'
channel = 'ch_test_expose_message_size_through_message_template'
response = ''
nginx_run_server(config.merge(:message_template => '{\"id\": \"~id~\", \"channel\": \"~channel~\", \"text\": \"~text~\", \"size\": \"~size~\"}\r\n')) do |conf|
EventMachine.run do
sub = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get
sub.stream do |chunk|
response += chunk
lines = response.split("\r\n")
if lines.size > 1
lines.each_with_index do |line, i|
resp = JSON.parse(line)
expect(resp["id"].to_i).to eql(i + 1)
expect(resp["channel"]).to eql(channel)
expect(resp["text"]).to eql(body + ("a" * i))
expect(resp["size"].to_i).to eql(body.size + i)
end
EventMachine.stop
end
end
publish_message_inline(channel, headers, body)
publish_message_inline(channel, headers, body + "a")
end
end
end
end end
...@@ -348,6 +348,7 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ng ...@@ -348,6 +348,7 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ng
cur->qtd_text = 0; cur->qtd_text = 0;
cur->qtd_tag = 0; cur->qtd_tag = 0;
cur->qtd_time = 0; cur->qtd_time = 0;
cur->qtd_size = 0;
cur->literal_len = 0; cur->literal_len = 0;
ngx_queue_init(&cur->parts); ngx_queue_init(&cur->parts);
ngx_memcpy(cur->template->data, template.data, template.len); ngx_memcpy(cur->template->data, template.data, template.len);
...@@ -384,6 +385,10 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ng ...@@ -384,6 +385,10 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ng
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME.len; start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME.len;
last = start; last = start;
cur->qtd_time++; cur->qtd_time++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_SIZE, PUSH_STREAM_TEMPLATE_PART_TYPE_SIZE)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_SIZE.len;
last = start;
cur->qtd_size++;
} else { } else {
start += 1; start += 1;
} }
......
...@@ -1370,8 +1370,9 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx ...@@ -1370,8 +1370,9 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
ngx_queue_t *q; ngx_queue_t *q;
u_char id[NGX_INT_T_LEN + 1]; u_char id[NGX_INT_T_LEN + 1];
u_char tag[NGX_INT_T_LEN + 1]; u_char tag[NGX_INT_T_LEN + 1];
u_char size[NGX_INT_T_LEN + 1];
u_char time[NGX_HTTP_PUSH_STREAM_TIME_FMT_LEN + 1]; u_char time[NGX_HTTP_PUSH_STREAM_TIME_FMT_LEN + 1];
size_t id_len, tag_len, time_len; size_t id_len, tag_len, time_len, size_len;
ngx_str_t *channel_id = (channel != NULL) ? &channel->id : &NGX_HTTP_PUSH_STREAM_EMPTY; ngx_str_t *channel_id = (channel != NULL) ? &channel->id : &NGX_HTTP_PUSH_STREAM_EMPTY;
ngx_str_t *event_id = (message->event_id != NULL) ? message->event_id : &NGX_HTTP_PUSH_STREAM_EMPTY; ngx_str_t *event_id = (message->event_id != NULL) ? message->event_id : &NGX_HTTP_PUSH_STREAM_EMPTY;
...@@ -1386,6 +1387,9 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx ...@@ -1386,6 +1387,9 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
ngx_sprintf(tag, "%d%Z", message->tag); ngx_sprintf(tag, "%d%Z", message->tag);
tag_len = ngx_strlen(tag); tag_len = ngx_strlen(tag);
ngx_sprintf(size, "%d%Z", text->len);
size_len = ngx_strlen(size);
len += template->qtd_channel * channel_id->len; len += template->qtd_channel * channel_id->len;
len += template->qtd_event_id * event_id->len; len += template->qtd_event_id * event_id->len;
len += template->qtd_event_type * event_type->len; len += template->qtd_event_type * event_type->len;
...@@ -1393,6 +1397,7 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx ...@@ -1393,6 +1397,7 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
len += template->qtd_time * time_len; len += template->qtd_time * time_len;
len += template->qtd_tag * tag_len; len += template->qtd_tag * tag_len;
len += template->qtd_text * text->len; len += template->qtd_text * text->len;
len += template->qtd_size * size_len;
len += template->literal_len; len += template->literal_len;
txt = ngx_http_push_stream_create_str(temp_pool, len); txt = ngx_http_push_stream_create_str(temp_pool, len);
...@@ -1426,6 +1431,9 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx ...@@ -1426,6 +1431,9 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
case PUSH_STREAM_TEMPLATE_PART_TYPE_TEXT: case PUSH_STREAM_TEMPLATE_PART_TYPE_TEXT:
last = ngx_cpymem(last, text->data, text->len); last = ngx_cpymem(last, text->data, text->len);
break; break;
case PUSH_STREAM_TEMPLATE_PART_TYPE_SIZE:
last = ngx_cpymem(last, size, size_len);
break;
case PUSH_STREAM_TEMPLATE_PART_TYPE_TIME: case PUSH_STREAM_TEMPLATE_PART_TYPE_TIME:
last = ngx_cpymem(last, time, time_len); last = ngx_cpymem(last, time, time_len);
break; break;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment