Commit a72ea6a3 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Fixed distributed locking problem when running in a cluster. ENT-425. Reviewed by Daniel.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/branches@10064 b35dd754-fafc-0310-a699-88a17e54d16e
parent e2ed3b9e
...@@ -30,7 +30,6 @@ import org.jivesoftware.util.LocaleUtils; ...@@ -30,7 +30,6 @@ import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Message; import org.xmpp.packet.Message;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
...@@ -394,7 +393,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener ...@@ -394,7 +393,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
// Keep track of the nodeID hosting the incoming server session // Keep track of the nodeID hosting the incoming server session
incomingServerSessionsCache.put(streamID, server.getNodeID().toByteArray()); incomingServerSessionsCache.put(streamID, server.getNodeID().toByteArray());
// Update list of sockets/sessions coming from the same remote hostname // Update list of sockets/sessions coming from the same remote hostname
Lock lock = LockManager.getLock(hostname); Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
try { try {
lock.lock(); lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname); List<String> streamIDs = hostnameSessionsCache.get(hostname);
...@@ -408,7 +407,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener ...@@ -408,7 +407,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
lock.unlock(); lock.unlock();
} }
// Add to clustered cache // Add to clustered cache
lock = LockManager.getLock(streamID); lock = CacheFactory.getLock(streamID, validatedDomainsCache);
try { try {
lock.lock(); lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID); Set<String> validatedDomains = validatedDomainsCache.get(streamID);
...@@ -438,7 +437,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener ...@@ -438,7 +437,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
incomingServerSessionsCache.remove(streamID); incomingServerSessionsCache.remove(streamID);
// Remove from list of sockets/sessions coming from the remote hostname // Remove from list of sockets/sessions coming from the remote hostname
Lock lock = LockManager.getLock(hostname); Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
try { try {
lock.lock(); lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname); List<String> streamIDs = hostnameSessionsCache.get(hostname);
...@@ -456,7 +455,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener ...@@ -456,7 +455,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
lock.unlock(); lock.unlock();
} }
// Remove from clustered cache // Remove from clustered cache
lock = LockManager.getLock(streamID); lock = CacheFactory.getLock(streamID, validatedDomainsCache);
try { try {
lock.lock(); lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID); Set<String> validatedDomains = validatedDomainsCache.get(streamID);
...@@ -488,7 +487,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener ...@@ -488,7 +487,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
* @return domains, subdomains and virtual hosts that where validated. * @return domains, subdomains and virtual hosts that where validated.
*/ */
public Collection<String> getValidatedDomains(String streamID) { public Collection<String> getValidatedDomains(String streamID) {
Lock lock = LockManager.getLock(streamID); Lock lock = CacheFactory.getLock(streamID, validatedDomainsCache);
try { try {
lock.lock(); lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID); Set<String> validatedDomains = validatedDomainsCache.get(streamID);
...@@ -763,7 +762,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener ...@@ -763,7 +762,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
public List<IncomingServerSession> getIncomingServerSessions(String hostname) { public List<IncomingServerSession> getIncomingServerSessions(String hostname) {
List<String> streamIDs; List<String> streamIDs;
// Get list of sockets/sessions coming from the remote hostname // Get list of sockets/sessions coming from the remote hostname
Lock lock = LockManager.getLock(hostname); Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
try { try {
lock.lock(); lock.lock();
streamIDs = hostnameSessionsCache.get(hostname); streamIDs = hostnameSessionsCache.get(hostname);
...@@ -1437,7 +1436,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener ...@@ -1437,7 +1436,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
incomingServerSessionsCache.put(streamID, server.getNodeID().toByteArray()); incomingServerSessionsCache.put(streamID, server.getNodeID().toByteArray());
for (String hostname : session.getValidatedDomains()) { for (String hostname : session.getValidatedDomains()) {
// Update list of sockets/sessions coming from the same remote hostname // Update list of sockets/sessions coming from the same remote hostname
Lock lock = LockManager.getLock(hostname); Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
try { try {
lock.lock(); lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname); List<String> streamIDs = hostnameSessionsCache.get(hostname);
...@@ -1451,7 +1450,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener ...@@ -1451,7 +1450,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
lock.unlock(); lock.unlock();
} }
// Add to clustered cache // Add to clustered cache
lock = LockManager.getLock(streamID); lock = CacheFactory.getLock(streamID, validatedDomainsCache);
try { try {
lock.lock(); lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID); Set<String> validatedDomains = validatedDomainsCache.get(streamID);
......
...@@ -16,8 +16,6 @@ import org.jivesoftware.util.JiveGlobals; ...@@ -16,8 +16,6 @@ import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.JiveProperties; import org.jivesoftware.util.JiveProperties;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LocalLockFactory;
import org.jivesoftware.util.lock.LockManager;
import java.util.Collection; import java.util.Collection;
import java.util.Queue; import java.util.Queue;
...@@ -236,8 +234,8 @@ public class ClusterManager { ...@@ -236,8 +234,8 @@ public class ClusterManager {
/** /**
* Starts the cluster service if clustering is enabled. The process of starting clustering * Starts the cluster service if clustering is enabled. The process of starting clustering
* will recreate caches as distributed caches.<p> * will recreate caches as distributed caches.<p>
* <p/> *
* Before starting a cluster the {@link LockManager#setLockFactory(org.jivesoftware.util.lock.LockFactory)}, * Before starting a cluster the
* {@link XMPPServer#setRemoteSessionLocator(org.jivesoftware.openfire.session.RemoteSessionLocator)} and * {@link XMPPServer#setRemoteSessionLocator(org.jivesoftware.openfire.session.RemoteSessionLocator)} and
* {@link org.jivesoftware.openfire.RoutingTable#setRemotePacketRouter(org.jivesoftware.openfire.RemotePacketRouter)} * {@link org.jivesoftware.openfire.RoutingTable#setRemotePacketRouter(org.jivesoftware.openfire.RemotePacketRouter)}
* need to be properly configured. * need to be properly configured.
...@@ -267,8 +265,6 @@ public class ClusterManager { ...@@ -267,8 +265,6 @@ public class ClusterManager {
* If clustering is not enabled, this method will do nothing. * If clustering is not enabled, this method will do nothing.
*/ */
public static synchronized void shutdown() { public static synchronized void shutdown() {
// Reset the LockFactory to the default one
LockManager.setLockFactory(new LocalLockFactory());
// Reset packet router to use to deliver packets to remote cluster nodes // Reset packet router to use to deliver packets to remote cluster nodes
XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null); XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);
if (isClusteringStarted()) { if (isClusteringStarted()) {
......
...@@ -83,14 +83,30 @@ public class CacheInfo { ...@@ -83,14 +83,30 @@ public class CacheInfo {
public static enum Type { public static enum Type {
/** /**
* An optimistic scheme defines a cache which fully replicates all of its data to all cluster nodes that * Data is fully replicated to every member in the cluster. Offers the fastest read performance. Clustered,
* are running the service. This cache is good for frequent reads and not frequent writes. However, this * fault-tolerant cache with linear performance scalability for reads, but poor scalability for writes
* cache will not scale fine if it has lot of content that will end up consuming all the JVM memory. For * (as writes must be processed by every member in the cluster). Because data is replicated to all machines,
* this case a {@link #distributed} is a better option. * adding servers does not increase aggregate cache capacity.
*/
replicated("replicated"),
/**
* OptimisticCache is a clustered cache implementation similar to the ReplicatedCache implementation, but
* without any concurrency control. This implementation has the highest possible throughput. It also allows
* to use an alternative underlying store for the cached data (for example, a MRU/MFU-based cache). However,
* if two cluster members are independently pruning or purging the underlying local stores, it is possible
* that a cluster member may have a different store content than that held by another cluster member.
* This cache is good for frequent reads and not frequent writes. However, this cache will not scale fine
* if it has lot of content that will end up consuming all the JVM memory. For this case a
* {@link #distributed} is a better option.
*/ */
optimistic("optimistic"), optimistic("optimistic"),
/** /**
* An distributed-scheme defines caches where the storage for entries is partitioned across cluster nodes. * An distributed-scheme defines caches where the storage for entries is partitioned across cluster nodes.
* A hybrid cache; fronts a fault-tolerant, scalable partitioned cache with a local cache. Near cache
* invalidates front cache entries, using configurable invalidation strategy, and provides excellent
* performance and synchronization. Near cache backed by a partitioned cache offers zero-millisecond local
* access for repeat data access, while enabling concurrency and ensuring coherency and fail-over,
* effectively combining the best attributes of replicated and partitioned caches.
*/ */
distributed("near-distributed"); distributed("near-distributed");
......
...@@ -28,7 +28,6 @@ import org.jivesoftware.openfire.user.UserNotFoundException; ...@@ -28,7 +28,6 @@ import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.IQ; import org.xmpp.packet.IQ;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.PacketError; import org.xmpp.packet.PacketError;
...@@ -260,7 +259,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene ...@@ -260,7 +259,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene
*/ */
public void addServerFeature(String namespace) { public void addServerFeature(String namespace) {
if (localServerFeatures.add(namespace)) { if (localServerFeatures.add(namespace)) {
Lock lock = LockManager.getLock(namespace); Lock lock = CacheFactory.getLock(namespace, serverFeatures);
try { try {
lock.lock(); lock.lock();
Set<NodeID> nodeIDs = serverFeatures.get(namespace); Set<NodeID> nodeIDs = serverFeatures.get(namespace);
...@@ -284,7 +283,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene ...@@ -284,7 +283,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene
*/ */
public void removeServerFeature(String namespace) { public void removeServerFeature(String namespace) {
if (localServerFeatures.remove(namespace)) { if (localServerFeatures.remove(namespace)) {
Lock lock = LockManager.getLock(namespace); Lock lock = CacheFactory.getLock(namespace, serverFeatures);
try { try {
lock.lock(); lock.lock();
Set<NodeID> nodeIDs = serverFeatures.get(namespace); Set<NodeID> nodeIDs = serverFeatures.get(namespace);
...@@ -353,7 +352,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene ...@@ -353,7 +352,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene
// Remove server features added by node that is gone // Remove server features added by node that is gone
for (Map.Entry<String, Set<NodeID>> entry : serverFeatures.entrySet()) { for (Map.Entry<String, Set<NodeID>> entry : serverFeatures.entrySet()) {
String namespace = entry.getKey(); String namespace = entry.getKey();
Lock lock = LockManager.getLock(namespace); Lock lock = CacheFactory.getLock(namespace, serverFeatures);
try { try {
lock.lock(); lock.lock();
Set<NodeID> nodeIDs = entry.getValue(); Set<NodeID> nodeIDs = entry.getValue();
...@@ -379,7 +378,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene ...@@ -379,7 +378,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene
private void restoreCacheContent() { private void restoreCacheContent() {
for (String feature : localServerFeatures) { for (String feature : localServerFeatures) {
Lock lock = LockManager.getLock(feature); Lock lock = CacheFactory.getLock(feature, serverFeatures);
try { try {
lock.lock(); lock.lock();
Set<NodeID> nodeIDs = serverFeatures.get(feature); Set<NodeID> nodeIDs = serverFeatures.get(feature);
......
...@@ -32,7 +32,6 @@ import org.jivesoftware.openfire.user.UserNotFoundException; ...@@ -32,7 +32,6 @@ import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.ExternalizableUtil; import org.jivesoftware.util.cache.ExternalizableUtil;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.IQ; import org.xmpp.packet.IQ;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.PacketError; import org.xmpp.packet.PacketError;
...@@ -324,7 +323,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv ...@@ -324,7 +323,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
* @param name the discovered name of the component. * @param name the discovered name of the component.
*/ */
public void addComponentItem(String jid, String node, String name) { public void addComponentItem(String jid, String node, String name) {
Lock lock = LockManager.getLock(jid + "item"); Lock lock = CacheFactory.getLock(jid + "item", serverItems);
try { try {
lock.lock(); lock.lock();
ClusteredServerItem item = serverItems.get(jid); ClusteredServerItem item = serverItems.get(jid);
...@@ -356,7 +355,11 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv ...@@ -356,7 +355,11 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
* @param jid the jid of the component being removed. * @param jid the jid of the component being removed.
*/ */
public void removeComponentItem(String jid) { public void removeComponentItem(String jid) {
Lock lock = LockManager.getLock(jid + "item"); if (serverItems == null) {
// Safety check
return;
}
Lock lock = CacheFactory.getLock(jid + "item", serverItems);
try { try {
lock.lock(); lock.lock();
ClusteredServerItem item = serverItems.get(jid); ClusteredServerItem item = serverItems.get(jid);
...@@ -422,7 +425,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv ...@@ -422,7 +425,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
NodeID leftNode = NodeID.getInstance(nodeID); NodeID leftNode = NodeID.getInstance(nodeID);
for (Map.Entry<String, ClusteredServerItem> entry : serverItems.entrySet()) { for (Map.Entry<String, ClusteredServerItem> entry : serverItems.entrySet()) {
String jid = entry.getKey(); String jid = entry.getKey();
Lock lock = LockManager.getLock(jid + "item"); Lock lock = CacheFactory.getLock(jid + "item", serverItems);
try { try {
lock.lock(); lock.lock();
ClusteredServerItem item = entry.getValue(); ClusteredServerItem item = entry.getValue();
...@@ -450,7 +453,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv ...@@ -450,7 +453,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
for (Map.Entry<String, Element> entry : localServerItems.entrySet()) { for (Map.Entry<String, Element> entry : localServerItems.entrySet()) {
String jid = entry.getKey(); String jid = entry.getKey();
Element element = entry.getValue(); Element element = entry.getValue();
Lock lock = LockManager.getLock(jid + "item"); Lock lock = CacheFactory.getLock(jid + "item", serverItems);
try { try {
lock.lock(); lock.lock();
ClusteredServerItem item = serverItems.get(jid); ClusteredServerItem item = serverItems.get(jid);
......
...@@ -511,6 +511,11 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -511,6 +511,11 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
// we could still send directed presences to entities that when connected to a cluster // we could still send directed presences to entities that when connected to a cluster
// they will be replicated. An example would be MUC rooms. // they will be replicated. An example would be MUC rooms.
for (Map.Entry<String, Collection<DirectedPresence>> entry : localDirectedPresences.entrySet()) { for (Map.Entry<String, Collection<DirectedPresence>> entry : localDirectedPresences.entrySet()) {
if (entry.getValue().isEmpty()) {
Log.warn("PresenceUpdateHandler - Skipping empty directed presences when joining cluster for sender: " +
entry.getKey());
continue;
}
directedPresencesCache.put(entry.getKey(), entry.getValue()); directedPresencesCache.put(entry.getKey(), entry.getValue());
} }
} }
...@@ -523,6 +528,12 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -523,6 +528,12 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
if (!XMPPServer.getInstance().isShuttingDown()) { if (!XMPPServer.getInstance().isShuttingDown()) {
// Populate directedPresencesCache with local content // Populate directedPresencesCache with local content
for (Map.Entry<String, Collection<DirectedPresence>> entry : localDirectedPresences.entrySet()) { for (Map.Entry<String, Collection<DirectedPresence>> entry : localDirectedPresences.entrySet()) {
if (entry.getValue().isEmpty()) {
Log.warn(
"PresenceUpdateHandler - Skipping empty directed presences when leaving cluster for sender: " +
entry.getKey());
continue;
}
directedPresencesCache.put(entry.getKey(), entry.getValue()); directedPresencesCache.put(entry.getKey(), entry.getValue());
} }
} }
......
...@@ -15,9 +15,11 @@ import org.jivesoftware.openfire.RoutableChannelHandler; ...@@ -15,9 +15,11 @@ import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable; import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.session.LocalOutgoingServerSession; import org.jivesoftware.openfire.session.LocalOutgoingServerSession;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.lock.LockManager; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.xmpp.packet.*; import org.xmpp.packet.*;
import java.util.HashMap; import java.util.HashMap;
...@@ -55,6 +57,11 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -55,6 +57,11 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
private Map<String, PacketsProcessor> packetsProcessors = new HashMap<String, PacketsProcessor>(); private Map<String, PacketsProcessor> packetsProcessors = new HashMap<String, PacketsProcessor>();
/**
* Cache (unlimited, never expire) that holds outgoing sessions to remote servers from this server.
* Key: server domain, Value: nodeID
*/
private Cache<String, byte[]> serversCache;
/** /**
* Flag that indicates if the process that consumed the queued packets should stop. * Flag that indicates if the process that consumed the queued packets should stop.
*/ */
...@@ -67,6 +74,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -67,6 +74,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
} }
private void init() { private void init() {
serversCache = CacheFactory.createCache(RoutingTableImpl.S2S_CACHE_NAME);
routingTable = XMPPServer.getInstance().getRoutingTable(); routingTable = XMPPServer.getInstance().getRoutingTable();
// Create a pool of threads that will process queued packets. // Create a pool of threads that will process queued packets.
int maxThreads = JiveGlobals.getIntProperty("xmpp.server.outgoing.max.threads", 20); int maxThreads = JiveGlobals.getIntProperty("xmpp.server.outgoing.max.threads", 20);
...@@ -164,7 +172,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -164,7 +172,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
} }
} }
private static class PacketsProcessor implements Runnable { private class PacketsProcessor implements Runnable {
private OutgoingSessionPromise promise; private OutgoingSessionPromise promise;
private String domain; private String domain;
...@@ -199,7 +207,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -199,7 +207,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
// Create a connection to the remote server from the domain where the packet has been sent // Create a connection to the remote server from the domain where the packet has been sent
boolean created; boolean created;
// Make sure that only one cluster node is creating the outgoing connection // Make sure that only one cluster node is creating the outgoing connection
Lock lock = LockManager.getLock(domain + "oss"); Lock lock = CacheFactory.getLock(domain + "oss", serversCache);
try { try {
lock.lock(); lock.lock();
created = LocalOutgoingServerSession created = LocalOutgoingServerSession
......
...@@ -29,7 +29,6 @@ import org.jivesoftware.util.Log; ...@@ -29,7 +29,6 @@ import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils; import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LockManager;
import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory; import org.xmlpull.v1.XmlPullParserFactory;
...@@ -689,7 +688,7 @@ public class ServerDialback { ...@@ -689,7 +688,7 @@ public class ServerDialback {
*/ */
private static String getSecretkey() { private static String getSecretkey() {
String key = "secretKey"; String key = "secretKey";
Lock lock = LockManager.getLock(key); Lock lock = CacheFactory.getLock(key, secretKeyCache);
try { try {
lock.lock(); lock.lock();
String secret = secretKeyCache.get(key); String secret = secretKeyCache.get(key);
...@@ -705,3 +704,4 @@ public class ServerDialback { ...@@ -705,3 +704,4 @@ public class ServerDialback {
} }
} }
...@@ -34,7 +34,6 @@ import org.jivesoftware.util.Log; ...@@ -34,7 +34,6 @@ import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils; import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.PacketError; import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence; import org.xmpp.packet.Presence;
...@@ -503,7 +502,7 @@ public class PresenceManagerImpl extends BasicModule implements PresenceManager ...@@ -503,7 +502,7 @@ public class PresenceManagerImpl extends BasicModule implements PresenceManager
Connection con = null; Connection con = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
ResultSet rs = null; ResultSet rs = null;
Lock lock = LockManager.getLock(username + "pr"); Lock lock = CacheFactory.getLock(username + "pr", offlinePresenceCache);
try { try {
lock.lock(); lock.lock();
if (!offlinePresenceCache.containsKey(username) || !lastActivityCache.containsKey(username)) { if (!offlinePresenceCache.containsKey(username) || !lastActivityCache.containsKey(username)) {
......
...@@ -25,7 +25,6 @@ import org.jivesoftware.util.JiveGlobals; ...@@ -25,7 +25,6 @@ import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.*; import org.xmpp.packet.*;
import java.util.*; import java.util.*;
...@@ -109,7 +108,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -109,7 +108,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
public void addComponentRoute(JID route, RoutableChannelHandler destination) { public void addComponentRoute(JID route, RoutableChannelHandler destination) {
String address = route.getDomain(); String address = route.getDomain();
localRoutingTable.addRoute(address, destination); localRoutingTable.addRoute(address, destination);
Lock lock = LockManager.getLock(address + "rt"); Lock lock = CacheFactory.getLock(address + "rt", componentsCache);
try { try {
lock.lock(); lock.lock();
Set<NodeID> nodes = componentsCache.get(address); Set<NodeID> nodes = componentsCache.get(address);
...@@ -131,7 +130,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -131,7 +130,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
added = anonymousUsersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null; added = anonymousUsersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null;
// Add the session to the list of user sessions // Add the session to the list of user sessions
if (route.getResource() != null && (!available || added)) { if (route.getResource() != null && (!available || added)) {
Lock lock = LockManager.getLock(route.toBareJID()); Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
try { try {
lock.lock(); lock.lock();
usersSessions.put(route.toBareJID(), Arrays.asList(route.toString())); usersSessions.put(route.toBareJID(), Arrays.asList(route.toString()));
...@@ -145,7 +144,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -145,7 +144,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
added = usersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null; added = usersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null;
// Add the session to the list of user sessions // Add the session to the list of user sessions
if (route.getResource() != null && (!available || added)) { if (route.getResource() != null && (!available || added)) {
Lock lock = LockManager.getLock(route.toBareJID()); Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
try { try {
lock.lock(); lock.lock();
Collection<String> jids = usersSessions.get(route.toBareJID()); Collection<String> jids = usersSessions.get(route.toBareJID());
...@@ -627,7 +626,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -627,7 +626,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
anonymous = true; anonymous = true;
} }
if (clientRoute != null && route.getResource() != null) { if (clientRoute != null && route.getResource() != null) {
Lock lock = LockManager.getLock(route.toBareJID()); Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
try { try {
lock.lock(); lock.lock();
if (anonymous) { if (anonymous) {
...@@ -664,7 +663,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -664,7 +663,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
public boolean removeComponentRoute(JID route) { public boolean removeComponentRoute(JID route) {
String address = route.getDomain(); String address = route.getDomain();
boolean removed = false; boolean removed = false;
Lock lock = LockManager.getLock(address + "rt"); Lock lock = CacheFactory.getLock(address + "rt", componentsCache);
try { try {
lock.lock(); lock.lock();
Set<NodeID> nodes = componentsCache.get(address); Set<NodeID> nodes = componentsCache.get(address);
......
...@@ -22,6 +22,7 @@ import org.jivesoftware.util.Log; ...@@ -22,6 +22,7 @@ import org.jivesoftware.util.Log;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
/** /**
* Creates Cache objects. The returned caches will either be local or clustered * Creates Cache objects. The returned caches will either be local or clustered
...@@ -337,12 +338,22 @@ public class CacheFactory { ...@@ -337,12 +338,22 @@ public class CacheFactory {
} }
} }
public static void lockKey(Object key, long timeout) { /**
cacheFactoryStrategy.lockKey(key, timeout); * Returns an existing {@link java.util.concurrent.locks.Lock} on the specified key or creates a new one
} * if none was found. This operation is thread safe. Successive calls with the same key may or may not
* return the same {@link java.util.concurrent.locks.Lock}. However, different threads asking for the
public static void unlockKey(Object key) { * same Lock at the same time will get the same Lock object.<p>
cacheFactoryStrategy.unlockKey(key); *
* The supplied cache may or may not be used depending whether the server is running on cluster mode
* or not. When not running as part of a cluster then the lock will be unrelated to the cache and will
* only be visible in this JVM.
*
* @param key the object that defines the visibility or scope of the lock.
* @param cache the cache used for holding the lock.
* @return an existing lock on the specified key or creates a new one if none was found.
*/
public static Lock getLock(Object key, Cache cache) {
return cacheFactoryStrategy.getLock(key, cache);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
......
...@@ -11,6 +11,7 @@ import org.jivesoftware.openfire.cluster.ClusterNodeInfo; ...@@ -11,6 +11,7 @@ import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.locks.Lock;
/** /**
* Implementation of CacheFactory that relies on the specific clustering solution. * Implementation of CacheFactory that relies on the specific clustering solution.
...@@ -158,21 +159,14 @@ public interface CacheFactoryStrategy { ...@@ -158,21 +159,14 @@ public interface CacheFactoryStrategy {
void updateCacheStats(Map<String, Cache> caches); void updateCacheStats(Map<String, Cache> caches);
/** /**
* Locks the specified key in the locking map. The map should be clusterable * Returns an existing lock on the specified key or creates a new one if none was found. This
* thus locking a key is visible to the cluster. When not in cluster mode * operation is thread safe. The supplied cache may or may not be used depending whether
* the lock is only visible to this JVM. * the server is running on cluster mode or not. When not running as part of a cluster then
* the lock will be unrelated to the cache and will only be visible in this JVM.
* *
* @param key the key to lock. * @param key the object that defines the visibility or scope of the lock.
* @param timeout number of milliseconds to wait to obtain the lock. -1 means wait forever. * @param cache the cache used for holding the lock.
* @return an existing lock on the specified key or creates a new one if none was found.
*/ */
void lockKey(Object key, long timeout); Lock getLock(Object key, Cache cache);
/**
* Unlocks the specified key in the locking map. The map should be clusterable
* thus locking a key is visible to the cluster. When not in cluster mode
* the lock is only visible to this JVM.
*
* @param key the key to unlock.
*/
void unlockKey(Object key);
} }
...@@ -16,6 +16,11 @@ import org.jivesoftware.openfire.cluster.ClusterNodeInfo; ...@@ -16,6 +16,11 @@ import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* CacheFactoryStrategy for use in Openfire. It creates and manages local caches, and it's cluster * CacheFactoryStrategy for use in Openfire. It creates and manages local caches, and it's cluster
...@@ -26,6 +31,10 @@ import java.util.Map; ...@@ -26,6 +31,10 @@ import java.util.Map;
*/ */
public class DefaultLocalCacheStrategy implements CacheFactoryStrategy { public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
/**
* Keep track of the locks that are currently being used.
*/
private Map<Object, LockAndCount> locks = new ConcurrentHashMap<Object, LockAndCount>();
public DefaultLocalCacheStrategy() { public DefaultLocalCacheStrategy() {
} }
...@@ -87,9 +96,99 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy { ...@@ -87,9 +96,99 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
public void updateCacheStats(Map<String, Cache> caches) { public void updateCacheStats(Map<String, Cache> caches) {
} }
public void lockKey(Object key, long timeout) { public Lock getLock(Object key, Cache cache) {
Object lockKey = key;
if (key instanceof String) {
lockKey = ((String) key).intern();
}
return new LocalLock(lockKey);
}
private void acquireLock(Object key) {
ReentrantLock lock = lookupLockForAcquire(key);
lock.lock();
}
private void releaseLock(Object key) {
ReentrantLock lock = lookupLockForRelease(key);
lock.unlock();
}
private ReentrantLock lookupLockForAcquire(Object key) {
synchronized(key) {
LockAndCount lac = locks.get(key);
if (lac == null) {
lac = new LockAndCount(new ReentrantLock());
lac.count = 1;
locks.put(key, lac);
}
else {
lac.count++;
}
return lac.lock;
}
}
private ReentrantLock lookupLockForRelease(Object key) {
synchronized(key) {
LockAndCount lac = locks.get(key);
if (lac == null) {
throw new IllegalStateException("No lock found for object " + key);
}
if (lac.count <= 1) {
locks.remove(key);
}
else {
lac.count--;
}
return lac.lock;
}
} }
public void unlockKey(Object key) {
private class LocalLock implements Lock {
private final Object key;
LocalLock(Object key) {
this.key = key;
}
public void lock(){
acquireLock(key);
}
public void unlock() {
releaseLock(key);
}
public void lockInterruptibly(){
throw new UnsupportedOperationException();
}
public Condition newCondition(){
throw new UnsupportedOperationException();
}
public boolean tryLock() {
throw new UnsupportedOperationException();
}
public boolean tryLock(long time, TimeUnit unit) {
throw new UnsupportedOperationException();
}
}
private static class LockAndCount {
final ReentrantLock lock;
int count;
LockAndCount(ReentrantLock lock) {
this.lock = lock;
}
} }
} }
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.util.lock;
import org.jivesoftware.util.TaskEngine;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
/**
* LockFactory to be used when not running in cluster mode. The locks returned by this
* factory are only visibile within this JVM.
*
* @author Gaston Dombiak
*/
public class LocalLockFactory implements LockFactory {
private Map<Object, LockAndCount> locks = new ConcurrentHashMap<Object, LockAndCount>();
public Lock getLock(Object key) {
Object lockKey = key;
if (key instanceof String) {
lockKey = ((String) key).intern();
}
return new LocalLock(lockKey);
}
private void acquireLock(Object key) {
ReentrantLock lock = lookupLockForAcquire(key);
lock.lock();
}
private void releaseLock(Object key) {
ReentrantLock lock = lookupLockForRelease(key);
lock.unlock();
}
private ReentrantLock lookupLockForAcquire(Object key) {
synchronized(key) {
LockAndCount lac = locks.get(key);
if (lac == null) {
lac = new LockAndCount(new ReentrantLock());
lac.count = 1;
locks.put(key, lac);
}
else {
lac.count++;
}
return lac.lock;
}
}
private ReentrantLock lookupLockForRelease(Object key) {
synchronized(key) {
LockAndCount lac = locks.get(key);
if (lac == null) {
throw new IllegalStateException("No lock found for object " + key);
}
if (lac.count <= 1) {
locks.remove(key);
}
else {
lac.count--;
}
return lac.lock;
}
}
private class LocalLock implements Lock {
private final Object key;
LocalLock(Object key) {
this.key = key;
}
public void lock(){
acquireLock(key);
}
public void unlock() {
releaseLock(key);
}
public void lockInterruptibly(){
throw new UnsupportedOperationException();
}
public Condition newCondition(){
throw new UnsupportedOperationException();
}
public boolean tryLock() {
throw new UnsupportedOperationException();
}
public boolean tryLock(long time, TimeUnit unit) {
throw new UnsupportedOperationException();
}
}
private static class LockAndCount {
final ReentrantLock lock;
int count;
LockAndCount(ReentrantLock lock) {
this.lock = lock;
}
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.util.lock;
import java.util.concurrent.locks.Lock;
/**
* Factory that creates new Locks for specified keys and keeps them in memory until they
* are no longer needed.
*
* @author Gaston Dombiak
*/
public interface LockFactory {
/**
* Returns an existing lock on the specified key or creates a new one if none was found. This
* operation should be thread safe.
*
* @param key the object that defines the visibility or scope of the lock.
* @return an existing lock on the specified key or creates a new one if none was found.
*/
Lock getLock(Object key);
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.util.lock;
import java.util.concurrent.locks.Lock;
/**
* Manager of {@link Lock Locks} that could be valid when running within a cluster or when in local mode.
* By default the LockManager will use a {@link org.jivesoftware.util.lock.LocalLockFactory} but
* you can set new factories by sending {@link #setLockFactory(LockFactory)}.
*
* @author Gaston Dombiak
*/
public class LockManager {
private static LockFactory lockFactory;
static {
setLockFactory(new LocalLockFactory());
}
/**
* Returns the existing lock factory being used for creating new Locks.
*
* @return the existing lock factory being used for creating new Locks.
*/
public static LockFactory getLockFactory() {
return lockFactory;
}
/**
* Sets the lock factory to use for creating new Locks. If <tt>null</tt> then
* use {@link LocalLockFactory}.
*
* @param lockFactory the new lock factory to use for creating new Locks.
*/
public static void setLockFactory(LockFactory lockFactory) {
LockManager.lockFactory = lockFactory;
}
/**
* Returns an existing {@link Lock} on the specified key or creates a new one if none was found. This
* operation should be thread safe. Successive calls with the same key may or may not return
* the same {@link Lock}. However, different threads asking for the same Lock at the same time will
* get the same Lock object.
*
* @param key the object that defines the visibility or scope of the lock.
* @return an existing lock on the specified key or creates a new one if none was found.
*/
public static Lock getLock(Object key) {
return lockFactory.getLock(key);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment