Commit 7c464bde authored by Robin Collier's avatar Robin Collier Committed by rcollier

OF-205 Changed transaction control for several methods for persistence. Also...

OF-205 Changed transaction control for several methods for persistence.  Also reversed the search order for loading items to compliant with the spec.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@13306 b35dd754-fafc-0310-a699-88a17e54d16e
parent efe7d498
...@@ -157,7 +157,7 @@ public class PubSubPersistenceManager { ...@@ -157,7 +157,7 @@ public class PubSubPersistenceManager {
"DELETE FROM ofPubsubSubscription WHERE serviceID=? AND nodeID=?"; "DELETE FROM ofPubsubSubscription WHERE serviceID=? AND nodeID=?";
private static final String LOAD_ITEMS = private static final String LOAD_ITEMS =
"SELECT id,jid,creationDate,payload FROM ofPubsubItem " + "SELECT id,jid,creationDate,payload FROM ofPubsubItem " +
"WHERE serviceID=? AND nodeID=? ORDER BY creationDate"; "WHERE serviceID=? AND nodeID=? ORDER BY creationDate DESC";
private static final String LOAD_ITEM = private static final String LOAD_ITEM =
"SELECT jid,creationDate,payload FROM ofPubsubItem " + "SELECT jid,creationDate,payload FROM ofPubsubItem " +
"WHERE serviceID=? AND nodeID=? AND id=?"; "WHERE serviceID=? AND nodeID=? AND id=?";
...@@ -497,7 +497,10 @@ public class PubSubPersistenceManager { ...@@ -497,7 +497,10 @@ public class PubSubPersistenceManager {
DbConnectionManager.fastcloseStmt(pstmt); DbConnectionManager.fastcloseStmt(pstmt);
// Remove published items of the node being deleted // Remove published items of the node being deleted
if (node instanceof LeafNode) { purgeNode((LeafNode)node); } if (node instanceof LeafNode)
{
purgeNode((LeafNode) node, con);
}
// Remove all affiliates from the table of node affiliates // Remove all affiliates from the table of node affiliates
pstmt = con.prepareStatement(DELETE_AFFILIATIONS); pstmt = con.prepareStatement(DELETE_AFFILIATIONS);
...@@ -1129,6 +1132,27 @@ public class PubSubPersistenceManager { ...@@ -1129,6 +1132,27 @@ public class PubSubPersistenceManager {
*/ */
public static void flushPendingItems(boolean sendToCluster) public static void flushPendingItems(boolean sendToCluster)
{ {
Connection con = null;
boolean rollback = false;
try
{
con = DbConnectionManager.getTransactionConnection();
flushPendingItems(sendToCluster, con);
}
catch (Exception e)
{
log.error("Failed to flush pending items", e);
rollback = true;
}
finally
{
DbConnectionManager.closeTransactionConnection(con, rollback);
}
}
private static void flushPendingItems(boolean sendToCluster, Connection con) throws SQLException
{
if (sendToCluster) { if (sendToCluster) {
CacheFactory.doSynchronousClusterTask(new FlushTask(), false); CacheFactory.doSynchronousClusterTask(new FlushTask(), false);
} }
...@@ -1141,7 +1165,6 @@ public class PubSubPersistenceManager { ...@@ -1141,7 +1165,6 @@ public class PubSubPersistenceManager {
log.debug("Flush " + itemsPending.size() + " pending items to database"); log.debug("Flush " + itemsPending.size() + " pending items to database");
} }
boolean abortTransaction = false;
LinkedList addList = null; LinkedList addList = null;
LinkedList delList = null; LinkedList delList = null;
...@@ -1178,14 +1201,11 @@ public class PubSubPersistenceManager { ...@@ -1178,14 +1201,11 @@ public class PubSubPersistenceManager {
if ((addItem == null) && (delItem == null)) if ((addItem == null) && (delItem == null))
return; return;
Connection con = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
// delete first (to remove possible duplicates), then add new items // delete first (to remove possible duplicates), then add new items
if (delItem != null) { if (delItem != null) {
try { try {
con = DbConnectionManager.getTransactionConnection();
LinkedListNode delHead = delList.getLast().next; LinkedListNode delHead = delList.getLast().next;
pstmt = con.prepareStatement(DELETE_ITEM); pstmt = con.prepareStatement(DELETE_ITEM);
...@@ -1201,20 +1221,14 @@ public class PubSubPersistenceManager { ...@@ -1201,20 +1221,14 @@ public class PubSubPersistenceManager {
} }
pstmt.executeBatch(); pstmt.executeBatch();
} }
catch (SQLException sqle)
{
log.error(sqle.getMessage(), sqle);
abortTransaction = true;
}
finally finally
{ {
DbConnectionManager.closeTransactionConnection(pstmt, con, abortTransaction); DbConnectionManager.closeStatement(pstmt);
} }
} }
if (addItem != null) { if (addItem != null) {
try { try {
con = DbConnectionManager.getTransactionConnection();
LinkedListNode addHead = addList.getLast().next; LinkedListNode addHead = addList.getLast().next;
pstmt = con.prepareStatement(ADD_ITEM); pstmt = con.prepareStatement(ADD_ITEM);
...@@ -1233,14 +1247,9 @@ public class PubSubPersistenceManager { ...@@ -1233,14 +1247,9 @@ public class PubSubPersistenceManager {
} }
pstmt.executeBatch(); pstmt.executeBatch();
} }
catch (SQLException sqle)
{
log.error(sqle.getMessage(), sqle);
abortTransaction = true;
}
finally finally
{ {
DbConnectionManager.closeTransactionConnection(pstmt, con, abortTransaction); DbConnectionManager.closeStatement(pstmt);
} }
} }
} }
...@@ -1592,35 +1601,75 @@ public class PubSubPersistenceManager { ...@@ -1592,35 +1601,75 @@ public class PubSubPersistenceManager {
return result; return result;
} }
public static void purgeNode(LeafNode leafNode)
{
Connection con = null;
boolean rollback = false;
public static void purgeNode(LeafNode leafNode) { try
flushPendingItems(); {
con = DbConnectionManager.getTransactionConnection();
purgeNode(leafNode, con);
// Delete all the entries from the itemsToAdd list and pending map
// that match this node.
synchronized (itemsPending)
{
Iterator<Map.Entry<String, LinkedListNode>> pendingIt = itemsPending.entrySet().iterator();
while (pendingIt.hasNext())
{
LinkedListNode itemNode = pendingIt.next().getValue();
if (((PublishedItem) itemNode.object).getNodeID().equals(leafNode.getNodeID()))
{
itemNode.remove();
pendingIt.remove();
}
}
}
}
catch (SQLException exc)
{
log.error(exc.getMessage(), exc);
rollback = true;
}
finally
{
DbConnectionManager.closeTransactionConnection(con, rollback);
}
}
private static void purgeNode(LeafNode leafNode, Connection con) throws SQLException
{
flushPendingItems(ClusterManager.isClusteringEnabled(), con);
// Remove published items of the node being deleted // Remove published items of the node being deleted
Connection con = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
try { try
con = DbConnectionManager.getConnection(); {
pstmt = con.prepareStatement(DELETE_ITEMS); pstmt = con.prepareStatement(DELETE_ITEMS);
pstmt.setString(1, leafNode.getService().getServiceID()); pstmt.setString(1, leafNode.getService().getServiceID());
pstmt.setString(2, encodeNodeID(leafNode.getNodeID())); pstmt.setString(2, encodeNodeID(leafNode.getNodeID()));
pstmt.executeUpdate(); pstmt.executeUpdate();
}
// drop cached items for purged node finally
synchronized(itemCache) { {
for (PublishedItem item : itemCache.values()) { DbConnectionManager.closeStatement(pstmt);
if (leafNode.getNodeID().equals(item.getNodeID())) { }
itemCache.remove(item.getItemKey());
} // drop cached items for purged node
} synchronized (itemCache)
{
for (PublishedItem item : itemCache.values())
{
if (leafNode.getNodeID().equals(item.getNodeID()))
{
itemCache.remove(item.getItemKey());
}
} }
} }
catch (Exception exc) {
log.error(exc.getMessage(), exc);
}
finally {
DbConnectionManager.closeConnection(pstmt, con);
}
} }
private static String encodeWithComma(Collection<String> strings) { private static String encodeWithComma(Collection<String> strings) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment