Commit cb1a6734 authored by Robin Collier's avatar Robin Collier Committed by rcollier

OF-205 Some updates to provide extra cluster tasks such as node removal and...

OF-205 Some updates to provide extra cluster tasks such as node removal and flushing persistence cache.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/branches/pubsub_clustering@13255 b35dd754-fafc-0310-a699-88a17e54d16e
parent 45a31670
...@@ -30,7 +30,10 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -30,7 +30,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.openfire.pubsub.cluster.CancelSubscriptionTask;
import org.jivesoftware.openfire.pubsub.cluster.ModifySubscriptionTask;
import org.jivesoftware.openfire.pubsub.cluster.NewSubscriptionTask; import org.jivesoftware.openfire.pubsub.cluster.NewSubscriptionTask;
import org.jivesoftware.openfire.pubsub.cluster.RemoveNodeTask;
import org.jivesoftware.openfire.pubsub.models.AccessModel; import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel; import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
...@@ -417,6 +420,21 @@ public abstract class Node { ...@@ -417,6 +420,21 @@ public abstract class Node {
return subscriptionsByID.values(); return subscriptionsByID.values();
} }
/**
* Returns all subscriptions to the node. If multiple subscriptions are enabled,
* this method returns the subscriptions by <tt>subId</tt>, otherwise it returns
* the subscriptions by {@link JID}.
*
* @return All subscriptions to the node.
*/
public Collection<NodeSubscription> getAllSubscriptions() {
if (isMultipleSubscriptionsEnabled()) {
return subscriptionsByID.values();
} else {
return subscriptionsByJID.values();
}
}
/** /**
* Returns the {@link NodeAffiliate} of the specified {@link JID} or <tt>null</tt> * Returns the {@link NodeAffiliate} of the specified {@link JID} or <tt>null</tt>
* if none was found. Users that have a subscription with the node will ALWAYS * if none was found. Users that have a subscription with the node will ALWAYS
...@@ -1817,6 +1835,7 @@ public abstract class Node { ...@@ -1817,6 +1835,7 @@ public abstract class Node {
cancelPresenceSubscriptions(); cancelPresenceSubscriptions();
// Remove the node from memory // Remove the node from memory
service.removeNode(getNodeID()); service.removeNode(getNodeID());
CacheFactory.doClusterTask(new RemoveNodeTask(this));
// Clear collections in memory (clear them after broadcast was sent) // Clear collections in memory (clear them after broadcast was sent)
affiliates.clear(); affiliates.clear();
subscriptionsByID.clear(); subscriptionsByID.clear();
...@@ -2094,6 +2113,8 @@ public abstract class Node { ...@@ -2094,6 +2113,8 @@ public abstract class Node {
// Remove the subscription from the database // Remove the subscription from the database
PubSubPersistenceManager.removeSubscription(subscription); PubSubPersistenceManager.removeSubscription(subscription);
} }
CacheFactory.doClusterTask(new CancelSubscriptionTask(subscription));
// Check if we need to unsubscribe from the presence of the owner // Check if we need to unsubscribe from the presence of the owner
if (isPresenceBasedDelivery() && getSubscriptions(subscription.getOwner()).isEmpty()) { if (isPresenceBasedDelivery() && getSubscriptions(subscription.getOwner()).isEmpty()) {
service.presenceSubscriptionNotRequired(this, subscription.getOwner()); service.presenceSubscriptionNotRequired(this, subscription.getOwner());
...@@ -2194,6 +2215,7 @@ public abstract class Node { ...@@ -2194,6 +2215,7 @@ public abstract class Node {
if (approved) { if (approved) {
// Mark that the subscription to the node has been approved // Mark that the subscription to the node has been approved
subscription.approved(); subscription.approved();
CacheFactory.doClusterTask(new ModifySubscriptionTask(subscription));
} }
else { else {
// Cancel the subscription to the node // Cancel the subscription to the node
......
...@@ -41,12 +41,14 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -41,12 +41,14 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import org.jivesoftware.database.DbConnectionManager; import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.database.DbConnectionManager.DatabaseType; import org.jivesoftware.database.DbConnectionManager.DatabaseType;
import org.jivesoftware.openfire.pubsub.cluster.FlushTask;
import org.jivesoftware.openfire.pubsub.models.AccessModel; import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel; import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LinkedList; import org.jivesoftware.util.LinkedList;
import org.jivesoftware.util.LinkedListNode; import org.jivesoftware.util.LinkedListNode;
import org.jivesoftware.util.StringUtils; import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
...@@ -197,7 +199,7 @@ public class PubSubPersistenceManager { ...@@ -197,7 +199,7 @@ public class PubSubPersistenceManager {
private static final int POOL_SIZE = 50; private static final int POOL_SIZE = 50;
private static final int ITEM_CACHE_SIZE = JiveGlobals.getIntProperty("xmpp.pubsub.flush.cache", 1000); private static final int ITEM_CACHE_SIZE = JiveGlobals.getIntProperty("xmpp.pubsub.item.cache", 1000);
private static int publishedItemSize = 0; private static int publishedItemSize = 0;
private static final Object itemLock = new Object(); private static final Object itemLock = new Object();
...@@ -1128,7 +1130,7 @@ public class PubSubPersistenceManager { ...@@ -1128,7 +1130,7 @@ public class PubSubPersistenceManager {
itemMap.put(item.getID(), listNode); itemMap.put(item.getID(), listNode);
publishedItemSize++; publishedItemSize++;
if (publishedItemSize > ITEM_CACHE_SIZE) if (publishedItemSize > ITEM_CACHE_SIZE)
flushItems(); flushItems();
} }
} }
...@@ -1140,6 +1142,8 @@ public class PubSubPersistenceManager { ...@@ -1140,6 +1142,8 @@ public class PubSubPersistenceManager {
{ {
log.debug("Flushing items to database"); log.debug("Flushing items to database");
CacheFactory.doSynchronousClusterTask(new FlushTask(), false);
boolean abortTransaction = false; boolean abortTransaction = false;
LinkedList addList = null; LinkedList addList = null;
LinkedList delList = null; LinkedList delList = null;
......
...@@ -2,9 +2,13 @@ package org.jivesoftware.openfire.pubsub.cluster; ...@@ -2,9 +2,13 @@ package org.jivesoftware.openfire.pubsub.cluster;
import org.jivesoftware.openfire.pubsub.Node; import org.jivesoftware.openfire.pubsub.Node;
import org.jivesoftware.openfire.pubsub.NodeSubscription; import org.jivesoftware.openfire.pubsub.NodeSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CancelSubscriptionTask extends SubscriptionTask public class CancelSubscriptionTask extends SubscriptionTask
{ {
private static final Logger log = LoggerFactory.getLogger(CancelSubscriptionTask.class);
public CancelSubscriptionTask() public CancelSubscriptionTask()
{ {
} }
...@@ -17,7 +21,7 @@ public class CancelSubscriptionTask extends SubscriptionTask ...@@ -17,7 +21,7 @@ public class CancelSubscriptionTask extends SubscriptionTask
@Override @Override
public void run() public void run()
{ {
System.out.println("Running DeleteSubscriptionTask: " + toString()); log.debug("[TASK] Cancel Subscription : {}", toString());
Node node = getNode(); Node node = getNode();
......
package org.jivesoftware.openfire.pubsub.cluster;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager;
import org.jivesoftware.util.cache.ClusterTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FlushTask implements ClusterTask
{
private static final Logger log = LoggerFactory.getLogger(FlushTask.class);
public FlushTask()
{
}
@Override
public void run()
{
log.debug("[TASK] Flush pubsub");
PubSubPersistenceManager.flushItems();
}
@Override
public Object getResult()
{
return null;
}
@Override
public void writeExternal(ObjectOutput out) throws IOException
{
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
}
}
...@@ -2,9 +2,13 @@ package org.jivesoftware.openfire.pubsub.cluster; ...@@ -2,9 +2,13 @@ package org.jivesoftware.openfire.pubsub.cluster;
import org.jivesoftware.openfire.pubsub.NodeSubscription; import org.jivesoftware.openfire.pubsub.NodeSubscription;
import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager; import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ModifySubscriptionTask extends SubscriptionTask public class ModifySubscriptionTask extends SubscriptionTask
{ {
private static final Logger log = LoggerFactory.getLogger(ModifySubscriptionTask.class);
public ModifySubscriptionTask() public ModifySubscriptionTask()
{ {
...@@ -18,6 +22,7 @@ public class ModifySubscriptionTask extends SubscriptionTask ...@@ -18,6 +22,7 @@ public class ModifySubscriptionTask extends SubscriptionTask
@Override @Override
public void run() public void run()
{ {
log.debug("[TASK] Modify subscription : {}", toString());
PubSubPersistenceManager.loadSubscription(getService(), getNode(), getSubscriptionId()); PubSubPersistenceManager.loadSubscription(getService(), getNode(), getSubscriptionId());
} }
} }
...@@ -2,10 +2,13 @@ package org.jivesoftware.openfire.pubsub.cluster; ...@@ -2,10 +2,13 @@ package org.jivesoftware.openfire.pubsub.cluster;
import org.jivesoftware.openfire.pubsub.Node; import org.jivesoftware.openfire.pubsub.Node;
import org.jivesoftware.openfire.pubsub.NodeSubscription; import org.jivesoftware.openfire.pubsub.NodeSubscription;
import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NewSubscriptionTask extends SubscriptionTask public class NewSubscriptionTask extends SubscriptionTask
{ {
private static final Logger log = LoggerFactory.getLogger(NewSubscriptionTask.class);
public NewSubscriptionTask() public NewSubscriptionTask()
{ {
...@@ -19,7 +22,7 @@ public class NewSubscriptionTask extends SubscriptionTask ...@@ -19,7 +22,7 @@ public class NewSubscriptionTask extends SubscriptionTask
@Override @Override
public void run() public void run()
{ {
System.out.println("Running NewSubscriptionTask: " + toString()); log.debug("[TASK] New subscription : {}", toString());
Node node = getNode(); Node node = getNode();
...@@ -45,7 +48,5 @@ public class NewSubscriptionTask extends SubscriptionTask ...@@ -45,7 +48,5 @@ public class NewSubscriptionTask extends SubscriptionTask
getService().presenceSubscriptionRequired(getNode(), getOwner()); getService().presenceSubscriptionRequired(getNode(), getOwner());
} }
} }
// We have to flush so the originating node can do a get last item.
PubSubPersistenceManager.flushItems();
} }
} }
...@@ -2,6 +2,8 @@ package org.jivesoftware.openfire.pubsub.cluster; ...@@ -2,6 +2,8 @@ package org.jivesoftware.openfire.pubsub.cluster;
import org.jivesoftware.openfire.pubsub.Node; import org.jivesoftware.openfire.pubsub.Node;
import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager; import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Forces the node to be refreshed from the database. This will load a node from * Forces the node to be refreshed from the database. This will load a node from
...@@ -13,6 +15,8 @@ import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager; ...@@ -13,6 +15,8 @@ import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager;
*/ */
public class RefreshNodeTask extends NodeTask public class RefreshNodeTask extends NodeTask
{ {
private static final Logger log = LoggerFactory.getLogger(RefreshNodeTask.class);
public RefreshNodeTask() public RefreshNodeTask()
{ {
} }
...@@ -31,7 +35,7 @@ public class RefreshNodeTask extends NodeTask ...@@ -31,7 +35,7 @@ public class RefreshNodeTask extends NodeTask
@Override @Override
public void run() public void run()
{ {
System.out.println("Refreshing node task"); log.debug("[TASK] Refreshing node - nodeID: {}", getNodeId());
PubSubPersistenceManager.loadNode(getService(), getNodeId()); PubSubPersistenceManager.loadNode(getService(), getNodeId());
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment