Commit ea56ccdd authored by JonnyHeavey's avatar JonnyHeavey Committed by Dave Cridland

Initial xep-0198 message ack implementation

This patch adds a basic, non-resumable, XEP-0198 acking solution. It will
redirect "lost" messages to offline storage, but does not attempt to suspend
the session, so will not allow any resumptions.
parent 5e3f4161
...@@ -34,13 +34,25 @@ ...@@ -34,13 +34,25 @@
</network> </network>
--> -->
<!-- SPDY Protocol is npn. <!-- SPDY Protocol is npn.
(note: npn does not work with Java 8) (note: npn does not work with Java 8)
add -Xbootclasspath/p:/OPENFIRE_HOME/lib/npn-boot.jar to .vmoptions file --> add -Xbootclasspath/p:/OPENFIRE_HOME/lib/npn-boot.jar to .vmoptions file -->
<!-- <!--
<spdy> <spdy>
<protocol>npn</protocol> <protocol>npn</protocol>
</spdy> </spdy>
--> -->
<!-- XEP-0198 properties -->
<stream>
<management>
<!-- Whether stream management is offered to clients by server. -->
<active>true</active>
<!-- Number of stanzas sent to client before a stream management
acknowledgement request is made. -->
<requestFrequency>5</requestFrequency>
</management>
</stream>
</jive> </jive>
...@@ -29,6 +29,7 @@ import java.util.HashSet; ...@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
...@@ -110,7 +111,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -110,7 +111,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
* Counter of user connections. A connection is counted just after it was created and not * Counter of user connections. A connection is counted just after it was created and not
* after the user became available. This counter only considers sessions local to this JVM. * after the user became available. This counter only considers sessions local to this JVM.
* That means that when running inside of a cluster you will need to add up this counter * That means that when running inside of a cluster you will need to add up this counter
* for each cluster node. * for each cluster node.
*/ */
private final AtomicInteger connectionsCounter = new AtomicInteger(0); private final AtomicInteger connectionsCounter = new AtomicInteger(0);
...@@ -159,7 +160,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -159,7 +160,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
* This same information is stored in {@link LocalIncomingServerSession} but the * This same information is stored in {@link LocalIncomingServerSession} but the
* reason for this duplication is that when running in a cluster other nodes * reason for this duplication is that when running in a cluster other nodes
* will have access to this clustered cache even in the case of this node going * will have access to this clustered cache even in the case of this node going
* down. * down.
*/ */
private Cache<String, Set<String>> validatedDomainsCache; private Cache<String, Set<String>> validatedDomainsCache;
...@@ -171,7 +172,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -171,7 +172,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
/** /**
* Local session manager responsible for keeping sessions connected to this JVM that are not * Local session manager responsible for keeping sessions connected to this JVM that are not
* present in the routing table. * present in the routing table.
*/ */
private LocalSessionManager localSessionManager; private LocalSessionManager localSessionManager;
/** /**
...@@ -285,7 +286,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -285,7 +286,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
* Creates a new <tt>ConnectionMultiplexerSession</tt>. * Creates a new <tt>ConnectionMultiplexerSession</tt>.
* *
* @param conn the connection to create the session from. * @param conn the connection to create the session from.
* @param address the JID (may include a resource) of the connection manager's session. * @param address the JID (may include a resource) of the connection manager's session.
* @return a newly created session. * @return a newly created session.
*/ */
public LocalConnectionMultiplexerSession createMultiplexerSession(Connection conn, JID address) { public LocalConnectionMultiplexerSession createMultiplexerSession(Connection conn, JID address) {
...@@ -1207,7 +1208,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -1207,7 +1208,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
try { try {
// If the requesting entity is the user itself or the requesting entity can probe the presence of the user. // If the requesting entity is the user itself or the requesting entity can probe the presence of the user.
if (name != null && senderJID != null && if (name != null && senderJID != null &&
server.getUserManager().isRegisteredUser(senderJID) && server.getUserManager().isRegisteredUser(senderJID) &&
(name.equals(senderJID.getNode()) || server.getPresenceManager().canProbePresence(senderJID, name))) { (name.equals(senderJID.getNode()) || server.getPresenceManager().canProbePresence(senderJID, name))) {
Collection<DiscoItem> discoItems = new ArrayList<DiscoItem>(); Collection<DiscoItem> discoItems = new ArrayList<DiscoItem>();
for (ClientSession clientSession : getSessions(name)) { for (ClientSession clientSession : getSessions(name)) {
...@@ -1241,6 +1242,16 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -1241,6 +1242,16 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
presence.setFrom(session.getAddress()); presence.setFrom(session.getAddress());
router.route(presence); router.route(presence);
} }
// Re-deliver unacknowledged stanzas from broken stream (XEP-0198)
if(session.getStreamManager().isEnabled()) {
Map<Long,Packet> unacknowledgedStanzas = session.getStreamManager().getUnacknowledgedServerStanzas();
if(!unacknowledgedStanzas.isEmpty()) {
for(Entry<Long,Packet> unacknowledgedStanza : unacknowledgedStanzas.entrySet()) {
router.route(unacknowledgedStanza.getValue());
}
}
}
} }
finally { finally {
// Remove the session // Remove the session
......
...@@ -28,6 +28,7 @@ import org.jivesoftware.openfire.auth.UnauthorizedException; ...@@ -28,6 +28,7 @@ import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.http.FlashCrossDomainServlet; import org.jivesoftware.openfire.http.FlashCrossDomainServlet;
import org.jivesoftware.openfire.session.LocalSession; import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.StringUtils; import org.jivesoftware.util.StringUtils;
...@@ -182,13 +183,28 @@ public abstract class StanzaHandler { ...@@ -182,13 +183,28 @@ public abstract class StanzaHandler {
// resource binding and session establishment (to client sessions only) // resource binding and session establishment (to client sessions only)
waitingCompressionACK = true; waitingCompressionACK = true;
} }
} else if(isStreamManagementStanza(doc)) {
switch(tag) {
case "enable":
session.enableStreamMangement(doc);
break;
case "r":
session.getStreamManager().sendServerAcknowledgement();
break;
case "a":
session.getStreamManager().processClientAcknowledgement(doc);
break;
default:
process(doc);
break;
}
} }
else { else {
process(doc); process(doc);
} }
} }
private void process(Element doc) throws UnauthorizedException { private void process(Element doc) throws UnauthorizedException {
if (doc == null) { if (doc == null) {
return; return;
} }
...@@ -536,6 +552,16 @@ public abstract class StanzaHandler { ...@@ -536,6 +552,16 @@ public abstract class StanzaHandler {
connection.deliverRawText(sb.toString()); connection.deliverRawText(sb.toString());
} }
/**
* Determines whether stanza's namespace matches XEP-0198 namespace
* @param stanza Stanza to be checked
* @return whether stanza's namespace matches XEP-0198 namespace
*/
private boolean isStreamManagementStanza(Element stanza) {
return StreamManager.NAMESPACE_V2.equals(stanza.getNamespace().getStringValue()) ||
StreamManager.NAMESPACE_V3.equals(stanza.getNamespace().getStringValue());
}
private String geStreamHeader() { private String geStreamHeader() {
StringBuilder sb = new StringBuilder(200); StringBuilder sb = new StringBuilder(200);
sb.append("<?xml version='1.0' encoding='"); sb.append("<?xml version='1.0' encoding='");
......
...@@ -38,6 +38,7 @@ import org.jivesoftware.openfire.net.SSLConfig; ...@@ -38,6 +38,7 @@ import org.jivesoftware.openfire.net.SSLConfig;
import org.jivesoftware.openfire.net.SocketConnection; import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.privacy.PrivacyList; import org.jivesoftware.openfire.privacy.PrivacyList;
import org.jivesoftware.openfire.privacy.PrivacyListManager; import org.jivesoftware.openfire.privacy.PrivacyListManager;
import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.openfire.user.PresenceEventDispatcher; import org.jivesoftware.openfire.user.PresenceEventDispatcher;
import org.jivesoftware.openfire.user.UserNotFoundException; import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
...@@ -801,10 +802,16 @@ public class LocalClientSession extends LocalSession implements ClientSession { ...@@ -801,10 +802,16 @@ public class LocalClientSession extends LocalSession implements ClientSession {
} }
} }
else { else {
// If the session has been authenticated then offer resource binding // If the session has been authenticated then offer resource binding,
// and session establishment // and session establishment
sb.append("<bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\"/>"); sb.append("<bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\"/>");
sb.append("<session xmlns=\"urn:ietf:params:xml:ns:xmpp-session\"><optional/></session>"); sb.append("<session xmlns=\"urn:ietf:params:xml:ns:xmpp-session\"><optional/></session>");
// Offer XEP-0198 stream management capabilities if enabled.
if(JiveGlobals.getBooleanProperty("stream.management.active", true)) {
sb.append(String.format("<sm xmlns='%s'/>", StreamManager.NAMESPACE_V2));
sb.append(String.format("<sm xmlns='%s'/>", StreamManager.NAMESPACE_V3));
}
} }
return sb.toString(); return sb.toString();
} }
...@@ -854,7 +861,17 @@ public class LocalClientSession extends LocalSession implements ClientSession { ...@@ -854,7 +861,17 @@ public class LocalClientSession extends LocalSession implements ClientSession {
@Override @Override
public void deliver(Packet packet) throws UnauthorizedException { public void deliver(Packet packet) throws UnauthorizedException {
conn.deliver(packet); conn.deliver(packet);
if(streamManager.isEnabled()) {
streamManager.incrementServerSentStanzas();
// Temporarily store packet until delivery confirmed
streamManager.getUnacknowledgedServerStanzas().put(streamManager.getServerSentStanzas(), packet.createCopy());
if(getNumServerPackets() % JiveGlobals.getLongProperty("stream.management.requestFrequency", 5) == 0) {
streamManager.sendServerRequest();
}
}
} }
@Override @Override
......
...@@ -24,6 +24,7 @@ import java.util.Map; ...@@ -24,6 +24,7 @@ import java.util.Map;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
import org.dom4j.Element;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID; import org.jivesoftware.openfire.StreamID;
...@@ -33,7 +34,7 @@ import org.jivesoftware.openfire.interceptor.InterceptorManager; ...@@ -33,7 +34,7 @@ import org.jivesoftware.openfire.interceptor.InterceptorManager;
import org.jivesoftware.openfire.interceptor.PacketRejectedException; import org.jivesoftware.openfire.interceptor.PacketRejectedException;
import org.jivesoftware.openfire.net.SocketConnection; import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.net.TLSStreamHandler; import org.jivesoftware.openfire.net.TLSStreamHandler;
import org.jivesoftware.openfire.spi.RoutingTableImpl; import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -94,6 +95,11 @@ public abstract class LocalSession implements Session { ...@@ -94,6 +95,11 @@ public abstract class LocalSession implements Session {
*/ */
private final Map<String, Object> sessionData = new HashMap<String, Object>(); private final Map<String, Object> sessionData = new HashMap<String, Object>();
/**
* XEP-0198 Stream Manager
*/
protected StreamManager streamManager = null;
/** /**
* Creates a session with an underlying connection and permission protection. * Creates a session with an underlying connection and permission protection.
* *
...@@ -111,6 +117,7 @@ public abstract class LocalSession implements Session { ...@@ -111,6 +117,7 @@ public abstract class LocalSession implements Session {
String id = streamID.getID(); String id = streamID.getID();
this.address = new JID(null, serverName, id, true); this.address = new JID(null, serverName, id, true);
this.sessionManager = SessionManager.getInstance(); this.sessionManager = SessionManager.getInstance();
this.streamManager = new StreamManager(conn);
} }
/** /**
...@@ -209,6 +216,7 @@ public abstract class LocalSession implements Session { ...@@ -209,6 +216,7 @@ public abstract class LocalSession implements Session {
public void incrementClientPacketCount() { public void incrementClientPacketCount() {
clientPacketCount++; clientPacketCount++;
lastActiveDate = System.currentTimeMillis(); lastActiveDate = System.currentTimeMillis();
streamManager.incrementServerProcessedStanzas();
} }
/** /**
...@@ -279,6 +287,14 @@ public abstract class LocalSession implements Session { ...@@ -279,6 +287,14 @@ public abstract class LocalSession implements Session {
} }
} }
/**
* Get XEP-0198 Stream manager for session
* @return
*/
public StreamManager getStreamManager() {
return streamManager;
}
public void process(Packet packet) { public void process(Packet packet) {
// Check that the requested packet can be processed // Check that the requested packet can be processed
if (canProcess(packet)) { if (canProcess(packet)) {
...@@ -413,4 +429,27 @@ public abstract class LocalSession implements Session { ...@@ -413,4 +429,27 @@ public abstract class LocalSession implements Session {
} }
return "NONE"; return "NONE";
} }
/**
* Enables stream management for session
* @param enable XEP-0198 <enable/> stanza
*/
public void enableStreamMangement(Element enable) {
// Do nothing if already enabled
if(streamManager.isEnabled()) {
return;
}
streamManager.setNamespace(enable.getNamespace().getStringValue());
// Ensure that resource binding has occurred
if(getAddress().getResource() == null) {
streamManager.sendUnexpectedError();
return;
}
streamManager.setEnabled(true);
}
} }
package org.jivesoftware.openfire.streammanagement;
import java.util.HashMap;
import java.util.Map;
import org.dom4j.Element;
import org.jivesoftware.openfire.Connection;
import org.xmpp.packet.Packet;
/**
* XEP-0198 Stream Manager.
* Handles client/server messages acknowledgement.
*
* @author jonnyheavey
*/
public class StreamManager {
/**
* Stanza namespaces
*/
public static final String NAMESPACE_V2 = "urn:xmpp:sm:2";
public static final String NAMESPACE_V3 = "urn:xmpp:sm:3";
/**
* Connection (stream) to client for the session the manager belongs to
*/
private Connection connection;
/**
* Whether Stream Management is enabled for session
* the manager belongs to.
*/
private boolean enabled;
/**
* Namespace to be used in stanzas sent to client (depending on XEP-0198 version used by client)
*/
private String namespace;
/**
* Count of how many stanzas/packets
* have been sent from the server to the client (not necessarily processed)
*/
private long serverSentStanzas = 0;
/**
* Count of how many stanzas/packets
* sent from the client that the server has processed
*/
private long serverProcessedStanzas = 0;
/**
* Count of how many stanzas/packets
* sent from the server that the client has processed
*/
private long clientProcessedStanzas = 0;
/**
* Collection of stanzas/packets sent to client that haven't been acknowledged.
*/
private Map<Long, Packet> unacknowledgedServerStanzas = new HashMap<Long, Packet>();
public StreamManager(Connection connection) {
this.setConnection(connection);
}
/**
* Sends XEP-0198 acknowledgement <a /> to client from server
*/
public void sendServerAcknowledgement() {
if(isEnabled()) {
String ack = String.format("<a xmlns='%s' h='%s' />", getNamespace(), getServerProcessedStanzas());
getConnection().deliverRawText(ack);
}
}
/**
* Sends XEP-0198 request <r /> to client from server
*/
public void sendServerRequest() {
if(isEnabled()) {
String request = String.format("<r xmlns='%s' />", getNamespace());
getConnection().deliverRawText(request);
}
}
/**
* Send an error if a XEP-0198 stanza is received at an unexpected time.
* e.g. before resource-binding has completed.
*/
public void sendUnexpectedError() {
StringBuilder sb = new StringBuilder(340);
sb.append(String.format("<failed xmlns='%s'>", getNamespace()));
sb.append("<unexpected-request xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>");
sb.append("</failed>");
getConnection().deliverRawText(sb.toString());
}
/**
* Receive and process acknowledgement packet from client
* @param ack XEP-0198 acknowledgement <a /> stanza to process
*/
public void processClientAcknowledgement(Element ack) {
if(isEnabled()) {
if(ack.attribute("h") != null) {
long count = Long.valueOf(ack.attributeValue("h"));
// Remove stanzas from temporary storage as now acknowledged
Map<Long,Packet> unacknowledgedStanzas = getUnacknowledgedServerStanzas();
long i = getClientProcessedStanzas();
while(i <= count) {
if(unacknowledgedStanzas.containsKey(i)) {
unacknowledgedStanzas.remove(i);
}
i++;
}
setClientProcessedStanzas(count);
}
}
}
/**
* Get connection (stream) for the session
* @return
*/
public Connection getConnection() {
return connection;
}
/**
* Set connection for the session
* @param connection
*/
public void setConnection(Connection connection) {
this.connection = connection;
}
/**
* Determines whether Stream Management enabled for session this
* manager belongs to.
* @return
*/
public boolean isEnabled() {
return enabled;
}
/**
* Sets whether Stream Management enabled for session this
* manager belongs to.
* @param enabled
*/
public void setEnabled(boolean enabled) {
this.enabled = enabled;
if(enabled) {
String enabledStanza = String.format("<enabled xmlns='%s'/>", getNamespace());
getConnection().deliverRawText(enabledStanza);
}
}
/**
* Retrieve configured XEP-0198 namespace
* @return
*/
public String getNamespace() {
return namespace;
}
/**
* Configure XEP-0198 namespace
* @param namespace
*/
public void setNamespace(String namespace) {
this.namespace = namespace;
}
/**
* Retrieves number of stanzas sent to client by server.
* @return
*/
public long getServerSentStanzas() {
return serverSentStanzas;
}
/**
* Increments the count of stanzas sent to client by server.
*/
public void incrementServerSentStanzas() {
this.serverSentStanzas++;
}
/**
* Retrieve the number of stanzas processed by the server since
* Stream Management was enabled.
* @return
*/
public long getServerProcessedStanzas() {
return serverProcessedStanzas;
}
/**
* Increments the count of stanzas processed by the server since
* Stream Management was enabled.
*/
public void incrementServerProcessedStanzas() {
if(isEnabled()) {
this.serverProcessedStanzas++;
}
}
/**
* Retrieve the number of stanzas processed by the client since
* Stream Management was enabled.
* @return
*/
public long getClientProcessedStanzas() {
return clientProcessedStanzas;
}
/**
* Sets the count of stanzas processed by the client since
* Stream Management was enabled.
*/
public void setClientProcessedStanzas(long count) {
if(count >= clientProcessedStanzas) {
clientProcessedStanzas = count;
}
}
/**
* Retrieves all unacknowledged stanzas sent to client from server.
* @return
*/
public Map<Long, Packet> getUnacknowledgedServerStanzas() {
return unacknowledgedServerStanzas;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment