Commit 5678a1c3 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

More work on clustering events.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@8508 b35dd754-fafc-0310-a699-88a17e54d16e
parent 1acb1222
...@@ -46,6 +46,10 @@ import java.util.concurrent.locks.Lock; ...@@ -46,6 +46,10 @@ import java.util.concurrent.locks.Lock;
*/ */
public class SessionManager extends BasicModule { public class SessionManager extends BasicModule {
public static final String COMPONENT_SESSION_CACHE_NAME = "Components Sessions";
public static final String CM_CACHE_NAME = "Connection Managers Sessions";
public static final String ISS_CACHE_NAME = "Incoming Server Sessions";
public static final int NEVER_KICK = -1; public static final int NEVER_KICK = -1;
private XMPPServer server; private XMPPServer server;
...@@ -1149,9 +1153,9 @@ public class SessionManager extends BasicModule { ...@@ -1149,9 +1153,9 @@ public class SessionManager extends BasicModule {
// Initialize caches. // Initialize caches.
countersCache = CacheFactory.createCache("Session Manager Counters"); countersCache = CacheFactory.createCache("Session Manager Counters");
componentSessionsCache = CacheFactory.createCache("Components Sessions"); componentSessionsCache = CacheFactory.createCache(COMPONENT_SESSION_CACHE_NAME);
multiplexerSessionsCache = CacheFactory.createCache("Connection Managers Sessions"); multiplexerSessionsCache = CacheFactory.createCache(CM_CACHE_NAME);
incomingServerSessionsCache = CacheFactory.createCache("Incoming Server Sessions"); incomingServerSessionsCache = CacheFactory.createCache(ISS_CACHE_NAME);
hostnameSessionsCache = CacheFactory.createCache("Sessions by Hostname"); hostnameSessionsCache = CacheFactory.createCache("Sessions by Hostname");
} }
......
...@@ -131,6 +131,7 @@ public class XMPPServer { ...@@ -131,6 +131,7 @@ public class XMPPServer {
"org.jivesoftware.openfire.starter.ServerStarter"; "org.jivesoftware.openfire.starter.ServerStarter";
private static final String WRAPPER_CLASSNAME = private static final String WRAPPER_CLASSNAME =
"org.tanukisoftware.wrapper.WrapperManager"; "org.tanukisoftware.wrapper.WrapperManager";
private boolean shuttingDown;
/** /**
* Returns a singleton instance of XMPPServer. * Returns a singleton instance of XMPPServer.
...@@ -877,6 +878,7 @@ public class XMPPServer { ...@@ -877,6 +878,7 @@ public class XMPPServer {
* Makes a best effort attempt to shutdown the server * Makes a best effort attempt to shutdown the server
*/ */
private void shutdownServer() { private void shutdownServer() {
shuttingDown = true;
// Notify server listeners that the server is about to be stopped // Notify server listeners that the server is about to be stopped
for (XMPPServerListener listener : listeners) { for (XMPPServerListener listener : listeners) {
listener.serverStopping(); listener.serverStopping();
...@@ -904,6 +906,15 @@ public class XMPPServer { ...@@ -904,6 +906,15 @@ public class XMPPServer {
Log.info("Openfire stopped"); Log.info("Openfire stopped");
} }
/**
* Returns true if the server is being shutdown.
*
* @return true if the server is being shutdown.
*/
public boolean isShuttingDown() {
return shuttingDown;
}
/** /**
* Returns the <code>ConnectionManager</code> registered with this server. The * Returns the <code>ConnectionManager</code> registered with this server. The
* <code>ConnectionManager</code> was registered with the server as a module while starting up * <code>ConnectionManager</code> was registered with the server as a module while starting up
......
...@@ -20,14 +20,32 @@ package org.jivesoftware.openfire.cluster; ...@@ -20,14 +20,32 @@ package org.jivesoftware.openfire.cluster;
public interface ClusterEventListener { public interface ClusterEventListener {
/** /**
* Notification event indication that this JVM is now part of a cluster. The * Notification event indicating that this JVM is now part of a cluster. At this point the
* {@link org.jivesoftware.openfire.XMPPServer#getNodeID()} will now return * {@link org.jivesoftware.openfire.XMPPServer#getNodeID()} holds the new nodeID value.<p>
* a new value.<p>
* *
* When joining the cluster as the senior cluster member the {@link #markedAsSeniorClusterMember()} * When joining the cluster as the senior cluster member the {@link #markedAsSeniorClusterMember()}
* event will be sent right after this event. * event will be sent right after this event.<p>
*
* At this point the CacheFactory holds clustered caches. That means that modifications
* to the caches will be reflected in the cluster. The clustered caches were just
* obtained from the cluster and no local cached data was automatically moved.<p>
*
* @param oldNodeID nodeID used by this JVM before joining the cluster.
*/
void joinedCluster(byte[] oldNodeID);
/**
* Notification event indicating that this JVM is about to leave the cluster. This could
* happen when disabling clustering support, removing the enterprise plugin that provides
* clustering support or even shutdown the server.<p>
*
* At this point the CacheFactory is still holding clustered caches. That means that
* modifications to the caches will be reflected in the cluster.<p>
*
* Use {@link org.jivesoftware.openfire.XMPPServer#isShuttingDown()} to figure out if the
* server is being shutdown.
*/ */
void joinedCluster(); void leavingCluster();
/** /**
* Notification event indicating that this JVM is no longer part of the cluster. This could * Notification event indicating that this JVM is no longer part of the cluster. This could
...@@ -39,7 +57,10 @@ public interface ClusterEventListener { ...@@ -39,7 +57,10 @@ public interface ClusterEventListener {
* get the <tt>left cluster event</tt> and <tt>joined cluster events</tt>. That means that * get the <tt>left cluster event</tt> and <tt>joined cluster events</tt>. That means that
* caches will be reset and thus will need to be repopulated again with fresh data from this JVM. * caches will be reset and thus will need to be repopulated again with fresh data from this JVM.
* This also includes the case where this JVM was the senior cluster member and when the islands * This also includes the case where this JVM was the senior cluster member and when the islands
* met again then this JVM stopped being the senior member. * met again then this JVM stopped being the senior member.<p>
*
* At this point the CacheFactory holds local caches. That means that modifications to
* the caches will only affect this JVM.
*/ */
void leftCluster(); void leftCluster();
...@@ -52,7 +73,7 @@ public interface ClusterEventListener { ...@@ -52,7 +73,7 @@ public interface ClusterEventListener {
* island will have its own senior cluster member. However, when the islands meet again there * island will have its own senior cluster member. However, when the islands meet again there
* could only be one senior cluster member so one of the senior cluster members will stop playing * could only be one senior cluster member so one of the senior cluster members will stop playing
* that role. When that happens the JVM no longer playing that role will receive the * that role. When that happens the JVM no longer playing that role will receive the
* {@link #leftCluster()} and {@link #joinedCluster()} events. * {@link #leftCluster()} and {@link #joinedCluster(byte[])} events.
*/ */
void markedAsSeniorClusterMember(); void markedAsSeniorClusterMember();
} }
...@@ -33,38 +33,59 @@ public class ClusterManager { ...@@ -33,38 +33,59 @@ public class ClusterManager {
private static String CLUSTER_PROPERTY_NAME = "cache.clustering.enabled"; private static String CLUSTER_PROPERTY_NAME = "cache.clustering.enabled";
private static Queue<ClusterEventListener> listeners = new ConcurrentLinkedQueue<ClusterEventListener>(); private static Queue<ClusterEventListener> listeners = new ConcurrentLinkedQueue<ClusterEventListener>();
private static BlockingQueue<EventType> events = new LinkedBlockingQueue<EventType>(); private static BlockingQueue<Event> events = new LinkedBlockingQueue<Event>();
static { static {
Thread thread = new Thread("ClusterManager events dispatcher") { Thread thread = new Thread("ClusterManager events dispatcher") {
public void run() { public void run() {
try { for (; ;) {
EventType eventType = events.take(); try {
for (ClusterEventListener listener : listeners) { Event event = events.take();
try { EventType eventType = event.getType();
switch (eventType) { // Make sure that CacheFactory is getting this events first (to update cache structure)
case joined_cluster: { if (eventType == EventType.joined_cluster) {
listener.joinedCluster(); // Replace standalone caches with clustered caches. Local cached data is not moved.
break; CacheFactory.joinedCluster();
} }
case left_cluster: { else if (eventType == EventType.left_cluster) {
listener.leftCluster(); // Replace clustered caches with standalone caches. Cached data is not moved to new cache.
break; CacheFactory.leftCluster();
} }
case marked_senior_cluster_member: { // Now notify rest of the listeners
listener.markedAsSeniorClusterMember(); for (ClusterEventListener listener : listeners) {
break; try {
switch (eventType) {
case joined_cluster: {
listener.joinedCluster(event.getOldNodeID());
break;
}
case leaving_cluster: {
listener.leavingCluster();
break;
}
case left_cluster: {
listener.leftCluster();
break;
}
case marked_senior_cluster_member: {
listener.markedAsSeniorClusterMember();
break;
}
default:
break;
} }
default: }
break; catch (Exception e) {
Log.error(e);
} }
} }
catch (Exception e) { // Mark event as processed
Log.error(e); event.setProcessed(true);
} } catch (InterruptedException e) {
Log.warn(e);
} catch (Exception e) {
Log.error(e);
} }
} catch (InterruptedException e) {
Log.warn(e);
} }
} }
}; };
...@@ -95,14 +116,89 @@ public class ClusterManager { ...@@ -95,14 +116,89 @@ public class ClusterManager {
/** /**
* Dispatches an event to all listeners. The dispatch will occur in another thread * Triggers event indicating that this JVM is now part of a cluster. At this point the
* to avoid potential deadlocks in Coherence. * {@link org.jivesoftware.openfire.XMPPServer#getNodeID()} holds the new nodeID value and
* the old nodeID value is passed in case the listener needs it.<p>
* <p/>
* When joining the cluster as the senior cluster member the {@link #fireMarkedAsSeniorClusterMember()}
* event will be sent right after this event.<p>
* <p/>
* This event could be triggered in another thread. This will avoid potential deadlocks
* in Coherence.
* *
* @param eventType the event type. * @param oldNodeID nodeID used by this JVM before joining the cluster.
* @param asynchronous true if event will be triggered in background
*/
public static void fireJoinedCluster(byte[] oldNodeID, boolean asynchronous) {
try {
Event event = new Event(EventType.joined_cluster, oldNodeID);
events.put(event);
if (!asynchronous) {
while (!event.isProcessed()) {
Thread.sleep(100);
}
}
} catch (InterruptedException e) {
// Should never happen
}
}
/**
* Triggers event indicating that this JVM is about to leave the cluster. This could
* happen when disabling clustering support, removing the enterprise plugin that provides
* clustering support or even shutdown the server.<p>
* <p/>
* This event will be triggered in another thread. This will avoid potential deadlocks
* in Coherence.
*/
public static void fireLeavingCluster() {
try {
events.put(new Event(EventType.leaving_cluster, null));
} catch (InterruptedException e) {
// Should never happen
}
}
/**
* Triggers event indicating that this JVM is no longer part of the cluster. This could
* happen when disabling clustering support or removing the enterprise plugin that provides
* clustering support.<p>
* <p/>
* Moreover, if we were in a "split brain" scenario (ie. separated cluster islands) and the
* island were this JVM belonged was marked as "old" then all nodes of that island will
* get the <tt>left cluster event</tt> and <tt>joined cluster events</tt>. That means that
* caches will be reset and thus will need to be repopulated again with fresh data from this JVM.
* This also includes the case where this JVM was the senior cluster member and when the islands
* met again then this JVM stopped being the senior member.<p>
* <p/>
* This event will be triggered in another thread. This will avoid potential deadlocks
* in Coherence.
*/
public static void fireLeftCluster() {
try {
events.put(new Event(EventType.left_cluster, null));
} catch (InterruptedException e) {
// Should never happen
}
}
/**
* Triggers event indicating that this JVM is now the senior cluster member. This
* could either happen when initially joining the cluster or when the senior cluster
* member node left the cluster and this JVM was marked as the new senior cluster member.<p>
* <p/>
* Moreover, in the case of a "split brain" scenario (ie. separated cluster islands) each
* island will have its own senior cluster member. However, when the islands meet again there
* could only be one senior cluster member so one of the senior cluster members will stop playing
* that role. When that happens the JVM no longer playing that role will receive the
* {@link #fireLeftCluster()} and {@link #fireJoinedCluster(byte[],boolean)} events.<p>
* <p/>
* This event will be triggered in another thread. This will avoid potential deadlocks
* in Coherence.
*/ */
public static void dispatchEvent(EventType eventType) { public static void fireMarkedAsSeniorClusterMember() {
try { try {
events.put(eventType); events.put(new Event(EventType.marked_senior_cluster_member, null));
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Should never happen // Should never happen
} }
...@@ -111,7 +207,7 @@ public class ClusterManager { ...@@ -111,7 +207,7 @@ public class ClusterManager {
/** /**
* Starts the cluster service if clustering is enabled. The process of starting clustering * Starts the cluster service if clustering is enabled. The process of starting clustering
* will recreate caches as distributed caches.<p> * will recreate caches as distributed caches.<p>
* * <p/>
* Before starting a cluster the {@link LockManager#setLockFactory(org.jivesoftware.util.lock.LockFactory)}, * Before starting a cluster the {@link LockManager#setLockFactory(org.jivesoftware.util.lock.LockFactory)},
* {@link XMPPServer#setRemoteSessionLocator(org.jivesoftware.openfire.session.RemoteSessionLocator)} and * {@link XMPPServer#setRemoteSessionLocator(org.jivesoftware.openfire.session.RemoteSessionLocator)} and
* {@link org.jivesoftware.openfire.RoutingTable#setRemotePacketRouter(org.jivesoftware.openfire.RemotePacketRouter)} * {@link org.jivesoftware.openfire.RoutingTable#setRemotePacketRouter(org.jivesoftware.openfire.RemotePacketRouter)}
...@@ -194,10 +290,41 @@ public class ClusterManager { ...@@ -194,10 +290,41 @@ public class ClusterManager {
return CacheFactory.isSeniorClusterMember(); return CacheFactory.isSeniorClusterMember();
} }
private static class Event {
private EventType type;
private byte[] oldNodeID;
private boolean processed;
public Event(EventType type, byte[] oldNodeID) {
this.type = type;
this.oldNodeID = oldNodeID;
}
public EventType getType() {
return type;
}
public byte[] getOldNodeID() {
return oldNodeID;
}
public boolean isProcessed() {
return processed;
}
public void setProcessed(boolean processed) {
this.processed = processed;
}
public String toString() {
return super.toString() + " type: " + type;
}
}
/** /**
* Represents valid event types. * Represents valid event types.
*/ */
public enum EventType { private enum EventType {
/** /**
* This JVM joined a cluster. * This JVM joined a cluster.
...@@ -205,12 +332,17 @@ public class ClusterManager { ...@@ -205,12 +332,17 @@ public class ClusterManager {
joined_cluster, joined_cluster,
/** /**
* This JVM is no longer part of a cluster. * This JVM is about to leave the cluster.
*/
leaving_cluster,
/**
* This JVM is no longer part of the cluster.
*/ */
left_cluster, left_cluster,
/** /**
* This JVM is now the senior cluster member. * This JVM is now the senior cluster member.
*/ */
marked_senior_cluster_member marked_senior_cluster_member
} }
......
...@@ -923,11 +923,15 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha ...@@ -923,11 +923,15 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha
outMessages.addAndGet(numOccupants); outMessages.addAndGet(numOccupants);
} }
public void joinedCluster() { public void joinedCluster(byte[] oldNodeID) {
// Disable the service until we know that we are the senior cluster member // Disable the service until we know that we are the senior cluster member
enableService(false, false); enableService(false, false);
} }
public void leavingCluster() {
// Do nothing
}
public void leftCluster() { public void leftCluster() {
// Offer the service when not running in a cluster // Offer the service when not running in a cluster
enableService(true, false); enableService(true, false);
......
...@@ -439,11 +439,15 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -439,11 +439,15 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
} }
} }
public void joinedCluster() { public void joinedCluster(byte[] oldNodeID) {
// Disable the service until we know that we are the senior cluster member // Disable the service until we know that we are the senior cluster member
enableService(false); enableService(false);
} }
public void leavingCluster() {
// Do nothing
}
public void leftCluster() { public void leftCluster() {
// Offer the service when not running in a cluster // Offer the service when not running in a cluster
enableService(true); enableService(true);
......
...@@ -13,10 +13,7 @@ package org.jivesoftware.openfire.spi; ...@@ -13,10 +13,7 @@ package org.jivesoftware.openfire.spi;
import org.jivesoftware.openfire.RoutableChannelHandler; import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.session.ClientSession; import org.jivesoftware.openfire.session.*;
import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.OutgoingServerSession;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.TaskEngine; import org.jivesoftware.util.TaskEngine;
...@@ -60,11 +57,41 @@ class LocalRoutingTable { ...@@ -60,11 +57,41 @@ class LocalRoutingTable {
* *
* @return the client sessions that are connected to this JVM. * @return the client sessions that are connected to this JVM.
*/ */
Collection<ClientSession> getClientRoutes() { Collection<LocalClientSession> getClientRoutes() {
List<ClientSession> sessions = new ArrayList<ClientSession>(); List<LocalClientSession> sessions = new ArrayList<LocalClientSession>();
for (RoutableChannelHandler route : routes.values()) { for (RoutableChannelHandler route : routes.values()) {
if (route instanceof ClientSession) { if (route instanceof LocalClientSession) {
sessions.add((ClientSession) route); sessions.add((LocalClientSession) route);
}
}
return sessions;
}
/**
* Returns the outgoing server sessions that are connected to this JVM.
*
* @return the outgoing server sessions that are connected to this JVM.
*/
Collection<LocalOutgoingServerSession> getServerRoutes() {
List<LocalOutgoingServerSession> sessions = new ArrayList<LocalOutgoingServerSession>();
for (RoutableChannelHandler route : routes.values()) {
if (route instanceof LocalOutgoingServerSession) {
sessions.add((LocalOutgoingServerSession) route);
}
}
return sessions;
}
/**
* Returns the external component sessions that are connected to this JVM.
*
* @return the external component sessions that are connected to this JVM.
*/
Collection<RoutableChannelHandler> getComponentRoute() {
List<RoutableChannelHandler> sessions = new ArrayList<RoutableChannelHandler>();
for (RoutableChannelHandler route : routes.values()) {
if (!(route instanceof LocalOutgoingServerSession || route instanceof LocalClientSession)) {
sessions.add(route);
} }
} }
return sessions; return sessions;
......
...@@ -13,7 +13,10 @@ package org.jivesoftware.openfire.spi; ...@@ -13,7 +13,10 @@ package org.jivesoftware.openfire.spi;
import org.jivesoftware.openfire.*; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.handler.PresenceUpdateHandler;
import org.jivesoftware.openfire.server.OutgoingSessionPromise; import org.jivesoftware.openfire.server.OutgoingSessionPromise;
import org.jivesoftware.openfire.session.*; import org.jivesoftware.openfire.session.*;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
...@@ -42,7 +45,12 @@ import java.util.concurrent.locks.Lock; ...@@ -42,7 +45,12 @@ import java.util.concurrent.locks.Lock;
* *
* @author Gaston Dombiak * @author Gaston Dombiak
*/ */
public class RoutingTableImpl extends BasicModule implements RoutingTable { public class RoutingTableImpl extends BasicModule implements RoutingTable, ClusterEventListener {
public static final String C2S_CACHE_NAME = "Routing Users Cache";
public static final String ANONYMOUS_C2S_CACHE_NAME = "Routing AnonymousUsers Cache";
public static final String S2S_CACHE_NAME = "Routing Servers Cache";
public static final String COMPONENT_CACHE_NAME = "Routing Components Cache";
/** /**
* Cache (unlimited, never expire) that holds outgoing sessions to remote servers from this server. * Cache (unlimited, never expire) that holds outgoing sessions to remote servers from this server.
...@@ -69,7 +77,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -69,7 +77,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
* (includes anonymous). * (includes anonymous).
* Key: bare JID, Value: list of full JIDs of the user * Key: bare JID, Value: list of full JIDs of the user
*/ */
private Cache<String, List<String>> usersSessions; private Cache<String, Collection<String>> usersSessions;
private String serverName; private String serverName;
private XMPPServer server; private XMPPServer server;
...@@ -81,10 +89,10 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -81,10 +89,10 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
public RoutingTableImpl() { public RoutingTableImpl() {
super("Routing table"); super("Routing table");
serversCache = CacheFactory.createCache("Routing Servers Cache"); serversCache = CacheFactory.createCache(S2S_CACHE_NAME);
componentsCache = CacheFactory.createCache("Routing Components Cache"); componentsCache = CacheFactory.createCache(COMPONENT_CACHE_NAME);
usersCache = CacheFactory.createCache("Routing Users Cache"); usersCache = CacheFactory.createCache(C2S_CACHE_NAME);
anonymousUsersCache = CacheFactory.createCache("Routing AnonymousUsers Cache"); anonymousUsersCache = CacheFactory.createCache(ANONYMOUS_C2S_CACHE_NAME);
usersSessions = CacheFactory.createCache("Routing User Sessions"); usersSessions = CacheFactory.createCache("Routing User Sessions");
localRoutingTable = new LocalRoutingTable(); localRoutingTable = new LocalRoutingTable();
} }
...@@ -137,9 +145,9 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -137,9 +145,9 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
Lock lock = LockManager.getLock(route.toBareJID()); Lock lock = LockManager.getLock(route.toBareJID());
try { try {
lock.lock(); lock.lock();
List<String> jids = usersSessions.get(route.toBareJID()); Collection<String> jids = usersSessions.get(route.toBareJID());
if (jids == null) { if (jids == null) {
jids = new ArrayList<String>(); jids = new HashSet<String>();
} }
jids.add(route.toString()); jids.add(route.toString());
usersSessions.put(route.toBareJID(), jids); usersSessions.put(route.toBareJID(), jids);
...@@ -540,7 +548,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -540,7 +548,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
} }
else { else {
// Address is a bare JID so return all AVAILABLE resources of user // Address is a bare JID so return all AVAILABLE resources of user
List<String> sessions = usersSessions.get(route.toBareJID()); Collection<String> sessions = usersSessions.get(route.toBareJID());
if (sessions != null) { if (sessions != null) {
// Select only available sessions // Select only available sessions
for (String jid : sessions) { for (String jid : sessions) {
...@@ -584,7 +592,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -584,7 +592,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
usersSessions.remove(route.toBareJID()); usersSessions.remove(route.toBareJID());
} }
else { else {
List<String> jids = usersSessions.get(route.toBareJID()); Collection<String> jids = usersSessions.get(route.toBareJID());
if (jids != null) { if (jids != null) {
jids.remove(route.toString()); jids.remove(route.toString());
if (!jids.isEmpty()) { if (!jids.isEmpty()) {
...@@ -649,6 +657,8 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -649,6 +657,8 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
iqRouter = server.getIQRouter(); iqRouter = server.getIQRouter();
messageRouter = server.getMessageRouter(); messageRouter = server.getMessageRouter();
presenceRouter = server.getPresenceRouter(); presenceRouter = server.getPresenceRouter();
// Listen to cluster events
ClusterManager.addListener(this);
} }
public void start() throws IllegalStateException { public void start() throws IllegalStateException {
...@@ -660,4 +670,60 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -660,4 +670,60 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
super.stop(); super.stop();
localRoutingTable.stop(); localRoutingTable.stop();
} }
public void joinedCluster(byte[] oldNodeID) {
// Add outgoing server sessions hosted locally to the cache (using new nodeID)
for (LocalOutgoingServerSession session : localRoutingTable.getServerRoutes()) {
addServerRoute(session.getAddress(), session);
}
// Add component sessions hosted locally to the cache (using new nodeID) and remove traces to old nodeID
for (RoutableChannelHandler route : localRoutingTable.getComponentRoute()) {
addComponentRoute(route.getAddress(), route);
}
// Add client sessions hosted locally to the cache (using new nodeID)
for (LocalClientSession session : localRoutingTable.getClientRoutes()) {
addClientRoute(session.getAddress(), session);
}
// Broadcast presence of local sessions to remote sessions when subscribed to presence
// Probe presences of remote sessions when subscribed to presence of local session
// Send pending subscription requests to local sessions from remote sessions
// Deliver offline messages sent to local sessions that were unavailable in other nodes
// Send available presences of local sessions to other resources of the same user
PresenceUpdateHandler presenceUpdateHandler = XMPPServer.getInstance().getPresenceUpdateHandler();
for (LocalClientSession session : localRoutingTable.getClientRoutes()) {
// Simulate that the local session has just became available
session.setInitialized(false);
// Simulate that current session presence has just been received
presenceUpdateHandler.process(session.getPresence());
}
}
public void leavingCluster() {
if (XMPPServer.getInstance().isShuttingDown()) {
// Do nothing since local sessions will be closed. Local session manager
// and local routing table will be correctly updated thus updating the
// other cluster nodes correctly
}
else {
// This JVM is leaving the cluster but will continue to work. That means
// that clients connected to this JVM will be able to keep talking.
// In other words, their sessions will not be closed (and not removed from
// the routing table or the session manager). However, other nodes should
// get their routing tables correctly updated.
// TODO Implement this. Remove local sessions from caches
}
}
public void leftCluster() {
if (!XMPPServer.getInstance().isShuttingDown()) {
// TODO Implement this. Add local sessions to caches
}
}
public void markedAsSeniorClusterMember() {
// Do nothing
}
} }
...@@ -20,7 +20,6 @@ import org.jivesoftware.util.Log; ...@@ -20,7 +20,6 @@ import org.jivesoftware.util.Log;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/** /**
* Creates Cache objects. The returned caches will either be local or clustered * Creates Cache objects. The returned caches will either be local or clustered
...@@ -44,11 +43,6 @@ public class CacheFactory { ...@@ -44,11 +43,6 @@ public class CacheFactory {
*/ */
private static Map<String, Cache> caches = new ConcurrentHashMap<String, Cache>(); private static Map<String, Cache> caches = new ConcurrentHashMap<String, Cache>();
/**
* List of registered listeners to be notified when clustering is enabled or disabled.
*/
private static List<ClusteringListener> listeners = new CopyOnWriteArrayList<ClusteringListener>();
private static String localCacheFactoryClass; private static String localCacheFactoryClass;
private static String clusteredCacheFactoryClass; private static String clusteredCacheFactoryClass;
private static CacheFactoryStrategy cacheFactoryStrategy; private static CacheFactoryStrategy cacheFactoryStrategy;
...@@ -67,7 +61,7 @@ public class CacheFactory { ...@@ -67,7 +61,7 @@ public class CacheFactory {
* Returns an array of all caches in the system. * Returns an array of all caches in the system.
* @return an array of all caches in the system. * @return an array of all caches in the system.
*/ */
public static synchronized Cache[] getAllCaches() { public static Cache[] getAllCaches() {
List<Cache> values = new ArrayList<Cache>(); List<Cache> values = new ArrayList<Cache>();
for (Cache cache : caches.values()) { for (Cache cache : caches.values()) {
values.add(cache); values.add(cache);
...@@ -280,26 +274,6 @@ public class CacheFactory { ...@@ -280,26 +274,6 @@ public class CacheFactory {
} }
} }
public static void addClusteringListener(ClusteringListener listener) {
listeners.add(listener);
}
public static void removeClusteringListener(ClusteringListener listener) {
listeners.remove(listener);
}
private static void fireClusteringStarted() {
for (ClusteringListener listener : listeners) {
(listener).clusteringStarted();
}
}
private static void fireClusteringStopped() {
for (ClusteringListener listener : listeners) {
(listener).clusteringStopped();
}
}
public static synchronized void initialize() throws InitializationException { public static synchronized void initialize() throws InitializationException {
try { try {
cacheFactoryStrategy = (CacheFactoryStrategy) Class cacheFactoryStrategy = (CacheFactoryStrategy) Class
...@@ -389,30 +363,16 @@ public class CacheFactory { ...@@ -389,30 +363,16 @@ public class CacheFactory {
private static void startClustering() { private static void startClustering() {
clusteringStarted = false; clusteringStarted = false;
boolean clusterStarted = false;
try { try {
cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(clusteredCacheFactoryClass, true, cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(clusteredCacheFactoryClass, true,
getClusteredCacheStrategyClassLoader("enterprise")) getClusteredCacheStrategyClassLoader("enterprise"))
.newInstance(); .newInstance();
clusterStarted = cacheFactoryStrategy.startCluster(); clusteringStarted = cacheFactoryStrategy.startCluster();
if (clusterStarted) {
// Loop through local caches and switch them to clustered cache.
for (String cacheName : caches.keySet()) {
CacheWrapper wrapper = (CacheWrapper) caches.get(cacheName);
wrapper.setWrappedCache(cacheFactoryStrategy.createCache(cacheName));
}
clusteringStarted = true;
// Set the ID of this cluster node
XMPPServer.getInstance().setNodeID(CacheFactory.getClusterMemberID());
// Fire event that cluster has been started
fireClusteringStarted();
}
} }
catch (Exception e) { catch (Exception e) {
Log.error("Unable to start clustering - continuing in local mode", e); Log.error("Unable to start clustering - continuing in local mode", e);
} }
if (!clusterStarted) { if (!clusteringStarted) {
// Revert to local cache factory if cluster fails to start // Revert to local cache factory if cluster fails to start
try { try {
cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(localCacheFactoryClass).newInstance(); cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(localCacheFactoryClass).newInstance();
...@@ -428,35 +388,39 @@ public class CacheFactory { ...@@ -428,35 +388,39 @@ public class CacheFactory {
cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(localCacheFactoryClass) cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(localCacheFactoryClass)
.newInstance(); .newInstance();
// Loop through clustered caches and change them to local caches.
for (String cacheName : caches.keySet()) {
CacheWrapper wrapper = (CacheWrapper) caches.get(cacheName);
wrapper.setWrappedCache(cacheFactoryStrategy.createCache(cacheName));
}
clusteringStarted = false; clusteringStarted = false;
// Reset the node ID // Reset the node ID
XMPPServer.getInstance().setNodeID(null); XMPPServer.getInstance().setNodeID(null);
// Stop the cluster // Stop the cluster
clusteredFactory.stopCluster(); clusteredFactory.stopCluster();
// Fire event that cluster has been stopped
fireClusteringStopped();
} }
catch (Exception e) { catch (Exception e) {
Log.error("Unable to stop clustering - continuing in clustered mode", e); Log.error("Unable to stop clustering - continuing in clustered mode", e);
} }
} }
/** /**
* Listener interface for any object which needs to be notified when clustering starts or stops * Notification message indicating that this JVM has joined a cluster.
*/ */
public static interface ClusteringListener { public static void joinedCluster() {
// Loop through local caches and switch them to clustered cache (migrate content)
public void clusteringStarted(); for (Cache cache : getAllCaches()) {
CacheWrapper cacheWrapper = ((CacheWrapper) cache);
public void clusteringStopped(); Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
cacheWrapper.setWrappedCache(clusteredCache);
}
} }
/**
* Notification message indicating that this JVM has left the cluster.
*/
public static void leftCluster() {
// Loop through clustered caches and change them to local caches (migrate content)
for (Cache cache : getAllCaches()) {
CacheWrapper cacheWrapper = ((CacheWrapper) cache);
Cache standaloneCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
cacheWrapper.setWrappedCache(standaloneCache);
}
}
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment