Commit 0cd1edef authored by Guus der Kinderen's avatar Guus der Kinderen Committed by Dave Cridland

OF-883: Prevent multiple closes of the same connection.

When a connection is closed, several events can be involved (and can be triggered).
Some of these events will attempt to close the connection (if it hadn't been already).
This, at best, least to multiple invocations of close-listeners. At worst, a loop of
close-calls is created.

This commit replaces the two-way boolean that guards closure (isClosed) with a
three-way guard. Now, a distinction is made between between a connection that is
closed, and one that is closing.

Additionally, some null pointer guards have been added, as I've seen those pop up in
my local logs during development.
parent d415e63f
...@@ -109,19 +109,19 @@ public abstract class ConnectionHandler extends IoHandlerAdapter { ...@@ -109,19 +109,19 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
@Override @Override
public void inputClosed( IoSession session ) throws Exception { public void inputClosed( IoSession session ) throws Exception {
// Get the connection for this session final Connection connection = (Connection) session.getAttribute(CONNECTION);
Connection connection = (Connection) session.getAttribute(CONNECTION); if ( connection != null ) {
// Inform the connection that it was closed
connection.close(); connection.close();
} }
}
@Override @Override
public void sessionClosed(IoSession session) throws Exception { public void sessionClosed(IoSession session) throws Exception {
// Get the connection for this session final Connection connection = (Connection) session.getAttribute(CONNECTION);
Connection connection = (Connection) session.getAttribute(CONNECTION); if ( connection != null ) {
// Inform the connection that it was closed
connection.close(); connection.close();
} }
}
/** /**
* Invoked when a MINA session has been idle for half of the allowed XMPP * Invoked when a MINA session has been idle for half of the allowed XMPP
...@@ -135,14 +135,14 @@ public abstract class ConnectionHandler extends IoHandlerAdapter { ...@@ -135,14 +135,14 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
* for livelyness of the MINA session (e.g by polling the remote entity, as * for livelyness of the MINA session (e.g by polling the remote entity, as
* {@link ClientConnectionHandler} does). * {@link ClientConnectionHandler} does).
* *
* @see org.apache.mina.common.IoHandlerAdapter#sessionIdle(org.apache.mina.common.IoSession, * @see IoHandlerAdapter#sessionIdle(IoSession, IdleStatus)
* org.apache.mina.common.IdleStatus)
*/ */
@Override @Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception { public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
if (session.getIdleCount(status) > 1) { if (session.getIdleCount(status) > 1) {
// Get the connection for this session // Get the connection for this session
final Connection connection = (Connection) session.getAttribute(CONNECTION); final Connection connection = (Connection) session.getAttribute(CONNECTION);
if (connection != null) {
// Close idle connection // Close idle connection
if (Log.isDebugEnabled()) { if (Log.isDebugEnabled()) {
Log.debug("ConnectionHandler: Closing connection that has been idle: " + connection); Log.debug("ConnectionHandler: Closing connection that has been idle: " + connection);
...@@ -150,6 +150,7 @@ public abstract class ConnectionHandler extends IoHandlerAdapter { ...@@ -150,6 +150,7 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
connection.close(); connection.close();
} }
} }
}
@Override @Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception { public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
...@@ -191,9 +192,12 @@ public abstract class ConnectionHandler extends IoHandlerAdapter { ...@@ -191,9 +192,12 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
handler.process((String) message, parser); handler.process((String) message, parser);
} catch (Exception e) { } catch (Exception e) {
Log.error("Closing connection due to error while processing message: " + message, e); Log.error("Closing connection due to error while processing message: " + message, e);
Connection connection = (Connection) session.getAttribute(CONNECTION); final Connection connection = (Connection) session.getAttribute(CONNECTION);
if ( connection != null ) {
connection.close(); connection.close();
} }
}
} }
@Override @Override
......
...@@ -75,6 +75,8 @@ public class NIOConnection implements Connection { ...@@ -75,6 +75,8 @@ public class NIOConnection implements Connection {
private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class); private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class);
public enum State { RUNNING, CLOSING, CLOSED }
/** /**
* The utf-8 charset for decoding and encoding XMPP packet streams. * The utf-8 charset for decoding and encoding XMPP packet streams.
*/ */
...@@ -107,13 +109,14 @@ public class NIOConnection implements Connection { ...@@ -107,13 +109,14 @@ public class NIOConnection implements Connection {
*/ */
private CompressionPolicy compressionPolicy = CompressionPolicy.disabled; private CompressionPolicy compressionPolicy = CompressionPolicy.disabled;
private static ThreadLocal<CharsetEncoder> encoder = new ThreadLocalEncoder(); private static ThreadLocal<CharsetEncoder> encoder = new ThreadLocalEncoder();
/** /**
* Flag that specifies if the connection should be considered closed. Closing a NIO connection * Flag that specifies if the connection should be considered closed. Closing a NIO connection
* is an asynch operation so instead of waiting for the connection to be actually closed just * is an asynch operation so instead of waiting for the connection to be actually closed just
* keep this flag to avoid using the connection between #close was used and the socket is actually * keep this flag to avoid using the connection between #close was used and the socket is actually
* closed. * closed.
*/ */
private boolean closed; private State state;
/** /**
* Lock used to ensure the integrity of the underlying IoSession (refer to * Lock used to ensure the integrity of the underlying IoSession (refer to
...@@ -129,7 +132,7 @@ public class NIOConnection implements Connection { ...@@ -129,7 +132,7 @@ public class NIOConnection implements Connection {
public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) { public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) {
this.ioSession = session; this.ioSession = session;
this.backupDeliverer = packetDeliverer; this.backupDeliverer = packetDeliverer;
closed = false; state = State.RUNNING;
} }
public boolean validate() { public boolean validate() {
...@@ -205,26 +208,47 @@ public class NIOConnection implements Connection { ...@@ -205,26 +208,47 @@ public class NIOConnection implements Connection {
return backupDeliverer; return backupDeliverer;
} }
public void close() { public synchronized void close()
synchronized(this) { {
if (isClosed()) { try
{
if ( state == State.CLOSED )
{
return; return;
} }
try {
deliverRawText(flashClient ? "</flash:stream>" : "</stream:stream>", true); // This prevents any action after the first invocation of close() on this connection.
} catch (Exception e) { if ( state != State.CLOSING )
// Ignore {
state = State.CLOSING;
try
{
deliverRawText( flashClient ? "</flash:stream>" : "</stream:stream>" );
} }
if (session != null) { catch ( Exception e )
session.setStatus(Session.STATUS_CLOSED); {
// Ignore
} }
closed = true;
} }
// OF-881: Notify any close listeners after the synchronized block has completed. // deliverRawText might already have forced the state from Closing to Closed. In that case, there's no need
// to invoke the CloseListeners again.
if ( state == State.CLOSING )
{
// TODO Check for regression of OF-881 (which placed the call below outside of the synchronized block).
notifyCloseListeners(); // clean up session, etc. notifyCloseListeners(); // clean up session, etc.
}
ioSession.close(true); // sync via MINA }
finally
{
// Ensure that the state of this connection, its session and the MINA context are eventually closed.
state = State.CLOSED;
if ( session != null )
{
session.setStatus( Session.STATUS_CLOSED );
}
ioSession.close( true );
}
} }
public void systemShutdown() { public void systemShutdown() {
...@@ -252,7 +276,7 @@ public class NIOConnection implements Connection { ...@@ -252,7 +276,7 @@ public class NIOConnection implements Connection {
} }
public synchronized boolean isClosed() { public synchronized boolean isClosed() {
return closed; return state == State.CLOSED;
} }
public boolean isSecure() { public boolean isSecure() {
...@@ -313,12 +337,7 @@ public class NIOConnection implements Connection { ...@@ -313,12 +337,7 @@ public class NIOConnection implements Connection {
} }
public void deliverRawText(String text) { public void deliverRawText(String text) {
// Deliver the packet in asynchronous mode if (state != State.CLOSED) {
deliverRawText(text, true);
}
private void deliverRawText(String text, boolean asynchronous) {
if (!isClosed()) {
boolean errorDelivering = false; boolean errorDelivering = false;
IoBuffer buffer = IoBuffer.allocate(text.length()); IoBuffer buffer = IoBuffer.allocate(text.length());
buffer.setAutoExpand(true); buffer.setAutoExpand(true);
...@@ -332,22 +351,12 @@ public class NIOConnection implements Connection { ...@@ -332,22 +351,12 @@ public class NIOConnection implements Connection {
buffer.flip(); buffer.flip();
ioSessionLock.lock(); ioSessionLock.lock();
try { try {
if (asynchronous) {
// OF-464: handle dropped connections (no backupDeliverer in this case?) // OF-464: handle dropped connections (no backupDeliverer in this case?)
if (!ioSession.isConnected()) { if (!ioSession.isConnected()) {
throw new IOException("Connection reset/closed by peer"); throw new IOException("Connection reset/closed by peer");
} }
ioSession.write(buffer); ioSession.write(buffer);
} }
else {
// Send stanza and wait for ACK (using a 2 seconds default timeout)
boolean ok =
ioSession.write(buffer).awaitUninterruptibly(JiveGlobals.getIntProperty("connection.ack.timeout", 2000));
if (!ok) {
Log.warn("No ACK was received when sending stanza to: " + this.toString());
}
}
}
finally { finally {
ioSessionLock.unlock(); ioSessionLock.unlock();
} }
...@@ -357,8 +366,8 @@ public class NIOConnection implements Connection { ...@@ -357,8 +366,8 @@ public class NIOConnection implements Connection {
errorDelivering = true; errorDelivering = true;
} }
// Close the connection if delivering text fails and we are already not closing the connection // Attempt to close the connection if delivering text fails.
if (errorDelivering && asynchronous) { if (errorDelivering) {
close(); close();
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment