Commit 507c70a1 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gaston

Changed way packets are being routed to internal components. Internal and...

Changed way packets are being routed to internal components. Internal and external components now have a route in the RoutingTable. LIVE-592


git-svn-id: http://svn.igniterealtime.org/svn/repos/messenger/trunk@943 b35dd754-fafc-0310-a699-88a17e54d16e
parent a2ee8ade
......@@ -11,7 +11,7 @@
package org.jivesoftware.messenger;
import org.xmpp.packet.Packet;
/**
* Interface for Components.
......@@ -19,12 +19,16 @@ import org.xmpp.packet.Packet;
* @see ComponentManager
* @author Derek DeMoro
*/
public interface Component {
public interface Component extends RoutableChannelHandler {
/**
* Processes an incoming packet addressed to this component.
* Returns the service name of this component. The service name is usually the part before the
* dot before the server address in a JID. For example, given this JID jdoe@broadcast.localhost
* the service name would be broadcast.<p>
*
* This information is useful when adding or removing the component from the ComponentManager.
*
* @param packet the packet.
* @return the service name of this component.
*/
void processPacket(Packet packet);
public String getServiceName();
}
......@@ -11,12 +11,14 @@
package org.jivesoftware.messenger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.xmpp.packet.Packet;
import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
import org.xmpp.packet.Presence;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Manages the registration and delegation of Components. The ComponentManager
* is responsible for managing registration and delegation of {@link Component Components},
......@@ -49,7 +51,9 @@ public class ComponentManager {
/**
* Registers a <code>Component</code> with the server and maps
* to particular jid.
* to particular jid. A route for the new Component will be added to
* the <code>RoutingTable</code> based on the address of the component.
* Note: The address of the component will be a JID wih empties node and resource.
*
* @param jid the jid to map to.
* @param component the <code>Component</code> to register.
......@@ -58,17 +62,27 @@ public class ComponentManager {
jid = new JID(jid).toBareJID();
components.put(jid, component);
// Add the route to the new service provided by the component
XMPPServer.getInstance().getRoutingTable().addRoute(component.getAddress(), component);
// Check for potential interested users.
checkPresences();
}
/**
* Removes a <code>Component</code> from the server.
* Removes a <code>Component</code> from the server. The route for the new Component
* will be removed from the <code>RoutingTable</code>.
*
* @param jid the jid mapped to the particular component.
*/
public void removeComponent(String jid) {
components.remove(new JID(jid).toBareJID());
JID componentJID = new JID(jid);
components.remove(componentJID.toBareJID());
// Remove the route for the service provided by the component
if (XMPPServer.getInstance().getRoutingTable() != null) {
XMPPServer.getInstance().getRoutingTable().removeRoute(componentJID);
}
}
/**
......@@ -127,10 +141,16 @@ public class ComponentManager {
Presence presence = new Presence();
presence.setFrom(prober);
presence.setTo(probee);
component.processPacket(presence);
try {
component.process(presence);
// No reason to hold onto prober reference.
presenceMap.remove(prober);
}
catch (UnauthorizedException e) {
// Do nothing. This error should never occur
}
// No reason to hold onto prober reference.
presenceMap.remove(prober);
}
}
}
......
......@@ -11,18 +11,18 @@
package org.jivesoftware.messenger;
import org.xmpp.packet.IQ;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.JID;
import org.dom4j.Element;
import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.jivesoftware.messenger.container.BasicModule;
import org.jivesoftware.messenger.handler.IQHandler;
import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.LocaleUtils;
import org.dom4j.Element;
import org.jivesoftware.util.Log;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.PacketError;
import java.util.List;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......@@ -40,6 +40,7 @@ public class IQRouter extends BasicModule {
private List<IQHandler> iqHandlers = new ArrayList<IQHandler>();
private Map<String, IQHandler> namespace2Handlers = new ConcurrentHashMap<String, IQHandler>();
private SessionManager sessionManager;
private ComponentManager componentManager;
/**
* Creates a packet router.
......@@ -76,12 +77,7 @@ public class IQRouter extends BasicModule {
else {
packet.setTo(sessionManager.getSession(packet.getFrom()).getAddress());
packet.setError(PacketError.Condition.not_authorized);
try {
sessionManager.getSession(packet.getFrom()).process(packet);
}
catch (UnauthorizedException ue) {
Log.error(ue);
}
sessionManager.getSession(packet.getFrom()).process(packet);
}
}
......@@ -128,8 +124,15 @@ public class IQRouter extends BasicModule {
routingTable = server.getRoutingTable();
iqHandlers.addAll(server.getIQHandlers());
sessionManager = server.getSessionManager();
componentManager = ComponentManager.getInstance();
}
/**
* A JID is considered local if:
* 1) is null or
* 2) has no domain or domain is empty or
* 3) has no resource or resource is empty
*/
private boolean isLocalServer(JID recipientJID) {
return recipientJID == null || recipientJID.getDomain() == null
|| "".equals(recipientJID.getDomain()) || recipientJID.getResource() == null
......@@ -139,8 +142,17 @@ public class IQRouter extends BasicModule {
private void handle(IQ packet) {
JID recipientJID = packet.getTo();
try {
if (isLocalServer(recipientJID)) {
// Check for registered components
Component component = null;
if (recipientJID != null) {
component = componentManager.getComponent(packet.getTo().toBareJID());
}
if (component != null) {
// A component was found that can handle the Packet
component.process(packet);
}
else if (isLocalServer(recipientJID)) {
// Let the server handle the Packet
Element childElement = packet.getChildElement();
String namespace = null;
if (childElement != null) {
......@@ -158,7 +170,8 @@ public class IQRouter extends BasicModule {
// Answer an error since the server can't handle the requested namespace
reply.setError(PacketError.Condition.service_unavailable);
}
else if (recipientJID.getNode() == null || "".equals(recipientJID.getNode())) {
else if (recipientJID.getNode() == null ||
"".equals(recipientJID.getNode())) {
// Answer an error if JID is of the form <domain>
reply.setError(PacketError.Condition.feature_not_implemented);
}
......
......@@ -30,18 +30,11 @@ public class PacketRouter extends BasicModule {
private PresenceRouter presenceRouter;
private MessageRouter messageRouter;
/**
* Initialize ComponentManager to handle delegation of packets.
*/
private ComponentManager componentManager;
/**
* Constructs a packet router.
*/
public PacketRouter() {
super("XMPP Packet Router");
componentManager = ComponentManager.getInstance();
}
/**
......@@ -52,13 +45,8 @@ public class PacketRouter extends BasicModule {
* any accesses to class resources.
*
* @param packet The packet to route
* @throws NullPointerException If the packet is null or the packet could not be routed
*/
public void route(Packet packet) {
if(hasRouted(packet)){
return;
}
if (packet instanceof Message) {
route((Message)packet);
}
......@@ -74,34 +62,15 @@ public class PacketRouter extends BasicModule {
}
public void route(IQ packet) {
if (!hasRouted(packet)){
iqRouter.route(packet);
}
iqRouter.route(packet);
}
public void route(Message packet) {
if (!hasRouted(packet)){
messageRouter.route(packet);
}
messageRouter.route(packet);
}
public void route(Presence packet) {
if (!hasRouted(packet)) {
presenceRouter.route(packet);
}
}
public boolean hasRouted(Packet packet){
if (packet.getTo() == null) {
return false;
}
// Check for registered components
Component component = componentManager.getComponent(packet.getTo().toBareJID());
if (component != null) {
component.processPacket(packet);
return true;
}
return false;
presenceRouter.route(packet);
}
public void initialize(XMPPServer server) {
......
......@@ -90,7 +90,7 @@ public class PresenceRouter extends BasicModule {
ChannelHandler handler = routingTable.getRoute(recipientJID);
handler.process(packet);
// Notify the PresenceUpdateHandler of the directed presence
updateHandler.directedPresenceSent(packet, handler);
updateHandler.directedPresenceSent(packet, handler, recipientJID.toString());
}
}
......
......@@ -11,22 +11,19 @@
package org.jivesoftware.messenger.handler;
import org.jivesoftware.messenger.container.BasicModule;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.messenger.*;
import org.jivesoftware.messenger.roster.RosterItem;
import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.jivesoftware.messenger.spi.SessionImpl;
import org.jivesoftware.messenger.roster.RosterManager;
import org.jivesoftware.messenger.container.BasicModule;
import org.jivesoftware.messenger.roster.Roster;
import org.jivesoftware.messenger.roster.RosterItem;
import org.jivesoftware.messenger.roster.RosterManager;
import org.jivesoftware.messenger.user.UserNotFoundException;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.xmpp.packet.*;
import java.lang.ref.WeakReference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* Implements the presence protocol. Clients use this protocol to
......@@ -70,7 +67,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
*/
public class PresenceUpdateHandler extends BasicModule implements ChannelHandler {
private Map<String, Set> directedPresences = new ConcurrentHashMap<String, Set>();
private Map<String, WeakHashMap<ChannelHandler, Set<String>>> directedPresences;
private RosterManager rosterManager;
private XMPPServer localServer;
......@@ -81,12 +78,13 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
public PresenceUpdateHandler() {
super("Presence update handler");
directedPresences = new ConcurrentHashMap<String, WeakHashMap<ChannelHandler, Set<String>>>();
}
public void process(Packet xmppPacket) throws UnauthorizedException, PacketException {
Presence presence = (Presence)xmppPacket;
try {
Session session = sessionManager.getSession(presence.getFrom());
ClientSession session = sessionManager.getSession(presence.getFrom());
Presence.Type type = presence.getType();
// Available
if (type == null) {
......@@ -101,7 +99,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
}
else if (Presence.Type.unavailable == type) {
broadcastUpdate(presence.createCopy());
broadcastUnavailableForDirectedPresences(presence.createCopy());
broadcastUnavailableForDirectedPresences(presence);
if (session != null) {
session.setPresence(presence);
}
......@@ -285,12 +283,14 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
*
* @param update the directed Presence sent by the user to an entity.
* @param handler the handler that routed the presence to the entity.
* @param jid the jid that the handler has processed
*/
public void directedPresenceSent(Presence update, ChannelHandler handler) {
public void directedPresenceSent(Presence update, ChannelHandler handler, String jid) {
if (update.getFrom() == null) {
return;
}
if (localServer.isLocal(update.getFrom())) {
WeakHashMap<ChannelHandler, Set<String>> map;
String name = update.getFrom().getNode();
try {
if (name != null && !"".equals(name)) {
......@@ -298,32 +298,42 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
Roster roster = rosterManager.getRoster(name);
// If the directed presence was sent to an entity that is not in the user's
// roster, keep a registry of this so that when the user goes offline we will
// be able to send the unavialable presence to the entity
// be able to send the unavailable presence to the entity
if (!roster.isRosterItem(update.getTo())) {
Set set = (Set)directedPresences.get(update.getFrom().toString());
if (set == null) {
// We are using a set to avoid duplicate handlers in case the user
// sends several directed presences to the same entity
set = new CopyOnWriteArraySet();
directedPresences.put(update.getFrom().toString(), set);
map = directedPresences.get(update.getFrom().toString());
if (map == null) {
// We are using a set to avoid duplicate jids in case the user
// sends several directed presences to the same handler. The Map also
// ensures that if the user sends several presences to the same handler
// we will have only one entry in the Map
map = new WeakHashMap<ChannelHandler,Set<String>>();
map.put(handler, new HashSet<String>());
directedPresences.put(update.getFrom().toString(), map);
}
if (Presence.Type.unavailable.equals(update.getType())) {
// It's a directed unavailable presence so remove the target entity
// from the registry
if (handler instanceof SessionImpl) {
set.remove(new HandlerWeakReference(handler));
if (set.isEmpty()) {
// It's a directed unavailable presence
if (handler instanceof ClientSession) {
// Client sessions will receive only presences to the same JID (the
// address of the session) so remove the handler from the map
map.remove(handler);
if (map.isEmpty()) {
// Remove the user from the registry since the list of directed
// presences is empty
directedPresences.remove(update.getFrom().toString());
}
}
else {
// A service may receive presences for many JIDs so in this case we
// just need to remove the jid that has received a directed
// unavailable presence
map.get(handler).remove(jid);
}
}
else {
// Add the handler to the list of handler that processed the directed
// presence sent by the user. This handler will be used to send
// the unavailable presence when the user goes offline
set.add(new HandlerWeakReference(handler));
map.get(handler).add(jid);
}
}
}
......@@ -348,21 +358,15 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
return;
}
if (localServer.isLocal(update.getFrom())) {
Set set = (Set)directedPresences.get(update.getFrom().toString());
if (set != null) {
RoutableChannelHandler handler;
Map<ChannelHandler, Set<String>> map = directedPresences.get(update.getFrom().toString());
if (map != null) {
// Iterate over all the entities that the user sent a directed presence
for (Iterator it = set.iterator(); it.hasNext();) {
// It is assumed that any type of PacketHandler (besides SessionImpl) will be
// responsible for sending/processing the offline presence to ALL the entities
// were the user has sent a directed presence. This is a consequence of using
// a set in order to prevent duplicte handlers.
// e.g. MultiUserChatServerImpl will remove the user from ALL the rooms
handler = (RoutableChannelHandler)((HandlerWeakReference)it.next()).get();
if (handler != null) {
update.setTo(handler.getAddress());
for (ChannelHandler handler : map.keySet()) {
for (String jid : map.get(handler)) {
Presence presence = update.createCopy();
presence.setTo(new JID(jid));
try {
handler.process(update);
handler.process(presence);
}
catch (UnauthorizedException ue) {
Log.error(ue);
......@@ -384,46 +388,4 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
messageStore = server.getOfflineMessageStore();
sessionManager = server.getSessionManager();
}
/**
* A WeakReference that redefines #equals(Object) so that the referent objects
* could be compared as if the weak reference does not exists.
*
* @author Gaston Dombiak
*/
private class HandlerWeakReference extends WeakReference {
/**
* We need to store the hash code separately since the referent
* could be removed by the GC.
*/
private int hash;
public HandlerWeakReference(Object referent) {
super(referent);
hash = referent.hashCode();
}
public int hashCode() {
return hash;
}
public boolean equals(Object object) {
if (this == object) return true;
if (object instanceof HandlerWeakReference) {
Object t = this.get();
Object u = ((HandlerWeakReference)object).get();
if ((t == null) || (u == null)) return false;
if (t == u) return true;
return t.equals(u);
}
else {
Object t = this.get();
if (t == null || (object == null)) return false;
if (t == object) return true;
return t.equals(object);
}
}
}
}
......@@ -52,7 +52,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
routeLock.writeLock().lock();
try {
if (routes.isEmpty()) {
if (routes.isEmpty() || destination instanceof Component) {
routes.put(node.getDomain(), destination);
}
else {
......
......@@ -46,6 +46,7 @@ public class BroadcastPlugin implements Plugin, Component {
private GroupManager groupManager;
private List<JID> allowedUsers;
private boolean groupMembersAllowed;
private JID serviceJID;
/**
* Constructs a new broadcast plugin.
......@@ -55,6 +56,8 @@ public class BroadcastPlugin implements Plugin, Component {
groupMembersAllowed = JiveGlobals.getBooleanProperty(
"plugin.broadcast.groupMembersAllowed", true);
allowedUsers = stringToList(JiveGlobals.getProperty("plugin.broadcast.allowedUsers", ""));
serviceJID =
new JID(serviceName + "." + XMPPServer.getInstance().getServerInfo().getName());
}
// Plugin Interface
......@@ -93,7 +96,7 @@ public class BroadcastPlugin implements Plugin, Component {
// Component Interface
public void processPacket(Packet packet) {
public void process(Packet packet) {
// Only respond to incoming messages. TODO: handle disco, presence, etc.
if (packet instanceof Message) {
Message message = (Message)packet;
......@@ -183,6 +186,10 @@ public class BroadcastPlugin implements Plugin, Component {
return serviceName;
}
public JID getAddress() {
return serviceJID;
}
/**
* Sets the service name of this component, which is "broadcast" by default.
*
......@@ -200,6 +207,8 @@ public class BroadcastPlugin implements Plugin, Component {
ComponentManager.getInstance().removeComponent(this.serviceName);
ComponentManager.getInstance().addComponent(serviceName, this);
this.serviceName = serviceName;
serviceJID =
new JID(serviceName + "." + XMPPServer.getInstance().getServerInfo().getName());
}
/**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment