/**
 * $RCSfile: $
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution, or a commercial license
 * agreement with Jive.
 */

package org.jivesoftware.openfire.cluster;

import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.util.JiveGlobals;
17
import org.jivesoftware.util.JiveProperties;
Gaston Dombiak's avatar
Gaston Dombiak committed
18 19 20
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.CacheFactory;

Gaston Dombiak's avatar
Gaston Dombiak committed
21
import java.util.Collection;
Gaston Dombiak's avatar
Gaston Dombiak committed
22
import java.util.Queue;
23
import java.util.concurrent.BlockingQueue;
Gaston Dombiak's avatar
Gaston Dombiak committed
24
import java.util.concurrent.ConcurrentLinkedQueue;
25
import java.util.concurrent.LinkedBlockingQueue;
Gaston Dombiak's avatar
Gaston Dombiak committed
26 27 28 29 30 31 32 33

/**
 * A cluster manager is responsible for triggering events related to clustering.
 * A future version will also provide statistics about the cluster.
 *
 * @author Gaston Dombiak
 */
public class ClusterManager {
Gaston Dombiak's avatar
Gaston Dombiak committed
34
    public static String CLUSTER_PROPERTY_NAME = "clustering.enabled";
Gaston Dombiak's avatar
Gaston Dombiak committed
35
    private static Queue<ClusterEventListener> listeners = new ConcurrentLinkedQueue<ClusterEventListener>();
36
    private static BlockingQueue<Event> events = new LinkedBlockingQueue<Event>();
37 38 39 40

    // Starts the daemon thread that takes queued events and delivers them to the listeners.
    static {
        Thread thread = new Thread("ClusterManager events dispatcher") {
            public void run() {
                for (; ;) {
                    Event event;
                    try {
                        // Blocks until an event has been queued
                        event = events.take();
                    }
                    catch (InterruptedException e) {
                        Log.warn(e);
                        continue;
                    }
                    try {
                        EventType eventType = event.getType();
                        // Make sure that CacheFactory is getting this events first (to update cache structure)
                        if (eventType == EventType.joined_cluster && event.getNodeID() == null) {
                            // Replace standalone caches with clustered caches. Local cached data is not moved.
                            CacheFactory.joinedCluster();
                        }
                        // Now notify rest of the listeners
                        for (ClusterEventListener listener : listeners) {
                            try {
                                switch (eventType) {
                                    case joined_cluster: {
                                        // A null node ID means that the event is about this JVM
                                        if (event.getNodeID() == null) {
                                            listener.joinedCluster();
                                        }
                                        else {
                                            listener.joinedCluster(event.getNodeID());
                                        }
                                        break;
                                    }
                                    case left_cluster: {
                                        if (event.getNodeID() == null) {
                                            listener.leftCluster();
                                        }
                                        else {
                                            listener.leftCluster(event.getNodeID());
                                        }
                                        break;
                                    }
                                    case marked_senior_cluster_member: {
                                        listener.markedAsSeniorClusterMember();
                                        break;
                                    }
                                    default:
                                        break;
                                }
                            }
                            catch (Exception e) {
                                // A failing listener must not prevent the remaining ones from being notified
                                Log.error(e);
                            }
                        }
                    }
                    catch (Exception e) {
                        Log.error(e);
                    }
                    finally {
                        // Always mark the event as processed, even when processing failed. Threads that
                        // fired the event synchronously poll this flag and would otherwise wait forever.
                        event.setProcessed(true);
                    }
                }
            }
        };
        thread.setDaemon(true);
        thread.start();
    }
Gaston Dombiak's avatar
Gaston Dombiak committed
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120

    /**
     * Adds a listener to the collection of listeners that are notified of cluster events.
     *
     * @param listener the listener to register.
     * @throws NullPointerException if the listener is null.
     */
    public static void addListener(ClusterEventListener listener) {
        if (listener != null) {
            listeners.add(listener);
            return;
        }
        // Null listeners are rejected
        throw new NullPointerException();
    }

    /**
     * Removes a listener from the collection of listeners that are notified of
     * cluster events.
     *
     * @param listener the listener to remove.
     */
    public static void removeListener(ClusterEventListener listener) {
        listeners.remove(listener);
    }


    /**
121 122 123 124 125 126 127
     * Triggers event indicating that this JVM is now part of a cluster. At this point the
     * {@link org.jivesoftware.openfire.XMPPServer#getNodeID()} holds the new nodeID value and
     * the old nodeID value is passed in case the listener needs it.<p>
     * <p/>
     * When joining the cluster as the senior cluster member the {@link #fireMarkedAsSeniorClusterMember()}
     * event will be sent right after this event.<p>
     * <p/>
128
     * This event will be triggered in another thread. This will avoid potential deadlocks
129
     * in Coherence.
Gaston Dombiak's avatar
Gaston Dombiak committed
130
     *
131 132
     * @param asynchronous true if event will be triggered in background
     */
133
    public static void fireJoinedCluster(boolean asynchronous) {
134
        try {
135
            Event event = new Event(EventType.joined_cluster, null);
136 137 138
            events.put(event);
            if (!asynchronous) {
                while (!event.isProcessed()) {
139
                    Thread.sleep(50);
140 141 142 143
                }
            }
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
144
            Log.error(e);
145 146 147 148
        }
    }

    /**
149 150 151 152 153 154 155
     * Triggers event indicating that another JVM is now part of a cluster.<p>
     *
     * This event will be triggered in another thread. This will avoid potential deadlocks
     * in Coherence.
     *
     * @param nodeID    nodeID assigned to the JVM when joining the cluster.
     * @param asynchronous true if event will be triggered in background
156
     */
157
    public static void fireJoinedCluster(byte[] nodeID, boolean asynchronous) {
158
        try {
159
            Event event = new Event(EventType.joined_cluster, nodeID);
160
            events.put(event);
161 162 163 164
            if (!asynchronous) {
                while (!event.isProcessed()) {
                    Thread.sleep(50);
                }
165
            }
166 167
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
168
            Log.error(e);
169 170 171 172 173 174 175
        }
    }

    /**
     * Triggers event indicating that this JVM is no longer part of the cluster. This could
     * happen when disabling clustering support or removing the enterprise plugin that provides
     * clustering support.<p>
     *
     * Moreover, if we were in a "split brain" scenario (ie. separated cluster islands) and the
     * island where this JVM belonged was marked as "old" then all nodes of that island will
     * get the <tt>left cluster event</tt> and <tt>joined cluster events</tt>. That means that
     * caches will be reset and thus will need to be repopulated again with fresh data from this JVM.
     * This also includes the case where this JVM was the senior cluster member and when the islands
     * met again then this JVM stopped being the senior member.<p>
     *
     * The listeners are invoked directly by the calling thread; the event is not queued.
     */
    public static void fireLeftCluster() {
        // Notify every registered listener in turn
        for (ClusterEventListener clusterListener : listeners) {
            try {
                clusterListener.leftCluster();
            }
            catch (Exception failure) {
                // Log and carry on so that the remaining listeners still get notified
                Log.error(failure);
            }
        }
    }

    /**
     * Triggers event indicating that another JVM is no longer part of the cluster. This could
     * happen when disabling clustering support or removing the enterprise plugin that provides
     * clustering support.
200
     *
Gaston Dombiak's avatar
Gaston Dombiak committed
201
     * @param nodeID    nodeID assigned to the JVM when joining the cluster.
202
     */
Gaston Dombiak's avatar
Gaston Dombiak committed
203
    public static void fireLeftCluster(byte[] nodeID) {
204
        try {
Gaston Dombiak's avatar
Gaston Dombiak committed
205
            Event event = new Event(EventType.left_cluster, nodeID);
206
            events.put(event);
207 208
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
209
            Log.error(e);
210 211 212 213 214 215 216 217 218 219 220 221
        }
    }

    /**
     * Triggers event indicating that this JVM is now the senior cluster member. This
     * could either happen when initially joining the cluster or when the senior cluster
     * member node left the cluster and this JVM was marked as the new senior cluster member.<p>
     * <p/>
     * Moreover, in the case of a "split brain" scenario (ie. separated cluster islands) each
     * island will have its own senior cluster member. However, when the islands meet again there
     * could only be one senior cluster member so one of the senior cluster members will stop playing
     * that role. When that happens the JVM no longer playing that role will receive the
Gaston Dombiak's avatar
Gaston Dombiak committed
222
     * {@link #fireLeftCluster()} and {@link #fireJoinedCluster(boolean)} events.<p>
223 224 225
     * <p/>
     * This event will be triggered in another thread. This will avoid potential deadlocks
     * in Coherence.
Gaston Dombiak's avatar
Gaston Dombiak committed
226
     */
227
    public static void fireMarkedAsSeniorClusterMember() {
228
        try {
229
            events.put(new Event(EventType.marked_senior_cluster_member, null));
230 231
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
232 233 234 235 236 237
        }
    }

    /**
     * Starts the cluster service if clustering is enabled. The process of starting clustering
     * will recreate caches as distributed caches.<p>
238 239
     *
     * Before starting a cluster the
Gaston Dombiak's avatar
Gaston Dombiak committed
240 241 242 243
     * {@link XMPPServer#setRemoteSessionLocator(org.jivesoftware.openfire.session.RemoteSessionLocator)} and
     * {@link org.jivesoftware.openfire.RoutingTable#setRemotePacketRouter(org.jivesoftware.openfire.RemotePacketRouter)}
     * need to be properly configured.
     */
244
    public static synchronized void startup() {
Gaston Dombiak's avatar
Gaston Dombiak committed
245 246 247 248 249 250 251 252 253 254 255 256
        if (isClusteringStarted()) {
            return;
        }
        // See if clustering should be enabled.
        if (isClusteringEnabled()) {
            if (XMPPServer.getInstance().getRemoteSessionLocator() == null) {
                throw new IllegalStateException("No RemoteSessionLocator was found.");
            }
            if (XMPPServer.getInstance().getRoutingTable().getRemotePacketRouter() == null) {
                throw new IllegalStateException("No RemotePacketRouter was found.");
            }
            // Start up the cluster and reset caches
257
            CacheFactory.startClustering();
Gaston Dombiak's avatar
Gaston Dombiak committed
258 259 260 261 262 263 264 265 266 267 268 269 270 271
        }
    }

    /**
     * Shuts down the clustering service. This method should be called when the Jive
     * system is shutting down, and must not be called otherwise. Failing to call
     * this method may temporarily impact cluster performance, as the system will
     * have to do extra work to recover from a non-clean shutdown.
     * If clustering is not enabled, this method will do nothing.
     */
    public static synchronized void shutdown() {
        // Reset packet router to use to deliver packets to remote cluster nodes
        XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);
        if (isClusteringStarted()) {
272
            Log.debug("ClusterManager: Shutting down clustered cache service.");
273
            CacheFactory.stopClustering();
Gaston Dombiak's avatar
Gaston Dombiak committed
274
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
275 276
        // Reset the session locator to use
        XMPPServer.getInstance().setRemoteSessionLocator(null);
Gaston Dombiak's avatar
Gaston Dombiak committed
277 278 279
    }

    /**
280 281 282
     * Sets true if clustering support is enabled. An attempt to start or join
     * an existing cluster will be attempted in the service was enabled. On the
     * other hand, if disabled then this JVM will leave or stop the cluster.
Gaston Dombiak's avatar
Gaston Dombiak committed
283 284 285 286
     *
     * @param enabled if clustering support is enabled.
     */
    public static void setClusteringEnabled(boolean enabled) {
Gaston Dombiak's avatar
Gaston Dombiak committed
287 288 289 290 291 292 293 294 295 296 297
        if (enabled) {
            // Check that clustering is not already enabled and we are already in a cluster
            if (isClusteringEnabled() && isClusteringStarted()) {
                return;
            }
        }
        else {
            // Check that clustering is disabled
            if (!isClusteringEnabled()) {
                return;
            }
298
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
299
        JiveGlobals.setXMLProperty(CLUSTER_PROPERTY_NAME, Boolean.toString(enabled));
300
        if (!enabled) {
Gaston Dombiak's avatar
Gaston Dombiak committed
301
            shutdown();
302 303 304 305 306
        }
        else {
            // Reload Jive properties. This will ensure that this nodes copy of the
            // properties starts correct.
           JiveProperties.getInstance().init();
Gaston Dombiak's avatar
Gaston Dombiak committed
307
           startup();
308
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
309 310 311 312 313 314 315 316 317 318 319 320
    }

    /**
     * Returns true if clustering support is enabled. This does not mean
     * that clustering has started or that clustering will be able to start.
     * The value is read from the XML property named by {@link #CLUSTER_PROPERTY_NAME}
     * and defaults to false when the property is not set.
     *
     * @return true if clustering support is enabled.
     */
    public static boolean isClusteringEnabled() {
        return JiveGlobals.getXMLProperty(CLUSTER_PROPERTY_NAME, false);
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
321
    /**
     * Returns true if clustering is installed and can be used by this JVM
     * to join a cluster. A false value could mean that either clustering
     * support is not available or the license does not allow to have more
     * than 1 cluster node. The answer is provided by {@link CacheFactory}.
     *
     * @return true if clustering is installed and can be used by
     * this JVM to join a cluster.
     */
    public static boolean isClusteringAvailable() {
        return CacheFactory.isClusteringAvailable();
    }

    /**
     * Returns true if clustering is currently being started. Once the cluster
     * is started or failed to be started this value will be false. The answer
     * is provided by {@link CacheFactory}.
     *
     * @return true if clustering is currently being started.
     */
    public static boolean isClusteringStarting() {
        return CacheFactory.isClusteringStarting();
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
    /**
     * Returns true if this JVM is part of a cluster. The cluster may have many nodes
     * or this JVM could be the only node. The answer is provided by {@link CacheFactory}.
     *
     * @return true if this JVM is part of a cluster.
     */
    public static boolean isClusteringStarted() {
        return CacheFactory.isClusteringStarted();
    }

    /**
     * Returns true if this member is the senior member in the cluster. If clustering
     * is not enabled, this method will also return true. This test is useful for
     * tasks that should only be run on a single member in a cluster. The answer is
     * provided by {@link CacheFactory}.
     *
     * @return true if this cluster member is the senior or if clustering is not enabled.
     */
    public static boolean isSeniorClusterMember() {
        return CacheFactory.isSeniorClusterMember();
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
365 366 367 368 369 370 371 372 373 374 375
    /**
     * Returns basic information about the current members of the cluster or an empty
     * collection if not running in a cluster. The collection is obtained from
     * {@link CacheFactory}.
     *
     * @return information about the current members of the cluster or an empty
     *         collection if not running in a cluster.
     */
    public static Collection<ClusterNodeInfo> getNodesInfo() {
        return CacheFactory.getClusterNodesInfo();
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
376
    /**
     * Returns the maximum number of cluster members allowed. Both values 0 and 1 mean that clustering
     * is not available. However, a value of 1 means that it's a license problem rather than not having
     * the ability to do clustering as defined with value 0. The value is obtained from
     * {@link CacheFactory}.
     *
     * @return the maximum number of cluster members allowed or 0 or 1 if clustering is not allowed.
     */
    public static int getMaxClusterNodes() {
        return CacheFactory.getMaxClusterNodes();
    }

387 388 389 390 391 392 393 394 395 396 397 398 399 400
    /**
     * Returns the id of the node that is the senior cluster member. When no senior member id
     * is reported, which is the case when not in a cluster, the returned node id will be the
     * {@link XMPPServer#getNodeID()}.
     *
     * @return the id of the node that is the senior cluster member.
     */
    public static NodeID getSeniorClusterMember() {
        byte[] seniorID = CacheFactory.getSeniorClusterMemberID();
        // Fall back to the ID of this server when no senior member ID is available
        return seniorID != null ? NodeID.getInstance(seniorID) : XMPPServer.getInstance().getNodeID();
    }

401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417
    /**
     * Checks whether the specified node ID belongs to one of the nodes returned by
     * {@link #getNodesInfo()}.
     *
     * @param nodeID the ID of the node to verify.
     * @return true if the specified node ID belongs to a known cluster node
     *         of this cluster.
     */
    public static boolean isClusterMember(byte[] nodeID) {
        boolean known = false;
        for (ClusterNodeInfo member : getNodesInfo()) {
            // NOTE(review): relies on the node ID type comparing itself against a byte array - confirm
            if (member.getNodeID().equals(nodeID)) {
                known = true;
                break;
            }
        }
        return known;
    }

418 419
    private static class Event {
        private EventType type;
420
        private byte[] nodeID;
421 422 423 424
        private boolean processed;

        public Event(EventType type, byte[] oldNodeID) {
            this.type = type;
425
            this.nodeID = oldNodeID;
426 427 428 429 430 431
        }

        public EventType getType() {
            return type;
        }

432 433
        public byte[] getNodeID() {
            return nodeID;
434 435 436 437 438 439 440 441 442 443 444 445 446 447 448
        }

        public boolean isProcessed() {
            return processed;
        }

        public void setProcessed(boolean processed) {
            this.processed = processed;
        }

        public String toString() {
            return super.toString() + " type: " + type;
        }
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
449 450 451
    /**
     * Represents valid event types.
     */
452
    private enum EventType {
Gaston Dombiak's avatar
Gaston Dombiak committed
453 454 455 456 457 458

        /**
         * This JVM joined a cluster.
         */
        joined_cluster,

459 460
        /**
         * This JVM is no longer part of the cluster.
Gaston Dombiak's avatar
Gaston Dombiak committed
461 462 463 464
         */
        left_cluster,

        /**
465
         * This JVM is now the senior cluster member.
Gaston Dombiak's avatar
Gaston Dombiak committed
466 467 468 469
         */
        marked_senior_cluster_member
    }
}