Commit 590a5746 authored by Dave Cridland's avatar Dave Cridland

OF-1246 Update Monitoring plugin to support MAM-MUC

Note that this also fixes numerous bugs with general MAM itself. A full
comparitive test run was not performed, so it is unclear how many issues
were present in previous builds.

The "with" field is mandatory within MAM, but is not within MAM-MUC; it is
therefore ignored. The specification remains unclear on whether this is
legitimate.
parent 20a75d52
......@@ -43,6 +43,12 @@
<h1>
Monitoring Plugin Changelog
</h1>
<p><b>1.5.5</b> -- December 19, 2016</p>
<ul>
<li>[<a href='https://igniterealtime.org/issues/browse/OF-1246'>OF-1246</a>] - Support MAM (XEP-0313) for MUC (XEP-0045)</li>
<li>[<a href='https://igniterealtime.org/issues/browse/OF-1214'>OF-1214</a>] - Update MAM (XEP-0313) to support :0 and :1 versions</li>
</ul>
<p><b>1.5.4</b> -- April 27, 2016</p>
<ul>
<li>[<a href='https://igniterealtime.org/issues/browse/OF-1132'>OF-1132</a>] - Ensure that namespace is defined on forwarded messages.</li>
......
......@@ -4,10 +4,10 @@
<class>org.jivesoftware.openfire.plugin.MonitoringPlugin</class>
<name>Monitoring Service</name>
<description>Monitors conversations and statistics of the server.</description>
<author>Jive Software</author>
<version>1.5.4</version>
<author>IgniteRealtime // Jive Software</author>
<version>1.5.5</version>
<date>4/27/2016</date>
<minServerVersion>4.0.0</minServerVersion>
<minServerVersion>4.0.99</minServerVersion><!-- Allows the beta 4.1.0 -->
<databaseKey>monitoring</databaseKey>
<databaseVersion>4</databaseVersion>
......
......@@ -170,7 +170,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
+ "ofMessageArchive.messageID, " + "ofConParticipant.bareJID "
+ "FROM ofMessageArchive "
+ "INNER JOIN ofConParticipant ON ofMessageArchive.conversationID = ofConParticipant.conversationID "
+ "WHERE ofMessageArchive.stanza != NULL OR ofMessageArchive.body != NULL";
+ "WHERE (ofMessageArchive.stanza IS NOT NULL OR ofMessageArchive.body IS NOT NULL) ";
public static final String SELECT_MESSAGE_ORACLE = "SELECT "
+ "ofMessageArchive.fromJID, "
......@@ -186,7 +186,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
public static final String COUNT_MESSAGES = "SELECT COUNT(DISTINCT ofMessageArchive.messageID) "
+ "FROM ofMessageArchive "
+ "INNER JOIN ofConParticipant ON ofMessageArchive.conversationID = ofConParticipant.conversationID "
+ "WHERE ofMessageArchive.stanza != NULL OR ofMessageArchive.body != NULL";
+ "WHERE (ofMessageArchive.stanza IS NOT NULL OR ofMessageArchive.body IS NOT NULL) ";
public boolean createMessage(ArchivedMessage message) {
/* read only */
......@@ -456,7 +456,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
appendWhere(whereSB, "( ", MESSAGE_TO_JID, " = ? OR ", MESSAGE_FROM_JID, " = ? )");
}
if (whereSB.length() != 0) {
querySB.append(" WHERE ").append(whereSB);
querySB.append(" AND ").append(whereSB);
}
if (DbConnectionManager.getDatabaseType() == DbConnectionManager.DatabaseType.sqlserver) {
......@@ -588,7 +588,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
querySB = new StringBuilder(COUNT_MESSAGES);
if (whereClause != null && whereClause.length() != 0) {
querySB.append(" WHERE ").append(whereClause);
querySB.append(" AND ").append(whereClause);
}
Connection con = null;
......@@ -618,7 +618,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
StringBuilder querySB;
querySB = new StringBuilder(COUNT_MESSAGES);
querySB.append(" WHERE ");
querySB.append(" AND ");
if (whereClause != null && whereClause.length() != 0) {
querySB.append(whereClause);
querySB.append(" AND ");
......
package com.reucon.openfire.plugin.archive.impl;
import com.reucon.openfire.plugin.archive.ArchivedMessageConsumer;
import com.reucon.openfire.plugin.archive.PersistenceManager;
import com.reucon.openfire.plugin.archive.model.ArchivedMessage;
import com.reucon.openfire.plugin.archive.model.Conversation;
import com.reucon.openfire.plugin.archive.model.Participant;
import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
import org.dom4j.Attribute;
import org.dom4j.Element;
import org.dom4j.Namespace;
import org.dom4j.io.SAXReader;
import org.eclipse.jdt.internal.compiler.apt.util.Archive;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.muc.MUCRoom;
import org.jivesoftware.openfire.muc.MultiUserChatManager;
import org.jivesoftware.openfire.muc.MultiUserChatService;
import org.jivesoftware.openfire.muc.NotAllowedException;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.XMPPDateTimeFormat;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import java.io.StringReader;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
/**
* Created by dwd on 25/07/16.
*/
public class MucMamPersistenceManager implements PersistenceManager {
private static final String LOAD_HISTORY =
"SELECT sender, nickname, logTime, subject, body, stanza, messageId FROM ofMucConversationLog " +
"WHERE messageId IS NOT NULL AND logTime>? AND logTime <= ? AND roomID=? AND (nickname IS NOT NULL OR subject IS NOT NULL) ";
private static final String WHERE_SENDER = " AND sender = ? ";
private static final String WHERE_AFTER = " AND messageId > ? ";
private static final String WHERE_BEFORE = " AND messageId < ? ";
private static final String ORDER_BY = " ORDER BY logTime";
@Override
public boolean createMessage(ArchivedMessage message) {
return false;
}
@Override
public int processAllMessages(ArchivedMessageConsumer callback) {
return 0;
}
@Override
public boolean createConversation(Conversation conversation) {
return false;
}
@Override
public boolean updateConversationEnd(Conversation conversation) {
return false;
}
@Override
public boolean createParticipant(Participant participant, Long conversationId) {
return false;
}
@Override
public List<Conversation> findConversations(String[] participants, Date startDate, Date endDate) {
return null;
}
@Override
public Collection<Conversation> findConversations(Date startDate, Date endDate, String owner, String with, XmppResultSet xmppResultSet) {
return null;
}
@Override
public Collection<ArchivedMessage> findMessages(Date startDate, Date endDate, String owner, String with, XmppResultSet xmppResultSet) {
JID mucRoom = new JID(owner);
MultiUserChatManager manager = XMPPServer.getInstance().getMultiUserChatManager();
MultiUserChatService service = manager.getMultiUserChatService(mucRoom);
MUCRoom room = service.getChatRoom(mucRoom.getNode());
Connection connection = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
// If logging isn't enabled, do nothing.
if (!room.isLogEnabled()) return null;
List<ArchivedMessage>msgs = new LinkedList<>();
if (startDate == null) {
startDate = new Date(0L);
}
if (endDate == null) {
endDate = new Date();
}
int max = xmppResultSet.getMax();
with = null; // TODO: Suppress this, since we don't yet have requestor information for access control.
try {
connection = DbConnectionManager.getConnection();
StringBuilder sql = new StringBuilder(LOAD_HISTORY);
if (with != null) {
sql.append(WHERE_SENDER);
}
if (xmppResultSet.getAfter() != null) {
sql.append(WHERE_AFTER);
}
if (xmppResultSet.getBefore() != null) {
sql.append(WHERE_BEFORE);
}
sql.append(ORDER_BY);
pstmt = connection.prepareStatement(sql.toString());
pstmt.setString(1, StringUtils.dateToMillis(startDate));
pstmt.setString(2, StringUtils.dateToMillis(endDate));
pstmt.setLong(3, room.getID());
int pos = 3;
if (with != null) {
pstmt.setString(++pos, with);
}
if (xmppResultSet.getAfter() != null) {
pstmt.setLong(++pos, xmppResultSet.getAfter());
}
if (xmppResultSet.getBefore() != null) {
pstmt.setLong(++pos, xmppResultSet.getBefore());
}
rs = pstmt.executeQuery();
while (rs.next()) {
String senderJID = rs.getString(1);
String nickname = rs.getString(2);
Date sentDate = new Date(Long.parseLong(rs.getString(3).trim()));
String subject = rs.getString(4);
String body = rs.getString(5);
String stanza = rs.getString(6);
long id = rs.getLong(7);
if (stanza == null) {
Message message = new Message();
message.setType(Message.Type.groupchat);
message.setSubject(subject);
message.setBody(body);
// Set the sender of the message
if (nickname != null && nickname.trim().length() > 0) {
JID roomJID = room.getRole().getRoleAddress();
// Recreate the sender address based on the nickname and room's JID
message.setFrom(new JID(roomJID.getNode(), roomJID.getDomain(), nickname, true));
}
else {
// Set the room as the sender of the message
message.setFrom(room.getRole().getRoleAddress());
}
stanza = message.toString();
}
ArchivedMessage archivedMessage = new ArchivedMessage(sentDate, ArchivedMessage.Direction.from, null, null);
archivedMessage.setStanza(stanza);
archivedMessage.setId(id);
msgs.add(archivedMessage);
}
} catch (SQLException e) {
// TODO ???
} finally {
DbConnectionManager.closeConnection(rs, pstmt, connection);
}
// TODO - Not great, really should be done by suitable LIMIT stuff.
// Would need to reverse ordering in some cases and then reverse results.
boolean complete = true;
xmppResultSet.setCount(msgs.size());
while (msgs.size() > max) {
msgs.remove(msgs.size() - 1);
complete = false;
}
xmppResultSet.setComplete(complete);
if (msgs.size() > 0) {
xmppResultSet.setFirst(msgs.get(0).getId());
if (msgs.size() > 1) {
xmppResultSet.setLast(msgs.get(msgs.size() - 1).getId());
}
}
return msgs;
}
@Override
public Collection<Conversation> getActiveConversations(int conversationTimeout) {
return null;
}
@Override
public List<Conversation> getConversations(Collection<Long> conversationIds) {
return null;
}
@Override
public Conversation getConversation(String ownerJid, String withJid, Date start) {
return null;
}
@Override
public Conversation getConversation(Long conversationId) {
return null;
}
}
......@@ -4,6 +4,7 @@ import org.jivesoftware.openfire.IQHandlerInfo;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.plugin.MonitoringPlugin;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
......@@ -26,8 +27,8 @@ public abstract class AbstractIQHandler extends IQHandler {
return info;
}
protected PersistenceManager getPersistenceManager() {
return MonitoringPlugin.getInstance().getPersistenceManager();
protected PersistenceManager getPersistenceManager(JID jid) {
return MonitoringPlugin.getInstance().getPersistenceManager(jid);
}
protected IndexManager getIndexManager() {
......
......@@ -9,9 +9,12 @@ import java.util.Map;
import org.jivesoftware.openfire.IQRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.container.Module;
import org.jivesoftware.openfire.disco.IQDiscoInfoHandler;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.muc.MultiUserChatManager;
import org.jivesoftware.openfire.muc.MultiUserChatService;
import org.jivesoftware.openfire.plugin.MonitoringPlugin;
import org.jivesoftware.util.Log;
import org.xmpp.packet.IQ;
......@@ -23,9 +26,10 @@ public abstract class AbstractXepSupport {
protected final Map<String, IQHandler> element2Handlers;
protected final IQHandler iqDispatcher;
protected final String namespace;
protected boolean muc = false;
protected Collection<IQHandler> iqHandlers;
public AbstractXepSupport(XMPPServer server, String namespace,String iqDispatcherNamespace, String iqDispatcherName) {
public AbstractXepSupport(XMPPServer server, String namespace,String iqDispatcherNamespace, String iqDispatcherName, boolean muc) {
this.server = server;
this.element2Handlers = Collections
......@@ -49,7 +53,7 @@ public abstract class AbstractXepSupport {
};
this.namespace = namespace;
this.iqHandlers = Collections.emptyList();
this.muc = muc;
}
public void start() {
......@@ -70,6 +74,12 @@ public abstract class AbstractXepSupport {
server.getIQDiscoInfoHandler().addServerFeature(i.next());
}
}
if (muc) {
MultiUserChatManager manager = server.getMultiUserChatManager();
for (MultiUserChatService mucService : manager.getMultiUserChatServices()) {
mucService.addIQHandler(iqHandler);
}
}
}
server.getIQDiscoInfoHandler().addServerFeature(namespace);
server.getIQRouter().addHandler(iqDispatcher);
......@@ -96,6 +106,12 @@ public abstract class AbstractXepSupport {
}
}
}
if (muc) {
MultiUserChatManager manager = server.getMultiUserChatManager();
for (MultiUserChatService mucService : manager.getMultiUserChatServices()) {
mucService.removeIQHandler(iqHandler);
}
}
}
if (iqRouter != null) {
iqRouter.removeHandler(iqDispatcher);
......
......@@ -49,7 +49,7 @@ public class IQListHandler extends AbstractIQHandler implements
}
private Collection<Conversation> list(JID from, ListRequest request) {
return getPersistenceManager().findConversations(request.getStart(),
return getPersistenceManager(from).findConversations(request.getStart(),
request.getEnd(), from.toBareJID(), request.getWith(),
request.getResultSet());
}
......
......@@ -97,7 +97,7 @@ public class IQRetrieveHandler extends AbstractIQHandler {
}
private Conversation retrieve(JID from, RetrieveRequest request) {
return getPersistenceManager().getConversation(from.toBareJID(),
return getPersistenceManager(from).getConversation(from.toBareJID(),
request.getWith(), request.getStart());
}
......
......@@ -17,7 +17,7 @@ public class Xep0136Support extends AbstractXepSupport {
private static final String IQ_NAMESPACE = "urn:xmpp:archive";
public Xep0136Support(XMPPServer server) {
super(server, NAMESPACE_AUTO,IQ_NAMESPACE, "XEP-0136 IQ Dispatcher");
super(server, NAMESPACE_AUTO,IQ_NAMESPACE, "XEP-0136 IQ Dispatcher", false);
iqHandlers = new ArrayList<IQHandler>();
......
......@@ -12,6 +12,9 @@ import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.forward.Forwarded;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.muc.MUCRole;
import org.jivesoftware.openfire.muc.MUCRoom;
import org.jivesoftware.openfire.muc.MultiUserChatService;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.XMPPDateTimeFormat;
import org.slf4j.Logger;
......@@ -68,14 +71,45 @@ abstract class IQQueryHandler extends AbstractIQHandler implements
boolean muc = false;
if (!XMPPServer.getInstance().isLocal(archiveJid)) {
Log.debug("Archive is not local (user)");
if (XMPPServer.getInstance().getMultiUserChatManager().getMultiUserChatService(archiveJid) == null) {
Log.debug("No chat service for this domain");
return buildErrorResponse(packet);
} else {
muc = true;
Log.debug("MUC");
}
}
JID requestor = packet.getFrom().asBareJID();
Log.debug("Requestor is {} for muc=={}", requestor, muc);
// Auth checking.
if(!archiveJid.equals(requestor)) { // Not user's own
if(muc) {
MultiUserChatService service = XMPPServer.getInstance().getMultiUserChatManager().getMultiUserChatService(archiveJid);
MUCRoom room = service.getChatRoom(archiveJid.getNode());
if (room == null) {
return buildErrorResponse(packet);
}
boolean pass = false;
if (service.isSysadmin(requestor)) {
pass = true;
}
MUCRole.Affiliation aff = room.getAffiliation(requestor);
if (aff != MUCRole.Affiliation.outcast) {
if (aff == MUCRole.Affiliation.owner || aff == MUCRole.Affiliation.admin) {
pass = true;
} else if (room.isMembersOnly()) {
if (aff == MUCRole.Affiliation.member) {
pass = true;
}
} else {
pass = true;
}
}
if (!pass) {
return buildForbiddenResponse(packet);
}
} else if(!archiveJid.equals(requestor)) { // Not user's own
// ... disallow unless admin.
if (!XMPPServer.getInstance().getAdmins().contains(requestor)) {
return buildForbiddenResponse(packet);
......@@ -162,7 +196,7 @@ abstract class IQQueryHandler extends AbstractIQHandler implements
Log.error("Error parsing query date filters.", e);
}
return getPersistenceManager().findMessages(
return getPersistenceManager(queryRequest.getArchive()).findMessages(
startDate,
endDate,
queryRequest.getArchive().toBareJID(),
......
......@@ -16,7 +16,7 @@ public class Xep0313Support extends AbstractXepSupport {
private static final String NAMESPACE = "urn:xmpp:mam:0";
public Xep0313Support(XMPPServer server) {
super(server, NAMESPACE,NAMESPACE, "XEP-0313 IQ Dispatcher");
super(server, NAMESPACE,NAMESPACE, "XEP-0313 IQ Dispatcher", true);
this.iqHandlers = new ArrayList<>();
iqHandlers.add(new IQQueryHandler0());
......
......@@ -15,7 +15,7 @@ public class Xep0313Support1 extends AbstractXepSupport {
private static final String NAMESPACE = "urn:xmpp:mam:1";
public Xep0313Support1(XMPPServer server) {
super(server, NAMESPACE,NAMESPACE, "XEP-0313 IQ Dispatcher");
super(server, NAMESPACE,NAMESPACE, "XEP-0313 IQ Dispatcher", true);
this.iqHandlers = new ArrayList<>();
iqHandlers.add(new IQQueryHandler1());
......
......@@ -1017,13 +1017,14 @@ public class ConversationManager implements Startable, ComponentEventListener{
} else {
pstmt.execute();
}
count++;
// Only batch up to 500 items at a time.
if (count % 500 == 0 && DbConnectionManager.isBatchUpdatesSupported()) {
if (count >= 500 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
count = 0;
}
count++;
}
if (DbConnectionManager.isBatchUpdatesSupported()) {
if (count > 0 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
......@@ -1039,13 +1040,14 @@ public class ConversationManager implements Startable, ComponentEventListener{
} else {
pstmt.execute();
}
count++;
// Only batch up to 500 items at a time.
if (count % 500 == 0 && DbConnectionManager.isBatchUpdatesSupported()) {
if (count >= 500 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
count = 0;
}
count++;
}
if (DbConnectionManager.isBatchUpdatesSupported()) {
if (count > 0 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
......@@ -1063,13 +1065,14 @@ public class ConversationManager implements Startable, ComponentEventListener{
} else {
pstmt.execute();
}
count++;
// Only batch up to 500 items at a time.
if (count % 500 == 0 && DbConnectionManager.isBatchUpdatesSupported()) {
if (count >= 500 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
count = 0;
}
count++;
}
if (DbConnectionManager.isBatchUpdatesSupported()) {
if (count > 0 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
} catch (Exception e) {
......
......@@ -22,6 +22,7 @@ package org.jivesoftware.openfire.plugin;
import java.io.File;
import java.io.FileFilter;
import com.reucon.openfire.plugin.archive.impl.MucMamPersistenceManager;
import com.reucon.openfire.plugin.archive.xep0313.Xep0313Support1;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.archive.ArchiveIndexer;
......@@ -52,6 +53,9 @@ import com.reucon.openfire.plugin.archive.impl.ArchiveManagerImpl;
import com.reucon.openfire.plugin.archive.impl.JdbcPersistenceManager;
import com.reucon.openfire.plugin.archive.xep0136.Xep0136Support;
import com.reucon.openfire.plugin.archive.xep0313.Xep0313Support;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID;
/**
* Openfire Monitoring plugin.
......@@ -70,11 +74,13 @@ public class MonitoringPlugin implements Plugin {
private static MonitoringPlugin instance;
private boolean enabled = true;
private PersistenceManager persistenceManager;
private PersistenceManager mucPersistenceManager;
private ArchiveManager archiveManager;
private IndexManager indexManager;
private Xep0136Support xep0136Support;
private Xep0313Support xep0313Support;
private Xep0313Support1 xep0313Support1;
private Logger Log;
public MonitoringPlugin() {
instance = this;
......@@ -129,7 +135,12 @@ public class MonitoringPlugin implements Plugin {
return indexManager;
}
public PersistenceManager getPersistenceManager() {
public PersistenceManager getPersistenceManager(JID jid) {
Log.debug("Getting PersistenceManager for {}", jid);
if (XMPPServer.getInstance().getMultiUserChatManager().getMultiUserChatService(jid) != null) {
Log.debug("Using MucPersistenceManager");
return mucPersistenceManager;
}
return persistenceManager;
}
......@@ -145,6 +156,7 @@ public class MonitoringPlugin implements Plugin {
}
public void initializePlugin(PluginManager manager, File pluginDirectory) {
Log = LoggerFactory.getLogger(MonitoringPlugin.class);
/* Configuration */
conversationTimeout = JiveGlobals.getIntProperty(
......@@ -154,6 +166,7 @@ public class MonitoringPlugin implements Plugin {
false);
persistenceManager = new JdbcPersistenceManager();
mucPersistenceManager = new MucMamPersistenceManager();
archiveManager = new ArchiveManagerImpl(persistenceManager,
indexManager, conversationTimeout);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment