Commit 35a8c4e1 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

send old messages together when using jsonp

parent bc4aeda8
...@@ -209,8 +209,9 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_CONTENT_TYPE = ngx_stri ...@@ -209,8 +209,9 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_CONTENT_TYPE = ngx_stri
static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK = ngx_string("6" CRLF ": -1" CRLF CRLF); static const ngx_str_t NGX_HTTP_PUSH_STREAM_EVENTSOURCE_PING_MESSAGE_CHUNK = ngx_string("6" CRLF ": -1" CRLF CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_LAST_CHUNK = ngx_string("0" CRLF CRLF); static const ngx_str_t NGX_HTTP_PUSH_STREAM_LAST_CHUNK = ngx_string("0" CRLF CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK_INIT_CHUNK = ngx_string("3" CRLF "(" CRLF CRLF); static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK_INIT_CHUNK = ngx_string("2" CRLF "([" CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK = ngx_string("4" CRLF ");" CRLF CRLF); static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK_MID_CHUNK = ngx_string("1" CRLF "," CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK = ngx_string("5" CRLF "]);" CRLF CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_PADDING_BY_USER_AGENT_PATTERN = ngx_string("([^:]+),(\\d+),(\\d+)"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_PADDING_BY_USER_AGENT_PATTERN = ngx_string("([^:]+),(\\d+),(\\d+)");
...@@ -234,7 +235,7 @@ static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_st ...@@ -234,7 +235,7 @@ static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_st
static ngx_str_t * ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool); static ngx_str_t * ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf); static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf);
static ngx_int_t ngx_http_push_stream_send_response(ngx_http_request_t *r, ngx_str_t *text, const ngx_str_t *content_type, ngx_int_t status_code); static ngx_int_t ngx_http_push_stream_send_response(ngx_http_request_t *r, ngx_str_t *text, const ngx_str_t *content_type, ngx_int_t status_code);
static ngx_int_t ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg); static ngx_int_t ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_flag_t send_callback, ngx_flag_t send_separator);
static ngx_int_t ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer); static ngx_int_t ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer);
static void ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r); static void ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r);
static void ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_request_t *r); static void ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_request_t *r);
......
...@@ -27,6 +27,6 @@ ...@@ -27,6 +27,6 @@
#define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ #define NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_
static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.3"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_TAG = ngx_string("0.3.3");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("cd264714638b797333f70d8f4432901507e053c5"); static const ngx_str_t NGX_HTTP_PUSH_STREAM_COMMIT = ngx_string("bc4aeda8fa33e5806abe97cbee93f9fa045af834");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_VERSION_H_ */
/*global PushStream EventSourceWrapper EventSource*/ /*global PushStream WebSocketWrapper EventSourceWrapper EventSource*/
/*jshint evil: true */
/** /**
* Copyright (C) 2010-2011 Wandenberg Peixoto <wandenberg@gmail.com>, Rogério Carvalho Schneider <stockrt@gmail.com> * Copyright (C) 2010-2012 Wandenberg Peixoto <wandenberg@gmail.com>, Rogério Carvalho Schneider <stockrt@gmail.com>
* *
* This file is part of Nginx Push Stream Module. * This file is part of Nginx Push Stream Module.
* *
...@@ -27,16 +28,53 @@ ...@@ -27,16 +28,53 @@
/* prevent duplicate declaration */ /* prevent duplicate declaration */
if (window.PushStream) { return; } if (window.PushStream) { return; }
var PATTERN_MESSAGE = /\{"id":([\-\d]*),"channel":"([^"]*)","text":"(.*)"\}/; var validChars = /^[\],:{}\s]*$/,
var PATTERN_MESSAGE_WITH_EVENT_ID = /\{"id":([\-\d]*),"channel":"([^"]*)","text":"(.*)","eventid":"(.*)"\}/; validEscape = /\\(?:["\\\/bfnrt]|u[0-9a-fA-F]{4})/g,
validTokens = /"[^"\\\n\r]*"|true|false|null|-?\d+(?:\.\d*)?(?:[eE][+\-]?\d+)?/g,
validBraces = /(?:^|:|,)(?:\s*\[)+/g;
var PATTERN_MESSAGE_WITH_TAG = /\{"id":([\-\d]*),"channel":"([^"]*)","text":"(.*)","tag":([\-\d]*),"time":"(.*)"\}/; var trim = function(value) {
var PATTERN_MESSAGE_WITH_TAG_AND_EVENT_ID = /\{"id":([\-\d]*),"channel":"([^"]*)","text":"(.*)","tag":([\-\d]*),"time":"(.*)","eventid":"(.*)"\}/; return value.replace(/^\s*/, "").replace(/\s*$/, "");
};
var parseJSON = function(data) {
if (typeof data !== "string" || !data) {
return null;
}
// Make sure leading/trailing whitespace is removed (IE can't handle it)
data = trim(data);
// Attempt to parse using the native JSON parser first
if (window.JSON && window.JSON.parse) {
try {
return window.JSON.parse( data );
} catch(e) {
throw "Invalid JSON: " + data;
}
}
// Make sure the incoming data is actual JSON
// Logic borrowed from http://json.org/json2.js
if (validChars.test(data.replace(validEscape, "@").replace( validTokens, "]").replace( validBraces, "")) ) {
return (new Function("return " + data))();
}
throw "Invalid JSON: " + data;
};
var addTimestampToUrl = function(url) { var addTimestampToUrl = function(url) {
return url + ((url.indexOf('?') < 0) ? '?' : '&') + "_=" + (new Date()).getTime(); return url + ((url.indexOf('?') < 0) ? '?' : '&') + "_=" + (new Date()).getTime();
} }
var isArray = Array.isArray || function(obj) {
return Object.prototype.toString.call(obj) == '[object Array]';
};
var isString = function(obj) {
return Object.prototype.toString.call(obj) == '[object String]';
};
var Log4js = { var Log4js = {
debug : function() { if (PushStream.LOG_LEVEL === 'debug') Log4js._log.apply(Log4js._log, arguments); }, debug : function() { if (PushStream.LOG_LEVEL === 'debug') Log4js._log.apply(Log4js._log, arguments); },
info : function() { if ((PushStream.LOG_LEVEL === 'info') || (PushStream.LOG_LEVEL === 'debug')) Log4js._log.apply(Log4js._log, arguments); }, info : function() { if ((PushStream.LOG_LEVEL === 'info') || (PushStream.LOG_LEVEL === 'debug')) Log4js._log.apply(Log4js._log, arguments); },
...@@ -133,11 +171,20 @@ ...@@ -133,11 +171,20 @@
return xhr; return xhr;
}, },
_clear_script : function(head, script) { _clear_script : function(script) {
// Handling memory leak in IE, removing and dereference the script // Handling memory leak in IE, removing and dereference the script
script.setAttribute("src", null); if (script) {
script.onload = script.onreadystatechange = null; script.onerror = script.onload = script.onreadystatechange = null;
if (head && script.parentNode) head.removeChild(script); if (script.parentNode) script.parentNode.removeChild(script);
}
},
clear : function(settings) {
if (settings.timeoutId) {
settings.timeoutId = window.clearTimeout(settings.timeoutId);
}
Ajax._clear_script(document.getElementById(settings.scriptId));
}, },
jsonp : function(settings) { jsonp : function(settings) {
...@@ -146,33 +193,35 @@ ...@@ -146,33 +193,35 @@
var head = document.head || document.getElementsByTagName("head")[0]; var head = document.head || document.getElementsByTagName("head")[0];
var script = document.createElement("script"); var script = document.createElement("script");
var onerror = function() {
Ajax.clear(settings);
settings.error(304);
};
var onload = function() {
Ajax.clear(settings);
settings.load();
};
script.onerror = onerror;
script.onload = script.onreadystatechange = function(eventLoad) { script.onload = script.onreadystatechange = function(eventLoad) {
if (!script.readyState || /loaded|complete/.test(script.readyState)) { if (!script.readyState || /loaded|complete/.test(script.readyState)) {
if (settings.timeoutId) { onload();
window.clearTimeout(settings.timeoutId);
}
Ajax._clear_script(head, script);
script = undefined;
} }
}; };
if (settings.beforeOpen) settings.beforeOpen({}); if (settings.beforeOpen) settings.beforeOpen({});
if (settings.beforeSend) settings.beforeSend({}); if (settings.beforeSend) settings.beforeSend({});
settings.timeoutId = window.setTimeout(onerror, settings.timeout + 10);
settings.scriptId = settings.scriptId || new Date().getTime();
script.setAttribute("src", addTimestampToUrl(settings.url) + '&' + Ajax._parse_data(settings)); script.setAttribute("src", addTimestampToUrl(settings.url) + '&' + Ajax._parse_data(settings));
script.setAttribute("async", "async"); script.setAttribute("async", "async");
script.setAttribute("id", settings.scriptId);
// Use insertBefore instead of appendChild to circumvent an IE6 bug. // Use insertBefore instead of appendChild to circumvent an IE6 bug.
head.insertBefore(script, head.firstChild); head.insertBefore(script, head.firstChild);
if (settings.error) {
settings.timeoutId = window.setTimeout(function() {
Ajax._clear_script(head, script);
script = undefined;
settings.error(304);
}, settings.timeout + 10);
}
}, },
load : function(settings) { load : function(settings) {
...@@ -193,28 +242,18 @@ ...@@ -193,28 +242,18 @@
}; };
var parseMessage = function(messageText) { var parseMessage = function(messageText) {
var match = null; var msg = messageText;
var hasEventId = false; if (isString(messageText)) {
if (messageText.indexOf('"eventid":"') > 0) { msg = parseJSON(messageText);
hasEventId = true;
match = messageText.match(PATTERN_MESSAGE_WITH_TAG_AND_EVENT_ID);
if (!match || !match[0]) {
match = messageText.match(PATTERN_MESSAGE_WITH_EVENT_ID);
}
} else {
match = messageText.match(PATTERN_MESSAGE_WITH_TAG);
if (!match || !match[0]) {
match = messageText.match(PATTERN_MESSAGE);
}
} }
var message = { var message = {
id : match[1], id : msg.id,
channel: match[2], channel: msg.channel,
data : unescapeText(match[3]), data : unescapeText(msg.text),
tag : match[4], tag : msg.tag,
time : match[5], time : msg.time,
eventid: (hasEventId) ? match[match.length - 1] : "" eventid: msg.eventid || ""
}; };
return message; return message;
...@@ -234,8 +273,10 @@ ...@@ -234,8 +273,10 @@
return path; return path;
}; };
var getSubscriberUrl = function(pushstream, prefix, websocket) { var getSubscriberUrl = function(pushstream, prefix) {
var url = (websocket) ? ((pushstream.useSSL) ? "wss://" : "ws://") : ((pushstream.useSSL) ? "https://" : "http://"); var websocket = pushstream.wrapper.type === WebSocketWrapper.TYPE;
var useSSL = pushstream.useSSL;
var url = (websocket) ? ((useSSL) ? "wss://" : "ws://") : ((useSSL) ? "https://" : "http://");
url += pushstream.host; url += pushstream.host;
url += ((pushstream.port != 80) && (pushstream.port != 443)) ? (":" + pushstream.port) : ""; url += ((pushstream.port != 80) && (pushstream.port != 443)) ? (":" + pushstream.port) : "";
url += prefix; url += prefix;
...@@ -317,7 +358,7 @@ ...@@ -317,7 +358,7 @@
WebSocketWrapper.prototype = { WebSocketWrapper.prototype = {
connect: function() { connect: function() {
this._closeCurrentConnection(); this._closeCurrentConnection();
var url = addTimestampToUrl(getSubscriberUrl(this.pushstream, this.pushstream.urlPrefixWebsocket, true)); var url = addTimestampToUrl(getSubscriberUrl(this.pushstream, this.pushstream.urlPrefixWebsocket));
this.connection = (window.WebSocket) ? new window.WebSocket(url) : new window.MozWebSocket(url); this.connection = (window.WebSocket) ? new window.WebSocket(url) : new window.MozWebSocket(url);
this.connection.onerror = linker(onerrorCallback, this); this.connection.onerror = linker(onerrorCallback, this);
this.connection.onclose = linker(onerrorCallback, this); this.connection.onclose = linker(onerrorCallback, this);
...@@ -495,12 +536,14 @@ ...@@ -495,12 +536,14 @@
this.connectionEnabled = false; this.connectionEnabled = false;
this.opentimer = null; this.opentimer = null;
this.messagesQueue = []; this.messagesQueue = [];
this._linkedInternalListen = linker(this._internalListen, this);
this.xhrSettings = { this.xhrSettings = {
timeout: this.pushstream.longPollingTimeout, timeout: this.pushstream.longPollingTimeout,
data: {}, data: {},
url: null, url: null,
success: linker(this.onmessage, this), success: linker(this.onmessage, this),
error: linker(this.onerror, this), error: linker(this.onerror, this),
load: linker(this.onload, this),
beforeOpen: linker(this.beforeOpen, this), beforeOpen: linker(this.beforeOpen, this),
beforeSend: linker(this.beforeSend, this), beforeSend: linker(this.beforeSend, this),
afterReceive: linker(this.afterReceive, this) afterReceive: linker(this.afterReceive, this)
...@@ -518,18 +561,29 @@ ...@@ -518,18 +561,29 @@
var domain = extract_xss_domain(this.pushstream.host); var domain = extract_xss_domain(this.pushstream.host);
var currentDomain = extract_xss_domain(window.location.hostname); var currentDomain = extract_xss_domain(window.location.hostname);
this.useJSONP = (domain != currentDomain) || this.pushstream.longPollingUseJSONP; this.useJSONP = (domain != currentDomain) || this.pushstream.longPollingUseJSONP;
this.xhrSettings.scriptId = "PushStreamManager_" + this.pushstream.id;
if (this.useJSONP) { if (this.useJSONP) {
this.pushstream.longPollingByHeaders = false; this.pushstream.longPollingByHeaders = false;
this.xhrSettings.data.callback = "PushStreamManager[" + this.pushstream.id + "].wrapper.onmessage"; this.xhrSettings.data.callback = "PushStreamManager[" + this.pushstream.id + "].wrapper.onmessage";
} }
this._listen(); this._internalListen();
this.opentimer = setTimeout(linker(onopenCallback, this), 5000); this.opentimer = setTimeout(linker(onopenCallback, this), 5000);
Log4js.info("[LongPolling] connecting to:", this.xhrSettings.url); Log4js.info("[LongPolling] connecting to:", this.xhrSettings.url);
}, },
_listen: function() { _listen: function() {
if (this.connectionEnabled && !this.connection) { if (this._internalListenTimeout) clearTimer(this._internalListenTimeout);
this.connection = (this.useJSONP) ? Ajax.jsonp(this.xhrSettings) : Ajax.load(this.xhrSettings); this._internalListenTimeout = setTimeout(this._linkedInternalListen, this.pushstream.longPollingInterval);
},
_internalListen: function() {
if (this.connectionEnabled) {
if (this.useJSONP) {
Ajax.clear(this.xhrSettings);
Ajax.jsonp(this.xhrSettings);
} else if (!this.connection) {
this.connection = Ajax.load(this.xhrSettings);
}
} }
}, },
...@@ -592,22 +646,25 @@ ...@@ -592,22 +646,25 @@
} }
}, },
onload: function() {
this._listen();
},
onmessage: function(responseText) { onmessage: function(responseText) {
Log4js.info("[LongPolling] message received", responseText); Log4js.info("[LongPolling] message received", responseText);
var lastMessage = null; var lastMessage = null;
var messages = responseText.split("\r\n"); var messages = isArray(responseText) ? responseText : responseText.split("\r\n");
for (var i = 0; i < messages.length; i++) { for (var i = 0; i < messages.length; i++) {
if (messages[i]) { if (messages[i]) {
lastMessage = parseMessage(messages[i]); lastMessage = parseMessage(messages[i]);
this.messagesQueue.push(lastMessage); this.messagesQueue.push(lastMessage);
if (!this.pushstream.longPollingByHeaders && lastMessage.time) {
this.etag = lastMessage.tag;
this.lastModified = lastMessage.time;
}
} }
} }
if (!this.pushstream.longPollingByHeaders) {
this.etag = lastMessage.tag;
this.lastModified = lastMessage.time;
}
this._listen(); this._listen();
while (this.messagesQueue.length > 0) { while (this.messagesQueue.length > 0) {
...@@ -641,6 +698,7 @@ ...@@ -641,6 +698,7 @@
this.longPollingTimeArgument = settings.longPollingTimeArgument || 'time'; this.longPollingTimeArgument = settings.longPollingTimeArgument || 'time';
this.longPollingUseJSONP = settings.longPollingUseJSONP || false; this.longPollingUseJSONP = settings.longPollingUseJSONP || false;
this.longPollingTimeout = settings.longPollingTimeout || 30000; this.longPollingTimeout = settings.longPollingTimeout || 30000;
this.longPollingInterval = settings.longPollingInterval || 100;
this.reconnecttimer = null; this.reconnecttimer = null;
...@@ -652,7 +710,7 @@ ...@@ -652,7 +710,7 @@
this.modes = (settings.modes || 'eventsource|websocket|stream|longpolling').split('|'); this.modes = (settings.modes || 'eventsource|websocket|stream|longpolling').split('|');
this.wrappers = []; this.wrappers = [];
this.wrapper = null; //TODO test this.wrapper = null;
this.onopen = null; this.onopen = null;
this.onmessage = null; this.onmessage = null;
...@@ -685,9 +743,9 @@ ...@@ -685,9 +743,9 @@
PushStream.LOG_OUTPUT_ELEMENT_ID = 'Log4jsLogOutput'; PushStream.LOG_OUTPUT_ELEMENT_ID = 'Log4jsLogOutput';
/* status codes */ /* status codes */
PushStream.CLOSED = 0; //TODO test PushStream.CLOSED = 0;
PushStream.CONNECTING = 1; PushStream.CONNECTING = 1;
PushStream.OPEN = 2; //TODO test PushStream.OPEN = 2;
/* main code */ /* main code */
PushStream.prototype = { PushStream.prototype = {
...@@ -719,7 +777,7 @@ ...@@ -719,7 +777,7 @@
this.channelsCount = 0; this.channelsCount = 0;
}, },
_setState: function(state) { //TODO test _setState: function(state) {
if (this.readyState != state) { if (this.readyState != state) {
Log4js.info("status changed", state); Log4js.info("status changed", state);
this.readyState = state; this.readyState = state;
...@@ -729,7 +787,7 @@ ...@@ -729,7 +787,7 @@
} }
}, },
connect: function() { //TODO test connect: function() {
Log4js.debug("entering connect"); Log4js.debug("entering connect");
if (!this.host) throw "PushStream host not specified"; if (!this.host) throw "PushStream host not specified";
if (isNaN(this.port)) throw "PushStream port not specified"; if (isNaN(this.port)) throw "PushStream port not specified";
...@@ -743,7 +801,7 @@ ...@@ -743,7 +801,7 @@
Log4js.debug("leaving connect"); Log4js.debug("leaving connect");
}, },
disconnect: function() { //TODO test disconnect: function() {
Log4js.debug("entering disconnect"); Log4js.debug("entering disconnect");
this._keepConnected = false; this._keepConnected = false;
this._disconnect(); this._disconnect();
...@@ -752,7 +810,7 @@ ...@@ -752,7 +810,7 @@
Log4js.debug("leaving disconnect"); Log4js.debug("leaving disconnect");
}, },
_connect: function() { //TODO test _connect: function() {
this._disconnect(); this._disconnect();
this._setState(PushStream.CONNECTING); this._setState(PushStream.CONNECTING);
this.wrapper = this.wrappers[this._lastUsedMode++ % this.wrappers.length]; this.wrapper = this.wrappers[this._lastUsedMode++ % this.wrappers.length];
......
...@@ -399,12 +399,12 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan ...@@ -399,12 +399,12 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan
ngx_http_send_header(subscriber->request); ngx_http_send_header(subscriber->request);
ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module)); ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module));
ngx_http_push_stream_send_response_message(subscriber->request, channel, msg); ngx_http_push_stream_send_response_message(subscriber->request, channel, msg, 1, 0);
ngx_http_push_stream_send_response_finalize(subscriber->request); ngx_http_push_stream_send_response_finalize(subscriber->request);
cur = prev; cur = prev;
} else { } else {
if (ngx_http_push_stream_send_response_message(subscriber->request, channel, msg) != NGX_OK) { if (ngx_http_push_stream_send_response_message(subscriber->request, channel, msg, 0, 0) != NGX_OK) {
ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&cur->queue); ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&cur->queue);
ngx_http_push_stream_send_response_finalize(subscriber->request); ngx_http_push_stream_send_response_finalize(subscriber->request);
cur = prev; cur = prev;
......
...@@ -262,6 +262,11 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -262,6 +262,11 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
return NGX_HTTP_INTERNAL_SERVER_ERROR; return NGX_HTTP_INTERNAL_SERVER_ERROR;
} }
if (ctx->callback != NULL) {
ngx_http_push_stream_send_response_text(r, ctx->callback->data, ctx->callback->len, 0);
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_CALLBACK_INIT_CHUNK.data, NGX_HTTP_PUSH_STREAM_CALLBACK_INIT_CHUNK.len, 0);
}
cur = channels_ids; cur = channels_ids;
while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) { while ((cur = (ngx_http_push_stream_requested_channel_t *) ngx_queue_next(&cur->queue)) != channels_ids) {
channel = ngx_http_push_stream_find_channel_locked(cur->id, r->connection->log); channel = ngx_http_push_stream_find_channel_locked(cur->id, r->connection->log);
...@@ -273,6 +278,10 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_ ...@@ -273,6 +278,10 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
ngx_http_push_stream_send_old_messages(r, channel, cur->backtrack_messages, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id); ngx_http_push_stream_send_old_messages(r, channel, cur->backtrack_messages, if_modified_since, tag, greater_message_time, greater_message_tag, last_event_id);
} }
if (ctx->callback != NULL) {
ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK.data, NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK.len, 0);
}
if (cf->footer_template.len > 0) { if (cf->footer_template.len > 0) {
ngx_http_push_stream_send_response_text(r, cf->footer_template.data, cf->footer_template.len, 0); ngx_http_push_stream_send_response_text(r, cf->footer_template.data, cf->footer_template.len, 0);
} }
...@@ -584,7 +593,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre ...@@ -584,7 +593,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
// positioning at first message, and send the others // positioning at first message, and send the others
while ((qtd > 0) && (!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) { while ((qtd > 0) && (!message->deleted) && ((message = (ngx_http_push_stream_msg_t *) ngx_queue_next(&message->queue)) != message_sentinel)) {
if (start == 0) { if (start == 0) {
ngx_http_push_stream_send_response_message(r, channel, message); ngx_http_push_stream_send_response_message(r, channel, message, 0, 1);
qtd--; qtd--;
} else { } else {
start--; start--;
...@@ -606,7 +615,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre ...@@ -606,7 +615,7 @@ ngx_http_push_stream_send_old_messages(ngx_http_request_t *r, ngx_http_push_stre
} }
if (found && (((greater_message_time == 0) && (greater_message_tag == -1)) || (greater_message_time > message->time) || ((greater_message_time == message->time) && (greater_message_tag >= message->tag)))) { if (found && (((greater_message_time == 0) && (greater_message_tag == -1)) || (greater_message_time > message->time) || ((greater_message_time == message->time) && (greater_message_tag >= message->tag)))) {
ngx_http_push_stream_send_response_message(r, channel, message); ngx_http_push_stream_send_response_message(r, channel, message, 0, 1);
} }
} }
} }
......
...@@ -110,7 +110,7 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data ...@@ -110,7 +110,7 @@ ngx_http_push_stream_delete_unrecoverable_channels(ngx_http_push_stream_shm_data
ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module)); ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module));
} }
ngx_http_push_stream_send_response_message(subscriber->request, channel, channel->channel_deleted_message); ngx_http_push_stream_send_response_message(subscriber->request, channel, channel->channel_deleted_message, 1, 1);
break; break;
} }
...@@ -433,10 +433,11 @@ ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_htt ...@@ -433,10 +433,11 @@ ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_htt
} }
static ngx_int_t static ngx_int_t
ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg) ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_flag_t send_callback, ngx_flag_t send_separator)
{ {
ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *pslcf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module); ngx_http_push_stream_subscriber_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_push_stream_module);
ngx_flag_t use_jsonp = (ctx != NULL) && (ctx->callback != NULL);
ngx_int_t rc = NGX_OK; ngx_int_t rc = NGX_OK;
if (pslcf->eventsource_support) { if (pslcf->eventsource_support) {
...@@ -452,7 +453,7 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_ ...@@ -452,7 +453,7 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_
if (rc == NGX_OK) { if (rc == NGX_OK) {
ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, msg, r->pool); ngx_str_t *str = ngx_http_push_stream_get_formatted_message(r, channel, msg, r->pool);
if (str != NULL) { if (str != NULL) {
if ((rc == NGX_OK) && (ctx != NULL) && (ctx->callback != NULL)) { if ((rc == NGX_OK) && use_jsonp && send_callback) {
rc = ngx_http_push_stream_send_response_text(r, ctx->callback->data, ctx->callback->len, 0); rc = ngx_http_push_stream_send_response_text(r, ctx->callback->data, ctx->callback->len, 0);
if (rc == NGX_OK) { if (rc == NGX_OK) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_CALLBACK_INIT_CHUNK.data, NGX_HTTP_PUSH_STREAM_CALLBACK_INIT_CHUNK.len, 0); rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_CALLBACK_INIT_CHUNK.data, NGX_HTTP_PUSH_STREAM_CALLBACK_INIT_CHUNK.len, 0);
...@@ -463,8 +464,14 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_ ...@@ -463,8 +464,14 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_
rc = ngx_http_push_stream_send_response_text(r, str->data, str->len, 0); rc = ngx_http_push_stream_send_response_text(r, str->data, str->len, 0);
} }
if ((rc == NGX_OK) && (ctx != NULL) && (ctx->callback != NULL)) { if ((rc == NGX_OK) && use_jsonp) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK.data, NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK.len, 0); if (send_separator) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_CALLBACK_MID_CHUNK.data, NGX_HTTP_PUSH_STREAM_CALLBACK_MID_CHUNK.len, 0);
}
if (send_callback) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK.data, NGX_HTTP_PUSH_STREAM_CALLBACK_END_CHUNK.len, 0);
}
} }
if (rc == NGX_OK) { if (rc == NGX_OK) {
...@@ -946,7 +953,7 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev) ...@@ -946,7 +953,7 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
} else if (pslcf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE) { } else if (pslcf->location_type == NGX_HTTP_PUSH_STREAM_WEBSOCKET_MODE) {
rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE), 1); rc = ngx_http_push_stream_send_response_text(r, NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE, sizeof(NGX_HTTP_PUSH_STREAM_WEBSOCKET_PING_LAST_FRAME_BYTE), 1);
} else { } else {
rc = ngx_http_push_stream_send_response_message(r, NULL, ngx_http_push_stream_ping_msg); rc = ngx_http_push_stream_send_response_message(r, NULL, ngx_http_push_stream_ping_msg, 1, 1);
} }
if (rc != NGX_OK) { if (rc != NGX_OK) {
......
...@@ -224,11 +224,17 @@ module BaseTestCase ...@@ -224,11 +224,17 @@ module BaseTestCase
TCPSocket.open(nginx_host, nginx_port) TCPSocket.open(nginx_host, nginx_port)
end end
def read_response(socket) def read_response(socket, wait_for=nil)
response = socket.readpartial(1) response ||= socket.readpartial(1)
while (tmp = socket.read_nonblock(256)) while (tmp = socket.read_nonblock(256))
response += tmp response += tmp
end end
rescue Errno::EAGAIN => e
headers, body = (response || "").split("\r\n\r\n", 2)
if !wait_for.nil? && (body.nil? || body.empty? || !body.include?(wait_for))
IO.select([socket])
retry
end
ensure ensure
fail("Any response") if response.nil? fail("Any response") if response.nil?
headers, body = response.split("\r\n\r\n", 2) headers, body = response.split("\r\n\r\n", 2)
......
...@@ -6,7 +6,7 @@ class TestMeasureMemory < Test::Unit::TestCase ...@@ -6,7 +6,7 @@ class TestMeasureMemory < Test::Unit::TestCase
@@message_estimate_size = 174 @@message_estimate_size = 174
@@channel_estimate_size = 536 @@channel_estimate_size = 536
@@subscriber_estimate_size = 230 @@subscriber_estimate_size = 230
@@subscriber_estimate_system_size = 6500 @@subscriber_estimate_system_size = 6600
def global_configuration def global_configuration
@max_reserved_memory = "2m" @max_reserved_memory = "2m"
......
...@@ -34,50 +34,23 @@ class TestSendSignals < Test::Unit::TestCase ...@@ -34,50 +34,23 @@ class TestSendSignals < Test::Unit::TestCase
pub_1.callback { pub_1.callback {
assert_equal(200, pub_1.response_header.status, "Don't get channels statistics") assert_equal(200, pub_1.response_header.status, "Don't get channels statistics")
assert_not_equal(0, pub_1.response_header.content_length, "Don't received channels statistics") assert_not_equal(0, pub_1.response_header.content_length, "Don't received channels statistics")
begin resp_1 = JSON.parse(pub_1.response)
resp_1 = JSON.parse(pub_1.response) assert(resp_1.has_key?("channels"), "Didn't received the correct answer with channels info")
assert(resp_1.has_key?("channels"), "Didn't received the correct answer with channels info") assert_equal(1, resp_1["channels"].to_i, "Didn't create channel")
assert_equal(1, resp_1["channels"].to_i, "Didn't create channel") assert_equal(1, resp_1["by_worker"].count, "Didn't return infos by_worker")
assert_equal(1, resp_1["by_worker"].count, "Didn't return infos by_worker") pid = resp_1["by_worker"][0]['pid'].to_i
pid = resp_1["by_worker"][0]['pid'].to_i
# send reload signal # send reload signal
`#{ nginx_executable } -c #{ config_filename } -s reload > /dev/null 2>&1` `#{ nginx_executable } -c #{ config_filename } -s reload > /dev/null 2>&1`
# publish a message
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 30
pub_2.callback {
# add new subscriber
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b1').get :head => headers, :timeout => 30
sub_2.stream { |chunk|
response2 = response2 + chunk
if response2.strip == @header_template
# check statistics again
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 30
pub_3.callback {
resp_2 = JSON.parse(pub_3.response)
assert(resp_2.has_key?("channels"), "Didn't received the correct answer with channels info")
assert_equal(1, resp_2["channels"].to_i, "Didn't create channel")
assert_equal(1, resp_2["published_messages"].to_i, "Didn't create messages")
assert_equal(2, resp_2["subscribers"].to_i, "Didn't create subscribers")
assert_equal(2, resp_2["by_worker"].count, "Didn't return infos by_worker")
}
end
}
}
rescue JSON::ParserError
fail("Didn't receive a valid response")
EventMachine.stop
end
} }
end end
} }
conectted_after_reloaded = false
i = 0 i = 0
# check if first worker die # check if first worker die
EM.add_periodic_timer(1) do EM.add_periodic_timer(0.5) do
# check statistics again # check statistics again
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 30 pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 30
...@@ -85,7 +58,34 @@ class TestSendSignals < Test::Unit::TestCase ...@@ -85,7 +58,34 @@ class TestSendSignals < Test::Unit::TestCase
resp_3 = JSON.parse(pub_4.response) resp_3 = JSON.parse(pub_4.response)
assert(resp_3.has_key?("by_worker"), "Didn't received the correct answer with channels info") assert(resp_3.has_key?("by_worker"), "Didn't received the correct answer with channels info")
if resp_3["by_worker"].count == 1 if resp_3["by_worker"].count == 2 && !conectted_after_reloaded
conectted_after_reloaded = true
# publish a message
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body, :timeout => 30
pub_2.callback {
# add new subscriber
sub_2 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b1').get :head => headers, :timeout => 30
sub_2.stream { |chunk|
response2 = response2 + chunk
if response2.strip == @header_template
# check statistics again
pub_3 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers, :timeout => 30
pub_3.callback {
resp_2 = JSON.parse(pub_3.response)
assert(resp_2.has_key?("channels"), "Didn't received the correct answer with channels info")
assert_equal(1, resp_2["channels"].to_i, "Didn't create channel")
assert_equal(1, resp_2["published_messages"].to_i, "Didn't create messages")
assert_equal(2, resp_2["subscribers"].to_i, "Didn't create subscribers")
assert_equal(2, resp_2["by_worker"].count, "Didn't return infos by_worker")
}
end
}
}
end
if resp_3["by_worker"].count == 1 && conectted_after_reloaded
assert_equal(1, resp_3["channels"].to_i, "Didn't create channel") assert_equal(1, resp_3["channels"].to_i, "Didn't create channel")
assert_equal(1, resp_3["published_messages"].to_i, "Didn't create messages") assert_equal(1, resp_3["published_messages"].to_i, "Didn't create messages")
assert_equal(1, resp_3["subscribers"].to_i, "Didn't create subscribers") assert_equal(1, resp_3["subscribers"].to_i, "Didn't create subscribers")
...@@ -97,7 +97,7 @@ class TestSendSignals < Test::Unit::TestCase ...@@ -97,7 +97,7 @@ class TestSendSignals < Test::Unit::TestCase
end end
i = i + 1 i = i + 1
if i == 60 if i == 120
fail("Worker didn't die in 60 seconds") fail("Worker didn't die in 60 seconds")
EventMachine.stop EventMachine.stop
end end
......
...@@ -585,7 +585,7 @@ class TestSubscriberLongPolling < Test::Unit::TestCase ...@@ -585,7 +585,7 @@ class TestSubscriberLongPolling < Test::Unit::TestCase
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?callback=' + callback_function_name).get :head => headers, :timeout => 30 sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?callback=' + callback_function_name).get :head => headers, :timeout => 30
sub_1.callback { sub_1.callback {
assert_equal("#{callback_function_name}\r\n(\r\n#{body}\r\n);\r\n", sub_1.response, "Wrong message") assert_equal("#{callback_function_name}\r\n([#{body}\r\n]);\r\n", sub_1.response, "Wrong message")
EventMachine.stop EventMachine.stop
} }
...@@ -595,4 +595,26 @@ class TestSubscriberLongPolling < Test::Unit::TestCase ...@@ -595,4 +595,26 @@ class TestSubscriberLongPolling < Test::Unit::TestCase
} }
end end
def test_return_old_messages_using_function_name_specified_in_callback_parameter_grouping_in_one_answer
headers = {'accept' => 'application/json'}
channel = 'ch_test_return_old_messages_using_function_name_specified_in_callback_parameter_grouping_in_one_answer'
body = 'body'
response = ""
callback_function_name = "callback_function"
EventMachine.run {
publish_message_inline(channel, {'accept' => 'text/html'}, body)
publish_message_inline(channel, {'accept' => 'text/html'}, body + "1")
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '.b2' + '?callback=' + callback_function_name).get :head => headers, :timeout => 30
sub_1.callback {
assert_equal("#{callback_function_name}\r\n([#{body}\r\n,#{body + "1"}\r\n,]);\r\n", sub_1.response, "Wrong message")
EventMachine.stop
}
add_test_timeout
}
end
end end
...@@ -593,7 +593,7 @@ class TestSubscriberPolling < Test::Unit::TestCase ...@@ -593,7 +593,7 @@ class TestSubscriberPolling < Test::Unit::TestCase
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?callback=' + callback_function_name).get :head => headers, :timeout => 30 sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s + '?callback=' + callback_function_name).get :head => headers, :timeout => 30
sub_1.callback { sub_1.callback {
assert_equal("#{callback_function_name}\r\n(\r\n#{body}\r\n);\r\n", sub_1.response, "Wrong message") assert_equal("#{callback_function_name}\r\n([#{body}\r\n,]);\r\n", sub_1.response, "Wrong message")
EventMachine.stop EventMachine.stop
} }
......
...@@ -238,7 +238,7 @@ class TestWebSocket < Test::Unit::TestCase ...@@ -238,7 +238,7 @@ class TestWebSocket < Test::Unit::TestCase
socket = TCPSocket.open(nginx_host, nginx_port) socket = TCPSocket.open(nginx_host, nginx_port)
socket.print("#{request}\r\n") socket.print("#{request}\r\n")
headers, body = read_response(socket) headers, body = read_response(socket, "aaa")
assert(body.start_with?("\201~\377\377aaa"), "Wrong response") assert(body.start_with?("\201~\377\377aaa"), "Wrong response")
end end
...@@ -258,7 +258,7 @@ class TestWebSocket < Test::Unit::TestCase ...@@ -258,7 +258,7 @@ class TestWebSocket < Test::Unit::TestCase
socket = TCPSocket.open(nginx_host, nginx_port) socket = TCPSocket.open(nginx_host, nginx_port)
socket.print("#{request}\r\n") socket.print("#{request}\r\n")
headers, body = read_response(socket) headers, body = read_response(socket, "aaa")
assert(body.start_with?("\201\177\000\000\000\000\000\001\000\000aaa"), "Wrong response") assert(body.start_with?("\201\177\000\000\000\000\000\001\000\000aaa"), "Wrong response")
end end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment