Commit 40aa7bd0 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Refactoring to add PEP support. JM-1122

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@9158 b35dd754-fafc-0310-a699-88a17e54d16e
parent 02095e50
......@@ -22,10 +22,7 @@ import org.jivesoftware.openfire.component.InternalComponentManager;
import org.jivesoftware.openfire.container.AdminConsolePlugin;
import org.jivesoftware.openfire.container.Module;
import org.jivesoftware.openfire.container.PluginManager;
import org.jivesoftware.openfire.disco.IQDiscoInfoHandler;
import org.jivesoftware.openfire.disco.IQDiscoItemsHandler;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.disco.ServerItemsProvider;
import org.jivesoftware.openfire.disco.*;
import org.jivesoftware.openfire.filetransfer.DefaultFileTransferManager;
import org.jivesoftware.openfire.filetransfer.FileTransferManager;
import org.jivesoftware.openfire.filetransfer.proxy.FileTransferProxy;
......@@ -36,6 +33,8 @@ import org.jivesoftware.openfire.muc.spi.MultiUserChatServerImpl;
import org.jivesoftware.openfire.net.MulticastDNSService;
import org.jivesoftware.openfire.net.SSLConfig;
import org.jivesoftware.openfire.net.ServerTrafficCounter;
import org.jivesoftware.openfire.pep.IQPEPHandler;
import org.jivesoftware.openfire.pep.IQPEPOwnerHandler;
import org.jivesoftware.openfire.pubsub.PubSubModule;
import org.jivesoftware.openfire.roster.RosterManager;
import org.jivesoftware.openfire.session.RemoteSessionLocator;
......@@ -98,6 +97,7 @@ public class XMPPServer {
private Version version;
private Date startDate;
private boolean initialized = false;
private boolean started = false;
private NodeID nodeID;
private static final NodeID DEFAULT_NODE_ID = NodeID.getInstance(new byte[0]);
......@@ -451,6 +451,8 @@ public class XMPPServer {
Log.info(startupBanner);
System.out.println(startupBanner);
started = true;
// Notify server listeners that the server has been started
for (XMPPServerListener listener : listeners) {
listener.serverStarted();
......@@ -497,9 +499,9 @@ public class XMPPServer {
loadModule(IQLastActivityHandler.class.getName());
loadModule(PresenceSubscribeHandler.class.getName());
loadModule(PresenceUpdateHandler.class.getName());
loadModule(IQDiscoInfoHandler.class.getName());
loadModule(IQDiscoItemsHandler.class.getName());
loadModule(IQOfflineMessagesHandler.class.getName());
loadModule(IQPEPHandler.class.getName());
loadModule(IQPEPOwnerHandler.class.getName());
loadModule(MultiUserChatServerImpl.class.getName());
loadModule(MulticastDNSService.class.getName());
loadModule(IQSharedGroupHandler.class.getName());
......@@ -510,6 +512,8 @@ public class XMPPServer {
loadModule(MediaProxyService.class.getName());
loadModule(STUNService.class.getName());
loadModule(PubSubModule.class.getName());
loadModule(IQDiscoInfoHandler.class.getName());
loadModule(IQDiscoItemsHandler.class.getName());
loadModule(UpdateManager.class.getName());
loadModule(InternalComponentManager.class.getName());
// Load this module always last since we don't want to start listening for clients
......@@ -1028,6 +1032,17 @@ public class XMPPServer {
return (IQAuthHandler) modules.get(IQAuthHandler.class);
}
/**
* Returns the <code>IQPEPHandler</code> registered with this server. The
* <code>IQPEPHandler</code> was registered with the server as a module while starting up
* the server.
*
* @return the <code>IQPEPHandler</code> registered with this server.
*/
public IQPEPHandler getIQPEPHandler() {
return (IQPEPHandler) modules.get(IQPEPHandler.class);
}
/**
* Returns the <code>PluginManager</code> instance registered with this server.
*
......@@ -1199,6 +1214,21 @@ public class XMPPServer {
return answer;
}
/**
* Returns a list with all the modules that provide "discoverable" identities.
*
* @return a list with all the modules that provide "discoverable" identities.
*/
public List<ServerIdentitiesProvider> getServerIdentitiesProviders() {
List<ServerIdentitiesProvider> answer = new ArrayList<ServerIdentitiesProvider>();
for (Module module : modules.values()) {
if (module instanceof ServerIdentitiesProvider) {
answer.add((ServerIdentitiesProvider) module);
}
}
return answer;
}
/**
* Returns a list with all the modules that provide "discoverable" items associated with
* the server.
......@@ -1216,6 +1246,38 @@ public class XMPPServer {
return answer;
}
/**
* Returns a list with all the modules that provide "discoverable" user identities.
*
* @return a list with all the modules that provide "discoverable" user identities.
*/
public List<UserIdentitiesProvider> getUserIdentitiesProviders() {
List<UserIdentitiesProvider> answer = new ArrayList<UserIdentitiesProvider>();
for (Module module : modules.values()) {
if (module instanceof UserIdentitiesProvider) {
answer.add((UserIdentitiesProvider) module);
}
}
return answer;
}
/**
* Returns a list with all the modules that provide "discoverable" items associated with
* users.
*
* @return a list with all the modules that provide "discoverable" items associated with
* users.
*/
public List<UserItemsProvider> getUserItemsProviders() {
List<UserItemsProvider> answer = new ArrayList<UserItemsProvider>();
for (Module module : modules.values()) {
if (module instanceof UserItemsProvider) {
answer.add((UserItemsProvider) module);
}
}
return answer;
}
/**
* Returns the <code>IQDiscoInfoHandler</code> registered with this server. The
* <code>IQDiscoInfoHandler</code> was registered with the server as a module while starting up
......@@ -1355,4 +1417,13 @@ public class XMPPServer {
public void setRemoteSessionLocator(RemoteSessionLocator remoteSessionLocator) {
this.remoteSessionLocator = remoteSessionLocator;
}
/**
* Returns whether or not the server has been started.
*
* @return whether or not the server has been started.
*/
public boolean isStarted() {
return started;
}
}
......@@ -11,12 +11,12 @@
package org.jivesoftware.openfire.pubsub;
import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.forms.FormField;
import org.xmpp.forms.DataForm;
import org.jivesoftware.util.LocaleUtils;
import org.dom4j.Element;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
......@@ -51,7 +51,7 @@ public class CollectionNode extends Node {
*/
private int maxLeafNodes = -1;
CollectionNode(PubSubService service, CollectionNode parentNode, String nodeID, JID creator) {
public CollectionNode(PubSubService service, CollectionNode parentNode, String nodeID, JID creator) {
super(service, parentNode, nodeID, creator);
// Configure node with default values (get them from the pubsub service)
DefaultNodeConfiguration defaultConfiguration = service.getDefaultNodeConfiguration(false);
......
......@@ -61,7 +61,7 @@ public class LeafNode extends Node {
// TODO Add checking of max payload size. Return <not-acceptable> plus a application specific error condition of <payload-too-big/>.
LeafNode(PubSubService service, CollectionNode parentNode, String nodeID, JID creator) {
public LeafNode(PubSubService service, CollectionNode parentNode, String nodeID, JID creator) {
super(service, parentNode, nodeID, creator);
// Configure node with default values (get them from the pubsub service)
DefaultNodeConfiguration defaultConfiguration = service.getDefaultNodeConfiguration(true);
......
......@@ -12,10 +12,10 @@
package org.jivesoftware.openfire.pubsub;
import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.StringUtils;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.IQ;
......@@ -1735,7 +1735,7 @@ public abstract class Node {
* @throws IllegalStateException If this message was used when the node supports multiple
* subscriptions.
*/
NodeSubscription getSubscription(JID subscriberJID) {
public NodeSubscription getSubscription(JID subscriberJID) {
// Check that node does not support multiple subscriptions
if (isMultipleSubscriptionsEnabled()) {
throw new IllegalStateException("Multiple subscriptions is enabled so subscriptions " +
......
......@@ -88,6 +88,14 @@ public class NodeAffiliate {
Element items = event.addElement("items");
items.addAttribute("node", getNode().getNodeID());
for (PublishedItem publishedItem : itemsBySubs.get(nodeSubscriptions)) {
// FIXME: This was added for compatibility with PEP supporting clients.
// Alternate solution needed when XEP-0163 version > 1.0 is released.
//
// If the node ID looks like a JID, replace it with the published item's node ID.
if (getNode().getNodeID().indexOf("@") >= 0) {
items.addAttribute("node", publishedItem.getNode().getNodeID());
}
// Add item information to the event notification
Element item = items.addElement("item");
if (leafNode.isItemRequired()) {
......
......@@ -129,7 +129,7 @@ public class NodeSubscription {
dateFormat = new SimpleDateFormat("yyyy-MM-DD'T'HH:mm:ss.SSS'Z'");
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
fastDateFormat = FastDateFormat
.getInstance("yyyy-MM-DD'T'HH:mm:ss.SSS'Z'", TimeZone.getTimeZone("UTC"));
.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", TimeZone.getTimeZone("UTC"));
}
/**
......@@ -613,7 +613,7 @@ public class NodeSubscription {
* @return true if an event notification can be sent to the subscriber for the specified
* published item.
*/
boolean canSendPublicationEvent(LeafNode leafNode, PublishedItem publishedItem) {
public boolean canSendPublicationEvent(LeafNode leafNode, PublishedItem publishedItem) {
if (!canSendEvents()) {
return false;
}
......
......@@ -11,9 +11,9 @@
package org.jivesoftware.openfire.pubsub;
import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.openfire.commands.AdHocCommand;
import org.jivesoftware.openfire.commands.SessionData;
import org.jivesoftware.util.LocaleUtils;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.JID;
......@@ -26,11 +26,11 @@ import java.util.List;
*
* @author Matt Tucker
*/
class PendingSubscriptionsCommand extends AdHocCommand {
public class PendingSubscriptionsCommand extends AdHocCommand {
private PubSubService service;
PendingSubscriptionsCommand(PubSubService service) {
public PendingSubscriptionsCommand(PubSubService service) {
this.service = service;
}
......
......@@ -19,6 +19,7 @@ import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.commands.AdHocCommandManager;
import org.jivesoftware.openfire.component.InternalComponentManager;
import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.disco.DiscoInfoProvider;
......@@ -39,6 +40,7 @@ import org.xmpp.packet.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Module that implements JEP-60: Publish-Subscribe. By default node collections and
......@@ -63,6 +65,47 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
* Nodes managed by this manager, table: key nodeID (String); value Node
*/
private Map<String, Node> nodes = new ConcurrentHashMap<String, Node>();
/**
* Keep a registry of the presence's show value of users that subscribed to a node of
* the pubsub service and for which the node only delivers notifications for online users
* or node subscriptions deliver events based on the user presence show value. Offline
* users will not have an entry in the map. Note: Key-> bare JID and Value-> Map whose key
* is full JID of connected resource and value is show value of the last received presence.
*/
private Map<String, Map<String, String>> barePresences =
new ConcurrentHashMap<String, Map<String, String>>();
/**
* Queue that holds the items that need to be added to the database.
*/
private Queue<PublishedItem> itemsToAdd = new LinkedBlockingQueue<PublishedItem>();
/**
* Queue that holds the items that need to be deleted from the database.
*/
private Queue<PublishedItem> itemsToDelete = new LinkedBlockingQueue<PublishedItem>();
/**
* Manager that keeps the list of ad-hoc commands and processing command requests.
*/
private AdHocCommandManager manager;
/**
* The time to elapse between each execution of the maintenance process. Default
* is 2 minutes.
*/
private int items_task_timeout = 2 * 60 * 1000;
/**
* Task that saves or deletes published items from the database.
*/
private PublishedItemTask publishedItemTask;
/**
* Timer to save published items to the database or remove deleted or old items.
*/
private Timer timer = new Timer("PubSub maintenance");
/**
* Returns the permission policy for creating nodes. A true value means that not anyone can
* create a node, only the JIDs listed in <code>allowedToCreate</code> are allowed to create
......@@ -117,6 +160,15 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
public PubSubModule() {
super("Publish Subscribe Service");
// Initialize the ad-hoc commands manager to use for this pubsub service
manager = new AdHocCommandManager();
manager.addCommand(new PendingSubscriptionsCommand(this));
// Save or delete published items from the database every 2 minutes starting in
// 2 minutes (default values)
publishedItemTask = new PublishedItemTask(this);
timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout);
}
public void process(Packet packet) {
......@@ -127,15 +179,15 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
try {
// Check if the packet is a disco request or a packet with namespace iq:register
if (packet instanceof IQ) {
if (!engine.process((IQ) packet)) {
if (!engine.process(this, (IQ) packet)) {
process((IQ) packet);
}
}
else if (packet instanceof Presence) {
engine.process((Presence) packet);
engine.process(this, (Presence) packet);
}
else {
engine.process((Message) packet);
engine.process(this, (Message) packet);
}
}
catch (Exception e) {
......@@ -216,23 +268,23 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
}
public Collection<String> getShowPresences(JID subscriber) {
return engine.getShowPresences(subscriber);
return PubSubEngine.getShowPresences(this, subscriber);
}
public void presenceSubscriptionNotRequired(Node node, JID user) {
engine.presenceSubscriptionNotRequired(node, user);
PubSubEngine.presenceSubscriptionNotRequired(this, node, user);
}
public void presenceSubscriptionRequired(Node node, JID user) {
engine.presenceSubscriptionRequired(node, user);
PubSubEngine.presenceSubscriptionRequired(this, node, user);
}
public void queueItemToAdd(PublishedItem newItem) {
engine.queueItemToAdd(newItem);
PubSubEngine.queueItemToAdd(this, newItem);
}
public void queueItemToRemove(PublishedItem removedItem) {
engine.queueItemToRemove(removedItem);
PubSubEngine.queueItemToRemove(this, removedItem);
}
public String getServiceName() {
......@@ -334,7 +386,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
routingTable = server.getRoutingTable();
router = server.getPacketRouter();
engine = new PubSubEngine(this, server.getPacketRouter());
engine = new PubSubEngine(server.getPacketRouter());
// Load default configuration for leaf nodes
leafDefaultConfiguration = PubSubPersistenceManager.loadDefaultConfiguration(this, true);
......@@ -406,7 +458,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
// Add the route to this service
routingTable.addComponentRoute(getAddress(), this);
// Start the pubsub engine
engine.start();
engine.start(this);
ArrayList<String> params = new ArrayList<String>();
params.clear();
params.add(getServiceDomain());
......@@ -419,7 +471,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
routingTable.removeComponentRoute(getAddress());
// Stop the pubsub engine. This will gives us the chance to
// save queued items to the database.
engine.shutdown();
engine.shutdown(this);
}
private void enableService(boolean enabled) {
......@@ -750,4 +802,40 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
}
return buf.toString();
}
public Map<String, Map<String, String>> getBarePresences() {
return barePresences;
}
public Queue<PublishedItem> getItemsToAdd() {
return itemsToAdd;
}
public Queue<PublishedItem> getItemsToDelete() {
return itemsToDelete;
}
public AdHocCommandManager getManager() {
return manager;
}
public PublishedItemTask getPublishedItemTask() {
return publishedItemTask;
}
public void setPublishedItemTask(PublishedItemTask task) {
publishedItemTask = task;
}
public Timer getTimer() {
return timer;
}
public int getItemsTaskTimeout() {
return items_task_timeout;
}
public void setItemsTaskTimeout(int timeout) {
items_task_timeout = timeout;
}
}
......@@ -13,10 +13,10 @@ package org.jivesoftware.openfire.pubsub;
import org.dom4j.io.SAXReader;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.xmpp.packet.JID;
import java.io.StringReader;
......@@ -35,14 +35,22 @@ import java.util.concurrent.LinkedBlockingQueue;
*/
public class PubSubPersistenceManager {
private static final String LOAD_NODES =
private static final String LOAD_NON_LEAF_NODES =
"SELECT nodeID, leaf, creationDate, modificationDate, parent, deliverPayloads, " +
"maxPayloadSize, persistItems, maxItems, notifyConfigChanges, notifyDelete, " +
"notifyRetract, presenceBased, sendItemSubscribe, publisherModel, " +
"subscriptionEnabled, configSubscription, accessModel, payloadType, " +
"bodyXSLT, dataformXSLT, creator, description, language, name, " +
"replyPolicy, associationPolicy, maxLeafNodes FROM pubsubNode " +
"WHERE serviceID=? AND leaf=0 ORDER BY nodeID";
private static final String LOAD_LEAF_NODES =
"SELECT nodeID, leaf, creationDate, modificationDate, parent, deliverPayloads, " +
"maxPayloadSize, persistItems, maxItems, notifyConfigChanges, notifyDelete, " +
"notifyRetract, presenceBased, sendItemSubscribe, publisherModel, " +
"subscriptionEnabled, configSubscription, accessModel, payloadType, " +
"bodyXSLT, dataformXSLT, creator, description, language, name, " +
"replyPolicy, associationPolicy, maxLeafNodes FROM pubsubNode " +
"WHERE serviceID=? ORDER BY nodeID";
"WHERE serviceID=? AND leaf=1 ORDER BY nodeID";
private static final String UPDATE_NODE =
"UPDATE pubsubNode SET modificationDate=?, parent=?, deliverPayloads=?, " +
"maxPayloadSize=?, persistItems=?, maxItems=?, " +
......@@ -446,11 +454,22 @@ public class PubSubPersistenceManager {
Map<String, Node> nodes = new HashMap<String, Node>();
try {
con = DbConnectionManager.getConnection();
// Get all nodes at once (with 1 query)
pstmt = con.prepareStatement(LOAD_NODES);
// Get all non-leaf nodes (to ensure parent nodes are loaded before their children)
pstmt = con.prepareStatement(LOAD_NON_LEAF_NODES);
pstmt.setString(1, service.getServiceID());
ResultSet rs = pstmt.executeQuery();
// Rebuild all loaded nodes
// Rebuild loaded non-leaf nodes
while(rs.next()) {
loadNode(service, nodes, rs);
}
rs.close();
pstmt.close();
// Get all leaf nodes (remaining unloaded nodes)
pstmt = con.prepareStatement(LOAD_LEAF_NODES);
pstmt.setString(1, service.getServiceID());
rs = pstmt.executeQuery();
// Rebuild loaded leaf nodes
while(rs.next()) {
loadNode(service, nodes, rs);
}
......
......@@ -11,11 +11,15 @@
package org.jivesoftware.openfire.pubsub;
import org.jivesoftware.openfire.commands.AdHocCommandManager;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.Timer;
/**
* A PubSubService is responsible for keeping the hosted nodes by the service, the default
......@@ -50,6 +54,18 @@ public interface PubSubService {
*/
String getServiceID();
/**
* Returns a registry of the presence's show value of users that subscribed to a node of
* the pubsub service and for which the node only delivers notifications for online users
* or node subscriptions deliver events based on the user presence show value. Offline
* users will not have an entry in the map. Note: Key-> bare JID and Value-> Map whose key
* is full JID of connected resource and value is show value of the last received presence.
*
* @return a registry of the presence's show value of users that subscribed to a node
* of the pubsub service.
*/
Map<String, Map<String, String>> getBarePresences();
/**
* Returns true if the pubsub service allows the specified user to create nodes.
*
......@@ -222,6 +238,41 @@ public interface PubSubService {
*/
void queueItemToAdd(PublishedItem newItem);
/**
* Gets the queue that holds the items that need to be added to the database.
*
* @return the queue that holds the items that need to be added to the database.
*/
Queue<PublishedItem> getItemsToAdd();
/**
* Gets the queue that holds the items that need to be deleted from the database.
*
* @return the queue that holds the items that need to be deleted from the database.
*/
Queue<PublishedItem> getItemsToDelete();
/**
* Returns the ad-hoc commands manager used for this service.
*
* @return the ad-hoc commands manager used for this service.
*/
AdHocCommandManager getManager();
/**
* Returns the published item task used for this service.
*
* @return the published item task used for this service.
*/
PublishedItemTask getPublishedItemTask();
/**
* Sets the published item task used for this service.
*
* @param task the PublishedItemTask to set for this service.
*/
void setPublishedItemTask(PublishedItemTask task);
/**
* Adds the item to the queue of items to remove from the database. The queue is going
* to be processed by another thread.
......@@ -229,4 +280,26 @@ public interface PubSubService {
* @param removedItem the item to remove from the database.
*/
void queueItemToRemove(PublishedItem removedItem);
/**
* Returns the timer used for the maintenance process of this service.
*
* @return the timer used for the maintenance process of this service.
*/
Timer getTimer();
/**
* Returns the timeout value for the published items maintenance task.
*
* @return the timeout value for the published items maintenance task.
*/
int getItemsTaskTimeout();
/**
* Sets the timeout value for the published items maintenance task.
*
* @param timeout the timeout value for the published items maintenance task.
*/
void setItemsTaskTimeout(int timeout);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment