Commit c4ae3493 authored by Robin Collier's avatar Robin Collier Committed by rcollier

OF-39 Removal of in memory caching for pubsub

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/branches/pubsub_clustering@12969 b35dd754-fafc-0310-a699-88a17e54d16e
parent 47d48a8b
...@@ -356,7 +356,7 @@ public class LeafNode extends Node { ...@@ -356,7 +356,7 @@ public class LeafNode extends Node {
@Override @Override
public List<PublishedItem> getPublishedItems(int recentItems) { public List<PublishedItem> getPublishedItems(int recentItems) {
return PubSubPersistenceManager.getPublishedItems(this, Math.min(getMaxPublishedItems(), recentItems)); return PubSubPersistenceManager.getPublishedItems(this, recentItems);
} }
@Override @Override
......
...@@ -42,6 +42,7 @@ import org.dom4j.io.SAXReader; ...@@ -42,6 +42,7 @@ import org.dom4j.io.SAXReader;
import org.jivesoftware.database.DbConnectionManager; import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.openfire.pubsub.models.AccessModel; import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel; import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LinkedList; import org.jivesoftware.util.LinkedList;
import org.jivesoftware.util.LinkedListNode; import org.jivesoftware.util.LinkedListNode;
import org.jivesoftware.util.StringUtils; import org.jivesoftware.util.StringUtils;
...@@ -144,7 +145,7 @@ public class PubSubPersistenceManager { ...@@ -144,7 +145,7 @@ public class PubSubPersistenceManager {
"WHERE serviceID=? ORDER BY creationDate"; "WHERE serviceID=? ORDER BY creationDate";
private static final String LOAD_ITEMS = private static final String LOAD_ITEMS =
"SELECT id,jid,creationDate,payload FROM ofPubsubItem " + "SELECT id,jid,creationDate,payload FROM ofPubsubItem " +
"WHERE serviceID=? AND nodeID=? ORDER BY creationDate"; "WHERE serviceID=? AND nodeID=? ORDER BY creationDate DESC";
private static final String LOAD_ITEM = private static final String LOAD_ITEM =
"SELECT jid,creationDate,payload FROM ofPubsubItem " + "SELECT jid,creationDate,payload FROM ofPubsubItem " +
"WHERE serviceID=? AND nodeID=? AND id=?"; "WHERE serviceID=? AND nodeID=? AND id=?";
...@@ -179,13 +180,15 @@ public class PubSubPersistenceManager { ...@@ -179,13 +180,15 @@ public class PubSubPersistenceManager {
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
private static Timer flushTimer = new Timer("Pubsub item flush timer"); private static Timer flushTimer = new Timer("Pubsub item flush timer");
private static long timerDelay = 1000 * 120; private static long timerDelay = JiveGlobals.getIntProperty("xmpp.pubsub.flush.timer", 120) * 1000;
private static final int POOL_SIZE = 50; private static final int POOL_SIZE = 50;
private static final int ITEM_CACHE_SIZE = 100; private static final int ITEM_CACHE_SIZE = 100;
private static int publishedItemSize = 0; private static int publishedItemSize = 0;
private static final Object itemLock = new Object(); private static final Object itemLock = new Object();
private static final int MAX_ROWS_FETCH = JiveGlobals.getIntProperty("xmpp.pubsub.fetch.max", 2000);
/** /**
* Queue that holds the items that need to be added to the database. * Queue that holds the items that need to be added to the database.
*/ */
...@@ -215,6 +218,10 @@ public class PubSubPersistenceManager { ...@@ -215,6 +218,10 @@ public class PubSubPersistenceManager {
xmlReaders.add(xmlReader); xmlReaders.add(xmlReader);
} }
// Enforce a min of 20s
if (timerDelay < 20000)
timerDelay = 20000;
flushTimer.schedule(new TimerTask() { flushTimer.schedule(new TimerTask() {
@Override @Override
...@@ -987,6 +994,14 @@ public class PubSubPersistenceManager { ...@@ -987,6 +994,14 @@ public class PubSubPersistenceManager {
public static void savePublishedItem(PublishedItem item) { public static void savePublishedItem(PublishedItem item) {
synchronized(itemLock) synchronized(itemLock)
{ {
LinkedListNode nodeToReplace = itemMap.get(item.getID());
if (nodeToReplace != null)
{
nodeToReplace.remove();
publishedItemSize--;
}
LinkedListNode listNode = itemsToAdd.addLast(item); LinkedListNode listNode = itemsToAdd.addLast(item);
itemMap.put(item.getID(), listNode); itemMap.put(item.getID(), listNode);
publishedItemSize++; publishedItemSize++;
...@@ -1002,7 +1017,6 @@ public class PubSubPersistenceManager { ...@@ -1002,7 +1017,6 @@ public class PubSubPersistenceManager {
public static void flushItems() public static void flushItems()
{ {
Log.debug("Flushing items to database"); Log.debug("Flushing items to database");
System.out.println("Flushing " + publishedItemSize + " items to database");
LinkedList addList = null; LinkedList addList = null;
LinkedList delList = null; LinkedList delList = null;
...@@ -1014,23 +1028,34 @@ public class PubSubPersistenceManager { ...@@ -1014,23 +1028,34 @@ public class PubSubPersistenceManager {
delList = itemsToDelete; delList = itemsToDelete;
itemsToAdd = new LinkedList(); itemsToAdd = new LinkedList();
delList = new LinkedList(); itemsToDelete = new LinkedList();
itemMap.clear(); itemMap.clear();
publishedItemSize = 0; publishedItemSize = 0;
} }
LinkedListNode addItem = addList.getFirst();
LinkedListNode delItem = delList.getFirst();
// Check to see if there is anything to actually do.
if ((addItem == null) && (delItem == null))
return;
Connection con = null; Connection con = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
try { try {
con = DbConnectionManager.getTransactionConnection(); con = DbConnectionManager.getTransactionConnection();
// Add all items that were cached // Add all items that were cached
LinkedListNode itemNode = addList.getFirst();
while (itemNode != null) if (addItem != null)
{
LinkedListNode addHead = addList.getLast().next;
while (addItem != addHead)
{ {
PublishedItem item = (PublishedItem) itemNode.object; PublishedItem item = (PublishedItem) addItem.object;
pstmt = con.prepareStatement(ADD_ITEM); pstmt = con.prepareStatement(ADD_ITEM);
pstmt.setString(1, item.getNode().getService().getServiceID()); pstmt.setString(1, item.getNode().getService().getServiceID());
pstmt.setString(2, encodeNodeID(item.getNode().getNodeID())); pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
...@@ -1040,22 +1065,25 @@ public class PubSubPersistenceManager { ...@@ -1040,22 +1065,25 @@ public class PubSubPersistenceManager {
pstmt.setString(6, item.getPayloadXML()); pstmt.setString(6, item.getPayloadXML());
pstmt.executeUpdate(); pstmt.executeUpdate();
itemNode = itemNode.next; addItem = addItem.next;
}
} }
// Delete all items that were cached if (delItem != null)
itemNode = delList.getFirst(); {
LinkedListNode delHead = delList.getLast().next;
while (itemNode != null) while (delItem != delHead)
{ {
PublishedItem item = (PublishedItem) itemNode.object; PublishedItem item = (PublishedItem) delItem.object;
pstmt = con.prepareStatement(DELETE_ITEM); pstmt = con.prepareStatement(DELETE_ITEM);
pstmt.setString(1, item.getNode().getService().getServiceID()); pstmt.setString(1, item.getNode().getService().getServiceID());
pstmt.setString(2, encodeNodeID(item.getNode().getNodeID())); pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
pstmt.setString(3, item.getID()); pstmt.setString(3, item.getID());
pstmt.executeUpdate(); pstmt.executeUpdate();
itemNode = itemNode.next; delItem = delItem.next;
}
} }
con.commit(); con.commit();
...@@ -1256,7 +1284,17 @@ public class PubSubPersistenceManager { ...@@ -1256,7 +1284,17 @@ public class PubSubPersistenceManager {
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
ResultSet rs = null; ResultSet rs = null;
SAXReader xmlReader = null; SAXReader xmlReader = null;
List<PublishedItem> results = new ArrayList<PublishedItem>(node.getMaxPublishedItems()); int max = MAX_ROWS_FETCH;
int maxPublished = node.getMaxPublishedItems();
// Limit the max rows until a solution is in place with Result Set Management
if (maxRows != -1)
max = maxPublished == -1 ? Math.min(maxRows, MAX_ROWS_FETCH) : Math.min(maxRows, maxPublished);
else if (maxPublished != -1)
max = Math.min(MAX_ROWS_FETCH, maxPublished);
// We don't know how many items are in the db, so we will start with an allocation of 500
List<PublishedItem> results = new ArrayList<PublishedItem>(max);
try { try {
// Get a sax reader from the pool // Get a sax reader from the pool
...@@ -1264,12 +1302,14 @@ public class PubSubPersistenceManager { ...@@ -1264,12 +1302,14 @@ public class PubSubPersistenceManager {
con = DbConnectionManager.getConnection(); con = DbConnectionManager.getConnection();
// Get published items of the specified node // Get published items of the specified node
pstmt = con.prepareStatement(LOAD_ITEMS); pstmt = con.prepareStatement(LOAD_ITEMS);
pstmt.setMaxRows(node.getMaxPublishedItems()); pstmt.setMaxRows(max);
pstmt.setString(1, node.getService().getServiceID()); pstmt.setString(1, node.getService().getServiceID());
pstmt.setString(2, encodeNodeID(node.getNodeID())); pstmt.setString(2, encodeNodeID(node.getNodeID()));
rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
int counter = 0;
// Rebuild loaded published items // Rebuild loaded published items
while(rs.next()) { while(rs.next() && (counter < max)) {
String itemID = rs.getString(1); String itemID = rs.getString(1);
JID publisher = new JID(rs.getString(2)); JID publisher = new JID(rs.getString(2));
Date creationDate = new Date(Long.parseLong(rs.getString(3).trim())); Date creationDate = new Date(Long.parseLong(rs.getString(3).trim()));
...@@ -1282,6 +1322,7 @@ public class PubSubPersistenceManager { ...@@ -1282,6 +1322,7 @@ public class PubSubPersistenceManager {
} }
// Add the published item to the node // Add the published item to the node
results.add(item); results.add(item);
counter++;
} }
} }
catch (Exception sqle) { catch (Exception sqle) {
...@@ -1296,7 +1337,8 @@ public class PubSubPersistenceManager { ...@@ -1296,7 +1337,8 @@ public class PubSubPersistenceManager {
} }
if (results.size() == 0) if (results.size() == 0)
return Collections.emptyList(); results = Collections.emptyList();
return results; return results;
} }
...@@ -1350,6 +1392,7 @@ public class PubSubPersistenceManager { ...@@ -1350,6 +1392,7 @@ public class PubSubPersistenceManager {
} }
public static PublishedItem getPublishedItem(LeafNode node, String itemID) { public static PublishedItem getPublishedItem(LeafNode node, String itemID) {
flushItems();
Connection con = null; Connection con = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
ResultSet rs = null; ResultSet rs = null;
...@@ -1392,6 +1435,7 @@ public class PubSubPersistenceManager { ...@@ -1392,6 +1435,7 @@ public class PubSubPersistenceManager {
public static void purgeNode(LeafNode leafNode) { public static void purgeNode(LeafNode leafNode) {
flushItems();
// Remove published items of the node being deleted // Remove published items of the node being deleted
Connection con = null; Connection con = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment