Commit f570beb2 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Fixed distributed locking problem when running in a cluster. ENT-425. Reviewed by Daniel.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@10065 b35dd754-fafc-0310-a699-88a17e54d16e
parent 6d4cc5da
......@@ -30,7 +30,6 @@ import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
......@@ -394,7 +393,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
// Keep track of the nodeID hosting the incoming server session
incomingServerSessionsCache.put(streamID, server.getNodeID().toByteArray());
// Update list of sockets/sessions coming from the same remote hostname
Lock lock = LockManager.getLock(hostname);
Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
try {
lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname);
......@@ -408,7 +407,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
lock.unlock();
}
// Add to clustered cache
lock = LockManager.getLock(streamID);
lock = CacheFactory.getLock(streamID, validatedDomainsCache);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
......@@ -438,7 +437,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
incomingServerSessionsCache.remove(streamID);
// Remove from list of sockets/sessions coming from the remote hostname
Lock lock = LockManager.getLock(hostname);
Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
try {
lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname);
......@@ -456,7 +455,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
lock.unlock();
}
// Remove from clustered cache
lock = LockManager.getLock(streamID);
lock = CacheFactory.getLock(streamID, validatedDomainsCache);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
......@@ -488,7 +487,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
* @return domains, subdomains and virtual hosts that where validated.
*/
public Collection<String> getValidatedDomains(String streamID) {
Lock lock = LockManager.getLock(streamID);
Lock lock = CacheFactory.getLock(streamID, validatedDomainsCache);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
......@@ -763,7 +762,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
public List<IncomingServerSession> getIncomingServerSessions(String hostname) {
List<String> streamIDs;
// Get list of sockets/sessions coming from the remote hostname
Lock lock = LockManager.getLock(hostname);
Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
try {
lock.lock();
streamIDs = hostnameSessionsCache.get(hostname);
......@@ -1437,7 +1436,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
incomingServerSessionsCache.put(streamID, server.getNodeID().toByteArray());
for (String hostname : session.getValidatedDomains()) {
// Update list of sockets/sessions coming from the same remote hostname
Lock lock = LockManager.getLock(hostname);
Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
try {
lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname);
......@@ -1451,7 +1450,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
lock.unlock();
}
// Add to clustered cache
lock = LockManager.getLock(streamID);
lock = CacheFactory.getLock(streamID, validatedDomainsCache);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
......
......@@ -16,8 +16,6 @@ import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.JiveProperties;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LocalLockFactory;
import org.jivesoftware.util.lock.LockManager;
import java.util.Collection;
import java.util.Queue;
......@@ -236,8 +234,8 @@ public class ClusterManager {
/**
* Starts the cluster service if clustering is enabled. The process of starting clustering
* will recreate caches as distributed caches.<p>
* <p/>
* Before starting a cluster the {@link LockManager#setLockFactory(org.jivesoftware.util.lock.LockFactory)},
*
* Before starting a cluster the
* {@link XMPPServer#setRemoteSessionLocator(org.jivesoftware.openfire.session.RemoteSessionLocator)} and
* {@link org.jivesoftware.openfire.RoutingTable#setRemotePacketRouter(org.jivesoftware.openfire.RemotePacketRouter)}
* need to be properly configured.
......@@ -267,8 +265,6 @@ public class ClusterManager {
* If clustering is not enabled, this method will do nothing.
*/
public static synchronized void shutdown() {
// Reset the LockFactory to the default one
LockManager.setLockFactory(new LocalLockFactory());
// Reset packet router to use to deliver packets to remote cluster nodes
XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);
if (isClusteringStarted()) {
......
......@@ -83,14 +83,30 @@ public class CacheInfo {
public static enum Type {
/**
* An optimistic scheme defines a cache which fully replicates all of its data to all cluster nodes that
* are running the service. This cache is good for frequent reads and not frequent writes. However, this
* cache will not scale fine if it has lot of content that will end up consuming all the JVM memory. For
* this case a {@link #distributed} is a better option.
* Data is fully replicated to every member in the cluster. Offers the fastest read performance. Clustered,
* fault-tolerant cache with linear performance scalability for reads, but poor scalability for writes
* (as writes must be processed by every member in the cluster). Because data is replicated to all machines,
* adding servers does not increase aggregate cache capacity.
*/
replicated("replicated"),
/**
* OptimisticCache is a clustered cache implementation similar to the ReplicatedCache implementation, but
* without any concurrency control. This implementation has the highest possible throughput. It also allows
* to use an alternative underlying store for the cached data (for example, a MRU/MFU-based cache). However,
* if two cluster members are independently pruning or purging the underlying local stores, it is possible
* that a cluster member may have a different store content than that held by another cluster member.
* This cache is good for frequent reads and not frequent writes. However, this cache will not scale fine
* if it has lot of content that will end up consuming all the JVM memory. For this case a
* {@link #distributed} is a better option.
*/
optimistic("optimistic"),
/**
* An distributed-scheme defines caches where the storage for entries is partitioned across cluster nodes.
* A hybrid cache; fronts a fault-tolerant, scalable partitioned cache with a local cache. Near cache
* invalidates front cache entries, using configurable invalidation strategy, and provides excellent
* performance and synchronization. Near cache backed by a partitioned cache offers zero-millisecond local
* access for repeat data access, while enabling concurrency and ensuring coherency and fail-over,
* effectively combining the best attributes of replicated and partitioned caches.
*/
distributed("near-distributed");
......
......@@ -28,7 +28,6 @@ import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.PacketError;
......@@ -260,7 +259,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene
*/
public void addServerFeature(String namespace) {
if (localServerFeatures.add(namespace)) {
Lock lock = LockManager.getLock(namespace);
Lock lock = CacheFactory.getLock(namespace, serverFeatures);
try {
lock.lock();
Set<NodeID> nodeIDs = serverFeatures.get(namespace);
......@@ -284,7 +283,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene
*/
public void removeServerFeature(String namespace) {
if (localServerFeatures.remove(namespace)) {
Lock lock = LockManager.getLock(namespace);
Lock lock = CacheFactory.getLock(namespace, serverFeatures);
try {
lock.lock();
Set<NodeID> nodeIDs = serverFeatures.get(namespace);
......@@ -353,7 +352,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene
// Remove server features added by node that is gone
for (Map.Entry<String, Set<NodeID>> entry : serverFeatures.entrySet()) {
String namespace = entry.getKey();
Lock lock = LockManager.getLock(namespace);
Lock lock = CacheFactory.getLock(namespace, serverFeatures);
try {
lock.lock();
Set<NodeID> nodeIDs = entry.getValue();
......@@ -379,7 +378,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene
private void restoreCacheContent() {
for (String feature : localServerFeatures) {
Lock lock = LockManager.getLock(feature);
Lock lock = CacheFactory.getLock(feature, serverFeatures);
try {
lock.lock();
Set<NodeID> nodeIDs = serverFeatures.get(feature);
......
......@@ -32,7 +32,6 @@ import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.PacketError;
......@@ -324,7 +323,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
* @param name the discovered name of the component.
*/
public void addComponentItem(String jid, String node, String name) {
Lock lock = LockManager.getLock(jid + "item");
Lock lock = CacheFactory.getLock(jid + "item", serverItems);
try {
lock.lock();
ClusteredServerItem item = serverItems.get(jid);
......@@ -356,7 +355,11 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
* @param jid the jid of the component being removed.
*/
public void removeComponentItem(String jid) {
Lock lock = LockManager.getLock(jid + "item");
if (serverItems == null) {
// Safety check
return;
}
Lock lock = CacheFactory.getLock(jid + "item", serverItems);
try {
lock.lock();
ClusteredServerItem item = serverItems.get(jid);
......@@ -422,7 +425,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
NodeID leftNode = NodeID.getInstance(nodeID);
for (Map.Entry<String, ClusteredServerItem> entry : serverItems.entrySet()) {
String jid = entry.getKey();
Lock lock = LockManager.getLock(jid + "item");
Lock lock = CacheFactory.getLock(jid + "item", serverItems);
try {
lock.lock();
ClusteredServerItem item = entry.getValue();
......@@ -450,7 +453,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
for (Map.Entry<String, Element> entry : localServerItems.entrySet()) {
String jid = entry.getKey();
Element element = entry.getValue();
Lock lock = LockManager.getLock(jid + "item");
Lock lock = CacheFactory.getLock(jid + "item", serverItems);
try {
lock.lock();
ClusteredServerItem item = serverItems.get(jid);
......
......@@ -511,6 +511,11 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
// we could still send directed presences to entities that when connected to a cluster
// they will be replicated. An example would be MUC rooms.
for (Map.Entry<String, Collection<DirectedPresence>> entry : localDirectedPresences.entrySet()) {
if (entry.getValue().isEmpty()) {
Log.warn("PresenceUpdateHandler - Skipping empty directed presences when joining cluster for sender: " +
entry.getKey());
continue;
}
directedPresencesCache.put(entry.getKey(), entry.getValue());
}
}
......@@ -523,6 +528,12 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
if (!XMPPServer.getInstance().isShuttingDown()) {
// Populate directedPresencesCache with local content
for (Map.Entry<String, Collection<DirectedPresence>> entry : localDirectedPresences.entrySet()) {
if (entry.getValue().isEmpty()) {
Log.warn(
"PresenceUpdateHandler - Skipping empty directed presences when leaving cluster for sender: " +
entry.getKey());
continue;
}
directedPresencesCache.put(entry.getKey(), entry.getValue());
}
}
......
......@@ -15,9 +15,11 @@ import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.session.LocalOutgoingServerSession;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.lock.LockManager;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.xmpp.packet.*;
import java.util.HashMap;
......@@ -55,6 +57,11 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
private Map<String, PacketsProcessor> packetsProcessors = new HashMap<String, PacketsProcessor>();
/**
* Cache (unlimited, never expire) that holds outgoing sessions to remote servers from this server.
* Key: server domain, Value: nodeID
*/
private Cache<String, byte[]> serversCache;
/**
* Flag that indicates if the process that consumed the queued packets should stop.
*/
......@@ -67,6 +74,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
}
private void init() {
serversCache = CacheFactory.createCache(RoutingTableImpl.S2S_CACHE_NAME);
routingTable = XMPPServer.getInstance().getRoutingTable();
// Create a pool of threads that will process queued packets.
int maxThreads = JiveGlobals.getIntProperty("xmpp.server.outgoing.max.threads", 20);
......@@ -164,7 +172,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
}
}
private static class PacketsProcessor implements Runnable {
private class PacketsProcessor implements Runnable {
private OutgoingSessionPromise promise;
private String domain;
......@@ -199,7 +207,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
// Create a connection to the remote server from the domain where the packet has been sent
boolean created;
// Make sure that only one cluster node is creating the outgoing connection
Lock lock = LockManager.getLock(domain + "oss");
Lock lock = CacheFactory.getLock(domain + "oss", serversCache);
try {
lock.lock();
created = LocalOutgoingServerSession
......
......@@ -29,7 +29,6 @@ import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LockManager;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory;
......@@ -689,7 +688,7 @@ public class ServerDialback {
*/
private static String getSecretkey() {
String key = "secretKey";
Lock lock = LockManager.getLock(key);
Lock lock = CacheFactory.getLock(key, secretKeyCache);
try {
lock.lock();
String secret = secretKeyCache.get(key);
......
......@@ -34,7 +34,6 @@ import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.JID;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
......@@ -503,7 +502,7 @@ public class PresenceManagerImpl extends BasicModule implements PresenceManager
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
Lock lock = LockManager.getLock(username + "pr");
Lock lock = CacheFactory.getLock(username + "pr", offlinePresenceCache);
try {
lock.lock();
if (!offlinePresenceCache.containsKey(username) || !lastActivityCache.containsKey(username)) {
......
......@@ -25,7 +25,6 @@ import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.*;
import java.util.*;
......@@ -109,7 +108,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
public void addComponentRoute(JID route, RoutableChannelHandler destination) {
String address = route.getDomain();
localRoutingTable.addRoute(address, destination);
Lock lock = LockManager.getLock(address + "rt");
Lock lock = CacheFactory.getLock(address + "rt", componentsCache);
try {
lock.lock();
Set<NodeID> nodes = componentsCache.get(address);
......@@ -131,7 +130,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
added = anonymousUsersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null;
// Add the session to the list of user sessions
if (route.getResource() != null && (!available || added)) {
Lock lock = LockManager.getLock(route.toBareJID());
Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
try {
lock.lock();
usersSessions.put(route.toBareJID(), Arrays.asList(route.toString()));
......@@ -145,7 +144,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
added = usersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null;
// Add the session to the list of user sessions
if (route.getResource() != null && (!available || added)) {
Lock lock = LockManager.getLock(route.toBareJID());
Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
try {
lock.lock();
Collection<String> jids = usersSessions.get(route.toBareJID());
......@@ -627,7 +626,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
anonymous = true;
}
if (clientRoute != null && route.getResource() != null) {
Lock lock = LockManager.getLock(route.toBareJID());
Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
try {
lock.lock();
if (anonymous) {
......@@ -664,7 +663,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
public boolean removeComponentRoute(JID route) {
String address = route.getDomain();
boolean removed = false;
Lock lock = LockManager.getLock(address + "rt");
Lock lock = CacheFactory.getLock(address + "rt", componentsCache);
try {
lock.lock();
Set<NodeID> nodes = componentsCache.get(address);
......
......@@ -22,6 +22,7 @@ import org.jivesoftware.util.Log;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
/**
* Creates Cache objects. The returned caches will either be local or clustered
......@@ -337,12 +338,22 @@ public class CacheFactory {
}
}
public static void lockKey(Object key, long timeout) {
cacheFactoryStrategy.lockKey(key, timeout);
}
public static void unlockKey(Object key) {
cacheFactoryStrategy.unlockKey(key);
/**
* Returns an existing {@link java.util.concurrent.locks.Lock} on the specified key or creates a new one
* if none was found. This operation is thread safe. Successive calls with the same key may or may not
* return the same {@link java.util.concurrent.locks.Lock}. However, different threads asking for the
* same Lock at the same time will get the same Lock object.<p>
*
* The supplied cache may or may not be used depending whether the server is running on cluster mode
* or not. When not running as part of a cluster then the lock will be unrelated to the cache and will
* only be visible in this JVM.
*
* @param key the object that defines the visibility or scope of the lock.
* @param cache the cache used for holding the lock.
* @return an existing lock on the specified key or creates a new one if none was found.
*/
public static Lock getLock(Object key, Cache cache) {
return cacheFactoryStrategy.getLock(key, cache);
}
@SuppressWarnings("unchecked")
......
......@@ -11,6 +11,7 @@ import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.locks.Lock;
/**
* Implementation of CacheFactory that relies on the specific clustering solution.
......@@ -158,21 +159,14 @@ public interface CacheFactoryStrategy {
void updateCacheStats(Map<String, Cache> caches);
/**
* Locks the specified key in the locking map. The map should be clusterable
* thus locking a key is visible to the cluster. When not in cluster mode
* the lock is only visible to this JVM.
* Returns an existing lock on the specified key or creates a new one if none was found. This
* operation is thread safe. The supplied cache may or may not be used depending whether
* the server is running on cluster mode or not. When not running as part of a cluster then
* the lock will be unrelated to the cache and will only be visible in this JVM.
*
* @param key the key to lock.
* @param timeout number of milliseconds to wait to obtain the lock. -1 means wait forever.
* @param key the object that defines the visibility or scope of the lock.
* @param cache the cache used for holding the lock.
* @return an existing lock on the specified key or creates a new one if none was found.
*/
void lockKey(Object key, long timeout);
/**
* Unlocks the specified key in the locking map. The map should be clusterable
* thus locking a key is visible to the cluster. When not in cluster mode
* the lock is only visible to this JVM.
*
* @param key the key to unlock.
*/
void unlockKey(Object key);
Lock getLock(Object key, Cache cache);
}
......@@ -16,6 +16,11 @@ import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* CacheFactoryStrategy for use in Openfire. It creates and manages local caches, and it's cluster
......@@ -26,6 +31,10 @@ import java.util.Map;
*/
public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
/**
* Keep track of the locks that are currently being used.
*/
private Map<Object, LockAndCount> locks = new ConcurrentHashMap<Object, LockAndCount>();
public DefaultLocalCacheStrategy() {
}
......@@ -87,9 +96,99 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
public void updateCacheStats(Map<String, Cache> caches) {
}
public void lockKey(Object key, long timeout) {
public Lock getLock(Object key, Cache cache) {
Object lockKey = key;
if (key instanceof String) {
lockKey = ((String) key).intern();
}
return new LocalLock(lockKey);
}
private void acquireLock(Object key) {
ReentrantLock lock = lookupLockForAcquire(key);
lock.lock();
}
private void releaseLock(Object key) {
ReentrantLock lock = lookupLockForRelease(key);
lock.unlock();
}
private ReentrantLock lookupLockForAcquire(Object key) {
synchronized(key) {
LockAndCount lac = locks.get(key);
if (lac == null) {
lac = new LockAndCount(new ReentrantLock());
lac.count = 1;
locks.put(key, lac);
}
else {
lac.count++;
}
return lac.lock;
}
}
public void unlockKey(Object key) {
private ReentrantLock lookupLockForRelease(Object key) {
synchronized(key) {
LockAndCount lac = locks.get(key);
if (lac == null) {
throw new IllegalStateException("No lock found for object " + key);
}
if (lac.count <= 1) {
locks.remove(key);
}
else {
lac.count--;
}
return lac.lock;
}
}
private class LocalLock implements Lock {
private final Object key;
LocalLock(Object key) {
this.key = key;
}
public void lock(){
acquireLock(key);
}
public void unlock() {
releaseLock(key);
}
public void lockInterruptibly(){
throw new UnsupportedOperationException();
}
public Condition newCondition(){
throw new UnsupportedOperationException();
}
public boolean tryLock() {
throw new UnsupportedOperationException();
}
public boolean tryLock(long time, TimeUnit unit) {
throw new UnsupportedOperationException();
}
}
private static class LockAndCount {
final ReentrantLock lock;
int count;
LockAndCount(ReentrantLock lock) {
this.lock = lock;
}
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.util.lock;
import org.jivesoftware.util.TaskEngine;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
/**
* LockFactory to be used when not running in cluster mode. The locks returned by this
* factory are only visibile within this JVM.
*
* @author Gaston Dombiak
*/
public class LocalLockFactory implements LockFactory {
private Map<Object, LockAndCount> locks = new ConcurrentHashMap<Object, LockAndCount>();
public Lock getLock(Object key) {
Object lockKey = key;
if (key instanceof String) {
lockKey = ((String) key).intern();
}
return new LocalLock(lockKey);
}
private void acquireLock(Object key) {
ReentrantLock lock = lookupLockForAcquire(key);
lock.lock();
}
private void releaseLock(Object key) {
ReentrantLock lock = lookupLockForRelease(key);
lock.unlock();
}
private ReentrantLock lookupLockForAcquire(Object key) {
synchronized(key) {
LockAndCount lac = locks.get(key);
if (lac == null) {
lac = new LockAndCount(new ReentrantLock());
lac.count = 1;
locks.put(key, lac);
}
else {
lac.count++;
}
return lac.lock;
}
}
private ReentrantLock lookupLockForRelease(Object key) {
synchronized(key) {
LockAndCount lac = locks.get(key);
if (lac == null) {
throw new IllegalStateException("No lock found for object " + key);
}
if (lac.count <= 1) {
locks.remove(key);
}
else {
lac.count--;
}
return lac.lock;
}
}
private class LocalLock implements Lock {
private final Object key;
LocalLock(Object key) {
this.key = key;
}
public void lock(){
acquireLock(key);
}
public void unlock() {
releaseLock(key);
}
public void lockInterruptibly(){
throw new UnsupportedOperationException();
}
public Condition newCondition(){
throw new UnsupportedOperationException();
}
public boolean tryLock() {
throw new UnsupportedOperationException();
}
public boolean tryLock(long time, TimeUnit unit) {
throw new UnsupportedOperationException();
}
}
private static class LockAndCount {
final ReentrantLock lock;
int count;
LockAndCount(ReentrantLock lock) {
this.lock = lock;
}
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.util.lock;
import java.util.concurrent.locks.Lock;
/**
* Factory that creates new Locks for specified keys and keeps them in memory until they
* are no longer needed.
*
* @author Gaston Dombiak
*/
public interface LockFactory {
/**
* Returns an existing lock on the specified key or creates a new one if none was found. This
* operation should be thread safe.
*
* @param key the object that defines the visibility or scope of the lock.
* @return an existing lock on the specified key or creates a new one if none was found.
*/
Lock getLock(Object key);
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.util.lock;
import java.util.concurrent.locks.Lock;
/**
* Manager of {@link Lock Locks} that could be valid when running within a cluster or when in local mode.
* By default the LockManager will use a {@link org.jivesoftware.util.lock.LocalLockFactory} but
* you can set new factories by sending {@link #setLockFactory(LockFactory)}.
*
* @author Gaston Dombiak
*/
public class LockManager {
private static LockFactory lockFactory;
static {
setLockFactory(new LocalLockFactory());
}
/**
* Returns the existing lock factory being used for creating new Locks.
*
* @return the existing lock factory being used for creating new Locks.
*/
public static LockFactory getLockFactory() {
return lockFactory;
}
/**
* Sets the lock factory to use for creating new Locks. If <tt>null</tt> then
* use {@link LocalLockFactory}.
*
* @param lockFactory the new lock factory to use for creating new Locks.
*/
public static void setLockFactory(LockFactory lockFactory) {
LockManager.lockFactory = lockFactory;
}
/**
* Returns an existing {@link Lock} on the specified key or creates a new one if none was found. This
* operation should be thread safe. Successive calls with the same key may or may not return
* the same {@link Lock}. However, different threads asking for the same Lock at the same time will
* get the same Lock object.
*
* @param key the object that defines the visibility or scope of the lock.
* @return an existing lock on the specified key or creates a new one if none was found.
*/
public static Lock getLock(Object key) {
return lockFactory.getLock(key);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment