Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nginx-push-stream-module
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Administrator
nginx-push-stream-module
Commits
e6f12845
Commit
e6f12845
authored
Oct 22, 2014
by
Wandenberg
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix #152 not sending a trailing comma on jsonp object
parent
6599ae24
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
58 additions
and
17 deletions
+58
-17
long_polling_spec.rb
misc/spec/subscriber/long_polling_spec.rb
+27
-5
polling_spec.rb
misc/spec/subscriber/polling_spec.rb
+15
-4
ngx_http_push_stream_module_subscriber.c
src/ngx_http_push_stream_module_subscriber.c
+14
-6
ngx_http_push_stream_module_utils.c
src/ngx_http_push_stream_module_utils.c
+2
-2
No files found.
misc/spec/subscriber/long_polling_spec.rb
View file @
e6f12845
...
@@ -136,6 +136,7 @@ describe "Subscriber Properties" do
...
@@ -136,6 +136,7 @@ describe "Subscriber Properties" do
it
"should receive a timed out message when timeout_with_body is on"
do
it
"should receive a timed out message when timeout_with_body is on"
do
channel
=
'ch_test_disconnect_long_polling_subscriber_when_longpolling_timeout_is_set'
channel
=
'ch_test_disconnect_long_polling_subscriber_when_longpolling_timeout_is_set'
callback_function_name
=
"callback_function"
start
=
Time
.
now
start
=
Time
.
now
nginx_run_server
(
config
.
merge
(
:subscriber_connection_ttl
=>
"1s"
,
:timeout_with_body
=>
'on'
,
:message_template
=>
'{\"id\":\"~id~\", \"message\":\"~text~\", \"channel\":\"~channel~\", \"tag\":\"~tag~\", \"time\":\"~time~\"}'
),
:timeout
=>
30
)
do
|
conf
|
nginx_run_server
(
config
.
merge
(
:subscriber_connection_ttl
=>
"1s"
,
:timeout_with_body
=>
'on'
,
:message_template
=>
'{\"id\":\"~id~\", \"message\":\"~text~\", \"channel\":\"~channel~\", \"tag\":\"~tag~\", \"time\":\"~time~\"}'
),
:timeout
=>
30
)
do
|
conf
|
...
@@ -153,7 +154,12 @@ describe "Subscriber Properties" do
...
@@ -153,7 +154,12 @@ describe "Subscriber Properties" do
response
[
"time"
].
should
eql
(
"Thu, 01 Jan 1970 00:00:00 GMT"
)
response
[
"time"
].
should
eql
(
"Thu, 01 Jan 1970 00:00:00 GMT"
)
Time
.
parse
(
sub
.
response_header
[
'LAST_MODIFIED'
].
to_s
).
utc
.
to_i
.
should
be_in_the_interval
(
Time
.
now
.
utc
.
to_i
-
1
,
Time
.
now
.
utc
.
to_i
)
Time
.
parse
(
sub
.
response_header
[
'LAST_MODIFIED'
].
to_s
).
utc
.
to_i
.
should
be_in_the_interval
(
Time
.
now
.
utc
.
to_i
-
1
,
Time
.
now
.
utc
.
to_i
)
sub
.
response_header
[
'ETAG'
].
to_s
.
should
eql
(
"0"
)
sub
.
response_header
[
'ETAG'
].
to_s
.
should
eql
(
"0"
)
EventMachine
.
stop
sub_1
=
EventMachine
::
HttpRequest
.
new
(
nginx_address
+
'/sub/'
+
channel
.
to_s
+
'?callback='
+
callback_function_name
).
get
:head
=>
headers
sub_1
.
callback
do
sub_1
.
response
.
should
eql
(
%(callback_function([{"id":"-3", "message":"Timed out", "channel":"", "tag":"0", "time":"Thu, 01 Jan 1970 00:00:00 GMT"}]);)
)
EventMachine
.
stop
end
end
end
end
end
end
end
...
@@ -191,7 +197,7 @@ describe "Subscriber Properties" do
...
@@ -191,7 +197,7 @@ describe "Subscriber Properties" do
it
"should accept delete a channel with a long polling subscriber"
do
it
"should accept delete a channel with a long polling subscriber"
do
channel
=
'ch_test_delete_channel_with_long_polling_subscriber'
channel
=
'ch_test_delete_channel_with_long_polling_subscriber'
body
=
'published message'
callback_function_name
=
"callback_function"
resp
=
""
resp
=
""
nginx_run_server
(
config
.
merge
(
:publisher_mode
=>
'admin'
,
:message_template
=>
'{\"id\":\"~id~\", \"message\":\"~text~\", \"channel\":\"~channel~\"}'
))
do
|
conf
|
nginx_run_server
(
config
.
merge
(
:publisher_mode
=>
'admin'
,
:message_template
=>
'{\"id\":\"~id~\", \"message\":\"~text~\", \"channel\":\"~channel~\"}'
))
do
|
conf
|
...
@@ -202,6 +208,11 @@ describe "Subscriber Properties" do
...
@@ -202,6 +208,11 @@ describe "Subscriber Properties" do
response
=
JSON
.
parse
(
sub_1
.
response
)
response
=
JSON
.
parse
(
sub_1
.
response
)
response
[
"channel"
].
should
eql
(
channel
)
response
[
"channel"
].
should
eql
(
channel
)
response
[
"id"
].
to_i
.
should
eql
(
-
2
)
response
[
"id"
].
to_i
.
should
eql
(
-
2
)
end
sub_2
=
EventMachine
::
HttpRequest
.
new
(
nginx_address
+
'/sub/'
+
channel
.
to_s
+
'?callback='
+
callback_function_name
).
get
:head
=>
headers
sub_2
.
callback
do
sub_2
.
response
.
should
eql
(
%(#{callback_function_name}([{"id":"-2", "message":"Channel deleted", "channel":"ch_test_delete_channel_with_long_polling_subscriber"}]);)
)
EventMachine
.
stop
EventMachine
.
stop
end
end
...
@@ -241,13 +252,24 @@ describe "Subscriber Properties" do
...
@@ -241,13 +252,24 @@ describe "Subscriber Properties" do
nginx_run_server
(
config
)
do
|
conf
|
nginx_run_server
(
config
)
do
|
conf
|
EventMachine
.
run
do
EventMachine
.
run
do
publish_message_inline
(
channel
,
{},
body
)
publish_message_inline
(
channel
,
{
'Event-Id'
=>
'event_id'
},
body
)
publish_message_inline
(
channel
,
{},
body
+
"1"
)
publish_message_inline
(
channel
,
{},
body
+
"1"
)
sub_1
=
EventMachine
::
HttpRequest
.
new
(
nginx_address
+
'/sub/'
+
channel
.
to_s
+
'.b2'
+
'?callback='
+
callback_function_name
).
get
:head
=>
headers
sub_1
=
EventMachine
::
HttpRequest
.
new
(
nginx_address
+
'/sub/'
+
channel
.
to_s
+
'.b2'
+
'?callback='
+
callback_function_name
).
get
:head
=>
headers
sub_1
.
callback
do
sub_1
.
callback
do
sub_1
.
response
.
should
eql
(
"
#{
callback_function_name
}
([
#{
body
}
,
#{
body
+
"1"
}
,]);"
)
sub_1
.
response
.
should
eql
(
"
#{
callback_function_name
}
([
#{
body
}
,
#{
body
+
"1"
}
]);"
)
EventMachine
.
stop
sub_2
=
EventMachine
::
HttpRequest
.
new
(
nginx_address
+
'/sub/'
+
channel
.
to_s
+
'?callback='
+
callback_function_name
).
get
:head
=>
headers
.
merge
({
'Last-Event-Id'
=>
'event_id'
})
sub_2
.
callback
do
sub_2
.
response
.
should
eql
(
"
#{
callback_function_name
}
([
#{
body
+
"1"
}
]);"
)
sub_3
=
EventMachine
::
HttpRequest
.
new
(
nginx_address
+
'/sub/'
+
channel
.
to_s
+
'?callback='
+
callback_function_name
).
get
:head
=>
headers
.
merge
({
'If-Modified-Since'
=>
Time
.
at
(
0
).
utc
.
strftime
(
"%a, %d %b %Y %T %Z"
)})
sub_3
.
callback
do
sub_3
.
response
.
should
eql
(
"
#{
callback_function_name
}
([
#{
body
}
,
#{
body
+
"1"
}
]);"
)
EventMachine
.
stop
end
end
end
end
end
end
end
end
...
...
misc/spec/subscriber/polling_spec.rb
View file @
e6f12845
...
@@ -74,7 +74,7 @@ describe "Subscriber Properties" do
...
@@ -74,7 +74,7 @@ describe "Subscriber Properties" do
publish_message_inline
(
channel
,
{},
body
)
publish_message_inline
(
channel
,
{},
body
)
sub_1
=
EventMachine
::
HttpRequest
.
new
(
nginx_address
+
'/sub/'
+
channel
.
to_s
+
'?callback='
+
callback_function_name
).
get
:head
=>
headers
.
merge
({
'If-Modified-Since'
=>
Time
.
at
(
0
).
utc
.
strftime
(
"%a, %d %b %Y %T %Z"
)})
sub_1
=
EventMachine
::
HttpRequest
.
new
(
nginx_address
+
'/sub/'
+
channel
.
to_s
+
'?callback='
+
callback_function_name
).
get
:head
=>
headers
.
merge
({
'If-Modified-Since'
=>
Time
.
at
(
0
).
utc
.
strftime
(
"%a, %d %b %Y %T %Z"
)})
sub_1
.
callback
do
sub_1
.
callback
do
sub_1
.
response
.
should
eql
(
"
#{
callback_function_name
}
([
#{
body
}
,
]);"
)
sub_1
.
response
.
should
eql
(
"
#{
callback_function_name
}
([
#{
body
}
]);"
)
EventMachine
.
stop
EventMachine
.
stop
end
end
end
end
...
@@ -89,13 +89,24 @@ describe "Subscriber Properties" do
...
@@ -89,13 +89,24 @@ describe "Subscriber Properties" do
nginx_run_server
(
config
)
do
|
conf
|
nginx_run_server
(
config
)
do
|
conf
|
EventMachine
.
run
do
EventMachine
.
run
do
publish_message_inline
(
channel
,
{},
body
)
publish_message_inline
(
channel
,
{
'Event-Id'
=>
'event_id'
},
body
)
publish_message_inline
(
channel
,
{},
body
+
"1"
)
publish_message_inline
(
channel
,
{},
body
+
"1"
)
sub_1
=
EventMachine
::
HttpRequest
.
new
(
nginx_address
+
'/sub/'
+
channel
.
to_s
+
'.b2'
+
'?callback='
+
callback_function_name
).
get
:head
=>
headers
sub_1
=
EventMachine
::
HttpRequest
.
new
(
nginx_address
+
'/sub/'
+
channel
.
to_s
+
'.b2'
+
'?callback='
+
callback_function_name
).
get
:head
=>
headers
sub_1
.
callback
do
sub_1
.
callback
do
sub_1
.
response
.
should
eql
(
"
#{
callback_function_name
}
([
#{
body
}
,
#{
body
+
"1"
}
,]);"
)
sub_1
.
response
.
should
eql
(
"
#{
callback_function_name
}
([
#{
body
}
,
#{
body
+
"1"
}
]);"
)
EventMachine
.
stop
sub_2
=
EventMachine
::
HttpRequest
.
new
(
nginx_address
+
'/sub/'
+
channel
.
to_s
+
'?callback='
+
callback_function_name
).
get
:head
=>
headers
.
merge
({
'Last-Event-Id'
=>
'event_id'
})
sub_2
.
callback
do
sub_2
.
response
.
should
eql
(
"
#{
callback_function_name
}
([
#{
body
+
"1"
}
]);"
)
sub_3
=
EventMachine
::
HttpRequest
.
new
(
nginx_address
+
'/sub/'
+
channel
.
to_s
+
'?callback='
+
callback_function_name
).
get
:head
=>
headers
.
merge
({
'If-Modified-Since'
=>
Time
.
at
(
0
).
utc
.
strftime
(
"%a, %d %b %Y %T %Z"
)})
sub_3
.
callback
do
sub_3
.
response
.
should
eql
(
"
#{
callback_function_name
}
([
#{
body
}
,
#{
body
+
"1"
}
]);"
)
EventMachine
.
stop
end
end
end
end
end
end
end
end
...
...
src/ngx_http_push_stream_module_subscriber.c
View file @
e6f12845
...
@@ -541,8 +541,8 @@ ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *ch
...
@@ -541,8 +541,8 @@ ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *ch
static
void
static
void
ngx_http_push_stream_send_old_messages
(
ngx_http_request_t
*
r
,
ngx_http_push_stream_channel_t
*
channel
,
ngx_uint_t
backtrack
,
time_t
if_modified_since
,
ngx_int_t
tag
,
time_t
greater_message_time
,
ngx_int_t
greater_message_tag
,
ngx_str_t
*
last_event_id
)
ngx_http_push_stream_send_old_messages
(
ngx_http_request_t
*
r
,
ngx_http_push_stream_channel_t
*
channel
,
ngx_uint_t
backtrack
,
time_t
if_modified_since
,
ngx_int_t
tag
,
time_t
greater_message_time
,
ngx_int_t
greater_message_tag
,
ngx_str_t
*
last_event_id
)
{
{
ngx_http_push_stream_msg_t
*
message
;
ngx_http_push_stream_msg_t
*
message
,
*
next_message
;
ngx_queue_t
*
cur
;
ngx_queue_t
*
cur
,
*
next
;
if
(
ngx_http_push_stream_has_old_messages_to_send
(
channel
,
backtrack
,
if_modified_since
,
tag
,
greater_message_time
,
greater_message_tag
,
last_event_id
))
{
if
(
ngx_http_push_stream_has_old_messages_to_send
(
channel
,
backtrack
,
if_modified_since
,
tag
,
greater_message_time
,
greater_message_tag
,
last_event_id
))
{
cur
=
&
channel
->
message_queue
;
cur
=
&
channel
->
message_queue
;
...
@@ -550,22 +550,22 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
...
@@ -550,22 +550,22 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
ngx_uint_t
qtd
=
(
backtrack
>
channel
->
stored_messages
)
?
channel
->
stored_messages
:
backtrack
;
ngx_uint_t
qtd
=
(
backtrack
>
channel
->
stored_messages
)
?
channel
->
stored_messages
:
backtrack
;
ngx_uint_t
start
=
channel
->
stored_messages
-
qtd
;
ngx_uint_t
start
=
channel
->
stored_messages
-
qtd
;
// positioning at first message, and send the others
// positioning at first message, and send the others
while
((
qtd
>
0
)
&&
(
cur
=
ngx_queue_next
(
cur
))
&&
(
cur
!=
NULL
)
&&
(
cur
!=
&
channel
->
message_queue
))
{
while
((
qtd
>
0
)
&&
(
cur
=
ngx_queue_next
(
cur
))
&&
(
cur
!=
&
channel
->
message_queue
))
{
message
=
(
ngx_http_push_stream_msg_t
*
)
ngx_queue_data
(
cur
,
ngx_http_push_stream_msg_t
,
queue
);
message
=
(
ngx_http_push_stream_msg_t
*
)
ngx_queue_data
(
cur
,
ngx_http_push_stream_msg_t
,
queue
);
if
(
message
->
deleted
)
{
if
(
message
->
deleted
)
{
break
;
break
;
}
}
if
(
start
==
0
)
{
if
(
start
==
0
)
{
ngx_http_push_stream_send_response_message
(
r
,
channel
,
message
,
0
,
1
);
qtd
--
;
qtd
--
;
ngx_http_push_stream_send_response_message
(
r
,
channel
,
message
,
0
,
qtd
>
0
);
}
else
{
}
else
{
start
--
;
start
--
;
}
}
}
}
}
else
if
((
last_event_id
!=
NULL
)
||
(
if_modified_since
>=
0
))
{
}
else
if
((
last_event_id
!=
NULL
)
||
(
if_modified_since
>=
0
))
{
ngx_flag_t
found
=
0
;
ngx_flag_t
found
=
0
;
while
((
cur
=
ngx_queue_next
(
cur
))
&&
(
cur
!=
NULL
)
&&
(
cur
!=
&
channel
->
message_queue
))
{
while
((
cur
=
ngx_queue_next
(
cur
))
&&
(
cur
!=
&
channel
->
message_queue
))
{
message
=
(
ngx_http_push_stream_msg_t
*
)
ngx_queue_data
(
cur
,
ngx_http_push_stream_msg_t
,
queue
);
message
=
(
ngx_http_push_stream_msg_t
*
)
ngx_queue_data
(
cur
,
ngx_http_push_stream_msg_t
,
queue
);
if
(
message
->
deleted
)
{
if
(
message
->
deleted
)
{
break
;
break
;
...
@@ -584,7 +584,15 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
...
@@ -584,7 +584,15 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
}
}
if
(
found
&&
(((
greater_message_time
==
0
)
&&
(
greater_message_tag
==
-
1
))
||
(
greater_message_time
>
message
->
time
)
||
((
greater_message_time
==
message
->
time
)
&&
(
greater_message_tag
>=
message
->
tag
))))
{
if
(
found
&&
(((
greater_message_time
==
0
)
&&
(
greater_message_tag
==
-
1
))
||
(
greater_message_time
>
message
->
time
)
||
((
greater_message_time
==
message
->
time
)
&&
(
greater_message_tag
>=
message
->
tag
))))
{
ngx_http_push_stream_send_response_message
(
r
,
channel
,
message
,
0
,
1
);
next
=
ngx_queue_next
(
cur
);
next_message
=
(
ngx_http_push_stream_msg_t
*
)
ngx_queue_data
(
next
,
ngx_http_push_stream_msg_t
,
queue
);
ngx_flag_t
send_separator
=
1
;
if
((
next
==
&
channel
->
message_queue
)
||
((
greater_message_time
>
0
)
&&
((
next_message
->
time
>
greater_message_time
)
||
((
next_message
->
time
==
greater_message_time
)
&&
(
next_message
->
tag
>
greater_message_tag
)))))
{
send_separator
=
0
;
}
ngx_http_push_stream_send_response_message
(
r
,
channel
,
message
,
0
,
send_separator
);
}
}
}
}
}
}
...
...
src/ngx_http_push_stream_module_utils.c
View file @
e6f12845
...
@@ -118,7 +118,7 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
...
@@ -118,7 +118,7 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
ngx_http_push_stream_send_response_content_header
(
subscriber
->
request
,
ngx_http_get_module_loc_conf
(
subscriber
->
request
,
ngx_http_push_stream_module
));
ngx_http_push_stream_send_response_content_header
(
subscriber
->
request
,
ngx_http_get_module_loc_conf
(
subscriber
->
request
,
ngx_http_push_stream_module
));
}
}
ngx_http_push_stream_send_response_message
(
subscriber
->
request
,
channel
,
channel
->
channel_deleted_message
,
1
,
1
);
ngx_http_push_stream_send_response_message
(
subscriber
->
request
,
channel
,
channel
->
channel_deleted_message
,
1
,
0
);
// subscriber does not have any other subscription, the connection may be closed
// subscriber does not have any other subscription, the connection may be closed
...
@@ -1089,7 +1089,7 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
...
@@ -1089,7 +1089,7 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
}
}
if
(
mcf
->
ping_msg
!=
NULL
)
{
if
(
mcf
->
ping_msg
!=
NULL
)
{
rc
=
ngx_http_push_stream_send_response_message
(
r
,
NULL
,
mcf
->
ping_msg
,
1
,
1
);
rc
=
ngx_http_push_stream_send_response_message
(
r
,
NULL
,
mcf
->
ping_msg
,
1
,
0
);
}
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment