Commit 18fffdac authored by jackrabbit128's avatar jackrabbit128

Fix for OF-835:

- install ReadThrottleFilterBuilder into filter chains
- adjust SSLFilter positioning in chain so that ReadThrottleFilter works correctly
parent fad60d88
...@@ -372,9 +372,10 @@ public class NIOConnection implements Connection { ...@@ -372,9 +372,10 @@ public class NIOConnection implements Connection {
filter.setWantClientAuth(true); filter.setWantClientAuth(true);
} }
// TODO Temporary workaround (placing SSLFilter before ExecutorFilter) to avoid deadlock. Waiting for // TODO Temporary workaround (placing SSLFilter before ExecutorFilter) to avoid deadlock. Waiting for
// MINA devs feedback // MINA devs feedback. Note that ExecutorFilter is flanked by ReadThrottleFilterBuilder and we must not
ioSession.getFilterChain().addBefore("org.apache.mina.common.ExecutorThreadModel", "tls", filter); // get in between them.
//ioSession.getFilterChain().addAfter("org.apache.mina.common.ExecutorThreadModel", "tls", filter); ioSession.getFilterChain().addBefore("org.apache.mina.filter.ReadThrottleFilterBuilder.add", "tls", filter);
//ioSession.getFilterChain().addAfter("org.apache.mina.filter.ReadThrottleFilterBuilder.release", "tls", filter);
ioSession.setAttribute(SSLFilter.DISABLE_ENCRYPTION_ONCE, Boolean.TRUE); ioSession.setAttribute(SSLFilter.DISABLE_ENCRYPTION_ONCE, Boolean.TRUE);
if (!clientMode) { if (!clientMode) {
// Indicate the client that the server is ready to negotiate TLS // Indicate the client that the server is ready to negotiate TLS
......
...@@ -47,15 +47,8 @@ import javax.net.ssl.KeyManagerFactory; ...@@ -47,15 +47,8 @@ import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.TrustManagerFactory;
import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.*;
import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoServiceListener;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.common.ThreadModel;
import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.filter.executor.ExecutorFilter;
...@@ -98,6 +91,7 @@ import org.slf4j.LoggerFactory; ...@@ -98,6 +91,7 @@ import org.slf4j.LoggerFactory;
public class ConnectionManagerImpl extends BasicModule implements ConnectionManager, CertificateEventListener { public class ConnectionManagerImpl extends BasicModule implements ConnectionManager, CertificateEventListener {
private static final Logger Log = LoggerFactory.getLogger(ConnectionManagerImpl.class); private static final Logger Log = LoggerFactory.getLogger(ConnectionManagerImpl.class);
private static final int MB = 1024 * 1024;
private SocketAcceptor socketAcceptor; private SocketAcceptor socketAcceptor;
private SocketAcceptor sslSocketAcceptor; private SocketAcceptor sslSocketAcceptor;
...@@ -360,19 +354,39 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -360,19 +354,39 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// Create SocketAcceptor with correct number of processors // Create SocketAcceptor with correct number of processors
socketAcceptor = buildSocketAcceptor("client"); socketAcceptor = buildSocketAcceptor("client");
// Customize Executor that will be used by processors to process incoming stanzas // Customize Executor that will be used by processors to process incoming stanzas
ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("client");
int eventThreads = JiveGlobals.getIntProperty("xmpp.client.processing.threads", 16); int eventThreads = JiveGlobals.getIntProperty("xmpp.client.processing.threads", 16);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)threadModel.getExecutor(); ExecutorFilter executorFilter = new ExecutorFilter();
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)executorFilter.getExecutor();
ThreadFactory threadFactory = eventExecutor.getThreadFactory();
threadFactory = new DelegatingThreadFactory("Old executor thread - ", threadFactory);
eventExecutor.setThreadFactory(threadFactory);
eventExecutor.setCorePoolSize(eventThreads + 1); eventExecutor.setCorePoolSize(eventThreads + 1);
eventExecutor.setMaximumPoolSize(eventThreads + 1); eventExecutor.setMaximumPoolSize(eventThreads + 1);
eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS); eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
socketAcceptor.getDefaultConfig().setThreadModel(threadModel); socketAcceptor.getDefaultConfig().setThreadModel(ThreadModel.MANUAL);
// Add the XMPP codec filter // Add the XMPP codec filter
socketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory())); socketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
socketAcceptor.getFilterChain().addFirst("threadModel", executorFilter);
// Kill sessions whose outgoing queues keep growing and fail to send traffic // Kill sessions whose outgoing queues keep growing and fail to send traffic
socketAcceptor.getFilterChain().addAfter("xmpp", "outCap", new StalledSessionsFilter()); socketAcceptor.getFilterChain().addAfter("xmpp", "outCap", new StalledSessionsFilter());
// Throttle sessions who send data too fast
int maxBufferSize = JiveGlobals.getIntProperty("xmpp.client.maxReadBufferSize", 10 * MB);
installReadThrottle(socketAcceptor, maxBufferSize);
}
}
private static void installReadThrottle(IoAcceptor socketAcceptor, int maxBufferSize) {
if (maxBufferSize <= 0) {
return;
} }
// Install filter that throttles the incoming data
Log.debug("Throttling read buffer for connections from socketAcceptor={} to max={} bytes",
socketAcceptor, maxBufferSize);
ReadThrottleFilterBuilder readThrottle = new ReadThrottleFilterBuilder();
readThrottle.setMaximumConnectionBufferSize(maxBufferSize);
readThrottle.attach(socketAcceptor.getFilterChain());
} }
private void startClientListeners(String localIPAddress) { private void startClientListeners(String localIPAddress) {
...@@ -431,20 +445,9 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -431,20 +445,9 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
int eventThreads = JiveGlobals.getIntProperty("xmpp.client_ssl.processing.threads", 16); int eventThreads = JiveGlobals.getIntProperty("xmpp.client_ssl.processing.threads", 16);
ExecutorFilter executorFilter = new ExecutorFilter(); ExecutorFilter executorFilter = new ExecutorFilter();
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)executorFilter.getExecutor(); ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)executorFilter.getExecutor();
final ThreadFactory originalThreadFactory = eventExecutor.getThreadFactory(); ThreadFactory threadFactory = eventExecutor.getThreadFactory();
ThreadFactory newThreadFactory = new ThreadFactory() threadFactory = new DelegatingThreadFactory("Old SSL executor thread - ", threadFactory);
{ eventExecutor.setThreadFactory(threadFactory);
private final AtomicInteger threadId = new AtomicInteger( 0 );
public Thread newThread( Runnable runnable )
{
Thread t = originalThreadFactory.newThread( runnable );
t.setName("Old SSL executor thread - " + threadId.incrementAndGet() );
t.setDaemon( true );
return t;
}
};
eventExecutor.setThreadFactory( newThreadFactory );
eventExecutor.setCorePoolSize(eventThreads + 1); eventExecutor.setCorePoolSize(eventThreads + 1);
eventExecutor.setMaximumPoolSize(eventThreads + 1); eventExecutor.setMaximumPoolSize(eventThreads + 1);
eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS); eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
...@@ -455,6 +458,9 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -455,6 +458,9 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
sslSocketAcceptor.getFilterChain().addFirst("threadModel", executorFilter); sslSocketAcceptor.getFilterChain().addFirst("threadModel", executorFilter);
// Kill sessions whose outgoing queues keep growing and fail to send traffic // Kill sessions whose outgoing queues keep growing and fail to send traffic
sslSocketAcceptor.getFilterChain().addAfter("xmpp", "outCap", new StalledSessionsFilter()); sslSocketAcceptor.getFilterChain().addAfter("xmpp", "outCap", new StalledSessionsFilter());
// Throttle sessions who send data too fast
int maxBufferSize = JiveGlobals.getIntProperty("xmpp.client_ssl.maxReadBufferSize", 10 * MB);
installReadThrottle(sslSocketAcceptor, maxBufferSize);
// Add the SSL filter now since sockets are "borned" encrypted in the old ssl method // Add the SSL filter now since sockets are "borned" encrypted in the old ssl method
SSLContext sslContext = SSLContext.getInstance(algorithm); SSLContext sslContext = SSLContext.getInstance(algorithm);
...@@ -929,4 +935,24 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -929,4 +935,24 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
CertificateManager.removeListener(this); CertificateManager.removeListener(this);
serverName = null; serverName = null;
} }
private static class DelegatingThreadFactory implements ThreadFactory {
private final AtomicInteger threadId;
private final ThreadFactory originalThreadFactory;
private String threadNamePrefix;
public DelegatingThreadFactory(String threadNamePrefix, ThreadFactory originalThreadFactory) {
this.originalThreadFactory = originalThreadFactory;
threadId = new AtomicInteger(0);
this.threadNamePrefix = threadNamePrefix;
}
public Thread newThread(Runnable runnable)
{
Thread t = originalThreadFactory.newThread(runnable);
t.setName(threadNamePrefix + threadId.incrementAndGet());
t.setDaemon(true);
return t;
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment