Commit aa7a5ef6 authored by Tom Evans's avatar Tom Evans

OF-881: Apply review feedback

- Use AtomicReference instead of synchronized block
- Extend java.io.Closeable; document contract
- Consolidate close/state transition logic
parent 0d1a5c45
...@@ -20,20 +20,21 @@ ...@@ -20,20 +20,21 @@
package org.jivesoftware.openfire; package org.jivesoftware.openfire;
import java.io.Closeable;
import java.net.UnknownHostException;
import java.security.cert.Certificate;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.session.LocalSession; import org.jivesoftware.openfire.session.LocalSession;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import java.net.UnknownHostException;
import java.security.cert.Certificate;
/** /**
* Represents a connection on the server. * Represents a connection on the server.
* *
* @author Iain Shigeoka * @author Iain Shigeoka
*/ */
public interface Connection { public interface Connection extends Closeable {
/** /**
* Verifies that the connection is still live. Typically this is done by * Verifies that the connection is still live. Typically this is done by
* sending a whitespace character between packets. * sending a whitespace character between packets.
...@@ -144,7 +145,11 @@ public interface Connection { ...@@ -144,7 +145,11 @@ public interface Connection {
* <li>Call notifyEvent all listeners that the channel is shutting down. * <li>Call notifyEvent all listeners that the channel is shutting down.
* <li>Close the socket. * <li>Close the socket.
* </ul> * </ul>
* Note this method overrides the base interface to suppress exceptions. However,
* it otherwise fulfills the requirements of the {@link Closeable#close()} contract
* (idempotent, try-with-resources, etc.)
*/ */
@Override
public void close(); public void close();
/** /**
...@@ -435,4 +440,10 @@ public interface Connection { ...@@ -435,4 +440,10 @@ public interface Connection {
*/ */
needed needed
} }
/**
* Used to specify operational status for the corresponding connection
*/
enum State { OPEN, CLOSED }
} }
...@@ -35,6 +35,7 @@ import java.util.HashMap; ...@@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLPeerUnverifiedException;
...@@ -88,6 +89,7 @@ public class SocketConnection implements Connection { ...@@ -88,6 +89,7 @@ public class SocketConnection implements Connection {
private Writer writer; private Writer writer;
private AtomicBoolean writing = new AtomicBoolean(false); private AtomicBoolean writing = new AtomicBoolean(false);
private AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
/** /**
* Deliverer to use when the connection is closed or was closed when delivering * Deliverer to use when the connection is closed or was closed when delivering
...@@ -307,10 +309,7 @@ public class SocketConnection implements Connection { ...@@ -307,10 +309,7 @@ public class SocketConnection implements Connection {
@Override @Override
public boolean isClosed() { public boolean isClosed() {
if (session == null) { return state.get() == State.CLOSED;
return socket.isClosed();
}
return session.getStatus() == Session.STATUS_CLOSED;
} }
@Override @Override
...@@ -468,40 +467,50 @@ public class SocketConnection implements Connection { ...@@ -468,40 +467,50 @@ public class SocketConnection implements Connection {
@Override @Override
public void close() { public void close() {
close(false);
synchronized (this) { }
if (isClosed()) {
return; /**
} * Normal connection close will attempt to write the stream end tag. Otherwise this method
* forces the connection closed immediately. This method will be called from {@link SocketSendingTracker}
* when sending data over the socket has taken a long time and we need to close the socket, discard
* the connection and its session.
*/
private void close(boolean force) {
if (state.compareAndSet(State.OPEN, State.CLOSED)) {
if (session != null) { if (session != null) {
session.setStatus(Session.STATUS_CLOSED); session.setStatus(Session.STATUS_CLOSED);
} }
}
if (!force) {
boolean allowedToWrite = false; boolean allowedToWrite = false;
try { try {
requestWriting(); requestWriting();
allowedToWrite = true; allowedToWrite = true;
// Register that we started sending data on the connection // Register that we started sending data on the connection
writeStarted(); writeStarted();
writer.write("</stream:stream>"); writer.write("</stream:stream>");
if (flashClient) { if (flashClient) {
writer.write('\0'); writer.write('\0');
}
writer.flush();
}
catch (Exception e) {
Log.error("Failed to deliver stream close tag: " + e.getMessage());
}
// Register that we finished sending data on the connection
writeFinished();
if (allowedToWrite) {
releaseWriting();
}
} }
writer.flush();
} closeConnection();
catch (Exception e) { notifyCloseListeners();
Log.error("Failed to deliver stream close tag: " + e.getMessage());
}
// Register that we finished sending data on the connection
writeFinished();
if (allowedToWrite) {
releaseWriting();
}
closeConnection(); }
notifyCloseListeners();
} }
@Override @Override
...@@ -537,7 +546,7 @@ public class SocketConnection implements Connection { ...@@ -537,7 +546,7 @@ public class SocketConnection implements Connection {
Log.debug("Closing connection: " + this + " that started sending data at: " + Log.debug("Closing connection: " + this + " that started sending data at: " +
new Date(writeTimestamp)); new Date(writeTimestamp));
} }
forceClose(); close(true); // force
return true; return true;
} }
else { else {
...@@ -550,7 +559,7 @@ public class SocketConnection implements Connection { ...@@ -550,7 +559,7 @@ public class SocketConnection implements Connection {
if (Log.isDebugEnabled()) { if (Log.isDebugEnabled()) {
Log.debug("Closing connection that has been idle: " + this); Log.debug("Closing connection that has been idle: " + this);
} }
forceClose(); close(true); // force
return true; return true;
} }
} }
...@@ -562,24 +571,6 @@ public class SocketConnection implements Connection { ...@@ -562,24 +571,6 @@ public class SocketConnection implements Connection {
instances.remove(this); instances.remove(this);
} }
/**
* Forces the connection to be closed immediately no matter if closing the socket takes
* a long time. This method should only be called from {@link SocketSendingTracker} when
* sending data over the socket has taken a long time and we need to close the socket, discard
* the connection and its session.
*/
private void forceClose() {
if (session != null) {
// Set that the session is closed. This will prevent threads from trying to
// deliver packets to this session thus preventing future locks.
session.setStatus(Session.STATUS_CLOSED);
}
closeConnection();
// Notify the close listeners so that the SessionManager can send unavailable
// presences if required.
notifyCloseListeners();
}
private void closeConnection() { private void closeConnection() {
release(); release();
try { try {
......
...@@ -23,6 +23,7 @@ package org.jivesoftware.openfire.net; ...@@ -23,6 +23,7 @@ package org.jivesoftware.openfire.net;
import java.security.cert.Certificate; import java.security.cert.Certificate;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.ConnectionCloseListener; import org.jivesoftware.openfire.ConnectionCloseListener;
...@@ -52,7 +53,7 @@ public abstract class VirtualConnection implements Connection { ...@@ -52,7 +53,7 @@ public abstract class VirtualConnection implements Connection {
final private Map<ConnectionCloseListener, Object> listeners = final private Map<ConnectionCloseListener, Object> listeners =
new HashMap<>(); new HashMap<>();
private boolean closed = false; private AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
@Override @Override
public String getLanguage() { public String getLanguage() {
...@@ -95,10 +96,7 @@ public abstract class VirtualConnection implements Connection { ...@@ -95,10 +96,7 @@ public abstract class VirtualConnection implements Connection {
@Override @Override
public boolean isClosed() { public boolean isClosed() {
if (session == null) { return state.get() == State.CLOSED;
return closed;
}
return session.getStatus() == Session.STATUS_CLOSED;
} }
@Override @Override
...@@ -194,25 +192,21 @@ public abstract class VirtualConnection implements Connection { ...@@ -194,25 +192,21 @@ public abstract class VirtualConnection implements Connection {
*/ */
@Override @Override
public void close() { public void close() {
if (state.compareAndSet(State.OPEN, State.CLOSED)) {
synchronized (this) {
if (isClosed()) {
return;
}
if (session != null) { if (session != null) {
session.setStatus(Session.STATUS_CLOSED); session.setStatus(Session.STATUS_CLOSED);
} }
try {
closeVirtualConnection();
} catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.close") + "\n" + toString(), e);
}
notifyCloseListeners();
} }
try {
closeVirtualConnection();
} catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.close")
+ "\n" + this.toString(), e);
}
closed = true;
notifyCloseListeners();
} }
@Override @Override
......
...@@ -33,6 +33,7 @@ import java.nio.charset.CodingErrorAction; ...@@ -33,6 +33,7 @@ import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.security.KeyStore; import java.security.KeyStore;
import java.security.cert.Certificate; import java.security.cert.Certificate;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
...@@ -53,7 +54,9 @@ import org.jivesoftware.openfire.auth.UnauthorizedException; ...@@ -53,7 +54,9 @@ import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.keystore.IdentityStoreConfig; import org.jivesoftware.openfire.keystore.IdentityStoreConfig;
import org.jivesoftware.openfire.keystore.Purpose; import org.jivesoftware.openfire.keystore.Purpose;
import org.jivesoftware.openfire.keystore.TrustStoreConfig; import org.jivesoftware.openfire.keystore.TrustStoreConfig;
import org.jivesoftware.openfire.net.*; import org.jivesoftware.openfire.net.ClientTrustManager;
import org.jivesoftware.openfire.net.SSLConfig;
import org.jivesoftware.openfire.net.ServerTrustManager;
import org.jivesoftware.openfire.session.ConnectionSettings; import org.jivesoftware.openfire.session.ConnectionSettings;
import org.jivesoftware.openfire.session.LocalSession; import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
...@@ -75,8 +78,6 @@ public class NIOConnection implements Connection { ...@@ -75,8 +78,6 @@ public class NIOConnection implements Connection {
private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class); private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class);
public enum State { RUNNING, CLOSING, CLOSED }
private LocalSession session; private LocalSession session;
private IoSession ioSession; private IoSession ioSession;
...@@ -111,7 +112,7 @@ public class NIOConnection implements Connection { ...@@ -111,7 +112,7 @@ public class NIOConnection implements Connection {
* keep this flag to avoid using the connection between #close was used and the socket is actually * keep this flag to avoid using the connection between #close was used and the socket is actually
* closed. * closed.
*/ */
private volatile State state; private AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
/** /**
* Lock used to ensure the integrity of the underlying IoSession (refer to * Lock used to ensure the integrity of the underlying IoSession (refer to
...@@ -127,7 +128,6 @@ public class NIOConnection implements Connection { ...@@ -127,7 +128,6 @@ public class NIOConnection implements Connection {
public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) { public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) {
this.ioSession = session; this.ioSession = session;
this.backupDeliverer = packetDeliverer; this.backupDeliverer = packetDeliverer;
state = State.RUNNING;
} }
@Override @Override
...@@ -228,33 +228,29 @@ public class NIOConnection implements Connection { ...@@ -228,33 +228,29 @@ public class NIOConnection implements Connection {
@Override @Override
public void close() { public void close() {
synchronized ( this ) { if (state.compareAndSet(State.OPEN, State.CLOSED)) {
// prevent recursion while closing
if ( state == State.CLOSED || state == State.CLOSING) {
return;
}
state = State.CLOSING;
}
try { // Ensure that the state of this connection, its session and the MINA context are eventually closed.
deliverRawText( flashClient ? "</flash:stream>" : "</stream:stream>" );
} catch ( Exception e ) { if ( session != null ) {
Log.error("Failed to deliver stream close tag: " + e.getMessage()); session.setStatus( Session.STATUS_CLOSED );
} }
// Ensure that the state of this connection, its session and the MINA context are eventually closed. try {
if ( session != null ) { deliverRawText( flashClient ? "</flash:stream>" : "</stream:stream>" );
session.setStatus( Session.STATUS_CLOSED ); } catch ( Exception e ) {
} Log.error("Failed to deliver stream close tag: " + e.getMessage());
}
try {
ioSession.close( true ); try {
} catch (Exception e) { ioSession.close( true );
Log.error("Exception while closing MINA session", e); } catch (Exception e) {
} Log.error("Exception while closing MINA session", e);
}
state = State.CLOSED; notifyCloseListeners(); // clean up session, etc.
notifyCloseListeners(); // clean up session, etc.
}
} }
@Override @Override
...@@ -285,7 +281,7 @@ public class NIOConnection implements Connection { ...@@ -285,7 +281,7 @@ public class NIOConnection implements Connection {
@Override @Override
public boolean isClosed() { public boolean isClosed() {
return state == State.CLOSED; return state.get() == State.CLOSED;
} }
@Override @Override
...@@ -295,7 +291,7 @@ public class NIOConnection implements Connection { ...@@ -295,7 +291,7 @@ public class NIOConnection implements Connection {
@Override @Override
public void deliver(Packet packet) throws UnauthorizedException { public void deliver(Packet packet) throws UnauthorizedException {
if (state != State.RUNNING) { if (isClosed()) {
backupDeliverer.deliver(packet); backupDeliverer.deliver(packet);
} }
else { else {
...@@ -341,7 +337,7 @@ public class NIOConnection implements Connection { ...@@ -341,7 +337,7 @@ public class NIOConnection implements Connection {
@Override @Override
public void deliverRawText(String text) { public void deliverRawText(String text) {
if (state != State.CLOSED) { if (!isClosed()) {
boolean errorDelivering = false; boolean errorDelivering = false;
IoBuffer buffer = IoBuffer.allocate(text.length()); IoBuffer buffer = IoBuffer.allocate(text.length());
buffer.setAutoExpand(true); buffer.setAutoExpand(true);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment