Commit ca008069 authored by Matt Tucker's avatar Matt Tucker Committed by matt

Additional pub-sub work.

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@3639 b35dd754-fafc-0310-a699-88a17e54d16e
parent 1f557666
......@@ -448,13 +448,6 @@ public class DefaultNodeConfiguration {
formField.setLabel(LocaleUtils.getLocalizedString("pubsub.form.conf.deliver_payloads"));
formField.addValue(deliverPayloads);
formField = form.addField();
formField.setVariable("pubsub#send_item_subscribe");
formField.setType(FormField.Type.boolean_type);
formField.setLabel(
LocaleUtils.getLocalizedString("pubsub.form.conf.send_item_subscribe"));
formField.addValue(sendItemSubscribe);
formField = form.addField();
formField.setVariable("pubsub#notify_config");
formField.setType(FormField.Type.boolean_type);
......@@ -480,6 +473,13 @@ public class DefaultNodeConfiguration {
formField.addValue(presenceBasedDelivery);
if (leaf) {
formField = form.addField();
formField.setVariable("pubsub#send_item_subscribe");
formField.setType(FormField.Type.boolean_type);
formField.setLabel(
LocaleUtils.getLocalizedString("pubsub.form.conf.send_item_subscribe"));
formField.addValue(sendItemSubscribe);
formField = form.addField();
formField.setVariable("pubsub#persist_items");
formField.setType(FormField.Type.boolean_type);
......
......@@ -103,7 +103,7 @@ public class LeafNode extends Node {
}
}
// Remove stored published items based on the new max items
while (!publishedItems.isEmpty() && maxPublishedItems > publishedItems.size()) {
while (!publishedItems.isEmpty() && publishedItems.size() > maxPublishedItems) {
PublishedItem removedItem = publishedItems.remove(0);
itemsByID.remove(removedItem.getID());
// Add the removed item to the queue of items to delete from the database. The
......@@ -236,7 +236,8 @@ public class LeafNode extends Node {
// Add the published item to the list of items to persist (using another thread)
// but check that we don't exceed the limit. Remove oldest items if required.
while (!publishedItems.isEmpty() && maxPublishedItems >= publishedItems.size()) {
while (!publishedItems.isEmpty() && publishedItems.size() >= maxPublishedItems)
{
PublishedItem removedItem = publishedItems.remove(0);
itemsByID.remove(removedItem.getID());
// Add the removed item to the queue of items to delete from the database. The
......@@ -410,7 +411,7 @@ public class LeafNode extends Node {
Element items = event.addElement("purge");
items.addAttribute("node", nodeID);
// Send notification that the node configuration has changed
broadcastSubscribers(message, false);
broadcastNodeEvent(message, false);
}
}
}
......@@ -203,10 +203,15 @@ public abstract class Node {
Collection<NodeSubscription> subscriptions = getSubscriptions(jid);
if (subscriptions.isEmpty()) {
// User does not have a subscription with the node so create a default one
addSubscription(jid, jid, NodeSubscription.State.subscribed, null);
createSubscription(null, jid, jid, false, null);
}
else {
// TODO Approve any pending subscription
// Approve any pending subscription
for (NodeSubscription subscription : getSubscriptions(jid)) {
if (subscription.isAuthorizationPending()) {
subscription.approved();
}
}
}
}
......@@ -232,10 +237,15 @@ public abstract class Node {
Collection<NodeSubscription> subscriptions = getSubscriptions(jid);
if (subscriptions.isEmpty()) {
// User does not have a subscription with the node so create a default one
addSubscription(jid, jid, NodeSubscription.State.subscribed, null);
createSubscription(null, jid, jid, false, null);
}
else {
// TODO Approve any pending subscription
// Approve any pending subscription
for (NodeSubscription subscription : getSubscriptions(jid)) {
if (subscription.isAuthorizationPending()) {
subscription.approved();
}
}
}
}
......@@ -327,24 +337,6 @@ public abstract class Node {
}
}
private NodeSubscription addSubscription(JID owner, JID jid, NodeSubscription.State subscribed,
DataForm options) {
// Generate a subscription ID (override even if one was sent by the client)
String id = StringUtils.randomString(40);
NodeSubscription subscription = new NodeSubscription(service, this, owner, jid, subscribed, id);
// Configure the subscription with the specified configuration (if any)
if (options != null) {
subscription.configure(options);
}
addSubscription(subscription);
if (savedToDB) {
// Add the new subscription to the database
PubSubPersistenceManager.saveSubscription(service, this, subscription, true);
}
return subscription;
}
/**
* Removes all subscriptions owned by the specified entity.
*
......@@ -694,9 +686,54 @@ public abstract class Node {
item.add(getConfigurationChangeForm().getElement());
}
// Send notification that the node configuration has changed
broadcastSubscribers(message, false);
broadcastNodeEvent(message, false);
}
/**
* Returns the data form to be included in the authorization request to be sent to
* node owners when a new subscription needs to be approved.
*
* @param subscription the new subscription that needs to be approved.
* @return the data form to be included in the authorization request.
*/
DataForm getAuthRequestForm(NodeSubscription subscription) {
DataForm form = new DataForm(DataForm.Type.form);
form.setTitle(LocaleUtils.getLocalizedString("pubsub.form.authorization.title"));
form.addInstruction(
LocaleUtils.getLocalizedString("pubsub.form.authorization.instruction"));
FormField formField = form.addField();
formField.setVariable("FORM_TYPE");
formField.setType(FormField.Type.hidden);
formField.addValue("http://jabber.org/protocol/pubsub#subscribe_authorization");
formField = form.addField();
formField.setVariable("pubsub#subid");
formField.setType(FormField.Type.hidden);
formField.addValue(subscription.getID());
formField = form.addField();
formField.setVariable("pubsub#node");
formField.setType(FormField.Type.text_single);
formField.setLabel(LocaleUtils.getLocalizedString("pubsub.form.authorization.node"));
formField.addValue(getNodeID());
formField = form.addField();
formField.setVariable("pusub#subscriber_jid");
formField.setType(FormField.Type.jid_single);
formField.setLabel(LocaleUtils.getLocalizedString("pubsub.form.authorization.subscriber"));
formField.addValue(subscription.getJID().toString());
formField = form.addField();
formField.setVariable("pubsub#allow");
formField.setType(FormField.Type.boolean_type);
formField.setLabel(LocaleUtils.getLocalizedString("pubsub.form.authorization.allow"));
formField.addValue(Boolean.FALSE);
return form;
}
/**
* Returns a data form used by the owner to edit the node configuration.
*
......@@ -708,6 +745,12 @@ public abstract class Node {
List<String> params = new ArrayList<String>();
params.add(getNodeID());
form.addInstruction(LocaleUtils.getLocalizedString("pubsub.form.conf.instruction", params));
FormField formField = form.addField();
formField.setVariable("FORM_TYPE");
formField.setType(FormField.Type.hidden);
formField.addValue("http://jabber.org/protocol/pubsub#node_config");
// Add the form fields and configure them for edition
addFormFields(form, true);
return form;
......@@ -724,11 +767,6 @@ public abstract class Node {
*/
protected void addFormFields(DataForm form, boolean isEditing) {
FormField formField = form.addField();
formField.setVariable("FORM_TYPE");
formField.setType(FormField.Type.hidden);
formField.addValue("http://jabber.org/protocol/pubsub#node_config");
formField = form.addField();
formField.setVariable("pubsub#title");
if (isEditing) {
formField.setType(FormField.Type.text_single);
......@@ -938,12 +976,33 @@ public abstract class Node {
*/
private DataForm getConfigurationChangeForm() {
DataForm form = new DataForm(DataForm.Type.result);
FormField formField = form.addField();
formField.setVariable("FORM_TYPE");
formField.setType(FormField.Type.hidden);
formField.addValue("http://jabber.org/protocol/pubsub#node_config");
// Add the form fields and configure them for notification
// (i.e. no label or options are included)
addFormFields(form, false);
return form;
}
/**
* Returns a data form containing the node configuration that is going to be used for
* service discovery.
*
* @return a data form with the node configuration.
*/
public DataForm getMetadataForm() {
DataForm form = new DataForm(DataForm.Type.result);
FormField formField = form.addField();
formField.setVariable("FORM_TYPE");
formField.setType(FormField.Type.hidden);
formField.addValue("http://jabber.org/protocol/pubsub#meta-data");
// Add the form fields
addFormFields(form, true);
return form;
}
/**
* Returns true if this node is the root node of the pubsub service.
*
......@@ -1589,7 +1648,7 @@ public abstract class Node {
Element items = event.addElement("delete");
items.addAttribute("node", nodeID);
// Send notification that the node was deleted
broadcastSubscribers(message, true);
broadcastNodeEvent(message, true);
}
// Remove the node from memory
service.removeNode(getNodeID());
......@@ -1622,10 +1681,17 @@ public abstract class Node {
}
}
protected void broadcastSubscribers(Message message, boolean includeAll) {
/**
* Broadcasts a node event to subscribers of the node.
*
* @param message the message containing the node event.
* @param includeAll true if all subscribers will be notified no matter their
* subscriptions status or configuration.
*/
protected void broadcastNodeEvent(Message message, boolean includeAll) {
Collection<JID> jids = new ArrayList<JID>();
for (NodeSubscription subscription : subscriptionsByID.values()) {
if (includeAll || subscription.isApproved()) {
if (includeAll || subscription.canSendNodeEvents()) {
jids.add(subscription.getJID());
}
}
......@@ -1655,8 +1721,7 @@ public abstract class Node {
}
}
notification.setTo(subscriberJID);
service.send(notification);
service.sendNotification(this, notification, subscriberJID);
if (headers != null) {
// Remove the added child element that includes subscription IDs information
......@@ -1699,8 +1764,21 @@ public abstract class Node {
// User has to configure the subscription to make it active
subState = NodeSubscription.State.unconfigured;
}
// Generate a subscription ID (override even if one was sent by the client)
String id = StringUtils.randomString(40);
// Create new subscription
NodeSubscription subscription = addSubscription(owner, subscriber, subState, options);
NodeSubscription subscription =
new NodeSubscription(service, this, owner, subscriber, subState, id);
// Configure the subscription with the specified configuration (if any)
if (options != null) {
subscription.configure(options);
}
addSubscription(subscription);
if (savedToDB) {
// Add the new subscription to the database
PubSubPersistenceManager.saveSubscription(service, this, subscription, true);
}
if (originalIQ != null) {
// Reply with subscription and affiliation status indicating if subscription
......@@ -1708,8 +1786,14 @@ public abstract class Node {
subscription.sendSubscriptionState(originalIQ);
}
// If subscription is pending then send notification to node owners asking to approve
// new subscription
if (subscription.isAuthorizationPending()) {
subscription.sendAuthorizationRequest();
}
// Send last published item (if node is leaf node and subscription status is ok)
if (isSendItemSubscribe()) {
if (isSendItemSubscribe() && subscription.isActive()) {
PublishedItem lastItem = getLastPublishedItem();
if (lastItem != null) {
subscription.sendLastPublishedItem(lastItem);
......@@ -1779,6 +1863,28 @@ public abstract class Node {
return Collections.emptyList();
}
/**
* Returns a list with the subscriptions to the node that are pending to be approved by
* a node owner. If the node is not using the access model
* {@link org.jivesoftware.wildfire.pubsub.models.AuthorizeAccess} then the result will
* be an empty collection.
*
* @return a list with the subscriptions to the node that are pending to be approved by
* a node owner.
*/
public Collection<NodeSubscription> getPendingSubscriptions() {
if (accessModel.isAuthorizationRequired()) {
List<NodeSubscription> pendingSubscriptions = new ArrayList<NodeSubscription>();
for (NodeSubscription subscription : subscriptionsByID.values()) {
if (subscription.isAuthorizationPending()) {
pendingSubscriptions.add(subscription);
}
}
return pendingSubscriptions;
}
return Collections.emptyList();
}
public String toString() {
return super.toString() + " - ID: " + getNodeID();
}
......@@ -1795,6 +1901,30 @@ public abstract class Node {
return null;
}
/**
* Approves or cancels a subscriptions that was pending to be approved by a node owner.
* Subscriptions that were not approved will be deleted. Approved subscriptions will be
* activated (i.e. will be able to receive event notifications) as long as the subscriber
* is not required to configure the subscription.
*
* @param subscription the subscriptions that was approved or rejected.
* @param approved true when susbcription was approved. Otherwise the subscription was rejected.
*/
public void approveSubscription(NodeSubscription subscription, boolean approved) {
if (!subscription.isAuthorizationPending()) {
// Do nothing if the subscription is no longer pending
return;
}
if (approved) {
// Mark that the subscription to the node has been approved
subscription.approved();
}
else {
// Cancel the subscription to the node
cancelSubscription(subscription);
}
}
/**
* Policy that defines whether owners or publisher should receive replies to items.
*/
......
......@@ -11,9 +11,9 @@
package org.jivesoftware.wildfire.pubsub;
import org.dom4j.Element;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.dom4j.Element;
import java.util.*;
......@@ -86,7 +86,7 @@ public class NodeAffiliate {
for (List<NodeSubscription> nodeSubscriptions : itemsBySubs.keySet()) {
// Add items information
Element items = event.addElement("items");
items.addAttribute("node", node.getNodeID());
items.addAttribute("node", getNode().getNodeID());
for (PublishedItem publishedItem : itemsBySubs.get(nodeSubscriptions)) {
// Add item information to the event notification
Element item = items.addElement("item");
......@@ -96,6 +96,11 @@ public class NodeAffiliate {
if (node.isPayloadDelivered()) {
item.add(publishedItem.getPayload().createCopy());
}
// Add leaf node information if affiliated node and node
// where the item was published are different
if (node != getNode()) {
item.addAttribute("node", node.getNodeID());
}
}
// Send the event notification
sendEventNotification(notification, node, nodeSubscriptions);
......
......@@ -12,14 +12,14 @@
package org.jivesoftware.wildfire.pubsub;
import org.dom4j.Element;
import org.jivesoftware.util.FastDateFormat;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.FastDateFormat;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.IQ;
import org.xmpp.packet.Presence;
import java.text.ParseException;
......@@ -76,11 +76,6 @@ public class NodeSubscription {
* Current subscription state.
*/
private State state;
/**
* Flag that indicates if configuration is required by the node and is still pending to
* be configured by the subscriber.
*/
private boolean configurationPending = false;
/**
* Flag indicating whether an entity wants to receive or has disabled notifications.
*/
......@@ -130,9 +125,7 @@ public class NodeSubscription {
*/
private boolean savedToDB = false;
// TODO Do not send event notifications (e.g. node purge, node deleted) to unconfigured subscriptions ????
// TODO Send last published item when a subscription is authorized. We may need to move this logic to another place
// TODO Implement presence subscription from the node to the subscriber to figure out if event notifications can be sent
// TODO Implement presence subscription from the node to the subscriber to figure out if event notifications can be sent
static {
dateFormat = new SimpleDateFormat("yyyy-MM-DD'T'HH:mm:ss.SSS'Z'");
......@@ -158,11 +151,6 @@ public class NodeSubscription {
this.owner = owner;
this.state = state;
this.id = id;
if (node.isSubscriptionConfigurationRequired()) {
// Subscription configuration is required and it's still pending
setConfigurationPending(true);
}
}
/**
......@@ -224,23 +212,25 @@ public class NodeSubscription {
/**
* Returns true if configuration is required by the node and is still pending to
* be configured by the subscriber. Otherwise return false.
* be configured by the subscriber. Otherwise return false. Once a subscription is
* configured it might need to be approved by a node owner to become active.
*
* @return true if configuration is required by the node and is still pending to
* be configured by the subscriber.
*/
public boolean isConfigurationPending() {
return configurationPending;
return state == State.unconfigured;
}
/**
* Returns true if the subscription was approved by a node owner. Nodes that don't
* require node owners to approve subscription assume that all subscriptions are approved.
* Returns true if the subscription needs to be approved by a node owner to become
* active. Until the subscription is not activated the subscriber will not receive
* event notifications.
*
* @return true if the subscription was approved by a node owner.
* @return true if the subscription needs to be approved by a node owner to become active.
*/
public boolean isApproved() {
return State.subscribed == state;
public boolean isAuthorizationPending() {
return state == State.pending;
}
/**
......@@ -340,10 +330,6 @@ public class NodeSubscription {
return keyword;
}
void setConfigurationPending(boolean configurationPending) {
this.configurationPending = configurationPending;
}
void setShouldDeliverNotifications(boolean deliverNotifications) {
this.deliverNotifications = deliverNotifications;
}
......@@ -405,14 +391,22 @@ public class NodeSubscription {
// Return success response
service.send(IQ.createResultIQ(originalIQ));
}
// Send last published item if subscription is now configured (and authorized)
if (wasUnconfigured && !isConfigurationPending() && node.isSendItemSubscribe()) {
PublishedItem lastItem = node.getLastPublishedItem();
if (lastItem != null) {
sendLastPublishedItem(lastItem);
if (wasUnconfigured) {
// If subscription is pending then send notification to node owners
// asking to approve the now configured subscription
if (isAuthorizationPending()) {
sendAuthorizationRequest();
}
}
// Send last published item (if node is leaf node and subscription status is ok)
if (node.isSendItemSubscribe() && isActive()) {
PublishedItem lastItem = node.getLastPublishedItem();
if (lastItem != null) {
sendLastPublishedItem(lastItem);
}
}
}
}
void configure(DataForm options) {
......@@ -488,8 +482,13 @@ public class NodeSubscription {
fieldExists = false;
}
if (fieldExists) {
// Mark that the subscription has been configured
setConfigurationPending(false);
// Subscription has been configured so set the next state
if (node.getAccessModel().isAuthorizationRequired()) {
state = State.pending;
}
else {
state = State.subscribed;
}
}
}
if (savedToDB) {
......@@ -640,6 +639,32 @@ public class NodeSubscription {
return true;
}
/**
* Returns true if node events such as configuration changed or node purged can be
* sent to the subscriber.
*
* @return true if node events such as configuration changed or node purged can be
* sent to the subscriber.
*/
boolean canSendNodeEvents() {
// Check if the subscription is active
if (!isActive()) {
return false;
}
// Check if delivery of notifications is disabled
if (!shouldDeliverNotifications()) {
return false;
}
// Check if delivery is subject to presence-based policy
if (!getPresenceStates().isEmpty()) {
String show = service.getShowPresence(jid);
if (show == null || !getPresenceStates().contains(show)) {
return false;
}
}
return true;
}
/**
* Returns true if the published item matches the keyword filter specified in
* the subscription. If no keyword was specified then answer true.
......@@ -662,9 +687,9 @@ public class NodeSubscription {
*
* @return true if the subscription is active.
*/
private boolean isActive() {
public boolean isActive() {
// Check if subscription is approved and configured (if required)
if (!isApproved() || this.isConfigurationPending()) {
if (state != State.subscribed) {
return false;
}
// Check if the subscription has expired
......@@ -808,6 +833,56 @@ public class NodeSubscription {
return super.toString() + " - JID: " + getJID() + " - State: " + getState().name();
}
/**
* The subscription has been approved by a node owner. The subscription is now active so
* the subscriber is now allowed to get event notifications.
*/
void approved() {
if (state == State.subscribed) {
// Do nothing
return;
}
state = State.subscribed;
if (savedToDB) {
// Update the subscription in the backend store
PubSubPersistenceManager.saveSubscription(service, node, this, false);
}
// Send last published item (if node is leaf node and subscription status is ok)
if (node.isSendItemSubscribe() && isActive()) {
PublishedItem lastItem = node.getLastPublishedItem();
if (lastItem != null) {
sendLastPublishedItem(lastItem);
}
}
}
/**
* Sends an request to authorize the pending subscription to the specified owner.
*
* @param owner the JID of the user that will get the authorization request.
*/
public void sendAuthorizationRequest(JID owner) {
Message authRequest = new Message();
authRequest.addExtension(node.getAuthRequestForm(this));
authRequest.setTo(owner);
authRequest.setFrom(service.getAddress());
// Send authentication request to node owners
service.send(authRequest);
}
/**
* Sends an request to authorize the pending subscription to all owners. The first
* answer sent by a owner will be processed. Rest of the answers will be discarded.
*/
public void sendAuthorizationRequest() {
Message authRequest = new Message();
authRequest.addExtension(node.getAuthRequestForm(this));
// Send authentication request to node owners
service.broadcast(node, authRequest, node.getOwners());
}
/**
* Subscriptions to a node may exist in several states. Delivery of event notifications
* varies according to the subscription state of the user with the node.
......
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.pubsub;
import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.wildfire.commands.AdHocCommand;
import org.jivesoftware.wildfire.commands.SessionData;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.JID;
import java.util.Arrays;
import java.util.List;
/**
* Ad-hoc command that sends pending subscriptions to node owners.
*
* @author Matt Tucker
*/
class PendingSubscriptionsCommand extends AdHocCommand {
private PubSubService service;
PendingSubscriptionsCommand(PubSubService service) {
this.service = service;
}
protected void addStageInformation(SessionData data, Element command) {
DataForm form = new DataForm(DataForm.Type.form);
form.setTitle(LocaleUtils.getLocalizedString("pubsub.command.pending-subscriptions.title"));
form.addInstruction(
LocaleUtils.getLocalizedString("pubsub.command.pending-subscriptions.instruction"));
FormField formField = form.addField();
formField.setVariable("pubsub#node");
formField.setType(FormField.Type.list_single);
formField.setLabel(
LocaleUtils.getLocalizedString("pubsub.command.pending-subscriptions.node"));
for (Node node : service.getNodes()) {
if (!node.isCollectionNode() && node.isAdmin(data.getOwner())) {
formField.addOption(null, node.getNodeID());
}
}
// Add the form to the command
command.add(form.getElement());
}
public void execute(SessionData data, Element command) {
Element note = command.addElement("note");
List<String> nodeIDs = data.getData().get("pubsub#node");
if (nodeIDs.isEmpty()) {
// No nodeID was provided by the requester
note.addAttribute("type", "error");
note.setText(LocaleUtils.getLocalizedString(
"pubsub.command.pending-subscriptions.error.idrequired"));
}
else if (nodeIDs.size() > 1) {
// More than one nodeID was provided by the requester
note.addAttribute("type", "error");
note.setText(LocaleUtils.getLocalizedString(
"pubsub.command.pending-subscriptions.error.manyIDs"));
}
else {
Node node = service.getNode(nodeIDs.get(0));
if (node != null) {
if (node.isAdmin(data.getOwner())) {
note.addAttribute("type", "info");
note.setText(LocaleUtils.getLocalizedString(
"pubsub.command.pending-subscriptions.success"));
for (NodeSubscription subscription : node.getPendingSubscriptions()) {
subscription.sendAuthorizationRequest(data.getOwner());
}
}
else {
// Requester is not an admin of the specified node
note.addAttribute("type", "error");
note.setText(LocaleUtils.getLocalizedString(
"pubsub.command.pending-subscriptions.error.forbidden"));
}
}
else {
// Node with the specified nodeID was not found
note.addAttribute("type", "error");
note.setText(LocaleUtils.getLocalizedString(
"pubsub.command.pending-subscriptions.error.badid"));
}
}
}
public String getCode() {
return "http://jabber.org/protocol/pubsub#get-pending";
}
public String getDefaultLabel() {
return LocaleUtils.getLocalizedString("pubsub.command.pending-subscriptions.label");
}
protected List<Action> getActions(SessionData data) {
return Arrays.asList(Action.complete);
}
protected Action getExecuteAction(SessionData data) {
return Action.complete;
}
public int getMaxStages(SessionData data) {
return 1;
}
public boolean hasPermission(JID requester) {
// User has permission if he is an owner of at least one node or is a sysadmin
for (Node node : service.getNodes()) {
if (!node.isCollectionNode() && node.isAdmin(requester)) {
return true;
}
}
return false;
}
}
......@@ -18,6 +18,7 @@ import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.commands.AdHocCommandManager;
import org.jivesoftware.wildfire.pubsub.models.AccessModel;
import org.jivesoftware.wildfire.user.UserManager;
import org.xmpp.forms.DataForm;
......@@ -35,6 +36,10 @@ import java.util.concurrent.LinkedBlockingQueue;
public class PubSubEngine {
private PubSubService service;
/**
* Manager that keeps the list of ad-hoc commands and processing command requests.
*/
private AdHocCommandManager manager;
/**
* The time to elapse between each execution of the maintenance process. Default
* is 2 minutes.
......@@ -70,7 +75,9 @@ public class PubSubEngine {
public PubSubEngine(PubSubService pubSubService, PacketRouter router) {
this.service = pubSubService;
this.router = router;
// Initialize the ad-hoc commands manager to use for this pubsub service
manager = new AdHocCommandManager();
manager.addCommand(new PendingSubscriptionsCommand(service));
// Save or delete published items from the database every 2 minutes starting in
// 2 minutes (default values)
publishedItemTask = new PublishedItemTask();
......@@ -211,6 +218,11 @@ public class PubSubEngine {
sendErrorPacket(iq, PacketError.Condition.bad_request, null);
return true;
}
else if ("http://jabber.org/protocol/commands".equals(namespace)) {
// Process ad-hoc command
IQ reply = manager.process(iq);
router.route(reply);
}
return false;
}
......@@ -224,13 +236,53 @@ public class PubSubEngine {
}
/**
* Handles Message packets sent to the pubsub service.
* Handles Message packets sent to the pubsub service. Messages may be of type error
* when an event notification was sent to a susbcriber whose address is no longer available.<p>
*
* Answers to authorization requests sent to node owners to approve pending subscriptions
* will also be processed by this method.
*
* @param message the Message packet sent to the pubsub service.
*/
public void process(Message message) {
// TODO Process Messages of type error to identify possible subscribers that no longer exist
// See "Handling Notification-Related Errors" section
if (message.getType() == Message.Type.error) {
// See "Handling Notification-Related Errors" section
}
else if (message.getType() == Message.Type.normal) {
// Check that this is an answer to an authorization request
DataForm authForm = (DataForm) message.getExtension("x", "jabber:x:data");
if (authForm != null && authForm.getType() == DataForm.Type.submit) {
String formType = authForm.getField("FORM_TYPE").getValues().get(0);
// Check that completed data form belongs to an authorization request
if ("http://jabber.org/protocol/pubsub#subscribe_authorization".equals(formType)) {
String nodeID = authForm.getField("pubsub#node").getValues().get(0);
String subID = authForm.getField("pubsub#subid").getValues().get(0);
String allow = authForm.getField("pubsub#allow").getValues().get(0);
boolean approved;
if ("1".equals(allow) || "true".equals(allow)) {
approved = true;
}
else if ("0".equals(allow) || "false".equals(allow)) {
approved = false;
}
else {
// Unknown allow value. Ignore completed form
Log.warn("Invalid allow value in completed authorization form: " +
message.toXML());
return;
}
// Approve or cancel the pending subscription to the node
Node node = service.getNode(nodeID);
if (node != null) {
NodeSubscription subscription = node.getSubscription(subID);
if (subscription != null) {
node.approveSubscription(subscription, approved);
}
}
}
}
}
}
private void publishItemsToNode(IQ iq, Element publishElement) {
......@@ -1322,6 +1374,8 @@ public class PubSubEngine {
PubSubPersistenceManager.createPublishedItem(service, entry);
}
}
// Stop executing ad-hoc commands
manager.stop();
}
/*******************************************************************************
......
......@@ -28,7 +28,6 @@ import org.jivesoftware.wildfire.disco.DiscoItemsProvider;
import org.jivesoftware.wildfire.disco.DiscoServerItem;
import org.jivesoftware.wildfire.disco.ServerItemsProvider;
import org.jivesoftware.wildfire.forms.DataForm;
import org.jivesoftware.wildfire.forms.FormField;
import org.jivesoftware.wildfire.forms.spi.XDataFormImpl;
import org.jivesoftware.wildfire.forms.spi.XFormFieldImpl;
import org.jivesoftware.wildfire.pubsub.models.AccessModel;
......@@ -462,7 +461,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
Element identity = DocumentHelper.createElement("identity");
identity.addAttribute("category", "pubsub");
identity.addAttribute("name", "Publish-Subscribe service");
identity.addAttribute("type", "generic");
identity.addAttribute("type", "service");
identities.add(identity);
}
......@@ -485,26 +484,32 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
if (name == null && node == null) {
// Answer the features of the PubSub service
features.add("http://jabber.org/protocol/pubsub");
// Collection nodes are supported
features.add("http://jabber.org/protocol/pubsub#collections");
// Configuration of node options is supported
features.add("http://jabber.org/protocol/pubsub#config-node");
if (isCollectionNodesSupported()) {
// Collection nodes are supported
features.add("http://jabber.org/protocol/pubsub#collections");
}
// Configuration of node options is supported
features.add("http://jabber.org/protocol/pubsub#config-node");
// Simultaneous creation and configuration of nodes is supported
features.add("http://jabber.org/protocol/pubsub#create-and-configure");
// Creation of nodes is supported
features.add("http://jabber.org/protocol/pubsub#create-nodes");
// Any publisher (not only the originating publisher) may delete an item
features.add("http://jabber.org/protocol/pubsub#delete-any");
// Deletion of nodes is supported
features.add("http://jabber.org/protocol/pubsub#delete-nodes");
// Creation of instant nodes is supported
features.add("http://jabber.org/protocol/pubsub#instant-nodes");
// TODO Retrieval of pending subscription approvals is supported
//features.add("http://jabber.org/protocol/pubsub#get-pending");
if (isInstantNodeSupported()) {
// Creation of instant nodes is supported
features.add("http://jabber.org/protocol/pubsub#instant-nodes");
}
// Publishers may specify item identifiers
features.add("http://jabber.org/protocol/pubsub#item-ids");
// Time-based subscriptions are supported
// TODO Time-based subscriptions are supported (clean up thread missing, rest is supported)
//features.add("http://jabber.org/protocol/pubsub#leased-subscription");
// Node meta-data is supported
features.add("http://jabber.org/protocol/pubsub#meta-data");
// Node owners may modify affiliations
features.add("http://jabber.org/protocol/pubsub#modify-affiliations");
// A single entity may subscribe to a node multiple times
features.add("http://jabber.org/protocol/pubsub#multi-subscribe");
// The outcast affiliation is supported
......@@ -513,6 +518,8 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
features.add("http://jabber.org/protocol/pubsub#persistent-items");
// Presence-based delivery of event notifications is supported
features.add("http://jabber.org/protocol/pubsub#presence-notifications");
// Publishing items is supported (note: not valid for collection nodes)
features.add("http://jabber.org/protocol/pubsub#publish");
// The publisher affiliation is supported
features.add("http://jabber.org/protocol/pubsub#publisher-affiliation");
// Purging of nodes is supported
......@@ -527,12 +534,16 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
features.add("http://jabber.org/protocol/pubsub#subscribe");
// Configuration of subscription options is supported
features.add("http://jabber.org/protocol/pubsub#subscription-options");
// Default access model for nodes created on the service
String modelName = getDefaultNodeConfiguration(true).getAccessModel().getName();
features.add("http://jabber.org/protocol/pubsub#default_access_model_" + modelName);
}
else if (name == null && node != null) {
// Answer the features of a given node
// TODO lock the node while gathering this info???
Node pubNode = getNode(node);
if (node != null && canDiscoverNode(pubNode)) {
// Answer the features of the PubSub service
features.add("http://jabber.org/protocol/pubsub");
}
}
......@@ -542,35 +553,21 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
public XDataFormImpl getExtendedInfo(String name, String node, JID senderJID) {
if (name == null && node != null) {
// Answer the extended info of a given node
// TODO lock the node while gathering this info???
Node pubNode = getNode(node);
if (node != null && canDiscoverNode(pubNode)) {
XDataFormImpl dataForm = new XDataFormImpl(DataForm.TYPE_RESULT);
XFormFieldImpl field = new XFormFieldImpl("FORM_TYPE");
field.setType(FormField.TYPE_HIDDEN);
field.addValue("http://jabber.org/protocol/muc#roominfo");
dataForm.addField(field);
field = new XFormFieldImpl("muc#roominfo_description");
field.setLabel(LocaleUtils.getLocalizedString("muc.extended.info.desc"));
//field.addValue(room.getDescription());
dataForm.addField(field);
field = new XFormFieldImpl("muc#roominfo_subject");
field.setLabel(LocaleUtils.getLocalizedString("muc.extended.info.subject"));
//field.addValue(room.getSubject());
dataForm.addField(field);
// Get the metadata data form
org.xmpp.forms.DataForm metadataForm = pubNode.getMetadataForm();
field = new XFormFieldImpl("muc#roominfo_occupants");
field.setLabel(LocaleUtils.getLocalizedString("muc.extended.info.occupants"));
//field.addValue(Integer.toString(room.getOccupantsCount()));
dataForm.addField(field);
field = new XFormFieldImpl("x-muc#roominfo_creationdate");
field.setLabel(LocaleUtils.getLocalizedString("muc.extended.info.creationdate"));
//field.addValue(dateFormatter.format(room.getCreationDate()));
dataForm.addField(field);
// Convert Whack data form into old data form format (will go away someday)
XDataFormImpl dataForm = new XDataFormImpl(DataForm.TYPE_RESULT);
for (org.xmpp.forms.FormField formField : metadataForm.getFields()) {
XFormFieldImpl field = new XFormFieldImpl(formField.getVariable());
field.setLabel(formField.getLabel());
for (String value : formField.getValues()) {
field.addValue(value);
}
dataForm.addField(field);
}
return dataForm;
}
......@@ -592,13 +589,14 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
public Iterator<Element> getItems(String name, String node, JID senderJID) {
List<Element> answer = new ArrayList<Element>();
String serviceDomain = getServiceDomain();
if (name == null && node == null) {
Element item;
// Answer all public nodes as items
for (Node pubNode : nodes.values()) {
// Answer all first level nodes
for (Node pubNode : rootCollectionNode.getNodes()) {
if (canDiscoverNode(pubNode)) {
item = DocumentHelper.createElement("item");
item.addAttribute("jid", getServiceDomain());
item.addAttribute("jid", serviceDomain);
item.addAttribute("node", pubNode.getNodeID());
item.addAttribute("name", pubNode.getName());
answer.add(item);
......@@ -614,7 +612,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
for (Node nestedNode : pubNode.getNodes()) {
if (canDiscoverNode(nestedNode)) {
item = DocumentHelper.createElement("item");
item.addAttribute("jid", getServiceDomain());
item.addAttribute("jid", serviceDomain);
item.addAttribute("node", nestedNode.getNodeID());
item.addAttribute("name", nestedNode.getName());
answer.add(item);
......@@ -626,12 +624,16 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
Element item;
for (PublishedItem publishedItem : ((LeafNode) pubNode).getPublishedItems()) {
item = DocumentHelper.createElement("item");
item.addAttribute("jid", getServiceDomain());
item.addAttribute("jid", serviceDomain);
item.addAttribute("name", publishedItem.getID());
answer.add(item);
}
}
}
else {
// Answer null to indicate that specified item was not found
return null;
}
}
return answer.iterator();
}
......@@ -680,7 +682,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
}
private boolean canDiscoverNode(Node pubNode) {
return false; //TODO this
return true;
}
/**
......
......@@ -77,16 +77,16 @@ public class PubSubPersistenceManager {
"DELETE FROM pubsubAffiliation WHERE serviceID=? AND nodeID=?";
private static final String LOAD_SUBSCRIPTIONS =
"SELECT nodeID, id, jid, owner, state, confPending, deliver, digest, " +
"digest_frequency, expire, includeBody, showValues, subscriptionType, " +
"subscriptionDepth, keyword FROM pubsubSubscription WHERE serviceID=? ORDER BY nodeID";
"SELECT nodeID, id, jid, owner, state, deliver, digest, digest_frequency, " +
"expire, includeBody, showValues, subscriptionType, subscriptionDepth, " +
"keyword FROM pubsubSubscription WHERE serviceID=? ORDER BY nodeID";
private static final String ADD_SUBSCRIPTION =
"INSERT INTO pubsubSubscription (serviceID, nodeID, id, jid, owner, state, " +
"confPending, deliver, digest, digest_frequency, expire, includeBody, showValues, " +
"deliver, digest, digest_frequency, expire, includeBody, showValues, " +
"subscriptionType, subscriptionDepth, keyword) " +
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
private static final String UPDATE_SUBSCRIPTION =
"UPDATE pubsubSubscription SET owner=?, state=?, confPending=? deliver=?, digest=?, " +
"UPDATE pubsubSubscription SET owner=?, state=?, deliver=?, digest=?, " +
"digest_frequency=?, expire=?, includeBody=?, showValues=?, subscriptionType=?, " +
"subscriptionDepth=?, keyword=? WHERE serviceID=? AND nodeID=? AND id=?";
private static final String DELETE_SUBSCRIPTION =
......@@ -94,6 +94,9 @@ public class PubSubPersistenceManager {
private static final String DELETE_SUBSCRIPTIONS =
"DELETE FROM pubsubSubscription WHERE serviceID=? AND nodeID=?";
private static final String LOAD_ALL_ITEMS =
"SELECT id,jid,creationDate,payload,nodeID FROM pubsubItem " +
"WHERE serviceID=? ORDER BY creationDate";
private static final String LOAD_ITEMS =
"SELECT id,jid,creationDate,payload FROM pubsubItem " +
"WHERE serviceID=? AND nodeID=? ORDER BY creationDate";
......@@ -383,6 +386,17 @@ public class PubSubPersistenceManager {
loadSubscriptions(service, nodes, rs);
}
rs.close();
// TODO We may need to optimize memory consumption and load items on-demand
// Load published items of all nodes
pstmt = con.prepareStatement(LOAD_ALL_ITEMS);
pstmt.setString(1, service.getServiceID());
rs = pstmt.executeQuery();
// Add to each node the correspondiding subscriptions
while(rs.next()) {
loadItems(nodes, rs);
}
rs.close();
}
catch (SQLException sqle) {
Log.error(sqle);
......@@ -509,18 +523,17 @@ public class PubSubPersistenceManager {
NodeSubscription.State state = NodeSubscription.State.valueOf(rs.getString(5));
NodeSubscription subscription =
new NodeSubscription(service, node, owner, subscriber, state, subID);
subscription.setConfigurationPending(rs.getInt(6) == 1);
subscription.setShouldDeliverNotifications(rs.getInt(7) == 1);
subscription.setUsingDigest(rs.getInt(8) == 1);
subscription.setDigestFrequency(rs.getInt(9));
if (rs.getString(10) != null) {
subscription.setExpire(new Date(Long.parseLong(rs.getString(10).trim())));
subscription.setShouldDeliverNotifications(rs.getInt(6) == 1);
subscription.setUsingDigest(rs.getInt(7) == 1);
subscription.setDigestFrequency(rs.getInt(8));
if (rs.getString(9) != null) {
subscription.setExpire(new Date(Long.parseLong(rs.getString(9).trim())));
}
subscription.setIncludingBody(rs.getInt(11) == 1);
subscription.setPresenceStates(decodeWithComma(rs.getString(12)));
subscription.setType(NodeSubscription.Type.valueOf(rs.getString(13)));
subscription.setDepth(rs.getInt(14));
subscription.setKeyword(rs.getString(15));
subscription.setIncludingBody(rs.getInt(10) == 1);
subscription.setPresenceStates(decodeWithComma(rs.getString(11)));
subscription.setType(NodeSubscription.Type.valueOf(rs.getString(12)));
subscription.setDepth(rs.getInt(13));
subscription.setKeyword(rs.getString(14));
// Indicate the subscription that is has already been saved to the database
subscription.setSavedToDB(true);
node.addSubscription(subscription);
......@@ -530,6 +543,42 @@ public class PubSubPersistenceManager {
}
}
private static void loadItems(Map<String, Node> nodes, ResultSet rs) {
SAXReader xmlReader = null;
try {
// Get a sax reader from the pool
xmlReader = xmlReaders.take();
String nodeID = rs.getString(5);
LeafNode node = (LeafNode) nodes.get(nodeID);
if (node == null) {
Log.warn("Published Item found for a non-existent node: " + nodeID);
return;
}
String itemID = rs.getString(1);
JID publisher = new JID(rs.getString(2));
Date creationDate = new Date(Long.parseLong(rs.getString(3).trim()));
// Create the item
PublishedItem item = new PublishedItem(node, publisher, itemID, creationDate);
// Add the extra fields to the published item
if (rs.getString(4) != null) {
item.setPayload(
xmlReader.read(new StringReader(rs.getString(4))).getRootElement());
}
// Add the published item to the node
node.addPublishedItem(item);
}
catch (Exception sqle) {
Log.error(sqle);
}
finally {
// Return the sax reader to the pool
if (xmlReader != null) {
xmlReaders.add(xmlReader);
}
}
}
/**
* Update the DB with the new affiliation of the user in the node.
*
......@@ -638,22 +687,21 @@ public class PubSubPersistenceManager {
pstmt.setString(4, subscription.getJID().toString());
pstmt.setString(5, subscription.getOwner().toString());
pstmt.setString(6, subscription.getState().name());
pstmt.setInt(7, (subscription.isConfigurationPending() ? 1 : 0));
pstmt.setInt(8, (subscription.shouldDeliverNotifications() ? 1 : 0));
pstmt.setInt(9, (subscription.isUsingDigest() ? 1 : 0));
pstmt.setInt(10, subscription.getDigestFrequency());
pstmt.setInt(7, (subscription.shouldDeliverNotifications() ? 1 : 0));
pstmt.setInt(8, (subscription.isUsingDigest() ? 1 : 0));
pstmt.setInt(9, subscription.getDigestFrequency());
Date expireDate = subscription.getExpire();
if (expireDate == null) {
pstmt.setString(11, null);
pstmt.setString(10, null);
}
else {
pstmt.setString(11, StringUtils.dateToMillis(expireDate));
pstmt.setString(10, StringUtils.dateToMillis(expireDate));
}
pstmt.setInt(12, (subscription.isIncludingBody() ? 1 : 0));
pstmt.setString(13, encodeWithComma(subscription.getPresenceStates()));
pstmt.setString(14, subscription.getType().name());
pstmt.setInt(15, subscription.getDepth());
pstmt.setString(16, subscription.getKeyword());
pstmt.setInt(11, (subscription.isIncludingBody() ? 1 : 0));
pstmt.setString(12, encodeWithComma(subscription.getPresenceStates()));
pstmt.setString(13, subscription.getType().name());
pstmt.setInt(14, subscription.getDepth());
pstmt.setString(15, subscription.getKeyword());
pstmt.executeUpdate();
// Indicate the subscription that is has been saved to the database
subscription.setSavedToDB(true);
......@@ -672,25 +720,24 @@ public class PubSubPersistenceManager {
pstmt = con.prepareStatement(UPDATE_SUBSCRIPTION);
pstmt.setString(1, subscription.getOwner().toString());
pstmt.setString(2, subscription.getState().name());
pstmt.setInt(3, (subscription.isConfigurationPending() ? 1 : 0));
pstmt.setInt(4, (subscription.shouldDeliverNotifications() ? 1 : 0));
pstmt.setInt(5, (subscription.isUsingDigest() ? 1 : 0));
pstmt.setInt(6, subscription.getDigestFrequency());
pstmt.setInt(3, (subscription.shouldDeliverNotifications() ? 1 : 0));
pstmt.setInt(4, (subscription.isUsingDigest() ? 1 : 0));
pstmt.setInt(5, subscription.getDigestFrequency());
Date expireDate = subscription.getExpire();
if (expireDate == null) {
pstmt.setString(7, null);
pstmt.setString(6, null);
}
else {
pstmt.setString(7, StringUtils.dateToMillis(expireDate));
pstmt.setString(6, StringUtils.dateToMillis(expireDate));
}
pstmt.setInt(8, (subscription.isIncludingBody() ? 1 : 0));
pstmt.setString(9, encodeWithComma(subscription.getPresenceStates()));
pstmt.setString(10, subscription.getType().name());
pstmt.setInt(11, subscription.getDepth());
pstmt.setString(12, subscription.getKeyword());
pstmt.setString(13, service.getServiceID());
pstmt.setString(14, node.getNodeID());
pstmt.setString(15, subscription.getID());
pstmt.setInt(7, (subscription.isIncludingBody() ? 1 : 0));
pstmt.setString(8, encodeWithComma(subscription.getPresenceStates()));
pstmt.setString(9, subscription.getType().name());
pstmt.setInt(10, subscription.getDepth());
pstmt.setString(11, subscription.getKeyword());
pstmt.setString(12, service.getServiceID());
pstmt.setString(13, node.getNodeID());
pstmt.setString(14, subscription.getID());
pstmt.executeUpdate();
}
}
......
......@@ -34,6 +34,13 @@ import java.util.Collection;
*/
public interface PubSubService {
/**
* Returns the XMPP address of the service.
*
* @return the XMPP address of the service.
*/
JID getAddress();
/**
* Returns a String that uniquely identifies this pubsub service. This information is
* being used when storing node information in the database so it's possible to have
......
......@@ -11,14 +11,14 @@
package org.jivesoftware.wildfire.pubsub.models;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.jivesoftware.wildfire.pubsub.Node;
import org.jivesoftware.wildfire.pubsub.NodeSubscription;
import org.jivesoftware.wildfire.pubsub.NodeAffiliate;
import org.jivesoftware.wildfire.pubsub.NodeSubscription;
import org.xmpp.packet.JID;
import org.xmpp.packet.PacketError;
import org.dom4j.Element;
import org.dom4j.DocumentHelper;
import org.dom4j.QName;
/**
* Subscription requests must be approved and only subscribers may retrieve items.
......@@ -43,7 +43,7 @@ public class AuthorizeAccess extends AccessModel {
// Any subscription of this entity that was approved will give him access
// to retrieve the node items
for (NodeSubscription subscription : nodeAffiliate.getSubscriptions()) {
if (subscription.isApproved()) {
if (subscription.isActive()) {
return true;
}
}
......
......@@ -37,7 +37,7 @@ public class OnlySubscribers extends PublisherModel {
}
// Grant access if at least one subscription of this user was approved
for (NodeSubscription subscription : nodeAffiliate.getSubscriptions()) {
if (subscription.isApproved()) {
if (subscription.isActive()) {
return true;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment