Commit a98ab974 authored by Armando Jagucki's avatar Armando Jagucki Committed by ajagucki

Initial import of PEP code, with PubSubEngine refactoring.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/branches@8754 b35dd754-fafc-0310-a699-88a17e54d16e
parent 23b4ca03
......@@ -25,6 +25,7 @@ import org.jivesoftware.openfire.container.PluginManager;
import org.jivesoftware.openfire.disco.IQDiscoInfoHandler;
import org.jivesoftware.openfire.disco.IQDiscoItemsHandler;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.disco.ServerIdentitiesProvider;
import org.jivesoftware.openfire.disco.ServerItemsProvider;
import org.jivesoftware.openfire.filetransfer.DefaultFileTransferManager;
import org.jivesoftware.openfire.filetransfer.FileTransferManager;
......@@ -37,6 +38,8 @@ import org.jivesoftware.openfire.net.MulticastDNSService;
import org.jivesoftware.openfire.net.SSLConfig;
import org.jivesoftware.openfire.net.ServerTrafficCounter;
import org.jivesoftware.openfire.pubsub.PubSubModule;
import org.jivesoftware.openfire.pep.IQPEPHandler;
import org.jivesoftware.openfire.pep.IQPEPOwnerHandler;
import org.jivesoftware.openfire.roster.RosterManager;
import org.jivesoftware.openfire.session.RemoteSessionLocator;
import org.jivesoftware.openfire.spi.*;
......@@ -494,9 +497,9 @@ public class XMPPServer {
loadModule(IQLastActivityHandler.class.getName());
loadModule(PresenceSubscribeHandler.class.getName());
loadModule(PresenceUpdateHandler.class.getName());
loadModule(IQDiscoInfoHandler.class.getName());
loadModule(IQDiscoItemsHandler.class.getName());
loadModule(IQOfflineMessagesHandler.class.getName());
loadModule(IQPEPHandler.class.getName());
loadModule(IQPEPOwnerHandler.class.getName());
loadModule(MultiUserChatServerImpl.class.getName());
loadModule(MulticastDNSService.class.getName());
loadModule(IQSharedGroupHandler.class.getName());
......@@ -507,6 +510,8 @@ public class XMPPServer {
loadModule(MediaProxyService.class.getName());
loadModule(STUNService.class.getName());
loadModule(PubSubModule.class.getName());
loadModule(IQDiscoInfoHandler.class.getName());
loadModule(IQDiscoItemsHandler.class.getName());
loadModule(UpdateManager.class.getName());
loadModule(InternalComponentManager.class.getName());
// Load this module always last since we don't want to start listening for clients
......@@ -1024,6 +1029,17 @@ public class XMPPServer {
public IQAuthHandler getIQAuthHandler() {
return (IQAuthHandler) modules.get(IQAuthHandler.class);
}
/**
* Returns the <code>IQPEPHandler</code> registered with this server. The
* <code>IQPEPHandler</code> was registered with the server as a module while starting up
* the server.
*
* @return the <code>IQPEPHandler</code> registered with this server.
*/
public IQPEPHandler getIQPEPHandler() {
return (IQPEPHandler) modules.get(IQPEPHandler.class);
}
/**
* Returns the <code>PluginManager</code> instance registered with this server.
......@@ -1195,6 +1211,21 @@ public class XMPPServer {
}
return answer;
}
/**
* Returns a list with all the modules that provide "discoverable" identities.
*
* @return a list with all the modules that provide "discoverable" identities.
*/
public List<ServerIdentitiesProvider> getServerIdentitiesProviders() {
List<ServerIdentitiesProvider> answer = new ArrayList<ServerIdentitiesProvider>();
for (Module module : modules.values()) {
if (module instanceof ServerIdentitiesProvider) {
answer.add((ServerIdentitiesProvider) module);
}
}
return answer;
}
/**
* Returns a list with all the modules that provide "discoverable" items associated with
......
......@@ -62,6 +62,7 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene
private Map<String, DiscoInfoProvider> entities = new HashMap<String, DiscoInfoProvider>();
private Set<String> localServerFeatures = new CopyOnWriteArraySet<String>();
private Cache<String, Set<NodeID>> serverFeatures;
private List<Element> serverIdentities = new ArrayList<Element>();
private Map<String, DiscoInfoProvider> serverNodeProviders = new ConcurrentHashMap<String, DiscoInfoProvider>();
private IQHandlerInfo info;
......@@ -285,6 +286,20 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene
for (ServerFeaturesProvider provider : server.getServerFeaturesProviders()) {
addServerFeaturesProvider(provider);
}
// Track the implementors of ServerIdentitiesProvider so that we can collect the identities
// for protocols supported by the server
for (ServerIdentitiesProvider provider : server.getServerIdentitiesProviders()) {
for (Iterator<Element> it = provider.getIdentities(); it.hasNext();) {
serverIdentities.add(it.next());
}
}
//
if (server.getIQPEPHandler() != null) {
Element userIdentity = DocumentHelper.createElement("identity");
userIdentity.addAttribute("category", "pubsub");
userIdentity.addAttribute("type", "pep");
registeredUserIdentities.add(0, userIdentity);
}
setProvider(server.getServerInfo().getName(), getServerInfoProvider());
// Listen to cluster events
ClusterManager.addListener(this);
......@@ -379,6 +394,11 @@ public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListene
identity.addAttribute("type", "im");
identities.add(identity);
// Include identities from modules that implement ServerIdentitiesProvider
for (Element identityElement : serverIdentities) {
identities.add(identityElement);
}
}
}
return identities.iterator();
......
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.openfire.disco;
import java.util.Iterator;
import org.dom4j.Element;
/**
* <p>
* A <code>ServerIdentityProvider</code> is responsible for providing the identities
* of protocols supported by the SERVER. An example of a server identity is that
* for PEP (XEP-0163): <identity category="pubsub" type="pep" />
* <p/>
*
* <p>
* When the server starts up, IQDiscoInfoHandler will request to all the services that implement
* the ServerIdentitiesProvider interface for their identities. Whenever a disco request is received
* IQDiscoInfoHandler will add to the provided information all the collected identities. Therefore, a
* service must implement this interface in order to offer/publish its identities as part of the
* server identities.
* </p>
*
* @author Armando Jagucki
*/
public interface ServerIdentitiesProvider {
/**
* Returns an Iterator (of Element) with the supported identities by the server. The identities to
* include are the identities of protocols supported by the SERVER. The idea is that
* different modules may provide their identities that will ultimately be included in the list
* of server identities.
*
* @return an Iterator (of Element) with identities of protocols supported by the server.
*/
public abstract Iterator<Element> getIdentities();
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.openfire.pep;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.jivesoftware.openfire.IQHandlerInfo;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.disco.ServerIdentitiesProvider;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.pubsub.LeafNode;
import org.jivesoftware.openfire.pubsub.PubSubEngine;
import org.jivesoftware.util.Log;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* <p>
* An IQHandler used to implement XEP-0163: "Personal Eventing via Pubsub."
* </p>
*
* <p>
* For each user on the server there is an associated PEPService interacting
* with a PubSubEngine for managing the user's PEP nodes.
* </p>
*
* <p>
* An IQHandler can only handle one namespace in its IQHandlerInfo. However, PEP
* related packets are seen having a variety of different namespaces. Thus,
* classes like IQPEPOwnerHandler are used to forward packets having these other
* namespaces to IQPEPHandler.handleIQ().
* <p>
*
* <p>
* This handler is used for the following namespaces:
* <ul>
* <li><i>http://jabber.org/protocol/pubsub</i></li>
* <li><i>http://jabber.org/protocol/pubsub#owner</i></li>
* </ul>
* </p>
*
* @author Armando Jagucki
*
*/
public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
ServerFeaturesProvider {
// Map of PEP services. Table, Key: bare JID (String); Value: PEPService
private Map<String, PEPService> pepServices;
private IQHandlerInfo info;
private PubSubEngine pubSubEngine = null;
public IQPEPHandler() {
super("Personal Eventing Handler");
pepServices = new ConcurrentHashMap<String, PEPService>();
info = new IQHandlerInfo("pubsub", "http://jabber.org/protocol/pubsub");
}
@Override
public void initialize(XMPPServer server) {
super.initialize(server);
pubSubEngine = new PubSubEngine(server.getPacketRouter());
}
@Override
public IQHandlerInfo getInfo() {
return info;
}
@Override
public IQ handleIQ(IQ packet) throws UnauthorizedException {
// TODO: Much to be done here...
if (packet.getTo() == null) {
String jidFrom = packet.getFrom().toBareJID();
PEPService pepService = pepServices.get(jidFrom);
// If no service exists yet for jidFrom, create one.
if (pepService == null) {
pepService = new PEPService(XMPPServer.getInstance(), jidFrom);
pepServices.put(jidFrom, pepService);
pubSubEngine.start(pepService); // Keep DB synced
Log.debug("PEP: " + jidFrom + " had a PEPService created");
}
// If publishing a node, and the node doesn't exist, create it.
if (packet.getType() == IQ.Type.set) {
Element childElement = packet.getChildElement();
Element publishElement = childElement.element("publish");
if (publishElement != null) {
String nodeID = publishElement.attributeValue("node");
if (pepService.getNode(nodeID) == null) {
// Create the node
JID creator = new JID(jidFrom);
LeafNode newNode = new LeafNode(pepService, null, nodeID, creator);
newNode.addOwner(creator);
newNode.saveToDB();
Log.debug("PEP: Created node ('" + nodeID + "') for " + jidFrom);
}
}
}
// Process with PubSub as usual.
if (pubSubEngine.process(pepService, packet)) {
Log.debug("PEP: The pubSubEngine processed a packet for " + jidFrom + "'s pepService.");
}
else {
Log.debug("PEP: The pubSubEngine did not process a packet for " + jidFrom + "'s pepService.");
}
}
else {
// TODO: Ensure the packet gets handled elsewhere.
}
return null; // Error flows are handled in pubSubEngine.process(...)
}
public void start() {
super.start();
for (PEPService service : pepServices.values()) {
pubSubEngine.start(service);
}
}
public void stop() {
super.stop();
for (PEPService service : pepServices.values()) {
pubSubEngine.shutdown(service);
}
}
/**
* Implements ServerIdentitiesProvider, adding the PEP identity to the
* server's disco#info result.
*/
public Iterator<Element> getIdentities() {
ArrayList<Element> identities = new ArrayList<Element>();
Element identity = DocumentHelper.createElement("identity");
identity.addAttribute("category", "pubsub");
identity.addAttribute("type", "pep");
identities.add(identity);
return identities.iterator();
}
/**
* Implements ServerFeaturesProvider to include all supported XEP-0060 features
* in the server's disco#info result (as per section 4 of XEP-0163).
*/
public Iterator<String> getFeatures() {
return XMPPServer.getInstance().getPubSubModule().getFeatures(null, null, null);
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.openfire.pep;
import org.jivesoftware.openfire.IQHandlerInfo;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.XMPPServer;
import org.xmpp.packet.IQ;
/**
* <p>
* An IQHandler used to implement XEP-0163: "Personal Eventing via Pubsub."
* </p>
*
* <p>
* An IQHandler can only handle one namespace in its IQHandlerInfo. However, PEP
* related packets are seen having a variety of different namespaces. This
* handler is needed to forward IQ packets with the
* <i>'http://jabber.org/protocol/pubsub#owner'</i> namespace to IQPEPHandler.
* </p>
*
* @author Armando Jagucki
*
*/
public class IQPEPOwnerHandler extends IQHandler {
private IQHandlerInfo info;
public IQPEPOwnerHandler() {
super("Personal Eventing 'pubsub#owner' Handler");
info = new IQHandlerInfo("pubsub", "http://jabber.org/protocol/pubsub#owner");
}
@Override
public IQHandlerInfo getInfo() {
return info;
}
@Override
public IQ handleIQ(IQ packet) throws UnauthorizedException {
return XMPPServer.getInstance().getIQPEPHandler().handleIQ(packet);
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.openfire.pep;
import org.jivesoftware.openfire.commands.AdHocCommandManager;
import org.jivesoftware.openfire.pubsub.CollectionNode;
import org.jivesoftware.openfire.pubsub.DefaultNodeConfiguration;
import org.jivesoftware.openfire.pubsub.Node;
import org.jivesoftware.openfire.pubsub.PendingSubscriptionsCommand;
import org.jivesoftware.openfire.pubsub.PubSubEngine;
import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager;
import org.jivesoftware.openfire.pubsub.PubSubService;
import org.jivesoftware.openfire.pubsub.PublishedItem;
import org.jivesoftware.openfire.pubsub.PublishedItemTask;
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.openfire.PacketRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.StringUtils;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
* A PEPService is a PubSubService for use with XEP-0163: "Personal Eventing via
* Pubsub."
*
* @author Armando Jagucki
*
*/
public class PEPService implements PubSubService {
/**
* The bare JID that this service is identified by.
*/
private String bareJID;
/**
* Collection node that acts as the root node of the entire node hierarchy.
*/
private CollectionNode rootCollectionNode = null;
/**
* Nodes managed by this service, table: key nodeID (String); value Node
*/
private Map<String, Node> nodes = new ConcurrentHashMap<String, Node>();
/**
* Keep a registry of the presence's show value of users that subscribed to
* a node of the pubsub service and for which the node only delivers
* notifications for online users or node subscriptions deliver events based
* on the user presence show value. Offline users will not have an entry in
* the map. Note: Key-> bare JID and Value-> Map whose key is full JID of
* connected resource and value is show value of the last received presence.
*/
private Map<String, Map<String, String>> barePresences = new ConcurrentHashMap<String, Map<String, String>>();
/**
* Queue that holds the items that need to be added to the database.
*/
private Queue<PublishedItem> itemsToAdd = new LinkedBlockingQueue<PublishedItem>();
/**
* Queue that holds the items that need to be deleted from the database.
*/
private Queue<PublishedItem> itemsToDelete = new LinkedBlockingQueue<PublishedItem>();
/**
* Manager that keeps the list of ad-hoc commands and processing command
* requests.
*/
private AdHocCommandManager manager;
/**
* The time to elapse between each execution of the maintenance process.
* Default is 2 minutes.
*/
public int items_task_timeout = 2 * 60 * 1000;
/**
* Task that saves or deletes published items from the database.
*/
private PublishedItemTask publishedItemTask;
/**
* Timer to save published items to the database or remove deleted or old
* items.
*/
private Timer timer = new Timer("PEP service maintenance");
/**
* Returns the permission policy for creating nodes. A true value means that
* not anyone can create a node, only the JIDs listed in
* <code>allowedToCreate</code> are allowed to create nodes.
*/
private boolean nodeCreationRestricted = false;
/**
* Flag that indicates if a user may have more than one subscription with
* the node. When multiple subscriptions is enabled each subscription
* request, event notification and unsubscription request should include a
* subid attribute.
*/
private boolean multipleSubscriptionsEnabled = true;
/**
* The packet router for the server.
*/
private PacketRouter router = null;
/**
* Default configuration to use for newly created leaf nodes.
*/
private DefaultNodeConfiguration leafDefaultConfiguration;
/**
* Default configuration to use for newly created collection nodes.
*/
private DefaultNodeConfiguration collectionDefaultConfiguration;
/**
* Constructs a PEPService.
*
* @param server
* the XMPP server.
*/
public PEPService(XMPPServer server, String bareJID) {
this.bareJID = bareJID;
router = server.getPacketRouter();
// Initialize the ad-hoc commands manager to use for this pubsub service
manager = new AdHocCommandManager();
manager.addCommand(new PendingSubscriptionsCommand(this));
// Save or delete published items from the database every 2 minutes
// starting in
// 2 minutes (default values)
publishedItemTask = new PublishedItemTask(this);
timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout);
multipleSubscriptionsEnabled = JiveGlobals.getBooleanProperty("xmpp.pubsub.multiple-subscriptions", true);
// Load default configuration for leaf nodes
leafDefaultConfiguration = PubSubPersistenceManager.loadDefaultConfiguration(this, true);
if (leafDefaultConfiguration == null) {
// Create and save default configuration for leaf nodes;
leafDefaultConfiguration = new DefaultNodeConfiguration(true);
leafDefaultConfiguration.setAccessModel(AccessModel.open);
leafDefaultConfiguration.setPublisherModel(PublisherModel.publishers);
leafDefaultConfiguration.setDeliverPayloads(true);
leafDefaultConfiguration.setLanguage("English");
leafDefaultConfiguration.setMaxPayloadSize(5120);
leafDefaultConfiguration.setNotifyConfigChanges(true);
leafDefaultConfiguration.setNotifyDelete(true);
leafDefaultConfiguration.setNotifyRetract(true);
leafDefaultConfiguration.setPersistPublishedItems(false);
leafDefaultConfiguration.setMaxPublishedItems(-1);
leafDefaultConfiguration.setPresenceBasedDelivery(false);
leafDefaultConfiguration.setSendItemSubscribe(true);
leafDefaultConfiguration.setSubscriptionEnabled(true);
leafDefaultConfiguration.setReplyPolicy(null);
PubSubPersistenceManager.createDefaultConfiguration(this, leafDefaultConfiguration);
}
// Load default configuration for collection nodes
collectionDefaultConfiguration = PubSubPersistenceManager.loadDefaultConfiguration(this, false);
if (collectionDefaultConfiguration == null) {
// Create and save default configuration for collection nodes;
collectionDefaultConfiguration = new DefaultNodeConfiguration(false);
collectionDefaultConfiguration.setAccessModel(AccessModel.open);
collectionDefaultConfiguration.setPublisherModel(PublisherModel.publishers);
collectionDefaultConfiguration.setDeliverPayloads(false);
collectionDefaultConfiguration.setLanguage("English");
collectionDefaultConfiguration.setNotifyConfigChanges(true);
collectionDefaultConfiguration.setNotifyDelete(true);
collectionDefaultConfiguration.setNotifyRetract(true);
collectionDefaultConfiguration.setPresenceBasedDelivery(false);
collectionDefaultConfiguration.setSubscriptionEnabled(true);
collectionDefaultConfiguration.setReplyPolicy(null);
collectionDefaultConfiguration.setAssociationPolicy(CollectionNode.LeafNodeAssociationPolicy.all);
collectionDefaultConfiguration.setMaxLeafNodes(-1);
PubSubPersistenceManager.createDefaultConfiguration(this, collectionDefaultConfiguration);
}
// Load nodes to memory
PubSubPersistenceManager.loadNodes(this);
// Ensure that we have a root collection node
String rootNodeID = JiveGlobals.getProperty("xmpp.pubsub.root.nodeID", "");
if (nodes.isEmpty()) {
// Create root collection node
String creator = JiveGlobals.getProperty("xmpp.pubsub.root.creator");
JID creatorJID = creator != null ? new JID(creator) : server.getAdmins().iterator().next();
rootCollectionNode = new CollectionNode(this, null, rootNodeID, creatorJID);
// Add the creator as the node owner
rootCollectionNode.addOwner(creatorJID);
// Save new root node
rootCollectionNode.saveToDB();
}
else {
rootCollectionNode = (CollectionNode) getNode(rootNodeID);
}
}
public void addNode(Node node) {
nodes.put(node.getNodeID(), node);
}
public void broadcast(Node node, Message message, Collection<JID> jids) {
message.setFrom(getAddress());
for (JID jid : jids) {
message.setTo(jid);
message.setID(node.getNodeID() + "__" + jid.toBareJID() + "__" + StringUtils.randomString(5));
router.route(message);
}
}
public boolean canCreateNode(JID creator) {
// Node creation is always allowed for sysadmin
if (isNodeCreationRestricted() && !isServiceAdmin(creator)) {
// The user is not allowed to create nodes
return false;
}
return true;
}
public JID getAddress() {
return new JID(bareJID);
}
public DefaultNodeConfiguration getDefaultNodeConfiguration(boolean leafType) {
if (leafType) {
return leafDefaultConfiguration;
}
return collectionDefaultConfiguration;
}
public Node getNode(String nodeID) {
return nodes.get(nodeID);
}
public Collection<Node> getNodes() {
return nodes.values();
}
public CollectionNode getRootCollectionNode() {
return rootCollectionNode;
}
public String getServiceID() {
return bareJID; // The bare JID of the user is the service ID
}
public Collection<String> getShowPresences(JID subscriber) {
return PubSubEngine.getShowPresences(this, subscriber);
}
public boolean isCollectionNodesSupported() {
return false;
}
public boolean isInstantNodeSupported() {
return true;
}
public boolean isMultipleSubscriptionsEnabled() {
return multipleSubscriptionsEnabled;
}
public boolean isServiceAdmin(JID user) {
// Here we consider a 'service admin' to be the user that this PEPService
// is associated with.
if (bareJID == user.toBareJID()) {
return true;
}
else {
return false;
}
}
public void presenceSubscriptionNotRequired(Node node, JID user) {
PubSubEngine.presenceSubscriptionNotRequired(this, node, user);
}
public void presenceSubscriptionRequired(Node node, JID user) {
PubSubEngine.presenceSubscriptionRequired(this, node, user);
}
public void removeNode(String nodeID) {
nodes.remove(nodeID);
}
public void send(Packet packet) {
router.route(packet);
}
public void sendNotification(Node node, Message message, JID jid) {
message.setFrom(getAddress());
message.setTo(jid);
message.setID(node.getNodeID() + "__" + jid.toBareJID() + "__" + StringUtils.randomString(5));
router.route(message);
}
public boolean isNodeCreationRestricted() {
return nodeCreationRestricted;
}
public void queueItemToAdd(PublishedItem newItem) {
PubSubEngine.queueItemToAdd(this, newItem);
}
public void queueItemToRemove(PublishedItem removedItem) {
PubSubEngine.queueItemToRemove(this, removedItem);
}
public Map<String, Map<String, String>> getBarePresences() {
return barePresences;
}
public Queue<PublishedItem> getItemsToAdd() {
return itemsToAdd;
}
public Queue<PublishedItem> getItemsToDelete() {
return itemsToDelete;
}
public AdHocCommandManager getManager() {
return manager;
}
public PublishedItemTask getPublishedItemTask() {
return publishedItemTask;
}
public void setPublishedItemTask(PublishedItemTask task) {
publishedItemTask = task;
}
public Timer getTimer() {
return timer;
}
public int getItemsTaskTimeout() {
return items_task_timeout;
}
public void setItemsTaskTimeout(int timeout) {
items_task_timeout = timeout;
}
}
......@@ -51,7 +51,7 @@ public class CollectionNode extends Node {
*/
private int maxLeafNodes = -1;
CollectionNode(PubSubService service, CollectionNode parentNode, String nodeID, JID creator) {
public CollectionNode(PubSubService service, CollectionNode parentNode, String nodeID, JID creator) {
super(service, parentNode, nodeID, creator);
// Configure node with default values (get them from the pubsub service)
DefaultNodeConfiguration defaultConfiguration = service.getDefaultNodeConfiguration(false);
......
......@@ -61,7 +61,7 @@ public class LeafNode extends Node {
// TODO Add checking of max payload size. Return <not-acceptable> plus a application specific error condition of <payload-too-big/>.
LeafNode(PubSubService service, CollectionNode parentNode, String nodeID, JID creator) {
public LeafNode(PubSubService service, CollectionNode parentNode, String nodeID, JID creator) {
super(service, parentNode, nodeID, creator);
// Configure node with default values (get them from the pubsub service)
DefaultNodeConfiguration defaultConfiguration = service.getDefaultNodeConfiguration(true);
......
......@@ -26,11 +26,11 @@ import java.util.List;
*
* @author Matt Tucker
*/
class PendingSubscriptionsCommand extends AdHocCommand {
public class PendingSubscriptionsCommand extends AdHocCommand {
private PubSubService service;
PendingSubscriptionsCommand(PubSubService service) {
public PendingSubscriptionsCommand(PubSubService service) {
this.service = service;
}
......
......@@ -14,13 +14,11 @@ package org.jivesoftware.openfire.pubsub;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.openfire.PacketRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.XMPPServerListener;
import org.jivesoftware.openfire.commands.AdHocCommandManager;
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.user.UserManager;
import org.xmpp.forms.DataForm;
......@@ -29,81 +27,32 @@ import org.xmpp.packet.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
* A PubSubEngine is responsible for handling packets sent to the pub-sub service.
* A PubSubEngine is responsible for handling packets sent to a pub-sub service.
*
* @author Matt Tucker
*/
public class PubSubEngine {
private PubSubService service;
/**
* Manager that keeps the list of ad-hoc commands and processing command requests.
*/
private AdHocCommandManager manager;
/**
* Keep a registry of the presence's show value of users that subscribed to a node of
* the pubsub service and for which the node only delivers notifications for online users
* or node subscriptions deliver events based on the user presence show value. Offline
* users will not have an entry in the map. Note: Key-> bare JID and Value-> Map whose key
* is full JID of connected resource and value is show value of the last received presence.
*/
private Map<String, Map<String, String>> barePresences =
new ConcurrentHashMap<String, Map<String, String>>();
/**
* The time to elapse between each execution of the maintenance process. Default
* is 2 minutes.
*/
private int items_task_timeout = 2 * 60 * 1000;
/**
* The number of items to save on each run of the maintenance process.
*/
private int items_batch_size = 50;
/**
* Queue that holds the items that need to be added to the database.
*/
private Queue<PublishedItem> itemsToAdd = new LinkedBlockingQueue<PublishedItem>();
/**
* Queue that holds the items that need to be deleted from the database.
*/
private Queue<PublishedItem> itemsToDelete = new LinkedBlockingQueue<PublishedItem>();
/**
* Task that saves or deletes published items from the database.
*/
private PublishedItemTask publishedItemTask;
/**
* Timer to save published items to the database or remove deleted or old items.
*/
private Timer timer = new Timer("PubSub maintenance");
/**
* The packet router for the server.
*/
private PacketRouter router = null;
public PubSubEngine(PubSubService pubSubService, PacketRouter router) {
this.service = pubSubService;
public PubSubEngine(PacketRouter router) {
this.router = router;
// Initialize the ad-hoc commands manager to use for this pubsub service
manager = new AdHocCommandManager();
manager.addCommand(new PendingSubscriptionsCommand(service));
// Save or delete published items from the database every 2 minutes starting in
// 2 minutes (default values)
publishedItemTask = new PublishedItemTask();
timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout);
}
/**
* Handles IQ packets sent to the pubsub service. Requests of disco#info and disco#items
* are not being handled by the engine. Instead the service itself should handle disco packets.
*
* @param service the PubSub service this action is to be performed for.
* @param iq the IQ packet sent to the pubsub service.
* @return true if the IQ packet was handled by the engine.
*/
public boolean process(IQ iq) {
public boolean process(PubSubService service, IQ iq) {
// Ignore IQs of type ERROR or RESULT
if (IQ.Type.error == iq.getType() || IQ.Type.result == iq.getType()) {
return true;
......@@ -118,61 +67,61 @@ public class PubSubEngine {
Element action = childElement.element("publish");
if (action != null) {
// Entity publishes an item
publishItemsToNode(iq, action);
publishItemsToNode(service, iq, action);
return true;
}
action = childElement.element("subscribe");
if (action != null) {
// Entity subscribes to a node
subscribeNode(iq, childElement, action);
subscribeNode(service, iq, childElement, action);
return true;
}
action = childElement.element("options");
if (action != null) {
if (IQ.Type.get == iq.getType()) {
// Subscriber requests subscription options form
getSubscriptionConfiguration(iq, childElement, action);
getSubscriptionConfiguration(service, iq, childElement, action);
}
else {
// Subscriber submits completed options form
configureSubscription(iq, action);
configureSubscription(service, iq, action);
}
return true;
}
action = childElement.element("create");
if (action != null) {
// Entity is requesting to create a new node
createNode(iq, childElement, action);
createNode(service, iq, childElement, action);
return true;
}
action = childElement.element("unsubscribe");
if (action != null) {
// Entity unsubscribes from a node
unsubscribeNode(iq, action);
unsubscribeNode(service, iq, action);
return true;
}
action = childElement.element("subscriptions");
if (action != null) {
// Entity requests all current subscriptions
getSubscriptions(iq, childElement);
getSubscriptions(service, iq, childElement);
return true;
}
action = childElement.element("affiliations");
if (action != null) {
// Entity requests all current affiliations
getAffiliations(iq, childElement);
getAffiliations(service, iq, childElement);
return true;
}
action = childElement.element("items");
if (action != null) {
// Subscriber requests all active items
getPublishedItems(iq, action);
getPublishedItems(service, iq, action);
return true;
}
action = childElement.element("retract");
if (action != null) {
// Entity deletes an item
deleteItems(iq, action);
deleteItems(service, iq, action);
return true;
}
// Unknown action requested
......@@ -200,11 +149,11 @@ public class PubSubEngine {
}
if (IQ.Type.get == iq.getType()) {
// Owner requests configuration form of a node
getNodeConfiguration(iq, childElement, nodeID);
getNodeConfiguration(service, iq, childElement, nodeID);
}
else {
// Owner submits or cancels node configuration form
configureNode(iq, action, nodeID);
configureNode(service, iq, action, nodeID);
}
return true;
}
......@@ -212,23 +161,23 @@ public class PubSubEngine {
if (action != null) {
// Owner requests default configuration options for
// leaf or collection nodes
getDefaultNodeConfiguration(iq, childElement, action);
getDefaultNodeConfiguration(service, iq, childElement, action);
return true;
}
action = childElement.element("delete");
if (action != null) {
// Owner deletes a node
deleteNode(iq, action);
deleteNode(service, iq, action);
return true;
}
action = childElement.element("subscriptions");
if (action != null) {
if (IQ.Type.get == iq.getType()) {
// Owner requests all affiliated entities
getNodeSubscriptions(iq, action);
getNodeSubscriptions(service, iq, action);
}
else {
modifyNodeSubscriptions(iq, action);
modifyNodeSubscriptions(service, iq, action);
}
return true;
}
......@@ -236,17 +185,17 @@ public class PubSubEngine {
if (action != null) {
if (IQ.Type.get == iq.getType()) {
// Owner requests all affiliated entities
getNodeAffiliations(iq, action);
getNodeAffiliations(service, iq, action);
}
else {
modifyNodeAffiliations(iq, action);
modifyNodeAffiliations(service, iq, action);
}
return true;
}
action = childElement.element("purge");
if (action != null) {
// Owner purges items from a node
purgeNode(iq, action);
purgeNode(service, iq, action);
return true;
}
// Unknown action requested so return error to sender
......@@ -255,7 +204,7 @@ public class PubSubEngine {
}
else if ("http://jabber.org/protocol/commands".equals(namespace)) {
// Process ad-hoc command
IQ reply = manager.process(iq);
IQ reply = service.getManager().process(iq);
router.route(reply);
return true;
}
......@@ -266,18 +215,19 @@ public class PubSubEngine {
* Handles Presence packets sent to the pubsub service. Only process available and not
* available presences.
*
* @param service the PubSub service this action is to be performed for.
* @param presence the Presence packet sent to the pubsub service.
*/
public void process(Presence presence) {
public void process(PubSubService service, Presence presence) {
if (presence.isAvailable()) {
JID subscriber = presence.getFrom();
Map<String, String> fullPresences = barePresences.get(subscriber.toBareJID());
Map<String, String> fullPresences = service.getBarePresences().get(subscriber.toBareJID());
if (fullPresences == null) {
synchronized (subscriber.toBareJID().intern()) {
fullPresences = barePresences.get(subscriber.toBareJID());
fullPresences = service.getBarePresences().get(subscriber.toBareJID());
if (fullPresences == null) {
fullPresences = new ConcurrentHashMap<String, String>();
barePresences.put(subscriber.toBareJID(), fullPresences);
service.getBarePresences().put(subscriber.toBareJID(), fullPresences);
}
}
}
......@@ -286,11 +236,11 @@ public class PubSubEngine {
}
else if (presence.getType() == Presence.Type.unavailable) {
JID subscriber = presence.getFrom();
Map<String, String> fullPresences = barePresences.get(subscriber.toBareJID());
Map<String, String> fullPresences = service.getBarePresences().get(subscriber.toBareJID());
if (fullPresences != null) {
fullPresences.remove(subscriber.toString());
if (fullPresences.isEmpty()) {
barePresences.remove(subscriber.toBareJID());
service.getBarePresences().remove(subscriber.toBareJID());
}
}
}
......@@ -303,16 +253,17 @@ public class PubSubEngine {
* Answers to authorization requests sent to node owners to approve pending subscriptions
* will also be processed by this method.
*
* @param service the PubSub service this action is to be performed for.
* @param message the Message packet sent to the pubsub service.
*/
public void process(Message message) {
public void process(PubSubService service, Message message) {
if (message.getType() == Message.Type.error) {
// Process Messages of type error to identify possible subscribers that no longer exist
if (message.getError().getType() == PacketError.Type.cancel) {
// TODO Assuming that owner is the bare JID (as defined in the JEP). This can be replaced with an explicit owner specified in the packet
JID owner = new JID(message.getFrom().toBareJID());
// Terminate the subscription of the entity to all nodes hosted at the service
cancelAllSubscriptions(owner);
cancelAllSubscriptions(service, owner);
}
else if (message.getError().getType() == PacketError.Type.auth) {
// TODO Queue the message to be sent again later (will retry a few times and
......@@ -327,13 +278,13 @@ public class PubSubEngine {
// Check that completed data form belongs to an authorization request
if ("http://jabber.org/protocol/pubsub#subscribe_authorization".equals(formType)) {
// Process the answer to the authorization request
processAuthorizationAnswer(authForm, message);
processAuthorizationAnswer(service, authForm, message);
}
}
}
}
private void publishItemsToNode(IQ iq, Element publishElement) {
private void publishItemsToNode(PubSubService service, IQ iq, Element publishElement) {
String nodeID = publishElement.attributeValue("node");
Node node;
if (nodeID == null) {
......@@ -420,7 +371,7 @@ public class PubSubEngine {
leafNode.publishItems(from, items);
}
private void deleteItems(IQ iq, Element retractElement) {
private void deleteItems(PubSubService service, IQ iq, Element retractElement) {
String nodeID = retractElement.attributeValue("node");
Node node;
if (nodeID == null) {
......@@ -503,7 +454,7 @@ public class PubSubEngine {
leafNode.deleteItems(items);
}
private void subscribeNode(IQ iq, Element childElement, Element subscribeElement) {
private void subscribeNode(PubSubService service, IQ iq, Element childElement, Element subscribeElement) {
String nodeID = subscribeElement.attributeValue("node");
Node node;
if (nodeID == null) {
......@@ -631,7 +582,7 @@ public class PubSubEngine {
optionsForm);
}
private void unsubscribeNode(IQ iq, Element unsubscribeElement) {
private void unsubscribeNode(PubSubService service, IQ iq, Element unsubscribeElement) {
String nodeID = unsubscribeElement.attributeValue("node");
String subID = unsubscribeElement.attributeValue("subid");
Node node;
......@@ -719,7 +670,8 @@ public class PubSubEngine {
router.route(IQ.createResultIQ(iq));
}
private void getSubscriptionConfiguration(IQ iq, Element childElement, Element optionsElement) {
private void getSubscriptionConfiguration(PubSubService service, IQ iq,
Element childElement, Element optionsElement) {
String nodeID = optionsElement.attributeValue("node");
String subID = optionsElement.attributeValue("subid");
Node node;
......@@ -800,7 +752,7 @@ public class PubSubEngine {
router.route(reply);
}
private void configureSubscription(IQ iq, Element optionsElement) {
private void configureSubscription(PubSubService service, IQ iq, Element optionsElement) {
String nodeID = optionsElement.attributeValue("node");
String subID = optionsElement.attributeValue("subid");
Node node;
......@@ -885,7 +837,7 @@ public class PubSubEngine {
}
}
private void getSubscriptions(IQ iq, Element childElement) {
private void getSubscriptions(PubSubService service, IQ iq, Element childElement) {
// TODO Assuming that owner is the bare JID (as defined in the JEP). This can be replaced with an explicit owner specified in the packet
JID owner = new JID(iq.getFrom().toBareJID());
// Collect subscriptions of owner for all nodes at the service
......@@ -924,7 +876,7 @@ public class PubSubEngine {
router.route(reply);
}
private void getAffiliations(IQ iq, Element childElement) {
private void getAffiliations(PubSubService service, IQ iq, Element childElement) {
// TODO Assuming that owner is the bare JID (as defined in the JEP). This can be replaced with an explicit owner specified in the packet
JID owner = new JID(iq.getFrom().toBareJID());
// Collect affiliations of owner for all nodes at the service
......@@ -960,7 +912,7 @@ public class PubSubEngine {
router.route(reply);
}
private void getPublishedItems(IQ iq, Element itemsElement) {
private void getPublishedItems(PubSubService service, IQ iq, Element itemsElement) {
String nodeID = itemsElement.attributeValue("node");
String subID = itemsElement.attributeValue("subid");
Node node;
......@@ -1095,7 +1047,7 @@ public class PubSubEngine {
leafNode.sendPublishedItems(iq, items, forceToIncludePayload);
}
private void createNode(IQ iq, Element childElement, Element createElement) {
private void createNode(PubSubService service, IQ iq, Element childElement, Element createElement) {
// Get sender of the IQ packet
JID from = iq.getFrom();
// Verify that sender has permissions to create nodes
......@@ -1260,7 +1212,7 @@ public class PubSubEngine {
}
}
private void getNodeConfiguration(IQ iq, Element childElement, String nodeID) {
private void getNodeConfiguration(PubSubService service, IQ iq, Element childElement, String nodeID) {
Node node = service.getNode(nodeID);
if (node == null) {
// Node does not exist. Return item-not-found error
......@@ -1281,7 +1233,8 @@ public class PubSubEngine {
router.route(reply);
}
private void getDefaultNodeConfiguration(IQ iq, Element childElement, Element defaultElement) {
private void getDefaultNodeConfiguration(PubSubService service, IQ iq,
Element childElement, Element defaultElement) {
String type = defaultElement.attributeValue("type");
type = type == null ? "leaf" : type;
......@@ -1304,7 +1257,7 @@ public class PubSubEngine {
router.route(reply);
}
private void configureNode(IQ iq, Element configureElement, String nodeID) {
private void configureNode(PubSubService service, IQ iq, Element configureElement, String nodeID) {
Node node = service.getNode(nodeID);
if (node == null) {
// Node does not exist. Return item-not-found error
......@@ -1338,7 +1291,7 @@ public class PubSubEngine {
}
}
private void deleteNode(IQ iq, Element deleteElement) {
private void deleteNode(PubSubService service, IQ iq, Element deleteElement) {
String nodeID = deleteElement.attributeValue("node");
if (nodeID == null) {
// NodeID was not provided. Return bad-request error
......@@ -1373,7 +1326,7 @@ public class PubSubEngine {
}
}
private void purgeNode(IQ iq, Element purgeElement) {
private void purgeNode(PubSubService service, IQ iq, Element purgeElement) {
String nodeID = purgeElement.attributeValue("node");
if (nodeID == null) {
// NodeID was not provided. Return bad-request error
......@@ -1414,7 +1367,7 @@ public class PubSubEngine {
router.route(IQ.createResultIQ(iq));
}
private void getNodeSubscriptions(IQ iq, Element affiliationsElement) {
private void getNodeSubscriptions(PubSubService service, IQ iq, Element affiliationsElement) {
String nodeID = affiliationsElement.attributeValue("node");
if (nodeID == null) {
// NodeID was not provided. Return bad-request error.
......@@ -1437,7 +1390,7 @@ public class PubSubEngine {
node.sendSubscriptions(iq);
}
private void modifyNodeSubscriptions(IQ iq, Element entitiesElement) {
private void modifyNodeSubscriptions(PubSubService service, IQ iq, Element entitiesElement) {
String nodeID = entitiesElement.attributeValue("node");
if (nodeID == null) {
// NodeID was not provided. Return bad-request error.
......@@ -1497,7 +1450,7 @@ public class PubSubEngine {
router.route(reply);
}
private void getNodeAffiliations(IQ iq, Element affiliationsElement) {
private void getNodeAffiliations(PubSubService service, IQ iq, Element affiliationsElement) {
String nodeID = affiliationsElement.attributeValue("node");
if (nodeID == null) {
// NodeID was not provided. Return bad-request error.
......@@ -1520,7 +1473,7 @@ public class PubSubEngine {
node.sendAffiliations(iq);
}
private void modifyNodeAffiliations(IQ iq, Element entitiesElement) {
private void modifyNodeAffiliations(PubSubService service, IQ iq, Element entitiesElement) {
String nodeID = entitiesElement.attributeValue("node");
if (nodeID == null) {
// NodeID was not provided. Return bad-request error.
......@@ -1603,9 +1556,10 @@ public class PubSubEngine {
* The affiliation with the node will be removed if the entity was not a node owner or
* publisher.
*
* @param service the PubSub service this action is to be performed for.
* @param user the entity that no longer exists.
*/
private void cancelAllSubscriptions(JID user) {
private void cancelAllSubscriptions(PubSubService service, JID user) {
for (Node node : service.getNodes()) {
NodeAffiliate affiliate = node.getAffiliate(user);
if (affiliate == null) {
......@@ -1618,7 +1572,7 @@ public class PubSubEngine {
}
}
private void processAuthorizationAnswer(DataForm authForm, Message message) {
private void processAuthorizationAnswer(PubSubService service, DataForm authForm, Message message) {
String nodeID = authForm.getField("pubsub#node").getValues().get(0);
String subID = authForm.getField("pubsub#subid").getValues().get(0);
String allow = authForm.getField("pubsub#allow").getValues().get(0);
......@@ -1719,21 +1673,22 @@ public class PubSubEngine {
return completedForm;
}
public void start() {
public void start(PubSubService service) {
// Probe presences of users that this service has subscribed to (once the server
// has started)
final PubSubService tempService = service; // TODO: Needs to be tested for correctness
XMPPServer.getInstance().addServerListener(new XMPPServerListener() {
public void serverStarted() {
Set<JID> affiliates = new HashSet<JID>();
for (Node node : service.getNodes()) {
for (Node node : tempService.getNodes()) {
affiliates.addAll(node.getPresenceBasedSubscribers());
}
for (JID jid : affiliates) {
// Send probe presence
Presence subscription = new Presence(Presence.Type.probe);
subscription.setTo(jid);
subscription.setFrom(service.getAddress());
service.send(subscription);
subscription.setFrom(tempService.getAddress());
tempService.send(subscription);
}
}
......@@ -1742,26 +1697,26 @@ public class PubSubEngine {
});
}
public void shutdown() {
// Stop te maintenance processes
timer.cancel();
public void shutdown(PubSubService service) {
// Stop the maintenance processes
service.getTimer().cancel();
// Delete from the database items contained in the itemsToDelete queue
PublishedItem entry;
while (!itemsToDelete.isEmpty()) {
entry = itemsToDelete.poll();
while (!service.getItemsToDelete().isEmpty()) {
entry = service.getItemsToDelete().poll();
if (entry != null) {
PubSubPersistenceManager.removePublishedItem(service, entry);
}
}
// Save to the database items contained in the itemsToAdd queue
while (!itemsToAdd.isEmpty()) {
entry = itemsToAdd.poll();
while (!service.getItemsToAdd().isEmpty()) {
entry = service.getItemsToAdd().poll();
if (entry != null) {
PubSubPersistenceManager.createPublishedItem(service, entry);
}
}
// Stop executing ad-hoc commands
manager.stop();
service.getManager().stop();
}
/*******************************************************************************
......@@ -1775,12 +1730,13 @@ public class PubSubEngine {
* is offline then an empty collectin is returned. Available show status is represented
* by a <tt>online</tt> value. The rest of the possible show values as defined in RFC 3921.
*
* @param service the PubSub service this action is to be performed for.
* @param subscriber the JID of the subscriber. This is not the JID of the affiliate.
* @return an empty collection when offline. Otherwise, a collection with the show value
* of each connected resource.
*/
public Collection<String> getShowPresences(JID subscriber) {
Map<String, String> fullPresences = barePresences.get(subscriber.toBareJID());
public static Collection<String> getShowPresences(PubSubService service, JID subscriber) {
Map<String, String> fullPresences = service.getBarePresences().get(subscriber.toBareJID());
if (fullPresences == null) {
// User is offline so return empty list
return Collections.emptyList();
......@@ -1805,10 +1761,11 @@ public class PubSubEngine {
* Requests the pubsub service to subscribe to the presence of the user. If the service
* has already subscribed to the user's presence then do nothing.
*
* @param service the PubSub service this action is to be performed for.
* @param node the node that originated the subscription request.
* @param user the JID of the affiliate to subscribe to his presence.
*/
public void presenceSubscriptionNotRequired(Node node, JID user) {
public static void presenceSubscriptionNotRequired(PubSubService service, Node node, JID user) {
// Check that no node is requiring to be subscribed to this user
for (Node hostedNode : service.getNodes()) {
if (hostedNode.isPresenceBasedDelivery(user)) {
......@@ -1828,11 +1785,12 @@ public class PubSubEngine {
* was not subscribed to the user's presence or any node still requires to be subscribed to
* the user presence then do nothing.
*
* @param service the PubSub service this action is to be performed for.
* @param node the node that originated the unsubscription request.
* @param user the JID of the affiliate to unsubscribe from his presence.
*/
public void presenceSubscriptionRequired(Node node, JID user) {
Map<String, String> fullPresences = barePresences.get(user.toString());
public static void presenceSubscriptionRequired(PubSubService service, Node node, JID user) {
Map<String, String> fullPresences = service.getBarePresences().get(user.toString());
if (fullPresences == null || fullPresences.isEmpty()) {
Presence subscription = new Presence(Presence.Type.subscribe);
subscription.setTo(user);
......@@ -1840,10 +1798,10 @@ public class PubSubEngine {
service.send(subscription);
// Sending subscription requests based on received presences may generate
// that a sunscription request is sent to an offline user (since offline
// presences are not stored in "barePresences"). However, this not optimal
// algorithm shouldn't bother the user since the user's server should reply
// when already subscribed to the user's presence instead of asking the user
// to accept the subscription request
// presences are not stored in the service's "barePresences"). However, this
// not optimal algorithm shouldn't bother the user since the user's server
// should reply when already subscribed to the user's presence instead of
// asking the user to accept the subscription request.
}
}
......@@ -1857,34 +1815,38 @@ public class PubSubEngine {
* beginning after the specified delay. Subsequent executions take place
* at approximately regular intervals separated by the specified period.
*
* @param service the PubSub service this action is to be performed for.
* @param timeout the new frequency of the maintenance task.
*/
void setPublishedItemTaskTimeout(int timeout) {
if (this.items_task_timeout == timeout) {
void setPublishedItemTaskTimeout(PubSubService service, int timeout) {
int items_task_timeout = service.getItemsTaskTimeout();
if (items_task_timeout == timeout) {
return;
}
// Cancel the existing task because the timeout has changed
PublishedItemTask publishedItemTask = service.getPublishedItemTask();
if (publishedItemTask != null) {
publishedItemTask.cancel();
}
this.items_task_timeout = timeout;
service.setItemsTaskTimeout(timeout);
// Create a new task and schedule it with the new timeout
publishedItemTask = new PublishedItemTask();
timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout);
service.setPublishedItemTask(new PublishedItemTask(service));
service.getTimer().schedule(publishedItemTask, items_task_timeout, items_task_timeout);
}
/**
* Adds the item to the queue of items to remove from the database. The queue is going
* to be processed by another thread.
*
* @param service the PubSub service this action is to be performed for.
* @param removedItem the item to remove from the database.
*/
void queueItemToRemove(PublishedItem removedItem) {
public static void queueItemToRemove(PubSubService service, PublishedItem removedItem) {
// Remove the removed item from the queue of items to add to the database
if (!itemsToAdd.remove(removedItem)) {
if (!service.getItemsToAdd().remove(removedItem)) {
// The item is already present in the database so add the removed item
// to the queue of items to delete from the database
itemsToDelete.add(removedItem);
service.getItemsToDelete().add(removedItem);
}
}
......@@ -1892,10 +1854,11 @@ public class PubSubEngine {
* Adds the item to the queue of items to add to the database. The queue is going
* to be processed by another thread.
*
* @param service the PubSub service this action is to be performed for.
* @param newItem the item to add to the database.
*/
void queueItemToAdd(PublishedItem newItem) {
itemsToAdd.add(newItem);
public static void queueItemToAdd(PubSubService service, PublishedItem newItem) {
service.getItemsToAdd().add(newItem);
}
/**
......@@ -1903,44 +1866,13 @@ public class PubSubEngine {
* usually required when a node was deleted so any pending operation of the node items
* should be cancelled.
*
* @param service the PubSub service this action is to be performed for.
* @param items the list of items to remove the from queues.
*/
void cancelQueuedItems(Collection<PublishedItem> items) {
void cancelQueuedItems(PubSubService service, Collection<PublishedItem> items) {
for (PublishedItem item : items) {
itemsToAdd.remove(item);
itemsToDelete.remove(item);
}
}
private class PublishedItemTask extends TimerTask {
public void run() {
try {
PublishedItem entry;
boolean success;
// Delete from the database items contained in the itemsToDelete queue
for (int index = 0; index <= items_batch_size && !itemsToDelete.isEmpty(); index++) {
entry = itemsToDelete.poll();
if (entry != null) {
success = PubSubPersistenceManager.removePublishedItem(service, entry);
if (!success) {
itemsToDelete.add(entry);
}
}
}
// Save to the database items contained in the itemsToAdd queue
for (int index = 0; index <= items_batch_size && !itemsToAdd.isEmpty(); index++) {
entry = itemsToAdd.poll();
if (entry != null) {
success = PubSubPersistenceManager.createPublishedItem(service, entry);
if (!success) {
itemsToAdd.add(entry);
}
}
}
}
catch (Throwable e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
service.getItemsToAdd().remove(item);
service.getItemsToDelete().remove(item);
}
}
......
......@@ -19,6 +19,7 @@ import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.commands.AdHocCommandManager;
import org.jivesoftware.openfire.component.InternalComponentManager;
import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.disco.DiscoInfoProvider;
......@@ -39,6 +40,7 @@ import org.xmpp.packet.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Module that implements JEP-60: Publish-Subscribe. By default node collections and
......@@ -63,6 +65,47 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
* Nodes managed by this manager, table: key nodeID (String); value Node
*/
private Map<String, Node> nodes = new ConcurrentHashMap<String, Node>();
/**
* Keep a registry of the presence's show value of users that subscribed to a node of
* the pubsub service and for which the node only delivers notifications for online users
* or node subscriptions deliver events based on the user presence show value. Offline
* users will not have an entry in the map. Note: Key-> bare JID and Value-> Map whose key
* is full JID of connected resource and value is show value of the last received presence.
*/
private Map<String, Map<String, String>> barePresences =
new ConcurrentHashMap<String, Map<String, String>>();
/**
* Queue that holds the items that need to be added to the database.
*/
private Queue<PublishedItem> itemsToAdd = new LinkedBlockingQueue<PublishedItem>();
/**
* Queue that holds the items that need to be deleted from the database.
*/
private Queue<PublishedItem> itemsToDelete = new LinkedBlockingQueue<PublishedItem>();
/**
* Manager that keeps the list of ad-hoc commands and processing command requests.
*/
private AdHocCommandManager manager;
/**
* The time to elapse between each execution of the maintenance process. Default
* is 2 minutes.
*/
private int items_task_timeout = 2 * 60 * 1000;
/**
* Task that saves or deletes published items from the database.
*/
private PublishedItemTask publishedItemTask;
/**
* Timer to save published items to the database or remove deleted or old items.
*/
private Timer timer = new Timer("PubSub maintenance");
/**
* Returns the permission policy for creating nodes. A true value means that not anyone can
* create a node, only the JIDs listed in <code>allowedToCreate</code> are allowed to create
......@@ -117,6 +160,15 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
public PubSubModule() {
super("Publish Subscribe Service");
// Initialize the ad-hoc commands manager to use for this pubsub service
manager = new AdHocCommandManager();
manager.addCommand(new PendingSubscriptionsCommand(this));
// Save or delete published items from the database every 2 minutes starting in
// 2 minutes (default values)
publishedItemTask = new PublishedItemTask(this);
timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout);
}
public void process(Packet packet) {
......@@ -127,15 +179,15 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
try {
// Check if the packet is a disco request or a packet with namespace iq:register
if (packet instanceof IQ) {
if (!engine.process((IQ) packet)) {
if (!engine.process(this, (IQ) packet)) {
process((IQ) packet);
}
}
else if (packet instanceof Presence) {
engine.process((Presence) packet);
engine.process(this, (Presence) packet);
}
else {
engine.process((Message) packet);
engine.process(this, (Message) packet);
}
}
catch (Exception e) {
......@@ -216,23 +268,23 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
}
public Collection<String> getShowPresences(JID subscriber) {
return engine.getShowPresences(subscriber);
return PubSubEngine.getShowPresences(this, subscriber);
}
public void presenceSubscriptionNotRequired(Node node, JID user) {
engine.presenceSubscriptionNotRequired(node, user);
PubSubEngine.presenceSubscriptionNotRequired(this, node, user);
}
public void presenceSubscriptionRequired(Node node, JID user) {
engine.presenceSubscriptionRequired(node, user);
PubSubEngine.presenceSubscriptionRequired(this, node, user);
}
public void queueItemToAdd(PublishedItem newItem) {
engine.queueItemToAdd(newItem);
PubSubEngine.queueItemToAdd(this, newItem);
}
public void queueItemToRemove(PublishedItem removedItem) {
engine.queueItemToRemove(removedItem);
PubSubEngine.queueItemToRemove(this, removedItem);
}
public String getServiceName() {
......@@ -334,7 +386,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
routingTable = server.getRoutingTable();
router = server.getPacketRouter();
engine = new PubSubEngine(this, server.getPacketRouter());
engine = new PubSubEngine(server.getPacketRouter());
// Load default configuration for leaf nodes
leafDefaultConfiguration = PubSubPersistenceManager.loadDefaultConfiguration(this, true);
......@@ -406,7 +458,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
// Add the route to this service
routingTable.addComponentRoute(getAddress(), this);
// Start the pubsub engine
engine.start();
engine.start(this);
ArrayList<String> params = new ArrayList<String>();
params.clear();
params.add(getServiceDomain());
......@@ -419,7 +471,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
routingTable.removeComponentRoute(getAddress());
// Stop the pubsub engine. This will gives us the chance to
// save queued items to the database.
engine.shutdown();
engine.shutdown(this);
}
private void enableService(boolean enabled) {
......@@ -750,4 +802,40 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
}
return buf.toString();
}
public Map<String, Map<String, String>> getBarePresences() {
return barePresences;
}
public Queue<PublishedItem> getItemsToAdd() {
return itemsToAdd;
}
public Queue<PublishedItem> getItemsToDelete() {
return itemsToDelete;
}
public AdHocCommandManager getManager() {
return manager;
}
public PublishedItemTask getPublishedItemTask() {
return publishedItemTask;
}
public void setPublishedItemTask(PublishedItemTask task) {
publishedItemTask = task;
}
public Timer getTimer() {
return timer;
}
public int getItemsTaskTimeout() {
return items_task_timeout;
}
public void setItemsTaskTimeout(int timeout) {
items_task_timeout = timeout;
}
}
......@@ -11,11 +11,15 @@
package org.jivesoftware.openfire.pubsub;
import org.jivesoftware.openfire.commands.AdHocCommandManager;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.Timer;
/**
* A PubSubService is responsible for keeping the hosted nodes by the service, the default
......@@ -50,6 +54,18 @@ public interface PubSubService {
*/
String getServiceID();
/**
* Returns a registry of the presence's show value of users that subscribed to a node of
* the pubsub service and for which the node only delivers notifications for online users
* or node subscriptions deliver events based on the user presence show value. Offline
* users will not have an entry in the map. Note: Key-> bare JID and Value-> Map whose key
* is full JID of connected resource and value is show value of the last received presence.
*
* @return a registry of the presence's show value of users that subscribed to a node
* of the pubsub service.
*/
Map<String, Map<String, String>> getBarePresences();
/**
* Returns true if the pubsub service allows the specified user to create nodes.
*
......@@ -222,6 +238,41 @@ public interface PubSubService {
*/
void queueItemToAdd(PublishedItem newItem);
/**
* Gets the queue that holds the items that need to be added to the database.
*
* @return the queue that holds the items that need to be added to the database.
*/
Queue<PublishedItem> getItemsToAdd();
/**
* Gets the queue that holds the items that need to be deleted from the database.
*
* @return the queue that holds the items that need to be deleted from the database.
*/
Queue<PublishedItem> getItemsToDelete();
/**
* Returns the ad-hoc commands manager used for this service.
*
* @return the ad-hoc commands manager used for this service.
*/
AdHocCommandManager getManager();
/**
* Returns the published item task used for this service.
*
* @return the published item task used for this service.
*/
PublishedItemTask getPublishedItemTask();
/**
* Sets the published item task used for this service.
*
* @param task the PublishedItemTask to set for this service.
*/
void setPublishedItemTask(PublishedItemTask task);
/**
* Adds the item to the queue of items to remove from the database. The queue is going
* to be processed by another thread.
......@@ -229,4 +280,26 @@ public interface PubSubService {
* @param removedItem the item to remove from the database.
*/
void queueItemToRemove(PublishedItem removedItem);
/**
* Returns the timer used for the maintenance process of this service.
*
* @return the timer used for the maintenance process of this service.
*/
Timer getTimer();
/**
* Returns the timeout value for the published items maintenance task.
*
* @return the timeout value for the published items maintenance task.
*/
int getItemsTaskTimeout();
/**
* Sets the timeout value for the published items maintenance task.
*
* @param timeout the timeout value for the published items maintenance task.
*/
void setItemsTaskTimeout(int timeout);
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.openfire.pubsub;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import java.util.Queue;
import java.util.TimerTask;
/**
* A timed maintenance task that updates the database by adding and/or
* removing <code>PublishedItem</code>s in regular intervals.
*
* @author Matt Tucker
*/
public class PublishedItemTask extends TimerTask {
/**
* Queue that holds the items that need to be added to the database.
*/
private Queue<PublishedItem> itemsToAdd = null;
/**
* Queue that holds the items that need to be deleted from the database.
*/
private Queue<PublishedItem> itemsToDelete = null;
/**
* The service to perform the published item tasks on.
*/
private PubSubService service = null;
/**
* The number of items to save on each run of the maintenance process.
*/
private int items_batch_size = 50;
public PublishedItemTask(PubSubService service) {
this.service = service;
this.itemsToAdd = service.getItemsToAdd();
this.itemsToDelete = service.getItemsToDelete();
}
public void run() {
try {
PublishedItem entry;
boolean success;
// Delete from the database items contained in the itemsToDelete queue
for (int index = 0; index <= items_batch_size && !itemsToDelete.isEmpty(); index++) {
entry = itemsToDelete.poll();
if (entry != null) {
success = PubSubPersistenceManager.removePublishedItem(service, entry);
if (!success) {
itemsToDelete.add(entry);
}
}
}
// Save to the database items contained in the itemsToAdd queue
for (int index = 0; index <= items_batch_size && !itemsToAdd.isEmpty(); index++) {
entry = itemsToAdd.poll();
if (entry != null) {
success = PubSubPersistenceManager.createPublishedItem(service, entry);
if (!success) {
itemsToAdd.add(entry);
}
}
}
} catch (Throwable e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment