Commit aeaab52a authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Connection manager work. JM-666

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@3875 b35dd754-fafc-0310-a699-88a17e54d16e
parent 63c7d541
...@@ -11,8 +11,8 @@ ...@@ -11,8 +11,8 @@
package org.jivesoftware.wildfire; package org.jivesoftware.wildfire;
import java.util.Iterator;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
/** /**
* Represents a port on which the server will listen for connections. * Represents a port on which the server will listen for connections.
...@@ -117,11 +117,22 @@ public class ServerPort { ...@@ -117,11 +117,22 @@ public class ServerPort {
return type == Type.component; return type == Type.component;
} }
/**
* Returns true if connection managers can connect to this port.
*
* @return true if connection managers can connect to this port.
*/
public boolean isConnectionManagerPort() {
return type == Type.connectionManager;
}
public static enum Type { public static enum Type {
client, client,
server, server,
component; component,
connectionManager
} }
} }
...@@ -74,7 +74,11 @@ public class SocketConnection implements Connection { ...@@ -74,7 +74,11 @@ public class SocketConnection implements Connection {
private Writer writer; private Writer writer;
private AtomicBoolean writing = new AtomicBoolean(false); private AtomicBoolean writing = new AtomicBoolean(false);
private PacketDeliverer deliverer; /**
* Deliverer to use when the connection is closed or was closed when delivering
* a packet.
*/
private PacketDeliverer backupDeliverer;
private Session session; private Session session;
private boolean secure; private boolean secure;
...@@ -105,12 +109,12 @@ public class SocketConnection implements Connection { ...@@ -105,12 +109,12 @@ public class SocketConnection implements Connection {
/** /**
* Create a new session using the supplied socket. * Create a new session using the supplied socket.
* *
* @param deliverer the packet deliverer this connection will use. * @param backupDeliverer the packet deliverer this connection will use when socket is closed.
* @param socket the socket to represent. * @param socket the socket to represent.
* @param isSecure true if this is a secure connection. * @param isSecure true if this is a secure connection.
* @throws NullPointerException if the socket is null. * @throws NullPointerException if the socket is null.
*/ */
public SocketConnection(PacketDeliverer deliverer, Socket socket, boolean isSecure) public SocketConnection(PacketDeliverer backupDeliverer, Socket socket, boolean isSecure)
throws IOException { throws IOException {
if (socket == null) { if (socket == null) {
throw new NullPointerException("Socket channel must be non-null"); throw new NullPointerException("Socket channel must be non-null");
...@@ -119,7 +123,7 @@ public class SocketConnection implements Connection { ...@@ -119,7 +123,7 @@ public class SocketConnection implements Connection {
this.secure = isSecure; this.secure = isSecure;
this.socket = socket; this.socket = socket;
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), CHARSET)); writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), CHARSET));
this.deliverer = deliverer; this.backupDeliverer = backupDeliverer;
xmlSerializer = new XMLSocketWriter(writer, this); xmlSerializer = new XMLSocketWriter(writer, this);
instances.put(this, ""); instances.put(this, "");
...@@ -384,6 +388,17 @@ public class SocketConnection implements Connection { ...@@ -384,6 +388,17 @@ public class SocketConnection implements Connection {
return null; return null;
} }
/**
* Returns the packet deliverer to use when delivering a packet over the socket fails. The
* packet deliverer will retry to send the packet using some other connection, will store
* the packet offline for later retrieval or will just drop it.
*
* @return the packet deliverer to use when delivering a packet over the socket fails.
*/
public PacketDeliverer getPacketDeliverer() {
return backupDeliverer;
}
public void close() { public void close() {
boolean wasClosed = false; boolean wasClosed = false;
synchronized (this) { synchronized (this) {
...@@ -518,7 +533,7 @@ public class SocketConnection implements Connection { ...@@ -518,7 +533,7 @@ public class SocketConnection implements Connection {
public void deliver(Packet packet) throws UnauthorizedException, PacketException { public void deliver(Packet packet) throws UnauthorizedException, PacketException {
if (isClosed()) { if (isClosed()) {
deliverer.deliver(packet); backupDeliverer.deliver(packet);
} }
else { else {
try { try {
...@@ -548,7 +563,7 @@ public class SocketConnection implements Connection { ...@@ -548,7 +563,7 @@ public class SocketConnection implements Connection {
close(); close();
// Retry sending the packet again. Most probably if the packet is a // Retry sending the packet again. Most probably if the packet is a
// Message it will be stored offline // Message it will be stored offline
deliverer.deliver(packet); backupDeliverer.deliver(packet);
} }
else { else {
// Invoke the interceptors after we have sent the packet // Invoke the interceptors after we have sent the packet
......
...@@ -313,8 +313,25 @@ public abstract class SocketReader implements Runnable { ...@@ -313,8 +313,25 @@ public abstract class SocketReader implements Runnable {
closeNeverSecuredConnection(); closeNeverSecuredConnection();
return false; return false;
} }
SASLAuthentication saslAuth = new SASLAuthentication(session, reader);
return saslAuth.doHandshake(doc); boolean isComplete = false;
boolean success = false;
while (!isComplete) {
SASLAuthentication.Status status = SASLAuthentication.handle(session, doc);
if (status == SASLAuthentication.Status.needResponse) {
// Get the next answer since we are not done yet
doc = reader.parseDocument().getRootElement();
if (doc == null) {
// Nothing was read because the connection was closed or dropped
isComplete = true;
}
}
else {
isComplete = true;
success = status == SASLAuthentication.Status.authenticated;
}
}
return success;
} }
/** /**
......
...@@ -11,12 +11,13 @@ ...@@ -11,12 +11,13 @@
package org.jivesoftware.wildfire.spi; package org.jivesoftware.wildfire.spi;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.*; import org.jivesoftware.wildfire.*;
import org.jivesoftware.wildfire.container.BasicModule; import org.jivesoftware.wildfire.container.BasicModule;
import org.jivesoftware.wildfire.multiplex.MultiplexerPacketDeliverer;
import org.jivesoftware.wildfire.net.*; import org.jivesoftware.wildfire.net.*;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.JiveGlobals;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
...@@ -32,6 +33,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -32,6 +33,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
private SSLSocketAcceptThread sslSocketThread; private SSLSocketAcceptThread sslSocketThread;
private SocketAcceptThread componentSocketThread; private SocketAcceptThread componentSocketThread;
private SocketAcceptThread serverSocketThread; private SocketAcceptThread serverSocketThread;
private SocketAcceptThread multiplexerSocketThread;
private ArrayList<ServerPort> ports; private ArrayList<ServerPort> ports;
private SessionManager sessionManager; private SessionManager sessionManager;
...@@ -71,6 +73,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -71,6 +73,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
} }
// Start the port listener for s2s communication // Start the port listener for s2s communication
startServerListener(localIPAddress); startServerListener(localIPAddress);
// Start the port listener for Connections Multiplexers
startMultiplexerListener(localIPAddress);
// Start the port listener for external components // Start the port listener for external components
startComponentListener(localIPAddress); startComponentListener(localIPAddress);
// Start the port listener for clients // Start the port listener for clients
...@@ -112,6 +116,39 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -112,6 +116,39 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
} }
} }
private void startMultiplexerListener(String localIPAddress) {
// Start multiplexers socket unless it's been disabled.
if (isMultiplexerListenerEnabled()) {
int port = getMultiplexerListenerPort();
ServerPort serverPort = new ServerPort(port, serverName, localIPAddress,
false, null, ServerPort.Type.connectionManager);
try {
multiplexerSocketThread = new SocketAcceptThread(this, serverPort);
ports.add(serverPort);
multiplexerSocketThread.setDaemon(true);
multiplexerSocketThread.setPriority(Thread.MAX_PRIORITY);
multiplexerSocketThread.start();
List<String> params = new ArrayList<String>();
params.add(Integer.toString(multiplexerSocketThread.getPort()));
Log.info(LocaleUtils.getLocalizedString("startup.multiplexer", params));
}
catch (Exception e) {
System.err.println("Error starting multiplexer listener on port " + port + ": " +
e.getMessage());
Log.error(LocaleUtils.getLocalizedString("admin.error.socket-setup"), e);
}
}
}
private void stopMultiplexerListener() {
if (multiplexerSocketThread != null) {
multiplexerSocketThread.shutdown();
ports.remove(multiplexerSocketThread.getServerPort());
multiplexerSocketThread = null;
}
}
private void startComponentListener(String localIPAddress) { private void startComponentListener(String localIPAddress) {
// Start components socket unless it's been disabled. // Start components socket unless it's been disabled.
if (isComponentListenerEnabled()) { if (isComponentListenerEnabled()) {
...@@ -222,21 +259,32 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -222,21 +259,32 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
public void addSocket(Socket sock, boolean isSecure, ServerPort serverPort) { public void addSocket(Socket sock, boolean isSecure, ServerPort serverPort) {
try { try {
// the order of these calls is critical (stupid huh?) // the order of these calls is critical (stupid huh?)
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure); SocketReader reader;
SocketReader reader = null; String threadName;
String threadName = null;
if (serverPort.isClientPort()) { if (serverPort.isClientPort()) {
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure);
reader = new ClientSocketReader(router, serverName, sock, conn); reader = new ClientSocketReader(router, serverName, sock, conn);
threadName = "Client SR - " + reader.hashCode(); threadName = "Client SR - " + reader.hashCode();
} }
else if (serverPort.isComponentPort()) { else if (serverPort.isComponentPort()) {
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure);
reader = new ComponentSocketReader(router, serverName, sock, conn); reader = new ComponentSocketReader(router, serverName, sock, conn);
threadName = "Component SR - " + reader.hashCode(); threadName = "Component SR - " + reader.hashCode();
} }
else { else if (serverPort.isServerPort()) {
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure);
reader = new ServerSocketReader(router, serverName, sock, conn); reader = new ServerSocketReader(router, serverName, sock, conn);
threadName = "Server SR - " + reader.hashCode(); threadName = "Server SR - " + reader.hashCode();
} }
else {
// Use the appropriate packeet deliverer for connection managers. The packet
// deliverer will be configured with the domain of the connection manager once
// the connection manager has finished the handshake.
SocketConnection conn =
new SocketConnection(new MultiplexerPacketDeliverer(), sock, isSecure);
reader = new ConnectionMultiplexerSocketReader(router, serverName, sock, conn);
threadName = "ConnectionMultiplexer SR - " + reader.hashCode();
}
Thread thread = new Thread(reader, threadName); Thread thread = new Thread(reader, threadName);
thread.setDaemon(true); thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY); thread.setPriority(Thread.NORM_PRIORITY);
...@@ -339,6 +387,27 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -339,6 +387,27 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
return JiveGlobals.getBooleanProperty("xmpp.server.socket.active", true); return JiveGlobals.getBooleanProperty("xmpp.server.socket.active", true);
} }
public void enableMultiplexerListener(boolean enabled) {
if (enabled == isMultiplexerListenerEnabled()) {
// Ignore new setting
return;
}
if (enabled) {
JiveGlobals.setProperty("xmpp.multiplex.socket.active", "true");
// Start the port listener for s2s communication
startMultiplexerListener(localIPAddress);
}
else {
JiveGlobals.setProperty("xmpp.multiplex.socket.active", "false");
// Stop the port listener for s2s communication
stopMultiplexerListener();
}
}
public boolean isMultiplexerListenerEnabled() {
return JiveGlobals.getBooleanProperty("xmpp.multiplex.socket.active", true);
}
public void setClientListenerPort(int port) { public void setClientListenerPort(int port) {
if (port == getClientListenerPort()) { if (port == getClientListenerPort()) {
// Ignore new setting // Ignore new setting
...@@ -415,6 +484,11 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -415,6 +484,11 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
SocketAcceptThread.DEFAULT_SERVER_PORT); SocketAcceptThread.DEFAULT_SERVER_PORT);
} }
public int getMultiplexerListenerPort() {
return JiveGlobals.getIntProperty("xmpp.multiplex.socket.port",
SocketAcceptThread.DEFAULT_MULTIPLEX_PORT);
}
// ##################################################################### // #####################################################################
// Module management // Module management
// ##################################################################### // #####################################################################
...@@ -432,6 +506,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -432,6 +506,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
stopClientListeners(); stopClientListeners();
stopClientSSLListeners(); stopClientSSLListeners();
stopComponentListener(); stopComponentListener();
stopMultiplexerListener();
stopServerListener(); stopServerListener();
SocketSendingTracker.getInstance().shutdown(); SocketSendingTracker.getInstance().shutdown();
serverName = null; serverName = null;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment