/**
 * $RCSfile: $
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.jivesoftware.openfire.cluster;

import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.JiveProperties;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Gaston Dombiak's avatar
Gaston Dombiak committed
36 37 38 39 40 41 42
/**
 * A cluster manager is responsible for triggering events related to clustering.
 * A future version will also provide statistics about the cluster.
 *
 * @author Gaston Dombiak
 */
public class ClusterManager {
43 44 45
	
	private static final Logger Log = LoggerFactory.getLogger(ClusterManager.class);

Gaston Dombiak's avatar
Gaston Dombiak committed
46
    public static String CLUSTER_PROPERTY_NAME = "clustering.enabled";
Gaston Dombiak's avatar
Gaston Dombiak committed
47
    private static Queue<ClusterEventListener> listeners = new ConcurrentLinkedQueue<ClusterEventListener>();
guus's avatar
guus committed
48
    private static BlockingQueue<Event> events = new LinkedBlockingQueue<Event>(10000);
49 50 51

    static {
        Thread thread = new Thread("ClusterManager events dispatcher") {
52 53
            @Override
			public void run() {
54 55 56 57 58
                for (; ;) {
                    try {
                        Event event = events.take();
                        EventType eventType = event.getType();
                        // Make sure that CacheFactory is getting this events first (to update cache structure)
59
                        if (eventType == EventType.joined_cluster && event.getNodeID() == null) {
60 61 62 63 64 65 66 67
                            // Replace standalone caches with clustered caches. Local cached data is not moved.
                            CacheFactory.joinedCluster();
                        }
                        // Now notify rest of the listeners
                        for (ClusterEventListener listener : listeners) {
                            try {
                                switch (eventType) {
                                    case joined_cluster: {
68 69 70 71 72 73
                                        if (event.getNodeID() == null) {
                                            listener.joinedCluster();
                                        }
                                        else {
                                            listener.joinedCluster(event.getNodeID());
                                        }
74 75 76
                                        break;
                                    }
                                    case left_cluster: {
77 78 79 80 81 82
                                        if (event.getNodeID() == null) {
                                            listener.leftCluster();
                                        }
                                        else {
                                            listener.leftCluster(event.getNodeID());
                                        }
83 84 85 86 87 88 89 90
                                        break;
                                    }
                                    case marked_senior_cluster_member: {
                                        listener.markedAsSeniorClusterMember();
                                        break;
                                    }
                                    default:
                                        break;
91
                                }
92 93
                            }
                            catch (Exception e) {
94
                                Log.error(e.getMessage(), e);
95 96
                            }
                        }
97 98 99
                        // Mark event as processed
                        event.setProcessed(true);
                    } catch (InterruptedException e) {
100
                        Log.warn(e.getMessage(), e);
101
                    } catch (Exception e) {
102
                        Log.error(e.getMessage(), e);
103 104 105 106 107 108 109
                    }
                }
            }
        };
        thread.setDaemon(true);
        thread.start();
    }
Gaston Dombiak's avatar
Gaston Dombiak committed
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133

    /**
     * Registers a listener to receive events.
     *
     * @param listener the listener.
     */
    public static void addListener(ClusterEventListener listener) {
        if (listener == null) {
            throw new NullPointerException();
        }
        listeners.add(listener);
    }

    /**
     * Unregisters a listener to receive events.
     *
     * @param listener the listener.
     */
    public static void removeListener(ClusterEventListener listener) {
        listeners.remove(listener);
    }


    /**
134 135 136 137 138 139 140
     * Triggers event indicating that this JVM is now part of a cluster. At this point the
     * {@link org.jivesoftware.openfire.XMPPServer#getNodeID()} holds the new nodeID value and
     * the old nodeID value is passed in case the listener needs it.<p>
     * <p/>
     * When joining the cluster as the senior cluster member the {@link #fireMarkedAsSeniorClusterMember()}
     * event will be sent right after this event.<p>
     * <p/>
141
     * This event will be triggered in another thread. This will avoid potential deadlocks
142
     * in Coherence.
Gaston Dombiak's avatar
Gaston Dombiak committed
143
     *
144 145
     * @param asynchronous true if event will be triggered in background
     */
146
    public static void fireJoinedCluster(boolean asynchronous) {
147
        try {
148
            Event event = new Event(EventType.joined_cluster, null);
149 150 151
            events.put(event);
            if (!asynchronous) {
                while (!event.isProcessed()) {
152
                    Thread.sleep(50);
153 154 155 156
                }
            }
        } catch (InterruptedException e) {
            // Should never happen
157
            Log.error(e.getMessage(), e);
158 159 160 161
        }
    }

    /**
162 163 164 165 166 167 168
     * Triggers event indicating that another JVM is now part of a cluster.<p>
     *
     * This event will be triggered in another thread. This will avoid potential deadlocks
     * in Coherence.
     *
     * @param nodeID    nodeID assigned to the JVM when joining the cluster.
     * @param asynchronous true if event will be triggered in background
169
     */
170
    public static void fireJoinedCluster(byte[] nodeID, boolean asynchronous) {
171
        try {
172
            Event event = new Event(EventType.joined_cluster, nodeID);
173
            events.put(event);
174 175 176 177
            if (!asynchronous) {
                while (!event.isProcessed()) {
                    Thread.sleep(50);
                }
178
            }
179 180
        } catch (InterruptedException e) {
            // Should never happen
181
            Log.error(e.getMessage(), e);
182 183 184 185 186 187 188
        }
    }

    /**
     * Triggers event indicating that this JVM is no longer part of the cluster. This could
     * happen when disabling clustering support or removing the enterprise plugin that provides
     * clustering support.<p>
Gaston Dombiak's avatar
Gaston Dombiak committed
189
     *
190 191 192 193 194
     * Moreover, if we were in a "split brain" scenario (ie. separated cluster islands) and the
     * island were this JVM belonged was marked as "old" then all nodes of that island will
     * get the <tt>left cluster event</tt> and <tt>joined cluster events</tt>. That means that
     * caches will be reset and thus will need to be repopulated again with fresh data from this JVM.
     * This also includes the case where this JVM was the senior cluster member and when the islands
Gaston Dombiak's avatar
Gaston Dombiak committed
195 196 197 198 199 200 201 202 203
     * met again then this JVM stopped being the senior member.
     */
    public static void fireLeftCluster() {
        // Now notify rest of the listeners
        for (ClusterEventListener listener : listeners) {
            try {
                listener.leftCluster();
            }
            catch (Exception e) {
204
                Log.error(e.getMessage(), e);
Gaston Dombiak's avatar
Gaston Dombiak committed
205 206 207 208 209 210 211 212
            }
        }
    }

    /**
     * Triggers event indicating that another JVM is no longer part of the cluster. This could
     * happen when disabling clustering support or removing the enterprise plugin that provides
     * clustering support.
213
     *
Gaston Dombiak's avatar
Gaston Dombiak committed
214
     * @param nodeID    nodeID assigned to the JVM when joining the cluster.
215
     */
Gaston Dombiak's avatar
Gaston Dombiak committed
216
    public static void fireLeftCluster(byte[] nodeID) {
217
        try {
Gaston Dombiak's avatar
Gaston Dombiak committed
218
            Event event = new Event(EventType.left_cluster, nodeID);
219
            events.put(event);
220 221
        } catch (InterruptedException e) {
            // Should never happen
222
            Log.error(e.getMessage(), e);
223 224 225 226 227 228 229 230 231 232 233 234
        }
    }

    /**
     * Triggers event indicating that this JVM is now the senior cluster member. This
     * could either happen when initially joining the cluster or when the senior cluster
     * member node left the cluster and this JVM was marked as the new senior cluster member.<p>
     * <p/>
     * Moreover, in the case of a "split brain" scenario (ie. separated cluster islands) each
     * island will have its own senior cluster member. However, when the islands meet again there
     * could only be one senior cluster member so one of the senior cluster members will stop playing
     * that role. When that happens the JVM no longer playing that role will receive the
Gaston Dombiak's avatar
Gaston Dombiak committed
235
     * {@link #fireLeftCluster()} and {@link #fireJoinedCluster(boolean)} events.<p>
236 237 238
     * <p/>
     * This event will be triggered in another thread. This will avoid potential deadlocks
     * in Coherence.
Gaston Dombiak's avatar
Gaston Dombiak committed
239
     */
240
    public static void fireMarkedAsSeniorClusterMember() {
241
        try {
242
            events.put(new Event(EventType.marked_senior_cluster_member, null));
243 244
        } catch (InterruptedException e) {
            // Should never happen
Gaston Dombiak's avatar
Gaston Dombiak committed
245 246 247 248 249 250
        }
    }

    /**
     * Starts the cluster service if clustering is enabled. The process of starting clustering
     * will recreate caches as distributed caches.<p>
251 252
     *
     * Before starting a cluster the
Gaston Dombiak's avatar
Gaston Dombiak committed
253 254 255 256
     * {@link XMPPServer#setRemoteSessionLocator(org.jivesoftware.openfire.session.RemoteSessionLocator)} and
     * {@link org.jivesoftware.openfire.RoutingTable#setRemotePacketRouter(org.jivesoftware.openfire.RemotePacketRouter)}
     * need to be properly configured.
     */
257
    public static synchronized void startup() {
Gaston Dombiak's avatar
Gaston Dombiak committed
258 259 260 261 262 263 264 265 266 267 268 269
        if (isClusteringStarted()) {
            return;
        }
        // See if clustering should be enabled.
        if (isClusteringEnabled()) {
            if (XMPPServer.getInstance().getRemoteSessionLocator() == null) {
                throw new IllegalStateException("No RemoteSessionLocator was found.");
            }
            if (XMPPServer.getInstance().getRoutingTable().getRemotePacketRouter() == null) {
                throw new IllegalStateException("No RemotePacketRouter was found.");
            }
            // Start up the cluster and reset caches
270
            CacheFactory.startClustering();
Gaston Dombiak's avatar
Gaston Dombiak committed
271 272 273 274 275 276 277 278 279 280 281 282 283 284
        }
    }

    /**
     * Shuts down the clustering service. This method should be called when the Jive
     * system is shutting down, and must not be called otherwise. Failing to call
     * this method may temporarily impact cluster performance, as the system will
     * have to do extra work to recover from a non-clean shutdown.
     * If clustering is not enabled, this method will do nothing.
     */
    public static synchronized void shutdown() {
        // Reset packet router to use to deliver packets to remote cluster nodes
        XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);
        if (isClusteringStarted()) {
285
            Log.debug("ClusterManager: Shutting down clustered cache service.");
286
            CacheFactory.stopClustering();
Gaston Dombiak's avatar
Gaston Dombiak committed
287
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
288 289
        // Reset the session locator to use
        XMPPServer.getInstance().setRemoteSessionLocator(null);
Gaston Dombiak's avatar
Gaston Dombiak committed
290 291 292
    }

    /**
293 294 295
     * Sets true if clustering support is enabled. An attempt to start or join
     * an existing cluster will be attempted in the service was enabled. On the
     * other hand, if disabled then this JVM will leave or stop the cluster.
Gaston Dombiak's avatar
Gaston Dombiak committed
296 297 298 299
     *
     * @param enabled if clustering support is enabled.
     */
    public static void setClusteringEnabled(boolean enabled) {
Gaston Dombiak's avatar
Gaston Dombiak committed
300 301 302 303 304 305 306 307 308 309 310
        if (enabled) {
            // Check that clustering is not already enabled and we are already in a cluster
            if (isClusteringEnabled() && isClusteringStarted()) {
                return;
            }
        }
        else {
            // Check that clustering is disabled
            if (!isClusteringEnabled()) {
                return;
            }
311
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
312
        JiveGlobals.setXMLProperty(CLUSTER_PROPERTY_NAME, Boolean.toString(enabled));
313
        if (!enabled) {
Gaston Dombiak's avatar
Gaston Dombiak committed
314
            shutdown();
315 316 317 318 319
        }
        else {
            // Reload Jive properties. This will ensure that this nodes copy of the
            // properties starts correct.
           JiveProperties.getInstance().init();
Gaston Dombiak's avatar
Gaston Dombiak committed
320
           startup();
321
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
322 323 324 325 326 327 328 329 330 331 332 333
    }

    /**
     * Returns true if clustering support is enabled. This does not mean
     * that clustering has started or that clustering will be able to start.
     *
     * @return true if clustering support is enabled.
     */
    public static boolean isClusteringEnabled() {
        return JiveGlobals.getXMLProperty(CLUSTER_PROPERTY_NAME, false);
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
334
    /**
Gaston Dombiak's avatar
Gaston Dombiak committed
335 336 337 338
     * Returns true if clustering is installed and can be used by this JVM
     * to join a cluster. A false value could mean that either clustering
     * support is not available or the license does not allow to have more
     * than 1 cluster node.
Gaston Dombiak's avatar
Gaston Dombiak committed
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
     *
     * @return true if clustering is installed and can be used by 
     * this JVM to join a cluster.
     */
    public static boolean isClusteringAvailable() {
        return CacheFactory.isClusteringAvailable();
    }

    /**
     * Returns true is clustering is currently being started. Once the cluster
     * is started or failed to be started this value will be false.
     *
     * @return true is clustering is currently being started.
     */
    public static boolean isClusteringStarting() {
        return CacheFactory.isClusteringStarting();
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
    /**
     * Returns true if this JVM is part of a cluster. The cluster may have many nodes
     * or this JVM could be the only node.
     *
     * @return true if this JVM is part of a cluster.
     */
    public static boolean isClusteringStarted() {
        return CacheFactory.isClusteringStarted();
    }

    /**
     * Returns true if this member is the senior member in the cluster. If clustering
     * is not enabled, this method will also return true. This test is useful for
     * tasks that should only be run on a single member in a cluster.
     *
     * @return true if this cluster member is the senior or if clustering is not enabled.
     */
    public static boolean isSeniorClusterMember() {
        return CacheFactory.isSeniorClusterMember();
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
378 379 380 381 382 383 384 385 386 387 388
    /**
     * Returns basic information about the current members of the cluster or an empty
     * collection if not running in a cluster.
     *
     * @return information about the current members of the cluster or an empty
     *         collection if not running in a cluster.
     */
    public static Collection<ClusterNodeInfo> getNodesInfo() {
        return CacheFactory.getClusterNodesInfo();
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
389
    /**
Gaston Dombiak's avatar
Gaston Dombiak committed
390 391 392
     * Returns the maximum number of cluster members allowed. Both values 0 and 1 mean that clustering
     * is not available. However, a value of 1 means that it's a license problem rather than not having
     * the ability to do clustering as defined with value 0.
Gaston Dombiak's avatar
Gaston Dombiak committed
393
     *
Gaston Dombiak's avatar
Gaston Dombiak committed
394
     * @return the maximum number of cluster members allowed or 0 or 1 if clustering is not allowed.
Gaston Dombiak's avatar
Gaston Dombiak committed
395 396 397 398 399
     */
    public static int getMaxClusterNodes() {
        return CacheFactory.getMaxClusterNodes();
    }

400 401 402 403 404 405 406 407 408 409 410 411 412 413
    /**
     * Returns the id of the node that is the senior cluster member. When not in a cluster the returned
     * node id will be the {@link XMPPServer#getNodeID()}.
     *
     * @return the id of the node that is the senior cluster member.
     */
    public static NodeID getSeniorClusterMember() {
        byte[] clusterMemberID = CacheFactory.getSeniorClusterMemberID();
        if (clusterMemberID == null) {
            return XMPPServer.getInstance().getNodeID();
        }
        return NodeID.getInstance(clusterMemberID);
    }

414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430
    /**
     * Returns true if the specified node ID belongs to a known cluster node
     * of this cluster.
     *
     * @param nodeID the ID of the node to verify.
     * @return true if the specified node ID belongs to a known cluster node
     *         of this cluster.
     */
    public static boolean isClusterMember(byte[] nodeID) {
        for (ClusterNodeInfo nodeInfo : getNodesInfo()) {
            if (nodeInfo.getNodeID().equals(nodeID)) {
                return true;
            }
        }
        return false;
    }

431 432
    private static class Event {
        private EventType type;
433
        private byte[] nodeID;
434 435 436 437
        private boolean processed;

        public Event(EventType type, byte[] oldNodeID) {
            this.type = type;
438
            this.nodeID = oldNodeID;
439 440 441 442 443 444
        }

        public EventType getType() {
            return type;
        }

445 446
        public byte[] getNodeID() {
            return nodeID;
447 448 449 450 451 452 453 454 455 456
        }

        public boolean isProcessed() {
            return processed;
        }

        public void setProcessed(boolean processed) {
            this.processed = processed;
        }

457 458
        @Override
		public String toString() {
459 460 461 462
            return super.toString() + " type: " + type;
        }
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
463 464 465
    /**
     * Represents valid event types.
     */
466
    private enum EventType {
Gaston Dombiak's avatar
Gaston Dombiak committed
467 468 469 470 471 472

        /**
         * This JVM joined a cluster.
         */
        joined_cluster,

473 474
        /**
         * This JVM is no longer part of the cluster.
Gaston Dombiak's avatar
Gaston Dombiak committed
475 476 477 478
         */
        left_cluster,

        /**
479
         * This JVM is now the senior cluster member.
Gaston Dombiak's avatar
Gaston Dombiak committed
480 481 482 483
         */
        marked_senior_cluster_member
    }
}