Commit 32867543 authored by Matt Tucker's avatar Matt Tucker Committed by matt

Additional pubsub work.

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@3604 b35dd754-fafc-0310-a699-88a17e54d16e
parent 31acfb6b
......@@ -128,6 +128,7 @@ public class CollectionNode extends Node {
}
void removeChildNode(Node child) {
// TODO Send notification to subscribers?
nodes.remove(child.getNodeID());
}
......
......@@ -70,7 +70,7 @@ public class DefaultNodeConfiguration {
/**
* Flag that indicates whether to send items to new subscribers.
*/
private boolean sendItemSubscribe;
private boolean sendItemSubscribe = false;
/**
* Publisher model that specifies who is allowed to publish items to the node.
*/
......
......@@ -18,7 +18,6 @@ import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.IQ;
import java.util.*;
......@@ -47,6 +46,10 @@ public class LeafNode extends Node {
* The maximum payload size in bytes.
*/
private int maxPayloadSize;
/**
* Flag that indicates whether to send items to new subscribers.
*/
private boolean sendItemSubscribe;
/**
* List of items that were published to the node and that are still active. If the node is
* not configured to persist items then the last published item will be kept. The list is
......@@ -64,6 +67,7 @@ public class LeafNode extends Node {
this.persistPublishedItems = defaultConfiguration.isPersistPublishedItems();
this.maxPublishedItems = defaultConfiguration.getMaxPublishedItems();
this.maxPayloadSize = defaultConfiguration.getMaxPayloadSize();
this.sendItemSubscribe = defaultConfiguration.isSendItemSubscribe();
}
void configure(FormField field) {
......@@ -78,11 +82,15 @@ public class LeafNode extends Node {
values = field.getValues();
maxPayloadSize = values.size() > 0 ? Integer.parseInt(values.get(0)) : 5120;
}
else if ("pubsub#send_item_subscribe".equals(field.getVariable())) {
values = field.getValues();
booleanValue = (values.size() > 0 ? values.get(0) : "1");
sendItemSubscribe = "1".equals(booleanValue);
}
}
void postConfigure(DataForm completedForm) {
List<String> values;
// TODO Remove stored published items based on the new max items
if (!persistPublishedItems) {
// Always save the last published item when not configured to use persistent items
maxPublishedItems = 1;
......@@ -94,12 +102,30 @@ public class LeafNode extends Node {
maxPublishedItems = values.size() > 0 ? Integer.parseInt(values.get(0)) : 50;
}
}
// Remove stored published items based on the new max items
while (!publishedItems.isEmpty() && maxPublishedItems > publishedItems.size()) {
PublishedItem removedItem = publishedItems.remove(0);
itemsByID.remove(removedItem.getID());
// Add the removed item to the queue of items to delete from the database. The
// queue is going to be processed by another thread
service.getPubSubEngine().queueItemToRemove(removedItem);
}
}
protected void addFormFields(DataForm form, boolean isEditing) {
super.addFormFields(form, isEditing);
FormField formField = form.addField();
formField.setVariable("pubsub#send_item_subscribe");
if (isEditing) {
formField.setType(FormField.Type.boolean_type);
formField.setLabel(
LocaleUtils.getLocalizedString("pubsub.form.conf.send_item_subscribe"));
}
formField.addValue(sendItemSubscribe);
formField = form.addField();
formField.setVariable("pubsub#persist_items");
if (isEditing) {
formField.setType(FormField.Type.boolean_type);
......@@ -157,14 +183,17 @@ public class LeafNode extends Node {
* item to this node.
*/
public boolean isItemRequired() {
return isPersistPublishedItems() || isDeliverPayloads();
return isPersistPublishedItems() || isPayloadDelivered();
}
/**
* Sends event notifications to subscribers for the new published event. The published
* event may or may not include an item. When the node is not persistent and does not
* require payloads then an item is not going to be created nore included in
* the event notification.<p>
* Publishes the list of items to the node. Event notifications will be sent to subscribers
* for the new published event. The published event may or may not include an item. When the
* node is not persistent and does not require payloads then an item is not going to be created
* nore included in the event notification.<p>
*
* When an affiliate has many subscriptions to the node, the affiliate will get a
* notification for each set of items that affected the same list of subscriptions.<p>
*
* When an item is included in the published event then a new {@link PublishedItem} is
* going to be created and added to the list of published item. Each published item will
......@@ -174,58 +203,57 @@ public class LeafNode extends Node {
*
* For performance reasons the newly added published items and the deleted items (if any)
* are saved to the database using a background thread. Sending event notifications to
* node subscribers may also use another thread to ensure good performance.
* node subscribers may also use another thread to ensure good performance.<p>
*
* @param originalIQ the IQ packet used by the publisher to publish the item.
* @param itemID the ID of the item or null if none was published.
* @param payload the payload of the new published item or null if none was published.
* @param publisher the full JID of the user that sent the new published event.
* @param itemElements list of dom4j elements that contain info about the published items.
*/
public void sendEventNotification(IQ originalIQ, String itemID, Element payload) {
PublishedItem newItem = null;
public void publishItems(JID publisher, List<Element> itemElements) {
List<PublishedItem> newPublishedItems = new ArrayList<PublishedItem>();
if (isItemRequired()) {
// Create a published item from the published data and add it to the node (and the db)
synchronized (publishedItems) {
// Make sure that the published item has an ID and that it's unique in the node
if (itemID == null) {
itemID = StringUtils.randomString(15);
}
while (itemsByID.get(itemID) != null) {
itemID = StringUtils.randomString(15);
}
// Create a new published item
newItem = new PublishedItem(this, originalIQ.getFrom(), itemID, new Date());
newItem.setPayload(payload);
// Add the published item to the list of items to persist (using another thread)
while (!publishedItems.isEmpty() && maxPublishedItems >= publishedItems.size()) {
PublishedItem removedItem = publishedItems.remove(0);
itemsByID.remove(removedItem.getID());
// TODO Add removed item to the queue of items to delete from the database
String itemID;
Element payload;
PublishedItem newItem = null;
for (Element item : itemElements) {
itemID = item.attributeValue("id");
List entries = item.elements();
payload = entries.isEmpty() ? null : (Element) entries.get(0);
// Create a published item from the published data and add it to the node and the db
synchronized (publishedItems) {
// Make sure that the published item has an ID and that it's unique in the node
if (itemID == null) {
itemID = StringUtils.randomString(15);
}
while (itemsByID.get(itemID) != null) {
itemID = StringUtils.randomString(15);
}
// Create a new published item
newItem = new PublishedItem(this, publisher, itemID, new Date());
newItem.setPayload(payload);
// Add the new item to the list of published items
newPublishedItems.add(newItem);
// Add the published item to the list of items to persist (using another thread)
// but check that we don't exceed the limit. Remove oldest items if required.
while (!publishedItems.isEmpty() && maxPublishedItems >= publishedItems.size()) {
PublishedItem removedItem = publishedItems.remove(0);
itemsByID.remove(removedItem.getID());
// Add the removed item to the queue of items to delete from the database. The
// queue is going to be processed by another thread
service.getPubSubEngine().queueItemToRemove(removedItem);
}
addPublishedItem(newItem);
// Add the new published item to the queue of items to add to the database. The
// queue is going to be processed by another thread
service.getPubSubEngine().queueItemToAdd(newItem);
}
addPublishedItem(newItem);
// TODO Add new published item to the queue of items to add to the database
}
}
// Return success operation
service.send(IQ.createResultIQ(originalIQ));
// Build event notification packet to broadcast to subscribers
Message message = new Message();
Element event = message.addChildElement("event", "http://jabber.org/protocol/pubsub#event");
Element items = event.addElement("items");
items.addAttribute("node", nodeID);
if (newItem != null) {
// Add item information to the event notification if an item was published
Element item = items.addElement("item");
if (isItemRequired()) {
item.addAttribute("id", newItem.getID());
}
if (deliverPayloads) {
item.add(newItem.getPayload().createCopy());
}
}
// Broadcast event notification to subscribers and parent node subscribers
Set<NodeAffiliate> affiliatesToNotify = new HashSet<NodeAffiliate>(affiliates);
// Get affiliates that are subscribed to a parent in the hierarchy of parent nodes
......@@ -236,7 +264,54 @@ public class LeafNode extends Node {
}
// TODO Use another thread for this (if # of subscribers is > X)????
for (NodeAffiliate affiliate : affiliatesToNotify) {
affiliate.sendEventNotification(message, this, newItem);
affiliate.sendPublishedNotifications(message, event, this, newPublishedItems);
}
}
/**
* Deletes the list of published items from the node. Event notifications may be sent to
* subscribers for the deleted items. When an affiliate has many subscriptions to the node,
* the affiliate will get a notification for each set of items that affected the same list
* of subscriptions.<p>
*
* For performance reasons the deleted published items are saved to the database
* using a background thread. Sending event notifications to node subscribers may
* also use another thread to ensure good performance.<p>
*
* @param toDelete list of items that were deleted from the node.
*/
public void deleteItems(List<PublishedItem> toDelete) {
synchronized (publishedItems) {
for (PublishedItem item : toDelete) {
// Remove items to delete from memory
publishedItems.remove(item);
// Update fast look up cache of published items
itemsByID.remove(item.getID());
}
}
// Remove deleted items from the database
for (PublishedItem item : toDelete) {
service.getPubSubEngine().queueItemToRemove(item);
}
if (isNotifiedOfRetract()) {
// Broadcast notification deletion to subscribers
// Build packet to broadcast to subscribers
Message message = new Message();
Element event =
message.addChildElement("event", "http://jabber.org/protocol/pubsub#event");
// Send notification that items have been deleted to subscribers and parent node
// subscribers
Set<NodeAffiliate> affiliatesToNotify = new HashSet<NodeAffiliate>(affiliates);
// Get affiliates that are subscribed to a parent in the hierarchy of parent nodes
for (CollectionNode parentNode : getParents()) {
for (NodeSubscription subscription : parentNode.getSubscriptions()) {
affiliatesToNotify.add(subscription.getAffiliate());
}
}
// TODO Use another thread for this (if # of subscribers is > X)????
for (NodeAffiliate affiliate : affiliatesToNotify) {
affiliate.sendDeletionNotifications(message, event, this, toDelete);
}
}
}
......@@ -279,6 +354,15 @@ public class LeafNode extends Node {
}
}
/**
* Returns true if the last published item is going to be sent to new subscribers.
*
* @return true if the last published item is going to be sent to new subscribers.
*/
public boolean isSendItemSubscribe() {
return sendItemSubscribe;
}
void setMaxPayloadSize(int maxPayloadSize) {
this.maxPayloadSize = maxPayloadSize;
}
......@@ -291,6 +375,10 @@ public class LeafNode extends Node {
this.maxPublishedItems = maxPublishedItems;
}
void setSendItemSubscribe(boolean sendItemSubscribe) {
this.sendItemSubscribe = sendItemSubscribe;
}
/**
* Purges items that were published to the node. Only owners can request this operation.
* This operation is only available for nodes configured to store items in the database. All
......@@ -313,7 +401,7 @@ public class LeafNode extends Node {
if (toDelete != null) {
// Delete purged items from the database
for (PublishedItem item : toDelete) {
PubSubPersistenceManager.removePublishedItem(service, this, item);
service.getPubSubEngine().queueItemToRemove(item);
}
// Broadcast purge notification to subscribers
// Build packet to broadcast to subscribers
......@@ -325,5 +413,4 @@ public class LeafNode extends Node {
broadcastSubscribers(message, false);
}
}
}
......@@ -71,10 +71,6 @@ public abstract class Node {
* Flag that indicates whether to deliver notifications to available users only.
*/
protected boolean presenceBasedDelivery;
/**
* Flag that indicates whether to send items to new subscribers.
*/
protected boolean sendItemSubscribe;
/**
* Publisher model that specifies who is allowed to publish items to the node.
*/
......@@ -163,6 +159,15 @@ public abstract class Node {
*/
protected Map<String, NodeSubscription> subscriptionsByID =
new ConcurrentHashMap<String, NodeSubscription>();
/**
* Map that contains the current subscriptions to the node. This map should be used only
* when node is not configured to allow multiple subscriptions. When multiple subscriptions
* is not allowed the subscriptions can be searched by the subscriber JID. Otherwise searches
* should be done using the subscription ID.
* Key: Subscriber full JID, Value: the subscription.
*/
protected Map<String, NodeSubscription> subscriptionsByJID =
new ConcurrentHashMap<String, NodeSubscription>();
Node(PubSubService service, CollectionNode parent, String nodeID, JID creator) {
this.service = service;
......@@ -177,7 +182,6 @@ public abstract class Node {
service.getDefaultNodeConfiguration(!isCollectionNode());
this.subscriptionEnabled = defaultConfiguration.isSubscriptionEnabled();
this.deliverPayloads = defaultConfiguration.isDeliverPayloads();
this.sendItemSubscribe = defaultConfiguration.isSendItemSubscribe();
this.notifyConfigChanges = defaultConfiguration.isNotifyConfigChanges();
this.notifyDelete = defaultConfiguration.isNotifyDelete();
this.notifyRetract = defaultConfiguration.isNotifyRetract();
......@@ -341,14 +345,14 @@ public abstract class Node {
return subscription;
}
/**
* Removes all subscriptions owned by the specified entity.
*
* @param owner the owner of the subscriptions to be cancelled.
*/
private void removeSubscriptions(JID owner) {
for (NodeSubscription subscription : getSubscriptions(owner)) {
// Remove the existing subscription from the list in memory
subscriptionsByID.remove(subscription.getID());
if (savedToDB) {
// Remove the subscription from the database
PubSubPersistenceManager.removeSubscription(service, this, subscription);
}
cancelSubscription(subscription);
}
}
......@@ -359,8 +363,9 @@ public abstract class Node {
* notifications in different resources (or even JIDs).
*
* @param owner the owner of the subscriptions.
* @return the list of subscriptions owned by the specified user.
*/
Collection<NodeSubscription> getSubscriptions(JID owner) {
public Collection<NodeSubscription> getSubscriptions(JID owner) {
Collection<NodeSubscription> subscriptions = new ArrayList<NodeSubscription>();
for (NodeSubscription subscription : subscriptionsByID.values()) {
if (owner.equals(subscription.getOwner())) {
......@@ -396,6 +401,14 @@ public abstract class Node {
return null;
}
/**
* Returns a collection with the JID of the node owners. Entities that are node owners have
* an affiliation of {@link NodeAffiliate.Affiliation#owner}. Owners are allowed to purge
* and delete the node. Moreover, owners may also get The collection can be modified
* since it represents a snapshot.
*
* @return a collection with the JID of the node owners.
*/
public Collection<JID> getOwners() {
Collection<JID> jids = new ArrayList<JID>();
for (NodeAffiliate affiliate : affiliates) {
......@@ -406,6 +419,15 @@ public abstract class Node {
return jids;
}
/**
* Returns a collection with the JID of the enitities with an affiliation of
* {@link NodeAffiliate.Affiliation#publisher}. When using the publisher model
* {@link org.jivesoftware.wildfire.pubsub.models.OpenPublisher} anyone may publish
* to the node so this collection may be empty or may not contain the complete list
* of publishers. The returned collection can be modified since it represents a snapshot.
*
* @return a collection with the JID of the enitities with an affiliation of publishers.
*/
public Collection<JID> getPublishers() {
Collection<JID> jids = new ArrayList<JID>();
for (NodeAffiliate affiliate : affiliates) {
......@@ -416,6 +438,15 @@ public abstract class Node {
return jids;
}
/**
* Changes the node configuration based on the completed data form. Only owners or
* sysadmins are allowed to change the node configuration. The completed data form
* cannot remove all node owners. An exception is going to be thrown if the new form
* tries to leave the node without owners.
*
* @param completedForm the completed data form.
* @throws NotAcceptableException if completed data form tries to leave the node without owners.
*/
public void configure(DataForm completedForm) throws NotAcceptableException {
if (DataForm.Type.cancel.equals(completedForm.getType())) {
// Existing node configuration is applied (i.e. nothing is changed)
......@@ -444,8 +475,7 @@ public abstract class Node {
for (FormField field : completedForm.getFields()) {
if ("FORM_TYPE".equals(field.getVariable())) {
// Ignore this variable
continue;
// Do nothing
}
else if ("pubsub#deliver_payloads".equals(field.getVariable())) {
values = field.getValues();
......@@ -472,11 +502,6 @@ public abstract class Node {
booleanValue = (values.size() > 0 ? values.get(0) : "1");
presenceBasedDelivery = "1".equals(booleanValue);
}
else if ("pubsub#send_item_subscribe".equals(field.getVariable())) {
values = field.getValues();
booleanValue = (values.size() > 0 ? values.get(0) : "1");
sendItemSubscribe = "1".equals(booleanValue);
}
else if ("pubsub#subscribe".equals(field.getVariable())) {
values = field.getValues();
booleanValue = (values.size() > 0 ? values.get(0) : "1");
......@@ -577,13 +602,13 @@ public abstract class Node {
if (ownersSent) {
// Calculate owners to remove and remove them from the DB
Collection<JID> oldOwners = getOwners();
oldOwners.remove(owners);
oldOwners.removeAll(owners);
for (JID jid : oldOwners) {
removeOwner(jid);
}
// Calculate new owners and add them to the DB
owners.remove(getOwners());
owners.removeAll(getOwners());
for (JID jid : owners) {
addOwner(jid);
}
......@@ -602,13 +627,13 @@ public abstract class Node {
}
// Calculate publishers to remove and remove them from the DB
Collection<JID> oldPublishers = getPublishers();
oldPublishers.remove(publishers);
oldPublishers.removeAll(publishers);
for (JID jid : oldPublishers) {
removePublisher(jid);
}
// Calculate new publishers and add them to the DB
publishers.remove(getPublishers());
publishers.removeAll(getPublishers());
for (JID jid : publishers) {
addPublisher(jid);
}
......@@ -646,8 +671,13 @@ public abstract class Node {
*/
abstract void postConfigure(DataForm completedForm);
/**
* The node configuration has changed. If this is the first time the node is configured
* after it was created (i.e. is not yet persistent) then do nothing. Otherwise, send
* a notification to the node subscribers informing that the configuration has changed.
*/
private void nodeConfigurationChanged() {
if (!notifyConfigChanges || !savedToDB) {
if (!isNotifiedOfConfigChanges() || !savedToDB) {
// Do nothing if node was just created and configure or if notification
// of config changes is disabled
return;
......@@ -683,6 +713,15 @@ public abstract class Node {
return form;
}
/**
* Adds the required form fields to the specified form. When editing is true the field type
* and a label is included in each fields. The form being completed will contain the current
* node configuration. This information can be used for editing the node or for notifing that
* the node configuration has changed.
*
* @param form the form containing the node configuration.
* @param isEditing true when the form will be used to edit the node configuration.
*/
protected void addFormFields(DataForm form, boolean isEditing) {
FormField formField = form.addField();
formField.setVariable("FORM_TYPE");
......@@ -730,15 +769,6 @@ public abstract class Node {
}
formField.addValue(deliverPayloads);
formField = form.addField();
formField.setVariable("pubsub#send_item_subscribe");
if (isEditing) {
formField.setType(FormField.Type.boolean_type);
formField.setLabel(
LocaleUtils.getLocalizedString("pubsub.form.conf.send_item_subscribe"));
}
formField.addValue(sendItemSubscribe);
formField = form.addField();
formField.setVariable("pubsub#notify_config");
if (isEditing) {
......@@ -914,6 +944,11 @@ public abstract class Node {
return form;
}
/**
* Returns true if this node is the root node of the pubsub service.
*
* @return true if this node is the root node of the pubsub service.
*/
public boolean isRootCollectionNode() {
return service.getRootCollectionNode() == this;
}
......@@ -930,6 +965,12 @@ public abstract class Node {
return true;
}
/**
* Returns true if this node is a node container. Node containers may only contain nodes
* but are not allowed to get items published.
*
* @return true if this node is a node container.
*/
public boolean isCollectionNode() {
return false;
}
......@@ -978,15 +1019,34 @@ public abstract class Node {
return false;
}
/**
* Returns the unique identifier for a node within the context of a pubsub service.
*
* @return the unique identifier for a node within the context of a pubsub service.
*/
public String getNodeID() {
return nodeID;
}
/**
* Returns the name of the node. The node may not have a configured name. The node's
* name can be changed by submiting a completed data form.
*
* @return the name of the node.
*/
public String getName() {
return name;
}
public boolean isDeliverPayloads() {
/**
* Returns true if event notifications will include payloads. Payloads are included when
* publishing new items. However, new items may not always include a payload depending
* on the node configuration. Nodes can be configured to not deliver payloads for performance
* reasons.
*
* @return true if event notifications will include payloads.
*/
public boolean isPayloadDelivered() {
return deliverPayloads;
}
......@@ -994,42 +1054,97 @@ public abstract class Node {
return replyPolicy;
}
public boolean isNotifyConfigChanges() {
/**
* Returns true if subscribers will be notified when the node configuration changes.
*
* @return true if subscribers will be notified when the node configuration changes.
*/
public boolean isNotifiedOfConfigChanges() {
return notifyConfigChanges;
}
public boolean isNotifyDelete() {
/**
* Returns true if subscribers will be notified when the node is deleted.
*
* @return true if subscribers will be notified when the node is deleted.
*/
public boolean isNotifiedOfDelete() {
return notifyDelete;
}
public boolean isNotifyRetract() {
/**
* Returns true if subscribers will be notified when items are removed from the node.
*
* @return true if subscribers will be notified when items are removed from the node.
*/
public boolean isNotifiedOfRetract() {
return notifyRetract;
}
/**
* Returns true if notifications are going to be delivered to available users only.
*
* @return true if notifications are going to be delivered to available users only.
*/
public boolean isPresenceBasedDelivery() {
// TODO Use this variable somewhere :)
return presenceBasedDelivery;
}
/**
* Returns true if the last published item is going to be sent to new subscribers.
*
* @return true if the last published item is going to be sent to new subscribers.
*/
public boolean isSendItemSubscribe() {
return sendItemSubscribe;
return false;
}
/**
* Returns the publisher model that specifies who is allowed to publish items to the node.
*
* @return the publisher model that specifies who is allowed to publish items to the node.
*/
public PublisherModel getPublisherModel() {
return publisherModel;
}
/**
* Returns true if users are allowed to subscribe and unsubscribe.
*
* @return true if users are allowed to subscribe and unsubscribe.
*/
public boolean isSubscriptionEnabled() {
return subscriptionEnabled;
}
/**
* Returns true if new subscriptions should be configured to be active. Inactive
* subscriptions will not get event notifications. However, subscribers will be
* notified when a node is deleted no matter the subscription status.
*
* @return true if new subscriptions should be configured to be active.
*/
public boolean isSubscriptionConfigurationRequired() {
return subscriptionConfigurationRequired;
}
/**
* Returns the access model that specifies who is allowed to subscribe and retrieve items.
*
* @return the access model that specifies who is allowed to subscribe and retrieve items.
*/
public AccessModel getAccessModel() {
return accessModel;
}
/**
* Returns the roster group(s) allowed to subscribe and retrieve items. This information
* is going to be used only when using the
* {@link org.jivesoftware.wildfire.pubsub.models.RosterAccess} access model.
*
* @return the roster group(s) allowed to subscribe and retrieve items.
*/
public Collection<String> getRosterGroupsAllowed() {
return rosterGroupsAllowed;
}
......@@ -1042,42 +1157,102 @@ public abstract class Node {
return replyTo;
}
/**
* Returns the type of payload data to be provided at the node. Usually specified by the
* namespace of the payload (if any).
*
* @return the type of payload data to be provided at the node.
*/
public String getPayloadType() {
return payloadType;
}
/**
* Returns the URL of an XSL transformation which can be applied to payloads in order
* to generate an appropriate message body element.
*
* @return the URL of an XSL transformation which can be applied to payloads.
*/
public String getBodyXSLT() {
return bodyXSLT;
}
/**
* Returns the URL of an XSL transformation which can be applied to the payload format
* in order to generate a valid Data Forms result that the client could display
* using a generic Data Forms rendering engine.
*
* @return the URL of an XSL transformation which can be applied to the payload format.
*/
public String getDataformXSLT() {
return dataformXSLT;
}
/**
* Returns the datetime when the node was created.
*
* @return the datetime when the node was created.
*/
public Date getCreationDate() {
return creationDate;
}
/**
* Returns the last date when the ndoe's configuration was modified.
*
* @return the last date when the ndoe's configuration was modified.
*/
public Date getModificationDate() {
return modificationDate;
}
/**
* Returns the JID of the node creator. This is usually the sender's full JID of the
* IQ packet used for creating the node.
*
* @return the JID of the node creator.
*/
public JID getCreator() {
return creator;
}
/**
* Returns the description of the node. This information is really optional and can be
* modified by submiting a completed data form with the new node configuration.
*
* @return the description of the node.
*/
public String getDescription() {
return description;
}
/**
* Returns the default language of the node. This information is really optional and can be
* modified by submiting a completed data form with the new node configuration.
*
* @return the default language of the node.
*/
public String getLanguage() {
return language;
}
/**
* Returns the JIDs of those to contact with questions. This information is not used by
* the pubsub service. It is meant to be "discovered" by users and redirect any question
* to the returned people to contact.
*
* @return the JIDs of those to contact with questions.
*/
public Collection<JID> getContacts() {
return contacts;
}
/**
* Returns the list of nodes contained by this node. Only {@link CollectionNode} may
* contain other nodes.
*
* @return the list of nodes contained by this node.
*/
public Collection<Node> getNodes() {
return Collections.emptyList();
}
......@@ -1107,7 +1282,15 @@ public abstract class Node {
return parents;
}
void setDeliverPayloads(boolean deliverPayloads) {
/**
* Sets whether event notifications will include payloads. Payloads are included when
* publishing new items. However, new items may not always include a payload depending
* on the node configuration. Nodes can be configured to not deliver payloads for performance
* reasons.
*
* @param deliverPayloads true if event notifications will include payloads.
*/
void setPayloadDelivered(boolean deliverPayloads) {
this.deliverPayloads = deliverPayloads;
}
......@@ -1115,15 +1298,32 @@ public abstract class Node {
this.replyPolicy = replyPolicy;
}
void setNotifyConfigChanges(boolean notifyConfigChanges) {
/**
* Sets whether subscribers will be notified when the node configuration changes.
*
* @param notifyConfigChanges true if subscribers will be notified when the node
* configuration changes.
*/
void setNotifiedOfConfigChanges(boolean notifyConfigChanges) {
this.notifyConfigChanges = notifyConfigChanges;
}
void setNotifyDelete(boolean notifyDelete) {
/**
* Sets whether subscribers will be notified when the node is deleted.
*
* @param notifyDelete true if subscribers will be notified when the node is deleted.
*/
void setNotifiedOfDelete(boolean notifyDelete) {
this.notifyDelete = notifyDelete;
}
void setNotifyRetract(boolean notifyRetract) {
/**
* Sets whether subscribers will be notified when items are removed from the node.
*
* @param notifyRetract true if subscribers will be notified when items are removed from
* the node.
*/
void setNotifiedOfRetract(boolean notifyRetract) {
this.notifyRetract = notifyRetract;
}
......@@ -1131,26 +1331,58 @@ public abstract class Node {
this.presenceBasedDelivery = presenceBasedDelivery;
}
void setSendItemSubscribe(boolean sendItemSubscribe) {
this.sendItemSubscribe = sendItemSubscribe;
}
/**
* Sets the publisher model that specifies who is allowed to publish items to the node.
*
* @param publisherModel the publisher model that specifies who is allowed to publish items
* to the node.
*/
void setPublisherModel(PublisherModel publisherModel) {
this.publisherModel = publisherModel;
}
/**
* Sets whether users are allowed to subscribe and unsubscribe.
*
* @param subscriptionEnabled true if users are allowed to subscribe and unsubscribe.
*/
void setSubscriptionEnabled(boolean subscriptionEnabled) {
this.subscriptionEnabled = subscriptionEnabled;
}
/**
* Sets whether new subscriptions should be configured to be active. Inactive
* subscriptions will not get event notifications. However, subscribers will be
* notified when a node is deleted no matter the subscription status.
*
* @param subscriptionConfigurationRequired true if new subscriptions should be
* configured to be active.
*/
void setSubscriptionConfigurationRequired(boolean subscriptionConfigurationRequired) {
this.subscriptionConfigurationRequired = subscriptionConfigurationRequired;
}
/**
* Sets the access model that specifies who is allowed to subscribe and retrieve items.
*
* @param accessModel the access model that specifies who is allowed to subscribe and
* retrieve items.
*/
void setAccessModel(AccessModel accessModel) {
this.accessModel = accessModel;
}
/**
* Sets the roster group(s) allowed to subscribe and retrieve items. This information
* is going to be used only when using the
* {@link org.jivesoftware.wildfire.pubsub.models.RosterAccess} access model.
*
* @param rosterGroupsAllowed the roster group(s) allowed to subscribe and retrieve items.
*/
void setRosterGroupsAllowed(Collection<String> rosterGroupsAllowed) {
this.rosterGroupsAllowed = rosterGroupsAllowed;
}
void setReplyRooms(Collection<JID> replyRooms) {
this.replyRooms = replyRooms;
}
......@@ -1159,14 +1391,34 @@ public abstract class Node {
this.replyTo = replyTo;
}
/**
* Sets the type of payload data to be provided at the node. Usually specified by the
* namespace of the payload (if any).
*
* @param payloadType the type of payload data to be provided at the node.
*/
void setPayloadType(String payloadType) {
this.payloadType = payloadType;
}
/**
* Sets the URL of an XSL transformation which can be applied to payloads in order
* to generate an appropriate message body element.
*
* @param bodyXSLT the URL of an XSL transformation which can be applied to payloads.
*/
void setBodyXSLT(String bodyXSLT) {
this.bodyXSLT = bodyXSLT;
}
/**
* Sets the URL of an XSL transformation which can be applied to the payload format
* in order to generate a valid Data Forms result that the client could display
* using a generic Data Forms rendering engine.
*
* @param dataformXSLT the URL of an XSL transformation which can be applied to the
* payload format.
*/
void setDataformXSLT(String dataformXSLT) {
this.dataformXSLT = dataformXSLT;
}
......@@ -1179,34 +1431,68 @@ public abstract class Node {
}
}
/**
* Sets the datetime when the node was created.
*
* @param creationDate the datetime when the node was created.
*/
void setCreationDate(Date creationDate) {
this.creationDate = creationDate;
}
/**
* Sets the last date when the ndoe's configuration was modified.
*
* @param modificationDate the last date when the ndoe's configuration was modified.
*/
void setModificationDate(Date modificationDate) {
this.modificationDate = modificationDate;
}
/**
* Sets the description of the node. This information is really optional and can be
* modified by submiting a completed data form with the new node configuration.
*
* @param description the description of the node.
*/
void setDescription(String description) {
this.description = description;
}
/**
* Sets the default language of the node. This information is really optional and can be
* modified by submiting a completed data form with the new node configuration.
*
* @param language the default language of the node.
*/
void setLanguage(String language) {
this.language = language;
}
/**
* Sets the name of the node. The node may not have a configured name. The node's
* name can be changed by submiting a completed data form.
*
* @param name the name of the node.
*/
void setName(String name) {
this.name = name;
}
void setRosterGroupsAllowed(Collection<String> rosterGroupsAllowed) {
this.rosterGroupsAllowed = rosterGroupsAllowed;
}
/**
* Sets the JIDs of those to contact with questions. This information is not used by
* the pubsub service. It is meant to be "discovered" by users and redirect any question
* to the returned people to contact.
*
* @param contacts the JIDs of those to contact with questions.
*/
void setContacts(Collection<JID> contacts) {
this.contacts = contacts;
}
/**
* Saves the node configuration to the backend store.
*/
public void saveToDB() {
// Make the room persistent
if (!savedToDB) {
......@@ -1221,6 +1507,8 @@ public abstract class Node {
for (NodeSubscription subscription : subscriptionsByID.values()) {
PubSubPersistenceManager.saveSubscription(service, this, subscription, true);
}
// Add the new node to the list of available nodes
service.addNode(this);
}
else {
PubSubPersistenceManager.updateNode(service, this);
......@@ -1233,6 +1521,7 @@ public abstract class Node {
void addSubscription(NodeSubscription subscription) {
subscriptionsByID.put(subscription.getID(), subscription);
subscriptionsByJID.put(subscription.getJID().toString(), subscription);
}
/**
......@@ -1242,20 +1531,19 @@ public abstract class Node {
* If the node allows multiple subscriptions and this message is sent then an
* IllegalStateException exception is going to be thrown.
*
* @param subscriptionJID the JID of the entity that receives event notifications.
* @param subscriberJID the JID of the entity that receives event notifications.
* @return the subscription whose subscription JID matches the specified JID or <tt>null</tt>
* if none was found.
* @throws IllegalStateException If this message was used when the node supports multiple
* subscriptions.
*/
NodeSubscription getSubscription(JID subscriptionJID) {
NodeSubscription getSubscription(JID subscriberJID) {
// Check that node does not support multiple subscriptions
if (isMultipleSubscriptionsEnabled()) {
throw new IllegalStateException(
"Multiple subscriptions is enabled so subscriptions should be retrieved using subID.");
throw new IllegalStateException("Multiple subscriptions is enabled so subscriptions " +
"should be retrieved using subID.");
}
// TODO implement this
return null;
return subscriptionsByJID.get(subscriberJID.toString());
}
/**
......@@ -1290,10 +1578,11 @@ public abstract class Node {
}
// TODO Update child nodes to use the root node or the parent node of this node as the new parent node
for (Node node : getNodes()) {
//node.set
//node.changeParent(parent);
}
// TODO Leaf nodes should remove queued items from the pubsub engine (subclass should do this work)
// Broadcast delete notification to subscribers (if enabled)
if (notifyDelete) {
if (isNotifiedOfDelete()) {
// Build packet to broadcast to subscribers
Message message = new Message();
Element event = message.addChildElement("event", "http://jabber.org/protocol/pubsub#event");
......@@ -1302,9 +1591,12 @@ public abstract class Node {
// Send notification that the node was deleted
broadcastSubscribers(message, true);
}
// Remove the node from memory
service.removeNode(getNodeID());
// Clear collections in memory (clear them after broadcast was sent)
affiliates.clear();
subscriptionsByID.clear();
subscriptionsByJID.clear();
return true;
}
return false;
......@@ -1316,7 +1608,7 @@ public abstract class Node {
*
* @param iqRequest IQ request sent by an owner of the node.
*/
public void sendAffiliatedEntities(IQ iqRequest) {
void sendAffiliatedEntities(IQ iqRequest) {
IQ reply = IQ.createResultIQ(iqRequest);
Element childElement = iqRequest.getChildElement().createCopy();
reply.setChildElement(childElement);
......@@ -1341,6 +1633,15 @@ public abstract class Node {
service.broadcast(this, message, jids);
}
/**
* Sends an event notification to the specified subscriber. The event notification may
* include information about the affected subscriptions.
*
* @param subscriberJID the subscriber JID that will get the notification.
* @param notification the message to send to the subscriber.
* @param subIDs the list of affected subscription IDs or null when node does not
* allow multiple subscriptions.
*/
protected void sendEventNotification(JID subscriberJID, Message notification,
Collection<String> subIDs) {
Element headers = null;
......@@ -1367,8 +1668,14 @@ public abstract class Node {
* Creates a new subscription and possibly a new affiliate if the owner of the subscription
* does not have any existing affiliation with the node. The new subscription might require
* to be authorized by a node owner to be active. If new subscriptions are required to be
* configured before being active then the subscription state would be "unconfigured".
* configured before being active then the subscription state would be "unconfigured".<p>
*
* The originalIQ parameter may be <tt>null</tt> when using this API internally. When no
* IQ packet was sent then no IQ result will be sent to the sender. The rest of the
* functionality is the same.
*
* @param originalIQ the IQ packet sent by the entity to subscribe to the node or
* null when using this API internally.
* @param owner the JID of the affiliate.
* @param subscriber the JID where event notifications are going to be sent.
* @param authorizationRequired true if the new subscriptions needs to be authorized by
......@@ -1376,8 +1683,8 @@ public abstract class Node {
* @param options the data form with the subscription configuration or null if subscriber
* didn't provide a configuration.
*/
void createSubscription(IQ originalIQ, JID owner, JID subscriber, boolean authorizationRequired,
DataForm options) {
public void createSubscription(IQ originalIQ, JID owner, JID subscriber,
boolean authorizationRequired, DataForm options) {
// Create a new affiliation if required
if (getAffiliate(owner) == null) {
addNoneAffiliation(owner);
......@@ -1395,9 +1702,11 @@ public abstract class Node {
// Create new subscription
NodeSubscription subscription = addSubscription(owner, subscriber, subState, options);
// Reply with subscription and affiliation status indicating if subscription
// must be configured
subscription.sendSubscriptionState(originalIQ);
if (originalIQ != null) {
// Reply with subscription and affiliation status indicating if subscription
// must be configured (only when subscription was made through an IQ packet)
subscription.sendSubscriptionState(originalIQ);
}
// Send last published item (if node is leaf node and subscription status is ok)
if (isSendItemSubscribe()) {
......@@ -1415,9 +1724,10 @@ public abstract class Node {
*
* @param subscription the subscription to cancel.
*/
void cancelSubscription(NodeSubscription subscription) {
public void cancelSubscription(NodeSubscription subscription) {
// Remove subscription from memory
subscriptionsByID.remove(subscription.getID());
subscriptionsByJID.remove(subscription.getJID().toString());
// Check if user has affiliation of type "none" and there are no more subscriptions
NodeAffiliate affiliate = subscription.getAffiliate();
if (affiliate != null && affiliate.getAffiliation() == NodeAffiliate.Affiliation.none &&
......@@ -1438,6 +1748,7 @@ public abstract class Node {
* published items are not persistent then item ID is not used. In this case a <tt>null</tt>
* value will always be returned.
*
* @param itemID the ID of the item to retrieve.
* @return the PublishedItem whose ID matches the specified item ID or null if none was found.
*/
public PublishedItem getPublishedItem(String itemID) {
......@@ -1460,6 +1771,7 @@ public abstract class Node {
* the node. The returned collection cannot be modified. Collection nodes does not
* support publishing of items so an empty list will be returned in that case.
*
* @param recentItems number of recent items to retrieve.
* @return a list of PublishedItem with the most recent N items published to
* the node.
*/
......
......@@ -13,6 +13,7 @@ package org.jivesoftware.wildfire.pubsub;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.dom4j.Element;
import java.util.*;
......@@ -60,29 +61,126 @@ public class NodeAffiliate {
return node.getSubscriptions(jid);
}
/**
* Sends an event notification for the published items to the affiliate. The event
* notification may contain zero, one or many published items based on the items
* included in the original publication. If the affiliate has many subscriptions and
* many items were published then the affiliate will get a notification for each set
* of items that affected the same subscriptions.
*
* @param notification the message to sent to the subscribers. The message will be completed
* with the items to include in each notification.
* @param event the event Element included in the notification message. Passed as an
* optimization to avoid future look ups.
* @param node the leaf node where the items where published.
* @param publishedItems the list of items that were published. Could be an empty list.
*/
void sendPublishedNotifications(Message notification, Element event, LeafNode node,
List<PublishedItem> publishedItems) {
if (!publishedItems.isEmpty()) {
Map<List<NodeSubscription>, List<PublishedItem>> itemsBySubs =
getItemsBySubscriptions(node, publishedItems);
// Send one notification for published items that affect the same subscriptions
for (List<NodeSubscription> nodeSubscriptions : itemsBySubs.keySet()) {
// Add items information
Element items = event.addElement("items");
items.addAttribute("node", node.getNodeID());
for (PublishedItem publishedItem : itemsBySubs.get(nodeSubscriptions)) {
// Add item information to the event notification
Element item = items.addElement("item");
if (node.isItemRequired()) {
item.addAttribute("id", publishedItem.getID());
}
if (node.isPayloadDelivered()) {
item.add(publishedItem.getPayload().createCopy());
}
}
// Send the event notification
sendEventNotification(notification, node, nodeSubscriptions);
// Remove the added items information
event.remove(items);
}
}
else {
// Filter affiliate subscriptions and only use approved and configured ones
List<NodeSubscription> affectedSubscriptions = new ArrayList<NodeSubscription>();;
for (NodeSubscription subscription : getSubscriptions()) {
if (subscription.canSendEventNotification(node, null)) {
affectedSubscriptions.add(subscription);
}
}
// Add item information to the event notification
Element items = event.addElement("items");
items.addAttribute("node", node.getNodeID());
// Send the event notification
sendEventNotification(notification, node, affectedSubscriptions);
// Remove the added items information
event.remove(items);
}
}
/**
* Sends an event notification to the affiliate for the deleted items. The event
* notification may contain one or many published items based on the items included
* in the original publication. If the affiliate has many subscriptions and many
* items were deleted then the affiliate will get a notification for each set
* of items that affected the same subscriptions.
*
* @param notification the message to sent to the subscribers. The message will be completed
* with the items to include in each notification.
* @param event the event Element included in the notification message. Passed as an
* optimization to avoid future look ups.
* @param node the leaf node where the items where deleted from.
* @param publishedItems the list of items that were deleted.
*/
void sendDeletionNotifications(Message notification, Element event, LeafNode node,
List<PublishedItem> publishedItems) {
if (!publishedItems.isEmpty()) {
Map<List<NodeSubscription>, List<PublishedItem>> itemsBySubs =
getItemsBySubscriptions(node, publishedItems);
// Send one notification for published items that affect the same subscriptions
for (List<NodeSubscription> nodeSubscriptions : itemsBySubs.keySet()) {
// Add items information
Element items = event.addElement("items");
items.addAttribute("node", node.getNodeID());
for (PublishedItem publishedItem : itemsBySubs.get(nodeSubscriptions)) {
// Add retract information to the event notification
Element item = items.addElement("retract");
if (node.isItemRequired()) {
item.addAttribute("id", publishedItem.getID());
}
}
// Send the event notification
sendEventNotification(notification, node, nodeSubscriptions);
// Remove the added items information
event.remove(items);
}
}
}
/**
* Sends an event notification to each affected subscription of the affiliate. If the owner
* has many subscriptions from the same full JID then a single notification is going to be
* sent including a detail of the subscription IDs for which the notification is being sent.<p>
*
* Event notifications may include notifications of new published items or of items that
* were deleted.<p>
*
* The original publication to the node may or may not contain a {@link PublishedItem}. The
* subscriptions of the affiliation will be filtered based on the published item (if one was
* specified), the subscription status and originating node.
*
* @param notification the message to send containing the event notification.
* @param node the node that received a new publication.
* @param publishedItem the item was was published in the publication or null if none
* was published.
* @param notifySubscriptions list of subscriptions that were affected and are going to be
* included in the notification message. The list should not be empty.
*/
public void sendEventNotification(Message notification, LeafNode node,
PublishedItem publishedItem) {
// Filter affiliate subscriptions and only use approved and configured ones
List<NodeSubscription> notifySubscriptions = new ArrayList<NodeSubscription>();
for (NodeSubscription subscription : getSubscriptions()) {
if (subscription.canSendEventNotification(node, publishedItem)) {
notifySubscriptions.add(subscription);
}
}
private void sendEventNotification(Message notification, LeafNode node,
List<NodeSubscription> notifySubscriptions) {
if (node.isMultipleSubscriptionsEnabled()) {
// Group subscriptions with the same subscriber JID
Map<JID, Collection<String>> groupedSubs = new HashMap<JID, Collection<String>>();
......@@ -112,6 +210,43 @@ public class NodeAffiliate {
}
}
private Map<List<NodeSubscription>, List<PublishedItem>> getItemsBySubscriptions(LeafNode node,
List<PublishedItem> publishedItems) {
// Identify which subscriptions can receive each item
Map<PublishedItem, List<NodeSubscription>> subsByItem =
new HashMap<PublishedItem, List<NodeSubscription>>();
// Filter affiliate subscriptions and only use approved and configured ones
Collection<NodeSubscription> subscriptions = getSubscriptions();
for (PublishedItem publishedItem : publishedItems) {
for (NodeSubscription subscription : subscriptions) {
if (subscription.canSendEventNotification(node, publishedItem)) {
List<NodeSubscription> nodeSubscriptions = subsByItem.get(publishedItem);
if (nodeSubscriptions == null) {
nodeSubscriptions = new ArrayList<NodeSubscription>();
subsByItem.put(publishedItem, nodeSubscriptions);
}
nodeSubscriptions.add(subscription);
}
}
}
// Identify which items should be sent together to the same subscriptions
Map<List<NodeSubscription>, List<PublishedItem>> itemsBySubs =
new HashMap<List<NodeSubscription>, List<PublishedItem>>();
List<PublishedItem> affectedSubscriptions;
for (PublishedItem publishedItem : subsByItem.keySet()) {
affectedSubscriptions = itemsBySubs.get(subsByItem.get(publishedItem));
if (affectedSubscriptions == null) {
itemsBySubs.put(subsByItem.get(publishedItem), Arrays.asList(publishedItem));
}
else {
affectedSubscriptions.add(publishedItem);
}
}
return itemsBySubs;
}
public String toString() {
return super.toString() + " - JID: " + getJID() + " - Affiliation: " +
getAffiliation().name();
......
......@@ -384,6 +384,37 @@ public class NodeSubscription {
this.savedToDB = savedToDB;
}
/**
* Configures the subscription based on the sent {@link DataForm} included in the IQ
* packet sent by the subscriber. If the subscription was pending of configuration
* then the last published item is going to be sent to the subscriber.<p>
*
* The originalIQ parameter may be <tt>null</tt> when using this API internally. When no
* IQ packet was sent then no IQ result will be sent to the sender. The rest of the
* functionality is the same.
*
* @param originalIQ the IQ packet sent by the subscriber to configure his subscription or
* null when using this API internally.
* @param options the data form containing the new subscription configuration.
*/
public void configure(IQ originalIQ, DataForm options) {
boolean wasUnconfigured = isConfigurationPending();
// Change the subscription configuration based on the completed form
configure(options);
if (originalIQ != null) {
// Return success response
service.send(IQ.createResultIQ(originalIQ));
}
// Send last published item if subscription is now configured (and authorized)
if (wasUnconfigured && !isConfigurationPending() && node.isSendItemSubscribe()) {
PublishedItem lastItem = node.getLastPublishedItem();
if (lastItem != null) {
sendLastPublishedItem(lastItem);
}
}
}
void configure(DataForm options) {
List<String> values;
String booleanValue;
......@@ -473,7 +504,7 @@ public class NodeSubscription {
*
* @return data form used by the subscriber to edit the subscription configuration.
*/
DataForm getConfigurationForm() {
public DataForm getConfigurationForm() {
DataForm form = new DataForm(DataForm.Type.form);
form.setTitle(LocaleUtils.getLocalizedString("pubsub.form.subscription.title"));
List<String> params = new ArrayList<String>();
......@@ -701,7 +732,7 @@ public class NodeSubscription {
if (((LeafNode) node).isItemRequired()) {
item.addAttribute("id", publishedItem.getID());
}
if ((forceToIncludePayload || node.isDeliverPayloads()) &&
if ((forceToIncludePayload || node.isPayloadDelivered()) &&
publishedItem.getPayload() != null) {
item.add(publishedItem.getPayload().createCopy());
}
......@@ -736,7 +767,7 @@ public class NodeSubscription {
if (((LeafNode) node).isItemRequired()) {
item.addAttribute("id", publishedItem.getID());
}
if (node.isDeliverPayloads() && publishedItem.getPayload() != null) {
if (node.isPayloadDelivered() && publishedItem.getPayload() != null) {
item.add(publishedItem.getPayload().createCopy());
}
// Add a message body (if required)
......
......@@ -14,19 +14,18 @@ package org.jivesoftware.wildfire.pubsub;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.user.UserManager;
import org.jivesoftware.wildfire.pubsub.models.AccessModel;
import org.jivesoftware.wildfire.user.UserManager;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.*;
import java.util.Iterator;
import java.util.List;
import java.util.Collection;
import java.util.ArrayList;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
/**
* A PubSubEngine is responsible for handling packets sent to the pub-sub service.
......@@ -36,6 +35,32 @@ import java.util.ArrayList;
public class PubSubEngine {
private PubSubService service;
/**
* The time to elapse between each execution of the maintenance process. Default
* is 2 minutes.
*/
private int items_task_timeout = 2 * 60 * 1000;
/**
* The number of items to save on each run of the maintenance process.
*/
private int items_batch_size = 50;
/**
* Queue that holds the items that need to be added to the database.
*/
private Queue<PublishedItem> itemsToAdd = new LinkedBlockingQueue<PublishedItem>();
/**
* Queue that holds the items that need to be deleted from the database.
*/
private Queue<PublishedItem> itemsToDelete = new LinkedBlockingQueue<PublishedItem>();
/**
* Task that saves or deletes published items from the database.
*/
private PublishedItemTask publishedItemTask;
/**
* Timer to save published items to the database or remove deleted or old items.
*/
private Timer timer = new Timer("PubSub maintenance");
/**
* The packet router for the server.
......@@ -45,6 +70,11 @@ public class PubSubEngine {
public PubSubEngine(PubSubService pubSubService, PacketRouter router) {
this.service = pubSubService;
this.router = router;
// Save or delete published items from the database every 2 minutes starting in
// 2 minutes (default values)
publishedItemTask = new PublishedItemTask();
timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout);
}
/**
......@@ -69,7 +99,7 @@ public class PubSubEngine {
Element action = childElement.element("publish");
if (action != null) {
// Entity publishes an item
publishItemToNode(iq, action);
publishItemsToNode(iq, action);
return true;
}
action = childElement.element("subscribe");
......@@ -116,7 +146,8 @@ public class PubSubEngine {
}
action = childElement.element("retract");
if (action != null) {
// TODO Entity deletes an item
// Entity deletes an item
deleteItems(iq, action);
return true;
}
// Unknown action requested
......@@ -202,7 +233,7 @@ public class PubSubEngine {
// See "Handling Notification-Related Errors" section
}
private void publishItemToNode(IQ iq, Element publishElement) {
private void publishItemsToNode(IQ iq, Element publishElement) {
String nodeID = publishElement.attributeValue("node");
Node node = null;
if (nodeID == null) {
......@@ -241,18 +272,10 @@ public class PubSubEngine {
}
LeafNode leafNode = (LeafNode) node;
Element item = publishElement.element("item");
List entries = null;
Element payload = null;
String itemID = null;
if (item != null) {
entries = item.elements();
payload = entries.isEmpty() ? null : (Element) entries.get(0);
itemID = item.attributeValue("id");
}
Iterator itemElements = publishElement.elementIterator("item");
// Check that an item was included if node persist items or includes payload
if (item == null && leafNode.isItemRequired()) {
if (!itemElements.hasNext() && leafNode.isItemRequired()) {
Element pubsubError = DocumentHelper.createElement(QName.get(
"item-required", "http://jabber.org/protocol/pubsub#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, pubsubError);
......@@ -260,30 +283,115 @@ public class PubSubEngine {
}
// Check that no item was included if node doesn't persist items and doesn't
// includes payload
if (item != null && !leafNode.isItemRequired()) {
if (itemElements.hasNext() && !leafNode.isItemRequired()) {
Element pubsubError = DocumentHelper.createElement(QName.get(
"item-forbidden", "http://jabber.org/protocol/pubsub#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, pubsubError);
return;
}
// Check that a payload was included if node is configured to include payload
// in notifications
if (payload == null && leafNode.isDeliverPayloads()) {
List<Element> items = new ArrayList<Element>();
List entries = null;
Element payload = null;
while (itemElements.hasNext()) {
Element item = (Element) itemElements.next();
entries = item.elements();
payload = entries.isEmpty() ? null : (Element) entries.get(0);
// Check that a payload was included if node is configured to include payload
// in notifications
if (payload == null && leafNode.isPayloadDelivered()) {
Element pubsubError = DocumentHelper.createElement(QName.get(
"payload-required", "http://jabber.org/protocol/pubsub#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, pubsubError);
return;
}
// Check that the payload (if any) contains only one child element
if (entries.size() > 1) {
Element pubsubError = DocumentHelper.createElement(QName.get(
"invalid-payload", "http://jabber.org/protocol/pubsub#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, pubsubError);
return;
}
items.add(item);
}
// Return success operation
router.route(IQ.createResultIQ(iq));
// Publish item and send event notifications to subscribers
leafNode.publishItems(from, items);
}
private void deleteItems(IQ iq, Element retractElement) {
String nodeID = retractElement.attributeValue("node");
Node node = null;
if (nodeID == null) {
// No node was specified. Return bad_request error
Element pubsubError = DocumentHelper.createElement(QName.get(
"payload-required", "http://jabber.org/protocol/pubsub#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, pubsubError);
"nodeid-required", "http://jabber.org/protocol/pubsub#errors"));
sendErrorPacket(iq, PacketError.Condition.item_not_found, pubsubError);
return;
}
// Check that the payload (if any) contains only one child element
if (entries.size() > 1) {
else {
// Look for the specified node
node = service.getNode(nodeID);
if (node == null) {
// Node does not exist. Return item-not-found error
sendErrorPacket(iq, PacketError.Condition.item_not_found, null);
return;
}
}
// Get the items to delete
Iterator itemElements = retractElement.elementIterator("item");
if (!itemElements.hasNext()) {
// TODO Confirm that at least one item should be present in the retract element. Waiting for stpeter confirmation.
Element pubsubError = DocumentHelper.createElement(QName.get(
"invalid-payload", "http://jabber.org/protocol/pubsub#errors"));
"item-required", "http://jabber.org/protocol/pubsub#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, pubsubError);
return;
}
// Publish item and send event notifications to subscribers
leafNode.sendEventNotification(iq, itemID, payload);
if (node.isCollectionNode()) {
// TODO Is this the correct error to return???
// Cannot delete items from a collection node. Return an error.
sendErrorPacket(iq, PacketError.Condition.not_allowed, null);
return;
}
LeafNode leafNode = (LeafNode) node;
if (!leafNode.isItemRequired()) {
// TODO JEP specifies to return this error if node is not persisting items but this checking makes more sense. Waiting for stpeter confirmation.
// Cannot delete items from a leaf node that doesn't handle itemIDs. Return an error.
sendErrorPacket(iq, PacketError.Condition.not_allowed, null);
return;
}
List<PublishedItem> items = new ArrayList<PublishedItem>();
while (itemElements.hasNext()) {
Element itemElement = (Element) itemElements.next();
String itemID = itemElement.attributeValue("id");
// TODO Return an error if no id was specified?
if (itemID != null) {
PublishedItem item = node.getPublishedItem(itemID);
if (item == null) {
// ItemID does not exist. Return item-not-found error
sendErrorPacket(iq, PacketError.Condition.item_not_found, null);
return;
}
else {
if (item.canDelete(iq.getFrom())) {
items.add(item);
}
else {
// Publisher does not have sufficient privileges to delete this item
sendErrorPacket(iq, PacketError.Condition.not_authorized, null);
return;
}
}
}
}
// Send reply with success
router.route(IQ.createResultIQ(iq));
// Delete items and send subscribers a notification
leafNode.deleteItems(items);
}
private void subscribeNode(IQ iq, Element childElement, Element subscribeElement) {
......@@ -623,18 +731,8 @@ public class PubSubEngine {
Element formElement = optionsElement.element(QName.get("x", "jabber:x:data"));
if (formElement != null) {
boolean wasUnconfigured = subscription.isConfigurationPending();
// Change the subscription configuration based on the completed form
subscription.configure(new DataForm(formElement));
// Return success response
router.route(IQ.createResultIQ(iq));
// Send last published item if subscription is now configured (and authorized)
if (wasUnconfigured && !subscription.isConfigurationPending()) {
PublishedItem lastItem = node.getLastPublishedItem();
if (lastItem != null) {
subscription.sendLastPublishedItem(lastItem);
}
}
subscription.configure(iq, new DataForm(formElement));
}
else {
// No data form was included so return bad request error
......@@ -931,8 +1029,6 @@ public class PubSubEngine {
else {
newNode.saveToDB();
}
// Add the new node to the list of available nodes
service.addNode(newNode);
}
else {
conflict = true;
......@@ -1060,8 +1156,6 @@ public class PubSubEngine {
// Delete the node
if (node.delete()) {
// Remove the node from memory
service.removeNode(node.getNodeID());
// Return that node was deleted successfully
router.route(IQ.createResultIQ(iq));
}
......@@ -1209,4 +1303,122 @@ public class PubSubEngine {
}
return completedForm;
}
public void shutdown() {
// Stop te maintenance processes
timer.cancel();
// Delete from the database items contained in the itemsToDelete queue
PublishedItem entry = null;
while (!itemsToDelete.isEmpty()) {
entry = itemsToDelete.poll();
if (entry != null) {
PubSubPersistenceManager.removePublishedItem(service, entry);
}
}
// Save to the database items contained in the itemsToAdd queue
while (!itemsToAdd.isEmpty()) {
entry = itemsToAdd.poll();
if (entry != null) {
PubSubPersistenceManager.createPublishedItem(service, entry);
}
}
}
/*******************************************************************************
* Methods related to PubSub maintenance tasks. Such as
* saving or deleting published items.
******************************************************************************/
/**
* Schedules the maintenance task for repeated <i>fixed-delay execution</i>,
* beginning after the specified delay. Subsequent executions take place
* at approximately regular intervals separated by the specified period.
*
* @param timeout the new frequency of the maintenance task.
*/
void setPublishedItemTaskTimeout(int timeout) {
if (this.items_task_timeout == timeout) {
return;
}
// Cancel the existing task because the timeout has changed
if (publishedItemTask != null) {
publishedItemTask.cancel();
}
this.items_task_timeout = timeout;
// Create a new task and schedule it with the new timeout
publishedItemTask = new PublishedItemTask();
timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout);
}
/**
* Adds the item to the queue of items to remove from the database. The queue is going
* to be processed by another thread.
*
* @param removedItem the item to remove from the database.
*/
void queueItemToRemove(PublishedItem removedItem) {
// Remove the removed item from the queue of items to add to the database
if (!itemsToAdd.remove(removedItem)) {
// The item is already present in the database so add the removed item
// to the queue of items to delete from the database
itemsToDelete.add(removedItem);
}
}
/**
* Adds the item to the queue of items to add to the database. The queue is going
* to be processed by another thread.
*
* @param newItem the item to add to the database.
*/
void queueItemToAdd(PublishedItem newItem) {
itemsToAdd.add(newItem);
}
/**
* Cancels any queued operation for the specified list of items. This operation is
* usually required when a node was deleted so any pending operation of the node items
* should be cancelled.
*
* @param items the list of items to remove the from queues.
*/
void cancelQueuedItems(Collection<PublishedItem> items) {
for (PublishedItem item : items) {
itemsToAdd.remove(item);
itemsToDelete.remove(item);
}
}
private class PublishedItemTask extends TimerTask {
public void run() {
try {
PublishedItem entry = null;
boolean success = false;
// Delete from the database items contained in the itemsToDelete queue
for (int index = 0; index <= items_batch_size && !itemsToDelete.isEmpty(); index++) {
entry = itemsToDelete.poll();
if (entry != null) {
success = PubSubPersistenceManager.removePublishedItem(service, entry);
if (!success) {
itemsToDelete.add(entry);
}
}
}
// Save to the database items contained in the itemsToAdd queue
for (int index = 0; index <= items_batch_size && !itemsToAdd.isEmpty(); index++) {
entry = itemsToAdd.poll();
if (entry != null) {
success = PubSubPersistenceManager.createPublishedItem(service, entry);
if (!success) {
itemsToAdd.add(entry);
}
}
}
}
catch (Throwable e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
}
}
}
......@@ -223,6 +223,10 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
return presences.get(subscriber);
}
public PubSubEngine getPubSubEngine() {
return engine;
}
public String getServiceName() {
return serviceName;
}
......@@ -371,8 +375,6 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
JiveGlobals.getBooleanProperty("xmpp.pubsub.default.notify.retract", true));
collectionDefaultConfiguration.setPresenceBasedDelivery(JiveGlobals.getBooleanProperty(
"xmpp.pubsub.default.presenceBasedDelivery", false));
collectionDefaultConfiguration.setSendItemSubscribe(
JiveGlobals.getBooleanProperty("xmpp.pubsub.default.sendItemSubscribe", false));
collectionDefaultConfiguration.setSubscriptionEnabled(JiveGlobals.getBooleanProperty(
"xmpp.pubsub.default.subscriptionEnabled", true));
leafDefaultConfiguration.setReplyPolicy(null);
......@@ -397,8 +399,6 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
rootCollectionNode.addOwner(creatorJID);
// Save new root node
rootCollectionNode.saveToDB();
// Add the new root node to the list of available nodes
addNode(rootCollectionNode);
}
else {
rootCollectionNode = (CollectionNode) getNode(rootNodeID);
......@@ -419,8 +419,9 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
super.stop();
// Remove the route to this service
routingTable.removeRoute(getAddress());
// TODO this
//savePublishedItems();
// Stop the pubsub engine. This will gives us the chance to
// save queued items to the database.
engine.shutdown();
}
public Iterator<DiscoServerItem> getItems() {
......
......@@ -154,7 +154,7 @@ public class PubSubPersistenceManager {
pstmt.setString(4, StringUtils.dateToMillis(node.getCreationDate()));
pstmt.setString(5, StringUtils.dateToMillis(node.getModificationDate()));
pstmt.setString(6, node.getParent() != null ? node.getParent().getNodeID() : null);
pstmt.setInt(7, (node.isDeliverPayloads() ? 1 : 0));
pstmt.setInt(7, (node.isPayloadDelivered() ? 1 : 0));
if (!node.isCollectionNode()) {
pstmt.setInt(8, ((LeafNode) node).getMaxPayloadSize());
pstmt.setInt(9, (((LeafNode) node).isPersistPublishedItems() ? 1 : 0));
......@@ -165,9 +165,9 @@ public class PubSubPersistenceManager {
pstmt.setInt(9, 0);
pstmt.setInt(10, 0);
}
pstmt.setInt(11, (node.isNotifyConfigChanges() ? 1 : 0));
pstmt.setInt(12, (node.isNotifyDelete() ? 1 : 0));
pstmt.setInt(13, (node.isNotifyRetract() ? 1 : 0));
pstmt.setInt(11, (node.isNotifiedOfConfigChanges() ? 1 : 0));
pstmt.setInt(12, (node.isNotifiedOfDelete() ? 1 : 0));
pstmt.setInt(13, (node.isNotifiedOfRetract() ? 1 : 0));
pstmt.setInt(14, (node.isPresenceBasedDelivery() ? 1 : 0));
pstmt.setInt(15, (node.isSendItemSubscribe() ? 1 : 0));
pstmt.setString(16, node.getPublisherModel().getName());
......@@ -228,7 +228,7 @@ public class PubSubPersistenceManager {
pstmt = con.prepareStatement(UPDATE_NODE);
pstmt.setString(1, StringUtils.dateToMillis(node.getModificationDate()));
pstmt.setString(2, node.getParent() != null ? node.getParent().getNodeID() : null);
pstmt.setInt(3, (node.isDeliverPayloads() ? 1 : 0));
pstmt.setInt(3, (node.isPayloadDelivered() ? 1 : 0));
if (!node.isCollectionNode()) {
pstmt.setInt(4, ((LeafNode) node).getMaxPayloadSize());
pstmt.setInt(5, (((LeafNode) node).isPersistPublishedItems() ? 1 : 0));
......@@ -239,9 +239,9 @@ public class PubSubPersistenceManager {
pstmt.setInt(5, 0);
pstmt.setInt(6, 0);
}
pstmt.setInt(7, (node.isNotifyConfigChanges() ? 1 : 0));
pstmt.setInt(8, (node.isNotifyDelete() ? 1 : 0));
pstmt.setInt(9, (node.isNotifyRetract() ? 1 : 0));
pstmt.setInt(7, (node.isNotifiedOfConfigChanges() ? 1 : 0));
pstmt.setInt(8, (node.isNotifiedOfDelete() ? 1 : 0));
pstmt.setInt(9, (node.isNotifiedOfRetract() ? 1 : 0));
pstmt.setInt(10, (node.isPresenceBasedDelivery() ? 1 : 0));
pstmt.setInt(11, (node.isSendItemSubscribe() ? 1 : 0));
pstmt.setString(12, node.getPublisherModel().getName());
......@@ -433,17 +433,17 @@ public class PubSubPersistenceManager {
}
node.setCreationDate(new Date(Long.parseLong(rs.getString(3).trim())));
node.setModificationDate(new Date(Long.parseLong(rs.getString(4).trim())));
node.setDeliverPayloads(rs.getInt(6) == 1);
node.setPayloadDelivered(rs.getInt(6) == 1);
if (leaf) {
((LeafNode) node).setMaxPayloadSize(rs.getInt(7));
((LeafNode) node).setPersistPublishedItems(rs.getInt(8) == 1);
((LeafNode) node).setMaxPublishedItems(rs.getInt(9));
((LeafNode) node).setSendItemSubscribe(rs.getInt(14) == 1);
}
node.setNotifyConfigChanges(rs.getInt(10) == 1);
node.setNotifyDelete(rs.getInt(11) == 1);
node.setNotifyRetract(rs.getInt(12) == 1);
node.setNotifiedOfConfigChanges(rs.getInt(10) == 1);
node.setNotifiedOfDelete(rs.getInt(11) == 1);
node.setNotifiedOfRetract(rs.getInt(12) == 1);
node.setPresenceBasedDelivery(rs.getInt(13) == 1);
node.setSendItemSubscribe(rs.getInt(14) == 1);
node.setPublisherModel(PublisherModel.valueOf(rs.getString(15)));
node.setSubscriptionEnabled(rs.getInt(16) == 1);
node.setSubscriptionConfigurationRequired(rs.getInt(17) == 1);
......@@ -788,14 +788,52 @@ public class PubSubPersistenceManager {
}
}
/**
* Creates and stores the published item in the database.
*
* @param service the pubsub service that is hosting the node.
* @param item The published item to save.
* @return true if the item was successfully saved to the database.
*/
public static boolean createPublishedItem(PubSubService service, PublishedItem item) {
boolean success = false;
Connection con = null;
PreparedStatement pstmt = null;
try {
con = DbConnectionManager.getConnection();
// Remove the published item from the database
pstmt = con.prepareStatement(ADD_ITEM);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, item.getNode().getNodeID());
pstmt.setString(3, item.getID());
pstmt.setString(4, item.getPublisher().toString());
pstmt.setString(5, StringUtils.dateToMillis(item.getCreationDate()));
pstmt.setString(6, item.getPayloadXML());
pstmt.executeUpdate();
// Set that the item was successfully saved to the database
success = true;
}
catch (SQLException sqle) {
Log.error(sqle);
}
finally {
try {if (pstmt != null) {pstmt.close();}}
catch (Exception e) {Log.error(e);}
try {if (con != null) {con.close();}}
catch (Exception e) {Log.error(e);}
}
return success;
}
/**
* Removes the specified published item from the DB.
*
* @param service the pubsub service that is hosting the node.
* @param node The node where the published item was published.
* @param item The published item to delete.
* @return true if the item was successfully deleted from the database.
*/
public static void removePublishedItem(PubSubService service, Node node, PublishedItem item) {
public static boolean removePublishedItem(PubSubService service, PublishedItem item) {
boolean success = false;
Connection con = null;
PreparedStatement pstmt = null;
try {
......@@ -803,9 +841,11 @@ public class PubSubPersistenceManager {
// Remove the published item from the database
pstmt = con.prepareStatement(DELETE_ITEM);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(2, item.getNode().getNodeID());
pstmt.setString(3, item.getID());
pstmt.executeUpdate();
// Set that the item was successfully deleted from the database
success = true;
}
catch (SQLException sqle) {
Log.error(sqle);
......@@ -816,6 +856,7 @@ public class PubSubPersistenceManager {
try {if (con != null) {con.close();}}
catch (Exception e) {Log.error(e);}
}
return success;
}
/**
......@@ -1032,13 +1073,13 @@ public class PubSubPersistenceManager {
}
node.setCreationDate(new Date(Long.parseLong(rs.getString(2).trim())));
node.setModificationDate(new Date(Long.parseLong(rs.getString(3).trim())));
node.setDeliverPayloads(rs.getInt(5) == 1);
node.setPayloadDelivered(rs.getInt(5) == 1);
node.setMaxPayloadSize(rs.getInt(6));
node.setPersistPublishedItems(rs.getInt(7) == 1);
node.setMaxPublishedItems(rs.getInt(8));
node.setNotifyConfigChanges(rs.getInt(9) == 1);
node.setNotifyDelete(rs.getInt(10) == 1);
node.setNotifyRetract(rs.getInt(11) == 1);
node.setNotifiedOfConfigChanges(rs.getInt(9) == 1);
node.setNotifiedOfDelete(rs.getInt(10) == 1);
node.setNotifiedOfRetract(rs.getInt(11) == 1);
node.setPresenceBasedDelivery(rs.getInt(12) == 1);
node.setSendItemSubscribe(rs.getInt(13) == 1);
node.setPublisherModel(Node.PublisherModel.valueOf(rs.getString(14)));
......
......@@ -175,4 +175,13 @@ public interface PubSubService {
* in RFC 3921.
*/
String getShowPresence(JID subscriber);
/**
* Returns the pubsub engine responsible for handling packets sent to the pub-sub service.
* The engine is the actual place where the pubsub magic happens like creating nodes,
* publishing items or subscribing to nodes.
*
* @return the pubsub engine responsible for handling packets sent to the pub-sub service.
*/
PubSubEngine getPubSubEngine();
}
......@@ -31,7 +31,8 @@ import java.util.Date;
public class PublishedItem {
/**
* JID of the entity that published the item to the node.
* JID of the entity that published the item to the node. This is the full JID
* of the publisher.
*/
private JID publisher;
/**
......@@ -110,6 +111,17 @@ public class PublishedItem {
return payload;
}
/**
* Returns a textual representation of the payload or <tt>null</tt> if no payload
* was specified with the item.
*
* @return a textual representation of the payload or null if no payload was specified
* with the item.
*/
public String getPayloadXML() {
return payloadXML;
}
/**
* Sets the payload included when publishing the item. A published item may or may not
* have a payload. Transient nodes that are configured to not broadcast payloads may allow
......@@ -142,4 +154,19 @@ public class PublishedItem {
}
return payloadXML.contains(keyword);
}
/**
* Returns true if the user that is trying to delete an item is allowed to delete it.
* Only the publisher or node admins (i.e. owners and sysadmins) are allowed to delete items.
*
* @param user the full JID of the user trying to delete the item.
* @return true if the user that is trying to delete an item is allowed to delete it.
*/
public boolean canDelete(JID user) {
if (publisher.equals(user) || publisher.toBareJID().equals(user.toBareJID()) ||
node.isAdmin(user)) {
return true;
}
return false;
}
}
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
</head>
<body>
<p>Defines policies that define who is allowed to subscribe and retrieve items or policies that
define who is allowed to publish items to nodes. (JEP-0060).</p>
</body>
</html>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment