Commit 111fe941 authored by Tom Evans's avatar Tom Evans

Merge pull request #155 from Flowdalic/fixes

OF-855/OF-857 improvements
parents b1599c97 4158dd77
......@@ -134,8 +134,7 @@ public class HttpSession extends LocalClientSession {
public HttpSession(PacketDeliverer backupDeliverer, String serverName, InetAddress address,
StreamID streamID, long rid, HttpConnection connection) {
super(serverName, null, streamID);
conn = new HttpVirtualConnection(address);
super(serverName, new HttpVirtualConnection(address), streamID);
this.isClosed = false;
this.lastActivity = System.currentTimeMillis();
this.lastRequestID = rid;
......
......@@ -31,7 +31,7 @@ import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
......@@ -116,16 +116,20 @@ public class NIOConnection implements Connection {
private boolean closed;
/**
* Lock used to ensure the integrity of the underlying IoSession
* (refer to https://issues.apache.org/jira/browse/DIRMINA-653 for details)
* Lock used to ensure the integrity of the underlying IoSession (refer to
* https://issues.apache.org/jira/browse/DIRMINA-653 for details)
* <p>
* This lock can be removed once Openfire guarantees a stable delivery
* order, in which case {@link #deliver(Packet)} won't be called
* concurrently any more, which made this lock necessary in the first place.
* </p>
*/
private Semaphore ioSessionLock;
private final ReentrantLock ioSessionLock = new ReentrantLock(true);
public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) {
this.ioSession = session;
this.backupDeliverer = packetDeliverer;
closed = false;
ioSessionLock = new Semaphore(1, true);
}
public boolean validate() {
......@@ -261,12 +265,9 @@ public class NIOConnection implements Connection {
}
else {
boolean errorDelivering = false;
IoBuffer buffer = IoBuffer.allocate(4096);
buffer.setAutoExpand(true);
try {
ioSessionLock.acquire();
IoBuffer buffer = IoBuffer.allocate(4096);
buffer.setAutoExpand(true);
// OF-464: if the connection has been dropped, fail over to backupDeliverer (offline)
if (!ioSession.isConnected()) {
throw new IOException("Connection reset/closed by peer");
......@@ -279,15 +280,18 @@ public class NIOConnection implements Connection {
buffer.put((byte) '\0');
}
buffer.flip();
ioSession.write(buffer);
ioSessionLock.lock();
try {
ioSession.write(buffer);
} finally {
ioSessionLock.unlock();
}
}
catch (Exception e) {
Log.debug("Error delivering packet:\n" + packet, e);
errorDelivering = true;
}
finally {
ioSessionLock.release();
}
if (errorDelivering) {
close();
// Retry sending the packet again. Most probably if the packet is a
......@@ -307,14 +311,10 @@ public class NIOConnection implements Connection {
private void deliverRawText(String text, boolean asynchronous) {
if (!isClosed()) {
boolean errorDelivering = false;
IoBuffer buffer = IoBuffer.allocate(text.length());
buffer.setAutoExpand(true);
try {
ioSessionLock.acquire();
IoBuffer buffer = IoBuffer.allocate(text.length());
buffer.setAutoExpand(true);
//Charset charset = Charset.forName(CHARSET);
//buffer.putString(text, charset.newEncoder());
buffer.put(text.getBytes(CHARSET));
......@@ -322,29 +322,33 @@ public class NIOConnection implements Connection {
buffer.put((byte) '\0');
}
buffer.flip();
if (asynchronous) {
// OF-464: handle dropped connections (no backupDeliverer in this case?)
if (!ioSession.isConnected()) {
throw new IOException("Connection reset/closed by peer");
}
ioSession.write(buffer);
}
else {
// Send stanza and wait for ACK (using a 2 seconds default timeout)
boolean ok =
ioSession.write(buffer).awaitUninterruptibly(JiveGlobals.getIntProperty("connection.ack.timeout", 2000));
if (!ok) {
Log.warn("No ACK was received when sending stanza to: " + this.toString());
ioSessionLock.lock();
try {
if (asynchronous) {
// OF-464: handle dropped connections (no backupDeliverer in this case?)
if (!ioSession.isConnected()) {
throw new IOException("Connection reset/closed by peer");
}
ioSession.write(buffer);
}
else {
// Send stanza and wait for ACK (using a 2 seconds default timeout)
boolean ok =
ioSession.write(buffer).awaitUninterruptibly(JiveGlobals.getIntProperty("connection.ack.timeout", 2000));
if (!ok) {
Log.warn("No ACK was received when sending stanza to: " + this.toString());
}
}
}
finally {
ioSessionLock.unlock();
}
}
catch (Exception e) {
Log.debug("Error delivering raw text:\n" + text, e);
errorDelivering = true;
}
finally {
ioSessionLock.release();
}
// Close the connection if delivering text fails and we are already not closing the connection
if (errorDelivering && asynchronous) {
close();
......
......@@ -854,14 +854,7 @@ public class LocalClientSession extends LocalSession implements ClientSession {
@Override
public void deliver(Packet packet) throws UnauthorizedException {
if (conn != null) {
conn.deliver(packet);
} else {
// invalid session; clean up and retry delivery (offline)
Log.error("Failed to deliver packet to invalid session (no connection); will retry");
sessionManager.removeSession(this);
XMPPServer.getInstance().getPacketDeliverer().deliver(packet);
}
conn.deliver(packet);
}
@Override
......
......@@ -302,7 +302,7 @@ public class LocalConnectionMultiplexerSession extends LocalSession implements C
@Override
void deliver(Packet packet) throws UnauthorizedException {
if (conn != null && !conn.isClosed()) {
if (!conn.isClosed()) {
conn.deliver(packet);
}
}
......
......@@ -611,7 +611,7 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
@Override
void deliver(Packet packet) throws UnauthorizedException {
if (conn != null && !conn.isClosed()) {
if (!conn.isClosed()) {
conn.deliver(packet);
}
}
......
......@@ -75,7 +75,7 @@ public abstract class LocalSession implements Session {
/**
* The connection that this session represents.
*/
protected Connection conn;
protected final Connection conn;
protected SessionManager sessionManager;
......@@ -101,6 +101,9 @@ public abstract class LocalSession implements Session {
* @param streamID unique identifier for this session.
*/
public LocalSession(String serverName, Connection connection, StreamID streamID) {
if (connection == null) {
throw new IllegalArgumentException("connection must not be null");
}
conn = connection;
this.streamID = streamID;
this.serverName = serverName;
......@@ -328,9 +331,7 @@ public abstract class LocalSession implements Session {
abstract void deliver(Packet packet) throws UnauthorizedException;
public void deliverRawText(String text) {
if (conn != null) {
conn.deliverRawText(text);
}
conn.deliverRawText(text);
}
/**
......@@ -342,9 +343,7 @@ public abstract class LocalSession implements Session {
public abstract String getAvailableStreamFeatures();
public void close() {
if (conn != null) {
conn.close();
}
conn.close();
}
public boolean validate() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment