Commit 02095e50 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Initial version. JM-1122

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@9157 b35dd754-fafc-0310-a699-88a17e54d16e
parent c085d764
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.openfire.pep;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.openfire.IQHandlerInfo;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.disco.ServerIdentitiesProvider;
import org.jivesoftware.openfire.disco.UserIdentitiesProvider;
import org.jivesoftware.openfire.disco.UserItemsProvider;
import org.jivesoftware.openfire.event.UserEventDispatcher;
import org.jivesoftware.openfire.event.UserEventListener;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.pubsub.*;
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.roster.Roster;
import org.jivesoftware.openfire.roster.RosterEventDispatcher;
import org.jivesoftware.openfire.roster.RosterEventListener;
import org.jivesoftware.openfire.roster.RosterItem;
import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.user.*;
import org.jivesoftware.util.Log;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* <p>
* An IQHandler used to implement XEP-0163: "Personal Eventing via Pubsub."
* </p>
*
* <p>
* For each user on the server there is an associated PEPService interacting
* with a single PubSubEngine for managing the user's PEP nodes.
* </p>
*
* <p>
* An IQHandler can only handle one namespace in its IQHandlerInfo. However, PEP
* related packets are seen having a variety of different namespaces. Thus,
* classes like IQPEPOwnerHandler are used to forward packets having these other
* namespaces to IQPEPHandler.handleIQ().
* <p>
*
* <p>
* This handler is used for the following namespaces:
* <ul>
* <li><i>http://jabber.org/protocol/pubsub</i></li>
* <li><i>http://jabber.org/protocol/pubsub#owner</i></li>
* </ul>
* </p>
*
* @author Armando Jagucki
*
*/
public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ServerFeaturesProvider,
UserIdentitiesProvider, UserItemsProvider, PresenceEventListener, RemotePresenceEventListener,
RosterEventListener, UserEventListener {
/**
* Map of PEP services. Table, Key: bare JID (String); Value: PEPService
*/
private Map<String, PEPService> pepServices;
/**
* Nodes to send filtered notifications for, table: key JID (String); value Set of nodes
*
* filteredNodesMap are used for Contact Notification Filtering as described in XEP-0163. The JID
* of a user is associated with a set of PEP node IDs they are interested in receiving notifications
* for.
*/
private Map<String, Set<String>> filteredNodesMap = new ConcurrentHashMap<String, Set<String>>();
private IQHandlerInfo info;
private PubSubEngine pubSubEngine = null;
/**
* A map of all known full JIDs that have sent presences from a remote server.
* table: key Bare JID (String); value Set of JIDs
*
* This map is convenient for sending notifications to the full JID of remote users
* that have sent available presences to the PEP service.
*/
private Map<String, Set<JID>> knownRemotePresences = new ConcurrentHashMap<String, Set<JID>>();
private static final String GET_PEP_SERVICES = "SELECT DISTINCT serviceID FROM pubsubNode";
public IQPEPHandler() {
super("Personal Eventing Handler");
pepServices = new ConcurrentHashMap<String, PEPService>();
info = new IQHandlerInfo("pubsub", "http://jabber.org/protocol/pubsub");
}
@Override
public void initialize(XMPPServer server) {
super.initialize(server);
// Listen to presence events to manage PEP auto-subscriptions.
PresenceEventDispatcher.addListener(this);
// Listen to remote presence events to manage the knownRemotePresences map.
RemotePresenceEventDispatcher.addListener(this);
// Listen to roster events for PEP subscription cancelling on contact deletion.
RosterEventDispatcher.addListener(this);
// Listen to user events in order to destroy a PEP service when a user is deleted.
UserEventDispatcher.addListener(this);
pubSubEngine = new PubSubEngine(server.getPacketRouter());
// Restore previous PEP services for which nodes exist in the database.
Connection con = null;
PreparedStatement pstmt = null;
try {
con = DbConnectionManager.getConnection();
// Get all PEP services
pstmt = con.prepareStatement(GET_PEP_SERVICES);
ResultSet rs = pstmt.executeQuery();
// Restore old PEPServices
while (rs.next()) {
String serviceID = rs.getString(1);
// Create a new PEPService if serviceID looks like a bare JID.
if (serviceID.indexOf("@") >= 0) {
PEPService pepService = new PEPService(server, serviceID);
pepServices.put(serviceID, pepService);
if (Log.isDebugEnabled()) {
Log.debug("PEP: Restored service for " + serviceID + " from the database.");
}
}
}
rs.close();
pstmt.close();
}
catch (SQLException sqle) {
Log.error(sqle);
}
finally {
try {
if (pstmt != null)
pstmt.close();
}
catch (Exception e) {
Log.error(e);
}
try {
if (con != null)
con.close();
}
catch (Exception e) {
Log.error(e);
}
}
// TODO: This will need to be refactored once XEP-0115 (Entity Capabilities) is implemented.
/*
// Add this PEP handler as a packet interceptor so we may deal with
// client packets that send disco#info's explaining capabilities
// including PEP contact notification filters.
InterceptorManager.getInstance().addInterceptor(this);
*/
}
public void start() {
super.start();
for (PEPService service : pepServices.values()) {
pubSubEngine.start(service);
}
}
public void stop() {
super.stop();
for (PEPService service : pepServices.values()) {
pubSubEngine.shutdown(service);
}
}
public void destroy() {
super.destroy();
// Remove listeners
PresenceEventDispatcher.removeListener(this);
RemotePresenceEventDispatcher.removeListener(this);
RosterEventDispatcher.removeListener(this);
UserEventDispatcher.removeListener(this);
}
@Override
public IQHandlerInfo getInfo() {
return info;
}
/**
* Returns the filteredNodesMap.
*
* @return the filteredNodesMap
*/
public Map<String, Set<String>> getFilteredNodesMap() {
return filteredNodesMap;
}
/**
* Returns the knownRemotePresences map.
*
* @return the knownRemotePresences map
*/
public Map<String, Set<JID>> getKnownRemotePresenes() {
return knownRemotePresences;
}
@Override
public IQ handleIQ(IQ packet) throws UnauthorizedException {
JID senderJID = packet.getFrom();
if (packet.getTo() == null && packet.getType() == IQ.Type.set) {
String jidFrom = senderJID.toBareJID();
PEPService pepService = pepServices.get(jidFrom);
// If no service exists yet for jidFrom, create one.
if (pepService == null) {
// Return an error if the packet is from an anonymous, unregistered user
// or remote user
if (!XMPPServer.getInstance().isLocal(senderJID) ||
!UserManager.getInstance().isRegisteredUser(senderJID.getNode())) {
IQ reply = IQ.createResultIQ(packet);
reply.setChildElement(packet.getChildElement().createCopy());
reply.setError(PacketError.Condition.not_allowed);
return reply;
}
pepService = new PEPService(XMPPServer.getInstance(), jidFrom);
pepServices.put(jidFrom, pepService);
// Probe presences
pubSubEngine.start(pepService);
if (Log.isDebugEnabled()) {
Log.debug("PEP: " + jidFrom + " had a PEPService created");
}
// Those who already have presence subscriptions to jidFrom
// will now automatically be subscribed to this new PEPService.
try {
Roster roster = XMPPServer.getInstance().getRosterManager().getRoster(senderJID.getNode());
for (RosterItem item : roster.getRosterItems()) {
if (item.getSubStatus() == RosterItem.SUB_BOTH || item.getSubStatus() == RosterItem.SUB_FROM) {
createSubscriptionToPEPService(pepService, item.getJid(), senderJID);
}
}
}
catch (UserNotFoundException e) {
// Do nothing
}
}
// If publishing a node, and the node doesn't exist, create it.
Element childElement = packet.getChildElement();
Element publishElement = childElement.element("publish");
if (publishElement != null) {
String nodeID = publishElement.attributeValue("node");
// Do not allow User Avatar nodes to be created.
// TODO: Implement XEP-0084
if (nodeID.startsWith("http://www.xmpp.org/extensions/xep-0084.html")) {
IQ reply = IQ.createResultIQ(packet);
reply.setChildElement(packet.getChildElement().createCopy());
reply.setError(PacketError.Condition.feature_not_implemented);
return reply;
}
if (pepService.getNode(nodeID) == null) {
// Create the node
JID creator = new JID(jidFrom);
LeafNode newNode = new LeafNode(pepService, pepService.getRootCollectionNode(), nodeID, creator);
newNode.addOwner(creator);
newNode.saveToDB();
if (Log.isDebugEnabled()) {
Log.debug("PEP: Created node ('" + newNode.getNodeID() + "') for " + jidFrom);
}
}
}
// Process with PubSub as usual.
pubSubEngine.process(pepService, packet);
}
else if (packet.getType() == IQ.Type.get || packet.getType() == IQ.Type.set) {
String jidTo = packet.getTo().toBareJID();
PEPService pepService = pepServices.get(jidTo);
if (pepService != null) {
pubSubEngine.process(pepService, packet);
}
else {
// Process with PubSub using a dummyService.
PEPService dummyService = new PEPService(XMPPServer.getInstance(), senderJID.toBareJID());
pubSubEngine.process(dummyService, packet);
}
}
else {
// Ignore IQ packets of type 'error' or 'result'.
return null;
}
// Other error flows are handled in pubSubEngine.process(...)
return null;
}
/**
* Generates and processes an IQ stanza that subscribes to a PEP service.
*
* @param pepService the PEP service of the owner.
* @param subscriber the JID of the entity that is subscribing to the PEP service.
* @param owner the JID of the owner of the PEP service.
*/
private void createSubscriptionToPEPService(PEPService pepService, JID subscriber, JID owner) {
// If `owner` has a PEP service, generate and process a pubsub subscription packet
// that is equivalent to: (where 'from' field is JID of subscriber and 'to' field is JID of owner)
//
// <iq type='set'
// from='nurse@capulet.com/chamber'
// to='juliet@capulet.com
// id='collsub'>
// <pubsub xmlns='http://jabber.org/protocol/pubsub'>
// <subscribe jid='nurse@capulet.com'/>
// <options>
// <x xmlns='jabber:x:data'>
// <field var='FORM_TYPE' type='hidden'>
// <value>http://jabber.org/protocol/pubsub#subscribe_options</value>
// </field>
// <field var='pubsub#subscription_type'>
// <value>items</value>
// </field>
// <field var='pubsub#subscription_depth'>
// <value>all</value>
// </field>
// </x>
// </options>
// </pubsub>
// </iq>
IQ subscriptionPacket = new IQ(IQ.Type.set);
subscriptionPacket.setFrom(subscriber);
subscriptionPacket.setTo(owner.toBareJID());
Element pubsubElement = subscriptionPacket.setChildElement("pubsub", "http://jabber.org/protocol/pubsub");
Element subscribeElement = pubsubElement.addElement("subscribe");
subscribeElement.addAttribute("jid", subscriber.toBareJID());
Element optionsElement = pubsubElement.addElement("options");
Element xElement = optionsElement.addElement(QName.get("x", "jabber:x:data"));
DataForm dataForm = new DataForm(xElement);
FormField formField = dataForm.addField();
formField.setVariable("FORM_TYPE");
formField.setType(FormField.Type.hidden);
formField.addValue("http://jabber.org/protocol/pubsub#subscribe_options");
formField = dataForm.addField();
formField.setVariable("pubsub#subscription_type");
formField.addValue("items");
formField = dataForm.addField();
formField.setVariable("pubsub#subscription_depth");
formField.addValue("all");
pubSubEngine.process(pepService, subscriptionPacket);
}
/**
* Cancels a subscription to a PEPService's root collection node.
*
* @param unsubscriber the JID of the subscriber whose subscription is being canceled.
* @param serviceOwner the JID of the owner of the PEP service for which the subscription is being canceled.
*/
private void cancelSubscriptionToPEPService(JID unsubscriber, JID serviceOwner) {
// Retrieve recipientJID's PEP service, if it exists.
PEPService pepService = pepServices.get(serviceOwner.toBareJID());
if (pepService == null) {
return;
}
// Cancel unsubscriberJID's subscription to recipientJID's PEP service, if it exists.
CollectionNode rootNode = pepService.getRootCollectionNode();
NodeSubscription nodeSubscription = rootNode.getSubscription(unsubscriber);
if (nodeSubscription != null) {
rootNode.cancelSubscription(nodeSubscription);
if (Log.isDebugEnabled()) {
Log.debug("PEP: " + unsubscriber + " subscription to " + serviceOwner + "'s PEP service was cancelled.");
}
}
else {
return;
}
}
/**
* Implements ServerIdentitiesProvider and UserIdentitiesProvider, adding
* the PEP identity to the respective disco#info results.
*/
public Iterator<Element> getIdentities() {
ArrayList<Element> identities = new ArrayList<Element>();
Element identity = DocumentHelper.createElement("identity");
identity.addAttribute("category", "pubsub");
identity.addAttribute("type", "pep");
identities.add(identity);
return identities.iterator();
}
/**
* Implements ServerFeaturesProvider to include all supported XEP-0060 features
* in the server's disco#info result (as per section 4 of XEP-0163).
*/
public Iterator<String> getFeatures() {
return XMPPServer.getInstance().getPubSubModule().getFeatures(null, null, null);
}
/**
* Implements UserItemsProvider, adding PEP related items to a disco#items
* result.
*/
public Iterator<Element> getUserItems(String name, JID senderJID) {
ArrayList<Element> items = new ArrayList<Element>();
String recipientJID = XMPPServer.getInstance().createJID(name, null).toBareJID();
PEPService pepService = pepServices.get(recipientJID);
if (pepService != null) {
CollectionNode rootNode = pepService.getRootCollectionNode();
Element defaultItem = DocumentHelper.createElement("item");
defaultItem.addAttribute("jid", recipientJID);
for (Node node : pepService.getNodes()) {
// Do not include the root node as an item element.
if (node == rootNode) {
continue;
}
AccessModel accessModel = node.getAccessModel();
if (accessModel.canAccessItems(node, senderJID, new JID(recipientJID))) {
Element item = defaultItem.createCopy();
item.addAttribute("node", node.getNodeID());
items.add(item);
}
}
}
return items.iterator();
}
public void subscribedToPresence(JID subscriberJID, JID authorizerJID) {
PEPService pepService = pepServices.get(authorizerJID.toBareJID());
if (pepService != null) {
createSubscriptionToPEPService(pepService, subscriberJID, authorizerJID);
// Delete any leaf node subscriptions the subscriber may have already
// had (since a subscription to the PEP service, and thus its leaf PEP
// nodes, would be duplicating publish notifications from previous leaf
// node subscriptions).
CollectionNode rootNode = pepService.getRootCollectionNode();
for (Node node : pepService.getNodes()) {
if (rootNode.isChildNode(node)) {
for (NodeSubscription subscription : node.getSubscriptions(subscriberJID)) {
node.cancelSubscription(subscription);
}
}
}
pepService.sendLastPublishedItems(subscriberJID);
}
}
public void unsubscribedToPresence(JID unsubscriberJID, JID recipientJID) {
if (Log.isDebugEnabled()) {
Log.debug("PEP: " + unsubscriberJID + " unsubscribed from " + recipientJID + "'s presence.");
}
cancelSubscriptionToPEPService(unsubscriberJID, recipientJID);
}
public void availableSession(ClientSession session, Presence presence) {
// On newly-available presences, send the last published item if the resource is a subscriber.
JID newlyAvailableJID = presence.getFrom();
for (PEPService pepService : pepServices.values()) {
pepService.sendLastPublishedItems(newlyAvailableJID);
}
}
public void remoteUserAvailable(Presence presence) {
JID jidFrom = presence.getFrom();
JID jidTo = presence.getTo();
// Manage the cache of remote presence resources.
Set<JID> remotePresenceSet = knownRemotePresences.get(jidTo.toBareJID());
if (jidFrom.getResource() != null) {
if (remotePresenceSet != null) {
if (remotePresenceSet.add(jidFrom)) {
if (Log.isDebugEnabled()) {
Log.debug("PEP: added " + jidFrom + " to " + jidTo + "'s knownRemotePresences");
}
}
}
else {
remotePresenceSet = new HashSet<JID>();
if (remotePresenceSet.add(jidFrom)) {
if (Log.isDebugEnabled()) {
Log.debug("PEP: added " + jidFrom + " to " + jidTo + "'s knownRemotePresences");
}
knownRemotePresences.put(jidTo.toBareJID(), remotePresenceSet);
}
}
// Send the presence packet recipient's last published items to the remote user.
PEPService pepService = pepServices.get(jidTo.toBareJID());
if (pepService != null) {
pepService.sendLastPublishedItems(jidFrom);
}
}
}
public void remoteUserUnavailable(Presence presence) {
JID jidFrom = presence.getFrom();
JID jidTo = presence.getTo();
// Manage the cache of remote presence resources.
Set<JID> remotePresenceSet = knownRemotePresences.get(jidTo.toBareJID());
if (remotePresenceSet != null && remotePresenceSet.remove(jidFrom)) {
if (Log.isDebugEnabled()) {
Log.debug("PEP: removed " + jidFrom + " from " + jidTo + "'s knownRemotePresences");
}
}
}
public void contactDeleted(Roster roster, RosterItem item) {
JID rosterOwner = XMPPServer.getInstance().createJID(roster.getUsername(), null);
JID deletedContact = item.getJid();
if (Log.isDebugEnabled()) {
Log.debug("PEP: contactDeleted: rosterOwner is " + rosterOwner + " and deletedContact is" + deletedContact);
}
cancelSubscriptionToPEPService(deletedContact, rosterOwner);
}
public void userDeleting(User user, Map<String, Object> params) {
JID bareJID = XMPPServer.getInstance().createJID(user.getUsername(), null);
// Remove the user's PEP service if it exists.
pepServices.remove(bareJID.toString());
}
// TODO: This will need to be refactored once XEP-0115 (Entity Capabilities) is implemented.
/*
public void interceptPacket(Packet packet, Session session, boolean incoming, boolean processed) throws PacketRejectedException {
if (processed && packet instanceof IQ && ((IQ) packet).getType() == IQ.Type.result) {
// Examine the packet and return if it does not look like a disco#info result containing
// Entity Capabilities for a client. The sooner we return the better, as this method will be called
// quite a lot.
Element element = packet.getElement();
if (element == null) {
return;
}
Element query = element.element("query");
if (query == null) {
return;
}
else {
if (query.attributeValue("node") == null) {
return;
}
String queryNamespace = query.getNamespaceURI();
if (queryNamespace == null || !queryNamespace.equals("http://jabber.org/protocol/disco#info")) {
return;
}
}
if (Log.isDebugEnabled()) {
Log.debug("PEP: Intercepted a caps result packet: " + packet.toString());
}
Iterator featuresIterator = query.elementIterator("feature");
if (featuresIterator == null) {
return;
}
// Get the sender's full JID considering they may be logged in from multiple
// clients with different notification filters.
String jidFrom = packet.getFrom().toString();
// For each feature variable, or in this case node ID, ending in "+notify" -- add
// the node ID to the set of filtered nodes that jidFrom is interested in being
// notified about.
//
// If none of the feature variables contain the node ID ending in "+notify",
// remove it from the set of filtered nodes that jidFrom is interested in being
// notified about.
Set<String> supportedNodesSet = new HashSet<String>();
while (featuresIterator.hasNext()) {
Element featureElement = (Element) featuresIterator.next();
String featureVar = featureElement.attributeValue("var");
if (featureVar == null) {
continue;
}
supportedNodesSet.add(featureVar);
}
for (String nodeID : supportedNodesSet) {
if (nodeID.endsWith("+notify")) {
// Add the nodeID to the sender's filteredNodesSet.
Set<String> filteredNodesSet = filteredNodesMap.get(jidFrom);
if (filteredNodesSet == null) {
filteredNodesSet = new HashSet<String>();
filteredNodesSet.add(nodeID);
filteredNodesMap.put(jidFrom, filteredNodesSet);
if (Log.isDebugEnabled()) {
Log.debug("PEP: Created filteredNodesSet for " + jidFrom);
Log.debug("PEP: Added " + nodeID + " to " + jidFrom + "'s set of filtered nodes.");
}
}
else {
if (filteredNodesSet.add(nodeID)) {
if (Log.isDebugEnabled()) {
Log.debug("PEP: Added " + nodeID + " to " + jidFrom + "'s set of filtered nodes: ");
Iterator tempIter = filteredNodesSet.iterator();
while (tempIter.hasNext()) {
Log.debug("PEP: " + tempIter.next());
}
}
}
}
}
else {
// Remove the nodeID from the sender's filteredNodesSet if nodeIDPlusNotify
// is not in supportedNodesSet.
Set<String> filteredNodesSet = filteredNodesMap.get(jidFrom);
if (filteredNodesSet == null) {
return;
}
String nodeIDPlusNotify = nodeID + "+notify";
if (!supportedNodesSet.contains(nodeIDPlusNotify) && filteredNodesSet.remove(nodeIDPlusNotify)) {
if (Log.isDebugEnabled()) {
Log.debug("PEP: Removed " + nodeIDPlusNotify + " from " + jidFrom + "'s set of filtered nodes: ");
Iterator tempIter = filteredNodesSet.iterator();
while (tempIter.hasNext()) {
Log.debug("PEP: " + tempIter.next());
}
}
}
}
}
}
}
*/
/**
* The following functions are unimplemented required interface methods.
*/
public void unavailableSession(ClientSession session, Presence presence) {
// Do nothing
}
public void presenceChanged(ClientSession session, Presence presence) {
// Do nothing
}
public void presencePriorityChanged(ClientSession session, Presence presence) {
// Do nothing
}
public boolean addingContact(Roster roster, RosterItem item, boolean persistent) {
// Do nothing
return true;
}
public void contactAdded(Roster roster, RosterItem item) {
// Do nothing
}
public void contactUpdated(Roster roster, RosterItem item) {
// Do nothing
}
public void rosterLoaded(Roster roster) {
// Do nothing
}
public void userCreated(User user, Map<String, Object> params) {
// Do nothing
}
public void userModified(User user, Map<String, Object> params) {
// Do nothing
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.openfire.pep;
import org.jivesoftware.openfire.IQHandlerInfo;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.handler.IQHandler;
import org.xmpp.packet.IQ;
/**
* <p>
* An IQHandler used to implement XEP-0163: "Personal Eventing via Pubsub."
* </p>
*
* <p>
* An IQHandler can only handle one namespace in its IQHandlerInfo. However, PEP
* related packets are seen having a variety of different namespaces. This
* handler is needed to forward IQ packets with the
* <i>'http://jabber.org/protocol/pubsub#owner'</i> namespace to IQPEPHandler.
* </p>
*
* @author Armando Jagucki
*
*/
public class IQPEPOwnerHandler extends IQHandler {
private IQHandlerInfo info;
public IQPEPOwnerHandler() {
super("Personal Eventing 'pubsub#owner' Handler");
info = new IQHandlerInfo("pubsub", "http://jabber.org/protocol/pubsub#owner");
}
@Override
public IQHandlerInfo getInfo() {
return info;
}
@Override
public IQ handleIQ(IQ packet) throws UnauthorizedException {
return XMPPServer.getInstance().getIQPEPHandler().handleIQ(packet);
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.openfire.pep;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.jivesoftware.openfire.PacketRouter;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.commands.AdHocCommandManager;
import org.jivesoftware.openfire.pubsub.*;
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.openfire.roster.Roster;
import org.jivesoftware.openfire.roster.RosterItem;
import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.FastDateFormat;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.StringUtils;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketExtension;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
* A PEPService is a PubSubService for use with XEP-0163: "Personal Eventing via
* Pubsub."
*
* @author Armando Jagucki
*
*/
public class PEPService implements PubSubService {
/**
* The bare JID that this service is identified by.
*/
private String serviceOwnerJID;
/**
* Collection node that acts as the root node of the entire node hierarchy.
*/
private CollectionNode rootCollectionNode = null;
/**
* Nodes managed by this service, table: key nodeID (String); value Node
*/
private Map<String, Node> nodes = new ConcurrentHashMap<String, Node>();
/**
* The packet router for the server.
*/
private PacketRouter router = null;
/**
* Default configuration to use for newly created leaf nodes.
*/
private DefaultNodeConfiguration leafDefaultConfiguration;
/**
* Default configuration to use for newly created collection nodes.
*/
private DefaultNodeConfiguration collectionDefaultConfiguration;
/**
* Returns the permission policy for creating nodes. A true value means that
* not anyone can create a node, only the service admin.
*/
private boolean nodeCreationRestricted = true;
/**
* Keep a registry of the presence's show value of users that subscribed to
* a node of the pep service and for which the node only delivers
* notifications for online users or node subscriptions deliver events based
* on the user presence show value. Offline users will not have an entry in
* the map. Note: Key-> bare JID and Value-> Map whose key is full JID of
* connected resource and value is show value of the last received presence.
*/
private Map<String, Map<String, String>> barePresences = new ConcurrentHashMap<String, Map<String, String>>();
/**
* Queue that holds the items that need to be added to the database.
*/
private Queue<PublishedItem> itemsToAdd = new LinkedBlockingQueue<PublishedItem>();
/**
* Queue that holds the items that need to be deleted from the database.
*/
private Queue<PublishedItem> itemsToDelete = new LinkedBlockingQueue<PublishedItem>();
/**
* Manager that keeps the list of ad-hoc commands and processing command
* requests.
*/
private AdHocCommandManager manager;
/**
* The time to elapse between each execution of the maintenance process.
* Default is 2 minutes.
*/
private int items_task_timeout = 2 * 60 * 1000;
/**
* Task that saves or deletes published items from the database.
*/
private PublishedItemTask publishedItemTask;
/**
* Timer to save published items to the database or remove deleted or old
* items.
*/
private Timer timer = new Timer("PEP service maintenance");
/**
* Date format to use for time stamps in delayed event notifications.
*/
private static final FastDateFormat fastDateFormat;
static {
fastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", TimeZone.getTimeZone("UTC"));
}
/**
* Constructs a PEPService.
*
* @param server the XMPP server.
* @param bareJID the bare JID (service ID) of the user owning the service.
*/
public PEPService(XMPPServer server, String bareJID) {
this.serviceOwnerJID = bareJID;
router = server.getPacketRouter();
// Initialize the ad-hoc commands manager to use for this pep service
manager = new AdHocCommandManager();
manager.addCommand(new PendingSubscriptionsCommand(this));
// Save or delete published items from the database every 2 minutes
// starting in 2 minutes (default values)
publishedItemTask = new PublishedItemTask(this);
timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout);
// Load default configuration for leaf nodes
leafDefaultConfiguration = PubSubPersistenceManager.loadDefaultConfiguration(this, true);
if (leafDefaultConfiguration == null) {
// Create and save default configuration for leaf nodes;
leafDefaultConfiguration = new DefaultNodeConfiguration(true);
leafDefaultConfiguration.setAccessModel(AccessModel.presence);
leafDefaultConfiguration.setPublisherModel(PublisherModel.publishers);
leafDefaultConfiguration.setDeliverPayloads(true);
leafDefaultConfiguration.setLanguage("English");
leafDefaultConfiguration.setMaxPayloadSize(5120);
leafDefaultConfiguration.setNotifyConfigChanges(true);
leafDefaultConfiguration.setNotifyDelete(true);
leafDefaultConfiguration.setNotifyRetract(true);
leafDefaultConfiguration.setPersistPublishedItems(false);
leafDefaultConfiguration.setMaxPublishedItems(-1);
leafDefaultConfiguration.setPresenceBasedDelivery(false);
leafDefaultConfiguration.setSendItemSubscribe(true);
leafDefaultConfiguration.setSubscriptionEnabled(true);
leafDefaultConfiguration.setReplyPolicy(null);
PubSubPersistenceManager.createDefaultConfiguration(this, leafDefaultConfiguration);
}
// Load default configuration for collection nodes
collectionDefaultConfiguration = PubSubPersistenceManager.loadDefaultConfiguration(this, false);
if (collectionDefaultConfiguration == null) {
// Create and save default configuration for collection nodes;
collectionDefaultConfiguration = new DefaultNodeConfiguration(false);
collectionDefaultConfiguration.setAccessModel(AccessModel.presence);
collectionDefaultConfiguration.setPublisherModel(PublisherModel.publishers);
collectionDefaultConfiguration.setDeliverPayloads(false);
collectionDefaultConfiguration.setLanguage("English");
collectionDefaultConfiguration.setNotifyConfigChanges(true);
collectionDefaultConfiguration.setNotifyDelete(true);
collectionDefaultConfiguration.setNotifyRetract(true);
collectionDefaultConfiguration.setPresenceBasedDelivery(false);
collectionDefaultConfiguration.setSubscriptionEnabled(true);
collectionDefaultConfiguration.setReplyPolicy(null);
collectionDefaultConfiguration.setAssociationPolicy(CollectionNode.LeafNodeAssociationPolicy.all);
collectionDefaultConfiguration.setMaxLeafNodes(-1);
PubSubPersistenceManager.createDefaultConfiguration(this, collectionDefaultConfiguration);
}
// Load nodes to memory
PubSubPersistenceManager.loadNodes(this);
// Ensure that we have a root collection node
if (nodes.isEmpty()) {
// Create root collection node
JID creatorJID = new JID(bareJID);
rootCollectionNode = new CollectionNode(this, null, bareJID, creatorJID);
// Add the creator as the node owner
rootCollectionNode.addOwner(creatorJID);
// Save new root node
rootCollectionNode.saveToDB();
}
else {
rootCollectionNode = (CollectionNode) getNode(bareJID);
}
}
public void addNode(Node node) {
nodes.put(node.getNodeID(), node);
}
public void removeNode(String nodeID) {
nodes.remove(nodeID);
}
public Node getNode(String nodeID) {
return nodes.get(nodeID);
}
public Collection<Node> getNodes() {
return nodes.values();
}
public CollectionNode getRootCollectionNode() {
return rootCollectionNode;
}
public JID getAddress() {
return new JID(serviceOwnerJID);
}
public String getServiceID() {
// The bare JID of the user is the service ID for PEP
return serviceOwnerJID;
}
public DefaultNodeConfiguration getDefaultNodeConfiguration(boolean leafType) {
if (leafType) {
return leafDefaultConfiguration;
}
return collectionDefaultConfiguration;
}
public Collection<String> getShowPresences(JID subscriber) {
return PubSubEngine.getShowPresences(this, subscriber);
}
public boolean canCreateNode(JID creator) {
// Node creation is always allowed for sysadmin
if (isNodeCreationRestricted() && !isServiceAdmin(creator)) {
// The user is not allowed to create nodes
return false;
}
return true;
}
/**
* Returns true if the the prober is allowed to see the presence of the probee.
*
* @param prober the user that is trying to probe the presence of another user.
* @param probee the username of the uset that is being probed.
* @return true if the the prober is allowed to see the presence of the probee.
* @throws UserNotFoundException If the probee does not exist in the local server or the prober
* is not present in the roster of the probee.
*/
private boolean canProbePresence(JID prober, JID probee) throws UserNotFoundException {
Roster roster;
roster = XMPPServer.getInstance().getRosterManager().getRoster(prober.getNode());
RosterItem item = roster.getRosterItem(probee);
if (item.getSubStatus() == RosterItem.SUB_BOTH || item.getSubStatus() == RosterItem.SUB_FROM) {
return true;
}
return false;
}
public boolean isCollectionNodesSupported() {
return true;
}
public boolean isInstantNodeSupported() {
return true;
}
public boolean isMultipleSubscriptionsEnabled() {
return false;
}
public boolean isServiceAdmin(JID user) {
// Here we consider a 'service admin' to be the user that this PEPService
// is associated with.
if (serviceOwnerJID.equals(user.toBareJID())) {
return true;
}
else {
return false;
}
}
public boolean isNodeCreationRestricted() {
return nodeCreationRestricted;
}
public void presenceSubscriptionNotRequired(Node node, JID user) {
PubSubEngine.presenceSubscriptionNotRequired(this, node, user);
}
public void presenceSubscriptionRequired(Node node, JID user) {
PubSubEngine.presenceSubscriptionRequired(this, node, user);
}
public void send(Packet packet) {
router.route(packet);
}
public void broadcast(Node node, Message message, Collection<JID> jids) {
message.setFrom(getAddress());
for (JID jid : jids) {
message.setTo(jid);
message.setID(node.getNodeID() + "__" + jid.toBareJID() + "__" + StringUtils.randomString(5));
router.route(message);
}
}
public void sendNotification(Node node, Message message, JID recipientJID) {
message.setTo(recipientJID);
message.setFrom(getAddress());
message.setID(node.getNodeID() + "__" + recipientJID.toBareJID() + "__" + StringUtils.randomString(5));
// If the recipient subscribed with a bare JID and this PEPService can retrieve
// presence information for the recipient, collect all of their full JIDs and
// send the notification to each below.
Set<JID> recipientFullJIDs = new HashSet<JID>();
if (recipientJID.getResource() == null && XMPPServer.getInstance().isLocal(recipientJID)) {
for (ClientSession clientSession : SessionManager.getInstance().getSessions(recipientJID.getNode())) {
recipientFullJIDs.add(clientSession.getAddress());
}
}
else {
// Since recipientJID is not local, try to get presence info from cached known remote
// presences.
Map<String, Set<JID>> knownRemotePresences = XMPPServer.getInstance().getIQPEPHandler().getKnownRemotePresenes();
Set<JID> remotePresenceSet = knownRemotePresences.get(getAddress().toBareJID());
if (remotePresenceSet != null) {
for (JID remotePresence : remotePresenceSet) {
if (recipientJID.toBareJID().equals(remotePresence.toBareJID())) {
recipientFullJIDs.add(remotePresence);
}
}
}
}
if (recipientFullJIDs.isEmpty()) {
router.route(message);
return;
}
for (JID recipientFullJID : recipientFullJIDs) {
// Include an Extended Stanza Addressing "replyto" extension specifying the publishing
// resource. However, only include the extension if the receiver has a presence subscription
// to the service owner.
try {
JID publisher = null;
// Get the ID of the node that had an item published to or retracted from.
Element itemsElement = message.getElement().element("event").element("items");
String nodeID = itemsElement.attributeValue("node");
// Get the ID of the item that was published or retracted.
String itemID = null;
Element itemElement = itemsElement.element("item");
if (itemElement == null) {
Element retractElement = itemsElement.element("retract");
if (retractElement != null) {
itemID = retractElement.attributeValue("id");
}
}
else {
itemID = itemElement.attributeValue("id");
}
// Check if the recipientFullJID is interested in notifications for this node.
// If the recipient has not yet requested any notification filtering, continue and send
// the notification.
Map<String, Set<String>> filteredNodesMap = XMPPServer.getInstance().getIQPEPHandler().getFilteredNodesMap();
Set<String> filteredNodesSet = filteredNodesMap.get(recipientFullJID.toString());
if (filteredNodesSet != null && !filteredNodesSet.contains(nodeID + "+notify")) {
return;
}
// Get the full JID of the item publisher from the node that was published to.
// This full JID will be used as the "replyto" address in the addressing extension.
if (node.isCollectionNode()) {
for (Node leafNode : node.getNodes()) {
if (leafNode.getNodeID().equals(nodeID)) {
publisher = leafNode.getPublishedItem(itemID).getPublisher();
// Ensure the recipientJID has access to receive notifications for items published to the leaf node.
AccessModel accessModel = leafNode.getAccessModel();
if (!accessModel.canAccessItems(leafNode, recipientFullJID, publisher)) {
return;
}
break;
}
}
}
else {
publisher = node.getPublishedItem(itemID).getPublisher();
}
// Ensure the recipient is subscribed to the service owner's (publisher's) presence.
if (canProbePresence(publisher, recipientFullJID)) {
Element addresses = DocumentHelper.createElement(QName.get("addresses", "http://jabber.org/protocol/address"));
Element address = addresses.addElement("address");
address.addAttribute("type", "replyto");
address.addAttribute("jid", publisher.toString());
Message extendedMessage = message.createCopy();
extendedMessage.addExtension(new PacketExtension(addresses));
extendedMessage.setTo(recipientFullJID);
router.route(extendedMessage);
}
}
catch (IndexOutOfBoundsException e) {
// Do not add addressing extension to message.
}
catch (UserNotFoundException e) {
// Do not add addressing extension to message.
router.route(message);
}
catch (NullPointerException e) {
try {
if (canProbePresence(getAddress(), recipientFullJID)) {
message.setTo(recipientFullJID);
}
}
catch (UserNotFoundException e1) {
// Do nothing
}
router.route(message);
}
}
}
/**
* Sends an event notification for the last published item of each leaf node under the
* root collection node to the recipient JID. If the recipient has no subscription to
* the root collection node, has not yet been authorized, or is pending to be
* configured -- then no notifications are going to be sent.<p>
*
* Depending on the subscription configuration the event notifications may or may not have
* a payload, may not be sent if a keyword (i.e. filter) was defined and it was not matched.
*
* @param recipientJID the recipient that is to receive the last published item notifications.
*/
public void sendLastPublishedItems(JID recipientJID) {
// Ensure the recipient has a subscription to this service's root collection node.
NodeSubscription subscription = rootCollectionNode.getSubscription(recipientJID);
if (subscription == null) {
subscription = rootCollectionNode.getSubscription(new JID(recipientJID.toBareJID()));
}
if (subscription == null) {
return;
}
// Send the last published item of each leaf node to the recipient.
for (Node leafNode : rootCollectionNode.getNodes()) {
// Retrieve last published item for the leaf node.
PublishedItem leafLastPublishedItem = null;
leafLastPublishedItem = leafNode.getLastPublishedItem();
if (leafLastPublishedItem == null) {
continue;
}
// Check if the published item can be sent to the subscriber
if (!subscription.canSendPublicationEvent(leafLastPublishedItem.getNode(), leafLastPublishedItem)) {
return;
}
// Send event notification to the subscriber
Message notification = new Message();
Element event = notification.getElement().addElement("event", "http://jabber.org/protocol/pubsub#event");
Element items = event.addElement("items");
items.addAttribute("node", leafLastPublishedItem.getNode().getNodeID());
Element item = items.addElement("item");
if (((LeafNode) leafLastPublishedItem.getNode()).isItemRequired()) {
item.addAttribute("id", leafLastPublishedItem.getID());
}
if (leafLastPublishedItem.getNode().isPayloadDelivered() && leafLastPublishedItem.getPayload() != null) {
item.add(leafLastPublishedItem.getPayload().createCopy());
}
// Add a message body (if required)
if (subscription.isIncludingBody()) {
notification.setBody(LocaleUtils.getLocalizedString("pubsub.notification.message.body"));
}
// Include date when published item was created
notification.getElement().addElement("x", "jabber:x:delay").addAttribute("stamp", fastDateFormat.format(leafLastPublishedItem.getCreationDate()));
// Send the event notification to the subscriber
this.sendNotification(subscription.getNode(), notification, subscription.getJID());
}
}
public void queueItemToAdd(PublishedItem newItem) {
PubSubEngine.queueItemToAdd(this, newItem);
}
public void queueItemToRemove(PublishedItem removedItem) {
PubSubEngine.queueItemToRemove(this, removedItem);
}
public Map<String, Map<String, String>> getBarePresences() {
return barePresences;
}
public Queue<PublishedItem> getItemsToAdd() {
return itemsToAdd;
}
public Queue<PublishedItem> getItemsToDelete() {
return itemsToDelete;
}
public AdHocCommandManager getManager() {
return manager;
}
public PublishedItemTask getPublishedItemTask() {
return publishedItemTask;
}
public void setPublishedItemTask(PublishedItemTask task) {
publishedItemTask = task;
}
public Timer getTimer() {
return timer;
}
public int getItemsTaskTimeout() {
return items_task_timeout;
}
public void setItemsTaskTimeout(int timeout) {
items_task_timeout = timeout;
}
}
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
</head>
<body>
<p>Implementation of Personal Eventing via Pubsub (XEP-0163).</p>
</body>
</html>
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.openfire.pubsub;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import java.util.Queue;
import java.util.TimerTask;
/**
* A timed maintenance task that updates the database by adding and/or
* removing <code>PublishedItem</code>s in regular intervals.
*
* @author Matt Tucker
*/
public class PublishedItemTask extends TimerTask {
/**
* Queue that holds the items that need to be added to the database.
*/
private Queue<PublishedItem> itemsToAdd = null;
/**
* Queue that holds the items that need to be deleted from the database.
*/
private Queue<PublishedItem> itemsToDelete = null;
/**
* The service to perform the published item tasks on.
*/
private PubSubService service = null;
/**
* The number of items to save on each run of the maintenance process.
*/
private int items_batch_size = 50;
public PublishedItemTask(PubSubService service) {
this.service = service;
this.itemsToAdd = service.getItemsToAdd();
this.itemsToDelete = service.getItemsToDelete();
}
public void run() {
try {
PublishedItem entry;
boolean success;
// Delete from the database items contained in the itemsToDelete queue
for (int index = 0; index <= items_batch_size && !itemsToDelete.isEmpty(); index++) {
entry = itemsToDelete.poll();
if (entry != null) {
success = PubSubPersistenceManager.removePublishedItem(service, entry);
if (!success) {
itemsToDelete.add(entry);
}
}
}
// Save to the database items contained in the itemsToAdd queue
for (int index = 0; index <= items_batch_size && !itemsToAdd.isEmpty(); index++) {
entry = itemsToAdd.poll();
if (entry != null) {
success = PubSubPersistenceManager.createPublishedItem(service, entry);
if (!success) {
itemsToAdd.add(entry);
}
}
}
} catch (Throwable e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment