Commit 883aea05 authored by Wandenberg's avatar Wandenberg

add support to get old messages using time and tag to all wrappers on pushstream.js

parent f98357ed
......@@ -268,7 +268,6 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_TAG = ngx_string("X-Nginx-Pu
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_COMMIT = ngx_string("X-Nginx-PushStream-Commit");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_ETAG = ngx_string("Etag");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_IF_NONE_MATCH = ngx_string("If-None-Match");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_VARY = ngx_string("Vary");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_UPGRADE = ngx_string("Upgrade");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_CONNECTION = ngx_string("Connection");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_SEC_WEBSOCKET_KEY = ngx_string("Sec-WebSocket-Key");
......@@ -333,7 +332,7 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_PUT_DELETE_METHODS =
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_PUT_METHODS = ngx_string("GET, POST, PUT");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET = ngx_string("GET");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOWED_HEADERS = ngx_string("If-Modified-Since,If-None-Match");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOWED_HEADERS = ngx_string("If-Modified-Since,If-None-Match,Etag,Event-Id,Event-Type,Last-Event-Id");
#define NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(val, fail, r, errormessage) \
if (val == fail) { \
......
......@@ -92,6 +92,22 @@
throw "Invalid JSON: " + data;
};
var getControlParams = function(pushstream) {
var data = {};
data[pushstream.tagArgument] = "";
data[pushstream.timeArgument] = "";
data[pushstream.eventIdArgument] = "";
if (pushstream.messagesControlByArgument) {
data[pushstream.tagArgument] = Number(pushstream._etag);
if (pushstream._lastModified) {
data[pushstream.timeArgument] = pushstream._lastModified;
} else if (pushstream._lastEventId) {
data[pushstream.eventIdArgument] = pushstream._lastEventId;
}
}
return data;
};
var getTime = function() {
return (new Date()).getTime();
};
......@@ -127,6 +143,10 @@
return Object.prototype.toString.call(obj) === '[object String]';
};
var isDate = function(obj) {
return Object.prototype.toString.call(obj) === '[object Date]';
};
var Log4js = {
logger: null,
debug : function() { if (PushStream.LOG_LEVEL === 'debug') { Log4js._log.apply(Log4js._log, arguments); }},
......@@ -344,17 +364,17 @@
return (options.backtrack) ? ".b" + Number(options.backtrack) : "";
};
var getChannelsPath = function(channels) {
var getChannelsPath = function(channels, withBacktrack) {
var path = '';
for (var channelName in channels) {
if (!channels.hasOwnProperty || channels.hasOwnProperty(channelName)) {
path += "/" + channelName + getBacktrack(channels[channelName]);
path += "/" + channelName + (withBacktrack ? getBacktrack(channels[channelName]) : "");
}
}
return path;
};
var getSubscriberUrl = function(pushstream, prefix, extraParams) {
var getSubscriberUrl = function(pushstream, prefix, extraParams, withBacktrack) {
var websocket = pushstream.wrapper.type === WebSocketWrapper.TYPE;
var useSSL = pushstream.useSSL;
var url = (websocket) ? ((useSSL) ? "wss://" : "ws://") : ((useSSL) ? "https://" : "http://");
......@@ -362,7 +382,7 @@
url += ((!useSSL && pushstream.port === 80) || (useSSL && pushstream.port === 443)) ? "" : (":" + pushstream.port);
url += prefix;
var channels = getChannelsPath(pushstream.channels);
var channels = getChannelsPath(pushstream.channels, withBacktrack);
if (pushstream.channelsByArgument) {
var channelParam = {};
channelParam[pushstream.channelsArgument] = channels.substring(1);
......@@ -420,6 +440,9 @@
var onmessageCallback = function(event) {
Log4js.info("[" + this.type + "] message received", arguments);
var message = Utils.parseMessage(event.data, this.pushstream);
if (message.tag) { this.pushstream._etag = message.tag; }
if (message.time) { this.pushstream._lastModified = message.time; }
if (message.eventid) { this.pushstream._lastEventId = message.eventid; }
this.pushstream._onmessage(message.text, message.id, message.channel, message.eventid, true);
};
......@@ -455,8 +478,8 @@
WebSocketWrapper.prototype = {
connect: function() {
this._closeCurrentConnection();
var params = extend({}, this.pushstream.extraParams(), currentTimestampParam());
var url = getSubscriberUrl(this.pushstream, this.pushstream.urlPrefixWebsocket, params);
var params = extend({}, this.pushstream.extraParams(), currentTimestampParam(), getControlParams(this.pushstream));
var url = getSubscriberUrl(this.pushstream, this.pushstream.urlPrefixWebsocket, params, !this.pushstream._useControlArguments());
this.connection = (window.WebSocket) ? new window.WebSocket(url) : new window.MozWebSocket(url);
this.connection.onerror = linker(onerrorCallback, this);
this.connection.onclose = linker(onerrorCallback, this);
......@@ -498,8 +521,8 @@
EventSourceWrapper.prototype = {
connect: function() {
this._closeCurrentConnection();
var params = extend({}, this.pushstream.extraParams(), currentTimestampParam());
var url = getSubscriberUrl(this.pushstream, this.pushstream.urlPrefixEventsource, params);
var params = extend({}, this.pushstream.extraParams(), currentTimestampParam(), getControlParams(this.pushstream));
var url = getSubscriberUrl(this.pushstream, this.pushstream.urlPrefixEventsource, params, !this.pushstream._useControlArguments());
this.connection = new window.EventSource(url);
this.connection.onerror = linker(onerrorCallback, this);
this.connection.onopen = linker(onopenCallback, this);
......@@ -545,8 +568,8 @@
} catch(e) {
Log4js.error("[Stream] (warning) problem setting document.domain = " + domain + " (OBS: IE8 does not support set IP numbers as domain)");
}
var params = extend({}, this.pushstream.extraParams(), currentTimestampParam(), {"streamid": this.pushstream.id});
this.url = getSubscriberUrl(this.pushstream, this.pushstream.urlPrefixStream, params);
var params = extend({}, this.pushstream.extraParams(), currentTimestampParam(), {"streamid": this.pushstream.id}, getControlParams(this.pushstream));
this.url = getSubscriberUrl(this.pushstream, this.pushstream.urlPrefixStream, params, !this.pushstream._useControlArguments());
Log4js.debug("[Stream] connecting to:", this.url);
this.loadFrame(this.url);
},
......@@ -621,9 +644,14 @@
Log4js.info("[Stream] frame registered");
},
process: function(id, channel, text, eventid) {
process: function(id, channel, text, eventid, time, tag) {
this.pingtimer = clearTimer(this.pingtimer);
Log4js.info("[Stream] message received", arguments);
if (id !== -1) {
if (tag) { this.pushstream._etag = tag; }
if (time) { this.pushstream._lastModified = time; }
if (eventid) { this.pushstream._lastEventId = eventid; }
}
this.pushstream._onmessage(unescapeText(text), id, channel, eventid || "", true);
this.setPingTimer();
},
......@@ -644,8 +672,6 @@
this.type = LongPollingWrapper.TYPE;
this.pushstream = pushstream;
this.connection = null;
this.lastModified = null;
this.etag = 0;
this.opentimer = null;
this.messagesQueue = [];
this._linkedInternalListen = linker(this._internalListen, this);
......@@ -656,7 +682,6 @@
success: linker(this.onmessage, this),
error: linker(this.onerror, this),
load: linker(this.onload, this),
beforeOpen: linker(this.beforeOpen, this),
beforeSend: linker(this.beforeSend, this),
afterReceive: linker(this.afterReceive, this)
};
......@@ -668,7 +693,9 @@
connect: function() {
this.messagesQueue = [];
this._closeCurrentConnection();
this.xhrSettings.url = getSubscriberUrl(this.pushstream, this.pushstream.urlPrefixLongpolling);
this.urlWithBacktrack = getSubscriberUrl(this.pushstream, this.pushstream.urlPrefixLongpolling, {}, true);
this.urlWithoutBacktrack = getSubscriberUrl(this.pushstream, this.pushstream.urlPrefixLongpolling, {}, false);
this.xhrSettings.url = this.urlWithBacktrack;
var domain = Utils.extract_xss_domain(this.pushstream.host);
var currentDomain = Utils.extract_xss_domain(window.location.hostname);
var port = this.pushstream.port;
......@@ -690,7 +717,8 @@
_internalListen: function() {
if (this.pushstream._keepConnected) {
this.xhrSettings.data = extend({}, this.pushstream.extraParams(), this.xhrSettings.data);
this.xhrSettings.url = this.pushstream._useControlArguments() ? this.urlWithoutBacktrack : this.urlWithBacktrack;
this.xhrSettings.data = extend({}, this.pushstream.extraParams(), this.xhrSettings.data, getControlParams(this.pushstream));
if (this.useJSONP) {
this.connection = Ajax.jsonp(this.xhrSettings);
} else if (!this.connection) {
......@@ -714,35 +742,21 @@
try { Ajax.clear(this.connection); } catch (e) { /* ignore error on closing */ }
}
this.connection = null;
this.lastModified = null;
this.xhrSettings.url = null;
}
},
beforeOpen: function(xhr) {
if (this.lastModified === null) {
var date = new Date();
if (this.pushstream.secondsAgo) { date.setTime(date.getTime() - (this.pushstream.secondsAgo * 1000)); }
this.lastModified = Utils.dateToUTCString(date);
}
if (this.pushstream.messagesControlByArgument) {
this.xhrSettings.data[this.pushstream.tagArgument] = this.etag;
this.xhrSettings.data[this.pushstream.timeArgument] = this.lastModified;
}
},
beforeSend: function(xhr) {
if (!this.pushstream.messagesControlByArgument) {
xhr.setRequestHeader("If-None-Match", this.etag);
xhr.setRequestHeader("If-Modified-Since", this.lastModified);
xhr.setRequestHeader("If-None-Match", this.pushstream._etag);
xhr.setRequestHeader("If-Modified-Since", this.pushstream._lastModified);
}
},
afterReceive: function(xhr) {
if (!this.pushstream.messagesControlByArgument) {
this.etag = xhr.getResponseHeader('Etag');
this.lastModified = xhr.getResponseHeader('Last-Modified');
this.pushstream._etag = xhr.getResponseHeader('Etag');
this.pushstream._lastModified = xhr.getResponseHeader('Last-Modified');
}
this.connection = null;
},
......@@ -764,16 +778,17 @@
},
onmessage: function(responseText) {
if (this._internalListenTimeout) { clearTimer(this._internalListenTimeout); }
Log4js.info("[LongPolling] message received", responseText);
var lastMessage = null;
var messages = isArray(responseText) ? responseText : responseText.split("\r\n");
var messages = isArray(responseText) ? responseText : responseText.replace(/\}\{/g, "}\r\n{").split("\r\n");
for (var i = 0; i < messages.length; i++) {
if (messages[i]) {
lastMessage = Utils.parseMessage(messages[i], this.pushstream);
this.messagesQueue.push(lastMessage);
if (this.pushstream.messagesControlByArgument && lastMessage.time) {
this.etag = lastMessage.tag;
this.lastModified = lastMessage.time;
this.pushstream._etag = lastMessage.tag;
this.pushstream._lastModified = lastMessage.time;
}
}
}
......@@ -805,13 +820,18 @@
this.reconnectOnTimeoutInterval = settings.reconnectOnTimeoutInterval || 3000;
this.reconnectOnChannelUnavailableInterval = settings.reconnectOnChannelUnavailableInterval || 60000;
this.secondsAgo = Number(settings.secondsAgo);
this.lastEventId = settings.lastEventId || null;
this.messagesPublishedAfter = settings.messagesPublishedAfter;
this.messagesControlByArgument = settings.messagesControlByArgument || false;
this.tagArgument = settings.tagArgument || 'tag';
this.timeArgument = settings.timeArgument || 'time';
this.eventIdArgument = settings.eventIdArgument || 'eventid';
this.useJSONP = settings.useJSONP || false;
this.reconnecttimer = null;
this._reconnecttimer = null;
this._etag = 0;
this._lastModified = null;
this._lastEventId = null;
this.urlPrefixPublisher = settings.urlPrefixPublisher || '/pub';
this.urlPrefixStream = settings.urlPrefixStream || '/sub';
......@@ -930,7 +950,31 @@
Log4js.debug("leaving disconnect");
},
_useControlArguments :function() {
return this.messagesControlByArgument && ((this._lastModified !== null) || (this._lastEventId !== null));
},
_connect: function() {
if (this._lastEventId === null) {
this._lastEventId = this.lastEventId;
}
if (this._lastModified === null) {
var date = this.messagesPublishedAfter;
if (!isDate(date)) {
var messagesPublishedAfter = Number(this.messagesPublishedAfter);
if (messagesPublishedAfter > 0) {
date = new Date();
date.setTime(date.getTime() - (messagesPublishedAfter * 1000));
} else if (messagesPublishedAfter < 0) {
date = new Date(0);
}
}
if (isDate(date)) {
this._lastModified = Utils.dateToUTCString(date);
}
}
this._disconnect();
this._setState(PushStream.CONNECTING);
this.wrapper = this.wrappers[this._lastUsedMode++ % this.wrappers.length];
......@@ -946,14 +990,14 @@
},
_disconnect: function() {
this.reconnecttimer = clearTimer(this.reconnecttimer);
this._reconnecttimer = clearTimer(this._reconnecttimer);
if (this.wrapper) {
this.wrapper.disconnect();
}
},
_onopen: function() {
this.reconnecttimer = clearTimer(this.reconnecttimer);
this._reconnecttimer = clearTimer(this._reconnecttimer);
this._setState(PushStream.OPEN);
if (this._lastUsedMode > 0) {
this._lastUsedMode--; //use same mode on next connection
......@@ -961,7 +1005,7 @@
},
_onclose: function() {
this.reconnecttimer = clearTimer(this.reconnecttimer);
this._reconnecttimer = clearTimer(this._reconnecttimer);
this._setState(PushStream.CLOSED);
this._reconnect(this.reconnectOnTimeoutInterval);
},
......@@ -982,9 +1026,9 @@
},
_reconnect: function(timeout) {
if (this._keepConnected && !this.reconnecttimer && (this.readyState !== PushStream.CONNECTING)) {
if (this._keepConnected && !this._reconnecttimer && (this.readyState !== PushStream.CONNECTING)) {
Log4js.info("trying to reconnect in", timeout);
this.reconnecttimer = window.setTimeout(linker(this._connect, this), timeout);
this._reconnecttimer = window.setTimeout(linker(this._connect, this), timeout);
}
},
......
......@@ -15,47 +15,30 @@ events {
}
http {
default_type application/octet-stream;
access_log logs/nginx-http_access.log;
error_log logs/nginx-http_error.log debug;
tcp_nopush off;
tcp_nodelay on;
keepalive_timeout 10;
send_timeout 10;
client_body_timeout 10;
client_header_timeout 10;
sendfile on;
client_header_buffer_size 1k;
large_client_header_buffers 2 4k;
client_max_body_size 1k;
client_body_buffer_size 1k;
ignore_invalid_headers on;
push_stream_shared_memory_size 100m;
push_stream_max_channel_id_length 200;
push_stream_shared_memory_size 100m;
push_stream_max_channel_id_length 200;
# max messages to store in memory
push_stream_max_messages_stored_per_channel 20;
push_stream_max_messages_stored_per_channel 20;
# message ttl
push_stream_message_ttl 5m;
push_stream_message_ttl 5m;
# ping frequency
push_stream_ping_message_interval 10s;
push_stream_ping_message_interval 1s;
# connection ttl to enable recycle
push_stream_subscriber_connection_ttl 15m;
push_stream_subscriber_connection_ttl 15m;
# connection ttl for long polling
push_stream_longpolling_connection_ttl 30s;
push_stream_timeout_with_body off;
# wildcard
push_stream_wildcard_channel_prefix "broad_";
push_stream_wildcard_channel_max_qtd 3;
#push_stream_message_template "{\"id\":~id~,\"channel\":\"~channel~\",\"text\":\"~text~\",\"eventid\":\"~event-id~\"}";
push_stream_message_template "{\"id\":~id~,\"channel\":\"~channel~\",\"text\":\"~text~\"}";
push_stream_message_template "{\"id\":~id~,\"channel\":\"~channel~\",\"text\":\"~text~\", \"tag\":\"~tag~\", \"time\":\"~time~\", \"eventid\":\"~event-id~\"}";
# subscriber may create channels on demand or only authorized
# (publisher) may do it?
# subscriber may create channels on demand or only authorized (publisher) may do it?
push_stream_authorized_channels_only off;
push_stream_wildcard_channel_max_qtd 3;
push_stream_allowed_origins "*";
......@@ -82,7 +65,7 @@ http {
push_stream_channels_path $arg_id;
# store messages in memory
push_stream_store_messages off;
push_stream_store_messages on;
# Message size limit
# client_max_body_size MUST be equal to client_body_buffer_size or
......@@ -105,12 +88,17 @@ http {
push_stream_header_template "<html><head><meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-store\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-cache\">\r\n<meta http-equiv=\"Pragma\" content=\"no-cache\">\r\n<meta http-equiv=\"Expires\" content=\"Thu, 1 Jan 1970 00:00:00 GMT\">\r\n<script type=\"text/javascript\">\r\nwindow.onError = null;\r\ntry{ document.domain = (window.location.hostname.match(/^(\d{1,3}\.){3}\d{1,3}$/)) ? window.location.hostname : window.location.hostname.split('.').slice(-1 * Math.max(window.location.hostname.split('.').length - 1, (window.location.hostname.match(/(\w{4,}\.\w{2}|\.\w{3,})$/) ? 2 : 3))).join('.');}catch(e){}\r\nparent.PushStream.register(this);\r\n</script>\r\n</head>\r\n<body>";
# message template
#push_stream_message_template "<script>p(~id~,'~channel~','~text~','~event-id~');</script>";
push_stream_message_template "<script>p(~id~,'~channel~','~text~');</script>";
push_stream_message_template "<script>p(~id~,'~channel~','~text~','~event-id~', '~time~', '~tag~');</script>";
# footer to be sent when finishing subscriber connection
push_stream_footer_template "</body></html>";
# content-type
default_type "text/html; charset=utf-8";
if ($arg_qs = "on") {
push_stream_last_received_message_time "$arg_time";
push_stream_last_received_message_tag "$arg_tag";
push_stream_last_event_id "$arg_eventid";
}
}
location ~ /ev/(.*) {
......@@ -122,6 +110,12 @@ http {
if ($arg_tests = "on") {
push_stream_channels_path "test_$1";
}
if ($arg_qs = "on") {
push_stream_last_received_message_time "$arg_time";
push_stream_last_received_message_tag "$arg_tag";
push_stream_last_event_id "$arg_eventid";
}
}
location ~ /lp/(.*) {
......@@ -133,16 +127,21 @@ http {
if ($arg_tests = "on") {
push_stream_channels_path "test_$1";
}
if ($arg_qs = "on") {
push_stream_last_received_message_time "$arg_time";
push_stream_last_received_message_tag "$arg_tag";
push_stream_last_event_id "$arg_eventid";
}
}
location ~ /jsonp/(.*) {
# activate long-polling mode for this location
push_stream_subscriber long-polling;
push_stream_message_template "{\"id\":~id~,\"channel\":\"~channel~\",\"text\":\"~text~\", \"tag\":\"~tag~\", \"time\":\"~time~\"}";
push_stream_last_received_message_time "$arg_time";
push_stream_last_received_message_tag "$arg_tag";
push_stream_last_event_id "$arg_eventid";
# positional channel path
push_stream_channels_path $1;
......@@ -165,6 +164,12 @@ http {
push_stream_store_messages on;
push_stream_websocket_allow_publish on;
if ($arg_qs = "on") {
push_stream_last_received_message_time "$arg_time";
push_stream_last_received_message_tag "$arg_tag";
push_stream_last_event_id "$arg_eventid";
}
}
location / {
......
......@@ -54,8 +54,8 @@ describe("PushStream", function() {
expect(pushstream.readyState).toBe(PushStream.CLOSED);
});
it("should set seconds ago as NaN", function() {
expect(isNaN(pushstream.secondsAgo)).toBeTruthy();
it("should set messagesPublishedAfter as undefined", function() {
expect(pushstream.messagesPublishedAfter).toBe(undefined);
});
describe("for operation timeouts", function() {
......@@ -133,6 +133,10 @@ describe("PushStream", function() {
expect(pushstream.timeArgument).toBe('time');
});
it("should has a argument for 'eventid'", function() {
expect(pushstream.eventIdArgument).toBe('eventid');
});
it("should has a argument for 'channels'", function() {
expect(pushstream.channelsArgument).toBe('channels');
});
......@@ -270,11 +274,13 @@ describe("PushStream", function() {
for (var i = 0; i < PushStreamManager.length; i++) {
PushStreamManager[i].disconnect();
}
channelName = "ch_" + new Date().getTime();
channelName = "ch_" + new Date().getTime() + "_" + Math.floor((Math.random() * 1000) + 1);
});
afterEach(function() {
if (pushstream) { pushstream.disconnect(); }
for (var i = 0; i < PushStreamManager.length; i++) {
PushStreamManager[i].disconnect();
}
});
describe("when connecting", function() {
......@@ -383,7 +389,7 @@ describe("PushStream", function() {
expect(data.subscribers).toBe("0");
}
});
}, 3000);
}, 5000);
});
});
});
......@@ -396,7 +402,7 @@ describe("PushStream", function() {
modes: mode,
port: port,
useJSONP: jsonp,
urlPrefixLongpolling: urlPrefixLongpolling,
urlPrefixLongpolling: '/jsonp',
onstatuschange: function(st) {
status.push(st);
},
......@@ -427,14 +433,12 @@ describe("PushStream", function() {
waitsFor(function() {
return messages.length >= 2;
}, "The callback was not called", 2000);
}, "The callback was not called", 2500);
runs(function() {
expect(status).toEqual([PushStream.CONNECTING, PushStream.OPEN, PushStream.CLOSED, PushStream.CONNECTING, PushStream.OPEN]);
expect(messages).toEqual([
["a test message", 1, channelName, "", true],
["message on other channel", 1, "other_" + channelName, "", true]
]);
expect(messages[0]).toEqual(["a test message", 1, channelName, "", true]);
expect(messages[1]).toEqual(["message on other channel", 1, "other_" + channelName, "", true]);
});
});
});
......@@ -542,6 +546,464 @@ describe("PushStream", function() {
});
});
});
describe("when getting old messages", function() {
it("should be possible use time", function() {
var messages = [];
var receivedMessage = receivedMessage2 = false;
var finished = false;
pushstream = new PushStream({
messagesControlByArgument: true,
messagesPublishedAfter: 1,
modes: mode,
port: port,
useJSONP: jsonp,
urlPrefixLongpolling: urlPrefixLongpolling,
extraParams: function() {
return {"qs":"on"};
},
onmessage: function(text, id, channel, eventid, isLastMessageFromBatch) {
messages.push([text, id, channel, eventid, isLastMessageFromBatch]);
if (messages.length == 1) {
receivedMessage = true;
pushstream.disconnect();
}
if (messages.length >= 2) {
receivedMessage2 = true;
pushstream.disconnect();
}
}
});
pushstream.addChannel(channelName);
runs(function() {
$.post("http://" + nginxServer + "/pub?id=" + channelName, "a test message", function() {
pushstream.connect();
});
});
waitsFor(function() {
return receivedMessage;
}, "The callback was not called", 2000);
runs(function() {
setTimeout(function() {
$.ajax({
url: "http://" + nginxServer + "/pub?id=" + channelName,
success: function(data) {
expect(data.subscribers).toBe("0");
$.post("http://" + nginxServer + "/pub?id=" + channelName, "another test message", function() {
pushstream.connect();
});
}
});
}, 1500);
});
waitsFor(function() {
return receivedMessage2;
}, "The callback was not called", 3000);
runs(function() {
setTimeout(function() {
expect(messages[0]).toEqual(["a test message", 1, channelName, "", true]);
expect(messages[1]).toEqual(["another test message", 2, channelName, "", true]);
finished = true;
}, 500);
});
waitsFor(function() {
return finished;
}, "The callback was not called", 5000);
});
it("should be possible use a Date object", function() {
var messages = [];
var receivedMessage = receivedMessage2 = false;
var finished = false;
var now = new Date();
pushstream = new PushStream({
messagesControlByArgument: true,
messagesPublishedAfter: new Date(now.getTime() - 1000),
modes: mode,
port: port,
useJSONP: jsonp,
urlPrefixLongpolling: urlPrefixLongpolling,
extraParams: function() {
return {"qs":"on"};
},
onmessage: function(text, id, channel, eventid, isLastMessageFromBatch) {
messages.push([text, id, channel, eventid, isLastMessageFromBatch]);
if (messages.length == 1) {
receivedMessage = true;
pushstream.disconnect();
}
if (messages.length >= 2) {
receivedMessage2 = true;
pushstream.disconnect();
}
}
});
pushstream.addChannel(channelName);
runs(function() {
$.post("http://" + nginxServer + "/pub?id=" + channelName, "a test message", function() {
pushstream.connect();
});
});
waitsFor(function() {
return receivedMessage;
}, "The callback was not called", 2000);
runs(function() {
setTimeout(function() {
$.ajax({
url: "http://" + nginxServer + "/pub?id=" + channelName,
success: function(data) {
expect(data.subscribers).toBe("0");
$.post("http://" + nginxServer + "/pub?id=" + channelName, "another test message", function() {
pushstream.connect();
});
}
});
}, 1500);
});
waitsFor(function() {
return receivedMessage2;
}, "The callback was not called", 3000);
runs(function() {
setTimeout(function() {
expect(messages[0]).toEqual(["a test message", 1, channelName, "", true]);
expect(messages[1]).toEqual(["another test message", 2, channelName, "", true]);
finished = true;
}, 500);
});
waitsFor(function() {
return finished;
}, "The callback was not called", 5000);
});
it("should be possible use a negative value to get messages since epoch time", function() {
var messages = [];
var receivedMessage = receivedMessage2 = false;
var finished = false;
pushstream = new PushStream({
messagesControlByArgument: true,
messagesPublishedAfter: -10,
modes: mode,
port: port,
useJSONP: jsonp,
urlPrefixLongpolling: urlPrefixLongpolling,
extraParams: function() {
return {"qs":"on"};
},
onmessage: function(text, id, channel, eventid, isLastMessageFromBatch) {
messages.push([text, id, channel, eventid, isLastMessageFromBatch]);
if (messages.length == 2) {
receivedMessage = true;
// set a delay to wait for a ping message on streaming
setTimeout(function() {
pushstream.disconnect();
}, (pushstream.wrapper.type === "LongPolling") ? 5 : 1500);
}
if (messages.length >= 4) {
receivedMessage2 = true;
pushstream.disconnect();
}
}
});
pushstream.addChannel(channelName);
runs(function() {
$.post("http://" + nginxServer + "/pub?id=" + channelName, "a test message 1", function() {
$.post("http://" + nginxServer + "/pub?id=" + channelName, "a test message 2", function() {
pushstream.connect();
});
});
});
waitsFor(function() {
return receivedMessage;
}, "The callback was not called", 2000);
runs(function() {
setTimeout(function() {
$.ajax({
url: "http://" + nginxServer + "/pub?id=" + channelName,
success: function(data) {
expect(data.subscribers).toBe("0");
$.post("http://" + nginxServer + "/pub?id=" + channelName, "another test message 1", function() {
$.post("http://" + nginxServer + "/pub?id=" + channelName, "another test message 2", function() {
pushstream.connect();
});
});
}
});
}, 1500);
});
waitsFor(function() {
return receivedMessage2;
}, "The callback was not called", 3000);
runs(function() {
setTimeout(function() {
expect(messages[0]).toEqual(["a test message 1", 1, channelName, "", (pushstream.wrapper.type === "LongPolling") ? false : true]);
expect(messages[1]).toEqual(["a test message 2", 2, channelName, "", true]);
expect(messages[2]).toEqual(["another test message 1", 3, channelName, "", (pushstream.wrapper.type === "LongPolling") ? false : true]);
expect(messages[3]).toEqual(["another test message 2", 4, channelName, "", true]);
finished = true;
}, 500);
});
waitsFor(function() {
return finished;
}, "The callback was not called", 5000);
});
it("should be possible use backtrack", function() {
var messages = [];
var receivedMessage = receivedMessage2 = false;
var finished = false;
pushstream = new PushStream({
modes: mode,
port: port,
useJSONP: jsonp,
urlPrefixLongpolling: urlPrefixLongpolling,
extraParams: function() {
return {"qs":"on"};
},
onmessage: function(text, id, channel, eventid, isLastMessageFromBatch) {
messages.push([text, id, channel, eventid, isLastMessageFromBatch]);
if (messages.length == 1) {
receivedMessage = true;
pushstream.disconnect();
}
if (messages.length >= 2) {
receivedMessage2 = true;
pushstream.disconnect();
}
}
});
pushstream.addChannel(channelName, {backtrack: 1});
runs(function() {
$.post("http://" + nginxServer + "/pub?id=" + channelName, "a test message 1", function() {
$.post("http://" + nginxServer + "/pub?id=" + channelName, "a test message 2", function() {
pushstream.connect();
});
});
});
waitsFor(function() {
return receivedMessage;
}, "The callback was not called", 2000);
runs(function() {
setTimeout(function() {
$.ajax({
url: "http://" + nginxServer + "/pub?id=" + channelName,
success: function(data) {
expect(data.subscribers).toBe("0");
$.post("http://" + nginxServer + "/pub?id=" + channelName, "another test message 1", function() {
$.post("http://" + nginxServer + "/pub?id=" + channelName, "another test message 2", function() {
pushstream.connect();
});
});
}
});
}, 1500);
});
waitsFor(function() {
return receivedMessage2;
}, "The callback was not called", 3000);
runs(function() {
setTimeout(function() {
expect(messages[0]).toEqual(["a test message 2", 2, channelName, "", true]);
if (jsonp) {
expect(messages[1]).toEqual(["another test message 1", 3, channelName, "", false]);
expect(messages[2]).toEqual(["another test message 2", 4, channelName, "", true]);
} else {
expect(messages[1]).toEqual(["another test message 2", 4, channelName, "", true]);
}
finished = true;
}, 500);
});
waitsFor(function() {
return finished;
}, "The callback was not called", 5000);
});
it("should be possible use event_id", function() {
var messages = [];
var receivedMessage = receivedMessage2 = false;
var finished = false;
pushstream = new PushStream({
messagesControlByArgument: true,
lastEventId: "some_event_id",
modes: mode,
port: port,
useJSONP: jsonp,
urlPrefixLongpolling: urlPrefixLongpolling,
extraParams: function() {
return {"qs":"on"};
},
onmessage: function(text, id, channel, eventid, isLastMessageFromBatch) {
messages.push([text, id, channel, eventid, isLastMessageFromBatch]);
if (messages.length == 1) {
receivedMessage = true;
pushstream.disconnect();
}
if (messages.length >= 3) {
receivedMessage2 = true;
pushstream.disconnect();
}
}
});
pushstream.addChannel(channelName);
runs(function() {
$.post("http://" + nginxServer + "/pub?id=" + channelName, "a test message 1", function() {
$.ajax({ url: "http://" + nginxServer + "/pub?id=" + channelName,
type: "POST", data: "a test message 2",
beforeSend: function(req) { req.setRequestHeader("Event-Id", "some_event_id"); },
success: function() {
$.ajax({ url: "http://" + nginxServer + "/pub?id=" + channelName,
type: "POST", data: "a test message 3",
beforeSend: function(req) { req.setRequestHeader("Event-Id", "some_event_id_2"); },
success: function() {
pushstream.connect();
}
});
}
});
});
});
waitsFor(function() {
return receivedMessage;
}, "The callback was not called", 2000);
runs(function() {
setTimeout(function() {
$.ajax({
url: "http://" + nginxServer + "/pub?id=" + channelName,
success: function(data) {
expect(data.subscribers).toBe("0");
$.post("http://" + nginxServer + "/pub?id=" + channelName, "another test message 1", function() {
$.ajax({
url: "http://" + nginxServer + "/pub?id=" + channelName,
type: "post",
data: "another test message 2",
beforeSend: function(req) { req.setRequestHeader("Event-Id", "some_other_event_id"); },
success: function() {
pushstream.connect();
}
});
});
}
});
}, 1500);
});
waitsFor(function() {
return receivedMessage2;
}, "The callback was not called", 3000);
runs(function() {
setTimeout(function() {
expect(messages[0]).toEqual(["a test message 3", 3, channelName, "some_event_id_2", true]);
expect(messages[1]).toEqual(["another test message 1", 4, channelName, "", (pushstream.wrapper.type !== "LongPolling")]);
expect(messages[2]).toEqual(["another test message 2", 5, channelName, "some_other_event_id", true]);
finished = true;
}, 500);
});
waitsFor(function() {
return finished;
}, "The callback was not called", 5000);
});
it("should be possible mix backtrack and time", function() {
var messages = [];
var receivedMessage = receivedMessage2 = false;
var finished = false;
pushstream = new PushStream({
messagesControlByArgument: true,
modes: mode,
port: port,
useJSONP: jsonp,
urlPrefixLongpolling: urlPrefixLongpolling,
extraParams: function() {
return {"qs":"on"};
},
onmessage: function(text, id, channel, eventid, isLastMessageFromBatch) {
messages.push([text, id, channel, eventid, isLastMessageFromBatch]);
if (messages.length >= 3) {
receivedMessage2 = true;
pushstream.disconnect();
}
if (messages.length == 1) {
receivedMessage = true;
pushstream.disconnect();
}
}
});
pushstream.addChannel(channelName, {backtrack: 1});
runs(function() {
$.post("http://" + nginxServer + "/pub?id=" + channelName, "a test message 1", function() {
$.post("http://" + nginxServer + "/pub?id=" + channelName, "a test message 2", function() {
pushstream.connect();
});
});
});
waitsFor(function() {
return receivedMessage;
}, "The callback was not called", 2000);
runs(function() {
setTimeout(function() {
$.ajax({
url: "http://" + nginxServer + "/pub?id=" + channelName,
success: function(data) {
expect(data.subscribers).toBe("0");
$.post("http://" + nginxServer + "/pub?id=" + channelName, "another test message 1", function() {
$.post("http://" + nginxServer + "/pub?id=" + channelName, "another test message 2", function() {
pushstream.connect();
});
});
}
});
}, 1500);
});
waitsFor(function() {
return receivedMessage2;
}, "The callback was not called", 3000);
runs(function() {
setTimeout(function() {
expect(messages[0]).toEqual(["a test message 2", 2, channelName, "", true]);
expect(messages[1]).toEqual(["another test message 1", 3, channelName, "", (pushstream.wrapper.type !== "LongPolling")]);
expect(messages[2]).toEqual(["another test message 2", 4, channelName, "", true]);
finished = true;
}, 500);
});
waitsFor(function() {
return finished;
}, "The callback was not called", 5000);
});
});
};
describe("on Stream mode", function() {
......
......@@ -356,6 +356,7 @@ describe "Publisher Properties" do
pub.callback do
pub.response_header['ACCESS_CONTROL_ALLOW_ORIGIN'].should be_nil
pub.response_header['ACCESS_CONTROL_ALLOW_METHODS'].should be_nil
pub.response_header['ACCESS_CONTROL_ALLOW_HEADERS'].should be_nil
EventMachine.stop
end
......@@ -373,6 +374,7 @@ describe "Publisher Properties" do
pub.callback do
pub.response_header['ACCESS_CONTROL_ALLOW_ORIGIN'].should eql("custom.domain.com")
pub.response_header['ACCESS_CONTROL_ALLOW_METHODS'].should eql(accepted_methods)
pub.response_header['ACCESS_CONTROL_ALLOW_HEADERS'].should eql("If-Modified-Since,If-None-Match,Etag,Event-Id,Event-Type,Last-Event-Id")
EventMachine.stop
end
......
......@@ -657,7 +657,7 @@ describe "Subscriber Properties" do
sub_1.stream do |chunk|
sub_1.response_header['ACCESS_CONTROL_ALLOW_ORIGIN'].should eql("custom.domain.com")
sub_1.response_header['ACCESS_CONTROL_ALLOW_METHODS'].should eql("GET")
sub_1.response_header['ACCESS_CONTROL_ALLOW_HEADERS'].should eql("If-Modified-Since,If-None-Match")
sub_1.response_header['ACCESS_CONTROL_ALLOW_HEADERS'].should eql("If-Modified-Since,If-None-Match,Etag,Event-Id,Event-Type,Last-Event-Id")
EventMachine.stop
end
......
......@@ -43,6 +43,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN, &cf->allowed_origins);
const ngx_str_t *header_value = (cf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN) ? &NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_PUT_DELETE_METHODS : &NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_PUT_METHODS;
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ACCESS_CONTROL_ALLOW_METHODS, header_value);
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ACCESS_CONTROL_ALLOW_HEADERS, &NGX_HTTP_PUSH_STREAM_ALLOWED_HEADERS);
}
if (r->method & NGX_HTTP_OPTIONS) {
......
......@@ -195,19 +195,19 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
offsetof(ngx_http_push_stream_loc_conf_t, websocket_allow_publish),
NULL },
{ ngx_string("push_stream_last_received_message_time"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1,
ngx_http_set_complex_value_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, last_received_message_time),
NULL },
{ ngx_string("push_stream_last_received_message_tag"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1,
ngx_http_set_complex_value_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, last_received_message_tag),
NULL },
{ ngx_string("push_stream_last_event_id"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1,
ngx_http_set_complex_value_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, last_event_id),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment