Commit 68d5e49e authored by Guus der Kinderen's avatar Guus der Kinderen

OF-1487: Monitoring plugin: write archived data to the database more often.

Prior to this commit, archived data was kept in a buffer, which was flushed to the database once per minute.
It is undesirable to have the database 'lag behind' for such an extend period of time.

This commit introduces near-instantanious flushes. Data that's put in a buffer is flushed immediately, unless
more data is available instantly.
parent adaa8196
......@@ -47,6 +47,7 @@ Monitoring Plugin Changelog
<p><b>1.6.0</b> -- Feb 16, 2018</p>
<ul>
<li>[<a href='https://issues.igniterealtime.org/browse/OF-1486'>OF-1486</a>] - MAM RSM queries for MUC should allow for 'backwards-paging'.</li>
<li>[<a href='https://issues.igniterealtime.org/browse/OF-1487'>OF-1487</a>] - Archived messages should become available in the database instantly.</li>
</ul>
<p><b>1.5.9</b> -- Feb 13, 2018</p>
......
......@@ -30,10 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.*;
import org.dom4j.Element;
import org.jivesoftware.database.DbConnectionManager;
......@@ -49,13 +46,7 @@ import org.jivesoftware.openfire.plugin.MonitoringPlugin;
import org.jivesoftware.openfire.reporting.util.TaskEngine;
import org.jivesoftware.openfire.stats.Statistic;
import org.jivesoftware.openfire.stats.StatisticsManager;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.NotFoundException;
import org.jivesoftware.util.PropertyEventDispatcher;
import org.jivesoftware.util.PropertyEventListener;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.*;
import org.jivesoftware.util.cache.CacheFactory;
import org.picocontainer.Startable;
import org.slf4j.Logger;
......@@ -123,18 +114,13 @@ public class ConversationManager implements Startable, ComponentEventListener{
private long maxRetrievable;
private PropertyEventListener propertyListener;
private Queue<Conversation> conversationQueue;
private Queue<ArchivedMessage> messageQueue;
/**
* Queue of participants that joined or left a conversation. This queue is processed by the ArchivingTask.
*/
private Queue<RoomParticipant> participantQueue;
private final PriorityBlockingQueue<ArchiveCandidate<Conversation>> conversationQueue = new PriorityBlockingQueue<>();
private final PriorityBlockingQueue<ArchiveCandidate<ArchivedMessage>> messageQueue = new PriorityBlockingQueue<>();
private final PriorityBlockingQueue<ArchiveCandidate<RoomParticipant>> participantQueue = new PriorityBlockingQueue<>();
private boolean archivingRunning = false;
private ExecutorService executorService;
private TimerTask archiveTask;
private TimerTask cleanupTask;
private TimerTask maxAgeTask;
private Collection<ConversationListener> conversationListeners;
......@@ -176,20 +162,16 @@ public class ConversationManager implements Startable, ComponentEventListener{
propertyListener = new ConversationPropertyListener();
PropertyEventDispatcher.addListener(propertyListener);
conversationQueue = new ConcurrentLinkedQueue<Conversation>();
messageQueue = new ConcurrentLinkedQueue<ArchivedMessage>();
participantQueue = new ConcurrentLinkedQueue<RoomParticipant>();
conversationListeners = new CopyOnWriteArraySet<ConversationListener>();
// Schedule a task to do conversation archiving.
archiveTask = new TimerTask() {
@Override
public void run() {
new ArchivingTask().run();
if ( executorService != null && !executorService.isShutdown() )
{
executorService.shutdownNow();
}
};
taskEngine.scheduleAtFixedRate(archiveTask, JiveConstants.MINUTE, JiveConstants.MINUTE);
executorService = Executors.newFixedThreadPool( 3, new NamedThreadFactory( "MonitorPluginArchiver", null, null, null ) );
executorService.submit( new ConversationArchivingRunnable( conversationQueue ) );
executorService.submit( new MessageArchivingRunnable( messageQueue ) );
executorService.submit( new ParticipantArchivingRunnable( participantQueue ) );
if (JiveGlobals.getProperty("conversation.maxTimeDebug") != null) {
Log.info("Monitoring plugin max time value deleted. Must be left over from stalled userCreation plugin run.");
......@@ -292,8 +274,8 @@ public class ConversationManager implements Startable, ComponentEventListener{
}
public void stop() {
archiveTask.cancel();
archiveTask = null;
executorService.shutdownNow();
cleanupTask.cancel();
cleanupTask = null;
......@@ -302,17 +284,6 @@ public class ConversationManager implements Startable, ComponentEventListener{
PropertyEventDispatcher.removeListener(propertyListener);
propertyListener = null;
conversations.clear();
conversations = null;
// Archive anything remaining in the queue before quitting.
new ArchivingTask().run();
conversationQueue.clear();
conversationQueue = null;
messageQueue.clear();
messageQueue = null;
conversationListeners.clear();
conversationListeners = null;
......@@ -718,12 +689,12 @@ public class ConversationManager implements Startable, ComponentEventListener{
// Record the newly received message.
conversation.messageReceived(sender, date);
if (metadataArchivingEnabled) {
conversationQueue.add(conversation);
conversationQueue.add(new ArchiveCandidate<>(conversation));
}
if (messageArchivingEnabled) {
if (body != null) {
/* OF-677 - Workaround to prevent null messages being archived */
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, receiver, date, body, stanza, false));
messageQueue.add(new ArchiveCandidate<>( new ArchivedMessage(conversation.getConversationID(), sender, receiver, date, body, stanza, false) ));
}
}
// Notify listeners of the conversation update.
......@@ -779,13 +750,13 @@ public class ConversationManager implements Startable, ComponentEventListener{
// Record the newly received message.
conversation.messageReceived(sender, date);
if (metadataArchivingEnabled) {
conversationQueue.add(conversation);
conversationQueue.add(new ArchiveCandidate<>( conversation ));
}
if (roomArchivingEnabled && (roomsArchived.isEmpty() || roomsArchived.contains(roomJID.getNode()))) {
JID jid = new JID(roomJID + "/" + nickname);
if (body != null) {
/* OF-677 - Workaround to prevent null messages being archived */
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, jid, date, body, roomArchivingStanzasEnabled ? stanza : "", false));
messageQueue.add(new ArchiveCandidate<>( new ArchivedMessage(conversation.getConversationID(), sender, jid, date, body, roomArchivingStanzasEnabled ? stanza : "", false)) );
}
}
// Notify listeners of the conversation update.
......@@ -972,114 +943,317 @@ public class ConversationManager implements Startable, ComponentEventListener{
updatedParticipant.user = user;
updatedParticipant.joined = participation.getJoined();
updatedParticipant.left = participation.getLeft();
participantQueue.add(updatedParticipant);
participantQueue.add(new ArchiveCandidate<>( updatedParticipant ));
}
/**
* A task that persists conversation meta-data and messages to the database.
* A to-be-archived entity.
*
* Note that the ordering imposed by the Comparable implementation is not consistent with equals, and serves only
* to order instances by their creation timestamp.
*/
private class ArchivingTask implements Runnable {
private static class ArchiveCandidate<E> implements Comparable<ArchiveCandidate<E>> {
private final Date creation = new Date();
public void run() {
synchronized (this) {
if (archivingRunning) {
return;
private final E element;
public ArchiveCandidate( E element ) {
if ( element == null ) {
throw new IllegalArgumentException( "Argument 'element' cannot be null." );
}
this.element = element;
}
public Date createdAt()
{
return creation;
}
public E getElement()
{
return element;
}
@Override
public int compareTo( ArchiveCandidate<E> o )
{
return creation.compareTo( o.creation );
}
}
/**
* Returns true if none of the queues hold data that was delivered before the provided argument.
*
* This method is intended to be used to determine if it's safe to construct an answer (based on database
* content) to a request for archived data. Such response should only be generated after all data that was
* queued before the request arrived has been written to the database.
*
* @param date A date (cannot be null).
* @return false if any of the the queues contain work that was created before the provided date, otherwise true.
*/
public boolean hasWrittenAllDataBefore( Date date )
{
final ArchiveCandidate c = conversationQueue.peek();
final ArchiveCandidate m = messageQueue.peek();
final ArchiveCandidate p = participantQueue.peek();
return ( c == null || c.creation.after( date ) )
&& ( m == null || m.creation.after( date ) )
&& ( p == null || p.creation.after( date ) );
}
/**
* An abstract runnable that adds to-be-archived data to the database.
*
* This implementation is designed to reduce the work load on the database, by batching work where possible, without
* severely delaying database writes.
*
* This implementation acts as a consumer (in context of the producer-consumer design pattern), where the queue that
* is used to relay work from both processes is passed as an argument to the constructor of this class.
*
* @author Guus der Kinderen, guus.der.kinderen@gmail.com
*/
private static abstract class ArchivingRunnable<E> implements Runnable
{
// Do not add more than this amount of queries in a batch.
final int maxWorkQueueSize = 500; // TODO make this value configurable.
// Do not delay longer than this amount of milliseconds before storing data in the database.
final long maxPurgeInterval = 1000; // TODO make this value configurable.
// Maximum amount of milliseconds to wait for 'more' work to arrive, before committing the batch.
final long gracePeriod = 50; // TODO make this value configurable.
// Reference to the queue in which work is produced.
final PriorityBlockingQueue<ArchiveCandidate<E>> queue;
ArchivingRunnable( PriorityBlockingQueue<ArchiveCandidate<E>> queue )
{
if ( queue == null )
{
throw new IllegalArgumentException( "Argument 'queue' cannot be null." );
}
this.queue = queue;
}
public void run()
{
boolean running = true;
// This loop is designed to write data to be stored in the database without much delay, while at the same
// time allowing for batching of work that's produced at roughly the same time (which improves performance).
while ( running )
{
// The batch of work for this iteration.
final List<ArchiveCandidate<E>> workQueue = new ArrayList<>();
try
{
// Blocks until work is produced.
ArchiveCandidate<E> work = queue.take();
workQueue.add( work );
// Continue filling up this batch as long as new archive candidates can be retrieved pretty much
// instantaneously, but don't take longer than the maximum allowed purge interval (this is intended
// to make sure that the database content is updated regularly)
final long start = System.currentTimeMillis();
while ( ( workQueue.size() < maxWorkQueueSize ) // Don't allow the batch to grow to big.
&& ( System.currentTimeMillis() - start < maxPurgeInterval - gracePeriod ) // Don't take to long between commits.
&& ( ( work = queue.poll( gracePeriod, TimeUnit.MILLISECONDS ) ) != null ) )
{
workQueue.add( work );
}
archivingRunning = true;
}
if (!messageQueue.isEmpty() || !conversationQueue.isEmpty() || !participantQueue.isEmpty()) {
catch ( InterruptedException e )
{
// Causes the thread to stop.
running = false;
}
// Store all produced work in the database.
store( workQueue );
}
}
abstract void store( List<ArchiveCandidate<E>> workQueue );
}
/**
* Stores Conversations in the database.
*/
private static class ConversationArchivingRunnable extends ArchivingRunnable<Conversation>
{
ConversationArchivingRunnable( PriorityBlockingQueue<ArchiveCandidate<Conversation>> queue )
{
super( queue );
}
protected void store( List<ArchiveCandidate<Conversation>> workQueue )
{
if ( workQueue.isEmpty() )
{
return;
}
Connection con = null;
PreparedStatement pstmt = null;
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(INSERT_MESSAGE);
ArchivedMessage message;
int msgCount = getArchivedMessageCount();
try
{
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(UPDATE_CONVERSATION);
int count = 0;
while ((message = messageQueue.poll()) != null) {
pstmt.setInt(1, ++msgCount);
pstmt.setLong(2, message.getConversationID());
pstmt.setString(3, message.getFromJID().toBareJID());
pstmt.setString(4, message.getFromJID().getResource());
pstmt.setString(5, message.getToJID().toBareJID());
pstmt.setString(6, message.getToJID().getResource());
pstmt.setLong(7, message.getSentDate().getTime());
DbConnectionManager.setLargeTextField(pstmt, 8, message.getBody());
DbConnectionManager.setLargeTextField(pstmt, 9, message.getStanza());
if (DbConnectionManager.isBatchUpdatesSupported()) {
for ( final ArchiveCandidate<Conversation> work : workQueue )
{
pstmt.setLong( 1, work.getElement().getLastActivity().getTime() );
pstmt.setInt( 2, work.getElement().getMessageCount() );
pstmt.setLong( 3, work.getElement().getConversationID() );
if ( DbConnectionManager.isBatchUpdatesSupported() )
{
pstmt.addBatch();
} else {
}
else
{
pstmt.execute();
}
count++;
// Only batch up to 500 items at a time.
if (count >= 500 && DbConnectionManager.isBatchUpdatesSupported()) {
}
if ( DbConnectionManager.isBatchUpdatesSupported() )
{
pstmt.executeBatch();
count = 0;
}
}
if (count > 0 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
catch ( Exception e )
{
Log.error( "Unable to archive conversation data!", e );
}
finally
{
DbConnectionManager.closeConnection( pstmt, con );
}
}
}
pstmt = con.prepareStatement(UPDATE_CONVERSATION);
Conversation conversation;
count = 0;
while ((conversation = conversationQueue.poll()) != null) {
pstmt.setLong(1, conversation.getLastActivity().getTime());
pstmt.setInt(2, conversation.getMessageCount());
pstmt.setLong(3, conversation.getConversationID());
if (DbConnectionManager.isBatchUpdatesSupported()) {
/**
* Stores Messages in the database.
*/
private class MessageArchivingRunnable extends ArchivingRunnable<ArchivedMessage>
{
MessageArchivingRunnable( PriorityBlockingQueue<ArchiveCandidate<ArchivedMessage>> queue )
{
super( queue );
}
@Override
void store( List<ArchiveCandidate<ArchivedMessage>> workQueue )
{
if ( workQueue.isEmpty() )
{
return;
}
Connection con = null;
PreparedStatement pstmt = null;
try
{
int msgCount = getArchivedMessageCount();
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(INSERT_MESSAGE);
for ( final ArchiveCandidate<ArchivedMessage> work : workQueue )
{
pstmt.setInt(1, ++msgCount);
pstmt.setLong(2, work.getElement().getConversationID());
pstmt.setString(3, work.getElement().getFromJID().toBareJID());
pstmt.setString(4, work.getElement().getFromJID().getResource());
pstmt.setString(5, work.getElement().getToJID().toBareJID());
pstmt.setString(6, work.getElement().getToJID().getResource());
pstmt.setLong(7, work.getElement().getSentDate().getTime());
DbConnectionManager.setLargeTextField(pstmt, 8, work.getElement().getBody());
DbConnectionManager.setLargeTextField(pstmt, 9, work.getElement().getStanza());
if ( DbConnectionManager.isBatchUpdatesSupported() )
{
pstmt.addBatch();
} else {
}
else
{
pstmt.execute();
}
count++;
// Only batch up to 500 items at a time.
if (count >= 500 && DbConnectionManager.isBatchUpdatesSupported()) {
}
if ( DbConnectionManager.isBatchUpdatesSupported() )
{
pstmt.executeBatch();
count = 0;
}
}
if (count > 0 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
catch ( Exception e )
{
Log.error( "Unable to archive message data!", e );
}
finally
{
DbConnectionManager.closeConnection( pstmt, con );
}
}
}
/**
* Stores Participants in the database.
*/
private static class ParticipantArchivingRunnable extends ArchivingRunnable<RoomParticipant>
{
ParticipantArchivingRunnable( PriorityBlockingQueue<ArchiveCandidate<RoomParticipant>> queue )
{
super( queue );
}
protected void store( List<ArchiveCandidate<RoomParticipant>> workQueue )
{
if ( workQueue.isEmpty() )
{
return;
}
pstmt = con.prepareStatement(UPDATE_PARTICIPANT);
RoomParticipant particpiant;
count = 0;
while ((particpiant = participantQueue.poll()) != null) {
pstmt.setLong(1, particpiant.left.getTime());
pstmt.setLong(2, particpiant.conversationID);
pstmt.setString(3, particpiant.user.toBareJID());
pstmt.setString(4, particpiant.user.getResource() == null ? " " : particpiant.user.getResource());
pstmt.setLong(5, particpiant.joined.getTime());
if (DbConnectionManager.isBatchUpdatesSupported()) {
Connection con = null;
PreparedStatement pstmt = null;
try
{
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement( UPDATE_PARTICIPANT );
for ( final ArchiveCandidate<RoomParticipant> work : workQueue )
{
pstmt.setLong(1, work.getElement().left.getTime());
pstmt.setLong(2, work.getElement().conversationID);
pstmt.setString(3, work.getElement().user.toBareJID());
pstmt.setString(4, work.getElement().user.getResource() == null ? " " : work.getElement().user.getResource());
pstmt.setLong(5, work.getElement().joined.getTime());
if ( DbConnectionManager.isBatchUpdatesSupported() )
{
pstmt.addBatch();
} else {
pstmt.execute();
}
count++;
// Only batch up to 500 items at a time.
if (count >= 500 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
count = 0;
else
{
pstmt.execute();
}
}
if (count > 0 && DbConnectionManager.isBatchUpdatesSupported()) {
if ( DbConnectionManager.isBatchUpdatesSupported() )
{
pstmt.executeBatch();
}
} catch (Exception e) {
Log.error(e.getMessage(), e);
} finally {
DbConnectionManager.closeConnection(pstmt, con);
}
catch ( Exception e )
{
Log.error( "Unable to archive participant data!", e );
}
finally
{
DbConnectionManager.closeConnection( pstmt, con );
}
// Set archiving running back to false.
archivingRunning = false;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment