Commit d23bafdf authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Small refactoring work. CacheFactory is about caches and ClusterManager about cluster.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@8730 b35dd754-fafc-0310-a699-88a17e54d16e
parent 0b7353ac
......@@ -13,6 +13,7 @@ package org.jivesoftware.openfire.cluster;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.JiveProperties;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LocalLockFactory;
......@@ -242,7 +243,7 @@ public class ClusterManager {
* {@link org.jivesoftware.openfire.RoutingTable#setRemotePacketRouter(org.jivesoftware.openfire.RemotePacketRouter)}
* need to be properly configured.
*/
public static void startup() {
public static synchronized void startup() {
if (isClusteringStarted()) {
return;
}
......@@ -255,7 +256,7 @@ public class ClusterManager {
throw new IllegalStateException("No RemotePacketRouter was found.");
}
// Start up the cluster and reset caches
CacheFactory.startup();
CacheFactory.startClustering();
}
}
......@@ -272,20 +273,34 @@ public class ClusterManager {
// Reset packet router to use to deliver packets to remote cluster nodes
XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);
if (isClusteringStarted()) {
CacheFactory.shutdown();
Log.debug("Shutting down clustered cache service.");
CacheFactory.stopClustering();
}
// Reset the session locator to use
XMPPServer.getInstance().setRemoteSessionLocator(null);
}
/**
* Sets true if clustering support is enabled. This does not mean
* that clustering has started or that clustering will be able to start.
* Sets true if clustering support is enabled. An attempt to start or join
* an existing cluster will be attempted in the service was enabled. On the
* other hand, if disabled then this JVM will leave or stop the cluster.
*
* @param enabled if clustering support is enabled.
*/
public static void setClusteringEnabled(boolean enabled) {
if (enabled == isClusteringEnabled()) {
return;
}
JiveGlobals.setXMLProperty(CLUSTER_PROPERTY_NAME, Boolean.toString(enabled));
if (!enabled) {
CacheFactory.stopClustering();
}
else {
// Reload Jive properties. This will ensure that this nodes copy of the
// properties starts correct.
JiveProperties.getInstance().init();
CacheFactory.startClustering();
}
}
/**
......
......@@ -9,13 +9,11 @@ package org.jivesoftware.util.cache;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.XMPPServerListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.container.Plugin;
import org.jivesoftware.openfire.container.PluginClassLoader;
import org.jivesoftware.openfire.container.PluginManager;
import org.jivesoftware.util.InitializationException;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.JiveProperties;
import org.jivesoftware.util.Log;
import java.util.ArrayList;
......@@ -49,6 +47,7 @@ public class CacheFactory {
private static String localCacheFactoryClass;
private static String clusteredCacheFactoryClass;
private static CacheFactoryStrategy cacheFactoryStrategy;
private static Thread statsThread;
static {
localCacheFactoryClass = JiveGlobals.getProperty(LOCAL_CACHE_PROPERTY_NAME,
......@@ -128,29 +127,6 @@ public class CacheFactory {
return cacheFactoryStrategy.getClusterMemberID();
}
/**
* Sets whether cache clustering should be enabled. Anytime this value is
* changed, the application server must be restarted
*
* @param enabled true if cache clustering should be enabled.
* @throws Exception if an error occurs while using the new cache type.
*/
public static synchronized void setClusteringEnabled(boolean enabled) throws Exception {
if (enabled == clusteringStarted) {
return;
}
ClusterManager.setClusteringEnabled(enabled);
if (!enabled) {
stopClustering();
}
else {
// Reload Jive properties. This will ensure that this nodes copy of the
// properties starts correct.
JiveProperties.getInstance().init();
startClustering();
}
}
public synchronized static void clearCaches() {
for (String cacheName : caches.keySet()) {
Cache cache = caches.get(cacheName);
......@@ -219,29 +195,6 @@ public class CacheFactory {
return cacheFactoryStrategy.doSynchronousClusterTask(task, nodeID);
}
/**
* Shuts down the clustering service. This method should be called when the Jive
* system is shutting down, and must not be called otherwise. By default, a
* ServletContextListener is registered to listen for the web application shutting down, and
* will automatically call this method. However, if the Jive system is being used in
* another context, such as a command-line application, this method should be called
* explicitly. Failing to call this method may temporarily impact cluster performance,
* as the system will have to do extra work to recover from a non-clean shutdown.
* If clustering is not enabled, this method will do nothing.
*/
public static synchronized void shutdown() {
if (!clusteringStarted) {
return;
}
// See if clustering should be enabled.
boolean enabled = ClusterManager.isClusteringEnabled();
if (enabled) {
Log.debug("Shutting down clustered cache service.");
stopClustering();
}
}
public static synchronized void initialize() throws InitializationException {
try {
cacheFactoryStrategy = (CacheFactoryStrategy) Class
......@@ -258,64 +211,6 @@ public class CacheFactory {
}
}
/**
* Starts the cluster service if clustering is enabled, and begins tracking cache statistics. Before this method is called,
* any {@link Cache}s returned by calls to {@link #createCache} will return local caches. The process of starting clustering
* will recreate them as distributed caches. This is safer than the alternative - where clustering is started before
* any caches are created. In that scenario, cluster tasks can fire off in this process before it is safe for them to do so,
* and cluster wide deadlocks can occur.
*/
public static synchronized void startup() {
if (clusteringStarted) {
return;
}
// See if clustering should be enabled.
boolean enabled = ClusterManager.isClusteringEnabled();
// If the user tried to turn on clustering, make sure they're actually allowed to.
if (enabled) {
startClustering();
}
// Start a timing thread with 1 second of accuracy.
Thread t = new Thread("Cache Stats") {
private volatile boolean destroyed = false;
public void run() {
XMPPServer.getInstance().addServerListener(new XMPPServerListener() {
public void serverStarted() {}
public void serverStopping() {
destroyed = true;
}
});
// Run the timer indefinitely.
while (!destroyed) {
// Publish cache stats for this cluster node (assuming clustering is
// enabled and there are stats to publish).
try {
cacheFactoryStrategy.updateCacheStats(caches);
}
catch (Exception e) {
Log.error(e);
}
try {
// Sleep 10 seconds.
sleep(10000);
}
catch (InterruptedException ie) {
// Ignore.
}
}
Log.debug("Cache stats thread terminated.");
}
};
t.setDaemon(true);
t.start();
}
private static ClassLoader getClusteredCacheStrategyClassLoader(String pluginName) {
PluginManager pluginManager = XMPPServer.getInstance().getPluginManager();
Plugin enterprisePlugin = pluginManager.getPlugin(pluginName);
......@@ -329,7 +224,7 @@ public class CacheFactory {
}
}
private static void startClustering() {
public static void startClustering() {
clusteringStarted = false;
try {
cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(clusteredCacheFactoryClass, true,
......@@ -348,9 +243,49 @@ public class CacheFactory {
Log.error("Fatal error - Failed to join the cluster and failed to use local cache", e);
}
}
else {
if (statsThread == null) {
// Start a timing thread with 1 second of accuracy.
statsThread = new Thread("Cache Stats") {
private volatile boolean destroyed = false;
public void run() {
XMPPServer.getInstance().addServerListener(new XMPPServerListener() {
public void serverStarted() {}
public void serverStopping() {
destroyed = true;
}
});
// Run the timer indefinitely.
while (!destroyed) {
// Publish cache stats for this cluster node (assuming clustering is
// enabled and there are stats to publish).
try {
cacheFactoryStrategy.updateCacheStats(caches);
}
catch (Exception e) {
Log.error(e);
}
try {
// Sleep 10 seconds.
sleep(10000);
}
catch (InterruptedException ie) {
// Ignore.
}
}
Log.debug("Cache stats thread terminated.");
}
};
statsThread.setDaemon(true);
statsThread.start();
}
}
}
private static void stopClustering() {
public static void stopClustering() {
try {
// Stop the cluster
cacheFactoryStrategy.stopCluster();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment