Commit fa3bec5c authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Close the connection if no data has been received for a while. JM-486

git-svn-id: http://svn.igniterealtime.org/svn/repos/messenger/trunk@3187 b35dd754-fafc-0310-a699-88a17e54d16e
parent b1645b79
...@@ -20,6 +20,7 @@ import org.dom4j.QName; ...@@ -20,6 +20,7 @@ import org.dom4j.QName;
import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory; import org.xmlpull.v1.XmlPullParserFactory;
import org.jivesoftware.messenger.net.MXParser;
/** /**
* <p><code>XPPPacketReader</code> is a Reader of DOM4J documents that * <p><code>XPPPacketReader</code> is a Reader of DOM4J documents that
...@@ -41,7 +42,7 @@ public class XPPPacketReader { ...@@ -41,7 +42,7 @@ public class XPPPacketReader {
/** /**
* <code>XmlPullParser</code> used to parse XML * <code>XmlPullParser</code> used to parse XML
*/ */
private XmlPullParser xppParser; private MXParser xppParser;
/** /**
* <code>XmlPullParser</code> used to parse XML * <code>XmlPullParser</code> used to parse XML
...@@ -53,6 +54,12 @@ public class XPPPacketReader { ...@@ -53,6 +54,12 @@ public class XPPPacketReader {
*/ */
private DispatchHandler dispatchHandler; private DispatchHandler dispatchHandler;
/**
* Last time a full Document was read or a heartbeat was received. Hearbeats
* are represented as whitespaces received while a Document is not being parsed.
*/
private long lastActive;
public XPPPacketReader() { public XPPPacketReader() {
} }
...@@ -179,16 +186,16 @@ public class XPPPacketReader { ...@@ -179,16 +186,16 @@ public class XPPPacketReader {
// Properties // Properties
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
public XmlPullParser getXPPParser() throws XmlPullParserException { public MXParser getXPPParser() throws XmlPullParserException {
if (xppParser == null) { if (xppParser == null) {
xppParser = getXPPFactory().newPullParser(); xppParser = (MXParser) getXPPFactory().newPullParser();
} }
return xppParser; return xppParser;
} }
public XmlPullParserFactory getXPPFactory() throws XmlPullParserException { public XmlPullParserFactory getXPPFactory() throws XmlPullParserException {
if (xppFactory == null) { if (xppFactory == null) {
xppFactory = XmlPullParserFactory.newInstance(); xppFactory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null);
} }
xppFactory.setNamespaceAware(true); xppFactory.setNamespaceAware(true);
return xppFactory; return xppFactory;
...@@ -255,6 +262,16 @@ public class XPPPacketReader { ...@@ -255,6 +262,16 @@ public class XPPPacketReader {
getDispatchHandler().setDefaultHandler(handler); getDispatchHandler().setDefaultHandler(handler);
} }
/**
* Returns the last time a full Document was read or a heartbeat was received. Hearbeats
* are represented as whitespaces received while a Document is not being parsed.
*
* @return the time in milliseconds when the last document or heartbeat was received.
*/
public long getLastActive() {
return lastActive;
}
// Implementation methods // Implementation methods
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
public Document parseDocument() throws DocumentException, IOException, XmlPullParserException { public Document parseDocument() throws DocumentException, IOException, XmlPullParserException {
...@@ -271,17 +288,21 @@ public class XPPPacketReader { ...@@ -271,17 +288,21 @@ public class XPPPacketReader {
String text = pp.getText(); String text = pp.getText();
int loc = text.indexOf(" "); int loc = text.indexOf(" ");
if (loc >= 0) { if (loc >= 0) {
document.addProcessingInstruction(text.substring(0, loc), text.substring(loc + 1)); document.addProcessingInstruction(text.substring(0, loc),
text.substring(loc + 1));
} }
else else {
document.addProcessingInstruction(text, ""); document.addProcessingInstruction(text, "");
}
break; break;
} }
case XmlPullParser.COMMENT: { case XmlPullParser.COMMENT: {
if (parent != null) if (parent != null) {
parent.addComment(pp.getText()); parent.addComment(pp.getText());
else }
else {
document.addComment(pp.getText()); document.addComment(pp.getText());
}
break; break;
} }
case XmlPullParser.CDSECT: { case XmlPullParser.CDSECT: {
...@@ -328,9 +349,12 @@ public class XPPPacketReader { ...@@ -328,9 +349,12 @@ public class XPPPacketReader {
} }
int nsStart = pp.getNamespaceCount(pp.getDepth() - 1); int nsStart = pp.getNamespaceCount(pp.getDepth() - 1);
int nsEnd = pp.getNamespaceCount(pp.getDepth()); int nsEnd = pp.getNamespaceCount(pp.getDepth());
for (int i = nsStart; i < nsEnd; i++) for (int i = nsStart; i < nsEnd; i++) {
if (pp.getNamespacePrefix(i) != null) if (pp.getNamespacePrefix(i) != null) {
newElement.addNamespace(pp.getNamespacePrefix(i), pp.getNamespaceUri(i)); newElement
.addNamespace(pp.getNamespacePrefix(i), pp.getNamespaceUri(i));
}
}
for (int i = 0; i < pp.getAttributeCount(); i++) { for (int i = 0; i < pp.getAttributeCount(); i++) {
QName qa = (pp.getAttributePrefix(i) == null) ? df.createQName(pp.getAttributeName(i)) : df.createQName(pp.getAttributeName(i), pp.getAttributePrefix(i), pp.getAttributeNamespace(i)); QName qa = (pp.getAttributePrefix(i) == null) ? df.createQName(pp.getAttributeName(i)) : df.createQName(pp.getAttributeName(i), pp.getAttributePrefix(i), pp.getAttributeNamespace(i));
newElement.addAttribute(qa, pp.getAttributeValue(i)); newElement.addAttribute(qa, pp.getAttributeValue(i));
...@@ -351,6 +375,8 @@ public class XPPPacketReader { ...@@ -351,6 +375,8 @@ public class XPPPacketReader {
} }
count--; count--;
if (count < 1) { if (count < 1) {
// Update the last time a Document was received
lastActive = System.currentTimeMillis();
return document; return document;
} }
break; break;
...@@ -367,6 +393,11 @@ public class XPPPacketReader { ...@@ -367,6 +393,11 @@ public class XPPPacketReader {
} }
break; break;
} }
case XmlPullParser.IGNORABLE_WHITESPACE: {
//System.out.println("Heartbeat was detected");
// Update the last time a heartbeat was received
lastActive = System.currentTimeMillis();
}
default: default:
{ {
; ;
......
...@@ -58,6 +58,16 @@ public class ClientSession extends Session { ...@@ -58,6 +58,16 @@ public class ClientSession extends Session {
private static Connection.TLSPolicy tlsPolicy; private static Connection.TLSPolicy tlsPolicy;
private static Connection.CompressionPolicy compressionPolicy; private static Connection.CompressionPolicy compressionPolicy;
/**
* Milliseconds a connection has to be idle to be closed. Default is 30 minutes. Sending
* stanzas to the client is not considered as activity. We are only considering the connection
* active when the client sends some data or hearbeats (i.e. whitespaces) to the server.
* The reason for this is that sending data will fail if the connection is closed. And if
* the thread is blocked while sending data (because the socket is closed) then the clean up
* thread will close the socket anyway.
*/
private static long idleTimeout;
/** /**
* The authentication token for this session. * The authentication token for this session.
*/ */
...@@ -103,6 +113,9 @@ public class ClientSession extends Session { ...@@ -103,6 +113,9 @@ public class ClientSession extends Session {
policyName = JiveGlobals.getProperty("xmpp.client.compression.policy", policyName = JiveGlobals.getProperty("xmpp.client.compression.policy",
Connection.CompressionPolicy.disabled.toString()); Connection.CompressionPolicy.disabled.toString());
compressionPolicy = Connection.CompressionPolicy.valueOf(policyName); compressionPolicy = Connection.CompressionPolicy.valueOf(policyName);
// Set the default read idle timeout. If none was set then assume 30 minutes
idleTimeout = JiveGlobals.getIntProperty("xmpp.client.idle", 30 * 60 * 1000);
} }
/** /**
...@@ -212,6 +225,10 @@ public class ClientSession extends Session { ...@@ -212,6 +225,10 @@ public class ClientSession extends Session {
// Indicate the compression policy to use for this connection // Indicate the compression policy to use for this connection
connection.setCompressionPolicy(compressionPolicy); connection.setCompressionPolicy(compressionPolicy);
// Set the max number of milliseconds the connection may not receive data from the
// client before closing the connection
connection.setIdleTimeout(idleTimeout);
// Create a ClientSession for this user. // Create a ClientSession for this user.
Session session = SessionManager.getInstance().createClientSession(connection); Session session = SessionManager.getInstance().createClientSession(connection);
...@@ -363,6 +380,31 @@ public class ClientSession extends Session { ...@@ -363,6 +380,31 @@ public class ClientSession extends Session {
JiveGlobals.setProperty("xmpp.client.compression.policy", compressionPolicy.toString()); JiveGlobals.setProperty("xmpp.client.compression.policy", compressionPolicy.toString());
} }
/**
* Returns the number of milliseconds a connection has to be idle to be closed. Default is
* 30 minutes. Sending stanzas to the client is not considered as activity. We are only
* considering the connection active when the client sends some data or hearbeats
* (i.e. whitespaces) to the server.
*
* @return the number of milliseconds a connection has to be idle to be closed.
*/
public static long getIdleTimeout() {
return idleTimeout;
}
/**
* Sets the number of milliseconds a connection has to be idle to be closed. Default is
* 30 minutes. Sending stanzas to the client is not considered as activity. We are only
* considering the connection active when the client sends some data or hearbeats
* (i.e. whitespaces) to the server.
*
* @param timeout the number of milliseconds a connection has to be idle to be closed.
*/
public static void setIdleTimeout(long timeout) {
idleTimeout = timeout;
JiveGlobals.setProperty("xmpp.client.idle", Long.toString(idleTimeout));
}
/** /**
* Creates a session with an underlying connection and permission protection. * Creates a session with an underlying connection and permission protection.
* *
......
...@@ -11,13 +11,12 @@ ...@@ -11,13 +11,12 @@
package org.jivesoftware.messenger; package org.jivesoftware.messenger;
import org.xmpp.packet.Packet;
import org.jivesoftware.messenger.auth.UnauthorizedException; import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.jivesoftware.messenger.net.SocketConnection; import org.xmpp.packet.Packet;
import java.io.Writer;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.io.Writer;
/** /**
* Represents a connection on the server. * Represents a connection on the server.
...@@ -221,6 +220,26 @@ public interface Connection { ...@@ -221,6 +220,26 @@ public interface Connection {
*/ */
void setTlsPolicy(TLSPolicy tlsPolicy); void setTlsPolicy(TLSPolicy tlsPolicy);
/**
* Returns the number of milliseconds a connection has to be idle to be closed. Sending
* stanzas to the client is not considered as activity. We are only considering the
* connection active when the client sends some data or hearbeats (i.e. whitespaces)
* to the server.
*
* @return the number of milliseconds a connection has to be idle to be closed.
*/
long getIdleTimeout();
/**
* Sets the number of milliseconds a connection has to be idle to be closed. Sending
* stanzas to the client is not considered as activity. We are only considering the
* connection active when the client sends some data or hearbeats (i.e. whitespaces)
* to the server.
*
* @param timeout the number of milliseconds a connection has to be idle to be closed.
*/
void setIdleTimeout(long timeout);
/** /**
* Enumeration of possible compression policies required to interact with the server. * Enumeration of possible compression policies required to interact with the server.
*/ */
......
...@@ -15,9 +15,9 @@ import org.jivesoftware.messenger.*; ...@@ -15,9 +15,9 @@ import org.jivesoftware.messenger.*;
import org.jivesoftware.messenger.auth.UnauthorizedException; import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.jivesoftware.messenger.interceptor.InterceptorManager; import org.jivesoftware.messenger.interceptor.InterceptorManager;
import org.jivesoftware.messenger.interceptor.PacketRejectedException; import org.jivesoftware.messenger.interceptor.PacketRejectedException;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.JiveGlobals;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import java.io.BufferedWriter; import java.io.BufferedWriter;
...@@ -26,12 +26,12 @@ import java.io.OutputStreamWriter; ...@@ -26,12 +26,12 @@ import java.io.OutputStreamWriter;
import java.io.Writer; import java.io.Writer;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.*; import java.util.zip.ZipOutputStream;
/** /**
* An object to track the state of a XMPP client-server session. * An object to track the state of a XMPP client-server session.
...@@ -50,9 +50,21 @@ public class SocketConnection implements Connection { ...@@ -50,9 +50,21 @@ public class SocketConnection implements Connection {
private static Map<SocketConnection, String> instances = private static Map<SocketConnection, String> instances =
new ConcurrentHashMap<SocketConnection, String>(); new ConcurrentHashMap<SocketConnection, String>();
/**
* Milliseconds a connection has to be idle to be closed. Timeout is disabled by default. It's
* up to the connection's owner to configure the timeout value. Sending stanzas to the client
* is not considered as activity. We are only considering the connection active when the
* client sends some data or hearbeats (i.e. whitespaces) to the server.
* The reason for this is that sending data will fail if the connection is closed. And if
* the thread is blocked while sending data (because the socket is closed) then the clean up
* thread will close the socket anyway.
*/
private long idleTimeout = -1;
private Map<ConnectionCloseListener, Object> listeners = new HashMap<ConnectionCloseListener, Object>(); private Map<ConnectionCloseListener, Object> listeners = new HashMap<ConnectionCloseListener, Object>();
private Socket socket; private Socket socket;
private SocketReader socketReader;
private Writer writer; private Writer writer;
...@@ -234,6 +246,14 @@ public class SocketConnection implements Connection { ...@@ -234,6 +246,14 @@ public class SocketConnection implements Connection {
this.compressionPolicy = compressionPolicy; this.compressionPolicy = compressionPolicy;
} }
public long getIdleTimeout() {
return idleTimeout;
}
public void setIdleTimeout(long timeout) {
this.idleTimeout = timeout;
}
public int getMajorXMPPVersion() { public int getMajorXMPPVersion() {
return majorVersion; return majorVersion;
} }
...@@ -341,6 +361,19 @@ public class SocketConnection implements Connection { ...@@ -341,6 +361,19 @@ public class SocketConnection implements Connection {
} }
forceClose(); forceClose();
} }
else {
// Check if the connection has been idle. A connection is considered idle if the client
// has not been receiving data for a period. Sending data to the client is not
// considered as activity.
if (idleTimeout > -1 && socketReader != null &&
System.currentTimeMillis() - socketReader.getLastActive() > idleTimeout) {
// Close the socket
if (Log.isDebugEnabled()) {
Log.debug("Closing connection that has been idle: " + this);
}
forceClose();
}
}
} }
void release() { void release() {
...@@ -473,4 +506,7 @@ public class SocketConnection implements Connection { ...@@ -473,4 +506,7 @@ public class SocketConnection implements Connection {
return super.toString() + " socket: " + socket + " session: " + session; return super.toString() + " socket: " + socket + " session: " + session;
} }
public void setSocketReader(SocketReader socketReader) {
this.socketReader = socketReader;
}
} }
\ No newline at end of file
...@@ -67,7 +67,7 @@ public abstract class SocketReader implements Runnable { ...@@ -67,7 +67,7 @@ public abstract class SocketReader implements Runnable {
static { static {
try { try {
factory = XmlPullParserFactory.newInstance(); factory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null);
} }
catch (XmlPullParserException e) { catch (XmlPullParserException e) {
Log.error("Error creating a parser factory", e); Log.error("Error creating a parser factory", e);
...@@ -88,6 +88,8 @@ public abstract class SocketReader implements Runnable { ...@@ -88,6 +88,8 @@ public abstract class SocketReader implements Runnable {
this.router = router; this.router = router;
this.connection = connection; this.connection = connection;
this.socket = socket; this.socket = socket;
connection.setSocketReader(this);
} }
/** /**
...@@ -514,6 +516,16 @@ public abstract class SocketReader implements Runnable { ...@@ -514,6 +516,16 @@ public abstract class SocketReader implements Runnable {
*/ */
abstract boolean processUnknowPacket(Element doc); abstract boolean processUnknowPacket(Element doc);
/**
* Returns the last time a full Document was read or a heartbeat was received. Hearbeats
* are represented as whitespaces received while a Document is not being parsed.
*
* @return the time in milliseconds when the last document or heartbeat was received.
*/
long getLastActive() {
return reader.getLastActive();
}
/** /**
* Close the connection since TLS was mandatory and the entity never negotiated TLS. Before * Close the connection since TLS was mandatory and the entity never negotiated TLS. Before
* closing the connection a stream error will be sent to the entity. * closing the connection a stream error will be sent to the entity.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment