Commit 3e926a78 authored by GregDThomas's avatar GregDThomas Committed by daryl herzmann

OF-1435: Ensure that we start a new thread before calling UserManager… (#942)

* OF-1435: Ensure that we start a new thread before calling UserManager::isRegisteredUser(JID)

* OF-1435: Introduce a Future to highlight that the processing may be performed asynchronously.
parent 8ed00ec5
......@@ -28,7 +28,9 @@ import org.jivesoftware.openfire.pep.PEPService;
import org.jivesoftware.openfire.pubsub.cluster.RefreshNodeTask;
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.util.ImmediateFuture;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.TaskEngine;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -49,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
/**
* A PubSubEngine is responsible for handling packets sent to a pub-sub service.
......@@ -73,15 +76,19 @@ public class PubSubEngine {
* are not being handled by the engine. Instead the service itself should handle disco packets.
*
* @param service the PubSub service this action is to be performed for.
* @param iq the IQ packet sent to the pubsub service.
* @return true if the IQ packet was handled by the engine.
* @param iq the IQ packet sent to the pubsub service.
* @return <code>null</code> if the IQ packet was not handled by the engine, otherwise a {@link Future} that
* indicates when processing is complete. Processing will be carried out asynchronously if there is the possibility
* of sending a disco#info to a remote server, which could block for up to 60 seconds. If processing is carried out
* synchronously, the returned future completes immediately. Note that the returned future will only return
* <code>null</code> when it completes.
*/
public boolean process(PubSubService service, IQ iq) {
public Future process(final PubSubService service, final IQ iq) {
// Ignore IQs of type ERROR or RESULT
if (IQ.Type.error == iq.getType() || IQ.Type.result == iq.getType()) {
return true;
return new ImmediateFuture<>();
}
Element childElement = iq.getChildElement();
final Element childElement = iq.getChildElement();
String namespace = null;
if (childElement != null) {
......@@ -91,14 +98,19 @@ public class PubSubEngine {
Element action = childElement.element("publish");
if (action != null) {
// Entity publishes an item
publishItemsToNode(service, iq, action);
return true;
// Complete this asynchronously, as UserManager::isRegisteredUser(JID) blocks, waiting for a result which may come in on this thread
final Element finalAction = action;
return TaskEngine.getInstance().submit(new Runnable() {
@Override
public void run() {
publishItemsToNode(service, iq, finalAction);
}
});
}
action = childElement.element("subscribe");
if (action != null) {
// Entity subscribes to a node
subscribeNode(service, iq, childElement, action);
return true;
return subscribeNode(service, iq, childElement, action);
}
action = childElement.element("options");
if (action != null) {
......@@ -110,47 +122,53 @@ public class PubSubEngine {
// Subscriber submits completed options form
configureSubscription(service, iq, action);
}
return true;
return new ImmediateFuture<>();
}
action = childElement.element("create");
if (action != null) {
// Entity is requesting to create a new node
createNode(service, iq, childElement, action);
return true;
final Element finalAction = action;
// Complete this asynchronously, as UserManager::isRegisteredUser(JID) blocks, waiting for a result which may come in on this thread
return TaskEngine.getInstance().submit(new Runnable() {
@Override
public void run() {
createNode(service, iq, childElement, finalAction);
}
});
}
action = childElement.element("unsubscribe");
if (action != null) {
// Entity unsubscribes from a node
unsubscribeNode(service, iq, action);
return true;
return new ImmediateFuture<>();
}
action = childElement.element("subscriptions");
if (action != null) {
// Entity requests all current subscriptions
getSubscriptions(service, iq, childElement);
return true;
return new ImmediateFuture<>();
}
action = childElement.element("affiliations");
if (action != null) {
// Entity requests all current affiliations
getAffiliations(service, iq, childElement);
return true;
return new ImmediateFuture<>();
}
action = childElement.element("items");
if (action != null) {
// Subscriber requests all active items
getPublishedItems(service, iq, action);
return true;
return new ImmediateFuture<>();
}
action = childElement.element("retract");
if (action != null) {
// Entity deletes an item
deleteItems(service, iq, action);
return true;
return new ImmediateFuture<>();
}
// Unknown action requested
sendErrorPacket(iq, PacketError.Condition.bad_request, null);
return true;
return new ImmediateFuture<>();
}
else if ("http://jabber.org/protocol/pubsub#owner".equals(namespace)) {
Element action = childElement.element("configure");
......@@ -164,7 +182,7 @@ public class PubSubEngine {
Element pubsubError = DocumentHelper.createElement(QName.get(
"nodeid-required", "http://jabber.org/protocol/pubsub#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, pubsubError);
return true;
return new ImmediateFuture<>();
}
else {
// Sysadmin is trying to configure root collection node
......@@ -179,20 +197,20 @@ public class PubSubEngine {
// Owner submits or cancels node configuration form
configureNode(service, iq, action, nodeID);
}
return true;
return new ImmediateFuture<>();
}
action = childElement.element("default");
if (action != null) {
// Owner requests default configuration options for
// leaf or collection nodes
getDefaultNodeConfiguration(service, iq, childElement, action);
return true;
return new ImmediateFuture<>();
}
action = childElement.element("delete");
if (action != null) {
// Owner deletes a node
deleteNode(service, iq, action);
return true;
return new ImmediateFuture<>();
}
action = childElement.element("subscriptions");
if (action != null) {
......@@ -203,7 +221,7 @@ public class PubSubEngine {
else {
modifyNodeSubscriptions(service, iq, action);
}
return true;
return new ImmediateFuture<>();
}
action = childElement.element("affiliations");
if (action != null) {
......@@ -214,25 +232,25 @@ public class PubSubEngine {
else {
modifyNodeAffiliations(service, iq, action);
}
return true;
return new ImmediateFuture<>();
}
action = childElement.element("purge");
if (action != null) {
// Owner purges items from a node
purgeNode(service, iq, action);
return true;
return new ImmediateFuture<>();
}
// Unknown action requested so return error to sender
sendErrorPacket(iq, PacketError.Condition.bad_request, null);
return true;
return new ImmediateFuture<>();
}
else if ("http://jabber.org/protocol/commands".equals(namespace)) {
// Process ad-hoc command
IQ reply = service.getManager().process(iq);
router.route(reply);
return true;
return new ImmediateFuture<>();
}
return false;
return null;
}
/**
......@@ -497,9 +515,9 @@ public class PubSubEngine {
leafNode.deleteItems(items);
}
private void subscribeNode(PubSubService service, IQ iq, Element childElement, Element subscribeElement) {
private Future subscribeNode(final PubSubService service, final IQ iq, final Element childElement, Element subscribeElement) {
String nodeID = subscribeElement.attributeValue("node");
Node node;
final Node node;
if (nodeID == null) {
if (service.isCollectionNodesSupported()) {
// Entity subscribes to root collection node
......@@ -510,7 +528,7 @@ public class PubSubEngine {
Element pubsubError = DocumentHelper.createElement(QName.get(
"nodeid-required", "http://jabber.org/protocol/pubsub#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, pubsubError);
return;
return new ImmediateFuture<>();
}
}
else {
......@@ -519,28 +537,40 @@ public class PubSubEngine {
if (node == null) {
// Node does not exist. Return item-not-found error
sendErrorPacket(iq, PacketError.Condition.item_not_found, null);
return;
return new ImmediateFuture<>();
}
}
// Check if sender and subscriber JIDs match or if a valid "trusted proxy" is being used
JID from = iq.getFrom();
JID subscriberJID = new JID(subscribeElement.attributeValue("jid"));
final JID from = iq.getFrom();
final JID subscriberJID = new JID(subscribeElement.attributeValue("jid"));
if (!from.toBareJID().equals(subscriberJID.toBareJID()) && !service.isServiceAdmin(from)) {
// JIDs do not match and requestor is not a service admin so return an error
Element pubsubError = DocumentHelper.createElement(
QName.get("invalid-jid", "http://jabber.org/protocol/pubsub#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, pubsubError);
return;
return new ImmediateFuture<>();
}
// TODO Assumed that the owner of the subscription is the bare JID of the subscription JID. Waiting StPeter answer for explicit field.
JID owner = subscriberJID.asBareJID();
final JID owner = subscriberJID.asBareJID();
// Check if the node's access model allows the subscription to proceed
AccessModel accessModel = node.getAccessModel();
final AccessModel accessModel = node.getAccessModel();
if (!accessModel.canSubscribe(node, owner, subscriberJID)) {
sendErrorPacket(iq, accessModel.getSubsriptionError(),
accessModel.getSubsriptionErrorDetail());
return;
return new ImmediateFuture<>();
}
// Complete this asynchronously, as UserManager::isRegisteredUser(JID) blocks, waiting for a result which may come in on this thread
return TaskEngine.getInstance().submit(new Runnable() {
@Override
public void run() {
subscribeNodeAsync(iq, subscriberJID, node, owner, service, from, childElement, accessModel);
}
});
}
private void subscribeNodeAsync(final IQ iq, final JID subscriberJID, final Node node, final JID owner, final PubSubService service, final JID from, final Element childElement, final AccessModel accessModel) {
// Check if the subscriber is an anonymous user
if (!isComponent(subscriberJID) && !UserManager.getInstance().isRegisteredUser(subscriberJID)) {
// Anonymous users cannot subscribe to the node. Return forbidden error
......@@ -1139,7 +1169,9 @@ public class PubSubEngine {
* - Node does not already exist
* - New node configuration is valid
*
* NOTE: This method should not reply to the client
* <br/>NOTE 1: This method should not reply to the client
* <br/>NOTE 2: This method calls UserManager::isRegisteredUser(JID) which can block waiting for a response - so
* do not call this method in the same thread in which a response might arrive
*
* @param service
* @param iq
......
......@@ -172,7 +172,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
try {
// Check if the packet is a disco request or a packet with namespace iq:register
if (packet instanceof IQ) {
if (!engine.process(this, (IQ) packet)) {
if (engine.process(this, (IQ) packet) == null) {
process((IQ) packet);
}
}
......
......@@ -32,6 +32,7 @@ import org.jivesoftware.openfire.event.UserEventListener;
import org.jivesoftware.openfire.user.property.DefaultUserPropertyProvider;
import org.jivesoftware.openfire.user.property.UserPropertyProvider;
import org.jivesoftware.util.ClassUtils;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.PropertyEventDispatcher;
import org.jivesoftware.util.PropertyEventListener;
......@@ -411,6 +412,10 @@ public class UserManager implements IQResultListener {
* remote users (i.e. domain does not match local domain) a disco#info request is going
* to be sent to the bare JID of the user.
*
* <p>WARNING: If the supplied JID could be a remote user and the disco#info result packet comes back on the same
* thread as the one the calls this method then it will not be processed, and this method will block for 60 seconds
* by default. To change the timeout, update the system property <code>usermanager.remote-disco-info-timeout-seconds</code>
*
* @param user to JID of the user to check it it's a registered user.
* @return true if the specified JID belongs to a local or remote registered user.
*/
......@@ -444,9 +449,9 @@ public class UserManager implements IQResultListener {
server.getIQRouter().addIQResultListener(iq.getID(), this);
synchronized (user.toBareJID().intern()) {
server.getIQRouter().route(iq);
// Wait for the reply to be processed. Time out in 1 minute.
// Wait for the reply to be processed. Time out in 1 minute by default
try {
user.toBareJID().intern().wait(60000);
user.toBareJID().intern().wait(JiveGlobals.getLongProperty("usermanager.remote-disco-info-timeout-seconds", 60) * JiveConstants.SECOND);
}
catch (InterruptedException e) {
// Do nothing
......
package org.jivesoftware.util;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* A Future that returns immediately.
*
* @param <T> The type of return value
*/
public class ImmediateFuture<T> implements Future<T> {
private final T value;
/**
* Creates a Future that returns null immediately
*/
public ImmediateFuture() {
this(null);
}
/**
* Creates a Future that returns the supplied value immediately
*
* @param value the value to return
*/
public ImmediateFuture(final T value) {
this.value = value;
}
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public T get() {
return value;
}
@Override
public T get(final long timeout, final TimeUnit unit) {
return value;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment