Commit 8c0da0f4 authored by Guus der Kinderen's avatar Guus der Kinderen

OF-1200: Monitoring plugin should not block on worker threads.

Instead of waiting for data to become available on the main thread, this task is better offloaded to a dedicated
thread pool. This prevents worker threads from locking up (which potentially cripples the server).
parent 85632a09
......@@ -44,7 +44,7 @@
Monitoring Plugin Changelog
</h1>
<p><b>1.6.0</b> -- Feb 16, 2018</p>
<p><b>1.6.0</b> -- Feb 25, 2018</p>
<ul>
<li>[<a href='https://issues.igniterealtime.org/browse/OF-1486'>OF-1486</a>] - MAM RSM queries for MUC should allow for 'backwards-paging'.</li>
<li>[<a href='https://issues.igniterealtime.org/browse/OF-1487'>OF-1487</a>] - Archived messages should become available in the database instantly.</li>
......
......@@ -6,7 +6,7 @@
<description>Monitors conversations and statistics of the server.</description>
<author>IgniteRealtime // Jive Software</author>
<version>1.6.0</version>
<date>2/16/2018</date>
<date>2/25/2018</date>
<minServerVersion>4.1.0</minServerVersion>
<databaseKey>monitoring</databaseKey>
<databaseVersion>4</databaseVersion>
......
package com.reucon.openfire.plugin.archive.xep0313;
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import org.dom4j.*;
import com.reucon.openfire.plugin.archive.model.ArchivedMessage;
import com.reucon.openfire.plugin.archive.xep.AbstractIQHandler;
import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.archive.ConversationManager;
import org.jivesoftware.openfire.archive.MonitoringConstants;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.forward.Forwarded;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.muc.MUCRole;
import org.jivesoftware.openfire.muc.MUCRoom;
import org.jivesoftware.openfire.muc.MultiUserChatService;
import org.jivesoftware.openfire.plugin.MonitoringPlugin;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.NamedThreadFactory;
import org.jivesoftware.util.XMPPDateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.*;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.PacketError;
import com.reucon.openfire.plugin.archive.model.ArchivedMessage;
import com.reucon.openfire.plugin.archive.xep.AbstractIQHandler;
import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* XEP-0313 IQ Query Handler
......@@ -38,6 +45,7 @@ abstract class IQQueryHandler extends AbstractIQHandler implements
private static final Logger Log = LoggerFactory.getLogger(IQQueryHandler.class);
protected final String NAMESPACE;
protected ExecutorService executorService;
private final XMPPDateTimeFormat xmppDateTimeFormat = new XMPPDateTimeFormat();
......@@ -46,9 +54,43 @@ abstract class IQQueryHandler extends AbstractIQHandler implements
NAMESPACE = namespace;
}
public IQ handleIQ(IQ packet) throws UnauthorizedException {
@Override
public void initialize( XMPPServer server )
{
super.initialize( server );
executorService = Executors.newCachedThreadPool( new NamedThreadFactory( "message-archive-handler-", null, null, null ) );
}
Session session = sessionManager.getSession(packet.getFrom());
@Override
public void stop()
{
executorService.shutdown();
super.stop();
}
@Override
public void destroy()
{
// Give the executor some time to finish processing jobs.
final long end = System.currentTimeMillis() + 4000;
while ( !executorService.isTerminated() && System.currentTimeMillis() < end )
{
try
{
Thread.sleep( 100 );
}
catch ( InterruptedException e )
{
break;
}
}
executorService.shutdownNow();
super.destroy();
}
public IQ handleIQ( final IQ packet ) throws UnauthorizedException {
final Session session = sessionManager.getSession(packet.getFrom());
// If no session was found then answer with an error (if possible)
if (session == null) {
......@@ -127,41 +169,48 @@ abstract class IQQueryHandler extends AbstractIQHandler implements
final QueryRequest queryRequest = new QueryRequest(packet.getChildElement(), archiveJid);
// OF-1200: make sure that data is flushed to the database before retrieving it.
MonitoringPlugin plugin = (MonitoringPlugin) XMPPServer.getInstance().getPluginManager().getPlugin(MonitoringConstants.NAME);
ConversationManager conversationManager = (ConversationManager)plugin.getModule( ConversationManager.class);
final MonitoringPlugin plugin = (MonitoringPlugin) XMPPServer.getInstance().getPluginManager().getPlugin(MonitoringConstants.NAME);
final ConversationManager conversationManager = (ConversationManager)plugin.getModule( ConversationManager.class);
final Date targetEndDate = new Date(); // TODO or, the 'before' date from RSM, if that's set and in the past.
while ( System.currentTimeMillis() < targetEndDate.getTime() + 2000 ) // TODO Use the value in org.jivesoftware.openfire.archive.ConversationManager.ArchivingRunnable.maxPurgeInterval but also add time to allow for query execution time.
executorService.submit( new Runnable()
{
if ( conversationManager.hasWrittenAllDataBefore( targetEndDate ) )
{
break;
}
try
{
// FIXME let's not block the thread, but use a callback of sorts instead.
Log.debug( "Not all data that is being requested has been written to the database yet. Delaying request processing. " );
Thread.sleep( 100 );
}
catch ( InterruptedException e )
@Override
public void run()
{
break;
}
}
if ( !conversationManager.hasWrittenAllDataBefore( targetEndDate ) ) {
Log.warn( "Retrieving data from the database to formulate a response to a MAM query, while data is still waiting to be written there. The response might be incomplete." );
}
while ( System.currentTimeMillis() < targetEndDate.getTime() + 2000 ) // TODO Use the value in org.jivesoftware.openfire.archive.ConversationManager.ArchivingRunnable.maxPurgeInterval but also add time to allow for query execution time.
{
if ( conversationManager.hasWrittenAllDataBefore( targetEndDate ) )
{
break;
}
try
{
Log.debug( "Not all data that is being requested has been written to the database yet. Delaying request processing. " );
Thread.sleep( 100 );
}
catch ( InterruptedException e )
{
break;
}
}
if ( !conversationManager.hasWrittenAllDataBefore( targetEndDate ) ) {
Log.warn( "Retrieving data from the database to formulate a response to a MAM query, while data is still waiting to be written there. The response might be incomplete." );
}
Log.debug("Retrieving messages from archive...");
Collection<ArchivedMessage> archivedMessages = retrieveMessages(queryRequest);
Log.debug("Retrieved {} messages from archive.", archivedMessages.size());
Log.debug("Retrieving messages from archive...");
Collection<ArchivedMessage> archivedMessages = retrieveMessages(queryRequest);
Log.debug("Retrieved {} messages from archive.", archivedMessages.size());
for(ArchivedMessage archivedMessage : archivedMessages) {
sendMessageResult(session, queryRequest, archivedMessage);
}
for(ArchivedMessage archivedMessage : archivedMessages) {
sendMessageResult(session, queryRequest, archivedMessage);
}
sendEndQuery(packet, session, queryRequest);
Log.debug("Done with request.");
}
} );
sendEndQuery(packet, session, queryRequest);
Log.debug("Done with request.");
return null;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment