Commit 9ee2a4b9 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gaston

Added support for s2s communication. JM-6


git-svn-id: http://svn.igniterealtime.org/svn/repos/messenger/trunk@1378 b35dd754-fafc-0310-a699-88a17e54d16e
parent a40b4613
......@@ -24,20 +24,24 @@ import java.util.ArrayList;
public class ServerPort {
private int port;
private String interfaceName;
private ArrayList names;
private String address;
private boolean secure;
private String algorithm;
private Type type;
public ServerPort(int port, String name, String address, boolean isSecure,
String algorithm)
public ServerPort(int port, String interfaceName, String name, String address,
boolean isSecure, String algorithm, Type type)
{
this.port = port;
this.interfaceName = interfaceName;
this.names = new ArrayList(1);
this.names.add(name);
this.address = address;
this.secure = isSecure;
this.algorithm = algorithm;
this.type = type;
}
/**
......@@ -49,6 +53,10 @@ public class ServerPort {
return port;
}
public String getInterfaceName() {
return interfaceName;
}
/**
* Returns the logical domains for this server port. As multiple
* domains may point to the same server, this helps to define what
......@@ -87,4 +95,39 @@ public class ServerPort {
public String getSecurityType() {
return algorithm;
}
/**
* Returns true if other servers can connect to this port for s2s communication.
*
* @return true if other servers can connect to this port for s2s communication.
*/
public boolean isServerPort() {
return type == Type.server;
}
/**
* Returns true if clients can connect to this port.
*
* @return true if clients can connect to this port.
*/
public boolean isClientPort() {
return type == Type.client;
}
/**
* Returns true if external components can connect to this port.
*
* @return true if external components can connect to this port.
*/
public boolean isComponentPort() {
return type == Type.component;
}
public static enum Type {
client,
server,
component;
}
}
......@@ -15,6 +15,8 @@ import org.jivesoftware.messenger.audit.AuditStreamIDFactory;
import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.jivesoftware.messenger.container.BasicModule;
import org.jivesoftware.messenger.handler.PresenceUpdateHandler;
import org.jivesoftware.messenger.server.IncomingServerSession;
import org.jivesoftware.messenger.server.OutgoingServerSession;
import org.jivesoftware.messenger.spi.BasicStreamIDFactory;
import org.jivesoftware.messenger.user.UserManager;
import org.jivesoftware.messenger.user.UserNotFoundException;
......@@ -51,6 +53,8 @@ public class SessionManager extends BasicModule {
private ClientSessionListener clientSessionListener = new ClientSessionListener();
private ComponentSessionListener componentSessionListener = new ComponentSessionListener();
private IncomingServerSessionListener incomingServerListener = new IncomingServerSessionListener();
private OutgoingServerSessionListener outgoingServerListener = new OutgoingServerSessionListener();
/**
* Map that holds sessions that has been created but haven't been authenticated yet. The Map
......@@ -78,6 +82,22 @@ public class SessionManager extends BasicModule {
*/
private List<ComponentSession> componentsSessions = new CopyOnWriteArrayList<ComponentSession>();
/**
* The sessions contained in this Map are server sessions originated by a remote server. These
* sessions can only receive packets from the remote server but are not capable of sending
* packets to the remote server. Sessions will be added to this collecion only after they were
* authenticated. The key of the Map is the hostname of the remote server.
*/
private Map<String, IncomingServerSession> incomingServerSessions = new ConcurrentHashMap<String, IncomingServerSession>();
/**
* The sessions contained in this Map are server sessions originated from this server to remote
* servers. These sessions can only send packets to the remote server but are not capable of
* receiving packets from the remote server. Sessions will be added to this collecion only
* after they were authenticated. The key of the Map is the hostname of the remote server.
*/
private Map<String, OutgoingServerSession> outgoingServerSessions = new ConcurrentHashMap<String, OutgoingServerSession>();
/**
* <p>Session manager must maintain the routing table as sessions are added and
* removed.</p>
......@@ -316,6 +336,15 @@ public class SessionManager extends BasicModule {
}
}
/**
* Returns a randomly created ID to be used in a stream element.
*
* @return a randomly created ID to be used in a stream element.
*/
public StreamID nextStreamID() {
return streamIDFactory.createStreamID();
}
/**
* Creates a new <tt>ClientSession</tt>.
*
......@@ -327,11 +356,11 @@ public class SessionManager extends BasicModule {
if (serverName == null) {
throw new UnauthorizedException("Server not initialized");
}
StreamID id = streamIDFactory.createStreamID();
StreamID id = nextStreamID();
ClientSession session = new ClientSession(serverName, conn, id);
conn.init(session);
// Register to receive close notification on this session so we can
// remove its route from the sessions set and also send an unavailable presence if it wasn't
// remove and also send an unavailable presence if it wasn't
// sent before
conn.registerCloseListener(clientSessionListener, session);
......@@ -344,7 +373,7 @@ public class SessionManager extends BasicModule {
if (serverName == null) {
throw new UnauthorizedException("Server not initialized");
}
StreamID id = streamIDFactory.createStreamID();
StreamID id = nextStreamID();
ComponentSession session = new ComponentSession(serverName, conn, id);
conn.init(session);
// Register to receive close notification on this session so we can
......@@ -356,6 +385,91 @@ public class SessionManager extends BasicModule {
return session;
}
/**
* Creates a session for a remote server. The session should be created only after the
* remote server has been authenticated either using "server dialback" or SASL.
*
* @param conn the connection to the remote server.
* @param id the stream ID used in the stream element when authenticating the server.
* @return the newly created {@link IncomingServerSession}.
* @throws UnauthorizedException if the local server has not been initialized yet.
*/
public IncomingServerSession createIncomingServerSession(Connection conn, StreamID id)
throws UnauthorizedException {
if (serverName == null) {
throw new UnauthorizedException("Server not initialized");
}
IncomingServerSession session = new IncomingServerSession(serverName, conn, id);
conn.init(session);
// Register to receive close notification on this session so we can
// remove its route from the sessions set
conn.registerCloseListener(incomingServerListener, session);
return session;
}
/**
* Notification message that a new OutgoingServerSession has been created. Register a listener
* that will react when the connection gets closed.
*
* @param session the newly created OutgoingServerSession.
*/
public void outgoingServerSessionCreated(OutgoingServerSession session) {
// Register to receive close notification on this session so we can
// remove its route from the sessions set
session.getConnection().registerCloseListener(outgoingServerListener, session);
}
/**
* Registers that a server session originated by a remote server is hosting a given hostname.
* Notice that the remote server may be hosting several subdomains as well as virtual hosts so
* the same IncomingServerSession may be associated with many keys.
*
* @param hostname the hostname that is being served by the remote server.
* @param session the incoming server session to the remote server.
*/
public void registerIncomingServerSession(String hostname, IncomingServerSession session) {
incomingServerSessions.put(hostname, session);
}
/**
* Unregisters that a server session originated by a remote server is hosting a given hostname.
* Notice that the remote server may be hosting several subdomains as well as virtual hosts so
* the same IncomingServerSession may be associated with many keys.
*
* @param hostname the hostname that is being served by the remote server.
*/
public void unregisterIncomingServerSession(String hostname) {
incomingServerSessions.remove(hostname);
}
/**
* Registers that a server session originated by this server has been established to
* a remote server named hostname. This session will only be used for sending packets
* to the remote server and cannot receive packets. The {@link OutgoingServerSession}
* may have one or more domains, subdomains or virtual hosts authenticated with the
* remote server.
*
* @param hostname the hostname that is being served by the remote server.
* @param session the outgoing server session to the remote server.
*/
public void registerOutgoingServerSession(String hostname, OutgoingServerSession session) {
outgoingServerSessions.put(hostname, session);
}
/**
* Unregisters the server session that was originated by this server to a remote server
* named hostname. This session was only being used for sending packets
* to the remote server and not for receiving packets. The {@link OutgoingServerSession}
* may have one or more domains, subdomains or virtual hosts authenticated with the
* remote server.
*
* @param hostname the hostname that the session was connected with.
*/
public void unregisterOutgoingServerSession(String hostname) {
outgoingServerSessions.remove(hostname);
}
/**
* Add a new session to be managed.
*/
......@@ -781,6 +895,30 @@ public class SessionManager extends BasicModule {
return results;
}
/**
* Returns a session that was originated by a remote server. IncomingServerSession can only
* receive packets from the remote server but are not capable of sending packets to the remote
* server.
*
* @param hostname the name of the remote server.
* @return a session that was originated by a remote server.
*/
public IncomingServerSession getIncomingServerSession(String hostname) {
return incomingServerSessions.get(hostname);
}
/**
* Returns a session that was originated from this server to a remote server.
* OutgoingServerSession an only send packets to the remote server but are not capable of
* receiving packets from the remote server.
*
* @param hostname the name of the remote server.
* @return a session that was originated from this server to a remote server.
*/
public OutgoingServerSession getOutgoingServerSession(String hostname) {
return outgoingServerSessions.get(hostname);
}
/**
* <p>Determines if the given date is before the min date, or after the max date.</p>
* <p>The check is complicated somewhat by the fact that min can be null indicating
......@@ -1048,19 +1186,53 @@ public class SessionManager extends BasicModule {
* @param handback The session that just closed
*/
public void onConnectionClose(Object handback) {
try {
ComponentSession session = (ComponentSession)handback;
try {
// Unbind the domain for this external component
String domain = session.getAddress().getDomain();
String subdomain = domain.substring(0, domain.indexOf(serverName) - 1);
InternalComponentManager.getInstance().removeComponent(subdomain);
// Remove the session
componentsSessions.remove(session);
}
catch (Exception e) {
// Can't do anything about this problem...
Log.error(LocaleUtils.getLocalizedString("admin.error.close"), e);
}
finally {
// Remove the session
componentsSessions.remove(session);
}
}
}
private class IncomingServerSessionListener implements ConnectionCloseListener {
/**
* Handle a session that just closed.
*
* @param handback The session that just closed
*/
public void onConnectionClose(Object handback) {
IncomingServerSession session = (IncomingServerSession)handback;
// Remove all the hostnames that were registered for this server session
for (String hostname : session.getValidatedDomains()) {
unregisterIncomingServerSession(hostname);
}
}
}
private class OutgoingServerSessionListener implements ConnectionCloseListener {
/**
* Handle a session that just closed.
*
* @param handback The session that just closed
*/
public void onConnectionClose(Object handback) {
OutgoingServerSession session = (OutgoingServerSession)handback;
// Remove all the hostnames that were registered for this server session
for (String hostname : session.getHostnames()) {
unregisterOutgoingServerSession(hostname);
// Remove the route to the session using the hostname
XMPPServer.getInstance().getRoutingTable().removeRoute(new JID(hostname));
}
}
}
......
......@@ -11,16 +11,17 @@
package org.jivesoftware.messenger.net;
import org.jivesoftware.messenger.ConnectionManager;
import org.jivesoftware.messenger.ServerPort;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.messenger.ConnectionManager;
import org.jivesoftware.util.JiveGlobals;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import javax.net.ssl.SSLException;
/**
* Implements a network front end with a dedicated thread reading
......@@ -38,6 +39,11 @@ public class SSLSocketAcceptThread extends Thread {
*/
private InetAddress bindInterface;
/**
* Holds information about the port on which the server will listen for connections.
*/
private ServerPort serverPort;
/**
* True while this thread should continue running.
*/
......@@ -66,12 +72,13 @@ public class SSLSocketAcceptThread extends Thread {
* generated by this thread
* @throws IOException if there was trouble initializing the SSL configuration.
*/
public SSLSocketAcceptThread(ConnectionManager connManager) throws IOException {
public SSLSocketAcceptThread(ConnectionManager connManager, ServerPort serverPort)
throws IOException {
super("Secure Socket Listener");
this.connManager = connManager;
int port = JiveGlobals.getIntProperty("xmpp.socket.ssl.port", DEFAULT_PORT);
String interfaceName = JiveGlobals.getProperty("xmpp.socket.ssl.interface");
this.serverPort = serverPort;
int port = serverPort.getPort();
String interfaceName = serverPort.getInterfaceName();
bindInterface = null;
if (interfaceName != null) {
try {
......@@ -126,7 +133,7 @@ public class SSLSocketAcceptThread extends Thread {
try {
Socket sock = serverSocket.accept();
Log.debug("SSL Connect " + sock.toString());
connManager.addSocket(sock, true);
connManager.addSocket(sock, true, serverPort);
}
catch (SSLException se) {
long exceptionTime = System.currentTimeMillis();
......
......@@ -13,10 +13,7 @@ package org.jivesoftware.messenger.spi;
import org.jivesoftware.messenger.*;
import org.jivesoftware.messenger.container.BasicModule;
import org.jivesoftware.messenger.net.SSLSocketAcceptThread;
import org.jivesoftware.messenger.net.SocketAcceptThread;
import org.jivesoftware.messenger.net.SocketConnection;
import org.jivesoftware.messenger.net.SocketReadThread;
import org.jivesoftware.messenger.net.*;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.JiveGlobals;
......@@ -33,6 +30,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
private SocketAcceptThread socketThread;
private SSLSocketAcceptThread sslSocketThread;
private SocketAcceptThread componentSocketThread;
private SocketAcceptThread serverSocketThread;
private ArrayList<ServerPort> ports;
private SessionManager sessionManager;
......@@ -43,7 +42,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
public ConnectionManagerImpl() {
super("Connection Manager");
ports = new ArrayList<ServerPort>(2);
ports = new ArrayList<ServerPort>(4);
}
private void createSocket() {
......@@ -65,12 +64,77 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
localIPAddress = "Unknown";
}
}
// Start plain socket unless it's been disabled.
// Start the port listener for s2s communication
startServerListener(localIPAddress);
// Start the port listener for external components
startComponentListener(localIPAddress);
// Start the port listener for clients
startClientListeners(localIPAddress);
}
private void startServerListener(String localIPAddress) {
// Start servers socket unless it's been disabled.
if (JiveGlobals.getBooleanProperty("xmpp.server.socket.active", true)) {
int port = JiveGlobals.getIntProperty("xmpp.server.socket.port",
SocketAcceptThread.DEFAULT_SERVER_PORT);
String interfaceName = JiveGlobals.getProperty("xmpp.server.socket.interface");
ServerPort serverPort = new ServerPort(port, interfaceName, serverName, localIPAddress,
false, null, ServerPort.Type.server);
try {
serverSocketThread = new SocketAcceptThread(this, serverPort);
ports.add(serverPort);
serverSocketThread.setDaemon(true);
serverSocketThread.start();
List params = new ArrayList();
params.add(Integer.toString(serverSocketThread.getPort()));
Log.info(LocaleUtils.getLocalizedString("startup.server", params));
}
catch (Exception e) {
System.err.println("Error starting server listener on port " + port + ": " +
e.getMessage());
Log.error(LocaleUtils.getLocalizedString("admin.error.socket-setup"), e);
}
}
}
private void startComponentListener(String localIPAddress) {
// Start components socket unless it's been disabled.
if (JiveGlobals.getBooleanProperty("xmpp.component.socket.active", true)) {
int port = JiveGlobals.getIntProperty("xmpp.component.socket.port",
SocketAcceptThread.DEFAULT_COMPONENT_PORT);
String interfaceName = JiveGlobals.getProperty("xmpp.component.socket.interface");
ServerPort serverPort = new ServerPort(port, interfaceName, serverName, localIPAddress,
false, null, ServerPort.Type.component);
try {
componentSocketThread = new SocketAcceptThread(this, serverPort);
ports.add(serverPort);
componentSocketThread.setDaemon(true);
componentSocketThread.start();
List params = new ArrayList();
params.add(Integer.toString(componentSocketThread.getPort()));
Log.info(LocaleUtils.getLocalizedString("startup.component", params));
}
catch (Exception e) {
System.err.println("Error starting component listener on port " + port + ": " +
e.getMessage());
Log.error(LocaleUtils.getLocalizedString("admin.error.socket-setup"), e);
}
}
}
private void startClientListeners(String localIPAddress) {
// Start clients plain socket unless it's been disabled.
if (JiveGlobals.getBooleanProperty("xmpp.socket.plain.active", true)) {
int port = JiveGlobals.getIntProperty("xmpp.socket.plain.port",
SocketAcceptThread.DEFAULT_PORT);
String interfaceName = JiveGlobals.getProperty("xmpp.socket.plain.interface");
ServerPort serverPort = new ServerPort(port, interfaceName, serverName, localIPAddress,
false, null, ServerPort.Type.client);
try {
socketThread = new SocketAcceptThread(this);
ports.add(new ServerPort(socketThread.getPort(),
serverName, localIPAddress, false, null));
socketThread = new SocketAcceptThread(this, serverPort);
ports.add(serverPort);
socketThread.setDaemon(true);
socketThread.start();
......@@ -79,22 +143,25 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
Log.info(LocaleUtils.getLocalizedString("startup.plain", params));
}
catch (Exception e) {
System.err.println("Error starting XMPP listener on port " +
JiveGlobals.getIntProperty("xmpp.socket.plain.port", SocketAcceptThread.DEFAULT_PORT) +
": " + e.getMessage());
System.err.println("Error starting XMPP listener on port " + port + ": " +
e.getMessage());
Log.error(LocaleUtils.getLocalizedString("admin.error.socket-setup"), e);
}
}
// Start SSL unless it's been disabled.
// Start clients SSL unless it's been disabled.
if (JiveGlobals.getBooleanProperty("xmpp.socket.ssl.active", true)) {
try {
sslSocketThread = new SSLSocketAcceptThread(this);
int port = JiveGlobals.getIntProperty("xmpp.socket.ssl.port",
SSLSocketAcceptThread.DEFAULT_PORT);
String interfaceName = JiveGlobals.getProperty("xmpp.socket.ssl.interface");
String algorithm = JiveGlobals.getProperty("xmpp.socket.ssl.algorithm");
if ("".equals(algorithm) || algorithm == null) {
algorithm = "TLS";
}
ports.add(new ServerPort(sslSocketThread.getPort(), serverName,
localIPAddress, true, algorithm));
ServerPort serverPort = new ServerPort(port, interfaceName, serverName, localIPAddress,
true, algorithm, ServerPort.Type.client);
try {
sslSocketThread = new SSLSocketAcceptThread(this, serverPort);
ports.add(serverPort);
sslSocketThread.setDaemon(true);
sslSocketThread.start();
......@@ -103,9 +170,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
Log.info(LocaleUtils.getLocalizedString("startup.ssl", params));
}
catch (Exception e) {
System.err.println("Error starting SSL XMPP listener on port " +
JiveGlobals.getIntProperty("xmpp.socket.ssl.port", SSLSocketAcceptThread.DEFAULT_PORT) +
": " + e.getMessage());
System.err.println("Error starting SSL XMPP listener on port " + port + ": " +
e.getMessage());
Log.error(LocaleUtils.getLocalizedString("admin.error.ssl"), e);
}
}
......@@ -115,13 +181,27 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
return ports.iterator();
}
public void addSocket(Socket sock, boolean isSecure) {
public void addSocket(Socket sock, boolean isSecure, ServerPort serverPort) {
try {
// the order of these calls is critical (stupid huh?)
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure);
SocketReadThread reader = new SocketReadThread(router, serverName, sock, conn);
reader.setDaemon(true);
reader.start();
SocketReader reader = null;
String threadName = null;
if (serverPort.isClientPort()) {
reader = new ClientSocketReader(router, serverName, sock, conn);
threadName = "Client SR";
}
else if (serverPort.isComponentPort()) {
reader = new ComponentSocketReader(router, serverName, sock, conn);
threadName = "Component SR";
}
else {
reader = new ServerSocketReader(router, serverName, sock, conn);
threadName = "Server SR";
}
Thread thread = new Thread(reader, threadName);
thread.setDaemon(true);
thread.start();
}
catch (IOException e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
......@@ -162,6 +242,14 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
sslSocketThread.shutdown();
sslSocketThread = null;
}
if (componentSocketThread != null) {
componentSocketThread.shutdown();
componentSocketThread = null;
}
if (serverSocketThread != null) {
serverSocketThread.shutdown();
serverSocketThread = null;
}
serverName = null;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment