Commit fbe74c8e authored by daryl herzmann's avatar daryl herzmann

Merge pull request #374 from tevans/OF-881

OF-881: Simplify connection close semantics
parents 9545836b aa7a5ef6
...@@ -20,20 +20,21 @@ ...@@ -20,20 +20,21 @@
package org.jivesoftware.openfire; package org.jivesoftware.openfire;
import java.io.Closeable;
import java.net.UnknownHostException;
import java.security.cert.Certificate;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.session.LocalSession; import org.jivesoftware.openfire.session.LocalSession;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import java.net.UnknownHostException;
import java.security.cert.Certificate;
/** /**
* Represents a connection on the server. * Represents a connection on the server.
* *
* @author Iain Shigeoka * @author Iain Shigeoka
*/ */
public interface Connection { public interface Connection extends Closeable {
/** /**
* Verifies that the connection is still live. Typically this is done by * Verifies that the connection is still live. Typically this is done by
* sending a whitespace character between packets. * sending a whitespace character between packets.
...@@ -144,29 +145,13 @@ public interface Connection { ...@@ -144,29 +145,13 @@ public interface Connection {
* <li>Call notifyEvent all listeners that the channel is shutting down. * <li>Call notifyEvent all listeners that the channel is shutting down.
* <li>Close the socket. * <li>Close the socket.
* </ul> * </ul>
* * Note this method overrides the base interface to suppress exceptions. However,
* An invocation of this method is equal to invoking {@link #close(boolean)} with a parameter * it otherwise fulfills the requirements of the {@link Closeable#close()} contract
* that is false. * (idempotent, try-with-resources, etc.)
*/ */
@Override
public void close(); public void close();
/**
* Close this session including associated socket connection. The order of
* events for closing the session is:
* <ul>
* <li>Set closing flag to prevent redundant shutdowns.
* <li>Call notifyEvent all listeners that the channel is shutting down.
* <li>Close the socket.
* </ul>
*
* This method takes into account the connection state of the peer. Specifically,
* when the peer is known to be in a disconnected state, no data will be sent
* (otherwise, this method can trigger the delivery of an end-of-stream signal).
*
* @param peerIsKnownToBeDisconnected should be set to true when the peer is known to no longer be available.
*/
public void close( boolean peerIsKnownToBeDisconnected );
/** /**
* Notification message indicating that the server is being shutdown. Implementors * Notification message indicating that the server is being shutdown. Implementors
* should send a stream error whose condition is system-shutdown before closing * should send a stream error whose condition is system-shutdown before closing
...@@ -455,4 +440,10 @@ public interface Connection { ...@@ -455,4 +440,10 @@ public interface Connection {
*/ */
needed needed
} }
/**
* Used to specify operational status for the corresponding connection
*/
enum State { OPEN, CLOSED }
} }
...@@ -35,6 +35,7 @@ import java.util.HashMap; ...@@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLPeerUnverifiedException;
...@@ -88,6 +89,7 @@ public class SocketConnection implements Connection { ...@@ -88,6 +89,7 @@ public class SocketConnection implements Connection {
private Writer writer; private Writer writer;
private AtomicBoolean writing = new AtomicBoolean(false); private AtomicBoolean writing = new AtomicBoolean(false);
private AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
/** /**
* Deliverer to use when the connection is closed or was closed when delivering * Deliverer to use when the connection is closed or was closed when delivering
...@@ -307,10 +309,7 @@ public class SocketConnection implements Connection { ...@@ -307,10 +309,7 @@ public class SocketConnection implements Connection {
@Override @Override
public boolean isClosed() { public boolean isClosed() {
if (session == null) { return state.get() == State.CLOSED;
return socket.isClosed();
}
return session.getStatus() == Session.STATUS_CLOSED;
} }
@Override @Override
...@@ -468,55 +467,50 @@ public class SocketConnection implements Connection { ...@@ -468,55 +467,50 @@ public class SocketConnection implements Connection {
@Override @Override
public void close() { public void close() {
close( false ); close(false);
} }
@Override /**
public void close( boolean peerIsKnownToBeDisconnected ) { * Normal connection close will attempt to write the stream end tag. Otherwise this method
boolean wasClosed = false; * forces the connection closed immediately. This method will be called from {@link SocketSendingTracker}
synchronized (this) { * when sending data over the socket has taken a long time and we need to close the socket, discard
if (!isClosed()) { * the connection and its session.
try { */
if (session != null) { private void close(boolean force) {
session.setStatus(Session.STATUS_CLOSED); if (state.compareAndSet(State.OPEN, State.CLOSED)) {
}
if (session != null) {
if ( !peerIsKnownToBeDisconnected ) { session.setStatus(Session.STATUS_CLOSED);
boolean allowedToWrite = false;
try {
requestWriting();
allowedToWrite = true;
// Register that we started sending data on the connection
writeStarted();
writer.write("</stream:stream>");
if (flashClient) {
writer.write('\0');
}
writer.flush();
}
catch (IOException e) {
// Do nothing
}
finally {
// Register that we finished sending data on the connection
writeFinished();
if (allowedToWrite) {
releaseWriting();
}
}
}
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.close")
+ "\n" + this.toString(), e);
}
closeConnection();
wasClosed = true;
} }
}
if (wasClosed) { if (!force) {
boolean allowedToWrite = false;
try {
requestWriting();
allowedToWrite = true;
// Register that we started sending data on the connection
writeStarted();
writer.write("</stream:stream>");
if (flashClient) {
writer.write('\0');
}
writer.flush();
}
catch (Exception e) {
Log.error("Failed to deliver stream close tag: " + e.getMessage());
}
// Register that we finished sending data on the connection
writeFinished();
if (allowedToWrite) {
releaseWriting();
}
}
closeConnection();
notifyCloseListeners(); notifyCloseListeners();
}
}
} }
@Override @Override
...@@ -552,7 +546,7 @@ public class SocketConnection implements Connection { ...@@ -552,7 +546,7 @@ public class SocketConnection implements Connection {
Log.debug("Closing connection: " + this + " that started sending data at: " + Log.debug("Closing connection: " + this + " that started sending data at: " +
new Date(writeTimestamp)); new Date(writeTimestamp));
} }
forceClose(); close(true); // force
return true; return true;
} }
else { else {
...@@ -565,7 +559,7 @@ public class SocketConnection implements Connection { ...@@ -565,7 +559,7 @@ public class SocketConnection implements Connection {
if (Log.isDebugEnabled()) { if (Log.isDebugEnabled()) {
Log.debug("Closing connection that has been idle: " + this); Log.debug("Closing connection that has been idle: " + this);
} }
forceClose(); close(true); // force
return true; return true;
} }
} }
...@@ -577,24 +571,6 @@ public class SocketConnection implements Connection { ...@@ -577,24 +571,6 @@ public class SocketConnection implements Connection {
instances.remove(this); instances.remove(this);
} }
/**
* Forces the connection to be closed immediately no matter if closing the socket takes
* a long time. This method should only be called from {@link SocketSendingTracker} when
* sending data over the socket has taken a long time and we need to close the socket, discard
* the connection and its session.
*/
private void forceClose() {
if (session != null) {
// Set that the session is closed. This will prevent threads from trying to
// deliver packets to this session thus preventing future locks.
session.setStatus(Session.STATUS_CLOSED);
}
closeConnection();
// Notify the close listeners so that the SessionManager can send unavailable
// presences if required.
notifyCloseListeners();
}
private void closeConnection() { private void closeConnection() {
release(); release();
try { try {
......
...@@ -23,6 +23,7 @@ package org.jivesoftware.openfire.net; ...@@ -23,6 +23,7 @@ package org.jivesoftware.openfire.net;
import java.security.cert.Certificate; import java.security.cert.Certificate;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.ConnectionCloseListener; import org.jivesoftware.openfire.ConnectionCloseListener;
...@@ -52,7 +53,7 @@ public abstract class VirtualConnection implements Connection { ...@@ -52,7 +53,7 @@ public abstract class VirtualConnection implements Connection {
final private Map<ConnectionCloseListener, Object> listeners = final private Map<ConnectionCloseListener, Object> listeners =
new HashMap<>(); new HashMap<>();
private boolean closed = false; private AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
@Override @Override
public String getLanguage() { public String getLanguage() {
...@@ -95,10 +96,7 @@ public abstract class VirtualConnection implements Connection { ...@@ -95,10 +96,7 @@ public abstract class VirtualConnection implements Connection {
@Override @Override
public boolean isClosed() { public boolean isClosed() {
if (session == null) { return state.get() == State.CLOSED;
return closed;
}
return session.getStatus() == Session.STATUS_CLOSED;
} }
@Override @Override
...@@ -194,30 +192,20 @@ public abstract class VirtualConnection implements Connection { ...@@ -194,30 +192,20 @@ public abstract class VirtualConnection implements Connection {
*/ */
@Override @Override
public void close() { public void close() {
close( false ); if (state.compareAndSet(State.OPEN, State.CLOSED)) {
}
if (session != null) {
@Override session.setStatus(Session.STATUS_CLOSED);
public void close(boolean peerIsKnownToBeDisconnected) {
boolean wasClosed = false;
synchronized (this) {
if (!isClosed()) {
try {
if (session != null) {
session.setStatus(Session.STATUS_CLOSED);
}
closeVirtualConnection();
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.close")
+ "\n" + this.toString(), e);
}
closed = true;
wasClosed = true;
} }
}
if (wasClosed) { try {
closeVirtualConnection();
} catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.close") + "\n" + toString(), e);
}
notifyCloseListeners(); notifyCloseListeners();
} }
} }
......
...@@ -33,6 +33,7 @@ import java.nio.charset.CodingErrorAction; ...@@ -33,6 +33,7 @@ import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.security.KeyStore; import java.security.KeyStore;
import java.security.cert.Certificate; import java.security.cert.Certificate;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
...@@ -53,7 +54,9 @@ import org.jivesoftware.openfire.auth.UnauthorizedException; ...@@ -53,7 +54,9 @@ import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.keystore.IdentityStoreConfig; import org.jivesoftware.openfire.keystore.IdentityStoreConfig;
import org.jivesoftware.openfire.keystore.Purpose; import org.jivesoftware.openfire.keystore.Purpose;
import org.jivesoftware.openfire.keystore.TrustStoreConfig; import org.jivesoftware.openfire.keystore.TrustStoreConfig;
import org.jivesoftware.openfire.net.*; import org.jivesoftware.openfire.net.ClientTrustManager;
import org.jivesoftware.openfire.net.SSLConfig;
import org.jivesoftware.openfire.net.ServerTrustManager;
import org.jivesoftware.openfire.session.ConnectionSettings; import org.jivesoftware.openfire.session.ConnectionSettings;
import org.jivesoftware.openfire.session.LocalSession; import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
...@@ -75,8 +78,6 @@ public class NIOConnection implements Connection { ...@@ -75,8 +78,6 @@ public class NIOConnection implements Connection {
private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class); private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class);
public enum State { RUNNING, CLOSING, CLOSED }
private LocalSession session; private LocalSession session;
private IoSession ioSession; private IoSession ioSession;
...@@ -111,7 +112,7 @@ public class NIOConnection implements Connection { ...@@ -111,7 +112,7 @@ public class NIOConnection implements Connection {
* keep this flag to avoid using the connection between #close was used and the socket is actually * keep this flag to avoid using the connection between #close was used and the socket is actually
* closed. * closed.
*/ */
private volatile State state; private AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
/** /**
* Lock used to ensure the integrity of the underlying IoSession (refer to * Lock used to ensure the integrity of the underlying IoSession (refer to
...@@ -127,7 +128,6 @@ public class NIOConnection implements Connection { ...@@ -127,7 +128,6 @@ public class NIOConnection implements Connection {
public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) { public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) {
this.ioSession = session; this.ioSession = session;
this.backupDeliverer = packetDeliverer; this.backupDeliverer = packetDeliverer;
state = State.RUNNING;
} }
@Override @Override
...@@ -228,60 +228,29 @@ public class NIOConnection implements Connection { ...@@ -228,60 +228,29 @@ public class NIOConnection implements Connection {
@Override @Override
public void close() { public void close() {
close( false ); if (state.compareAndSet(State.OPEN, State.CLOSED)) {
}
@Override // Ensure that the state of this connection, its session and the MINA context are eventually closed.
public void close( boolean peerIsKnownToBeDisconnected )
{
boolean notifyClose = false;
synchronized ( this ) {
try
{
if ( state == State.CLOSED )
{
return;
}
// This prevents any action after the first invocation of close() on this connection. if ( session != null ) {
if ( state != State.CLOSING ) session.setStatus( Session.STATUS_CLOSED );
{ }
state = State.CLOSING;
if ( !peerIsKnownToBeDisconnected )
{
try
{
deliverRawText( flashClient ? "</flash:stream>" : "</stream:stream>" );
}
catch ( Exception e )
{
// Ignore
}
}
}
// deliverRawText might already have forced the state from Closing to Closed. In that case, there's no need try {
// to invoke the CloseListeners again. deliverRawText( flashClient ? "</flash:stream>" : "</stream:stream>" );
if ( state == State.CLOSING ) } catch ( Exception e ) {
{ Log.error("Failed to deliver stream close tag: " + e.getMessage());
notifyClose = true;
}
} }
finally
{ try {
// Ensure that the state of this connection, its session and the MINA context are eventually closed. ioSession.close( true );
state = State.CLOSED; } catch (Exception e) {
if ( session != null ) Log.error("Exception while closing MINA session", e);
{
session.setStatus( Session.STATUS_CLOSED );
}
ioSession.close( true );
} }
}
if (notifyClose)
{
notifyCloseListeners(); // clean up session, etc. notifyCloseListeners(); // clean up session, etc.
}
}
} }
@Override @Override
...@@ -312,7 +281,7 @@ public class NIOConnection implements Connection { ...@@ -312,7 +281,7 @@ public class NIOConnection implements Connection {
@Override @Override
public boolean isClosed() { public boolean isClosed() {
return state == State.CLOSED; return state.get() == State.CLOSED;
} }
@Override @Override
...@@ -322,7 +291,7 @@ public class NIOConnection implements Connection { ...@@ -322,7 +291,7 @@ public class NIOConnection implements Connection {
@Override @Override
public void deliver(Packet packet) throws UnauthorizedException { public void deliver(Packet packet) throws UnauthorizedException {
if (state != State.RUNNING) { if (isClosed()) {
backupDeliverer.deliver(packet); backupDeliverer.deliver(packet);
} }
else { else {
...@@ -368,7 +337,7 @@ public class NIOConnection implements Connection { ...@@ -368,7 +337,7 @@ public class NIOConnection implements Connection {
@Override @Override
public void deliverRawText(String text) { public void deliverRawText(String text) {
if (state != State.CLOSED) { if (!isClosed()) {
boolean errorDelivering = false; boolean errorDelivering = false;
IoBuffer buffer = IoBuffer.allocate(text.length()); IoBuffer buffer = IoBuffer.allocate(text.length());
buffer.setAutoExpand(true); buffer.setAutoExpand(true);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment