Commit c51067b8 authored by Guus der Kinderen's avatar Guus der Kinderen

OF-883: Prevent multiple closes of the same connection.

When a connection is closed, several events can be involved (and can be triggered).
Some of these events will attempt to close the connection (if it hadn't been already).
This, at best, least to multiple invocations of close-listeners. At worst, a loop of
close-calls is created.

This commit replaces the two-way boolean that guards closure (isClosed) with a
three-way guard. Now, a distinction is made between between a connection that is
closed, and one that is closing.

Additionally, some null pointer guards have been added, as I've seen those pop up in
my local logs during development.
parent 83e51522
...@@ -109,18 +109,18 @@ public abstract class ConnectionHandler extends IoHandlerAdapter { ...@@ -109,18 +109,18 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
@Override @Override
public void inputClosed( IoSession session ) throws Exception { public void inputClosed( IoSession session ) throws Exception {
// Get the connection for this session final Connection connection = (Connection) session.getAttribute(CONNECTION);
Connection connection = (Connection) session.getAttribute(CONNECTION); if ( connection != null ) {
// Inform the connection that it was closed connection.close();
connection.close(); }
} }
@Override @Override
public void sessionClosed(IoSession session) throws Exception { public void sessionClosed(IoSession session) throws Exception {
// Get the connection for this session final Connection connection = (Connection) session.getAttribute(CONNECTION);
Connection connection = (Connection) session.getAttribute(CONNECTION); if ( connection != null ) {
// Inform the connection that it was closed connection.close();
connection.close(); }
} }
/** /**
...@@ -128,31 +128,32 @@ public abstract class ConnectionHandler extends IoHandlerAdapter { ...@@ -128,31 +128,32 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
* session idle time as specified by {@link #getMaxIdleTime()}. This method * session idle time as specified by {@link #getMaxIdleTime()}. This method
* will be invoked each time that such a period passes (even if no IO has * will be invoked each time that such a period passes (even if no IO has
* occurred in between). * occurred in between).
* *
* Openfire will disconnect a session the second time this method is * Openfire will disconnect a session the second time this method is
* invoked, if no IO has occurred between the first and second invocation. * invoked, if no IO has occurred between the first and second invocation.
* This allows extensions of this class to use the first invocation to check * This allows extensions of this class to use the first invocation to check
* for livelyness of the MINA session (e.g by polling the remote entity, as * for livelyness of the MINA session (e.g by polling the remote entity, as
* {@link ClientConnectionHandler} does). * {@link ClientConnectionHandler} does).
* *
* @see org.apache.mina.common.IoHandlerAdapter#sessionIdle(org.apache.mina.common.IoSession, * @see IoHandlerAdapter#sessionIdle(IoSession, IdleStatus)
* org.apache.mina.common.IdleStatus)
*/ */
@Override @Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception { public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
if (session.getIdleCount(status) > 1) { if (session.getIdleCount(status) > 1) {
// Get the connection for this session // Get the connection for this session
final Connection connection = (Connection) session.getAttribute(CONNECTION); final Connection connection = (Connection) session.getAttribute(CONNECTION);
// Close idle connection if (connection != null) {
if (Log.isDebugEnabled()) { // Close idle connection
Log.debug("ConnectionHandler: Closing connection that has been idle: " + connection); if (Log.isDebugEnabled()) {
} Log.debug("ConnectionHandler: Closing connection that has been idle: " + connection);
connection.close(); }
connection.close();
}
} }
} }
@Override @Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception { public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
Log.warn("Closing connection due to exception in session: " + session, cause); Log.warn("Closing connection due to exception in session: " + session, cause);
try { try {
...@@ -191,13 +192,16 @@ public abstract class ConnectionHandler extends IoHandlerAdapter { ...@@ -191,13 +192,16 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
handler.process((String) message, parser); handler.process((String) message, parser);
} catch (Exception e) { } catch (Exception e) {
Log.error("Closing connection due to error while processing message: " + message, e); Log.error("Closing connection due to error while processing message: " + message, e);
Connection connection = (Connection) session.getAttribute(CONNECTION); final Connection connection = (Connection) session.getAttribute(CONNECTION);
connection.close(); if ( connection != null ) {
connection.close();
}
} }
} }
@Override @Override
public void messageSent(IoSession session, Object message) throws Exception { public void messageSent(IoSession session, Object message) throws Exception {
super.messageSent(session, message); super.messageSent(session, message);
// Update counter of written btyes // Update counter of written btyes
updateWrittenBytesCounter(session); updateWrittenBytesCounter(session);
......
...@@ -77,6 +77,8 @@ public class NIOConnection implements Connection { ...@@ -77,6 +77,8 @@ public class NIOConnection implements Connection {
private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class); private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class);
public enum State { RUNNING, CLOSING, CLOSED }
/** /**
* The utf-8 charset for decoding and encoding XMPP packet streams. * The utf-8 charset for decoding and encoding XMPP packet streams.
*/ */
...@@ -109,13 +111,14 @@ public class NIOConnection implements Connection { ...@@ -109,13 +111,14 @@ public class NIOConnection implements Connection {
*/ */
private CompressionPolicy compressionPolicy = CompressionPolicy.disabled; private CompressionPolicy compressionPolicy = CompressionPolicy.disabled;
private static ThreadLocal<CharsetEncoder> encoder = new ThreadLocalEncoder(); private static ThreadLocal<CharsetEncoder> encoder = new ThreadLocalEncoder();
/** /**
* Flag that specifies if the connection should be considered closed. Closing a NIO connection * Flag that specifies if the connection should be considered closed. Closing a NIO connection
* is an asynch operation so instead of waiting for the connection to be actually closed just * is an asynch operation so instead of waiting for the connection to be actually closed just
* keep this flag to avoid using the connection between #close was used and the socket is actually * keep this flag to avoid using the connection between #close was used and the socket is actually
* closed. * closed.
*/ */
private boolean closed; private State state;
/** /**
* Lock used to ensure the integrity of the underlying IoSession (refer to * Lock used to ensure the integrity of the underlying IoSession (refer to
...@@ -131,7 +134,7 @@ public class NIOConnection implements Connection { ...@@ -131,7 +134,7 @@ public class NIOConnection implements Connection {
public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) { public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) {
this.ioSession = session; this.ioSession = session;
this.backupDeliverer = packetDeliverer; this.backupDeliverer = packetDeliverer;
closed = false; state = State.RUNNING;
} }
public boolean validate() { public boolean validate() {
...@@ -216,26 +219,47 @@ public class NIOConnection implements Connection { ...@@ -216,26 +219,47 @@ public class NIOConnection implements Connection {
return backupDeliverer; return backupDeliverer;
} }
public void close() { public synchronized void close()
synchronized(this) { {
if (isClosed()) { try
return; {
} if ( state == State.CLOSED )
try { {
deliverRawText(flashClient ? "</flash:stream>" : "</stream:stream>", true); return;
} catch (Exception e) { }
// Ignore
// This prevents any action after the first invocation of close() on this connection.
if ( state != State.CLOSING )
{
state = State.CLOSING;
try
{
deliverRawText( flashClient ? "</flash:stream>" : "</stream:stream>" );
}
catch ( Exception e )
{
// Ignore
}
}
// deliverRawText might already have forced the state from Closing to Closed. In that case, there's no need
// to invoke the CloseListeners again.
if ( state == State.CLOSING )
{
// TODO Check for regression of OF-881 (which placed the call below outside of the synchronized block).
notifyCloseListeners(); // clean up session, etc.
} }
if (session != null) { }
session.setStatus(Session.STATUS_CLOSED); finally
{
// Ensure that the state of this connection, its session and the MINA context are eventually closed.
state = State.CLOSED;
if ( session != null )
{
session.setStatus( Session.STATUS_CLOSED );
} }
closed = true; ioSession.close( true );
} }
// OF-881: Notify any close listeners after the synchronized block has completed.
notifyCloseListeners(); // clean up session, etc.
ioSession.close(true); // sync via MINA
} }
public void systemShutdown() { public void systemShutdown() {
...@@ -263,7 +287,7 @@ public class NIOConnection implements Connection { ...@@ -263,7 +287,7 @@ public class NIOConnection implements Connection {
} }
public synchronized boolean isClosed() { public synchronized boolean isClosed() {
return closed; return state == State.CLOSED;
} }
public boolean isSecure() { public boolean isSecure() {
...@@ -324,12 +348,7 @@ public class NIOConnection implements Connection { ...@@ -324,12 +348,7 @@ public class NIOConnection implements Connection {
} }
public void deliverRawText(String text) { public void deliverRawText(String text) {
// Deliver the packet in asynchronous mode if (state != State.CLOSED) {
deliverRawText(text, true);
}
private void deliverRawText(String text, boolean asynchronous) {
if (!isClosed()) {
boolean errorDelivering = false; boolean errorDelivering = false;
IoBuffer buffer = IoBuffer.allocate(text.length()); IoBuffer buffer = IoBuffer.allocate(text.length());
buffer.setAutoExpand(true); buffer.setAutoExpand(true);
...@@ -343,22 +362,12 @@ public class NIOConnection implements Connection { ...@@ -343,22 +362,12 @@ public class NIOConnection implements Connection {
buffer.flip(); buffer.flip();
ioSessionLock.lock(); ioSessionLock.lock();
try { try {
if (asynchronous) { // OF-464: handle dropped connections (no backupDeliverer in this case?)
// OF-464: handle dropped connections (no backupDeliverer in this case?) if (!ioSession.isConnected()) {
if (!ioSession.isConnected()) { throw new IOException("Connection reset/closed by peer");
throw new IOException("Connection reset/closed by peer");
}
ioSession.write(buffer);
} }
else { ioSession.write(buffer);
// Send stanza and wait for ACK (using a 2 seconds default timeout) }
boolean ok =
ioSession.write(buffer).awaitUninterruptibly(JiveGlobals.getIntProperty("connection.ack.timeout", 2000));
if (!ok) {
Log.warn("No ACK was received when sending stanza to: " + this.toString());
}
}
}
finally { finally {
ioSessionLock.unlock(); ioSessionLock.unlock();
} }
...@@ -368,8 +377,8 @@ public class NIOConnection implements Connection { ...@@ -368,8 +377,8 @@ public class NIOConnection implements Connection {
errorDelivering = true; errorDelivering = true;
} }
// Close the connection if delivering text fails and we are already not closing the connection // Attempt to close the connection if delivering text fails.
if (errorDelivering && asynchronous) { if (errorDelivering) {
close(); close();
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment