Commit dedb78d1 authored by Dave Cridland's avatar Dave Cridland

Merge pull request #213 from surevine/xep-0313

OF-862 Add support for XEP-313: Message Archive Management
parents 1c25ac16 7ec33c22
......@@ -9,7 +9,7 @@
<date>10/28/2014</date>
<minServerVersion>3.9.0</minServerVersion>
<databaseKey>monitoring</databaseKey>
<databaseVersion>2</databaseVersion>
<databaseVersion>3</databaseVersion>
<adminconsole>
<tab id="tab-server">
......
-- $Revision$
-- $Date$
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 2);
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 3);
CREATE TABLE ofConversation (
conversationID INTEGER NOT NULL,
......@@ -28,12 +28,14 @@ CREATE INDEX entConPar_con_idx ON ofConParticipant (conversationID, bareJID, jid
CREATE INDEX entConPar_jid_idx ON ofConParticipant (bareJID);
CREATE TABLE ofMessageArchive (
messageID BIGINT NULL,
conversationID INTEGER NOT NULL,
fromJID VARCHAR(1024) NOT NULL,
fromJIDResource VARCHAR(255) NULL,
toJID VARCHAR(1024) NOT NULL,
toJIDResource VARCHAR(255) NULL,
sentDate BIGINT NOT NULL,
stanza LONG VARCHAR NULL,
body LONG VARCHAR
);
CREATE INDEX ofMessageArchive_con_idx ON ofMessageArchive (conversationID);
......
// $Revision$
// $Date$
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 2);
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 3);
CREATE TABLE ofConversation (
conversationID BIGINT NOT NULL,
......@@ -28,12 +28,14 @@ CREATE INDEX ofConParticipant_conv_idx ON ofConParticipant (conversationID, bare
CREATE INDEX ofConParticipant_jid_idx ON ofConParticipant (bareJID);
CREATE TABLE ofMessageArchive (
messageID BIGINT NULL,
conversationID BIGINT NOT NULL,
fromJID VARCHAR(1024) NOT NULL,
fromJIDResource VARCHAR(255) NULL,
toJID VARCHAR(1024) NOT NULL,
toJIDResource VARCHAR(255) NULL,
sentDate BIGINT NOT NULL,
stanza LONGVARCHAR NULL,
body LONGVARCHAR
);
CREATE INDEX ofMessageArchive_con_idx ON ofMessageArchive (conversationID);
......
# $Revision$
# $Date$
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 2);
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 3);
CREATE TABLE ofConversation (
conversationID BIGINT NOT NULL,
......@@ -28,12 +28,14 @@ CREATE TABLE ofConParticipant (
);
CREATE TABLE ofMessageArchive (
messageID BIGINT NULL,
conversationID BIGINT NOT NULL,
fromJID VARCHAR(255) NOT NULL,
fromJIDResource VARCHAR(100) NULL,
toJID VARCHAR(255) NOT NULL,
toJIDResource VARCHAR(100) NULL,
sentDate BIGINT NOT NULL,
stanza TEXT NULL,
body TEXT,
INDEX ofMessageArchive_con_idx (conversationID)
);
......
-- $Revision$
-- $Date$
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 2);
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 3);
CREATE TABLE ofConversation (
conversationID INTEGER NOT NULL,
......@@ -28,12 +28,14 @@ CREATE INDEX ofConParticipant_conv_idx ON ofConParticipant (conversationID, bare
CREATE INDEX ofConParticipant_jid_idx ON ofConParticipant (bareJID);
CREATE TABLE ofMessageArchive (
messageID INTEGER NULL,
conversationID INTEGER NOT NULL,
fromJID VARCHAR2(1024) NOT NULL,
fromJIDResource VARCHAR2(255) NULL,
toJID VARCHAR2(1024) NOT NULL,
toJIDResource VARCHAR2(255) NULL,
sentDate INTEGER NOT NULL,
stanza LONG NULL,
body LONG
);
CREATE INDEX ofMessageArchive_con_idx ON ofMessageArchive (conversationID);
......
-- $Revision$
-- $Date$
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 2);
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 3);
CREATE TABLE ofConversation (
conversationID INTEGER NOT NULL,
......@@ -28,12 +28,14 @@ CREATE INDEX ofConParticipant_conv_idx ON ofConParticipant (conversationID, bare
CREATE INDEX ofConParticipant_jid_idx ON ofConParticipant (bareJID);
CREATE TABLE ofMessageArchive (
messageID BIGINT NULL,
conversationID INTEGER NOT NULL,
fromJID VARCHAR(1024) NOT NULL,
fromJIDResource VARCHAR(1024) NULL,
toJID VARCHAR(1024) NOT NULL,
toJIDResource VARCHAR(1024) NULL,
sentDate BIGINT NOT NULL,
stanza TEXT NULL,
body TEXT
);
CREATE INDEX ofMessageArchive_con_idx ON ofMessageArchive (conversationID);
......
/* $Revision$ */
/* $Date$ */
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 2);
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 3);
CREATE TABLE ofConversation (
conversationID BIGINT NOT NULL,
......@@ -28,12 +28,14 @@ CREATE INDEX ofConParticipant_conv_idx ON ofConParticipant (conversationID, bare
CREATE INDEX ofConParticipant_jid_idx ON ofConParticipant (bareJID);
CREATE TABLE ofMessageArchive (
messageID BIGINT NULL,
conversationID BIGINT NOT NULL,
fromJID NVARCHAR(1024) NOT NULL,
fromJIDResource NVARCHAR(1024) NULL,
toJID NVARCHAR(1024) NOT NULL,
toJIDResource NVARCHAR(1024) NULL,
sentDate BIGINT NOT NULL,
stanza NVARCHAR(MAX) NULL,
body NVARCHAR(MAX)
);
CREATE INDEX ofMessageArchive_con_idx ON ofMessageArchive (conversationID);
......
-- $Revision$
-- $Date$
ALTER TABLE ofMessageArchive ADD COLUMN messageID BIGINT NULL;
ALTER TABLE ofMessageArchive ADD COLUMN stanza LONG VARCHAR NULL;
-- Update database version
UPDATE ofVersion SET version = 3 WHERE name = 'monitoring';
\ No newline at end of file
-- $Revision$
-- $Date$
ALTER TABLE ofMessageArchive ADD COLUMN messageID BIGINT NULL;
ALTER TABLE ofMessageArchive ADD COLUMN stanza LONGVARCHAR NULL;
-- Update database version
UPDATE ofVersion SET version = 3 WHERE name = 'monitoring';
\ No newline at end of file
-- $Revision$
-- $Date$
ALTER TABLE ofMessageArchive ADD COLUMN messageID BIGINT NULL;
ALTER TABLE ofMessageArchive ADD COLUMN stanza TEXT NULL;
-- Update database version
UPDATE ofVersion SET version = 3 WHERE name = 'monitoring';
\ No newline at end of file
-- $Revision$
-- $Date$
ALTER TABLE ofMessageArchive ADD messageID INTEGER NULL;
ALTER TABLE ofMessageArchive ADD stanza LONG NULL;
-- Update database version
UPDATE ofVersion SET version = 3 WHERE name = 'monitoring';
commit;
\ No newline at end of file
-- $Revision$
-- $Date$
ALTER TABLE ofMessageArchive ADD COLUMN messageID BIGINT NULL;
ALTER TABLE ofMessageArchive ADD COLUMN stanza TEXT NULL;
-- Update database version
UPDATE ofVersion SET version = 3 WHERE name = 'monitoring';
\ No newline at end of file
-- $Revision$
-- $Date$
ALTER TABLE ofMessageArchive ADD messageID BIGINT NULL;
ALTER TABLE ofMessageArchive ADD stanza NVARCHAR(MAX) NULL;
-- Update database version
UPDATE ofVersion SET version = 3 WHERE name = 'monitoring';
\ No newline at end of file
......@@ -65,10 +65,22 @@ public interface PersistenceManager
* @param owner bare jid of the owner of the conversation to find or <code>null</code> for any.
* @param with bare jid of the communication partner or <code>null</code> for any. This is either
* the jid of another XMPP user or the jid of a group chat.
* @return the conversations that matched search critera without messages and participants.
* @return the conversations that matched search criteria without messages and participants.
*/
Collection<Conversation> findConversations(Date startDate, Date endDate, String owner, String with, XmppResultSet xmppResultSet);
/**
* Searches for messages.
*
* @param startDate earliest start date of the message to find or <code>null</code> for any.
* @param endDate latest end date of the message to find or <code>null</code> for any.
* @param owner bare jid of the owner of the message to find or <code>null</code> for any.
* @param with bare jid of the communication partner or <code>null</code> for any. This is either
* the jid of another XMPP user or the jid of a group chat.
* @return the messages that matched search criteria.
*/
Collection<ArchivedMessage> findMessages(Date startDate, Date endDate, String owner, String with, XmppResultSet xmppResultSet);
Collection<Conversation> getActiveConversations(int conversationTimeout);
List<Conversation> getConversations(Collection<Long> conversationIds);
......
......@@ -30,6 +30,7 @@ public class ArchivedMessage {
private String body;
private Conversation conversation;
private JID withJid;
private String stanza;
public ArchivedMessage(Date time, Direction direction, String type, JID withJid) {
this.time = time;
......@@ -74,6 +75,14 @@ public class ArchivedMessage {
this.body = body;
}
public String getStanza() {
return stanza;
}
public void setStanza(String stanza) {
this.stanza = stanza;
}
public Conversation getConversation() {
return conversation;
}
......
package com.reucon.openfire.plugin.archive.xep0136;
package com.reucon.openfire.plugin.archive.xep;
import org.jivesoftware.openfire.IQHandlerInfo;
import org.jivesoftware.openfire.handler.IQHandler;
......@@ -11,15 +11,15 @@ import com.reucon.openfire.plugin.archive.IndexManager;
import com.reucon.openfire.plugin.archive.PersistenceManager;
/**
* Abstract base class for XEP-0136 IQ Handlers.
* Abstract base class for XEP-specific IQ Handlers.
*/
public abstract class AbstractIQHandler extends IQHandler {
protected static final String NAMESPACE = "urn:xmpp:archive";
private final IQHandlerInfo info;
protected AbstractIQHandler(String moduleName, String elementName) {
protected AbstractIQHandler(String moduleName, String elementName, String namespace) {
super(moduleName);
this.info = new IQHandlerInfo(elementName, NAMESPACE);
this.info = new IQHandlerInfo(elementName, namespace);
}
public final IQHandlerInfo getInfo() {
......
package com.reucon.openfire.plugin.archive.xep;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.jivesoftware.openfire.IQRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.disco.IQDiscoInfoHandler;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.plugin.MonitoringPlugin;
import org.jivesoftware.util.Log;
import org.xmpp.packet.IQ;
import org.xmpp.packet.PacketError;
public abstract class AbstractXepSupport {
protected final XMPPServer server;
protected final Map<String, IQHandler> element2Handlers;
protected final IQHandler iqDispatcher;
protected final String namespace;
protected Collection<IQHandler> iqHandlers;
public AbstractXepSupport(XMPPServer server, String namespace, String iqDispatcherName) {
this.server = server;
this.element2Handlers = Collections
.synchronizedMap(new HashMap<String, IQHandler>());
this.iqDispatcher = new AbstractIQHandler(iqDispatcherName, null, namespace) {
public IQ handleIQ(IQ packet) throws UnauthorizedException {
if (!MonitoringPlugin.getInstance().isEnabled()) {
return error(packet,
PacketError.Condition.feature_not_implemented);
}
final IQHandler iqHandler = element2Handlers.get(packet
.getChildElement().getName());
if (iqHandler != null) {
return iqHandler.handleIQ(packet);
} else {
return error(packet,
PacketError.Condition.feature_not_implemented);
}
}
};
this.namespace = namespace;
this.iqHandlers = Collections.emptyList();
}
public void start() {
for (IQHandler iqHandler : iqHandlers) {
try {
iqHandler.initialize(server);
iqHandler.start();
} catch (Exception e) {
Log.error("Unable to initialize and start "
+ iqHandler.getClass());
continue;
}
element2Handlers.put(iqHandler.getInfo().getName(), iqHandler);
if (iqHandler instanceof ServerFeaturesProvider) {
for (Iterator<String> i = ((ServerFeaturesProvider) iqHandler)
.getFeatures(); i.hasNext();) {
server.getIQDiscoInfoHandler().addServerFeature(i.next());
}
}
}
server.getIQDiscoInfoHandler().addServerFeature(namespace);
server.getIQRouter().addHandler(iqDispatcher);
}
public void stop() {
IQRouter iqRouter = server.getIQRouter();
IQDiscoInfoHandler iqDiscoInfoHandler = server.getIQDiscoInfoHandler();
for (IQHandler iqHandler : iqHandlers) {
element2Handlers.remove(iqHandler.getInfo().getName());
try {
iqHandler.stop();
iqHandler.destroy();
} catch (Exception e) {
Log.warn("Unable to stop and destroy " + iqHandler.getClass());
}
if (iqHandler instanceof ServerFeaturesProvider) {
for (Iterator<String> i = ((ServerFeaturesProvider) iqHandler)
.getFeatures(); i.hasNext();) {
if (iqDiscoInfoHandler != null) {
iqDiscoInfoHandler.removeServerFeature(i.next());
}
}
}
}
if (iqRouter != null) {
iqRouter.removeHandler(iqDispatcher);
}
}
}
......@@ -17,6 +17,7 @@ public class XmppResultSet
private Integer firstIndex;
private Long last;
private Integer count;
private boolean complete;
public XmppResultSet(Element setElement)
{
......@@ -118,6 +119,24 @@ public class XmppResultSet
return max;
}
/**
* Returns the total size of the result set.
*
* @return the total size of the result set.
*/
public Integer getCount() {
return count;
}
/**
* Returns whether the result set is complete (last page of results).
*
* @return whether the result set is complete.
*/
public boolean isComplete() {
return complete;
}
/**
* Sets the id of the first element returned.
*
......@@ -158,6 +177,15 @@ public class XmppResultSet
this.count = count;
}
/**
* Sets whether the result set is complete (used by last page of results)
*
* @param complete
*/
public void setComplete(boolean complete) {
this.complete = complete;
}
public Element createResultElement()
{
final Element set;
......
......@@ -12,6 +12,7 @@ import org.xmpp.packet.JID;
import com.reucon.openfire.plugin.archive.model.Conversation;
import com.reucon.openfire.plugin.archive.util.XmppDateUtil;
import com.reucon.openfire.plugin.archive.xep.AbstractIQHandler;
import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
/**
......@@ -19,10 +20,12 @@ import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
*/
public class IQListHandler extends AbstractIQHandler implements
ServerFeaturesProvider {
private static final String NAMESPACE = "urn:xmpp:archive";
private static final String NAMESPACE_MANAGE = "urn:xmpp:archive:manage";
public IQListHandler() {
super("Message Archiving List Handler", "list");
super("Message Archiving List Handler", "list", NAMESPACE);
}
public IQ handleIQ(IQ packet) throws UnauthorizedException {
......
......@@ -5,6 +5,8 @@ import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.xmpp.packet.IQ;
import com.reucon.openfire.plugin.archive.xep.AbstractIQHandler;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
......@@ -14,11 +16,13 @@ import java.util.List;
*/
public class IQPrefHandler extends AbstractIQHandler implements ServerFeaturesProvider
{
private static final String NAMESPACE = "urn:xmpp:archive";
private static final String NAMESPACE_PREF = "urn:xmpp:archive:pref";
public IQPrefHandler()
{
super("Message Archiving Preferences Handler", "pref");
super("Message Archiving Preferences Handler", "pref", NAMESPACE);
}
@SuppressWarnings("unchecked")
......
......@@ -3,6 +3,7 @@ package com.reucon.openfire.plugin.archive.xep0136;
import com.reucon.openfire.plugin.archive.model.ArchivedMessage;
import com.reucon.openfire.plugin.archive.model.Conversation;
import com.reucon.openfire.plugin.archive.util.XmppDateUtil;
import com.reucon.openfire.plugin.archive.xep.AbstractIQHandler;
import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
import org.dom4j.Element;
import org.jivesoftware.openfire.auth.UnauthorizedException;
......@@ -16,8 +17,11 @@ import java.util.List;
* Message Archiving Retrieve Handler.
*/
public class IQRetrieveHandler extends AbstractIQHandler {
private static final String NAMESPACE = "urn:xmpp:archive";
public IQRetrieveHandler() {
super("Message Archiving Retrieve Handler", "retrieve");
super("Message Archiving Retrieve Handler", "retrieve", NAMESPACE);
}
public IQ handleIQ(IQ packet) throws UnauthorizedException {
......
package com.reucon.openfire.plugin.archive.xep0136;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.jivesoftware.openfire.IQRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.disco.IQDiscoInfoHandler;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.plugin.MonitoringPlugin;
import org.jivesoftware.util.Log;
import org.xmpp.packet.IQ;
import org.xmpp.packet.PacketError;
import com.reucon.openfire.plugin.archive.xep.AbstractXepSupport;
/**
* Encapsulates support for <a
* href="http://www.xmpp.org/extensions/xep-0136.html">XEP-0136</a>.
*/
public class Xep0136Support {
private static final String NAMESPACE_AUTO = "urn:xmpp:archive:auto";
public class Xep0136Support extends AbstractXepSupport {
final XMPPServer server;
final Map<String, IQHandler> element2Handlers;
final IQHandler iqDispatcher;
final Collection<IQHandler> iqHandlers;
private static final String NAMESPACE = "urn:xmpp:archive:auto";
public Xep0136Support(XMPPServer server) {
this.server = server;
this.element2Handlers = Collections
.synchronizedMap(new HashMap<String, IQHandler>());
this.iqDispatcher = new AbstractIQHandler("XEP-0136 IQ Dispatcher",
null) {
public IQ handleIQ(IQ packet) throws UnauthorizedException {
if (!MonitoringPlugin.getInstance().isEnabled()) {
return error(packet,
PacketError.Condition.feature_not_implemented);
}
final IQHandler iqHandler = element2Handlers.get(packet
.getChildElement().getName());
if (iqHandler != null) {
return iqHandler.handleIQ(packet);
} else {
return error(packet,
PacketError.Condition.feature_not_implemented);
}
}
};
super(server, NAMESPACE, "XEP-0136 IQ Dispatcher");
iqHandlers = new ArrayList<IQHandler>();
......@@ -64,53 +29,4 @@ public class Xep0136Support {
// iqHandlers.add(new IQRemoveHandler());
}
public void start() {
for (IQHandler iqHandler : iqHandlers) {
try {
iqHandler.initialize(server);
iqHandler.start();
} catch (Exception e) {
Log.error("Unable to initialize and start "
+ iqHandler.getClass());
continue;
}
element2Handlers.put(iqHandler.getInfo().getName(), iqHandler);
if (iqHandler instanceof ServerFeaturesProvider) {
for (Iterator<String> i = ((ServerFeaturesProvider) iqHandler)
.getFeatures(); i.hasNext();) {
server.getIQDiscoInfoHandler().addServerFeature(i.next());
}
}
}
server.getIQDiscoInfoHandler().addServerFeature(NAMESPACE_AUTO);
server.getIQRouter().addHandler(iqDispatcher);
}
public void stop() {
IQRouter iqRouter = server.getIQRouter();
IQDiscoInfoHandler iqDiscoInfoHandler = server.getIQDiscoInfoHandler();
for (IQHandler iqHandler : iqHandlers) {
element2Handlers.remove(iqHandler.getInfo().getName());
try {
iqHandler.stop();
iqHandler.destroy();
} catch (Exception e) {
Log.warn("Unable to stop and destroy " + iqHandler.getClass());
}
if (iqHandler instanceof ServerFeaturesProvider) {
for (Iterator<String> i = ((ServerFeaturesProvider) iqHandler)
.getFeatures(); i.hasNext();) {
if (iqDiscoInfoHandler != null) {
iqDiscoInfoHandler.removeServerFeature(i.next());
}
}
}
}
if (iqRouter != null) {
iqRouter.removeHandler(iqDispatcher);
}
}
}
package com.reucon.openfire.plugin.archive.xep0313;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.TimeZone;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.session.LocalClientSession;
import org.jivesoftware.util.XMPPDateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.PacketError;
import com.reucon.openfire.plugin.archive.model.ArchivedMessage;
import com.reucon.openfire.plugin.archive.xep.AbstractIQHandler;
import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
/**
* XEP-0313 IQ Query Handler
*/
public class IQQueryHandler extends AbstractIQHandler implements
ServerFeaturesProvider {
private static final Logger Log = LoggerFactory.getLogger(IQHandler.class);
private static final String NAMESPACE = "urn:xmpp:mam:0";
private static final String MODULE_NAME = "Message Archive Management Query Handler";
XMPPDateTimeFormat xmppDateTimeFormat = new XMPPDateTimeFormat();
protected IQQueryHandler() {
super(MODULE_NAME, "query", NAMESPACE);
}
public IQ handleIQ(IQ packet) throws UnauthorizedException {
LocalClientSession session = (LocalClientSession) sessionManager.getSession(packet.getFrom());
// If no session was found then answer with an error (if possible)
if (session == null) {
Log.error("Error during resource binding. Session not found in " +
sessionManager.getPreAuthenticatedKeys() +
" for key " +
packet.getFrom());
return buildErrorResponse(packet);
}
if(packet.getType().equals(IQ.Type.get)) {
sendSupportedFieldsResult(packet, session);
return null;
}
// Default to user's own archive
JID archiveJid = packet.getFrom();
if(packet.getElement().attribute("to") != null) {
archiveJid = new JID(packet.getElement().attribute("to").getStringValue());
// Only allow queries to users own archives
if(!archiveJid.toBareJID().equals(packet.getFrom().toBareJID())) {
return buildForbiddenResponse(packet);
}
}
sendAcknowledgementResult(packet, session);
final QueryRequest queryRequest = new QueryRequest(packet.getChildElement(), archiveJid);
Collection<ArchivedMessage> archivedMessages = retrieveMessages(queryRequest);
for(ArchivedMessage archivedMessage : archivedMessages) {
sendMessageResult(session, queryRequest, archivedMessage);
}
sendFinalMessage(session, queryRequest);
return null;
}
/**
* Create error response to send to client
* @param packet
* @return
*/
private IQ buildErrorResponse(IQ packet) {
IQ reply = IQ.createResultIQ(packet);
reply.setChildElement(packet.getChildElement().createCopy());
reply.setError(PacketError.Condition.internal_server_error);
return reply;
}
/**
* Create error response due to forbidden request
* @param packet Received request
* @return
*/
private IQ buildForbiddenResponse(IQ packet) {
IQ reply = IQ.createResultIQ(packet);
reply.setChildElement(packet.getChildElement().createCopy());
reply.setError(PacketError.Condition.forbidden);
return reply;
}
/**
* Retrieve messages matching query request from server archive
* @param queryRequest
* @return
*/
private Collection<ArchivedMessage> retrieveMessages(QueryRequest queryRequest) {
String withField = null;
String startField = null;
String endField = null;
DataForm dataForm = queryRequest.getDataForm();
if(dataForm != null) {
if(dataForm.getField("with") != null) {
withField = dataForm.getField("with").getFirstValue();
}
if(dataForm.getField("start") != null) {
startField = dataForm.getField("start").getFirstValue();
}
if(dataForm.getField("end") != null) {
endField = dataForm.getField("end").getFirstValue();
}
}
Date startDate = null;
Date endDate = null;
try {
if(startField != null) {
startDate = xmppDateTimeFormat.parseString(startField);
}
if(endField != null) {
endDate = xmppDateTimeFormat.parseString(endField);
}
} catch (ParseException e) {
Log.error("Error parsing query date filters.", e);
}
return getPersistenceManager().findMessages(
startDate,
endDate,
queryRequest.getArchive().toBareJID(),
withField,
queryRequest.getResultSet());
}
/**
* Send result packet to client acknowledging query.
* @param packet Received query packet
* @param session Client session to respond to
*/
private void sendAcknowledgementResult(IQ packet, LocalClientSession session) {
IQ result = IQ.createResultIQ(packet);
session.process(result);
}
/**
* Send final message back to client following query.
* @param session Client session to respond to
* @param queryRequest Received query request
*/
private void sendFinalMessage(LocalClientSession session,
final QueryRequest queryRequest) {
Message finalMessage = new Message();
Element fin = finalMessage.addChildElement("fin", NAMESPACE);
if(queryRequest.getQueryid() != null) {
fin.addAttribute("queryid", queryRequest.getQueryid());
}
XmppResultSet resultSet = queryRequest.getResultSet();
if (resultSet != null) {
fin.add(resultSet.createResultElement());
if(resultSet.isComplete()) {
fin.addAttribute("complete", "true");
}
}
session.process(finalMessage);
}
/**
* Send archived message to requesting client
* @param session Client session that send message to
* @param queryRequest Query request made by client
* @param archivedMessage Message to send to client
* @return
*/
private void sendMessageResult(LocalClientSession session,
QueryRequest queryRequest, ArchivedMessage archivedMessage) {
if(archivedMessage.getStanza() == null) {
// Don't send legacy archived messages (that have no stanza)
return;
}
Message messagePacket = new Message();
messagePacket.setTo(session.getAddress());
Element result = messagePacket.addChildElement("result", NAMESPACE);
result.addAttribute("id", archivedMessage.getId().toString());
if(queryRequest.getQueryid() != null) {
result.addAttribute("queryid", queryRequest.getQueryid());
}
Element forwarded = result.addElement("forwarded", "urn:xmpp:forward:0");
Element delay = forwarded.addElement("delay", "urn:xmpp:delay");
delay.addAttribute("stamp", XMPPDateTimeFormat.format(archivedMessage.getTime()));
Document stanza;
try {
stanza = DocumentHelper.parseText(archivedMessage.getStanza());
forwarded.add(stanza.getRootElement());
} catch (DocumentException e) {
Log.error("Failed to parse message stanza.", e);
// If we can't parse stanza then we have no message to send to client, abort
return;
}
session.process(messagePacket);
}
/**
* Declare DataForm fields supported by the MAM implementation on this server
* @param packet Incoming query (form field request) packet
* @param session Session with client
*/
private void sendSupportedFieldsResult(IQ packet, LocalClientSession session) {
IQ result = IQ.createResultIQ(packet);
Element query = result.setChildElement("query", NAMESPACE);
DataForm form = new DataForm(DataForm.Type.form);
form.addField("FORM_TYPE", null, FormField.Type.hidden);
form.getField("FORM_TYPE").addValue(NAMESPACE);
form.addField("with", null, FormField.Type.jid_single);
form.addField("start", null, FormField.Type.text_single);
form.addField("end", null, FormField.Type.text_single);
query.add(form.getElement());
session.process(result);
}
@Override
public Iterator<String> getFeatures() {
return Collections.singleton(NAMESPACE).iterator();
}
}
package com.reucon.openfire.plugin.archive.xep0313;
import org.dom4j.Element;
import org.dom4j.QName;
import org.xmpp.forms.DataForm;
import org.xmpp.packet.JID;
import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
/**
* A request to query an archive
*/
public class QueryRequest {
private String queryid;
private DataForm dataForm;
private XmppResultSet resultSet;
private JID archive;
public QueryRequest(Element queryElement, JID archive) {
this.archive = archive;
if (queryElement.attribute("queryid") != null)
{
this.queryid = queryElement.attributeValue("queryid");
}
Element xElement = queryElement.element(QName.get("x", DataForm.NAMESPACE));
if(xElement != null) {
this.dataForm = new DataForm(xElement);
}
Element setElement = queryElement.element(QName.get("set", XmppResultSet.NAMESPACE));
if (setElement != null)
{
resultSet = new XmppResultSet(setElement);
}
}
public String getQueryid() {
return queryid;
}
public DataForm getDataForm() {
return dataForm;
}
public XmppResultSet getResultSet() {
return resultSet;
}
public JID getArchive() {
return archive;
}
}
package com.reucon.openfire.plugin.archive.xep0313;
import java.util.ArrayList;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.handler.IQHandler;
import com.reucon.openfire.plugin.archive.xep.AbstractXepSupport;
/**
* Encapsulates support for <a
* href="http://www.xmpp.org/extensions/xep-0313.html">XEP-0313</a>.
*/
public class Xep0313Support extends AbstractXepSupport {
private static final String NAMESPACE = "urn:xmpp:mam:0";
public Xep0313Support(XMPPServer server) {
super(server, NAMESPACE, "XEP-0313 IQ Dispatcher");
this.iqHandlers = new ArrayList<IQHandler>();
iqHandlers.add(new IQQueryHandler());
}
}
......@@ -72,7 +72,7 @@ public class ArchiveInterceptor implements PacketInterceptor, Startable {
if (conversationManager.isConversation(message)) {
// Process this event in the senior cluster member or local JVM when not in a cluster
if (ClusterManager.isSeniorClusterMember()) {
conversationManager.processMessage(message.getFrom(), message.getTo(), message.getBody(), new Date());
conversationManager.processMessage(message.getFrom(), message.getTo(), message.getBody(), message.toXML(), new Date());
}
else {
JID sender = message.getFrom();
......
......@@ -35,6 +35,7 @@ public class ArchivedMessage {
private JID toJID;
private Date sentDate;
private String body;
private String stanza;
private boolean roomEvent;
/**
......@@ -57,6 +58,11 @@ public class ArchivedMessage {
this.roomEvent = roomEvent;
}
public ArchivedMessage(long conversationID, JID fromJID, JID toJID, Date sentDate, String body, String stanza, boolean roomEvent) {
this(conversationID, fromJID, toJID, sentDate, body, roomEvent);
this.stanza = stanza;
}
/**
* The conversation ID that the message is associated with.
*
......@@ -102,6 +108,15 @@ public class ArchivedMessage {
return body;
}
/**
* String encoded message stanza.
*
* @return string encoded message stanza.
*/
public String getStanza() {
return stanza;
}
/**
* Returns true if the message belongs to a room event. Examples of room events are:
* user joined the room or user left the room.
......
......@@ -56,7 +56,7 @@ public class ConversationEvent implements Externalizable {
public void run(ConversationManager conversationManager) {
if (Type.chatMessageReceived == type) {
conversationManager.processMessage(sender, receiver, body, date);
conversationManager.processMessage(sender, receiver, body, "", date);
}
else if (Type.roomDestroyed == type) {
conversationManager.roomConversationEnded(roomJID, date);
......
......@@ -85,8 +85,8 @@ public class ConversationManager implements Startable, ComponentEventListener {
private static final String UPDATE_CONVERSATION = "UPDATE ofConversation SET lastActivity=?, messageCount=? WHERE conversationID=?";
private static final String UPDATE_PARTICIPANT = "UPDATE ofConParticipant SET leftDate=? WHERE conversationID=? AND bareJID=? AND jidResource=? AND joinedDate=?";
private static final String INSERT_MESSAGE = "INSERT INTO ofMessageArchive(conversationID, fromJID, fromJIDResource, toJID, toJIDResource, sentDate, body) "
+ "VALUES (?,?,?,?,?,?,?)";
private static final String INSERT_MESSAGE = "INSERT INTO ofMessageArchive(messageID, conversationID, fromJID, fromJIDResource, toJID, toJIDResource, sentDate, body, stanza) "
+ "VALUES ((SELECT COUNT(*) FROM ofMessageArchive),?,?,?,?,?,?,?,?)";
private static final String CONVERSATION_COUNT = "SELECT COUNT(*) FROM ofConversation";
private static final String MESSAGE_COUNT = "SELECT COUNT(*) FROM ofMessageArchive";
private static final String DELETE_CONVERSATION_1 = "DELETE FROM ofMessageArchive WHERE conversationID=?";
......@@ -649,10 +649,12 @@ public class ConversationManager implements Startable, ComponentEventListener {
* receiver of the message.
* @param body
* body of the message.
* @param stanza
* String encoded message stanza
* @param date
* date when the message was sent.
*/
void processMessage(JID sender, JID receiver, String body, Date date) {
void processMessage(JID sender, JID receiver, String body, String stanza, Date date) {
String conversationKey = getConversationKey(sender, receiver);
synchronized (conversationKey.intern()) {
Conversation conversation = conversations.get(conversationKey);
......@@ -706,7 +708,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
if (messageArchivingEnabled) {
if (body != null) {
/* OF-677 - Workaround to prevent null messages being archived */
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, receiver, date, body, false));
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, receiver, date, body, stanza, false));
}
}
// Notify listeners of the conversation update.
......@@ -768,7 +770,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
JID jid = new JID(roomJID + "/" + nickname);
if (body != null) {
/* OF-677 - Workaround to prevent null messages being archived */
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, jid, date, body, false));
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, jid, date, body, "", false));
}
}
// Notify listeners of the conversation update.
......@@ -987,6 +989,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
pstmt.setString(5, message.getToJID().getResource());
pstmt.setLong(6, message.getSentDate().getTime());
DbConnectionManager.setLargeTextField(pstmt, 7, message.getBody());
DbConnectionManager.setLargeTextField(pstmt, 8, message.getStanza());
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.addBatch();
} else {
......
......@@ -128,7 +128,7 @@ public class GroupConversationInterceptor implements MUCEventListener, Startable
public void privateMessageRecieved(JID toJID, JID fromJID, Message message) {
if(message.getBody() != null) {
if (ClusterManager.isSeniorClusterMember()) {
conversationManager.processMessage(fromJID, toJID, message.getBody(), new Date());
conversationManager.processMessage(fromJID, toJID, message.getBody(), message.toXML(), new Date());
}
else {
ConversationEventsQueue eventsQueue = conversationManager.getConversationEventsQueue();
......
......@@ -50,6 +50,7 @@ import com.reucon.openfire.plugin.archive.PersistenceManager;
import com.reucon.openfire.plugin.archive.impl.ArchiveManagerImpl;
import com.reucon.openfire.plugin.archive.impl.JdbcPersistenceManager;
import com.reucon.openfire.plugin.archive.xep0136.Xep0136Support;
import com.reucon.openfire.plugin.archive.xep0313.Xep0313Support;
/**
* Openfire Monitoring plugin.
......@@ -71,6 +72,7 @@ public class MonitoringPlugin implements Plugin {
private ArchiveManager archiveManager;
private IndexManager indexManager;
private Xep0136Support xep0136Support;
private Xep0313Support xep0313Support;
public MonitoringPlugin() {
instance = this;
......@@ -157,6 +159,9 @@ public class MonitoringPlugin implements Plugin {
xep0136Support = new Xep0136Support(XMPPServer.getInstance());
xep0136Support.start();
xep0313Support = new Xep0313Support(XMPPServer.getInstance());
xep0313Support.start();
System.out.println("Starting Monitoring Plugin");
// Check if we Enterprise is installed and stop loading this plugin if
......@@ -189,6 +194,9 @@ public class MonitoringPlugin implements Plugin {
xep0136Support = new Xep0136Support(XMPPServer.getInstance());
xep0136Support.start();
xep0313Support = new Xep0313Support(XMPPServer.getInstance());
xep0313Support.start();
}
public void destroyPlugin() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment