CacheFactory.java 30.4 KB
Newer Older
1 2 3 4
/**
 * $Revision$
 * $Date$
 *
5
 * Copyright (C) 1999-2008 Jive Software. All rights reserved.
6 7 8 9
 * This software is the proprietary information of Jive Software. Use is subject to license terms.
 */
package org.jivesoftware.util.cache;

10 11
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.XMPPServerListener;
Gaston Dombiak's avatar
Gaston Dombiak committed
12 13
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
Gaston Dombiak's avatar
Gaston Dombiak committed
14
import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
15 16
import org.jivesoftware.openfire.container.Plugin;
import org.jivesoftware.openfire.container.PluginClassLoader;
17 18
import org.jivesoftware.openfire.container.PluginManager;
import org.jivesoftware.util.InitializationException;
19
import org.jivesoftware.util.JiveConstants;
20 21
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
22

23
import java.util.*;
24
import java.util.concurrent.ConcurrentHashMap;
25
import java.util.concurrent.locks.Lock;
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40

/**
 * Creates Cache objects. The returned caches will either be local or clustered
 * depending on the clustering enabled setting and a user's license.
 * <p>
 * When clustered caching is turned on, cache usage statistics for all caches
 * that have been created are periodically published to the clustered cache
 * named "opt-$cacheStats".
 *
 */
public class CacheFactory {

    /** Name of the property read by the static initializer to obtain the local cache strategy class name. */
    public static String LOCAL_CACHE_PROPERTY_NAME = "cache.clustering.local.class";
    /** Name of the property read by the static initializer to obtain the clustered cache strategy class name. */
    public static String CLUSTERED_CACHE_PROPERTY_NAME = "cache.clustering.clustered.class";

41
    /** Result of the most recent startCluster() call made by startClustering(); set back to false by stopClustering(). */
    private static boolean clusteringStarted = false;
Gaston Dombiak's avatar
Gaston Dombiak committed
42
    /** Set to true on entry to startClustering() and back to false once that method has finished its work. */
    private static boolean clusteringStarting = false;
43 44 45 46 47 48 49 50 51

    /**
     * Storage for all caches that get created. Keys are cache names; values are the
     * wrapped cache instances registered by wrapCache().
     */
    private static Map<String, Cache> caches = new ConcurrentHashMap<String, Cache>();

    /** Class name of the local cache strategy; read from LOCAL_CACHE_PROPERTY_NAME in the static initializer. */
    private static String localCacheFactoryClass;
    /** Class name of the clustered cache strategy; read from CLUSTERED_CACHE_PROPERTY_NAME in the static initializer. */
    private static String clusteredCacheFactoryClass;
    /** Strategy currently used to create caches and run cluster operations; assigned by initialize() and the clustering methods. */
    private static CacheFactoryStrategy cacheFactoryStrategy;
52
    /** Thread that periodically passes the cache map to the strategy's updateCacheStats(); null while no such thread is running. */
    private static Thread statsThread;
53

54 55 56 57 58 59 60 61 62 63 64 65 66 67
    /** Fallback returned by getMaxCacheSize() when neither a property nor a built-in default exists (presumably bytes - confirm). */
    public static final int DEFAULT_MAX_CACHE_SIZE = 1024 * 256;
    /** Fallback returned by getMaxCacheLifetime() when neither a property nor a built-in default exists: six JiveConstants.HOUR units. */
    public static final long DEFAULT_MAX_CACHE_LIFETIME = 6 * JiveConstants.HOUR;

    /**
     * This map contains property names which were used to store cache configuration data
     * in local xml properties in previous versions. Keys are cache names; values are the
     * short names used to build "cache.&lt;short name&gt;&lt;suffix&gt;" property keys.
     */
    private static final Map<String, String> cacheNames = new HashMap<String, String>();
    /**
     * Default properties to use for local caches. Default properties can be overridden
     * by setting the corresponding system properties. Keys have the form
     * "cache.&lt;short name&gt;.size" or "cache.&lt;short name&gt;.maxLifetime".
     */
    private static final Map<String, Long> cacheProps = new HashMap<String, Long>();

68 69
    static {
        // Strategy class names, overridable through properties.
        localCacheFactoryClass = JiveGlobals.getProperty(
                LOCAL_CACHE_PROPERTY_NAME, "org.jivesoftware.util.cache.DefaultLocalCacheStrategy");
        clusteredCacheFactoryClass = JiveGlobals.getProperty(
                CLUSTERED_CACHE_PROPERTY_NAME, "com.jivesoftware.util.cache.CoherenceClusteredCacheFactory");

        // Cache name -> short name used in property keys.
        cacheNames.put("Favicon Hits", "faviconHits");
        cacheNames.put("Favicon Misses", "faviconMisses");
        cacheNames.put("Group", "group");
        cacheNames.put("Group Metadata Cache", "groupMeta");
        cacheNames.put("Javascript Cache", "javascript");
        cacheNames.put("Last Activity Cache", "lastActivity");
        cacheNames.put("Multicast Service", "multicast");
        cacheNames.put("Offline Message Size", "offlinemessage");
        cacheNames.put("Offline Presence Cache", "offlinePresence");
        cacheNames.put("Privacy Lists", "listsCache");
        cacheNames.put("Remote Users Existence", "remoteUsersCache");
        cacheNames.put("Roster", "username2roster");
        cacheNames.put("User", "userCache");
        cacheNames.put("Locked Out Accounts", "lockOutCache");
        cacheNames.put("VCard", "vcardCache");
        cacheNames.put("File Transfer Cache", "fileTransfer");
        cacheNames.put("File Transfer", "transferProxy");
        cacheNames.put("POP3 Authentication", "pop3");
        cacheNames.put("LDAP Authentication", "ldap");
        cacheNames.put("Routing Servers Cache", "routeServer");
        cacheNames.put("Routing Components Cache", "routeComponent");
        cacheNames.put("Routing Users Cache", "routeUser");
        cacheNames.put("Routing AnonymousUsers Cache", "routeAnonymousUser");
        cacheNames.put("Routing User Sessions", "routeUserSessions");
        cacheNames.put("Components Sessions", "componentsSessions");
        cacheNames.put("Connection Managers Sessions", "connManagerSessions");
        cacheNames.put("Incoming Server Sessions", "incServerSessions");
        cacheNames.put("Sessions by Hostname", "sessionsHostname");
        cacheNames.put("Secret Keys Cache", "secretKeys");
        cacheNames.put("Validated Domains", "validatedDomains");
        cacheNames.put("Directed Presences", "directedPresences");
        cacheNames.put("Disco Server Features", "serverFeatures");
        cacheNames.put("Disco Server Items", "serverItems");
        cacheNames.put("Remote Server Configurations", "serversConfigurations");
        cacheNames.put("Entity Capabilities", "entityCapabilities");
        cacheNames.put("Entity Capabilities Users", "entityCapabilitiesUsers");
        cacheNames.put("Entity Capabilities Pending Hashes", "entityCapabilitiesPendingHashes");
        cacheNames.put("Clearspace SSO Nonce", "clearspaceSSONonce");

        // Built-in defaults, keyed by the full property name.
        cacheProps.put("cache.fileTransfer.size", 128 * 1024L);
        cacheProps.put("cache.fileTransfer.maxLifetime", 1000 * 60 * 10L);
        cacheProps.put("cache.multicast.size", 128 * 1024L);
        cacheProps.put("cache.multicast.maxLifetime", JiveConstants.DAY);
        cacheProps.put("cache.offlinemessage.size", 100 * 1024L);
        cacheProps.put("cache.offlinemessage.maxLifetime", JiveConstants.HOUR * 12);
        cacheProps.put("cache.pop3.size", 512 * 1024L);
        cacheProps.put("cache.pop3.maxLifetime", JiveConstants.HOUR);
        cacheProps.put("cache.transferProxy.size", -1L);
        cacheProps.put("cache.transferProxy.maxLifetime", 1000 * 60 * 10L);
        cacheProps.put("cache.group.size", 1024 * 1024L);
        cacheProps.put("cache.group.maxLifetime", JiveConstants.MINUTE * 15);
        cacheProps.put("cache.lockOutCache.size", 1024 * 1024L);
        cacheProps.put("cache.lockOutCache.maxLifetime", JiveConstants.MINUTE * 15);
        cacheProps.put("cache.groupMeta.size", 512 * 1024L);
        cacheProps.put("cache.groupMeta.maxLifetime", JiveConstants.MINUTE * 15);
        cacheProps.put("cache.javascript.size", 128 * 1024L);
        cacheProps.put("cache.javascript.maxLifetime", 3600 * 24 * 10L);
        cacheProps.put("cache.ldap.size", 512 * 1024L);
        cacheProps.put("cache.ldap.maxLifetime", JiveConstants.HOUR * 2);
        cacheProps.put("cache.listsCache.size", 512 * 1024L);
        cacheProps.put("cache.offlinePresence.size", 512 * 1024L);
        cacheProps.put("cache.lastActivity.size", 128 * 1024L);
        cacheProps.put("cache.userCache.size", 512 * 1024L);
        cacheProps.put("cache.userCache.maxLifetime", JiveConstants.MINUTE * 30);
        cacheProps.put("cache.remoteUsersCache.size", 512 * 1024L);
        cacheProps.put("cache.remoteUsersCache.maxLifetime", JiveConstants.MINUTE * 30);
        cacheProps.put("cache.vcardCache.size", 512 * 1024L);
        cacheProps.put("cache.faviconHits.size", 128 * 1024L);
        cacheProps.put("cache.faviconMisses.size", 128 * 1024L);
        cacheProps.put("cache.routeServer.size", -1L);
        cacheProps.put("cache.routeServer.maxLifetime", -1L);
        cacheProps.put("cache.routeComponent.size", -1L);
        cacheProps.put("cache.routeComponent.maxLifetime", -1L);
        cacheProps.put("cache.routeUser.size", -1L);
        cacheProps.put("cache.routeUser.maxLifetime", -1L);
        cacheProps.put("cache.routeAnonymousUser.size", -1L);
        cacheProps.put("cache.routeAnonymousUser.maxLifetime", -1L);
        cacheProps.put("cache.routeUserSessions.size", -1L);
        cacheProps.put("cache.routeUserSessions.maxLifetime", -1L);
        cacheProps.put("cache.componentsSessions.size", -1L);
        cacheProps.put("cache.componentsSessions.maxLifetime", -1L);
        cacheProps.put("cache.connManagerSessions.size", -1L);
        cacheProps.put("cache.connManagerSessions.maxLifetime", -1L);
        cacheProps.put("cache.incServerSessions.size", -1L);
        cacheProps.put("cache.incServerSessions.maxLifetime", -1L);
        cacheProps.put("cache.sessionsHostname.size", -1L);
        cacheProps.put("cache.sessionsHostname.maxLifetime", -1L);
        cacheProps.put("cache.secretKeys.size", -1L);
        cacheProps.put("cache.secretKeys.maxLifetime", -1L);
        cacheProps.put("cache.validatedDomains.size", -1L);
        cacheProps.put("cache.validatedDomains.maxLifetime", -1L);
        cacheProps.put("cache.directedPresences.size", -1L);
        cacheProps.put("cache.directedPresences.maxLifetime", -1L);
        cacheProps.put("cache.serverFeatures.size", -1L);
        cacheProps.put("cache.serverFeatures.maxLifetime", -1L);
        cacheProps.put("cache.serverItems.size", -1L);
        cacheProps.put("cache.serverItems.maxLifetime", -1L);
        cacheProps.put("cache.serversConfigurations.size", 128 * 1024L);
        cacheProps.put("cache.serversConfigurations.maxLifetime", JiveConstants.MINUTE * 30);
        cacheProps.put("cache.entityCapabilities.size", -1L);
        cacheProps.put("cache.entityCapabilities.maxLifetime", JiveConstants.DAY * 2);
        cacheProps.put("cache.entityCapabilitiesUsers.size", -1L);
        cacheProps.put("cache.entityCapabilitiesUsers.maxLifetime", JiveConstants.DAY * 2);
        cacheProps.put("cache.entityCapabilitiesPendingHashes.size", -1L);
        cacheProps.put("cache.entityCapabilitiesPendingHashes.maxLifetime", JiveConstants.DAY * 2);
        cacheProps.put("cache.pluginCacheInfo.size", -1L);
        cacheProps.put("cache.pluginCacheInfo.maxLifetime", -1L);
        cacheProps.put("cache.clearspaceSSONonce.size", -1L);
        cacheProps.put("cache.clearspaceSSONonce.maxLifetime", JiveConstants.MINUTE * 2);
    }

    private CacheFactory() {
    }

188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
    /**
     * Resolves the maximum size configured for a cache. A parsable ".size" property for the
     * cache wins; otherwise a built-in default for that cache is used, and failing that
     * {@link #DEFAULT_MAX_CACHE_SIZE}.
     *
     * @param cacheName the name of the cache to look up a corresponding property for.
     * @return the property value, the built-in default, or {@link #DEFAULT_MAX_CACHE_SIZE}.
     */
    public static long getMaxCacheSize(String cacheName) {
        final long fallback = DEFAULT_MAX_CACHE_SIZE;
        return getCacheProperty(cacheName, ".size", fallback);
    }

    /**
     * Stores the maximum cache size for the supplied cache name under the property
     * "cache.&lt;name without spaces&gt;.size".
     *
     * @param cacheName the name of the cache to store a value for.
     * @param size the maximum cache size.
     */
    public static void setMaxSizeProperty(String cacheName, long size) {
        String propertyKey = "cache." + cacheName.replace(" ", "") + ".size";
        JiveGlobals.setProperty(propertyKey, Long.toString(size));
    }

    /**
     * Tells whether a parsable ".size" property is set for the given cache.
     *
     * @param cacheName the name of the cache to check.
     * @return true if a numeric size property exists for the cache.
     */
    public static boolean hasMaxSizeFromProperty(String cacheName) {
        boolean configured = hasCacheProperty(cacheName, ".size");
        return configured;
    }

    /**
     * Resolves the maximum entry lifetime configured for a cache. A parsable ".maxLifetime"
     * property for the cache wins; otherwise a built-in default for that cache is used, and
     * failing that {@link #DEFAULT_MAX_CACHE_LIFETIME}.
     *
     * @param cacheName the name of the cache to look up a corresponding property for.
     * @return the property value, the built-in default, or {@link #DEFAULT_MAX_CACHE_LIFETIME}.
     */
    public static long getMaxCacheLifetime(String cacheName) {
        final long fallback = DEFAULT_MAX_CACHE_LIFETIME;
        return getCacheProperty(cacheName, ".maxLifetime", fallback);
    }

    /**
     * Stores the maximum cache entry lifetime for the supplied cache name under the property
     * "cache.&lt;name without spaces&gt;.maxLifetime".
     *
     * @param cacheName the name of the cache to store a value for.
     * @param lifetime the maximum cache entry lifetime.
     */
    public static void setMaxLifetimeProperty(String cacheName, long lifetime) {
        String propertyKey = "cache." + cacheName.replace(" ", "") + ".maxLifetime";
        JiveGlobals.setProperty(propertyKey, Long.toString(lifetime));
    }

    /**
     * Tells whether a parsable ".maxLifetime" property is set for the given cache.
     *
     * @param cacheName the name of the cache to check.
     * @return true if a numeric lifetime property exists for the cache.
     */
    public static boolean hasMaxLifetimeFromProperty(String cacheName) {
        boolean configured = hasCacheProperty(cacheName, ".maxLifetime");
        return configured;
    }

    /**
     * Stores the cache type for the supplied cache name under the property
     * "cache.&lt;name without spaces&gt;.type".
     *
     * @param cacheName the name of the cache to store a value for.
     * @param type the value to store.
     */
    public static void setCacheTypeProperty(String cacheName, String type) {
        String propertyKey = "cache." + cacheName.replace(" ", "") + ".type";
        JiveGlobals.setProperty(propertyKey, type);
    }

    /**
     * Reads the property "cache.&lt;name without spaces&gt;.type" for the supplied cache name.
     *
     * @param cacheName the name of the cache to look up.
     * @return whatever JiveGlobals returns for that property key.
     */
    public static String getCacheTypeProperty(String cacheName) {
        String propertyKey = "cache." + cacheName.replace(" ", "") + ".type";
        return JiveGlobals.getProperty(propertyKey);
    }

    /**
     * Stores the minimum cache size for the supplied cache name under the property
     * "cache.&lt;name without spaces&gt;.min".
     *
     * @param cacheName the name of the cache to store a value for.
     * @param size the minimum cache size.
     */
    public static void setMinCacheSize(String cacheName, long size) {
        String propertyKey = "cache." + cacheName.replace(" ", "") + ".min";
        JiveGlobals.setProperty(propertyKey, Long.toString(size));
    }

    /**
     * Resolves the minimum size configured for a cache via its ".min" property.
     *
     * @param cacheName the name of the cache to look up.
     * @return the property value, a built-in default if one exists, or 0.
     */
    public static long getMinCacheSize(String cacheName) {
        final long fallback = 0;
        return getCacheProperty(cacheName, ".min", fallback);
    }

    /**
     * Resolves a numeric cache setting.
     * <ol>
     *   <li>The property "cache.&lt;name without spaces&gt;&lt;suffix&gt;" is tried first.</li>
     *   <li>If it is not set and the cache has a short name in {@code cacheNames}, the property
     *       "cache.&lt;short name&gt;&lt;suffix&gt;" is tried.</li>
     *   <li>If no property was found, or the one found is not a valid long, the built-in default
     *       from {@code cacheProps} is used, falling back to {@code defaultValue}.</li>
     * </ol>
     *
     * @param cacheName    the name of the cache.
     * @param suffix       the property suffix, including the leading dot (e.g. ".size").
     * @param defaultValue the value returned when nothing else applies.
     * @return the resolved setting.
     */
    private static long getCacheProperty(String cacheName, String suffix, long defaultValue) {
        String compactName = cacheName.replace(" ", "");
        // cacheNames never holds null values, so a null result means "no short name registered".
        String shortName = cacheNames.get(cacheName);

        // First check if user is overwriting default value using a property for the cache name
        String propName = "cache." + compactName + suffix;
        String propValue = JiveGlobals.getProperty(propName);
        if (propValue == null && shortName != null) {
            // No property was found for the cache name so try now with short name
            propName = "cache." + shortName + suffix;
            propValue = JiveGlobals.getProperty(propName);
        }
        if (propValue != null) {
            try {
                return Long.parseLong(propValue);
            }
            catch (NumberFormatException nfe) {
                Log.warn("Unable to parse " + propName + " using default value.");
            }
        }

        // Built-in defaults are keyed by short name. Build that key explicitly rather than reusing
        // propName: propName still holds the long-name key when that property existed but could
        // not be parsed, which would skip the cache-specific default.
        String defaultKey = "cache." + (shortName != null ? shortName : compactName) + suffix;
        Long builtInDefault = cacheProps.get(defaultKey);
        return builtInDefault == null ? defaultValue : builtInDefault;
    }

    /**
     * Tells whether a numeric property is set for the cache. The property
     * "cache.&lt;name without spaces&gt;&lt;suffix&gt;" is tried first; if it is not set and the cache
     * has a short name in {@code cacheNames}, "cache.&lt;short name&gt;&lt;suffix&gt;" is tried.
     *
     * @param cacheName the name of the cache.
     * @param suffix    the property suffix, including the leading dot (e.g. ".size").
     * @return true if a property was found and it parses as a long; false otherwise.
     */
    private static boolean hasCacheProperty(String cacheName, String suffix) {
        String key = "cache." + cacheName.replace(" ", "") + suffix;
        String rawValue = JiveGlobals.getProperty(key);
        if (rawValue == null && cacheNames.containsKey(cacheName)) {
            // Nothing under the full name; retry with the short name.
            key = "cache." + cacheNames.get(cacheName) + suffix;
            rawValue = JiveGlobals.getProperty(key);
        }
        if (rawValue == null) {
            return false;
        }
        try {
            Long.parseLong(rawValue);
        }
        catch (NumberFormatException nfe) {
            Log.warn("Unable to parse " + key + " using default value.");
            return false;
        }
        return true;
    }

302 303 304 305
    /**
     * Returns an array of all caches in the system.
     * @return an array of all caches in the system.
     */
306
    public static Cache[] getAllCaches() {
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
        List<Cache> values = new ArrayList<Cache>();
        for (Cache cache : caches.values()) {
            values.add(cache);
        }
        return values.toArray(new Cache[values.size()]);
    }

    /**
     * Returns the cache registered under the given name. When none is registered yet, one is
     * created through the current strategy, wrapped, registered and returned.
     *
     * @param name the name of the cache to create.
     * @return the named cache, creating it as necessary.
     */
    @SuppressWarnings("unchecked")
    public static synchronized <T extends Cache> T createCache(String name) {
        T existing = (T) caches.get(name);
        if (existing == null) {
            T created = (T) cacheFactoryStrategy.createCache(name);
            return wrapCache(created, name);
        }
        return existing;
    }

332 333 334 335 336 337 338 339 340 341 342 343
    /**
     * Unregisters the cache with the given name and, if one was registered, hands it to the
     * current strategy for destruction.
     *
     * @param name the name of the cache to destroy.
     */
    public static void destroyCache(String name) {
        Cache removed = caches.remove(name);
        if (removed == null) {
            return;
        }
        cacheFactoryStrategy.destroyCache(removed);
    }

344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
    /**
     * Returns an existing {@link java.util.concurrent.locks.Lock} on the specified key or creates a new one
     * if none was found. This operation is thread safe. Successive calls with the same key may or may not
     * return the same {@link java.util.concurrent.locks.Lock}. However, different threads asking for the
     * same Lock at the same time will get the same Lock object.<p>
     *
     * The supplied cache may or may not be used depending whether the server is running on cluster mode
     * or not. When not running as part of a cluster then the lock will be unrelated to the cache and will
     * only be visible in this JVM.
     *
     * @param key the object that defines the visibility or scope of the lock.
     * @param cache the cache used for holding the lock.
     * @return an existing lock on the specified key or creates a new one if none was found.
     */
    public static Lock getLock(Object key, Cache cache) {
        return cacheFactoryStrategy.getLock(key, cache);
360 361 362 363 364 365 366 367 368 369 370
    }

    /**
     * Wraps the given cache in a CacheWrapper, names the wrapper and registers it in the
     * cache map under that name.
     *
     * @param cache the cache to wrap.
     * @param name the name to give the wrapper and to register it under.
     * @return the registered wrapper.
     */
    @SuppressWarnings("unchecked")
    private static <T extends Cache> T wrapCache(T cache, String name) {
        T wrapped = (T) new CacheWrapper(cache);
        wrapped.setName(name);
        caches.put(name, wrapped);
        return wrapped;
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
371
    /**
Gaston Dombiak's avatar
Gaston Dombiak committed
372 373 374 375
     * Returns true if clustering is installed and can be used by this JVM
     * to join a cluster. A false value could mean that either clustering
     * support is not available or the license does not allow to have more
     * than 1 cluster node.
Gaston Dombiak's avatar
Gaston Dombiak committed
376 377 378 379 380
     *
     * @return true if clustering is installed and can be used by
     * this JVM to join a cluster.
     */
    public static boolean isClusteringAvailable() {
Gaston Dombiak's avatar
Gaston Dombiak committed
381
        return getMaxClusterNodes() > 1;
Gaston Dombiak's avatar
Gaston Dombiak committed
382 383 384 385 386 387 388 389 390 391 392 393
    }

    /**
     * Returns true if clustering is currently being started. Once the cluster
     * has started, or has failed to start, this value will be false.
     *
     * @return true if clustering is currently being started.
     */
    public static boolean isClusteringStarting() {
        return clusteringStarting;
    }

394 395 396 397 398 399
    /**
     * Returns true if this node is currently a member of a cluster. The last step of application
     * initialization is to join a cluster, so this method returns false during most of application startup.
     *
     * @return true if this node is currently a member of a cluster.
     */
400 401
    public static boolean isClusteringStarted() {
        return clusteringStarted;
402 403 404
    }

    /**
405 406
     * Returns a byte[] that uniquely identifies this member within the cluster or <tt>null</tt>
     * when not in a cluster.
407
     *
408
     * @return a byte[] that uniquely identifies this member within the cluster or null when not in a cluster.
409
     */
410
    public static byte[] getClusterMemberID() {
411 412 413 414 415 416 417 418 419 420
        return cacheFactoryStrategy.getClusterMemberID();
    }

    /**
     * Clears the content of every cache registered with this factory. The caches themselves
     * stay registered.
     */
    public static synchronized void clearCaches() {
        // Iterate the values directly instead of looking each key up again: destroyCache() is
        // not synchronized, so a key seen by the iterator could be gone by the time it is
        // fetched, and the null result would cause a NullPointerException.
        for (Cache cache : caches.values()) {
            cache.clear();
        }
    }

421 422 423 424 425 426 427 428 429 430
    /**
     * Returns the byte[] that the current strategy reports as the senior cluster member's
     * identifier. The strategy is expected to return <tt>null</tt> when not in a cluster.
     *
     * @return a byte[] that uniquely identifies this senior cluster member or null when not in a cluster.
     */
    public static byte[] getSeniorClusterMemberID() {
        byte[] seniorID = cacheFactoryStrategy.getSeniorClusterMemberID();
        return seniorID;
    }

431 432 433 434 435 436 437 438 439 440 441
    /**
     * Asks the current strategy whether this member is the senior member in the cluster.
     * The strategy is expected to answer true when clustering is not enabled as well, which
     * makes this test useful for tasks that should only run on a single member.
     *
     * @return true if this cluster member is the senior or if clustering is not enabled.
     */
    public static boolean isSeniorClusterMember() {
        boolean senior = cacheFactoryStrategy.isSeniorClusterMember();
        return senior;
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
442 443 444 445 446 447 448 449 450 451 452
    /**
     * Returns the basic information the current strategy holds about the members of the
     * cluster. The strategy is expected to return an empty collection when not running in a
     * cluster.
     *
     * @return information about the current members of the cluster or an empty
     *         collection if not running in a cluster.
     */
    public static Collection<ClusterNodeInfo> getClusterNodesInfo() {
        Collection<ClusterNodeInfo> nodes = cacheFactoryStrategy.getClusterNodesInfo();
        return nodes;
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
453
    /**
Gaston Dombiak's avatar
Gaston Dombiak committed
454
     * Returns the maximum number of cluster members allowed. A value of 0 or 1 will
Gaston Dombiak's avatar
Gaston Dombiak committed
455 456
     * be returned when clustering is not allowed.
     *
Gaston Dombiak's avatar
Gaston Dombiak committed
457
     * @return the maximum number of cluster members allowed or 0 or 1 if clustering is not allowed.
Gaston Dombiak's avatar
Gaston Dombiak committed
458 459
     */
    public static int getMaxClusterNodes() {
Gaston Dombiak's avatar
Gaston Dombiak committed
460 461 462
        try {
            CacheFactoryStrategy cacheFactory = (CacheFactoryStrategy) Class.forName(
                    clusteredCacheFactoryClass, true,
463
                    getClusteredCacheStrategyClassLoader()).newInstance();
Gaston Dombiak's avatar
Gaston Dombiak committed
464
            return cacheFactory.getMaxClusterNodes();
465 466
        } catch (ClassNotFoundException e) {
            // Do nothing
Gaston Dombiak's avatar
Gaston Dombiak committed
467 468
        } catch (Exception e) {
            Log.error("Error instantiating clustered cache factory", e);
Gaston Dombiak's avatar
Gaston Dombiak committed
469 470
        }
        return 0;
Gaston Dombiak's avatar
Gaston Dombiak committed
471
    }
472 473 474 475 476 477 478 479
    /**
     * Invokes a task on other cluster members in an asynchronous fashion. The task will not be
     * executed on the local cluster member. If clustering is not enabled, this method
     * will do nothing.
     *
     * @param task the task to be invoked on all other cluster members.
     */
    public static void doClusterTask(final ClusterTask task) {
480
        cacheFactoryStrategy.doClusterTask(task);
481 482
    }

483 484 485 486 487 488
    /**
     * Invokes a task on a given cluster member in an asynchronous fashion. If clustering is not enabled,
     * this method will do nothing.
     *
     * @param task the task to be invoked on the specified cluster member.
     * @param nodeID the byte array that identifies the target cluster member.
489
     * @throws IllegalStateException if requested node was not found or not running in a cluster. 
490
     */
491 492
    public static void doClusterTask(final ClusterTask task, byte[] nodeID) {
        cacheFactoryStrategy.doClusterTask(task, nodeID);
493 494
    }

495 496 497 498 499 500 501 502 503 504 505
    /**
     * Invokes a task on other cluster members synchronously and returns the result as a Collection
     * (method will not return until the task has been executed on each cluster member).
     * The task will not be executed on the local cluster member. If clustering is not enabled,
     * this method will return an empty collection.
     *
     * @param task               the ClusterTask object to be invoked on all other cluster members.
     * @param includeLocalMember true to run the task on the local member, false otherwise
     * @return collection with the result of the execution.
     */
    public static Collection<Object> doSynchronousClusterTask(ClusterTask task, boolean includeLocalMember) {
506
        return cacheFactoryStrategy.doSynchronousClusterTask(task, includeLocalMember);
507 508
    }

509 510 511 512 513 514 515
    /**
     * Hands the task to the current strategy for synchronous execution on one specific cluster
     * member and returns the result of that remote operation.
     *
     * @param task        the ClusterTask object to be invoked on a given cluster member.
     * @param nodeID      the byte array that identifies the target cluster member.
     * @return result of remote operation or null if operation failed or operation returned null.
     * @throws IllegalStateException if requested node was not found or not running in a cluster.
     */
    public static Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) {
        Object result = cacheFactoryStrategy.doSynchronousClusterTask(task, nodeID);
        return result;
    }
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537

    /**
     * Instantiates the local cache strategy class and installs it as the current strategy.
     *
     * @throws InitializationException if the local strategy class cannot be found or instantiated.
     */
    public static synchronized void initialize() throws InitializationException {
        final Object localStrategy;
        try {
            localStrategy = Class.forName(localCacheFactoryClass).newInstance();
        }
        catch (ClassNotFoundException e) {
            throw new InitializationException(e);
        }
        catch (InstantiationException e) {
            throw new InitializationException(e);
        }
        catch (IllegalAccessException e) {
            throw new InitializationException(e);
        }
        cacheFactoryStrategy = (CacheFactoryStrategy) localStrategy;
    }

538
    private static ClassLoader getClusteredCacheStrategyClassLoader() {
539
        PluginManager pluginManager = XMPPServer.getInstance().getPluginManager();
540 541 542 543 544
        Plugin plugin = pluginManager.getPlugin("clustering");
        if (plugin == null) {
            plugin = pluginManager.getPlugin("enterprise");
        }
        PluginClassLoader pluginLoader = pluginManager.getPluginClassloader(plugin);
545
        if (pluginLoader != null) {
546
            return pluginLoader;
547 548
        }
        else {
549
            Log.debug("CacheFactory - Unable to find a Plugin that provides clustering support.");
550 551 552 553
            return Thread.currentThread().getContextClassLoader();
        }
    }

554
    public static void startClustering() {
555
        clusteringStarted = false;
Gaston Dombiak's avatar
Gaston Dombiak committed
556
        clusteringStarting = true;
557
        try {
558
            cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(clusteredCacheFactoryClass, true,
559
                    getClusteredCacheStrategyClassLoader())
560
                    .newInstance();
561
            clusteringStarted = cacheFactoryStrategy.startCluster();
562 563 564 565
        }
        catch (Exception e) {
            Log.error("Unable to start clustering - continuing in local mode", e);
        }
566
        if (!clusteringStarted) {
567 568 569 570 571 572 573
            // Revert to local cache factory if cluster fails to start
            try {
                cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(localCacheFactoryClass).newInstance();
            } catch (Exception e) {
                Log.error("Fatal error - Failed to join the cluster and failed to use local cache", e);
            }
        }
574 575 576 577 578 579 580 581 582 583 584 585 586 587
        else {
            if (statsThread == null) {
                // Start a timing thread with 1 second of accuracy.
                statsThread = new Thread("Cache Stats") {
                    private volatile boolean destroyed = false;

                    public void run() {
                        XMPPServer.getInstance().addServerListener(new XMPPServerListener() {
                            public void serverStarted() {}

                            public void serverStopping() {
                                destroyed = true;
                            }
                        });
Gaston Dombiak's avatar
Gaston Dombiak committed
588 589 590 591 592 593 594 595 596 597 598 599 600 601
                        ClusterManager.addListener(new ClusterEventListener() {
                            public void joinedCluster() {}

                            public void joinedCluster(byte[] nodeID) {}

                            public void leftCluster() {
                                destroyed = true;
                                ClusterManager.removeListener(this);
                            }

                            public void leftCluster(byte[] nodeID) {}

                            public void markedAsSeniorClusterMember() {}
                        });
602 603

                        // Run the timer indefinitely.
Gaston Dombiak's avatar
Gaston Dombiak committed
604
                        while (!destroyed && ClusterManager.isClusteringEnabled()) {
605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620
                            // Publish cache stats for this cluster node (assuming clustering is
                            // enabled and there are stats to publish).
                            try {
                                cacheFactoryStrategy.updateCacheStats(caches);
                            }
                            catch (Exception e) {
                                Log.error(e);
                            }
                            try {
                                // Sleep 10 seconds.
                                sleep(10000);
                            }
                            catch (InterruptedException ie) {
                                // Ignore.
                            }
                        }
Gaston Dombiak's avatar
Gaston Dombiak committed
621
                        statsThread = null;
622 623 624 625 626 627 628
                        Log.debug("Cache stats thread terminated.");
                    }
                };
                statsThread.setDaemon(true);
                statsThread.start();
            }
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
629
        clusteringStarting = false;
630 631
    }

632
    public static void stopClustering() {
633
        try {
Gaston Dombiak's avatar
Gaston Dombiak committed
634 635 636
            // Stop the cluster
            cacheFactoryStrategy.stopCluster();
            // Set the strategy to local
637 638 639
            cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(localCacheFactoryClass)
                    .newInstance();

640
            clusteringStarted = false;
641 642 643 644 645 646
        }
        catch (Exception e) {
            Log.error("Unable to stop clustering - continuing in clustered mode", e);
        }
    }

647
    /**
648
     * Notification message indicating that this JVM has joined a cluster.
649
     */
650 651 652 653 654 655 656
    public static void joinedCluster() {
        // Loop through local caches and switch them to clustered cache (migrate content)
        for (Cache cache : getAllCaches()) {
            CacheWrapper cacheWrapper = ((CacheWrapper) cache);
            Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
            cacheWrapper.setWrappedCache(clusteredCache);
        }
657 658
    }

659 660 661 662 663
    /**
     * Notification message indicating that this JVM has left the cluster.
     */
    public static void leftCluster() {
        // Loop through clustered caches and change them to local caches (migrate content)
Gaston Dombiak's avatar
Gaston Dombiak committed
664
        try {
Gaston Dombiak's avatar
Gaston Dombiak committed
665
            cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(localCacheFactoryClass).newInstance();
Gaston Dombiak's avatar
Gaston Dombiak committed
666 667 668

            for (Cache cache : getAllCaches()) {
                CacheWrapper cacheWrapper = ((CacheWrapper) cache);
Gaston Dombiak's avatar
Gaston Dombiak committed
669
                Cache standaloneCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
Gaston Dombiak's avatar
Gaston Dombiak committed
670 671 672 673
                cacheWrapper.setWrappedCache(standaloneCache);
            }
        } catch (Exception e) {
            Log.error("Error reverting caches to local caches", e);
674 675
        }
    }
676
}