Commit 7f587e62 authored by Matt Tucker's avatar Matt Tucker Committed by matt

Additional pub-sub work.

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@3696 b35dd754-fafc-0310-a699-88a17e54d16e
parent c22240a1
......@@ -182,6 +182,13 @@ public class CollectionNode extends Node {
broadcastCollectionNodeEvent(child, message);
}
protected void deletingNode() {
// Update child nodes to use the parent node of this node as the new parent node
for (Node node : getNodes()) {
node.changeParent(parent);
}
}
private void broadcastCollectionNodeEvent(Node child, Message notification) {
// Get affected subscriptions (of this node and all parent nodes)
Collection<NodeSubscription> subscriptions = new ArrayList<NodeSubscription>();
......
......@@ -103,15 +103,16 @@ public class LeafNode extends Node {
maxPublishedItems = values.size() > 0 ? Integer.parseInt(values.get(0)) : 50;
}
}
// Remove stored published items based on the new max items
while (!publishedItems.isEmpty() && publishedItems.size() > maxPublishedItems) {
PublishedItem removedItem = publishedItems.remove(0);
itemsByID.remove(removedItem.getID());
// Add the removed item to the queue of items to delete from the database. The
// queue is going to be processed by another thread
service.getPubSubEngine().queueItemToRemove(removedItem);
synchronized (publishedItems) {
// Remove stored published items based on the new max items
while (!publishedItems.isEmpty() && publishedItems.size() > maxPublishedItems) {
PublishedItem removedItem = publishedItems.remove(0);
itemsByID.remove(removedItem.getID());
// Add the removed item to the queue of items to delete from the database. The
// queue is going to be processed by another thread
service.getPubSubEngine().queueItemToRemove(removedItem);
}
}
}
protected void addFormFields(DataForm form, boolean isEditing) {
......@@ -152,6 +153,19 @@ public class LeafNode extends Node {
}
protected void deletingNode() {
synchronized (publishedItems) {
// Remove stored published items
while (!publishedItems.isEmpty()) {
PublishedItem removedItem = publishedItems.remove(0);
itemsByID.remove(removedItem.getID());
// Add the removed item to the queue of items to delete from the database. The
// queue is going to be processed by another thread
service.getPubSubEngine().queueItemToRemove(removedItem);
}
}
}
void addPublishedItem(PublishedItem item) {
synchronized (publishedItems) {
publishedItems.add(item);
......
......@@ -197,9 +197,10 @@ public abstract class Node {
* to become a node owner.
*
* @param jid the JID of the user being added as a node owner.
* @return the newly created or modified affiliation to the node.
*/
public void addOwner(JID jid) {
addAffiliation(jid, NodeAffiliate.Affiliation.owner);
public NodeAffiliate addOwner(JID jid) {
NodeAffiliate nodeAffiliate = addAffiliation(jid, NodeAffiliate.Affiliation.owner);
Collection<NodeSubscription> subscriptions = getSubscriptions(jid);
if (subscriptions.isEmpty()) {
// User does not have a subscription with the node so create a default one
......@@ -213,6 +214,7 @@ public abstract class Node {
}
}
}
return nodeAffiliate;
}
/**
......@@ -240,9 +242,10 @@ public abstract class Node {
* to become a node publisher.
*
* @param jid the JID of the user being added as a node publisher.
* @return the newly created or modified affiliation to the node.
*/
public void addPublisher(JID jid) {
addAffiliation(jid, NodeAffiliate.Affiliation.publisher);
public NodeAffiliate addPublisher(JID jid) {
NodeAffiliate nodeAffiliate = addAffiliation(jid, NodeAffiliate.Affiliation.publisher);
Collection<NodeSubscription> subscriptions = getSubscriptions(jid);
if (subscriptions.isEmpty()) {
// User does not have a subscription with the node so create a default one
......@@ -256,6 +259,7 @@ public abstract class Node {
}
}
}
return nodeAffiliate;
}
/**
......@@ -283,9 +287,10 @@ public abstract class Node {
* to become a none affiliate. Affiliates of type none are allowed to subscribe to the node.
*
* @param jid the JID of the user with affiliation "none".
* @return the newly created or modified affiliation to the node.
*/
public void addNoneAffiliation(JID jid) {
addAffiliation(jid, NodeAffiliate.Affiliation.none);
public NodeAffiliate addNoneAffiliation(JID jid) {
return addAffiliation(jid, NodeAffiliate.Affiliation.none);
}
/**
......@@ -293,11 +298,13 @@ public abstract class Node {
* able to publish or subscribe to the node. Existing subscriptions will be deleted.
*
* @param jid the JID of the user that is no longer able to publish or subscribe to the node.
* @return the newly created or modified affiliation to the node.
*/
public void addOutcast(JID jid) {
addAffiliation(jid, NodeAffiliate.Affiliation.outcast);
public NodeAffiliate addOutcast(JID jid) {
NodeAffiliate nodeAffiliate = addAffiliation(jid, NodeAffiliate.Affiliation.outcast);
// Delete existing subscriptions
removeSubscriptions(jid);
return nodeAffiliate;
}
/**
......@@ -309,14 +316,14 @@ public abstract class Node {
removeAffiliation(jid, NodeAffiliate.Affiliation.outcast);
}
private void addAffiliation(JID jid, NodeAffiliate.Affiliation affiliation) {
private NodeAffiliate addAffiliation(JID jid, NodeAffiliate.Affiliation affiliation) {
boolean created = false;
// Get the current affiliation of the specified JID
NodeAffiliate affiliate = getAffiliate(jid);
// Check if the user already has the same affiliation
if (affiliate != null && affiliation == affiliate.getAffiliation()) {
// Do nothing since the user already has the expected affiliation
return;
return affiliate;
}
else if (affiliate != null) {
// Update existing affiliation with new affiliation type
......@@ -334,6 +341,7 @@ public abstract class Node {
// Add or update the affiliate in the database
PubSubPersistenceManager.saveAffiliation(service, this, affiliate, created);
}
return affiliate;
}
private void removeAffiliation(JID jid, NodeAffiliate.Affiliation affiliation) {
......@@ -1188,6 +1196,67 @@ public abstract class Node {
return presenceBasedDelivery;
}
/**
* Returns true if notifications to the specified user will be delivered when the
* user is online.
*
* @param user the JID of the affiliate that has to be subscribed to the node.
* @return true if notifications are going to be delivered when the user is online.
*/
public boolean isPresenceBasedDelivery(JID user) {
Collection<NodeSubscription> subscriptions = getSubscriptions(user);
if (!subscriptions.isEmpty()) {
if (presenceBasedDelivery) {
// Node sends notifications only to only users so return true
return true;
}
else {
// Check if there is a subscription configured to only send notifications
// based on the user presence
for (NodeSubscription subscription : subscriptions) {
if (!subscription.getPresenceStates().isEmpty()) {
return true;
}
}
}
}
// User is not subscribed to the node so presence subscription is not required
return false;
}
/**
* Returns the JID of the affiliates that are receiving notifications based on their
* presence status.
*
* @return the JID of the affiliates that are receiving notifications based on their
* presence status.
*/
Collection<JID> getPresenceBasedSubscribers() {
Collection<JID> affiliatesJID = new ArrayList<JID>();
if (presenceBasedDelivery) {
// Add JID of all affiliates that are susbcribed to the node
for (NodeAffiliate affiliate : affiliates) {
if (!affiliate.getSubscriptions().isEmpty()) {
affiliatesJID.add(affiliate.getJID());
}
}
}
else {
// Add JID of those affiliates that have a subscription that only wants to be
// notified based on the subscriber presence
for (NodeAffiliate affiliate : affiliates) {
Collection<NodeSubscription> subscriptions = affiliate.getSubscriptions();
for (NodeSubscription subscription : subscriptions) {
if (!subscription.getPresenceStates().isEmpty()) {
affiliatesJID.add(affiliate.getJID());
break;
}
}
}
}
return affiliatesJID;
}
/**
* Returns true if the last published item is going to be sent to new subscribers.
*
......@@ -1676,11 +1745,7 @@ public abstract class Node {
if (parent != null) {
parent.removeChildNode(this);
}
// TODO Update child nodes to use the root node or the parent node of this node as the new parent node
for (Node node : getNodes()) {
//node.changeParent(parent);
}
// TODO Leaf nodes should remove queued items from the pubsub engine (subclass should do this work)
deletingNode();
// Broadcast delete notification to subscribers (if enabled)
if (isNotifiedOfDelete()) {
// Build packet to broadcast to subscribers
......@@ -1708,6 +1773,38 @@ public abstract class Node {
return false;
}
/**
* Notification message indicating that the node is being deleted. Subclasses should
* implement this method to delete any subclass specific information.
*/
protected abstract void deletingNode();
/**
* Changes the parent node of this node. The node ID of the node will not be modified
* based on the new parent so pubsub implementations where node ID has a semantic
* meaning will end up affecting the meaning of the node hierarchy and possibly messing
* up the meaning of the hierarchy.<p>
*
* No notifications are sent due to the new parent adoption process.
*
* @param newParent the new parent node of this node.
*/
protected void changeParent(CollectionNode newParent) {
if (parent != null) {
// Remove this node from the current parent node
parent.removeChildNode(this);
}
// Set the new parent of this node
parent = newParent;
if (parent != null) {
// Add this node to the new parent node
parent.addChildNode(this);
}
if (savedToDB) {
PubSubPersistenceManager.updateNode(service, this);
}
}
/**
* Unsubscribe from affiliates presences if node is only sending notifications to
* only users or only unsubscribe from those subscribers that configured their
......@@ -1746,12 +1843,25 @@ public abstract class Node {
Element childElement = iqRequest.getChildElement().createCopy();
reply.setChildElement(childElement);
for (NodeSubscription subscription : subscriptionsByID.values()) {
Element entity = childElement.addElement("entity");
entity.addAttribute("jid", subscription.getJID().toString());
entity.addAttribute("affiliation", subscription.getAffiliate().getAffiliation().name());
entity.addAttribute("subscription", subscription.getState().name());
entity.addAttribute("subid", subscription.getID());
for (NodeAffiliate affiliate : affiliates) {
Collection<NodeSubscription> subscriptions = affiliate.getSubscriptions();
if (subscriptions.isEmpty()) {
Element entity = childElement.addElement("entity");
entity.addAttribute("jid", affiliate.getJID().toString());
entity.addAttribute("affiliation", affiliate.getAffiliation().name());
entity.addAttribute("subscription", "none");
}
else {
for (NodeSubscription subscription : subscriptions) {
Element entity = childElement.addElement("entity");
entity.addAttribute("jid", subscription.getJID().toString());
entity.addAttribute("affiliation", affiliate.getAffiliation().name());
entity.addAttribute("subscription", subscription.getState().name());
if (isMultipleSubscriptionsEnabled()) {
entity.addAttribute("subid", subscription.getID());
}
}
}
}
}
......
......@@ -125,8 +125,6 @@ public class NodeSubscription {
*/
private boolean savedToDB = false;
// TODO Implement presence subscription from the node to the subscriber to figure out if event notifications can be sent
static {
dateFormat = new SimpleDateFormat("yyyy-MM-DD'T'HH:mm:ss.SSS'Z'");
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
......
......@@ -18,6 +18,8 @@ import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.XMPPServer;
import org.jivesoftware.wildfire.XMPPServerListener;
import org.jivesoftware.wildfire.commands.AdHocCommandManager;
import org.jivesoftware.wildfire.pubsub.models.AccessModel;
import org.jivesoftware.wildfire.user.UserManager;
......@@ -26,6 +28,7 @@ import org.xmpp.forms.FormField;
import org.xmpp.packet.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
......@@ -40,6 +43,15 @@ public class PubSubEngine {
* Manager that keeps the list of ad-hoc commands and processing command requests.
*/
private AdHocCommandManager manager;
/**
* Keep a registry of the presence's show value of users that subscribed to a node of
* the pubsub service and for which the node only delivers notifications for online users
* or node subscriptions deliver events based on the user presence show value. Offline
* users will not have an entry in the map. Note: Key-> bare JID and Value-> Map whose key
* is full JID of connected resource and value is show value of the last received presence.
*/
private Map<String, Map<String, String>> barePresences =
new ConcurrentHashMap<String, Map<String, String>>();
/**
* The time to elapse between each execution of the maintenance process. Default
* is 2 minutes.
......@@ -204,8 +216,13 @@ public class PubSubEngine {
}
action = childElement.element("entities");
if (action != null) {
// Owner requests all affiliated entities
getAffiliatedEntities(iq, action);
if (IQ.Type.get == iq.getType()) {
// Owner requests all affiliated entities
getAffiliatedEntities(iq, action);
}
else {
modifyAffiliations(iq, action);
}
return true;
}
action = childElement.element("purge");
......@@ -227,12 +244,37 @@ public class PubSubEngine {
}
/**
* Handles Presence packets sent to the pubsub service.
* Handles Presence packets sent to the pubsub service. Only process available and not
* available presences.
*
* @param presence the Presence packet sent to the pubsub service.
*/
public void process(Presence presence) {
// TODO Handle received presence of users the service has subscribed
if (presence.isAvailable()) {
JID subscriber = presence.getFrom();
Map<String, String> fullPresences = barePresences.get(subscriber.toBareJID());
if (fullPresences == null) {
synchronized (subscriber.toBareJID().intern()) {
fullPresences = barePresences.get(subscriber.toBareJID());
if (fullPresences == null) {
fullPresences = new ConcurrentHashMap<String, String>();
barePresences.put(subscriber.toBareJID(), fullPresences);
}
}
}
Presence.Show show = presence.getShow();
fullPresences.put(subscriber.toString(), show == null ? "online" : show.name());
}
else if (presence.getType() == Presence.Type.unavailable) {
JID subscriber = presence.getFrom();
Map<String, String> fullPresences = barePresences.get(subscriber.toBareJID());
if (fullPresences != null) {
fullPresences.remove(subscriber.toString());
if (fullPresences.isEmpty()) {
barePresences.remove(subscriber.toBareJID());
}
}
}
}
/**
......@@ -523,7 +565,7 @@ public class PubSubEngine {
}
}
}
if (nodeAffiliate != null && isNodeType) {
if (nodeAffiliate != null) {
for (NodeSubscription subscription : nodeAffiliate.getSubscriptions()) {
if (isNodeType) {
// User is requesting a subscription of type "nodes"
......@@ -955,7 +997,7 @@ public class PubSubEngine {
// Get sender of the IQ packet
JID from = iq.getFrom();
// Verify that sender has permissions to create nodes
if (!service.canCreateNode(from)) {
if (!service.canCreateNode(from) || !UserManager.getInstance().isRegisteredUser(from)) {
// The user is not allowed to create nodes so return an error
sendErrorPacket(iq, PacketError.Condition.forbidden, null);
return;
......@@ -1206,6 +1248,11 @@ public class PubSubEngine {
sendErrorPacket(iq, PacketError.Condition.forbidden, null);
return;
}
if (node.isRootCollectionNode()) {
// Root collection node cannot be deleted. Return not-allowed error
sendErrorPacket(iq, PacketError.Condition.not_allowed, null);
return;
}
// Delete the node
if (node.delete()) {
......@@ -1282,6 +1329,132 @@ public class PubSubEngine {
node.sendAffiliatedEntities(iq);
}
private void modifyAffiliations(IQ iq, Element entitiesElement) {
String nodeID = entitiesElement.attributeValue("node");
if (nodeID == null) {
// NodeID was not provided. Return bad-request error.
sendErrorPacket(iq, PacketError.Condition.bad_request, null);
return;
}
Node node = service.getNode(nodeID);
if (node == null) {
// Node does not exist. Return item-not-found error.
sendErrorPacket(iq, PacketError.Condition.item_not_found, null);
return;
}
if (!node.isAdmin(iq.getFrom())) {
// Requesting entity is prohibited from getting affiliates list. Return forbidden error.
sendErrorPacket(iq, PacketError.Condition.forbidden, null);
return;
}
IQ reply = IQ.createResultIQ(iq);
Collection<JID> invalidAffiliates = new ArrayList<JID>();
// Process modifications or creations of affiliations and subscriptions.
for (Iterator it = entitiesElement.elementIterator("entity"); it.hasNext();) {
Element entity = (Element) it.next();
JID subscriber = new JID(entity.attributeValue("jid"));
// TODO Assumed that the owner of the subscription is the bare JID of the subscription JID. Waiting StPeter answer for explicit field.
JID owner = new JID(subscriber.toBareJID());
String newAffiliation = entity.attributeValue("affiliation");
String subStatus = entity.attributeValue("subscription");
String subID = entity.attributeValue("subid");
if (newAffiliation != null) {
// Get current affiliation of this user (if any)
NodeAffiliate affiliate = node.getAffiliate(owner);
// Check that we are not removing the only owner of the node
if (affiliate != null && !affiliate.getAffiliation().name().equals(newAffiliation)) {
// Trying to modify an existing affiliation
if (affiliate.getAffiliation() == NodeAffiliate.Affiliation.owner &&
node.getOwners().size() == 1) {
// Trying to remove the unique owner of the node. Include in error answer.
invalidAffiliates.add(owner);
continue;
}
}
// Owner is setting affiliations for new entities or modifying
// existing affiliations
if ("owner".equals(newAffiliation)) {
node.addOwner(owner);
}
else if ("publisher".equals(newAffiliation)) {
node.addPublisher(owner);
}
else if ("none".equals(newAffiliation)) {
node.addNoneAffiliation(owner);
}
else {
node.addOutcast(owner);
}
}
// Process subscriptions changes
if (subStatus != null) {
// Get current subscription (if any)
NodeSubscription subscription = null;
if (node.isMultipleSubscriptionsEnabled()) {
if (subID != null) {
subscription = node.getSubscription(subID);
}
}
else {
subscription = node.getSubscription(subscriber);
}
if ("none".equals(subStatus) && subscription != null) {
// Owner is cancelling an existing subscription
node.cancelSubscription(subscription);
}
else if ("subscribed".equals(subStatus)) {
if (subscription != null) {
// Owner is approving a subscription (i.e. making active)
node.approveSubscription(subscription, true);
}
else {
// Owner is creating a subscription for an entity to the node
node.createSubscription(null, owner, subscriber, false, null);
}
}
}
}
// Process invalid entities that tried to remove node owners. Send original affiliation
// of the invalid entities.
if (!invalidAffiliates.isEmpty()) {
reply.setError(PacketError.Condition.not_acceptable);
Element child =
reply.setChildElement("pubsub", "http://jabber.org/protocol/pubsub#owner");
Element entities = child.addElement("entities");
if (!node.isRootCollectionNode()) {
entities.addAttribute("node", node.getNodeID());
}
for (JID affiliateJID : invalidAffiliates) {
NodeAffiliate affiliate = node.getAffiliate(affiliateJID);
Collection<NodeSubscription> subscriptions = affiliate.getSubscriptions();
if (subscriptions.isEmpty()) {
Element entity = entities.addElement("entity");
entity.addAttribute("jid", affiliate.getJID().toString());
entity.addAttribute("affiliation", affiliate.getAffiliation().name());
entity.addAttribute("subscription", "none");
}
else {
for (NodeSubscription subscription : subscriptions) {
Element entity = entities.addElement("entity");
entity.addAttribute("jid", subscription.getJID().toString());
entity.addAttribute("affiliation", affiliate.getAffiliation().name());
entity.addAttribute("subscription", subscription.getState().name());
if (node.isMultipleSubscriptionsEnabled()) {
entity.addAttribute("subid", subscription.getID());
}
}
}
}
}
// Send reply
router.route(reply);
}
/**
* Terminates the subscription of the specified entity to all nodes hosted at the service.
* The affiliation with the node will be removed if the entity was not a node owner or
......@@ -1334,7 +1507,7 @@ public class PubSubEngine {
*
* @param packet the packet to be bounced.
*/
private void sendErrorPacket(IQ packet, PacketError.Condition error, Element pubsubError) {
void sendErrorPacket(IQ packet, PacketError.Condition error, Element pubsubError) {
IQ reply = IQ.createResultIQ(packet);
reply.setChildElement(packet.getChildElement().createCopy());
reply.setError(error);
......@@ -1403,6 +1576,29 @@ public class PubSubEngine {
return completedForm;
}
public void start() {
// Probe presences of users that this service has subscribed to (once the server
// has started)
XMPPServer.getInstance().addServerListener(new XMPPServerListener() {
public void serverStarted() {
Set<JID> affiliates = new HashSet<JID>();
for (Node node : service.getNodes()) {
affiliates.addAll(node.getPresenceBasedSubscribers());
}
for (JID jid : affiliates) {
// Send probe presence
Presence subscription = new Presence(Presence.Type.probe);
subscription.setTo(jid);
subscription.setFrom(service.getAddress());
service.send(subscription);
}
}
public void serverStopping() {
}
});
}
public void shutdown() {
// Stop te maintenance processes
timer.cancel();
......@@ -1425,6 +1621,89 @@ public class PubSubEngine {
manager.stop();
}
/*******************************************************************************
* Methods related to presence subscriptions to subscribers' presence.
******************************************************************************/
/**
* Returns the show values of the last know presence of all connected resources of the
* specified subscriber. When the subscriber JID is a bare JID then the answered collection
* will have many entries one for each connected resource. Moreover, if the user
* is offline then an empty collectin is returned. Available show status is represented
* by a <tt>online</tt> value. The rest of the possible show values as defined in RFC 3921.
*
* @param subscriber the JID of the subscriber. This is not the JID of the affiliate.
* @return an empty collection when offline. Otherwise, a collection with the show value
* of each connected resource.
*/
public Collection<String> getShowPresences(JID subscriber) {
Map<String, String> fullPresences = barePresences.get(subscriber.toBareJID());
if (fullPresences == null) {
// User is offline so return empty list
return Collections.emptyList();
}
if (subscriber.getResource() == null) {
// Subscriber used bared JID so return show value of all connected resources
return fullPresences.values();
}
else {
// Look for the show value using the full JID
String show = fullPresences.get(subscriber.toString());
if (show == null) {
// User at the specified resource is offline so return empty list
return Collections.emptyList();
}
// User is connected at specified resource so answer list with presence show value
return Arrays.asList(show);
}
}
/**
* Requests the pubsub service to subscribe to the presence of the user. If the service
* has already subscribed to the user's presence then do nothing.
*
* @param node the node that originated the subscription request.
* @param user the JID of the affiliate to subscribe to his presence.
*/
public void presenceSubscriptionNotRequired(Node node, JID user) {
// Check that no node is requiring to be subscribed to this user
for (Node hostedNode : service.getNodes()) {
if (hostedNode.isPresenceBasedDelivery(user)) {
// Do not unsubscribe since presence subscription is still required
return;
}
}
// Unscribe from the user presence
Presence subscription = new Presence(Presence.Type.unsubscribe);
subscription.setTo(user);
subscription.setFrom(service.getAddress());
service.send(subscription);
}
/**
* Requests the pubsub service to unsubscribe from the presence of the user. If the service
* was not subscribed to the user's presence or any node still requires to be subscribed to
* the user presence then do nothing.
*
* @param node the node that originated the unsubscription request.
* @param user the JID of the affiliate to unsubscribe from his presence.
*/
public void presenceSubscriptionRequired(Node node, JID user) {
Map<String, String> fullPresences = barePresences.get(user.toString());
if (fullPresences == null || fullPresences.isEmpty()) {
Presence subscription = new Presence(Presence.Type.subscribe);
subscription.setTo(user);
subscription.setFrom(service.getAddress());
service.send(subscription);
// Sending subscription requests based on received presences may generate
// that a sunscription request is sent to an offline user (since offline
// presences are not stored in "barePresences"). However, this not optimal
// algorithm shouldn't bother the user since the user's server should reply
// when already subscribed to the user's presence instead of asking the user
// to accept the subscription request
}
}
/*******************************************************************************
* Methods related to PubSub maintenance tasks. Such as
* saving or deleting published items.
......
......@@ -101,16 +101,6 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
*/
private PubSubEngine engine = null;
/**
* Keep a registry of the presence's show value of users that subscribed to a node of
* the pubsub service and for which the node only delivers notifications for online users
* or node subscriptions deliver events based on the user presence show value. Offline
* users will not have an entry in the map. Note: Key-> bare JID and Value-> Map whose key
* is full JID of connected resource and value is show value of the last received presence.
*/
private Map<String, Map<String, String>> barePresences =
new ConcurrentHashMap<String, Map<String, String>>();
public PubSubModule() {
super("Publish Subscribe Service");
}
......@@ -179,7 +169,8 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
}
}
else {
// TODO Handle unknown namespace
// Unknown namespace requested so return error to sender
engine.sendErrorPacket(iq, PacketError.Condition.service_unavailable, null);
}
}
......@@ -193,7 +184,6 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
// The user is not allowed to create nodes
return false;
}
// TODO Check that the user is not an anonymous user
return true;
}
......@@ -221,39 +211,15 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
}
public Collection<String> getShowPresences(JID subscriber) {
Map<String, String> fullPresences = barePresences.get(subscriber.toBareJID());
if (fullPresences == null) {
// User is offline so return empty list
return Collections.emptyList();
}
if (subscriber.getResource() == null) {
// Subscriber used bared JID so return show value of all connected resources
return fullPresences.values();
}
else {
// Look for the show value using the full JID
String show = fullPresences.get(subscriber.toString());
if (show == null) {
// User at the specified resource is offline so return empty list
return Collections.emptyList();
}
// User is connected at specified resource so answer list with presence show value
return Arrays.asList(show);
}
return engine.getShowPresences(subscriber);
}
public void presenceSubscriptionNotRequired(Node node, JID user) {
// TODO Implement this
engine.presenceSubscriptionNotRequired(node, user);
}
public void presenceSubscriptionRequired(Node node, JID user) {
Map<String, String> fullPresences = barePresences.get(user.toString());
if (fullPresences == null || fullPresences.isEmpty()) {
Presence subscription = new Presence(Presence.Type.subscribe);
subscription.setTo(user);
subscription.setFrom(getAddress());
send(subscription);
}
engine.presenceSubscriptionRequired(node, user);
}
public PubSubEngine getPubSubEngine() {
......@@ -442,7 +408,8 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
super.start();
// Add the route to this service
routingTable.addRoute(getAddress(), this);
// TODO Probe presences of users that this service has subscribed to
// Start the pubsub engine
engine.start();
ArrayList<String> params = new ArrayList<String>();
params.clear();
params.add(getServiceDomain());
......@@ -500,10 +467,10 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
identities.add(identity);
}
else if (name == null && node != null) {
else if (name == null) {
// Answer the identity of a given node
Node pubNode = getNode(node);
if (node != null && canDiscoverNode(pubNode)) {
if (canDiscoverNode(pubNode)) {
Element identity = DocumentHelper.createElement("identity");
identity.addAttribute("category", "pubsub");
identity.addAttribute("type", pubNode.isCollectionNode() ? "collection" : "leaf");
......@@ -531,8 +498,8 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
features.add("http://jabber.org/protocol/pubsub#create-nodes");
// Deletion of nodes is supported
features.add("http://jabber.org/protocol/pubsub#delete-nodes");
// TODO Retrieval of pending subscription approvals is supported
//features.add("http://jabber.org/protocol/pubsub#get-pending");
// Retrieval of pending subscription approvals is supported
features.add("http://jabber.org/protocol/pubsub#get-pending");
if (isInstantNodeSupported()) {
// Creation of instant nodes is supported
features.add("http://jabber.org/protocol/pubsub#instant-nodes");
......@@ -574,10 +541,10 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
features.add("http://jabber.org/protocol/pubsub#default_access_model_" + modelName);
}
else if (name == null && node != null) {
else if (name == null) {
// Answer the features of a given node
Node pubNode = getNode(node);
if (node != null && canDiscoverNode(pubNode)) {
if (canDiscoverNode(pubNode)) {
// Answer the features of the PubSub service
features.add("http://jabber.org/protocol/pubsub");
}
......@@ -589,7 +556,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
if (name == null && node != null) {
// Answer the extended info of a given node
Node pubNode = getNode(node);
if (node != null && canDiscoverNode(pubNode)) {
if (canDiscoverNode(pubNode)) {
// Get the metadata data form
org.xmpp.forms.DataForm metadataForm = pubNode.getMetadataForm();
......@@ -615,7 +582,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
// We always have info about the Pubsub service
return true;
}
else if (name == null && node != null) {
else if (name == null) {
// We only have info if the node exists
return hasNode(node);
}
......@@ -638,7 +605,7 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
}
}
}
else if (name == null && node != null) {
else if (name == null) {
Node pubNode = getNode(node);
if (pubNode != null && canDiscoverNode(pubNode)) {
if (pubNode.isCollectionNode()) {
......
......@@ -152,11 +152,11 @@ public class PubSubPersistenceManager {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(ADD_NODE);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setInt(3, (node.isCollectionNode() ? 0 : 1));
pstmt.setString(4, StringUtils.dateToMillis(node.getCreationDate()));
pstmt.setString(5, StringUtils.dateToMillis(node.getModificationDate()));
pstmt.setString(6, node.getParent() != null ? node.getParent().getNodeID() : null);
pstmt.setString(6, node.getParent() != null ? encodeNodeID(node.getParent().getNodeID()) : null);
pstmt.setInt(7, (node.isPayloadDelivered() ? 1 : 0));
if (!node.isCollectionNode()) {
pstmt.setInt(8, ((LeafNode) node).getMaxPayloadSize());
......@@ -230,7 +230,7 @@ public class PubSubPersistenceManager {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(UPDATE_NODE);
pstmt.setString(1, StringUtils.dateToMillis(node.getModificationDate()));
pstmt.setString(2, node.getParent() != null ? node.getParent().getNodeID() : null);
pstmt.setString(2, node.getParent() != null ? encodeNodeID(node.getParent().getNodeID()) : null);
pstmt.setInt(3, (node.isPayloadDelivered() ? 1 : 0));
if (!node.isCollectionNode()) {
pstmt.setInt(4, ((LeafNode) node).getMaxPayloadSize());
......@@ -278,7 +278,7 @@ public class PubSubPersistenceManager {
pstmt.setInt(29, 0);
}
pstmt.setString(30, service.getServiceID());
pstmt.setString(31, node.getNodeID());
pstmt.setString(31, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
}
catch (SQLException sqle) {
......@@ -308,28 +308,28 @@ public class PubSubPersistenceManager {
// Remove the affiliate from the table of node affiliates
pstmt = con.prepareStatement(DELETE_NODE);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
pstmt.close();
// Remove published items of the node being deleted
pstmt = con.prepareStatement(DELETE_ITEMS);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
pstmt.close();
// Remove all affiliates from the table of node affiliates
pstmt = con.prepareStatement(DELETE_AFFILIATIONS);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
pstmt.close();
// Remove users that were subscribed to the node
pstmt = con.prepareStatement(DELETE_SUBSCRIPTIONS);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
}
catch (SQLException sqle) {
......@@ -420,11 +420,11 @@ public class PubSubPersistenceManager {
private static void loadNode(PubSubService service, Map<String, Node> loadedNodes,
ResultSet rs) {
Node node = null;
Node node;
try {
String nodeID = rs.getString(1);
String nodeID = decodeNodeID(rs.getString(1));
boolean leaf = rs.getInt(2) == 1;
String parent = rs.getString(5);
String parent = decodeNodeID(rs.getString(5));
JID creator = new JID(rs.getString(24));
CollectionNode parentNode = null;
if (parent != null) {
......@@ -488,12 +488,11 @@ public class PubSubPersistenceManager {
catch (SQLException sqle) {
Log.error(sqle);
}
return;
}
private static void loadAffiliations(Map<String, Node> nodes, ResultSet rs) {
try {
String nodeID = rs.getString(1);
String nodeID = decodeNodeID(rs.getString(1));
Node node = nodes.get(nodeID);
if (node == null) {
Log.warn("Affiliations found for a non-existent node: " + nodeID);
......@@ -511,7 +510,7 @@ public class PubSubPersistenceManager {
private static void loadSubscriptions(PubSubService service, Map<String, Node> nodes,
ResultSet rs) {
try {
String nodeID = rs.getString(1);
String nodeID = decodeNodeID(rs.getString(1));
Node node = nodes.get(nodeID);
if (node == null) {
Log.warn("Subscription found for a non-existent node: " + nodeID);
......@@ -554,7 +553,7 @@ public class PubSubPersistenceManager {
// Get a sax reader from the pool
xmlReader = xmlReaders.take();
String nodeID = rs.getString(5);
String nodeID = decodeNodeID(rs.getString(5));
LeafNode node = (LeafNode) nodes.get(nodeID);
if (node == null) {
Log.warn("Published Item found for a non-existent node: " + nodeID);
......@@ -602,7 +601,7 @@ public class PubSubPersistenceManager {
// Add the user to the generic affiliations table
pstmt = con.prepareStatement(ADD_AFFILIATION);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(3, affiliate.getJID().toString());
pstmt.setString(4, affiliate.getAffiliation().name());
pstmt.executeUpdate();
......@@ -612,7 +611,7 @@ public class PubSubPersistenceManager {
pstmt = con.prepareStatement(UPDATE_AFFILIATION);
pstmt.setString(1, affiliate.getAffiliation().name());
pstmt.setString(2, service.getServiceID());
pstmt.setString(3, node.getNodeID());
pstmt.setString(3, encodeNodeID(node.getNodeID()));
pstmt.setString(4, affiliate.getJID().toString());
pstmt.executeUpdate();
}
......@@ -644,7 +643,7 @@ public class PubSubPersistenceManager {
// Remove the affiliate from the table of node affiliates
pstmt = con.prepareStatement(DELETE_AFFILIATION);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(3, affiliate.getJID().toString());
pstmt.executeUpdate();
}
......@@ -677,7 +676,7 @@ public class PubSubPersistenceManager {
// Add the subscription of the user to the database
pstmt = con.prepareStatement(ADD_SUBSCRIPTION);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(3, subscription.getID());
pstmt.setString(4, subscription.getJID().toString());
pstmt.setString(5, subscription.getOwner().toString());
......@@ -706,7 +705,7 @@ public class PubSubPersistenceManager {
// Remove the subscription of the user from the table
pstmt = con.prepareStatement(DELETE_SUBSCRIPTION);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(2, subscription.getID());
pstmt.executeUpdate();
}
......@@ -731,7 +730,7 @@ public class PubSubPersistenceManager {
pstmt.setInt(10, subscription.getDepth());
pstmt.setString(11, subscription.getKeyword());
pstmt.setString(12, service.getServiceID());
pstmt.setString(13, node.getNodeID());
pstmt.setString(13, encodeNodeID(node.getNodeID()));
pstmt.setString(14, subscription.getID());
pstmt.executeUpdate();
}
......@@ -764,7 +763,7 @@ public class PubSubPersistenceManager {
// Remove the affiliate from the table of node affiliates
pstmt = con.prepareStatement(DELETE_SUBSCRIPTION);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(3, subscription.getID());
pstmt.executeUpdate();
}
......@@ -796,7 +795,7 @@ public class PubSubPersistenceManager {
// Get published items of the specified node
pstmt = con.prepareStatement(LOAD_ITEMS);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
ResultSet rs = pstmt.executeQuery();
// Rebuild loaded published items
while(rs.next()) {
......@@ -846,7 +845,7 @@ public class PubSubPersistenceManager {
// Remove the published item from the database
pstmt = con.prepareStatement(ADD_ITEM);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, item.getNode().getNodeID());
pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
pstmt.setString(3, item.getID());
pstmt.setString(4, item.getPublisher().toString());
pstmt.setString(5, StringUtils.dateToMillis(item.getCreationDate()));
......@@ -883,7 +882,7 @@ public class PubSubPersistenceManager {
// Remove the published item from the database
pstmt = con.prepareStatement(DELETE_ITEM);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, item.getNode().getNodeID());
pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
pstmt.setString(3, item.getID());
pstmt.executeUpdate();
// Set that the item was successfully deleted from the database
......@@ -1079,14 +1078,14 @@ public class PubSubPersistenceManager {
PreparedStatement pstmt = null;
try {
pstmt = con.prepareStatement(LOAD_NODE);
pstmt.setString(1, nodeID);
pstmt.setString(1, encodeNodeID(nodeID));
ResultSet rs = pstmt.executeQuery();
if (!rs.next()) {
// No node was found for the specified nodeID so return null
return null;
}
boolean leaf = rs.getInt(1) == 1;
String parent = rs.getString(4);
String parent = decodeNodeID(rs.getString(4));
JID creator = new JID(rs.getString(20));
CollectionNode parentNode = null;
if (parent != null) {
......@@ -1159,7 +1158,7 @@ public class PubSubPersistenceManager {
pstmt.close();
pstmt = con.prepareStatement(LOAD_NODE_AFFILIATIONS);
pstmt.setString(1, node.getNodeID());
pstmt.setString(1, encodeNodeID(node.getNodeID()));
rs = pstmt.executeQuery();
while (rs.next()) {
NodeAffiliate affiliate = new NodeAffiliate(new JID(rs.getString(1)));
......@@ -1248,4 +1247,22 @@ public class PubSubPersistenceManager {
}
return decodedStrings;
}
private static String encodeNodeID(String nodeID) {
if (DbConnectionManager.getDatabaseType() == DbConnectionManager.DatabaseType.oracle &&
"".equals(nodeID)) {
// Oracle stores empty strings as null so return a string with a space
return " ";
}
return nodeID;
}
private static String decodeNodeID(String nodeID) {
if (DbConnectionManager.getDatabaseType() == DbConnectionManager.DatabaseType.oracle &&
" ".equals(nodeID)) {
// Oracle stores empty strings as null so convert them back to empty strings
return "";
}
return nodeID;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment