Commit 7bfc8c62 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Same component service can now be offered by many cluster nodes.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@8429 b35dd754-fafc-0310-a699-88a17e54d16e
parent 8c9d8d79
......@@ -91,9 +91,12 @@ public interface RoutingTable {
void addServerRoute(JID route, LocalOutgoingServerSession destination);
/**
* Adds a route to the routing table for the specified internal or external component. When
* running inside of a cluster this message <tt>must</tt> be sent from the cluster node
* that is actually hosting the component.
* Adds a route to the routing table for the specified internal or external component. <p>
*
* When running inside of a cluster this message <tt>must</tt> be sent from the cluster
* node that is actually hosting the component. The component may be available in all
* or some of cluster nodes. The routing table will keep track of all nodes hosting
* the component.
*
* @param route the address associated to the route.
* @param destination the component.
......@@ -126,7 +129,8 @@ public interface RoutingTable {
* Packets routed to components will only be sent if the internal or external
* component is connected to the server. Moreover, when runing inside of a cluster
* the node that is hosting the component will be requested to deliver the requested
* packet.<p>
* packet. It will be first checked if the component is available in this JVM and if not
* then the first cluster node found hosting the component will be used.<p>
*
* Packets routed to users will be delivered if the user is connected to the server. Depending
* on the packet type and the sender of the packet only available or all user sessions could
......
......@@ -59,9 +59,6 @@ public class LocalClientSession extends LocalSession implements ClientSession {
*/
private static Map<String,String> allowedIPs = new HashMap<String,String>();
private static Connection.TLSPolicy tlsPolicy;
private static Connection.CompressionPolicy compressionPolicy;
/**
* The authentication token for this session.
*/
......@@ -109,15 +106,6 @@ public class LocalClientSession extends LocalSession implements ClientSession {
String address = tokens.nextToken().trim();
allowedIPs.put(address, "");
}
// Set the TLS policy stored as a system property
String policyName = JiveGlobals.getProperty("xmpp.client.tls.policy",
Connection.TLSPolicy.optional.toString());
tlsPolicy = Connection.TLSPolicy.valueOf(policyName);
// Set the Compression policy stored as a system property
policyName = JiveGlobals.getProperty("xmpp.client.compression.policy",
Connection.CompressionPolicy.optional.toString());
compressionPolicy = Connection.CompressionPolicy.valueOf(policyName);
}
/**
......@@ -246,6 +234,7 @@ public class LocalClientSession extends LocalSession implements ClientSession {
catch (Exception e) {
Log.error(e);
}
Connection.TLSPolicy tlsPolicy = getTLSPolicy();
if (Connection.TLSPolicy.required == tlsPolicy && !hasCertificates) {
Log.error("Client session rejected. TLS is required but no certificates " +
"were created.");
......@@ -259,7 +248,7 @@ public class LocalClientSession extends LocalSession implements ClientSession {
}
// Indicate the compression policy to use for this connection
connection.setCompressionPolicy(compressionPolicy);
connection.setCompressionPolicy(getCompressionPolicy());
// Create a ClientSession for this user.
LocalClientSession session = SessionManager.getInstance().createClientSession(connection);
......@@ -353,6 +342,15 @@ public class LocalClientSession extends LocalSession implements ClientSession {
* @return whether TLS is mandatory, optional or is disabled.
*/
public static SocketConnection.TLSPolicy getTLSPolicy() {
// Set the TLS policy stored as a system property
String policyName = JiveGlobals.getProperty("xmpp.client.tls.policy", Connection.TLSPolicy.optional.toString());
SocketConnection.TLSPolicy tlsPolicy;
try {
tlsPolicy = Connection.TLSPolicy.valueOf(policyName);
} catch (IllegalArgumentException e) {
Log.error("Error parsing xmpp.client.tls.policy: " + policyName, e);
tlsPolicy = Connection.TLSPolicy.optional;
}
return tlsPolicy;
}
......@@ -366,8 +364,7 @@ public class LocalClientSession extends LocalSession implements ClientSession {
* @param policy whether TLS is mandatory, optional or is disabled.
*/
public static void setTLSPolicy(SocketConnection.TLSPolicy policy) {
tlsPolicy = policy;
JiveGlobals.setProperty("xmpp.client.tls.policy", tlsPolicy.toString());
JiveGlobals.setProperty("xmpp.client.tls.policy", policy.toString());
}
/**
......@@ -376,6 +373,16 @@ public class LocalClientSession extends LocalSession implements ClientSession {
* @return whether compression is optional or is disabled.
*/
public static SocketConnection.CompressionPolicy getCompressionPolicy() {
// Set the Compression policy stored as a system property
String policyName = JiveGlobals
.getProperty("xmpp.client.compression.policy", Connection.CompressionPolicy.optional.toString());
SocketConnection.CompressionPolicy compressionPolicy;
try {
compressionPolicy = Connection.CompressionPolicy.valueOf(policyName);
} catch (IllegalArgumentException e) {
Log.error("Error parsing xmpp.client.compression.policy: " + policyName, e);
compressionPolicy = Connection.CompressionPolicy.optional;
}
return compressionPolicy;
}
......@@ -385,8 +392,7 @@ public class LocalClientSession extends LocalSession implements ClientSession {
* @param policy whether compression is optional or is disabled.
*/
public static void setCompressionPolicy(SocketConnection.CompressionPolicy policy) {
compressionPolicy = policy;
JiveGlobals.setProperty("xmpp.client.compression.policy", compressionPolicy.toString());
JiveGlobals.setProperty("xmpp.client.compression.policy", policy.toString());
}
/**
......
......@@ -53,7 +53,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
* Cache (unlimited, never expire) that holds sessions of external components connected to the server.
* Key: component domain, Value: nodeID
*/
private Cache<String, byte[]> componentsCache;
private Cache<String, Set<byte[]>> componentsCache;
/**
* Cache (unlimited, never expire) that holds sessions of user that have authenticated with the server.
* Key: full JID, Value: {nodeID, available/unavailable}
......@@ -98,7 +98,18 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
public void addComponentRoute(JID route, RoutableChannelHandler destination) {
String address = destination.getAddress().getDomain();
localRoutingTable.addRoute(address, destination);
componentsCache.put(address, server.getNodeID());
Lock lock = LockManager.getLock(address + "rt");
try {
lock.lock();
Set<byte[]> nodes = componentsCache.get(address);
if (nodes == null) {
nodes = new HashSet<byte[]>();
}
nodes.add(server.getNodeID());
componentsCache.put(address, nodes);
} finally {
lock.unlock();
}
}
public void addClientRoute(JID route, LocalClientSession destination) {
......@@ -213,13 +224,28 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
}
else if (jid.getDomain().contains(serverName)) {
// Packet sent to component hosted in this server
byte[] nodeID = componentsCache.get(jid.getDomain());
if (nodeID != null) {
// First check if the component is being hosted in this JVM
RoutableChannelHandler route = localRoutingTable.getRoute(jid.getDomain());
if (route != null) {
try {
route.process(packet);
routed = true;
} catch (UnauthorizedException e) {
Log.error(e);
}
}
else {
// Check if other cluster nodes are hosting this component
Set<byte[]> nodes = componentsCache.get(jid.getDomain());
if (nodes != null) {
for (byte[] nodeID : nodes) {
if (Arrays.equals(nodeID, server.getNodeID())) {
// This is a route to a local component hosted in this node
// This is a route to a local component hosted in this node (route
// could have been added after our previous check)
try {
localRoutingTable.getRoute(jid.getDomain()).process(packet);
routed = true;
break;
} catch (UnauthorizedException e) {
Log.error(e);
}
......@@ -228,6 +254,11 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
// This is a route to a local component hosted in other node
if (remotePacketRouter != null) {
routed = remotePacketRouter.routePacket(nodeID, jid, packet);
if (routed) {
break;
}
}
}
}
}
}
......@@ -526,8 +557,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
}
else if (route.getDomain().contains(serverName)) {
// Packet sent to component hosted in this server
byte[] nodeID = componentsCache.get(route.getDomain());
if (nodeID != null) {
if (componentsCache.containsKey(route.getDomain())) {
jids.add(route);
}
}
......@@ -583,7 +613,23 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
public boolean removeComponentRoute(JID route) {
String address = route.getDomain();
boolean removed = componentsCache.remove(address) != null;
boolean removed = false;
Lock lock = LockManager.getLock(address + "rt");
try {
lock.lock();
Set<byte[]> nodes = componentsCache.get(address);
if (nodes != null) {
removed = nodes.remove(server.getNodeID());
if (nodes.isEmpty()) {
componentsCache.remove(address);
}
else {
componentsCache.put(address, nodes);
}
}
} finally {
lock.unlock();
}
localRoutingTable.removeRoute(address);
return removed;
}
......
......@@ -37,7 +37,7 @@ public class CacheFactory {
public static String LOCAL_CACHE_PROPERTY_NAME = "cache.clustering.local.class";
public static String CLUSTERED_CACHE_PROPERTY_NAME = "cache.clustering.clustered.class";
private static boolean clusteringEnabled = false;
private static boolean clusteringStarted = false;
/**
* Storage for all caches that get created.
......@@ -117,8 +117,8 @@ public class CacheFactory {
*
* @return true if this node is currently a member of a cluster.
*/
public static boolean isClusteringEnabled() {
return clusteringEnabled;
public static boolean isClusteringStarted() {
return clusteringStarted;
}
/**
......@@ -148,7 +148,7 @@ public class CacheFactory {
* @throws Exception if an error occurs while using the new cache type.
*/
public static synchronized void setClusteringEnabled(boolean enabled) throws Exception {
if (enabled == clusteringEnabled) {
if (enabled == clusteringStarted) {
return;
}
JiveGlobals.setXMLProperty(CLUSTER_PROPERTY_NAME, String.valueOf(enabled));
......@@ -179,7 +179,7 @@ public class CacheFactory {
*/
public static boolean isSeniorClusterMember() {
synchronized(CacheFactory.class) {
if (!isClusteringEnabled()) {
if (!isClusteringStarted()) {
return true;
}
}
......@@ -194,11 +194,11 @@ public class CacheFactory {
* @param task the task to be invoked on all other cluster members.
*/
public static void doClusterTask(final ClusterTask task) {
if (!clusteringEnabled) {
if (!clusteringStarted) {
return;
}
synchronized(CacheFactory.class) {
if (!clusteringEnabled) {
if (!clusteringStarted) {
return;
}
}
......@@ -215,11 +215,11 @@ public class CacheFactory {
* @return false if not in a cluster or specified cluster node was not found.
*/
public static boolean doClusterTask(final ClusterTask task, byte[] nodeID) {
if (!clusteringEnabled) {
if (!clusteringStarted) {
return false;
}
synchronized(CacheFactory.class) {
if (!clusteringEnabled) {
if (!clusteringStarted) {
return false;
}
}
......@@ -239,7 +239,7 @@ public class CacheFactory {
*/
public static Collection<Object> doSynchronousClusterTask(ClusterTask task, boolean includeLocalMember) {
synchronized(CacheFactory.class) {
if (!clusteringEnabled) {
if (!clusteringStarted) {
return Collections.emptyList();
}
}
......@@ -257,7 +257,7 @@ public class CacheFactory {
*/
public static Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) {
synchronized(CacheFactory.class) {
if (!clusteringEnabled) {
if (!clusteringStarted) {
return null;
}
}
......@@ -276,7 +276,7 @@ public class CacheFactory {
* If clustering is not enabled, this method will do nothing.
*/
public static synchronized void shutdown() {
if (!clusteringEnabled) {
if (!clusteringStarted) {
return;
}
// See if clustering should be enabled.
......@@ -333,7 +333,7 @@ public class CacheFactory {
*/
public static synchronized void startup() {
if (clusteringEnabled) {
if (clusteringStarted) {
return;
}
// See if clustering should be enabled.
......@@ -396,7 +396,7 @@ public class CacheFactory {
}
private static void startClustering() {
clusteringEnabled = false;
clusteringStarted = false;
boolean clusterStarted = false;
try {
cacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(clusteredCacheFactoryClass, true,
......@@ -410,7 +410,7 @@ public class CacheFactory {
wrapper.setWrappedCache(cacheFactoryStrategy.createCache(cacheName));
}
clusteringEnabled = true;
clusteringStarted = true;
// Set the ID of this cluster node
XMPPServer.getInstance().setNodeID(CacheFactory.getClusterMemberID());
// Fire event that cluster has been started
......@@ -442,7 +442,7 @@ public class CacheFactory {
wrapper.setWrappedCache(cacheFactoryStrategy.createCache(cacheName));
}
clusteringEnabled = false;
clusteringStarted = false;
// Reset the node ID
XMPPServer.getInstance().setNodeID(null);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment