Commit 4a4cb5d8 authored by André Berenguel's avatar André Berenguel

OF-421 - Update MINA library to version 2.0.7

parent e4c2ac5a
......@@ -43,9 +43,13 @@ jzlib.jar | 1.0.7
libidn.jar | 1.15 | GNU Lesser General Public License version 2.1 or later (http://www.gnu.org/licenses/licenses.html)
log4j.jar | 1.2.15 | Apache 2.0 (http://logging.apache.org/log4j/1.2/license.html)
mail.jar | 1.4.1 (JavaMail) |
mina-core.jar | 1.1.8 (https://svn.apache.org/repos/asf/mina/branches/1.1) | Apache 2.0
mina-filter-compression.jar | 1.1.8 (https://svn.apache.org/repos/asf/mina/branches/1.1) | Apache 2.0
mina-filter-ssl.jar | 1.1.8-SNAPSHOT (see note #2) | Apache 2.0
mina-core.jar | Apache Mina 2.0.7 | Apache 2.0
mina-filter-compression.jar | Apache Mina 2.0.7 | Apache 2.0
mina-integration-beans.jar | Apache Mina 2.0.7 | Apache 2.0
mina-integration-jmx.jar | Apache Mina 2.0.7 | Apache 2.0
mina-integration-ognl.jar | Apache Mina 2.0.7 | Apache 2.0
javassist.jar | 3.11.0.GA (Apache Mina 2.0.7) | Apache 2.0
ognl.jar | 3.0.5 (Apache Mina 2.0.7) | Apache 2.0
mysql.jar | 5.1.30 | GPL
objenesis | 1.0 (JMock 2.1.0) | BSD (http://www.jmock.org/license.html)
pack200task.jar | August 5, 2004 | LGPL
......@@ -67,4 +71,3 @@ xpp3.jar | XPP_3 1.1.4c
Notes
-----
1) proxool - patched ProxyConnection to send log message in #registerClosedStatement to debug instead of warn
2) mina-filter-ssl - applied patch to resize buffers OF-496 DIRMINA-914. Source code used: https://git-wip-us.apache.org/repos/asf?p=mina.git;a=commit;h=dd6395befe672d7bdb210b28b1b81592a3dc5e64
package org.apache.mina.management;
import org.apache.mina.common.*;
import static org.jivesoftware.openfire.spi.ConnectionManagerImpl.EXECUTOR_FILTER_NAME;
import org.apache.mina.core.service.IoService;
import org.apache.mina.core.service.IoServiceListener;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
/**
......@@ -59,15 +65,6 @@ public class MINAStatCollector {
private final IoServiceListener serviceListener = new IoServiceListener()
{
public void serviceActivated( IoService service, SocketAddress serviceAddress, IoHandler handler,
IoServiceConfig config )
{
}
public void serviceDeactivated( IoService service, SocketAddress serviceAddress, IoHandler handler,
IoServiceConfig config )
{
}
public void sessionCreated( IoSession session )
{
......@@ -78,6 +75,18 @@ public class MINAStatCollector {
{
removeSession( session );
}
@Override
public void serviceActivated(IoService service) throws Exception {
}
@Override
public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception {
}
@Override
public void serviceDeactivated(IoService service) throws Exception {
}
};
/**
......@@ -115,14 +124,12 @@ public class MINAStatCollector {
polledSessions = new ConcurrentLinkedQueue<IoSession>();
Set<SocketAddress> addresses = service.getManagedServiceAddresses();
if (addresses != null) {
for (SocketAddress element : addresses) {
for (IoSession ioSession : service.getManagedSessions(element)) {
Map<Long, IoSession> sessions = service.getManagedSessions();
if (sessions != null) {
for (IoSession ioSession : sessions.values()) {
addSession(ioSession);
}
}
}
// listen for new ones
service.addListener( serviceListener );
......@@ -307,12 +314,15 @@ public class MINAStatCollector {
tmpMsgRead += (readMessages - sessStat.lastMessageRead);
tmpBytesWritten += (writtenBytes - sessStat.lastByteWrite);
tmpBytesRead += (readBytes - sessStat.lastByteRead);
tmpScheduledWrites += session.getScheduledWriteRequests();
tmpScheduledWrites += session.getScheduledWriteMessages();
ExecutorFilter executorFilter =
(ExecutorFilter) session.getFilterChain().get(ExecutorThreadModel.class.getName());
(ExecutorFilter) session.getFilterChain().get(EXECUTOR_FILTER_NAME);
if (executorFilter != null) {
tmpQueuevedEvents += executorFilter.getEventQueueSize(session);
Executor executor = executorFilter.getExecutor();
if (executor instanceof OrderedThreadPoolExecutor) {
tmpQueuevedEvents += ((OrderedThreadPoolExecutor) executor).getActiveCount();
}
}
sessStat.lastByteRead = readBytes;
......@@ -331,4 +341,85 @@ public class MINAStatCollector {
}
}
}
public class IoSessionStat {
long lastByteRead = -1;
long lastByteWrite = -1;
long lastMessageRead = -1;
long lastMessageWrite = -1;
float byteWrittenThroughput = 0;
float byteReadThroughput = 0;
float messageWrittenThroughput = 0;
float messageReadThroughput = 0;
// last time the session was polled
long lastPollingTime = System.currentTimeMillis();
/**
* Bytes read per second
* @return bytes per second
*/
public float getByteReadThroughput() {
return byteReadThroughput;
}
/**
* Bytes written per second
* @return bytes per second
*/
public float getByteWrittenThroughput() {
return byteWrittenThroughput;
}
/**
* Messages read per second
* @return messages per second
*/
public float getMessageReadThroughput() {
return messageReadThroughput;
}
/**
* Messages written per second
* @return messages per second
*/
public float getMessageWrittenThroughput() {
return messageWrittenThroughput;
}
/**
* used for the StatCollector, last polling value
*/
long getLastByteRead() {
return lastByteRead;
}
/**
* used for the StatCollector, last polling value
*/
long getLastByteWrite() {
return lastByteWrite;
}
/**
* used for the StatCollector, last polling value
*/
long getLastMessageRead() {
return lastMessageRead;
}
/**
* used for the StatCollector, last polling value
*/
long getLastMessageWrite() {
return lastMessageWrite;
}
}
}
......@@ -23,8 +23,9 @@ package org.jivesoftware.openfire.net;
import java.io.IOException;
import java.util.Date;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.WriteRequest;
import org.jivesoftware.util.JiveGlobals;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -47,15 +48,15 @@ public class StalledSessionsFilter extends IoFilterAdapter {
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest)
throws Exception {
// Get number of pending requests
int pendingBytes = session.getScheduledWriteBytes();
long pendingBytes = session.getScheduledWriteBytes();
if (pendingBytes > bytesCap) {
// Get last time we were able to send something to the connected client
long writeTime = session.getLastWriteTime();
int pendingRequests = session.getScheduledWriteRequests();
int pendingRequests = session.getScheduledWriteMessages();
Log.debug("About to kill session with pendingBytes: " + pendingBytes + " pendingWrites: " +
pendingRequests + " lastWrite: " + new Date(writeTime) + "session: " + session);
// Close the session and throw an exception
session.close();
session.close(false);
throw new IOException("Closing session that seems to be stalled. Preventing OOM");
}
// Call next filter (everything is fine)
......
......@@ -19,7 +19,7 @@
package org.jivesoftware.openfire.nio;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.core.buffer.IoBuffer;
import java.io.IOException;
import java.io.Writer;
......@@ -32,17 +32,17 @@ import java.nio.charset.CharsetEncoder;
*/
public class ByteBufferWriter extends Writer {
private CharsetEncoder encoder;
private ByteBuffer byteBuffer;
private IoBuffer ioBuffer;
public ByteBufferWriter(ByteBuffer byteBuffer, CharsetEncoder encoder) {
public ByteBufferWriter(IoBuffer byteBuffer, CharsetEncoder encoder) {
this.encoder = encoder;
this.byteBuffer = byteBuffer;
this.ioBuffer = byteBuffer;
}
@Override
public void write(char cbuf[], int off, int len) throws IOException {
byteBuffer.putString(new String(cbuf, off, len), encoder);
ioBuffer.putString(new String(cbuf, off, len), encoder);
}
@Override
......
......@@ -19,8 +19,8 @@
package org.jivesoftware.openfire.nio;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.XMPPServer;
......
......@@ -19,7 +19,7 @@
package org.jivesoftware.openfire.nio;
import org.apache.mina.common.IoSession;
import org.apache.mina.core.session.IoSession;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.net.ComponentStanzaHandler;
import org.jivesoftware.openfire.net.StanzaHandler;
......
......@@ -23,9 +23,9 @@ import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderException;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.openfire.Connection;
......@@ -92,7 +92,7 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
// removing connections without warning.
final int idleTime = getMaxIdleTime() / 2;
if (idleTime > 0) {
session.setIdleTime(IdleStatus.READER_IDLE, idleTime);
session.getConfig().setIdleTime(IdleStatus.READER_IDLE, idleTime);
}
}
......@@ -151,7 +151,7 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
final Connection connection = (Connection) session.getAttribute(CONNECTION);
connection.deliverRawText(error.toXML());
session.close();
session.close(true);
}
else {
Log.error("ConnectionHandler reports unexpected exception for session: " + session, cause);
......
......@@ -19,7 +19,7 @@
package org.jivesoftware.openfire.nio;
import org.apache.mina.common.IoSession;
import org.apache.mina.core.session.IoSession;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.multiplex.MultiplexerPacketDeliverer;
......
......@@ -19,6 +19,10 @@
package org.jivesoftware.openfire.nio;
import static org.jivesoftware.openfire.spi.ConnectionManagerImpl.EXECUTOR_FILTER_NAME;
import static org.jivesoftware.openfire.spi.ConnectionManagerImpl.TLS_FILTER_NAME;
import static org.jivesoftware.openfire.spi.ConnectionManagerImpl.COMPRESSION_FILTER_NAME;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
......@@ -34,11 +38,11 @@ import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.CompressionFilter;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.compression.CompressionFilter;
import org.apache.mina.filter.ssl.SslFilter;
import org.dom4j.io.OutputFormat;
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.ConnectionCloseListener;
......@@ -155,7 +159,7 @@ public class NIOConnection implements Connection {
}
public Certificate[] getLocalCertificates() {
SSLSession sslSession = (SSLSession) ioSession.getAttribute(SSLFilter.SSL_SESSION);
SSLSession sslSession = (SSLSession) ioSession.getAttribute(SslFilter.SSL_SESSION);
if (sslSession != null) {
return sslSession.getLocalCertificates();
}
......@@ -164,7 +168,7 @@ public class NIOConnection implements Connection {
public Certificate[] getPeerCertificates() {
try {
SSLSession sslSession = (SSLSession) ioSession.getAttribute(SSLFilter.SSL_SESSION);
SSLSession sslSession = (SSLSession) ioSession.getAttribute(SslFilter.SSL_SESSION);
if (sslSession != null) {
return sslSession.getPeerCertificates();
}
......@@ -198,7 +202,7 @@ public class NIOConnection implements Connection {
if (session != null) {
session.setStatus(Session.STATUS_CLOSED);
}
ioSession.close();
ioSession.close(false);
closed = true;
closedSuccessfully = true;
}
......@@ -240,7 +244,7 @@ public class NIOConnection implements Connection {
}
public boolean isSecure() {
return ioSession.getFilterChain().contains("tls");
return ioSession.getFilterChain().contains(TLS_FILTER_NAME);
}
public void deliver(Packet packet) throws UnauthorizedException {
......@@ -248,7 +252,7 @@ public class NIOConnection implements Connection {
backupDeliverer.deliver(packet);
}
else {
ByteBuffer buffer = ByteBuffer.allocate(4096);
IoBuffer buffer = IoBuffer.allocate(4096);
buffer.setAutoExpand(true);
boolean errorDelivering = false;
......@@ -290,7 +294,7 @@ public class NIOConnection implements Connection {
private void deliverRawText(String text, boolean asynchronous) {
if (!isClosed()) {
ByteBuffer buffer = ByteBuffer.allocate(text.length());
IoBuffer buffer = IoBuffer.allocate(text.length());
buffer.setAutoExpand(true);
boolean errorDelivering = false;
......@@ -312,7 +316,7 @@ public class NIOConnection implements Connection {
else {
// Send stanza and wait for ACK (using a 2 seconds default timeout)
boolean ok =
ioSession.write(buffer).join(JiveGlobals.getIntProperty("connection.ack.timeout", 2000));
ioSession.write(buffer).awaitUninterruptibly(JiveGlobals.getIntProperty("connection.ack.timeout", 2000));
if (!ok) {
Log.warn("No ACK was received when sending stanza to: " + this.toString());
}
......@@ -360,7 +364,7 @@ public class NIOConnection implements Connection {
tlsContext.init(km, tm, null);
SSLFilter filter = new SSLFilter(tlsContext);
SslFilter filter = new SslFilter(tlsContext);
filter.setUseClientMode(clientMode);
if (authentication == ClientAuth.needed) {
filter.setNeedClientAuth(true);
......@@ -371,11 +375,8 @@ public class NIOConnection implements Connection {
// good
filter.setWantClientAuth(true);
}
// TODO Temporary workaround (placing SSLFilter before ExecutorFilter) to avoid deadlock. Waiting for
// MINA devs feedback
ioSession.getFilterChain().addBefore("org.apache.mina.common.ExecutorThreadModel", "tls", filter);
//ioSession.getFilterChain().addAfter("org.apache.mina.common.ExecutorThreadModel", "tls", filter);
ioSession.setAttribute(SSLFilter.DISABLE_ENCRYPTION_ONCE, Boolean.TRUE);
ioSession.getFilterChain().addAfter(EXECUTOR_FILTER_NAME, TLS_FILTER_NAME, filter);
ioSession.setAttribute(SslFilter.DISABLE_ENCRYPTION_ONCE, Boolean.TRUE);
if (!clientMode) {
// Indicate the client that the server is ready to negotiate TLS
deliverRawText("<proceed xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\"/>");
......@@ -384,11 +385,11 @@ public class NIOConnection implements Connection {
public void addCompression() {
IoFilterChain chain = ioSession.getFilterChain();
String baseFilter = "org.apache.mina.common.ExecutorThreadModel";
if (chain.contains("tls")) {
baseFilter = "tls";
String baseFilter = EXECUTOR_FILTER_NAME;
if (chain.contains(TLS_FILTER_NAME)) {
baseFilter = TLS_FILTER_NAME;
}
chain.addAfter(baseFilter, "compression", new CompressionFilter(true, false, CompressionFilter.COMPRESSION_MAX));
chain.addAfter(baseFilter, COMPRESSION_FILTER_NAME, new CompressionFilter(true, false, CompressionFilter.COMPRESSION_MAX));
}
public void startCompression() {
......
......@@ -29,8 +29,7 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.filter.codec.ProtocolDecoderException;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.PropertyEventDispatcher;
......@@ -179,7 +178,7 @@ class XMLLightweightParser {
/*
* Main reading method
*/
public void read(ByteBuffer byteBuffer) throws Exception {
public void read(IoBuffer byteBuffer) throws Exception {
if (buffer == null) {
// exception was thrown before, avoid duplicate exception(s)
// "read" and discard remaining data
......
......@@ -19,6 +19,7 @@
package org.jivesoftware.openfire.nio;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
......@@ -38,11 +39,11 @@ public class XMPPCodecFactory implements ProtocolCodecFactory {
decoder = new XMPPDecoder();
}
public ProtocolEncoder getEncoder() throws Exception {
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
return encoder;
}
public ProtocolDecoder getDecoder() throws Exception {
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
return decoder;
}
}
......@@ -19,8 +19,8 @@
package org.jivesoftware.openfire.nio;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
......@@ -33,7 +33,7 @@ import org.apache.mina.filter.codec.ProtocolDecoderOutput;
public class XMPPDecoder extends CumulativeProtocolDecoder {
@Override
protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws Exception {
// Get the XML light parser from the IoSession
XMLLightweightParser parser = (XMLLightweightParser) session.getAttribute(ConnectionHandler.XML_PARSER);
......
......@@ -19,7 +19,7 @@
package org.jivesoftware.openfire.nio;
import org.apache.mina.common.IoSession;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
......
......@@ -25,7 +25,6 @@ import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.security.KeyStore;
import java.security.KeyStoreException;
......@@ -33,8 +32,6 @@ import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
......@@ -47,23 +44,19 @@ import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ExecutorThreadModel;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoServiceListener;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.common.ThreadModel;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoService;
import org.apache.mina.core.service.IoServiceListener;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.buffer.SimpleBufferAllocator;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.integration.jmx.IoServiceManager;
import org.apache.mina.integration.jmx.IoSessionManager;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.filter.ssl.SslFilter;
import org.apache.mina.integration.jmx.IoServiceMBean;
import org.apache.mina.integration.jmx.IoSessionMBean;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.jivesoftware.openfire.ConnectionManager;
import org.jivesoftware.openfire.JMXManager;
import org.jivesoftware.openfire.PacketDeliverer;
......@@ -96,13 +89,24 @@ import org.slf4j.LoggerFactory;
public class ConnectionManagerImpl extends BasicModule implements ConnectionManager, CertificateEventListener {
public static final String EXECUTOR_FILTER_NAME = "threadPool";
public static final String TLS_FILTER_NAME = "tls";
public static final String COMPRESSION_FILTER_NAME = "compression";
public static final String XMPP_CODEC_FILTER_NAME = "xmpp";
public static final String CAPACITY_FILTER_NAME = "outCap";
private static final String CLIENT_SOCKET_ACCEPTOR_NAME = "client";
private static final String CLIENT_SSL_SOCKET_ACCEPTOR_NAME = "client_ssl";
private static final String COMPONENT_SOCKET_ACCEPTOR_NAME = "component";
private static final String MULTIPLEXER_SOCKET_ACCEPTOR_NAME = "multiplexer";
private static final Logger Log = LoggerFactory.getLogger(ConnectionManagerImpl.class);
private SocketAcceptor socketAcceptor;
private SocketAcceptor sslSocketAcceptor;
private SocketAcceptor componentAcceptor;
private NioSocketAcceptor socketAcceptor;
private NioSocketAcceptor sslSocketAcceptor;
private NioSocketAcceptor componentAcceptor;
private SocketAcceptThread serverSocketThread;
private SocketAcceptor multiplexerSocketAcceptor;
private NioSocketAcceptor multiplexerSocketAcceptor;
private ArrayList<ServerPort> ports;
private SessionManager sessionManager;
......@@ -230,17 +234,12 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// Start multiplexers socket unless it's been disabled.
if (isConnectionManagerListenerEnabled()) {
// Create SocketAcceptor with correct number of processors
multiplexerSocketAcceptor = buildSocketAcceptor("multiplexer");
multiplexerSocketAcceptor = buildSocketAcceptor(MULTIPLEXER_SOCKET_ACCEPTOR_NAME);
// Customize Executor that will be used by processors to process incoming stanzas
ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("connectionManager");
int eventThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.threads", 16);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) threadModel.getExecutor();
eventExecutor.setCorePoolSize(eventThreads + 1);
eventExecutor.setMaximumPoolSize(eventThreads + 1);
eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
multiplexerSocketAcceptor.getDefaultConfig().setThreadModel(threadModel);
multiplexerSocketAcceptor.getFilterChain().addFirst(EXECUTOR_FILTER_NAME, new ExecutorFilter(eventThreads + 1, eventThreads + 1, 60, TimeUnit.SECONDS));
// Add the XMPP codec filter
multiplexerSocketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
multiplexerSocketAcceptor.getFilterChain().addAfter(EXECUTOR_FILTER_NAME, XMPP_CODEC_FILTER_NAME, new ProtocolCodecFilter(new XMPPCodecFactory()));
}
}
......@@ -260,7 +259,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
}
}
// Start accepting connections
multiplexerSocketAcceptor.bind(new InetSocketAddress(bindInterface, port), new MultiplexerConnectionHandler(serverName));
multiplexerSocketAcceptor.setHandler(new MultiplexerConnectionHandler(serverName));
multiplexerSocketAcceptor.bind(new InetSocketAddress(bindInterface, port));
ports.add(new ServerPort(port, serverName, localIPAddress, false, null, ServerPort.Type.connectionManager));
......@@ -278,7 +278,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
private void stopConnectionManagerListener() {
if (multiplexerSocketAcceptor != null) {
multiplexerSocketAcceptor.unbindAll();
multiplexerSocketAcceptor.unbind();
for (ServerPort port : ports) {
if (port.isConnectionManagerPort()) {
ports.remove(port);
......@@ -293,25 +293,18 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// Start components socket unless it's been disabled.
if (isComponentListenerEnabled() && componentAcceptor == null) {
// Create SocketAcceptor with correct number of processors
componentAcceptor = buildSocketAcceptor("component");
// Customize Executor that will be used by processors to process incoming stanzas
ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("component");
componentAcceptor = buildSocketAcceptor(COMPONENT_SOCKET_ACCEPTOR_NAME);
int eventThreads = JiveGlobals.getIntProperty("xmpp.component.processing.threads", 16);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)threadModel.getExecutor();
eventExecutor.setCorePoolSize(eventThreads + 1);
eventExecutor.setMaximumPoolSize(eventThreads + 1);
eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
componentAcceptor.getDefaultConfig().setThreadModel(threadModel);
componentAcceptor.getFilterChain().addFirst(EXECUTOR_FILTER_NAME, new ExecutorFilter(eventThreads + 1, eventThreads + 1, 60, TimeUnit.SECONDS));
// Add the XMPP codec filter
componentAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
componentAcceptor.getFilterChain().addAfter(EXECUTOR_FILTER_NAME, XMPP_CODEC_FILTER_NAME, new ProtocolCodecFilter(new XMPPCodecFactory()));
}
}
private void startComponentListener() {
// Start components socket unless it's been disabled.
if (isComponentListenerEnabled() && componentAcceptor != null &&
componentAcceptor.getManagedServiceAddresses().isEmpty()) {
componentAcceptor.getManagedSessionCount() == 0) {
int port = getComponentListenerPort();
try {
// Listen on a specific network interface if it has been set.
......@@ -323,8 +316,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
}
}
// Start accepting connections
componentAcceptor
.bind(new InetSocketAddress(bindInterface, port), new ComponentConnectionHandler(serverName));
componentAcceptor.setHandler(new ComponentConnectionHandler(serverName));
componentAcceptor.bind(new InetSocketAddress(bindInterface, port));
ports.add(new ServerPort(port, serverName, localIPAddress, false, null, ServerPort.Type.component));
......@@ -342,7 +335,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
private void stopComponentListener() {
if (componentAcceptor != null) {
componentAcceptor.unbindAll();
componentAcceptor.unbind();
for (ServerPort port : ports) {
if (port.isComponentPort()) {
ports.remove(port);
......@@ -357,20 +350,14 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// Start clients plain socket unless it's been disabled.
if (isClientListenerEnabled()) {
// Create SocketAcceptor with correct number of processors
socketAcceptor = buildSocketAcceptor("client");
socketAcceptor = buildSocketAcceptor(CLIENT_SOCKET_ACCEPTOR_NAME);
// Customize Executor that will be used by processors to process incoming stanzas
ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("client");
int eventThreads = JiveGlobals.getIntProperty("xmpp.client.processing.threads", 16);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)threadModel.getExecutor();
eventExecutor.setCorePoolSize(eventThreads + 1);
eventExecutor.setMaximumPoolSize(eventThreads + 1);
eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
socketAcceptor.getDefaultConfig().setThreadModel(threadModel);
socketAcceptor.getFilterChain().addFirst(EXECUTOR_FILTER_NAME, new ExecutorFilter(eventThreads + 1, eventThreads + 1, 60, TimeUnit.SECONDS));
// Add the XMPP codec filter
socketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
socketAcceptor.getFilterChain().addAfter(EXECUTOR_FILTER_NAME, XMPP_CODEC_FILTER_NAME, new ProtocolCodecFilter(new XMPPCodecFactory()));
// Kill sessions whose outgoing queues keep growing and fail to send traffic
socketAcceptor.getFilterChain().addAfter("xmpp", "outCap", new StalledSessionsFilter());
socketAcceptor.getFilterChain().addAfter(XMPP_CODEC_FILTER_NAME, CAPACITY_FILTER_NAME, new StalledSessionsFilter());
}
}
......@@ -388,8 +375,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
}
}
// Start accepting connections
socketAcceptor
.bind(new InetSocketAddress(bindInterface, port), new ClientConnectionHandler(serverName));
socketAcceptor.setHandler(new ClientConnectionHandler(serverName));
socketAcceptor.bind(new InetSocketAddress(bindInterface, port));
ports.add(new ServerPort(port, serverName, localIPAddress, false, null, ServerPort.Type.client));
......@@ -407,7 +394,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
private void stopClientListeners() {
if (socketAcceptor != null) {
socketAcceptor.unbindAll();
socketAcceptor.unbind();
for (ServerPort port : ports) {
if (port.isClientPort() && !port.isSecure()) {
ports.remove(port);
......@@ -424,11 +411,9 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
int port = getClientSSLListenerPort();
String algorithm = JiveGlobals.getProperty("xmpp.socket.ssl.algorithm", "TLS");
try {
// Create SocketAcceptor with correct number of processors
sslSocketAcceptor = buildSocketAcceptor("client_ssl");
// Customize Executor that will be used by processors to process incoming stanzas
int eventThreads = JiveGlobals.getIntProperty("xmpp.client_ssl.processing.threads", 16);
ExecutorFilter executorFilter = new ExecutorFilter();
ExecutorFilter executorFilter = new ExecutorFilter(eventThreads + 1, eventThreads + 1, 60, TimeUnit.SECONDS);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor)executorFilter.getExecutor();
final ThreadFactory originalThreadFactory = eventExecutor.getThreadFactory();
ThreadFactory newThreadFactory = new ThreadFactory()
......@@ -444,16 +429,14 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
}
};
eventExecutor.setThreadFactory( newThreadFactory );
eventExecutor.setCorePoolSize(eventThreads + 1);
eventExecutor.setMaximumPoolSize(eventThreads + 1);
eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
sslSocketAcceptor.getDefaultConfig().setThreadModel(ThreadModel.MANUAL);
// Create SocketAcceptor with correct number of processors
sslSocketAcceptor = buildSocketAcceptor(CLIENT_SSL_SOCKET_ACCEPTOR_NAME);
sslSocketAcceptor.getFilterChain().addFirst(EXECUTOR_FILTER_NAME, executorFilter);
// Add the XMPP codec filter
sslSocketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
sslSocketAcceptor.getFilterChain().addFirst("threadModel", executorFilter);
sslSocketAcceptor.getFilterChain().addAfter(EXECUTOR_FILTER_NAME, XMPP_CODEC_FILTER_NAME, new ProtocolCodecFilter(new XMPPCodecFactory()));
// Kill sessions whose outgoing queues keep growing and fail to send traffic
sslSocketAcceptor.getFilterChain().addAfter("xmpp", "outCap", new StalledSessionsFilter());
sslSocketAcceptor.getFilterChain().addAfter(XMPP_CODEC_FILTER_NAME, CAPACITY_FILTER_NAME, new StalledSessionsFilter());
// Add the SSL filter now since sockets are "borned" encrypted in the old ssl method
SSLContext sslContext = SSLContext.getInstance(algorithm);
......@@ -466,14 +449,14 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
trustFactory.getTrustManagers(),
new java.security.SecureRandom());
SSLFilter sslFilter = new SSLFilter(sslContext);
SslFilter sslFilter = new SslFilter(sslContext);
if (JiveGlobals.getProperty("xmpp.client.cert.policy","disabled").equals("needed")) {
sslFilter.setNeedClientAuth(true);
}
else if(JiveGlobals.getProperty("xmpp.client.cert.policy","disabled").equals("wanted")) {
sslFilter.setWantClientAuth(true);
}
sslSocketAcceptor.getFilterChain().addFirst("tls", sslFilter);
sslSocketAcceptor.getFilterChain().addFirst(TLS_FILTER_NAME, sslFilter);
}
catch (Exception e) {
......@@ -498,8 +481,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
}
}
// Start accepting connections
sslSocketAcceptor
.bind(new InetSocketAddress(bindInterface, port), new ClientConnectionHandler(serverName));
sslSocketAcceptor.setHandler(new ClientConnectionHandler(serverName));
sslSocketAcceptor.bind(new InetSocketAddress(bindInterface, port));
ports.add(new ServerPort(port, serverName, localIPAddress, true, null, ServerPort.Type.client));
......@@ -517,7 +500,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
private void stopClientSSLListeners() {
if (sslSocketAcceptor != null) {
sslSocketAcceptor.unbindAll();
sslSocketAcceptor.unbind();
for (ServerPort port : ports) {
if (port.isClientPort() && port.isSecure()) {
ports.remove(port);
......@@ -575,8 +558,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// Check if we need to configure MINA to use Direct or Heap Buffers
// Note: It has been reported that heap buffers are 50% faster than direct buffers
if (JiveGlobals.getBooleanProperty("xmpp.socket.heapBuffer", true)) {
ByteBuffer.setUseDirectBuffers(false);
ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
IoBuffer.setUseDirectBuffer(false);
IoBuffer.setAllocator(new SimpleBufferAllocator());
}
}
......@@ -711,7 +694,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
}
}
public SocketAcceptor getSocketAcceptor() {
public NioSocketAcceptor getSocketAcceptor() {
return socketAcceptor;
}
......@@ -719,7 +702,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
return JiveGlobals.getIntProperty("xmpp.socket.plain.port", DEFAULT_PORT);
}
public SocketAcceptor getSSLSocketAcceptor() {
public NioSocketAcceptor getSSLSocketAcceptor() {
return sslSocketAcceptor;
}
......@@ -757,7 +740,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
}
}
public SocketAcceptor getComponentAcceptor() {
public NioSocketAcceptor getComponentAcceptor() {
return componentAcceptor;
}
......@@ -784,7 +767,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
return JiveGlobals.getIntProperty("xmpp.server.socket.port", DEFAULT_SERVER_PORT);
}
public SocketAcceptor getMultiplexerSocketAcceptor() {
public NioSocketAcceptor getMultiplexerSocketAcceptor() {
return multiplexerSocketAcceptor;
}
......@@ -823,23 +806,18 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
restartClientSSLListeners();
}
private SocketAcceptor buildSocketAcceptor(String name) {
SocketAcceptor socketAcceptor;
private NioSocketAcceptor buildSocketAcceptor(String name) {
NioSocketAcceptor socketAcceptor;
// Create SocketAcceptor with correct number of processors
int ioThreads = JiveGlobals.getIntProperty("xmpp.processor.count", Runtime.getRuntime().availableProcessors());
// Set the executor that processors will use. Note that processors will use another executor
// for processing events (i.e. incoming traffic)
Executor ioExecutor = new ThreadPoolExecutor(
ioThreads + 1, ioThreads + 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
socketAcceptor = new SocketAcceptor(ioThreads, ioExecutor);
int processorCount = JiveGlobals.getIntProperty("xmpp.processor.count", Runtime.getRuntime().availableProcessors());
socketAcceptor = new NioSocketAcceptor(processorCount);
// Set that it will be possible to bind a socket if there is a connection in the timeout state
SocketAcceptorConfig socketAcceptorConfig = socketAcceptor.getDefaultConfig();
socketAcceptorConfig.setReuseAddress(true);
socketAcceptor.setReuseAddress(true);
// Set the listen backlog (queue) length. Default is 50.
socketAcceptorConfig.setBacklog(JiveGlobals.getIntProperty("xmpp.socket.backlog", 50));
socketAcceptor.setBacklog(JiveGlobals.getIntProperty("xmpp.socket.backlog", 50));
// Set default (low level) settings for new socket connections
SocketSessionConfig socketSessionConfig = socketAcceptorConfig.getSessionConfig();
SocketSessionConfig socketSessionConfig = socketAcceptor.getSessionConfig();
//socketSessionConfig.setKeepAlive();
int receiveBuffer = JiveGlobals.getIntProperty("xmpp.socket.buffer.receive", -1);
if (receiveBuffer > 0 ) {
......@@ -861,15 +839,15 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
return socketAcceptor;
}
private void configureJMX(SocketAcceptor acceptor, String suffix) {
final String prefix = IoServiceManager.class.getPackage().getName();
private void configureJMX(NioSocketAcceptor acceptor, String suffix) {
final String prefix = IoServiceMBean.class.getPackage().getName();
// monitor the IoService
try {
IoServiceManager mbean = new IoServiceManager(acceptor);
IoServiceMBean mbean = new IoServiceMBean(acceptor);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName(prefix + ":type=SocketAcceptor,name=" + suffix);
mbs.registerMBean( mbean, name );
mbean.startCollectingStats(JiveGlobals.getIntProperty("xmpp.socket.jmx.interval", 60000));
// mbean.startCollectingStats(JiveGlobals.getIntProperty("xmpp.socket.jmx.interval", 60000));
} catch (JMException ex) {
Log.warn("Failed to register MINA acceptor mbean (JMX): " + ex);
}
......@@ -878,7 +856,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
acceptor.addListener(new IoServiceListener() {
public void sessionCreated(IoSession session) {
try {
IoSessionManager mbean = new IoSessionManager(session);
IoSessionMBean mbean = new IoSessionMBean(session);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName(prefix + ":type=IoSession,name=" +
session.getRemoteAddress().toString().replace(':', '/'));
......@@ -896,8 +874,9 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
Log.warn("Failed to unregister MINA session mbean (JMX): " + ex);
}
}
public void serviceActivated(IoService is, SocketAddress sa, IoHandler ih, IoServiceConfig isc) { }
public void serviceDeactivated(IoService is, SocketAddress sa, IoHandler ih, IoServiceConfig isc) { }
public void serviceActivated(IoService service) throws Exception { }
public void serviceDeactivated(IoService service) throws Exception { }
public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception { }
});
}
}
......
......@@ -20,16 +20,19 @@
package org.jivesoftware.openfire.plugin;
import org.apache.mina.common.ExecutorThreadModel;
import static org.jivesoftware.openfire.spi.ConnectionManagerImpl.EXECUTOR_FILTER_NAME;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.management.MINAStatCollector;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.spi.ConnectionManagerImpl;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.TaskEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.File;
......@@ -47,6 +50,9 @@ import java.util.concurrent.ThreadPoolExecutor;
* @author Gaston Dombiak
*/
public class StatCollector extends TimerTask {
private static final Logger Log = LoggerFactory.getLogger(StatCollector.class);
private boolean headerPrinter = false;
private List<String> content = new ArrayList<String>();
private SocketAcceptor socketAcceptor;
......@@ -80,13 +86,17 @@ public class StatCollector extends TimerTask {
sb.append(DbConnectionManager.getConnectionProvider().toString());
sb.append(',');
// Add info about the thread pool that process incoming requests
ExecutorThreadModel threadModel = (ExecutorThreadModel) socketAcceptor.getDefaultConfig().getThreadModel();
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadModel.getExecutor();
ExecutorFilter executorFilter = (ExecutorFilter) socketAcceptor.getFilterChain().get(EXECUTOR_FILTER_NAME);
ThreadPoolExecutor executor = (ThreadPoolExecutor) executorFilter.getExecutor();
sb.append(executor.getCorePoolSize());
sb.append(',');
sb.append(executor.getActiveCount());
sb.append(',');
try {
sb.append(executor.getQueue().size());
} catch (UnsupportedOperationException e) {
sb.append(-1);
}
sb.append(',');
sb.append(executor.getCompletedTaskCount());
// Add info about number of connected sessions
......
......@@ -23,7 +23,7 @@
%>
<%@ page import="com.sun.syndication.fetcher.impl.FeedFetcherCache"%>
<%@ page import="com.sun.syndication.fetcher.impl.HashMapFeedInfoCache"%>
<%@ page import="org.apache.mina.transport.socket.nio.SocketAcceptor"%>
<%@ page import="org.apache.mina.transport.socket.nio.NioSocketAcceptor"%>
<%@ page import="org.jivesoftware.admin.AdminConsole"%>
<%@ page import="org.jivesoftware.openfire.*" %>
<%@ page import="org.jivesoftware.openfire.container.AdminConsolePlugin" %>
......@@ -75,9 +75,9 @@
String interfaceName = JiveGlobals.getXMLProperty("network.interface");
ConnectionManagerImpl connectionManager = ((ConnectionManagerImpl) XMPPServer.getInstance().getConnectionManager());
SocketAcceptor socketAcceptor = connectionManager.getSocketAcceptor();
SocketAcceptor sslSocketAcceptor = connectionManager.getSSLSocketAcceptor();
SocketAcceptor multiplexerSocketAcceptor = connectionManager.getMultiplexerSocketAcceptor();
NioSocketAcceptor socketAcceptor = connectionManager.getSocketAcceptor();
NioSocketAcceptor sslSocketAcceptor = connectionManager.getSSLSocketAcceptor();
NioSocketAcceptor multiplexerSocketAcceptor = connectionManager.getMultiplexerSocketAcceptor();
ServerPort serverPort = null;
ServerPort componentPort = null;
AdminConsolePlugin adminConsolePlugin =
......@@ -445,7 +445,7 @@
</thead>
<tbody>
<% if (socketAcceptor != null) {
for (SocketAddress socketAddress : socketAcceptor.getManagedServiceAddresses()) {
for (SocketAddress socketAddress : socketAcceptor.getLocalAddresses()) {
InetSocketAddress address = (InetSocketAddress) socketAddress;
%>
<tr>
......@@ -469,7 +469,7 @@
</tr>
<% } } %>
<% if (sslSocketAcceptor != null) {
for (SocketAddress socketAddress : sslSocketAcceptor.getManagedServiceAddresses()) {
for (SocketAddress socketAddress : sslSocketAcceptor.getLocalAddresses()) {
InetSocketAddress address = (InetSocketAddress) socketAddress;
%>
<tr>
......@@ -506,7 +506,7 @@
</tr>
<% } %>
<% if (multiplexerSocketAcceptor != null) {
for (SocketAddress socketAddress : multiplexerSocketAcceptor.getManagedServiceAddresses()) {
for (SocketAddress socketAddress : multiplexerSocketAcceptor.getLocalAddresses()) {
InetSocketAddress address = (InetSocketAddress) socketAddress;
%>
<tr>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment