Commit 6286c4ac authored by Dave Cridland's avatar Dave Cridland Committed by daryl herzmann

OF-1402 XEP-0198 resumption (#872)

* WIP: XEP-0198 Stream Resumption

This patch implements a first cut of XEP-0198
Stream Resumption for TCP and WebSockets.

This appears to work on (very) basic testing, but
the code is very likely to run into problems with
existing code assuming that LocalSession.getConnection()
never returns null, and similar issues.

This is likely to generate unexpected (and
possibly unhandled) NPEs.

The basic premise to the design is that
StanzaHandlers (or similar) and Connections
from the new session are re-pointed to the old
session. The old session lives on in limbo with its
conn field set to null during detachment.

I strongly suspect that bits are missing from this,
but so far...

* Fix CR/LF

* WIP

* WIP

* Close detached 198 sessions after timeout

Also:
* Quick audit of LocalSession.getConnection
* Add in guard code for LocalSession.getConnection

* CRLF

* CRLF
parent 8b664a7a
...@@ -21,6 +21,7 @@ import java.net.UnknownHostException; ...@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
import java.security.cert.Certificate; import java.security.cert.Certificate;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.net.StanzaHandler;
import org.jivesoftware.openfire.session.LocalSession; import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.spi.ConnectionConfiguration; import org.jivesoftware.openfire.spi.ConnectionConfiguration;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
...@@ -49,6 +50,14 @@ public interface Connection extends Closeable { ...@@ -49,6 +50,14 @@ public interface Connection extends Closeable {
*/ */
void init( LocalSession session ); void init( LocalSession session );
/**
* Reinitializes the connection to switch to a different session. This allows for
* XEP-0198 resumption and transport-switching.
*
* @param session The new session now owning the connection.
*/
void reinit( LocalSession session );
/** /**
* Returns the raw IP address of this <code>InetAddress</code> * Returns the raw IP address of this <code>InetAddress</code>
* object. The result is in network byte order: the highest order * object. The result is in network byte order: the highest order
......
...@@ -143,7 +143,9 @@ class LocalSessionManager { ...@@ -143,7 +143,9 @@ class LocalSessionManager {
for (LocalSession session : sessions) { for (LocalSession session : sessions) {
try { try {
// Notify connected client that the server is being shut down // Notify connected client that the server is being shut down
session.getConnection().systemShutdown(); if (!session.isDetached()) {
session.getConnection().systemShutdown();
}
} }
catch (Throwable t) { catch (Throwable t) {
// Ignore. // Ignore.
......
...@@ -17,15 +17,8 @@ ...@@ -17,15 +17,8 @@
package org.jivesoftware.openfire; package org.jivesoftware.openfire;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.ArrayList; import java.util.*;
import java.util.Collection; import java.util.concurrent.ConcurrentHashMap;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
...@@ -46,6 +39,7 @@ import org.jivesoftware.openfire.spi.BasicStreamIDFactory; ...@@ -46,6 +39,7 @@ import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.openfire.user.UserManager; import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.TaskEngine;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -143,6 +137,13 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -143,6 +137,13 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
private OutgoingServerSessionListener outgoingServerListener = new OutgoingServerSessionListener(); private OutgoingServerSessionListener outgoingServerListener = new OutgoingServerSessionListener();
private ConnectionMultiplexerSessionListener multiplexerSessionListener = new ConnectionMultiplexerSessionListener(); private ConnectionMultiplexerSessionListener multiplexerSessionListener = new ConnectionMultiplexerSessionListener();
/**
* Sessions contained in this Map are (client?) sessions which are detached.
* Sessions remaining here too long will be reaped, but they will be checked
* to see if they have in fact resumed since.
*/
private final Map<StreamID, LocalSession> detachedSessions = new ConcurrentHashMap<>();
/** /**
* Local session manager responsible for keeping sessions connected to this JVM that are not * Local session manager responsible for keeping sessions connected to this JVM that are not
* present in the routing table. * present in the routing table.
...@@ -177,6 +178,30 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -177,6 +178,30 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
conflictLimit = JiveGlobals.getIntProperty("xmpp.session.conflict-limit", 0); conflictLimit = JiveGlobals.getIntProperty("xmpp.session.conflict-limit", 0);
} }
/**
* Record a session as being detached (ie, has no connection). This is idempotent.
* This should really only be called by the LocalSession itself when it detaches.
*
* @param localSession the LocalSession (this) to mark as detached.
*/
public void addDetached(LocalSession localSession) {
this.detachedSessions.put(localSession.getStreamID(), localSession);
}
/**
* Remove a session as being detached. This is idempotent.
* This should be called by the LocalSession itself either when resumed or when
* closed.
*
* @param localSession the LocalSession (this) which has been resumed or closed.
*/
public synchronized void removeDetached(LocalSession localSession) {
LocalSession other = this.detachedSessions.get(localSession.getStreamID());
if (other == localSession) {
this.detachedSessions.remove(localSession.getStreamID());
}
}
/** /**
* Returns the session originated from the specified address or <tt>null</tt> if none was * Returns the session originated from the specified address or <tt>null</tt> if none was
* found. The specified address MUST contain a resource that uniquely identifies the session. * found. The specified address MUST contain a resource that uniquely identifies the session.
...@@ -1223,6 +1248,15 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -1223,6 +1248,15 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
public void onConnectionClose(Object handback) { public void onConnectionClose(Object handback) {
try { try {
LocalClientSession session = (LocalClientSession) handback; LocalClientSession session = (LocalClientSession) handback;
if (session.isDetached()) {
Log.debug("Closing session is detached already.");
return;
}
if (session.getStreamManager().getResume()) {
Log.debug("Closing session has SM enabled; detaching.");
session.setDetached();
return;
}
try { try {
if ((session.getPresence().isAvailable() || !session.wasAvailable()) && if ((session.getPresence().isAvailable() || !session.wasAvailable()) &&
routingTable.hasClientRoute(session.getAddress())) { routingTable.hasClientRoute(session.getAddress())) {
...@@ -1431,6 +1465,9 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -1431,6 +1465,9 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
public void start() throws IllegalStateException { public void start() throws IllegalStateException {
super.start(); super.start();
localSessionManager.start(); localSessionManager.start();
// Run through the server sessions every 3 minutes after a 3 minutes server startup delay (default values)
int period = 3 * 60 * 1000;
TaskEngine.getInstance().scheduleAtFixedRate(new DetachedCleanupTask(), period, period);
} }
@Override @Override
...@@ -1527,6 +1564,18 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -1527,6 +1564,18 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
return JiveGlobals.getIntProperty("xmpp.server.session.idle", 10 * 60 * 1000); return JiveGlobals.getIntProperty("xmpp.server.session.idle", 10 * 60 * 1000);
} }
public void setSessionDetachTime(int idleTime) {
if (getSessionDetachTime() == idleTime) {
return;
}
// Set the new property value
JiveGlobals.setProperty("xmpp.session.detach.timeout", Integer.toString(idleTime));
}
public int getSessionDetachTime() {
return JiveGlobals.getIntProperty("xmpp.session.detach.timeout", 10 * 60 * 1000);
}
public Cache<String, ClientSessionInfo> getSessionInfoCache() { public Cache<String, ClientSessionInfo> getSessionInfoCache() {
return sessionInfoCache; return sessionInfoCache;
} }
...@@ -1611,4 +1660,54 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -1611,4 +1660,54 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
} }
} }
} }
/**
* Task that closes detached client sessions.
*/
private class DetachedCleanupTask extends TimerTask {
/**
* Close detached client sessions that haven't seen activity in more than
* 30 minutes by default.
*/
@Override
public void run() {
int idleTime = getSessionDetachTime();
if (idleTime == -1) {
return;
}
final long deadline = System.currentTimeMillis() - idleTime;
for (LocalSession session : detachedSessions.values()) {
try {
if (session.getLastActiveDate().getTime() < deadline) {
removeDetached(session);
LocalClientSession clientSession = (LocalClientSession)session;
if (clientSession != null) {
try {
if ((clientSession.getPresence().isAvailable() || !clientSession.wasAvailable()) &&
routingTable.hasClientRoute(session.getAddress())) {
// Send an unavailable presence to the user's subscribers
// Note: This gives us a chance to send an unavailable presence to the
// entities that the user sent directed presences
Presence presence = new Presence();
presence.setType(Presence.Type.unavailable);
presence.setFrom(session.getAddress());
router.route(presence);
}
session.getStreamManager().onClose(router, serverAddress);
} finally {
// Remove the session
removeSession(clientSession);
}
}
}
}
catch (Throwable e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
}
}
}
} }
...@@ -271,6 +271,11 @@ public class SocketConnection implements Connection { ...@@ -271,6 +271,11 @@ public class SocketConnection implements Connection {
session = owner; session = owner;
} }
@Override
public void reinit(LocalSession owner) {
session = owner;
}
@Override @Override
public void registerCloseListener(ConnectionCloseListener listener, Object handbackMessage) { public void registerCloseListener(ConnectionCloseListener listener, Object handbackMessage) {
if (isClosed()) { if (isClosed()) {
......
...@@ -104,6 +104,10 @@ public abstract class StanzaHandler { ...@@ -104,6 +104,10 @@ public abstract class StanzaHandler {
this.connection = connection; this.connection = connection;
} }
public void setSession(LocalSession session) {
this.session = session;
}
public void process(String stanza, XMPPPacketReader reader) throws Exception { public void process(String stanza, XMPPPacketReader reader) throws Exception {
boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream"); boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");
...@@ -147,6 +151,7 @@ public abstract class StanzaHandler { ...@@ -147,6 +151,7 @@ public abstract class StanzaHandler {
// Verify if end of stream was requested // Verify if end of stream was requested
if (stanza.equals("</stream:stream>")) { if (stanza.equals("</stream:stream>")) {
if (session != null) { if (session != null) {
session.getStreamManager().formalClose();
session.close(); session.close();
} }
return; return;
...@@ -189,7 +194,7 @@ public abstract class StanzaHandler { ...@@ -189,7 +194,7 @@ public abstract class StanzaHandler {
waitingCompressionACK = true; waitingCompressionACK = true;
} }
} else if (isStreamManagementStanza(doc)) { } else if (isStreamManagementStanza(doc)) {
session.getStreamManager().process( doc, session.getAddress() ); session.getStreamManager().process( doc );
} }
else { else {
process(doc); process(doc);
......
...@@ -175,6 +175,11 @@ public abstract class VirtualConnection implements Connection { ...@@ -175,6 +175,11 @@ public abstract class VirtualConnection implements Connection {
this.session = session; this.session = session;
} }
@Override
public void reinit(LocalSession session) {
this.session = session;
}
/** /**
* Closes the session, the virtual connection and notifies listeners that the connection * Closes the session, the virtual connection and notifies listeners that the connection
* has been closed. * has been closed.
......
...@@ -44,8 +44,8 @@ public abstract class ConnectionHandler extends IoHandlerAdapter { ...@@ -44,8 +44,8 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
private static final Logger Log = LoggerFactory.getLogger(ConnectionHandler.class); private static final Logger Log = LoggerFactory.getLogger(ConnectionHandler.class);
static final String XML_PARSER = "XML-PARSER"; static final String XML_PARSER = "XML-PARSER";
protected static final String HANDLER = "HANDLER"; static final String HANDLER = "HANDLER";
protected static final String CONNECTION = "CONNECTION"; static final String CONNECTION = "CONNECTION";
private static final ThreadLocal<XMPPPacketReader> PARSER_CACHE = new ThreadLocal<XMPPPacketReader>() private static final ThreadLocal<XMPPPacketReader> PARSER_CACHE = new ThreadLocal<XMPPPacketReader>()
{ {
......
...@@ -42,6 +42,7 @@ import org.jivesoftware.openfire.Connection; ...@@ -42,6 +42,7 @@ import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.ConnectionCloseListener; import org.jivesoftware.openfire.ConnectionCloseListener;
import org.jivesoftware.openfire.PacketDeliverer; import org.jivesoftware.openfire.PacketDeliverer;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.net.StanzaHandler;
import org.jivesoftware.openfire.session.LocalSession; import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.ConnectionConfiguration; import org.jivesoftware.openfire.spi.ConnectionConfiguration;
...@@ -265,6 +266,17 @@ public class NIOConnection implements Connection { ...@@ -265,6 +266,17 @@ public class NIOConnection implements Connection {
session = owner; session = owner;
} }
@Override
public void reinit(LocalSession owner) {
session = owner;
StanzaHandler stanzaHandler = getStanzaHandler();
stanzaHandler.setSession(owner);
}
protected StanzaHandler getStanzaHandler() {
return (StanzaHandler)ioSession.getAttribute(ConnectionHandler.HANDLER);
}
@Override @Override
public boolean isClosed() { public boolean isClosed() {
return state.get() == State.CLOSED; return state.get() == State.CLOSED;
......
...@@ -948,8 +948,9 @@ public class LocalClientSession extends LocalSession implements ClientSession { ...@@ -948,8 +948,9 @@ public class LocalClientSession extends LocalSession implements ClientSession {
@Override @Override
public void deliver(Packet packet) throws UnauthorizedException { public void deliver(Packet packet) throws UnauthorizedException {
if (conn != null) {
conn.deliver(packet); conn.deliver(packet);
}
streamManager.sentStanza(packet); streamManager.sentStanza(packet);
} }
......
...@@ -77,7 +77,7 @@ public abstract class LocalSession implements Session { ...@@ -77,7 +77,7 @@ public abstract class LocalSession implements Session {
/** /**
* The connection that this session represents. * The connection that this session represents.
*/ */
protected final Connection conn; protected Connection conn;
protected SessionManager sessionManager; protected SessionManager sessionManager;
...@@ -120,10 +120,47 @@ public abstract class LocalSession implements Session { ...@@ -120,10 +120,47 @@ public abstract class LocalSession implements Session {
String id = streamID.getID(); String id = streamID.getID();
this.address = new JID(null, serverName, id, true); this.address = new JID(null, serverName, id, true);
this.sessionManager = SessionManager.getInstance(); this.sessionManager = SessionManager.getInstance();
this.streamManager = new StreamManager(conn); this.streamManager = new StreamManager(this);
this.language = language; this.language = language;
} }
/**
* Returns true if the session is detached (that is, if the underlying connection
* has been closed.
*
* @return true if session detached
*/
public boolean isDetached() {
return this.conn == null;
}
/**
* Set the session to detached mode, indicating that the underlying connection
* has been closed.
*/
public void setDetached() {
this.sessionManager.addDetached(this);
this.conn = null;
}
/**
* Reattach the session to a new connection. The connection must already be
* initialized as a running XML Stream, normally by having run through XEP-0198
* resumption.
*/
public void reattach(Connection connection, long h) {
Connection temp = this.conn;
this.conn = null;
if (temp != null && !temp.isClosed()) {
temp.close();
}
this.conn = connection;
this.conn.reinit(this);
this.status = STATUS_AUTHENTICATED;
this.sessionManager.removeDetached(this);
this.streamManager.onResume(new JID(null, this.serverName, null, true), h);
}
/** /**
* Obtain the address of the user. The address is used by services like the core * Obtain the address of the user. The address is used by services like the core
* server packet router to determine if a packet should be sent to the handler. * server packet router to determine if a packet should be sent to the handler.
...@@ -155,6 +192,13 @@ public abstract class LocalSession implements Session { ...@@ -155,6 +192,13 @@ public abstract class LocalSession implements Session {
* @return The connection for this session * @return The connection for this session
*/ */
public Connection getConnection() { public Connection getConnection() {
if (conn == null) {
try {
conn.isClosed(); // This generates an NPE deliberately.
} catch (NullPointerException e) {
Log.error("Attempt to read connection of detached session: ", e);
}
}
return conn; return conn;
} }
...@@ -176,6 +220,10 @@ public abstract class LocalSession implements Session { ...@@ -176,6 +220,10 @@ public abstract class LocalSession implements Session {
* @param status The new status code for this session * @param status The new status code for this session
*/ */
public void setStatus(int status) { public void setStatus(int status) {
if (status == STATUS_CLOSED && this.streamManager.getResume()) {
Log.debug("Suppressing close.");
return;
}
this.status = status; this.status = status;
} }
...@@ -375,6 +423,7 @@ public abstract class LocalSession implements Session { ...@@ -375,6 +423,7 @@ public abstract class LocalSession implements Session {
@Override @Override
public void close() { public void close() {
if (conn == null) return;
conn.close(); conn.close();
} }
......
...@@ -139,7 +139,9 @@ class LocalRoutingTable { ...@@ -139,7 +139,9 @@ class LocalRoutingTable {
LocalSession session = (LocalSession) route; LocalSession session = (LocalSession) route;
try { try {
// Notify connected client that the server is being shut down // Notify connected client that the server is being shut down
session.getConnection().systemShutdown(); if (!session.isDetached()) {
session.getConnection().systemShutdown();
}
} }
catch (Throwable t) { catch (Throwable t) {
// Ignore. // Ignore.
......
...@@ -2,19 +2,27 @@ package org.jivesoftware.openfire.streammanagement; ...@@ -2,19 +2,27 @@ package org.jivesoftware.openfire.streammanagement;
import java.math.BigInteger; import java.math.BigInteger;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.*;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.QName;
import org.dom4j.dom.DOMElement;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.PacketRouter; import org.jivesoftware.openfire.PacketRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.AuthToken;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.LocalClientSession;
import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.XMPPDateTimeFormat; import org.jivesoftware.util.XMPPDateTimeFormat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID; import org.xmpp.packet.*;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
/** /**
* XEP-0198 Stream Manager. * XEP-0198 Stream Manager.
...@@ -25,6 +33,7 @@ import org.xmpp.packet.PacketError; ...@@ -25,6 +33,7 @@ import org.xmpp.packet.PacketError;
public class StreamManager { public class StreamManager {
private final Logger Log; private final Logger Log;
private boolean resume = false;
public static class UnackedPacket { public static class UnackedPacket {
public final long x; public final long x;
public final Date timestamp = new Date(); public final Date timestamp = new Date();
...@@ -44,10 +53,10 @@ public class StreamManager { ...@@ -44,10 +53,10 @@ public class StreamManager {
public static final String NAMESPACE_V2 = "urn:xmpp:sm:2"; public static final String NAMESPACE_V2 = "urn:xmpp:sm:2";
public static final String NAMESPACE_V3 = "urn:xmpp:sm:3"; public static final String NAMESPACE_V3 = "urn:xmpp:sm:3";
/** /**
* Connection (stream) to client for the session the manager belongs to * Session (stream) to client.
*/ */
private final Connection connection; private final LocalSession session;
/** /**
* Namespace to be used in stanzas sent to client (depending on XEP-0198 version used by client) * Namespace to be used in stanzas sent to client (depending on XEP-0198 version used by client)
...@@ -73,10 +82,10 @@ public class StreamManager { ...@@ -73,10 +82,10 @@ public class StreamManager {
*/ */
private Deque<UnackedPacket> unacknowledgedServerStanzas = new LinkedList<>(); private Deque<UnackedPacket> unacknowledgedServerStanzas = new LinkedList<>();
public StreamManager(Connection connection) { public StreamManager(LocalSession session) {
String address; String address;
try { try {
address = connection.getHostAddress(); address = session.getConnection().getHostAddress();
} }
catch ( UnknownHostException e ) catch ( UnknownHostException e )
{ {
...@@ -84,21 +93,41 @@ public class StreamManager { ...@@ -84,21 +93,41 @@ public class StreamManager {
} }
this.Log = LoggerFactory.getLogger(StreamManager.class + "["+ (address == null ? "(unknown address)" : address) +"]" ); this.Log = LoggerFactory.getLogger(StreamManager.class + "["+ (address == null ? "(unknown address)" : address) +"]" );
this.connection = connection; this.session = session;
} }
/** /**
* Returns true if a stream is resumable.
*
* @return True if a stream is resumable.
*/
public boolean getResume() {
return resume;
}
/**
* Processes a stream management element. * Processes a stream management element.
* *
* @param element The stream management element to be processed. * @param element The stream management element to be processed.
* @param onBehalfOf The (full) JID of the entity for which the element is processed.
*/ */
public void process( Element element, JID onBehalfOf ) public void process( Element element )
{ {
switch(element.getName()) { switch(element.getName()) {
case "enable": case "enable":
enable( onBehalfOf, element.getNamespace().getStringValue() ); String resumeString = element.attributeValue("resume");
boolean resume = false;
if (resumeString != null) {
if (resumeString.equalsIgnoreCase("true") || resumeString.equalsIgnoreCase("yes") || resumeString.equals("1")) {
resume = true;
}
}
enable( element.getNamespace().getStringValue(), resume );
break; break;
case "resume":
long h = new Long(element.attributeValue("h"));
String previd = element.attributeValue("previd");
startResume( element.getNamespaceURI(), previd, h);
break;
case "r": case "r":
sendServerAcknowledgement(); sendServerAcknowledgement();
break; break;
...@@ -110,42 +139,152 @@ public class StreamManager { ...@@ -110,42 +139,152 @@ public class StreamManager {
} }
} }
/**
* Should this session be allowed to resume?
* This is used while processed <enable/> and <resume/>
*
* @return True if the session is allowed to resume.
*/
private boolean allowResume() {
boolean allow = false;
// Ensure that resource binding has occurred.
if (session instanceof ClientSession) {
AuthToken authToken = ((LocalClientSession)session).getAuthToken();
if (authToken != null) {
if (!authToken.isAnonymous()) {
allow = true;
}
}
}
return allow;
}
/** /**
* Attempts to enable Stream Management for the entity identified by the provided JID. * Attempts to enable Stream Management for the entity identified by the provided JID.
* *
* @param onBehalfOf The address of the entity for which SM is to be enabled.
* @param namespace The namespace that defines what version of SM is to be enabled. * @param namespace The namespace that defines what version of SM is to be enabled.
* @param resume Whether the client is requesting a resumable session.
*/ */
private void enable( JID onBehalfOf, String namespace ) private void enable( String namespace, boolean resume )
{ {
boolean offerResume = allowResume();
// Ensure that resource binding has occurred. // Ensure that resource binding has occurred.
if( onBehalfOf.getResource() == null ) { if (session.getStatus() != Session.STATUS_AUTHENTICATED) {
sendUnexpectedError(); this.namespace = namespace;
return; sendUnexpectedError();
} return;
}
String smId = null;
synchronized ( this ) synchronized ( this )
{ {
// Do nothing if already enabled // Do nothing if already enabled
if ( isEnabled() ) if ( isEnabled() )
{ {
sendUnexpectedError();
return; return;
} }
this.namespace = namespace; this.namespace = namespace;
this.resume = resume && offerResume;
if ( this.resume ) {
// Create SM-ID.
smId = StringUtils.encodeBase64( session.getAddress().getResource() + "\0" + session.getStreamID().getID());
}
} }
// Send confirmation to the requestee. // Send confirmation to the requestee.
connection.deliverRawText( String.format( "<enabled xmlns='%s'/>", namespace ) ); Element enabled = new DOMElement(QName.get("enabled", namespace));
if (this.resume) {
enabled.addAttribute("resume", "true");
enabled.addAttribute( "id", smId);
}
session.deliverRawText(enabled.asXML());
} }
private void startResume(String namespace, String previd, long h) {
this.namespace = namespace;
// Ensure that resource binding has NOT occurred.
if (!allowResume() ) {
sendUnexpectedError();
return;
}
if (session.getStatus() == Session.STATUS_AUTHENTICATED) {
sendUnexpectedError();
return;
}
AuthToken authToken = null;
// Ensure that resource binding has occurred.
if (session instanceof ClientSession) {
authToken = ((LocalClientSession) session).getAuthToken();
}
if (authToken == null) {
sendUnexpectedError();
return;
}
// Decode previd.
String resource;
String streamId;
try {
StringTokenizer toks = new StringTokenizer(new String(StringUtils.decodeBase64(previd), StandardCharsets.UTF_8), "\0");
resource = toks.nextToken();
streamId = toks.nextToken();
} catch (Exception e) {
Log.debug("Exception from previd decode:", e);
sendUnexpectedError();
return;
}
JID fullJid = new JID(authToken.getUsername(), authToken.getDomain(), resource, true);
// Locate existing session.
LocalClientSession otherSession = (LocalClientSession)XMPPServer.getInstance().getRoutingTable().getClientRoute(fullJid);
if (otherSession == null) {
sendError(new PacketError(PacketError.Condition.item_not_found));
return;
}
if (!otherSession.getStreamID().getID().equals(streamId)) {
sendError(new PacketError(PacketError.Condition.item_not_found));
return;
}
// Previd identifies proper session. Now check SM status
if (!otherSession.getStreamManager().namespace.equals(namespace)) {
sendError(new PacketError(PacketError.Condition.unexpected_request));
return;
}
if (!otherSession.getStreamManager().resume) {
sendError(new PacketError(PacketError.Condition.unexpected_request));
return;
}
if (!otherSession.isDetached()) {
otherSession.setDetached();
}
// If we're all happy, disconnect this session.
Connection conn = session.getConnection();
session.setDetached();
// Connect new session.
otherSession.reattach(conn, h);
// Perform resumption on new session.
session.close();
}
/**
* Called when a session receives a closing stream tag, this prevents the
* session from being detached.
*/
public void formalClose() {
this.resume = false;
}
/** /**
* Sends XEP-0198 acknowledgement &lt;a /&gt; to client from server * Sends XEP-0198 acknowledgement &lt;a /&gt; to client from server
*/ */
public void sendServerAcknowledgement() { public void sendServerAcknowledgement() {
if(isEnabled()) { if(isEnabled()) {
String ack = String.format("<a xmlns='%s' h='%s' />", namespace, serverProcessedStanzas & mask); String ack = String.format("<a xmlns='%s' h='%s' />", namespace, serverProcessedStanzas & mask);
connection.deliverRawText( ack ); session.deliverRawText( ack );
} }
} }
...@@ -154,8 +293,12 @@ public class StreamManager { ...@@ -154,8 +293,12 @@ public class StreamManager {
*/ */
private void sendServerRequest() { private void sendServerRequest() {
if(isEnabled()) { if(isEnabled()) {
if (session.isDetached()) {
Log.debug("Session is detached, won't request an ack.");
return;
}
String request = String.format("<r xmlns='%s' />", namespace); String request = String.format("<r xmlns='%s' />", namespace);
connection.deliverRawText( request ); session.deliverRawText( request );
} }
} }
...@@ -164,13 +307,62 @@ public class StreamManager { ...@@ -164,13 +307,62 @@ public class StreamManager {
* e.g. before resource-binding has completed. * e.g. before resource-binding has completed.
*/ */
private void sendUnexpectedError() { private void sendUnexpectedError() {
connection.deliverRawText( sendError(new PacketError( PacketError.Condition.unexpected_request ));
String.format( "<failed xmlns='%s'>", namespace )
+ new PacketError( PacketError.Condition.unexpected_request ).toXML()
+ "</failed>"
);
} }
/**
* Send a generic failed error.
*
* @param error PacketError describing the failure.
*/
private void sendError(PacketError error) {
session.deliverRawText(
String.format("<failed xmlns='%s'>", namespace)
+ String.format("<%s xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>", error.getCondition().toXMPP())
+ "</failed>"
);
this.namespace = null; // isEnabled() is testing this.
}
/**
* Process client acknowledgements for a given value of h.
*
* @param h Last handled stanza to be acknowledged.
*/
private void processClientAcknowledgement(long h) {
synchronized (this) {
if ( !unacknowledgedServerStanzas.isEmpty() && h > unacknowledgedServerStanzas.getLast().x ) {
Log.warn( "Client acknowledges stanzas that we didn't send! Client Ack h: {}, our last stanza: {}", h, unacknowledgedServerStanzas.getLast().x );
}
clientProcessedStanzas = h;
// Remove stanzas from temporary storage as now acknowledged
Log.trace( "Before processing client Ack (h={}): {} unacknowledged stanzas.", h, unacknowledgedServerStanzas.size() );
// Pop all acknowledged stanzas.
while( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getFirst().x <= h )
{
unacknowledgedServerStanzas.removeFirst();
}
// Ensure that unacknowledged stanzas are purged after the client rolled over 'h' which occurs at h= (2^32)-1
final int maxUnacked = getMaximumUnacknowledgedStanzas();
final boolean clientHadRollOver = h < maxUnacked && !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - maxUnacked;
if ( clientHadRollOver )
{
Log.info( "Client rolled over 'h'. Purging high-numbered unacknowledged stanzas." );
while ( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - maxUnacked)
{
unacknowledgedServerStanzas.removeLast();
}
}
Log.trace( "After processing client Ack (h={}): {} unacknowledged stanzas.", h, unacknowledgedServerStanzas.size());
}
}
/** /**
* Receive and process acknowledgement packet from client * Receive and process acknowledgement packet from client
* @param ack XEP-0198 acknowledgement <a /> stanza to process * @param ack XEP-0198 acknowledgement <a /> stanza to process
...@@ -181,37 +373,7 @@ public class StreamManager { ...@@ -181,37 +373,7 @@ public class StreamManager {
final long h = Long.valueOf(ack.attributeValue("h")); final long h = Long.valueOf(ack.attributeValue("h"));
Log.debug( "Received acknowledgement from client: h={}", h ); Log.debug( "Received acknowledgement from client: h={}", h );
synchronized (this) { processClientAcknowledgement(h);
if ( !unacknowledgedServerStanzas.isEmpty() && h > unacknowledgedServerStanzas.getLast().x ) {
Log.warn( "Client acknowledges stanzas that we didn't send! Client Ack h: {}, our last stanza: {}", h, unacknowledgedServerStanzas.getLast().x );
}
clientProcessedStanzas = h;
// Remove stanzas from temporary storage as now acknowledged
Log.trace( "Before processing client Ack (h={}): {} unacknowledged stanzas.", h, unacknowledgedServerStanzas.size() );
// Pop all acknowledged stanzas.
while( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getFirst().x <= h )
{
unacknowledgedServerStanzas.removeFirst();
}
// Ensure that unacknowledged stanzas are purged after the client rolled over 'h' which occurs at h= (2^32)-1
final int maxUnacked = getMaximumUnacknowledgedStanzas();
final boolean clientHadRollOver = h < maxUnacked && !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - maxUnacked;
if ( clientHadRollOver )
{
Log.info( "Client rolled over 'h'. Purging high-numbered unacknowledged stanzas." );
while ( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - maxUnacked)
{
unacknowledgedServerStanzas.removeLast();
}
}
Log.trace( "After processing client Ack (h={}): {} unacknowledged stanzas.", h, unacknowledgedServerStanzas.size());
}
} }
} }
} }
...@@ -276,6 +438,49 @@ public class StreamManager { ...@@ -276,6 +438,49 @@ public class StreamManager {
} }
public void onResume(JID serverAddress, long h) {
Log.debug("Agreeing to resume");
Element resumed = new DOMElement(QName.get("resumed", namespace));
resumed.addAttribute("previd", StringUtils.encodeBase64( session.getAddress().getResource() + "\0" + session.getStreamID().getID()));
resumed.addAttribute("h", Long.toString(clientProcessedStanzas));
session.getConnection().deliverRawText(resumed.asXML());
Log.debug("Resuming session: Ack for {}", h);
processClientAcknowledgement(h);
Log.debug("Processing remaining unacked stanzas");
// Re-deliver unacknowledged stanzas from broken stream (XEP-0198)
synchronized (this) {
if(isEnabled()) {
for (StreamManager.UnackedPacket unacked : unacknowledgedServerStanzas) {
try {
if (unacked.packet instanceof Message) {
Message m = (Message) unacked.packet;
if (m.getExtension("delay", "urn:xmpp:delay") == null) {
Element delayInformation = m.addChildElement("delay", "urn:xmpp:delay");
delayInformation.addAttribute("stamp", XMPPDateTimeFormat.format(unacked.timestamp));
delayInformation.addAttribute("from", serverAddress.toBareJID());
}
session.getConnection().deliver(m);
} else if (unacked.packet instanceof Presence) {
Presence p = (Presence) unacked.packet;
if (p.getExtension("delay", "urn:xmpp:delay") == null) {
Element delayInformation = p.addChildElement("delay", "urn:xmpp:delay");
delayInformation.addAttribute("stamp", XMPPDateTimeFormat.format(unacked.timestamp));
delayInformation.addAttribute("from", serverAddress.toBareJID());
}
session.getConnection().deliver(p);
} else {
session.getConnection().deliver(unacked.packet);
}
} catch (UnauthorizedException e) {
Log.warn("Caught unauthorized exception, which seems worrying: ", e);
}
}
sendServerRequest();
}
}
}
/** /**
* Determines whether Stream Management enabled for session this * Determines whether Stream Management enabled for session this
* manager belongs to. * manager belongs to.
......
...@@ -56,7 +56,7 @@ public class StreamManagementPacketRouter extends SessionPacketRouter { ...@@ -56,7 +56,7 @@ public class StreamManagementPacketRouter extends SessionPacketRouter {
public void route(Element wrappedElement) throws UnknownStanzaException { public void route(Element wrappedElement) throws UnknownStanzaException {
if (StreamManager.NAMESPACE_V3.equals(wrappedElement.getNamespace().getStringValue())) { if (StreamManager.NAMESPACE_V3.equals(wrappedElement.getNamespace().getStringValue())) {
session.getStreamManager().process( wrappedElement, session.getAddress() ); session.getStreamManager().process( wrappedElement );
} else { } else {
super.route(wrappedElement); super.route(wrappedElement);
if (isUnsolicitedAckExpected()) { if (isUnsolicitedAckExpected()) {
......
...@@ -23,6 +23,8 @@ import org.jivesoftware.openfire.PacketDeliverer; ...@@ -23,6 +23,8 @@ import org.jivesoftware.openfire.PacketDeliverer;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.net.VirtualConnection; import org.jivesoftware.openfire.net.VirtualConnection;
import org.jivesoftware.openfire.nio.OfflinePacketDeliverer; import org.jivesoftware.openfire.nio.OfflinePacketDeliverer;
import org.jivesoftware.openfire.session.LocalClientSession;
import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.spi.ConnectionConfiguration; import org.jivesoftware.openfire.spi.ConnectionConfiguration;
import org.jivesoftware.openfire.spi.ConnectionManagerImpl; import org.jivesoftware.openfire.spi.ConnectionManagerImpl;
import org.jivesoftware.openfire.spi.ConnectionType; import org.jivesoftware.openfire.spi.ConnectionType;
...@@ -131,4 +133,10 @@ public class WebSocketConnection extends VirtualConnection ...@@ -131,4 +133,10 @@ public class WebSocketConnection extends VirtualConnection
public boolean isCompressed() { public boolean isCompressed() {
return XmppWebSocket.isCompressionEnabled(); return XmppWebSocket.isCompressionEnabled();
} }
@Override
public void reinit(LocalSession session) {
this.socket.setXmppSession((LocalClientSession)session);
super.reinit(session);
}
} }
...@@ -161,12 +161,18 @@ public class XmppWebSocket { ...@@ -161,12 +161,18 @@ public class XmppWebSocket {
closeStream(null); closeStream(null);
} }
if (xmppSession != null) { if (xmppSession != null) {
xmppSession.close(); if (!xmppSession.getStreamManager().getResume()) {
SessionManager.getInstance().removeSession(xmppSession); xmppSession.close();
SessionManager.getInstance().removeSession(xmppSession);
}
xmppSession = null; xmppSession = null;
} }
} }
public void setXmppSession(LocalClientSession session) {
this.xmppSession = session;
}
/** /**
* Send an XML packet to the remote peer * Send an XML packet to the remote peer
* *
...@@ -206,6 +212,7 @@ public class XmppWebSocket { ...@@ -206,6 +212,7 @@ public class XmppWebSocket {
try { try {
String tag = stanza.getName(); String tag = stanza.getName();
if (STREAM_FOOTER.equals(tag)) { if (STREAM_FOOTER.equals(tag)) {
xmppSession.getStreamManager().formalClose();
closeStream(null); closeStream(null);
} else if ("auth".equals(tag)) { } else if ("auth".equals(tag)) {
// User is trying to authenticate using SASL // User is trying to authenticate using SASL
...@@ -401,4 +408,4 @@ public class XmppWebSocket { ...@@ -401,4 +408,4 @@ public class XmppWebSocket {
} }
} }
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment