ClusterManager.java 18 KB
Newer Older
Gaston Dombiak's avatar
Gaston Dombiak committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/**
 * $RCSfile: $
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2007 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution.
 */

package org.jivesoftware.openfire.cluster;

import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.JiveProperties;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LocalLockFactory;
import org.jivesoftware.util.lock.LockManager;

import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * A cluster manager is responsible for triggering events related to clustering.
 * A future version will also provide statistics about the cluster.
 *
 * @author Gaston Dombiak
 */
public class ClusterManager {
Gaston Dombiak's avatar
Gaston Dombiak committed
35
    public static String CLUSTER_PROPERTY_NAME = "clustering.enabled";
Gaston Dombiak's avatar
Gaston Dombiak committed
36
    private static Queue<ClusterEventListener> listeners = new ConcurrentLinkedQueue<ClusterEventListener>();
37
    private static BlockingQueue<Event> events = new LinkedBlockingQueue<Event>();
38 39 40 41

    static {
        Thread thread = new Thread("ClusterManager events dispatcher") {
            public void run() {
42 43 44 45 46
                for (; ;) {
                    try {
                        Event event = events.take();
                        EventType eventType = event.getType();
                        // Make sure that CacheFactory is getting this events first (to update cache structure)
47
                        if (eventType == EventType.joined_cluster && event.getNodeID() == null) {
48 49 50 51 52 53 54 55
                            // Replace standalone caches with clustered caches. Local cached data is not moved.
                            CacheFactory.joinedCluster();
                        }
                        // Now notify rest of the listeners
                        for (ClusterEventListener listener : listeners) {
                            try {
                                switch (eventType) {
                                    case joined_cluster: {
56 57 58 59 60 61
                                        if (event.getNodeID() == null) {
                                            listener.joinedCluster();
                                        }
                                        else {
                                            listener.joinedCluster(event.getNodeID());
                                        }
62 63 64
                                        break;
                                    }
                                    case left_cluster: {
65 66 67 68 69 70
                                        if (event.getNodeID() == null) {
                                            listener.leftCluster();
                                        }
                                        else {
                                            listener.leftCluster(event.getNodeID());
                                        }
71 72 73 74 75 76 77 78
                                        break;
                                    }
                                    case marked_senior_cluster_member: {
                                        listener.markedAsSeniorClusterMember();
                                        break;
                                    }
                                    default:
                                        break;
79
                                }
80 81 82
                            }
                            catch (Exception e) {
                                Log.error(e);
83 84
                            }
                        }
85 86 87 88 89 90
                        // Mark event as processed
                        event.setProcessed(true);
                    } catch (InterruptedException e) {
                        Log.warn(e);
                    } catch (Exception e) {
                        Log.error(e);
91 92 93 94 95 96 97
                    }
                }
            }
        };
        thread.setDaemon(true);
        thread.start();
    }
Gaston Dombiak's avatar
Gaston Dombiak committed
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121

    /**
     * Registers a listener to receive events.
     *
     * @param listener the listener.
     */
    public static void addListener(ClusterEventListener listener) {
        if (listener == null) {
            throw new NullPointerException();
        }
        listeners.add(listener);
    }

    /**
     * Unregisters a listener to receive events.
     *
     * @param listener the listener.
     */
    public static void removeListener(ClusterEventListener listener) {
        listeners.remove(listener);
    }


    /**
122 123 124 125 126 127 128
     * Triggers event indicating that this JVM is now part of a cluster. At this point the
     * {@link org.jivesoftware.openfire.XMPPServer#getNodeID()} holds the new nodeID value and
     * the old nodeID value is passed in case the listener needs it.<p>
     * <p/>
     * When joining the cluster as the senior cluster member the {@link #fireMarkedAsSeniorClusterMember()}
     * event will be sent right after this event.<p>
     * <p/>
129
     * This event will be triggered in another thread. This will avoid potential deadlocks
130
     * in Coherence.
Gaston Dombiak's avatar
Gaston Dombiak committed
131
     *
132 133
     * @param asynchronous true if event will be triggered in background
     */
134
    public static void fireJoinedCluster(boolean asynchronous) {
135
        try {
136
            Event event = new Event(EventType.joined_cluster, null);
137 138 139
            events.put(event);
            if (!asynchronous) {
                while (!event.isProcessed()) {
140
                    Thread.sleep(50);
141 142 143 144
                }
            }
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
145
            Log.error(e);
146 147 148 149
        }
    }

    /**
150 151 152 153 154 155 156
     * Triggers event indicating that another JVM is now part of a cluster.<p>
     *
     * This event will be triggered in another thread. This will avoid potential deadlocks
     * in Coherence.
     *
     * @param nodeID    nodeID assigned to the JVM when joining the cluster.
     * @param asynchronous true if event will be triggered in background
157
     */
158
    public static void fireJoinedCluster(byte[] nodeID, boolean asynchronous) {
159
        try {
160
            Event event = new Event(EventType.joined_cluster, nodeID);
161
            events.put(event);
162 163 164 165
            if (!asynchronous) {
                while (!event.isProcessed()) {
                    Thread.sleep(50);
                }
166
            }
167 168
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
169
            Log.error(e);
170 171 172 173 174 175 176
        }
    }

    /**
     * Triggers event indicating that this JVM is no longer part of the cluster. This could
     * happen when disabling clustering support or removing the enterprise plugin that provides
     * clustering support.<p>
Gaston Dombiak's avatar
Gaston Dombiak committed
177
     *
178 179 180 181 182
     * Moreover, if we were in a "split brain" scenario (ie. separated cluster islands) and the
     * island were this JVM belonged was marked as "old" then all nodes of that island will
     * get the <tt>left cluster event</tt> and <tt>joined cluster events</tt>. That means that
     * caches will be reset and thus will need to be repopulated again with fresh data from this JVM.
     * This also includes the case where this JVM was the senior cluster member and when the islands
Gaston Dombiak's avatar
Gaston Dombiak committed
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
     * met again then this JVM stopped being the senior member.
     */
    public static void fireLeftCluster() {
        // Now notify rest of the listeners
        for (ClusterEventListener listener : listeners) {
            try {
                listener.leftCluster();
            }
            catch (Exception e) {
                Log.error(e);
            }
        }
    }

    /**
     * Triggers event indicating that another JVM is no longer part of the cluster. This could
     * happen when disabling clustering support or removing the enterprise plugin that provides
     * clustering support.
201
     *
Gaston Dombiak's avatar
Gaston Dombiak committed
202
     * @param nodeID    nodeID assigned to the JVM when joining the cluster.
203
     */
Gaston Dombiak's avatar
Gaston Dombiak committed
204
    public static void fireLeftCluster(byte[] nodeID) {
205
        try {
Gaston Dombiak's avatar
Gaston Dombiak committed
206
            Event event = new Event(EventType.left_cluster, nodeID);
207
            events.put(event);
208 209
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
210
            Log.error(e);
211 212 213 214 215 216 217 218 219 220 221 222
        }
    }

    /**
     * Triggers event indicating that this JVM is now the senior cluster member. This
     * could either happen when initially joining the cluster or when the senior cluster
     * member node left the cluster and this JVM was marked as the new senior cluster member.<p>
     * <p/>
     * Moreover, in the case of a "split brain" scenario (ie. separated cluster islands) each
     * island will have its own senior cluster member. However, when the islands meet again there
     * could only be one senior cluster member so one of the senior cluster members will stop playing
     * that role. When that happens the JVM no longer playing that role will receive the
Gaston Dombiak's avatar
Gaston Dombiak committed
223
     * {@link #fireLeftCluster()} and {@link #fireJoinedCluster(boolean)} events.<p>
224 225 226
     * <p/>
     * This event will be triggered in another thread. This will avoid potential deadlocks
     * in Coherence.
Gaston Dombiak's avatar
Gaston Dombiak committed
227
     */
228
    public static void fireMarkedAsSeniorClusterMember() {
229
        try {
230
            events.put(new Event(EventType.marked_senior_cluster_member, null));
231 232
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
233 234 235 236 237 238
        }
    }

    /**
     * Starts the cluster service if clustering is enabled. The process of starting clustering
     * will recreate caches as distributed caches.<p>
239
     * <p/>
Gaston Dombiak's avatar
Gaston Dombiak committed
240 241 242 243 244
     * Before starting a cluster the {@link LockManager#setLockFactory(org.jivesoftware.util.lock.LockFactory)},
     * {@link XMPPServer#setRemoteSessionLocator(org.jivesoftware.openfire.session.RemoteSessionLocator)} and
     * {@link org.jivesoftware.openfire.RoutingTable#setRemotePacketRouter(org.jivesoftware.openfire.RemotePacketRouter)}
     * need to be properly configured.
     */
245
    public static synchronized void startup() {
Gaston Dombiak's avatar
Gaston Dombiak committed
246 247 248 249 250 251 252 253 254 255 256 257
        if (isClusteringStarted()) {
            return;
        }
        // See if clustering should be enabled.
        if (isClusteringEnabled()) {
            if (XMPPServer.getInstance().getRemoteSessionLocator() == null) {
                throw new IllegalStateException("No RemoteSessionLocator was found.");
            }
            if (XMPPServer.getInstance().getRoutingTable().getRemotePacketRouter() == null) {
                throw new IllegalStateException("No RemotePacketRouter was found.");
            }
            // Start up the cluster and reset caches
258
            CacheFactory.startClustering();
Gaston Dombiak's avatar
Gaston Dombiak committed
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
        }
    }

    /**
     * Shuts down the clustering service. This method should be called when the Jive
     * system is shutting down, and must not be called otherwise. Failing to call
     * this method may temporarily impact cluster performance, as the system will
     * have to do extra work to recover from a non-clean shutdown.
     * If clustering is not enabled, this method will do nothing.
     */
    public static synchronized void shutdown() {
        // Reset the LockFactory to the default one
        LockManager.setLockFactory(new LocalLockFactory());
        // Reset packet router to use to deliver packets to remote cluster nodes
        XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);
        if (isClusteringStarted()) {
275
            Log.debug("ClusterManager: Shutting down clustered cache service.");
276
            CacheFactory.stopClustering();
Gaston Dombiak's avatar
Gaston Dombiak committed
277
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
278 279
        // Reset the session locator to use
        XMPPServer.getInstance().setRemoteSessionLocator(null);
Gaston Dombiak's avatar
Gaston Dombiak committed
280 281 282
    }

    /**
283 284 285
     * Sets true if clustering support is enabled. An attempt to start or join
     * an existing cluster will be attempted in the service was enabled. On the
     * other hand, if disabled then this JVM will leave or stop the cluster.
Gaston Dombiak's avatar
Gaston Dombiak committed
286 287 288 289
     *
     * @param enabled if clustering support is enabled.
     */
    public static void setClusteringEnabled(boolean enabled) {
Gaston Dombiak's avatar
Gaston Dombiak committed
290 291 292 293 294 295 296 297 298 299 300
        if (enabled) {
            // Check that clustering is not already enabled and we are already in a cluster
            if (isClusteringEnabled() && isClusteringStarted()) {
                return;
            }
        }
        else {
            // Check that clustering is disabled
            if (!isClusteringEnabled()) {
                return;
            }
301
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
302
        JiveGlobals.setXMLProperty(CLUSTER_PROPERTY_NAME, Boolean.toString(enabled));
303
        if (!enabled) {
Gaston Dombiak's avatar
Gaston Dombiak committed
304
            shutdown();
305 306 307 308 309
        }
        else {
            // Reload Jive properties. This will ensure that this nodes copy of the
            // properties starts correct.
           JiveProperties.getInstance().init();
Gaston Dombiak's avatar
Gaston Dombiak committed
310
           startup();
311
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
312 313 314 315 316 317 318 319 320 321 322 323
    }

    /**
     * Returns true if clustering support is enabled. This does not mean
     * that clustering has started or that clustering will be able to start.
     *
     * @return true if clustering support is enabled.
     */
    public static boolean isClusteringEnabled() {
        return JiveGlobals.getXMLProperty(CLUSTER_PROPERTY_NAME, false);
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
324
    /**
Gaston Dombiak's avatar
Gaston Dombiak committed
325 326 327 328
     * Returns true if clustering is installed and can be used by this JVM
     * to join a cluster. A false value could mean that either clustering
     * support is not available or the license does not allow to have more
     * than 1 cluster node.
Gaston Dombiak's avatar
Gaston Dombiak committed
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
     *
     * @return true if clustering is installed and can be used by 
     * this JVM to join a cluster.
     */
    public static boolean isClusteringAvailable() {
        return CacheFactory.isClusteringAvailable();
    }

    /**
     * Returns true is clustering is currently being started. Once the cluster
     * is started or failed to be started this value will be false.
     *
     * @return true is clustering is currently being started.
     */
    public static boolean isClusteringStarting() {
        return CacheFactory.isClusteringStarting();
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367
    /**
     * Returns true if this JVM is part of a cluster. The cluster may have many nodes
     * or this JVM could be the only node.
     *
     * @return true if this JVM is part of a cluster.
     */
    public static boolean isClusteringStarted() {
        return CacheFactory.isClusteringStarted();
    }

    /**
     * Returns true if this member is the senior member in the cluster. If clustering
     * is not enabled, this method will also return true. This test is useful for
     * tasks that should only be run on a single member in a cluster.
     *
     * @return true if this cluster member is the senior or if clustering is not enabled.
     */
    public static boolean isSeniorClusterMember() {
        return CacheFactory.isSeniorClusterMember();
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
368 369 370 371 372 373 374 375 376 377 378
    /**
     * Returns basic information about the current members of the cluster or an empty
     * collection if not running in a cluster.
     *
     * @return information about the current members of the cluster or an empty
     *         collection if not running in a cluster.
     */
    public static Collection<ClusterNodeInfo> getNodesInfo() {
        return CacheFactory.getClusterNodesInfo();
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
379
    /**
Gaston Dombiak's avatar
Gaston Dombiak committed
380 381 382
     * Returns the maximum number of cluster members allowed. Both values 0 and 1 mean that clustering
     * is not available. However, a value of 1 means that it's a license problem rather than not having
     * the ability to do clustering as defined with value 0.
Gaston Dombiak's avatar
Gaston Dombiak committed
383
     *
Gaston Dombiak's avatar
Gaston Dombiak committed
384
     * @return the maximum number of cluster members allowed or 0 or 1 if clustering is not allowed.
Gaston Dombiak's avatar
Gaston Dombiak committed
385 386 387 388 389
     */
    public static int getMaxClusterNodes() {
        return CacheFactory.getMaxClusterNodes();
    }

390 391 392 393 394 395 396 397 398 399 400 401 402 403
    /**
     * Returns the id of the node that is the senior cluster member. When not in a cluster the returned
     * node id will be the {@link XMPPServer#getNodeID()}.
     *
     * @return the id of the node that is the senior cluster member.
     */
    public static NodeID getSeniorClusterMember() {
        byte[] clusterMemberID = CacheFactory.getSeniorClusterMemberID();
        if (clusterMemberID == null) {
            return XMPPServer.getInstance().getNodeID();
        }
        return NodeID.getInstance(clusterMemberID);
    }

404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420
    /**
     * Returns true if the specified node ID belongs to a known cluster node
     * of this cluster.
     *
     * @param nodeID the ID of the node to verify.
     * @return true if the specified node ID belongs to a known cluster node
     *         of this cluster.
     */
    public static boolean isClusterMember(byte[] nodeID) {
        for (ClusterNodeInfo nodeInfo : getNodesInfo()) {
            if (nodeInfo.getNodeID().equals(nodeID)) {
                return true;
            }
        }
        return false;
    }

421 422
    private static class Event {
        private EventType type;
423
        private byte[] nodeID;
424 425 426 427
        private boolean processed;

        public Event(EventType type, byte[] oldNodeID) {
            this.type = type;
428
            this.nodeID = oldNodeID;
429 430 431 432 433 434
        }

        public EventType getType() {
            return type;
        }

435 436
        public byte[] getNodeID() {
            return nodeID;
437 438 439 440 441 442 443 444 445 446 447 448 449 450 451
        }

        public boolean isProcessed() {
            return processed;
        }

        public void setProcessed(boolean processed) {
            this.processed = processed;
        }

        public String toString() {
            return super.toString() + " type: " + type;
        }
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
452 453 454
    /**
     * Represents valid event types.
     */
455
    private enum EventType {
Gaston Dombiak's avatar
Gaston Dombiak committed
456 457 458 459 460 461

        /**
         * This JVM joined a cluster.
         */
        joined_cluster,

462 463
        /**
         * This JVM is no longer part of the cluster.
Gaston Dombiak's avatar
Gaston Dombiak committed
464 465 466 467
         */
        left_cluster,

        /**
468
         * This JVM is now the senior cluster member.
Gaston Dombiak's avatar
Gaston Dombiak committed
469 470 471 472
         */
        marked_senior_cluster_member
    }
}