Commit 2a16973e authored by Greg Thomas's avatar Greg Thomas

HZ-5: No functional changes, just tidy up the code before fixing the issue

parent 2782c8fb
......@@ -15,18 +15,23 @@
*/
package org.jivesoftware.openfire.plugin.util.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import org.jivesoftware.openfire.*;
import com.hazelcast.core.Cluster;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleEvent.LifecycleState;
import com.hazelcast.core.LifecycleListener;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import org.jivesoftware.openfire.PacketException;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import org.jivesoftware.openfire.cluster.NodeID;
......@@ -40,7 +45,6 @@ import org.jivesoftware.openfire.session.RemoteSessionLocator;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.openfire.spi.ClientRoute;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.CacheWrapper;
......@@ -49,18 +53,17 @@ import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID;
import org.xmpp.packet.Presence;
import com.hazelcast.core.Cluster;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleEvent.LifecycleState;
import com.hazelcast.core.LifecycleListener;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
/**
* ClusterListener reacts to membership changes in the cluster. It takes care of cleaning up the state
......@@ -68,7 +71,7 @@ import com.hazelcast.core.MembershipListener;
*/
public class ClusterListener implements MembershipListener, LifecycleListener {
private static Logger logger = LoggerFactory.getLogger(ClusterListener.class);
private static final Logger logger = LoggerFactory.getLogger(ClusterListener.class);
private static final int C2S_CACHE_IDX = 0;
private static final int ANONYMOUS_C2S_CACHE_IDX = 1;
......@@ -82,33 +85,33 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
/**
* Caches stored in RoutingTable
*/
Cache<String, ClientRoute> C2SCache;
Cache<String, ClientRoute> anonymousC2SCache;
Cache<DomainPair, byte[]> S2SCache;
Cache<String, Set<NodeID>> componentsCache;
private Cache<String, ClientRoute> C2SCache;
private Cache<String, ClientRoute> anonymousC2SCache;
private Cache<DomainPair, byte[]> S2SCache;
private Cache<String, Set<NodeID>> componentsCache;
/**
* Caches stored in SessionManager
*/
Cache<String, ClientSessionInfo> sessionInfoCache;
Cache<String, byte[]> componentSessionsCache;
Cache<String, byte[]> multiplexerSessionsCache;
Cache<String, byte[]> incomingServerSessionsCache;
private Cache<String, ClientSessionInfo> sessionInfoCache;
private Cache<String, byte[]> componentSessionsCache;
private Cache<String, byte[]> multiplexerSessionsCache;
private Cache<String, byte[]> incomingServerSessionsCache;
/**
* Caches stored in PresenceUpdateHandler
*/
Cache<String, Collection<DirectedPresence>> directedPresencesCache;
private Cache<String, Collection<DirectedPresence>> directedPresencesCache;
private Map<NodeID, Set<String>[]> nodeSessions = new ConcurrentHashMap<NodeID, Set<String>[]>();
private Map<NodeID, Set<String>[]> nodeSessions = new ConcurrentHashMap<>();
private Map<NodeID, Set<DomainPair>> nodeRoutes = new ConcurrentHashMap<>();
private Map<NodeID, Map<String, Collection<String>>> nodePresences = new ConcurrentHashMap<NodeID, Map<String, Collection<String>>>();
private Map<NodeID, Map<String, Collection<String>>> nodePresences = new ConcurrentHashMap<>();
private boolean seniorClusterMember = CacheFactory.isSeniorClusterMember();
private Map<Cache, EntryListener> EntryListeners = new HashMap<Cache, EntryListener>();
private Map<Cache<?,?>, EntryListener> entryListeners = new HashMap<>();
private Cluster cluster;
private Map<String, ClusterNodeInfo> clusterNodesInfo = new ConcurrentHashMap<String, ClusterNodeInfo>();
private Map<String, ClusterNodeInfo> clusterNodesInfo = new ConcurrentHashMap<>();
/**
* Flag that indicates if the listener has done all clean up work when noticed that the
......@@ -117,14 +120,14 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
*/
private boolean done = true;
public ClusterListener(Cluster cluster) {
ClusterListener(Cluster cluster) {
this.cluster = cluster;
for (Member member : cluster.getMembers()) {
clusterNodesInfo.put(member.getUuid(),
clusterNodesInfo.put(member.getUuid(),
new HazelcastClusterNodeInfo(member, cluster.getClusterTime()));
}
C2SCache = CacheFactory.createCache(RoutingTableImpl.C2S_CACHE_NAME);
anonymousC2SCache = CacheFactory.createCache(RoutingTableImpl.ANONYMOUS_C2S_CACHE_NAME);
S2SCache = CacheFactory.createCache(RoutingTableImpl.S2S_CACHE_NAME);
......@@ -140,28 +143,34 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
joinCluster();
}
private void addEntryListener(Cache cache, EntryListener listener) {
private void addEntryListener(Cache<?, ?> cache, EntryListener listener) {
if (cache instanceof CacheWrapper) {
Cache wrapped = ((CacheWrapper)cache).getWrappedCache();
if (wrapped instanceof ClusteredCache) {
((ClusteredCache)wrapped).addEntryListener(listener, false);
// Keep track of the listener that we added to the cache
EntryListeners.put(cache, listener);
entryListeners.put(cache, listener);
}
}
}
private void simulateCacheInserts(Cache cache) {
EntryListener EntryListener = EntryListeners.get(cache);
if (EntryListener != null) {
@SuppressWarnings("unchecked")
private void simulateCacheInserts(Cache<?, ?> cache) {
final EntryListener<?,?> entryListener = entryListeners.get(cache);
if (entryListener != null) {
if (cache instanceof CacheWrapper) {
Cache wrapped = ((CacheWrapper) cache).getWrappedCache();
if (wrapped instanceof ClusteredCache) {
ClusteredCache clusteredCache = (ClusteredCache) wrapped;
for (Map.Entry entry : (Set<Map.Entry>) cache.entrySet()) {
EntryEvent event = new EntryEvent(clusteredCache.map.getName(), cluster.getLocalMember(),
EntryEventType.ADDED.getType(), entry.getKey(), null, entry.getValue());
EntryListener.entryAdded(event);
for (Map.Entry<?, ?> entry : cache.entrySet()) {
EntryEvent event = new EntryEvent<>(
clusteredCache.map.getName(),
cluster.getLocalMember(),
EntryEventType.ADDED.getType(),
entry.getKey(),
null,
entry.getValue());
entryListener.entryAdded(event);
}
}
}
......@@ -200,8 +209,9 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
}
}
@SuppressWarnings("unchecked")
private Set<String>[] insertJIDList(NodeID nodeKey) {
Set<String>[] allLists = new Set[] {
Set<String>[] allLists = new Set[] {
new HashSet<String>(),
new HashSet<String>(),
new HashSet<String>(),
......@@ -264,7 +274,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> registeredUsers = lookupJIDList(key, C2SCache.getName());
if (!registeredUsers.isEmpty()) {
for (String fullJID : new ArrayList<String>(registeredUsers)) {
for (String fullJID : new ArrayList<>(registeredUsers)) {
JID offlineJID = new JID(fullJID);
manager.removeSession(null, offlineJID, false, true);
}
......@@ -272,7 +282,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> anonymousUsers = lookupJIDList(key, anonymousC2SCache.getName());
if (!anonymousUsers.isEmpty()) {
for (String fullJID : new ArrayList<String>(anonymousUsers)) {
for (String fullJID : new ArrayList<>(anonymousUsers)) {
JID offlineJID = new JID(fullJID);
manager.removeSession(null, offlineJID, true, true);
}
......@@ -290,11 +300,11 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> components = lookupJIDList(key, componentsCache.getName());
if (!components.isEmpty()) {
for (String address : new ArrayList<String>(components)) {
for (String address : new ArrayList<>(components)) {
Lock lock = CacheFactory.getLock(address, componentsCache);
try {
lock.lock();
Set<NodeID> nodes = (Set<NodeID>) componentsCache.get(address);
Set<NodeID> nodes = componentsCache.get(address);
if (nodes != null) {
nodes.remove(key);
if (nodes.isEmpty()) {
......@@ -312,7 +322,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> sessionInfo = lookupJIDList(key, sessionInfoCache.getName());
if (!sessionInfo.isEmpty()) {
for (String session : new ArrayList<String>(sessionInfo)) {
for (String session : new ArrayList<>(sessionInfo)) {
sessionInfoCache.remove(session);
// Registered sessions will be removed
// by the clean up of the session info cache
......@@ -321,7 +331,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> componentSessions = lookupJIDList(key, componentSessionsCache.getName());
if (!componentSessions.isEmpty()) {
for (String domain : new ArrayList<String>(componentSessions)) {
for (String domain : new ArrayList<>(componentSessions)) {
componentSessionsCache.remove(domain);
// Registered subdomains of external component will be removed
// by the clean up of the component cache
......@@ -330,7 +340,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> multiplexers = lookupJIDList(key, multiplexerSessionsCache.getName());
if (!multiplexers.isEmpty()) {
for (String fullJID : new ArrayList<String>(multiplexers)) {
for (String fullJID : new ArrayList<>(multiplexers)) {
multiplexerSessionsCache.remove(fullJID);
// c2s connections connected to node that went down will be cleaned up
// by the c2s logic above. If the CM went down and the node is up then
......@@ -362,7 +372,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
private void cleanupPresences(NodeID key) {
Set<String> registeredUsers = lookupJIDList(key, C2SCache.getName());
if (!registeredUsers.isEmpty()) {
for (String fullJID : new ArrayList<String>(registeredUsers)) {
for (String fullJID : new ArrayList<>(registeredUsers)) {
JID offlineJID = new JID(fullJID);
try {
Presence presence = new Presence(Presence.Type.unavailable);
......@@ -377,7 +387,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> anonymousUsers = lookupJIDList(key, anonymousC2SCache.getName());
if (!anonymousUsers.isEmpty()) {
for (String fullJID : new ArrayList<String>(anonymousUsers)) {
for (String fullJID : new ArrayList<>(anonymousUsers)) {
JID offlineJID = new JID(fullJID);
try {
Presence presence = new Presence(Presence.Type.unavailable);
......@@ -396,16 +406,17 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
/**
* EntryListener implementation tracks events for caches of c2s sessions.
*/
private class DirectedPresenceListener implements EntryListener {
private class DirectedPresenceListener implements EntryListener<String, Collection<DirectedPresence>> {
public void entryAdded(EntryEvent event) {
byte[] nodeID = StringUtils.getBytes(event.getMember().getUuid());
@Override
public void entryAdded(EntryEvent<String, Collection<DirectedPresence>> event) {
byte[] nodeID = event.getMember().getUuid().getBytes(StandardCharsets.UTF_8);
// Ignore events originated from this JVM
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
// Check if the directed presence was sent to an entity hosted by this JVM
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
String sender = event.getKey().toString();
Collection<String> handlers = new HashSet<String>();
String sender = event.getKey();
Collection<String> handlers = new HashSet<>();
for (JID handler : getHandlers(event)) {
if (routingTable.isLocalRoute(handler)) {
// Keep track of the remote sender and local handler that got the directed presence
......@@ -415,7 +426,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
if (!handlers.isEmpty()) {
Map<String, Collection<String>> senders = nodePresences.get(NodeID.getInstance(nodeID));
if (senders == null) {
senders = new ConcurrentHashMap<String, Collection<String>>();
senders = new ConcurrentHashMap<>();
nodePresences.put(NodeID.getInstance(nodeID), senders);
}
senders.put(sender, handlers);
......@@ -423,14 +434,15 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
}
}
public void entryUpdated(EntryEvent event) {
byte[] nodeID = StringUtils.getBytes(event.getMember().getUuid());
@Override
public void entryUpdated(EntryEvent<String, Collection<DirectedPresence>> event) {
byte[] nodeID = event.getMember().getUuid().getBytes(StandardCharsets.UTF_8);
// Ignore events originated from this JVM
if (nodeID != null && !XMPPServer.getInstance().getNodeID().equals(nodeID)) {
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
// Check if the directed presence was sent to an entity hosted by this JVM
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
String sender = event.getKey().toString();
Collection<String> handlers = new HashSet<String>();
String sender = event.getKey();
Collection<String> handlers = new HashSet<>();
for (JID handler : getHandlers(event)) {
if (routingTable.isLocalRoute(handler)) {
// Keep track of the remote sender and local handler that got the directed presence
......@@ -439,7 +451,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
}
Map<String, Collection<String>> senders = nodePresences.get(NodeID.getInstance(nodeID));
if (senders == null) {
senders = new ConcurrentHashMap<String, Collection<String>>();
senders = new ConcurrentHashMap<>();
nodePresences.put(NodeID.getInstance(nodeID), senders);
}
if (!handlers.isEmpty()) {
......@@ -452,32 +464,33 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
}
}
public void entryRemoved(EntryEvent event) {
@Override
public void entryRemoved(EntryEvent<String, Collection<DirectedPresence>> event) {
if (event == null || (event.getValue() == null && event.getOldValue() == null)) {
// Nothing to remove
return;
}
byte[] nodeID = StringUtils.getBytes(event.getMember().getUuid());
byte[] nodeID = event.getMember().getUuid().getBytes(StandardCharsets.UTF_8);
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
String sender = event.getKey().toString();
String sender = event.getKey();
nodePresences.get(NodeID.getInstance(nodeID)).remove(sender);
}
}
Collection<JID> getHandlers(EntryEvent event) {
Object value = event.getValue();
Collection<JID> answer = new ArrayList<JID>();
Collection<JID> getHandlers(EntryEvent<String, Collection<DirectedPresence>> event) {
Collection<DirectedPresence> value = event.getValue();
Collection<JID> answer = new ArrayList<>();
if (value != null) {
for (DirectedPresence directedPresence : (Collection<DirectedPresence>)value) {
for (DirectedPresence directedPresence : value) {
answer.add(directedPresence.getHandler());
}
}
return answer;
}
Set<String> getReceivers(EntryEvent event, JID handler) {
Object value = event.getValue();
for (DirectedPresence directedPresence : (Collection<DirectedPresence>)value) {
Set<String> getReceivers(EntryEvent<String, Collection<DirectedPresence>> event, JID handler) {
Collection<DirectedPresence> value = event.getValue();
for (DirectedPresence directedPresence : value) {
if (directedPresence.getHandler().equals(handler)) {
return directedPresence.getReceivers();
}
......@@ -485,18 +498,19 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
return Collections.emptySet();
}
public void entryEvicted(EntryEvent event) {
@Override
public void entryEvicted(EntryEvent<String, Collection<DirectedPresence>> event) {
entryRemoved(event);
}
private void mapClearedOrEvicted(MapEvent event) {
NodeID nodeID = NodeID.getInstance(StringUtils.getBytes(event.getMember().getUuid()));
NodeID nodeID = NodeID.getInstance(event.getMember().getUuid().getBytes(StandardCharsets.UTF_8));
// ignore events which were triggered by this node
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
nodePresences.get(nodeID).clear();
}
}
@Override
public void mapEvicted(MapEvent event) {
mapClearedOrEvicted(event);
......@@ -511,24 +525,26 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
/**
* EntryListener implementation tracks events for caches of internal/external components.
*/
private class ComponentCacheListener implements EntryListener {
private class ComponentCacheListener implements EntryListener<String, Set<NodeID>> {
public void entryAdded(EntryEvent event) {
Object newValue = event.getValue();
@Override
public void entryAdded(EntryEvent<String, Set<NodeID>> event) {
Set<NodeID> newValue = event.getValue();
if (newValue != null) {
for (NodeID nodeID : (Set<NodeID>) newValue) {
for (NodeID nodeID : newValue) {
//ignore items which this node has added
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
Set<String> sessionJIDS = lookupJIDList(nodeID, componentsCache.getName());
sessionJIDS.add(event.getKey().toString());
sessionJIDS.add(event.getKey());
}
}
}
}
public void entryUpdated(EntryEvent event) {
@Override
public void entryUpdated(EntryEvent<String, Set<NodeID>> event) {
// Remove any trace to the component that was added/deleted to some node
String domain = event.getKey().toString();
String domain = event.getKey();
for (Map.Entry<NodeID, Set<String>[]> entry : nodeSessions.entrySet()) {
// Get components hosted in this node
Set<String> nodeComponents = entry.getValue()[COMPONENT_CACHE_IDX];
......@@ -538,32 +554,34 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
entryAdded(event);
}
public void entryRemoved(EntryEvent event) {
Object newValue = event.getValue();
@Override
public void entryRemoved(EntryEvent<String, Set<NodeID>> event) {
Set<NodeID> newValue = event.getValue();
if (newValue != null) {
for (NodeID nodeID : (Set<NodeID>) newValue) {
for (NodeID nodeID : newValue) {
//ignore items which this node has added
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
Set<String> sessionJIDS = lookupJIDList(nodeID, componentsCache.getName());
sessionJIDS.remove(event.getKey().toString());
sessionJIDS.remove(event.getKey());
}
}
}
}
public void entryEvicted(EntryEvent event) {
@Override
public void entryEvicted(EntryEvent<String, Set<NodeID>> event) {
entryRemoved(event);
}
private void mapClearedOrEvicted(MapEvent event) {
NodeID nodeID = NodeID.getInstance(StringUtils.getBytes(event.getMember().getUuid()));
NodeID nodeID = NodeID.getInstance(event.getMember().getUuid().getBytes(StandardCharsets.UTF_8));
// ignore events which were triggered by this node
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
Set<String> sessionJIDs = lookupJIDList(nodeID, componentsCache.getName());
sessionJIDs.clear();
}
}
@Override
public void mapEvicted(MapEvent event) {
mapClearedOrEvicted(event);
......@@ -620,7 +638,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
}
seniorClusterMember = false;
// Clean up all traces. This will set all remote sessions as unavailable
List<NodeID> nodeIDs = new ArrayList<NodeID>(nodeSessions.keySet());
List<NodeID> nodeIDs = new ArrayList<>(nodeSessions.keySet());
// Trigger event. Wait until the listeners have processed the event. Caches will be populated
// again with local content.
......@@ -642,25 +660,27 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
done = true;
}
@Override
public void memberAdded(MembershipEvent event) {
// local member only
if (event.getMember().localMember()) { // We left and re-joined the cluster
joinCluster();
} else {
nodePresences.put(NodeID.getInstance(StringUtils.getBytes(event.getMember().getUuid())),
nodePresences.put(NodeID.getInstance(event.getMember().getUuid().getBytes(StandardCharsets.UTF_8)),
new ConcurrentHashMap<String, Collection<String>>());
// Trigger event that a new node has joined the cluster
ClusterManager.fireJoinedCluster(StringUtils.getBytes(event.getMember().getUuid()), true);
ClusterManager.fireJoinedCluster(event.getMember().getUuid().getBytes(StandardCharsets.UTF_8), true);
}
clusterNodesInfo.put(event.getMember().getUuid(),
new HazelcastClusterNodeInfo(event.getMember(), cluster.getClusterTime()));
}
@Override
public void memberRemoved(MembershipEvent event) {
byte[] nodeID = StringUtils.getBytes(event.getMember().getUuid());
byte[] nodeID = event.getMember().getUuid().getBytes(StandardCharsets.UTF_8);
if (event.getMember().localMember()) {
logger.info("Leaving cluster: " + nodeID);
logger.info("Leaving cluster: " + new String(nodeID, StandardCharsets.UTF_8));
// This node may have realized that it got kicked out of the cluster
leaveCluster();
} else {
......@@ -687,9 +707,10 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
}
public List<ClusterNodeInfo> getClusterNodesInfo() {
return new ArrayList<ClusterNodeInfo>(clusterNodesInfo.values());
return new ArrayList<>(clusterNodesInfo.values());
}
@Override
public void stateChanged(LifecycleEvent event) {
if (event.getState().equals(LifecycleState.SHUTDOWN)) {
leaveCluster();
......@@ -705,28 +726,32 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
new HazelcastClusterNodeInfo(event.getMember(), priorNodeInfo.getJoinedTime()));
}
class S2SCacheListener implements EntryListener {
public S2SCacheListener() {
class S2SCacheListener implements EntryListener<DomainPair, byte[]> {
S2SCacheListener() {
}
public void entryAdded(EntryEvent event) {
@Override
public void entryAdded(EntryEvent<DomainPair, byte[]> event) {
handleEntryEvent(event, false);
}
public void entryUpdated(EntryEvent event) {
@Override
public void entryUpdated(EntryEvent<DomainPair, byte[]> event) {
handleEntryEvent(event, false);
}
public void entryRemoved(EntryEvent event) {
@Override
public void entryRemoved(EntryEvent<DomainPair, byte[]> event) {
handleEntryEvent(event, true);
}
public void entryEvicted(EntryEvent event) {
@Override
public void entryEvicted(EntryEvent<DomainPair, byte[]> event) {
handleEntryEvent(event, true);
}
private void handleEntryEvent(EntryEvent event, boolean removal) {
NodeID nodeID = NodeID.getInstance(StringUtils.getBytes(event.getMember().getUuid()));
private void handleEntryEvent(EntryEvent<DomainPair, byte[]> event, boolean removal) {
NodeID nodeID = NodeID.getInstance(event.getMember().getUuid().getBytes(StandardCharsets.UTF_8));
// ignore events which were triggered by this node
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
Set<DomainPair> sessionJIDS = nodeRoutes.get(nodeID);
......@@ -735,15 +760,14 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
}
if (removal) {
sessionJIDS.remove(event.getKey());
}
else {
sessionJIDS.add((DomainPair)event.getKey());
} else {
sessionJIDS.add(event.getKey());
}
}
}
private void handleMapEvent(MapEvent event) {
NodeID nodeID = NodeID.getInstance(StringUtils.getBytes(event.getMember().getUuid()));
NodeID nodeID = NodeID.getInstance(event.getMember().getUuid().getBytes(StandardCharsets.UTF_8));
// ignore events which were triggered by this node
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
Set<DomainPair> sessionJIDS = nodeRoutes.get(nodeID);
......
......@@ -32,7 +32,6 @@ import org.jivesoftware.openfire.plugin.session.RemoteSessionLocator;
import org.jivesoftware.openfire.plugin.util.cluster.ClusterPacketRouter;
import org.jivesoftware.openfire.plugin.util.cluster.HazelcastClusterNodeInfo;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.CacheFactoryStrategy;
......@@ -44,6 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
......@@ -52,6 +52,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
......@@ -69,18 +70,18 @@ import java.util.concurrent.locks.Lock;
*/
public class ClusteredCacheFactory implements CacheFactoryStrategy {
public static final String HAZELCAST_EXECUTOR_SERVICE_NAME =
JiveGlobals.getProperty("hazelcast.executor.service.name", "openfire::cluster::executor");
private static final long MAX_CLUSTER_EXECUTION_TIME =
JiveGlobals.getLongProperty("hazelcast.max.execution.seconds", 30);
private static final long CLUSTER_STARTUP_RETRY_TIME =
JiveGlobals.getLongProperty("hazelcast.startup.retry.seconds", 10);
private static final long CLUSTER_STARTUP_RETRY_COUNT =
JiveGlobals.getLongProperty("hazelcast.startup.retry.count", 1);
private static final String HAZELCAST_CONFIG_FILE =
JiveGlobals.getProperty("hazelcast.config.xml.filename", "hazelcast-cache-config.xml");
private static final boolean HAZELCAST_JMX_ENABLED =
JiveGlobals.getBooleanProperty("hazelcast.config.jmx.enabled", false);
private static final String HAZELCAST_EXECUTOR_SERVICE_NAME =
JiveGlobals.getProperty("hazelcast.executor.service.name", "openfire::cluster::executor");
private static final long MAX_CLUSTER_EXECUTION_TIME =
JiveGlobals.getLongProperty("hazelcast.max.execution.seconds", 30);
private static final long CLUSTER_STARTUP_RETRY_TIME =
JiveGlobals.getLongProperty("hazelcast.startup.retry.seconds", 10);
private static final long CLUSTER_STARTUP_RETRY_COUNT =
JiveGlobals.getLongProperty("hazelcast.startup.retry.count", 1);
private static final String HAZELCAST_CONFIG_FILE =
JiveGlobals.getProperty("hazelcast.config.xml.filename", "hazelcast-cache-config.xml");
private static final boolean HAZELCAST_JMX_ENABLED =
JiveGlobals.getBooleanProperty("hazelcast.config.jmx.enabled", false);
private static Logger logger = LoggerFactory.getLogger(ClusteredCacheFactory.class);
......@@ -106,9 +107,10 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
*/
private State state = State.stopped;
@Override
public boolean startCluster() {
state = State.starting;
// Set the serialization strategy to use for transmitting objects between node clusters
serializationStrategy = ExternalizableUtil.getInstance().getStrategy();
ExternalizableUtil.getInstance().setStrategy(new ClusterExternalizableUtil());
......@@ -117,7 +119,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
// Set packet router to use to deliver packets to remote cluster nodes
XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(new ClusterPacketRouter());
ClassLoader oldLoader = null;
ClassLoader oldLoader;
// Store previous class loader (in case we change it)
oldLoader = Thread.currentThread().getContextClassLoader();
ClassLoader loader = new ClusterClassLoader();
......@@ -134,10 +136,10 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
}
hazelcast = Hazelcast.newHazelcastInstance(config);
cluster = hazelcast.getCluster();
// Update the running state of the cluster
state = cluster != null ? State.started : State.stopped;
// Set the ID of this cluster node
XMPPServer.getInstance().setNodeID(NodeID.getInstance(getClusterMemberID()));
// CacheFactory is now using clustered caches. We can add our listeners.
......@@ -146,18 +148,22 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
membershipListener = cluster.addMembershipListener(clusterListener);
break;
} catch (Exception e) {
cluster = null;
if (retry < CLUSTER_STARTUP_RETRY_COUNT) {
logger.warn("Failed to start clustering (" + e.getMessage() + "); " +
"will retry in " + CLUSTER_STARTUP_RETRY_TIME + " seconds");
try { Thread.sleep(CLUSTER_STARTUP_RETRY_TIME*1000); }
catch (InterruptedException ie) { /* ignore */ }
logger.warn("Failed to start clustering (" + e.getMessage() + "); " +
"will retry in " + CLUSTER_STARTUP_RETRY_TIME + " seconds");
try {
Thread.sleep(CLUSTER_STARTUP_RETRY_TIME * 1000);
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
}
} else {
logger.error("Unable to start clustering - continuing in local mode", e);
state = State.stopped;
}
}
} while (retry++ < CLUSTER_STARTUP_RETRY_COUNT);
} while (retry++ < CLUSTER_STARTUP_RETRY_COUNT && !Thread.currentThread().isInterrupted());
if (oldLoader != null) {
// Restore previous class loader
Thread.currentThread().setContextClassLoader(oldLoader);
......@@ -165,6 +171,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
return cluster != null;
}
@Override
public void stopCluster() {
// Stop the cache services.
cacheStats = null;
......@@ -175,11 +182,11 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
cluster = null;
if (clusterListener != null) {
// Wait until the server has updated its internal state
while (!clusterListener.isDone()) {
while (!clusterListener.isDone() && !Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListener);
......@@ -199,14 +206,14 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
ExternalizableUtil.getInstance().setStrategy(serializationStrategy);
}
@Override
public Cache createCache(String name) {
// Check if cluster is being started up
while (state == State.starting) {
// Wait until cluster is fully started (or failed)
try {
Thread.sleep(250);
}
catch (InterruptedException e) {
} catch (InterruptedException e) {
// Ignore
}
}
......@@ -225,46 +232,52 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
return new ClusteredCache(name, hazelcast.getMap(name), hazelcastLifetimeInSeconds);
}
@Override
public void destroyCache(Cache cache) {
if (cache instanceof CacheWrapper) {
cache = ((CacheWrapper)cache).getWrappedCache();
cache = ((CacheWrapper) cache).getWrappedCache();
}
ClusteredCache clustered = (ClusteredCache)cache;
ClusteredCache clustered = (ClusteredCache) cache;
clustered.destroy();
}
@Override
public boolean isSeniorClusterMember() {
if (cluster == null) { return false; }
if (cluster == null) {
return false;
}
// first cluster member is the oldest
Iterator<Member> members = cluster.getMembers().iterator();
return members.next().getUuid().equals(cluster.getLocalMember().getUuid());
}
public Collection<ClusterNodeInfo> getClusterNodesInfo() {
return clusterListener == null ? Collections.EMPTY_LIST : clusterListener.getClusterNodesInfo();
@Override
public List<ClusterNodeInfo> getClusterNodesInfo() {
return clusterListener == null ? Collections.<ClusterNodeInfo>emptyList() : clusterListener.getClusterNodesInfo();
}
@Override
public int getMaxClusterNodes() {
// No longer depends on license code so just return a big number
return 10000;
}
@Override
public byte[] getSeniorClusterMemberID() {
if (cluster != null && !cluster.getMembers().isEmpty()) {
Member oldest = cluster.getMembers().iterator().next();
return StringUtils.getBytes(oldest.getUuid());
}
else {
return oldest.getUuid().getBytes(StandardCharsets.UTF_8);
} else {
return null;
}
}
@Override
public byte[] getClusterMemberID() {
if (cluster != null) {
return StringUtils.getBytes(cluster.getLocalMember().getUuid());
}
else {
return cluster.getLocalMember().getUuid().getBytes(StandardCharsets.UTF_8);
} else {
return null;
}
}
......@@ -273,9 +286,10 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
* Gets the pseudo-synchronized time from the cluster. While the cluster members may
* have varying system times, this method is expected to return a timestamp that is
* synchronized (or nearly so; best effort) across the cluster.
*
*
* @return Synchronized time for all cluster members
*/
@Override
public long getClusterTime() {
return cluster == null ? System.currentTimeMillis() : cluster.getClusterTime();
}
......@@ -285,20 +299,22 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
* Note that this method does not provide the result set for the given
* task, as the task is run asynchronously across the cluster.
*/
public void doClusterTask(final ClusterTask task) {
if (cluster == null) { return; }
Set<Member> members = new HashSet<Member>();
@Override
public void doClusterTask(final ClusterTask<?> task) {
if (cluster == null) {
return;
}
Set<Member> members = new HashSet<>();
Member current = cluster.getLocalMember();
for(Member member : cluster.getMembers()) {
for (Member member : cluster.getMembers()) {
if (!member.getUuid().equals(current.getUuid())) {
members.add(member);
}
}
if (members.size() > 0) {
if (!members.isEmpty()) {
// Asynchronously execute the task on the other cluster members
logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName());
hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(
new CallableTask<Object>(task), members);
hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(new CallableTask<>(task), members);
} else {
logger.warn("No cluster members selected for cluster task " + task.getClass().getName());
}
......@@ -309,17 +325,19 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
* Note that this method does not provide the result set for the given
* task, as the task is run asynchronously across the cluster.
*/
public void doClusterTask(final ClusterTask task, byte[] nodeID) {
if (cluster == null) { return; }
@Override
public void doClusterTask(final ClusterTask<?> task, byte[] nodeID) {
if (cluster == null) {
return;
}
Member member = getMember(nodeID);
// Check that the requested member was found
if (member != null) {
// Asynchronously execute the task on the target member
logger.debug("Executing asynchronous DistributedTask: " + task.getClass().getName());
hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMember(
new CallableTask<Object>(task), member);
hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMember(new CallableTask<>(task), member);
} else {
String msg = MessageFormat.format("Requested node {0} not found in cluster", StringUtils.getString(nodeID));
String msg = MessageFormat.format("Requested node {0} not found in cluster", new String(nodeID, StandardCharsets.UTF_8));
logger.warn(msg);
throw new IllegalArgumentException(msg);
}
......@@ -330,24 +348,26 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
* Note that this method blocks for up to MAX_CLUSTER_EXECUTION_TIME
* (seconds) per member until the task is run on all members.
*/
public Collection<Object> doSynchronousClusterTask(ClusterTask task, boolean includeLocalMember) {
if (cluster == null) { return Collections.emptyList(); }
Set<Member> members = new HashSet<Member>();
@Override
public Collection<Object> doSynchronousClusterTask(ClusterTask<?> task, boolean includeLocalMember) {
if (cluster == null) {
return Collections.emptyList();
}
Set<Member> members = new HashSet<>();
Member current = cluster.getLocalMember();
for(Member member : cluster.getMembers()) {
for (Member member : cluster.getMembers()) {
if (includeLocalMember || (!member.getUuid().equals(current.getUuid()))) {
members.add(member);
}
}
Collection<Object> result = new ArrayList<Object>();
if (members.size() > 0) {
Collection<Object> result = new ArrayList<>();
if (!members.isEmpty()) {
// Asynchronously execute the task on the other cluster members
try {
logger.debug("Executing MultiTask: " + task.getClass().getName());
Map<Member, Future<Object>> futures = hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME)
.submitToMembers(new CallableTask<Object>(task), members);
long nanosLeft = TimeUnit.SECONDS.toNanos(MAX_CLUSTER_EXECUTION_TIME*members.size());
for (Future<Object> future : futures.values()) {
Map<Member, ? extends Future<?>> futures = hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(new CallableTask<>(task), members);
long nanosLeft = TimeUnit.SECONDS.toNanos(MAX_CLUSTER_EXECUTION_TIME * members.size());
for (Future<?> future : futures.values()) {
long start = System.nanoTime();
result.add(future.get(nanosLeft, TimeUnit.NANOSECONDS));
nanosLeft = nanosLeft - (System.nanoTime() - start);
......@@ -368,17 +388,19 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
* Note that this method blocks for up to MAX_CLUSTER_EXECUTION_TIME
* (seconds) until the task is run on the given member.
*/
public Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) {
if (cluster == null) { return null; }
@Override
public Object doSynchronousClusterTask(ClusterTask<?> task, byte[] nodeID) {
if (cluster == null) {
return null;
}
Member member = getMember(nodeID);
Object result = null;
// Check that the requested member was found
if (member != null) {
// Asynchronously execute the task on the target member
logger.debug("Executing DistributedTask: " + task.getClass().getName());
try {
Future<Object> future = hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME)
.submitToMember(new CallableTask<Object>(task), member);
try {
Future<?> future = hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMember(new CallableTask<>(task), member);
result = future.get(MAX_CLUSTER_EXECUTION_TIME, TimeUnit.SECONDS);
logger.debug("DistributedTask result: " + (result == null ? "null" : result));
} catch (TimeoutException te) {
......@@ -387,15 +409,18 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
logger.error("Failed to execute cluster task", e);
}
} else {
String msg = MessageFormat.format("Requested node {0} not found in cluster", StringUtils.getString(nodeID));
String msg = MessageFormat.format("Requested node {0} not found in cluster", new String(nodeID, StandardCharsets.UTF_8));
logger.warn(msg);
throw new IllegalArgumentException(msg);
}
return result;
}
@Override
public ClusterNodeInfo getClusterNodeInfo(byte[] nodeID) {
if (cluster == null) { return null; }
if (cluster == null) {
return null;
}
ClusterNodeInfo result = null;
Member member = getMember(nodeID);
if (member != null) {
......@@ -403,11 +428,11 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
}
return result;
}
private Member getMember(byte[] nodeID) {
Member result = null;
for(Member member: cluster.getMembers()) {
if (Arrays.equals(StringUtils.getBytes(member.getUuid()), nodeID)) {
for (Member member : cluster.getMembers()) {
if (Arrays.equals(member.getUuid().getBytes(StandardCharsets.UTF_8), nodeID)) {
result = member;
break;
}
......@@ -415,20 +440,21 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
return result;
}
@Override
public void updateCacheStats(Map<String, Cache> caches) {
if (caches.size() > 0 && cluster != null) {
if (!caches.isEmpty() && cluster != null) {
// Create the cacheStats map if necessary.
if (cacheStats == null) {
cacheStats = hazelcast.getMap("opt-$cacheStats");
}
String uid = cluster.getLocalMember().getUuid();
Map<String, long[]> stats = new HashMap<String, long[]>();
Map<String, long[]> stats = new HashMap<>();
for (String cacheName : caches.keySet()) {
Cache cache = caches.get(cacheName);
// The following information is published:
// current size, max size, num elements, cache
// hits, cache misses.
long [] info = new long[5];
long[] info = new long[5];
info[0] = cache.getCacheSize();
info[1] = cache.getMaxCacheSize();
info[2] = cache.size();
......@@ -441,13 +467,15 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
}
}
@Override
public String getPluginName() {
return "hazelcast";
}
@Override
public Lock getLock(Object key, Cache cache) {
if (cache instanceof CacheWrapper) {
cache = ((CacheWrapper)cache).getWrappedCache();
cache = ((CacheWrapper) cache).getWrappedCache();
}
return new ClusterLock(key, (ClusteredCache) cache);
}
......@@ -457,31 +485,37 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
private Object key;
private ClusteredCache cache;
public ClusterLock(Object key, ClusteredCache cache) {
ClusterLock(Object key, ClusteredCache cache) {
this.key = key;
this.cache = cache;
}
@Override
public void lock() {
cache.lock(key, -1);
}
public void lockInterruptibly() throws InterruptedException {
@Override
public void lockInterruptibly() {
cache.lock(key, -1);
}
@Override
public boolean tryLock() {
return cache.lock(key, 0);
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
@Override
public boolean tryLock(long time, TimeUnit unit) {
return cache.lock(key, unit.toMillis(time));
}
@Override
public void unlock() {
cache.unlock(key);
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
......@@ -489,19 +523,20 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
private static class CallableTask<V> implements Callable<V>, Serializable {
private ClusterTask<V> task;
public CallableTask(ClusterTask<V> task) {
CallableTask(ClusterTask<V> task) {
this.task = task;
}
@Override
public V call() {
task.run();
logger.debug("CallableTask[" + task.getClass().getName() + "] result: " + task.getResult());
return task.getResult();
}
}
private static enum State {
private enum State {
stopped,
starting,
started
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment