Commit 02b35b47 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

1) Regression test fix.

2) Optimization - removed bottleneck in cluster - removed counter of sessions from cache. Calculated now on demand.
3) Fixed counter of authenticated session to not decrement when non-authenticated session is removed. JM-1095

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@8664 b35dd754-fafc-0310-a699-88a17e54d16e
parent 05de80b2
......@@ -243,7 +243,7 @@ public class IQRouter extends BasicModule {
if (recipientJID != null) {
if (routingTable.hasComponentRoute(recipientJID) || routingTable.hasServerRoute(recipientJID)) {
// A component/service/remote server was found that can handle the Packet
routingTable.routePacket(recipientJID, packet);
routingTable.routePacket(recipientJID, packet, false);
return;
}
}
......@@ -298,7 +298,7 @@ public class IQRouter extends BasicModule {
}
else {
// JID is of the form <node@domain/resource>
routingTable.routePacket(recipientJID, packet);
routingTable.routePacket(recipientJID, packet, false);
}
}
catch (Exception e) {
......@@ -325,7 +325,7 @@ public class IQRouter extends BasicModule {
return;
}
// Route the error packet to the original sender of the IQ.
routingTable.routePacket(reply.getTo(), reply);
routingTable.routePacket(reply.getTo(), reply, false);
}
private IQHandler getHandler(String namespace) {
......
......@@ -93,7 +93,7 @@ public class MessageRouter extends BasicModule {
try {
// Deliver stanza to requested route
routingTable.routePacket(recipientJID, packet);
routingTable.routePacket(recipientJID, packet, false);
}
catch (Exception e) {
routingFailed(recipientJID, packet);
......@@ -182,7 +182,7 @@ public class MessageRouter extends BasicModule {
// If message was sent to an unavailable full JID of a user then retry using the bare JID
if (serverName.equals(receipient.getDomain()) && receipient.getResource() != null &&
userManager.isRegisteredUser(receipient.getNode())) {
routingTable.routePacket(new JID(receipient.toBareJID()), packet);
routingTable.routePacket(new JID(receipient.toBareJID()), packet, false);
} else {
// Just store the message offline
messageStrategy.storeOffline((Message) packet);
......
......@@ -133,7 +133,7 @@ public class PresenceRouter extends BasicModule {
// Register the sent directed presence
updateHandler.directedPresenceSent(packet, jid, recipientJID.toString());
// Route the packet
routingTable.routePacket(jid, packet);
routingTable.routePacket(jid, packet, false);
}
}
......@@ -148,7 +148,7 @@ public class PresenceRouter extends BasicModule {
else if (Presence.Type.probe == type) {
// Handle a presence probe sent by a remote server
if (!XMPPServer.getInstance().isLocal(recipientJID)) {
routingTable.routePacket(recipientJID, packet);
routingTable.routePacket(recipientJID, packet, false);
}
else {
// Handle probe to a local user
......@@ -158,7 +158,7 @@ public class PresenceRouter extends BasicModule {
else {
// It's an unknown or ERROR type, just deliver it because there's nothing
// else to do with it
routingTable.routePacket(recipientJID, packet);
routingTable.routePacket(recipientJID, packet, false);
}
}
......
......@@ -143,10 +143,12 @@ public interface RoutingTable {
*
* @param jid the receipient of the packet to route.
* @param packet the packet to route.
* @param fromServer true if the packet was created by the server. This packets should
* always be delivered
* @throws PacketException thrown if the packet is malformed (results in the sender's
* session being shutdown).
*/
void routePacket(JID jid, Packet packet) throws PacketException;
void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException;
/**
* Returns true if a registered user or anonymous user with the specified full JID is
......
......@@ -38,6 +38,7 @@ import org.xmpp.packet.Presence;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
/**
......@@ -63,12 +64,15 @@ public class SessionManager extends BasicModule implements ClusterEventListener
private int conflictLimit;
/**
* Cache (unlimited, never expire) that holds counters of:
* 1) anonymous and non-anonymous user sessions and
* 2) user connections. A connection is counted just after it was created and not
* Counter of user connections. A connection is counted just after it was created and not
* after the user became available.
*/
private Cache<String, Integer> countersCache;
private final AtomicInteger connectionsCounter = new AtomicInteger(0);
/**
* Counter of anonymous and non-anonymous user sessions.
*/
private final AtomicInteger userSessionsCounter = new AtomicInteger(0);
/**
* Cache (unlimited, never expire) that holds external component sessions.
* Key: component address, Value: nodeID
......@@ -300,7 +304,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
// Add to pre-authenticated sessions.
localSessionManager.getPreAuthenticatedSessions().put(session.getAddress().getResource(), session);
// Increment the counter of user sessions
incrementCounter("conncounter");
connectionsCounter.incrementAndGet();
return session;
}
......@@ -316,7 +320,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
conn.init(session);
conn.registerCloseListener(clientSessionListener, session);
localSessionManager.getPreAuthenticatedSessions().put(session.getAddress().getResource(), session);
incrementCounter("conncounter");
connectionsCounter.incrementAndGet();
return session;
}
......@@ -511,7 +515,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
// Add session to the routing table (routing table will know session is not available yet)
if (routingTable.addClientRoute(session.getAddress(), session)) {
// Increment counter of authenticated sessions
incrementCounter("usercounter");
userSessionsCounter.incrementAndGet();
}
SessionEventDispatcher.EventType event = session.getAuthToken().isAnonymous() ?
SessionEventDispatcher.EventType.anonymous_session_created :
......@@ -584,7 +588,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
// Send the presence of the session whose presence has changed to
// this other user's session
presence.setTo(address);
routingTable.routePacket(address, presence);
routingTable.routePacket(address, presence, false);
}
}
......@@ -825,20 +829,44 @@ public class SessionManager extends BasicModule implements ClusterEventListener
* Returns number of client sessions that are connected to the server. Sessions that
* are authenticated and not authenticated will be included
*
* @param onlyLocal true if only sessions connected to this JVM will be considered. Otherwise count cluster wise.
* @return number of client sessions that are connected to the server.
*/
public int getConnectionsCount() {
return getCounter("conncounter");
public int getConnectionsCount(boolean onlyLocal) {
if (onlyLocal) {
return connectionsCounter.get();
}
Collection<Object> results = CacheFactory.doSynchronousClusterTask(new GetSessionsCountTask(false), false);
int total = connectionsCounter.get();
for (Object result : results) {
if (result == null) {
continue;
}
total = total + (Integer) result;
}
return total;
}
/**
* Returns number of client sessions that are authenticated with the server. This includes
* anonymous and non-anoymous users.
*
* @param onlyLocal true if only sessions connected to this JVM will be considered. Otherwise count cluster wise.
* @return number of client sessions that are authenticated with the server.
*/
public int getUserSessionsCount() {
return getCounter("usercounter");
public int getUserSessionsCount(boolean onlyLocal) {
if (onlyLocal) {
return userSessionsCounter.get();
}
Collection<Object> results = CacheFactory.doSynchronousClusterTask(new GetSessionsCountTask(true), false);
int total = userSessionsCounter.get();
for (Object result : results) {
if (result == null) {
continue;
}
total = total + (Integer) result;
}
return total;
}
/**
......@@ -850,12 +878,12 @@ public class SessionManager extends BasicModule implements ClusterEventListener
* @return number of available sessions for a user.
*/
public int getActiveSessionCount(String username) {
return routingTable.getRoutes(new JID(username, serverName, null)).size();
return routingTable.getRoutes(new JID(username, serverName, null, true)).size();
}
public int getSessionCount(String username) {
// TODO Count ALL sessions not only available
return routingTable.getRoutes(new JID(username, serverName, null)).size();
return routingTable.getRoutes(new JID(username, serverName, null, true)).size();
}
/**
......@@ -948,7 +976,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
// TODO broadcast to ALL sessions of the user and not only available
for (JID address : routingTable.getRoutes(new JID(username, serverName, null))) {
packet.setTo(address);
routingTable.routePacket(address, packet);
routingTable.routePacket(address, packet, true);
}
}
......@@ -1018,10 +1046,12 @@ public class SessionManager extends BasicModule implements ClusterEventListener
router.route(offline);
}
if (removed || preauth_removed) {
if (removed) {
// Decrement number of authenticated sessions (of anonymous and non-anonymous users)
decrementCounter("usercounter");
userSessionsCounter.decrementAndGet();
}
// Decrement the counter of user sessions
decrementCounter("conncounter");
connectionsCounter.decrementAndGet();
return true;
}
return false;
......@@ -1203,7 +1233,6 @@ public class SessionManager extends BasicModule implements ClusterEventListener
}
// Initialize caches.
countersCache = CacheFactory.createCache("Session Manager Counters");
componentSessionsCache = CacheFactory.createCache(COMPONENT_SESSION_CACHE_NAME);
multiplexerSessionsCache = CacheFactory.createCache(CM_CACHE_NAME);
incomingServerSessionsCache = CacheFactory.createCache(ISS_CACHE_NAME);
......@@ -1245,7 +1274,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
userBroadcast(address.getNode(), packet);
}
else {
routingTable.routePacket(address, packet);
routingTable.routePacket(address, packet, true);
}
}
......@@ -1357,48 +1386,6 @@ public class SessionManager extends BasicModule implements ClusterEventListener
return JiveGlobals.getIntProperty("xmpp.server.session.idle", 10 * 60 * 1000);
}
private int getCounter(String counterName) {
Integer counter = countersCache.get(counterName);
if (counter == null) {
return 0;
}
return counter;
}
private void incrementCounter(String counterName) {
Lock lock = LockManager.getLock(counterName);
try {
lock.lock();
Integer counter = countersCache.get(counterName);
if (counter == null) {
countersCache.put(counterName, 1);
}
else {
countersCache.put(counterName, ++counter);
}
}
finally {
lock.unlock();
}
}
private void decrementCounter(String counterName) {
Lock lock = LockManager.getLock(counterName);
try {
lock.lock();
Integer counter = countersCache.get(counterName);
if (counter == null) {
Log.error("Failed to decrement counter. Counter not found: " + counterName);
}
else {
countersCache.put(counterName, --counter);
}
}
finally {
lock.unlock();
}
}
public void joinedCluster() {
restoreCacheContent();
}
......@@ -1408,11 +1395,6 @@ public class SessionManager extends BasicModule implements ClusterEventListener
}
public void leftCluster() {
// TODO Send unavailable presence TO roster contacts not hosted in this JVM (type=FROM)
// TODO Send unavailable presence FROM roster contacts not hosted in this JVM (type=TO)
// TODO Send unavailable presence FROM & TO roster contacts not hosted in this JVM (type=BOTH)
// TODO Send unavailable presence TO other resources of the user not hosted in this JVM
if (!XMPPServer.getInstance().isShuttingDown()) {
// Add local sessions to caches
restoreCacheContent();
......@@ -1474,15 +1456,5 @@ public class SessionManager extends BasicModule implements ClusterEventListener
}
}
}
// Update counters of client sessions
for (ClientSession session : routingTable.getClientsRoutes(true)) {
// Increment the counter of user sessions
incrementCounter("conncounter");
if (session.getStatus() == Session.STATUS_AUTHENTICATED) {
// Increment counter of authenticated sessions
incrementCounter("usercounter");
}
}
}
}
......@@ -111,7 +111,7 @@ public class IQOfflineMessagesHandler extends IQHandler implements ServerFeature
offlineInfo.addElement("item").addAttribute("node",
dateFormat.format(offlineMessage.getCreationDate()));
}
routingTable.routePacket(receipient, offlineMessage);
routingTable.routePacket(receipient, offlineMessage, true);
}
public IQHandlerInfo getInfo() {
......
......@@ -155,7 +155,7 @@ public class PresenceSubscribeHandler extends BasicModule implements ChannelHand
Presence presenteToSend = presence.createCopy();
// Stamp the presence with the user's bare JID as the 'from' address
presenteToSend.setFrom(senderJID.toBareJID());
routingTable.routePacket(jid, presenteToSend);
routingTable.routePacket(jid, presenteToSend, false);
}
}
else {
......
......@@ -443,7 +443,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
for (String receiver : directedPresence.getReceivers()) {
Presence presence = update.createCopy();
presence.setTo(receiver);
routingTable.routePacket(directedPresence.getHandler(), presence);
routingTable.routePacket(directedPresence.getHandler(), presence, false);
}
}
......
......@@ -181,7 +181,7 @@ public class PacketCopier implements PacketInterceptor, ComponentEventListener {
childElement.addAttribute("date", dateFormat.format(interceptedPacket.getCreationDate()));
childElement.add(interceptedPacket.getElement().createCopy());
// Send message notification to subscribed component
routingTable.routePacket(message.getTo(), message);
routingTable.routePacket(message.getTo(), message, true);
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
......
......@@ -40,21 +40,21 @@ public class SocketPacketWriteHandler implements ChannelHandler {
JID recipient = packet.getTo();
// Check if the target domain belongs to a remote server or a component
if (server.matchesComponent(recipient) || server.isRemote(recipient)) {
routingTable.routePacket(recipient, packet);
routingTable.routePacket(recipient, packet, false);
}
// The target domain belongs to the local server
else if (recipient == null || (recipient.getNode() == null && recipient.getResource() == null)) {
// no TO was found so send back the packet to the sender
routingTable.routePacket(packet.getFrom(), packet);
routingTable.routePacket(packet.getFrom(), packet, false);
}
else if (recipient.getResource() != null || !(packet instanceof Presence)) {
// JID is of the form <user@domain/resource>
routingTable.routePacket(recipient, packet);
routingTable.routePacket(recipient, packet, false);
}
else {
// JID is of the form <user@domain>
for (JID route : routingTable.getRoutes(recipient)) {
routingTable.routePacket(route, packet);
routingTable.routePacket(route, packet, false);
}
}
}
......
......@@ -584,7 +584,7 @@ public class Roster implements Cacheable, Externalizable {
JID searchNode = new JID(item.getJid().getNode(), item.getJid().getDomain(), null, true);
for (JID jid : routingTable.getRoutes(searchNode)) {
try {
routingTable.routePacket(jid, packet);
routingTable.routePacket(jid, packet, false);
}
catch (Exception e) {
// Theoretically only happens if session has been closed.
......@@ -602,7 +602,7 @@ public class Roster implements Cacheable, Externalizable {
}
for (JID jid: routingTable.getRoutes(new JID(contact))) {
try {
routingTable.routePacket(jid, packet);
routingTable.routePacket(jid, packet, false);
}
catch (Exception e) {
// Theoretically only happens if session has been closed.
......
......@@ -716,7 +716,7 @@ public class RosterManager extends BasicModule implements GroupEventListener, Us
else {
presence.setType(Presence.Type.unsubscribe);
}
routingTable.routePacket(recipient, presence);
routingTable.routePacket(recipient, presence, false);
}
private Collection<Group> getVisibleGroups(Group groupToCheck) {
......
......@@ -209,7 +209,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
}
if (created) {
// A connection to the remote server was created so get the route and send the packet
routingTable.routePacket(packet.getTo(), packet);
routingTable.routePacket(packet.getTo(), packet, false);
}
else {
throw new Exception("Failed to create connection to remote server");
......@@ -237,7 +237,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
reply.setFrom(to);
reply.setChildElement(((IQ) packet).getChildElement().createCopy());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Presence) {
Presence reply = new Presence();
......@@ -245,7 +245,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
reply.setTo(from);
reply.setFrom(to);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Message) {
Message reply = new Message();
......@@ -255,7 +255,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
reply.setType(((Message)packet).getType());
reply.setThread(((Message)packet).getThread());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply);
routingTable.routePacket(reply.getTo(), reply, true);
}
}
catch (Exception e) {
......
......@@ -555,7 +555,7 @@ public class LocalOutgoingServerSession extends LocalSession implements Outgoing
reply.setFrom(packet.getTo());
reply.setChildElement(((IQ) packet).getChildElement().createCopy());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Presence) {
Presence reply = new Presence();
......@@ -563,7 +563,7 @@ public class LocalOutgoingServerSession extends LocalSession implements Outgoing
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Message) {
Message reply = new Message();
......@@ -573,7 +573,7 @@ public class LocalOutgoingServerSession extends LocalSession implements Outgoing
reply.setType(((Message)packet).getType());
reply.setThread(((Message)packet).getThread());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply);
routingTable.routePacket(reply.getTo(), reply, true);
}
}
catch (Exception e) {
......
......@@ -397,7 +397,7 @@ public class PresenceManagerImpl extends BasicModule implements PresenceManager
presence.setType(Presence.Type.probe);
presence.setFrom(prober);
presence.setTo(probee);
routingTable.routePacket(probee, presence);
routingTable.routePacket(probee, presence, true);
}
else {
// Check if the probee may be hosted by this server
......
......@@ -180,7 +180,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
}
}
public void routePacket(JID jid, Packet packet) throws PacketException {
public void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException {
boolean routed = false;
if (serverName.equals(jid.getDomain())) {
if (jid.getResource() == null) {
......@@ -200,7 +200,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
clientRoute = anonymousUsersCache.get(jid.toString());
}
if (clientRoute != null) {
if (!clientRoute.isAvailable() && routeOnlyAvailable(packet)) {
if (!clientRoute.isAvailable() && routeOnlyAvailable(packet, fromServer)) {
// Packet should only be sent to available sessions and the route is not available
routed = false;
}
......@@ -314,9 +314,15 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
* Returns true if the specified packet must only be route to available client sessions.
*
* @param packet the packet to route.
* @param fromServer true if the packet was created by the server.
* @return true if the specified packet must only be route to available client sessions.
*/
private boolean routeOnlyAvailable(Packet packet) {
private boolean routeOnlyAvailable(Packet packet, boolean fromServer) {
if (fromServer) {
// Packets created by the server (no matter their FROM value) must always be delivered no
// matter the available presence of the user
return false;
}
boolean onlyAvailable = true;
JID from = packet.getFrom();
boolean hasSender = from != null;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment