Commit 1c1e832c authored by Greg Thomas's avatar Greg Thomas Committed by akrherz

HZ-5: No functional changes, just tidy up the code before fixing the issue

parent 2c621353
...@@ -15,18 +15,23 @@ ...@@ -15,18 +15,23 @@
*/ */
package org.jivesoftware.openfire.plugin.util.cache; package org.jivesoftware.openfire.plugin.util.cache;
import java.util.ArrayList; import com.hazelcast.core.Cluster;
import java.util.Collection; import com.hazelcast.core.EntryEvent;
import java.util.Collections; import com.hazelcast.core.EntryEventType;
import java.util.HashMap; import com.hazelcast.core.EntryListener;
import java.util.HashSet; import com.hazelcast.core.LifecycleEvent;
import java.util.List; import com.hazelcast.core.LifecycleEvent.LifecycleState;
import java.util.Map; import com.hazelcast.core.LifecycleListener;
import java.util.Set; import com.hazelcast.core.MapEvent;
import java.util.concurrent.ConcurrentHashMap; import com.hazelcast.core.Member;
import java.util.concurrent.locks.Lock; import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import org.jivesoftware.openfire.*; import com.hazelcast.core.MembershipListener;
import org.jivesoftware.openfire.PacketException;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterManager; import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.cluster.ClusterNodeInfo; import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import org.jivesoftware.openfire.cluster.NodeID; import org.jivesoftware.openfire.cluster.NodeID;
...@@ -40,7 +45,6 @@ import org.jivesoftware.openfire.session.RemoteSessionLocator; ...@@ -40,7 +45,6 @@ import org.jivesoftware.openfire.session.RemoteSessionLocator;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory; import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.openfire.spi.ClientRoute; import org.jivesoftware.openfire.spi.ClientRoute;
import org.jivesoftware.openfire.spi.RoutingTableImpl; import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.CacheWrapper; import org.jivesoftware.util.cache.CacheWrapper;
...@@ -49,18 +53,17 @@ import org.slf4j.LoggerFactory; ...@@ -49,18 +53,17 @@ import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Presence; import org.xmpp.packet.Presence;
import com.hazelcast.core.Cluster; import java.nio.charset.StandardCharsets;
import com.hazelcast.core.EntryEvent; import java.util.ArrayList;
import com.hazelcast.core.EntryEventType; import java.util.Collection;
import com.hazelcast.core.EntryListener; import java.util.Collections;
import com.hazelcast.core.LifecycleEvent; import java.util.HashMap;
import com.hazelcast.core.LifecycleEvent.LifecycleState; import java.util.HashSet;
import com.hazelcast.core.LifecycleListener; import java.util.List;
import com.hazelcast.core.MapEvent; import java.util.Map;
import com.hazelcast.core.Member; import java.util.Set;
import com.hazelcast.core.MemberAttributeEvent; import java.util.concurrent.ConcurrentHashMap;
import com.hazelcast.core.MembershipEvent; import java.util.concurrent.locks.Lock;
import com.hazelcast.core.MembershipListener;
/** /**
* ClusterListener reacts to membership changes in the cluster. It takes care of cleaning up the state * ClusterListener reacts to membership changes in the cluster. It takes care of cleaning up the state
...@@ -68,7 +71,7 @@ import com.hazelcast.core.MembershipListener; ...@@ -68,7 +71,7 @@ import com.hazelcast.core.MembershipListener;
*/ */
public class ClusterListener implements MembershipListener, LifecycleListener { public class ClusterListener implements MembershipListener, LifecycleListener {
private static Logger logger = LoggerFactory.getLogger(ClusterListener.class); private static final Logger logger = LoggerFactory.getLogger(ClusterListener.class);
private static final int C2S_CACHE_IDX = 0; private static final int C2S_CACHE_IDX = 0;
private static final int ANONYMOUS_C2S_CACHE_IDX = 1; private static final int ANONYMOUS_C2S_CACHE_IDX = 1;
...@@ -82,33 +85,33 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -82,33 +85,33 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
/** /**
* Caches stored in RoutingTable * Caches stored in RoutingTable
*/ */
Cache<String, ClientRoute> C2SCache; private Cache<String, ClientRoute> C2SCache;
Cache<String, ClientRoute> anonymousC2SCache; private Cache<String, ClientRoute> anonymousC2SCache;
Cache<DomainPair, byte[]> S2SCache; private Cache<DomainPair, byte[]> S2SCache;
Cache<String, Set<NodeID>> componentsCache; private Cache<String, Set<NodeID>> componentsCache;
/** /**
* Caches stored in SessionManager * Caches stored in SessionManager
*/ */
Cache<String, ClientSessionInfo> sessionInfoCache; private Cache<String, ClientSessionInfo> sessionInfoCache;
Cache<String, byte[]> componentSessionsCache; private Cache<String, byte[]> componentSessionsCache;
Cache<String, byte[]> multiplexerSessionsCache; private Cache<String, byte[]> multiplexerSessionsCache;
Cache<String, byte[]> incomingServerSessionsCache; private Cache<String, byte[]> incomingServerSessionsCache;
/** /**
* Caches stored in PresenceUpdateHandler * Caches stored in PresenceUpdateHandler
*/ */
Cache<String, Collection<DirectedPresence>> directedPresencesCache; private Cache<String, Collection<DirectedPresence>> directedPresencesCache;
private Map<NodeID, Set<String>[]> nodeSessions = new ConcurrentHashMap<NodeID, Set<String>[]>(); private Map<NodeID, Set<String>[]> nodeSessions = new ConcurrentHashMap<>();
private Map<NodeID, Set<DomainPair>> nodeRoutes = new ConcurrentHashMap<>(); private Map<NodeID, Set<DomainPair>> nodeRoutes = new ConcurrentHashMap<>();
private Map<NodeID, Map<String, Collection<String>>> nodePresences = new ConcurrentHashMap<NodeID, Map<String, Collection<String>>>(); private Map<NodeID, Map<String, Collection<String>>> nodePresences = new ConcurrentHashMap<>();
private boolean seniorClusterMember = CacheFactory.isSeniorClusterMember(); private boolean seniorClusterMember = CacheFactory.isSeniorClusterMember();
private Map<Cache, EntryListener> EntryListeners = new HashMap<Cache, EntryListener>(); private Map<Cache<?,?>, EntryListener> entryListeners = new HashMap<>();
private Cluster cluster; private Cluster cluster;
private Map<String, ClusterNodeInfo> clusterNodesInfo = new ConcurrentHashMap<String, ClusterNodeInfo>(); private Map<String, ClusterNodeInfo> clusterNodesInfo = new ConcurrentHashMap<>();
/** /**
* Flag that indicates if the listener has done all clean up work when noticed that the * Flag that indicates if the listener has done all clean up work when noticed that the
...@@ -117,14 +120,14 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -117,14 +120,14 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
*/ */
private boolean done = true; private boolean done = true;
public ClusterListener(Cluster cluster) { ClusterListener(Cluster cluster) {
this.cluster = cluster; this.cluster = cluster;
for (Member member : cluster.getMembers()) { for (Member member : cluster.getMembers()) {
clusterNodesInfo.put(member.getUuid(), clusterNodesInfo.put(member.getUuid(),
new HazelcastClusterNodeInfo(member, cluster.getClusterTime())); new HazelcastClusterNodeInfo(member, cluster.getClusterTime()));
} }
C2SCache = CacheFactory.createCache(RoutingTableImpl.C2S_CACHE_NAME); C2SCache = CacheFactory.createCache(RoutingTableImpl.C2S_CACHE_NAME);
anonymousC2SCache = CacheFactory.createCache(RoutingTableImpl.ANONYMOUS_C2S_CACHE_NAME); anonymousC2SCache = CacheFactory.createCache(RoutingTableImpl.ANONYMOUS_C2S_CACHE_NAME);
S2SCache = CacheFactory.createCache(RoutingTableImpl.S2S_CACHE_NAME); S2SCache = CacheFactory.createCache(RoutingTableImpl.S2S_CACHE_NAME);
...@@ -140,28 +143,34 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -140,28 +143,34 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
joinCluster(); joinCluster();
} }
private void addEntryListener(Cache cache, EntryListener listener) { private void addEntryListener(Cache<?, ?> cache, EntryListener listener) {
if (cache instanceof CacheWrapper) { if (cache instanceof CacheWrapper) {
Cache wrapped = ((CacheWrapper)cache).getWrappedCache(); Cache wrapped = ((CacheWrapper)cache).getWrappedCache();
if (wrapped instanceof ClusteredCache) { if (wrapped instanceof ClusteredCache) {
((ClusteredCache)wrapped).addEntryListener(listener, false); ((ClusteredCache)wrapped).addEntryListener(listener, false);
// Keep track of the listener that we added to the cache // Keep track of the listener that we added to the cache
EntryListeners.put(cache, listener); entryListeners.put(cache, listener);
} }
} }
} }
private void simulateCacheInserts(Cache cache) { @SuppressWarnings("unchecked")
EntryListener EntryListener = EntryListeners.get(cache); private void simulateCacheInserts(Cache<?, ?> cache) {
if (EntryListener != null) { final EntryListener<?,?> entryListener = entryListeners.get(cache);
if (entryListener != null) {
if (cache instanceof CacheWrapper) { if (cache instanceof CacheWrapper) {
Cache wrapped = ((CacheWrapper) cache).getWrappedCache(); Cache wrapped = ((CacheWrapper) cache).getWrappedCache();
if (wrapped instanceof ClusteredCache) { if (wrapped instanceof ClusteredCache) {
ClusteredCache clusteredCache = (ClusteredCache) wrapped; ClusteredCache clusteredCache = (ClusteredCache) wrapped;
for (Map.Entry entry : (Set<Map.Entry>) cache.entrySet()) { for (Map.Entry<?, ?> entry : cache.entrySet()) {
EntryEvent event = new EntryEvent(clusteredCache.map.getName(), cluster.getLocalMember(), EntryEvent event = new EntryEvent<>(
EntryEventType.ADDED.getType(), entry.getKey(), null, entry.getValue()); clusteredCache.map.getName(),
EntryListener.entryAdded(event); cluster.getLocalMember(),
EntryEventType.ADDED.getType(),
entry.getKey(),
null,
entry.getValue());
entryListener.entryAdded(event);
} }
} }
} }
...@@ -200,8 +209,9 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -200,8 +209,9 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
} }
} }
@SuppressWarnings("unchecked")
private Set<String>[] insertJIDList(NodeID nodeKey) { private Set<String>[] insertJIDList(NodeID nodeKey) {
Set<String>[] allLists = new Set[] { Set<String>[] allLists = new Set[] {
new HashSet<String>(), new HashSet<String>(),
new HashSet<String>(), new HashSet<String>(),
new HashSet<String>(), new HashSet<String>(),
...@@ -264,7 +274,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -264,7 +274,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> registeredUsers = lookupJIDList(key, C2SCache.getName()); Set<String> registeredUsers = lookupJIDList(key, C2SCache.getName());
if (!registeredUsers.isEmpty()) { if (!registeredUsers.isEmpty()) {
for (String fullJID : new ArrayList<String>(registeredUsers)) { for (String fullJID : new ArrayList<>(registeredUsers)) {
JID offlineJID = new JID(fullJID); JID offlineJID = new JID(fullJID);
manager.removeSession(null, offlineJID, false, true); manager.removeSession(null, offlineJID, false, true);
} }
...@@ -272,7 +282,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -272,7 +282,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> anonymousUsers = lookupJIDList(key, anonymousC2SCache.getName()); Set<String> anonymousUsers = lookupJIDList(key, anonymousC2SCache.getName());
if (!anonymousUsers.isEmpty()) { if (!anonymousUsers.isEmpty()) {
for (String fullJID : new ArrayList<String>(anonymousUsers)) { for (String fullJID : new ArrayList<>(anonymousUsers)) {
JID offlineJID = new JID(fullJID); JID offlineJID = new JID(fullJID);
manager.removeSession(null, offlineJID, true, true); manager.removeSession(null, offlineJID, true, true);
} }
...@@ -290,11 +300,11 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -290,11 +300,11 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> components = lookupJIDList(key, componentsCache.getName()); Set<String> components = lookupJIDList(key, componentsCache.getName());
if (!components.isEmpty()) { if (!components.isEmpty()) {
for (String address : new ArrayList<String>(components)) { for (String address : new ArrayList<>(components)) {
Lock lock = CacheFactory.getLock(address, componentsCache); Lock lock = CacheFactory.getLock(address, componentsCache);
try { try {
lock.lock(); lock.lock();
Set<NodeID> nodes = (Set<NodeID>) componentsCache.get(address); Set<NodeID> nodes = componentsCache.get(address);
if (nodes != null) { if (nodes != null) {
nodes.remove(key); nodes.remove(key);
if (nodes.isEmpty()) { if (nodes.isEmpty()) {
...@@ -312,7 +322,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -312,7 +322,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> sessionInfo = lookupJIDList(key, sessionInfoCache.getName()); Set<String> sessionInfo = lookupJIDList(key, sessionInfoCache.getName());
if (!sessionInfo.isEmpty()) { if (!sessionInfo.isEmpty()) {
for (String session : new ArrayList<String>(sessionInfo)) { for (String session : new ArrayList<>(sessionInfo)) {
sessionInfoCache.remove(session); sessionInfoCache.remove(session);
// Registered sessions will be removed // Registered sessions will be removed
// by the clean up of the session info cache // by the clean up of the session info cache
...@@ -321,7 +331,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -321,7 +331,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> componentSessions = lookupJIDList(key, componentSessionsCache.getName()); Set<String> componentSessions = lookupJIDList(key, componentSessionsCache.getName());
if (!componentSessions.isEmpty()) { if (!componentSessions.isEmpty()) {
for (String domain : new ArrayList<String>(componentSessions)) { for (String domain : new ArrayList<>(componentSessions)) {
componentSessionsCache.remove(domain); componentSessionsCache.remove(domain);
// Registered subdomains of external component will be removed // Registered subdomains of external component will be removed
// by the clean up of the component cache // by the clean up of the component cache
...@@ -330,7 +340,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -330,7 +340,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> multiplexers = lookupJIDList(key, multiplexerSessionsCache.getName()); Set<String> multiplexers = lookupJIDList(key, multiplexerSessionsCache.getName());
if (!multiplexers.isEmpty()) { if (!multiplexers.isEmpty()) {
for (String fullJID : new ArrayList<String>(multiplexers)) { for (String fullJID : new ArrayList<>(multiplexers)) {
multiplexerSessionsCache.remove(fullJID); multiplexerSessionsCache.remove(fullJID);
// c2s connections connected to node that went down will be cleaned up // c2s connections connected to node that went down will be cleaned up
// by the c2s logic above. If the CM went down and the node is up then // by the c2s logic above. If the CM went down and the node is up then
...@@ -362,7 +372,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -362,7 +372,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
private void cleanupPresences(NodeID key) { private void cleanupPresences(NodeID key) {
Set<String> registeredUsers = lookupJIDList(key, C2SCache.getName()); Set<String> registeredUsers = lookupJIDList(key, C2SCache.getName());
if (!registeredUsers.isEmpty()) { if (!registeredUsers.isEmpty()) {
for (String fullJID : new ArrayList<String>(registeredUsers)) { for (String fullJID : new ArrayList<>(registeredUsers)) {
JID offlineJID = new JID(fullJID); JID offlineJID = new JID(fullJID);
try { try {
Presence presence = new Presence(Presence.Type.unavailable); Presence presence = new Presence(Presence.Type.unavailable);
...@@ -377,7 +387,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -377,7 +387,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> anonymousUsers = lookupJIDList(key, anonymousC2SCache.getName()); Set<String> anonymousUsers = lookupJIDList(key, anonymousC2SCache.getName());
if (!anonymousUsers.isEmpty()) { if (!anonymousUsers.isEmpty()) {
for (String fullJID : new ArrayList<String>(anonymousUsers)) { for (String fullJID : new ArrayList<>(anonymousUsers)) {
JID offlineJID = new JID(fullJID); JID offlineJID = new JID(fullJID);
try { try {
Presence presence = new Presence(Presence.Type.unavailable); Presence presence = new Presence(Presence.Type.unavailable);
...@@ -396,16 +406,17 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -396,16 +406,17 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
/** /**
* EntryListener implementation tracks events for caches of c2s sessions. * EntryListener implementation tracks events for caches of c2s sessions.
*/ */
private class DirectedPresenceListener implements EntryListener { private class DirectedPresenceListener implements EntryListener<String, Collection<DirectedPresence>> {
public void entryAdded(EntryEvent event) { @Override
byte[] nodeID = StringUtils.getBytes(event.getMember().getUuid()); public void entryAdded(EntryEvent<String, Collection<DirectedPresence>> event) {
byte[] nodeID = event.getMember().getUuid().getBytes(StandardCharsets.UTF_8);
// Ignore events originated from this JVM // Ignore events originated from this JVM
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
// Check if the directed presence was sent to an entity hosted by this JVM // Check if the directed presence was sent to an entity hosted by this JVM
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable(); RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
String sender = event.getKey().toString(); String sender = event.getKey();
Collection<String> handlers = new HashSet<String>(); Collection<String> handlers = new HashSet<>();
for (JID handler : getHandlers(event)) { for (JID handler : getHandlers(event)) {
if (routingTable.isLocalRoute(handler)) { if (routingTable.isLocalRoute(handler)) {
// Keep track of the remote sender and local handler that got the directed presence // Keep track of the remote sender and local handler that got the directed presence
...@@ -415,7 +426,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -415,7 +426,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
if (!handlers.isEmpty()) { if (!handlers.isEmpty()) {
Map<String, Collection<String>> senders = nodePresences.get(NodeID.getInstance(nodeID)); Map<String, Collection<String>> senders = nodePresences.get(NodeID.getInstance(nodeID));
if (senders == null) { if (senders == null) {
senders = new ConcurrentHashMap<String, Collection<String>>(); senders = new ConcurrentHashMap<>();
nodePresences.put(NodeID.getInstance(nodeID), senders); nodePresences.put(NodeID.getInstance(nodeID), senders);
} }
senders.put(sender, handlers); senders.put(sender, handlers);
...@@ -423,14 +434,15 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -423,14 +434,15 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
} }
} }
public void entryUpdated(EntryEvent event) { @Override
byte[] nodeID = StringUtils.getBytes(event.getMember().getUuid()); public void entryUpdated(EntryEvent<String, Collection<DirectedPresence>> event) {
byte[] nodeID = event.getMember().getUuid().getBytes(StandardCharsets.UTF_8);
// Ignore events originated from this JVM // Ignore events originated from this JVM
if (nodeID != null && !XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
// Check if the directed presence was sent to an entity hosted by this JVM // Check if the directed presence was sent to an entity hosted by this JVM
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable(); RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
String sender = event.getKey().toString(); String sender = event.getKey();
Collection<String> handlers = new HashSet<String>(); Collection<String> handlers = new HashSet<>();
for (JID handler : getHandlers(event)) { for (JID handler : getHandlers(event)) {
if (routingTable.isLocalRoute(handler)) { if (routingTable.isLocalRoute(handler)) {
// Keep track of the remote sender and local handler that got the directed presence // Keep track of the remote sender and local handler that got the directed presence
...@@ -439,7 +451,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -439,7 +451,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
} }
Map<String, Collection<String>> senders = nodePresences.get(NodeID.getInstance(nodeID)); Map<String, Collection<String>> senders = nodePresences.get(NodeID.getInstance(nodeID));
if (senders == null) { if (senders == null) {
senders = new ConcurrentHashMap<String, Collection<String>>(); senders = new ConcurrentHashMap<>();
nodePresences.put(NodeID.getInstance(nodeID), senders); nodePresences.put(NodeID.getInstance(nodeID), senders);
} }
if (!handlers.isEmpty()) { if (!handlers.isEmpty()) {
...@@ -452,32 +464,33 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -452,32 +464,33 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
} }
} }
public void entryRemoved(EntryEvent event) { @Override
public void entryRemoved(EntryEvent<String, Collection<DirectedPresence>> event) {
if (event == null || (event.getValue() == null && event.getOldValue() == null)) { if (event == null || (event.getValue() == null && event.getOldValue() == null)) {
// Nothing to remove // Nothing to remove
return; return;
} }
byte[] nodeID = StringUtils.getBytes(event.getMember().getUuid()); byte[] nodeID = event.getMember().getUuid().getBytes(StandardCharsets.UTF_8);
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
String sender = event.getKey().toString(); String sender = event.getKey();
nodePresences.get(NodeID.getInstance(nodeID)).remove(sender); nodePresences.get(NodeID.getInstance(nodeID)).remove(sender);
} }
} }
Collection<JID> getHandlers(EntryEvent event) { Collection<JID> getHandlers(EntryEvent<String, Collection<DirectedPresence>> event) {
Object value = event.getValue(); Collection<DirectedPresence> value = event.getValue();
Collection<JID> answer = new ArrayList<JID>(); Collection<JID> answer = new ArrayList<>();
if (value != null) { if (value != null) {
for (DirectedPresence directedPresence : (Collection<DirectedPresence>)value) { for (DirectedPresence directedPresence : value) {
answer.add(directedPresence.getHandler()); answer.add(directedPresence.getHandler());
} }
} }
return answer; return answer;
} }
Set<String> getReceivers(EntryEvent event, JID handler) { Set<String> getReceivers(EntryEvent<String, Collection<DirectedPresence>> event, JID handler) {
Object value = event.getValue(); Collection<DirectedPresence> value = event.getValue();
for (DirectedPresence directedPresence : (Collection<DirectedPresence>)value) { for (DirectedPresence directedPresence : value) {
if (directedPresence.getHandler().equals(handler)) { if (directedPresence.getHandler().equals(handler)) {
return directedPresence.getReceivers(); return directedPresence.getReceivers();
} }
...@@ -485,18 +498,19 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -485,18 +498,19 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
return Collections.emptySet(); return Collections.emptySet();
} }
public void entryEvicted(EntryEvent event) { @Override
public void entryEvicted(EntryEvent<String, Collection<DirectedPresence>> event) {
entryRemoved(event); entryRemoved(event);
} }
private void mapClearedOrEvicted(MapEvent event) { private void mapClearedOrEvicted(MapEvent event) {
NodeID nodeID = NodeID.getInstance(StringUtils.getBytes(event.getMember().getUuid())); NodeID nodeID = NodeID.getInstance(event.getMember().getUuid().getBytes(StandardCharsets.UTF_8));
// ignore events which were triggered by this node // ignore events which were triggered by this node
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
nodePresences.get(nodeID).clear(); nodePresences.get(nodeID).clear();
} }
} }
@Override @Override
public void mapEvicted(MapEvent event) { public void mapEvicted(MapEvent event) {
mapClearedOrEvicted(event); mapClearedOrEvicted(event);
...@@ -511,24 +525,26 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -511,24 +525,26 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
/** /**
* EntryListener implementation tracks events for caches of internal/external components. * EntryListener implementation tracks events for caches of internal/external components.
*/ */
private class ComponentCacheListener implements EntryListener { private class ComponentCacheListener implements EntryListener<String, Set<NodeID>> {
public void entryAdded(EntryEvent event) { @Override
Object newValue = event.getValue(); public void entryAdded(EntryEvent<String, Set<NodeID>> event) {
Set<NodeID> newValue = event.getValue();
if (newValue != null) { if (newValue != null) {
for (NodeID nodeID : (Set<NodeID>) newValue) { for (NodeID nodeID : newValue) {
//ignore items which this node has added //ignore items which this node has added
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
Set<String> sessionJIDS = lookupJIDList(nodeID, componentsCache.getName()); Set<String> sessionJIDS = lookupJIDList(nodeID, componentsCache.getName());
sessionJIDS.add(event.getKey().toString()); sessionJIDS.add(event.getKey());
} }
} }
} }
} }
public void entryUpdated(EntryEvent event) { @Override
public void entryUpdated(EntryEvent<String, Set<NodeID>> event) {
// Remove any trace to the component that was added/deleted to some node // Remove any trace to the component that was added/deleted to some node
String domain = event.getKey().toString(); String domain = event.getKey();
for (Map.Entry<NodeID, Set<String>[]> entry : nodeSessions.entrySet()) { for (Map.Entry<NodeID, Set<String>[]> entry : nodeSessions.entrySet()) {
// Get components hosted in this node // Get components hosted in this node
Set<String> nodeComponents = entry.getValue()[COMPONENT_CACHE_IDX]; Set<String> nodeComponents = entry.getValue()[COMPONENT_CACHE_IDX];
...@@ -538,32 +554,34 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -538,32 +554,34 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
entryAdded(event); entryAdded(event);
} }
public void entryRemoved(EntryEvent event) { @Override
Object newValue = event.getValue(); public void entryRemoved(EntryEvent<String, Set<NodeID>> event) {
Set<NodeID> newValue = event.getValue();
if (newValue != null) { if (newValue != null) {
for (NodeID nodeID : (Set<NodeID>) newValue) { for (NodeID nodeID : newValue) {
//ignore items which this node has added //ignore items which this node has added
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
Set<String> sessionJIDS = lookupJIDList(nodeID, componentsCache.getName()); Set<String> sessionJIDS = lookupJIDList(nodeID, componentsCache.getName());
sessionJIDS.remove(event.getKey().toString()); sessionJIDS.remove(event.getKey());
} }
} }
} }
} }
public void entryEvicted(EntryEvent event) { @Override
public void entryEvicted(EntryEvent<String, Set<NodeID>> event) {
entryRemoved(event); entryRemoved(event);
} }
private void mapClearedOrEvicted(MapEvent event) { private void mapClearedOrEvicted(MapEvent event) {
NodeID nodeID = NodeID.getInstance(StringUtils.getBytes(event.getMember().getUuid())); NodeID nodeID = NodeID.getInstance(event.getMember().getUuid().getBytes(StandardCharsets.UTF_8));
// ignore events which were triggered by this node // ignore events which were triggered by this node
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
Set<String> sessionJIDs = lookupJIDList(nodeID, componentsCache.getName()); Set<String> sessionJIDs = lookupJIDList(nodeID, componentsCache.getName());
sessionJIDs.clear(); sessionJIDs.clear();
} }
} }
@Override @Override
public void mapEvicted(MapEvent event) { public void mapEvicted(MapEvent event) {
mapClearedOrEvicted(event); mapClearedOrEvicted(event);
...@@ -620,7 +638,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -620,7 +638,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
} }
seniorClusterMember = false; seniorClusterMember = false;
// Clean up all traces. This will set all remote sessions as unavailable // Clean up all traces. This will set all remote sessions as unavailable
List<NodeID> nodeIDs = new ArrayList<NodeID>(nodeSessions.keySet()); List<NodeID> nodeIDs = new ArrayList<>(nodeSessions.keySet());
// Trigger event. Wait until the listeners have processed the event. Caches will be populated // Trigger event. Wait until the listeners have processed the event. Caches will be populated
// again with local content. // again with local content.
...@@ -642,25 +660,27 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -642,25 +660,27 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
done = true; done = true;
} }
@Override
public void memberAdded(MembershipEvent event) { public void memberAdded(MembershipEvent event) {
// local member only // local member only
if (event.getMember().localMember()) { // We left and re-joined the cluster if (event.getMember().localMember()) { // We left and re-joined the cluster
joinCluster(); joinCluster();
} else { } else {
nodePresences.put(NodeID.getInstance(StringUtils.getBytes(event.getMember().getUuid())), nodePresences.put(NodeID.getInstance(event.getMember().getUuid().getBytes(StandardCharsets.UTF_8)),
new ConcurrentHashMap<String, Collection<String>>()); new ConcurrentHashMap<String, Collection<String>>());
// Trigger event that a new node has joined the cluster // Trigger event that a new node has joined the cluster
ClusterManager.fireJoinedCluster(StringUtils.getBytes(event.getMember().getUuid()), true); ClusterManager.fireJoinedCluster(event.getMember().getUuid().getBytes(StandardCharsets.UTF_8), true);
} }
clusterNodesInfo.put(event.getMember().getUuid(), clusterNodesInfo.put(event.getMember().getUuid(),
new HazelcastClusterNodeInfo(event.getMember(), cluster.getClusterTime())); new HazelcastClusterNodeInfo(event.getMember(), cluster.getClusterTime()));
} }
@Override
public void memberRemoved(MembershipEvent event) { public void memberRemoved(MembershipEvent event) {
byte[] nodeID = StringUtils.getBytes(event.getMember().getUuid()); byte[] nodeID = event.getMember().getUuid().getBytes(StandardCharsets.UTF_8);
if (event.getMember().localMember()) { if (event.getMember().localMember()) {
logger.info("Leaving cluster: " + nodeID); logger.info("Leaving cluster: " + new String(nodeID, StandardCharsets.UTF_8));
// This node may have realized that it got kicked out of the cluster // This node may have realized that it got kicked out of the cluster
leaveCluster(); leaveCluster();
} else { } else {
...@@ -687,9 +707,10 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -687,9 +707,10 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
} }
public List<ClusterNodeInfo> getClusterNodesInfo() { public List<ClusterNodeInfo> getClusterNodesInfo() {
return new ArrayList<ClusterNodeInfo>(clusterNodesInfo.values()); return new ArrayList<>(clusterNodesInfo.values());
} }
@Override
public void stateChanged(LifecycleEvent event) { public void stateChanged(LifecycleEvent event) {
if (event.getState().equals(LifecycleState.SHUTDOWN)) { if (event.getState().equals(LifecycleState.SHUTDOWN)) {
leaveCluster(); leaveCluster();
...@@ -705,28 +726,32 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -705,28 +726,32 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
new HazelcastClusterNodeInfo(event.getMember(), priorNodeInfo.getJoinedTime())); new HazelcastClusterNodeInfo(event.getMember(), priorNodeInfo.getJoinedTime()));
} }
class S2SCacheListener implements EntryListener { class S2SCacheListener implements EntryListener<DomainPair, byte[]> {
public S2SCacheListener() { S2SCacheListener() {
} }
public void entryAdded(EntryEvent event) { @Override
public void entryAdded(EntryEvent<DomainPair, byte[]> event) {
handleEntryEvent(event, false); handleEntryEvent(event, false);
} }
public void entryUpdated(EntryEvent event) { @Override
public void entryUpdated(EntryEvent<DomainPair, byte[]> event) {
handleEntryEvent(event, false); handleEntryEvent(event, false);
} }
public void entryRemoved(EntryEvent event) { @Override
public void entryRemoved(EntryEvent<DomainPair, byte[]> event) {
handleEntryEvent(event, true); handleEntryEvent(event, true);
} }
public void entryEvicted(EntryEvent event) { @Override
public void entryEvicted(EntryEvent<DomainPair, byte[]> event) {
handleEntryEvent(event, true); handleEntryEvent(event, true);
} }
private void handleEntryEvent(EntryEvent event, boolean removal) { private void handleEntryEvent(EntryEvent<DomainPair, byte[]> event, boolean removal) {
NodeID nodeID = NodeID.getInstance(StringUtils.getBytes(event.getMember().getUuid())); NodeID nodeID = NodeID.getInstance(event.getMember().getUuid().getBytes(StandardCharsets.UTF_8));
// ignore events which were triggered by this node // ignore events which were triggered by this node
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
Set<DomainPair> sessionJIDS = nodeRoutes.get(nodeID); Set<DomainPair> sessionJIDS = nodeRoutes.get(nodeID);
...@@ -735,15 +760,14 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -735,15 +760,14 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
} }
if (removal) { if (removal) {
sessionJIDS.remove(event.getKey()); sessionJIDS.remove(event.getKey());
} } else {
else { sessionJIDS.add(event.getKey());
sessionJIDS.add((DomainPair)event.getKey());
} }
} }
} }
private void handleMapEvent(MapEvent event) { private void handleMapEvent(MapEvent event) {
NodeID nodeID = NodeID.getInstance(StringUtils.getBytes(event.getMember().getUuid())); NodeID nodeID = NodeID.getInstance(event.getMember().getUuid().getBytes(StandardCharsets.UTF_8));
// ignore events which were triggered by this node // ignore events which were triggered by this node
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
Set<DomainPair> sessionJIDS = nodeRoutes.get(nodeID); Set<DomainPair> sessionJIDS = nodeRoutes.get(nodeID);
......
...@@ -32,7 +32,6 @@ import org.jivesoftware.openfire.plugin.session.RemoteSessionLocator; ...@@ -32,7 +32,6 @@ import org.jivesoftware.openfire.plugin.session.RemoteSessionLocator;
import org.jivesoftware.openfire.plugin.util.cluster.ClusterPacketRouter; import org.jivesoftware.openfire.plugin.util.cluster.ClusterPacketRouter;
import org.jivesoftware.openfire.plugin.util.cluster.HazelcastClusterNodeInfo; import org.jivesoftware.openfire.plugin.util.cluster.HazelcastClusterNodeInfo;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.CacheFactoryStrategy; import org.jivesoftware.util.cache.CacheFactoryStrategy;
...@@ -44,6 +43,7 @@ import org.slf4j.Logger; ...@@ -44,6 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.Serializable; import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
...@@ -52,6 +52,7 @@ import java.util.Collections; ...@@ -52,6 +52,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
...@@ -69,18 +70,18 @@ import java.util.concurrent.locks.Lock; ...@@ -69,18 +70,18 @@ import java.util.concurrent.locks.Lock;
*/ */
public class ClusteredCacheFactory implements CacheFactoryStrategy { public class ClusteredCacheFactory implements CacheFactoryStrategy {
public static final String HAZELCAST_EXECUTOR_SERVICE_NAME = private static final String HAZELCAST_EXECUTOR_SERVICE_NAME =
JiveGlobals.getProperty("hazelcast.executor.service.name", "openfire::cluster::executor"); JiveGlobals.getProperty("hazelcast.executor.service.name", "openfire::cluster::executor");
private static final long MAX_CLUSTER_EXECUTION_TIME = private static final long MAX_CLUSTER_EXECUTION_TIME =
JiveGlobals.getLongProperty("hazelcast.max.execution.seconds", 30); JiveGlobals.getLongProperty("hazelcast.max.execution.seconds", 30);
private static final long CLUSTER_STARTUP_RETRY_TIME = private static final long CLUSTER_STARTUP_RETRY_TIME =
JiveGlobals.getLongProperty("hazelcast.startup.retry.seconds", 10); JiveGlobals.getLongProperty("hazelcast.startup.retry.seconds", 10);
private static final long CLUSTER_STARTUP_RETRY_COUNT = private static final long CLUSTER_STARTUP_RETRY_COUNT =
JiveGlobals.getLongProperty("hazelcast.startup.retry.count", 1); JiveGlobals.getLongProperty("hazelcast.startup.retry.count", 1);
private static final String HAZELCAST_CONFIG_FILE = private static final String HAZELCAST_CONFIG_FILE =
JiveGlobals.getProperty("hazelcast.config.xml.filename", "hazelcast-cache-config.xml"); JiveGlobals.getProperty("hazelcast.config.xml.filename", "hazelcast-cache-config.xml");
private static final boolean HAZELCAST_JMX_ENABLED = private static final boolean HAZELCAST_JMX_ENABLED =
JiveGlobals.getBooleanProperty("hazelcast.config.jmx.enabled", false); JiveGlobals.getBooleanProperty("hazelcast.config.jmx.enabled", false);
private static Logger logger = LoggerFactory.getLogger(ClusteredCacheFactory.class); private static Logger logger = LoggerFactory.getLogger(ClusteredCacheFactory.class);
...@@ -106,9 +107,10 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -106,9 +107,10 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
*/ */
private State state = State.stopped; private State state = State.stopped;
@Override
public boolean startCluster() { public boolean startCluster() {
state = State.starting; state = State.starting;
// Set the serialization strategy to use for transmitting objects between node clusters // Set the serialization strategy to use for transmitting objects between node clusters
serializationStrategy = ExternalizableUtil.getInstance().getStrategy(); serializationStrategy = ExternalizableUtil.getInstance().getStrategy();
ExternalizableUtil.getInstance().setStrategy(new ClusterExternalizableUtil()); ExternalizableUtil.getInstance().setStrategy(new ClusterExternalizableUtil());
...@@ -117,7 +119,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -117,7 +119,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
// Set packet router to use to deliver packets to remote cluster nodes // Set packet router to use to deliver packets to remote cluster nodes
XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(new ClusterPacketRouter()); XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(new ClusterPacketRouter());
ClassLoader oldLoader = null; ClassLoader oldLoader;
// Store previous class loader (in case we change it) // Store previous class loader (in case we change it)
oldLoader = Thread.currentThread().getContextClassLoader(); oldLoader = Thread.currentThread().getContextClassLoader();
ClassLoader loader = new ClusterClassLoader(); ClassLoader loader = new ClusterClassLoader();
...@@ -134,10 +136,10 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -134,10 +136,10 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
} }
hazelcast = Hazelcast.newHazelcastInstance(config); hazelcast = Hazelcast.newHazelcastInstance(config);
cluster = hazelcast.getCluster(); cluster = hazelcast.getCluster();
// Update the running state of the cluster // Update the running state of the cluster
state = cluster != null ? State.started : State.stopped; state = cluster != null ? State.started : State.stopped;
// Set the ID of this cluster node // Set the ID of this cluster node
XMPPServer.getInstance().setNodeID(NodeID.getInstance(getClusterMemberID())); XMPPServer.getInstance().setNodeID(NodeID.getInstance(getClusterMemberID()));
// CacheFactory is now using clustered caches. We can add our listeners. // CacheFactory is now using clustered caches. We can add our listeners.
...@@ -146,18 +148,22 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -146,18 +148,22 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
membershipListener = cluster.addMembershipListener(clusterListener); membershipListener = cluster.addMembershipListener(clusterListener);
break; break;
} catch (Exception e) { } catch (Exception e) {
cluster = null;
if (retry < CLUSTER_STARTUP_RETRY_COUNT) { if (retry < CLUSTER_STARTUP_RETRY_COUNT) {
logger.warn("Failed to start clustering (" + e.getMessage() + "); " + logger.warn("Failed to start clustering (" + e.getMessage() + "); " +
"will retry in " + CLUSTER_STARTUP_RETRY_TIME + " seconds"); "will retry in " + CLUSTER_STARTUP_RETRY_TIME + " seconds");
try { Thread.sleep(CLUSTER_STARTUP_RETRY_TIME*1000); } try {
catch (InterruptedException ie) { /* ignore */ } Thread.sleep(CLUSTER_STARTUP_RETRY_TIME * 1000);
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
}
} else { } else {
logger.error("Unable to start clustering - continuing in local mode", e); logger.error("Unable to start clustering - continuing in local mode", e);
state = State.stopped; state = State.stopped;
} }
} }
} while (retry++ < CLUSTER_STARTUP_RETRY_COUNT); } while (retry++ < CLUSTER_STARTUP_RETRY_COUNT && !Thread.currentThread().isInterrupted());
if (oldLoader != null) { if (oldLoader != null) {
// Restore previous class loader // Restore previous class loader
Thread.currentThread().setContextClassLoader(oldLoader); Thread.currentThread().setContextClassLoader(oldLoader);
...@@ -165,6 +171,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -165,6 +171,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
return cluster != null; return cluster != null;
} }
@Override
public void stopCluster() { public void stopCluster() {
// Stop the cache services. // Stop the cache services.
cacheStats = null; cacheStats = null;
...@@ -175,11 +182,11 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -175,11 +182,11 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
cluster = null; cluster = null;
if (clusterListener != null) { if (clusterListener != null) {
// Wait until the server has updated its internal state // Wait until the server has updated its internal state
while (!clusterListener.isDone()) { while (!clusterListener.isDone() && !Thread.currentThread().isInterrupted()) {
try { try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) { } catch (final InterruptedException ignored) {
// Ignore Thread.currentThread().interrupt();
} }
} }
hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListener); hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListener);
...@@ -199,14 +206,14 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -199,14 +206,14 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
ExternalizableUtil.getInstance().setStrategy(serializationStrategy); ExternalizableUtil.getInstance().setStrategy(serializationStrategy);
} }
@Override
public Cache createCache(String name) { public Cache createCache(String name) {
// Check if cluster is being started up // Check if cluster is being started up
while (state == State.starting) { while (state == State.starting) {
// Wait until cluster is fully started (or failed) // Wait until cluster is fully started (or failed)
try { try {
Thread.sleep(250); Thread.sleep(250);
} } catch (InterruptedException e) {
catch (InterruptedException e) {
// Ignore // Ignore
} }
} }
...@@ -225,46 +232,52 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -225,46 +232,52 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
return new ClusteredCache(name, hazelcast.getMap(name), hazelcastLifetimeInSeconds); return new ClusteredCache(name, hazelcast.getMap(name), hazelcastLifetimeInSeconds);
} }
@Override
public void destroyCache(Cache cache) { public void destroyCache(Cache cache) {
if (cache instanceof CacheWrapper) { if (cache instanceof CacheWrapper) {
cache = ((CacheWrapper)cache).getWrappedCache(); cache = ((CacheWrapper) cache).getWrappedCache();
} }
ClusteredCache clustered = (ClusteredCache)cache; ClusteredCache clustered = (ClusteredCache) cache;
clustered.destroy(); clustered.destroy();
} }
@Override
public boolean isSeniorClusterMember() { public boolean isSeniorClusterMember() {
if (cluster == null) { return false; } if (cluster == null) {
return false;
}
// first cluster member is the oldest // first cluster member is the oldest
Iterator<Member> members = cluster.getMembers().iterator(); Iterator<Member> members = cluster.getMembers().iterator();
return members.next().getUuid().equals(cluster.getLocalMember().getUuid()); return members.next().getUuid().equals(cluster.getLocalMember().getUuid());
} }
public Collection<ClusterNodeInfo> getClusterNodesInfo() { @Override
return clusterListener == null ? Collections.EMPTY_LIST : clusterListener.getClusterNodesInfo(); public List<ClusterNodeInfo> getClusterNodesInfo() {
return clusterListener == null ? Collections.<ClusterNodeInfo>emptyList() : clusterListener.getClusterNodesInfo();
} }
@Override
public int getMaxClusterNodes() { public int getMaxClusterNodes() {
// No longer depends on license code so just return a big number // No longer depends on license code so just return a big number
return 10000; return 10000;
} }
@Override
public byte[] getSeniorClusterMemberID() { public byte[] getSeniorClusterMemberID() {
if (cluster != null && !cluster.getMembers().isEmpty()) { if (cluster != null && !cluster.getMembers().isEmpty()) {
Member oldest = cluster.getMembers().iterator().next(); Member oldest = cluster.getMembers().iterator().next();
return StringUtils.getBytes(oldest.getUuid()); return oldest.getUuid().getBytes(StandardCharsets.UTF_8);
} } else {
else {
return null; return null;
} }
} }
@Override
public byte[] getClusterMemberID() { public byte[] getClusterMemberID() {
if (cluster != null) { if (cluster != null) {
return StringUtils.getBytes(cluster.getLocalMember().getUuid()); return cluster.getLocalMember().getUuid().getBytes(StandardCharsets.UTF_8);
} } else {
else {
return null; return null;
} }
} }
...@@ -273,9 +286,10 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -273,9 +286,10 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
* Gets the pseudo-synchronized time from the cluster. While the cluster members may * Gets the pseudo-synchronized time from the cluster. While the cluster members may
* have varying system times, this method is expected to return a timestamp that is * have varying system times, this method is expected to return a timestamp that is
* synchronized (or nearly so; best effort) across the cluster. * synchronized (or nearly so; best effort) across the cluster.
* *
* @return Synchronized time for all cluster members * @return Synchronized time for all cluster members
*/ */
@Override
public long getClusterTime() { public long getClusterTime() {
return cluster == null ? System.currentTimeMillis() : cluster.getClusterTime(); return cluster == null ? System.currentTimeMillis() : cluster.getClusterTime();
} }
...@@ -285,20 +299,22 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -285,20 +299,22 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
* Note that this method does not provide the result set for the given * Note that this method does not provide the result set for the given
* task, as the task is run asynchronously across the cluster. * task, as the task is run asynchronously across the cluster.
*/ */
public void doClusterTask(final ClusterTask task) { @Override
if (cluster == null) { return; } public void doClusterTask(final ClusterTask<?> task) {
Set<Member> members = new HashSet<Member>(); if (cluster == null) {
return;
}
Set<Member> members = new HashSet<>();
Member current = cluster.getLocalMember(); Member current = cluster.getLocalMember();
for(Member member : cluster.getMembers()) { for (Member member : cluster.getMembers()) {
if (!member.getUuid().equals(current.getUuid())) { if (!member.getUuid().equals(current.getUuid())) {
members.add(member); members.add(member);
} }
} }
if (members.size() > 0) { if (!members.isEmpty()) {
// Asynchronously execute the task on the other cluster members // Asynchronously execute the task on the other cluster members
logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName()); logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName());
hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers( hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(new CallableTask<>(task), members);
new CallableTask<Object>(task), members);
} else { } else {
logger.warn("No cluster members selected for cluster task " + task.getClass().getName()); logger.warn("No cluster members selected for cluster task " + task.getClass().getName());
} }
...@@ -309,17 +325,19 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -309,17 +325,19 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
* Note that this method does not provide the result set for the given * Note that this method does not provide the result set for the given
* task, as the task is run asynchronously across the cluster. * task, as the task is run asynchronously across the cluster.
*/ */
public void doClusterTask(final ClusterTask task, byte[] nodeID) { @Override
if (cluster == null) { return; } public void doClusterTask(final ClusterTask<?> task, byte[] nodeID) {
if (cluster == null) {
return;
}
Member member = getMember(nodeID); Member member = getMember(nodeID);
// Check that the requested member was found // Check that the requested member was found
if (member != null) { if (member != null) {
// Asynchronously execute the task on the target member // Asynchronously execute the task on the target member
logger.debug("Executing asynchronous DistributedTask: " + task.getClass().getName()); logger.debug("Executing asynchronous DistributedTask: " + task.getClass().getName());
hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMember( hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMember(new CallableTask<>(task), member);
new CallableTask<Object>(task), member);
} else { } else {
String msg = MessageFormat.format("Requested node {0} not found in cluster", StringUtils.getString(nodeID)); String msg = MessageFormat.format("Requested node {0} not found in cluster", new String(nodeID, StandardCharsets.UTF_8));
logger.warn(msg); logger.warn(msg);
throw new IllegalArgumentException(msg); throw new IllegalArgumentException(msg);
} }
...@@ -330,24 +348,26 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -330,24 +348,26 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
* Note that this method blocks for up to MAX_CLUSTER_EXECUTION_TIME * Note that this method blocks for up to MAX_CLUSTER_EXECUTION_TIME
* (seconds) per member until the task is run on all members. * (seconds) per member until the task is run on all members.
*/ */
public Collection<Object> doSynchronousClusterTask(ClusterTask task, boolean includeLocalMember) { @Override
if (cluster == null) { return Collections.emptyList(); } public Collection<Object> doSynchronousClusterTask(ClusterTask<?> task, boolean includeLocalMember) {
Set<Member> members = new HashSet<Member>(); if (cluster == null) {
return Collections.emptyList();
}
Set<Member> members = new HashSet<>();
Member current = cluster.getLocalMember(); Member current = cluster.getLocalMember();
for(Member member : cluster.getMembers()) { for (Member member : cluster.getMembers()) {
if (includeLocalMember || (!member.getUuid().equals(current.getUuid()))) { if (includeLocalMember || (!member.getUuid().equals(current.getUuid()))) {
members.add(member); members.add(member);
} }
} }
Collection<Object> result = new ArrayList<Object>(); Collection<Object> result = new ArrayList<>();
if (members.size() > 0) { if (!members.isEmpty()) {
// Asynchronously execute the task on the other cluster members // Asynchronously execute the task on the other cluster members
try { try {
logger.debug("Executing MultiTask: " + task.getClass().getName()); logger.debug("Executing MultiTask: " + task.getClass().getName());
Map<Member, Future<Object>> futures = hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME) Map<Member, ? extends Future<?>> futures = hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(new CallableTask<>(task), members);
.submitToMembers(new CallableTask<Object>(task), members); long nanosLeft = TimeUnit.SECONDS.toNanos(MAX_CLUSTER_EXECUTION_TIME * members.size());
long nanosLeft = TimeUnit.SECONDS.toNanos(MAX_CLUSTER_EXECUTION_TIME*members.size()); for (Future<?> future : futures.values()) {
for (Future<Object> future : futures.values()) {
long start = System.nanoTime(); long start = System.nanoTime();
result.add(future.get(nanosLeft, TimeUnit.NANOSECONDS)); result.add(future.get(nanosLeft, TimeUnit.NANOSECONDS));
nanosLeft = nanosLeft - (System.nanoTime() - start); nanosLeft = nanosLeft - (System.nanoTime() - start);
...@@ -368,17 +388,19 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -368,17 +388,19 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
* Note that this method blocks for up to MAX_CLUSTER_EXECUTION_TIME * Note that this method blocks for up to MAX_CLUSTER_EXECUTION_TIME
* (seconds) until the task is run on the given member. * (seconds) until the task is run on the given member.
*/ */
public Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) { @Override
if (cluster == null) { return null; } public Object doSynchronousClusterTask(ClusterTask<?> task, byte[] nodeID) {
if (cluster == null) {
return null;
}
Member member = getMember(nodeID); Member member = getMember(nodeID);
Object result = null; Object result = null;
// Check that the requested member was found // Check that the requested member was found
if (member != null) { if (member != null) {
// Asynchronously execute the task on the target member // Asynchronously execute the task on the target member
logger.debug("Executing DistributedTask: " + task.getClass().getName()); logger.debug("Executing DistributedTask: " + task.getClass().getName());
try { try {
Future<Object> future = hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME) Future<?> future = hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMember(new CallableTask<>(task), member);
.submitToMember(new CallableTask<Object>(task), member);
result = future.get(MAX_CLUSTER_EXECUTION_TIME, TimeUnit.SECONDS); result = future.get(MAX_CLUSTER_EXECUTION_TIME, TimeUnit.SECONDS);
logger.debug("DistributedTask result: " + (result == null ? "null" : result)); logger.debug("DistributedTask result: " + (result == null ? "null" : result));
} catch (TimeoutException te) { } catch (TimeoutException te) {
...@@ -387,15 +409,18 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -387,15 +409,18 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
logger.error("Failed to execute cluster task", e); logger.error("Failed to execute cluster task", e);
} }
} else { } else {
String msg = MessageFormat.format("Requested node {0} not found in cluster", StringUtils.getString(nodeID)); String msg = MessageFormat.format("Requested node {0} not found in cluster", new String(nodeID, StandardCharsets.UTF_8));
logger.warn(msg); logger.warn(msg);
throw new IllegalArgumentException(msg); throw new IllegalArgumentException(msg);
} }
return result; return result;
} }
@Override
public ClusterNodeInfo getClusterNodeInfo(byte[] nodeID) { public ClusterNodeInfo getClusterNodeInfo(byte[] nodeID) {
if (cluster == null) { return null; } if (cluster == null) {
return null;
}
ClusterNodeInfo result = null; ClusterNodeInfo result = null;
Member member = getMember(nodeID); Member member = getMember(nodeID);
if (member != null) { if (member != null) {
...@@ -403,11 +428,11 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -403,11 +428,11 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
} }
return result; return result;
} }
private Member getMember(byte[] nodeID) { private Member getMember(byte[] nodeID) {
Member result = null; Member result = null;
for(Member member: cluster.getMembers()) { for (Member member : cluster.getMembers()) {
if (Arrays.equals(StringUtils.getBytes(member.getUuid()), nodeID)) { if (Arrays.equals(member.getUuid().getBytes(StandardCharsets.UTF_8), nodeID)) {
result = member; result = member;
break; break;
} }
...@@ -415,20 +440,21 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -415,20 +440,21 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
return result; return result;
} }
@Override
public void updateCacheStats(Map<String, Cache> caches) { public void updateCacheStats(Map<String, Cache> caches) {
if (caches.size() > 0 && cluster != null) { if (!caches.isEmpty() && cluster != null) {
// Create the cacheStats map if necessary. // Create the cacheStats map if necessary.
if (cacheStats == null) { if (cacheStats == null) {
cacheStats = hazelcast.getMap("opt-$cacheStats"); cacheStats = hazelcast.getMap("opt-$cacheStats");
} }
String uid = cluster.getLocalMember().getUuid(); String uid = cluster.getLocalMember().getUuid();
Map<String, long[]> stats = new HashMap<String, long[]>(); Map<String, long[]> stats = new HashMap<>();
for (String cacheName : caches.keySet()) { for (String cacheName : caches.keySet()) {
Cache cache = caches.get(cacheName); Cache cache = caches.get(cacheName);
// The following information is published: // The following information is published:
// current size, max size, num elements, cache // current size, max size, num elements, cache
// hits, cache misses. // hits, cache misses.
long [] info = new long[5]; long[] info = new long[5];
info[0] = cache.getCacheSize(); info[0] = cache.getCacheSize();
info[1] = cache.getMaxCacheSize(); info[1] = cache.getMaxCacheSize();
info[2] = cache.size(); info[2] = cache.size();
...@@ -441,13 +467,15 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -441,13 +467,15 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
} }
} }
@Override
public String getPluginName() { public String getPluginName() {
return "hazelcast"; return "hazelcast";
} }
@Override
public Lock getLock(Object key, Cache cache) { public Lock getLock(Object key, Cache cache) {
if (cache instanceof CacheWrapper) { if (cache instanceof CacheWrapper) {
cache = ((CacheWrapper)cache).getWrappedCache(); cache = ((CacheWrapper) cache).getWrappedCache();
} }
return new ClusterLock(key, (ClusteredCache) cache); return new ClusterLock(key, (ClusteredCache) cache);
} }
...@@ -457,31 +485,37 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -457,31 +485,37 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
private Object key; private Object key;
private ClusteredCache cache; private ClusteredCache cache;
public ClusterLock(Object key, ClusteredCache cache) { ClusterLock(Object key, ClusteredCache cache) {
this.key = key; this.key = key;
this.cache = cache; this.cache = cache;
} }
@Override
public void lock() { public void lock() {
cache.lock(key, -1); cache.lock(key, -1);
} }
public void lockInterruptibly() throws InterruptedException { @Override
public void lockInterruptibly() {
cache.lock(key, -1); cache.lock(key, -1);
} }
@Override
public boolean tryLock() { public boolean tryLock() {
return cache.lock(key, 0); return cache.lock(key, 0);
} }
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { @Override
public boolean tryLock(long time, TimeUnit unit) {
return cache.lock(key, unit.toMillis(time)); return cache.lock(key, unit.toMillis(time));
} }
@Override
public void unlock() { public void unlock() {
cache.unlock(key); cache.unlock(key);
} }
@Override
public Condition newCondition() { public Condition newCondition() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
...@@ -489,19 +523,20 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -489,19 +523,20 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
private static class CallableTask<V> implements Callable<V>, Serializable { private static class CallableTask<V> implements Callable<V>, Serializable {
private ClusterTask<V> task; private ClusterTask<V> task;
public CallableTask(ClusterTask<V> task) { CallableTask(ClusterTask<V> task) {
this.task = task; this.task = task;
} }
@Override
public V call() { public V call() {
task.run(); task.run();
logger.debug("CallableTask[" + task.getClass().getName() + "] result: " + task.getResult()); logger.debug("CallableTask[" + task.getClass().getName() + "] result: " + task.getResult());
return task.getResult(); return task.getResult();
} }
} }
private static enum State { private enum State {
stopped, stopped,
starting, starting,
started started
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment