Commit 1dbe2e49 authored by Daniel Henninger's avatar Daniel Henninger Committed by dhenninger

[JM-1217] Couple more improvements to multiple connection support.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@9777 b35dd754-fafc-0310-a699-88a17e54d16e
parent 8cb0cdad
...@@ -1150,11 +1150,15 @@ public class SessionManager extends BasicModule implements ClusterEventListener ...@@ -1150,11 +1150,15 @@ public class SessionManager extends BasicModule implements ClusterEventListener
finally { finally {
// Remove the session // Remove the session
localSessionManager.getComponentsSessions().remove(session); localSessionManager.getComponentsSessions().remove(session);
// Remove track of the cluster node hosting the external component // Remove track of the cluster node hosting the external component
// if no more components are handling it.
if (!InternalComponentManager.getInstance().hasComponent(session.getAddress())) {
componentSessionsCache.remove(session.getAddress().toString()); componentSessionsCache.remove(session.getAddress().toString());
} }
} }
} }
}
private class IncomingServerSessionListener implements ConnectionCloseListener { private class IncomingServerSessionListener implements ConnectionCloseListener {
/** /**
......
...@@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit; ...@@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit;
*/ */
public class InternalComponentManager extends BasicModule implements ComponentManager, RoutableChannelHandler { public class InternalComponentManager extends BasicModule implements ComponentManager, RoutableChannelHandler {
private Map<String, RoutableComponent> routables = new ConcurrentHashMap<String, RoutableComponent>(); final private Map<String, RoutableComponents> routables = new ConcurrentHashMap<String, RoutableComponents>();
private Map<String, IQ> componentInfo = new ConcurrentHashMap<String, IQ>(); private Map<String, IQ> componentInfo = new ConcurrentHashMap<String, IQ>();
private Map<JID, JID> presenceMap = new ConcurrentHashMap<JID, JID>(); private Map<JID, JID> presenceMap = new ConcurrentHashMap<JID, JID>();
/** /**
...@@ -104,7 +104,8 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -104,7 +104,8 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
} }
public void addComponent(String subdomain, Component component) throws ComponentException { public void addComponent(String subdomain, Component component) throws ComponentException {
RoutableComponent routable = routables.get(subdomain); synchronized (routables) {
RoutableComponents routable = routables.get(subdomain);
if (routable != null && routable.hasComponent(component)) { if (routable != null && routable.hasComponent(component)) {
// This component has already registered with this subdomain. // This component has already registered with this subdomain.
// TODO: Is this all we should do? Should we return an error? // TODO: Is this all we should do? Should we return an error?
...@@ -116,7 +117,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -116,7 +117,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
routable.addComponent(component); routable.addComponent(component);
} }
else { else {
routable = new RoutableComponent(componentJID, component); routable = new RoutableComponents(componentJID, component);
routables.put(subdomain, routable); routables.put(subdomain, routable);
// Add the route to the new service provided by the component // Add the route to the new service provided by the component
...@@ -158,19 +159,22 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -158,19 +159,22 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
XMPPServer.getInstance().getRoutingTable().removeComponentRoute(componentJID); XMPPServer.getInstance().getRoutingTable().removeComponentRoute(componentJID);
} }
} }
}
} }
public void removeComponent(String subdomain) { public void removeComponent(String subdomain) {
synchronized (routables) {
Log.debug("InternalComponentManager: Unregistering all components for domain: " + subdomain); Log.debug("InternalComponentManager: Unregistering all components for domain: " + subdomain);
RoutableComponent routable = routables.get(subdomain); RoutableComponents routable = routables.get(subdomain);
routable.removeAllComponents(); routable.removeAllComponents();
routables.remove(subdomain); routables.remove(subdomain);
} }
}
public void removeComponent(String subdomain, Component component) { public void removeComponent(String subdomain, Component component) {
synchronized (routables) {
Log.debug("InternalComponentManager: Unregistering component for domain: " + subdomain); Log.debug("InternalComponentManager: Unregistering component for domain: " + subdomain);
RoutableComponent routable = routables.get(subdomain); RoutableComponents routable = routables.get(subdomain);
routable.removeComponent(component); routable.removeComponent(component);
if (routable.numberOfComponents() == 0) { if (routable.numberOfComponents() == 0) {
routables.remove(subdomain); routables.remove(subdomain);
...@@ -207,6 +211,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -207,6 +211,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
Log.debug("InternalComponentManager: Other components still tied to domain: " + subdomain); Log.debug("InternalComponentManager: Other components still tied to domain: " + subdomain);
} }
} }
}
public void sendPacket(Component component, Packet packet) { public void sendPacket(Component component, Packet packet) {
if (packet != null && packet.getFrom() == null) { if (packet != null && packet.getFrom() == null) {
...@@ -259,9 +264,9 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -259,9 +264,9 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
public void addListener(ComponentEventListener listener) { public void addListener(ComponentEventListener listener) {
listeners.add(listener); listeners.add(listener);
// Notify the new listener about existing components // Notify the new listener about existing components
for (Map.Entry<String, RoutableComponent> entry : routables.entrySet()) { for (Map.Entry<String, RoutableComponents> entry : routables.entrySet()) {
String subdomain = entry.getKey(); String subdomain = entry.getKey();
RoutableComponent routable = entry.getValue(); RoutableComponents routable = entry.getValue();
for (Component component : routable.getComponents()) { for (Component component : routable.getComponents()) {
JID componentJID = new JID(subdomain + "." + serverDomain); JID componentJID = new JID(subdomain + "." + serverDomain);
listener.componentRegistered(component, componentJID); listener.componentRegistered(component, componentJID);
...@@ -366,10 +371,11 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -366,10 +371,11 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
* @return the component with the specified id. * @return the component with the specified id.
*/ */
private Component getComponent(JID componentJID) { private Component getComponent(JID componentJID) {
synchronized (routables) {
if (componentJID.getNode() != null) { if (componentJID.getNode() != null) {
return null; return null;
} }
RoutableComponent routable = routables.get(componentJID.getDomain()); RoutableComponents routable = routables.get(componentJID.getDomain());
if (routable != null) { if (routable != null) {
return routable.getNextComponent(); return routable.getNextComponent();
} }
...@@ -387,6 +393,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -387,6 +393,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
} }
return null; return null;
} }
}
/** /**
* Returns true if a component is associated to the specified address. Components * Returns true if a component is associated to the specified address. Components
...@@ -396,14 +403,16 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -396,14 +403,16 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
* @return true if a component is associated to the specified address. * @return true if a component is associated to the specified address.
*/ */
public boolean hasComponent(JID componentJID) { public boolean hasComponent(JID componentJID) {
synchronized (routables) {
if (componentJID.getNode() != null || componentJID.getResource() != null) { if (componentJID.getNode() != null || componentJID.getResource() != null) {
return false; return false;
} }
// if (componentJID.getDomain().lastIndexOf("." + serverDomain) == -1) { // if (componentJID.getDomain().lastIndexOf("." + serverDomain) == -1) {
// componentJID = new JID(componentJID.getDomain() + "." + serverDomain); // componentJID = new JID(componentJID.getDomain() + "." + serverDomain);
// } // }
return routingTable.hasComponentRoute(componentJID); return routingTable.hasComponentRoute(componentJID);
} }
}
/** /**
* Registers Probeers who have not yet been serviced. * Registers Probeers who have not yet been serviced.
...@@ -446,7 +455,8 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -446,7 +455,8 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
iq.setChildElement("query", "http://jabber.org/protocol/disco#info"); iq.setChildElement("query", "http://jabber.org/protocol/disco#info");
// Send the disco#info request to the component. The reply (if any) will be processed in // Send the disco#info request to the component. The reply (if any) will be processed in
// #process(Packet) // #process(Packet)
sendPacket(component, iq); // sendPacket(component, iq);
component.processPacket(iq);
} }
public JID getAddress() { public JID getAddress() {
...@@ -468,10 +478,8 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -468,10 +478,8 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
if (packet instanceof IQ && IQ.Type.result == ((IQ) packet).getType()) { if (packet instanceof IQ && IQ.Type.result == ((IQ) packet).getType()) {
IQ iq = (IQ) packet; IQ iq = (IQ) packet;
Element childElement = iq.getChildElement(); Element childElement = iq.getChildElement();
String namespace = null;
if (childElement != null) { if (childElement != null) {
namespace = childElement.getNamespaceURI(); String namespace = childElement.getNamespaceURI();
}
if ("http://jabber.org/protocol/disco#info".equals(namespace)) { if ("http://jabber.org/protocol/disco#info".equals(namespace)) {
// Add a disco item to the server for the component that supports disco // Add a disco item to the server for the component that supports disco
Element identity = childElement.element("identity"); Element identity = childElement.element("identity");
...@@ -506,16 +514,17 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -506,16 +514,17 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
} }
} }
} }
}
/** /**
* Exposes a Component as a RoutableChannelHandler. * Exposes a Component as a RoutableChannelHandler.
*/ */
public static class RoutableComponent implements RoutableChannelHandler { private static class RoutableComponents implements RoutableChannelHandler {
private JID jid; private JID jid;
final private List<Component> components; final private List<Component> components;
public RoutableComponent(JID jid, Component component) { public RoutableComponents(JID jid, Component component) {
this.jid = jid; this.jid = jid;
this.components = new ArrayList<Component>(); this.components = new ArrayList<Component>();
addComponent(component); addComponent(component);
......
...@@ -111,11 +111,11 @@ public class ComponentSocketReader extends SocketReader { ...@@ -111,11 +111,11 @@ public class ComponentSocketReader extends SocketReader {
return false; return false;
} }
boolean createSession(String namespace, Boolean allowMultiple) throws UnauthorizedException, XmlPullParserException, boolean createSession(String namespace) throws UnauthorizedException, XmlPullParserException,
IOException { IOException {
if ("jabber:component:accept".equals(namespace)) { if ("jabber:component:accept".equals(namespace)) {
// The connected client is a component so create a ComponentSession // The connected client is a component so create a ComponentSession
session = LocalComponentSession.createSession(serverName, reader, connection, allowMultiple); session = LocalComponentSession.createSession(serverName, reader, connection);
return true; return true;
} }
return false; return false;
......
...@@ -204,9 +204,8 @@ public class ServerSocketReader extends SocketReader { ...@@ -204,9 +204,8 @@ public class ServerSocketReader extends SocketReader {
threadPool.shutdown(); threadPool.shutdown();
} }
boolean createSession(String namespace, Boolean allowMultiple) throws UnauthorizedException, XmlPullParserException, boolean createSession(String namespace) throws UnauthorizedException, XmlPullParserException,
IOException { IOException {
// TODO: Should we ever consider allowing multiple of this?
if ("jabber:server".equals(namespace)) { if ("jabber:server".equals(namespace)) {
// The connected client is a server so create an IncomingServerSession // The connected client is a server so create an IncomingServerSession
session = LocalIncomingServerSession.createSession(serverName, reader, connection); session = LocalIncomingServerSession.createSession(serverName, reader, connection);
......
...@@ -231,6 +231,7 @@ public abstract class SocketReader implements Runnable { ...@@ -231,6 +231,7 @@ public abstract class SocketReader implements Runnable {
* another thread. * another thread.
* *
* @param packet the received packet. * @param packet the received packet.
* @throws UnauthorizedException if the connection required security but was not secured.
*/ */
protected void processIQ(IQ packet) throws UnauthorizedException { protected void processIQ(IQ packet) throws UnauthorizedException {
// Ensure that connection was secured if TLS was required // Ensure that connection was secured if TLS was required
...@@ -253,6 +254,7 @@ public abstract class SocketReader implements Runnable { ...@@ -253,6 +254,7 @@ public abstract class SocketReader implements Runnable {
* another thread. * another thread.
* *
* @param packet the received packet. * @param packet the received packet.
* @throws UnauthorizedException if the connection required security but was not secured.
*/ */
protected void processPresence(Presence packet) throws UnauthorizedException { protected void processPresence(Presence packet) throws UnauthorizedException {
// Ensure that connection was secured if TLS was required // Ensure that connection was secured if TLS was required
...@@ -275,6 +277,7 @@ public abstract class SocketReader implements Runnable { ...@@ -275,6 +277,7 @@ public abstract class SocketReader implements Runnable {
* another thread. * another thread.
* *
* @param packet the received packet. * @param packet the received packet.
* @throws UnauthorizedException if the connection required security but was not secured.
*/ */
protected void processMessage(Message packet) throws UnauthorizedException { protected void processMessage(Message packet) throws UnauthorizedException {
// Ensure that connection was secured if TLS was required // Ensure that connection was secured if TLS was required
...@@ -348,6 +351,10 @@ public abstract class SocketReader implements Runnable { ...@@ -348,6 +351,10 @@ public abstract class SocketReader implements Runnable {
* If the connection remains open, the XPP will be set to be ready for the * If the connection remains open, the XPP will be set to be ready for the
* first packet. A call to next() should result in an START_TAG state with * first packet. A call to next() should result in an START_TAG state with
* the first packet in the stream. * the first packet in the stream.
*
* @throws UnauthorizedException if the connection required security but was not secured.
* @throws XmlPullParserException if there was an XML error while creating the session.
* @throws IOException if an IO error occured while creating the session.
*/ */
protected void createSession() protected void createSession()
throws UnauthorizedException, XmlPullParserException, IOException { throws UnauthorizedException, XmlPullParserException, IOException {
...@@ -360,7 +367,6 @@ public abstract class SocketReader implements Runnable { ...@@ -360,7 +367,6 @@ public abstract class SocketReader implements Runnable {
// subdomain. If the value of the 'to' attribute is not valid then return a host-unknown // subdomain. If the value of the 'to' attribute is not valid then return a host-unknown
// error and close the underlying connection. // error and close the underlying connection.
String host = reader.getXPPParser().getAttributeValue("", "to"); String host = reader.getXPPParser().getAttributeValue("", "to");
String allowMultiple = reader.getXPPParser().getAttributeValue("", "allowMultiple");
if (validateHost() && isHostUnknown(host)) { if (validateHost() && isHostUnknown(host)) {
StringBuilder sb = new StringBuilder(250); StringBuilder sb = new StringBuilder(250);
sb.append("<?xml version='1.0' encoding='"); sb.append("<?xml version='1.0' encoding='");
...@@ -388,7 +394,7 @@ public abstract class SocketReader implements Runnable { ...@@ -388,7 +394,7 @@ public abstract class SocketReader implements Runnable {
// Create the correct session based on the sent namespace. At this point the server // Create the correct session based on the sent namespace. At this point the server
// may offer the client to secure the connection. If the client decides to secure // may offer the client to secure the connection. If the client decides to secure
// the connection then a <starttls> stanza should be received // the connection then a <starttls> stanza should be received
else if (!createSession(xpp.getNamespace(null), allowMultiple != null)) { else if (!createSession(xpp.getNamespace(null))) {
// No session was created because of an invalid namespace prefix so answer a stream // No session was created because of an invalid namespace prefix so answer a stream
// error and close the underlying connection // error and close the underlying connection
StringBuilder sb = new StringBuilder(250); StringBuilder sb = new StringBuilder(250);
...@@ -457,12 +463,11 @@ public abstract class SocketReader implements Runnable { ...@@ -457,12 +463,11 @@ public abstract class SocketReader implements Runnable {
* Creates the appropriate {@link org.jivesoftware.openfire.session.Session} subclass based on the specified namespace. * Creates the appropriate {@link org.jivesoftware.openfire.session.Session} subclass based on the specified namespace.
* *
* @param namespace the namespace sent in the stream element. eg. jabber:client. * @param namespace the namespace sent in the stream element. eg. jabber:client.
* @param allowMultiple Allow multiple bindings to the specified domain.
* @return the created session or null. * @return the created session or null.
* @throws UnauthorizedException * @throws UnauthorizedException if the connection required security but was not secured.
* @throws XmlPullParserException * @throws XmlPullParserException if there was an XML error while creating the session.
* @throws IOException * @throws IOException if an IO error occured while creating the session.
*/ */
abstract boolean createSession(String namespace, Boolean allowMultiple) throws UnauthorizedException, abstract boolean createSession(String namespace) throws UnauthorizedException,
XmlPullParserException, IOException; XmlPullParserException, IOException;
} }
...@@ -55,15 +55,18 @@ public class LocalComponentSession extends LocalSession implements ComponentSess ...@@ -55,15 +55,18 @@ public class LocalComponentSession extends LocalSession implements ComponentSess
* @param serverName the name of the server where the session is connecting to. * @param serverName the name of the server where the session is connecting to.
* @param reader the reader that is reading the provided XML through the connection. * @param reader the reader that is reading the provided XML through the connection.
* @param connection the connection with the component. * @param connection the connection with the component.
* @param allowMultiple the specified domain is allowed to be connected to multiple times.
* @return a newly created session between the server and a component. * @return a newly created session between the server and a component.
* @throws UnauthorizedException if the connection required security but was not secured.
* @throws XmlPullParserException if there was an XML error while creating the session.
* @throws IOException if an IO error occured while creating the session.
*/ */
public static LocalComponentSession createSession(String serverName, XMPPPacketReader reader, public static LocalComponentSession createSession(String serverName, XMPPPacketReader reader,
SocketConnection connection, Boolean allowMultiple) throws UnauthorizedException, IOException, SocketConnection connection) throws UnauthorizedException, IOException,
XmlPullParserException XmlPullParserException
{ {
XmlPullParser xpp = reader.getXPPParser(); XmlPullParser xpp = reader.getXPPParser();
String domain = xpp.getAttributeValue("", "to"); String domain = xpp.getAttributeValue("", "to");
Boolean allowMultiple = reader.getXPPParser().getAttributeValue("", "allowMultiple") != null;
Log.debug("LocalComponentSession: [ExComp] Starting registration of new external component for domain: " + domain); Log.debug("LocalComponentSession: [ExComp] Starting registration of new external component for domain: " + domain);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment