Commit 8a9df3ff authored by Tom Evans's avatar Tom Evans Committed by tevans

OF-205: Prevent recursion when canceling subscriptions across cluster members

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/branches/pubsub_clustering@13278 b35dd754-fafc-0310-a699-88a17e54d16e
parent 7a18f0f0
...@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.pubsub.cluster.CancelSubscriptionTask; import org.jivesoftware.openfire.pubsub.cluster.CancelSubscriptionTask;
import org.jivesoftware.openfire.pubsub.cluster.ModifySubscriptionTask; import org.jivesoftware.openfire.pubsub.cluster.ModifySubscriptionTask;
import org.jivesoftware.openfire.pubsub.cluster.NewSubscriptionTask; import org.jivesoftware.openfire.pubsub.cluster.NewSubscriptionTask;
...@@ -1759,8 +1760,8 @@ public abstract class Node { ...@@ -1759,8 +1760,8 @@ public abstract class Node {
affiliates.add(affiliate); affiliates.add(affiliate);
} }
public void addSubscription(NodeSubscription subscription) public void addSubscription(NodeSubscription subscription)
{ {
subscriptionsByID.put(subscription.getID(), subscription); subscriptionsByID.put(subscription.getID(), subscription);
subscriptionsByJID.put(subscription.getJID().toString(), subscription); subscriptionsByJID.put(subscription.getJID().toString(), subscription);
} }
...@@ -2067,10 +2068,10 @@ public abstract class Node { ...@@ -2067,10 +2068,10 @@ public abstract class Node {
if (subscription.isAuthorizationPending()) { if (subscription.isAuthorizationPending()) {
subscription.sendAuthorizationRequest(); subscription.sendAuthorizationRequest();
} }
// Synchronous so the task can flush all other cluster nodes before // Synchronous so the task can flush all other cluster nodes before
// the last item is retrieved. // the last item is retrieved.
CacheFactory.doSynchronousClusterTask(new NewSubscriptionTask(subscription), false); CacheFactory.doSynchronousClusterTask(new NewSubscriptionTask(subscription), false);
// Send last published item (if node is leaf node and subscription status is ok) // Send last published item (if node is leaf node and subscription status is ok)
if (isSendItemSubscribe() && subscription.isActive()) { if (isSendItemSubscribe() && subscription.isActive()) {
...@@ -2097,8 +2098,9 @@ public abstract class Node { ...@@ -2097,8 +2098,9 @@ public abstract class Node {
* remove the existing affiliation too. * remove the existing affiliation too.
* *
* @param subscription the subscription to cancel. * @param subscription the subscription to cancel.
* @param sendToCluster True to forward cancel order to cluster peers
*/ */
public void cancelSubscription(NodeSubscription subscription) { public void cancelSubscription(NodeSubscription subscription, boolean sendToCluster) {
// Remove subscription from memory // Remove subscription from memory
subscriptionsByID.remove(subscription.getID()); subscriptionsByID.remove(subscription.getID());
subscriptionsByJID.remove(subscription.getJID().toString()); subscriptionsByJID.remove(subscription.getJID().toString());
...@@ -2113,7 +2115,9 @@ public abstract class Node { ...@@ -2113,7 +2115,9 @@ public abstract class Node {
// Remove the subscription from the database // Remove the subscription from the database
PubSubPersistenceManager.removeSubscription(subscription); PubSubPersistenceManager.removeSubscription(subscription);
} }
CacheFactory.doClusterTask(new CancelSubscriptionTask(subscription)); if (sendToCluster) {
CacheFactory.doClusterTask(new CancelSubscriptionTask(subscription));
}
// Check if we need to unsubscribe from the presence of the owner // Check if we need to unsubscribe from the presence of the owner
if (isPresenceBasedDelivery() && getSubscriptions(subscription.getOwner()).isEmpty()) { if (isPresenceBasedDelivery() && getSubscriptions(subscription.getOwner()).isEmpty()) {
...@@ -2121,6 +2125,17 @@ public abstract class Node { ...@@ -2121,6 +2125,17 @@ public abstract class Node {
} }
} }
/**
* Cancels an existing subscription to the node. If the subscriber does not have any
* other subscription to the node and his affiliation was of type <tt>none</tt> then
* remove the existing affiliation too.
*
* @param subscription the subscription to cancel.
*/
public void cancelSubscription(NodeSubscription subscription) {
cancelSubscription(subscription, ClusterManager.isClusteringEnabled());
}
/** /**
* Returns the {@link PublishedItem} whose ID matches the specified item ID or <tt>null</tt> * Returns the {@link PublishedItem} whose ID matches the specified item ID or <tt>null</tt>
* if none was found. Item ID may or may not exist and it depends on the node's configuration. * if none was found. Item ID may or may not exist and it depends on the node's configuration.
...@@ -2182,7 +2197,7 @@ public abstract class Node { ...@@ -2182,7 +2197,7 @@ public abstract class Node {
} }
@Override @Override
public String toString() { public String toString() {
return super.toString() + " - ID: " + getNodeID(); return super.toString() + " - ID: " + getNodeID();
} }
...@@ -2215,7 +2230,7 @@ public abstract class Node { ...@@ -2215,7 +2230,7 @@ public abstract class Node {
if (approved) { if (approved) {
// Mark that the subscription to the node has been approved // Mark that the subscription to the node has been approved
subscription.approved(); subscription.approved();
CacheFactory.doClusterTask(new ModifySubscriptionTask(subscription)); CacheFactory.doClusterTask(new ModifySubscriptionTask(subscription));
} }
else { else {
// Cancel the subscription to the node // Cancel the subscription to the node
......
...@@ -32,6 +32,6 @@ public class CancelSubscriptionTask extends SubscriptionTask ...@@ -32,6 +32,6 @@ public class CancelSubscriptionTask extends SubscriptionTask
// This method will make a db call, but it will simply do nothing since // This method will make a db call, but it will simply do nothing since
// the record will already be deleted. // the record will already be deleted.
node.cancelSubscription(getSubscription()); node.cancelSubscription(getSubscription(), false);
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment