Commit a738f60b authored by Tom Evans's avatar Tom Evans Committed by tevans

OF-668: Improve PubSub persistence exception handling; use TaskEngine in lieu...

OF-668: Improve PubSub persistence exception handling; use TaskEngine in lieu of timer threads to avoid shutdown problems (non-daemon threads, etc.)

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@13632 b35dd754-fafc-0310-a699-88a17e54d16e
parent 389894b2
...@@ -35,7 +35,6 @@ import java.util.Comparator; ...@@ -35,7 +35,6 @@ import java.util.Comparator;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
...@@ -49,6 +48,7 @@ import org.jivesoftware.util.FastDateFormat; ...@@ -49,6 +48,7 @@ import org.jivesoftware.util.FastDateFormat;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.StringUtils; import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.TaskEngine;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ; import org.xmpp.packet.IQ;
...@@ -113,7 +113,6 @@ public class AuditorImpl implements Auditor { ...@@ -113,7 +113,6 @@ public class AuditorImpl implements Auditor {
/** /**
* Timer to save queued logs to the XML file. * Timer to save queued logs to the XML file.
*/ */
private Timer timer = new Timer("Auditor");
private SaveQueuedPacketsTask saveQueuedPacketsTask; private SaveQueuedPacketsTask saveQueuedPacketsTask;
private FastDateFormat dateFormat; private FastDateFormat dateFormat;
private static FastDateFormat auditFormat; private static FastDateFormat auditFormat;
...@@ -137,7 +136,7 @@ public class AuditorImpl implements Auditor { ...@@ -137,7 +136,7 @@ public class AuditorImpl implements Auditor {
} }
// Create a new task and schedule it with the new timeout // Create a new task and schedule it with the new timeout
saveQueuedPacketsTask = new SaveQueuedPacketsTask(); saveQueuedPacketsTask = new SaveQueuedPacketsTask();
timer.schedule(saveQueuedPacketsTask, logTimeout, logTimeout); TaskEngine.getInstance().schedule(saveQueuedPacketsTask, logTimeout, logTimeout);
} }
...@@ -185,8 +184,6 @@ public class AuditorImpl implements Auditor { ...@@ -185,8 +184,6 @@ public class AuditorImpl implements Auditor {
public void stop() { public void stop() {
// Stop queuing packets since we are being stopped // Stop queuing packets since we are being stopped
closed = true; closed = true;
// Stop the scheduled task for saving queued packets to the XML file
timer.cancel();
// Save all remaining queued packets to the XML file // Save all remaining queued packets to the XML file
saveQueuedPackets(); saveQueuedPackets();
close(); close();
......
...@@ -95,6 +95,7 @@ public class HttpSessionManager { ...@@ -95,6 +95,7 @@ public class HttpSessionManager {
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
Thread thread = new Thread(Thread.currentThread().getThreadGroup(), runnable, Thread thread = new Thread(Thread.currentThread().getThreadGroup(), runnable,
"httpbind-worker-" + counter.getAndIncrement()); "httpbind-worker-" + counter.getAndIncrement());
thread.setDaemon(true);
return thread; return thread;
} }
}); });
......
...@@ -36,6 +36,7 @@ import org.jivesoftware.openfire.component.ComponentEventListener; ...@@ -36,6 +36,7 @@ import org.jivesoftware.openfire.component.ComponentEventListener;
import org.jivesoftware.openfire.component.InternalComponentManager; import org.jivesoftware.openfire.component.InternalComponentManager;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.TaskEngine;
import org.jivesoftware.util.XMPPDateTimeFormat; import org.jivesoftware.util.XMPPDateTimeFormat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -68,7 +69,6 @@ public class PacketCopier implements PacketInterceptor, ComponentEventListener { ...@@ -68,7 +69,6 @@ public class PacketCopier implements PacketInterceptor, ComponentEventListener {
/** /**
* Timer to save queued logs to the XML file. * Timer to save queued logs to the XML file.
*/ */
private Timer timer = new Timer("PacketActivityNotifier");
private ProcessPacketsTask packetsTask; private ProcessPacketsTask packetsTask;
/** /**
...@@ -99,7 +99,7 @@ public class PacketCopier implements PacketInterceptor, ComponentEventListener { ...@@ -99,7 +99,7 @@ public class PacketCopier implements PacketInterceptor, ComponentEventListener {
// Create a new task and schedule it with the new timeout // Create a new task and schedule it with the new timeout
packetsTask = new ProcessPacketsTask(); packetsTask = new ProcessPacketsTask();
timer.schedule(packetsTask, 5000, 5000); TaskEngine.getInstance().schedule(packetsTask, 5000, 5000);
} }
/** /**
......
...@@ -28,7 +28,6 @@ import java.util.Iterator; ...@@ -28,7 +28,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
...@@ -61,6 +60,7 @@ import org.jivesoftware.openfire.muc.cluster.RoomAvailableEvent; ...@@ -61,6 +60,7 @@ import org.jivesoftware.openfire.muc.cluster.RoomAvailableEvent;
import org.jivesoftware.openfire.muc.cluster.RoomRemovedEvent; import org.jivesoftware.openfire.muc.cluster.RoomRemovedEvent;
import org.jivesoftware.util.JiveProperties; import org.jivesoftware.util.JiveProperties;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.TaskEngine;
import org.jivesoftware.util.XMPPDateTimeFormat; import org.jivesoftware.util.XMPPDateTimeFormat;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -166,12 +166,6 @@ public class MultiUserChatServiceImpl implements Component, MultiUserChatService ...@@ -166,12 +166,6 @@ public class MultiUserChatServiceImpl implements Component, MultiUserChatService
*/ */
public long totalChatTime; public long totalChatTime;
/**
* Timer to monitor chatroom participants. If they've been idle for too long, probe for
* presence.
*/
private Timer timer = new Timer("MUC cleanup");
/** /**
* Flag that indicates if the service should provide information about locked rooms when * Flag that indicates if the service should provide information about locked rooms when
* handling service discovery requests. * handling service discovery requests.
...@@ -745,7 +739,7 @@ public class MultiUserChatServiceImpl implements Component, MultiUserChatService ...@@ -745,7 +739,7 @@ public class MultiUserChatServiceImpl implements Component, MultiUserChatService
this.user_timeout = timeout; this.user_timeout = timeout;
// Create a new task and schedule it with the new timeout // Create a new task and schedule it with the new timeout
userTimeoutTask = new UserTimeoutTask(); userTimeoutTask = new UserTimeoutTask();
timer.schedule(userTimeoutTask, user_timeout, user_timeout); TaskEngine.getInstance().schedule(userTimeoutTask, user_timeout, user_timeout);
// Set the new property value // Set the new property value
MUCPersistenceManager.setProperty(chatServiceName, "tasks.user.timeout", Integer.toString(timeout)); MUCPersistenceManager.setProperty(chatServiceName, "tasks.user.timeout", Integer.toString(timeout));
} }
...@@ -778,7 +772,7 @@ public class MultiUserChatServiceImpl implements Component, MultiUserChatService ...@@ -778,7 +772,7 @@ public class MultiUserChatServiceImpl implements Component, MultiUserChatService
this.log_timeout = timeout; this.log_timeout = timeout;
// Create a new task and schedule it with the new timeout // Create a new task and schedule it with the new timeout
logConversationTask = new LogConversationTask(); logConversationTask = new LogConversationTask();
timer.schedule(logConversationTask, log_timeout, log_timeout); TaskEngine.getInstance().schedule(logConversationTask, log_timeout, log_timeout);
// Set the new property value // Set the new property value
MUCPersistenceManager.setProperty(chatServiceName, "tasks.log.timeout", Integer.toString(timeout)); MUCPersistenceManager.setProperty(chatServiceName, "tasks.log.timeout", Integer.toString(timeout));
} }
...@@ -1029,14 +1023,14 @@ public class MultiUserChatServiceImpl implements Component, MultiUserChatService ...@@ -1029,14 +1023,14 @@ public class MultiUserChatServiceImpl implements Component, MultiUserChatService
// Run through the users every 5 minutes after a 5 minutes server startup delay (default // Run through the users every 5 minutes after a 5 minutes server startup delay (default
// values) // values)
userTimeoutTask = new UserTimeoutTask(); userTimeoutTask = new UserTimeoutTask();
timer.schedule(userTimeoutTask, user_timeout, user_timeout); TaskEngine.getInstance().schedule(userTimeoutTask, user_timeout, user_timeout);
// Log the room conversations every 5 minutes after a 5 minutes server startup delay // Log the room conversations every 5 minutes after a 5 minutes server startup delay
// (default values) // (default values)
logConversationTask = new LogConversationTask(); logConversationTask = new LogConversationTask();
timer.schedule(logConversationTask, log_timeout, log_timeout); TaskEngine.getInstance().schedule(logConversationTask, log_timeout, log_timeout);
// Remove unused rooms from memory // Remove unused rooms from memory
cleanupTask = new CleanupTask(); cleanupTask = new CleanupTask();
timer.schedule(cleanupTask, CLEANUP_FREQUENCY, CLEANUP_FREQUENCY); TaskEngine.getInstance().schedule(cleanupTask, CLEANUP_FREQUENCY, CLEANUP_FREQUENCY);
// Set us up to answer disco item requests // Set us up to answer disco item requests
XMPPServer.getInstance().getIQDiscoItemsHandler().addServerItemsProvider(this); XMPPServer.getInstance().getIQDiscoItemsHandler().addServerItemsProvider(this);
...@@ -1059,7 +1053,6 @@ public class MultiUserChatServiceImpl implements Component, MultiUserChatService ...@@ -1059,7 +1053,6 @@ public class MultiUserChatServiceImpl implements Component, MultiUserChatService
XMPPServer.getInstance().getServerItemsProviders().remove(this); XMPPServer.getInstance().getServerItemsProviders().remove(this);
// Remove the route to this service // Remove the route to this service
routingTable.removeComponentRoute(getAddress()); routingTable.removeComponentRoute(getAddress());
timer.cancel();
logAllConversation(); logAllConversation();
} }
......
...@@ -101,7 +101,18 @@ public class PublishedItem implements Serializable { ...@@ -101,7 +101,18 @@ public class PublishedItem implements Serializable {
* XML representation of the payload (for serialization) * XML representation of the payload (for serialization)
*/ */
private String payloadXML; private String payloadXML;
/**
* Persistence retry counter
*/
private volatile transient int retryCount = 0;
/**
* Creates a published item
* @param node
* @param publisher
* @param id
* @param creationDate
*/
PublishedItem(LeafNode node, JID publisher, String id, Date creationDate) { PublishedItem(LeafNode node, JID publisher, String id, Date creationDate) {
this.node = node; this.node = node;
this.nodeId = node.getNodeID(); this.nodeId = node.getNodeID();
...@@ -282,6 +293,14 @@ public class PublishedItem implements Serializable { ...@@ -282,6 +293,14 @@ public class PublishedItem implements Serializable {
} }
/** /**
* Returns (and increments) the item persistence retry counter
* @return Number of attempts made to persist this item to the DB
*/
public int getRetryCount() {
return retryCount++;
}
/**
* Returns a string that uniquely identifies this published item * Returns a string that uniquely identifies this published item
* in the following format: <i>nodeId:itemId</i> * in the following format: <i>nodeId:itemId</i>
* @param node Node for the published item * @param node Node for the published item
......
...@@ -34,13 +34,13 @@ public class LinkedList { ...@@ -34,13 +34,13 @@ public class LinkedList {
* The root of the list keeps a reference to both the first and last * The root of the list keeps a reference to both the first and last
* elements of the list. * elements of the list.
*/ */
private LinkedListNode head = new LinkedListNode("head", null, null); private LinkedListNode head;
/** /**
* Creates a new linked list. * Creates a new linked list.
*/ */
public LinkedList() { public LinkedList() {
head.next = head.previous = head; head = new LinkedListNode("head");
} }
/** /**
...@@ -75,11 +75,7 @@ public class LinkedList { ...@@ -75,11 +75,7 @@ public class LinkedList {
* @param node the node to add to the beginning of the list. * @param node the node to add to the beginning of the list.
*/ */
public LinkedListNode addFirst(LinkedListNode node) { public LinkedListNode addFirst(LinkedListNode node) {
node.next = head.next; return node.insert(head.next, head);
node.previous = head;
node.previous.next = node;
node.next.previous = node;
return node;
} }
/** /**
...@@ -90,10 +86,16 @@ public class LinkedList { ...@@ -90,10 +86,16 @@ public class LinkedList {
* @return the node created to wrap the object. * @return the node created to wrap the object.
*/ */
public LinkedListNode addFirst(Object object) { public LinkedListNode addFirst(Object object) {
LinkedListNode node = new LinkedListNode(object, head.next, head); return new LinkedListNode(object, head.next, head);
node.previous.next = node; }
node.next.previous = node;
return node; /**
* Adds a node to the end of the list.
*
* @param node the node to add to the beginning of the list.
*/
public LinkedListNode addLast(LinkedListNode node) {
return node.insert(head, head.previous);
} }
/** /**
...@@ -104,10 +106,7 @@ public class LinkedList { ...@@ -104,10 +106,7 @@ public class LinkedList {
* @return the node created to wrap the object. * @return the node created to wrap the object.
*/ */
public LinkedListNode addLast(Object object) { public LinkedListNode addLast(Object object) {
LinkedListNode node = new LinkedListNode(object, head, head.previous); return new LinkedListNode(object, head, head.previous);
node.previous.next = node;
node.next.previous = node;
return node;
} }
/** /**
......
...@@ -57,6 +57,15 @@ public class LinkedListNode { ...@@ -57,6 +57,15 @@ public class LinkedListNode {
*/ */
public long timestamp; public long timestamp;
/**
* Constructs an self-referencing node. This node acts as a start/end
* sentinel when traversing nodes in a LinkedList.
*/
public LinkedListNode(Object object) {
previous = next = this;
this.object = object;
}
/** /**
* Constructs a new linked list node. * Constructs a new linked list node.
* *
...@@ -64,19 +73,33 @@ public class LinkedListNode { ...@@ -64,19 +73,33 @@ public class LinkedListNode {
* @param next a reference to the next LinkedListNode in the list. * @param next a reference to the next LinkedListNode in the list.
* @param previous a reference to the previous LinkedListNode in the list. * @param previous a reference to the previous LinkedListNode in the list.
*/ */
public LinkedListNode(Object object, LinkedListNode next, public LinkedListNode(Object object, LinkedListNode next, LinkedListNode previous) {
LinkedListNode previous) { if (next != null && previous != null) {
this.insert(next, previous);
}
this.object = object; this.object = object;
this.next = next;
this.previous = previous;
} }
/** /**
* Removes this node from the linked list that it is a part of. * Removes this node from the linked list that it was a part of.
* @return This node; next and previous references dropped
*/ */
public void remove() { public LinkedListNode remove() {
previous.next = next; previous.next = next;
next.previous = previous; next.previous = previous;
previous = next = null;
return this;
}
/**
* Inserts this node into the linked list that it will be a part of.
* @return This node, updated to reflect previous/next changes
*/
public LinkedListNode insert(LinkedListNode next, LinkedListNode previous) {
this.next = next;
this.previous = previous;
this.previous.next = this.next.previous = this;
return this;
} }
/** /**
......
...@@ -57,7 +57,7 @@ public class TaskEngine { ...@@ -57,7 +57,7 @@ public class TaskEngine {
* Constructs a new task engine. * Constructs a new task engine.
*/ */
private TaskEngine() { private TaskEngine() {
timer = new Timer("timer-openfire", true); timer = new Timer("TaskEngine-timer", true);
executor = Executors.newCachedThreadPool(new ThreadFactory() { executor = Executors.newCachedThreadPool(new ThreadFactory() {
final AtomicInteger threadNumber = new AtomicInteger(1); final AtomicInteger threadNumber = new AtomicInteger(1);
...@@ -65,7 +65,7 @@ public class TaskEngine { ...@@ -65,7 +65,7 @@ public class TaskEngine {
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
// Use our own naming scheme for the threads. // Use our own naming scheme for the threads.
Thread thread = new Thread(Thread.currentThread().getThreadGroup(), runnable, Thread thread = new Thread(Thread.currentThread().getThreadGroup(), runnable,
"pool-openfire" + threadNumber.getAndIncrement(), 0); "TaskEngine-pool-" + threadNumber.getAndIncrement(), 0);
// Make workers daemon threads. // Make workers daemon threads.
thread.setDaemon(true); thread.setDaemon(true);
if (thread.getPriority() != Thread.NORM_PRIORITY) { if (thread.getPriority() != Thread.NORM_PRIORITY) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment