/**
 * $RCSfile: $
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2007 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution.
 */

package org.jivesoftware.openfire.cluster;

import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.JiveProperties;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LocalLockFactory;
import org.jivesoftware.util.lock.LockManager;

import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * A cluster manager is responsible for triggering events related to clustering.
 * A future version will also provide statistics about the cluster.
 *
 * @author Gaston Dombiak
 */
public class ClusterManager {

    private static String CLUSTER_PROPERTY_NAME = "cache.clustering.enabled";
    private static Queue<ClusterEventListener> listeners = new ConcurrentLinkedQueue<ClusterEventListener>();
37
    private static BlockingQueue<Event> events = new LinkedBlockingQueue<Event>();
38 39 40 41

    static {
        Thread thread = new Thread("ClusterManager events dispatcher") {
            public void run() {
42 43 44 45 46
                for (; ;) {
                    try {
                        Event event = events.take();
                        EventType eventType = event.getType();
                        // Make sure that CacheFactory is getting this events first (to update cache structure)
47
                        if (eventType == EventType.joined_cluster && event.getNodeID() == null) {
48 49 50 51 52 53 54 55
                            // Replace standalone caches with clustered caches. Local cached data is not moved.
                            CacheFactory.joinedCluster();
                        }
                        // Now notify rest of the listeners
                        for (ClusterEventListener listener : listeners) {
                            try {
                                switch (eventType) {
                                    case joined_cluster: {
56 57 58 59 60 61
                                        if (event.getNodeID() == null) {
                                            listener.joinedCluster();
                                        }
                                        else {
                                            listener.joinedCluster(event.getNodeID());
                                        }
62 63 64
                                        break;
                                    }
                                    case left_cluster: {
65 66 67 68 69 70
                                        if (event.getNodeID() == null) {
                                            listener.leftCluster();
                                        }
                                        else {
                                            listener.leftCluster(event.getNodeID());
                                        }
71 72 73 74 75 76 77 78
                                        break;
                                    }
                                    case marked_senior_cluster_member: {
                                        listener.markedAsSeniorClusterMember();
                                        break;
                                    }
                                    default:
                                        break;
79
                                }
80 81 82
                            }
                            catch (Exception e) {
                                Log.error(e);
83 84
                            }
                        }
85 86 87 88 89 90
                        // Mark event as processed
                        event.setProcessed(true);
                    } catch (InterruptedException e) {
                        Log.warn(e);
                    } catch (Exception e) {
                        Log.error(e);
91 92 93 94 95 96 97
                    }
                }
            }
        };
        thread.setDaemon(true);
        thread.start();
    }
Gaston Dombiak's avatar
Gaston Dombiak committed
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121

    /**
     * Registers a listener to receive events.
     *
     * @param listener the listener.
     */
    public static void addListener(ClusterEventListener listener) {
        if (listener == null) {
            throw new NullPointerException();
        }
        listeners.add(listener);
    }

    /**
     * Unregisters a listener to receive events.
     *
     * @param listener the listener.
     */
    public static void removeListener(ClusterEventListener listener) {
        listeners.remove(listener);
    }


    /**
122 123 124 125 126 127 128
     * Triggers event indicating that this JVM is now part of a cluster. At this point the
     * {@link org.jivesoftware.openfire.XMPPServer#getNodeID()} holds the new nodeID value and
     * the old nodeID value is passed in case the listener needs it.<p>
     * <p/>
     * When joining the cluster as the senior cluster member the {@link #fireMarkedAsSeniorClusterMember()}
     * event will be sent right after this event.<p>
     * <p/>
129
     * This event will be triggered in another thread. This will avoid potential deadlocks
130
     * in Coherence.
Gaston Dombiak's avatar
Gaston Dombiak committed
131
     *
132 133
     * @param asynchronous true if event will be triggered in background
     */
134
    public static void fireJoinedCluster(boolean asynchronous) {
135
        try {
136
            Event event = new Event(EventType.joined_cluster, null);
137 138 139
            events.put(event);
            if (!asynchronous) {
                while (!event.isProcessed()) {
140
                    Thread.sleep(50);
141 142 143 144
                }
            }
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
145
            Log.error(e);
146 147 148 149
        }
    }

    /**
150 151 152 153 154 155 156
     * Triggers event indicating that another JVM is now part of a cluster.<p>
     *
     * This event will be triggered in another thread. This will avoid potential deadlocks
     * in Coherence.
     *
     * @param nodeID    nodeID assigned to the JVM when joining the cluster.
     * @param asynchronous true if event will be triggered in background
157
     */
158
    public static void fireJoinedCluster(byte[] nodeID, boolean asynchronous) {
159
        try {
160
            Event event = new Event(EventType.joined_cluster, nodeID);
161
            events.put(event);
162 163 164 165
            if (!asynchronous) {
                while (!event.isProcessed()) {
                    Thread.sleep(50);
                }
166
            }
167 168
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
169
            Log.error(e);
170 171 172 173 174 175 176
        }
    }

    /**
     * Triggers event indicating that this JVM is no longer part of the cluster. This could
     * happen when disabling clustering support or removing the enterprise plugin that provides
     * clustering support.<p>
Gaston Dombiak's avatar
Gaston Dombiak committed
177
     *
178 179 180 181 182
     * Moreover, if we were in a "split brain" scenario (ie. separated cluster islands) and the
     * island were this JVM belonged was marked as "old" then all nodes of that island will
     * get the <tt>left cluster event</tt> and <tt>joined cluster events</tt>. That means that
     * caches will be reset and thus will need to be repopulated again with fresh data from this JVM.
     * This also includes the case where this JVM was the senior cluster member and when the islands
Gaston Dombiak's avatar
Gaston Dombiak committed
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
     * met again then this JVM stopped being the senior member.
     */
    public static void fireLeftCluster() {
        CacheFactory.leftCluster();
        // Now notify rest of the listeners
        for (ClusterEventListener listener : listeners) {
            try {
                listener.leftCluster();
            }
            catch (Exception e) {
                Log.error(e);
            }
        }
    }

    /**
     * Triggers event indicating that another JVM is no longer part of the cluster. This could
     * happen when disabling clustering support or removing the enterprise plugin that provides
     * clustering support.
202
     *
Gaston Dombiak's avatar
Gaston Dombiak committed
203
     * @param nodeID    nodeID assigned to the JVM when joining the cluster.
204
     */
Gaston Dombiak's avatar
Gaston Dombiak committed
205
    public static void fireLeftCluster(byte[] nodeID) {
206
        try {
Gaston Dombiak's avatar
Gaston Dombiak committed
207
            Event event = new Event(EventType.left_cluster, nodeID);
208
            events.put(event);
209 210
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
211
            Log.error(e);
212 213 214 215 216 217 218 219 220 221 222 223
        }
    }

    /**
     * Triggers event indicating that this JVM is now the senior cluster member. This
     * could either happen when initially joining the cluster or when the senior cluster
     * member node left the cluster and this JVM was marked as the new senior cluster member.<p>
     * <p/>
     * Moreover, in the case of a "split brain" scenario (ie. separated cluster islands) each
     * island will have its own senior cluster member. However, when the islands meet again there
     * could only be one senior cluster member so one of the senior cluster members will stop playing
     * that role. When that happens the JVM no longer playing that role will receive the
Gaston Dombiak's avatar
Gaston Dombiak committed
224
     * {@link #fireLeftCluster()} and {@link #fireJoinedCluster(boolean)} events.<p>
225 226 227
     * <p/>
     * This event will be triggered in another thread. This will avoid potential deadlocks
     * in Coherence.
Gaston Dombiak's avatar
Gaston Dombiak committed
228
     */
229
    public static void fireMarkedAsSeniorClusterMember() {
230
        try {
231
            events.put(new Event(EventType.marked_senior_cluster_member, null));
232 233
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
234 235 236 237 238 239
        }
    }

    /**
     * Starts the cluster service if clustering is enabled. The process of starting clustering
     * will recreate caches as distributed caches.<p>
240
     * <p/>
Gaston Dombiak's avatar
Gaston Dombiak committed
241 242 243 244 245
     * Before starting a cluster the {@link LockManager#setLockFactory(org.jivesoftware.util.lock.LockFactory)},
     * {@link XMPPServer#setRemoteSessionLocator(org.jivesoftware.openfire.session.RemoteSessionLocator)} and
     * {@link org.jivesoftware.openfire.RoutingTable#setRemotePacketRouter(org.jivesoftware.openfire.RemotePacketRouter)}
     * need to be properly configured.
     */
246
    public static synchronized void startup() {
Gaston Dombiak's avatar
Gaston Dombiak committed
247 248 249 250 251 252 253 254 255 256 257 258
        if (isClusteringStarted()) {
            return;
        }
        // See if clustering should be enabled.
        if (isClusteringEnabled()) {
            if (XMPPServer.getInstance().getRemoteSessionLocator() == null) {
                throw new IllegalStateException("No RemoteSessionLocator was found.");
            }
            if (XMPPServer.getInstance().getRoutingTable().getRemotePacketRouter() == null) {
                throw new IllegalStateException("No RemotePacketRouter was found.");
            }
            // Start up the cluster and reset caches
259
            CacheFactory.startClustering();
Gaston Dombiak's avatar
Gaston Dombiak committed
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
        }
    }

    /**
     * Shuts down the clustering service. This method should be called when the Jive
     * system is shutting down, and must not be called otherwise. Failing to call
     * this method may temporarily impact cluster performance, as the system will
     * have to do extra work to recover from a non-clean shutdown.
     * If clustering is not enabled, this method will do nothing.
     */
    public static synchronized void shutdown() {
        // Reset the LockFactory to the default one
        LockManager.setLockFactory(new LocalLockFactory());
        // Reset packet router to use to deliver packets to remote cluster nodes
        XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);
        if (isClusteringStarted()) {
276 277
            Log.debug("Shutting down clustered cache service.");
            CacheFactory.stopClustering();
Gaston Dombiak's avatar
Gaston Dombiak committed
278
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
279 280
        // Reset the session locator to use
        XMPPServer.getInstance().setRemoteSessionLocator(null);
Gaston Dombiak's avatar
Gaston Dombiak committed
281 282 283
    }

    /**
284 285 286
     * Sets true if clustering support is enabled. An attempt to start or join
     * an existing cluster will be attempted in the service was enabled. On the
     * other hand, if disabled then this JVM will leave or stop the cluster.
Gaston Dombiak's avatar
Gaston Dombiak committed
287 288 289 290
     *
     * @param enabled if clustering support is enabled.
     */
    public static void setClusteringEnabled(boolean enabled) {
291 292 293
        if (enabled == isClusteringEnabled()) {
            return;
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
294
        JiveGlobals.setXMLProperty(CLUSTER_PROPERTY_NAME, Boolean.toString(enabled));
295 296 297 298 299 300 301 302 303
        if (!enabled) {
            CacheFactory.stopClustering();
        }
        else {
            // Reload Jive properties. This will ensure that this nodes copy of the
            // properties starts correct.
           JiveProperties.getInstance().init();
           CacheFactory.startClustering();
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
    }

    /**
     * Returns true if clustering support is enabled. This does not mean
     * that clustering has started or that clustering will be able to start.
     *
     * @return true if clustering support is enabled.
     */
    public static boolean isClusteringEnabled() {
        return JiveGlobals.getXMLProperty(CLUSTER_PROPERTY_NAME, false);
    }

    /**
     * Returns true if this JVM is part of a cluster. The cluster may have many nodes
     * or this JVM could be the only node.
     *
     * @return true if this JVM is part of a cluster.
     */
    public static boolean isClusteringStarted() {
        return CacheFactory.isClusteringStarted();
    }

    /**
     * Returns true if this member is the senior member in the cluster. If clustering
     * is not enabled, this method will also return true. This test is useful for
     * tasks that should only be run on a single member in a cluster.
     *
     * @return true if this cluster member is the senior or if clustering is not enabled.
     */
    public static boolean isSeniorClusterMember() {
        return CacheFactory.isSeniorClusterMember();
    }

337 338
    private static class Event {
        private EventType type;
339
        private byte[] nodeID;
340 341 342 343
        private boolean processed;

        public Event(EventType type, byte[] oldNodeID) {
            this.type = type;
344
            this.nodeID = oldNodeID;
345 346 347 348 349 350
        }

        public EventType getType() {
            return type;
        }

351 352
        public byte[] getNodeID() {
            return nodeID;
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367
        }

        public boolean isProcessed() {
            return processed;
        }

        public void setProcessed(boolean processed) {
            this.processed = processed;
        }

        public String toString() {
            return super.toString() + " type: " + type;
        }
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
368 369 370
    /**
     * Represents valid event types.
     */
371
    private enum EventType {
Gaston Dombiak's avatar
Gaston Dombiak committed
372 373 374 375 376 377

        /**
         * This JVM joined a cluster.
         */
        joined_cluster,

378 379
        /**
         * This JVM is no longer part of the cluster.
Gaston Dombiak's avatar
Gaston Dombiak committed
380 381 382 383
         */
        left_cluster,

        /**
384
         * This JVM is now the senior cluster member.
Gaston Dombiak's avatar
Gaston Dombiak committed
385 386 387 388
         */
        marked_senior_cluster_member
    }
}