Commit 4158dd77 authored by Florian Schmaus's avatar Florian Schmaus

Use ReentrackLock instead of Semaphore

Also decrease the size of the critical section the lock protects.

Related to OF-857.
parent 7bbfb9db
...@@ -31,7 +31,7 @@ import java.nio.charset.CharsetEncoder; ...@@ -31,7 +31,7 @@ import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction; import java.nio.charset.CodingErrorAction;
import java.security.KeyStore; import java.security.KeyStore;
import java.security.cert.Certificate; import java.security.cert.Certificate;
import java.util.concurrent.Semaphore; import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
...@@ -116,16 +116,20 @@ public class NIOConnection implements Connection { ...@@ -116,16 +116,20 @@ public class NIOConnection implements Connection {
private boolean closed; private boolean closed;
/** /**
* Lock used to ensure the integrity of the underlying IoSession * Lock used to ensure the integrity of the underlying IoSession (refer to
* (refer to https://issues.apache.org/jira/browse/DIRMINA-653 for details) * https://issues.apache.org/jira/browse/DIRMINA-653 for details)
* <p>
* This lock can be removed once Openfire guarantees a stable delivery
* order, in which case {@link #deliver(Packet)} won't be called
* concurrently any more, which made this lock necessary in the first place.
* </p>
*/ */
private Semaphore ioSessionLock; private final ReentrantLock ioSessionLock = new ReentrantLock(true);
public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) { public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) {
this.ioSession = session; this.ioSession = session;
this.backupDeliverer = packetDeliverer; this.backupDeliverer = packetDeliverer;
closed = false; closed = false;
ioSessionLock = new Semaphore(1, true);
} }
public boolean validate() { public boolean validate() {
...@@ -261,12 +265,9 @@ public class NIOConnection implements Connection { ...@@ -261,12 +265,9 @@ public class NIOConnection implements Connection {
} }
else { else {
boolean errorDelivering = false; boolean errorDelivering = false;
IoBuffer buffer = IoBuffer.allocate(4096);
buffer.setAutoExpand(true);
try { try {
ioSessionLock.acquire();
IoBuffer buffer = IoBuffer.allocate(4096);
buffer.setAutoExpand(true);
// OF-464: if the connection has been dropped, fail over to backupDeliverer (offline) // OF-464: if the connection has been dropped, fail over to backupDeliverer (offline)
if (!ioSession.isConnected()) { if (!ioSession.isConnected()) {
throw new IOException("Connection reset/closed by peer"); throw new IOException("Connection reset/closed by peer");
...@@ -279,15 +280,18 @@ public class NIOConnection implements Connection { ...@@ -279,15 +280,18 @@ public class NIOConnection implements Connection {
buffer.put((byte) '\0'); buffer.put((byte) '\0');
} }
buffer.flip(); buffer.flip();
ioSession.write(buffer);
ioSessionLock.lock();
try {
ioSession.write(buffer);
} finally {
ioSessionLock.unlock();
}
} }
catch (Exception e) { catch (Exception e) {
Log.debug("Error delivering packet:\n" + packet, e); Log.debug("Error delivering packet:\n" + packet, e);
errorDelivering = true; errorDelivering = true;
} }
finally {
ioSessionLock.release();
}
if (errorDelivering) { if (errorDelivering) {
close(); close();
// Retry sending the packet again. Most probably if the packet is a // Retry sending the packet again. Most probably if the packet is a
...@@ -307,14 +311,10 @@ public class NIOConnection implements Connection { ...@@ -307,14 +311,10 @@ public class NIOConnection implements Connection {
private void deliverRawText(String text, boolean asynchronous) { private void deliverRawText(String text, boolean asynchronous) {
if (!isClosed()) { if (!isClosed()) {
boolean errorDelivering = false; boolean errorDelivering = false;
IoBuffer buffer = IoBuffer.allocate(text.length());
buffer.setAutoExpand(true);
try { try {
ioSessionLock.acquire();
IoBuffer buffer = IoBuffer.allocate(text.length());
buffer.setAutoExpand(true);
//Charset charset = Charset.forName(CHARSET); //Charset charset = Charset.forName(CHARSET);
//buffer.putString(text, charset.newEncoder()); //buffer.putString(text, charset.newEncoder());
buffer.put(text.getBytes(CHARSET)); buffer.put(text.getBytes(CHARSET));
...@@ -322,29 +322,33 @@ public class NIOConnection implements Connection { ...@@ -322,29 +322,33 @@ public class NIOConnection implements Connection {
buffer.put((byte) '\0'); buffer.put((byte) '\0');
} }
buffer.flip(); buffer.flip();
if (asynchronous) { ioSessionLock.lock();
// OF-464: handle dropped connections (no backupDeliverer in this case?) try {
if (!ioSession.isConnected()) { if (asynchronous) {
throw new IOException("Connection reset/closed by peer"); // OF-464: handle dropped connections (no backupDeliverer in this case?)
} if (!ioSession.isConnected()) {
ioSession.write(buffer); throw new IOException("Connection reset/closed by peer");
} }
else { ioSession.write(buffer);
// Send stanza and wait for ACK (using a 2 seconds default timeout)
boolean ok =
ioSession.write(buffer).awaitUninterruptibly(JiveGlobals.getIntProperty("connection.ack.timeout", 2000));
if (!ok) {
Log.warn("No ACK was received when sending stanza to: " + this.toString());
} }
else {
// Send stanza and wait for ACK (using a 2 seconds default timeout)
boolean ok =
ioSession.write(buffer).awaitUninterruptibly(JiveGlobals.getIntProperty("connection.ack.timeout", 2000));
if (!ok) {
Log.warn("No ACK was received when sending stanza to: " + this.toString());
}
}
}
finally {
ioSessionLock.unlock();
} }
} }
catch (Exception e) { catch (Exception e) {
Log.debug("Error delivering raw text:\n" + text, e); Log.debug("Error delivering raw text:\n" + text, e);
errorDelivering = true; errorDelivering = true;
} }
finally {
ioSessionLock.release();
}
// Close the connection if delivering text fails and we are already not closing the connection // Close the connection if delivering text fails and we are already not closing the connection
if (errorDelivering && asynchronous) { if (errorDelivering && asynchronous) {
close(); close();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment