Commit 1a0b665a authored by Dave Cridland's avatar Dave Cridland

Merge pull request #220 from guusdk/OF-883

Two additional fixes that relate to OF-883
parents 49a4799f 1b2dd66e
...@@ -109,18 +109,18 @@ public abstract class ConnectionHandler extends IoHandlerAdapter { ...@@ -109,18 +109,18 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
@Override @Override
public void inputClosed( IoSession session ) throws Exception { public void inputClosed( IoSession session ) throws Exception {
// Get the connection for this session final Connection connection = (Connection) session.getAttribute(CONNECTION);
Connection connection = (Connection) session.getAttribute(CONNECTION); if ( connection != null ) {
// Inform the connection that it was closed connection.close();
connection.close(); }
} }
@Override @Override
public void sessionClosed(IoSession session) throws Exception { public void sessionClosed(IoSession session) throws Exception {
// Get the connection for this session final Connection connection = (Connection) session.getAttribute(CONNECTION);
Connection connection = (Connection) session.getAttribute(CONNECTION); if ( connection != null ) {
// Inform the connection that it was closed connection.close();
connection.close(); }
} }
/** /**
...@@ -128,32 +128,33 @@ public abstract class ConnectionHandler extends IoHandlerAdapter { ...@@ -128,32 +128,33 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
* session idle time as specified by {@link #getMaxIdleTime()}. This method * session idle time as specified by {@link #getMaxIdleTime()}. This method
* will be invoked each time that such a period passes (even if no IO has * will be invoked each time that such a period passes (even if no IO has
* occurred in between). * occurred in between).
* *
* Openfire will disconnect a session the second time this method is * Openfire will disconnect a session the second time this method is
* invoked, if no IO has occurred between the first and second invocation. * invoked, if no IO has occurred between the first and second invocation.
* This allows extensions of this class to use the first invocation to check * This allows extensions of this class to use the first invocation to check
* for livelyness of the MINA session (e.g by polling the remote entity, as * for livelyness of the MINA session (e.g by polling the remote entity, as
* {@link ClientConnectionHandler} does). * {@link ClientConnectionHandler} does).
* *
* @see org.apache.mina.common.IoHandlerAdapter#sessionIdle(org.apache.mina.common.IoSession, * @see IoHandlerAdapter#sessionIdle(IoSession, IdleStatus)
* org.apache.mina.common.IdleStatus)
*/ */
@Override @Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception { public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
if (session.getIdleCount(status) > 1) { if (session.getIdleCount(status) > 1) {
// Get the connection for this session // Get the connection for this session
final Connection connection = (Connection) session.getAttribute(CONNECTION); final Connection connection = (Connection) session.getAttribute(CONNECTION);
// Close idle connection if (connection != null) {
if (Log.isDebugEnabled()) { // Close idle connection
Log.debug("ConnectionHandler: Closing connection that has been idle: " + connection); if (Log.isDebugEnabled()) {
} Log.debug("ConnectionHandler: Closing connection that has been idle: " + connection);
connection.close(); }
connection.close();
}
} }
} }
@Override @Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception { public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
Log.warn("Closing session due to exception: " + session, cause); Log.warn("Closing connection due to exception in session: " + session, cause);
try { try {
// OF-524: Determine stream:error message. // OF-524: Determine stream:error message.
...@@ -167,7 +168,10 @@ public abstract class ConnectionHandler extends IoHandlerAdapter { ...@@ -167,7 +168,10 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
final Connection connection = (Connection) session.getAttribute( CONNECTION ); final Connection connection = (Connection) session.getAttribute( CONNECTION );
connection.deliverRawText( error.toXML() ); connection.deliverRawText( error.toXML() );
} finally { } finally {
session.close( false ); final Connection connection = (Connection) session.getAttribute( CONNECTION );
if (connection != null) {
connection.close();
}
} }
} }
...@@ -188,13 +192,16 @@ public abstract class ConnectionHandler extends IoHandlerAdapter { ...@@ -188,13 +192,16 @@ public abstract class ConnectionHandler extends IoHandlerAdapter {
handler.process((String) message, parser); handler.process((String) message, parser);
} catch (Exception e) { } catch (Exception e) {
Log.error("Closing connection due to error while processing message: " + message, e); Log.error("Closing connection due to error while processing message: " + message, e);
Connection connection = (Connection) session.getAttribute(CONNECTION); final Connection connection = (Connection) session.getAttribute(CONNECTION);
connection.close(); if ( connection != null ) {
connection.close();
}
} }
} }
@Override @Override
public void messageSent(IoSession session, Object message) throws Exception { public void messageSent(IoSession session, Object message) throws Exception {
super.messageSent(session, message); super.messageSent(session, message);
// Update counter of written btyes // Update counter of written btyes
updateWrittenBytesCounter(session); updateWrittenBytesCounter(session);
......
...@@ -77,6 +77,8 @@ public class NIOConnection implements Connection { ...@@ -77,6 +77,8 @@ public class NIOConnection implements Connection {
private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class); private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class);
public enum State { RUNNING, CLOSING, CLOSED }
/** /**
* The utf-8 charset for decoding and encoding XMPP packet streams. * The utf-8 charset for decoding and encoding XMPP packet streams.
*/ */
...@@ -109,13 +111,14 @@ public class NIOConnection implements Connection { ...@@ -109,13 +111,14 @@ public class NIOConnection implements Connection {
*/ */
private CompressionPolicy compressionPolicy = CompressionPolicy.disabled; private CompressionPolicy compressionPolicy = CompressionPolicy.disabled;
private static ThreadLocal<CharsetEncoder> encoder = new ThreadLocalEncoder(); private static ThreadLocal<CharsetEncoder> encoder = new ThreadLocalEncoder();
/** /**
* Flag that specifies if the connection should be considered closed. Closing a NIO connection * Flag that specifies if the connection should be considered closed. Closing a NIO connection
* is an asynch operation so instead of waiting for the connection to be actually closed just * is an asynch operation so instead of waiting for the connection to be actually closed just
* keep this flag to avoid using the connection between #close was used and the socket is actually * keep this flag to avoid using the connection between #close was used and the socket is actually
* closed. * closed.
*/ */
private boolean closed; private State state;
/** /**
* Lock used to ensure the integrity of the underlying IoSession (refer to * Lock used to ensure the integrity of the underlying IoSession (refer to
...@@ -131,7 +134,7 @@ public class NIOConnection implements Connection { ...@@ -131,7 +134,7 @@ public class NIOConnection implements Connection {
public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) { public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) {
this.ioSession = session; this.ioSession = session;
this.backupDeliverer = packetDeliverer; this.backupDeliverer = packetDeliverer;
closed = false; state = State.RUNNING;
} }
public boolean validate() { public boolean validate() {
...@@ -216,26 +219,53 @@ public class NIOConnection implements Connection { ...@@ -216,26 +219,53 @@ public class NIOConnection implements Connection {
return backupDeliverer; return backupDeliverer;
} }
public void close() { public void close()
synchronized(this) { {
if (isClosed()) { boolean notifyClose = false;
return; synchronized ( this ) {
} try
try { {
deliverRawText(flashClient ? "</flash:stream>" : "</stream:stream>", true); if ( state == State.CLOSED )
} catch (Exception e) { {
// Ignore return;
}
// This prevents any action after the first invocation of close() on this connection.
if ( state != State.CLOSING )
{
state = State.CLOSING;
try
{
deliverRawText( flashClient ? "</flash:stream>" : "</stream:stream>" );
}
catch ( Exception e )
{
// Ignore
}
}
// deliverRawText might already have forced the state from Closing to Closed. In that case, there's no need
// to invoke the CloseListeners again.
if ( state == State.CLOSING )
{
notifyClose = true;
}
} }
if (session != null) { finally
session.setStatus(Session.STATUS_CLOSED); {
// Ensure that the state of this connection, its session and the MINA context are eventually closed.
state = State.CLOSED;
if ( session != null )
{
session.setStatus( Session.STATUS_CLOSED );
}
ioSession.close( true );
} }
closed = true; }
} if (notifyClose)
{
// OF-881: Notify any close listeners after the synchronized block has completed. notifyCloseListeners(); // clean up session, etc.
notifyCloseListeners(); // clean up session, etc. }
ioSession.close(true); // sync via MINA
} }
public void systemShutdown() { public void systemShutdown() {
...@@ -263,7 +293,7 @@ public class NIOConnection implements Connection { ...@@ -263,7 +293,7 @@ public class NIOConnection implements Connection {
} }
public synchronized boolean isClosed() { public synchronized boolean isClosed() {
return closed; return state == State.CLOSED;
} }
public boolean isSecure() { public boolean isSecure() {
...@@ -324,12 +354,7 @@ public class NIOConnection implements Connection { ...@@ -324,12 +354,7 @@ public class NIOConnection implements Connection {
} }
public void deliverRawText(String text) { public void deliverRawText(String text) {
// Deliver the packet in asynchronous mode if (state != State.CLOSED) {
deliverRawText(text, true);
}
private void deliverRawText(String text, boolean asynchronous) {
if (!isClosed()) {
boolean errorDelivering = false; boolean errorDelivering = false;
IoBuffer buffer = IoBuffer.allocate(text.length()); IoBuffer buffer = IoBuffer.allocate(text.length());
buffer.setAutoExpand(true); buffer.setAutoExpand(true);
...@@ -343,22 +368,12 @@ public class NIOConnection implements Connection { ...@@ -343,22 +368,12 @@ public class NIOConnection implements Connection {
buffer.flip(); buffer.flip();
ioSessionLock.lock(); ioSessionLock.lock();
try { try {
if (asynchronous) { // OF-464: handle dropped connections (no backupDeliverer in this case?)
// OF-464: handle dropped connections (no backupDeliverer in this case?) if (!ioSession.isConnected()) {
if (!ioSession.isConnected()) { throw new IOException("Connection reset/closed by peer");
throw new IOException("Connection reset/closed by peer");
}
ioSession.write(buffer);
}
else {
// Send stanza and wait for ACK (using a 2 seconds default timeout)
boolean ok =
ioSession.write(buffer).awaitUninterruptibly(JiveGlobals.getIntProperty("connection.ack.timeout", 2000));
if (!ok) {
Log.warn("No ACK was received when sending stanza to: " + this.toString());
}
} }
} ioSession.write(buffer);
}
finally { finally {
ioSessionLock.unlock(); ioSessionLock.unlock();
} }
...@@ -368,8 +383,8 @@ public class NIOConnection implements Connection { ...@@ -368,8 +383,8 @@ public class NIOConnection implements Connection {
errorDelivering = true; errorDelivering = true;
} }
// Close the connection if delivering text fails and we are already not closing the connection // Attempt to close the connection if delivering text fails.
if (errorDelivering && asynchronous) { if (errorDelivering) {
close(); close();
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment