Commit a6ea2b4b authored by Dave Cridland's avatar Dave Cridland

Use Deque instead of Map, and add delay stamping

After discussion with Jonny, switched the existing Map to a Deque, and in
response to Tom's suggestion, added XEP-0203 (and legacy) delay stamping.

Tested using Gajim and Swift; testing the delay stamping was done by disabling
the acknowledgement handling to force retransmission.
parent b4e47ce8
...@@ -210,14 +210,18 @@ public class OfflineMessageStore extends BasicModule implements UserEventListene ...@@ -210,14 +210,18 @@ public class OfflineMessageStore extends BasicModule implements UserEventListene
xmlReader.read(new StringReader(msgXML)).getRootElement()); xmlReader.read(new StringReader(msgXML)).getRootElement());
} }
// Add a delayed delivery (XEP-0203) element to the message. // if there is already a delay stamp, we shouldn't add another.
Element delay = message.addChildElement("delay", "urn:xmpp:delay"); Element delaytest = message.getChildElement("delay", "urn:xmpp:delay");
delay.addAttribute("from", XMPPServer.getInstance().getServerInfo().getXMPPDomain()); if (delaytest == null) {
delay.addAttribute("stamp", XMPPDateTimeFormat.format(creationDate)); // Add a delayed delivery (XEP-0203) element to the message.
// Add a legacy delayed delivery (XEP-0091) element to the message. XEP is obsolete and support should be dropped in future. Element delay = message.addChildElement("delay", "urn:xmpp:delay");
delay = message.addChildElement("x", "jabber:x:delay"); delay.addAttribute("from", XMPPServer.getInstance().getServerInfo().getXMPPDomain());
delay.addAttribute("from", XMPPServer.getInstance().getServerInfo().getXMPPDomain()); delay.addAttribute("stamp", XMPPDateTimeFormat.format(creationDate));
delay.addAttribute("stamp", XMPPDateTimeFormat.formatOld(creationDate)); // Add a legacy delayed delivery (XEP-0091) element to the message. XEP is obsolete and support should be dropped in future.
delay = message.addChildElement("x", "jabber:x:delay");
delay.addAttribute("from", XMPPServer.getInstance().getServerInfo().getXMPPDomain());
delay.addAttribute("stamp", XMPPDateTimeFormat.formatOld(creationDate));
}
messages.add(message); messages.add(message);
} }
// Check if the offline messages loaded should be deleted, and that there are // Check if the offline messages loaded should be deleted, and that there are
......
...@@ -25,6 +25,8 @@ import java.util.ArrayList; ...@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.Deque;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
...@@ -67,11 +69,13 @@ import org.jivesoftware.openfire.session.OutgoingServerSession; ...@@ -67,11 +69,13 @@ import org.jivesoftware.openfire.session.OutgoingServerSession;
import org.jivesoftware.openfire.session.RemoteSessionLocator; import org.jivesoftware.openfire.session.RemoteSessionLocator;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory; import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.openfire.user.UserManager; import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.openfire.user.UserNotFoundException; import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.XMPPDateTimeFormat;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -1245,10 +1249,20 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -1245,10 +1249,20 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
// Re-deliver unacknowledged stanzas from broken stream (XEP-0198) // Re-deliver unacknowledged stanzas from broken stream (XEP-0198)
if(session.getStreamManager().isEnabled()) { if(session.getStreamManager().isEnabled()) {
Map<Long,Packet> unacknowledgedStanzas = session.getStreamManager().getUnacknowledgedServerStanzas(); session.getStreamManager().setEnabled(false); // Avoid concurrent usage.
Deque<StreamManager.UnackedPacket> unacknowledgedStanzas = session.getStreamManager().getUnacknowledgedServerStanzas();
if(!unacknowledgedStanzas.isEmpty()) { if(!unacknowledgedStanzas.isEmpty()) {
for(Entry<Long,Packet> unacknowledgedStanza : unacknowledgedStanzas.entrySet()) { for(StreamManager.UnackedPacket unacked : unacknowledgedStanzas) {
router.route(unacknowledgedStanza.getValue()); if (unacked.packet instanceof Message) {
Message m = (Message)unacked.packet;
Element delayInformation = m.addChildElement("delay", "urn:xmpp:delay");
Element delayInformationOld = m.addChildElement("x", "jabber:x:delay");
delayInformation.addAttribute("stamp", XMPPDateTimeFormat.format(unacked.timestamp));
delayInformationOld.addAttribute("stamp", XMPPDateTimeFormat.formatOld(unacked.timestamp));
delayInformation.addAttribute("from", serverAddress.toBareJID());
delayInformationOld.addAttribute("from", serverAddress.toBareJID());
}
router.route(unacked.packet);
} }
} }
} }
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
package org.jivesoftware.openfire.session; package org.jivesoftware.openfire.session;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
...@@ -867,7 +868,7 @@ public class LocalClientSession extends LocalSession implements ClientSession { ...@@ -867,7 +868,7 @@ public class LocalClientSession extends LocalSession implements ClientSession {
if(streamManager.isEnabled()) { if(streamManager.isEnabled()) {
streamManager.incrementServerSentStanzas(); streamManager.incrementServerSentStanzas();
// Temporarily store packet until delivery confirmed // Temporarily store packet until delivery confirmed
streamManager.getUnacknowledgedServerStanzas().put(streamManager.getServerSentStanzas(), packet.createCopy()); streamManager.getUnacknowledgedServerStanzas().addLast(new StreamManager.UnackedPacket(new Date(), packet.createCopy()));
if(getNumServerPackets() % JiveGlobals.getLongProperty("stream.management.requestFrequency", 5) == 0) { if(getNumServerPackets() % JiveGlobals.getLongProperty("stream.management.requestFrequency", 5) == 0) {
streamManager.sendServerRequest(); streamManager.sendServerRequest();
} }
......
package org.jivesoftware.openfire.streammanagement; package org.jivesoftware.openfire.streammanagement;
import java.util.HashMap; import java.util.Date;
import java.util.Map; import java.util.Deque;
import java.util.LinkedList;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.Connection;
...@@ -15,6 +16,15 @@ import org.xmpp.packet.PacketError; ...@@ -15,6 +16,15 @@ import org.xmpp.packet.PacketError;
* @author jonnyheavey * @author jonnyheavey
*/ */
public class StreamManager { public class StreamManager {
public static class UnackedPacket {
public final Date timestamp;
public final Packet packet;
public UnackedPacket(Date date, Packet p) {
timestamp = date;
packet = p;
}
}
/** /**
* Stanza namespaces * Stanza namespaces
...@@ -55,11 +65,13 @@ public class StreamManager { ...@@ -55,11 +65,13 @@ public class StreamManager {
* sent from the server that the client has processed * sent from the server that the client has processed
*/ */
private long clientProcessedStanzas = 0; private long clientProcessedStanzas = 0;
static private long mask = 0xFFFFFFFF; /* 2**32 - 1; this is used to emulate rollover */
/** /**
* Collection of stanzas/packets sent to client that haven't been acknowledged. * Collection of stanzas/packets sent to client that haven't been acknowledged.
*/ */
private Map<Long, Packet> unacknowledgedServerStanzas = new HashMap<Long, Packet>(); private Deque<UnackedPacket> unacknowledgedServerStanzas = new LinkedList<UnackedPacket>();
public StreamManager(Connection connection) { public StreamManager(Connection connection) {
this.connection = connection; this.connection = connection;
...@@ -70,13 +82,13 @@ public class StreamManager { ...@@ -70,13 +82,13 @@ public class StreamManager {
*/ */
public void sendServerAcknowledgement() { public void sendServerAcknowledgement() {
if(isEnabled()) { if(isEnabled()) {
String ack = String.format("<a xmlns='%s' h='%s' />", getNamespace(), getServerProcessedStanzas()); String ack = String.format("<a xmlns='%s' h='%s' />", getNamespace(), getServerProcessedStanzas() & mask);
getConnection().deliverRawText(ack); getConnection().deliverRawText(ack);
} }
} }
/** /**
* Sends XEP-0198 request <r /> to client from server * Sends XEP-0198 request <r /> to client from server
*/ */
public void sendServerRequest() { public void sendServerRequest() {
if(isEnabled()) { if(isEnabled()) {
...@@ -106,12 +118,18 @@ public class StreamManager { ...@@ -106,12 +118,18 @@ public class StreamManager {
if(ack.attribute("h") != null) { if(ack.attribute("h") != null) {
long count = Long.valueOf(ack.attributeValue("h")); long count = Long.valueOf(ack.attributeValue("h"));
// Remove stanzas from temporary storage as now acknowledged // Remove stanzas from temporary storage as now acknowledged
Map<Long,Packet> unacknowledgedStanzas = getUnacknowledgedServerStanzas(); Deque<UnackedPacket> unacknowledgedStanzas = getUnacknowledgedServerStanzas();
long i = getClientProcessedStanzas(); long i = getClientProcessedStanzas();
while(i <= count) { if (count < i) {
if(unacknowledgedStanzas.containsKey(i)) { /* Consider rollover? */
unacknowledgedStanzas.remove(i); if (i > mask) {
} while (count < i) {
count += mask + 1;
}
}
}
while(i < count) {
unacknowledgedStanzas.removeFirst();
i++; i++;
} }
...@@ -224,7 +242,7 @@ public class StreamManager { ...@@ -224,7 +242,7 @@ public class StreamManager {
* Retrieves all unacknowledged stanzas sent to client from server. * Retrieves all unacknowledged stanzas sent to client from server.
* @return * @return
*/ */
public Map<Long, Packet> getUnacknowledgedServerStanzas() { public Deque<UnackedPacket> getUnacknowledgedServerStanzas() {
return unacknowledgedServerStanzas; return unacknowledgedServerStanzas;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment