Commit 9b957470 authored by Tom Evans's avatar Tom Evans Committed by tevans

OF-573: Improve session concurrency and consistency across local and remote...

OF-573: Improve session concurrency and consistency across local and remote (clustered) client sessions

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@13312 b35dd754-fafc-0310-a699-88a17e54d16e
parent c2d60db8
...@@ -53,6 +53,7 @@ import org.jivesoftware.openfire.net.VirtualConnection; ...@@ -53,6 +53,7 @@ import org.jivesoftware.openfire.net.VirtualConnection;
import org.jivesoftware.openfire.session.LocalClientSession; import org.jivesoftware.openfire.session.LocalClientSession;
import org.jivesoftware.util.JiveConstants; import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.TaskEngine;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
...@@ -1002,61 +1003,60 @@ public class HttpSession extends LocalClientSession { ...@@ -1002,61 +1003,60 @@ public class HttpSession extends LocalClientSession {
isClosed = true; isClosed = true;
} }
if (pendingElements.size() > 0) { // close connection(s) and deliver pending elements (if any)
failDelivery(); synchronized (connectionQueue) {
} for (HttpConnection toClose : connectionQueue) {
try {
for (SessionListener listener : listeners) { if (!toClose.isClosed()) {
listener.sessionClosed(this); if (!pendingElements.isEmpty() && toClose.getRequestId() == lastRequestID + 1) {
synchronized(pendingElements) {
deliver(toClose, pendingElements);
lastRequestID = toClose.getRequestId();
pendingElements.clear();
}
} else {
toClose.deliverBody(null);
}
}
} catch (HttpConnectionClosedException e) {
/* ignore ... already closed */
}
}
} }
this.listeners.clear();
}
private void failDelivery() {
synchronized (pendingElements) { synchronized (pendingElements) {
for (Deliverable deliverable : pendingElements) { for (Deliverable deliverable : pendingElements) {
Collection<Packet> packet = deliverable.getPackets(); failDelivery(deliverable.getPackets());
if (packet != null) {
failDelivery(packet);
}
} }
pendingElements.clear(); pendingElements.clear();
} }
synchronized (connectionQueue) { for (SessionListener listener : listeners) {
for (HttpConnection toClose : connectionQueue) { listener.sessionClosed(this);
if (!toClose.isDelivered()) {
Delivered delivered = retrieveDeliverable(toClose.getRequestId());
if (delivered != null) {
failDelivery(delivered.getPackets());
}
else {
Log.warn("Packets could not be found for session " + getStreamID() + " cannot " +
"be delivered to client");
}
}
toClose.close();
fireConnectionClosed(toClose);
}
} }
this.listeners.clear();
} }
private void failDelivery(Collection<Packet> packets) { private void failDelivery(final Collection<Packet> packets) {
if (packets == null) { if (packets == null) {
// Do nothing if someone asked to deliver nothing :) // Do nothing if someone asked to deliver nothing :)
return; return;
} }
for (Packet packet : packets) { // use a separate thread to schedule backup delivery
try { TaskEngine.getInstance().submit(new Runnable() {
backupDeliverer.deliver(packet); public void run() {
} for (Packet packet : packets) {
catch (UnauthorizedException e) { try {
Log.error("Unable to deliver message to backup deliverer", e); backupDeliverer.deliver(packet);
} }
} catch (UnauthorizedException e) {
Log.error("Unable to deliver message to backup deliverer", e);
}
}
}
});
} }
private String createEmptyBody() { private String createEmptyBody() {
Element body = DocumentHelper.createElement("body"); Element body = DocumentHelper.createElement("body");
body.addNamespace("", "http://jabber.org/protocol/httpbind"); body.addNamespace("", "http://jabber.org/protocol/httpbind");
......
...@@ -65,6 +65,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -65,6 +65,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
public static final String ANONYMOUS_C2S_CACHE_NAME = "Routing AnonymousUsers Cache"; public static final String ANONYMOUS_C2S_CACHE_NAME = "Routing AnonymousUsers Cache";
public static final String S2S_CACHE_NAME = "Routing Servers Cache"; public static final String S2S_CACHE_NAME = "Routing Servers Cache";
public static final String COMPONENT_CACHE_NAME = "Routing Components Cache"; public static final String COMPONENT_CACHE_NAME = "Routing Components Cache";
public static final String C2S_SESSION_NAME = "Routing User Sessions";
/** /**
* Cache (unlimited, never expire) that holds outgoing sessions to remote servers from this server. * Cache (unlimited, never expire) that holds outgoing sessions to remote servers from this server.
...@@ -108,7 +109,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -108,7 +109,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
componentsCache = CacheFactory.createCache(COMPONENT_CACHE_NAME); componentsCache = CacheFactory.createCache(COMPONENT_CACHE_NAME);
usersCache = CacheFactory.createCache(C2S_CACHE_NAME); usersCache = CacheFactory.createCache(C2S_CACHE_NAME);
anonymousUsersCache = CacheFactory.createCache(ANONYMOUS_C2S_CACHE_NAME); anonymousUsersCache = CacheFactory.createCache(ANONYMOUS_C2S_CACHE_NAME);
usersSessions = CacheFactory.createCache("Routing User Sessions"); usersSessions = CacheFactory.createCache(C2S_SESSION_NAME);
localRoutingTable = new LocalRoutingTable(); localRoutingTable = new LocalRoutingTable();
} }
...@@ -313,6 +314,9 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -313,6 +314,9 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
if (remotePacketRouter != null) { if (remotePacketRouter != null) {
routed = remotePacketRouter routed = remotePacketRouter
.routePacket(clientRoute.getNodeID().toByteArray(), jid, packet); .routePacket(clientRoute.getNodeID().toByteArray(), jid, packet);
if (!routed) {
removeClientRoute(jid); // drop invalid client route
}
} }
} }
} }
...@@ -706,19 +710,26 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -706,19 +710,26 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
} }
else { else {
// Address is a bare JID so return all AVAILABLE resources of user // Address is a bare JID so return all AVAILABLE resources of user
Collection<String> sessions = usersSessions.get(route.toBareJID()); Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
if (sessions != null) { try {
// Select only available sessions lock.lock(); // temporarily block new sessions for this JID
for (String jid : sessions) { Collection<String> sessions = usersSessions.get(route.toBareJID());
ClientRoute clientRoute = usersCache.get(jid); if (sessions != null) {
if (clientRoute == null) { // Select only available sessions
clientRoute = anonymousUsersCache.get(jid); for (String jid : sessions) {
} ClientRoute clientRoute = usersCache.get(jid);
if (clientRoute != null && (clientRoute.isAvailable() || if (clientRoute == null) {
presenceUpdateHandler.hasDirectPresence(new JID(jid), requester))) { clientRoute = anonymousUsersCache.get(jid);
jids.add(new JID(jid)); }
} if (clientRoute != null && (clientRoute.isAvailable() ||
} presenceUpdateHandler.hasDirectPresence(new JID(jid), requester))) {
jids.add(new JID(jid));
}
}
}
}
finally {
lock.unlock();
} }
} }
} }
......
...@@ -19,22 +19,23 @@ ...@@ -19,22 +19,23 @@
package com.jivesoftware.openfire.session; package com.jivesoftware.openfire.session;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.NodeID; import org.jivesoftware.openfire.cluster.NodeID;
import org.jivesoftware.openfire.session.ClientSession; import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.ClientRoute; import org.jivesoftware.openfire.spi.ClientRoute;
import org.jivesoftware.openfire.spi.RoutingTableImpl; import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.ExternalizableUtil; import org.jivesoftware.util.cache.ExternalizableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/** /**
* Class that defines possible remote operations that could be performed * Class that defines possible remote operations that could be performed
* on remote client sessions. * on remote client sessions.
...@@ -42,7 +43,11 @@ import java.io.ObjectOutput; ...@@ -42,7 +43,11 @@ import java.io.ObjectOutput;
* @author Gaston Dombiak * @author Gaston Dombiak
*/ */
public class ClientSessionTask extends RemoteSessionTask { public class ClientSessionTask extends RemoteSessionTask {
private static Logger logger = LoggerFactory.getLogger(ClientSessionTask.class);
private JID address; private JID address;
private transient Session session;
public ClientSessionTask() { public ClientSessionTask() {
super(); super();
...@@ -54,10 +59,17 @@ public class ClientSessionTask extends RemoteSessionTask { ...@@ -54,10 +59,17 @@ public class ClientSessionTask extends RemoteSessionTask {
} }
Session getSession() { Session getSession() {
return XMPPServer.getInstance().getRoutingTable().getClientRoute(address); if (session == null) {
session = XMPPServer.getInstance().getRoutingTable().getClientRoute(address);
}
return session;
} }
public void run() { public void run() {
if (getSession() == null || getSession().isClosed()) {
logger.error("Session not found for JID: " + address);
return;
}
super.run(); super.run();
ClientSession session = (ClientSession) getSession(); ClientSession session = (ClientSession) getSession();
...@@ -67,7 +79,7 @@ public class ClientSessionTask extends RemoteSessionTask { ...@@ -67,7 +79,7 @@ public class ClientSessionTask extends RemoteSessionTask {
ClientRoute route = usersCache.get(address.toString()); ClientRoute route = usersCache.get(address.toString());
NodeID nodeID = route.getNodeID(); NodeID nodeID = route.getNodeID();
Log.warn("Found remote session instead of local session. JID: " + address + " found in Node: " + logger.warn("Found remote session instead of local session. JID: " + address + " found in Node: " +
nodeID.toByteArray() + " and local node is: " + XMPPServer.getInstance().getNodeID().toByteArray()); nodeID.toByteArray() + " and local node is: " + XMPPServer.getInstance().getNodeID().toByteArray());
} }
if (operation == Operation.isInitialized) { if (operation == Operation.isInitialized) {
......
...@@ -19,6 +19,10 @@ ...@@ -19,6 +19,10 @@
package com.jivesoftware.openfire.session; package com.jivesoftware.openfire.session;
import java.net.UnknownHostException;
import java.util.Date;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID; import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
...@@ -26,9 +30,6 @@ import org.jivesoftware.util.cache.ClusterTask; ...@@ -26,9 +30,6 @@ import org.jivesoftware.util.cache.ClusterTask;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import java.net.UnknownHostException;
import java.util.Date;
/** /**
* Base class for sessions being hosted in other cluster nodes. Almost all * Base class for sessions being hosted in other cluster nodes. Almost all
* messages will be forwarded to the actual session in some remote cluster node. * messages will be forwarded to the actual session in some remote cluster node.
...@@ -168,7 +169,16 @@ public abstract class RemoteSession implements Session { ...@@ -168,7 +169,16 @@ public abstract class RemoteSession implements Session {
* @throws IllegalStateException if requested node was not found or not running in a cluster. * @throws IllegalStateException if requested node was not found or not running in a cluster.
*/ */
protected Object doSynchronousClusterTask(ClusterTask task) { protected Object doSynchronousClusterTask(ClusterTask task) {
return CacheFactory.doSynchronousClusterTask(task, nodeID); try {
return CacheFactory.doSynchronousClusterTask(task, nodeID);
} catch (IllegalStateException ise) {
if (task instanceof RemoteSessionTask) {
// clean up invalid session
SessionManager.getInstance().removeSession(null,
((RemoteSessionTask)task).getSession().getAddress(), false, false);
}
throw ise;
}
} }
/** /**
...@@ -178,7 +188,16 @@ public abstract class RemoteSession implements Session { ...@@ -178,7 +188,16 @@ public abstract class RemoteSession implements Session {
* @throws IllegalStateException if requested node was not found or not running in a cluster. * @throws IllegalStateException if requested node was not found or not running in a cluster.
*/ */
protected void doClusterTask(ClusterTask task) { protected void doClusterTask(ClusterTask task) {
CacheFactory.doClusterTask(task, nodeID); try {
CacheFactory.doClusterTask(task, nodeID);
} catch (IllegalStateException ise) {
if (task instanceof RemoteSessionTask) {
// clean up invalid session
SessionManager.getInstance().removeSession(null,
((RemoteSessionTask)task).getSession().getAddress(), false, false);
}
throw ise;
}
} }
/** /**
......
...@@ -46,7 +46,7 @@ public class ClusterPacketRouter implements RemotePacketRouter { ...@@ -46,7 +46,7 @@ public class ClusterPacketRouter implements RemotePacketRouter {
CacheFactory.doClusterTask(new RemotePacketExecution(receipient, packet), nodeID); CacheFactory.doClusterTask(new RemotePacketExecution(receipient, packet), nodeID);
return true; return true;
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
logger.warn("Error while routing packet to remote node", e); logger.warn("Error while routing packet to remote node: " + e);
return false; return false;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment