Commit a7d7b422 authored by Robin Collier's avatar Robin Collier Committed by rcollier

OF-205 Initial refactoring for clustering support.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/branches/pubsub_clustering@13170 b35dd754-fafc-0310-a699-88a17e54d16e
parents 31d27670 a748b317
...@@ -157,6 +157,11 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -157,6 +157,11 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
pepServiceManager = new PEPServiceManager(); pepServiceManager = new PEPServiceManager();
} }
public PEPServiceManager getServiceManager()
{
return pepServiceManager;
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* *
...@@ -441,7 +446,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -441,7 +446,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
* *
* @return the knownRemotePresences map * @return the knownRemotePresences map
*/ */
public Map<String, Set<JID>> getKnownRemotePresenes() { public Map<String, Set<JID>> getKnownRemotePresences() {
return knownRemotePresences; return knownRemotePresences;
} }
......
...@@ -348,7 +348,8 @@ public class PEPService implements PubSubService, Cacheable { ...@@ -348,7 +348,8 @@ public class PEPService implements PubSubService, Cacheable {
else { else {
// Since recipientJID is not local, try to get presence info from cached known remote // Since recipientJID is not local, try to get presence info from cached known remote
// presences. // presences.
Map<String, Set<JID>> knownRemotePresences = XMPPServer.getInstance().getIQPEPHandler().getKnownRemotePresenes(); Map<String, Set<JID>> knownRemotePresences = XMPPServer.getInstance().getIQPEPHandler()
.getKnownRemotePresences();
Set<JID> remotePresenceSet = knownRemotePresences.get(getAddress().toBareJID()); Set<JID> remotePresenceSet = knownRemotePresences.get(getAddress().toBareJID());
if (remotePresenceSet != null) { if (remotePresenceSet != null) {
......
...@@ -21,19 +21,15 @@ ...@@ -21,19 +21,15 @@
package org.jivesoftware.openfire.pubsub; package org.jivesoftware.openfire.pubsub;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.xmpp.forms.DataForm; import org.xmpp.forms.DataForm;
...@@ -76,11 +72,9 @@ public class LeafNode extends Node { ...@@ -76,11 +72,9 @@ public class LeafNode extends Node {
*/ */
private boolean sendItemSubscribe; private boolean sendItemSubscribe;
/** /**
* List of items that were published to the node and that are still active. If the node is * The last item published to this node. In a cluster this may have occurred on a different cluster node.
* not configured to persist items then the last published item will be kept. The list is
* sorted cronologically.
*/ */
volatile private PublishedItem lastPublished; private PublishedItem lastPublished;
// TODO Add checking of max payload size. Return <not-acceptable> plus a application specific error condition of <payload-too-big/>. // TODO Add checking of max payload size. Return <not-acceptable> plus a application specific error condition of <payload-too-big/>.
...@@ -173,8 +167,10 @@ public class LeafNode extends Node { ...@@ -173,8 +167,10 @@ public class LeafNode extends Node {
protected void deletingNode() { protected void deletingNode() {
} }
void setLastPublishedItem(PublishedItem item) { public synchronized void setLastPublishedItem(PublishedItem item)
lastPublished = item; {
if ((lastPublished == null) || (item != null) && item.getCreationDate().after(lastPublished.getCreationDate()))
lastPublished = item;
} }
public int getMaxPayloadSize() { public int getMaxPayloadSize() {
...@@ -360,7 +356,7 @@ public class LeafNode extends Node { ...@@ -360,7 +356,7 @@ public class LeafNode extends Node {
} }
@Override @Override
public PublishedItem getLastPublishedItem() { public synchronized PublishedItem getLastPublishedItem() {
return lastPublished; return lastPublished;
} }
......
...@@ -20,21 +20,28 @@ ...@@ -20,21 +20,28 @@
package org.jivesoftware.openfire.pubsub; package org.jivesoftware.openfire.pubsub;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.openfire.pubsub.cluster.NewSubscriptionTask;
import org.jivesoftware.openfire.pubsub.models.AccessModel; import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel; import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.StringUtils; import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.CacheFactory;
import org.xmpp.forms.DataForm; import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField; import org.xmpp.forms.FormField;
import org.xmpp.packet.IQ; import org.xmpp.packet.IQ;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Message; import org.xmpp.packet.Message;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/** /**
* A virtual location to which information can be published and from which event * A virtual location to which information can be published and from which event
* notifications and/or payloads can be received (in other pubsub systems, this may * notifications and/or payloads can be received (in other pubsub systems, this may
...@@ -1734,7 +1741,8 @@ public abstract class Node { ...@@ -1734,7 +1741,8 @@ public abstract class Node {
affiliates.add(affiliate); affiliates.add(affiliate);
} }
void addSubscription(NodeSubscription subscription) { public void addSubscription(NodeSubscription subscription)
{
subscriptionsByID.put(subscription.getID(), subscription); subscriptionsByID.put(subscription.getID(), subscription);
subscriptionsByJID.put(subscription.getJID().toString(), subscription); subscriptionsByJID.put(subscription.getJID().toString(), subscription);
} }
...@@ -2017,8 +2025,7 @@ public abstract class Node { ...@@ -2017,8 +2025,7 @@ public abstract class Node {
// Generate a subscription ID (override even if one was sent by the client) // Generate a subscription ID (override even if one was sent by the client)
String id = StringUtils.randomString(40); String id = StringUtils.randomString(40);
// Create new subscription // Create new subscription
NodeSubscription subscription = NodeSubscription subscription = new NodeSubscription(this, owner, subscriber, subState, id);
new NodeSubscription(service, this, owner, subscriber, subState, id);
// Configure the subscription with the specified configuration (if any) // Configure the subscription with the specified configuration (if any)
if (options != null) { if (options != null) {
subscription.configure(options); subscription.configure(options);
...@@ -2041,6 +2048,10 @@ public abstract class Node { ...@@ -2041,6 +2048,10 @@ public abstract class Node {
if (subscription.isAuthorizationPending()) { if (subscription.isAuthorizationPending()) {
subscription.sendAuthorizationRequest(); subscription.sendAuthorizationRequest();
} }
// Synchronous so the task can flush all other cluster nodes before
// the last item is retrieved.
CacheFactory.doSynchronousClusterTask(new NewSubscriptionTask(subscription), false);
// Send last published item (if node is leaf node and subscription status is ok) // Send last published item (if node is leaf node and subscription status is ok)
if (isSendItemSubscribe() && subscription.isActive()) { if (isSendItemSubscribe() && subscription.isActive()) {
......
...@@ -69,10 +69,7 @@ public class NodeSubscription { ...@@ -69,10 +69,7 @@ public class NodeSubscription {
private static final SimpleDateFormat dateFormat; private static final SimpleDateFormat dateFormat;
private static final FastDateFormat fastDateFormat; private static final FastDateFormat fastDateFormat;
/**
* Reference to the publish and subscribe service.
*/
private PubSubService service;
/** /**
* The node to which this subscription is interested in. * The node to which this subscription is interested in.
*/ */
...@@ -160,8 +157,8 @@ public class NodeSubscription { ...@@ -160,8 +157,8 @@ public class NodeSubscription {
* @param state the state of the subscription with the node. * @param state the state of the subscription with the node.
* @param id the id the uniquely identifies this subscriptin within the node. * @param id the id the uniquely identifies this subscriptin within the node.
*/ */
NodeSubscription(PubSubService service, Node node, JID owner, JID jid, State state, String id) { public NodeSubscription(Node node, JID owner, JID jid, State state, String id)
this.service = service; {
this.node = node; this.node = node;
this.jid = jid; this.jid = jid;
this.owner = owner; this.owner = owner;
...@@ -405,7 +402,7 @@ public class NodeSubscription { ...@@ -405,7 +402,7 @@ public class NodeSubscription {
configure(options); configure(options);
if (originalIQ != null) { if (originalIQ != null) {
// Return success response // Return success response
service.send(IQ.createResultIQ(originalIQ)); node.getService().send(IQ.createResultIQ(originalIQ));
} }
if (wasUnconfigured) { if (wasUnconfigured) {
...@@ -519,10 +516,10 @@ public class NodeSubscription { ...@@ -519,10 +516,10 @@ public class NodeSubscription {
// Check if the service needs to subscribe or unsubscribe from the owner presence // Check if the service needs to subscribe or unsubscribe from the owner presence
if (!node.isPresenceBasedDelivery() && wasUsingPresence != !presenceStates.isEmpty()) { if (!node.isPresenceBasedDelivery() && wasUsingPresence != !presenceStates.isEmpty()) {
if (presenceStates.isEmpty()) { if (presenceStates.isEmpty()) {
service.presenceSubscriptionNotRequired(node, owner); node.getService().presenceSubscriptionNotRequired(node, owner);
} }
else { else {
service.presenceSubscriptionRequired(node, owner); node.getService().presenceSubscriptionRequired(node, owner);
} }
} }
} }
...@@ -719,7 +716,7 @@ public class NodeSubscription { ...@@ -719,7 +716,7 @@ public class NodeSubscription {
} }
// Check if delivery is subject to presence-based policy // Check if delivery is subject to presence-based policy
if (!getPresenceStates().isEmpty()) { if (!getPresenceStates().isEmpty()) {
Collection<String> shows = service.getShowPresences(jid); Collection<String> shows = node.getService().getShowPresences(jid);
if (shows.isEmpty() || Collections.disjoint(getPresenceStates(), shows)) { if (shows.isEmpty() || Collections.disjoint(getPresenceStates(), shows)) {
return false; return false;
} }
...@@ -727,7 +724,8 @@ public class NodeSubscription { ...@@ -727,7 +724,8 @@ public class NodeSubscription {
// Check if node is only sending events when user is online // Check if node is only sending events when user is online
if (node.isPresenceBasedDelivery()) { if (node.isPresenceBasedDelivery()) {
// Check that user is online // Check that user is online
if (service.getShowPresences(jid).isEmpty()) { if (node.getService().getShowPresences(jid).isEmpty())
{
return false; return false;
} }
} }
...@@ -795,7 +793,7 @@ public class NodeSubscription { ...@@ -795,7 +793,7 @@ public class NodeSubscription {
subscribeOptions.addElement("required"); subscribeOptions.addElement("required");
} }
// Send the result // Send the result
service.send(result); node.getService().send(result);
} }
/** /**
...@@ -834,7 +832,7 @@ public class NodeSubscription { ...@@ -834,7 +832,7 @@ public class NodeSubscription {
notification.getElement().addElement("delay", "urn:xmpp:delay") notification.getElement().addElement("delay", "urn:xmpp:delay")
.addAttribute("stamp", fastDateFormat.format(publishedItem.getCreationDate())); .addAttribute("stamp", fastDateFormat.format(publishedItem.getCreationDate()));
// Send the event notification to the subscriber // Send the event notification to the subscriber
service.sendNotification(node, notification, jid); node.getService().sendNotification(node, notification, jid);
} }
/** /**
...@@ -846,7 +844,7 @@ public class NodeSubscription { ...@@ -846,7 +844,7 @@ public class NodeSubscription {
* @return true if the specified user is allowed to modify or cancel the subscription. * @return true if the specified user is allowed to modify or cancel the subscription.
*/ */
boolean canModify(JID user) { boolean canModify(JID user) {
return user.equals(getJID()) || user.equals(getOwner()) || service.isServiceAdmin(user); return user.equals(getJID()) || user.equals(getOwner()) || node.getService().isServiceAdmin(user);
} }
/** /**
...@@ -899,9 +897,9 @@ public class NodeSubscription { ...@@ -899,9 +897,9 @@ public class NodeSubscription {
Message authRequest = new Message(); Message authRequest = new Message();
authRequest.addExtension(node.getAuthRequestForm(this)); authRequest.addExtension(node.getAuthRequestForm(this));
authRequest.setTo(owner); authRequest.setTo(owner);
authRequest.setFrom(service.getAddress()); authRequest.setFrom(node.getService().getAddress());
// Send authentication request to node owners // Send authentication request to node owners
service.send(authRequest); node.getService().send(authRequest);
} }
/** /**
...@@ -912,7 +910,7 @@ public class NodeSubscription { ...@@ -912,7 +910,7 @@ public class NodeSubscription {
Message authRequest = new Message(); Message authRequest = new Message();
authRequest.addExtension(node.getAuthRequestForm(this)); authRequest.addExtension(node.getAuthRequestForm(this));
// Send authentication request to node owners // Send authentication request to node owners
service.broadcast(node, authRequest, node.getOwners()); node.getService().broadcast(node, authRequest, node.getOwners());
} }
/** /**
......
...@@ -39,9 +39,11 @@ import org.jivesoftware.openfire.RoutingTable; ...@@ -39,9 +39,11 @@ import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.XMPPServerListener; import org.jivesoftware.openfire.XMPPServerListener;
import org.jivesoftware.openfire.component.InternalComponentManager; import org.jivesoftware.openfire.component.InternalComponentManager;
import org.jivesoftware.openfire.pubsub.cluster.RefreshNodeTask;
import org.jivesoftware.openfire.pubsub.models.AccessModel; import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.user.UserManager; import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.util.StringUtils; import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.xmpp.forms.DataForm; import org.xmpp.forms.DataForm;
...@@ -1210,6 +1212,8 @@ public class PubSubEngine { ...@@ -1210,6 +1212,8 @@ public class PubSubEngine {
else { else {
newNode.saveToDB(); newNode.saveToDB();
} }
CacheFactory.doClusterTask(new RefreshNodeTask(newNode));
} }
else { else {
conflict = true; conflict = true;
...@@ -1302,6 +1306,8 @@ public class PubSubEngine { ...@@ -1302,6 +1306,8 @@ public class PubSubEngine {
// Update node configuration with the provided data form // Update node configuration with the provided data form
// (and update the backend store) // (and update the backend store)
node.configure(completedForm); node.configure(completedForm);
CacheFactory.doClusterTask(new RefreshNodeTask(node));
// Return that node configuration was successful // Return that node configuration was successful
router.route(IQ.createResultIQ(iq)); router.route(IQ.createResultIQ(iq));
} }
......
...@@ -25,11 +25,8 @@ import java.util.Collection; ...@@ -25,11 +25,8 @@ import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import org.dom4j.DocumentHelper; import org.dom4j.DocumentHelper;
import org.dom4j.Element; import org.dom4j.Element;
...@@ -166,10 +163,6 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -166,10 +163,6 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
} }
public void process(Packet packet) { public void process(Packet packet) {
// TODO Remove this method when moving PubSub as a component and removing module code
// The MUC service will receive all the packets whose domain matches the domain of the MUC
// service. This means that, for instance, a disco request should be responded by the
// service itself instead of relying on the server to handle the request.
try { try {
// Check if the packet is a disco request or a packet with namespace iq:register // Check if the packet is a disco request or a packet with namespace iq:register
if (packet instanceof IQ) { if (packet instanceof IQ) {
...@@ -510,27 +503,32 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -510,27 +503,32 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
return serviceEnabled; return serviceEnabled;
} }
@Override
public void joinedCluster() { public void joinedCluster() {
// Disable the service until we know that we are the senior cluster member // Disable the service until we know that we are the senior cluster member
enableService(false); // enableService(false);
} }
@Override
public void joinedCluster(byte[] nodeID) { public void joinedCluster(byte[] nodeID) {
// Do nothing // Do nothing
} }
@Override
public void leftCluster() { public void leftCluster() {
// Offer the service when not running in a cluster // Offer the service when not running in a cluster
enableService(true); // enableService(true);
} }
@Override
public void leftCluster(byte[] nodeID) { public void leftCluster(byte[] nodeID) {
// Do nothing // Do nothing
} }
@Override
public void markedAsSeniorClusterMember() { public void markedAsSeniorClusterMember() {
// Offer the service since we are the senior cluster member // Offer the service since we are the senior cluster member
enableService(true); // enableService(true);
} }
public Iterator<DiscoServerItem> getItems() { public Iterator<DiscoServerItem> getItems() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment