Commit 3c89a459 authored by akrherz's avatar akrherz

OF-881 Simplify Connection close logic

patch by tevans, applied to master, backported to 3.10 by tevans
parent 1cdea3fd
...@@ -20,19 +20,20 @@ ...@@ -20,19 +20,20 @@
package org.jivesoftware.openfire; package org.jivesoftware.openfire;
import java.io.Closeable;
import java.net.UnknownHostException;
import java.security.cert.Certificate;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.session.LocalSession; import org.jivesoftware.openfire.session.LocalSession;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import java.net.UnknownHostException;
import java.security.cert.Certificate;
/** /**
* Represents a connection on the server. * Represents a connection on the server.
* *
* @author Iain Shigeoka * @author Iain Shigeoka
*/ */
public interface Connection { public interface Connection extends Closeable {
/** /**
* Verifies that the connection is still live. Typically this is done by * Verifies that the connection is still live. Typically this is done by
...@@ -144,29 +145,12 @@ public interface Connection { ...@@ -144,29 +145,12 @@ public interface Connection {
* <li>Call notifyEvent all listeners that the channel is shutting down. * <li>Call notifyEvent all listeners that the channel is shutting down.
* <li>Close the socket. * <li>Close the socket.
* </ul> * </ul>
* * Note this method overrides the base interface to suppress exceptions. However,
* An invocation of this method is equal to invoking {@link #close(boolean)} with a parameter * it otherwise fulfills the requirements of the {@link Closeable#close()} contract
* that is false. * (idempotent, try-with-resources, etc.)
*/ */
public void close(); public void close();
/**
* Close this session including associated socket connection. The order of
* events for closing the session is:
* <ul>
* <li>Set closing flag to prevent redundant shutdowns.
* <li>Call notifyEvent all listeners that the channel is shutting down.
* <li>Close the socket.
* </ul>
*
* This method takes into account the connection state of the peer. Specifically,
* when the peer is known to be in a disconnected state, no data will be sent
* (otherwise, this method can trigger the delivery of an end-of-stream signal).
*
* @param peerIsKnownToBeDisconnected should be set to true when the peer is known to no longer be available.
*/
public void close( boolean peerIsKnownToBeDisconnected );
/** /**
* Notification message indicating that the server is being shutdown. Implementors * Notification message indicating that the server is being shutdown. Implementors
* should send a stream error whose condition is system-shutdown before closing * should send a stream error whose condition is system-shutdown before closing
...@@ -455,4 +439,10 @@ public interface Connection { ...@@ -455,4 +439,10 @@ public interface Connection {
*/ */
needed needed
} }
/**
* Used to specify operational status for the corresponding connection
*/
enum State { OPEN, CLOSED }
} }
...@@ -34,6 +34,7 @@ import java.util.HashMap; ...@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLPeerUnverifiedException;
...@@ -92,7 +93,8 @@ public class SocketConnection implements Connection { ...@@ -92,7 +93,8 @@ public class SocketConnection implements Connection {
private Writer writer; private Writer writer;
private AtomicBoolean writing = new AtomicBoolean(false); private AtomicBoolean writing = new AtomicBoolean(false);
private AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
/** /**
* Deliverer to use when the connection is closed or was closed when delivering * Deliverer to use when the connection is closed or was closed when delivering
* a packet. * a packet.
...@@ -300,10 +302,7 @@ public class SocketConnection implements Connection { ...@@ -300,10 +302,7 @@ public class SocketConnection implements Connection {
} }
public boolean isClosed() { public boolean isClosed() {
if (session == null) { return state.get() == State.CLOSED;
return socket.isClosed();
}
return session.getStatus() == Session.STATUS_CLOSED;
} }
public boolean isSecure() { public boolean isSecure() {
...@@ -441,57 +440,54 @@ public class SocketConnection implements Connection { ...@@ -441,57 +440,54 @@ public class SocketConnection implements Connection {
return backupDeliverer; return backupDeliverer;
} }
@Override
public void close() { public void close() {
close( false ); close(false);
} }
public void close( boolean peerIsKnownToBeDisconnected ) { /**
boolean wasClosed = false; * Normal connection close will attempt to write the stream end tag. Otherwise this method
synchronized (this) { * forces the connection closed immediately. This method will be called from {@link SocketSendingTracker}
if (!isClosed()) { * when sending data over the socket has taken a long time and we need to close the socket, discard
try { * the connection and its session.
if (session != null) { */
session.setStatus(Session.STATUS_CLOSED); private void close(boolean force) {
} if (state.compareAndSet(State.OPEN, State.CLOSED)) {
if ( !peerIsKnownToBeDisconnected ) { if (session != null) {
boolean allowedToWrite = false; session.setStatus(Session.STATUS_CLOSED);
try {
requestWriting();
allowedToWrite = true;
// Register that we started sending data on the connection
writeStarted();
writer.write("</stream:stream>");
if (flashClient) {
writer.write('\0');
}
writer.flush();
}
catch (IOException e) {
// Do nothing
}
finally {
// Register that we finished sending data on the connection
writeFinished();
if (allowedToWrite) {
releaseWriting();
}
}
}
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.close")
+ "\n" + this.toString(), e);
}
closeConnection();
wasClosed = true;
} }
}
if (wasClosed) { if (!force) {
boolean allowedToWrite = false;
try {
requestWriting();
allowedToWrite = true;
// Register that we started sending data on the connection
writeStarted();
writer.write("</stream:stream>");
if (flashClient) {
writer.write('\0');
}
writer.flush();
}
catch (Exception e) {
Log.error("Failed to deliver stream close tag: " + e.getMessage());
}
// Register that we finished sending data on the connection
writeFinished();
if (allowedToWrite) {
releaseWriting();
}
}
closeConnection();
notifyCloseListeners(); notifyCloseListeners();
}
}
} }
public void systemShutdown() { public void systemShutdown() {
deliverRawText("<stream:error><system-shutdown " + deliverRawText("<stream:error><system-shutdown " +
"xmlns='urn:ietf:params:xml:ns:xmpp-streams'/></stream:error>"); "xmlns='urn:ietf:params:xml:ns:xmpp-streams'/></stream:error>");
...@@ -524,7 +520,7 @@ public class SocketConnection implements Connection { ...@@ -524,7 +520,7 @@ public class SocketConnection implements Connection {
Log.debug("Closing connection: " + this + " that started sending data at: " + Log.debug("Closing connection: " + this + " that started sending data at: " +
new Date(writeTimestamp)); new Date(writeTimestamp));
} }
forceClose(); close(true); // force
return true; return true;
} }
else { else {
...@@ -537,7 +533,7 @@ public class SocketConnection implements Connection { ...@@ -537,7 +533,7 @@ public class SocketConnection implements Connection {
if (Log.isDebugEnabled()) { if (Log.isDebugEnabled()) {
Log.debug("Closing connection that has been idle: " + this); Log.debug("Closing connection that has been idle: " + this);
} }
forceClose(); close(true); // force
return true; return true;
} }
} }
...@@ -549,24 +545,6 @@ public class SocketConnection implements Connection { ...@@ -549,24 +545,6 @@ public class SocketConnection implements Connection {
instances.remove(this); instances.remove(this);
} }
/**
* Forces the connection to be closed immediately no matter if closing the socket takes
* a long time. This method should only be called from {@link SocketSendingTracker} when
* sending data over the socket has taken a long time and we need to close the socket, discard
* the connection and its session.
*/
private void forceClose() {
if (session != null) {
// Set that the session is closed. This will prevent threads from trying to
// deliver packets to this session thus preventing future locks.
session.setStatus(Session.STATUS_CLOSED);
}
closeConnection();
// Notify the close listeners so that the SessionManager can send unavailable
// presences if required.
notifyCloseListeners();
}
private void closeConnection() { private void closeConnection() {
release(); release();
try { try {
......
...@@ -23,6 +23,7 @@ package org.jivesoftware.openfire.net; ...@@ -23,6 +23,7 @@ package org.jivesoftware.openfire.net;
import java.security.cert.Certificate; import java.security.cert.Certificate;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.ConnectionCloseListener; import org.jivesoftware.openfire.ConnectionCloseListener;
...@@ -52,8 +53,8 @@ public abstract class VirtualConnection implements Connection { ...@@ -52,8 +53,8 @@ public abstract class VirtualConnection implements Connection {
final private Map<ConnectionCloseListener, Object> listeners = final private Map<ConnectionCloseListener, Object> listeners =
new HashMap<ConnectionCloseListener, Object>(); new HashMap<ConnectionCloseListener, Object>();
private boolean closed = false; private AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
public String getLanguage() { public String getLanguage() {
// Information not available. Return any value. This is not actually used. // Information not available. Return any value. This is not actually used.
return null; return null;
...@@ -87,10 +88,7 @@ public abstract class VirtualConnection implements Connection { ...@@ -87,10 +88,7 @@ public abstract class VirtualConnection implements Connection {
} }
public boolean isClosed() { public boolean isClosed() {
if (session == null) { return state.get() == State.CLOSED;
return closed;
}
return session.getStatus() == Session.STATUS_CLOSED;
} }
public Connection.CompressionPolicy getCompressionPolicy() { public Connection.CompressionPolicy getCompressionPolicy() {
...@@ -168,30 +166,22 @@ public abstract class VirtualConnection implements Connection { ...@@ -168,30 +166,22 @@ public abstract class VirtualConnection implements Connection {
* Closes the session, the virtual connection and notifies listeners that the connection * Closes the session, the virtual connection and notifies listeners that the connection
* has been closed. * has been closed.
*/ */
@Override
public void close() { public void close() {
close( false ); if (state.compareAndSet(State.OPEN, State.CLOSED)) {
}
if (session != null) {
public void close(boolean peerIsKnownToBeDisconnected) { session.setStatus(Session.STATUS_CLOSED);
boolean wasClosed = false;
synchronized (this) {
if (!isClosed()) {
try {
if (session != null) {
session.setStatus(Session.STATUS_CLOSED);
}
closeVirtualConnection();
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.close")
+ "\n" + this.toString(), e);
}
closed = true;
wasClosed = true;
} }
}
if (wasClosed) { try {
closeVirtualConnection();
} catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.close") + "\n" + toString(), e);
}
notifyCloseListeners(); notifyCloseListeners();
} }
} }
......
...@@ -31,6 +31,7 @@ import java.nio.charset.CharsetEncoder; ...@@ -31,6 +31,7 @@ import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction; import java.nio.charset.CodingErrorAction;
import java.security.KeyStore; import java.security.KeyStore;
import java.security.cert.Certificate; import java.security.cert.Certificate;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManager;
...@@ -75,8 +76,6 @@ public class NIOConnection implements Connection { ...@@ -75,8 +76,6 @@ public class NIOConnection implements Connection {
private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class); private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class);
public enum State { RUNNING, CLOSING, CLOSED }
/** /**
* The utf-8 charset for decoding and encoding XMPP packet streams. * The utf-8 charset for decoding and encoding XMPP packet streams.
*/ */
...@@ -116,7 +115,7 @@ public class NIOConnection implements Connection { ...@@ -116,7 +115,7 @@ public class NIOConnection implements Connection {
* keep this flag to avoid using the connection between #close was used and the socket is actually * keep this flag to avoid using the connection between #close was used and the socket is actually
* closed. * closed.
*/ */
private volatile State state; private AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
/** /**
* Lock used to ensure the integrity of the underlying IoSession (refer to * Lock used to ensure the integrity of the underlying IoSession (refer to
...@@ -132,7 +131,6 @@ public class NIOConnection implements Connection { ...@@ -132,7 +131,6 @@ public class NIOConnection implements Connection {
public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) { public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) {
this.ioSession = session; this.ioSession = session;
this.backupDeliverer = packetDeliverer; this.backupDeliverer = packetDeliverer;
state = State.RUNNING;
} }
public boolean validate() { public boolean validate() {
...@@ -220,60 +218,31 @@ public class NIOConnection implements Connection { ...@@ -220,60 +218,31 @@ public class NIOConnection implements Connection {
return backupDeliverer; return backupDeliverer;
} }
@Override
public void close() { public void close() {
close( false ); if (state.compareAndSet(State.OPEN, State.CLOSED)) {
}
public void close( boolean peerIsKnownToBeDisconnected )
{
boolean notifyClose = false;
synchronized ( this ) {
try
{
if ( state == State.CLOSED )
{
return;
}
// This prevents any action after the first invocation of close() on this connection. // Ensure that the state of this connection, its session and the MINA context are eventually closed.
if ( state != State.CLOSING )
{
state = State.CLOSING;
if ( !peerIsKnownToBeDisconnected )
{
try
{
deliverRawText( flashClient ? "</flash:stream>" : "</stream:stream>" );
}
catch ( Exception e )
{
// Ignore
}
}
}
// deliverRawText might already have forced the state from Closing to Closed. In that case, there's no need if ( session != null ) {
// to invoke the CloseListeners again. session.setStatus( Session.STATUS_CLOSED );
if ( state == State.CLOSING )
{
notifyClose = true;
}
} }
finally
{ try {
// Ensure that the state of this connection, its session and the MINA context are eventually closed. deliverRawText( flashClient ? "</flash:stream>" : "</stream:stream>" );
state = State.CLOSED; } catch ( Exception e ) {
if ( session != null ) Log.error("Failed to deliver stream close tag: " + e.getMessage());
{
session.setStatus( Session.STATUS_CLOSED );
}
ioSession.close( true );
} }
}
if (notifyClose) try {
{ ioSession.close( true );
} catch (Exception e) {
Log.error("Exception while closing MINA session", e);
}
notifyCloseListeners(); // clean up session, etc. notifyCloseListeners(); // clean up session, etc.
}
}
} }
public void systemShutdown() { public void systemShutdown() {
...@@ -301,7 +270,7 @@ public class NIOConnection implements Connection { ...@@ -301,7 +270,7 @@ public class NIOConnection implements Connection {
} }
public boolean isClosed() { public boolean isClosed() {
return state == State.CLOSED; return state.get() == State.CLOSED;
} }
public boolean isSecure() { public boolean isSecure() {
...@@ -309,7 +278,7 @@ public class NIOConnection implements Connection { ...@@ -309,7 +278,7 @@ public class NIOConnection implements Connection {
} }
public void deliver(Packet packet) throws UnauthorizedException { public void deliver(Packet packet) throws UnauthorizedException {
if (state != State.RUNNING) { if (isClosed()) {
backupDeliverer.deliver(packet); backupDeliverer.deliver(packet);
} }
else { else {
...@@ -354,7 +323,7 @@ public class NIOConnection implements Connection { ...@@ -354,7 +323,7 @@ public class NIOConnection implements Connection {
} }
public void deliverRawText(String text) { public void deliverRawText(String text) {
if (state != State.CLOSED) { if (!isClosed()) {
boolean errorDelivering = false; boolean errorDelivering = false;
IoBuffer buffer = IoBuffer.allocate(text.length()); IoBuffer buffer = IoBuffer.allocate(text.length());
buffer.setAutoExpand(true); buffer.setAutoExpand(true);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment