Commit 98d4a810 authored by Robin Collier's avatar Robin Collier Committed by rcollier

OF-39 Merge from dev branch

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@13034 b35dd754-fafc-0310-a699-88a17e54d16e
parent ab38739a
......@@ -23,12 +23,9 @@ package org.jivesoftware.openfire.pep;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TimeZone;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
......@@ -49,7 +46,6 @@ import org.jivesoftware.openfire.pubsub.PubSubEngine;
import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager;
import org.jivesoftware.openfire.pubsub.PubSubService;
import org.jivesoftware.openfire.pubsub.PublishedItem;
import org.jivesoftware.openfire.pubsub.PublishedItemTask;
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.openfire.roster.Roster;
......@@ -77,12 +73,6 @@ import org.xmpp.packet.PacketExtension;
*/
public class PEPService implements PubSubService, Cacheable {
/**
* Timer to save published items to the database or remove deleted or old
* items.
*/
private static final Timer timer = new Timer("PEP service maintenance");
/**
* Date format to use for time stamps in delayed event notifications.
*/
......@@ -134,16 +124,6 @@ public class PEPService implements PubSubService, Cacheable {
*/
private Map<String, Map<String, String>> barePresences = new ConcurrentHashMap<String, Map<String, String>>();
/**
* Queue that holds the items that need to be added to the database.
*/
private Queue<PublishedItem> itemsToAdd = new LinkedBlockingQueue<PublishedItem>(10000);
/**
* Queue that holds the items that need to be deleted from the database.
*/
private Queue<PublishedItem> itemsToDelete = new LinkedBlockingQueue<PublishedItem>(10000);
/**
* Manager that keeps the list of ad-hoc commands and processing command
* requests.
......@@ -155,17 +135,6 @@ public class PEPService implements PubSubService, Cacheable {
*/
private EntityCapabilitiesManager entityCapsManager = EntityCapabilitiesManager.getInstance();
/**
* The time to elapse between each execution of the maintenance process.
* Default is 2 minutes.
*/
private int items_task_timeout = 2 * 60 * 1000;
/**
* Task that saves or deletes published items from the database.
*/
private PublishedItemTask publishedItemTask;
static {
fastDateFormat = FastDateFormat.getInstance(JiveConstants.XMPP_DATETIME_FORMAT, TimeZone.getTimeZone("UTC"));
}
......@@ -184,13 +153,6 @@ public class PEPService implements PubSubService, Cacheable {
adHocCommandManager = new AdHocCommandManager();
adHocCommandManager.addCommand(new PendingSubscriptionsCommand(this));
// Save or delete published items from the database every 2 minutes
// starting in 2 minutes (default values)
publishedItemTask = new PublishedItemTask(this) {
};
timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout);
// Load default configuration for leaf nodes
leafDefaultConfiguration = PubSubPersistenceManager.loadDefaultConfiguration(this, true);
if (leafDefaultConfiguration == null) {
......@@ -551,50 +513,14 @@ public class PEPService implements PubSubService, Cacheable {
}
}
public void queueItemToAdd(PublishedItem newItem) {
PubSubEngine.queueItemToAdd(this, newItem);
}
public void queueItemToRemove(PublishedItem removedItem) {
PubSubEngine.queueItemToRemove(this, removedItem);
}
public Map<String, Map<String, String>> getBarePresences() {
return barePresences;
}
public Queue<PublishedItem> getItemsToAdd() {
return itemsToAdd;
}
public Queue<PublishedItem> getItemsToDelete() {
return itemsToDelete;
}
public AdHocCommandManager getManager() {
return adHocCommandManager;
}
public PublishedItemTask getPublishedItemTask() {
return publishedItemTask;
}
public void setPublishedItemTask(PublishedItemTask task) {
publishedItemTask = task;
}
public Timer getTimer() {
return timer;
}
public int getItemsTaskTimeout() {
return items_task_timeout;
}
public void setItemsTaskTimeout(int timeout) {
items_task_timeout = timeout;
}
public int getCachedSize() {
// Rather arbitrary. Don't use this for size-based eviction policies!
return 600;
......
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2005-2008 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.openfire.pep;
import org.jivesoftware.openfire.pubsub.PublishedItemTask;
/**
* TimerTask that unloads services from memory, after they have been expired
* from cache.
*
* @author Guus der Kinderen, guus.der.kinderen@gmail.com
*/
public class PublishedPEPServiceTask extends PublishedItemTask {
private final PEPServiceManager manager;
public PublishedPEPServiceTask(PEPService service, PEPServiceManager manager) {
super(service);
this.manager = manager;
}
@Override
public void run() {
// Somewhat of a hack to unload the PEPService after it has been removed
// from the cache. New scheduled packets will re-instate the service.
PEPService service = (PEPService) this.getService();
if (manager.hasCachedService(service.getAddress())) {
super.run();
} else {
manager.unload(service);
}
}
}
......@@ -28,6 +28,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils;
......@@ -49,6 +51,8 @@ import org.xmpp.packet.Message;
public class LeafNode extends Node {
private static final Logger Log = LoggerFactory.getLogger(LeafNode.class);
private static final String genIdSeed = UUID.randomUUID().toString();
private static final AtomicLong sequenceCounter = new AtomicLong();
/**
* Flag that indicates whether to persist items to storage. Note that when the
......@@ -76,8 +80,7 @@ public class LeafNode extends Node {
* not configured to persist items then the last published item will be kept. The list is
* sorted cronologically.
*/
protected final List<PublishedItem> publishedItems = new ArrayList<PublishedItem>();
protected Map<String, PublishedItem> itemsByID = new HashMap<String, PublishedItem>();
volatile private PublishedItem lastPublished;
// TODO Add checking of max payload size. Return <not-acceptable> plus a application specific error condition of <payload-too-big/>.
......@@ -125,13 +128,6 @@ public class LeafNode extends Node {
maxPublishedItems = values.size() > 0 ? Integer.parseInt(values.get(0)) : 50;
}
}
synchronized (publishedItems) {
// Remove stored published items based on the new max items
while (!publishedItems.isEmpty() && isMaxItemsReached())
{
removeItem(0);
}
}
}
@Override
......@@ -175,19 +171,10 @@ public class LeafNode extends Node {
@Override
protected void deletingNode() {
synchronized (publishedItems) {
// Remove stored published items
while (!publishedItems.isEmpty()) {
removeItem(0);
}
}
}
void addPublishedItem(PublishedItem item) {
synchronized (publishedItems) {
publishedItems.add(item);
itemsByID.put(item.getID(), item);
}
void setLastPublishedItem(PublishedItem item) {
lastPublished = item;
}
public int getMaxPayloadSize() {
......@@ -250,43 +237,21 @@ public class LeafNode extends Node {
itemID = item.attributeValue("id");
List entries = item.elements();
payload = entries.isEmpty() ? null : (Element) entries.get(0);
// Create a published item from the published data and add it to the node and the db
synchronized (publishedItems) {
// Make sure that the published item has a unique ID if NOT assigned by publisher
if (itemID == null) {
do {
itemID = StringUtils.randomString(15);
}
while (itemsByID.containsKey(itemID));
}
// Create a new published item
newItem = new PublishedItem(this, publisher, itemID, new Date());
newItem.setPayload(payload);
// Add the new item to the list of published items
newPublishedItems.add(newItem);
// Check and remove any existing items that have the matching ID,
// generated ID's won't match since we already checked.
PublishedItem duplicate = itemsByID.get(newItem.getID());
if (duplicate != null)
{
removeItem(findIndexById(duplicate.getID()));
}
// Add the published item to the list of items to persist (using another thread)
// but check that we don't exceed the limit. Remove oldest items if required.
while (!publishedItems.isEmpty() && isMaxItemsReached())
{
removeItem(0);
}
addPublishedItem(newItem);
// Add the new published item to the queue of items to add to the database. The
// queue is going to be processed by another thread
service.queueItemToAdd(newItem);
// Make sure that the published item has a unique ID if NOT assigned by publisher
if (itemID == null) {
itemID = genIdSeed + sequenceCounter.getAndIncrement();
}
// Create a new published item
newItem = new PublishedItem(this, publisher, itemID, new Date());
newItem.setPayload(payload);
// Add the new item to the list of published items
newPublishedItems.add(newItem);
setLastPublishedItem(newItem);
// Add the new published item to the queue of items to add to the database. The
// queue is going to be processed by another thread
PubSubPersistenceManager.savePublishedItem(newItem);
}
}
......@@ -307,31 +272,6 @@ public class LeafNode extends Node {
}
}
/**
* Must be called from code synchronized on publishedItems
*/
private int findIndexById(String id) {
for (int i=0; i<publishedItems.size(); i++)
{
PublishedItem item = publishedItems.get(i);
if (item.getID().equals(id))
return i;
}
return -1;
}
/**
* Must be called from code synchronized on publishedItems
*/
private void removeItem(int index) {
PublishedItem removedItem = publishedItems.remove(index);
itemsByID.remove(removedItem.getID());
// Add the removed item to the queue of items to delete from the database. The
// queue is going to be processed by another thread
service.queueItemToRemove(removedItem);
}
/**
* Deletes the list of published items from the node. Event notifications may be sent to
* subscribers for the deleted items. When an affiliate has many subscriptions to the node,
......@@ -345,17 +285,9 @@ public class LeafNode extends Node {
* @param toDelete list of items that were deleted from the node.
*/
public void deleteItems(List<PublishedItem> toDelete) {
synchronized (publishedItems) {
for (PublishedItem item : toDelete) {
// Remove items to delete from memory
publishedItems.remove(item);
// Update fast look up cache of published items
itemsByID.remove(item.getID());
}
}
// Remove deleted items from the database
for (PublishedItem item : toDelete) {
service.queueItemToRemove(item);
PubSubPersistenceManager.removePublishedItem(item);
}
if (isNotifiedOfRetract()) {
// Broadcast notification deletion to subscribers
......@@ -414,42 +346,22 @@ public class LeafNode extends Node {
if (!isItemRequired()) {
return null;
}
synchronized (publishedItems) {
return itemsByID.get(itemID);
}
return PubSubPersistenceManager.getPublishedItem(this, itemID);
}
@Override
public List<PublishedItem> getPublishedItems() {
synchronized (publishedItems) {
return Collections.unmodifiableList(publishedItems);
}
return PubSubPersistenceManager.getPublishedItems(this, getMaxPublishedItems());
}
@Override
public List<PublishedItem> getPublishedItems(int recentItems) {
synchronized (publishedItems) {
int size = publishedItems.size();
if (recentItems > size) {
// User requested more items than the one the node has so return the current list
return Collections.unmodifiableList(publishedItems);
}
else {
// Return the number of recent items the user requested
List<PublishedItem> recent = publishedItems.subList(size - recentItems, size);
return new ArrayList<PublishedItem>(recent);
}
}
return PubSubPersistenceManager.getPublishedItems(this, recentItems);
}
@Override
public PublishedItem getLastPublishedItem() {
synchronized (publishedItems) {
if (publishedItems.isEmpty()) {
return null;
}
return publishedItems.get(publishedItems.size()-1);
}
return lastPublished;
}
/**
......@@ -484,38 +396,14 @@ public class LeafNode extends Node {
* published items will be deleted with the exception of the last published item.
*/
public void purge() {
List<PublishedItem> toDelete = null;
// Calculate items to delete
synchronized (publishedItems) {
if (publishedItems.size() > 1) {
// Remove all items except the last one
toDelete = new ArrayList<PublishedItem>(
publishedItems.subList(0, publishedItems.size() - 1));
// Remove items to delete from memory
publishedItems.removeAll(toDelete);
// Update fast look up cache of published items
itemsByID = new HashMap<String, PublishedItem>();
itemsByID.put(publishedItems.get(0).getID(), publishedItems.get(0));
}
}
if (toDelete != null) {
// Delete purged items from the database
for (PublishedItem item : toDelete) {
service.queueItemToRemove(item);
}
// Broadcast purge notification to subscribers
// Build packet to broadcast to subscribers
Message message = new Message();
Element event = message.addChildElement("event", "http://jabber.org/protocol/pubsub#event");
Element items = event.addElement("purge");
items.addAttribute("node", nodeID);
// Send notification that the node configuration has changed
broadcastNodeEvent(message, false);
}
}
private boolean isMaxItemsReached()
{
return (maxPublishedItems > -1 ) && (publishedItems.size() >= maxPublishedItems);
PubSubPersistenceManager.purgeNode(this);
// Broadcast purge notification to subscribers
// Build packet to broadcast to subscribers
Message message = new Message();
Element event = message.addChildElement("event", "http://jabber.org/protocol/pubsub#event");
Element items = event.addElement("purge");
items.addAttribute("node", nodeID);
// Send notification that the node configuration has changed
broadcastNodeEvent(message, false);
}
}
......@@ -348,7 +348,7 @@ public abstract class Node {
if (savedToDB) {
// Add or update the affiliate in the database
PubSubPersistenceManager.saveAffiliation(service, this, affiliate, created);
PubSubPersistenceManager.saveAffiliation(this, affiliate, created);
}
return affiliate;
}
......@@ -367,7 +367,7 @@ public abstract class Node {
affiliates.remove(affiliate);
if (savedToDB) {
// Remove the affiliate from the database
PubSubPersistenceManager.removeAffiliation(service, this, affiliate);
PubSubPersistenceManager.removeAffiliation(this, affiliate);
}
}
......@@ -1133,6 +1133,15 @@ public abstract class Node {
return false;
}
/**
* Returns the {@link PubSubService} to which this node belongs.
*
* @return the pubsub service.
*/
public PubSubService getService() {
return service;
}
/**
* Returns the unique identifier for a node within the context of a pubsub service.
*
......@@ -1698,16 +1707,16 @@ public abstract class Node {
public void saveToDB() {
// Make the room persistent
if (!savedToDB) {
PubSubPersistenceManager.createNode(service, this);
PubSubPersistenceManager.createNode(this);
// Set that the node is now in the DB
setSavedToDB(true);
// Save the existing node affiliates to the DB
for (NodeAffiliate affialiate : affiliates) {
PubSubPersistenceManager.saveAffiliation(service, this, affialiate, true);
PubSubPersistenceManager.saveAffiliation(this, affialiate, true);
}
// Add new subscriptions to the database
for (NodeSubscription subscription : subscriptionsByID.values()) {
PubSubPersistenceManager.saveSubscription(service, this, subscription, true);
PubSubPersistenceManager.saveSubscription(this, subscription, true);
}
// Add the new node to the list of available nodes
service.addNode(this);
......@@ -1717,7 +1726,7 @@ public abstract class Node {
}
}
else {
PubSubPersistenceManager.updateNode(service, this);
PubSubPersistenceManager.updateNode(this);
}
}
......@@ -1776,7 +1785,7 @@ public abstract class Node {
*/
public boolean delete() {
// Delete node from the database
if (PubSubPersistenceManager.removeNode(service, this)) {
if (PubSubPersistenceManager.removeNode(this)) {
// Remove this node from the parent node (if any)
if (parent != null) {
parent.removeChildNode(this);
......@@ -1837,7 +1846,7 @@ public abstract class Node {
parent.addChildNode(this);
}
if (savedToDB) {
PubSubPersistenceManager.updateNode(service, this);
PubSubPersistenceManager.updateNode(this);
}
}
......@@ -2018,7 +2027,7 @@ public abstract class Node {
if (savedToDB) {
// Add the new subscription to the database
PubSubPersistenceManager.saveSubscription(service, this, subscription, true);
PubSubPersistenceManager.saveSubscription(this, subscription, true);
}
if (originalIQ != null) {
......@@ -2072,7 +2081,7 @@ public abstract class Node {
}
if (savedToDB) {
// Remove the subscription from the database
PubSubPersistenceManager.removeSubscription(service, this, subscription);
PubSubPersistenceManager.removeSubscription(subscription);
}
// Check if we need to unsubscribe from the presence of the owner
if (isPresenceBasedDelivery() && getSubscriptions(subscription.getOwner()).isEmpty()) {
......
......@@ -514,7 +514,7 @@ public class NodeSubscription {
}
if (savedToDB) {
// Update the subscription in the backend store
PubSubPersistenceManager.saveSubscription(service, node, this, false);
PubSubPersistenceManager.saveSubscription(node, this, false);
}
// Check if the service needs to subscribe or unsubscribe from the owner presence
if (!node.isPresenceBasedDelivery() && wasUsingPresence != !presenceStates.isEmpty()) {
......@@ -878,7 +878,7 @@ public class NodeSubscription {
if (savedToDB) {
// Update the subscription in the backend store
PubSubPersistenceManager.saveSubscription(service, node, this, false);
PubSubPersistenceManager.saveSubscription(node, this, false);
}
// Send last published item (if node is leaf node and subscription status is ok)
......
......@@ -1732,23 +1732,7 @@ public class PubSubEngine {
}
public void shutdown(PubSubService service) {
// Stop the maintenance processes
service.getPublishedItemTask().cancel();
// Delete from the database items contained in the itemsToDelete queue
PublishedItem entry;
while (!service.getItemsToDelete().isEmpty()) {
entry = service.getItemsToDelete().poll();
if (entry != null) {
PubSubPersistenceManager.removePublishedItem(service, entry);
}
}
// Save to the database items contained in the itemsToAdd queue
while (!service.getItemsToAdd().isEmpty()) {
entry = service.getItemsToAdd().poll();
if (entry != null) {
PubSubPersistenceManager.createPublishedItem(service, entry);
}
}
PubSubPersistenceManager.shutdown();
// Stop executing ad-hoc commands
service.getManager().stop();
......@@ -1843,77 +1827,6 @@ public class PubSubEngine {
}
}
/*******************************************************************************
* Methods related to PubSub maintenance tasks. Such as
* saving or deleting published items.
******************************************************************************/
/**
* Schedules the maintenance task for repeated <i>fixed-delay execution</i>,
* beginning after the specified delay. Subsequent executions take place
* at approximately regular intervals separated by the specified period.
*
* @param service the PubSub service this action is to be performed for.
* @param timeout the new frequency of the maintenance task.
*/
void setPublishedItemTaskTimeout(PubSubService service, int timeout) {
int items_task_timeout = service.getItemsTaskTimeout();
if (items_task_timeout == timeout) {
return;
}
// Cancel the existing task because the timeout has changed
PublishedItemTask publishedItemTask = service.getPublishedItemTask();
if (publishedItemTask != null) {
publishedItemTask.cancel();
}
service.setItemsTaskTimeout(timeout);
// Create a new task and schedule it with the new timeout
service.setPublishedItemTask(new PublishedItemTask(service));
service.getTimer().schedule(publishedItemTask, items_task_timeout, items_task_timeout);
}
/**
* Adds the item to the queue of items to remove from the database. The queue is going
* to be processed by another thread.
*
* @param service the PubSub service this action is to be performed for.
* @param removedItem the item to remove from the database.
*/
public static void queueItemToRemove(PubSubService service, PublishedItem removedItem) {
// Remove the removed item from the queue of items to add to the database
if (!service.getItemsToAdd().remove(removedItem)) {
// The item is already present in the database so add the removed item
// to the queue of items to delete from the database
service.getItemsToDelete().add(removedItem);
}
}
/**
* Adds the item to the queue of items to add to the database. The queue is going
* to be processed by another thread.
*
* @param service the PubSub service this action is to be performed for.
* @param newItem the item to add to the database.
*/
public static void queueItemToAdd(PubSubService service, PublishedItem newItem) {
service.getItemsToAdd().add(newItem);
}
/**
* Cancels any queued operation for the specified list of items. This operation is
* usually required when a node was deleted so any pending operation of the node items
* should be cancelled.
*
* @param service the PubSub service this action is to be performed for.
* @param items the list of items to remove the from queues.
*/
void cancelQueuedItems(PubSubService service, Collection<PublishedItem> items) {
for (PublishedItem item : items) {
service.getItemsToAdd().remove(item);
service.getItemsToDelete().remove(item);
}
}
/**
* Checks to see if the jid given is a component by looking at the routing
* table. Similar to {@link InternalComponentManager#hasComponent(JID)}.
......
......@@ -100,36 +100,11 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
private Map<String, Map<String, String>> barePresences =
new ConcurrentHashMap<String, Map<String, String>>();
/**
* Queue that holds the items that need to be added to the database.
*/
private Queue<PublishedItem> itemsToAdd = new LinkedBlockingQueue<PublishedItem>(10000);
/**
* Queue that holds the items that need to be deleted from the database.
*/
private Queue<PublishedItem> itemsToDelete = new LinkedBlockingQueue<PublishedItem>(10000);
/**
* Manager that keeps the list of ad-hoc commands and processing command requests.
*/
private AdHocCommandManager manager;
/**
* The time to elapse between each execution of the maintenance process. Default
* is 2 minutes.
*/
private int items_task_timeout = 2 * 60 * 1000;
/**
* Task that saves or deletes published items from the database.
*/
private PublishedItemTask publishedItemTask;
/**
* Timer to save published items to the database or remove deleted or old items.
*/
private Timer timer = new Timer("PubSub maintenance");
/**
* Returns the permission policy for creating nodes. A true value means that not anyone can
* create a node, only the JIDs listed in <code>allowedToCreate</code> are allowed to create
......@@ -188,11 +163,6 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
// Initialize the ad-hoc commands manager to use for this pubsub service
manager = new AdHocCommandManager();
manager.addCommand(new PendingSubscriptionsCommand(this));
// Save or delete published items from the database every 2 minutes starting in
// 2 minutes (default values)
publishedItemTask = new PublishedItemTask(this);
timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout);
}
public void process(Packet packet) {
......@@ -303,14 +273,6 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
PubSubEngine.presenceSubscriptionRequired(this, node, user);
}
public void queueItemToAdd(PublishedItem newItem) {
PubSubEngine.queueItemToAdd(this, newItem);
}
public void queueItemToRemove(PublishedItem removedItem) {
PubSubEngine.queueItemToRemove(this, removedItem);
}
public String getServiceName() {
return serviceName;
}
......@@ -828,38 +790,10 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
return barePresences;
}
public Queue<PublishedItem> getItemsToAdd() {
return itemsToAdd;
}
public Queue<PublishedItem> getItemsToDelete() {
return itemsToDelete;
}
public AdHocCommandManager getManager() {
return manager;
}
public PublishedItemTask getPublishedItemTask() {
return publishedItemTask;
}
public void setPublishedItemTask(PublishedItemTask task) {
publishedItemTask = task;
}
public Timer getTimer() {
return timer;
}
public int getItemsTaskTimeout() {
return items_task_timeout;
}
public void setItemsTaskTimeout(int timeout) {
items_task_timeout = timeout;
}
public void propertySet(String property, Map<String, Object> params) {
if (property.equals("xmpp.pubsub.enabled")) {
boolean enabled = Boolean.parseBoolean((String)params.get("value"));
......
......@@ -27,17 +27,25 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.dom4j.io.SAXReader;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.database.DbConnectionManager.DatabaseType;
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LinkedList;
import org.jivesoftware.util.LinkedListNode;
import org.jivesoftware.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -50,7 +58,17 @@ import org.xmpp.packet.JID;
*/
public class PubSubPersistenceManager {
private static final Logger Log = LoggerFactory.getLogger(PubSubPersistenceManager.class);
private static final Logger log = LoggerFactory.getLogger(PubSubPersistenceManager.class);
private static final String PURGE_FOR_SIZE =
"DELETE ofPubsubItem FROM ofPubsubItem LEFT JOIN " +
"(SELECT id FROM ofPubsubItem WHERE serviceID=? AND nodeID=? " +
"ORDER BY creationDate DESC LIMIT ?) AS noDelete " +
"ON ofPubsubItem.id = noDelete.id WHERE noDelete.id IS NULL AND "
+ "ofPubsubItem.serviceID = ? AND nodeID = ?";
private static final String PURGE_FOR_SIZE_HSQLDB = "DELETE FROM ofPubsubItem WHERE serviceID=? AND nodeID=? AND id NOT IN "
+ "(SELECT id FROM ofPubsubItem WHERE serviceID=? AND nodeID=? ORDER BY creationDate DESC LIMIT ?)";
private static final String LOAD_NON_LEAF_NODES =
"SELECT nodeID, leaf, creationDate, modificationDate, parent, deliverPayloads, " +
......@@ -132,13 +150,15 @@ public class PubSubPersistenceManager {
"DELETE FROM ofPubsubSubscription WHERE serviceID=? AND nodeID=? AND id=?";
private static final String DELETE_SUBSCRIPTIONS =
"DELETE FROM ofPubsubSubscription WHERE serviceID=? AND nodeID=?";
private static final String LOAD_ALL_ITEMS =
"SELECT id,jid,creationDate,payload,nodeID FROM ofPubsubItem " +
"WHERE serviceID=? ORDER BY creationDate";
private static final String LOAD_ITEMS =
"SELECT id,jid,creationDate,payload FROM ofPubsubItem " +
"WHERE serviceID=? AND nodeID=? ORDER BY creationDate";
"WHERE serviceID=? AND nodeID=? ORDER BY creationDate DESC";
private static final String LOAD_ITEM =
"SELECT jid,creationDate,payload FROM ofPubsubItem " +
"WHERE serviceID=? AND nodeID=? AND id=?";
private static final String LOAD_LAST_ITEM =
"SELECT jid,creationDate,payload FROM ofPubsubItem " +
"WHERE serviceID=? AND nodeID=? AND creationDate IN (select MAX(creationDate) FROM ofPubsubItem GROUP BY serviceId,nodeId)";
private static final String ADD_ITEM =
"INSERT INTO ofPubsubItem (serviceID,nodeID,id,jid,creationDate,payload) " +
"VALUES (?,?,?,?,?,?)";
......@@ -166,8 +186,36 @@ public class PubSubPersistenceManager {
"accessModel, language, replyPolicy, associationPolicy, maxLeafNodes) " +
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
private static Timer flushTimer = new Timer("Pubsub item flush timer");
private static long flushTimerDelay = JiveGlobals.getIntProperty("xmpp.pubsub.flush.timer", 120) * 1000;
private static Timer purgeTimer = new Timer("Pubsub purge stale items timer");
private static long purgeTimerDelay = JiveGlobals.getIntProperty("xmpp.pubsub.purge.timer", 300) * 1000;
private static final int POOL_SIZE = 50;
private static final int ITEM_CACHE_SIZE = JiveGlobals.getIntProperty("xmpp.pubsub.flush.cache", 1000);
private static int publishedItemSize = 0;
private static final Object itemLock = new Object();
private static final int MAX_ROWS_FETCH = JiveGlobals.getIntProperty("xmpp.pubsub.fetch.max", 2000);
/**
* Queue that holds the items that need to be added to the database.
*/
private static LinkedList itemsToAdd = new LinkedList();
/**
* Queue that holds the items that need to be deleted from the database.
*/
private static LinkedList itemsToDelete = new LinkedList();
/**
* Keeps reference to published items that haven't been persisted yet so they can be removed
* before being deleted.
*/
private static final HashMap<String, LinkedListNode> itemMap = new HashMap<String, LinkedListNode>();
/**
* Pool of SAX Readers. SAXReader is not thread safe so we need to have a pool of readers.
*/
......@@ -180,6 +228,32 @@ public class PubSubPersistenceManager {
xmlReader.setEncoding("UTF-8");
xmlReaders.add(xmlReader);
}
// Enforce a min of 20s
if (flushTimerDelay < 20000)
flushTimerDelay = 20000;
flushTimer.schedule(new TimerTask()
{
@Override
public void run()
{
flushItems();
}
}, flushTimerDelay, flushTimerDelay);
// Enforce a min of 20s
if (purgeTimerDelay < 60000)
purgeTimerDelay = 60000;
purgeTimer.schedule(new TimerTask()
{
@Override
public void run()
{
purgeItems();
}
}, purgeTimerDelay, purgeTimerDelay);
}
/**
......@@ -188,14 +262,14 @@ public class PubSubPersistenceManager {
* @param service The pubsub service that is hosting the node.
* @param node The newly created node.
*/
public static void createNode(PubSubService service, Node node) {
public static void createNode(Node node) {
Connection con = null;
PreparedStatement pstmt = null;
boolean abortTransaction = false;
try {
con = DbConnectionManager.getTransactionConnection();
pstmt = con.prepareStatement(ADD_NODE);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setInt(3, (node.isCollectionNode() ? 0 : 1));
pstmt.setString(4, StringUtils.dateToMillis(node.getCreationDate()));
......@@ -245,10 +319,10 @@ public class PubSubPersistenceManager {
pstmt.executeUpdate();
// Save associated JIDs and roster groups
saveAssociatedElements(con, node, service);
saveAssociatedElements(con, node);
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
abortTransaction = true;
}
finally {
......@@ -263,7 +337,7 @@ public class PubSubPersistenceManager {
* @param service The pubsub service that is hosting the node.
* @param node The updated node.
*/
public static void updateNode(PubSubService service, Node node) {
public static void updateNode(Node node) {
Connection con = null;
PreparedStatement pstmt = null;
boolean abortTransaction = false;
......@@ -312,29 +386,29 @@ public class PubSubPersistenceManager {
pstmt.setString(23, null);
pstmt.setInt(24, 0);
}
pstmt.setString(25, service.getServiceID());
pstmt.setString(25, node.getService().getServiceID());
pstmt.setString(26, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
DbConnectionManager.fastcloseStmt(pstmt);
// Remove existing JIDs associated with the the node
pstmt = con.prepareStatement(DELETE_NODE_JIDS);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
DbConnectionManager.fastcloseStmt(pstmt);
// Remove roster groups associated with the the node being deleted
pstmt = con.prepareStatement(DELETE_NODE_GROUPS);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
// Save associated JIDs and roster groups
saveAssociatedElements(con, node, service);
saveAssociatedElements(con, node);
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
abortTransaction = true;
}
finally {
......@@ -343,27 +417,26 @@ public class PubSubPersistenceManager {
}
}
private static void saveAssociatedElements(Connection con, Node node,
PubSubService service) throws SQLException {
private static void saveAssociatedElements(Connection con, Node node) throws SQLException {
// Add new JIDs associated with the the node
PreparedStatement pstmt = con.prepareStatement(ADD_NODE_JIDS);
try {
for (JID jid : node.getContacts()) {
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(3, jid.toString());
pstmt.setString(4, "contacts");
pstmt.executeUpdate();
}
for (JID jid : node.getReplyRooms()) {
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(3, jid.toString());
pstmt.setString(4, "replyRooms");
pstmt.executeUpdate();
}
for (JID jid : node.getReplyTo()) {
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(3, jid.toString());
pstmt.setString(4, "replyTo");
......@@ -371,7 +444,7 @@ public class PubSubPersistenceManager {
}
if (node.isCollectionNode()) {
for (JID jid : ((CollectionNode) node).getAssociationTrusted()) {
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(3, jid.toString());
pstmt.setString(4, "associationTrusted");
......@@ -382,7 +455,7 @@ public class PubSubPersistenceManager {
// Add new roster groups associated with the the node
pstmt = con.prepareStatement(ADD_NODE_GROUPS);
for (String groupName : node.getRosterGroupsAllowed()) {
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(3, groupName);
pstmt.executeUpdate();
......@@ -400,7 +473,7 @@ public class PubSubPersistenceManager {
* @param node The node that is being deleted.
* @return true If the operation was successful.
*/
public static boolean removeNode(PubSubService service, Node node) {
public static boolean removeNode(Node node) {
Connection con = null;
PreparedStatement pstmt = null;
boolean abortTransaction = false;
......@@ -408,47 +481,47 @@ public class PubSubPersistenceManager {
con = DbConnectionManager.getTransactionConnection();
// Remove the affiliate from the table of node affiliates
pstmt = con.prepareStatement(DELETE_NODE);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
DbConnectionManager.fastcloseStmt(pstmt);
// Remove JIDs associated with the the node being deleted
pstmt = con.prepareStatement(DELETE_NODE_JIDS);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
DbConnectionManager.fastcloseStmt(pstmt);
// Remove roster groups associated with the the node being deleted
pstmt = con.prepareStatement(DELETE_NODE_GROUPS);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
DbConnectionManager.fastcloseStmt(pstmt);
// Remove published items of the node being deleted
pstmt = con.prepareStatement(DELETE_ITEMS);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
DbConnectionManager.closeStatement(pstmt);
// Remove all affiliates from the table of node affiliates
pstmt = con.prepareStatement(DELETE_AFFILIATIONS);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
DbConnectionManager.fastcloseStmt(pstmt);
// Remove users that were subscribed to the node
pstmt = con.prepareStatement(DELETE_SUBSCRIPTIONS);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.executeUpdate();
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
abortTransaction = true;
}
finally {
......@@ -529,19 +602,9 @@ public class PubSubPersistenceManager {
loadSubscriptions(service, nodes, rs);
}
DbConnectionManager.fastcloseStmt(rs, pstmt);
// TODO We may need to optimize memory consumption and load items on-demand
// Load published items of all nodes
pstmt = con.prepareStatement(LOAD_ALL_ITEMS);
pstmt.setString(1, service.getServiceID());
rs = pstmt.executeQuery();
// Add to each node the correspondiding subscriptions
while(rs.next()) {
loadItems(nodes, rs);
}
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
}
finally {
DbConnectionManager.closeConnection(rs, pstmt, con);
......@@ -557,8 +620,7 @@ public class PubSubPersistenceManager {
}
}
private static void loadNode(PubSubService service, Map<String, Node> loadedNodes,
ResultSet rs) {
private static void loadNode(PubSubService service, Map<String, Node> loadedNodes, ResultSet rs) {
Node node;
try {
String nodeID = decodeNodeID(rs.getString(1));
......@@ -571,7 +633,7 @@ public class PubSubPersistenceManager {
parentNode = (CollectionNode) loadedNodes.get(parent);
if (parentNode == null) {
// Parent is not in memory so try to load it
Log.warn("Node not loaded due to missing parent. NodeID: " + nodeID);
log.warn("Node not loaded due to missing parent. NodeID: " + nodeID);
return;
}
}
......@@ -620,7 +682,7 @@ public class PubSubPersistenceManager {
loadedNodes.put(node.getNodeID(), node);
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
}
}
......@@ -629,7 +691,7 @@ public class PubSubPersistenceManager {
String nodeID = decodeNodeID(rs.getString(1));
Node node = nodes.get(nodeID);
if (node == null) {
Log.warn("JID associated to a non-existent node: " + nodeID);
log.warn("JID associated to a non-existent node: " + nodeID);
return;
}
JID jid = new JID(rs.getString(2));
......@@ -648,7 +710,7 @@ public class PubSubPersistenceManager {
}
}
catch (Exception ex) {
Log.error(ex.getMessage(), ex);
log.error(ex.getMessage(), ex);
}
}
......@@ -657,13 +719,13 @@ public class PubSubPersistenceManager {
String nodeID = decodeNodeID(rs.getString(1));
Node node = nodes.get(nodeID);
if (node == null) {
Log.warn("Roster Group associated to a non-existent node: " + nodeID);
log.warn("Roster Group associated to a non-existent node: " + nodeID);
return;
}
node.addAllowedRosterGroup(rs.getString(2));
}
catch (SQLException ex) {
Log.error(ex.getMessage(), ex);
log.error(ex.getMessage(), ex);
}
}
......@@ -672,7 +734,7 @@ public class PubSubPersistenceManager {
String nodeID = decodeNodeID(rs.getString(1));
Node node = nodes.get(nodeID);
if (node == null) {
Log.warn("Affiliations found for a non-existent node: " + nodeID);
log.warn("Affiliations found for a non-existent node: " + nodeID);
return;
}
NodeAffiliate affiliate = new NodeAffiliate(node, new JID(rs.getString(2)));
......@@ -680,24 +742,23 @@ public class PubSubPersistenceManager {
node.addAffiliate(affiliate);
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
}
}
private static void loadSubscriptions(PubSubService service, Map<String, Node> nodes,
ResultSet rs) {
private static void loadSubscriptions(PubSubService service, Map<String, Node> nodes, ResultSet rs) {
try {
String nodeID = decodeNodeID(rs.getString(1));
Node node = nodes.get(nodeID);
if (node == null) {
Log.warn("Subscription found for a non-existent node: " + nodeID);
log.warn("Subscription found for a non-existent node: " + nodeID);
return;
}
String subID = rs.getString(2);
JID subscriber = new JID(rs.getString(3));
JID owner = new JID(rs.getString(4));
if (node.getAffiliate(owner) == null) {
Log.warn("Subscription found for a non-existent affiliate: " + owner +
log.warn("Subscription found for a non-existent affiliate: " + owner +
" in node: " + nodeID);
return;
}
......@@ -720,43 +781,7 @@ public class PubSubPersistenceManager {
node.addSubscription(subscription);
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
}
}
private static void loadItems(Map<String, Node> nodes, ResultSet rs) {
SAXReader xmlReader = null;
try {
// Get a sax reader from the pool
xmlReader = xmlReaders.take();
String nodeID = decodeNodeID(rs.getString(5));
LeafNode node = (LeafNode) nodes.get(nodeID);
if (node == null) {
Log.warn("Published Item found for a non-existent node: " + nodeID);
return;
}
String itemID = rs.getString(1);
JID publisher = new JID(rs.getString(2));
Date creationDate = new Date(Long.parseLong(rs.getString(3).trim()));
// Create the item
PublishedItem item = new PublishedItem(node, publisher, itemID, creationDate);
// Add the extra fields to the published item
if (rs.getString(4) != null) {
item.setPayload(
xmlReader.read(new StringReader(rs.getString(4))).getRootElement());
}
// Add the published item to the node
node.addPublishedItem(item);
}
catch (Exception sqle) {
Log.error(sqle.getMessage(), sqle);
}
finally {
// Return the sax reader to the pool
if (xmlReader != null) {
xmlReaders.add(xmlReader);
}
log.error(sqle.getMessage(), sqle);
}
}
......@@ -768,8 +793,7 @@ public class PubSubPersistenceManager {
* @param affiliate The new affiliation of the user in the node.
* @param create True if this is a new affiliate.
*/
public static void saveAffiliation(PubSubService service, Node node, NodeAffiliate affiliate,
boolean create) {
public static void saveAffiliation(Node node, NodeAffiliate affiliate, boolean create) {
Connection con = null;
PreparedStatement pstmt = null;
try {
......@@ -777,7 +801,7 @@ public class PubSubPersistenceManager {
if (create) {
// Add the user to the generic affiliations table
pstmt = con.prepareStatement(ADD_AFFILIATION);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(3, affiliate.getJID().toString());
pstmt.setString(4, affiliate.getAffiliation().name());
......@@ -787,14 +811,14 @@ public class PubSubPersistenceManager {
// Update the affiliate's data in the backend store
pstmt = con.prepareStatement(UPDATE_AFFILIATION);
pstmt.setString(1, affiliate.getAffiliation().name());
pstmt.setString(2, service.getServiceID());
pstmt.setString(2, node.getService().getServiceID());
pstmt.setString(3, encodeNodeID(node.getNodeID()));
pstmt.setString(4, affiliate.getJID().toString());
pstmt.executeUpdate();
}
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
}
finally {
DbConnectionManager.closeConnection(pstmt, con);
......@@ -808,21 +832,20 @@ public class PubSubPersistenceManager {
* @param node The node where the affiliation of the user was updated.
* @param affiliate The existing affiliation and subsription state of the user in the node.
*/
public static void removeAffiliation(PubSubService service, Node node,
NodeAffiliate affiliate) {
public static void removeAffiliation(Node node, NodeAffiliate affiliate) {
Connection con = null;
PreparedStatement pstmt = null;
try {
con = DbConnectionManager.getConnection();
// Remove the affiliate from the table of node affiliates
pstmt = con.prepareStatement(DELETE_AFFILIATION);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(3, affiliate.getJID().toString());
pstmt.executeUpdate();
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
}
finally {
DbConnectionManager.closeConnection(pstmt, con);
......@@ -837,8 +860,7 @@ public class PubSubPersistenceManager {
* @param subscription The new subscription of the user to the node.
* @param create True if this is a new affiliate.
*/
public static void saveSubscription(PubSubService service, Node node,
NodeSubscription subscription, boolean create) {
public static void saveSubscription(Node node, NodeSubscription subscription, boolean create) {
Connection con = null;
PreparedStatement pstmt = null;
try {
......@@ -846,7 +868,7 @@ public class PubSubPersistenceManager {
if (create) {
// Add the subscription of the user to the database
pstmt = con.prepareStatement(ADD_SUBSCRIPTION);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(3, subscription.getID());
pstmt.setString(4, subscription.getJID().toString());
......@@ -875,7 +897,7 @@ public class PubSubPersistenceManager {
if (NodeSubscription.State.none == subscription.getState()) {
// Remove the subscription of the user from the table
pstmt = con.prepareStatement(DELETE_SUBSCRIPTION);
pstmt.setString(1, service.getServiceID());
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(2, subscription.getID());
pstmt.executeUpdate();
......@@ -900,7 +922,7 @@ public class PubSubPersistenceManager {
pstmt.setString(9, subscription.getType().name());
pstmt.setInt(10, subscription.getDepth());
pstmt.setString(11, subscription.getKeyword());
pstmt.setString(12, service.getServiceID());
pstmt.setString(12, node.getService().getServiceID());
pstmt.setString(13, encodeNodeID(node.getNodeID()));
pstmt.setString(14, subscription.getID());
pstmt.executeUpdate();
......@@ -908,7 +930,7 @@ public class PubSubPersistenceManager {
}
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
}
finally {
DbConnectionManager.closeConnection(pstmt, con);
......@@ -922,21 +944,20 @@ public class PubSubPersistenceManager {
* @param node The node where the user was subscribed to.
* @param subscription The existing subsription of the user to the node.
*/
public static void removeSubscription(PubSubService service, Node node,
NodeSubscription subscription) {
public static void removeSubscription(NodeSubscription subscription) {
Connection con = null;
PreparedStatement pstmt = null;
try {
con = DbConnectionManager.getConnection();
// Remove the affiliate from the table of node affiliates
pstmt = con.prepareStatement(DELETE_SUBSCRIPTION);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
pstmt.setString(1, subscription.getNode().getService().getServiceID());
pstmt.setString(2, encodeNodeID(subscription.getNode().getNodeID()));
pstmt.setString(3, subscription.getID());
pstmt.executeUpdate();
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
}
finally {
DbConnectionManager.closeConnection(pstmt, con);
......@@ -944,116 +965,134 @@ public class PubSubPersistenceManager {
}
/**
* Loads and adds the published items to the specified node.
* Creates and stores the published item in the database.
*
* @param service the pubsub service that is hosting the node.
* @param node the leaf node to load its published items.
* @param item The published item to save.
* @return true if the item was successfully saved to the database.
*/
public static void loadItems(PubSubService service, LeafNode node) {
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
SAXReader xmlReader = null;
try {
// Get a sax reader from the pool
xmlReader = xmlReaders.take();
con = DbConnectionManager.getConnection();
// Get published items of the specified node
pstmt = con.prepareStatement(LOAD_ITEMS);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
rs = pstmt.executeQuery();
// Rebuild loaded published items
while(rs.next()) {
String itemID = rs.getString(1);
JID publisher = new JID(rs.getString(2));
Date creationDate = new Date(Long.parseLong(rs.getString(3).trim()));
// Create the item
PublishedItem item = new PublishedItem(node, publisher, itemID, creationDate);
// Add the extra fields to the published item
if (rs.getString(4) != null) {
item.setPayload(
xmlReader.read(new StringReader(rs.getString(4))).getRootElement());
}
// Add the published item to the node
node.addPublishedItem(item);
}
}
catch (Exception sqle) {
Log.error(sqle.getMessage(), sqle);
}
finally {
// Return the sax reader to the pool
if (xmlReader != null) {
xmlReaders.add(xmlReader);
}
DbConnectionManager.closeConnection(rs, pstmt, con);
}
public static void savePublishedItem(PublishedItem item) {
synchronized(itemLock)
{
LinkedListNode nodeToReplace = itemMap.get(item.getID());
if (nodeToReplace != null)
{
nodeToReplace.remove();
publishedItemSize--;
}
LinkedListNode listNode = itemsToAdd.addLast(item);
itemMap.put(item.getID(), listNode);
publishedItemSize++;
if (publishedItemSize > ITEM_CACHE_SIZE)
flushItems();
}
}
/**
* Creates and stores the published item in the database.
*
* @param service the pubsub service that is hosting the node.
* @param item The published item to save.
* @return true if the item was successfully saved to the database.
* Flush the cache of items to be persisted and deleted.
*/
public static boolean createPublishedItem(PubSubService service, PublishedItem item) {
boolean success = false;
public static void flushItems()
{
log.debug("Flushing items to database");
LinkedList addList = null;
LinkedList delList = null;
// Swap the cache so we can parse and save the contents from this point in time
// while not blocking new entries from being cached.
synchronized(itemLock)
{
addList = itemsToAdd;
delList = itemsToDelete;
itemsToAdd = new LinkedList();
itemsToDelete = new LinkedList();
itemMap.clear();
publishedItemSize = 0;
}
LinkedListNode addItem = addList.getFirst();
LinkedListNode delItem = delList.getFirst();
// Check to see if there is anything to actually do.
if ((addItem == null) && (delItem == null))
return;
Connection con = null;
PreparedStatement pstmt = null;
try {
con = DbConnectionManager.getConnection();
// Remove the published item from the database
pstmt = con.prepareStatement(ADD_ITEM);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
pstmt.setString(3, item.getID());
pstmt.setString(4, item.getPublisher().toString());
pstmt.setString(5, StringUtils.dateToMillis(item.getCreationDate()));
pstmt.setString(6, item.getPayloadXML());
pstmt.executeUpdate();
// Set that the item was successfully saved to the database
success = true;
try
{
con = DbConnectionManager.getTransactionConnection();
// Add all items that were cached
if (addItem != null)
{
LinkedListNode addHead = addList.getLast().next;
pstmt = con.prepareStatement(ADD_ITEM);
while (addItem != addHead)
{
PublishedItem item = (PublishedItem) addItem.object;
pstmt.setString(1, item.getNode().getService().getServiceID());
pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
pstmt.setString(3, item.getID());
pstmt.setString(4, item.getPublisher().toString());
pstmt.setString(5, StringUtils.dateToMillis(item.getCreationDate()));
pstmt.setString(6, item.getPayloadXML());
pstmt.addBatch();
addItem = addItem.next;
}
pstmt.executeBatch();
}
if (delItem != null)
{
LinkedListNode delHead = delList.getLast().next;
pstmt = con.prepareStatement(DELETE_ITEM);
while (delItem != delHead)
{
PublishedItem item = (PublishedItem) delItem.object;
pstmt.setString(1, item.getNode().getService().getServiceID());
pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
pstmt.setString(3, item.getID());
pstmt.executeUpdate();
delItem = delItem.next;
}
}
con.commit();
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
catch (SQLException sqle)
{
log.error(sqle.getMessage(), sqle);
}
finally {
finally
{
DbConnectionManager.closeConnection(pstmt, con);
}
return success;
}
/**
* Removes the specified published item from the DB.
*
* @param service the pubsub service that is hosting the node.
* @param item The published item to delete.
* @return true if the item was successfully deleted from the database.
*/
public static boolean removePublishedItem(PubSubService service, PublishedItem item) {
boolean success = false;
Connection con = null;
PreparedStatement pstmt = null;
try {
con = DbConnectionManager.getConnection();
// Remove the published item from the database
pstmt = con.prepareStatement(DELETE_ITEM);
pstmt.setString(1, service.getServiceID());
pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
pstmt.setString(3, item.getID());
pstmt.executeUpdate();
// Set that the item was successfully deleted from the database
success = true;
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
}
finally {
DbConnectionManager.closeConnection(pstmt, con);
}
return success;
public static void removePublishedItem(PublishedItem item) {
synchronized (itemLock)
{
itemsToDelete.addLast(item);
LinkedListNode nodeToDelete = itemMap.remove(item.getID());
if (nodeToDelete != null)
nodeToDelete.remove();
}
}
/**
......@@ -1103,7 +1142,7 @@ public class PubSubPersistenceManager {
}
}
catch (Exception sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
}
finally {
DbConnectionManager.closeConnection(rs, pstmt, con);
......@@ -1150,7 +1189,7 @@ public class PubSubPersistenceManager {
pstmt.executeUpdate();
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
}
finally {
DbConnectionManager.closeConnection(pstmt, con);
......@@ -1196,146 +1235,209 @@ public class PubSubPersistenceManager {
pstmt.executeUpdate();
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
log.error(sqle.getMessage(), sqle);
}
finally {
DbConnectionManager.closeConnection(pstmt, con);
}
}
/*public static Node loadNode(PubSubService service, String nodeID) {
/**
* Fetches all the results for the specified node, limited by {@link LeafNode#getMaxPublishedItems()}.
*
* @param service the pubsub service that is hosting the node.
* @param node the leaf node to load its published items.
*/
public static List<PublishedItem> getPublishedItems(LeafNode node) {
return getPublishedItems(node, node.getMaxPublishedItems());
}
/**
* Fetches all the results for the specified node, limited by {@link LeafNode#getMaxPublishedItems()}.
*
* @param service the pubsub service that is hosting the node.
* @param node the leaf node to load its published items.
*/
public static List<PublishedItem> getPublishedItems(LeafNode node, int maxRows) {
// Flush all items to the database first to ensure the list is all recent items.
flushItems();
Connection con = null;
Node node = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
SAXReader xmlReader = null;
int max = MAX_ROWS_FETCH;
int maxPublished = node.getMaxPublishedItems();
// Limit the max rows until a solution is in place with Result Set Management
if (maxRows != -1)
max = maxPublished == -1 ? Math.min(maxRows, MAX_ROWS_FETCH) : Math.min(maxRows, maxPublished);
else if (maxPublished != -1)
max = Math.min(MAX_ROWS_FETCH, maxPublished);
// We don't know how many items are in the db, so we will start with an allocation of 500
List<PublishedItem> results = new ArrayList<PublishedItem>(max);
try {
// Get a sax reader from the pool
xmlReader = xmlReaders.take();
con = DbConnectionManager.getConnection();
node = loadNode(service, nodeID, con);
// Get published items of the specified node
pstmt = con.prepareStatement(LOAD_ITEMS);
pstmt.setMaxRows(max);
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
rs = pstmt.executeQuery();
int counter = 0;
// Rebuild loaded published items
while(rs.next() && (counter < max)) {
String itemID = rs.getString(1);
JID publisher = new JID(rs.getString(2));
Date creationDate = new Date(Long.parseLong(rs.getString(3).trim()));
// Create the item
PublishedItem item = new PublishedItem(node, publisher, itemID, creationDate);
// Add the extra fields to the published item
if (rs.getString(4) != null) {
item.setPayload(
xmlReader.read(new StringReader(rs.getString(4))).getRootElement());
}
// Add the published item to the node
results.add(item);
counter++;
}
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
catch (Exception sqle) {
log.error(sqle.getMessage(), sqle);
}
finally {
try { if (con != null) con:close(); }
catch (Exception e) { Log.error(e.getMessage(), e); }
// Return the sax reader to the pool
if (xmlReader != null) {
xmlReaders.add(xmlReader);
}
DbConnectionManager.closeConnection(rs, pstmt, con);
}
return node;
if (results.size() == 0)
results = Collections.emptyList();
return results;
}
private static Node loadNode(PubSubService service, String nodeID, Connection con) {
Node node = null;
/**
* Fetches the last published item for the specified node.
*
* @param node the leaf node to load its last published items.
*/
public static PublishedItem getLastPublishedItem(LeafNode node) {
flushItems();
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
SAXReader xmlReader = null;
PublishedItem item = null;
try {
pstmt = con.prepareStatement(LOAD_NODE);
pstmt.setString(1, encodeNodeID(nodeID));
ResultSet rs = pstmt.executeQuery();
if (!rs.next()) {
// No node was found for the specified nodeID so return null
return null;
}
boolean leaf = rs.getInt(1) == 1;
String parent = decodeNodeID(rs.getString(4));
JID creator = new JID(rs.getString(20));
CollectionNode parentNode = null;
if (parent != null) {
// Check if the parent has already been loaded
parentNode = (CollectionNode) service.getNode(parent);
if (parentNode == null) {
// Parent is not in memory so try to load it
synchronized (parent.intern()) {
// Check again if parent has not been already loaded (concurrency issues)
parentNode = (CollectionNode) service.getNode(parent);
if (parentNode == null) {
// Parent was never loaded so load it from the database now
parentNode = (CollectionNode) loadNode(service, parent, con);
}
}
}
}
if (leaf) {
// Retrieving a leaf node
node = new LeafNode(parentNode, nodeID, creator);
}
else {
// Retrieving a collection node
node = new CollectionNode(parentNode, nodeID, creator);
}
node.setCreationDate(new Date(Long.parseLong(rs.getString(2).trim())));
node.setModificationDate(new Date(Long.parseLong(rs.getString(3).trim())));
node.setPayloadDelivered(rs.getInt(5) == 1);
node.setMaxPayloadSize(rs.getInt(6));
node.setPersistPublishedItems(rs.getInt(7) == 1);
node.setMaxPublishedItems(rs.getInt(8));
node.setNotifiedOfConfigChanges(rs.getInt(9) == 1);
node.setNotifiedOfDelete(rs.getInt(10) == 1);
node.setNotifiedOfRetract(rs.getInt(11) == 1);
node.setPresenceBasedDelivery(rs.getInt(12) == 1);
node.setSendItemSubscribe(rs.getInt(13) == 1);
node.setPublisherModel(Node.PublisherModel.valueOf(rs.getString(14)));
node.setSubscriptionEnabled(rs.getInt(15) == 1);
node.setAccessModel(Node.AccessModel.valueOf(rs.getString(16)));
node.setPayloadType(rs.getString(17));
node.setBodyXSLT(rs.getString(18));
node.setDataformXSLT(rs.getString(19));
node.setDescription(rs.getString(21));
node.setLanguage(rs.getString(22));
node.setName(rs.getString(23));
rs:close();
pstmt:close();
pstmt = con.prepareStatement(LOAD_HISTORY);
// Recreate the history until two days ago
long from = System.currentTimeMillis() - (86400000 * 2);
pstmt.setString(1, StringUtils.dateToMillis(new Date(from)));
pstmt.setLong(2, room.getID());
// Get a sax reader from the pool
xmlReader = xmlReaders.take();
con = DbConnectionManager.getConnection();
// Get published items of the specified node
pstmt = con.prepareStatement(LOAD_LAST_ITEM);
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID()));
rs = pstmt.executeQuery();
while (rs.next()) {
String senderJID = rs.getString(1);
String nickname = rs.getString(2);
Date sentDate = new Date(Long.parseLong(rs.getString(3).trim()));
String subject = rs.getString(4);
String body = rs.getString(5);
// Recreate the history only for the rooms that have the conversation logging
// enabled
if (room.isLogEnabled()) {
room.getRoomHistory().addOldMessage(senderJID, nickname, sentDate, subject,
body);
// Rebuild loaded published items
if (rs.next()) {
String itemID = rs.getString(1);
JID publisher = new JID(rs.getString(2));
Date creationDate = new Date(Long.parseLong(rs.getString(3).trim()));
// Create the item
item = new PublishedItem(node, publisher, itemID, creationDate);
// Add the extra fields to the published item
if (rs.getString(4) != null) {
item.setPayload(
xmlReader.read(new StringReader(rs.getString(4))).getRootElement());
}
}
rs:close();
pstmt:close();
}
catch (Exception sqle) {
log.error(sqle.getMessage(), sqle);
}
finally {
// Return the sax reader to the pool
if (xmlReader != null) {
xmlReaders.add(xmlReader);
}
DbConnectionManager.closeConnection(rs, pstmt, con);
}
return item;
}
pstmt = con.prepareStatement(LOAD_NODE_AFFILIATIONS);
pstmt.setString(1, encodeNodeID(node.getNodeID()));
public static PublishedItem getPublishedItem(LeafNode node, String itemID) {
flushItems();
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
SAXReader xmlReader = null;
PublishedItem result = null;
try {
xmlReader = xmlReaders.take();
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(LOAD_ITEM);
pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, node.getNodeID());
pstmt.setString(3, itemID);
rs = pstmt.executeQuery();
while (rs.next()) {
NodeAffiliate affiliate = new NodeAffiliate(new JID(rs.getString(1)));
affiliate.setAffiliation(NodeAffiliate.Affiliation.valueOf(rs.getString(2)));
affiliate.setSubscription(NodeAffiliate.State.valueOf(rs.getString(3)));
node.addAffiliate(affiliate);
// Add to each node the corresponding subscriptions
if (rs.next()) {
JID publisher = new JID(rs.getString(1));
Date creationDate = new Date(Long.parseLong(rs.getString(2).trim()));
// Create the item
result = new PublishedItem(node, publisher, itemID, creationDate);
// Add the extra fields to the published item
if (rs.getString(3) != null) {
result.setPayload(
xmlReader.read(new StringReader(rs.getString(3))).getRootElement());
}
}
rs:close();
}
catch (Exception exc) {
log.error(exc.getMessage(), exc);
}
finally {
DbConnectionManager.closeConnection(pstmt, con);
if (xmlReader != null)
xmlReaders.add(xmlReader);
}
return result;
}
// Set now that the room's configuration is updated in the database. Note: We need to
// set this now since otherwise the room's affiliations will be saved to the database
// "again" while adding them to the room!
node.setSavedToDB(true);
// Add the retrieved node to the pubsub service
service.addChildNode(node);
public static void purgeNode(LeafNode leafNode) {
flushItems();
// Remove published items of the node being deleted
Connection con = null;
PreparedStatement pstmt = null;
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(DELETE_ITEMS);
pstmt.setString(1, leafNode.getService().getServiceID());
pstmt.setString(2, encodeNodeID(leafNode.getNodeID()));
pstmt.executeUpdate();
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
catch (Exception exc) {
log.error(exc.getMessage(), exc);
}
finally {
try { if (pstmt != null) pstmt:close(); }
catch (Exception e) { Log.error(e.getMessage(), e); }
try { if (con != null) con:close(); }
catch (Exception e) { Log.error(e.getMessage(), e); }
DbConnectionManager.closeConnection(pstmt, con);
}
return node;
}*/
}
private static String encodeWithComma(Collection<String> strings) {
private static String encodeWithComma(Collection<String> strings) {
StringBuilder sb = new StringBuilder(90);
for (String group : strings) {
sb.append(group).append(",");
......@@ -1376,4 +1478,93 @@ public class PubSubPersistenceManager {
}
return nodeID;
}
/**
* Purges all items from the database that exceed the defined item count on
* all nodes.
*/
private static void purgeItems()
{
String persistentNodeQuery = "SELECT serviceID, nodeID, maxItems FROM ofPubsubNode WHERE "
+ "leaf=1 AND persistItems=1 AND maxItems > 0";
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try
{
con = DbConnectionManager.getTransactionConnection();
PreparedStatement nodeConfig = con.prepareStatement(persistentNodeQuery);
rs = nodeConfig.executeQuery();
PreparedStatement purgeNode = con
.prepareStatement(getPurgeStatement(DbConnectionManager.getDatabaseType()));
while (rs.next())
{
String svcId = rs.getString(1);
String nodeId = rs.getString(2);
int maxItems = rs.getInt(3);
setPurgeParams(DbConnectionManager.getDatabaseType(), purgeNode, svcId, nodeId, maxItems);
int rowsAffected = purgeNode.executeUpdate();
if (log.isDebugEnabled())
log.debug(rowsAffected + " deleted rows from service: " + svcId + " nodeId: " + nodeId);
}
}
catch (Exception sqle)
{
sqle.printStackTrace();
log.error(sqle.getMessage(), sqle);
}
finally
{
DbConnectionManager.closeConnection(rs, pstmt, con);
}
}
private static void setPurgeParams(DatabaseType dbType, PreparedStatement purgeStmt, String serviceId,
String nodeId, int maxItems) throws SQLException
{
switch (dbType)
{
case hsqldb:
purgeStmt.setString(1, serviceId);
purgeStmt.setString(2, nodeId);
purgeStmt.setString(3, serviceId);
purgeStmt.setString(4, nodeId);
purgeStmt.setInt(5, maxItems);
break;
default:
purgeStmt.setString(1, serviceId);
purgeStmt.setString(2, nodeId);
purgeStmt.setInt(3, maxItems);
purgeStmt.setString(4, serviceId);
purgeStmt.setString(5, nodeId);
break;
}
}
private static String getPurgeStatement(DatabaseType type)
{
switch (type)
{
case hsqldb:
return PURGE_FOR_SIZE_HSQLDB;
default:
return PURGE_FOR_SIZE;
}
}
public static void shutdown()
{
flushItems();
purgeItems();
}
}
......@@ -239,76 +239,10 @@ public interface PubSubService {
*/
boolean isMultipleSubscriptionsEnabled();
/**
* Adds the item to the queue of items to add to the database. The queue is going
* to be processed by another thread.
*
* @param newItem the item to add to the database.
*/
void queueItemToAdd(PublishedItem newItem);
/**
* Gets the queue that holds the items that need to be added to the database.
*
* @return the queue that holds the items that need to be added to the database.
*/
Queue<PublishedItem> getItemsToAdd();
/**
* Gets the queue that holds the items that need to be deleted from the database.
*
* @return the queue that holds the items that need to be deleted from the database.
*/
Queue<PublishedItem> getItemsToDelete();
/**
* Returns the ad-hoc commands manager used for this service.
*
* @return the ad-hoc commands manager used for this service.
*/
AdHocCommandManager getManager();
/**
* Returns the published item task used for this service.
*
* @return the published item task used for this service.
*/
PublishedItemTask getPublishedItemTask();
/**
* Sets the published item task used for this service.
*
* @param task the PublishedItemTask to set for this service.
*/
void setPublishedItemTask(PublishedItemTask task);
/**
* Adds the item to the queue of items to remove from the database. The queue is going
* to be processed by another thread.
*
* @param removedItem the item to remove from the database.
*/
void queueItemToRemove(PublishedItem removedItem);
/**
* Returns the timer used for the maintenance process of this service.
*
* @return the timer used for the maintenance process of this service.
*/
Timer getTimer();
/**
* Returns the timeout value for the published items maintenance task.
*
* @return the timeout value for the published items maintenance task.
*/
int getItemsTaskTimeout();
/**
* Sets the timeout value for the published items maintenance task.
*
* @param timeout the timeout value for the published items maintenance task.
*/
void setItemsTaskTimeout(int timeout);
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2005-2008 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.openfire.pubsub;
import java.util.Queue;
import java.util.TimerTask;
import org.jivesoftware.openfire.pep.PEPService;
import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A timed maintenance task that updates the database by adding and/or
* removing <code>PublishedItem</code>s in regular intervals.
*
* @author Matt Tucker
*/
public class PublishedItemTask extends TimerTask {
private static final Logger Log = LoggerFactory.getLogger(PublishedItemTask.class);
/**
* Queue that holds the items that need to be added to the database.
*/
private Queue<PublishedItem> itemsToAdd = null;
/**
* Queue that holds the items that need to be deleted from the database.
*/
private Queue<PublishedItem> itemsToDelete = null;
/**
* The service to perform the published item tasks on.
*/
private PubSubService service = null;
/**
* The number of items to save on each run of the maintenance process.
*/
private int items_batch_size = 50;
public PublishedItemTask(PubSubService service) {
this.service = service;
this.itemsToAdd = service.getItemsToAdd();
this.itemsToDelete = service.getItemsToDelete();
}
@Override
public void run() {
try {
PublishedItem entry;
boolean success;
// Delete from the database items contained in the itemsToDelete queue
for (int index = 0; index <= items_batch_size && !itemsToDelete.isEmpty(); index++) {
entry = itemsToDelete.poll();
if (entry != null) {
success = PubSubPersistenceManager.removePublishedItem(service, entry);
if (!success) {
itemsToDelete.add(entry);
}
}
}
// Save to the database items contained in the itemsToAdd queue
for (int index = 0; index <= items_batch_size && !itemsToAdd.isEmpty(); index++) {
entry = itemsToAdd.poll();
if (entry != null) {
success = PubSubPersistenceManager.createPublishedItem(service, entry);
if (!success) {
itemsToAdd.add(entry);
}
}
}
} catch (Throwable e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
}
protected PubSubService getService() {
return service;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment