Commit d9d6d947 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

More and more and more work on clustering events.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@8553 b35dd754-fafc-0310-a699-88a17e54d16e
parent 20ffb1c4
......@@ -233,6 +233,14 @@ public interface RoutingTable {
*/
Collection<ClientSession> getClientsRoutes(boolean onlyLocal);
/**
* Returns the ID of the node that is hosting the specified client session.
*
* @param jid the address of the specified client session.
* @return the ID of the node that is hosting the specified client session or null if not found.
*/
byte[] getNodeIDForClientRoute(JID jid);
/**
* Returns the outgoing server session associated to the specified XMPP address or <tt>null</tt>
* if none was found. When running inside of a cluster and a remote node is hosting
......
......@@ -1398,94 +1398,30 @@ public class SessionManager extends BasicModule implements ClusterEventListener
}
}
public void joinedCluster(byte[] oldNodeID) {
public void joinedCluster() {
restoreCacheContent();
}
public void leavingCluster() {
if (XMPPServer.getInstance().isShuttingDown()) {
// Do nothing since local sessions will be closed. Local session manager
// and local routing table will be correctly updated thus updating the
// other cluster nodes correctly
}
else {
// This JVM is leaving the cluster but will continue to work. That means
// that clients connected to this JVM will be able to keep talking.
// In other words, their sessions will not be closed (and not removed from
// the routing table or the session manager). However, other nodes should
// get their session managers correctly updated.
// Remove external component sessions hosted locally to the cache (using new nodeID)
for (Session session : localSessionManager.getComponentsSessions()) {
componentSessionsCache.remove(session.getAddress().toString());
}
// Remove connection multiplexer sessions hosted locally to the cache (using new nodeID)
for (String address : localSessionManager.getConnnectionManagerSessions().keySet()) {
multiplexerSessionsCache.remove(address);
}
// Remove incoming server sessions hosted locally to the cache (using new nodeID)
for (LocalIncomingServerSession session : localSessionManager.getIncomingServerSessions()) {
String streamID = session.getStreamID().getID();
incomingServerSessionsCache.remove(streamID);
for (String hostname : session.getValidatedDomains()) {
// Update list of sockets/sessions coming from the same remote hostname
Lock lock = LockManager.getLock(hostname);
try {
lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname);
streamIDs.remove(streamID);
if (streamIDs.isEmpty()) {
hostnameSessionsCache.remove(hostname);
}
else {
hostnameSessionsCache.put(hostname, streamIDs);
}
}
finally {
lock.unlock();
}
// Remove from clustered cache
lock = LockManager.getLock(streamID);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
if (validatedDomains == null) {
validatedDomains = new HashSet<String>();
}
validatedDomains.remove(hostname);
if (!validatedDomains.isEmpty()) {
validatedDomainsCache.put(streamID, validatedDomains);
}
else {
validatedDomainsCache.remove(streamID);
}
} finally {
lock.unlock();
}
}
}
// Update counters of client sessions
for (ClientSession session : routingTable.getClientsRoutes(true)) {
// Increment the counter of user sessions
decrementCounter("conncounter");
if (session.getStatus() == Session.STATUS_AUTHENTICATED) {
// Increment counter of authenticated sessions
decrementCounter("usercounter");
}
}
}
public void joinedCluster(byte[] nodeID) {
// Do nothing
}
public void leftCluster() {
// TODO Send unavailable presence TO roster contacts not hosted in this JVM (type=FROM)
// TODO Send unavailable presence FROM roster contacts not hosted in this JVM (type=TO)
// TODO Send unavailable presence FROM & TO roster contacts not hosted in this JVM (type=BOTH)
// TODO Send unavailable presence TO other resources of the user not hosted in this JVM
if (!XMPPServer.getInstance().isShuttingDown()) {
// Add local sessions to caches
restoreCacheContent();
}
}
public void leftCluster(byte[] nodeID) {
// Do nothing
}
public void markedAsSeniorClusterMember() {
// Do nothing
}
......
......@@ -29,29 +29,24 @@ public interface ClusterEventListener {
* At this point the CacheFactory holds clustered caches. That means that modifications
* to the caches will be reflected in the cluster. The clustered caches were just
* obtained from the cluster and no local cached data was automatically moved.<p>
*
* @param oldNodeID nodeID used by this JVM before joining the cluster.
*/
void joinedCluster(byte[] oldNodeID);
void joinedCluster();
/**
* Notification event indicating that this JVM is about to leave the cluster. This could
* happen when disabling clustering support, removing the enterprise plugin that provides
* clustering support or even shutdown the server.<p>
* Notification event indicating that another JVM is now part of a cluster.<p>
*
* At this point the CacheFactory is still holding clustered caches. That means that
* modifications to the caches will be reflected in the cluster.<p>
* At this point the CacheFactory of the new node holds clustered caches. That means
* that modifications to the caches of this JVM will be reflected in the cluster and
* in particular in the new node.
*
* Use {@link org.jivesoftware.openfire.XMPPServer#isShuttingDown()} to figure out if the
* server is being shutdown.
* @param nodeID ID of the node that joined the cluster.
*/
void leavingCluster();
void joinedCluster(byte[] nodeID);
/**
* Notification event indicating that this JVM is no longer part of the cluster. This could
* happen when disabling clustering support, removing the enterprise plugin that provides
* clustering support or connection to cluster got lost. If connection to cluster was lost
* then this event will not be predated by the {@link #leavingCluster()} event.<p>
* clustering support or connection to cluster got lost.<p>
*
* Moreover, if we were in a "split brain" scenario (ie. separated cluster islands) and the
* island where this JVM belonged was marked as "old" then all nodes of that island will
......@@ -65,6 +60,25 @@ public interface ClusterEventListener {
*/
void leftCluster();
/**
* Notification event indicating that another JVM is no longer part of the cluster. This could
* happen when disabling clustering support, removing the enterprise plugin that provides
* clustering support or connection to cluster got lost.<p>
*
* Moreover, if we were in a "split brain" scenario (ie. separated cluster islands) and the
* island where the other JVM belonged was marked as "old" then all nodes of that island will
* get the <tt>left cluster event</tt> and <tt>joined cluster events</tt>. That means that
* caches will be reset and thus will need to be repopulated again with fresh data from this JVM.
* This also includes the case where the other JVM was the senior cluster member and,
* when the islands met again, stopped being the senior member.<p>
*
* At this point the CacheFactory of the leaving node holds local caches. That means that modifications to
* the caches of this JVM will not affect the leaving node but other cluster members.
*
* @param nodeID ID of the node that left the cluster.
*/
void leftCluster(byte[] nodeID);
/**
* Notification event indicating that this JVM is now the senior cluster member. This
* could either happen when initially joining the cluster or when the senior cluster
......@@ -74,7 +88,7 @@ public interface ClusterEventListener {
* island will have its own senior cluster member. However, when the islands meet again there
* could only be one senior cluster member so one of the senior cluster members will stop playing
* that role. When that happens the JVM no longer playing that role will receive the
* {@link #leftCluster()} and {@link #joinedCluster(byte[])} events.
* {@link #leftCluster()} and {@link #joinedCluster()} events.
*/
void markedAsSeniorClusterMember();
}
......@@ -43,11 +43,11 @@ public class ClusterManager {
Event event = events.take();
EventType eventType = event.getType();
// Make sure that CacheFactory is getting these events first (to update cache structure)
if (eventType == EventType.joined_cluster) {
if (eventType == EventType.joined_cluster && event.getNodeID() == null) {
// Replace standalone caches with clustered caches. Local cached data is not moved.
CacheFactory.joinedCluster();
}
else if (eventType == EventType.left_cluster) {
else if (eventType == EventType.left_cluster && event.getNodeID() == null) {
// Replace clustered caches with standalone caches. Cached data is not moved to new cache.
CacheFactory.leftCluster();
}
......@@ -56,15 +56,21 @@ public class ClusterManager {
try {
switch (eventType) {
case joined_cluster: {
listener.joinedCluster(event.getOldNodeID());
break;
}
case leaving_cluster: {
listener.leavingCluster();
if (event.getNodeID() == null) {
listener.joinedCluster();
}
else {
listener.joinedCluster(event.getNodeID());
}
break;
}
case left_cluster: {
listener.leftCluster();
if (event.getNodeID() == null) {
listener.leftCluster();
}
else {
listener.leftCluster(event.getNodeID());
}
break;
}
case marked_senior_cluster_member: {
......@@ -126,12 +132,11 @@ public class ClusterManager {
* This event will be triggered in another thread. This will avoid potential deadlocks
* in Coherence.
*
* @param oldNodeID nodeID used by this JVM before joining the cluster.
* @param asynchronous true if event will be triggered in background
*/
public static void fireJoinedCluster(byte[] oldNodeID, boolean asynchronous) {
public static void fireJoinedCluster(boolean asynchronous) {
try {
Event event = new Event(EventType.joined_cluster, oldNodeID);
Event event = new Event(EventType.joined_cluster, null);
events.put(event);
if (!asynchronous) {
while (!event.isProcessed()) {
......@@ -144,20 +149,22 @@ public class ClusterManager {
}
/**
* Triggers event indicating that this JVM is about to leave the cluster. This could
* happen when disabling clustering support, removing the enterprise plugin that provides
* clustering support or even shutdown the server.<p>
* <p/>
* This event will be triggered in another thread but won't return until all listeners have
* been alerted. This will give listeners the chance to use the cluster for any clean up
* operation before the node actually leaves the cluster.
* Triggers event indicating that another JVM is now part of a cluster.<p>
*
* This event will be triggered in another thread. This will avoid potential deadlocks
* in Coherence.
*
* @param nodeID nodeID assigned to the JVM when joining the cluster.
* @param asynchronous true if event will be triggered in background
*/
public static void fireLeavingCluster() {
public static void fireJoinedCluster(byte[] nodeID, boolean asynchronous) {
try {
Event event = new Event(EventType.leaving_cluster, null);
Event event = new Event(EventType.joined_cluster, nodeID);
events.put(event);
while (!event.isProcessed()) {
Thread.sleep(50);
if (!asynchronous) {
while (!event.isProcessed()) {
Thread.sleep(50);
}
}
} catch (InterruptedException e) {
// Should never happen
......@@ -204,7 +211,7 @@ public class ClusterManager {
* island will have its own senior cluster member. However, when the islands meet again there
* could only be one senior cluster member so one of the senior cluster members will stop playing
* that role. When that happens the JVM no longer playing that role will receive the
* {@link #fireLeftCluster(boolean)} and {@link #fireJoinedCluster(byte[],boolean)} events.<p>
* {@link #fireLeftCluster(boolean)} and {@link #fireJoinedCluster(boolean)} events.<p>
* <p/>
* This event will be triggered in another thread. This will avoid potential deadlocks
* in Coherence.
......@@ -305,20 +312,20 @@ public class ClusterManager {
private static class Event {
private EventType type;
private byte[] oldNodeID;
private byte[] nodeID;
private boolean processed;
public Event(EventType type, byte[] oldNodeID) {
this.type = type;
this.oldNodeID = oldNodeID;
this.nodeID = oldNodeID;
}
public EventType getType() {
return type;
}
public byte[] getOldNodeID() {
return oldNodeID;
public byte[] getNodeID() {
return nodeID;
}
public boolean isProcessed() {
......@@ -344,11 +351,6 @@ public class ClusterManager {
*/
joined_cluster,
/**
* This JVM is about to leave the cluster.
*/
leaving_cluster,
/**
* This JVM is no longer part of the cluster.
*/
......
......@@ -13,6 +13,8 @@ package org.jivesoftware.openfire.handler;
import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.roster.Roster;
import org.jivesoftware.openfire.roster.RosterItem;
......@@ -22,16 +24,17 @@ import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.ConcurrentHashSet;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.xmpp.packet.*;
import java.util.Collection;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Implements the presence protocol. Clients use this protocol to
......@@ -71,9 +74,23 @@ import java.util.concurrent.ConcurrentHashMap;
*
* @author Iain Shigeoka
*/
public class PresenceUpdateHandler extends BasicModule implements ChannelHandler {
public class PresenceUpdateHandler extends BasicModule implements ChannelHandler, ClusterEventListener {
private Map<String, Map<String, Set<String>>> directedPresences;
private static final String PRESENCE_CACHE_NAME = "Directed Presences";
/**
* Keeps track of entities that sent directed presences to other entities. In this map
* we keep track of every directed presence no matter if the recipient was hosted in
* this JVM or another cluster node.
*
* Key: sender, Value: list of DirectedPresences
*/
private Cache<String, Collection<DirectedPresence>> directedPresencesCache;
/**
* Same as the directedPresencesCache but only keeps directed presences sent from
* users connected to this JVM.
*/
private Map<String, Collection<DirectedPresence>> localDirectedPresences;
private RoutingTable routingTable;
private RosterManager rosterManager;
......@@ -86,7 +103,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
public PresenceUpdateHandler() {
super("Presence update handler");
directedPresences = new ConcurrentHashMap<String, Map<String, Set<String>>>();
localDirectedPresences = new ConcurrentHashMap<String, Collection<DirectedPresence>>();
}
public void process(Packet packet) throws UnauthorizedException, PacketException {
......@@ -296,7 +313,6 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
}
if (localServer.isLocal(update.getFrom())) {
boolean keepTrack = false;
Map<String, Set<String>> map;
String name = update.getFrom().getNode();
if (name != null && !"".equals(name)) {
// Keep track of all directed presences if roster service is disabled
......@@ -313,7 +329,9 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
try {
rosterItem = roster.getRosterItem(update.getTo());
}
catch (UserNotFoundException e) {}
catch (UserNotFoundException e) {
// Ignore
}
if (rosterItem == null ||
RosterItem.SUB_NONE == rosterItem.getSubStatus() ||
RosterItem.SUB_TO == rosterItem.getSubStatus()) {
......@@ -334,55 +352,72 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
}
if (keepTrack) {
String sender = update.getFrom().toString();
map = directedPresences.get(sender);
Collection<DirectedPresence> directedPresences = directedPresencesCache.get(sender);
if (Presence.Type.unavailable.equals(update.getType())) {
if (map != null) {
if (directedPresences != null) {
// It's a directed unavailable presence
if (routingTable.hasClientRoute(handlerJID)) {
// Client sessions will receive only presences to the same JID (the
// address of the session) so remove the handler from the map
map.remove(handlerJID.toString());
if (map.isEmpty()) {
// Remove the user from the registry since the list of directed
// presences is empty
directedPresences.remove(sender);
for (DirectedPresence directedPresence : directedPresences) {
if (directedPresence.getHandler().equals(handlerJID)) {
directedPresences.remove(directedPresence);
break;
}
}
}
else {
// A service may receive presences for many JIDs so in this case we
// just need to remove the jid that has received a directed
// unavailable presence
Set<String> jids = map.get(handlerJID.toString());
if (jids != null) {
jids.remove(jid);
if (jids.isEmpty()) {
map.remove(handlerJID.toString());
if (map.isEmpty()) {
// Remove the user from the registry since the list of directed
// presences is empty
directedPresences.remove(sender);
for (DirectedPresence directedPresence : directedPresences) {
if (directedPresence.getHandler().equals(handlerJID)) {
directedPresence.removeReceiver(jid);
if (directedPresence.isEmpty()) {
directedPresences.remove(directedPresence);
}
break;
}
}
}
if (directedPresences.isEmpty()) {
// Remove the user from the registry since the list of directed
// presences is empty
directedPresencesCache.remove(sender);
localDirectedPresences.remove(sender);
}
else {
directedPresencesCache.put(sender, directedPresences);
localDirectedPresences.put(sender, directedPresences);
}
}
}
else {
if (map == null) {
if (directedPresences == null) {
// No directed presences are tracked yet for this sender. Create a new
// collection of DirectedPresence entries; each entry groups the receivers
// per handler, so a handler appears only once even if the user sends
// several directed presences through it, and duplicate receiver JIDs
// are avoided.
map = new ConcurrentHashMap<String, Set<String>>();
directedPresences.put(sender, map);
directedPresences = new ConcurrentLinkedQueue<DirectedPresence>();
}
// Add the handler to the list of handler that processed the directed
// presence sent by the user. This handler will be used to send
// the unavailable presence when the user goes offline
if (map.get(handlerJID.toString()) == null) {
map.put(handlerJID.toString(), new ConcurrentHashSet<String>());
DirectedPresence affectedDirectedPresence = null;
for (DirectedPresence directedPresence : directedPresences) {
if (directedPresence.getHandler().equals(handlerJID)) {
affectedDirectedPresence = directedPresence;
break;
}
}
map.get(handlerJID.toString()).add(jid);
if (affectedDirectedPresence == null) {
affectedDirectedPresence = new DirectedPresence(handlerJID);
}
affectedDirectedPresence.addReceiver(jid);
directedPresencesCache.put(sender, directedPresences);
localDirectedPresences.put(sender, directedPresences);
}
}
}
......@@ -395,37 +430,35 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
* @param update the unavailable presence sent by the user.
*/
private void broadcastUnavailableForDirectedPresences(Presence update) {
if (update.getFrom() == null) {
JID from = update.getFrom();
if (from == null) {
return;
}
if (localServer.isLocal(update.getFrom())) {
if (localServer.isLocal(from)) {
// Remove the registry of directed presences of this user
Map<String, Set<String>> map = directedPresences.remove(update.getFrom().toString());
if (map != null) {
Collection<DirectedPresence> directedPresences = directedPresencesCache.remove(from.toString());
if (directedPresences != null) {
// Iterate over all the entities that the user sent a directed presence
for (String handler : new HashSet<String>(map.keySet())) {
JID handlerJID = new JID(handler);
Set<String> jids = map.get(handler);
if (jids == null) {
continue;
}
for (String jid : jids) {
for (DirectedPresence directedPresence : directedPresences) {
for (String receiver : directedPresence.getReceivers()) {
Presence presence = update.createCopy();
presence.setTo(jid);
routingTable.routePacket(handlerJID, presence);
presence.setTo(receiver);
routingTable.routePacket(directedPresence.getHandler(), presence);
}
}
localDirectedPresences.remove(from.toString());
}
}
}
public boolean hasDirectPresence(Session session, JID recipientJID) {
Map<String, Set<String>> map = directedPresences.get(session.getAddress().toString());
if (map != null) {
Collection<DirectedPresence> directedPresences = directedPresencesCache.get(session.getAddress().toString());
if (directedPresences != null) {
String recipient = recipientJID.toBareJID();
for (Set<String> fullJIDs : map.values()) {
for (String fullJID : fullJIDs) {
if (fullJID.contains(recipient)) {
for (DirectedPresence directedPresence : directedPresences) {
for (String receiver : directedPresence.getReceivers()) {
if (receiver.contains(recipient)) {
return true;
}
}
......@@ -434,6 +467,26 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
return false;
}
/**
* Removes directed presences sent to entities that are no longer available.
*/
public void removedExpiredPresences() {
Map<String, Collection<DirectedPresence>> copy =
new HashMap<String, Collection<DirectedPresence>>(localDirectedPresences);
for (Map.Entry<String, Collection<DirectedPresence>> entry : copy.entrySet()) {
for (DirectedPresence directedPresence : entry.getValue()) {
if (!routingTable.hasClientRoute(directedPresence.getHandler()) &&
!routingTable.hasComponentRoute(directedPresence.getHandler())) {
Collection<DirectedPresence> presences = localDirectedPresences.get(entry.getKey());
presences.remove(directedPresence);
if (presences.isEmpty()) {
localDirectedPresences.remove(entry.getKey());
}
}
}
}
}
public void initialize(XMPPServer server) {
super.initialize(server);
localServer = server;
......@@ -444,7 +497,39 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
sessionManager = server.getSessionManager();
userManager = server.getUserManager();
routingTable = server.getRoutingTable();
// TODO Add as route listener (to remove direct presences info for removed routes)
directedPresencesCache = CacheFactory.createCache(PRESENCE_CACHE_NAME);
// TODO Add as route listener (to remove direct presences info for removed routes). Mainly for c2s sessions which is uncommon.
// Listen to cluster events
ClusterManager.addListener(this);
}
public void joinedCluster() {
// Populate directedPresencesCache with local content since, when not in a cluster,
// we could still have sent directed presences to entities that will need to be
// replicated once we are connected to the cluster. An example would be MUC rooms.
for (Map.Entry<String, Collection<DirectedPresence>> entry : localDirectedPresences.entrySet()) {
directedPresencesCache.put(entry.getKey(), entry.getValue());
}
}
public void joinedCluster(byte[] nodeID) {
// Do nothing
}
public void leftCluster() {
if (!XMPPServer.getInstance().isShuttingDown()) {
// Populate directedPresencesCache with local content
for (Map.Entry<String, Collection<DirectedPresence>> entry : localDirectedPresences.entrySet()) {
directedPresencesCache.put(entry.getKey(), entry.getValue());
}
}
}
public void leftCluster(byte[] nodeID) {
// Do nothing
}
public void markedAsSeniorClusterMember() {
// Do nothing
}
}
......@@ -12,15 +12,15 @@
package org.jivesoftware.openfire.muc.spi;
import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.NotFoundException;
import org.jivesoftware.openfire.PacketException;
import org.jivesoftware.openfire.PacketRouter;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.muc.*;
import org.jivesoftware.openfire.user.UserAlreadyExistsException;
import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.NotFoundException;
import org.xmpp.packet.*;
import java.util.*;
......@@ -476,6 +476,7 @@ public class MUCUserImpl implements MUCUser {
else {
if (Presence.Type.unavailable == packet.getType()) {
try {
// TODO Consider that different nodes can be creating and processing this presence at the same time (when remote node went down)
removeRole(group);
role.getChatRoom().leaveRoom(role.getNickname());
}
......
......@@ -923,18 +923,30 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha
outMessages.addAndGet(numOccupants);
}
public void joinedCluster(byte[] oldNodeID) {
public void joinedCluster() {
// Disable the service until we know that we are the senior cluster member
enableService(false, false);
//TODO Do not disable the service. All nodes will host the service (unless it was disabled before)
//TODO Merge rooms existing in the cluster with the ones of the new node
//TODO For rooms that exist in cluster and in new node then send presences of remote occupants to LOCAL occupants
}
public void leavingCluster() {
// Do nothing
public void joinedCluster(byte[] nodeID) {
//TODO Merge rooms existing in the cluster with the ones of the new node
//TODO For rooms that exist in cluster and in new node then send presences of new remote occupants to LOCAL occupants
}
public void leftCluster() {
// Offer the service when not running in a cluster
enableService(true, false);
//TODO Do not mess with service enablement! :)
//TODO Send unavailable presences of leaving remote occupants to LOCAL occupants
//TODO Remove rooms with no occupants (should happen with previous step)?
}
public void leftCluster(byte[] nodeID) {
//TODO Send unavailable presences of leaving remote occupants to LOCAL occupants
//TODO Remove rooms with no occupants (should happen with previous step)?
}
public void markedAsSeniorClusterMember() {
......
......@@ -439,12 +439,12 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
}
}
public void joinedCluster(byte[] oldNodeID) {
public void joinedCluster() {
// Disable the service until we know that we are the senior cluster member
enableService(false);
}
public void leavingCluster() {
public void joinedCluster(byte[] nodeID) {
// Do nothing
}
......@@ -453,6 +453,10 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
enableService(true);
}
public void leftCluster(byte[] nodeID) {
// Do nothing
}
public void markedAsSeniorClusterMember() {
// Offer the service since we are the senior cluster member
enableService(true);
......
......@@ -527,11 +527,22 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
}
public boolean hasClientRoute(JID jid) {
return usersCache.get(jid.toString()) != null || isAnonymousRoute(jid);
return usersCache.containsKey(jid.toString()) || isAnonymousRoute(jid);
}
public byte[] getNodeIDForClientRoute(JID jid) {
ClientRoute clientRoute = usersCache.get(jid.toString());
if (clientRoute == null) {
clientRoute = anonymousUsersCache.get(jid.toString());
}
if (clientRoute != null) {
return clientRoute.getNodeID();
}
return null;
}
public boolean isAnonymousRoute(JID jid) {
return anonymousUsersCache.get(jid.toString()) != null;
return anonymousUsersCache.containsKey(jid.toString());
}
public boolean isLocalRoute(JID jid) {
......@@ -539,11 +550,11 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
}
public boolean hasServerRoute(JID jid) {
return serversCache.get(jid.getDomain()) != null;
return serversCache.containsKey(jid.getDomain());
}
public boolean hasComponentRoute(JID jid) {
return componentsCache.get(jid.getDomain()) != null;
return componentsCache.containsKey(jid.getDomain());
}
public List<JID> getRoutes(JID route) {
......@@ -581,7 +592,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
else if (route.getDomain().contains(serverName)) {
// Packet sent to component hosted in this server
if (componentsCache.containsKey(route.getDomain())) {
jids.add(route);
jids.add(new JID(route.getDomain()));
}
}
else {
......@@ -686,7 +697,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
localRoutingTable.stop();
}
public void joinedCluster(byte[] oldNodeID) {
public void joinedCluster() {
restoreCacheContent();
// Broadcast presence of local sessions to remote sessions when subscribed to presence
......@@ -703,89 +714,8 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
}
}
public void leavingCluster() {
if (XMPPServer.getInstance().isShuttingDown()) {
// Do nothing since local sessions will be closed. Local session manager
// and local routing table will be correctly updated thus updating the
// other cluster nodes correctly
}
else {
// This JVM is leaving the cluster but will continue to work. That means
// that clients connected to this JVM will be able to keep talking.
// In other words, their sessions will not be closed (and not removed from
// the routing table or the session manager). However, other nodes should
// get their routing tables correctly updated so we need to temporarily
// remove the content from the cache so other cluster nodes are correctly
// updated. Local content will be restored to cache in #leftCluster
// In the case of an abnormal disconnection from the cluster this event will
// not be triggered so it is up to the cluster nodes to know how to clean up
// their caches from the local data added by this JVM
// Remove outgoing server sessions hosted locally from the cache (using new nodeID)
for (LocalOutgoingServerSession session : localRoutingTable.getServerRoutes()) {
String address = session.getAddress().getDomain();
serversCache.remove(address);
}
// Remove component sessions hosted locally from the cache (using new nodeID) and remove traces to old nodeID
for (RoutableChannelHandler componentRoute : localRoutingTable.getComponentRoute()) {
JID route = componentRoute.getAddress();
String address = route.getDomain();
Lock lock = LockManager.getLock(address + "rt");
try {
lock.lock();
Set<byte[]> nodes = componentsCache.get(address);
if (nodes != null) {
nodes.remove(server.getNodeID());
if (nodes.isEmpty()) {
componentsCache.remove(address);
}
else {
componentsCache.put(address, nodes);
}
}
} finally {
lock.unlock();
}
}
// Remove client sessions hosted locally from the cache (using new nodeID)
for (LocalClientSession session : localRoutingTable.getClientRoutes()) {
boolean anonymous = false;
JID route = session.getAddress();
String address = route.toString();
ClientRoute clientRoute = usersCache.remove(address);
if (clientRoute == null) {
clientRoute = anonymousUsersCache.remove(address);
anonymous = true;
}
if (clientRoute != null && route.getResource() != null) {
Lock lock = LockManager.getLock(route.toBareJID());
try {
lock.lock();
if (anonymous) {
usersSessions.remove(route.toBareJID());
}
else {
Collection<String> jids = usersSessions.get(route.toBareJID());
if (jids != null) {
jids.remove(route.toString());
if (!jids.isEmpty()) {
usersSessions.put(route.toBareJID(), jids);
}
else {
usersSessions.remove(route.toBareJID());
}
}
}
}
finally {
lock.unlock();
}
}
}
}
public void joinedCluster(byte[] nodeID) {
// Do nothing
}
public void leftCluster() {
......@@ -795,6 +725,10 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
}
}
public void leftCluster(byte[] nodeID) {
// Do nothing
}
public void markedAsSeniorClusterMember() {
// Do nothing
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment