Commit 02870ab9 authored by GregDThomas's avatar GregDThomas

HZ-11: Fire the left cluster event before we leave the cluster

parent 8e3434cb
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package org.jivesoftware.openfire.cluster; package org.jivesoftware.openfire.cluster;
import java.nio.charset.StandardCharsets;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
...@@ -177,6 +178,7 @@ public class ClusterManager { ...@@ -177,6 +178,7 @@ public class ClusterManager {
*/ */
public static void fireJoinedCluster(boolean asynchronous) { public static void fireJoinedCluster(boolean asynchronous) {
try { try {
Log.info("Firing joined cluster event for this node");
Event event = new Event(EventType.joined_cluster, null); Event event = new Event(EventType.joined_cluster, null);
events.put(event); events.put(event);
if (!asynchronous) { if (!asynchronous) {
...@@ -201,6 +203,7 @@ public class ClusterManager { ...@@ -201,6 +203,7 @@ public class ClusterManager {
*/ */
public static void fireJoinedCluster(byte[] nodeID, boolean asynchronous) { public static void fireJoinedCluster(byte[] nodeID, boolean asynchronous) {
try { try {
Log.info("Firing joined cluster event for another node:" + new String(nodeID, StandardCharsets.UTF_8));
Event event = new Event(EventType.joined_cluster, nodeID); Event event = new Event(EventType.joined_cluster, nodeID);
events.put(event); events.put(event);
if (!asynchronous) { if (!asynchronous) {
...@@ -228,6 +231,7 @@ public class ClusterManager { ...@@ -228,6 +231,7 @@ public class ClusterManager {
*/ */
public static void fireLeftCluster() { public static void fireLeftCluster() {
try { try {
Log.info("Firing left cluster event for this node");
Event event = new Event(EventType.left_cluster, null); Event event = new Event(EventType.left_cluster, null);
events.put(event); events.put(event);
} catch (InterruptedException e) { } catch (InterruptedException e) {
...@@ -245,6 +249,7 @@ public class ClusterManager { ...@@ -245,6 +249,7 @@ public class ClusterManager {
*/ */
public static void fireLeftCluster(byte[] nodeID) { public static void fireLeftCluster(byte[] nodeID) {
try { try {
Log.info("Firing left cluster event for another node:" + new String(nodeID, StandardCharsets.UTF_8));
Event event = new Event(EventType.left_cluster, nodeID); Event event = new Event(EventType.left_cluster, nodeID);
events.put(event); events.put(event);
} catch (InterruptedException e) { } catch (InterruptedException e) {
...@@ -269,6 +274,7 @@ public class ClusterManager { ...@@ -269,6 +274,7 @@ public class ClusterManager {
*/ */
public static void fireMarkedAsSeniorClusterMember() { public static void fireMarkedAsSeniorClusterMember() {
try { try {
Log.info("Firing marked as senior event");
events.put(new Event(EventType.marked_senior_cluster_member, null)); events.put(new Event(EventType.marked_senior_cluster_member, null));
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Should never happen // Should never happen
......
...@@ -26,6 +26,8 @@ import com.hazelcast.core.HazelcastInstance; ...@@ -26,6 +26,8 @@ import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member; import com.hazelcast.core.Member;
import org.jivesoftware.openfire.JMXManager; import org.jivesoftware.openfire.JMXManager;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.cluster.ClusterNodeInfo; import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import org.jivesoftware.openfire.cluster.NodeID; import org.jivesoftware.openfire.cluster.NodeID;
import org.jivesoftware.openfire.plugin.session.RemoteSessionLocator; import org.jivesoftware.openfire.plugin.session.RemoteSessionLocator;
...@@ -57,6 +59,7 @@ import java.util.Map; ...@@ -57,6 +59,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
...@@ -177,25 +180,49 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -177,25 +180,49 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
cacheStats = null; cacheStats = null;
// Update the running state of the cluster // Update the running state of the cluster
state = State.stopped; state = State.stopped;
// Fire the leftClusterEvent before we leave the cluster - we need to access the clustered data before the
// cluster is shutdown so it can be copied in to the non-clustered, DefaultCache
final Semaphore leftClusterSemaphore = new Semaphore(0);
final ClusterEventListener clusterEventListener = new ClusterEventListener() {
@Override
public void joinedCluster() {
}
@Override
public void joinedCluster(byte[] bytes) {
}
@Override
public void leftCluster() {
leftClusterSemaphore.release();
}
@Override
public void leftCluster(byte[] bytes) {
}
@Override
public void markedAsSeniorClusterMember() {
}
};
try {
ClusterManager.addListener(clusterEventListener);
ClusterManager.fireLeftCluster();
leftClusterSemaphore.tryAcquire(30, TimeUnit.SECONDS);
} catch( final Exception e) {
logger.error("Unexpected exception waiting for clustering to shut down", e);
} finally {
ClusterManager.removeListener(clusterEventListener);
}
// Stop the cluster // Stop the cluster
hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListener);
cluster.removeMembershipListener(membershipListener);
Hazelcast.shutdownAll(); Hazelcast.shutdownAll();
cluster = null; cluster = null;
if (clusterListener != null) { lifecycleListener = null;
// Wait until the server has updated its internal state membershipListener = null;
while (!clusterListener.isDone() && !Thread.currentThread().isInterrupted()) { clusterListener = null;
try {
Thread.sleep(100);
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListener);
cluster.removeMembershipListener(membershipListener);
lifecycleListener = null;
membershipListener = null;
clusterListener = null;
}
// Reset the node ID
XMPPServer.getInstance().setNodeID(null); XMPPServer.getInstance().setNodeID(null);
// Reset packet router to use to deliver packets to remote cluster nodes // Reset packet router to use to deliver packets to remote cluster nodes
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment