Commit 641cdac9 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

More cluster cache work. We are now locking not only the same cache but the same entry.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/branches@10076 b35dd754-fafc-0310-a699-88a17e54d16e
parent 48238d0e
......@@ -591,7 +591,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
* the session unavailable means that the session is not eligible for receiving messages from
* other clients.
*
* @param session the session that receieved an unavailable presence.
* @param session the session that received an unavailable presence.
*/
public void sessionUnavailable(LocalClientSession session) {
if (session.getAddress() != null && routingTable != null &&
......
......@@ -323,7 +323,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
* @param name the discovered name of the component.
*/
public void addComponentItem(String jid, String node, String name) {
Lock lock = CacheFactory.getLock(jid + "item", serverItems);
Lock lock = CacheFactory.getLock(jid, serverItems);
try {
lock.lock();
ClusteredServerItem item = serverItems.get(jid);
......@@ -359,7 +359,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
// Safety check
return;
}
Lock lock = CacheFactory.getLock(jid + "item", serverItems);
Lock lock = CacheFactory.getLock(jid, serverItems);
try {
lock.lock();
ClusteredServerItem item = serverItems.get(jid);
......@@ -425,7 +425,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
NodeID leftNode = NodeID.getInstance(nodeID);
for (Map.Entry<String, ClusteredServerItem> entry : serverItems.entrySet()) {
String jid = entry.getKey();
Lock lock = CacheFactory.getLock(jid + "item", serverItems);
Lock lock = CacheFactory.getLock(jid, serverItems);
try {
lock.lock();
ClusteredServerItem item = entry.getValue();
......@@ -453,7 +453,7 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
for (Map.Entry<String, Element> entry : localServerItems.entrySet()) {
String jid = entry.getKey();
Element element = entry.getValue();
Lock lock = CacheFactory.getLock(jid + "item", serverItems);
Lock lock = CacheFactory.getLock(jid, serverItems);
try {
lock.lock();
ClusteredServerItem item = serverItems.get(jid);
......
......@@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
/**
* Implements the presence protocol. Clients use this protocol to
......@@ -352,73 +353,79 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
}
if (keepTrack) {
String sender = update.getFrom().toString();
Collection<DirectedPresence> directedPresences = directedPresencesCache.get(sender);
if (Presence.Type.unavailable.equals(update.getType())) {
if (directedPresences != null) {
// It's a directed unavailable presence
if (routingTable.hasClientRoute(handlerJID)) {
// Client sessions will receive only presences to the same JID (the
// address of the session) so remove the handler from the map
for (DirectedPresence directedPresence : directedPresences) {
if (directedPresence.getHandler().equals(handlerJID)) {
directedPresences.remove(directedPresence);
break;
Lock lock = CacheFactory.getLock(sender, directedPresencesCache);
try {
lock.lock();
Collection<DirectedPresence> directedPresences = directedPresencesCache.get(sender);
if (Presence.Type.unavailable.equals(update.getType())) {
if (directedPresences != null) {
// It's a directed unavailable presence
if (routingTable.hasClientRoute(handlerJID)) {
// Client sessions will receive only presences to the same JID (the
// address of the session) so remove the handler from the map
for (DirectedPresence directedPresence : directedPresences) {
if (directedPresence.getHandler().equals(handlerJID)) {
directedPresences.remove(directedPresence);
break;
}
}
}
}
else {
// A service may receive presences for many JIDs so in this case we
// just need to remove the jid that has received a directed
// unavailable presence
for (DirectedPresence directedPresence : directedPresences) {
if (directedPresence.getHandler().equals(handlerJID)) {
directedPresence.removeReceiver(jid);
if (directedPresence.isEmpty()) {
directedPresences.remove(directedPresence);
else {
// A service may receive presences for many JIDs so in this case we
// just need to remove the jid that has received a directed
// unavailable presence
for (DirectedPresence directedPresence : directedPresences) {
if (directedPresence.getHandler().equals(handlerJID)) {
directedPresence.removeReceiver(jid);
if (directedPresence.isEmpty()) {
directedPresences.remove(directedPresence);
}
break;
}
break;
}
}
if (directedPresences.isEmpty()) {
// Remove the user from the registry since the list of directed
// presences is empty
directedPresencesCache.remove(sender);
localDirectedPresences.remove(sender);
}
else {
directedPresencesCache.put(sender, directedPresences);
localDirectedPresences.put(sender, directedPresences);
}
}
if (directedPresences.isEmpty()) {
// Remove the user from the registry since the list of directed
// presences is empty
directedPresencesCache.remove(sender);
localDirectedPresences.remove(sender);
}
else {
if (directedPresences == null) {
// We are using a set to avoid duplicate jids in case the user
// sends several directed presences to the same handler. The Map also
// ensures that if the user sends several presences to the same handler
// we will have only one entry in the Map
directedPresences = new ConcurrentLinkedQueue<DirectedPresence>();
}
else {
directedPresencesCache.put(sender, directedPresences);
localDirectedPresences.put(sender, directedPresences);
// Add the handler to the list of handler that processed the directed
// presence sent by the user. This handler will be used to send
// the unavailable presence when the user goes offline
DirectedPresence affectedDirectedPresence = null;
for (DirectedPresence directedPresence : directedPresences) {
if (directedPresence.getHandler().equals(handlerJID)) {
affectedDirectedPresence = directedPresence;
break;
}
}
}
}
else {
if (directedPresences == null) {
// We are using a set to avoid duplicate jids in case the user
// sends several directed presences to the same handler. The Map also
// ensures that if the user sends several presences to the same handler
// we will have only one entry in the Map
directedPresences = new ConcurrentLinkedQueue<DirectedPresence>();
}
// Add the handler to the list of handler that processed the directed
// presence sent by the user. This handler will be used to send
// the unavailable presence when the user goes offline
DirectedPresence affectedDirectedPresence = null;
for (DirectedPresence directedPresence : directedPresences) {
if (directedPresence.getHandler().equals(handlerJID)) {
affectedDirectedPresence = directedPresence;
break;
if (affectedDirectedPresence == null) {
affectedDirectedPresence = new DirectedPresence(handlerJID);
directedPresences.add(affectedDirectedPresence);
}
}
affectedDirectedPresence.addReceiver(jid);
if (affectedDirectedPresence == null) {
affectedDirectedPresence = new DirectedPresence(handlerJID);
directedPresences.add(affectedDirectedPresence);
directedPresencesCache.put(sender, directedPresences);
localDirectedPresences.put(sender, directedPresences);
}
affectedDirectedPresence.addReceiver(jid);
directedPresencesCache.put(sender, directedPresences);
localDirectedPresences.put(sender, directedPresences);
} finally {
lock.unlock();
}
}
}
......@@ -437,7 +444,16 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
}
if (localServer.isLocal(from)) {
// Remove the registry of directed presences of this user
Collection<DirectedPresence> directedPresences = directedPresencesCache.remove(from.toString());
Collection<DirectedPresence> directedPresences = null;
Lock lock = CacheFactory.getLock(from.toString(), directedPresencesCache);
try {
lock.lock();
directedPresences = directedPresencesCache.remove(from.toString());
} finally {
lock.unlock();
}
if (directedPresences != null) {
// Iterate over all the entities that the user sent a directed presence
for (DirectedPresence directedPresence : directedPresences) {
......@@ -516,7 +532,19 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
entry.getKey());
continue;
}
directedPresencesCache.put(entry.getKey(), entry.getValue());
// TODO perhaps we should not lock for every entry. Instead, lock it
// once (using a LOCK_ALL global key), and handle iterations in
// one go. We should first make sure that this doesn't lead to
// deadlocks though! The tryLock() mechanism could be used to first
// try one approach, but fall back on the other approach.
Lock lock = CacheFactory.getLock(entry.getKey(), directedPresencesCache);
try {
lock.lock();
directedPresencesCache.put(entry.getKey(), entry.getValue());
} finally {
lock.unlock();
}
}
}
......@@ -533,8 +561,21 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
"PresenceUpdateHandler - Skipping empty directed presences when leaving cluster for sender: " +
entry.getKey());
continue;
}
directedPresencesCache.put(entry.getKey(), entry.getValue());
}
// TODO perhaps we should not lock for every entry. Instead, lock it
// once (using a LOCK_ALL global key), and handle iterations in
// one go. We should first make sure that this doesn't lead to
// deadlocks though! The tryLock() mechanism could be used to first
// try one approach, but fall back on the other approach.
Lock lock = CacheFactory.getLock(entry.getKey(), directedPresencesCache);
try {
lock.lock();
directedPresencesCache.put(entry.getKey(), entry.getValue());
} finally {
lock.unlock();
}
}
}
}
......
......@@ -207,7 +207,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
// Create a connection to the remote server from the domain where the packet has been sent
boolean created;
// Make sure that only one cluster node is creating the outgoing connection
Lock lock = CacheFactory.getLock(domain + "oss", serversCache);
Lock lock = CacheFactory.getLock(domain, serversCache);
try {
lock.lock();
created = LocalOutgoingServerSession
......
......@@ -502,7 +502,7 @@ public class PresenceManagerImpl extends BasicModule implements PresenceManager
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
Lock lock = CacheFactory.getLock(username + "pr", offlinePresenceCache);
Lock lock = CacheFactory.getLock(username, offlinePresenceCache);
try {
lock.lock();
if (!offlinePresenceCache.containsKey(username) || !lastActivityCache.containsKey(username)) {
......
......@@ -108,7 +108,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
public void addComponentRoute(JID route, RoutableChannelHandler destination) {
String address = route.getDomain();
localRoutingTable.addRoute(address, destination);
Lock lock = CacheFactory.getLock(address + "rt", componentsCache);
Lock lock = CacheFactory.getLock(address, componentsCache);
try {
lock.lock();
Set<NodeID> nodes = componentsCache.get(address);
......@@ -663,7 +663,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
public boolean removeComponentRoute(JID route) {
String address = route.getDomain();
boolean removed = false;
Lock lock = CacheFactory.getLock(address + "rt", componentsCache);
Lock lock = CacheFactory.getLock(address, componentsCache);
try {
lock.lock();
Set<NodeID> nodes = componentsCache.get(address);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment