Commit 77a1cd23 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Optimization: Keep track of all socket connections and ask all connections to...

Optimization: Keep track of all socket connections and ask all connections to check their health. JM-480

git-svn-id: http://svn.igniterealtime.org/svn/repos/messenger/trunk@3169 b35dd754-fafc-0310-a699-88a17e54d16e
parent 5a4cf8d1
...@@ -17,6 +17,7 @@ import org.jivesoftware.messenger.interceptor.InterceptorManager; ...@@ -17,6 +17,7 @@ import org.jivesoftware.messenger.interceptor.InterceptorManager;
import org.jivesoftware.messenger.interceptor.PacketRejectedException; import org.jivesoftware.messenger.interceptor.PacketRejectedException;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.JiveGlobals;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import java.io.BufferedWriter; import java.io.BufferedWriter;
...@@ -27,6 +28,9 @@ import java.net.InetAddress; ...@@ -27,6 +28,9 @@ import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* An object to track the state of a XMPP client-server session. * An object to track the state of a XMPP client-server session.
...@@ -42,6 +46,9 @@ public class SocketConnection implements Connection { ...@@ -42,6 +46,9 @@ public class SocketConnection implements Connection {
*/ */
public static final String CHARSET = "UTF-8"; public static final String CHARSET = "UTF-8";
private static Map<SocketConnection, String> instances =
new ConcurrentHashMap<SocketConnection, String>();
private Map<ConnectionCloseListener, Object> listeners = new HashMap<ConnectionCloseListener, Object>(); private Map<ConnectionCloseListener, Object> listeners = new HashMap<ConnectionCloseListener, Object>();
private Socket socket; private Socket socket;
...@@ -57,12 +64,19 @@ public class SocketConnection implements Connection { ...@@ -57,12 +64,19 @@ public class SocketConnection implements Connection {
private int majorVersion = 1; private int majorVersion = 1;
private int minorVersion = 0; private int minorVersion = 0;
private String language = null; private String language = null;
private TLSStreamHandler tlsStreamHandler; private TLSStreamHandler tlsStreamHandler;
private long writeStarted = -1;
/** /**
* TLS policy currently in use for this connection. * TLS policy currently in use for this connection.
*/ */
private TLSPolicy tlsPolicy = TLSPolicy.optional; private TLSPolicy tlsPolicy = TLSPolicy.optional;
public static Collection<SocketConnection> getInstances() {
return instances.keySet();
}
/** /**
* Create a new session using the supplied socket. * Create a new session using the supplied socket.
* *
...@@ -83,6 +97,8 @@ public class SocketConnection implements Connection { ...@@ -83,6 +97,8 @@ public class SocketConnection implements Connection {
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), CHARSET)); writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), CHARSET));
this.deliverer = deliverer; this.deliverer = deliverer;
xmlSerializer = new XMLSocketWriter(writer, this); xmlSerializer = new XMLSocketWriter(writer, this);
instances.put(this, "");
} }
/** /**
...@@ -93,8 +109,8 @@ public class SocketConnection implements Connection { ...@@ -93,8 +109,8 @@ public class SocketConnection implements Connection {
* the corresponding input and output streams. * the corresponding input and output streams.
*/ */
public TLSStreamHandler getTLSStreamHandler() { public TLSStreamHandler getTLSStreamHandler() {
return tlsStreamHandler; return tlsStreamHandler;
} }
/** /**
* Secures the plain connection by negotiating TLS with the client. * Secures the plain connection by negotiating TLS with the client.
...@@ -103,14 +119,14 @@ public class SocketConnection implements Connection { ...@@ -103,14 +119,14 @@ public class SocketConnection implements Connection {
* @throws IOException if an error occured while securing the connection. * @throws IOException if an error occured while securing the connection.
*/ */
public void startTLS(boolean clientMode) throws IOException { public void startTLS(boolean clientMode) throws IOException {
if (!secure) { if (!secure) {
secure = true; secure = true;
tlsStreamHandler = new TLSStreamHandler(socket, clientMode); tlsStreamHandler = new TLSStreamHandler(socket, clientMode);
writer = new BufferedWriter(new OutputStreamWriter(tlsStreamHandler.getOutputStream(), CHARSET)); writer = new BufferedWriter(new OutputStreamWriter(tlsStreamHandler.getOutputStream(), CHARSET));
xmlSerializer = new XMLSocketWriter(writer, this); xmlSerializer = new XMLSocketWriter(writer, this);
} }
} }
public boolean validate() { public boolean validate() {
if (isClosed()) { if (isClosed()) {
return false; return false;
...@@ -118,7 +134,7 @@ public class SocketConnection implements Connection { ...@@ -118,7 +134,7 @@ public class SocketConnection implements Connection {
try { try {
synchronized (writer) { synchronized (writer) {
// Register that we started sending data on the connection // Register that we started sending data on the connection
SocketSendingTracker.getInstance().socketStartedSending(this); writeStarted();
writer.write(" "); writer.write(" ");
writer.flush(); writer.flush();
} }
...@@ -129,7 +145,7 @@ public class SocketConnection implements Connection { ...@@ -129,7 +145,7 @@ public class SocketConnection implements Connection {
} }
finally { finally {
// Register that we finished sending data on the connection // Register that we finished sending data on the connection
SocketSendingTracker.getInstance().socketFinishedSending(this); writeFinished();
} }
return !isClosed(); return !isClosed();
} }
...@@ -259,7 +275,7 @@ public class SocketConnection implements Connection { ...@@ -259,7 +275,7 @@ public class SocketConnection implements Connection {
synchronized (writer) { synchronized (writer) {
try { try {
// Register that we started sending data on the connection // Register that we started sending data on the connection
SocketSendingTracker.getInstance().socketStartedSending(this); writeStarted();
writer.write("</stream:stream>"); writer.write("</stream:stream>");
if (flashClient) { if (flashClient) {
writer.write('\0'); writer.write('\0');
...@@ -269,7 +285,7 @@ public class SocketConnection implements Connection { ...@@ -269,7 +285,7 @@ public class SocketConnection implements Connection {
catch (IOException e) {} catch (IOException e) {}
finally { finally {
// Register that we finished sending data on the connection // Register that we finished sending data on the connection
SocketSendingTracker.getInstance().socketFinishedSending(this); writeFinished();
} }
} }
} }
...@@ -286,6 +302,32 @@ public class SocketConnection implements Connection { ...@@ -286,6 +302,32 @@ public class SocketConnection implements Connection {
} }
} }
void writeStarted() {
writeStarted = System.currentTimeMillis();
}
void writeFinished() {
writeStarted = -1;
}
void checkHealth() {
// Check that the sending operation is still active
if (writeStarted > -1 && System.currentTimeMillis() - writeStarted >
JiveGlobals.getIntProperty("xmpp.session.sending-limit", 60000)) {
// Close the socket
if (Log.isDebugEnabled()) {
Log.debug("Closing connection: " + this + " that started sending data at: " +
new Date(writeStarted));
}
forceClose();
}
}
void release() {
writeStarted = -1;
instances.remove(this);
}
/** /**
* Forces the connection to be closed immediately no matter if closing the socket takes * Forces the connection to be closed immediately no matter if closing the socket takes
* a long time. This method should only be called from {@link SocketSendingTracker} when * a long time. This method should only be called from {@link SocketSendingTracker} when
...@@ -368,7 +410,7 @@ public class SocketConnection implements Connection { ...@@ -368,7 +410,7 @@ public class SocketConnection implements Connection {
synchronized (writer) { synchronized (writer) {
try { try {
// Register that we started sending data on the connection // Register that we started sending data on the connection
SocketSendingTracker.getInstance().socketStartedSending(this); writeStarted();
writer.write(text); writer.write(text);
if (flashClient) { if (flashClient) {
writer.write('\0'); writer.write('\0');
...@@ -381,7 +423,7 @@ public class SocketConnection implements Connection { ...@@ -381,7 +423,7 @@ public class SocketConnection implements Connection {
} }
finally { finally {
// Register that we finished sending data on the connection // Register that we finished sending data on the connection
SocketSendingTracker.getInstance().socketFinishedSending(this); writeFinished();
} }
} }
if (errorDelivering) { if (errorDelivering) {
......
...@@ -155,6 +155,7 @@ public abstract class SocketReader implements Runnable { ...@@ -155,6 +155,7 @@ public abstract class SocketReader implements Runnable {
Log.error(LocaleUtils.getLocalizedString("admin.error.connection") Log.error(LocaleUtils.getLocalizedString("admin.error.connection")
+ "\n" + socket.toString()); + "\n" + socket.toString());
} }
connection.release();
shutdown(); shutdown();
} }
} }
......
package org.jivesoftware.messenger.net; package org.jivesoftware.messenger.net;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* A SocketSendingTracker keeps track of all the sockets that are currently sending data and * A SocketSendingTracker keeps track of all the sockets that are currently sending data and
* checks the health of the sockets to detect hanged connections. If a sending operation takes * checks the health of the sockets to detect hanged connections. If a sending operation takes
...@@ -29,11 +22,6 @@ public class SocketSendingTracker { ...@@ -29,11 +22,6 @@ public class SocketSendingTracker {
private static SocketSendingTracker instance = new SocketSendingTracker(); private static SocketSendingTracker instance = new SocketSendingTracker();
/**
* Map that holds the connections that are currently sending information together with the date
* when the sending operation started.
*/
private Map<SocketConnection, Date> sockets = new ConcurrentHashMap<SocketConnection, Date>();
/** /**
* Flag that indicates if the tracket should shutdown the tracking process. * Flag that indicates if the tracket should shutdown the tracking process.
...@@ -61,27 +49,6 @@ public class SocketSendingTracker { ...@@ -61,27 +49,6 @@ public class SocketSendingTracker {
private SocketSendingTracker() { private SocketSendingTracker() {
} }
/**
* Register that the specified socket has started sending information. The registration will
* include the timestamp when the sending operation started so that if after several minutes
* it hasn't finished then the socket will be closed.
*
* @param socket the socket that started sending data.
*/
public void socketStartedSending(SocketConnection socket) {
sockets.put(socket, new Date());
}
/**
* Register that the specified socket has finished sending information. The socket will
* be removed from the tracking list.
*
* @param socket the socket that finished sending data.
*/
public void socketFinishedSending(SocketConnection socket) {
sockets.remove(socket);
}
/** /**
* Start up the daemon thread that will check for the health of the sockets that are * Start up the daemon thread that will check for the health of the sockets that are
* currently sending data. * currently sending data.
...@@ -129,26 +96,8 @@ public class SocketSendingTracker { ...@@ -129,26 +96,8 @@ public class SocketSendingTracker {
* quite small. * quite small.
*/ */
private void checkHealth() { private void checkHealth() {
for (SocketConnection connection : sockets.keySet()) { for (SocketConnection connection : SocketConnection.getInstances()) {
Date startDate = sockets.get(connection); connection.checkHealth();
if (startDate != null &&
System.currentTimeMillis() - startDate.getTime() >
JiveGlobals.getIntProperty("xmpp.session.sending-limit", 60000)) {
// Check that the sending operation is still active
if (sockets.get(connection) != null) {
// Close the socket
try {
Log.debug("Closing connection: " + connection +
" that started sending data at: " + startDate);
connection.forceClose();
}
finally {
// Remove tracking on this socket
sockets.remove(connection);
}
}
}
} }
} }
} }
...@@ -26,13 +26,13 @@ public class XMLSocketWriter extends XMLWriter { ...@@ -26,13 +26,13 @@ public class XMLSocketWriter extends XMLWriter {
*/ */
public void flush() throws IOException { public void flush() throws IOException {
// Register that we have started sending data // Register that we have started sending data
SocketSendingTracker.getInstance().socketStartedSending(connection); connection.writeStarted();
try { try {
super.flush(); super.flush();
} }
finally { finally {
// Register that we have finished sending data // Register that we have finished sending data
SocketSendingTracker.getInstance().socketFinishedSending(connection); connection.writeFinished();
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment