Commit 5ebe8173 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Fix deadlock when writing data to a socket. JM-574

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@3487 b35dd754-fafc-0310-a699-88a17e54d16e
parent c27592d2
...@@ -35,6 +35,7 @@ import java.util.Date; ...@@ -35,6 +35,7 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* An object to track the state of a XMPP client-server session. * An object to track the state of a XMPP client-server session.
...@@ -70,6 +71,7 @@ public class SocketConnection implements Connection { ...@@ -70,6 +71,7 @@ public class SocketConnection implements Connection {
private SocketReader socketReader; private SocketReader socketReader;
private Writer writer; private Writer writer;
private AtomicBoolean writing = new AtomicBoolean(false);
private PacketDeliverer deliverer; private PacketDeliverer deliverer;
...@@ -181,13 +183,14 @@ public class SocketConnection implements Connection { ...@@ -181,13 +183,14 @@ public class SocketConnection implements Connection {
if (isClosed()) { if (isClosed()) {
return false; return false;
} }
boolean allowedToWrite = false;
try { try {
synchronized (writer) { requestWriting();
// Register that we started sending data on the connection allowedToWrite = true;
writeStarted(); // Register that we started sending data on the connection
writer.write(" "); writeStarted();
writer.flush(); writer.write(" ");
} writer.flush();
} }
catch (Exception e) { catch (Exception e) {
Log.warn("Closing no longer valid connection" + "\n" + this.toString(), e); Log.warn("Closing no longer valid connection" + "\n" + this.toString(), e);
...@@ -196,6 +199,9 @@ public class SocketConnection implements Connection { ...@@ -196,6 +199,9 @@ public class SocketConnection implements Connection {
finally { finally {
// Register that we finished sending data on the connection // Register that we finished sending data on the connection
writeFinished(); writeFinished();
if (allowedToWrite) {
releaseWriting();
}
} }
return !isClosed(); return !isClosed();
} }
...@@ -331,20 +337,24 @@ public class SocketConnection implements Connection { ...@@ -331,20 +337,24 @@ public class SocketConnection implements Connection {
if (session != null) { if (session != null) {
session.setStatus(Session.STATUS_CLOSED); session.setStatus(Session.STATUS_CLOSED);
} }
synchronized (writer) { boolean allowedToWrite = false;
try { try {
// Register that we started sending data on the connection requestWriting();
writeStarted(); allowedToWrite = true;
writer.write("</stream:stream>"); // Register that we started sending data on the connection
if (flashClient) { writeStarted();
writer.write('\0'); writer.write("</stream:stream>");
} if (flashClient) {
writer.flush(); writer.write('\0');
} }
catch (IOException e) {} writer.flush();
finally { }
// Register that we finished sending data on the connection catch (IOException e) {}
writeFinished(); finally {
// Register that we finished sending data on the connection
writeFinished();
if (allowedToWrite) {
releaseWriting();
} }
} }
} }
...@@ -369,7 +379,15 @@ public class SocketConnection implements Connection { ...@@ -369,7 +379,15 @@ public class SocketConnection implements Connection {
writeStarted = -1; writeStarted = -1;
} }
void checkHealth() { /**
* Returns true if the socket was closed due to a bad health. The socket is considered to
* be in a bad state if a thread has been writing for a while and the write operation has
* not finished in a long time or when the client has not sent a heartbeat for a long time.
* In any of both cases the socket will be closed.
*
* @return true if the socket was closed due to a bad health.s
*/
boolean checkHealth() {
// Check that the sending operation is still active // Check that the sending operation is still active
if (writeStarted > -1 && System.currentTimeMillis() - writeStarted > if (writeStarted > -1 && System.currentTimeMillis() - writeStarted >
JiveGlobals.getIntProperty("xmpp.session.sending-limit", 60000)) { JiveGlobals.getIntProperty("xmpp.session.sending-limit", 60000)) {
...@@ -379,6 +397,7 @@ public class SocketConnection implements Connection { ...@@ -379,6 +397,7 @@ public class SocketConnection implements Connection {
new Date(writeStarted)); new Date(writeStarted));
} }
forceClose(); forceClose();
return true;
} }
else { else {
// Check if the connection has been idle. A connection is considered idle if the client // Check if the connection has been idle. A connection is considered idle if the client
...@@ -391,8 +410,10 @@ public class SocketConnection implements Connection { ...@@ -391,8 +410,10 @@ public class SocketConnection implements Connection {
Log.debug("Closing connection that has been idle: " + this); Log.debug("Closing connection that has been idle: " + this);
} }
forceClose(); forceClose();
return true;
} }
} }
return false;
} }
private void release() { private void release() {
...@@ -446,17 +467,23 @@ public class SocketConnection implements Connection { ...@@ -446,17 +467,23 @@ public class SocketConnection implements Connection {
// Invoke the interceptors before we send the packet // Invoke the interceptors before we send the packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, false, false); InterceptorManager.getInstance().invokeInterceptors(packet, session, false, false);
boolean errorDelivering = false; boolean errorDelivering = false;
synchronized (writer) { boolean allowedToWrite = false;
try { try {
xmlSerializer.write(packet.getElement()); requestWriting();
if (flashClient) { allowedToWrite = true;
writer.write('\0'); xmlSerializer.write(packet.getElement());
} if (flashClient) {
xmlSerializer.flush(); writer.write('\0');
} }
catch (IOException e) { xmlSerializer.flush();
Log.debug("Error delivering packet" + "\n" + this.toString(), e); }
errorDelivering = true; catch (Exception e) {
Log.debug("Error delivering packet" + "\n" + this.toString(), e);
errorDelivering = true;
}
finally {
if (allowedToWrite) {
releaseWriting();
} }
} }
if (errorDelivering) { if (errorDelivering) {
...@@ -480,23 +507,27 @@ public class SocketConnection implements Connection { ...@@ -480,23 +507,27 @@ public class SocketConnection implements Connection {
public void deliverRawText(String text) { public void deliverRawText(String text) {
if (!isClosed()) { if (!isClosed()) {
boolean errorDelivering = false; boolean errorDelivering = false;
synchronized (writer) { boolean allowedToWrite = false;
try { try {
// Register that we started sending data on the connection requestWriting();
writeStarted(); allowedToWrite = true;
writer.write(text); // Register that we started sending data on the connection
if (flashClient) { writeStarted();
writer.write('\0'); writer.write(text);
} if (flashClient) {
writer.flush(); writer.write('\0');
}
catch (IOException e) {
Log.debug("Error delivering raw text" + "\n" + this.toString(), e);
errorDelivering = true;
} }
finally { writer.flush();
// Register that we finished sending data on the connection }
writeFinished(); catch (Exception e) {
Log.debug("Error delivering raw text" + "\n" + this.toString(), e);
errorDelivering = true;
}
finally {
// Register that we finished sending data on the connection
writeFinished();
if (allowedToWrite) {
releaseWriting();
} }
} }
if (errorDelivering) { if (errorDelivering) {
...@@ -522,6 +553,29 @@ public class SocketConnection implements Connection { ...@@ -522,6 +553,29 @@ public class SocketConnection implements Connection {
} }
} }
private void requestWriting() throws Exception {
for (;;) {
if (writing.compareAndSet(false, true)) {
// We are now in writing mode and only we can write to the socket
return;
}
else {
// Check health of the socket
if (checkHealth()) {
// Connection was closed then stop
throw new Exception("Probable dead connection was closed");
}
else {
Thread.sleep(1);
}
}
}
}
private void releaseWriting() {
writing.compareAndSet(true, false);
}
public String toString() { public String toString() {
return super.toString() + " socket: " + socket + " session: " + session; return super.toString() + " socket: " + socket + " session: " + session;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment