Commit 4db34ec9 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

More work on clustering events.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@8549 b35dd754-fafc-0310-a699-88a17e54d16e
parent beb0ab8d
...@@ -91,6 +91,10 @@ class LocalSessionManager { ...@@ -91,6 +91,10 @@ class LocalSessionManager {
return incomingServerSessions.get(streamID); return incomingServerSessions.get(streamID);
} }
public Collection<LocalIncomingServerSession> getIncomingServerSessions() {
return incomingServerSessions.values();
}
public void addIncomingServerSessions(String streamID, LocalIncomingServerSession session) { public void addIncomingServerSessions(String streamID, LocalIncomingServerSession session) {
incomingServerSessions.put(streamID, session); incomingServerSessions.put(streamID, session);
} }
......
...@@ -170,6 +170,19 @@ public interface RoutingTable { ...@@ -170,6 +170,19 @@ public interface RoutingTable {
*/ */
boolean isAnonymousRoute(JID jid); boolean isAnonymousRoute(JID jid);
/**
* Returns true if the specified address belongs to a route that is hosted by this JVM.
* When running inside of a cluster each cluster node will host routes to local resources.
* A false value could either mean that the route is not hosted by this JVM but other
* cluster node or that there is no route to the specified address. Use
* {@link XMPPServer#isLocal(org.xmpp.packet.JID)} to figure out if the address
* belongs to tge domain hosted by this server.
*
* @param jid the address of the route.
* @return true if the specified address belongs to a route that is hosted by this JVM.
*/
boolean isLocalRoute(JID jid);
/** /**
* Returns true if an outgoing server session exists to the specified remote server. * Returns true if an outgoing server session exists to the specified remote server.
* The JID can be a full JID or a bare JID since only the domain of the specified * The JID can be a full JID or a bare JID since only the domain of the specified
...@@ -215,9 +228,10 @@ public interface RoutingTable { ...@@ -215,9 +228,10 @@ public interface RoutingTable {
* TODO Prevent usage of this message and change original requirement to avoid having to load all sessions. * TODO Prevent usage of this message and change original requirement to avoid having to load all sessions.
* TODO This may not scale when hosting millions of sessions. * TODO This may not scale when hosting millions of sessions.
* *
* @param onlyLocal true if only client sessions connected to this JVM must be considered.
* @return collection of client sessions authenticated with the server. * @return collection of client sessions authenticated with the server.
*/ */
Collection<ClientSession> getClientsRoutes(); Collection<ClientSession> getClientsRoutes(boolean onlyLocal);
/** /**
* Returns the outgoing server session associated to the specified XMPP address or <tt>null</tt> * Returns the outgoing server session associated to the specified XMPP address or <tt>null</tt>
......
...@@ -14,6 +14,8 @@ package org.jivesoftware.openfire; ...@@ -14,6 +14,8 @@ package org.jivesoftware.openfire;
import org.jivesoftware.openfire.audit.AuditStreamIDFactory; import org.jivesoftware.openfire.audit.AuditStreamIDFactory;
import org.jivesoftware.openfire.auth.AuthToken; import org.jivesoftware.openfire.auth.AuthToken;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.component.InternalComponentManager; import org.jivesoftware.openfire.component.InternalComponentManager;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.event.SessionEventDispatcher; import org.jivesoftware.openfire.event.SessionEventDispatcher;
...@@ -45,7 +47,7 @@ import java.util.concurrent.locks.Lock; ...@@ -45,7 +47,7 @@ import java.util.concurrent.locks.Lock;
* *
* @author Derek DeMoro * @author Derek DeMoro
*/ */
public class SessionManager extends BasicModule { public class SessionManager extends BasicModule implements ClusterEventListener {
public static final String COMPONENT_SESSION_CACHE_NAME = "Components Sessions"; public static final String COMPONENT_SESSION_CACHE_NAME = "Components Sessions";
public static final String CM_CACHE_NAME = "Connection Managers Sessions"; public static final String CM_CACHE_NAME = "Connection Managers Sessions";
...@@ -94,6 +96,20 @@ public class SessionManager extends BasicModule { ...@@ -94,6 +96,20 @@ public class SessionManager extends BasicModule {
*/ */
private Cache<String, List<String>> hostnameSessionsCache; private Cache<String, List<String>> hostnameSessionsCache;
/**
* Cache (unlimited, never expire) that holds domains, subdomains and virtual
* hostnames of the remote server that were validated with this server for each
* incoming server session.
* Key: stream ID, Value: Domains and subdomains of the remote server that were
* validated with this server.<p>
*
* This same information is stored in {@link LocalIncomingServerSession} but the
* reason for this duplication is that when running in a cluster other nodes
* will have access to this clustered cache even in the case of this node going
* down.
*/
private Cache<String, Set<String>> validatedDomainsCache;
private ClientSessionListener clientSessionListener = new ClientSessionListener(); private ClientSessionListener clientSessionListener = new ClientSessionListener();
private ComponentSessionListener componentSessionListener = new ComponentSessionListener(); private ComponentSessionListener componentSessionListener = new ComponentSessionListener();
private IncomingServerSessionListener incomingServerListener = new IncomingServerSessionListener(); private IncomingServerSessionListener incomingServerListener = new IncomingServerSessionListener();
...@@ -371,9 +387,10 @@ public class SessionManager extends BasicModule { ...@@ -371,9 +387,10 @@ public class SessionManager extends BasicModule {
*/ */
public void registerIncomingServerSession(String hostname, LocalIncomingServerSession session) { public void registerIncomingServerSession(String hostname, LocalIncomingServerSession session) {
// Keep local track of the incoming server session connected to this JVM // Keep local track of the incoming server session connected to this JVM
localSessionManager.addIncomingServerSessions(session.getStreamID().getID(), session); String streamID = session.getStreamID().getID();
localSessionManager.addIncomingServerSessions(streamID, session);
// Keep track of the nodeID hosting the incoming server session // Keep track of the nodeID hosting the incoming server session
incomingServerSessionsCache.put(session.getStreamID().getID(), server.getNodeID()); incomingServerSessionsCache.put(streamID, server.getNodeID());
// Update list of sockets/sessions coming from the same remote hostname // Update list of sockets/sessions coming from the same remote hostname
Lock lock = LockManager.getLock(hostname); Lock lock = LockManager.getLock(hostname);
try { try {
...@@ -382,41 +399,26 @@ public class SessionManager extends BasicModule { ...@@ -382,41 +399,26 @@ public class SessionManager extends BasicModule {
if (streamIDs == null) { if (streamIDs == null) {
streamIDs = new ArrayList<String>(); streamIDs = new ArrayList<String>();
} }
streamIDs.add(session.getStreamID().getID()); streamIDs.add(streamID);
hostnameSessionsCache.put(hostname, streamIDs); hostnameSessionsCache.put(hostname, streamIDs);
} }
finally { finally {
lock.unlock(); lock.unlock();
} }
} // Add to clustered cache
lock = LockManager.getLock(streamID);
/**
* Unregisters the server sessions originated by a remote server with the specified hostname.
* Notice that the remote server may be hosting several subdomains as well as virtual hosts so
* the same IncomingServerSession may be associated with many keys. The remote server may have
* many sessions established with this server (eg. to the server itself and to subdomains
* hosted by this server).
*
* @param hostname the hostname that is being served by the remote server.
*/
public void unregisterIncomingServerSessions(String hostname) {
List<String> streamIDs;
// Remove list of sockets/sessions coming from the remote hostname
Lock lock = LockManager.getLock(hostname);
try { try {
lock.lock(); lock.lock();
streamIDs = hostnameSessionsCache.remove(hostname); Set<String> validatedDomains = validatedDomainsCache.get(streamID);
} if (validatedDomains == null) {
finally { validatedDomains = new HashSet<String>();
lock.unlock(); }
} boolean added = validatedDomains.add(hostname);
if (streamIDs != null) { if (added) {
for (String streamID : streamIDs) { validatedDomainsCache.put(streamID, validatedDomains);
// Remove local track of the incoming server session connected to this JVM
localSessionManager.removeIncomingServerSessions(streamID);
// Remove track of the nodeID hosting the incoming server session
incomingServerSessionsCache.remove(streamID);
} }
} finally {
lock.unlock();
} }
} }
...@@ -428,9 +430,10 @@ public class SessionManager extends BasicModule { ...@@ -428,9 +430,10 @@ public class SessionManager extends BasicModule {
*/ */
public void unregisterIncomingServerSession(String hostname, IncomingServerSession session) { public void unregisterIncomingServerSession(String hostname, IncomingServerSession session) {
// Remove local track of the incoming server session connected to this JVM // Remove local track of the incoming server session connected to this JVM
localSessionManager.removeIncomingServerSessions(session.getStreamID().getID()); String streamID = session.getStreamID().getID();
localSessionManager.removeIncomingServerSessions(streamID);
// Remove track of the nodeID hosting the incoming server session // Remove track of the nodeID hosting the incoming server session
incomingServerSessionsCache.remove(session.getStreamID().getID()); incomingServerSessionsCache.remove(streamID);
// Remove from list of sockets/sessions coming from the remote hostname // Remove from list of sockets/sessions coming from the remote hostname
Lock lock = LockManager.getLock(hostname); Lock lock = LockManager.getLock(hostname);
...@@ -438,7 +441,7 @@ public class SessionManager extends BasicModule { ...@@ -438,7 +441,7 @@ public class SessionManager extends BasicModule {
lock.lock(); lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname); List<String> streamIDs = hostnameSessionsCache.get(hostname);
if (streamIDs != null) { if (streamIDs != null) {
streamIDs.remove(session.getStreamID().getID()); streamIDs.remove(streamID);
if (streamIDs.isEmpty()) { if (streamIDs.isEmpty()) {
hostnameSessionsCache.remove(hostname); hostnameSessionsCache.remove(hostname);
} }
...@@ -450,6 +453,50 @@ public class SessionManager extends BasicModule { ...@@ -450,6 +453,50 @@ public class SessionManager extends BasicModule {
finally { finally {
lock.unlock(); lock.unlock();
} }
// Remove from clustered cache
lock = LockManager.getLock(streamID);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
if (validatedDomains == null) {
validatedDomains = new HashSet<String>();
}
validatedDomains.remove(hostname);
if (!validatedDomains.isEmpty()) {
validatedDomainsCache.put(streamID, validatedDomains);
}
else {
validatedDomainsCache.remove(streamID);
}
} finally {
lock.unlock();
}
}
/**
* Returns a collection with all the domains, subdomains and virtual hosts that where
* validated. The remote server is allowed to send packets from any of these domains,
* subdomains and virtual hosts.<p>
*
* Content is stored in a clustered cache so that even in the case of the node hosting
* the sessions is lost we can still have access to this info to be able to perform
* proper clean up logic.
*
* @param streamID id that uniquely identifies the session.
* @return domains, subdomains and virtual hosts that where validated.
*/
public Collection<String> getValidatedDomains(String streamID) {
Lock lock = LockManager.getLock(streamID);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
if (validatedDomains == null) {
return Collections.emptyList();
}
return Collections.unmodifiableCollection(validatedDomains);
} finally {
lock.unlock();
}
} }
/** /**
...@@ -652,7 +699,7 @@ public class SessionManager extends BasicModule { ...@@ -652,7 +699,7 @@ public class SessionManager extends BasicModule {
* @return a list that contains all client sessions connected to the server. * @return a list that contains all client sessions connected to the server.
*/ */
public Collection<ClientSession> getSessions() { public Collection<ClientSession> getSessions() {
return routingTable.getClientsRoutes(); return routingTable.getClientsRoutes(false);
} }
...@@ -1083,7 +1130,6 @@ public class SessionManager extends BasicModule { ...@@ -1083,7 +1130,6 @@ public class SessionManager extends BasicModule {
for (String hostname : session.getValidatedDomains()) { for (String hostname : session.getValidatedDomains()) {
unregisterIncomingServerSession(hostname, session); unregisterIncomingServerSession(hostname, session);
} }
LocalIncomingServerSession.releaseValidatedDomains(session.getStreamID().getID());
} }
} }
...@@ -1161,6 +1207,9 @@ public class SessionManager extends BasicModule { ...@@ -1161,6 +1207,9 @@ public class SessionManager extends BasicModule {
multiplexerSessionsCache = CacheFactory.createCache(CM_CACHE_NAME); multiplexerSessionsCache = CacheFactory.createCache(CM_CACHE_NAME);
incomingServerSessionsCache = CacheFactory.createCache(ISS_CACHE_NAME); incomingServerSessionsCache = CacheFactory.createCache(ISS_CACHE_NAME);
hostnameSessionsCache = CacheFactory.createCache("Sessions by Hostname"); hostnameSessionsCache = CacheFactory.createCache("Sessions by Hostname");
validatedDomainsCache = CacheFactory.createCache("Validated Domains");
// Listen to cluster events
ClusterManager.addListener(this);
} }
...@@ -1349,4 +1398,154 @@ public class SessionManager extends BasicModule { ...@@ -1349,4 +1398,154 @@ public class SessionManager extends BasicModule {
} }
} }
public void joinedCluster(byte[] oldNodeID) {
restoreCacheContent();
}
public void leavingCluster() {
if (XMPPServer.getInstance().isShuttingDown()) {
// Do nothing since local sessions will be closed. Local session manager
// and local routing table will be correctly updated thus updating the
// other cluster nodes correctly
}
else {
// This JVM is leaving the cluster but will continue to work. That means
// that clients connected to this JVM will be able to keep talking.
// In other words, their sessions will not be closed (and not removed from
// the routing table or the session manager). However, other nodes should
// get their session managers correctly updated.
// Remove external component sessions hosted locally to the cache (using new nodeID)
for (Session session : localSessionManager.getComponentsSessions()) {
componentSessionsCache.remove(session.getAddress().toString());
}
// Remove connection multiplexer sessions hosted locally to the cache (using new nodeID)
for (String address : localSessionManager.getConnnectionManagerSessions().keySet()) {
multiplexerSessionsCache.remove(address);
}
// Remove incoming server sessions hosted locally to the cache (using new nodeID)
for (LocalIncomingServerSession session : localSessionManager.getIncomingServerSessions()) {
String streamID = session.getStreamID().getID();
incomingServerSessionsCache.remove(streamID);
for (String hostname : session.getValidatedDomains()) {
// Update list of sockets/sessions coming from the same remote hostname
Lock lock = LockManager.getLock(hostname);
try {
lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname);
streamIDs.remove(streamID);
if (streamIDs.isEmpty()) {
hostnameSessionsCache.remove(hostname);
}
else {
hostnameSessionsCache.put(hostname, streamIDs);
}
}
finally {
lock.unlock();
}
// Remove from clustered cache
lock = LockManager.getLock(streamID);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
if (validatedDomains == null) {
validatedDomains = new HashSet<String>();
}
validatedDomains.remove(hostname);
if (!validatedDomains.isEmpty()) {
validatedDomainsCache.put(streamID, validatedDomains);
}
else {
validatedDomainsCache.remove(streamID);
}
} finally {
lock.unlock();
}
}
}
// Update counters of client sessions
for (ClientSession session : routingTable.getClientsRoutes(true)) {
// Increment the counter of user sessions
decrementCounter("conncounter");
if (session.getStatus() == Session.STATUS_AUTHENTICATED) {
// Increment counter of authenticated sessions
decrementCounter("usercounter");
}
}
}
}
public void leftCluster() {
if (!XMPPServer.getInstance().isShuttingDown()) {
// Add local sessions to caches
restoreCacheContent();
}
}
public void markedAsSeniorClusterMember() {
// Do nothing
}
private void restoreCacheContent() {
// Add external component sessions hosted locally to the cache (using new nodeID)
for (Session session : localSessionManager.getComponentsSessions()) {
componentSessionsCache.put(session.getAddress().toString(), server.getNodeID());
}
// Add connection multiplexer sessions hosted locally to the cache (using new nodeID)
for (String address : localSessionManager.getConnnectionManagerSessions().keySet()) {
multiplexerSessionsCache.put(address, server.getNodeID());
}
// Add incoming server sessions hosted locally to the cache (using new nodeID)
for (LocalIncomingServerSession session : localSessionManager.getIncomingServerSessions()) {
String streamID = session.getStreamID().getID();
incomingServerSessionsCache.put(streamID, server.getNodeID());
for (String hostname : session.getValidatedDomains()) {
// Update list of sockets/sessions coming from the same remote hostname
Lock lock = LockManager.getLock(hostname);
try {
lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname);
if (streamIDs == null) {
streamIDs = new ArrayList<String>();
}
streamIDs.add(streamID);
hostnameSessionsCache.put(hostname, streamIDs);
}
finally {
lock.unlock();
}
// Add to clustered cache
lock = LockManager.getLock(streamID);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
if (validatedDomains == null) {
validatedDomains = new HashSet<String>();
}
boolean added = validatedDomains.add(hostname);
if (added) {
validatedDomainsCache.put(streamID, validatedDomains);
}
} finally {
lock.unlock();
}
}
}
// Update counters of client sessions
for (ClientSession session : routingTable.getClientsRoutes(true)) {
// Increment the counter of user sessions
incrementCounter("conncounter");
if (session.getStatus() == Session.STATUS_AUTHENTICATED) {
// Increment counter of authenticated sessions
incrementCounter("usercounter");
}
}
}
} }
...@@ -49,8 +49,9 @@ public interface ClusterEventListener { ...@@ -49,8 +49,9 @@ public interface ClusterEventListener {
/** /**
* Notification event indicating that this JVM is no longer part of the cluster. This could * Notification event indicating that this JVM is no longer part of the cluster. This could
* happen when disabling clustering support or removing the enterprise plugin that provides * happen when disabling clustering support, removing the enterprise plugin that provides
* clustering support.<p> * clustering support or connection to cluster got lost. If connection to cluster was lost
* then this event will not be predated by the {@link #leavingCluster()} event.<p>
* *
* Moreover, if we were in a "split brain" scenario (ie. separated cluster islands) and the * Moreover, if we were in a "split brain" scenario (ie. separated cluster islands) and the
* island were this JVM belonged was marked as "old" then all nodes of that island will * island were this JVM belonged was marked as "old" then all nodes of that island will
......
...@@ -123,7 +123,7 @@ public class ClusterManager { ...@@ -123,7 +123,7 @@ public class ClusterManager {
* When joining the cluster as the senior cluster member the {@link #fireMarkedAsSeniorClusterMember()} * When joining the cluster as the senior cluster member the {@link #fireMarkedAsSeniorClusterMember()}
* event will be sent right after this event.<p> * event will be sent right after this event.<p>
* <p/> * <p/>
* This event could be triggered in another thread. This will avoid potential deadlocks * This event will be triggered in another thread. This will avoid potential deadlocks
* in Coherence. * in Coherence.
* *
* @param oldNodeID nodeID used by this JVM before joining the cluster. * @param oldNodeID nodeID used by this JVM before joining the cluster.
...@@ -148,12 +148,17 @@ public class ClusterManager { ...@@ -148,12 +148,17 @@ public class ClusterManager {
* happen when disabling clustering support, removing the enterprise plugin that provides * happen when disabling clustering support, removing the enterprise plugin that provides
* clustering support or even shutdown the server.<p> * clustering support or even shutdown the server.<p>
* <p/> * <p/>
* This event will be triggered in another thread. This will avoid potential deadlocks * This event will be triggered in another thread but won't return until all listeners have
* in Coherence. * been alerted. This will give listeners the chance to use the cluster for any clean up
* operation before the node actually leaves the cluster.
*/ */
public static void fireLeavingCluster() { public static void fireLeavingCluster() {
try { try {
events.put(new Event(EventType.leaving_cluster, null)); Event event = new Event(EventType.leaving_cluster, null);
events.put(event);
while (!event.isProcessed()) {
Thread.sleep(50);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Should never happen // Should never happen
} }
...@@ -171,7 +176,7 @@ public class ClusterManager { ...@@ -171,7 +176,7 @@ public class ClusterManager {
* This also includes the case where this JVM was the senior cluster member and when the islands * This also includes the case where this JVM was the senior cluster member and when the islands
* met again then this JVM stopped being the senior member.<p> * met again then this JVM stopped being the senior member.<p>
* <p/> * <p/>
* This event could be triggered in another thread. This will avoid potential deadlocks * This event will be triggered in another thread. This will avoid potential deadlocks
* in Coherence. * in Coherence.
* *
* @param asynchronous true if event will be triggered in background * @param asynchronous true if event will be triggered in background
......
...@@ -11,11 +11,7 @@ ...@@ -11,11 +11,7 @@
package org.jivesoftware.openfire.session; package org.jivesoftware.openfire.session;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import java.util.Collection; import java.util.Collection;
import java.util.Set;
/** /**
* Server-to-server communication is done using two TCP connections between the servers. One * Server-to-server communication is done using two TCP connections between the servers. One
...@@ -40,14 +36,6 @@ import java.util.Set; ...@@ -40,14 +36,6 @@ import java.util.Set;
*/ */
public interface IncomingServerSession extends Session { public interface IncomingServerSession extends Session {
/**
* Cache (unlimited, never expire) that holds domains, subdomains and virtual
* hostnames of the remote server that were validated with this server for each
* incoming server session.
* Key: stream ID, Value: Domains and subdomains of the remote server that were
* validated with this server.
*/
static Cache<String, Set<String>> validatedDomainsCache = CacheFactory.createCache("Validated Domains");
/** /**
* Returns a collection with all the domains, subdomains and virtual hosts that where * Returns a collection with all the domains, subdomains and virtual hosts that where
* validated. The remote server is allowed to send packets from any of these domains, * validated. The remote server is allowed to send packets from any of these domains,
......
...@@ -22,7 +22,6 @@ import org.jivesoftware.openfire.net.SocketConnection; ...@@ -22,7 +22,6 @@ import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.server.ServerDialback; import org.jivesoftware.openfire.server.ServerDialback;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.lock.LockManager;
import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
...@@ -33,7 +32,6 @@ import java.util.Collection; ...@@ -33,7 +32,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.Lock;
/** /**
* Server-to-server communication is done using two TCP connections between the servers. One * Server-to-server communication is done using two TCP connections between the servers. One
...@@ -57,6 +55,12 @@ import java.util.concurrent.locks.Lock; ...@@ -57,6 +55,12 @@ import java.util.concurrent.locks.Lock;
* @author Gaston Dombiak * @author Gaston Dombiak
*/ */
public class LocalIncomingServerSession extends LocalSession implements IncomingServerSession { public class LocalIncomingServerSession extends LocalSession implements IncomingServerSession {
/**
* List of domains, subdomains and virtual hostnames of the remote server that were
* validated with this server. The remote server is allowed to send packets to this
* server from any of the validated domains.
*/
private Set<String> validatedDomains = new HashSet<String>();
/** /**
* Domains or subdomain of this server that was used by the remote server * Domains or subdomain of this server that was used by the remote server
...@@ -193,22 +197,6 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming ...@@ -193,22 +197,6 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming
return session; return session;
} }
/**
* Removes the list of validate domains for the specified session. This is usually required
* when an incoming server session is closed.
*
* @param streamID the streamID that identifies the incoming server session.
*/
public static void releaseValidatedDomains(String streamID) {
Lock lock = LockManager.getLock(streamID);
try {
lock.lock();
validatedDomainsCache.remove(streamID);
} finally {
lock.unlock();
}
}
public LocalIncomingServerSession(String serverName, Connection connection, StreamID streamID) { public LocalIncomingServerSession(String serverName, Connection connection, StreamID streamID) {
super(serverName, connection, streamID); super(serverName, connection, streamID);
} }
...@@ -273,18 +261,7 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming ...@@ -273,18 +261,7 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming
* @return domains, subdomains and virtual hosts that where validated. * @return domains, subdomains and virtual hosts that where validated.
*/ */
public Collection<String> getValidatedDomains() { public Collection<String> getValidatedDomains() {
String streamID = getStreamID().getID(); return Collections.unmodifiableCollection(validatedDomains);
Lock lock = LockManager.getLock(streamID);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
if (validatedDomains == null) {
return Collections.emptyList();
}
return Collections.unmodifiableCollection(validatedDomains);
} finally {
lock.unlock();
}
} }
/** /**
...@@ -294,24 +271,7 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming ...@@ -294,24 +271,7 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming
* @param domain the new validated domain, subdomain or virtual host to add. * @param domain the new validated domain, subdomain or virtual host to add.
*/ */
public void addValidatedDomain(String domain) { public void addValidatedDomain(String domain) {
boolean added; if (validatedDomains.add(domain)) {
String streamID = getStreamID().getID();
Lock lock = LockManager.getLock(streamID);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
if (validatedDomains == null) {
validatedDomains = new HashSet<String>();
}
added = validatedDomains.add(domain);
if (added) {
validatedDomainsCache.put(streamID, validatedDomains);
}
} finally {
lock.unlock();
}
if (added) {
// Register the new validated domain for this server session in SessionManager // Register the new validated domain for this server session in SessionManager
SessionManager.getInstance().registerIncomingServerSession(domain, this); SessionManager.getInstance().registerIncomingServerSession(domain, this);
} }
...@@ -326,27 +286,9 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming ...@@ -326,27 +286,9 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming
* validated domains. * validated domains.
*/ */
public void removeValidatedDomain(String domain) { public void removeValidatedDomain(String domain) {
String streamID = getStreamID().getID(); validatedDomains.remove(domain);
Lock lock = LockManager.getLock(streamID);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
if (validatedDomains == null) {
validatedDomains = new HashSet<String>();
}
validatedDomains.remove(domain);
if (!validatedDomains.isEmpty()) {
validatedDomainsCache.put(streamID, validatedDomains);
}
else {
validatedDomainsCache.remove(streamID);
}
} finally {
lock.unlock();
}
// Unregister the validated domain for this server session in SessionManager // Unregister the validated domain for this server session in SessionManager
SessionManager.getInstance().unregisterIncomingServerSessions(domain); SessionManager.getInstance().unregisterIncomingServerSession(domain, this);
} }
/** /**
......
...@@ -17,6 +17,7 @@ import org.jivesoftware.openfire.session.*; ...@@ -17,6 +17,7 @@ import org.jivesoftware.openfire.session.*;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.TaskEngine; import org.jivesoftware.util.TaskEngine;
import org.xmpp.packet.JID;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -133,6 +134,10 @@ class LocalRoutingTable { ...@@ -133,6 +134,10 @@ class LocalRoutingTable {
} }
} }
public boolean isLocalRoute(JID jid) {
return routes.containsKey(jid.toString());
}
/** /**
* Task that closes idle server sessions. * Task that closes idle server sessions.
*/ */
......
...@@ -279,7 +279,6 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -279,7 +279,6 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
else { else {
// Return a promise of a remote session. This object will queue packets pending // Return a promise of a remote session. This object will queue packets pending
// to be sent to remote servers // to be sent to remote servers
// TODO Make sure that creating outgoing connections is thread-safe across cluster nodes
OutgoingSessionPromise.getInstance().process(packet); OutgoingSessionPromise.getInstance().process(packet);
routed = true; routed = true;
} }
...@@ -480,24 +479,26 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -480,24 +479,26 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
return session; return session;
} }
public Collection<ClientSession> getClientsRoutes() { public Collection<ClientSession> getClientsRoutes(boolean onlyLocal) {
// Add sessions hosted by this cluster node // Add sessions hosted by this cluster node
Collection<ClientSession> sessions = new ArrayList<ClientSession>(localRoutingTable.getClientRoutes()); Collection<ClientSession> sessions = new ArrayList<ClientSession>(localRoutingTable.getClientRoutes());
// Add sessions not hosted by this JVM if (!onlyLocal) {
RemoteSessionLocator locator = server.getRemoteSessionLocator(); // Add sessions not hosted by this JVM
if (locator != null) { RemoteSessionLocator locator = server.getRemoteSessionLocator();
// Add sessions of non-anonymous users hosted by other cluster nodes if (locator != null) {
for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) { // Add sessions of non-anonymous users hosted by other cluster nodes
ClientRoute route = entry.getValue(); for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) {
if (!Arrays.equals(route.getNodeID(), server.getNodeID())) { ClientRoute route = entry.getValue();
sessions.add(locator.getClientSession(route.getNodeID(), new JID(entry.getKey()))); if (!Arrays.equals(route.getNodeID(), server.getNodeID())) {
sessions.add(locator.getClientSession(route.getNodeID(), new JID(entry.getKey())));
}
} }
} // Add sessions of anonymous users hosted by other cluster nodes
// Add sessions of anonymous users hosted by other cluster nodes for (Map.Entry<String, ClientRoute> entry : anonymousUsersCache.entrySet()) {
for (Map.Entry<String, ClientRoute> entry : anonymousUsersCache.entrySet()) { ClientRoute route = entry.getValue();
ClientRoute route = entry.getValue(); if (!Arrays.equals(route.getNodeID(), server.getNodeID())) {
if (!Arrays.equals(route.getNodeID(), server.getNodeID())) { sessions.add(locator.getClientSession(route.getNodeID(), new JID(entry.getKey())));
sessions.add(locator.getClientSession(route.getNodeID(), new JID(entry.getKey()))); }
} }
} }
} }
...@@ -533,6 +534,10 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -533,6 +534,10 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
return anonymousUsersCache.get(jid.toString()) != null; return anonymousUsersCache.get(jid.toString()) != null;
} }
public boolean isLocalRoute(JID jid) {
return localRoutingTable.isLocalRoute(jid);
}
public boolean hasServerRoute(JID jid) { public boolean hasServerRoute(JID jid) {
return serversCache.get(jid.getDomain()) != null; return serversCache.get(jid.getDomain()) != null;
} }
...@@ -682,20 +687,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -682,20 +687,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
} }
public void joinedCluster(byte[] oldNodeID) { public void joinedCluster(byte[] oldNodeID) {
// Add outgoing server sessions hosted locally to the cache (using new nodeID) restoreCacheContent();
for (LocalOutgoingServerSession session : localRoutingTable.getServerRoutes()) {
addServerRoute(session.getAddress(), session);
}
// Add component sessions hosted locally to the cache (using new nodeID) and remove traces to old nodeID
for (RoutableChannelHandler route : localRoutingTable.getComponentRoute()) {
addComponentRoute(route.getAddress(), route);
}
// Add client sessions hosted locally to the cache (using new nodeID)
for (LocalClientSession session : localRoutingTable.getClientRoutes()) {
addClientRoute(session.getAddress(), session);
}
// Broadcast presence of local sessions to remote sessions when subscribed to presence // Broadcast presence of local sessions to remote sessions when subscribed to presence
// Probe presences of remote sessions when subscribed to presence of local session // Probe presences of remote sessions when subscribed to presence of local session
...@@ -722,18 +714,106 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -722,18 +714,106 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
// that clients connected to this JVM will be able to keep talking. // that clients connected to this JVM will be able to keep talking.
// In other words, their sessions will not be closed (and not removed from // In other words, their sessions will not be closed (and not removed from
// the routing table or the session manager). However, other nodes should // the routing table or the session manager). However, other nodes should
// get their routing tables correctly updated. // get their routing tables correctly updated so we need to temporarily
// TODO Implement this. Remove local sessions from caches // remove the content from the cache so other cluster nodes are correctly
// updated. Local content will be restored to cache in #leftCluster
// In the case of an abnormal disconnection from the cluster this event will
// not be triggered so it is up to the cluster nodes to know how to clean up
// their caches from the local data added by this JVM
// Remove outgoing server sessions hosted locally from the cache (using new nodeID)
for (LocalOutgoingServerSession session : localRoutingTable.getServerRoutes()) {
String address = session.getAddress().getDomain();
serversCache.remove(address);
}
// Remove component sessions hosted locally from the cache (using new nodeID) and remove traces to old nodeID
for (RoutableChannelHandler componentRoute : localRoutingTable.getComponentRoute()) {
JID route = componentRoute.getAddress();
String address = route.getDomain();
Lock lock = LockManager.getLock(address + "rt");
try {
lock.lock();
Set<byte[]> nodes = componentsCache.get(address);
if (nodes != null) {
nodes.remove(server.getNodeID());
if (nodes.isEmpty()) {
componentsCache.remove(address);
}
else {
componentsCache.put(address, nodes);
}
}
} finally {
lock.unlock();
}
}
// Remove client sessions hosted locally from the cache (using new nodeID)
for (LocalClientSession session : localRoutingTable.getClientRoutes()) {
boolean anonymous = false;
JID route = session.getAddress();
String address = route.toString();
ClientRoute clientRoute = usersCache.remove(address);
if (clientRoute == null) {
clientRoute = anonymousUsersCache.remove(address);
anonymous = true;
}
if (clientRoute != null && route.getResource() != null) {
Lock lock = LockManager.getLock(route.toBareJID());
try {
lock.lock();
if (anonymous) {
usersSessions.remove(route.toBareJID());
}
else {
Collection<String> jids = usersSessions.get(route.toBareJID());
if (jids != null) {
jids.remove(route.toString());
if (!jids.isEmpty()) {
usersSessions.put(route.toBareJID(), jids);
}
else {
usersSessions.remove(route.toBareJID());
}
}
}
}
finally {
lock.unlock();
}
}
}
} }
} }
public void leftCluster() { public void leftCluster() {
if (!XMPPServer.getInstance().isShuttingDown()) { if (!XMPPServer.getInstance().isShuttingDown()) {
// TODO Implement this. Add local sessions to caches // Add local sessions to caches
restoreCacheContent();
} }
} }
public void markedAsSeniorClusterMember() { public void markedAsSeniorClusterMember() {
// Do nothing // Do nothing
} }
private void restoreCacheContent() {
// Add outgoing server sessions hosted locally to the cache (using new nodeID)
for (LocalOutgoingServerSession session : localRoutingTable.getServerRoutes()) {
addServerRoute(session.getAddress(), session);
}
// Add component sessions hosted locally to the cache (using new nodeID) and remove traces to old nodeID
for (RoutableChannelHandler route : localRoutingTable.getComponentRoute()) {
addComponentRoute(route.getAddress(), route);
}
// Add client sessions hosted locally to the cache (using new nodeID)
for (LocalClientSession session : localRoutingTable.getClientRoutes()) {
addClientRoute(session.getAddress(), session);
}
}
} }
...@@ -75,6 +75,7 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy { ...@@ -75,6 +75,7 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
cacheNames.put("Sessions by Hostname", "sessionsHostname"); cacheNames.put("Sessions by Hostname", "sessionsHostname");
cacheNames.put("Secret Keys Cache", "secretKeys"); cacheNames.put("Secret Keys Cache", "secretKeys");
cacheNames.put("Validated Domains", "validatedDomains"); cacheNames.put("Validated Domains", "validatedDomains");
cacheNames.put("Directed Presences", "directedPresences");
cacheProps.put("cache.fileTransfer.size", 128 * 1024l); cacheProps.put("cache.fileTransfer.size", 128 * 1024l);
cacheProps.put("cache.fileTransfer.expirationTime", 1000 * 60 * 10l); cacheProps.put("cache.fileTransfer.expirationTime", 1000 * 60 * 10l);
...@@ -128,6 +129,8 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy { ...@@ -128,6 +129,8 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
cacheProps.put("cache.secretKeys.expirationTime", -1l); cacheProps.put("cache.secretKeys.expirationTime", -1l);
cacheProps.put("cache.validatedDomains.size", -1l); cacheProps.put("cache.validatedDomains.size", -1l);
cacheProps.put("cache.validatedDomains.expirationTime", -1l); cacheProps.put("cache.validatedDomains.expirationTime", -1l);
cacheProps.put("cache.directedPresences.size", -1l);
cacheProps.put("cache.directedPresences.expirationTime", -1l);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment