Commit 74da47c6 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Optimization - Use NIO for CM. JM-925

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@6557 b35dd754-fafc-0310-a699-88a17e54d16e
parent 2b81fb4f
......@@ -3,7 +3,7 @@
* $Revision: 3159 $
* $Date: 2005-12-04 22:56:40 -0300 (Sun, 04 Dec 2005) $
*
* Copyright (C) 2004 Jive Software. All rights reserved.
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
......@@ -11,15 +11,22 @@
package org.jivesoftware.wildfire.spi;
import org.apache.mina.common.ExecutorThreadModel;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.jivesoftware.util.*;
import org.jivesoftware.wildfire.*;
import org.jivesoftware.wildfire.http.HttpBindManager;
import org.jivesoftware.wildfire.container.BasicModule;
import org.jivesoftware.wildfire.multiplex.MultiplexerPacketDeliverer;
import org.jivesoftware.wildfire.http.HttpBindManager;
import org.jivesoftware.wildfire.net.*;
import org.jivesoftware.wildfire.nio.MultiplexerConnectionHandler;
import org.jivesoftware.wildfire.nio.XMPPCodecFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.security.KeyStore;
......@@ -28,6 +35,10 @@ import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ConnectionManagerImpl extends BasicModule implements ConnectionManager, CertificateEventListener {
......@@ -35,7 +46,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
private SSLSocketAcceptThread sslSocketThread;
private SocketAcceptThread componentSocketThread;
private SocketAcceptThread serverSocketThread;
private SocketAcceptThread multiplexerSocketThread;
private SocketAcceptor multiplexerSocketAcceptor;
private ArrayList<ServerPort> ports;
private SessionManager sessionManager;
......@@ -124,17 +135,36 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// Start multiplexers socket unless it's been disabled.
if (isConnectionManagerListenerEnabled()) {
int port = getConnectionManagerListenerPort();
// Create SocketAcceptor with correct number of processors
multiplexerSocketAcceptor = buildSocketAcceptor();
// Customize Executor that will be used by processors to process incoming stanzas
ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("connectionManager");
int eventThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.max.threads", 16);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) threadModel.getExecutor();
eventExecutor.setCorePoolSize(eventThreads + 1);
eventExecutor.setMaximumPoolSize(eventThreads + 1);
eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
multiplexerSocketAcceptor.getDefaultConfig().setThreadModel(threadModel);
// Add the XMPP codec filter
multiplexerSocketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
try {
multiplexerSocketThread = new SocketAcceptThread(this, new ServerPort(port,
serverName, localIPAddress, false, null,
ServerPort.Type.connectionManager));
ports.add(multiplexerSocketThread.getServerPort());
multiplexerSocketThread.setDaemon(true);
multiplexerSocketThread.setPriority(Thread.MAX_PRIORITY);
multiplexerSocketThread.start();
// Listen on a specific network interface if it has been set.
String interfaceName = JiveGlobals.getXMLProperty("xmpp.socket.network.interface");
InetAddress bindInterface = null;
if (interfaceName != null) {
if (interfaceName.trim().length() > 0) {
bindInterface = InetAddress.getByName(interfaceName);
}
}
// Start accepting connections
multiplexerSocketAcceptor.bind(new InetSocketAddress(bindInterface, port), new MultiplexerConnectionHandler(serverName));
ports.add(new ServerPort(port, serverName, localIPAddress, false, null, ServerPort.Type.connectionManager));
List<String> params = new ArrayList<String>();
params.add(Integer.toString(multiplexerSocketThread.getPort()));
params.add(Integer.toString(port));
Log.info(LocaleUtils.getLocalizedString("startup.multiplexer", params));
}
catch (Exception e) {
......@@ -146,10 +176,15 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
}
private void stopConnectionManagerListener() {
if (multiplexerSocketThread != null) {
multiplexerSocketThread.shutdown();
ports.remove(multiplexerSocketThread.getServerPort());
multiplexerSocketThread = null;
if (multiplexerSocketAcceptor != null) {
multiplexerSocketAcceptor.unbindAll();
for (ServerPort port : ports) {
if (port.isConnectionManagerPort()) {
ports.remove(port);
break;
}
}
multiplexerSocketAcceptor = null;
}
}
......@@ -292,15 +327,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
return new ServerSocketReader(router, routingTable, serverName, sock, conn,
useBlockingMode);
}
else {
// Use the appropriate packeet deliverer for connection managers. The packet
// deliverer will be configured with the domain of the connection manager once
// the connection manager has finished the handshake.
SocketConnection conn =
new SocketConnection(new MultiplexerPacketDeliverer(), sock, isSecure);
return new ConnectionMultiplexerSocketReader(router, routingTable, serverName, sock,
conn, useBlockingMode);
}
return null;
}
private void startHTTPBindListeners() {
......@@ -538,6 +565,41 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
restartClientSSLListeners();
}
private SocketAcceptor buildSocketAcceptor() {
SocketAcceptor socketAcceptor;
// Create SocketAcceptor with correct number of processors
int ioThreads = JiveGlobals.getIntProperty("xmpp.processor.count", Runtime.getRuntime().availableProcessors());
// Set the executor that processors will use. Note that processors will use another executor
// for processing events (i.e. incoming traffic)
Executor ioExecutor = new ThreadPoolExecutor(
ioThreads + 1, ioThreads + 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
socketAcceptor = new SocketAcceptor(ioThreads, ioExecutor);
// Set that it will be possible to bind a socket if there is a connection in the timeout state
SocketAcceptorConfig socketAcceptorConfig = (SocketAcceptorConfig) socketAcceptor.getDefaultConfig();
socketAcceptorConfig.setReuseAddress(true);
// Set the listen backlog (queue) length. Default is 50.
socketAcceptorConfig.setBacklog(JiveGlobals.getIntProperty("xmpp.socket.backlog", 50));
// Set default (low level) settings for new socket connections
SocketSessionConfig socketSessionConfig = socketAcceptorConfig.getSessionConfig();
//socketSessionConfig.setKeepAlive();
int receiveBuffer = JiveGlobals.getIntProperty("xmpp.socket.buffer.receive", -1);
if (receiveBuffer > 0 ) {
socketSessionConfig.setReceiveBufferSize(receiveBuffer);
}
int sendBuffer = JiveGlobals.getIntProperty("xmpp.socket.buffer.send", -1);
if (sendBuffer > 0 ) {
socketSessionConfig.setSendBufferSize(sendBuffer);
}
int linger = JiveGlobals.getIntProperty("xmpp.socket.linger", -1);
if (linger > 0 ) {
socketSessionConfig.setSoLinger(linger);
}
socketSessionConfig.setTcpNoDelay(
JiveGlobals.getBooleanProperty("xmpp.socket.tcp-nodelay", socketSessionConfig.isTcpNoDelay()));
return socketAcceptor;
}
// #####################################################################
// Module management
// #####################################################################
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment