Commit 45b3ddd4 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Refactoring to use NIO for c2s.

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@6577 b35dd754-fafc-0310-a699-88a17e54d16e
parent b369394d
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
package org.jivesoftware.wildfire.spi; package org.jivesoftware.wildfire.spi;
import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.common.ExecutorThreadModel;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.SocketAcceptor; import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
...@@ -21,9 +22,13 @@ import org.jivesoftware.wildfire.*; ...@@ -21,9 +22,13 @@ import org.jivesoftware.wildfire.*;
import org.jivesoftware.wildfire.container.BasicModule; import org.jivesoftware.wildfire.container.BasicModule;
import org.jivesoftware.wildfire.http.HttpBindManager; import org.jivesoftware.wildfire.http.HttpBindManager;
import org.jivesoftware.wildfire.net.*; import org.jivesoftware.wildfire.net.*;
import org.jivesoftware.wildfire.nio.ClientConnectionHandler;
import org.jivesoftware.wildfire.nio.MultiplexerConnectionHandler; import org.jivesoftware.wildfire.nio.MultiplexerConnectionHandler;
import org.jivesoftware.wildfire.nio.XMPPCodecFactory; import org.jivesoftware.wildfire.nio.XMPPCodecFactory;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
...@@ -42,8 +47,8 @@ import java.util.concurrent.TimeUnit; ...@@ -42,8 +47,8 @@ import java.util.concurrent.TimeUnit;
public class ConnectionManagerImpl extends BasicModule implements ConnectionManager, CertificateEventListener { public class ConnectionManagerImpl extends BasicModule implements ConnectionManager, CertificateEventListener {
private SocketAcceptThread socketThread; private SocketAcceptor socketAcceptor;
private SSLSocketAcceptThread sslSocketThread; private SocketAcceptor sslSocketAcceptor;
private SocketAcceptThread componentSocketThread; private SocketAcceptThread componentSocketThread;
private SocketAcceptThread serverSocketThread; private SocketAcceptThread serverSocketThread;
private SocketAcceptor multiplexerSocketAcceptor; private SocketAcceptor multiplexerSocketAcceptor;
...@@ -140,7 +145,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -140,7 +145,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
multiplexerSocketAcceptor = buildSocketAcceptor(); multiplexerSocketAcceptor = buildSocketAcceptor();
// Customize Executor that will be used by processors to process incoming stanzas // Customize Executor that will be used by processors to process incoming stanzas
ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("connectionManager"); ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("connectionManager");
int eventThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.max.threads", 16); int eventThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.threads", 16);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) threadModel.getExecutor(); ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) threadModel.getExecutor();
eventExecutor.setCorePoolSize(eventThreads + 1); eventExecutor.setCorePoolSize(eventThreads + 1);
eventExecutor.setMaximumPoolSize(eventThreads + 1); eventExecutor.setMaximumPoolSize(eventThreads + 1);
...@@ -224,17 +229,37 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -224,17 +229,37 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// Start clients plain socket unless it's been disabled. // Start clients plain socket unless it's been disabled.
if (isClientListenerEnabled()) { if (isClientListenerEnabled()) {
int port = getClientListenerPort(); int port = getClientListenerPort();
// Create SocketAcceptor with correct number of processors
socketAcceptor = buildSocketAcceptor();
// Customize Executor that will be used by processors to process incoming stanzas
ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("client");
int eventThreads = JiveGlobals.getIntProperty("xmpp.client.processing.threads", 16);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)threadModel.getExecutor();
eventExecutor.setCorePoolSize(eventThreads + 1);
eventExecutor.setMaximumPoolSize(eventThreads + 1);
eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
socketAcceptor.getDefaultConfig().setThreadModel(threadModel);
// Add the XMPP codec filter
socketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
try { try {
socketThread = new SocketAcceptThread(this, new ServerPort(port, serverName, // Listen on a specific network interface if it has been set.
localIPAddress, false, null, ServerPort.Type.client)); String interfaceName = JiveGlobals.getXMLProperty("xmpp.socket.network.interface");
ports.add(socketThread.getServerPort()); InetAddress bindInterface = null;
socketThread.setDaemon(true); if (interfaceName != null) {
socketThread.setPriority(Thread.MAX_PRIORITY); if (interfaceName.trim().length() > 0) {
socketThread.start(); bindInterface = InetAddress.getByName(interfaceName);
}
}
// Start accepting connections
socketAcceptor
.bind(new InetSocketAddress(bindInterface, port), new ClientConnectionHandler(serverName));
ports.add(new ServerPort(port, serverName, localIPAddress, false, null, ServerPort.Type.client));
List<String> params = new ArrayList<String>(); List<String> params = new ArrayList<String>();
params.add(Integer.toString(socketThread.getPort())); params.add(Integer.toString(port));
Log.info(LocaleUtils.getLocalizedString("startup.plain", params)); Log.info(LocaleUtils.getLocalizedString("startup.plain", params));
} }
catch (Exception e) { catch (Exception e) {
...@@ -246,10 +271,15 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -246,10 +271,15 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
} }
private void stopClientListeners() { private void stopClientListeners() {
if (socketThread != null) { if (socketAcceptor != null) {
socketThread.shutdown(); socketAcceptor.unbindAll();
ports.remove(socketThread.getServerPort()); for (ServerPort port : ports) {
socketThread = null; if (port.isClientPort() && !port.isSecure()) {
ports.remove(port);
break;
}
}
socketAcceptor = null;
} }
} }
...@@ -262,15 +292,49 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -262,15 +292,49 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
algorithm = "TLS"; algorithm = "TLS";
} }
try { try {
sslSocketThread = new SSLSocketAcceptThread(this, new ServerPort(port, serverName, // Create SocketAcceptor with correct number of processors
localIPAddress, true, algorithm, ServerPort.Type.client)); sslSocketAcceptor = buildSocketAcceptor();
ports.add(sslSocketThread.getServerPort()); // Customize Executor that will be used by processors to process incoming stanzas
sslSocketThread.setDaemon(true); ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("client_ssl");
sslSocketThread.setPriority(Thread.MAX_PRIORITY); int eventThreads = JiveGlobals.getIntProperty("xmpp.client_ssl.processing.threads", 16);
sslSocketThread.start(); ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)threadModel.getExecutor();
eventExecutor.setCorePoolSize(eventThreads + 1);
eventExecutor.setMaximumPoolSize(eventThreads + 1);
eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
sslSocketAcceptor.getDefaultConfig().setThreadModel(threadModel);
// Add the XMPP codec filter
sslSocketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
// Add the SSL filter now since sockets are "borned" encrypted in the old ssl method
SSLContext sslContext = SSLContext.getInstance(algorithm);
KeyManagerFactory keyFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyFactory.init(SSLConfig.getKeyStore(), SSLConfig.getKeyPassword().toCharArray());
TrustManagerFactory trustFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustFactory.init(SSLConfig.getTrustStore());
sslContext.init(keyFactory.getKeyManagers(),
trustFactory.getTrustManagers(),
new java.security.SecureRandom());
sslSocketAcceptor.getFilterChain().addFirst("tls", new SSLFilter(sslContext));
// Listen on a specific network interface if it has been set.
String interfaceName = JiveGlobals.getXMLProperty("xmpp.socket.network.interface");
InetAddress bindInterface = null;
if (interfaceName != null) {
if (interfaceName.trim().length() > 0) {
bindInterface = InetAddress.getByName(interfaceName);
}
}
// Start accepting connections
sslSocketAcceptor
.bind(new InetSocketAddress(bindInterface, port), new ClientConnectionHandler(serverName));
ports.add(new ServerPort(port, serverName, localIPAddress, true, null, ServerPort.Type.client));
List<String> params = new ArrayList<String>(); List<String> params = new ArrayList<String>();
params.add(Integer.toString(sslSocketThread.getPort())); params.add(Integer.toString(port));
Log.info(LocaleUtils.getLocalizedString("startup.ssl", params)); Log.info(LocaleUtils.getLocalizedString("startup.ssl", params));
} }
catch (Exception e) { catch (Exception e) {
...@@ -282,10 +346,15 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -282,10 +346,15 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
} }
private void stopClientSSLListeners() { private void stopClientSSLListeners() {
if (sslSocketThread != null) { if (sslSocketAcceptor != null) {
sslSocketThread.shutdown(); sslSocketAcceptor.unbindAll();
ports.remove(sslSocketThread.getServerPort()); for (ServerPort port : ports) {
sslSocketThread = null; if (port.isClientPort() && port.isSecure()) {
ports.remove(port);
break;
}
}
sslSocketAcceptor = null;
} }
} }
...@@ -312,12 +381,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -312,12 +381,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
public SocketReader createSocketReader(Socket sock, boolean isSecure, ServerPort serverPort, public SocketReader createSocketReader(Socket sock, boolean isSecure, ServerPort serverPort,
boolean useBlockingMode) throws IOException { boolean useBlockingMode) throws IOException {
if (serverPort.isClientPort()) { if (serverPort.isComponentPort()) {
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure);
return new ClientSocketReader(router, routingTable, serverName, sock, conn,
useBlockingMode);
}
else if (serverPort.isComponentPort()) {
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure); SocketConnection conn = new SocketConnection(deliverer, sock, isSecure);
return new ComponentSocketReader(router, routingTable, serverName, sock, conn, return new ComponentSocketReader(router, routingTable, serverName, sock, conn,
useBlockingMode); useBlockingMode);
...@@ -469,8 +533,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -469,8 +533,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
} }
public int getClientListenerPort() { public int getClientListenerPort() {
return JiveGlobals.getIntProperty("xmpp.socket.plain.port", return JiveGlobals.getIntProperty("xmpp.socket.plain.port", DEFAULT_PORT);
SocketAcceptThread.DEFAULT_PORT);
} }
public void setClientSSLListenerPort(int port) { public void setClientSSLListenerPort(int port) {
...@@ -488,8 +551,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -488,8 +551,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
} }
public int getClientSSLListenerPort() { public int getClientSSLListenerPort() {
return JiveGlobals.getIntProperty("xmpp.socket.ssl.port", return JiveGlobals.getIntProperty("xmpp.socket.ssl.port", DEFAULT_SSL_PORT);
SSLSocketAcceptThread.DEFAULT_PORT);
} }
public void setComponentListenerPort(int port) { public void setComponentListenerPort(int port) {
...@@ -507,8 +569,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -507,8 +569,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
} }
public int getComponentListenerPort() { public int getComponentListenerPort() {
return JiveGlobals.getIntProperty("xmpp.component.socket.port", return JiveGlobals.getIntProperty("xmpp.component.socket.port", DEFAULT_COMPONENT_PORT);
SocketAcceptThread.DEFAULT_COMPONENT_PORT);
} }
public void setServerListenerPort(int port) { public void setServerListenerPort(int port) {
...@@ -526,8 +587,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -526,8 +587,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
} }
public int getServerListenerPort() { public int getServerListenerPort() {
return JiveGlobals.getIntProperty("xmpp.server.socket.port", return JiveGlobals.getIntProperty("xmpp.server.socket.port", DEFAULT_SERVER_PORT);
SocketAcceptThread.DEFAULT_SERVER_PORT);
} }
public void setConnectionManagerListenerPort(int port) { public void setConnectionManagerListenerPort(int port) {
...@@ -545,8 +605,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -545,8 +605,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
} }
public int getConnectionManagerListenerPort() { public int getConnectionManagerListenerPort() {
return JiveGlobals.getIntProperty("xmpp.multiplex.socket.port", return JiveGlobals.getIntProperty("xmpp.multiplex.socket.port", DEFAULT_MULTIPLEX_PORT);
SocketAcceptThread.DEFAULT_MULTIPLEX_PORT);
} }
// ##################################################################### // #####################################################################
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment