Commit 92d42081 authored by Tom Evans's avatar Tom Evans

OF-857: Fix C2S locking issues

Updates the MINA 2.0.9 core to fix a locking issue documented here:
https://issues.apache.org/jira/browse/DIRMINA-995

Also updates connection manager and thread pool configurations to better
allocate resources within the JVM.
parent 92e35494
......@@ -53,7 +53,7 @@ jzlib.jar | 1.1.3 (Apache Mina 2.0.9)
libidn.jar | 1.15 | GNU Lesser General Public License version 2.1 or later (http://www.gnu.org/licenses/licenses.html)
log4j.jar | 1.2.15 | Apache 2.0 (http://logging.apache.org/log4j/1.2/license.html)
mail.jar | 1.4.1 (JavaMail) |
mina-core.jar | Apache Mina 2.0.9 | Apache 2.0
mina-core.jar | Apache Mina 2.0.10-SNAPSHOT (fixes OF-857) | Apache 2.0
mina-filter-compression.jar | Apache Mina 2.0.9 | Apache 2.0
mina-integration-beans.jar | Apache Mina 2.0.9 | Apache 2.0
mina-integration-jmx.jar | Apache Mina 2.0.9 | Apache 2.0
......
......@@ -20,41 +20,28 @@
package org.jivesoftware.openfire.container;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.security.KeyStore;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.management.remote.JMXAuthenticator;
import javax.management.remote.JMXPrincipal;
import javax.management.remote.JMXServiceURL;
import javax.security.auth.Subject;
import org.eclipse.jetty.jmx.ConnectorServer;
import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.spdy.server.http.HTTPSPDYServerConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.spdy.server.http.HTTPSPDYServerConnector;
import org.jivesoftware.openfire.JMXManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.admin.AdminManager;
import org.jivesoftware.openfire.auth.AuthFactory;
import org.jivesoftware.openfire.net.SSLConfig;
import org.jivesoftware.util.CertificateEventListener;
import org.jivesoftware.util.CertificateManager;
......@@ -113,7 +100,7 @@ public class AdminConsolePlugin implements Plugin {
adminPort = JiveGlobals.getXMLProperty("adminConsole.port", 9090);
adminSecurePort = JiveGlobals.getXMLProperty("adminConsole.securePort", 9091);
final QueuedThreadPool tp = new QueuedThreadPool(254);
final QueuedThreadPool tp = new QueuedThreadPool(16,2);
tp.setName("Jetty-QTP-AdminConsole");
adminServer = new Server(tp);
......
......@@ -133,8 +133,8 @@ public class IQRosterHandler extends IQHandler implements ServerFeaturesProvider
return result;
}
catch (Exception e) {
if (e.getCause() instanceof IDNAException) {
Log.warn(LocaleUtils.getLocalizedString("admin.error"), e);
if (e.getCause() instanceof IDNAException || e.getCause() instanceof IllegalArgumentException) {
Log.warn(LocaleUtils.getLocalizedString("admin.error") + e.getMessage());
IQ result = IQ.createResultIQ(packet);
result.setChildElement(packet.getChildElement().createCopy());
result.setError(PacketError.Condition.jid_malformed);
......
......@@ -27,24 +27,23 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.spdy.server.http.HTTPSPDYServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.spdy.server.http.HTTPSPDYServerConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.webapp.WebAppContext;
import org.jivesoftware.openfire.JMXManager;
import org.jivesoftware.openfire.XMPPServer;
......@@ -78,7 +77,7 @@ public final class HttpBindManager {
public static final String HTTP_BIND_THREADS = "httpbind.client.processing.threads";
public static final int HTTP_BIND_THREADS_DEFAULT = 254;
public static final int HTTP_BIND_THREADS_DEFAULT = 16;
private static final String HTTP_BIND_FORWARDED = "httpbind.forwarded.enabled";
......@@ -513,7 +512,8 @@ public final class HttpBindManager {
* @param securePort the port to start the TLS (secure) HTTP Bind service on.
*/
private synchronized void configureHttpBindServer(int port, int securePort) {
final QueuedThreadPool tp = new QueuedThreadPool(JiveGlobals.getIntProperty(HTTP_BIND_THREADS, HTTP_BIND_THREADS_DEFAULT));
int maxThreads = JiveGlobals.getIntProperty(HTTP_BIND_THREADS, HTTP_BIND_THREADS_DEFAULT);
final QueuedThreadPool tp = new QueuedThreadPool(maxThreads, getMinThreads(maxThreads));
tp.setName("Jetty-QTP-BOSH");
httpBindServer = new Server(tp);
......@@ -544,6 +544,10 @@ public final class HttpBindManager {
collection.setHandlers(new Handler[] { contexts, new DefaultHandler() });
}
private int getMinThreads(int maxThreads) {
return (maxThreads/4)+1;
}
private void createBoshHandler(ContextHandlerCollection contexts, String boshPath)
{
ServletContextHandler context = new ServletContextHandler(contexts, boshPath, ServletContextHandler.SESSIONS);
......
......@@ -34,10 +34,8 @@ import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.eclipse.jetty.util.log.Log;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals;
......@@ -88,12 +86,12 @@ public class HttpSessionManager {
// BOSH installations expecting heavy loads may want to allocate additional threads
// to this worker pool to ensure timely delivery of inbound packets
int poolSize = JiveGlobals.getIntProperty("xmpp.httpbind.worker.threads",
int maxPoolSize = JiveGlobals.getIntProperty("xmpp.httpbind.worker.threads",
// use deprecated property as default (shared with ConnectionManagerImpl)
JiveGlobals.getIntProperty("xmpp.client.processing.threads", 16));
int keepAlive = JiveGlobals.getIntProperty("xmpp.httpbind.worker.timeout", 60);
sendPacketPool = new ThreadPoolExecutor(poolSize, poolSize, keepAlive, TimeUnit.SECONDS,
sendPacketPool = new ThreadPoolExecutor(getCorePoolSize(maxPoolSize), maxPoolSize, keepAlive, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), // unbounded task queue
new ThreadFactory() { // custom thread factory for BOSH workers
final AtomicInteger counter = new AtomicInteger(1);
......@@ -106,6 +104,10 @@ public class HttpSessionManager {
});
}
private int getCorePoolSize(int maxPoolSize) {
return (maxPoolSize/4)+1;
}
/**
* Starts the services used by the HttpSessionManager.
*/
......
......@@ -395,7 +395,7 @@ public class NIOConnection implements Connection {
}
public void startCompression() {
CompressionFilter ioFilter = (CompressionFilter) ioSession.getFilterChain().get("compression");
CompressionFilter ioFilter = (CompressionFilter) ioSession.getFilterChain().get(COMPRESSION_FILTER_NAME);
ioFilter.setCompressOutbound(true);
}
......@@ -429,7 +429,7 @@ public class NIOConnection implements Connection {
}
public boolean isCompressed() {
return ioSession.getFilterChain().contains("compression");
return ioSession.getFilterChain().contains(COMPRESSION_FILTER_NAME);
}
public CompressionPolicy getCompressionPolicy() {
......
......@@ -239,8 +239,13 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// Create SocketAcceptor with correct number of processors
multiplexerSocketAcceptor = buildSocketAcceptor(MULTIPLEXER_SOCKET_ACCEPTOR_NAME);
// Customize Executor that will be used by processors to process incoming stanzas
int eventThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.threads", 16);
multiplexerSocketAcceptor.getFilterChain().addFirst(EXECUTOR_FILTER_NAME, new ExecutorFilter(eventThreads + 1, eventThreads + 1, 60, TimeUnit.SECONDS));
int maxPoolSize = JiveGlobals.getIntProperty("xmpp.multiplex.processing.threads", 16);
ExecutorFilter executorFilter = new ExecutorFilter(getCorePoolSize(maxPoolSize), maxPoolSize, 60, TimeUnit.SECONDS);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)executorFilter.getExecutor();
ThreadFactory threadFactory = eventExecutor.getThreadFactory();
threadFactory = new DelegatingThreadFactory("Multiplexer-Thread-", threadFactory);
eventExecutor.setThreadFactory(threadFactory);
multiplexerSocketAcceptor.getFilterChain().addFirst(EXECUTOR_FILTER_NAME, executorFilter);
// Add the XMPP codec filter
multiplexerSocketAcceptor.getFilterChain().addAfter(EXECUTOR_FILTER_NAME, XMPP_CODEC_FILTER_NAME, new ProtocolCodecFilter(new XMPPCodecFactory()));
......@@ -297,8 +302,13 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
if (isComponentListenerEnabled() && componentAcceptor == null) {
// Create SocketAcceptor with correct number of processors
componentAcceptor = buildSocketAcceptor(COMPONENT_SOCKET_ACCEPTOR_NAME);
int eventThreads = JiveGlobals.getIntProperty("xmpp.component.processing.threads", 16);
componentAcceptor.getFilterChain().addFirst(EXECUTOR_FILTER_NAME, new ExecutorFilter(eventThreads + 1, eventThreads + 1, 60, TimeUnit.SECONDS));
int maxPoolSize = JiveGlobals.getIntProperty("xmpp.component.processing.threads", 16);
ExecutorFilter executorFilter = new ExecutorFilter(getCorePoolSize(maxPoolSize), maxPoolSize, 60, TimeUnit.SECONDS);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)executorFilter.getExecutor();
ThreadFactory threadFactory = eventExecutor.getThreadFactory();
threadFactory = new DelegatingThreadFactory("Component-Thread-", threadFactory);
eventExecutor.setThreadFactory(threadFactory);
componentAcceptor.getFilterChain().addFirst(EXECUTOR_FILTER_NAME, executorFilter);
// Add the XMPP codec filter
componentAcceptor.getFilterChain().addAfter(EXECUTOR_FILTER_NAME, XMPP_CODEC_FILTER_NAME, new ProtocolCodecFilter(new XMPPCodecFactory()));
}
......@@ -355,16 +365,12 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// Create SocketAcceptor with correct number of processors
socketAcceptor = buildSocketAcceptor(CLIENT_SOCKET_ACCEPTOR_NAME);
// Customize Executor that will be used by processors to process incoming stanzas
int eventThreads = JiveGlobals.getIntProperty(ConnectionSettings.Client.MAX_THREADS, 16);
ExecutorFilter executorFilter = new ExecutorFilter();
int maxPoolSize = JiveGlobals.getIntProperty(ConnectionSettings.Client.MAX_THREADS, 16);
ExecutorFilter executorFilter = new ExecutorFilter(getCorePoolSize(maxPoolSize), maxPoolSize, 60, TimeUnit.SECONDS);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)executorFilter.getExecutor();
ThreadFactory threadFactory = eventExecutor.getThreadFactory();
threadFactory = new DelegatingThreadFactory("Old executor thread - ", threadFactory);
threadFactory = new DelegatingThreadFactory("C2S-Thread-", threadFactory);
eventExecutor.setThreadFactory(threadFactory);
eventExecutor.setMaximumPoolSize(eventThreads + 1);
eventExecutor.setCorePoolSize(eventThreads + 1);
eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
// Add the XMPP codec filter
socketAcceptor.getFilterChain().addFirst(EXECUTOR_FILTER_NAME, executorFilter);
......@@ -430,24 +436,12 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
String algorithm = JiveGlobals.getProperty(ConnectionSettings.Client.TLS_ALGORITHM, "TLS");
try {
// Customize Executor that will be used by processors to process incoming stanzas
int eventThreads = JiveGlobals.getIntProperty(ConnectionSettings.Client.MAX_THREADS_SSL, 16);
ExecutorFilter executorFilter = new ExecutorFilter(eventThreads + 1, eventThreads + 1, 60, TimeUnit.SECONDS);
int maxPoolSize = JiveGlobals.getIntProperty(ConnectionSettings.Client.MAX_THREADS_SSL, 16);
ExecutorFilter executorFilter = new ExecutorFilter(getCorePoolSize(maxPoolSize), maxPoolSize, 60, TimeUnit.SECONDS);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)executorFilter.getExecutor();
final ThreadFactory originalThreadFactory = eventExecutor.getThreadFactory();
ThreadFactory newThreadFactory = new ThreadFactory()
{
private final AtomicInteger threadId = new AtomicInteger( 0 );
public Thread newThread( Runnable runnable )
{
Thread t = originalThreadFactory.newThread( runnable );
t.setName("Old SSL executor thread - " + threadId.incrementAndGet() );
t.setDaemon( true );
return t;
}
};
eventExecutor.setThreadFactory( newThreadFactory );
ThreadFactory threadFactory = eventExecutor.getThreadFactory();
threadFactory = new DelegatingThreadFactory("LegacySSL-Thread-", threadFactory);
eventExecutor.setThreadFactory(threadFactory);
// Create SocketAcceptor with correct number of processors
sslSocketAcceptor = buildSocketAcceptor(CLIENT_SSL_SOCKET_ACCEPTOR_NAME);
......@@ -461,8 +455,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// Throttle sessions who send data too fast
int maxBufferSize = JiveGlobals.getIntProperty(ConnectionSettings.Client.MAX_READ_BUFFER_SSL, 10 * MB);
sslSocketAcceptor.getSessionConfig().setMaxReadBufferSize(maxBufferSize);
Log.debug("Throttling read buffer for connections from socketAcceptor={} to max={} bytes",
socketAcceptor, maxBufferSize);
Log.debug("Throttling read buffer for connections from sslSocketAcceptor={} to max={} bytes",
sslSocketAcceptor, maxBufferSize);
// Add the SSL filter now since sockets are "borned" encrypted in the old ssl method
SSLContext sslContext = SSLContext.getInstance(algorithm);
......@@ -482,7 +476,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
else if(JiveGlobals.getProperty(ConnectionSettings.Client.AUTH_PER_CLIENTCERT_POLICY,"disabled").equals("wanted")) {
sslFilter.setWantClientAuth(true);
}
sslSocketAcceptor.getFilterChain().addFirst(TLS_FILTER_NAME, sslFilter);
sslSocketAcceptor.getFilterChain().addAfter(EXECUTOR_FILTER_NAME, TLS_FILTER_NAME, sslFilter);
}
catch (Exception e) {
......@@ -908,7 +902,11 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
}
}
// #####################################################################
private int getCorePoolSize(int maxPoolSize) {
return (maxPoolSize/4)+1;
}
// #####################################################################
// Module management
// #####################################################################
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment