Commit b5db1acb authored by Wandenberg's avatar Wandenberg

fix send old messages with jsonp for multiple channels subscription

parent 754357ec
...@@ -233,6 +233,7 @@ typedef struct { ...@@ -233,6 +233,7 @@ typedef struct {
ngx_event_t *ping_timer; ngx_event_t *ping_timer;
ngx_http_push_stream_subscriber_t *subscriber; ngx_http_push_stream_subscriber_t *subscriber;
ngx_flag_t longpolling; ngx_flag_t longpolling;
ngx_flag_t message_sent;
ngx_pool_t *temp_pool; ngx_pool_t *temp_pool;
ngx_chain_t *free; ngx_chain_t *free;
ngx_chain_t *busy; ngx_chain_t *busy;
......
...@@ -279,6 +279,35 @@ describe "Subscriber Properties" do ...@@ -279,6 +279,35 @@ describe "Subscriber Properties" do
end end
end end
it "should return messages from different channels on JSONP response" do
channel_1 = 'ch_test_jsonp_ch1'
channel_2 = 'ch_test_jsonp_ch2'
channel_3 = 'ch_test_jsonp_ch3'
body = 'body'
response = ""
callback_function_name = "callback_function"
nginx_run_server(config) do |conf|
EventMachine.run do
publish_message(channel_1, {}, body + "1_1")
publish_message(channel_2, {}, body + "1_2")
publish_message(channel_3, {}, body + "1_3")
publish_message(channel_1, {}, body + "2_1")
publish_message(channel_2, {}, body + "2_2")
publish_message(channel_3, {}, body + "2_3")
publish_message(channel_1, {}, body + "3_1")
publish_message(channel_2, {}, body + "3_2")
publish_message(channel_3, {}, body + "3_3")
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel_1.to_s + '.b3/' + channel_2.to_s + '.b3/' + channel_3.to_s + '.b3' + '?callback=' + callback_function_name).get :head => headers
sub_1.callback do
expect(sub_1.response).to eql("#{callback_function_name}([#{body}1_1,#{body}2_1,#{body}3_1,#{body}1_2,#{body}2_2,#{body}3_2,#{body}1_3,#{body}2_3,#{body}3_3]);")
EventMachine.stop
end
end
end
end
it "should force content_type to be application/javascript when using function name specified in callback parameter" do it "should force content_type to be application/javascript when using function name specified in callback parameter" do
channel = 'test_force_content_type_to_be_application_javascript_when_using_function_name_specified_in_callback_parameter_when_polling' channel = 'test_force_content_type_to_be_application_javascript_when_using_function_name_specified_in_callback_parameter_when_polling'
body = 'body' body = 'body'
......
...@@ -515,8 +515,9 @@ ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *ch ...@@ -515,8 +515,9 @@ ngx_http_push_stream_has_old_messages_to_send(ngx_http_push_stream_channel_t *ch
static void static void
ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id) ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_uint_t backtrack, time_t if_modified_since, ngx_int_t tag, time_t greater_message_time, ngx_int_t greater_message_tag, ngx_str_t *last_event_id)
{ {
ngx_http_push_stream_msg_t *message, *next_message; ngx_http_push_stream_module_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_queue_t *q, *next; ngx_http_push_stream_msg_t *message;
ngx_queue_t *q;
if (ngx_http_push_stream_has_old_messages_to_send(channel, backtrack, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id)) { if (ngx_http_push_stream_has_old_messages_to_send(channel, backtrack, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id)) {
if (backtrack > 0) { if (backtrack > 0) {
...@@ -532,7 +533,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre ...@@ -532,7 +533,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
if (start == 0) { if (start == 0) {
qtd--; qtd--;
ngx_http_push_stream_send_response_message(r, channel, message, 0, qtd > 0); ngx_http_push_stream_send_response_message(r, channel, message, 0, ctx->message_sent);
} else { } else {
start--; start--;
} }
...@@ -560,15 +561,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre ...@@ -560,15 +561,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
} }
if (found && (((greater_message_time == 0) && (greater_message_tag == -1)) || (greater_message_time > message->time) || ((greater_message_time == message->time) && (greater_message_tag >= message->tag)))) { if (found && (((greater_message_time == 0) && (greater_message_tag == -1)) || (greater_message_time > message->time) || ((greater_message_time == message->time) && (greater_message_tag >= message->tag)))) {
next = ngx_queue_next(q); ngx_http_push_stream_send_response_message(r, channel, message, 0, ctx->message_sent);
next_message = ngx_queue_data(next, ngx_http_push_stream_msg_t, queue);
ngx_flag_t send_separator = 1;
if ((q == ngx_queue_last(&channel->message_queue)) || ((greater_message_time > 0) &&
((next_message->time > greater_message_time) || ((next_message->time == greater_message_time) && (next_message->tag > greater_message_tag))))) {
send_separator = 0;
}
ngx_http_push_stream_send_response_message(r, channel, message, 0, send_separator);
} }
} }
ngx_shmtx_unlock(channel->mutex); ngx_shmtx_unlock(channel->mutex);
......
...@@ -585,18 +585,19 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_ ...@@ -585,18 +585,19 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_
} }
} }
if (rc == NGX_OK) { if ((rc == NGX_OK) && use_jsonp && send_separator) {
rc = ngx_http_push_stream_send_response_text(r, str->data, str->len, 0); rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_CALLBACK_MID_CHUNK.data, NGX_HTTP_PUSH_STREAM_CALLBACK_MID_CHUNK.len, 0);
} }
if ((rc == NGX_OK) && use_jsonp) { if (rc == NGX_OK) {
if (send_separator) { rc = ngx_http_push_stream_send_response_text(r, str->data, str->len, 0);
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_CALLBACK_MID_CHUNK.data, NGX_HTTP_PUSH_STREAM_CALLBACK_MID_CHUNK.len, 0); if (rc == NGX_OK) {
ctx->message_sent = 1;
} }
}
if (send_callback) { if ((rc == NGX_OK) && use_jsonp && send_callback) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK.data, NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK.len, 0); rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK.data, NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK.len, 0);
}
} }
if (rc == NGX_OK) { if (rc == NGX_OK) {
...@@ -1475,6 +1476,7 @@ ngx_http_push_stream_add_request_context(ngx_http_request_t *r) ...@@ -1475,6 +1476,7 @@ ngx_http_push_stream_add_request_context(ngx_http_request_t *r)
ctx->ping_timer = NULL; ctx->ping_timer = NULL;
ctx->subscriber = NULL; ctx->subscriber = NULL;
ctx->longpolling = 0; ctx->longpolling = 0;
ctx->message_sent = 0;
ctx->padding = NULL; ctx->padding = NULL;
ctx->callback = NULL; ctx->callback = NULL;
ctx->requested_channels = NULL; ctx->requested_channels = NULL;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment