Commit 5a6409d8 authored by Matt Tucker's avatar Matt Tucker Committed by matt

Latest pubsub work (JM-613).

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@3652 b35dd754-fafc-0310-a699-88a17e54d16e
parent 75a3c4f4
...@@ -12,9 +12,11 @@ ...@@ -12,9 +12,11 @@
package org.jivesoftware.wildfire.pubsub; package org.jivesoftware.wildfire.pubsub;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.forms.FormField; import org.xmpp.forms.FormField;
import org.xmpp.forms.DataForm; import org.xmpp.forms.DataForm;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.dom4j.Element;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -49,10 +51,6 @@ public class CollectionNode extends Node { ...@@ -49,10 +51,6 @@ public class CollectionNode extends Node {
*/ */
private int maxLeafNodes = -1; private int maxLeafNodes = -1;
// TODO Send event notification when a new child node is added (section 9.2)
// TODO Add checking that max number of leaf nodes has been reached
// TODO Add checking that verifies that user that is associating leaf node with collection node is allowed
CollectionNode(PubSubService service, CollectionNode parentNode, String nodeID, JID creator) { CollectionNode(PubSubService service, CollectionNode parentNode, String nodeID, JID creator) {
super(service, parentNode, nodeID, creator); super(service, parentNode, nodeID, creator);
// Configure node with default values (get them from the pubsub service) // Configure node with default values (get them from the pubsub service)
...@@ -77,7 +75,9 @@ public class CollectionNode extends Node { ...@@ -77,7 +75,9 @@ public class CollectionNode extends Node {
try { try {
associationTrusted.add(new JID(value)); associationTrusted.add(new JID(value));
} }
catch (Exception e) {} catch (Exception e) {
// Do nothing
}
} }
} }
else if ("pubsub#leaf_nodes_max".equals(field.getVariable())) { else if ("pubsub#leaf_nodes_max".equals(field.getVariable())) {
...@@ -123,15 +123,96 @@ public class CollectionNode extends Node { ...@@ -123,15 +123,96 @@ public class CollectionNode extends Node {
formField.addValue(maxLeafNodes); formField.addValue(maxLeafNodes);
} }
/**
* Adds a child node to the list of child nodes. The new child node may just have been
* created or just restored from the database. This method will not trigger notifications
* to node subscribers since the node could be a node that has just been loaded from the
* database.
*
* @param child the node to add to the list of child nodes.
*/
void addChildNode(Node child) { void addChildNode(Node child) {
nodes.put(child.getNodeID(), child); nodes.put(child.getNodeID(), child);
} }
/**
* Removes a child node from the list of child nodes. This method will not trigger
* notifications to node subscribers.
*
* @param child the node to remove from the list of child nodes.
*/
void removeChildNode(Node child) { void removeChildNode(Node child) {
// TODO Send notification to subscribers?
nodes.remove(child.getNodeID()); nodes.remove(child.getNodeID());
} }
/**
* Notification that a new node was created and added to this node. Trigger notifications
* to node subscribers whose subscription type is {@link NodeSubscription.Type#nodes} and
* have the proper depth.
*
* @param child the newly created node that was added to this node.
*/
void childNodeAdded(Node child) {
// Build packet to broadcast to subscribers
Message message = new Message();
Element event = message.addChildElement("event", "http://jabber.org/protocol/pubsub#event");
Element item = event.addElement("items").addElement("item");
item.addAttribute("id", child.getNodeID());
if (deliverPayloads) {
item.add(child.getMetadataForm().getElement());
}
// Broadcast event notification to subscribers
broadcastCollectionNodeEvent(child, message);
}
/**
* Notification that a child node was deleted from this node. Trigger notifications
* to node subscribers whose subscription type is {@link NodeSubscription.Type#nodes} and
* have the proper depth.
*
* @param child the deleted node that was removed from this node.
*/
void childNodeDeleted(Node child) {
// Build packet to broadcast to subscribers
Message message = new Message();
Element event = message.addChildElement("event", "http://jabber.org/protocol/pubsub#event");
event.addElement("delete").addAttribute("node", child.getNodeID());
// Broadcast event notification to subscribers
broadcastCollectionNodeEvent(child, message);
}
private void broadcastCollectionNodeEvent(Node child, Message notification) {
// Get affected subscriptions (of this node and all parent nodes)
Collection<NodeSubscription> subscriptions = new ArrayList<NodeSubscription>();
subscriptions.addAll(getSubscriptions(child));
for (CollectionNode parentNode : getParents()) {
subscriptions.addAll(parentNode.getSubscriptions(child));
}
// TODO Possibly use a thread pool for sending packets (based on the jids size)
for (NodeSubscription subscription : subscriptions) {
service.sendNotification(subscription.getNode(), notification, subscription.getJID());
}
}
/**
* Returns a collection with the subscriptions to this node that should be notified
* that a new child was added or deleted.
*
* @param child the added or deleted child.
* @return a collection with the subscriptions to this node that should be notified
* that a new child was added or deleted.
*/
private Collection<NodeSubscription> getSubscriptions(Node child) {
Collection<NodeSubscription> subscriptions = new ArrayList<NodeSubscription>();
for (NodeSubscription subscription : getSubscriptions()) {
if (subscription.canSendChildNodeEvent(child)) {
subscriptions.add(subscription);
}
}
return subscriptions;
}
public boolean isCollectionNode() { public boolean isCollectionNode() {
return true; return true;
} }
...@@ -232,6 +313,54 @@ public class CollectionNode extends Node { ...@@ -232,6 +313,54 @@ public class CollectionNode extends Node {
this.maxLeafNodes = maxLeafNodes; this.maxLeafNodes = maxLeafNodes;
} }
/**
* Returns true if the specified user is allowed to associate a leaf node with this
* node. The decision is taken based on the association policy that the node is
* using.
*
* @param user the user trying to associate a leaf node with this node.
* @return true if the specified user is allowed to associate a leaf node with this
* node.
*/
public boolean isAssociationAllowed(JID user) {
if (associationPolicy == LeafNodeAssociationPolicy.all) {
// Anyone is allowed to associate leaf nodes with this node
return true;
}
else if (associationPolicy == LeafNodeAssociationPolicy.owners) {
// Only owners or sysadmins are allowed to associate leaf nodes with this node
return isAdmin(user);
}
else {
// Owners, sysadmins and a whitelist of usres are allowed to
// associate leaf nodes with this node
return isAdmin(user) || associationTrusted.contains(user);
}
}
/**
* Returns true if the max number of leaf nodes associated with this node has
* reached to the maximum allowed.
*
* @return true if the max number of leaf nodes associated with this node has
* reached to the maximum allowed.
*/
public boolean isMaxLeafNodeReached() {
if (maxLeafNodes < 0) {
// There is no maximum limit
return false;
}
// Count number of child leaf nodes
int counter = 0;
for (Node node : getNodes()) {
if (!node.isCollectionNode()) {
counter = counter + 1;
}
}
// Compare count with maximum allowed
return counter >= maxLeafNodes;
}
/** /**
* Policy that defines who may associate leaf nodes with a collection. * Policy that defines who may associate leaf nodes with a collection.
*/ */
...@@ -248,6 +377,6 @@ public class CollectionNode extends Node { ...@@ -248,6 +377,6 @@ public class CollectionNode extends Node {
/** /**
* Only those on a whitelist may associate leaf nodes with the collection. * Only those on a whitelist may associate leaf nodes with the collection.
*/ */
whitelist; whitelist
} }
} }
...@@ -18,6 +18,7 @@ import org.xmpp.forms.DataForm; ...@@ -18,6 +18,7 @@ import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField; import org.xmpp.forms.FormField;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Message; import org.xmpp.packet.Message;
import org.xmpp.packet.IQ;
import java.util.*; import java.util.*;
...@@ -55,10 +56,10 @@ public class LeafNode extends Node { ...@@ -55,10 +56,10 @@ public class LeafNode extends Node {
* not configured to persist items then the last published item will be kept. The list is * not configured to persist items then the last published item will be kept. The list is
* sorted cronologically. * sorted cronologically.
*/ */
protected List<PublishedItem> publishedItems = new ArrayList<PublishedItem>(); protected final List<PublishedItem> publishedItems = new ArrayList<PublishedItem>();
protected Map<String, PublishedItem> itemsByID = new HashMap<String, PublishedItem>(); protected Map<String, PublishedItem> itemsByID = new HashMap<String, PublishedItem>();
// TODO Add checking of max payload size // TODO Add checking of max payload size. Return <not-acceptable> plus a application specific error condition of <payload-too-big/>.
LeafNode(PubSubService service, CollectionNode parentNode, String nodeID, JID creator) { LeafNode(PubSubService service, CollectionNode parentNode, String nodeID, JID creator) {
super(service, parentNode, nodeID, creator); super(service, parentNode, nodeID, creator);
...@@ -213,7 +214,7 @@ public class LeafNode extends Node { ...@@ -213,7 +214,7 @@ public class LeafNode extends Node {
if (isItemRequired()) { if (isItemRequired()) {
String itemID; String itemID;
Element payload; Element payload;
PublishedItem newItem = null; PublishedItem newItem;
for (Element item : itemElements) { for (Element item : itemElements) {
itemID = item.attributeValue("id"); itemID = item.attributeValue("id");
List entries = item.elements(); List entries = item.elements();
...@@ -316,6 +317,35 @@ public class LeafNode extends Node { ...@@ -316,6 +317,35 @@ public class LeafNode extends Node {
} }
} }
/**
* Sends an IQ result with the list of items published to the node. Item ID and payload
* may be included in the result based on the node configuration.
*
* @param originalRequest the IQ packet sent by a subscriber (or anyone) to get the node items.
* @param publishedItems the list of published items to send to the subscriber.
* @param forceToIncludePayload true if the item payload should be include if one exists. When
* false the decision is up to the node.
*/
void sendPublishedItems(IQ originalRequest, List<PublishedItem> publishedItems,
boolean forceToIncludePayload) {
IQ result = IQ.createResultIQ(originalRequest);
Element childElement = originalRequest.getChildElement().createCopy();
result.setChildElement(childElement);
Element items = childElement.element("items");
for (PublishedItem publishedItem : publishedItems) {
Element item = items.addElement("item");
if (isItemRequired()) {
item.addAttribute("id", publishedItem.getID());
}
if ((forceToIncludePayload || isPayloadDelivered()) &&
publishedItem.getPayload() != null) {
item.add(publishedItem.getPayload().createCopy());
}
}
// Send the result
service.send(result);
}
public PublishedItem getPublishedItem(String itemID) { public PublishedItem getPublishedItem(String itemID) {
if (!isItemRequired()) { if (!isItemRequired()) {
return null; return null;
......
...@@ -72,15 +72,15 @@ public class NodeAffiliate { ...@@ -72,15 +72,15 @@ public class NodeAffiliate {
* with the items to include in each notification. * with the items to include in each notification.
* @param event the event Element included in the notification message. Passed as an * @param event the event Element included in the notification message. Passed as an
* optimization to avoid future look ups. * optimization to avoid future look ups.
* @param node the leaf node where the items where published. * @param leafNode the leaf node where the items where published.
* @param publishedItems the list of items that were published. Could be an empty list. * @param publishedItems the list of items that were published. Could be an empty list.
*/ */
void sendPublishedNotifications(Message notification, Element event, LeafNode node, void sendPublishedNotifications(Message notification, Element event, LeafNode leafNode,
List<PublishedItem> publishedItems) { List<PublishedItem> publishedItems) {
if (!publishedItems.isEmpty()) { if (!publishedItems.isEmpty()) {
Map<List<NodeSubscription>, List<PublishedItem>> itemsBySubs = Map<List<NodeSubscription>, List<PublishedItem>> itemsBySubs =
getItemsBySubscriptions(node, publishedItems); getItemsBySubscriptions(leafNode, publishedItems);
// Send one notification for published items that affect the same subscriptions // Send one notification for published items that affect the same subscriptions
for (List<NodeSubscription> nodeSubscriptions : itemsBySubs.keySet()) { for (List<NodeSubscription> nodeSubscriptions : itemsBySubs.keySet()) {
...@@ -90,37 +90,37 @@ public class NodeAffiliate { ...@@ -90,37 +90,37 @@ public class NodeAffiliate {
for (PublishedItem publishedItem : itemsBySubs.get(nodeSubscriptions)) { for (PublishedItem publishedItem : itemsBySubs.get(nodeSubscriptions)) {
// Add item information to the event notification // Add item information to the event notification
Element item = items.addElement("item"); Element item = items.addElement("item");
if (node.isItemRequired()) { if (leafNode.isItemRequired()) {
item.addAttribute("id", publishedItem.getID()); item.addAttribute("id", publishedItem.getID());
} }
if (node.isPayloadDelivered()) { if (leafNode.isPayloadDelivered()) {
item.add(publishedItem.getPayload().createCopy()); item.add(publishedItem.getPayload().createCopy());
} }
// Add leaf node information if affiliated node and node // Add leaf leafNode information if affiliated leafNode and node
// where the item was published are different // where the item was published are different
if (node != getNode()) { if (leafNode != getNode()) {
item.addAttribute("node", node.getNodeID()); item.addAttribute("node", leafNode.getNodeID());
} }
} }
// Send the event notification // Send the event notification
sendEventNotification(notification, node, nodeSubscriptions); sendEventNotification(notification, nodeSubscriptions);
// Remove the added items information // Remove the added items information
event.remove(items); event.remove(items);
} }
} }
else { else {
// Filter affiliate subscriptions and only use approved and configured ones // Filter affiliate subscriptions and only use approved and configured ones
List<NodeSubscription> affectedSubscriptions = new ArrayList<NodeSubscription>();; List<NodeSubscription> affectedSubscriptions = new ArrayList<NodeSubscription>();
for (NodeSubscription subscription : getSubscriptions()) { for (NodeSubscription subscription : getSubscriptions()) {
if (subscription.canSendEventNotification(node, null)) { if (subscription.canSendPublicationEvent(leafNode, null)) {
affectedSubscriptions.add(subscription); affectedSubscriptions.add(subscription);
} }
} }
// Add item information to the event notification // Add item information to the event notification
Element items = event.addElement("items"); Element items = event.addElement("items");
items.addAttribute("node", node.getNodeID()); items.addAttribute("node", leafNode.getNodeID());
// Send the event notification // Send the event notification
sendEventNotification(notification, node, affectedSubscriptions); sendEventNotification(notification, affectedSubscriptions);
// Remove the added items information // Remove the added items information
event.remove(items); event.remove(items);
} }
...@@ -137,30 +137,30 @@ public class NodeAffiliate { ...@@ -137,30 +137,30 @@ public class NodeAffiliate {
* with the items to include in each notification. * with the items to include in each notification.
* @param event the event Element included in the notification message. Passed as an * @param event the event Element included in the notification message. Passed as an
* optimization to avoid future look ups. * optimization to avoid future look ups.
* @param node the leaf node where the items where deleted from. * @param leafNode the leaf node where the items where deleted from.
* @param publishedItems the list of items that were deleted. * @param publishedItems the list of items that were deleted.
*/ */
void sendDeletionNotifications(Message notification, Element event, LeafNode node, void sendDeletionNotifications(Message notification, Element event, LeafNode leafNode,
List<PublishedItem> publishedItems) { List<PublishedItem> publishedItems) {
if (!publishedItems.isEmpty()) { if (!publishedItems.isEmpty()) {
Map<List<NodeSubscription>, List<PublishedItem>> itemsBySubs = Map<List<NodeSubscription>, List<PublishedItem>> itemsBySubs =
getItemsBySubscriptions(node, publishedItems); getItemsBySubscriptions(leafNode, publishedItems);
// Send one notification for published items that affect the same subscriptions // Send one notification for published items that affect the same subscriptions
for (List<NodeSubscription> nodeSubscriptions : itemsBySubs.keySet()) { for (List<NodeSubscription> nodeSubscriptions : itemsBySubs.keySet()) {
// Add items information // Add items information
Element items = event.addElement("items"); Element items = event.addElement("items");
items.addAttribute("node", node.getNodeID()); items.addAttribute("node", leafNode.getNodeID());
for (PublishedItem publishedItem : itemsBySubs.get(nodeSubscriptions)) { for (PublishedItem publishedItem : itemsBySubs.get(nodeSubscriptions)) {
// Add retract information to the event notification // Add retract information to the event notification
Element item = items.addElement("retract"); Element item = items.addElement("retract");
if (node.isItemRequired()) { if (leafNode.isItemRequired()) {
item.addAttribute("id", publishedItem.getID()); item.addAttribute("id", publishedItem.getID());
} }
} }
// Send the event notification // Send the event notification
sendEventNotification(notification, node, nodeSubscriptions); sendEventNotification(notification, nodeSubscriptions);
// Remove the added items information // Remove the added items information
event.remove(items); event.remove(items);
} }
...@@ -180,11 +180,10 @@ public class NodeAffiliate { ...@@ -180,11 +180,10 @@ public class NodeAffiliate {
* specified), the subscription status and originating node. * specified), the subscription status and originating node.
* *
* @param notification the message to send containing the event notification. * @param notification the message to send containing the event notification.
* @param node the node that received a new publication.
* @param notifySubscriptions list of subscriptions that were affected and are going to be * @param notifySubscriptions list of subscriptions that were affected and are going to be
* included in the notification message. The list should not be empty. * included in the notification message. The list should not be empty.
*/ */
private void sendEventNotification(Message notification, LeafNode node, private void sendEventNotification(Message notification,
List<NodeSubscription> notifySubscriptions) { List<NodeSubscription> notifySubscriptions) {
if (node.isMultipleSubscriptionsEnabled()) { if (node.isMultipleSubscriptionsEnabled()) {
// Group subscriptions with the same subscriber JID // Group subscriptions with the same subscriber JID
...@@ -215,8 +214,8 @@ public class NodeAffiliate { ...@@ -215,8 +214,8 @@ public class NodeAffiliate {
} }
} }
private Map<List<NodeSubscription>, List<PublishedItem>> getItemsBySubscriptions(LeafNode node, private Map<List<NodeSubscription>, List<PublishedItem>> getItemsBySubscriptions(
List<PublishedItem> publishedItems) { LeafNode leafNode, List<PublishedItem> publishedItems) {
// Identify which subscriptions can receive each item // Identify which subscriptions can receive each item
Map<PublishedItem, List<NodeSubscription>> subsByItem = Map<PublishedItem, List<NodeSubscription>> subsByItem =
new HashMap<PublishedItem, List<NodeSubscription>>(); new HashMap<PublishedItem, List<NodeSubscription>>();
...@@ -225,7 +224,7 @@ public class NodeAffiliate { ...@@ -225,7 +224,7 @@ public class NodeAffiliate {
Collection<NodeSubscription> subscriptions = getSubscriptions(); Collection<NodeSubscription> subscriptions = getSubscriptions();
for (PublishedItem publishedItem : publishedItems) { for (PublishedItem publishedItem : publishedItems) {
for (NodeSubscription subscription : subscriptions) { for (NodeSubscription subscription : subscriptions) {
if (subscription.canSendEventNotification(node, publishedItem)) { if (subscription.canSendPublicationEvent(leafNode, publishedItem)) {
List<NodeSubscription> nodeSubscriptions = subsByItem.get(publishedItem); List<NodeSubscription> nodeSubscriptions = subsByItem.get(publishedItem);
if (nodeSubscriptions == null) { if (nodeSubscriptions == null) {
nodeSubscriptions = new ArrayList<NodeSubscription>(); nodeSubscriptions = new ArrayList<NodeSubscription>();
...@@ -277,6 +276,6 @@ public class NodeAffiliate { ...@@ -277,6 +276,6 @@ public class NodeAffiliate {
/** /**
* Outcast users are not allowed to subscribe to the node. * Outcast users are not allowed to subscribe to the node.
*/ */
outcast; outcast
} }
} }
...@@ -49,8 +49,8 @@ import java.util.*; ...@@ -49,8 +49,8 @@ import java.util.*;
*/ */
public class NodeSubscription { public class NodeSubscription {
private static SimpleDateFormat dateFormat; private static final SimpleDateFormat dateFormat;
private static FastDateFormat fastDateFormat; private static final FastDateFormat fastDateFormat;
/** /**
* Reference to the publish and subscribe service. * Reference to the publish and subscribe service.
*/ */
...@@ -412,6 +412,9 @@ public class NodeSubscription { ...@@ -412,6 +412,9 @@ public class NodeSubscription {
void configure(DataForm options) { void configure(DataForm options) {
List<String> values; List<String> values;
String booleanValue; String booleanValue;
boolean wasUsingPresence = !presenceStates.isEmpty();
// Remove this field from the form // Remove this field from the form
options.removeField("FORM_TYPE"); options.removeField("FORM_TYPE");
// Process and remove specific collection node fields // Process and remove specific collection node fields
...@@ -471,7 +474,9 @@ public class NodeSubscription { ...@@ -471,7 +474,9 @@ public class NodeSubscription {
try { try {
presenceStates.add(value); presenceStates.add(value);
} }
catch (Exception e) {} catch (Exception e) {
// Do nothing
}
} }
} }
else if ("x-pubsub#keywords".equals(field.getVariable())) { else if ("x-pubsub#keywords".equals(field.getVariable())) {
...@@ -483,7 +488,7 @@ public class NodeSubscription { ...@@ -483,7 +488,7 @@ public class NodeSubscription {
} }
if (fieldExists) { if (fieldExists) {
// Subscription has been configured so set the next state // Subscription has been configured so set the next state
if (node.getAccessModel().isAuthorizationRequired()) { if (node.getAccessModel().isAuthorizationRequired() && !node.isAdmin(owner)) {
state = State.pending; state = State.pending;
} }
else { else {
...@@ -495,6 +500,15 @@ public class NodeSubscription { ...@@ -495,6 +500,15 @@ public class NodeSubscription {
// Update the subscription in the backend store // Update the subscription in the backend store
PubSubPersistenceManager.saveSubscription(service, node, this, false); PubSubPersistenceManager.saveSubscription(service, node, this, false);
} }
// Check if the service needs to subscribe or unsubscribe from the owner presence
if (!node.isPresenceBasedDelivery() && wasUsingPresence != !presenceStates.isEmpty()) {
if (presenceStates.isEmpty()) {
service.presenceSubscriptionNotRequired(node, owner);
}
else {
service.presenceSubscriptionRequired(node, owner);
}
}
} }
/** /**
...@@ -601,22 +615,10 @@ public class NodeSubscription { ...@@ -601,22 +615,10 @@ public class NodeSubscription {
* @return true if an event notification can be sent to the subscriber for the specified * @return true if an event notification can be sent to the subscriber for the specified
* published item. * published item.
*/ */
boolean canSendEventNotification(LeafNode leafNode, PublishedItem publishedItem) { boolean canSendPublicationEvent(LeafNode leafNode, PublishedItem publishedItem) {
// Check if the subscription is active if (!canSendEvents()) {
if (!isActive()) {
return false; return false;
} }
// Check if delivery of notifications is disabled
if (!shouldDeliverNotifications()) {
return false;
}
// Check if delivery is subject to presence-based policy
if (!getPresenceStates().isEmpty()) {
String show = service.getShowPresence(jid);
if (show == null || !getPresenceStates().contains(show)) {
return false;
}
}
// Check that any defined keyword was matched (applies only if an item was published) // Check that any defined keyword was matched (applies only if an item was published)
if (publishedItem != null && !isKeywordMatched(publishedItem)) { if (publishedItem != null && !isKeywordMatched(publishedItem)) {
return false; return false;
...@@ -639,6 +641,39 @@ public class NodeSubscription { ...@@ -639,6 +641,39 @@ public class NodeSubscription {
return true; return true;
} }
/**
* Returns true if an event notification can be sent to the subscriber of the collection
* node for a newly created node that was associated to the collection node or a child
* node that was deleted. The subscription has to be of type {@link Type#nodes}.
*
* @param originatingNode the node that was added or deleted from the collection node.
* @return true if an event notification can be sent to the subscriber of the collection
* node.
*/
boolean canSendChildNodeEvent(Node originatingNode) {
// Check that this is a subscriber to a collection node
if (!node.isCollectionNode()) {
return false;
}
if (!canSendEvents()) {
return false;
}
// Check that subscriber is using type "nodes"
if (Type.nodes != type) {
return false;
}
// Check if added/deleted node is a first-level child of the subscribed node
if (getDepth() == 1 && !node.isChildNode(originatingNode)) {
return false;
}
// Check if added/deleted node is a descendant child of the subscribed node
if (getDepth() == 0 && !node.isDescendantNode(originatingNode)) {
return false;
}
return true;
}
/** /**
* Returns true if node events such as configuration changed or node purged can be * Returns true if node events such as configuration changed or node purged can be
* sent to the subscriber. * sent to the subscriber.
...@@ -647,6 +682,17 @@ public class NodeSubscription { ...@@ -647,6 +682,17 @@ public class NodeSubscription {
* sent to the subscriber. * sent to the subscriber.
*/ */
boolean canSendNodeEvents() { boolean canSendNodeEvents() {
return canSendEvents();
}
/**
* Returns true if events in general can be sent. This method checks basic
* conditions common to all type of event notifications (e.g. item was published,
* node configuration has changed, new child node was added to collection node, etc.).
*
* @return true if events in general can be sent.
*/
private boolean canSendEvents() {
// Check if the subscription is active // Check if the subscription is active
if (!isActive()) { if (!isActive()) {
return false; return false;
...@@ -657,8 +703,15 @@ public class NodeSubscription { ...@@ -657,8 +703,15 @@ public class NodeSubscription {
} }
// Check if delivery is subject to presence-based policy // Check if delivery is subject to presence-based policy
if (!getPresenceStates().isEmpty()) { if (!getPresenceStates().isEmpty()) {
String show = service.getShowPresence(jid); Collection<String> shows = service.getShowPresences(jid);
if (show == null || !getPresenceStates().contains(show)) { if (shows.isEmpty() || Collections.disjoint(getPresenceStates(), shows)) {
return false;
}
}
// Check if node is only sending events when user is online
if (node.isPresenceBasedDelivery()) {
// Check that user is online
if (service.getShowPresences(jid).isEmpty()) {
return false; return false;
} }
} }
...@@ -731,42 +784,6 @@ public class NodeSubscription { ...@@ -731,42 +784,6 @@ public class NodeSubscription {
service.send(result); service.send(result);
} }
/**
* Sends an IQ result with the list of items published to the node to the subscriber. The
* items to include in the result is subject to the subscription configuration. If the
* subscription is still pending to be approved or unconfigured then no items will be included.
*
* @param originalRequest the IQ packet sent by the subscriber to get the node items.
* @param publishedItems the list of published items to send to the subscriber.
* @param forceToIncludePayload true if the item payload should be include if one exists. When
* false the decision is up to the node.
*/
void sendPublishedItems(IQ originalRequest, List<PublishedItem> publishedItems,
boolean forceToIncludePayload) {
IQ result = IQ.createResultIQ(originalRequest);
Element childElement = originalRequest.getChildElement().createCopy();
result.setChildElement(childElement);
Element items = childElement.element("items");
if (isActive()) {
for (PublishedItem publishedItem : publishedItems) {
// Check if this published item can be included in the result
if (!isKeywordMatched(publishedItem)) {
continue;
}
Element item = items.addElement("item");
if (((LeafNode) node).isItemRequired()) {
item.addAttribute("id", publishedItem.getID());
}
if ((forceToIncludePayload || node.isPayloadDelivered()) &&
publishedItem.getPayload() != null) {
item.add(publishedItem.getPayload().createCopy());
}
}
}
// Send the result
service.send(result);
}
/** /**
* Sends an event notification for the last published item to the subscriber. If * Sends an event notification for the last published item to the subscriber. If
* the subscription has not yet been authorized or is pending to be configured then * the subscription has not yet been authorized or is pending to be configured then
...@@ -779,7 +796,7 @@ public class NodeSubscription { ...@@ -779,7 +796,7 @@ public class NodeSubscription {
*/ */
void sendLastPublishedItem(PublishedItem publishedItem) { void sendLastPublishedItem(PublishedItem publishedItem) {
// Check if the published item can be sent to the subscriber // Check if the published item can be sent to the subscriber
if (!canSendEventNotification(publishedItem.getNode(), publishedItem)) { if (!canSendPublicationEvent(publishedItem.getNode(), publishedItem)) {
return; return;
} }
// Send event notification to the subscriber // Send event notification to the subscriber
...@@ -910,7 +927,7 @@ public class NodeSubscription { ...@@ -910,7 +927,7 @@ public class NodeSubscription {
* An entity is subscribed to a node. The node will send all event notifications * An entity is subscribed to a node. The node will send all event notifications
* (and, if configured, payloads) to the entity while it is in this state. * (and, if configured, payloads) to the entity while it is in this state.
*/ */
subscribed; subscribed
} }
public static enum Type { public static enum Type {
...@@ -922,6 +939,6 @@ public class NodeSubscription { ...@@ -922,6 +939,6 @@ public class NodeSubscription {
/** /**
* Receive notification of new nodes only. * Receive notification of new nodes only.
*/ */
nodes; nodes
} }
} }
...@@ -103,9 +103,13 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -103,9 +103,13 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
/** /**
* Keep a registry of the presence's show value of users that subscribed to a node of * Keep a registry of the presence's show value of users that subscribed to a node of
* the pubsub service. The Map will have a value only for only users. * the pubsub service and for which the node only delivers notifications for online users
* or node subscriptions deliver events based on the user presence show value. Offline
* users will not have an entry in the map. Note: Key-> bare JID and Value-> Map whose key
* is full JID of connected resource and value is show value of the last received presence.
*/ */
private Map<JID, String> presences = new ConcurrentHashMap<JID, String>(); private Map<String, Map<String, String>> barePresences =
new ConcurrentHashMap<String, Map<String, String>>();
public PubSubModule() { public PubSubModule() {
super("Publish Subscribe Service"); super("Publish Subscribe Service");
...@@ -216,10 +220,40 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -216,10 +220,40 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
return collectionDefaultConfiguration; return collectionDefaultConfiguration;
} }
public String getShowPresence(JID subscriber) { public Collection<String> getShowPresences(JID subscriber) {
// TODO Implement subscribe to presence and update registry of show values. Map<String, String> fullPresences = barePresences.get(subscriber.toBareJID());
// TODO Remove presence subscription when user removed his subscription or the node was deleted. if (fullPresences == null) {
return presences.get(subscriber); // User is offline so return empty list
return Collections.emptyList();
}
if (subscriber.getResource() == null) {
// Subscriber used bared JID so return show value of all connected resources
return fullPresences.values();
}
else {
// Look for the show value using the full JID
String show = fullPresences.get(subscriber.toString());
if (show == null) {
// User at the specified resource is offline so return empty list
return Collections.emptyList();
}
// User is connected at specified resource so answer list with presence show value
return Arrays.asList(show);
}
}
public void presenceSubscriptionNotRequired(Node node, JID user) {
// TODO Implement this
}
public void presenceSubscriptionRequired(Node node, JID user) {
Map<String, String> fullPresences = barePresences.get(user.toString());
if (fullPresences == null || fullPresences.isEmpty()) {
Presence subscription = new Presence(Presence.Type.subscribe);
subscription.setTo(user);
subscription.setFrom(getAddress());
send(subscription);
}
} }
public PubSubEngine getPubSubEngine() { public PubSubEngine getPubSubEngine() {
...@@ -251,7 +285,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -251,7 +285,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
sysadmins.add(userJID.trim().toLowerCase()); sysadmins.add(userJID.trim().toLowerCase());
// Update the config. // Update the config.
String[] jids = new String[sysadmins.size()]; String[] jids = new String[sysadmins.size()];
jids = (String[])sysadmins.toArray(jids); jids = sysadmins.toArray(jids);
JiveGlobals.setProperty("xmpp.pubsub.sysadmin.jid", fromArray(jids)); JiveGlobals.setProperty("xmpp.pubsub.sysadmin.jid", fromArray(jids));
} }
...@@ -259,7 +293,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -259,7 +293,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
sysadmins.remove(userJID.trim().toLowerCase()); sysadmins.remove(userJID.trim().toLowerCase());
// Update the config. // Update the config.
String[] jids = new String[sysadmins.size()]; String[] jids = new String[sysadmins.size()];
jids = (String[])sysadmins.toArray(jids); jids = sysadmins.toArray(jids);
JiveGlobals.setProperty("xmpp.pubsub.sysadmin.jid", fromArray(jids)); JiveGlobals.setProperty("xmpp.pubsub.sysadmin.jid", fromArray(jids));
} }
...@@ -277,7 +311,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -277,7 +311,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
allowedToCreate.add(userJID.trim().toLowerCase()); allowedToCreate.add(userJID.trim().toLowerCase());
// Update the config. // Update the config.
String[] jids = new String[allowedToCreate.size()]; String[] jids = new String[allowedToCreate.size()];
jids = (String[])allowedToCreate.toArray(jids); jids = allowedToCreate.toArray(jids);
JiveGlobals.setProperty("xmpp.pubsub.create.jid", fromArray(jids)); JiveGlobals.setProperty("xmpp.pubsub.create.jid", fromArray(jids));
} }
...@@ -286,7 +320,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -286,7 +320,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
allowedToCreate.remove(userJID.trim().toLowerCase()); allowedToCreate.remove(userJID.trim().toLowerCase());
// Update the config. // Update the config.
String[] jids = new String[allowedToCreate.size()]; String[] jids = new String[allowedToCreate.size()];
jids = (String[])allowedToCreate.toArray(jids); jids = allowedToCreate.toArray(jids);
JiveGlobals.setProperty("xmpp.pubsub.create.jid", fromArray(jids)); JiveGlobals.setProperty("xmpp.pubsub.create.jid", fromArray(jids));
} }
...@@ -408,6 +442,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -408,6 +442,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
super.start(); super.start();
// Add the route to this service // Add the route to this service
routingTable.addRoute(getAddress(), this); routingTable.addRoute(getAddress(), this);
// TODO Probe presences of users that this service has subscribed to
ArrayList<String> params = new ArrayList<String>(); ArrayList<String> params = new ArrayList<String>();
params.clear(); params.clear();
params.add(getServiceDomain()); params.add(getServiceDomain());
...@@ -576,8 +611,8 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -576,8 +611,8 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
} }
public boolean hasInfo(String name, String node, JID senderJID) { public boolean hasInfo(String name, String node, JID senderJID) {
if (name == null && node == node) { if (name == null && node == null) {
// We always have info about the MUC service // We always have info about the Pubsub service
return true; return true;
} }
else if (name == null && node != null) { else if (name == null && node != null) {
...@@ -622,7 +657,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -622,7 +657,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
else { else {
// This is a leaf node so answer the published items which exist on the service // This is a leaf node so answer the published items which exist on the service
Element item; Element item;
for (PublishedItem publishedItem : ((LeafNode) pubNode).getPublishedItems()) { for (PublishedItem publishedItem : pubNode.getPublishedItems()) {
item = DocumentHelper.createElement("item"); item = DocumentHelper.createElement("item");
item.addAttribute("jid", serviceDomain); item.addAttribute("jid", serviceDomain);
item.addAttribute("name", publishedItem.getID()); item.addAttribute("name", publishedItem.getID());
......
...@@ -520,6 +520,11 @@ public class PubSubPersistenceManager { ...@@ -520,6 +520,11 @@ public class PubSubPersistenceManager {
String subID = rs.getString(2); String subID = rs.getString(2);
JID subscriber = new JID(rs.getString(3)); JID subscriber = new JID(rs.getString(3));
JID owner = new JID(rs.getString(4)); JID owner = new JID(rs.getString(4));
if (node.getAffiliate(owner) == null) {
Log.warn("Subscription found for a non-existent affiliate: " + owner +
" in node: " + nodeID);
return;
}
NodeSubscription.State state = NodeSubscription.State.valueOf(rs.getString(5)); NodeSubscription.State state = NodeSubscription.State.valueOf(rs.getString(5));
NodeSubscription subscription = NodeSubscription subscription =
new NodeSubscription(service, node, owner, subscriber, state, subID); new NodeSubscription(service, node, owner, subscriber, state, subID);
...@@ -603,23 +608,13 @@ public class PubSubPersistenceManager { ...@@ -603,23 +608,13 @@ public class PubSubPersistenceManager {
pstmt.executeUpdate(); pstmt.executeUpdate();
} }
else { else {
if (NodeAffiliate.Affiliation.none == affiliate.getAffiliation()) { // Update the affiliate's data in the backend store
// Remove the affiliate from the table of node affiliates pstmt = con.prepareStatement(UPDATE_AFFILIATION);
pstmt = con.prepareStatement(DELETE_AFFILIATION); pstmt.setString(1, affiliate.getAffiliation().name());
pstmt.setString(1, service.getServiceID()); pstmt.setString(2, service.getServiceID());
pstmt.setString(2, node.getNodeID()); pstmt.setString(3, node.getNodeID());
pstmt.setString(3, affiliate.getJID().toString()); pstmt.setString(4, affiliate.getJID().toString());
pstmt.executeUpdate(); pstmt.executeUpdate();
}
else {
// Update the affiliate's data in the backend store
pstmt = con.prepareStatement(UPDATE_AFFILIATION);
pstmt.setString(1, affiliate.getAffiliation().name());
pstmt.setString(2, service.getServiceID());
pstmt.setString(3, node.getNodeID());
pstmt.setString(4, affiliate.getJID().toString());
pstmt.executeUpdate();
}
} }
} }
catch (SQLException sqle) { catch (SQLException sqle) {
......
...@@ -173,15 +173,17 @@ public interface PubSubService { ...@@ -173,15 +173,17 @@ public interface PubSubService {
DefaultNodeConfiguration getDefaultNodeConfiguration(boolean leafType); DefaultNodeConfiguration getDefaultNodeConfiguration(boolean leafType);
/** /**
* Returns the show value of the last know presence of the specified subscriber. If the user * Returns the show values of the last know presence of all connected resources of the
* is offline then a <tt>null</tt> value is returned. Available show status is represented * specified subscriber. When the subscriber JID is a bare JID then the answered collection
* will have many entries one for each connected resource. Moreover, if the user
* is offline then an empty collectin is returned. Available show status is represented
* by a <tt>online</tt> value. The rest of the possible show values as defined in RFC 3921. * by a <tt>online</tt> value. The rest of the possible show values as defined in RFC 3921.
* *
* @param subscriber the JID of the subscriber. This is not the JID of the affiliate. * @param subscriber the JID of the subscriber. This is not the JID of the affiliate.
* @return null when offline, online when user if available or show values as defined * @return an empty collection when offline. Otherwise, a collection with the show value
* in RFC 3921. * of each connected resource.
*/ */
String getShowPresence(JID subscriber); Collection<String> getShowPresences(JID subscriber);
/** /**
* Returns the pubsub engine responsible for handling packets sent to the pub-sub service. * Returns the pubsub engine responsible for handling packets sent to the pub-sub service.
...@@ -191,4 +193,23 @@ public interface PubSubService { ...@@ -191,4 +193,23 @@ public interface PubSubService {
* @return the pubsub engine responsible for handling packets sent to the pub-sub service. * @return the pubsub engine responsible for handling packets sent to the pub-sub service.
*/ */
PubSubEngine getPubSubEngine(); PubSubEngine getPubSubEngine();
/**
* Requests the pubsub service to subscribe to the presence of the user. If the service
* has already subscribed to the user's presence then do nothing.
*
* @param node the node that originated the subscription request.
* @param user the JID of the affiliate to subscribe to his presence.
*/
void presenceSubscriptionRequired(Node node, JID user);
/**
* Requests the pubsub service to unsubscribe from the presence of the user. If the service
* was not subscribed to the user's presence or any node still requires to be subscribed to
* the user presence then do nothing.
*
* @param node the node that originated the unsubscription request.
* @param user the JID of the affiliate to unsubscribe from his presence.
*/
void presenceSubscriptionNotRequired(Node node, JID user);
} }
...@@ -35,6 +35,10 @@ public class AuthorizeAccess extends AccessModel { ...@@ -35,6 +35,10 @@ public class AuthorizeAccess extends AccessModel {
} }
public boolean canAccessItems(Node node, JID owner, JID subscriber) { public boolean canAccessItems(Node node, JID owner, JID subscriber) {
// Let node owners and sysadmins always get node items
if (node.isAdmin(owner)) {
return true;
}
NodeAffiliate nodeAffiliate = node.getAffiliate(owner); NodeAffiliate nodeAffiliate = node.getAffiliate(owner);
if (nodeAffiliate == null) { if (nodeAffiliate == null) {
// This is an unknown entity to the node so deny access // This is an unknown entity to the node so deny access
......
...@@ -34,8 +34,16 @@ public class PresenceAccess extends AccessModel { ...@@ -34,8 +34,16 @@ public class PresenceAccess extends AccessModel {
} }
public boolean canSubscribe(Node node, JID owner, JID subscriber) { public boolean canSubscribe(Node node, JID owner, JID subscriber) {
// Let node owners and sysadmins always subcribe to the node
if (node.isAdmin(owner)) {
return true;
}
// Get the only owner of the node // Get the only owner of the node
JID nodeOwner = node.getOwners().iterator().next(); JID nodeOwner = node.getOwners().iterator().next();
// Give access to the owner of the roster :)
if (nodeOwner.toBareJID().equals(owner.toBareJID())) {
return true;
}
// Get the roster of the node owner // Get the roster of the node owner
XMPPServer server = XMPPServer.getInstance(); XMPPServer server = XMPPServer.getInstance();
// Check that the node owner is a local user // Check that the node owner is a local user
......
...@@ -38,8 +38,16 @@ public class RosterAccess extends AccessModel { ...@@ -38,8 +38,16 @@ public class RosterAccess extends AccessModel {
} }
public boolean canSubscribe(Node node, JID owner, JID subscriber) { public boolean canSubscribe(Node node, JID owner, JID subscriber) {
// Let node owners and sysadmins always subcribe to the node
if (node.isAdmin(owner)) {
return true;
}
// Get the only owner of the node // Get the only owner of the node
JID nodeOwner = node.getOwners().iterator().next(); JID nodeOwner = node.getOwners().iterator().next();
// Give access to the owner of the roster :)
if (nodeOwner.toBareJID().equals(owner.toBareJID())) {
return true;
}
// Get the roster of the node owner // Get the roster of the node owner
XMPPServer server = XMPPServer.getInstance(); XMPPServer server = XMPPServer.getInstance();
// Check that the node owner is a local user // Check that the node owner is a local user
...@@ -64,6 +72,7 @@ public class RosterAccess extends AccessModel { ...@@ -64,6 +72,7 @@ public class RosterAccess extends AccessModel {
} }
} }
catch (UserNotFoundException e) { catch (UserNotFoundException e) {
// Do nothing
} }
} }
else { else {
......
...@@ -30,6 +30,10 @@ public class WhitelistAccess extends AccessModel { ...@@ -30,6 +30,10 @@ public class WhitelistAccess extends AccessModel {
} }
public boolean canSubscribe(Node node, JID owner, JID subscriber) { public boolean canSubscribe(Node node, JID owner, JID subscriber) {
// Let node owners and sysadmins always subcribe to the node
if (node.isAdmin(owner)) {
return true;
}
// User is in the whitelist if he has an affiliation and it is not of type outcast // User is in the whitelist if he has an affiliation and it is not of type outcast
NodeAffiliate nodeAffiliate = node.getAffiliate(owner); NodeAffiliate nodeAffiliate = node.getAffiliate(owner);
return nodeAffiliate != null && return nodeAffiliate != null &&
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment