Commit a765eb86 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Added timeout to IQRouter for IQResultListener. JM-1151

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@9312 b35dd754-fafc-0310-a699-88a17e54d16e
parent 9f9bb9b9
...@@ -31,4 +31,12 @@ public interface IQResultListener { ...@@ -31,4 +31,12 @@ public interface IQResultListener {
* @param packet the IQ packet answering a previously sent IQ packet. * @param packet the IQ packet answering a previously sent IQ packet.
*/ */
void receivedAnswer(IQ packet); void receivedAnswer(IQ packet);
/**
* Notification method indicating that a predefined time has passed without
* receiving answer to a previously sent IQ packet.
*
* @param packetId The packet id of a previously sent IQ packet that wasn't answered.
*/
void answerTimeout(String packetId);
} }
...@@ -23,11 +23,10 @@ import org.jivesoftware.openfire.session.Session; ...@@ -23,11 +23,10 @@ import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.user.UserManager; import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.TaskEngine;
import org.xmpp.packet.*; import org.xmpp.packet.*;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
...@@ -45,8 +44,8 @@ public class IQRouter extends BasicModule { ...@@ -45,8 +44,8 @@ public class IQRouter extends BasicModule {
private String serverName; private String serverName;
private List<IQHandler> iqHandlers = new ArrayList<IQHandler>(); private List<IQHandler> iqHandlers = new ArrayList<IQHandler>();
private Map<String, IQHandler> namespace2Handlers = new ConcurrentHashMap<String, IQHandler>(); private Map<String, IQHandler> namespace2Handlers = new ConcurrentHashMap<String, IQHandler>();
private Map<String, IQResultListener> resultListeners = private Map<String, IQResultListener> resultListeners = new ConcurrentHashMap<String, IQResultListener>();
new ConcurrentHashMap<String, IQResultListener>(); private Map<String, Long> resultTimeout = new ConcurrentHashMap<String, Long>();
private SessionManager sessionManager; private SessionManager sessionManager;
private UserManager userManager; private UserManager userManager;
...@@ -173,25 +172,65 @@ public class IQRouter extends BasicModule { ...@@ -173,25 +172,65 @@ public class IQRouter extends BasicModule {
} }
/** /**
* Adds an {@link IQResultListener} that will be invoked when an IQ result is sent to the * Adds an {@link IQResultListener} that will be invoked when an IQ result
* server itself and is of type result or error. This is a nice way for the server to * is sent to the server itself and is of type result or error. This is a
* send IQ packets to other XMPP entities and be waked up when a response is received back.<p> * nice way for the server to send IQ packets to other XMPP entities and be
* waked up when a response is received back.<p>
* *
* Once an IQ result was received, the listener will be invoked and removed from * Once an IQ result was received, the listener will be invoked and removed
* the list of listeners. * from the list of listeners.<p>
*
* If no result was received within one minute, the timeout method of the
* listener will be invoked and the listener will be removed from the list
* of listeners.
* *
* @param id the id of the IQ packet being sent from the server to an XMPP entity. * @param id
* @param listener the IQResultListener that will be invoked when an answer is received * the id of the IQ packet being sent from the server to an XMPP
* entity.
* @param listener
* the IQResultListener that will be invoked when an answer is
* received
*/ */
public void addIQResultListener(String id, IQResultListener listener) { public void addIQResultListener(String id, IQResultListener listener) {
// TODO Add a check that if no IQ reply was received for a while then an IQ error should addIQResultListener(id, listener, 60 * 1000);
// be generated by the server and simulate like the client sent it. This will let listeners }
// react and be removed from the collection
/**
* Adds an {@link IQResultListener} that will be invoked when an IQ result
* is sent to the server itself and is of type result or error. This is a
* nice way for the server to send IQ packets to other XMPP entities and be
* waked up when a response is received back.<p>
*
* Once an IQ result was received, the listener will be invoked and removed
* from the list of listeners.<p>
*
* If no result was received within the specified amount of milliseconds,
* the timeout method of the listener will be invoked and the listener will
* be removed from the list of listeners.<p>
*
* Note that the listener will remain active for <em>at least</em> the
* specified timeout value. The listener will not be removed at the exact
* moment it times out. Instead, purging of timed out listeners is a
* periodic scheduled job.
*
* @param id
* the id of the IQ packet being sent from the server to an XMPP
* entity.
* @param listener
* the IQResultListener that will be invoked when an answer is
* received.
* @param timeoutmillis
* The amount of milliseconds after which waiting for a response
* should be stopped.
*/
public void addIQResultListener(String id, IQResultListener listener, long timeoutmillis) {
resultListeners.put(id, listener); resultListeners.put(id, listener);
resultTimeout.put(id, System.currentTimeMillis() + timeoutmillis);
} }
public void initialize(XMPPServer server) { public void initialize(XMPPServer server) {
super.initialize(server); super.initialize(server);
TaskEngine.getInstance().scheduleAtFixedRate(new TimeoutTask(), 5000, 5000);
serverName = server.getServerInfo().getName(); serverName = server.getServerInfo().getName();
routingTable = server.getRoutingTable(); routingTable = server.getRoutingTable();
multicastRouter = server.getMulticastRouter(); multicastRouter = server.getMulticastRouter();
...@@ -240,7 +279,8 @@ public class IQRouter extends BasicModule { ...@@ -240,7 +279,8 @@ public class IQRouter extends BasicModule {
} }
else if (IQ.Type.result == packet.getType() || IQ.Type.error == packet.getType()) { else if (IQ.Type.result == packet.getType() || IQ.Type.error == packet.getType()) {
// The server got an answer to an IQ packet that was sent from the server // The server got an answer to an IQ packet that was sent from the server
IQResultListener iqResultListener = resultListeners.remove(packet.getID()); IQResultListener iqResultListener = resultListeners.remove(packet.getID());
resultTimeout.remove(packet.getID());
if (iqResultListener != null) { if (iqResultListener != null) {
try { try {
iqResultListener.receivedAnswer(packet); iqResultListener.receivedAnswer(packet);
...@@ -377,4 +417,51 @@ public class IQRouter extends BasicModule { ...@@ -377,4 +417,51 @@ public class IQRouter extends BasicModule {
Log.warn("Error or result packet could not be delivered " + packet); Log.warn("Error or result packet could not be delivered " + packet);
} }
} }
/**
* Timer task that will remove Listeners that wait for results to IQ stanzas
* that have timed out. Time out values can be set to each listener
* individually by adjusting the timeout value in the third parameter of
* {@link IQRouter#addIQResultListener(String, IQResultListener, long)}.
*
* @author Guus der Kinderen, guus@nimbuzz.com
*/
private class TimeoutTask extends TimerTask {
/**
* Iterates over and removes all timed out results.<p>
*
* The map that keeps track of timeout values is ordered by timeout
* date. This way, iteration can be stopped as soon as the first value
* has been found that didn't timeout yet.
*/
@Override
public void run() {
// Use an Iterator to allow changes to the Map that is backing
// the Iterator.
final Iterator<Map.Entry<String, Long>> it = resultTimeout.entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<String, Long> pointer = it.next();
if (System.currentTimeMillis() < pointer.getValue()) {
// All other values do not time out yet. Abort this run.
break;
}
final String packetId = pointer.getKey();
// remove this listener from the list
final IQResultListener listener = resultListeners.remove(packetId);
if (listener != null) {
// notify listener of the timeout.
listener.answerTimeout(packetId);
}
// remove the packet from the list that's used to track
// timeouts
it.remove();
}
}
}
} }
...@@ -12,10 +12,11 @@ ...@@ -12,10 +12,11 @@
package org.jivesoftware.openfire; package org.jivesoftware.openfire;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider; import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.xmpp.packet.IQ; import org.xmpp.packet.IQ;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
...@@ -370,6 +371,10 @@ public class MulticastRouter extends BasicModule implements ServerFeaturesProvid ...@@ -370,6 +371,10 @@ public class MulticastRouter extends BasicModule implements ServerFeaturesProvid
} }
} }
public void answerTimeout(String packetId) {
Log.warn("An answer to a previously sent IQ stanza was never received. Packet id: " + packetId);
}
public Iterator<String> getFeatures() { public Iterator<String> getFeatures() {
ArrayList<String> features = new ArrayList<String>(); ArrayList<String> features = new ArrayList<String>();
features.add(NAMESPACE); features.add(NAMESPACE);
......
...@@ -199,6 +199,10 @@ public class InternalComponentManager extends BasicModule implements ComponentMa ...@@ -199,6 +199,10 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
public void receivedAnswer(IQ packet) { public void receivedAnswer(IQ packet) {
answer.offer(packet); answer.offer(packet);
} }
public void answerTimeout(String packetId) {
Log.warn("An answer to a previously sent IQ stanza was never received. Packet id: " + packetId);
}
}); });
sendPacket(component, packet); sendPacket(component, packet);
IQ reply = null; IQ reply = null;
......
...@@ -393,6 +393,10 @@ public class UserManager implements IQResultListener { ...@@ -393,6 +393,10 @@ public class UserManager implements IQResultListener {
} }
} }
public void answerTimeout(String packetId) {
Log.warn("An answer to a previously sent IQ stanza was never received. Packet id: " + packetId);
}
private void initProvider() { private void initProvider() {
String className = JiveGlobals.getXMLProperty("provider.user.className", String className = JiveGlobals.getXMLProperty("provider.user.className",
"org.jivesoftware.openfire.user.DefaultUserProvider"); "org.jivesoftware.openfire.user.DefaultUserProvider");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment