Commit 9f16356a authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Clustering work.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@8353 b35dd754-fafc-0310-a699-88a17e54d16e
parent 542205c4
...@@ -12,8 +12,6 @@ ...@@ -12,8 +12,6 @@
package org.jivesoftware.openfire; package org.jivesoftware.openfire;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.handler.IQHandler; import org.jivesoftware.openfire.handler.IQHandler;
...@@ -24,10 +22,9 @@ import org.jivesoftware.openfire.privacy.PrivacyListManager; ...@@ -24,10 +22,9 @@ import org.jivesoftware.openfire.privacy.PrivacyListManager;
import org.jivesoftware.openfire.session.ClientSession; import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.user.UserManager; import org.jivesoftware.openfire.user.UserManager;
import org.xmpp.packet.IQ; import org.jivesoftware.util.LocaleUtils;
import org.xmpp.packet.JID; import org.jivesoftware.util.Log;
import org.xmpp.packet.Message; import org.xmpp.packet.*;
import org.xmpp.packet.PacketError;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
...@@ -244,10 +241,9 @@ public class IQRouter extends BasicModule { ...@@ -244,10 +241,9 @@ public class IQRouter extends BasicModule {
try { try {
// Check for registered components, services or remote servers // Check for registered components, services or remote servers
if (recipientJID != null) { if (recipientJID != null) {
RoutableChannelHandler serviceRoute = routingTable.getRoute(recipientJID); if (routingTable.hasComponentRoute(recipientJID) || routingTable.hasServerRoute(recipientJID)) {
if (serviceRoute != null && !(serviceRoute instanceof ClientSession)) {
// A component/service/remote server was found that can handle the Packet // A component/service/remote server was found that can handle the Packet
serviceRoute.process(packet); routingTable.routePacket(recipientJID, packet);
return; return;
} }
} }
...@@ -266,10 +262,9 @@ public class IQRouter extends BasicModule { ...@@ -266,10 +262,9 @@ public class IQRouter extends BasicModule {
} }
else { else {
// Check if communication to local users is allowed // Check if communication to local users is allowed
if (recipientJID != null && if (recipientJID != null && userManager.isRegisteredUser(recipientJID.getNode())) {
userManager.isRegisteredUser(recipientJID.getNode())) { PrivacyList list =
PrivacyList list = PrivacyListManager.getInstance() PrivacyListManager.getInstance().getDefaultPrivacyList(recipientJID.getNode());
.getDefaultPrivacyList(recipientJID.getNode());
if (list != null && list.shouldBlockPacket(packet)) { if (list != null && list.shouldBlockPacket(packet)) {
// Communication is blocked // Communication is blocked
if (IQ.Type.set == packet.getType() || IQ.Type.get == packet.getType()) { if (IQ.Type.set == packet.getType() || IQ.Type.get == packet.getType()) {
...@@ -300,41 +295,10 @@ public class IQRouter extends BasicModule { ...@@ -300,41 +295,10 @@ public class IQRouter extends BasicModule {
handler.process(packet); handler.process(packet);
} }
} }
} }
else { else {
// JID is of the form <node@domain/resource> // JID is of the form <node@domain/resource>
boolean handlerFound = false; routingTable.routePacket(recipientJID, packet);
// IQ packets should be sent to users even before they send an available presence.
// So if the target address belongs to this server then use the sessionManager
// instead of the routingTable since unavailable clients won't have a route to them
if (XMPPServer.getInstance().isLocal(recipientJID)) {
ClientSession session = sessionManager.getSession(recipientJID);
if (session != null) {
if (session.canProcess(packet)) {
session.process(packet);
handlerFound = true;
}
}
else {
Log.info("Packet sent to unreachable address " + packet);
}
}
else {
ChannelHandler route = routingTable.getRoute(recipientJID);
if (route != null) {
route.process(packet);
handlerFound = true;
}
else {
Log.info("Packet sent to unreachable address " + packet);
}
}
// If a route to the target address was not found then try to answer a
// service_unavailable error code to the sender of the IQ packet
if (!handlerFound && IQ.Type.result != packet.getType() && IQ.Type.error != packet.getType()) {
sendErrorPacket(packet, PacketError.Condition.service_unavailable);
}
} }
} }
catch (Exception e) { catch (Exception e) {
...@@ -349,8 +313,7 @@ public class IQRouter extends BasicModule { ...@@ -349,8 +313,7 @@ public class IQRouter extends BasicModule {
} }
} }
private void sendErrorPacket(IQ originalPacket, PacketError.Condition condition) private void sendErrorPacket(IQ originalPacket, PacketError.Condition condition) {
throws UnauthorizedException {
if (IQ.Type.error == originalPacket.getType()) { if (IQ.Type.error == originalPacket.getType()) {
Log.error("Cannot reply an IQ error to another IQ error: " + originalPacket); Log.error("Cannot reply an IQ error to another IQ error: " + originalPacket);
return; return;
...@@ -364,23 +327,11 @@ public class IQRouter extends BasicModule { ...@@ -364,23 +327,11 @@ public class IQRouter extends BasicModule {
handle(reply); handle(reply);
return; return;
} }
// Locate a route to the sender of the IQ and ask it to process // Route the error packet to the original sender of the IQ.
// the packet. Use the routingTable so that routes to remote servers try {
// may be found routingTable.routePacket(reply.getTo(), reply);
ChannelHandler route = routingTable.getRoute(originalPacket.getFrom()); } catch (UnauthorizedException e) {
if (route != null) { // Should never happen
route.process(reply);
}
else {
// No root was found so try looking for local sessions that have never
// sent an available presence or haven't authenticated yet
Session session = sessionManager.getSession(originalPacket.getFrom());
if (session != null) {
session.process(reply);
}
else {
Log.warn("Error packet could not be delivered " + reply);
}
} }
} }
...@@ -398,4 +349,22 @@ public class IQRouter extends BasicModule { ...@@ -398,4 +349,22 @@ public class IQRouter extends BasicModule {
} }
return handler; return handler;
} }
}
\ No newline at end of file /**
* Notification message indicating that a packet has failed to be routed to the receipient.
*
* @param packet IQ packet that failed to be sent to the receipient.
*/
public void routingFailed(Packet packet) {
IQ iq = (IQ) packet;
// If a route to the target address was not found then try to answer a
// service_unavailable error code to the sender of the IQ packet
if (IQ.Type.result != iq.getType() && IQ.Type.error != iq.getType()) {
Log.info("Packet sent to unreachable address " + packet);
sendErrorPacket(iq, PacketError.Condition.service_unavailable);
}
else {
Log.warn("Error or result packet could not be delivered " + packet);
}
}
}
...@@ -18,10 +18,7 @@ import org.jivesoftware.openfire.session.ClientSession; ...@@ -18,10 +18,7 @@ import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.xmpp.packet.JID; import org.xmpp.packet.*;
import org.xmpp.packet.Message;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
import java.util.*; import java.util.*;
...@@ -100,8 +97,8 @@ public class MessageRouter extends BasicModule { ...@@ -100,8 +97,8 @@ public class MessageRouter extends BasicModule {
routeToBareJID(recipientJID, packet); routeToBareJID(recipientJID, packet);
} }
else { else {
// Deliver stanza to best route // Deliver stanza to requested route
routingTable.getBestRoute(recipientJID).process(packet); routingTable.routePacket(recipientJID, packet);
} }
} }
catch (Exception e) { catch (Exception e) {
...@@ -279,4 +276,13 @@ public class MessageRouter extends BasicModule { ...@@ -279,4 +276,13 @@ public class MessageRouter extends BasicModule {
multicastRouter = server.getMulticastRouter(); multicastRouter = server.getMulticastRouter();
serverName = server.getServerInfo().getName(); serverName = server.getServerInfo().getName();
} }
/**
* Notification message indicating that a packet has failed to be routed to the receipient.
*
* @param packet Message packet that failed to be sent to the receipient.
*/
public void routingFailed(Packet packet) {
messageStrategy.storeOffline((Message) packet);
}
} }
...@@ -11,12 +11,12 @@ ...@@ -11,12 +11,12 @@
package org.jivesoftware.openfire; package org.jivesoftware.openfire;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.privacy.PrivacyList; import org.jivesoftware.openfire.privacy.PrivacyList;
import org.jivesoftware.openfire.privacy.PrivacyListManager; import org.jivesoftware.openfire.privacy.PrivacyListManager;
import org.jivesoftware.openfire.user.UserManager; import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Message; import org.xmpp.packet.Message;
import org.xmpp.packet.PacketError; import org.xmpp.packet.PacketError;
...@@ -67,7 +67,7 @@ public class OfflineMessageStrategy extends BasicModule { ...@@ -67,7 +67,7 @@ public class OfflineMessageStrategy extends BasicModule {
public void storeOffline(Message message) { public void storeOffline(Message message) {
if (message != null) { if (message != null) {
// Do nothing if the message was sent to the server itself or to an anonymous user // Do nothing if the message was sent to the server itself, an anonymous user or a non-existent user
JID recipientJID = message.getTo(); JID recipientJID = message.getTo();
if (recipientJID == null || serverAddress.equals(recipientJID) || if (recipientJID == null || serverAddress.equals(recipientJID) ||
recipientJID.getNode() == null || recipientJID.getNode() == null ||
......
...@@ -11,8 +11,6 @@ ...@@ -11,8 +11,6 @@
package org.jivesoftware.openfire; package org.jivesoftware.openfire;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.handler.PresenceSubscribeHandler; import org.jivesoftware.openfire.handler.PresenceSubscribeHandler;
import org.jivesoftware.openfire.handler.PresenceUpdateHandler; import org.jivesoftware.openfire.handler.PresenceUpdateHandler;
...@@ -20,10 +18,9 @@ import org.jivesoftware.openfire.interceptor.InterceptorManager; ...@@ -20,10 +18,9 @@ import org.jivesoftware.openfire.interceptor.InterceptorManager;
import org.jivesoftware.openfire.interceptor.PacketRejectedException; import org.jivesoftware.openfire.interceptor.PacketRejectedException;
import org.jivesoftware.openfire.session.ClientSession; import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.xmpp.packet.JID; import org.jivesoftware.util.LocaleUtils;
import org.xmpp.packet.Message; import org.jivesoftware.util.Log;
import org.xmpp.packet.PacketError; import org.xmpp.packet.*;
import org.xmpp.packet.Presence;
/** /**
* <p>Route presence packets throughout the server.</p> * <p>Route presence packets throughout the server.</p>
...@@ -132,11 +129,11 @@ public class PresenceRouter extends BasicModule { ...@@ -132,11 +129,11 @@ public class PresenceRouter extends BasicModule {
// The user sent a directed presence to an entity // The user sent a directed presence to an entity
// Broadcast it to all connected resources // Broadcast it to all connected resources
for (ChannelHandler route : routingTable.getRoutes(recipientJID)) { for (JID jid : routingTable.getRoutes(recipientJID)) {
// Register the sent directed presence // Register the sent directed presence
updateHandler.directedPresenceSent(packet, route, recipientJID.toString()); updateHandler.directedPresenceSent(packet, jid, recipientJID.toString());
// Route the packet // Route the packet
route.process(packet); routingTable.routePacket(jid, packet);
} }
} }
...@@ -151,11 +148,7 @@ public class PresenceRouter extends BasicModule { ...@@ -151,11 +148,7 @@ public class PresenceRouter extends BasicModule {
else if (Presence.Type.probe == type) { else if (Presence.Type.probe == type) {
// Handle a presence probe sent by a remote server // Handle a presence probe sent by a remote server
if (!XMPPServer.getInstance().isLocal(recipientJID)) { if (!XMPPServer.getInstance().isLocal(recipientJID)) {
// Target is a component of the server so forward it routingTable.routePacket(recipientJID, packet);
ChannelHandler route = routingTable.getRoute(recipientJID);
if (route != null) {
route.process(packet);
}
} }
else { else {
// Handle probe to a local user // Handle probe to a local user
...@@ -165,10 +158,7 @@ public class PresenceRouter extends BasicModule { ...@@ -165,10 +158,7 @@ public class PresenceRouter extends BasicModule {
else { else {
// It's an unknown or ERROR type, just deliver it because there's nothing // It's an unknown or ERROR type, just deliver it because there's nothing
// else to do with it // else to do with it
ChannelHandler route = routingTable.getRoute(recipientJID); routingTable.routePacket(recipientJID, packet);
if (route != null) {
route.process(packet);
}
} }
} }
...@@ -194,4 +184,13 @@ public class PresenceRouter extends BasicModule { ...@@ -194,4 +184,13 @@ public class PresenceRouter extends BasicModule {
multicastRouter = server.getMulticastRouter(); multicastRouter = server.getMulticastRouter();
sessionManager = server.getSessionManager(); sessionManager = server.getSessionManager();
} }
/**
* Notification message indicating that a packet has failed to be routed to the receipient.
*
* @param packet Presence packet that failed to be sent to the receipient.
*/
public void routingFailed(Packet packet) {
// presence packets are dropped silently
}
} }
...@@ -11,7 +11,10 @@ ...@@ -11,7 +11,10 @@
package org.jivesoftware.openfire; package org.jivesoftware.openfire;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.session.ClientSession;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
import java.util.List; import java.util.List;
...@@ -74,74 +77,166 @@ import java.util.List; ...@@ -74,74 +77,166 @@ import java.util.List;
public interface RoutingTable { public interface RoutingTable {
/** /**
* <p>Add a route to the routing table.</p> * Adds a route to the routing table for the specified outoing server session. When running
* <p>A single access method allows you to add any of the acceptable * inside of a cluster this message <tt>must</tt> be sent from the cluster node that is
* route to the table. It is expected that routes are added and removed * actually holding the physical connectoin to the remote server.
* on a relatively rare occassion so routing tables should be optimized *
* for lookup speed.</p> * @param route the address associated to the route.
* * @param destination the outgoing server session.
* @param node The route's destination node */
* @param destination The destination object for this route void addServerRoute(JID route, RoutableChannelHandler destination);
/**
* Adds a route to the routing table for the specified internal or external component. When
* running inside of a cluster this message <tt>must</tt> be sent from the cluster node
* that is actually hosting the component.
*
* @param route the address associated to the route.
* @param destination the component.
*/ */
void addRoute(JID node, RoutableChannelHandler destination); void addComponentRoute(JID route, RoutableChannelHandler destination);
/** /**
* <p>Obtain a route to a packet handler for the given node.</p> * Adds a route to the routing table for the specified client session. The client
* <p>If a route doesn't exist, the method returns null.</p> * session will be added as soon as the user has finished authenticating with the server.
* Moreover, when the user becomes available or unavailable then the routing table will
* get updated again. When running inside of a cluster this message <tt>must</tt> be sent
* from the cluster node that is actually holding the client session.
* *
* @param node The address we want a route to * @param route the address associated to the route.
* @return The handler corresponding to the route, or null indicating no route exists * @param destination the client session.
*/ */
RoutableChannelHandler getRoute(JID node); void addClientRoute(JID route, ClientSession destination);
/** /**
* <p>Obtain all child routes for the given node.</p> * Routes a packet to the specified address. The packet destination can be a
* <p>See the class documentation for the matching algorithm of child routes for * user on the local server, a component, or a foreign server.<p>
* any given node. If a route doesn't exist, the method returns an empty iterator (not null).</p>
* *
* @param node The address we want a route to * When routing a packet to a remote server then a new outgoing connection
* @return An iterator over all applicable routes * will be created to the remote server if none was found and the packet
* will be delivered. If an existing outgoing connection already exists then
* it will be used for delivering the packet. Moreover, when runing inside of a cluster
* the node that has the actual outgoing connection will be requested to deliver
* the requested packet.<p>
*
* Packets routed to components will only be sent if the internal or external
* component is connected to the server. Moreover, when runing inside of a cluster
* the node that is hosting the component will be requested to deliver the requested
* packet.<p>
*
* Packets routed to users will be delivered if the user is connected to the server. Depending
* on the packet type and the sender of the packet only available or all user sessions could
* be considered. For instance, {@link org.xmpp.packet.Message Messages} and
* {@link org.xmpp.packet.Presence Presences} are only sent to available client sessions whilst
* {@link org.xmpp.packet.IQ IQs} originated to the server can be sent to available or unavailable
* sessions. When runing inside of a cluster the node that is hosting the user session will be
* requested to deliver the requested packet.<p>
*
* @param jid the receipient of the packet to route.
* @param packet the packet to route.
* @throws UnauthorizedException if not allowed to process the packet.
* @throws PacketException thrown if the packet is malformed (results in the sender's
* session being shutdown).
*/ */
List<ChannelHandler> getRoutes(JID node); void routePacket(JID jid, Packet packet) throws PacketException, UnauthorizedException;
/** /**
* <p>Obtain a route to a handler at the given node falling back to a user branch if no resource leaf exists.</p> * Returns true if a registered user or anonymous user with the specified full JID is
* <p>Matching differs slightly from getRoute() which does matching according * currently logged. When running inside of a cluster a true value will be returned
* to the general matching algorithm described in the class notes. This method * as long as the user is connecte to any cluster node.
* searches using the standard matching rules, and if that does not find a *
* match and the address name component is not null, or empty, searches again * @param jid the full JID of the user.
* with the resource set to null (wild card). This is essentially a convenience * @return true if a registered user or anonymous user with the specified full JID is
* for falling back to the best route to a user node when a specific resource * currently logged.
* is not available.</p> */
* <p>For example, consider we're searching for a route to user@server.com/work. boolean hasClientRoute(JID jid);
* There is no route to that resource but a session is available at
* user@server.com/home. The routing table will contain entries for user@server.com /**
* and user@server.com/home. getBestLocalRoute() will first do a search for * Returns true if an outgoing server session exists to the specified remote server.
* user@server.com/work and not find a match. It will then do another search * The JID can be a full JID or a bare JID since only the domain of the specified
* on user@server.com and find the alias for the session user@server.com/home * address will be used to look up the route.<p>
* (the alias must be maintained by the session manager for the highest priority *
* resource for any given user). In most cases, the caller doesn't care as long * When running inside of a cluster the look up will be done in all the cluster. So
* as they get a legitimate route to the user, so this behavior is 'better' than * as long as a node has a connection to the remote server a true value will be
* the exact matching used in getLocalRoute().</p> * returned.
* <p>However, it is important to note that sometimes you don't want the best route *
* to a node. In the previous example, if the packet is an error packet, it is * @param jid JID that specifies the remote server address.
* probably only relevant to the sending session. If a route to that particular * @return true if an outgoing server session exists to the specified remote server.
* session can't be found, the error should not be sent to another session logged */
* into the account.</p> boolean hasServerRoute(JID jid);
* <p/>
* <p>If a route doesn't exist, the method returns null.</p> /**
* * Returns true if an internal or external component is hosting the specified address.
* @param node The address we want a route to * The JID can be a full JID or a bare JID since only the domain of the specified
* @return The Session corresponding to the route, or null indicating no route exists * address will be used to look up the route.<p>
*
* When running inside of a cluster the look up will be done in all the cluster. So
* as long as a node is hosting the component a true value will be returned.
*
* @param jid JID that specifies the component address.
* @return true if an internal or external component is hosting the specified address.
*/
boolean hasComponentRoute(JID jid);
/**
* Returns the list of routes associated to the specified route address. When asking
* for routes to a remote server then the requested JID will be included as the only
* value of the returned collection. It is indifferent if an outgoing session to the
* specified remote server exists or not.<p>
*
* When asking for routes to client sessions the specified route address could either
* be a full JID of a bare JID. In the case of a full JID, a single element will be
* included in the answer in case the specified full JID exists or an empty collection
* if the full JID does not exist. Moreover, when passing a bare JID a list of full
* JIDs will be returned for each available resource associated to the bare JID. In
* any case, only JIDs of <tt>available</tt> client sessions are returned.<p>
*
* When asking for routes to components a single element will be returned in the answer
* only if an internal or external component is found for the specified route address.
* If no component was found then an empty collection will be returned.
*
* @param route The address we want a route to.
* @return list of routes associated to the specified route address.
*/
List<JID> getRoutes(JID route);
/**
* Returns true if a route of a client session has been successfully removed. When running
* inside of a cluster this message <tt>must</tt> be sent from the cluster node that is
* actually hosting the client session.
*
* @param route the route to remove.
* @return true if a route of a client session has been successfully removed.
*/
boolean removeClientRoute(JID route);
/**
* Returns true if a route to an outoing server has been successfully removed. When running
* inside of a cluster this message <tt>must</tt> be sent from the cluster node that is
* actually holding the physical connectoin to the remote server.
*
* @param route the route to remove.
* @return true if the route was successfully removed.
*/
boolean removeServerRoute(JID route);
/**
* Returns true if a route of a component has been successfully removed. Both internal
* and external components have a route in the table. When running inside of a cluster
* this message <tt>must</tt> be sent from the cluster node that is actually hosting the
* component.
*
* @param route the route to remove.
* @return true if a route of a component has been successfully removed.
*/ */
ChannelHandler getBestRoute(JID node); boolean removeComponentRoute(JID route);
/** /**
* <p>Remove a route from the routing table.</p> * Sets the {@link RemotePacketRouter} to use for deliverying packets to entities hosted
* <p>If a route doesn't exist, the method returns null.</p> * in remote nodes of the cluster.
* *
* @param node The address we want a route to * @param remotePacketRouter the RemotePacketRouter to use for deliverying packets to entities hosted
* @return The destination object previously registered under the given address, or null if none existed * in remote nodes of the cluster.
*/ */
ChannelHandler removeRoute(JID node); void setRemotePacketRouter(RemotePacketRouter remotePacketRouter);
} }
...@@ -743,6 +743,8 @@ public class SessionManager extends BasicModule { ...@@ -743,6 +743,8 @@ public class SessionManager extends BasicModule {
preAuthenticatedSessions.remove(session.getStreamID().toString()); preAuthenticatedSessions.remove(session.getStreamID().toString());
// Increment counter of authenticated sessions // Increment counter of authenticated sessions
userSessionsCounter.incrementAndGet(); userSessionsCounter.incrementAndGet();
// Add session to the routing table (routing table will know session is not available yet)
routingTable.addClientRoute(session.getAddress(), session);
// Fire session created event. // Fire session created event.
SessionEventDispatcher SessionEventDispatcher
.dispatchEvent(session, SessionEventDispatcher.EventType.session_created); .dispatchEvent(session, SessionEventDispatcher.EventType.session_created);
...@@ -760,11 +762,11 @@ public class SessionManager extends BasicModule { ...@@ -760,11 +762,11 @@ public class SessionManager extends BasicModule {
if (anonymousSessions.containsValue(session)) { if (anonymousSessions.containsValue(session)) {
// Anonymous session always have resources so we only need to add one route. That is // Anonymous session always have resources so we only need to add one route. That is
// the route to the anonymous session // the route to the anonymous session
routingTable.addRoute(session.getAddress(), session); routingTable.addClientRoute(session.getAddress(), session);
} }
else { else {
// A non-anonymous session is now available // A non-anonymous session is now available
Session defaultSession; ClientSession defaultSession;
try { try {
SessionMap sessionMap = sessions.get(session.getUsername()); SessionMap sessionMap = sessions.get(session.getUsername());
if (sessionMap == null) { if (sessionMap == null) {
...@@ -776,11 +778,11 @@ public class SessionManager extends BasicModule { ...@@ -776,11 +778,11 @@ public class SessionManager extends BasicModule {
defaultSession = sessionMap.getDefaultSession(true); defaultSession = sessionMap.getDefaultSession(true);
if (defaultSession != null) { if (defaultSession != null) {
// Add route to default session (used when no resource is specified) // Add route to default session (used when no resource is specified)
JID node = new JID(session.getAddress().getNode(), session.getAddress().getDomain(), null); JID node = new JID(session.getAddress().getNode(), session.getAddress().getDomain(), null, true);
routingTable.addRoute(node, defaultSession); routingTable.addClientRoute(node, defaultSession);
} }
// Add route to the new session // Add route to the new session
routingTable.addRoute(session.getAddress(), session); routingTable.addClientRoute(session.getAddress(), session);
// Broadcast presence between the user's resources // Broadcast presence between the user's resources
broadcastPresenceOfOtherResource(session); broadcastPresenceOfOtherResource(session);
} }
...@@ -847,8 +849,8 @@ public class SessionManager extends BasicModule { ...@@ -847,8 +849,8 @@ public class SessionManager extends BasicModule {
public void sessionUnavailable(ClientSession session) { public void sessionUnavailable(ClientSession session) {
if (session.getAddress() != null && routingTable != null && if (session.getAddress() != null && routingTable != null &&
session.getAddress().toBareJID().trim().length() != 0) { session.getAddress().toBareJID().trim().length() != 0) {
// Remove route to the removed session (anonymous or not) // Update route to unavailable session (anonymous or not)
routingTable.removeRoute(session.getAddress()); routingTable.addClientRoute(session.getAddress(), session);
try { try {
if (session.getUsername() == null) { if (session.getUsername() == null) {
// Do nothing since this is an anonymous session // Do nothing since this is an anonymous session
...@@ -858,33 +860,32 @@ public class SessionManager extends BasicModule { ...@@ -858,33 +860,32 @@ public class SessionManager extends BasicModule {
// If sessionMap is null, which is an irregular case, try to clean up the routes to // If sessionMap is null, which is an irregular case, try to clean up the routes to
// the user from the routing table // the user from the routing table
if (sessionMap == null) { if (sessionMap == null) {
JID userJID = new JID(session.getUsername(), serverName, ""); JID userJID = new JID(session.getUsername(), serverName, "", true);
if (routingTable.getRoute(userJID) != null) { if (routingTable.hasClientRoute(userJID)) {
// Remove the route for the session's BARE address // Remove the route for the session's BARE address
routingTable.removeRoute(new JID(session.getAddress().getNode(), routingTable.removeClientRoute(new JID(session.getAddress().getNode(),
session.getAddress().getDomain(), "")); session.getAddress().getDomain(), "", true));
} }
} }
// If all the user sessions are gone then remove the route to the default session // If all the user sessions are gone then remove the route to the default session
else if (sessionMap.getAvailableSessions().isEmpty()) { else if (sessionMap.getAvailableSessions().isEmpty()) {
// Remove the route for the session's BARE address // Remove the route for the session's BARE address
routingTable.removeRoute(new JID(session.getAddress().getNode(), routingTable.removeClientRoute(new JID(session.getAddress().getNode(),
session.getAddress().getDomain(), "")); session.getAddress().getDomain(), "", true));
} }
else { else {
// Update the order of the sessions based on the new presence of this session // Update the order of the sessions based on the new presence of this session
sessionMap.sessionUnavailable(session); sessionMap.sessionUnavailable(session);
// Update the route for the session's BARE address // Update the route for the session's BARE address
Session defaultSession = sessionMap.getDefaultSession(true); ClientSession defaultSession = sessionMap.getDefaultSession(true);
JID jid = JID jid = new JID(session.getAddress().getNode(), session.getAddress().getDomain(), "", true);
new JID(session.getAddress().getNode(), session.getAddress().getDomain(), "");
if (defaultSession != null) { if (defaultSession != null) {
// Set the route to the bare JID to the session with highest priority // Set the route to the bare JID to the session with highest priority
routingTable.addRoute(jid, defaultSession); routingTable.addClientRoute(jid, defaultSession);
} }
else { else {
// All sessions have a negative priority presence so delete the route to the bare JID // All sessions have a negative priority presence so delete the route to the bare JID
routingTable.removeRoute(jid); routingTable.removeClientRoute(jid);
} }
} }
} }
...@@ -905,7 +906,7 @@ public class SessionManager extends BasicModule { ...@@ -905,7 +906,7 @@ public class SessionManager extends BasicModule {
// Do nothing if the session belongs to an anonymous user // Do nothing if the session belongs to an anonymous user
return; return;
} }
Session defaultSession; ClientSession defaultSession;
String username = sender.getNode(); String username = sender.getNode();
SessionMap resources = sessions.get(username); SessionMap resources = sessions.get(username);
if (resources == null) { if (resources == null) {
...@@ -918,12 +919,12 @@ public class SessionManager extends BasicModule { ...@@ -918,12 +919,12 @@ public class SessionManager extends BasicModule {
defaultSession = resources.getDefaultSession(true); defaultSession = resources.getDefaultSession(true);
} }
// Update the route to the bareJID with the session with highest priority // Update the route to the bareJID with the session with highest priority
JID defaultAddress = new JID(sender.getNode(), sender.getDomain(), ""); JID defaultAddress = new JID(sender.getNode(), sender.getDomain(), "", true);
// Update the route to the bare JID // Update the route to the bare JID
if (defaultSession != null) { if (defaultSession != null) {
boolean hadDefault = routingTable.getRoute(defaultAddress) != null; boolean hadDefault = routingTable.hasClientRoute(defaultAddress);
// Set the route to the bare JID to the session with highest priority // Set the route to the bare JID to the session with highest priority
routingTable.addRoute(defaultAddress, defaultSession); routingTable.addClientRoute(defaultAddress, defaultSession);
// Check if we need to deliver offline messages // Check if we need to deliver offline messages
if (!hadDefault) { if (!hadDefault) {
// User sessions had negative presence before this change so deliver messages // User sessions had negative presence before this change so deliver messages
...@@ -939,7 +940,7 @@ public class SessionManager extends BasicModule { ...@@ -939,7 +940,7 @@ public class SessionManager extends BasicModule {
} }
else { else {
// All sessions have a negative priority presence so delete the route to the bare JID // All sessions have a negative priority presence so delete the route to the bare JID
routingTable.removeRoute(defaultAddress); routingTable.removeClientRoute(defaultAddress);
} }
} }
...@@ -995,11 +996,13 @@ public class SessionManager extends BasicModule { ...@@ -995,11 +996,13 @@ public class SessionManager extends BasicModule {
} }
public boolean isAnonymousRoute(String username) { public boolean isAnonymousRoute(String username) {
// TODO Ask the routing table for this. Should we have user and anon caches in routing table? Or some kind of flag?
// JID's node and resource are the same for anonymous sessions // JID's node and resource are the same for anonymous sessions
return anonymousSessions.containsKey(username); return anonymousSessions.containsKey(username);
} }
public boolean isActiveRoute(String username, String resource) { public boolean isActiveRoute(String username, String resource) {
//TODO Check anonymous sessions in RT. Check for not available sessions here. Check for available sessions in RT.
boolean hasRoute = false; boolean hasRoute = false;
// Check if there is an anonymous session // Check if there is an anonymous session
...@@ -1057,6 +1060,8 @@ public class SessionManager extends BasicModule { ...@@ -1057,6 +1060,8 @@ public class SessionManager extends BasicModule {
* @return the <code>Session</code> associated with the JID data. * @return the <code>Session</code> associated with the JID data.
*/ */
public ClientSession getSession(String username, String domain, String resource) { public ClientSession getSession(String username, String domain, String resource) {
//TODO Check available(anonymous or not) sessions in RT. Check for not available(anonymous or not)/preAuthenticated/ sessions here.
//TODO For sessions in SM return real object. RT should return real object for local sessions and surrogates for remote ones. Check usage of returned object
// Return null if the JID's data belongs to a foreign server. If the server is // Return null if the JID's data belongs to a foreign server. If the server is
// shutting down then serverName will be null so answer null too in this case. // shutting down then serverName will be null so answer null too in this case.
if (serverName == null || !serverName.equals(domain)) { if (serverName == null || !serverName.equals(domain)) {
...@@ -1286,10 +1291,6 @@ public class SessionManager extends BasicModule { ...@@ -1286,10 +1291,6 @@ public class SessionManager extends BasicModule {
} }
} }
public Iterator getAnonymousSessions() {
return Collections.unmodifiableCollection(anonymousSessions.values()).iterator();
}
public Collection<ClientSession> getSessions(String username) { public Collection<ClientSession> getSessions(String username) {
List<ClientSession> sessionList = new ArrayList<ClientSession>(); List<ClientSession> sessionList = new ArrayList<ClientSession>();
if (username != null) { if (username != null) {
...@@ -1329,6 +1330,7 @@ public class SessionManager extends BasicModule { ...@@ -1329,6 +1330,7 @@ public class SessionManager extends BasicModule {
* @return number of client sessions that are authenticated with the server using a non-anoymous user. * @return number of client sessions that are authenticated with the server using a non-anoymous user.
*/ */
public int getUserSessionsCount() { public int getUserSessionsCount() {
//TODO Merge getUserSessionsCount() and getAnonymousSessionCount() and move it to routing table
return userSessionsCounter.get(); return userSessionsCounter.get();
} }
...@@ -1349,6 +1351,7 @@ public class SessionManager extends BasicModule { ...@@ -1349,6 +1351,7 @@ public class SessionManager extends BasicModule {
} }
public int getAnonymousSessionCount() { public int getAnonymousSessionCount() {
//TODO Merge getUserSessionsCount() and getAnonymousSessionCount() and move it to routing table
return anonymousSessions.size(); return anonymousSessions.size();
} }
...@@ -1452,6 +1455,7 @@ public class SessionManager extends BasicModule { ...@@ -1452,6 +1455,7 @@ public class SessionManager extends BasicModule {
* @throws UnauthorizedException if not allowed to perform the operation. * @throws UnauthorizedException if not allowed to perform the operation.
*/ */
public void broadcast(Packet packet) throws UnauthorizedException { public void broadcast(Packet packet) throws UnauthorizedException {
// TODO Move responsibility to routing table
for (SessionMap sessionMap : sessions.values()) { for (SessionMap sessionMap : sessions.values()) {
sessionMap.broadcast(packet); sessionMap.broadcast(packet);
} }
...@@ -1490,6 +1494,10 @@ public class SessionManager extends BasicModule { ...@@ -1490,6 +1494,10 @@ public class SessionManager extends BasicModule {
if (session == null || serverName == null) { if (session == null || serverName == null) {
return false; return false;
} }
// Remove route to the removed session (anonymous or not)
routingTable.removeClientRoute(session.getAddress());
boolean auth_removed = false; boolean auth_removed = false;
if (anonymousSessions.remove(session.getAddress().getResource()) != null) { if (anonymousSessions.remove(session.getAddress().getResource()) != null) {
// Fire session event. // Fire session event.
...@@ -1530,7 +1538,7 @@ public class SessionManager extends BasicModule { ...@@ -1530,7 +1538,7 @@ public class SessionManager extends BasicModule {
if (presence.isAvailable()) { if (presence.isAvailable()) {
Presence offline = new Presence(); Presence offline = new Presence();
offline.setFrom(session.getAddress()); offline.setFrom(session.getAddress());
offline.setTo(new JID(null, serverName, null)); offline.setTo(new JID(null, serverName, null, true));
offline.setType(Presence.Type.unavailable); offline.setType(Presence.Type.unavailable);
router.route(offline); router.route(offline);
} }
...@@ -1546,6 +1554,8 @@ public class SessionManager extends BasicModule { ...@@ -1546,6 +1554,8 @@ public class SessionManager extends BasicModule {
anonymousSessions.put(session.getAddress().getResource(), session); anonymousSessions.put(session.getAddress().getResource(), session);
// Remove the session from the pre-Authenticated sessions list // Remove the session from the pre-Authenticated sessions list
preAuthenticatedSessions.remove(session.getAddress().getResource()); preAuthenticatedSessions.remove(session.getAddress().getResource());
// Add session to the routing table (routing table will know session is not available yet)
routingTable.addClientRoute(session.getAddress(), session);
// Fire session event. // Fire session event.
SessionEventDispatcher.dispatchEvent(session, SessionEventDispatcher.dispatchEvent(session,
SessionEventDispatcher.EventType.anonymous_session_created); SessionEventDispatcher.EventType.anonymous_session_created);
...@@ -1656,7 +1666,7 @@ public class SessionManager extends BasicModule { ...@@ -1656,7 +1666,7 @@ public class SessionManager extends BasicModule {
for (String hostname : session.getHostnames()) { for (String hostname : session.getHostnames()) {
unregisterOutgoingServerSession(hostname); unregisterOutgoingServerSession(hostname);
// Remove the route to the session using the hostname // Remove the route to the session using the hostname
XMPPServer.getInstance().getRoutingTable().removeRoute(new JID(hostname)); XMPPServer.getInstance().getRoutingTable().removeServerRoute(new JID(hostname));
} }
} }
} }
......
...@@ -12,11 +12,11 @@ ...@@ -12,11 +12,11 @@
package org.jivesoftware.openfire.component; package org.jivesoftware.openfire.component;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.openfire.*; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.session.ComponentSession; import org.jivesoftware.openfire.session.ComponentSession;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.xmpp.component.Component; import org.xmpp.component.Component;
import org.xmpp.component.ComponentException; import org.xmpp.component.ComponentException;
import org.xmpp.component.ComponentManager; import org.xmpp.component.ComponentManager;
...@@ -85,7 +85,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -85,7 +85,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
serviceAddress = new JID(null, "component." + serverDomain, null); serviceAddress = new JID(null, "component." + serverDomain, null);
if (!server.isSetupMode()) { if (!server.isSetupMode()) {
// Add a route to this service // Add a route to this service
server.getRoutingTable().addRoute(getAddress(), this); server.getRoutingTable().addComponentRoute(getAddress(), this);
} }
} }
...@@ -93,7 +93,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -93,7 +93,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
super.stop(); super.stop();
if (getAddress() != null) { if (getAddress() != null) {
// Remove the route to this service // Remove the route to this service
XMPPServer.getInstance().getRoutingTable().removeRoute(getAddress()); XMPPServer.getInstance().getRoutingTable().removeComponentRoute(getAddress());
} }
} }
...@@ -111,7 +111,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -111,7 +111,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
JID componentJID = new JID(subdomain + "." + serverDomain); JID componentJID = new JID(subdomain + "." + serverDomain);
// Add the route to the new service provided by the component // Add the route to the new service provided by the component
XMPPServer.getInstance().getRoutingTable().addRoute(componentJID, XMPPServer.getInstance().getRoutingTable().addComponentRoute(componentJID,
new RoutableComponent(componentJID, component)); new RoutableComponent(componentJID, component));
// Initialize the new component // Initialize the new component
...@@ -135,7 +135,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -135,7 +135,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
// Unregister the componet's domain // Unregister the componet's domain
components.remove(subdomain); components.remove(subdomain);
// Remove the route // Remove the route
XMPPServer.getInstance().getRoutingTable().removeRoute(componentJID); XMPPServer.getInstance().getRoutingTable().removeComponentRoute(componentJID);
if (e instanceof ComponentException) { if (e instanceof ComponentException) {
// Rethrow the exception // Rethrow the exception
throw (ComponentException)e; throw (ComponentException)e;
...@@ -156,7 +156,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -156,7 +156,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
// Remove the route for the service provided by the component // Remove the route for the service provided by the component
if (XMPPServer.getInstance().getRoutingTable() != null) { if (XMPPServer.getInstance().getRoutingTable() != null) {
XMPPServer.getInstance().getRoutingTable().removeRoute(componentJID); XMPPServer.getInstance().getRoutingTable().removeComponentRoute(componentJID);
} }
// Remove the disco item from the server for the component that is being removed // Remove the disco item from the server for the component that is being removed
......
...@@ -13,10 +13,6 @@ package org.jivesoftware.openfire.filetransfer.proxy; ...@@ -13,10 +13,6 @@ package org.jivesoftware.openfire.filetransfer.proxy;
import org.dom4j.DocumentHelper; import org.dom4j.DocumentHelper;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.PropertyEventDispatcher;
import org.jivesoftware.util.PropertyEventListener;
import org.jivesoftware.openfire.*; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
...@@ -26,6 +22,10 @@ import org.jivesoftware.openfire.disco.DiscoServerItem; ...@@ -26,6 +22,10 @@ import org.jivesoftware.openfire.disco.DiscoServerItem;
import org.jivesoftware.openfire.disco.ServerItemsProvider; import org.jivesoftware.openfire.disco.ServerItemsProvider;
import org.jivesoftware.openfire.filetransfer.FileTransferManager; import org.jivesoftware.openfire.filetransfer.FileTransferManager;
import org.jivesoftware.openfire.forms.spi.XDataFormImpl; import org.jivesoftware.openfire.forms.spi.XDataFormImpl;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.PropertyEventDispatcher;
import org.jivesoftware.util.PropertyEventListener;
import org.xmpp.packet.IQ; import org.xmpp.packet.IQ;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
...@@ -196,7 +196,7 @@ public class FileTransferProxy extends BasicModule ...@@ -196,7 +196,7 @@ public class FileTransferProxy extends BasicModule
private void startProxy() { private void startProxy() {
connectionManager.processConnections(bindInterface, getProxyPort()); connectionManager.processConnections(bindInterface, getProxyPort());
routingTable.addRoute(getAddress(), this); routingTable.addComponentRoute(getAddress(), this);
XMPPServer server = XMPPServer.getInstance(); XMPPServer server = XMPPServer.getInstance();
server.getIQDiscoItemsHandler().addServerItemsProvider(this); server.getIQDiscoItemsHandler().addServerItemsProvider(this);
...@@ -207,7 +207,7 @@ public class FileTransferProxy extends BasicModule ...@@ -207,7 +207,7 @@ public class FileTransferProxy extends BasicModule
XMPPServer.getInstance().getIQDiscoItemsHandler() XMPPServer.getInstance().getIQDiscoItemsHandler()
.removeComponentItem(getAddress().toString()); .removeComponentItem(getAddress().toString());
routingTable.removeRoute(getAddress()); routingTable.removeComponentRoute(getAddress());
connectionManager.disable(); connectionManager.disable();
} }
......
...@@ -11,8 +11,6 @@ ...@@ -11,8 +11,6 @@
package org.jivesoftware.openfire.handler; package org.jivesoftware.openfire.handler;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.openfire.*; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.roster.Roster; import org.jivesoftware.openfire.roster.Roster;
...@@ -21,6 +19,8 @@ import org.jivesoftware.openfire.roster.RosterManager; ...@@ -21,6 +19,8 @@ import org.jivesoftware.openfire.roster.RosterManager;
import org.jivesoftware.openfire.user.UserAlreadyExistsException; import org.jivesoftware.openfire.user.UserAlreadyExistsException;
import org.jivesoftware.openfire.user.UserManager; import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.openfire.user.UserNotFoundException; import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError; import org.xmpp.packet.PacketError;
...@@ -120,14 +120,12 @@ public class PresenceSubscribeHandler extends BasicModule implements ChannelHand ...@@ -120,14 +120,12 @@ public class PresenceSubscribeHandler extends BasicModule implements ChannelHand
// Do not forward the packet to the recipient if the presence is of type subscribed // Do not forward the packet to the recipient if the presence is of type subscribed
// and the recipient user has not changed its subscription state. // and the recipient user has not changed its subscription state.
if (!(type == Presence.Type.subscribed && recipientRoster != null && if (!(type == Presence.Type.subscribed && recipientRoster != null && !recipientSubChanged)) {
!recipientSubChanged)) {
// If the user is already subscribed to the *local* user's presence then do not // If the user is already subscribed to the *local* user's presence then do not
// forward the subscription request and instead send an auto-reply on behalf // forward the subscription request and instead send an auto-reply on behalf
// of the user // of the user
if (type == Presence.Type.subscribe && recipientRoster != null && if (type == Presence.Type.subscribe && recipientRoster != null && !recipientSubChanged) {
!recipientSubChanged) {
try { try {
RosterItem.SubType subType = recipientRoster.getRosterItem(senderJID) RosterItem.SubType subType = recipientRoster.getRosterItem(senderJID)
.getSubStatus(); .getSubStatus();
...@@ -151,13 +149,13 @@ public class PresenceSubscribeHandler extends BasicModule implements ChannelHand ...@@ -151,13 +149,13 @@ public class PresenceSubscribeHandler extends BasicModule implements ChannelHand
// a module, the module will be able to handle the packet. If the handler is a // a module, the module will be able to handle the packet. If the handler is a
// Session the packet will be routed to the client. If a route cannot be found // Session the packet will be routed to the client. If a route cannot be found
// then the packet will be delivered based on its recipient and sender. // then the packet will be delivered based on its recipient and sender.
List<ChannelHandler> handlers = routingTable.getRoutes(recipientJID); List<JID> jids = routingTable.getRoutes(recipientJID);
if (!handlers.isEmpty()) { if (!jids.isEmpty()) {
for (ChannelHandler handler : handlers) { for (JID jid : jids) {
Presence presenteToSend = presence.createCopy(); Presence presenteToSend = presence.createCopy();
// Stamp the presence with the user's bare JID as the 'from' address // Stamp the presence with the user's bare JID as the 'from' address
presenteToSend.setFrom(senderJID.toBareJID()); presenteToSend.setFrom(senderJID.toBareJID());
handler.process(presenteToSend); routingTable.routePacket(jid, presenteToSend);
} }
} }
else { else {
......
...@@ -26,7 +26,10 @@ import org.jivesoftware.util.LocaleUtils; ...@@ -26,7 +26,10 @@ import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.xmpp.packet.*; import org.xmpp.packet.*;
import java.util.*; import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
...@@ -69,8 +72,9 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -69,8 +72,9 @@ import java.util.concurrent.ConcurrentHashMap;
*/ */
public class PresenceUpdateHandler extends BasicModule implements ChannelHandler { public class PresenceUpdateHandler extends BasicModule implements ChannelHandler {
private Map<String, WeakHashMap<ChannelHandler, Set<String>>> directedPresences; private Map<String, Map<String, Set<String>>> directedPresences;
private RoutingTable routingTable;
private RosterManager rosterManager; private RosterManager rosterManager;
private XMPPServer localServer; private XMPPServer localServer;
private PresenceManager presenceManager; private PresenceManager presenceManager;
...@@ -81,7 +85,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -81,7 +85,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
public PresenceUpdateHandler() { public PresenceUpdateHandler() {
super("Presence update handler"); super("Presence update handler");
directedPresences = new ConcurrentHashMap<String, WeakHashMap<ChannelHandler, Set<String>>>(); directedPresences = new ConcurrentHashMap<String, Map<String, Set<String>>>();
} }
public void process(Packet packet) throws UnauthorizedException, PacketException { public void process(Packet packet) throws UnauthorizedException, PacketException {
...@@ -123,7 +127,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -123,7 +127,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
else { else {
presence = presence.createCopy(); presence = presence.createCopy();
if (session != null) { if (session != null) {
presence.setFrom(new JID(null, session.getServerName(), null)); presence.setFrom(new JID(null, session.getServerName(), null, true));
presence.setTo(session.getAddress()); presence.setTo(session.getAddress());
} }
else { else {
...@@ -155,7 +159,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -155,7 +159,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
Session session = sessionManager.getSession(presence.getFrom()); Session session = sessionManager.getSession(presence.getFrom());
presence = presence.createCopy(); presence = presence.createCopy();
if (session != null) { if (session != null) {
presence.setFrom(new JID(null, session.getServerName(), null)); presence.setFrom(new JID(null, session.getServerName(), null, true));
presence.setTo(session.getAddress()); presence.setTo(session.getAddress());
} }
else { else {
...@@ -308,16 +312,16 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -308,16 +312,16 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
* registry of sent directed presences by the user. * registry of sent directed presences by the user.
* *
* @param update the directed Presence sent by the user to an entity. * @param update the directed Presence sent by the user to an entity.
* @param handler the handler that routed the presence to the entity. * @param handlerJID the JID of the handler that will receive/handle/process the sent packet.
* @param jid the jid that the handler has processed * @param jid the receipient specified in the packet to handle.
*/ */
public void directedPresenceSent(Presence update, ChannelHandler handler, String jid) { public void directedPresenceSent(Presence update, JID handlerJID, String jid) {
if (update.getFrom() == null) { if (update.getFrom() == null) {
return; return;
} }
if (localServer.isLocal(update.getFrom())) { if (localServer.isLocal(update.getFrom())) {
boolean keepTrack = false; boolean keepTrack = false;
WeakHashMap<ChannelHandler, Set<String>> map; Map<String, Set<String>> map;
String name = update.getFrom().getNode(); String name = update.getFrom().getNode();
if (name != null && !"".equals(name)) { if (name != null && !"".equals(name)) {
// Keep track of all directed presences if roster service is disabled // Keep track of all directed presences if roster service is disabled
...@@ -354,55 +358,56 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -354,55 +358,56 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
keepTrack = true; keepTrack = true;
} }
if (keepTrack) { if (keepTrack) {
map = directedPresences.get(update.getFrom().toString()); String sender = update.getFrom().toString();
if (map == null) { map = directedPresences.get(sender);
// We are using a set to avoid duplicate jids in case the user
// sends several directed presences to the same handler. The Map also
// ensures that if the user sends several presences to the same handler
// we will have only one entry in the Map
map = new WeakHashMap<ChannelHandler, Set<String>>();
map.put(handler, new ConcurrentHashSet<String>());
directedPresences.put(update.getFrom().toString(), map);
}
if (Presence.Type.unavailable.equals(update.getType())) { if (Presence.Type.unavailable.equals(update.getType())) {
// It's a directed unavailable presence if (map != null) {
if (handler instanceof ClientSession) { // It's a directed unavailable presence
// Client sessions will receive only presences to the same JID (the if (routingTable.hasClientRoute(handlerJID)) {
// address of the session) so remove the handler from the map // Client sessions will receive only presences to the same JID (the
map.remove(handler); // address of the session) so remove the handler from the map
if (map.isEmpty()) { map.remove(handlerJID.toString());
// Remove the user from the registry since the list of directed if (map.isEmpty()) {
// presences is empty // Remove the user from the registry since the list of directed
directedPresences.remove(update.getFrom().toString()); // presences is empty
directedPresences.remove(sender);
}
} }
} else {
else { // A service may receive presences for many JIDs so in this case we
// A service may receive presences for many JIDs so in this case we // just need to remove the jid that has received a directed
// just need to remove the jid that has received a directed // unavailable presence
// unavailable presence Set<String> jids = map.get(handlerJID.toString());
Set<String> jids = map.get(handler); if (jids != null) {
if (jids != null) { jids.remove(jid);
jids.remove(jid); if (jids.isEmpty()) {
if (jids.isEmpty()) { map.remove(handlerJID.toString());
map.remove(handler); if (map.isEmpty()) {
if (map.isEmpty()) { // Remove the user from the registry since the list of directed
// Remove the user from the registry since the list of directed // presences is empty
// presences is empty directedPresences.remove(sender);
directedPresences.remove(update.getFrom().toString()); }
} }
} }
} }
} }
} }
else { else {
if (map == null) {
// We are using a set to avoid duplicate jids in case the user
// sends several directed presences to the same handler. The Map also
// ensures that if the user sends several presences to the same handler
// we will have only one entry in the Map
map = new ConcurrentHashMap<String, Set<String>>();
directedPresences.put(sender, map);
}
// Add the handler to the list of handler that processed the directed // Add the handler to the list of handler that processed the directed
// presence sent by the user. This handler will be used to send // presence sent by the user. This handler will be used to send
// the unavailable presence when the user goes offline // the unavailable presence when the user goes offline
if (map.get(handler) == null) { if (map.get(handlerJID.toString()) == null) {
map.put(handler, new ConcurrentHashSet<String>()); map.put(handlerJID.toString(), new ConcurrentHashSet<String>());
} }
map.get(handler).add(jid); map.get(handlerJID.toString()).add(jid);
} }
} }
} }
...@@ -420,19 +425,20 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -420,19 +425,20 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
} }
if (localServer.isLocal(update.getFrom())) { if (localServer.isLocal(update.getFrom())) {
// Remove the registry of directed presences of this user // Remove the registry of directed presences of this user
Map<ChannelHandler, Set<String>> map = directedPresences.remove(update.getFrom().toString()); Map<String, Set<String>> map = directedPresences.remove(update.getFrom().toString());
if (map != null) { if (map != null) {
// Iterate over all the entities that the user sent a directed presence // Iterate over all the entities that the user sent a directed presence
for (ChannelHandler handler : new HashSet<ChannelHandler>(map.keySet())) { for (String handler : new HashSet<String>(map.keySet())) {
JID handlerJID = new JID(handler);
Set<String> jids = map.get(handler); Set<String> jids = map.get(handler);
if (jids == null) { if (jids == null) {
continue; continue;
} }
for (String jid : jids) { for (String jid : jids) {
Presence presence = update.createCopy(); Presence presence = update.createCopy();
presence.setTo(new JID(jid)); presence.setTo(jid);
try { try {
handler.process(presence); routingTable.routePacket(handlerJID, presence);
} }
catch (UnauthorizedException ue) { catch (UnauthorizedException ue) {
Log.error(ue); Log.error(ue);
...@@ -444,8 +450,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -444,8 +450,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
} }
public boolean hasDirectPresence(Session session, JID recipientJID) { public boolean hasDirectPresence(Session session, JID recipientJID) {
Map<ChannelHandler, Set<String>> map = Map<String, Set<String>> map = directedPresences.get(session.getAddress().toString());
directedPresences.get(session.getAddress().toString());
if (map != null) { if (map != null) {
String recipient = recipientJID.toBareJID(); String recipient = recipientJID.toBareJID();
for (Set<String> fullJIDs : map.values()) { for (Set<String> fullJIDs : map.values()) {
...@@ -468,6 +473,8 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -468,6 +473,8 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
messageStore = server.getOfflineMessageStore(); messageStore = server.getOfflineMessageStore();
sessionManager = server.getSessionManager(); sessionManager = server.getSessionManager();
userManager = server.getUserManager(); userManager = server.getUserManager();
routingTable = server.getRoutingTable();
// TODO Add as route listener (to remove direct presences info for removed routes)
} }
} }
...@@ -13,8 +13,6 @@ package org.jivesoftware.openfire.mediaproxy; ...@@ -13,8 +13,6 @@ package org.jivesoftware.openfire.mediaproxy;
import org.dom4j.Attribute; import org.dom4j.Attribute;
import org.dom4j.DocumentHelper; import org.dom4j.DocumentHelper;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.openfire.*; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
...@@ -23,14 +21,16 @@ import org.jivesoftware.openfire.disco.DiscoItemsProvider; ...@@ -23,14 +21,16 @@ import org.jivesoftware.openfire.disco.DiscoItemsProvider;
import org.jivesoftware.openfire.disco.DiscoServerItem; import org.jivesoftware.openfire.disco.DiscoServerItem;
import org.jivesoftware.openfire.disco.ServerItemsProvider; import org.jivesoftware.openfire.disco.ServerItemsProvider;
import org.jivesoftware.openfire.forms.spi.XDataFormImpl; import org.jivesoftware.openfire.forms.spi.XDataFormImpl;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.xmpp.packet.IQ; import org.xmpp.packet.IQ;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError; import org.xmpp.packet.PacketError;
import java.util.*;
import java.net.UnknownHostException;
import java.net.SocketException; import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.*;
/** /**
* A proxy service for UDP traffic such as RTP. It provides Jingle transport candidates * A proxy service for UDP traffic such as RTP. It provides Jingle transport candidates
...@@ -94,7 +94,7 @@ public class MediaProxyService extends BasicModule ...@@ -94,7 +94,7 @@ public class MediaProxyService extends BasicModule
} catch (SocketException e) { } catch (SocketException e) {
} }
routingTable.addRoute(getAddress(), this); routingTable.addComponentRoute(getAddress(), this);
XMPPServer.getInstance().getIQDiscoItemsHandler().addServerItemsProvider(this); XMPPServer.getInstance().getIQDiscoItemsHandler().addServerItemsProvider(this);
} else { } else {
if (echo != null) echo.cancel(); if (echo != null) echo.cancel();
...@@ -106,7 +106,7 @@ public class MediaProxyService extends BasicModule ...@@ -106,7 +106,7 @@ public class MediaProxyService extends BasicModule
super.stop(); super.stop();
mediaProxy.stopProxy(); mediaProxy.stopProxy();
XMPPServer.getInstance().getIQDiscoItemsHandler().removeComponentItem(getAddress().toString()); XMPPServer.getInstance().getIQDiscoItemsHandler().removeComponentItem(getAddress().toString());
routingTable.removeRoute(getAddress()); routingTable.removeComponentRoute(getAddress());
if (echo != null) echo.cancel(); if (echo != null) echo.cancel();
} }
......
...@@ -14,12 +14,12 @@ package org.jivesoftware.openfire.muc.spi; ...@@ -14,12 +14,12 @@ package org.jivesoftware.openfire.muc.spi;
import org.dom4j.DocumentHelper; import org.dom4j.DocumentHelper;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.QName; import org.dom4j.QName;
import org.jivesoftware.util.ElementUtil;
import org.jivesoftware.openfire.PacketRouter; import org.jivesoftware.openfire.PacketRouter;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.muc.*; import org.jivesoftware.openfire.muc.*;
import org.jivesoftware.openfire.session.ClientSession; import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.ElementUtil;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import org.xmpp.packet.Presence; import org.xmpp.packet.Presence;
...@@ -117,6 +117,7 @@ public class MUCRoleImpl implements MUCRole { ...@@ -117,6 +117,7 @@ public class MUCRoleImpl implements MUCRole {
this.role = role; this.role = role;
this.affiliation = affiliation; this.affiliation = affiliation;
// Cache the user's session (will only work for local users) // Cache the user's session (will only work for local users)
//TODO Probably remove this instance variable that was added for optimization
this.session = XMPPServer.getInstance().getSessionManager().getSession(presence.getFrom()); this.session = XMPPServer.getInstance().getSessionManager().getSession(presence.getFrom());
extendedInformation = extendedInformation =
......
...@@ -13,7 +13,6 @@ package org.jivesoftware.openfire.muc.spi; ...@@ -13,7 +13,6 @@ package org.jivesoftware.openfire.muc.spi;
import org.dom4j.DocumentHelper; import org.dom4j.DocumentHelper;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.*;
import org.jivesoftware.openfire.*; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
...@@ -29,14 +28,15 @@ import org.jivesoftware.openfire.muc.*; ...@@ -29,14 +28,15 @@ import org.jivesoftware.openfire.muc.*;
import org.jivesoftware.openfire.stats.Statistic; import org.jivesoftware.openfire.stats.Statistic;
import org.jivesoftware.openfire.stats.StatisticsManager; import org.jivesoftware.openfire.stats.StatisticsManager;
import org.jivesoftware.openfire.user.UserNotFoundException; import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.*;
import org.xmpp.component.ComponentManager; import org.xmpp.component.ComponentManager;
import org.xmpp.packet.*; import org.xmpp.packet.*;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
...@@ -295,7 +295,7 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha ...@@ -295,7 +295,7 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha
} }
public JID getAddress() { public JID getAddress() {
return new JID(null, getServiceDomain(), null); return new JID(null, getServiceDomain(), null, true);
} }
/** /**
...@@ -801,7 +801,7 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha ...@@ -801,7 +801,7 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha
public void start() { public void start() {
super.start(); super.start();
// Add the route to this service // Add the route to this service
routingTable.addRoute(getAddress(), this); routingTable.addComponentRoute(getAddress(), this);
ArrayList<String> params = new ArrayList<String>(); ArrayList<String> params = new ArrayList<String>();
params.clear(); params.clear();
params.add(getServiceDomain()); params.add(getServiceDomain());
...@@ -822,7 +822,7 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha ...@@ -822,7 +822,7 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha
public void stop() { public void stop() {
super.stop(); super.stop();
// Remove the route to this service // Remove the route to this service
routingTable.removeRoute(getAddress()); routingTable.removeComponentRoute(getAddress());
timer.cancel(); timer.cancel();
logAllConversation(); logAllConversation();
// Remove the statistics. // Remove the statistics.
......
...@@ -75,7 +75,7 @@ public class Route extends Packet { ...@@ -75,7 +75,7 @@ public class Route extends Packet {
} }
/** /**
* Sets the wrapped stanza by this Route packet. Route packets may have a single child * Sets the wrapped stanza by this Route packet. ClientRoute packets may have a single child
* element. This is a convenience method to avoid manipulating this underlying packet's * element. This is a convenience method to avoid manipulating this underlying packet's
* Element instance directly. * Element instance directly.
* *
......
...@@ -11,12 +11,11 @@ ...@@ -11,12 +11,11 @@
package org.jivesoftware.openfire.net; package org.jivesoftware.openfire.net;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.openfire.*; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Message; import org.xmpp.packet.Message;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
...@@ -34,6 +33,9 @@ public class SocketPacketWriteHandler implements ChannelHandler { ...@@ -34,6 +33,9 @@ public class SocketPacketWriteHandler implements ChannelHandler {
private SessionManager sessionManager; private SessionManager sessionManager;
private OfflineMessageStrategy messageStrategy; private OfflineMessageStrategy messageStrategy;
private RoutingTable routingTable; private RoutingTable routingTable;
private IQRouter iqRouter;
private MessageRouter messageRouter;
private PresenceRouter presenceRouter;
public SocketPacketWriteHandler(SessionManager sessionManager, RoutingTable routingTable, public SocketPacketWriteHandler(SessionManager sessionManager, RoutingTable routingTable,
OfflineMessageStrategy messageStrategy) { OfflineMessageStrategy messageStrategy) {
...@@ -41,6 +43,9 @@ public class SocketPacketWriteHandler implements ChannelHandler { ...@@ -41,6 +43,9 @@ public class SocketPacketWriteHandler implements ChannelHandler {
this.messageStrategy = messageStrategy; this.messageStrategy = messageStrategy;
this.routingTable = routingTable; this.routingTable = routingTable;
this.server = XMPPServer.getInstance(); this.server = XMPPServer.getInstance();
iqRouter = server.getIQRouter();
messageRouter = server.getMessageRouter();
presenceRouter = server.getPresenceRouter();
} }
public void process(Packet packet) throws UnauthorizedException, PacketException { public void process(Packet packet) throws UnauthorizedException, PacketException {
...@@ -48,29 +53,12 @@ public class SocketPacketWriteHandler implements ChannelHandler { ...@@ -48,29 +53,12 @@ public class SocketPacketWriteHandler implements ChannelHandler {
JID recipient = packet.getTo(); JID recipient = packet.getTo();
// Check if the target domain belongs to a remote server or a component // Check if the target domain belongs to a remote server or a component
if (server.matchesComponent(recipient) || server.isRemote(recipient)) { if (server.matchesComponent(recipient) || server.isRemote(recipient)) {
// Locate the route to the remote server or component and ask it routingTable.routePacket(recipient, packet);
// to process the packet
ChannelHandler route = routingTable.getRoute(recipient);
if (route != null) {
route.process(packet);
}
else {
// No root was found so either drop or store the packet
handleUnprocessedPacket(packet);
}
return;
} }
// The target domain belongs to the local server // The target domain belongs to the local server
if (recipient == null || (recipient.getNode() == null && recipient.getResource() == null)) { if (recipient == null || (recipient.getNode() == null && recipient.getResource() == null)) {
// no TO was found so send back the packet to the sender // no TO was found so send back the packet to the sender
ClientSession senderSession = sessionManager.getSession(packet.getFrom()); routingTable.routePacket(packet.getFrom(), packet);
if (senderSession != null) {
senderSession.process(packet);
}
else {
// The sender is no longer available so drop the packet
dropPacket(packet);
}
} }
else { else {
Session session = sessionManager.getBestRoute(recipient); Session session = sessionManager.getBestRoute(recipient);
...@@ -94,24 +82,13 @@ public class SocketPacketWriteHandler implements ChannelHandler { ...@@ -94,24 +82,13 @@ public class SocketPacketWriteHandler implements ChannelHandler {
private void handleUnprocessedPacket(Packet packet) { private void handleUnprocessedPacket(Packet packet) {
if (packet instanceof Message) { if (packet instanceof Message) {
messageStrategy.storeOffline((Message)packet); messageRouter.routingFailed(packet);
} }
else if (packet instanceof Presence) { else if (packet instanceof Presence) {
// presence packets are dropped silently presenceRouter.routingFailed(packet);
//dropPacket(packet);
} }
else { else {
// IQ packets are logged but dropped iqRouter.routingFailed(packet);
dropPacket(packet);
} }
} }
/**
* Drop the packet.
*
* @param packet The packet being dropped
*/
private void dropPacket(Packet packet) {
Log.warn(LocaleUtils.getLocalizedString("admin.error.routing") + "\n" + packet.toString());
}
} }
...@@ -13,16 +13,14 @@ package org.jivesoftware.openfire.net; ...@@ -13,16 +13,14 @@ package org.jivesoftware.openfire.net;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader; import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.PacketRouter; import org.jivesoftware.openfire.PacketRouter;
import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable; import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.server.OutgoingSessionPromise;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory; import org.xmlpull.v1.XmlPullParserFactory;
...@@ -425,13 +423,7 @@ public abstract class SocketReader implements Runnable { ...@@ -425,13 +423,7 @@ public abstract class SocketReader implements Runnable {
return false; return false;
} }
// Check if the host matches a subdomain of this host // Check if the host matches a subdomain of this host
RoutableChannelHandler route = routingTable.getRoute(new JID(host)); return !routingTable.hasComponentRoute(new JID(host));
if (route == null || route instanceof OutgoingSessionPromise) {
return true;
}
else {
return false;
}
} }
/** /**
......
...@@ -13,10 +13,6 @@ package org.jivesoftware.openfire.pubsub; ...@@ -13,10 +13,6 @@ package org.jivesoftware.openfire.pubsub;
import org.dom4j.DocumentHelper; import org.dom4j.DocumentHelper;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.openfire.PacketRouter; import org.jivesoftware.openfire.PacketRouter;
import org.jivesoftware.openfire.RoutableChannelHandler; import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable; import org.jivesoftware.openfire.RoutingTable;
...@@ -32,6 +28,10 @@ import org.jivesoftware.openfire.forms.spi.XDataFormImpl; ...@@ -32,6 +28,10 @@ import org.jivesoftware.openfire.forms.spi.XDataFormImpl;
import org.jivesoftware.openfire.forms.spi.XFormFieldImpl; import org.jivesoftware.openfire.forms.spi.XFormFieldImpl;
import org.jivesoftware.openfire.pubsub.models.AccessModel; import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel; import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.xmpp.packet.*; import org.xmpp.packet.*;
import java.util.*; import java.util.*;
...@@ -391,7 +391,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -391,7 +391,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
public void start() { public void start() {
super.start(); super.start();
// Add the route to this service // Add the route to this service
routingTable.addRoute(getAddress(), this); routingTable.addComponentRoute(getAddress(), this);
// Start the pubsub engine // Start the pubsub engine
engine.start(); engine.start();
ArrayList<String> params = new ArrayList<String>(); ArrayList<String> params = new ArrayList<String>();
...@@ -403,7 +403,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -403,7 +403,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
public void stop() { public void stop() {
super.stop(); super.stop();
// Remove the route to this service // Remove the route to this service
routingTable.removeRoute(getAddress()); routingTable.removeComponentRoute(getAddress());
// Stop the pubsub engine. This will gives us the chance to // Stop the pubsub engine. This will gives us the chance to
// save queued items to the database. // save queued items to the database.
engine.shutdown(); engine.shutdown();
......
...@@ -576,17 +576,16 @@ public class Roster implements Cacheable, Externalizable { ...@@ -576,17 +576,16 @@ public class Roster implements Cacheable, Externalizable {
} }
// Broadcast presence to subscribed entities // Broadcast presence to subscribed entities
for (RosterItem item : rosterItems.values()) { for (RosterItem item : rosterItems.values()) {
if (item.getSubStatus() == RosterItem.SUB_BOTH if (item.getSubStatus() == RosterItem.SUB_BOTH || item.getSubStatus() == RosterItem.SUB_FROM) {
|| item.getSubStatus() == RosterItem.SUB_FROM) {
packet.setTo(item.getJid()); packet.setTo(item.getJid());
if (list != null && list.shouldBlockPacket(packet)) { if (list != null && list.shouldBlockPacket(packet)) {
// Outgoing presence notifications are blocked for this contact // Outgoing presence notifications are blocked for this contact
continue; continue;
} }
JID searchNode = new JID(item.getJid().getNode(), item.getJid().getDomain(), null); JID searchNode = new JID(item.getJid().getNode(), item.getJid().getDomain(), null, true);
for (ChannelHandler session : routingTable.getRoutes(searchNode)) { for (JID jid : routingTable.getRoutes(searchNode)) {
try { try {
session.process(packet); routingTable.routePacket(jid, packet);
} }
catch (Exception e) { catch (Exception e) {
// Theoretically only happens if session has been closed. // Theoretically only happens if session has been closed.
...@@ -602,9 +601,9 @@ public class Roster implements Cacheable, Externalizable { ...@@ -602,9 +601,9 @@ public class Roster implements Cacheable, Externalizable {
// Outgoing presence notifications are blocked for this contact // Outgoing presence notifications are blocked for this contact
continue; continue;
} }
for (ChannelHandler session : routingTable.getRoutes(new JID(contact))) { for (JID jid: routingTable.getRoutes(new JID(contact))) {
try { try {
session.process(packet); routingTable.routePacket(jid, packet);
} }
catch (Exception e) { catch (Exception e) {
// Theoretically only happens if session has been closed. // Theoretically only happens if session has been closed.
......
...@@ -11,7 +11,6 @@ ...@@ -11,7 +11,6 @@
package org.jivesoftware.openfire.roster; package org.jivesoftware.openfire.roster;
import org.jivesoftware.openfire.ChannelHandler;
import org.jivesoftware.openfire.RoutingTable; import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.SharedGroupException; import org.jivesoftware.openfire.SharedGroupException;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
...@@ -719,10 +718,7 @@ public class RosterManager extends BasicModule implements GroupEventListener, Us ...@@ -719,10 +718,7 @@ public class RosterManager extends BasicModule implements GroupEventListener, Us
presence.setType(Presence.Type.unsubscribe); presence.setType(Presence.Type.unsubscribe);
} }
try { try {
ChannelHandler handler = routingTable.getRoute(recipient); routingTable.routePacket(recipient, presence);
if (handler != null) {
handler.process(presence);
}
} }
catch (UnauthorizedException e) { catch (UnauthorizedException e) {
// Do nothing // Do nothing
...@@ -959,4 +955,4 @@ public class RosterManager extends BasicModule implements GroupEventListener, Us ...@@ -959,4 +955,4 @@ public class RosterManager extends BasicModule implements GroupEventListener, Us
// Remove this module as a listener of group events // Remove this module as a listener of group events
GroupEventDispatcher.removeListener(this); GroupEventDispatcher.removeListener(this);
} }
} }
\ No newline at end of file
...@@ -11,14 +11,13 @@ ...@@ -11,14 +11,13 @@
package org.jivesoftware.openfire.server; package org.jivesoftware.openfire.server;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.openfire.ChannelHandler;
import org.jivesoftware.openfire.RoutableChannelHandler; import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable; import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.session.OutgoingServerSession; import org.jivesoftware.openfire.session.OutgoingServerSession;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.xmpp.packet.*; import org.xmpp.packet.*;
import java.util.HashMap; import java.util.HashMap;
...@@ -200,13 +199,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -200,13 +199,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
.authenticateDomain(packet.getFrom().getDomain(), packet.getTo().getDomain()); .authenticateDomain(packet.getFrom().getDomain(), packet.getTo().getDomain());
if (created) { if (created) {
// A connection to the remote server was created so get the route and send the packet // A connection to the remote server was created so get the route and send the packet
ChannelHandler route = routingTable.getRoute(packet.getTo()); routingTable.routePacket(packet.getTo(), packet);
if (route != null) {
route.process(packet);
}
else {
throw new Exception("Failed to create connection to remote server");
}
} }
else { else {
throw new Exception("Failed to create connection to remote server"); throw new Exception("Failed to create connection to remote server");
...@@ -234,10 +227,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -234,10 +227,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
reply.setFrom(to); reply.setFrom(to);
reply.setChildElement(((IQ) packet).getChildElement().createCopy()); reply.setChildElement(((IQ) packet).getChildElement().createCopy());
reply.setError(PacketError.Condition.remote_server_not_found); reply.setError(PacketError.Condition.remote_server_not_found);
ChannelHandler route = routingTable.getRoute(reply.getTo()); routingTable.routePacket(reply.getTo(), reply);
if (route != null) {
route.process(reply);
}
} }
else if (packet instanceof Presence) { else if (packet instanceof Presence) {
Presence reply = new Presence(); Presence reply = new Presence();
...@@ -245,10 +235,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -245,10 +235,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
reply.setTo(from); reply.setTo(from);
reply.setFrom(to); reply.setFrom(to);
reply.setError(PacketError.Condition.remote_server_not_found); reply.setError(PacketError.Condition.remote_server_not_found);
ChannelHandler route = routingTable.getRoute(reply.getTo()); routingTable.routePacket(reply.getTo(), reply);
if (route != null) {
route.process(reply);
}
} }
else if (packet instanceof Message) { else if (packet instanceof Message) {
Message reply = new Message(); Message reply = new Message();
...@@ -258,10 +245,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -258,10 +245,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
reply.setType(((Message)packet).getType()); reply.setType(((Message)packet).getType());
reply.setThread(((Message)packet).getThread()); reply.setThread(((Message)packet).getThread());
reply.setError(PacketError.Condition.remote_server_not_found); reply.setError(PacketError.Condition.remote_server_not_found);
ChannelHandler route = routingTable.getRoute(reply.getTo()); routingTable.routePacket(reply.getTo(), reply);
if (route != null) {
route.process(reply);
}
} }
} }
catch (UnauthorizedException e) { catch (UnauthorizedException e) {
......
...@@ -14,9 +14,6 @@ package org.jivesoftware.openfire.server; ...@@ -14,9 +14,6 @@ package org.jivesoftware.openfire.server;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader; import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.openfire.*; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.auth.AuthFactory; import org.jivesoftware.openfire.auth.AuthFactory;
import org.jivesoftware.openfire.net.DNSUtil; import org.jivesoftware.openfire.net.DNSUtil;
...@@ -26,6 +23,9 @@ import org.jivesoftware.openfire.net.SocketConnection; ...@@ -26,6 +23,9 @@ import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.session.IncomingServerSession; import org.jivesoftware.openfire.session.IncomingServerSession;
import org.jivesoftware.openfire.session.OutgoingServerSession; import org.jivesoftware.openfire.session.OutgoingServerSession;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory; import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory; import org.xmlpull.v1.XmlPullParserFactory;
...@@ -496,13 +496,7 @@ public class ServerDialback { ...@@ -496,13 +496,7 @@ public class ServerDialback {
// trick is useful when subdomains of this server are registered in the DNS so remote // trick is useful when subdomains of this server are registered in the DNS so remote
// servers may establish connections directly to a subdomain of this server // servers may establish connections directly to a subdomain of this server
if (host_unknown && recipient.contains(serverName)) { if (host_unknown && recipient.contains(serverName)) {
RoutableChannelHandler route = routingTable.getRoute(new JID(recipient)); host_unknown = !routingTable.hasComponentRoute(new JID(recipient));
if (route == null || route instanceof OutgoingSessionPromise) {
host_unknown = true;
}
else {
host_unknown = false;
}
} }
return host_unknown; return host_unknown;
} }
......
...@@ -11,262 +11,361 @@ ...@@ -11,262 +11,361 @@
package org.jivesoftware.openfire.spi; package org.jivesoftware.openfire.spi;
import org.jivesoftware.util.Log; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.ChannelHandler; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.component.InternalComponentManager;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.server.OutgoingSessionPromise; import org.jivesoftware.openfire.server.OutgoingSessionPromise;
import org.jivesoftware.openfire.session.ClientSession; import org.jivesoftware.openfire.session.ClientSession;
import org.xmpp.packet.JID; import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.*;
import java.util.*; import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap; import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Lock;
/** /**
* <p>Uses simple Maps for table storage.</p> * Routing table that stores routes to client sessions, outgoing server sessions
* <p>Leaves in the tree are indicated by a PacketHandler, while branches are stored in Maps. * and components. As soon as a user authenticates with the server its client session
* Traverse the tree according to an XMPPAddress' fields (host -> name -> resource) and when you * will be added to the routing table. Whenever the client session becomes available
* hit a PacketHandler, you have found the handler for that node and all sub-nodes. </p> * or unavailable the routing table will be updated too.<p>
*
* When running inside of a cluster the routing table will also keep references to routes
* hosted in other cluster nodes. A {@link RemotePacketRouter} will be use to route packets
* to routes hosted in other cluster nodes.<p>
* *
* @author Iain Shigeoka * Failure to route a packet will end up sending {@link IQRouter#routingFailed(org.xmpp.packet.Packet)},
* {@link MessageRouter#routingFailed(org.xmpp.packet.Packet)} or
* {@link PresenceRouter#routingFailed(org.xmpp.packet.Packet)} depending on the packet type
* that tried to be sent.
*
* @author Gaston Dombiak
*/ */
public class RoutingTableImpl extends BasicModule implements RoutingTable { public class RoutingTableImpl extends BasicModule implements RoutingTable {
/** /**
* We need a three level tree built of hashtables: host -> name -> resource * Cache (unlimited, never expire) that holds outgoing sessions to remote servers from this server.
*/
private Cache<String, byte[]> serversCache;
/**
* Cache (unlimited, never expire) that holds sessions of external components connected to the server.
*/ */
private Map routes = new ConcurrentHashMap(); private Cache<String, byte[]> componentsCache;
/**
* Cache (unlimited, never expire) that holds sessions of user that have authenticated with the server.
*/
private Cache<String, ClientRoute> usersCache;
/**
* Cache (unlimited, never expire) that holds sessions of anoymous user that have authenticated with the server.
*/
private Cache<String, ClientRoute> anonymousUsersCache;
/**
* Cache (unlimited, never expire) that holds list of connected resources of authenticated users
* (includes anonymous). Key: bare jid, Value: List of full JIDs.
*/
private Cache<String, List<String>> usersSessions;
private String serverName; private String serverName;
private InternalComponentManager componentManager; private XMPPServer server;
private LocalRoutingTable localRoutingTable;
private RemotePacketRouter remotePacketRouter;
private IQRouter iqRouter;
private MessageRouter messageRouter;
private PresenceRouter presenceRouter;
public RoutingTableImpl() { public RoutingTableImpl() {
super("Routing table"); super("Routing table");
serversCache = CacheFactory.createCache("Routing Servers Cache");
componentsCache = CacheFactory.createCache("Routing Components Cache");
usersCache = CacheFactory.createCache("Routing Users Cache");
anonymousUsersCache = CacheFactory.createCache("Routing AnonymousUsers Cache");
usersSessions = CacheFactory.createCache("Routing User Sessions");
localRoutingTable = new LocalRoutingTable();
} }
public void addRoute(JID node, RoutableChannelHandler destination) { public void addServerRoute(JID route, RoutableChannelHandler destination) {
String address = destination.getAddress().getDomain();
localRoutingTable.addRoute(address, destination);
serversCache.put(address, server.getNodeID());
}
String nodeJID = node.getNode() == null ? "" : node.getNode(); public void addComponentRoute(JID route, RoutableChannelHandler destination) {
String resourceJID = node.getResource() == null ? "" : node.getResource(); String address = destination.getAddress().getDomain();
localRoutingTable.addRoute(address, destination);
componentsCache.put(address, server.getNodeID());
}
if (destination instanceof ClientSession) { public void addClientRoute(JID route, ClientSession destination) {
Object nameRoutes = routes.get(node.getDomain()); String address = destination.getAddress().toString();
if (nameRoutes == null) { boolean available = destination.getPresence().isAvailable();
// No route to the requested domain. Create a new entry in the table localRoutingTable.addRoute(address, destination);
synchronized (node.getDomain().intern()) { if (destination.getAuthToken().isAnonymous()) {
// Check again if a route exists now that we have a lock anonymousUsersCache.put(address, new ClientRoute(server.getNodeID(), available));
nameRoutes = routes.get(node.getDomain()); // Add the session to the list of user sessions
if (nameRoutes == null) { if (route.getResource() != null && !available) {
// Still nothing so create a new entry in the map for domain Lock lock = LockManager.getLock(route.toBareJID());
nameRoutes = new ConcurrentHashMap(); try {
routes.put(node.getDomain(), nameRoutes); lock.lock();
} usersSessions.put(route.toBareJID(), Arrays.asList(route.toString()));
} }
} finally {
// Check if there is something associated with the node of the JID lock.unlock();
Object resourceRoutes = ((Map) nameRoutes).get(nodeJID);
if (resourceRoutes == null) {
// Nothing was found so create a new entry for this node (a.k.a. user)
synchronized (nodeJID.intern()) {
resourceRoutes = ((Map) nameRoutes).get(nodeJID);
if (resourceRoutes == null) {
resourceRoutes = new ConcurrentHashMap();
((Map) nameRoutes).put(nodeJID, resourceRoutes);
}
} }
} }
// Add the connected resource to the node's Map
((Map) resourceRoutes).put(resourceJID, destination);
} }
else { else {
routes.put(node.getDomain(), destination); usersCache.put(address, new ClientRoute(server.getNodeID(), available));
// Add the session to the list of user sessions
if (route.getResource() != null && !available) {
Lock lock = LockManager.getLock(route.toBareJID());
try {
lock.lock();
List<String> jids = usersSessions.get(route.toBareJID());
if (jids == null) {
jids = new ArrayList<String>();
}
jids.add(route.toString());
usersSessions.put(route.toBareJID(), jids);
}
finally {
lock.unlock();
}
}
} }
} }
public RoutableChannelHandler getRoute(JID node) { public void routePacket(JID jid, Packet packet) throws UnauthorizedException, PacketException {
if (node == null) { boolean routed = false;
return null; JID address = packet.getTo();
if (address == null) {
throw new PacketException("To address cannot be null.");
} }
return getRoute(node.toString(), node.getNode() == null ? "" : node.getNode(),
node.getDomain(), node.getResource() == null ? "" : node.getResource());
}
private RoutableChannelHandler getRoute(String jid, String node, String domain,
String resource) {
RoutableChannelHandler route = null;
// Check if the address belongs to a remote server if (serverName.equals(jid.getDomain())) {
if (!serverName.equals(domain) && routes.get(domain) == null && boolean onlyAvailable = true;
componentManager.getComponent(domain) == null) { if (packet instanceof IQ) {
// Return a promise of a remote session. This object will queue packets pending onlyAvailable = packet.getFrom() != null;
// to be sent to remote servers }
return OutgoingSessionPromise.getInstance(); else if (packet instanceof Message) {
} onlyAvailable = true;
}
else if (packet instanceof Presence) {
onlyAvailable = true;
}
try { // Packet sent to local user
Object nameRoutes = routes.get(domain); ClientRoute clientRoute = usersCache.get(jid.toString());
if (nameRoutes instanceof ChannelHandler) { if (clientRoute == null) {
route = (RoutableChannelHandler) nameRoutes; clientRoute = anonymousUsersCache.get(jid.toString());
} }
else if (nameRoutes != null) { if (clientRoute != null) {
Object resourceRoutes = ((Map) nameRoutes).get(node); if (onlyAvailable && !clientRoute.isAvailable()) {
if (resourceRoutes instanceof ChannelHandler) { // Packet should only be sent to available sessions and the route is not available
route = (RoutableChannelHandler) resourceRoutes; routed = false;
} }
else if (resourceRoutes != null) { else {
route = (RoutableChannelHandler) ((Map) resourceRoutes).get(resource); if (clientRoute.getNodeID() == server.getNodeID()) {
// This is a route to a local user hosted in this node
localRoutingTable.getRoute(jid.toString()).process(packet);
routed = true;
}
else {
// This is a route to a local user hosted in other node
if (remotePacketRouter != null) {
routed = remotePacketRouter.routePacket(clientRoute.getNodeID(), jid, packet);
}
}
}
}
}
else if (jid.getDomain().contains(serverName)) {
// Packet sent to component hosted in this server
byte[] nodeID = componentsCache.get(jid.getDomain());
if (nodeID != null) {
if (nodeID == server.getNodeID()) {
// This is a route to a local component hosted in this node
localRoutingTable.getRoute(jid.getDomain()).process(packet);
routed = true;
}
else {
// This is a route to a local component hosted in other node
if (remotePacketRouter != null) {
routed = remotePacketRouter.routePacket(nodeID, jid, packet);
}
}
}
}
else {
// Packet sent to remote server
byte[] nodeID = serversCache.get(jid.getDomain());
if (nodeID != null) {
if (nodeID == server.getNodeID()) {
// This is a route to a remote server connected from this node
localRoutingTable.getRoute(jid.getDomain()).process(packet);
routed = true;
} }
else { else {
route = null; // This is a route to a remote server connected from other node
if (remotePacketRouter != null) {
routed = remotePacketRouter.routePacket(nodeID, jid, packet);
}
} }
} }
else {
// Return a promise of a remote session. This object will queue packets pending
// to be sent to remote servers
// TODO Make sure that creating outgoing connections is thread-safe across cluster nodes
OutgoingSessionPromise.getInstance().process(packet);
routed = true;
}
} }
catch (Exception e) {
if (!routed) {
if (Log.isDebugEnabled()) { if (Log.isDebugEnabled()) {
Log.debug("Route not found for JID: " + jid, e); Log.debug("Failed to route packet to JID: " + jid + " packet: " + packet);
}
if (packet instanceof IQ) {
iqRouter.routingFailed(packet);
}
else if (packet instanceof Message) {
messageRouter.routingFailed(packet);
}
else if (packet instanceof Presence) {
presenceRouter.routingFailed(packet);
} }
} }
}
return route; public boolean hasClientRoute(JID jid) {
return usersCache.get(jid.toString()) != null || anonymousUsersCache.get(jid.toString()) != null;
} }
public List<ChannelHandler> getRoutes(JID node) { public boolean hasServerRoute(JID jid) {
// Check if the address belongs to a remote server return serversCache.get(jid.getDomain()) != null;
if (!serverName.equals(node.getDomain()) && routes.get(node.getDomain()) == null && }
componentManager.getComponent(node) == null) {
// Return a promise of a remote session. This object will queue packets pending
// to be sent to remote servers
List<ChannelHandler> list = new ArrayList<ChannelHandler>();
list.add(OutgoingSessionPromise.getInstance());
return list;
}
LinkedList list = null; public boolean hasComponentRoute(JID jid) {
Object nameRoutes = routes.get(node.getDomain()); return componentsCache.get(jid.getDomain()) != null;
if (nameRoutes != null) { }
if (nameRoutes instanceof ChannelHandler) {
list = new LinkedList(); public List<JID> getRoutes(JID route) {
list.add(nameRoutes); // TODO Refactor API to be able to get c2s sessions available only/all
} List<JID> jids = new ArrayList<JID>();
else if (node.getNode() == null) { if (serverName.equals(route.getDomain())) {
list = new LinkedList(); // Address belongs to local user
getRoutes(list, (Map) nameRoutes); if (route.getResource() != null) {
// Address is a full JID of a user
ClientRoute clientRoute = usersCache.get(route.toString());
if (clientRoute == null) {
clientRoute = anonymousUsersCache.get(route.toString());
}
if (clientRoute != null && clientRoute.isAvailable()) {
jids.add(route);
}
} }
else { else {
Object resourceRoutes = ((Map) nameRoutes).get(node.getNode()); // Address is a bare JID so return all AVAILABLE resources of user
if (resourceRoutes != null) { List<String> sessions = usersSessions.get(route.toBareJID());
if (resourceRoutes instanceof ChannelHandler) { // Select only available sessions
list = new LinkedList(); for (String jid : sessions) {
list.add(resourceRoutes); ClientRoute clientRoute = usersCache.get(jid);
} if (clientRoute == null) {
else if (node.getResource() == null || node.getResource().length() == 0) { clientRoute = anonymousUsersCache.get(jid);
list = new LinkedList();
getRoutes(list, (Map) resourceRoutes);
} }
else { if (clientRoute != null && clientRoute.isAvailable()) {
Object entry = ((Map) resourceRoutes).get(node.getResource()); jids.add(new JID(jid));
if (entry != null) {
list = new LinkedList();
list.add(entry);
}
} }
} }
} }
} }
if (list == null) { else if (route.getDomain().contains(serverName)) {
return Collections.emptyList(); // Packet sent to component hosted in this server
byte[] nodeID = componentsCache.get(route.getDomain());
if (nodeID != null) {
jids.add(route);
}
} }
else { else {
return list; // Packet sent to remote server
} byte[] nodeID = serversCache.get(route.getDomain());
} if (nodeID != null) {
jids.add(route);
/**
* Recursive method to iterate through the given table (and any embedded map)
* and stuff non-Map values into the given list.<p>
*
* There should be no recursion problems since the routing table is at most 3 levels deep.
*
* @param list The list to stuff entries into
* @param table The hashtable who's values should be entered into the list
*/
private void getRoutes(LinkedList list, Map table) {
Iterator entryIter = table.values().iterator();
while (entryIter.hasNext()) {
Object entry = entryIter.next();
if (entry instanceof ConcurrentHashMap) {
getRoutes(list, (Map)entry);
} }
else { else {
// Do not include the same entry many times. This could be the case when the same // TODO Decide if we want to return address of remote server we don't have a route to
// session is associated with the bareJID and with a given resource jids.add(route);
if (!list.contains(entry)) {
list.add(entry);
}
} }
} }
return jids;
} }
public ChannelHandler getBestRoute(JID node) { public boolean removeClientRoute(JID route) {
ChannelHandler route = getRoute(node); boolean anonymous = false;
if (route == null) { String address = route.toString();
// Try looking for a route based on the bare JID ClientRoute clientRoute = usersCache.remove(address);
String nodeJID = node.getNode() == null ? "" : node.getNode(); if (clientRoute == null) {
route = getRoute(node.toBareJID(), nodeJID, node.getDomain(), ""); clientRoute = anonymousUsersCache.remove(address);
anonymous = true;
} }
return route; if (clientRoute != null && route.getResource() != null) {
} Lock lock = LockManager.getLock(route.toBareJID());
try {
public ChannelHandler removeRoute(JID node) { lock.lock();
if (anonymous) {
ChannelHandler route = null; usersSessions.remove(route.toBareJID());
String nodeJID = node.getNode() == null ? "" : node.getNode();
String resourceJID = node.getResource() == null ? "" : node.getResource();
try {
Object nameRoutes = routes.get(node.getDomain());
if (nameRoutes instanceof ConcurrentHashMap) {
Object resourceRoutes = ((Map) nameRoutes).get(nodeJID);
if (resourceRoutes instanceof ConcurrentHashMap) {
// Remove the requested resource for this user
route = (ChannelHandler) ((Map) resourceRoutes).remove(resourceJID);
if (((Map) resourceRoutes).isEmpty()) {
((Map) nameRoutes).remove(nodeJID);
if (((Map) nameRoutes).isEmpty()) {
routes.remove(node.getDomain());
}
}
} }
else { else {
// Remove the unique route to this node List<String> jids = usersSessions.get(route.toBareJID());
((Map) nameRoutes).remove(nodeJID); if (jids != null) {
jids.remove(route.toString());
if (!jids.isEmpty()) {
usersSessions.put(route.toBareJID(), jids);
}
else {
usersSessions.remove(route.toBareJID());
}
}
} }
} }
else if (nameRoutes != null) { finally {
// The retrieved route points to a RoutableChannelHandler lock.unlock();
if (("".equals(nodeJID) && "".equals(resourceJID)) ||
((RoutableChannelHandler) nameRoutes).getAddress().equals(node)) {
// Remove the route to this domain
routes.remove(node.getDomain());
}
} }
} }
catch (Exception e) { localRoutingTable.removeRoute(address);
Log.error("Error removing route", e); return clientRoute != null;
} }
return route;
public boolean removeServerRoute(JID route) {
String address = route.getDomain();
boolean removed = serversCache.remove(address) != null;
localRoutingTable.removeRoute(address);
return removed;
}
public boolean removeComponentRoute(JID route) {
String address = route.getDomain();
boolean removed = componentsCache.remove(address) != null;
localRoutingTable.removeRoute(address);
return removed;
}
public void setRemotePacketRouter(RemotePacketRouter remotePacketRouter) {
this.remotePacketRouter = remotePacketRouter;
} }
public void initialize(XMPPServer server) { public void initialize(XMPPServer server) {
super.initialize(server); super.initialize(server);
this.server = server;
serverName = server.getServerInfo().getName(); serverName = server.getServerInfo().getName();
iqRouter = server.getIQRouter();
messageRouter = server.getMessageRouter();
presenceRouter = server.getPresenceRouter();
} }
public void start() throws IllegalStateException { public void start() throws IllegalStateException {
super.start(); super.start();
componentManager = InternalComponentManager.getInstance();
} }
} }
\ No newline at end of file
...@@ -11,10 +11,7 @@ ...@@ -11,10 +11,7 @@
package org.jivesoftware.util.cache; package org.jivesoftware.util.cache;
import java.io.DataInput; import java.io.*;
import java.io.DataOutput;
import java.io.Externalizable;
import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -202,6 +199,14 @@ public class ExternalizableUtil { ...@@ -202,6 +199,14 @@ public class ExternalizableUtil {
return strategy.readByteArray(in); return strategy.readByteArray(in);
} }
public void writeSerializable(DataOutput out, Serializable value) throws IOException {
strategy.writeSerializable(out, value);
}
public Serializable readSerializable(DataInput in) throws IOException {
return strategy.readSerializable(in);
}
public void writeSafeUTF(DataOutput out, String value) throws IOException { public void writeSafeUTF(DataOutput out, String value) throws IOException {
strategy.writeSafeUTF(out, value); strategy.writeSafeUTF(out, value);
} }
......
...@@ -10,10 +10,7 @@ ...@@ -10,10 +10,7 @@
*/ */
package org.jivesoftware.util.cache; package org.jivesoftware.util.cache;
import java.io.DataInput; import java.io.*;
import java.io.DataOutput;
import java.io.Externalizable;
import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -120,6 +117,10 @@ public interface ExternalizableUtilStrategy { ...@@ -120,6 +117,10 @@ public interface ExternalizableUtilStrategy {
byte[] readByteArray(DataInput in) throws IOException; byte[] readByteArray(DataInput in) throws IOException;
void writeSerializable(DataOutput out, Serializable value) throws IOException;
Serializable readSerializable(DataInput in) throws IOException;
void writeSafeUTF(DataOutput out, String value) throws IOException; void writeSafeUTF(DataOutput out, String value) throws IOException;
String readSafeUTF(DataInput in) throws IOException; String readSafeUTF(DataInput in) throws IOException;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment