Commit dd960eeb authored by Tom Evans's avatar Tom Evans Committed by tevans

OF-668: Ensure Pubsub write cache is persisted during Openfire shutdown

- Improved exception handling during shutdown
- Refactored LinkedList / LinkedListNode to use generics
- Refactored PublishedItem to remove persistence-specific retry attribute
- Added RetryWrapper to improve pubsub item persistence

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@13651 b35dd754-fafc-0310-a699-88a17e54d16e
parent 33b5e8c0
......@@ -42,6 +42,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.dom4j.Document;
import org.dom4j.io.SAXReader;
import org.eclipse.jetty.util.log.Log;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.openfire.admin.AdminManager;
import org.jivesoftware.openfire.audit.AuditManager;
......@@ -361,6 +362,7 @@ public class XMPPServer {
}
if (isStandAlone()) {
Log.info("Registering shutdown hook (standalone mode)");
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread());
}
......@@ -682,6 +684,7 @@ public class XMPPServer {
* inside of another server.
*/
public void stop() {
Log.info("Initiating shutdown ...");
// Only do a system exit if we're running standalone
if (isStandAlone()) {
// if we're in a wrapper, we have to tell the wrapper to shut us down
......@@ -934,24 +937,42 @@ public class XMPPServer {
ClusterManager.shutdown();
// Notify server listeners that the server is about to be stopped
for (XMPPServerListener listener : listeners) {
try {
listener.serverStopping();
} catch (Exception ex) {
Log.error("Exception during listener shutdown", ex);
}
}
// If we don't have modules then the server has already been shutdown
if (modules.isEmpty()) {
return;
}
Log.info("Shutting down " + modules.size() + " modules ...");
// Get all modules and stop and destroy them
for (Module module : modules.values()) {
try {
module.stop();
module.destroy();
} catch (Exception ex) {
Log.error("Exception during module shutdown", ex);
}
}
// Stop all plugins
Log.info("Shutting down plugins ...");
if (pluginManager != null) {
try {
pluginManager.shutdown();
} catch (Exception ex) {
Log.error("Exception during plugin shutdown", ex);
}
}
modules.clear();
// Stop the Db connection manager.
try {
DbConnectionManager.destroyConnectionProvider();
} catch (Exception ex) {
Log.error("Exception during DB shutdown", ex);
}
// Shutdown the task engine.
TaskEngine.getInstance().shutdown();
......
......@@ -228,20 +228,21 @@ public class PubSubPersistenceManager {
private static final int MAX_ITEM_RETRY = JiveGlobals.getIntProperty("xmpp.pubsub.item.retry", 1);
/**
* Queue that holds the items that need to be added to the database.
* Queue that holds the (wrapped) items that need to be added to the database.
*/
private static LinkedList itemsToAdd = new LinkedList();
private static LinkedList<RetryWrapper> itemsToAdd = new LinkedList<RetryWrapper>();
/**
* Queue that holds the items that need to be deleted from the database.
*/
private static LinkedList itemsToDelete = new LinkedList();
private static LinkedList<PublishedItem> itemsToDelete = new LinkedList<PublishedItem>();
/**
* Keeps reference to published items that haven't been persisted yet so they can be removed
* before being deleted.
* Keeps reference to published items that haven't been persisted yet so they
* can be removed before being deleted. Note these items are wrapped via the
* RetryWrapper to allow multiple persistence attempts when needed.
*/
private static final HashMap<String, LinkedListNode> itemsPending = new HashMap<String, LinkedListNode>();
private static final HashMap<String, LinkedListNode<RetryWrapper>> itemsPending = new HashMap<String, LinkedListNode<RetryWrapper>>();
/**
* Cache name for recently accessed published items.
......@@ -1136,34 +1137,50 @@ public class PubSubPersistenceManager {
* @param item The published item to save.
*/
public static void savePublishedItem(PublishedItem item) {
savePublishedItem(item, false);
savePublishedItem(new RetryWrapper(item));
}
/**
* Creates and stores the published item in the database.
* @param item The published item to save.
* @param isRetry True if this pass is for an item persistence retry
* @param wrapper The published item, wrapped for retry
*/
private static void savePublishedItem(PublishedItem item, boolean isRetry) {
private static void savePublishedItem(RetryWrapper wrapper) {
boolean firstPass = (wrapper.getRetryCount() == 0);
PublishedItem item = wrapper.get();
String itemKey = item.getItemKey();
itemCache.put(itemKey, item);
log.debug("Added new (inbound) item to cache");
synchronized (itemsPending) {
LinkedListNode itemToReplace = itemsPending.remove(itemKey);
LinkedListNode<RetryWrapper> itemToReplace = itemsPending.remove(itemKey);
if (itemToReplace != null) {
itemToReplace.remove(); // remove duplicate from itemsToAdd linked list
}
LinkedListNode listNode = isRetry ? itemsToAdd.addFirst(item) : itemsToAdd.addLast(item);
LinkedListNode<RetryWrapper> listNode = firstPass ?
itemsToAdd.addLast(wrapper) :
itemsToAdd.addFirst(wrapper);
itemsPending.put(itemKey, listNode);
}
// don't flush pending items immediately if this is a retry attempt
if (!isRetry && itemsPending.size() > MAX_ITEMS_FLUSH) {
// skip the flush step if this is a retry attempt
if (firstPass && itemsPending.size() > MAX_ITEMS_FLUSH) {
TaskEngine.getInstance().submit(new Runnable() {
public void run() { flushPendingItems(false); }
});
}
}
/**
* This class is used internally to wrap PublishedItems. It adds
* a retry counter for the persistence exception handling logic.
*/
private static class RetryWrapper {
private PublishedItem item;
private volatile transient int retryCount = 0;
public RetryWrapper(PublishedItem item) { this.item = item; }
public PublishedItem get() { return item; }
public int getRetryCount() { return retryCount; }
public int nextRetry() { return ++retryCount; }
}
/**
* Flush the cache(s) of items to be persisted (itemsToAdd) and deleted (itemsToDelete).
*/
......@@ -1189,8 +1206,8 @@ public class PubSubPersistenceManager {
Connection con = null;
boolean rollback = false;
LinkedList addList = null;
LinkedList delList = null;
LinkedList<RetryWrapper> addList = null;
LinkedList<PublishedItem> delList = null;
// Swap pending items so we can parse and save the contents from this point in time
// while not blocking new entries from being cached.
......@@ -1199,8 +1216,8 @@ public class PubSubPersistenceManager {
addList = itemsToAdd;
delList = itemsToDelete;
itemsToAdd = new LinkedList();
itemsToDelete = new LinkedList();
itemsToAdd = new LinkedList<RetryWrapper>();
itemsToDelete = new LinkedList<PublishedItem>();
// Ensure pending items are available via the item read cache;
// this allows the item(s) to be fetched by other request threads
......@@ -1208,7 +1225,7 @@ public class PubSubPersistenceManager {
int copied = 0;
for (String key : itemsPending.keySet()) {
if (!itemCache.containsKey(key)) {
itemCache.put(key, (PublishedItem) itemsPending.get(key).object);
itemCache.put(key, (((RetryWrapper)itemsPending.get(key).object)).get());
copied++;
}
}
......@@ -1232,9 +1249,9 @@ public class PubSubPersistenceManager {
} catch (SQLException se) {
log.error("Failed to flush pending items; initiating rollback", se);
// return new items to the write cache
LinkedListNode node = addList.getLast();
LinkedListNode<RetryWrapper> node = addList.getLast();
while (node != null) {
savePublishedItem((PublishedItem)node.object, true);
savePublishedItem(node.object);
node.remove();
node = addList.getLast();
}
......@@ -1251,10 +1268,10 @@ public class PubSubPersistenceManager {
* @param delList
* @throws SQLException
*/
private static void writePendingItems(Connection con, LinkedList addList, LinkedList delList) throws SQLException
private static void writePendingItems(Connection con, LinkedList<RetryWrapper> addList, LinkedList<PublishedItem> delList) throws SQLException
{
LinkedListNode addItem = addList.getFirst();
LinkedListNode delItem = delList.getFirst();
LinkedListNode<RetryWrapper> addItem = addList.getFirst();
LinkedListNode<PublishedItem> delItem = delList.getFirst();
// is there anything to do?
if ((addItem == null) && (delItem == null)) { return; }
......@@ -1265,9 +1282,9 @@ public class PubSubPersistenceManager {
// ensure there are no duplicates by deleting before adding
if (addItem != null) {
LinkedListNode addHead = addItem.previous;
LinkedListNode<RetryWrapper> addHead = addItem.previous;
while (addItem != addHead) {
delList.addLast(addItem.object);
delList.addLast(addItem.object.get());
addItem = addItem.next;
}
}
......@@ -1277,12 +1294,12 @@ public class PubSubPersistenceManager {
if (delItem != null) {
PreparedStatement pstmt = null;
try {
LinkedListNode delHead = delItem.previous;
LinkedListNode<PublishedItem> delHead = delItem.previous;
pstmt = con.prepareStatement(DELETE_ITEM);
while (delItem != delHead)
{
PublishedItem item = (PublishedItem) delItem.object;
PublishedItem item = delItem.object;
pstmt.setString(1, item.getNode().getService().getServiceID());
pstmt.setString(2, encodeNodeID(item.getNode().getNodeID()));
pstmt.setString(3, item.getID());
......@@ -1315,17 +1332,19 @@ public class PubSubPersistenceManager {
* @param batch
* @throws SQLException
*/
private static void writePendingItems(Connection con, LinkedListNode addItem, boolean batch) throws SQLException
private static void writePendingItems(Connection con, LinkedListNode<RetryWrapper> addItem, boolean batch) throws SQLException
{
if (addItem == null) { return; }
LinkedListNode addHead = addItem.previous;
LinkedListNode<RetryWrapper> addHead = addItem.previous;
PreparedStatement pstmt = null;
RetryWrapper wrappedItem = null;
PublishedItem item = null;
try {
pstmt = con.prepareStatement(ADD_ITEM);
while (addItem != addHead)
{
item = (PublishedItem) addItem.object;
wrappedItem = addItem.object;
item = wrappedItem.get();
pstmt.setString(1, item.getNode().getService().getServiceID());
pstmt.setString(2, encodeNodeID(item.getNodeID()));
pstmt.setString(3, item.getID());
......@@ -1338,9 +1357,9 @@ public class PubSubPersistenceManager {
catch (SQLException se) {
// individual item could not be persisted; retry (up to MAX_ITEM_RETRY attempts)
String itemKey = item.getItemKey();
if (item.getRetryCount() < MAX_ITEM_RETRY) {
if (wrappedItem.nextRetry() < MAX_ITEM_RETRY) {
log.warn("Failed to persist published item (will retry): " + itemKey);
savePublishedItem(item, true);
savePublishedItem(wrappedItem);
} else {
// all hope is lost ... item will be dropped
log.error("Published item could not be written to database: " + itemKey + "\n" + item.getPayloadXML(), se);
......@@ -1370,7 +1389,7 @@ public class PubSubPersistenceManager {
synchronized (itemsPending)
{
itemsToDelete.addLast(item);
LinkedListNode itemToAdd = itemsPending.remove(itemKey);
LinkedListNode<RetryWrapper> itemToAdd = itemsPending.remove(itemKey);
if (itemToAdd != null)
itemToAdd.remove(); // drop from itemsToAdd linked list
}
......@@ -1728,13 +1747,13 @@ public class PubSubPersistenceManager {
// that match this node.
synchronized (itemsPending)
{
Iterator<Map.Entry<String, LinkedListNode>> pendingIt = itemsPending.entrySet().iterator();
Iterator<Map.Entry<String, LinkedListNode<RetryWrapper>>> pendingIt = itemsPending.entrySet().iterator();
while (pendingIt.hasNext())
{
LinkedListNode itemNode = pendingIt.next().getValue();
LinkedListNode<RetryWrapper> itemNode = pendingIt.next().getValue();
if (((PublishedItem) itemNode.object).getNodeID().equals(leafNode.getNodeID()))
if (itemNode.object.get().getNodeID().equals(leafNode.getNodeID()))
{
itemNode.remove();
pendingIt.remove();
......@@ -1908,6 +1927,7 @@ public class PubSubPersistenceManager {
public static void shutdown()
{
log.info("Flushing write cache to database");
flushPendingItems(false); // local member only
// node cleanup (skip when running as a cluster)
......
......@@ -101,10 +101,6 @@ public class PublishedItem implements Serializable {
* XML representation of the payload (for serialization)
*/
private String payloadXML;
/**
* Persistence retry counter
*/
private volatile transient int retryCount = 0;
/**
* Creates a published item
......@@ -292,14 +288,6 @@ public class PublishedItem implements Serializable {
return getItemKey(nodeId,id);
}
/**
* Returns (and increments) the item persistence retry counter
* @return Number of attempts made to persist this item to the DB
*/
public int getRetryCount() {
return retryCount++;
}
/**
* Returns a string that uniquely identifies this published item
* in the following format: <i>nodeId:itemId</i>
......
......@@ -27,20 +27,21 @@ package org.jivesoftware.util;
* reference to the node that is to be deleted.<p>
*
* @author Jive Software
* @param <E>
*/
public class LinkedList {
public class LinkedList<E> {
/**
* The root of the list keeps a reference to both the first and last
* elements of the list.
*/
private LinkedListNode head;
private LinkedListNode<E> head;
/**
* Creates a new linked list.
*/
public LinkedList() {
head = new LinkedListNode("head");
head = new LinkedListNode<E>();
}
/**
......@@ -48,8 +49,8 @@ public class LinkedList {
*
* @return the first element of the list.
*/
public LinkedListNode getFirst() {
LinkedListNode node = head.next;
public LinkedListNode<E> getFirst() {
LinkedListNode<E> node = head.next;
if (node == head) {
return null;
}
......@@ -61,8 +62,8 @@ public class LinkedList {
*
* @return the last element of the list.
*/
public LinkedListNode getLast() {
LinkedListNode node = head.previous;
public LinkedListNode<E> getLast() {
LinkedListNode<E> node = head.previous;
if (node == head) {
return null;
}
......@@ -74,7 +75,7 @@ public class LinkedList {
*
* @param node the node to add to the beginning of the list.
*/
public LinkedListNode addFirst(LinkedListNode node) {
public LinkedListNode<E> addFirst(LinkedListNode<E> node) {
return node.insert(head.next, head);
}
......@@ -85,8 +86,8 @@ public class LinkedList {
* @param object the object to add to the beginning of the list.
* @return the node created to wrap the object.
*/
public LinkedListNode addFirst(Object object) {
return new LinkedListNode(object, head.next, head);
public LinkedListNode<E> addFirst(E object) {
return new LinkedListNode<E>(object, head.next, head);
}
/**
......@@ -94,7 +95,7 @@ public class LinkedList {
*
* @param node the node to add to the beginning of the list.
*/
public LinkedListNode addLast(LinkedListNode node) {
public LinkedListNode<E> addLast(LinkedListNode<E> node) {
return node.insert(head, head.previous);
}
......@@ -105,8 +106,8 @@ public class LinkedList {
* @param object the object to add to the end of the list.
* @return the node created to wrap the object.
*/
public LinkedListNode addLast(Object object) {
return new LinkedListNode(object, head, head.previous);
public LinkedListNode<E> addLast(E object) {
return new LinkedListNode<E>(object, head, head.previous);
}
/**
......@@ -114,7 +115,7 @@ public class LinkedList {
*/
public void clear() {
//Remove all references in the list.
LinkedListNode node = getLast();
LinkedListNode<E> node = getLast();
while (node != null) {
node.remove();
node = getLast();
......@@ -132,7 +133,7 @@ public class LinkedList {
*/
@Override
public String toString() {
LinkedListNode node = head.next;
LinkedListNode<E> node = head.next;
StringBuilder buf = new StringBuilder();
while (node != head) {
buf.append(node.toString()).append(", ");
......
......@@ -38,11 +38,11 @@ package org.jivesoftware.util;
* @author Jive Software
* @see org.jivesoftware.util.LinkedList
*/
public class LinkedListNode {
public class LinkedListNode<E> {
public LinkedListNode previous;
public LinkedListNode next;
public Object object;
public LinkedListNode<E> previous;
public LinkedListNode<E> next;
public E object;
/**
* This class is further customized for the CoolServlets cache system. It
......@@ -61,9 +61,8 @@ public class LinkedListNode {
* Constructs an self-referencing node. This node acts as a start/end
* sentinel when traversing nodes in a LinkedList.
*/
public LinkedListNode(Object object) {
public LinkedListNode() {
previous = next = this;
this.object = object;
}
/**
......@@ -73,7 +72,7 @@ public class LinkedListNode {
* @param next a reference to the next LinkedListNode in the list.
* @param previous a reference to the previous LinkedListNode in the list.
*/
public LinkedListNode(Object object, LinkedListNode next, LinkedListNode previous) {
public LinkedListNode(E object, LinkedListNode<E> next, LinkedListNode<E> previous) {
if (next != null && previous != null) {
this.insert(next, previous);
}
......@@ -84,7 +83,7 @@ public class LinkedListNode {
* Removes this node from the linked list that it was a part of.
* @return This node; next and previous references dropped
*/
public LinkedListNode remove() {
public LinkedListNode<E> remove() {
previous.next = next;
next.previous = previous;
previous = next = null;
......@@ -95,7 +94,7 @@ public class LinkedListNode {
* Inserts this node into the linked list that it will be a part of.
* @return This node, updated to reflect previous/next changes
*/
public LinkedListNode insert(LinkedListNode next, LinkedListNode previous) {
public LinkedListNode<E> insert(LinkedListNode<E> next, LinkedListNode<E> previous) {
this.next = next;
this.previous = previous;
this.previous.next = this.next.previous = this;
......
......@@ -70,13 +70,13 @@ public class DefaultCache<K, V> implements Cache<K, V> {
* Linked list to maintain order that cache objects are accessed
* in, most used to least used.
*/
protected org.jivesoftware.util.LinkedList lastAccessedList;
protected org.jivesoftware.util.LinkedList<K> lastAccessedList;
/**
* Linked list to maintain time that cache objects were initially added
* to the cache, most recently added to oldest added.
*/
protected org.jivesoftware.util.LinkedList ageList;
protected org.jivesoftware.util.LinkedList<K> ageList;
/**
* Maximum size in bytes that the cache can grow to.
......@@ -127,8 +127,8 @@ public class DefaultCache<K, V> implements Cache<K, V> {
// is too small in almost all cases, so we set it bigger.
map = new HashMap<K, CacheObject<V>>(103);
lastAccessedList = new org.jivesoftware.util.LinkedList();
ageList = new org.jivesoftware.util.LinkedList();
lastAccessedList = new org.jivesoftware.util.LinkedList<K>();
ageList = new org.jivesoftware.util.LinkedList<K>();
}
public synchronized V put(K key, V value) {
......@@ -153,12 +153,12 @@ public class DefaultCache<K, V> implements Cache<K, V> {
DefaultCache.CacheObject<V> cacheObject = new DefaultCache.CacheObject<V>(value, objectSize);
map.put(key, cacheObject);
// Make an entry into the cache order list.
LinkedListNode lastAccessedNode = lastAccessedList.addFirst(key);
LinkedListNode<K> lastAccessedNode = lastAccessedList.addFirst(key);
// Store the cache order list entry so that we can get back to it
// during later lookups.
cacheObject.lastAccessedListNode = lastAccessedNode;
// Add the object to the age list
LinkedListNode ageNode = ageList.addFirst(key);
LinkedListNode<K> ageNode = ageList.addFirst(key);
// We make an explicit call to currentTimeMillis() so that total accuracy
// of lifetime calculations is better than one second.
ageNode.timestamp = System.currentTimeMillis();
......@@ -191,7 +191,7 @@ public class DefaultCache<K, V> implements Cache<K, V> {
// Remove the object from it's current place in the cache order list,
// and re-insert it at the front of the list.
cacheObject.lastAccessedListNode.remove();
lastAccessedList.addFirst(cacheObject.lastAccessedListNode);
lastAccessedList.addFirst((LinkedListNode<K>) cacheObject.lastAccessedListNode);
return cacheObject.object;
}
......@@ -224,9 +224,9 @@ public class DefaultCache<K, V> implements Cache<K, V> {
// Now, reset all containers.
map.clear();
lastAccessedList.clear();
lastAccessedList = new org.jivesoftware.util.LinkedList();
lastAccessedList = new org.jivesoftware.util.LinkedList<K>();
ageList.clear();
ageList = new org.jivesoftware.util.LinkedList();
ageList = new org.jivesoftware.util.LinkedList<K>();
cacheSize = 0;
cacheHits = 0;
......@@ -558,7 +558,7 @@ public class DefaultCache<K, V> implements Cache<K, V> {
// of the linked list until they are no longer too old. We get to avoid
// any hash lookups or looking at any more objects than is strictly
// neccessary.
LinkedListNode node = ageList.getLast();
LinkedListNode<K> node = ageList.getLast();
// If there are no entries in the age list, return.
if (node == null) {
return;
......@@ -639,14 +639,14 @@ public class DefaultCache<K, V> implements Cache<K, V> {
* accessed, the node is removed from its current spot in the list and
* moved to the front.
*/
public LinkedListNode lastAccessedListNode;
public LinkedListNode<?> lastAccessedListNode;
/**
* A reference to the node in the age order list. We keep the reference
* here to avoid linear scans of the list. The reference is used if the
* object has to be deleted from the list.
*/
public LinkedListNode ageListNode;
public LinkedListNode<?> ageListNode;
/**
* A count of the number of times the object has been read from cache.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment