Commit 8dbcb94b authored by Tom Evans, committed by tevans

OF-205: Implement replicated cache for published items

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/branches/pubsub_clustering@13282 b35dd754-fafc-0310-a699-88a17e54d16e
parents a4bebe6b d4c65b40
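In brief: published items are now kept in a cluster-replicated cache named "Published Items", keyed by nodeId:itemId, while items that have not yet been written to the database are tracked in a pending list and flushed by a periodic timer, by a size threshold, or on demand before reads. A minimal sketch of the key format introduced by PublishedItem.getItemKey() (the IDs below are made up purely for illustration):

    // Illustrative values only; any node ID / item ID pair works the same way
    String nodeId = "princely_musings";
    String itemId = "ae890ac52d0df67ed7cfdf51b644e901";
    String key = PublishedItem.getItemKey(nodeId, itemId);
    // key -> "princely_musings:ae890ac52d0df67ed7cfdf51b644e901"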
@@ -495,7 +495,7 @@ public class PEPService implements PubSubService, Cacheable {
        Message notification = new Message();
        Element event = notification.getElement().addElement("event", "http://jabber.org/protocol/pubsub#event");
        Element items = event.addElement("items");
-        items.addAttribute("node", leafLastPublishedItem.getNode().getNodeID());
+        items.addAttribute("node", leafLastPublishedItem.getNodeID());
        Element item = items.addElement("item");
        if (((LeafNode) leafLastPublishedItem.getNode()).isItemRequired()) {
            item.addAttribute("id", leafLastPublishedItem.getID());
......
@@ -102,7 +102,7 @@ public class NodeAffiliate {
            //
            // If the node ID looks like a JID, replace it with the published item's node ID.
            if (getNode().getNodeID().indexOf("@") >= 0) {
-                items.addAttribute("node", publishedItem.getNode().getNodeID());
+                items.addAttribute("node", publishedItem.getNodeID());
            }
            // Add item information to the event notification
......
@@ -150,7 +150,6 @@ public class NodeSubscription {
    /**
     * Creates a new subscription of the specified user with the node.
     *
-     * @param service the pubsub service hosting the node where this subscription lives.
     * @param node Node to which this subscription is interested in.
     * @param owner the JID of the entity that owns this subscription.
     * @param jid the JID of the user that owns the subscription.
......
@@ -20,7 +20,6 @@
package org.jivesoftware.openfire.pubsub;
-import java.io.StringReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -30,15 +29,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
-import org.dom4j.io.SAXReader;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.database.DbConnectionManager.DatabaseType;
import org.jivesoftware.openfire.cluster.ClusterManager;
@@ -49,6 +47,7 @@ import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LinkedList;
import org.jivesoftware.util.LinkedListNode;
import org.jivesoftware.util.StringUtils;
+import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -198,11 +197,7 @@ public class PubSubPersistenceManager {
    private static Timer purgeTimer = new Timer("Pubsub purge stale items timer");
    private static long purgeTimerDelay = JiveGlobals.getIntProperty("xmpp.pubsub.purge.timer", 300) * 1000;
-    private static final int POOL_SIZE = 50;
-    private static final int ITEM_CACHE_SIZE = JiveGlobals.getIntProperty("xmpp.pubsub.item.cache", 1000);
-    private static int publishedItemSize = 0;
-    private static final Object itemLock = new Object();
+    private static final int MAX_ITEMS_FLUSH = JiveGlobals.getIntProperty("xmpp.pubsub.flush.max", 1000);
    private static final int MAX_ROWS_FETCH = JiveGlobals.getIntProperty("xmpp.pubsub.fetch.max", 2000);
@@ -220,21 +215,19 @@ public class PubSubPersistenceManager {
     * Keeps reference to published items that haven't been persisted yet so they can be removed
     * before being deleted.
     */
-    private static final HashMap<String, LinkedListNode> itemMap = new HashMap<String, LinkedListNode>();
+    private static final HashMap<String, LinkedListNode> itemsPending = new HashMap<String, LinkedListNode>();

    /**
-     * Pool of SAX Readers. SAXReader is not thread safe so we need to have a pool of readers.
+     * Cache name for recently accessed published items.
     */
-    private static BlockingQueue<SAXReader> xmlReaders = new LinkedBlockingQueue<SAXReader>(POOL_SIZE);
+    private static final String ITEM_CACHE = "Published Items";
+
+    /**
+     * Cache for recently accessed published items.
+     */
+    private static final Cache<String, PublishedItem> itemCache = CacheFactory.createCache(ITEM_CACHE);

    static {
-        // Initialize the pool of sax readers
-        for (int i=0; i<POOL_SIZE; i++) {
-            SAXReader xmlReader = new SAXReader();
-            xmlReader.setEncoding("UTF-8");
-            xmlReaders.add(xmlReader);
-        }
        // Enforce a min of 20s
        if (flushTimerDelay < 20000)
            flushTimerDelay = 20000;
@@ -244,7 +237,7 @@ public class PubSubPersistenceManager {
            @Override
            public void run()
            {
-                flushItems();
+                flushPendingItems(false); // this member only
            }
        }, flushTimerDelay, flushTimerDelay);
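For orientation, items collected in the pending list can reach the database in three ways; a minimal sketch, using only the API visible in this diff:

    // Sketch of the three flush triggers (not part of the commit itself):
    // 1) the periodic timer above, which flushes only this cluster member
    PubSubPersistenceManager.flushPendingItems(false);
    // 2) the size threshold in savePublishedItem(), controlled by the
    //    "xmpp.pubsub.flush.max" property (default 1000)
    // 3) an explicit flush before reads that must be current; this passes
    //    ClusterManager.isClusteringEnabled(), so on a clustered server it also
    //    dispatches a FlushTask to the other members
    PubSubPersistenceManager.flushPendingItems();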
@@ -265,7 +258,6 @@ public class PubSubPersistenceManager {
    /**
     * Creates and stores the node configuration in the database.
     *
-     * @param service The pubsub service that is hosting the node.
     * @param node The newly created node.
     */
    public static void createNode(Node node) {
@@ -340,7 +332,6 @@ public class PubSubPersistenceManager {
    /**
     * Updates the node configuration in the database.
     *
-     * @param service The pubsub service that is hosting the node.
     * @param node The updated node.
     */
    public static void updateNode(Node node) {
@@ -475,7 +466,6 @@ public class PubSubPersistenceManager {
    /**
     * Removes the specified node from the DB.
     *
-     * @param service The pubsub service that is hosting the node.
     * @param node The node that is being deleted.
     * @return true If the operation was successful.
     */
@@ -507,11 +497,7 @@ public class PubSubPersistenceManager {
            DbConnectionManager.fastcloseStmt(pstmt);

            // Remove published items of the node being deleted
-            pstmt = con.prepareStatement(DELETE_ITEMS);
-            pstmt.setString(1, node.getService().getServiceID());
-            pstmt.setString(2, encodeNodeID(node.getNodeID()));
-            pstmt.executeUpdate();
-            DbConnectionManager.closeStatement(pstmt);
+            if (node instanceof LeafNode) { purgeNode((LeafNode)node); }

            // Remove all affiliates from the table of node affiliates
            pstmt = con.prepareStatement(DELETE_AFFILIATIONS);
@@ -934,7 +920,6 @@ public class PubSubPersistenceManager {
    /**
     * Update the DB with the new affiliation of the user in the node.
     *
-     * @param service The pubsub service that is hosting the node.
     * @param node The node where the affiliation of the user was updated.
     * @param affiliate The new affiliation of the user in the node.
     * @param create True if this is a new affiliate.
@@ -974,7 +959,6 @@ public class PubSubPersistenceManager {
    /**
     * Removes the affiliation and subsription state of the user from the DB.
     *
-     * @param service The pubsub service that is hosting the node.
     * @param node The node where the affiliation of the user was updated.
     * @param affiliate The existing affiliation and subsription state of the user in the node.
     */
@@ -1001,7 +985,6 @@ public class PubSubPersistenceManager {
    /**
     * Updates the DB with the new subsription of the user to the node.
     *
-     * @param service The pubsub service that is hosting the node.
     * @param node The node where the user has subscribed to.
     * @param subscription The new subscription of the user to the node.
     * @param create True if this is a new affiliate.
@@ -1086,8 +1069,6 @@ public class PubSubPersistenceManager {
    /**
     * Removes the subscription of the user from the DB.
     *
-     * @param service The pubsub service that is hosting the node.
-     * @param node The node where the user was subscribed to.
     * @param subscription The existing subsription of the user to the node.
     */
    public static void removeSubscription(NodeSubscription subscription) {
@@ -1114,63 +1095,78 @@ public class PubSubPersistenceManager {
     * Creates and stores the published item in the database.
     *
     * @param item The published item to save.
-     * @return true if the item was successfully saved to the database.
     */
    public static void savePublishedItem(PublishedItem item) {
-        synchronized(itemLock)
-        {
-            LinkedListNode nodeToReplace = itemMap.get(item.getID());
-            if (nodeToReplace != null)
-            {
-                nodeToReplace.remove();
-                publishedItemSize--;
+        String itemKey = item.getItemKey();
+        itemCache.put(itemKey, item);
+        log.debug("Added new (inbound) item to cache");
+        synchronized (itemsPending) {
+            LinkedListNode itemToReplace = itemsPending.remove(itemKey);
+            if (itemToReplace != null) {
+                itemToReplace.remove(); // delete from itemsToAdd linked list
            }
            LinkedListNode listNode = itemsToAdd.addLast(item);
-            itemMap.put(item.getID(), listNode);
-            publishedItemSize++;
-            if (publishedItemSize > ITEM_CACHE_SIZE)
-                flushItems();
+            itemsPending.put(itemKey, listNode);
+        }
+        if (itemsPending.size() > MAX_ITEMS_FLUSH) {
+            flushPendingItems();
        }
    }

    /**
     * Flush the cache of items to be persisted and deleted.
     */
-    public static void flushItems()
+    public static void flushPendingItems()
    {
-        flushItems(ClusterManager.isClusteringEnabled());
+        flushPendingItems(ClusterManager.isClusteringEnabled());
    }

    /**
     * Flush the cache of items to be persisted and deleted.
     * @param sendToCluster If true, delegate to cluster members, otherwise local only
     */
-    public static void flushItems(boolean sendToCluster)
+    public static void flushPendingItems(boolean sendToCluster)
    {
-        log.debug("Flushing items to database");
        if (sendToCluster) {
            CacheFactory.doSynchronousClusterTask(new FlushTask(), false);
        }
+        if (itemsToAdd.getFirst() == null && itemsToDelete.getFirst() == null) {
+            return; // nothing to do for this cluster member
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Flush " + itemsPending.size() + " pending items to database");
+        }
        boolean abortTransaction = false;
        LinkedList addList = null;
        LinkedList delList = null;
-        // Swap the cache so we can parse and save the contents from this point in time
+        // Swap pending items so we can parse and save the contents from this point in time
        // while not blocking new entries from being cached.
-        synchronized(itemLock)
+        synchronized(itemsPending)
        {
            addList = itemsToAdd;
            delList = itemsToDelete;
            itemsToAdd = new LinkedList();
            itemsToDelete = new LinkedList();
-            itemMap.clear();
-            publishedItemSize = 0;
+            // Ensure pending items are available via the item cache;
+            // this allows the item(s) to be fetched by other thread(s)
+            // while being written to the DB from this thread
+            int copied = 0;
+            for (String key : itemsPending.keySet()) {
+                if (!itemCache.containsKey(key)) {
+                    itemCache.put(key, (PublishedItem) itemsPending.get(key).object);
+                    copied++;
+                }
+            }
+            if (log.isDebugEnabled() && copied > 0) {
+                log.debug("Added " + copied + " pending items to published item cache");
+            }
+            itemsPending.clear();
        }
        LinkedListNode addItem = addList.getFirst();
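A minimal usage sketch of the new write path (hypothetical caller code, not part of this commit; it assumes package-private access from org.jivesoftware.openfire.pubsub, and a LeafNode node, a JID publisher and a dom4j Element payload already in scope):

    PublishedItem item = new PublishedItem(node, publisher, "current", new Date());
    item.setPayload(payload);                          // keeps payloadXML in sync with the Element
    PubSubPersistenceManager.savePublishedItem(item);  // visible cluster-wide via the cache right away
    // The database row is only written once flushPendingItems() runs (timer, threshold, or explicit call)
    PublishedItem same = PubSubPersistenceManager.getPublishedItem(node, "current");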
@@ -1197,7 +1193,7 @@ public class PubSubPersistenceManager {
                {
                    PublishedItem item = (PublishedItem) addItem.object;
                    pstmt.setString(1, item.getNode().getService().getServiceID());
-                    pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
+                    pstmt.setString(2, encodeNodeID(item.getNodeID()));
                    pstmt.setString(3, item.getID());
                    pstmt.setString(4, item.getPublisher().toString());
                    pstmt.setString(5, StringUtils.dateToMillis(item.getCreationDate()));
@@ -1242,16 +1238,16 @@ public class PubSubPersistenceManager {
     * Removes the specified published item from the DB.
     *
     * @param item The published item to delete.
-     * @return true if the item was successfully deleted from the database.
     */
    public static void removePublishedItem(PublishedItem item) {
-        synchronized (itemLock)
+        String itemKey = item.getItemKey();
+        itemCache.remove(itemKey);
+        synchronized (itemsPending)
        {
            itemsToDelete.addLast(item);
-            LinkedListNode nodeToDelete = itemMap.remove(item.getID());
-            if (nodeToDelete != null)
-                nodeToDelete.remove();
+            LinkedListNode itemToDelete = itemsPending.remove(itemKey);
+            if (itemToDelete != null)
+                itemToDelete.remove();
        }
    }
@@ -1406,7 +1402,6 @@ public class PubSubPersistenceManager {
    /**
     * Fetches all the results for the specified node, limited by {@link LeafNode#getMaxPublishedItems()}.
     *
-     * @param service the pubsub service that is hosting the node.
     * @param node the leaf node to load its published items.
     */
    public static List<PublishedItem> getPublishedItems(LeafNode node) {
@@ -1416,16 +1411,20 @@ public class PubSubPersistenceManager {
    /**
     * Fetches all the results for the specified node, limited by {@link LeafNode#getMaxPublishedItems()}.
     *
-     * @param service the pubsub service that is hosting the node.
     * @param node the leaf node to load its published items.
     */
    public static List<PublishedItem> getPublishedItems(LeafNode node, int maxRows) {
-        // Flush all items to the database first to ensure the list is all recent items.
-        flushItems();
-        Connection con = null;
+        Lock itemLock = CacheFactory.getLock(ITEM_CACHE, itemCache);
+        try {
+            // NOTE: force other requests to wait for DB I/O to complete
+            itemLock.lock();
+            flushPendingItems();
+        } finally {
+            itemLock.unlock();
+        }
+        Connection con = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
-        SAXReader xmlReader = null;
        int max = MAX_ROWS_FETCH;
        int maxPublished = node.getMaxPublishedItems();
@@ -1439,8 +1438,6 @@ public class PubSubPersistenceManager {
        List<PublishedItem> results = new ArrayList<PublishedItem>(max);
        try {
-            // Get a sax reader from the pool
-            xmlReader = xmlReaders.take();
            con = DbConnectionManager.getConnection();
            // Get published items of the specified node
            pstmt = con.prepareStatement(LOAD_ITEMS);
@@ -1459,8 +1456,7 @@ public class PubSubPersistenceManager {
                PublishedItem item = new PublishedItem(node, publisher, itemID, creationDate);
                // Add the extra fields to the published item
                if (rs.getString(4) != null) {
-                    item.setPayload(
-                            xmlReader.read(new StringReader(rs.getString(4))).getRootElement());
+                    item.setPayloadXML(rs.getString(4));
                }
                // Add the published item to the node
                results.add(item);
@@ -1471,10 +1467,6 @@ public class PubSubPersistenceManager {
            log.error(sqle.getMessage(), sqle);
        }
        finally {
-            // Return the sax reader to the pool
-            if (xmlReader != null) {
-                xmlReaders.add(xmlReader);
-            }
            DbConnectionManager.closeConnection(rs, pstmt, con);
        }
@@ -1490,16 +1482,20 @@ public class PubSubPersistenceManager {
     * @param node the leaf node to load its last published items.
     */
    public static PublishedItem getLastPublishedItem(LeafNode node) {
-        flushItems();
+        Lock itemLock = CacheFactory.getLock(ITEM_CACHE, itemCache);
+        try {
+            // NOTE: force other requests to wait for DB I/O to complete
+            itemLock.lock();
+            flushPendingItems();
+        } finally {
+            itemLock.unlock();
+        }
        Connection con = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
-        SAXReader xmlReader = null;
        PublishedItem item = null;
        try {
-            // Get a sax reader from the pool
-            xmlReader = xmlReaders.take();
            con = DbConnectionManager.getConnection();
            // Get published items of the specified node
            pstmt = con.prepareStatement(LOAD_LAST_ITEM);
@@ -1515,8 +1511,7 @@ public class PubSubPersistenceManager {
                item = new PublishedItem(node, publisher, itemID, creationDate);
                // Add the extra fields to the published item
                if (rs.getString(4) != null) {
-                    item.setPayload(
-                            xmlReader.read(new StringReader(rs.getString(4))).getRootElement());
+                    item.setPayloadXML(rs.getString(4));
                }
            }
        }
@@ -1524,60 +1519,71 @@ public class PubSubPersistenceManager {
            log.error(sqle.getMessage(), sqle);
        }
        finally {
-            // Return the sax reader to the pool
-            if (xmlReader != null) {
-                xmlReaders.add(xmlReader);
-            }
            DbConnectionManager.closeConnection(rs, pstmt, con);
        }
        return item;
    }
    public static PublishedItem getPublishedItem(LeafNode node, String itemID) {
-        flushItems();
-        Connection con = null;
-        PreparedStatement pstmt = null;
-        ResultSet rs = null;
-        SAXReader xmlReader = null;
-        PublishedItem result = null;
-        try {
-            xmlReader = xmlReaders.take();
-            con = DbConnectionManager.getConnection();
-            pstmt = con.prepareStatement(LOAD_ITEM);
-            pstmt.setString(1, node.getService().getServiceID());
-            pstmt.setString(2, node.getNodeID());
-            pstmt.setString(3, itemID);
-            rs = pstmt.executeQuery();
-            // Add to each node the corresponding subscriptions
-            if (rs.next()) {
-                JID publisher = new JID(rs.getString(1));
-                Date creationDate = new Date(Long.parseLong(rs.getString(2).trim()));
-                // Create the item
-                result = new PublishedItem(node, publisher, itemID, creationDate);
-                // Add the extra fields to the published item
-                if (rs.getString(3) != null) {
-                    result.setPayload(
-                            xmlReader.read(new StringReader(rs.getString(3))).getRootElement());
-                }
-            }
-        }
-        catch (Exception exc) {
-            log.error(exc.getMessage(), exc);
-        }
-        finally {
-            DbConnectionManager.closeConnection(pstmt, con);
-            if (xmlReader != null)
-                xmlReaders.add(xmlReader);
-        }
+        String itemKey = PublishedItem.getItemKey(node, itemID);
+        // try to fetch from cache first without locking
+        PublishedItem result = itemCache.get(itemKey);
+        if (result == null) {
+            Lock itemLock = CacheFactory.getLock(ITEM_CACHE, itemCache);
+            try {
+                // Acquire lock, then re-check cache before reading from DB;
+                // allows clustered item cache to be primed by first request
+                itemLock.lock();
+                result = itemCache.get(itemKey);
+                if (result == null) {
+                    flushPendingItems();
+                    // fetch item from DB
+                    Connection con = null;
+                    PreparedStatement pstmt = null;
+                    ResultSet rs = null;
+                    try {
+                        con = DbConnectionManager.getConnection();
+                        pstmt = con.prepareStatement(LOAD_ITEM);
+                        pstmt.setString(1, node.getService().getServiceID());
+                        pstmt.setString(2, node.getNodeID());
+                        pstmt.setString(3, itemID);
+                        rs = pstmt.executeQuery();
+                        // Add to each node the corresponding subscriptions
+                        if (rs.next()) {
+                            JID publisher = new JID(rs.getString(1));
+                            Date creationDate = new Date(Long.parseLong(rs.getString(2).trim()));
+                            // Create the item
+                            result = new PublishedItem(node, publisher, itemID, creationDate);
+                            // Add the extra fields to the published item
+                            if (rs.getString(3) != null) {
+                                result.setPayloadXML(rs.getString(3));
+                            }
+                            itemCache.put(itemKey, result);
+                            log.debug("Loaded item into cache from DB");
+                        }
+                    } catch (Exception exc) {
+                        log.error(exc.getMessage(), exc);
+                    } finally {
+                        DbConnectionManager.closeConnection(pstmt, con);
+                    }
+                } else {
+                    log.debug("Found cached item on second attempt (after acquiring lock)");
+                }
+            } finally {
+                itemLock.unlock();
+            }
+        } else {
+            log.debug("Found cached item on first attempt (no lock)");
        }
        return result;
    }
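The lookup above is a cache-aside read with double-checked locking on the cluster-wide lock that CacheFactory hands out for a cache. Stripped of the JDBC detail, the pattern looks like this (loadFromDatabase is a hypothetical placeholder; the cache and lock calls mirror the ones in the diff):

    private static PublishedItem readThrough(String itemKey) {
        PublishedItem value = itemCache.get(itemKey);                 // 1) optimistic, lock-free read
        if (value == null) {
            Lock lock = CacheFactory.getLock(ITEM_CACHE, itemCache);  // cluster-wide lock for this cache
            lock.lock();
            try {
                value = itemCache.get(itemKey);                       // 2) re-check under the lock
                if (value == null) {
                    value = loadFromDatabase(itemKey);                // 3) only one member/thread hits the DB...
                    if (value != null) {
                        itemCache.put(itemKey, value);                // ...and primes the cache for everyone else
                    }
                }
            } finally {
                lock.unlock();
            }
        }
        return value;
    }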
    public static void purgeNode(LeafNode leafNode) {
-        flushItems();
+        flushPendingItems();
        // Remove published items of the node being deleted
        Connection con = null;
        PreparedStatement pstmt = null;
@@ -1588,6 +1594,16 @@ public class PubSubPersistenceManager {
            pstmt.setString(1, leafNode.getService().getServiceID());
            pstmt.setString(2, encodeNodeID(leafNode.getNodeID()));
            pstmt.executeUpdate();
+            // drop cached items for purged node
+            synchronized(itemCache) {
+                Iterator<PublishedItem> items = itemCache.values().iterator();
+                while (items.hasNext()) {
+                    if (leafNode.getNodeID().equals(items.next().getNodeID())) {
+                        items.remove();
+                    }
+                }
+            }
        }
        catch (Exception exc) {
            log.error(exc.getMessage(), exc);
@@ -1724,7 +1740,7 @@ public class PubSubPersistenceManager {
    public static void shutdown()
    {
-        flushItems();
+        flushPendingItems();
        purgeItems();
    }
}
@@ -20,10 +20,19 @@
package org.jivesoftware.openfire.pubsub;
-import org.xmpp.packet.JID;
-import org.dom4j.Element;
+import java.io.Serializable;
+import java.io.StringReader;
import java.util.Date;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.dom4j.Element;
+import org.dom4j.io.SAXReader;
+import org.jivesoftware.openfire.XMPPServer;
+import org.jivesoftware.openfire.pep.PEPServiceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xmpp.packet.JID;

/**
 * A published item to a node. Once an item was published to a node, node subscribers will be
@@ -37,9 +46,28 @@ import java.util.Date;
 *
 * @author Matt Tucker
 */
-public class PublishedItem {
+public class PublishedItem implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(PublishedItem.class);
+
+    private static final int POOL_SIZE = 50;
+
+    /**
+     * Pool of SAX Readers. SAXReader is not thread safe so we need to have a pool of readers.
+     */
+    private static BlockingQueue<SAXReader> xmlReaders = new LinkedBlockingQueue<SAXReader>(POOL_SIZE);
+
+    private static final long serialVersionUID = 7012925993623144574L;
+
+    static {
+        // Initialize the pool of sax readers
+        for (int i=0; i<POOL_SIZE; i++) {
+            SAXReader xmlReader = new SAXReader();
+            xmlReader.setEncoding("UTF-8");
+            xmlReaders.add(xmlReader);
+        }
+    }

    /**
     * JID of the entity that published the item to the node. This is the full JID
     * of the publisher.
     */
@@ -47,7 +75,15 @@ public class PublishedItem {
    /**
     * The node where the item was published.
     */
-    private LeafNode node;
+    private transient LeafNode node;
+
+    /**
+     * The id for the node where the item was published.
+     */
+    private String nodeId;
+
+    /**
+     * The id for the service hosting the node for this item
+     */
+    private String serviceId;

    /**
     * ID that uniquely identifies the published item in the node.
     */
@@ -57,29 +93,49 @@ public class PublishedItem {
     */
    private Date creationDate;

    /**
-     * The payload included when publishing the item.
+     * The optional payload is included when publishing the item. This value
+     * is created from the payload XML and cached as/when needed.
     */
-    private Element payload;
+    private transient Element payload;

    /**
-     * XML representation of the payload. This is actually a cache that avoids
-     * doing Element#asXML.
+     * XML representation of the payload (for serialization)
     */
    private String payloadXML;

    PublishedItem(LeafNode node, JID publisher, String id, Date creationDate) {
        this.node = node;
+        this.nodeId = node.getNodeID();
+        this.serviceId = node.getService().getServiceID();
        this.publisher = publisher;
        this.id = id;
        this.creationDate = creationDate;
    }

+    /**
+     * Returns the id for the {@link LeafNode} where this item was published.
+     *
+     * @return the ID for the leaf node where this item was published.
+     */
+    public String getNodeID() {
+        return nodeId;
+    }

    /**
     * Returns the {@link LeafNode} where this item was published.
     *
     * @return the leaf node where this item was published.
     */
    public LeafNode getNode() {
-        return node;
+        if (node == null) {
+            if (Node.PUBSUB_SVC_ID.equals(serviceId)) {
+                node = (LeafNode) XMPPServer.getInstance().getPubSubModule().getNode(nodeId);
+            } else {
+                PEPServiceManager serviceMgr = XMPPServer.getInstance().getIQPEPHandler().getServiceManager();
+                node = serviceMgr.hasCachedService(new JID(serviceId)) ?
+                        (LeafNode) serviceMgr.getPEPService(serviceId).getNode(nodeId) : null;
+            }
+        }
+        return node;
    }

    /**
@@ -117,6 +173,20 @@ public class PublishedItem {
     * @return the payload included when publishing the item or <tt>null</tt> if none was found.
     */
    public Element getPayload() {
+        if (payload == null && payloadXML != null) {
+            // payload initialized as XML string from DB
+            SAXReader xmlReader = null;
+            try {
+                xmlReader = xmlReaders.take();
+                payload = xmlReader.read(new StringReader(payloadXML)).getRootElement();
+            } catch (Exception ex) {
+                log.error("Failed to parse payload XML", ex);
+            } finally {
+                if (xmlReader != null) {
+                    xmlReaders.add(xmlReader);
+                }
+            }
+        }
        return payload;
    }
@@ -131,6 +201,19 @@
        return payloadXML;
    }

+    /**
+     * Sets the payload included when publishing the item. A published item may or may not
+     * have a payload. Transient nodes that are configured to not broadcast payloads may allow
+     * published items to have no payload.
+     *
+     * @param payloadXML the payload included when publishing the item or <tt>null</tt>
+     * if none was found.
+     */
+    void setPayloadXML(String payloadXML) {
+        this.payloadXML = payloadXML;
+        this.payload = null; // will be recreated only if needed
+    }

    /**
     * Sets the payload included when publishing the item. A published item may or may not
     * have a payload. Transient nodes that are configured to not broadcast payloads may allow
@@ -144,8 +227,7 @@
        // Update XML representation of the payload
        if (payload == null) {
            payloadXML = null;
-        }
-        else {
+        } else {
            payloadXML = payload.asXML();
        }
    }
@@ -158,7 +240,7 @@
     * @return true if payload contains the specified keyword.
     */
    boolean containsKeyword(String keyword) {
-        if (payloadXML == null || keyword == null) {
+        if (getPayloadXML() == null || keyword == null) {
            return true;
        }
        return payloadXML.contains(keyword);
@@ -173,9 +255,41 @@
     */
    public boolean canDelete(JID user) {
        if (publisher.equals(user) || publisher.toBareJID().equals(user.toBareJID()) ||
-                node.isAdmin(user)) {
+                getNode().isAdmin(user)) {
            return true;
        }
        return false;
    }

+    /**
+     * Returns a string that uniquely identifies this published item
+     * in the following format: <i>nodeId:itemId</i>
+     * @return Unique identifier for this item
+     */
+    public String getItemKey() {
+        return getItemKey(nodeId,id);
+    }
+
+    /**
+     * Returns a string that uniquely identifies this published item
+     * in the following format: <i>nodeId:itemId</i>
+     * @param node Node for the published item
+     * @param itemId Id for the published item (unique within the node)
+     * @return Unique identifier for this item
+     */
+    public static String getItemKey(LeafNode node, String itemId) {
+        return getItemKey(node.getNodeID(), itemId);
+    }
+
+    /**
+     * Returns a string that uniquely identifies this published item
+     * in the following format: <i>nodeId:itemId</i>
+     * @param nodeId Node id for the published item
+     * @param itemId Id for the published item (unique within the node)
+     * @return Unique identifier for this item
+     */
+    public static String getItemKey(String nodeId, String itemId) {
+        return new StringBuilder(nodeId)
+                .append(":").append(itemId).toString();
+    }
}
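Note that only plain fields (nodeId, serviceId, publisher, id, creationDate, payloadXML) survive serialization; the LeafNode and the parsed payload Element are transient and rebuilt lazily. A small hypothetical helper showing which calls stay cheap on a cluster member that received the item purely through the replicated cache:

    // Hypothetical helper, for illustration only
    static void describe(PublishedItem item) {
        String key = item.getItemKey();       // "nodeId:itemId", no node lookup involved
        String nodeId = item.getNodeID();     // plain field access
        Element payload = item.getPayload();  // parsed on demand from payloadXML via the SAXReader pool
        LeafNode node = item.getNode();       // only this call resolves the node (PubSubModule or PEP service)
    }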
@@ -17,7 +17,7 @@ public class FlushTask implements ClusterTask
    @Override
    public void run()
    {
-        PubSubPersistenceManager.flushItems(false); // just this member
+        PubSubPersistenceManager.flushPendingItems(false); // just this member
    }
    @Override
......
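Taken together with flushPendingItems(boolean) above, a cluster-wide flush amounts to the following (sketch, not code from the commit):

    CacheFactory.doSynchronousClusterTask(new FlushTask(), false); // each other member runs FlushTask.run()
    PubSubPersistenceManager.flushPendingItems(false);             // then this member flushes its own pending items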
@@ -128,6 +128,7 @@ public class CacheFactory {
        cacheNames.put("Entity Capabilities Users", "entityCapabilitiesUsers");
        cacheNames.put("Clearspace SSO Nonce", "clearspaceSSONonce");
        cacheNames.put("PEPServiceManager", "pepServiceManager");
+        cacheNames.put("Published Items", "publishedItems");
        cacheProps.put("cache.fileTransfer.size", 128 * 1024l);
        cacheProps.put("cache.fileTransfer.maxLifetime", 1000 * 60 * 10l);
@@ -199,6 +200,8 @@
        cacheProps.put("cache.clearspaceSSONonce.maxLifetime", JiveConstants.MINUTE * 2);
        cacheProps.put("cache.pepServiceManager.size", 1024l * 1024 * 10);
        cacheProps.put("cache.pepServiceManager.maxLifetime", JiveConstants.MINUTE * 30);
+        cacheProps.put("cache.publishedItems.size", 1024l * 1024 * 10);
+        cacheProps.put("cache.publishedItems.maxLifetime", JiveConstants.MINUTE * 15);
    }
    private CacheFactory() {
......
@@ -523,6 +523,25 @@ http://www.tangosol.com/UserGuide-Reference-CacheConfig.jsp
      </init-params>
    </cache-mapping>
+    <cache-mapping>
+      <cache-name>Published Items</cache-name>
+      <scheme-name>replicated</scheme-name>
+      <init-params>
+        <init-param>
+          <param-name>back-size-high</param-name>
+          <param-value>10485760</param-value>
+        </init-param>
+        <init-param>
+          <param-name>back-expiry</param-name>
+          <param-value>15m</param-value>
+        </init-param>
+        <init-param>
+          <param-name>back-size-low</param-name>
+          <param-value>9437180</param-value>
+        </init-param>
+      </init-params>
+    </cache-mapping>
    <!-- partitioned caches -->
    <cache-mapping>
......
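For reference, the Coherence mapping above lines up with the CacheFactory defaults added in the previous hunk: back-size-high 10485760 bytes is the 1024l * 1024 * 10 cache size, and back-expiry 15m is JiveConstants.MINUTE * 15. A component obtains the cache and its cluster-wide lock through the usual Openfire cache API, as the persistence manager does in this commit:

    Cache<String, PublishedItem> items = CacheFactory.createCache("Published Items");
    Lock itemLock = CacheFactory.getLock("Published Items", items);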