Commit 0c350eab authored by Robin Collier's avatar Robin Collier Committed by rcollier

OF-39 Added purging of stale pubsub items from db and improved performance of...

OF-39 Added purging of stale pubsub items from db and improved performance of flushing pubsub items.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/branches/pubsub_clustering@13011 b35dd754-fafc-0310-a699-88a17e54d16e
parent 7f46f127
...@@ -57,7 +57,14 @@ import org.xmpp.packet.JID; ...@@ -57,7 +57,14 @@ import org.xmpp.packet.JID;
*/ */
public class PubSubPersistenceManager { public class PubSubPersistenceManager {
private static final Logger Log = LoggerFactory.getLogger(PubSubPersistenceManager.class); private static final Logger log = LoggerFactory.getLogger(PubSubPersistenceManager.class);
private static final String PURGE_FOR_SIZE =
"DELETE ofPubsubItem FROM ofPubsubItem LEFT JOIN " +
"(SELECT id FROM ofPubsubItem WHERE serviceID=? AND nodeID=? " +
"ORDER BY creationDate DESC LIMIT ?) AS noDelete " +
"ON ofPubsubItem.id = noDelete.id WHERE noDelete.id IS NULL AND "
+ "ofPubsubItem.serviceID = ? AND nodeID = ?";
private static final String LOAD_NON_LEAF_NODES = private static final String LOAD_NON_LEAF_NODES =
"SELECT nodeID, leaf, creationDate, modificationDate, parent, deliverPayloads, " + "SELECT nodeID, leaf, creationDate, modificationDate, parent, deliverPayloads, " +
...@@ -139,10 +146,6 @@ public class PubSubPersistenceManager { ...@@ -139,10 +146,6 @@ public class PubSubPersistenceManager {
"DELETE FROM ofPubsubSubscription WHERE serviceID=? AND nodeID=? AND id=?"; "DELETE FROM ofPubsubSubscription WHERE serviceID=? AND nodeID=? AND id=?";
private static final String DELETE_SUBSCRIPTIONS = private static final String DELETE_SUBSCRIPTIONS =
"DELETE FROM ofPubsubSubscription WHERE serviceID=? AND nodeID=?"; "DELETE FROM ofPubsubSubscription WHERE serviceID=? AND nodeID=?";
private static final String LOAD_ALL_ITEMS =
"SELECT id,jid,creationDate,payload,nodeID FROM ofPubsubItem " +
"WHERE serviceID=? ORDER BY creationDate";
private static final String LOAD_ITEMS = private static final String LOAD_ITEMS =
"SELECT id,jid,creationDate,payload FROM ofPubsubItem " + "SELECT id,jid,creationDate,payload FROM ofPubsubItem " +
"WHERE serviceID=? AND nodeID=? ORDER BY creationDate DESC"; "WHERE serviceID=? AND nodeID=? ORDER BY creationDate DESC";
...@@ -180,10 +183,14 @@ public class PubSubPersistenceManager { ...@@ -180,10 +183,14 @@ public class PubSubPersistenceManager {
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
private static Timer flushTimer = new Timer("Pubsub item flush timer"); private static Timer flushTimer = new Timer("Pubsub item flush timer");
private static long timerDelay = JiveGlobals.getIntProperty("xmpp.pubsub.flush.timer", 120) * 1000; private static long flushTimerDelay = JiveGlobals.getIntProperty("xmpp.pubsub.flush.timer", 120) * 1000;
private static Timer purgeTimer = new Timer("Pubsub purge stale items timer");
private static long purgeTimerDelay = JiveGlobals.getIntProperty("xmpp.pubsub.purge.timer", 300) * 1000;
private static final int POOL_SIZE = 50; private static final int POOL_SIZE = 50;
private static final int ITEM_CACHE_SIZE = 100; private static final int ITEM_CACHE_SIZE = JiveGlobals.getIntProperty("xmpp.pubsub.flush.cache", 1000);
private static int publishedItemSize = 0; private static int publishedItemSize = 0;
private static final Object itemLock = new Object(); private static final Object itemLock = new Object();
...@@ -219,16 +226,30 @@ public class PubSubPersistenceManager { ...@@ -219,16 +226,30 @@ public class PubSubPersistenceManager {
} }
// Enforce a min of 20s // Enforce a min of 20s
if (timerDelay < 20000) if (flushTimerDelay < 20000)
timerDelay = 20000; flushTimerDelay = 20000;
flushTimer.schedule(new TimerTask() {
flushTimer.schedule(new TimerTask()
{
@Override @Override
public void run() { public void run()
{
flushItems(); flushItems();
} }
}, timerDelay, timerDelay); }, flushTimerDelay, flushTimerDelay);
// Enforce a min of 20s
if (purgeTimerDelay < 60000)
purgeTimerDelay = 60000;
purgeTimer.schedule(new TimerTask()
{
@Override
public void run()
{
purgeItems();
}
}, purgeTimerDelay, purgeTimerDelay);
} }
/** /**
...@@ -297,7 +318,7 @@ public class PubSubPersistenceManager { ...@@ -297,7 +318,7 @@ public class PubSubPersistenceManager {
saveAssociatedElements(con, node); saveAssociatedElements(con, node);
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
abortTransaction = true; abortTransaction = true;
} }
finally { finally {
...@@ -383,7 +404,7 @@ public class PubSubPersistenceManager { ...@@ -383,7 +404,7 @@ public class PubSubPersistenceManager {
saveAssociatedElements(con, node); saveAssociatedElements(con, node);
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
abortTransaction = true; abortTransaction = true;
} }
finally { finally {
...@@ -496,7 +517,7 @@ public class PubSubPersistenceManager { ...@@ -496,7 +517,7 @@ public class PubSubPersistenceManager {
pstmt.executeUpdate(); pstmt.executeUpdate();
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
abortTransaction = true; abortTransaction = true;
} }
finally { finally {
...@@ -577,19 +598,9 @@ public class PubSubPersistenceManager { ...@@ -577,19 +598,9 @@ public class PubSubPersistenceManager {
loadSubscriptions(service, nodes, rs); loadSubscriptions(service, nodes, rs);
} }
DbConnectionManager.fastcloseStmt(rs, pstmt); DbConnectionManager.fastcloseStmt(rs, pstmt);
// TODO We may need to optimize memory consumption and load items on-demand
// Load published items of all nodes
pstmt = con.prepareStatement(LOAD_ALL_ITEMS);
pstmt.setString(1, service.getServiceID());
rs = pstmt.executeQuery();
// Add to each node the correspondiding subscriptions
while(rs.next()) {
loadItems(nodes, rs);
}
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
} }
finally { finally {
DbConnectionManager.closeConnection(rs, pstmt, con); DbConnectionManager.closeConnection(rs, pstmt, con);
...@@ -618,7 +629,7 @@ public class PubSubPersistenceManager { ...@@ -618,7 +629,7 @@ public class PubSubPersistenceManager {
parentNode = (CollectionNode) loadedNodes.get(parent); parentNode = (CollectionNode) loadedNodes.get(parent);
if (parentNode == null) { if (parentNode == null) {
// Parent is not in memory so try to load it // Parent is not in memory so try to load it
Log.warn("Node not loaded due to missing parent. NodeID: " + nodeID); log.warn("Node not loaded due to missing parent. NodeID: " + nodeID);
return; return;
} }
} }
...@@ -667,7 +678,7 @@ public class PubSubPersistenceManager { ...@@ -667,7 +678,7 @@ public class PubSubPersistenceManager {
loadedNodes.put(node.getNodeID(), node); loadedNodes.put(node.getNodeID(), node);
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
} }
} }
...@@ -676,7 +687,7 @@ public class PubSubPersistenceManager { ...@@ -676,7 +687,7 @@ public class PubSubPersistenceManager {
String nodeID = decodeNodeID(rs.getString(1)); String nodeID = decodeNodeID(rs.getString(1));
Node node = nodes.get(nodeID); Node node = nodes.get(nodeID);
if (node == null) { if (node == null) {
Log.warn("JID associated to a non-existent node: " + nodeID); log.warn("JID associated to a non-existent node: " + nodeID);
return; return;
} }
JID jid = new JID(rs.getString(2)); JID jid = new JID(rs.getString(2));
...@@ -695,7 +706,7 @@ public class PubSubPersistenceManager { ...@@ -695,7 +706,7 @@ public class PubSubPersistenceManager {
} }
} }
catch (Exception ex) { catch (Exception ex) {
Log.error(ex.getMessage(), ex); log.error(ex.getMessage(), ex);
} }
} }
...@@ -704,13 +715,13 @@ public class PubSubPersistenceManager { ...@@ -704,13 +715,13 @@ public class PubSubPersistenceManager {
String nodeID = decodeNodeID(rs.getString(1)); String nodeID = decodeNodeID(rs.getString(1));
Node node = nodes.get(nodeID); Node node = nodes.get(nodeID);
if (node == null) { if (node == null) {
Log.warn("Roster Group associated to a non-existent node: " + nodeID); log.warn("Roster Group associated to a non-existent node: " + nodeID);
return; return;
} }
node.addAllowedRosterGroup(rs.getString(2)); node.addAllowedRosterGroup(rs.getString(2));
} }
catch (SQLException ex) { catch (SQLException ex) {
Log.error(ex.getMessage(), ex); log.error(ex.getMessage(), ex);
} }
} }
...@@ -719,7 +730,7 @@ public class PubSubPersistenceManager { ...@@ -719,7 +730,7 @@ public class PubSubPersistenceManager {
String nodeID = decodeNodeID(rs.getString(1)); String nodeID = decodeNodeID(rs.getString(1));
Node node = nodes.get(nodeID); Node node = nodes.get(nodeID);
if (node == null) { if (node == null) {
Log.warn("Affiliations found for a non-existent node: " + nodeID); log.warn("Affiliations found for a non-existent node: " + nodeID);
return; return;
} }
NodeAffiliate affiliate = new NodeAffiliate(node, new JID(rs.getString(2))); NodeAffiliate affiliate = new NodeAffiliate(node, new JID(rs.getString(2)));
...@@ -727,7 +738,7 @@ public class PubSubPersistenceManager { ...@@ -727,7 +738,7 @@ public class PubSubPersistenceManager {
node.addAffiliate(affiliate); node.addAffiliate(affiliate);
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
} }
} }
...@@ -736,14 +747,14 @@ public class PubSubPersistenceManager { ...@@ -736,14 +747,14 @@ public class PubSubPersistenceManager {
String nodeID = decodeNodeID(rs.getString(1)); String nodeID = decodeNodeID(rs.getString(1));
Node node = nodes.get(nodeID); Node node = nodes.get(nodeID);
if (node == null) { if (node == null) {
Log.warn("Subscription found for a non-existent node: " + nodeID); log.warn("Subscription found for a non-existent node: " + nodeID);
return; return;
} }
String subID = rs.getString(2); String subID = rs.getString(2);
JID subscriber = new JID(rs.getString(3)); JID subscriber = new JID(rs.getString(3));
JID owner = new JID(rs.getString(4)); JID owner = new JID(rs.getString(4));
if (node.getAffiliate(owner) == null) { if (node.getAffiliate(owner) == null) {
Log.warn("Subscription found for a non-existent affiliate: " + owner + log.warn("Subscription found for a non-existent affiliate: " + owner +
" in node: " + nodeID); " in node: " + nodeID);
return; return;
} }
...@@ -766,43 +777,7 @@ public class PubSubPersistenceManager { ...@@ -766,43 +777,7 @@ public class PubSubPersistenceManager {
node.addSubscription(subscription); node.addSubscription(subscription);
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
}
}
private static void loadItems(Map<String, Node> nodes, ResultSet rs) {
SAXReader xmlReader = null;
try {
// Get a sax reader from the pool
xmlReader = xmlReaders.take();
String nodeID = decodeNodeID(rs.getString(5));
LeafNode node = (LeafNode) nodes.get(nodeID);
if (node == null) {
Log.warn("Published Item found for a non-existent node: " + nodeID);
return;
}
String itemID = rs.getString(1);
JID publisher = new JID(rs.getString(2));
Date creationDate = new Date(Long.parseLong(rs.getString(3).trim()));
// Create the item
PublishedItem item = new PublishedItem(node, publisher, itemID, creationDate);
// Add the extra fields to the published item
if (rs.getString(4) != null) {
item.setPayload(
xmlReader.read(new StringReader(rs.getString(4))).getRootElement());
}
// Add the published item to the node
node.setLastPublishedItem(item);
}
catch (Exception sqle) {
Log.error(sqle.getMessage(), sqle);
}
finally {
// Return the sax reader to the pool
if (xmlReader != null) {
xmlReaders.add(xmlReader);
}
} }
} }
...@@ -839,7 +814,7 @@ public class PubSubPersistenceManager { ...@@ -839,7 +814,7 @@ public class PubSubPersistenceManager {
} }
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
} }
finally { finally {
DbConnectionManager.closeConnection(pstmt, con); DbConnectionManager.closeConnection(pstmt, con);
...@@ -866,7 +841,7 @@ public class PubSubPersistenceManager { ...@@ -866,7 +841,7 @@ public class PubSubPersistenceManager {
pstmt.executeUpdate(); pstmt.executeUpdate();
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
} }
finally { finally {
DbConnectionManager.closeConnection(pstmt, con); DbConnectionManager.closeConnection(pstmt, con);
...@@ -951,7 +926,7 @@ public class PubSubPersistenceManager { ...@@ -951,7 +926,7 @@ public class PubSubPersistenceManager {
} }
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
} }
finally { finally {
DbConnectionManager.closeConnection(pstmt, con); DbConnectionManager.closeConnection(pstmt, con);
...@@ -978,7 +953,7 @@ public class PubSubPersistenceManager { ...@@ -978,7 +953,7 @@ public class PubSubPersistenceManager {
pstmt.executeUpdate(); pstmt.executeUpdate();
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
} }
finally { finally {
DbConnectionManager.closeConnection(pstmt, con); DbConnectionManager.closeConnection(pstmt, con);
...@@ -1016,7 +991,8 @@ public class PubSubPersistenceManager { ...@@ -1016,7 +991,8 @@ public class PubSubPersistenceManager {
*/ */
public static void flushItems() public static void flushItems()
{ {
Log.debug("Flushing items to database"); log.debug("Flushing items to database");
LinkedList addList = null; LinkedList addList = null;
LinkedList delList = null; LinkedList delList = null;
...@@ -1043,40 +1019,41 @@ public class PubSubPersistenceManager { ...@@ -1043,40 +1019,41 @@ public class PubSubPersistenceManager {
Connection con = null; Connection con = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
try { try
{
con = DbConnectionManager.getTransactionConnection(); con = DbConnectionManager.getTransactionConnection();
// Add all items that were cached // Add all items that were cached
if (addItem != null) if (addItem != null)
{ {
LinkedListNode addHead = addList.getLast().next; LinkedListNode addHead = addList.getLast().next;
pstmt = con.prepareStatement(ADD_ITEM);
while (addItem != addHead) while (addItem != addHead)
{ {
PublishedItem item = (PublishedItem) addItem.object; PublishedItem item = (PublishedItem) addItem.object;
pstmt = con.prepareStatement(ADD_ITEM);
pstmt.setString(1, item.getNode().getService().getServiceID()); pstmt.setString(1, item.getNode().getService().getServiceID());
pstmt.setString(2, encodeNodeID(item.getNode().getNodeID())); pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
pstmt.setString(3, item.getID()); pstmt.setString(3, item.getID());
pstmt.setString(4, item.getPublisher().toString()); pstmt.setString(4, item.getPublisher().toString());
pstmt.setString(5, StringUtils.dateToMillis(item.getCreationDate())); pstmt.setString(5, StringUtils.dateToMillis(item.getCreationDate()));
pstmt.setString(6, item.getPayloadXML()); pstmt.setString(6, item.getPayloadXML());
pstmt.executeUpdate(); pstmt.addBatch();
addItem = addItem.next; addItem = addItem.next;
} }
pstmt.executeBatch();
} }
if (delItem != null) if (delItem != null)
{ {
LinkedListNode delHead = delList.getLast().next; LinkedListNode delHead = delList.getLast().next;
pstmt = con.prepareStatement(DELETE_ITEM);
while (delItem != delHead) while (delItem != delHead)
{ {
PublishedItem item = (PublishedItem) delItem.object; PublishedItem item = (PublishedItem) delItem.object;
pstmt = con.prepareStatement(DELETE_ITEM);
pstmt.setString(1, item.getNode().getService().getServiceID()); pstmt.setString(1, item.getNode().getService().getServiceID());
pstmt.setString(2, encodeNodeID(item.getNode().getNodeID())); pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
pstmt.setString(3, item.getID()); pstmt.setString(3, item.getID());
...@@ -1085,13 +1062,14 @@ public class PubSubPersistenceManager { ...@@ -1085,13 +1062,14 @@ public class PubSubPersistenceManager {
delItem = delItem.next; delItem = delItem.next;
} }
} }
con.commit(); con.commit();
} }
catch (SQLException sqle) { catch (SQLException sqle)
Log.error(sqle.getMessage(), sqle); {
log.error(sqle.getMessage(), sqle);
} }
finally { finally
{
DbConnectionManager.closeConnection(pstmt, con); DbConnectionManager.closeConnection(pstmt, con);
} }
} }
...@@ -1160,7 +1138,7 @@ public class PubSubPersistenceManager { ...@@ -1160,7 +1138,7 @@ public class PubSubPersistenceManager {
} }
} }
catch (Exception sqle) { catch (Exception sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
} }
finally { finally {
DbConnectionManager.closeConnection(rs, pstmt, con); DbConnectionManager.closeConnection(rs, pstmt, con);
...@@ -1207,7 +1185,7 @@ public class PubSubPersistenceManager { ...@@ -1207,7 +1185,7 @@ public class PubSubPersistenceManager {
pstmt.executeUpdate(); pstmt.executeUpdate();
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
} }
finally { finally {
DbConnectionManager.closeConnection(pstmt, con); DbConnectionManager.closeConnection(pstmt, con);
...@@ -1253,7 +1231,7 @@ public class PubSubPersistenceManager { ...@@ -1253,7 +1231,7 @@ public class PubSubPersistenceManager {
pstmt.executeUpdate(); pstmt.executeUpdate();
} }
catch (SQLException sqle) { catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
} }
finally { finally {
DbConnectionManager.closeConnection(pstmt, con); DbConnectionManager.closeConnection(pstmt, con);
...@@ -1326,7 +1304,7 @@ public class PubSubPersistenceManager { ...@@ -1326,7 +1304,7 @@ public class PubSubPersistenceManager {
} }
} }
catch (Exception sqle) { catch (Exception sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
} }
finally { finally {
// Return the sax reader to the pool // Return the sax reader to the pool
...@@ -1379,7 +1357,7 @@ public class PubSubPersistenceManager { ...@@ -1379,7 +1357,7 @@ public class PubSubPersistenceManager {
} }
} }
catch (Exception sqle) { catch (Exception sqle) {
Log.error(sqle.getMessage(), sqle); log.error(sqle.getMessage(), sqle);
} }
finally { finally {
// Return the sax reader to the pool // Return the sax reader to the pool
...@@ -1422,7 +1400,7 @@ public class PubSubPersistenceManager { ...@@ -1422,7 +1400,7 @@ public class PubSubPersistenceManager {
} }
} }
catch (Exception exc) { catch (Exception exc) {
Log.error(exc.getMessage(), exc); log.error(exc.getMessage(), exc);
} }
finally { finally {
DbConnectionManager.closeConnection(pstmt, con); DbConnectionManager.closeConnection(pstmt, con);
...@@ -1448,7 +1426,7 @@ public class PubSubPersistenceManager { ...@@ -1448,7 +1426,7 @@ public class PubSubPersistenceManager {
pstmt.executeUpdate(); pstmt.executeUpdate();
} }
catch (Exception exc) { catch (Exception exc) {
Log.error(exc.getMessage(), exc); log.error(exc.getMessage(), exc);
} }
finally { finally {
DbConnectionManager.closeConnection(pstmt, con); DbConnectionManager.closeConnection(pstmt, con);
...@@ -1497,31 +1475,59 @@ public class PubSubPersistenceManager { ...@@ -1497,31 +1475,59 @@ public class PubSubPersistenceManager {
return nodeID; return nodeID;
} }
public static void shutdown() { /**
// TODO Auto-generated method stub * Purges all items from the database that exceed the defined item count on
* all nodes.
*/
private static void purgeItems()
{
String persistentNodeQuery = "SELECT serviceID, nodeID, maxItems FROM ofPubsubNode WHERE "
+ "leaf=1 AND persistItems=1 AND maxItems > 0";
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
} try
{
con = DbConnectionManager.getTransactionConnection();
static class ItemHolder PreparedStatement nodeConfig = con.prepareStatement(persistentNodeQuery);
rs = nodeConfig.executeQuery();
PreparedStatement purgeNode = con.prepareStatement(PURGE_FOR_SIZE);
while (rs.next())
{ {
volatile private PublishedItem item; String svcId = rs.getString(1);
String nodeId = rs.getString(2);
int maxItems = rs.getInt(3);
purgeNode.setString(1, svcId);
purgeNode.setString(2, nodeId);
purgeNode.setInt(3, maxItems);
purgeNode.setString(4, svcId);
purgeNode.setString(5, nodeId);
ItemHolder(PublishedItem itemToStore) int rowsAffected = purgeNode.executeUpdate();
if (log.isDebugEnabled())
log.debug(rowsAffected + " deleted rows from service: " + svcId + " nodeId: " + nodeId);
}
}
catch (Exception sqle)
{ {
item = itemToStore; sqle.printStackTrace();
log.error(sqle.getMessage(), sqle);
} }
finally
/**
* Remove the item since it has been marked for deletion, therefore it won't get stored.
*/
void delete()
{ {
item = null; DbConnectionManager.closeConnection(rs, pstmt, con);
}
} }
PublishedItem getItem() public static void shutdown()
{ {
return item; flushItems();
} purgeItems();
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment