Commit 4c4cb9b8 authored by Tom Evans's avatar Tom Evans Committed by tevans

OF-205: Prevent cyclical flushItems call across cluster members

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/branches/pubsub_clustering@13261 b35dd754-fafc-0310-a699-88a17e54d16e
parent e008f6da
...@@ -41,6 +41,7 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -41,6 +41,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import org.jivesoftware.database.DbConnectionManager; import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.database.DbConnectionManager.DatabaseType; import org.jivesoftware.database.DbConnectionManager.DatabaseType;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.pubsub.cluster.FlushTask; import org.jivesoftware.openfire.pubsub.cluster.FlushTask;
import org.jivesoftware.openfire.pubsub.models.AccessModel; import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel; import org.jivesoftware.openfire.pubsub.models.PublisherModel;
...@@ -61,14 +62,14 @@ import org.xmpp.packet.JID; ...@@ -61,14 +62,14 @@ import org.xmpp.packet.JID;
public class PubSubPersistenceManager { public class PubSubPersistenceManager {
private static final Logger log = LoggerFactory.getLogger(PubSubPersistenceManager.class); private static final Logger log = LoggerFactory.getLogger(PubSubPersistenceManager.class);
private static final String PURGE_FOR_SIZE = private static final String PURGE_FOR_SIZE =
"DELETE ofPubsubItem FROM ofPubsubItem LEFT JOIN " + "DELETE ofPubsubItem FROM ofPubsubItem LEFT JOIN " +
"(SELECT id FROM ofPubsubItem WHERE serviceID=? AND nodeID=? " + "(SELECT id FROM ofPubsubItem WHERE serviceID=? AND nodeID=? " +
"ORDER BY creationDate DESC LIMIT ?) AS noDelete " + "ORDER BY creationDate DESC LIMIT ?) AS noDelete " +
"ON ofPubsubItem.id = noDelete.id WHERE noDelete.id IS NULL AND " + "ON ofPubsubItem.id = noDelete.id WHERE noDelete.id IS NULL AND " +
"ofPubsubItem.serviceID = ? AND nodeID = ?"; "ofPubsubItem.serviceID = ? AND nodeID = ?";
private static final String PURGE_FOR_SIZE_HSQLDB = "DELETE FROM ofPubsubItem WHERE serviceID=? AND nodeID=? AND id NOT IN " private static final String PURGE_FOR_SIZE_HSQLDB = "DELETE FROM ofPubsubItem WHERE serviceID=? AND nodeID=? AND id NOT IN "
+ "(SELECT id FROM ofPubsubItem WHERE serviceID=? AND nodeID=? ORDER BY creationDate DESC LIMIT ?)"; + "(SELECT id FROM ofPubsubItem WHERE serviceID=? AND nodeID=? ORDER BY creationDate DESC LIMIT ?)";
...@@ -197,14 +198,14 @@ public class PubSubPersistenceManager { ...@@ -197,14 +198,14 @@ public class PubSubPersistenceManager {
private static Timer purgeTimer = new Timer("Pubsub purge stale items timer"); private static Timer purgeTimer = new Timer("Pubsub purge stale items timer");
private static long purgeTimerDelay = JiveGlobals.getIntProperty("xmpp.pubsub.purge.timer", 300) * 1000; private static long purgeTimerDelay = JiveGlobals.getIntProperty("xmpp.pubsub.purge.timer", 300) * 1000;
private static final int POOL_SIZE = 50; private static final int POOL_SIZE = 50;
private static final int ITEM_CACHE_SIZE = JiveGlobals.getIntProperty("xmpp.pubsub.item.cache", 1000); private static final int ITEM_CACHE_SIZE = JiveGlobals.getIntProperty("xmpp.pubsub.item.cache", 1000);
private static int publishedItemSize = 0; private static int publishedItemSize = 0;
private static final Object itemLock = new Object(); private static final Object itemLock = new Object();
private static final int MAX_ROWS_FETCH = JiveGlobals.getIntProperty("xmpp.pubsub.fetch.max", 2000); private static final int MAX_ROWS_FETCH = JiveGlobals.getIntProperty("xmpp.pubsub.fetch.max", 2000);
/** /**
* Queue that holds the items that need to be added to the database. * Queue that holds the items that need to be added to the database.
*/ */
...@@ -233,11 +234,11 @@ public class PubSubPersistenceManager { ...@@ -233,11 +234,11 @@ public class PubSubPersistenceManager {
xmlReader.setEncoding("UTF-8"); xmlReader.setEncoding("UTF-8");
xmlReaders.add(xmlReader); xmlReaders.add(xmlReader);
} }
// Enforce a min of 20s // Enforce a min of 20s
if (flushTimerDelay < 20000) if (flushTimerDelay < 20000)
flushTimerDelay = 20000; flushTimerDelay = 20000;
flushTimer.schedule(new TimerTask() flushTimer.schedule(new TimerTask()
{ {
@Override @Override
...@@ -629,7 +630,7 @@ public class PubSubPersistenceManager { ...@@ -629,7 +630,7 @@ public class PubSubPersistenceManager {
/** /**
* Loads all nodes from the database and adds them to the PubSub service. * Loads all nodes from the database and adds them to the PubSub service.
* *
* @param service * @param service
* the pubsub service that is hosting the nodes. * the pubsub service that is hosting the nodes.
*/ */
...@@ -1119,17 +1120,17 @@ public class PubSubPersistenceManager { ...@@ -1119,17 +1120,17 @@ public class PubSubPersistenceManager {
synchronized(itemLock) synchronized(itemLock)
{ {
LinkedListNode nodeToReplace = itemMap.get(item.getID()); LinkedListNode nodeToReplace = itemMap.get(item.getID());
if (nodeToReplace != null) if (nodeToReplace != null)
{ {
nodeToReplace.remove(); nodeToReplace.remove();
publishedItemSize--; publishedItemSize--;
} }
LinkedListNode listNode = itemsToAdd.addLast(item); LinkedListNode listNode = itemsToAdd.addLast(item);
itemMap.put(item.getID(), listNode); itemMap.put(item.getID(), listNode);
publishedItemSize++; publishedItemSize++;
if (publishedItemSize > ITEM_CACHE_SIZE) if (publishedItemSize > ITEM_CACHE_SIZE)
flushItems(); flushItems();
} }
...@@ -1139,38 +1140,49 @@ public class PubSubPersistenceManager { ...@@ -1139,38 +1140,49 @@ public class PubSubPersistenceManager {
* Flush the cache of items to be persisted and deleted. * Flush the cache of items to be persisted and deleted.
*/ */
public static void flushItems() public static void flushItems()
{
flushItems(ClusterManager.isClusteringEnabled());
}
/**
* Flush the cache of items to be persisted and deleted.
* @param sendToCluster If true, delegate to cluster members, otherwise local only
*/
public static void flushItems(boolean sendToCluster)
{ {
log.debug("Flushing items to database"); log.debug("Flushing items to database");
if (sendToCluster) {
CacheFactory.doSynchronousClusterTask(new FlushTask(), false); CacheFactory.doSynchronousClusterTask(new FlushTask(), false);
}
boolean abortTransaction = false; boolean abortTransaction = false;
LinkedList addList = null; LinkedList addList = null;
LinkedList delList = null; LinkedList delList = null;
// Swap the cache so we can parse and save the contents from this point in time // Swap the cache so we can parse and save the contents from this point in time
// while not blocking new entries from being cached. // while not blocking new entries from being cached.
synchronized(itemLock) synchronized(itemLock)
{ {
addList = itemsToAdd; addList = itemsToAdd;
delList = itemsToDelete; delList = itemsToDelete;
itemsToAdd = new LinkedList(); itemsToAdd = new LinkedList();
itemsToDelete = new LinkedList(); itemsToDelete = new LinkedList();
itemMap.clear(); itemMap.clear();
publishedItemSize = 0; publishedItemSize = 0;
} }
LinkedListNode addItem = addList.getFirst(); LinkedListNode addItem = addList.getFirst();
LinkedListNode delItem = delList.getFirst(); LinkedListNode delItem = delList.getFirst();
// Check to see if there is anything to actually do. // Check to see if there is anything to actually do.
if ((addItem == null) && (delItem == null)) if ((addItem == null) && (delItem == null))
return; return;
Connection con = null; Connection con = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
try try
{ {
con = DbConnectionManager.getTransactionConnection(); con = DbConnectionManager.getTransactionConnection();
...@@ -1180,7 +1192,7 @@ public class PubSubPersistenceManager { ...@@ -1180,7 +1192,7 @@ public class PubSubPersistenceManager {
{ {
LinkedListNode addHead = addList.getLast().next; LinkedListNode addHead = addList.getLast().next;
pstmt = con.prepareStatement(ADD_ITEM); pstmt = con.prepareStatement(ADD_ITEM);
while (addItem != addHead) while (addItem != addHead)
{ {
PublishedItem item = (PublishedItem) addItem.object; PublishedItem item = (PublishedItem) addItem.object;
...@@ -1191,18 +1203,18 @@ public class PubSubPersistenceManager { ...@@ -1191,18 +1203,18 @@ public class PubSubPersistenceManager {
pstmt.setString(5, StringUtils.dateToMillis(item.getCreationDate())); pstmt.setString(5, StringUtils.dateToMillis(item.getCreationDate()));
pstmt.setString(6, item.getPayloadXML()); pstmt.setString(6, item.getPayloadXML());
pstmt.addBatch(); pstmt.addBatch();
addItem = addItem.next; addItem = addItem.next;
} }
pstmt.executeBatch(); pstmt.executeBatch();
} }
if (delItem != null) if (delItem != null)
{ {
LinkedListNode delHead = delList.getLast().next; LinkedListNode delHead = delList.getLast().next;
pstmt = con.prepareStatement(DELETE_ITEM); pstmt = con.prepareStatement(DELETE_ITEM);
while (delItem != delHead) while (delItem != delHead)
{ {
PublishedItem item = (PublishedItem) delItem.object; PublishedItem item = (PublishedItem) delItem.object;
...@@ -1210,7 +1222,7 @@ public class PubSubPersistenceManager { ...@@ -1210,7 +1222,7 @@ public class PubSubPersistenceManager {
pstmt.setString(2, encodeNodeID(item.getNode().getNodeID())); pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
pstmt.setString(3, item.getID()); pstmt.setString(3, item.getID());
pstmt.executeUpdate(); pstmt.executeUpdate();
delItem = delItem.next; delItem = delItem.next;
} }
} }
...@@ -1233,11 +1245,11 @@ public class PubSubPersistenceManager { ...@@ -1233,11 +1245,11 @@ public class PubSubPersistenceManager {
* @return true if the item was successfully deleted from the database. * @return true if the item was successfully deleted from the database.
*/ */
public static void removePublishedItem(PublishedItem item) { public static void removePublishedItem(PublishedItem item) {
synchronized (itemLock) synchronized (itemLock)
{ {
itemsToDelete.addLast(item); itemsToDelete.addLast(item);
LinkedListNode nodeToDelete = itemMap.remove(item.getID()); LinkedListNode nodeToDelete = itemMap.remove(item.getID());
if (nodeToDelete != null) if (nodeToDelete != null)
nodeToDelete.remove(); nodeToDelete.remove();
} }
...@@ -1400,7 +1412,7 @@ public class PubSubPersistenceManager { ...@@ -1400,7 +1412,7 @@ public class PubSubPersistenceManager {
public static List<PublishedItem> getPublishedItems(LeafNode node) { public static List<PublishedItem> getPublishedItems(LeafNode node) {
return getPublishedItems(node, node.getMaxPublishedItems()); return getPublishedItems(node, node.getMaxPublishedItems());
} }
/** /**
* Fetches all the results for the specified node, limited by {@link LeafNode#getMaxPublishedItems()}. * Fetches all the results for the specified node, limited by {@link LeafNode#getMaxPublishedItems()}.
* *
...@@ -1416,16 +1428,16 @@ public class PubSubPersistenceManager { ...@@ -1416,16 +1428,16 @@ public class PubSubPersistenceManager {
SAXReader xmlReader = null; SAXReader xmlReader = null;
int max = MAX_ROWS_FETCH; int max = MAX_ROWS_FETCH;
int maxPublished = node.getMaxPublishedItems(); int maxPublished = node.getMaxPublishedItems();
// Limit the max rows until a solution is in place with Result Set Management // Limit the max rows until a solution is in place with Result Set Management
if (maxRows != -1) if (maxRows != -1)
max = maxPublished == -1 ? Math.min(maxRows, MAX_ROWS_FETCH) : Math.min(maxRows, maxPublished); max = maxPublished == -1 ? Math.min(maxRows, MAX_ROWS_FETCH) : Math.min(maxRows, maxPublished);
else if (maxPublished != -1) else if (maxPublished != -1)
max = Math.min(MAX_ROWS_FETCH, maxPublished); max = Math.min(MAX_ROWS_FETCH, maxPublished);
// We don't know how many items are in the db, so we will start with an allocation of 500 // We don't know how many items are in the db, so we will start with an allocation of 500
List<PublishedItem> results = new ArrayList<PublishedItem>(max); List<PublishedItem> results = new ArrayList<PublishedItem>(max);
try { try {
// Get a sax reader from the pool // Get a sax reader from the pool
xmlReader = xmlReaders.take(); xmlReader = xmlReaders.take();
...@@ -1437,7 +1449,7 @@ public class PubSubPersistenceManager { ...@@ -1437,7 +1449,7 @@ public class PubSubPersistenceManager {
pstmt.setString(2, encodeNodeID(node.getNodeID())); pstmt.setString(2, encodeNodeID(node.getNodeID()));
rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
int counter = 0; int counter = 0;
// Rebuild loaded published items // Rebuild loaded published items
while(rs.next() && (counter < max)) { while(rs.next() && (counter < max)) {
String itemID = rs.getString(1); String itemID = rs.getString(1);
...@@ -1465,10 +1477,10 @@ public class PubSubPersistenceManager { ...@@ -1465,10 +1477,10 @@ public class PubSubPersistenceManager {
} }
DbConnectionManager.closeConnection(rs, pstmt, con); DbConnectionManager.closeConnection(rs, pstmt, con);
} }
if (results.size() == 0) if (results.size() == 0)
results = Collections.emptyList(); results = Collections.emptyList();
return results; return results;
} }
...@@ -1484,7 +1496,7 @@ public class PubSubPersistenceManager { ...@@ -1484,7 +1496,7 @@ public class PubSubPersistenceManager {
ResultSet rs = null; ResultSet rs = null;
SAXReader xmlReader = null; SAXReader xmlReader = null;
PublishedItem item = null; PublishedItem item = null;
try { try {
// Get a sax reader from the pool // Get a sax reader from the pool
xmlReader = xmlReaders.take(); xmlReader = xmlReaders.take();
...@@ -1528,7 +1540,7 @@ public class PubSubPersistenceManager { ...@@ -1528,7 +1540,7 @@ public class PubSubPersistenceManager {
ResultSet rs = null; ResultSet rs = null;
SAXReader xmlReader = null; SAXReader xmlReader = null;
PublishedItem result = null; PublishedItem result = null;
try { try {
xmlReader = xmlReaders.take(); xmlReader = xmlReaders.take();
con = DbConnectionManager.getConnection(); con = DbConnectionManager.getConnection();
...@@ -1537,7 +1549,7 @@ public class PubSubPersistenceManager { ...@@ -1537,7 +1549,7 @@ public class PubSubPersistenceManager {
pstmt.setString(2, node.getNodeID()); pstmt.setString(2, node.getNodeID());
pstmt.setString(3, itemID); pstmt.setString(3, itemID);
rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
// Add to each node the corresponding subscriptions // Add to each node the corresponding subscriptions
if (rs.next()) { if (rs.next()) {
JID publisher = new JID(rs.getString(1)); JID publisher = new JID(rs.getString(1));
...@@ -1556,7 +1568,7 @@ public class PubSubPersistenceManager { ...@@ -1556,7 +1568,7 @@ public class PubSubPersistenceManager {
} }
finally { finally {
DbConnectionManager.closeConnection(pstmt, con); DbConnectionManager.closeConnection(pstmt, con);
if (xmlReader != null) if (xmlReader != null)
xmlReaders.add(xmlReader); xmlReaders.add(xmlReader);
} }
...@@ -1569,7 +1581,7 @@ public class PubSubPersistenceManager { ...@@ -1569,7 +1581,7 @@ public class PubSubPersistenceManager {
// Remove published items of the node being deleted // Remove published items of the node being deleted
Connection con = null; Connection con = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
try { try {
con = DbConnectionManager.getConnection(); con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(DELETE_ITEMS); pstmt = con.prepareStatement(DELETE_ITEMS);
...@@ -1635,7 +1647,7 @@ public class PubSubPersistenceManager { ...@@ -1635,7 +1647,7 @@ public class PubSubPersistenceManager {
{ {
String persistentNodeQuery = "SELECT serviceID, nodeID, maxItems FROM ofPubsubNode WHERE " String persistentNodeQuery = "SELECT serviceID, nodeID, maxItems FROM ofPubsubNode WHERE "
+ "leaf=1 AND persistItems=1 AND maxItems > 0"; + "leaf=1 AND persistItems=1 AND maxItems > 0";
boolean abortTransaction = false; boolean abortTransaction = false;
Connection con = null; Connection con = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
...@@ -1655,9 +1667,9 @@ public class PubSubPersistenceManager { ...@@ -1655,9 +1667,9 @@ public class PubSubPersistenceManager {
String svcId = rs.getString(1); String svcId = rs.getString(1);
String nodeId = rs.getString(2); String nodeId = rs.getString(2);
int maxItems = rs.getInt(3); int maxItems = rs.getInt(3);
setPurgeParams(DbConnectionManager.getDatabaseType(), purgeNode, svcId, nodeId, maxItems); setPurgeParams(DbConnectionManager.getDatabaseType(), purgeNode, svcId, nodeId, maxItems);
purgeNode.addBatch(); purgeNode.addBatch();
} }
purgeNode.executeBatch(); purgeNode.executeBatch();
......
...@@ -22,7 +22,7 @@ public class FlushTask implements ClusterTask ...@@ -22,7 +22,7 @@ public class FlushTask implements ClusterTask
public void run() public void run()
{ {
log.debug("[TASK] Flush pubsub"); log.debug("[TASK] Flush pubsub");
PubSubPersistenceManager.flushItems(); PubSubPersistenceManager.flushItems(false); // just this member
} }
@Override @Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment