Commit 4dfa5f49 authored by JonnyHeavey's avatar JonnyHeavey

xep-0313 implementation

parent ffc7e0bb
......@@ -9,8 +9,8 @@
<date>10/28/2014</date>
<minServerVersion>3.9.0</minServerVersion>
<databaseKey>monitoring</databaseKey>
<databaseVersion>2</databaseVersion>
<databaseVersion>3</databaseVersion>
<adminconsole>
<tab id="tab-server">
<sidebar id="stats-dashboard" name="${admin.sidebar.statistics.name}" description="${admin.item.stats-dashboard.description}">
......
-- $Revision$
-- $Date$
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 2);
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 3);
CREATE TABLE ofConversation (
conversationID INTEGER NOT NULL,
......@@ -28,12 +28,14 @@ CREATE INDEX entConPar_con_idx ON ofConParticipant (conversationID, bareJID, jid
CREATE INDEX entConPar_jid_idx ON ofConParticipant (bareJID);
CREATE TABLE ofMessageArchive (
messageID BIGINT NULL,
conversationID INTEGER NOT NULL,
fromJID VARCHAR(1024) NOT NULL,
fromJIDResource VARCHAR(255) NULL,
toJID VARCHAR(1024) NOT NULL,
toJIDResource VARCHAR(255) NULL,
sentDate BIGINT NOT NULL,
stanza LONG VARCHAR NULL,
body LONG VARCHAR
);
CREATE INDEX ofMessageArchive_con_idx ON ofMessageArchive (conversationID);
......
// $Revision$
// $Date$
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 2);
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 3);
CREATE TABLE ofConversation (
conversationID BIGINT NOT NULL,
......@@ -28,12 +28,14 @@ CREATE INDEX ofConParticipant_conv_idx ON ofConParticipant (conversationID, bare
CREATE INDEX ofConParticipant_jid_idx ON ofConParticipant (bareJID);
CREATE TABLE ofMessageArchive (
messageID BIGINT NULL,
conversationID BIGINT NOT NULL,
fromJID VARCHAR(1024) NOT NULL,
fromJIDResource VARCHAR(255) NULL,
toJID VARCHAR(1024) NOT NULL,
toJIDResource VARCHAR(255) NULL,
sentDate BIGINT NOT NULL,
stanza LONGVARCHAR NULL,
body LONGVARCHAR
);
CREATE INDEX ofMessageArchive_con_idx ON ofMessageArchive (conversationID);
......
# $Revision$
# $Date$
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 2);
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 3);
CREATE TABLE ofConversation (
conversationID BIGINT NOT NULL,
......@@ -28,12 +28,14 @@ CREATE TABLE ofConParticipant (
);
CREATE TABLE ofMessageArchive (
messageID BIGINT NULL,
conversationID BIGINT NOT NULL,
fromJID VARCHAR(255) NOT NULL,
fromJIDResource VARCHAR(100) NULL,
toJID VARCHAR(255) NOT NULL,
toJIDResource VARCHAR(100) NULL,
sentDate BIGINT NOT NULL,
stanza TEXT NULL,
body TEXT,
INDEX ofMessageArchive_con_idx (conversationID)
);
......
-- $Revision$
-- $Date$
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 2);
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 3);
CREATE TABLE ofConversation (
conversationID INTEGER NOT NULL,
......@@ -28,12 +28,14 @@ CREATE INDEX ofConParticipant_conv_idx ON ofConParticipant (conversationID, bare
CREATE INDEX ofConParticipant_jid_idx ON ofConParticipant (bareJID);
CREATE TABLE ofMessageArchive (
messageID INTEGER NULL,
conversationID INTEGER NOT NULL,
fromJID VARCHAR2(1024) NOT NULL,
fromJIDResource VARCHAR2(255) NULL,
toJID VARCHAR2(1024) NOT NULL,
toJIDResource VARCHAR2(255) NULL,
sentDate INTEGER NOT NULL,
stanza LONG NULL,
body LONG
);
CREATE INDEX ofMessageArchive_con_idx ON ofMessageArchive (conversationID);
......
-- $Revision$
-- $Date$
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 2);
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 3);
CREATE TABLE ofConversation (
conversationID INTEGER NOT NULL,
......@@ -28,12 +28,14 @@ CREATE INDEX ofConParticipant_conv_idx ON ofConParticipant (conversationID, bare
CREATE INDEX ofConParticipant_jid_idx ON ofConParticipant (bareJID);
CREATE TABLE ofMessageArchive (
messageID BIGINT NULL,
conversationID INTEGER NOT NULL,
fromJID VARCHAR(1024) NOT NULL,
fromJIDResource VARCHAR(1024) NULL,
toJID VARCHAR(1024) NOT NULL,
toJIDResource VARCHAR(1024) NULL,
sentDate BIGINT NOT NULL,
stanza TEXT NULL,
body TEXT
);
CREATE INDEX ofMessageArchive_con_idx ON ofMessageArchive (conversationID);
......
/* $Revision$ */
/* $Date$ */
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 2);
INSERT INTO ofVersion (name, version) VALUES ('monitoring', 3);
CREATE TABLE ofConversation (
conversationID BIGINT NOT NULL,
......@@ -28,12 +28,14 @@ CREATE INDEX ofConParticipant_conv_idx ON ofConParticipant (conversationID, bare
CREATE INDEX ofConParticipant_jid_idx ON ofConParticipant (bareJID);
CREATE TABLE ofMessageArchive (
messageID BIGINT NULL,
conversationID BIGINT NOT NULL,
fromJID NVARCHAR(1024) NOT NULL,
fromJIDResource NVARCHAR(1024) NULL,
toJID NVARCHAR(1024) NOT NULL,
toJIDResource NVARCHAR(1024) NULL,
sentDate BIGINT NOT NULL,
stanza NVARCHAR(MAX) NULL,
body NVARCHAR(MAX)
);
CREATE INDEX ofMessageArchive_con_idx ON ofMessageArchive (conversationID);
......
-- $Revision$
-- $Date$
ALTER TABLE ofMessageArchive ADD COLUMN messageID BIGINT NULL;
ALTER TABLE ofMessageArchive ADD COLUMN stanza LONG VARCHAR NULL;
-- Update database version
UPDATE ofVersion SET version = 3 WHERE name = 'monitoring';
\ No newline at end of file
-- $Revision$
-- $Date$
ALTER TABLE ofMessageArchive ADD COLUMN messageID BIGINT NULL;
ALTER TABLE ofMessageArchive ADD COLUMN stanza LONGVARCHAR NULL;
-- Update database version
UPDATE ofVersion SET version = 3 WHERE name = 'monitoring';
\ No newline at end of file
-- $Revision$
-- $Date$
ALTER TABLE ofMessageArchive ADD COLUMN messageID BIGINT NULL;
ALTER TABLE ofMessageArchive ADD COLUMN stanza TEXT NULL;
-- Update database version
UPDATE ofVersion SET version = 3 WHERE name = 'monitoring';
\ No newline at end of file
-- $Revision$
-- $Date$
ALTER TABLE ofMessageArchive ADD messageID INTEGER NULL;
ALTER TABLE ofMessageArchive ADD stanza LONG NULL;
-- Update database version
UPDATE ofVersion SET version = 3 WHERE name = 'monitoring';
commit;
\ No newline at end of file
-- $Revision$
-- $Date$
ALTER TABLE ofMessageArchive ADD COLUMN messageID BIGINT NULL;
ALTER TABLE ofMessageArchive ADD COLUMN stanza TEXT NULL;
-- Update database version
UPDATE ofVersion SET version = 3 WHERE name = 'monitoring';
\ No newline at end of file
-- $Revision$
-- $Date$
ALTER TABLE ofMessageArchive ADD messageID BIGINT NULL;
ALTER TABLE ofMessageArchive ADD stanza NVARCHAR(MAX) NULL;
-- Update database version
UPDATE ofVersion SET version = 3 WHERE name = 'monitoring';
\ No newline at end of file
......@@ -65,10 +65,22 @@ public interface PersistenceManager
* @param owner bare jid of the owner of the conversation to find or <code>null</code> for any.
* @param with bare jid of the communication partner or <code>null</code> for any. This is either
* the jid of another XMPP user or the jid of a group chat.
* @return the conversations that matched search critera without messages and participants.
* @return the conversations that matched search criteria without messages and participants.
*/
Collection<Conversation> findConversations(Date startDate, Date endDate, String owner, String with, XmppResultSet xmppResultSet);
/**
* Searches for messages.
*
* @param startDate earliest start date of the message to find or <code>null</code> for any.
* @param endDate latest end date of the message to find or <code>null</code> for any.
* @param owner bare jid of the owner of the message to find or <code>null</code> for any.
* @param with bare jid of the communication partner or <code>null</code> for any. This is either
* the jid of another XMPP user or the jid of a group chat.
* @return the messages that matched search criteria.
*/
Collection<ArchivedMessage> findMessages(Date startDate, Date endDate, String owner, String with, XmppResultSet xmppResultSet);
Collection<Conversation> getActiveConversations(int conversationTimeout);
List<Conversation> getConversations(Collection<Long> conversationIds);
......
......@@ -4,10 +4,8 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.TreeMap;
import java.util.HashSet;
......@@ -87,6 +85,14 @@ public class JdbcPersistenceManager implements PersistenceManager {
public static final String CONVERSATION_WITH_JID = "ofMessageArchive.toJID";
// public static final String CONVERSATION_WITH_JID = "c.withJid";
public static final String MESSAGE_ID = "ofMessageArchive.messageID";
public static final String MESSAGE_SENT_DATE = "ofMessageArchive.sentDate";
public static final String MESSAGE_TO_JID = "ofMessageArchive.toJID";
public static final String MESSAGE_FROM_JID = "ofMessageArchive.fromJID";
public static final String SELECT_ACTIVE_CONVERSATIONS = "SELECT DISTINCT " + "ofConversation.conversationID, " + "ofConversation.room, "
+ "ofConversation.isExternal, " + "ofConversation.startDate, " + "ofConversation.lastActivity, " + "ofConversation.messageCount, "
+ "ofConParticipant.joinedDate, " + "ofConParticipant.leftDate, " + "ofConParticipant.bareJID, " + "ofConParticipant.jidResource, "
......@@ -110,6 +116,16 @@ public class JdbcPersistenceManager implements PersistenceManager {
// public static final String SELECT_PARTICIPANTS_BY_CONVERSATION =
// "SELECT participantId,startTime,endTime,jid FROM archiveParticipants WHERE conversationId =? ORDER BY startTime";
public static final String SELECT_MESSAGES = "SELECT DISTINCT " + "ofMessageArchive.fromJID, "
+ "ofMessageArchive.toJID, " + "ofMessageArchive.sentDate, " + "ofMessageArchive.stanza, "
+ "ofMessageArchive.messageID, " + "ofConParticipant.bareJID "
+ "FROM ofMessageArchive "
+ "INNER JOIN ofConParticipant ON ofMessageArchive.conversationID = ofConParticipant.conversationID ";
public static final String COUNT_MESSAGES = "SELECT COUNT(DISTINCT ofMessageArchive.messageID) "
+ "FROM ofMessageArchive "
+ "INNER JOIN ofConParticipant ON ofMessageArchive.conversationID = ofConParticipant.conversationID ";
public boolean createMessage(ArchivedMessage message) {
/* read only */
return false;
......@@ -342,6 +358,215 @@ public class JdbcPersistenceManager implements PersistenceManager {
return parameterIndex;
}
@Override
public Collection<ArchivedMessage> findMessages(Date startDate,
Date endDate, String ownerJid, String withJid, XmppResultSet xmppResultSet) {
final StringBuilder querySB;
final StringBuilder whereSB;
final StringBuilder limitSB;
final TreeMap<Long, ArchivedMessage> archivedMessages = new TreeMap<Long, ArchivedMessage>();
querySB = new StringBuilder(SELECT_MESSAGES);
whereSB = new StringBuilder();
limitSB = new StringBuilder();
// Ignore legacy messages
appendWhere(whereSB, MESSAGE_ID, " IS NOT NULL ");
startDate = getAuditedStartDate(startDate);
if (startDate != null) {
appendWhere(whereSB, MESSAGE_SENT_DATE, " >= ?");
}
if (endDate != null) {
appendWhere(whereSB, MESSAGE_SENT_DATE, " <= ?");
}
if (ownerJid != null) {
appendWhere(whereSB, CONVERSATION_OWNER_JID, " = ?");
}
if(withJid != null) {
appendWhere(whereSB, "( ", MESSAGE_TO_JID, " = ? OR ", MESSAGE_FROM_JID, " = ? )");
}
if (whereSB.length() != 0) {
querySB.append(" WHERE ").append(whereSB);
}
if (DbConnectionManager.getDatabaseType() == DbConnectionManager.DatabaseType.sqlserver) {
querySB.insert(0,"SELECT * FROM (SELECT *, ROW_NUMBER() OVER (ORDER BY "+MESSAGE_SENT_DATE+") AS RowNum FROM ( ");
querySB.append(") ofMessageArchive ) t2 WHERE RowNum");
}
else {
querySB.append(" ORDER BY ").append(MESSAGE_SENT_DATE);
}
if (xmppResultSet != null) {
Integer firstIndex = null;
int max = xmppResultSet.getMax() != null ? xmppResultSet.getMax() : DEFAULT_MAX;
int count = countMessages(startDate, endDate, ownerJid, withJid, whereSB.toString());
boolean reverse = false;
xmppResultSet.setCount(count);
if (xmppResultSet.getIndex() != null) {
firstIndex = xmppResultSet.getIndex();
} else if (xmppResultSet.getAfter() != null) {
firstIndex = countMessagesBefore(startDate, endDate, ownerJid, withJid, xmppResultSet.getAfter(), whereSB.toString());
firstIndex += 1;
} else if (xmppResultSet.getBefore() != null) {
int messagesBeforeCount = countMessagesBefore(startDate, endDate, ownerJid, withJid, xmppResultSet.getBefore(), whereSB.toString());
firstIndex = messagesBeforeCount;
firstIndex -= max;
// Reduce result limit to number of results before (if less than a page remaining)
if(messagesBeforeCount < max) {
max = messagesBeforeCount;
}
reverse = true;
if (firstIndex < 0) {
firstIndex = 0;
}
}
firstIndex = firstIndex != null ? firstIndex : 0;
if (DbConnectionManager.getDatabaseType() == DbConnectionManager.DatabaseType.sqlserver) {
limitSB.append(" BETWEEN ").append(firstIndex+1);
limitSB.append(" AND ").append(firstIndex+max);
}
else {
limitSB.append(" LIMIT ").append(max);
limitSB.append(" OFFSET ").append(firstIndex);
}
xmppResultSet.setFirstIndex(firstIndex);
if(isLastPage(firstIndex, count, max, reverse)) {
xmppResultSet.setComplete(true);
}
}
querySB.append(limitSB);
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(querySB.toString());
bindMessageParameters(startDate, endDate, ownerJid, withJid, pstmt);
rs = pstmt.executeQuery();
Log.debug("findMessages: SELECT_MESSAGES: " + pstmt.toString());
while(rs.next()) {
Date time = millisToDate(rs.getLong("sentDate"));
ArchivedMessage archivedMessage = new ArchivedMessage(time, null, null, null);
archivedMessage.setId(rs.getLong("messageID"));
archivedMessage.setStanza(rs.getString("stanza"));
archivedMessages.put(archivedMessage.getId(), archivedMessage);
}
} catch(SQLException sqle) {
Log.error("Error selecting conversations", sqle);
} finally {
DbConnectionManager.closeConnection(rs, pstmt, con);
}
if (xmppResultSet != null && archivedMessages.size() > 0) {
xmppResultSet.setFirst(archivedMessages.firstKey());
xmppResultSet.setLast(archivedMessages.lastKey());
}
return archivedMessages.values();
}
private Integer countMessages(Date startDate, Date endDate,
String ownerJid, String withJid, String whereClause) {
StringBuilder querySB;
querySB = new StringBuilder(COUNT_MESSAGES);
if (whereClause != null && whereClause.length() != 0) {
querySB.append(" WHERE ").append(whereClause);
}
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(querySB.toString());
bindMessageParameters(startDate, endDate, ownerJid, withJid, pstmt);
rs = pstmt.executeQuery();
if (rs.next()) {
return rs.getInt(1);
} else {
return 0;
}
} catch (SQLException sqle) {
Log.error("Error counting conversations", sqle);
return 0;
} finally {
DbConnectionManager.closeConnection(rs, pstmt, con);
}
}
private Integer countMessagesBefore(Date startDate, Date endDate,
String ownerJid, String withJid, Long before, String whereClause) {
StringBuilder querySB;
querySB = new StringBuilder(COUNT_MESSAGES);
querySB.append(" WHERE ");
if (whereClause != null && whereClause.length() != 0) {
querySB.append(whereClause);
querySB.append(" AND ");
}
querySB.append(MESSAGE_ID).append(" < ?");
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
int parameterIndex;
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(querySB.toString());
parameterIndex = bindMessageParameters(startDate, endDate, ownerJid, withJid, pstmt);
pstmt.setLong(parameterIndex, before);
rs = pstmt.executeQuery();
if (rs.next()) {
return rs.getInt(1);
} else {
return 0;
}
} catch (SQLException sqle) {
Log.error("Error counting conversations", sqle);
return 0;
} finally {
DbConnectionManager.closeConnection(rs, pstmt, con);
}
}
private int bindMessageParameters(Date startDate, Date endDate,
String ownerJid, String withJid, PreparedStatement pstmt) throws SQLException {
int parameterIndex = 1;
if (startDate != null) {
pstmt.setLong(parameterIndex++, dateToMillis(startDate));
}
if (endDate != null) {
pstmt.setLong(parameterIndex++, dateToMillis(endDate));
}
if (ownerJid != null) {
pstmt.setString(parameterIndex++, ownerJid);
}
if (withJid != null) {
// Add twice due to OR operator
pstmt.setString(parameterIndex++, withJid);
pstmt.setString(parameterIndex++, withJid);
}
return parameterIndex;
}
public Collection<Conversation> getActiveConversations(int conversationTimeout) {
final Collection<Conversation> conversations;
final long now = System.currentTimeMillis();
......@@ -621,4 +846,30 @@ public class JdbcPersistenceManager implements PersistenceManager {
private Date millisToDate(Long millis) {
return millis == null ? null : new Date(millis);
}
/**
* Determines whether a result page is the last of a set.
*
* @param firstItemIndex index (in whole set) of first item in page.
* @param resultCount total number of results in set.
* @param pageSize number of results in a page.
* @param reverse whether paging is being performed in reverse (back to front)
* @return whether results are from last page.
*/
private boolean isLastPage(int firstItemIndex, int resultCount, int pageSize, boolean reverse) {
if(reverse) {
// Index of first item in last page always 0 when reverse
if(firstItemIndex == 0) {
return true;
}
} else {
if((firstItemIndex + pageSize) >= resultCount) {
return true;
}
}
return false;
}
}
......@@ -30,6 +30,7 @@ public class ArchivedMessage {
private String body;
private Conversation conversation;
private JID withJid;
private String stanza;
public ArchivedMessage(Date time, Direction direction, String type, JID withJid) {
this.time = time;
......@@ -74,6 +75,14 @@ public class ArchivedMessage {
this.body = body;
}
public String getStanza() {
return stanza;
}
public void setStanza(String stanza) {
this.stanza = stanza;
}
public Conversation getConversation() {
return conversation;
}
......@@ -84,7 +93,7 @@ public class ArchivedMessage {
/**
* Checks if this message contains payload that should be archived.
*
*
* @return <code>true</code> if this message is empty, <code>false</code>
* otherwise.
*/
......
package com.reucon.openfire.plugin.archive.xep0136;
package com.reucon.openfire.plugin.archive.xep;
import org.jivesoftware.openfire.IQHandlerInfo;
import org.jivesoftware.openfire.handler.IQHandler;
......@@ -11,15 +11,15 @@ import com.reucon.openfire.plugin.archive.IndexManager;
import com.reucon.openfire.plugin.archive.PersistenceManager;
/**
* Abstract base class for XEP-0136 IQ Handlers.
* Abstract base class for XEP-specific IQ Handlers.
*/
public abstract class AbstractIQHandler extends IQHandler {
protected static final String NAMESPACE = "urn:xmpp:archive";
private final IQHandlerInfo info;
protected AbstractIQHandler(String moduleName, String elementName) {
protected AbstractIQHandler(String moduleName, String elementName, String namespace) {
super(moduleName);
this.info = new IQHandlerInfo(elementName, NAMESPACE);
this.info = new IQHandlerInfo(elementName, namespace);
}
public final IQHandlerInfo getInfo() {
......
package com.reucon.openfire.plugin.archive.xep;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.jivesoftware.openfire.IQRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.disco.IQDiscoInfoHandler;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.plugin.MonitoringPlugin;
import org.jivesoftware.util.Log;
import org.xmpp.packet.IQ;
import org.xmpp.packet.PacketError;
public abstract class AbstractXepSupport {
protected final XMPPServer server;
protected final Map<String, IQHandler> element2Handlers;
protected final IQHandler iqDispatcher;
protected final String namespace;
protected Collection<IQHandler> iqHandlers;
public AbstractXepSupport(XMPPServer server, String namespace, String iqDispatcherName) {
this.server = server;
this.element2Handlers = Collections
.synchronizedMap(new HashMap<String, IQHandler>());
this.iqDispatcher = new AbstractIQHandler(iqDispatcherName, null, namespace) {
public IQ handleIQ(IQ packet) throws UnauthorizedException {
if (!MonitoringPlugin.getInstance().isEnabled()) {
return error(packet,
PacketError.Condition.feature_not_implemented);
}
final IQHandler iqHandler = element2Handlers.get(packet
.getChildElement().getName());
if (iqHandler != null) {
return iqHandler.handleIQ(packet);
} else {
return error(packet,
PacketError.Condition.feature_not_implemented);
}
}
};
this.namespace = namespace;
this.iqHandlers = Collections.emptyList();
}
public void start() {
for (IQHandler iqHandler : iqHandlers) {
try {
iqHandler.initialize(server);
iqHandler.start();
} catch (Exception e) {
Log.error("Unable to initialize and start "
+ iqHandler.getClass());
continue;
}
element2Handlers.put(iqHandler.getInfo().getName(), iqHandler);
if (iqHandler instanceof ServerFeaturesProvider) {
for (Iterator<String> i = ((ServerFeaturesProvider) iqHandler)
.getFeatures(); i.hasNext();) {
server.getIQDiscoInfoHandler().addServerFeature(i.next());
}
}
}
server.getIQDiscoInfoHandler().addServerFeature(namespace);
server.getIQRouter().addHandler(iqDispatcher);
}
public void stop() {
IQRouter iqRouter = server.getIQRouter();
IQDiscoInfoHandler iqDiscoInfoHandler = server.getIQDiscoInfoHandler();
for (IQHandler iqHandler : iqHandlers) {
element2Handlers.remove(iqHandler.getInfo().getName());
try {
iqHandler.stop();
iqHandler.destroy();
} catch (Exception e) {
Log.warn("Unable to stop and destroy " + iqHandler.getClass());
}
if (iqHandler instanceof ServerFeaturesProvider) {
for (Iterator<String> i = ((ServerFeaturesProvider) iqHandler)
.getFeatures(); i.hasNext();) {
if (iqDiscoInfoHandler != null) {
iqDiscoInfoHandler.removeServerFeature(i.next());
}
}
}
}
if (iqRouter != null) {
iqRouter.removeHandler(iqDispatcher);
}
}
}
......@@ -17,6 +17,7 @@ public class XmppResultSet
private Integer firstIndex;
private Long last;
private Integer count;
private boolean complete;
public XmppResultSet(Element setElement)
{
......@@ -118,6 +119,24 @@ public class XmppResultSet
return max;
}
/**
* Returns the total size of the result set.
*
* @return the total size of the result set.
*/
public Integer getCount() {
return count;
}
/**
* Returns whether the result set is complete (last page of results).
*
* @return whether the result set is complete.
*/
public boolean isComplete() {
return complete;
}
/**
* Sets the id of the first element returned.
*
......@@ -158,7 +177,16 @@ public class XmppResultSet
this.count = count;
}
public Element createResultElement()
/**
* Sets whether the result set is complete (used by last page of results)
*
* @param complete
*/
public void setComplete(boolean complete) {
this.complete = complete;
}
public Element createResultElement()
{
final Element set;
......
......@@ -12,6 +12,7 @@ import org.xmpp.packet.JID;
import com.reucon.openfire.plugin.archive.model.Conversation;
import com.reucon.openfire.plugin.archive.util.XmppDateUtil;
import com.reucon.openfire.plugin.archive.xep.AbstractIQHandler;
import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
/**
......@@ -19,10 +20,12 @@ import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
*/
public class IQListHandler extends AbstractIQHandler implements
ServerFeaturesProvider {
private static final String NAMESPACE = "urn:xmpp:archive";
private static final String NAMESPACE_MANAGE = "urn:xmpp:archive:manage";
public IQListHandler() {
super("Message Archiving List Handler", "list");
super("Message Archiving List Handler", "list", NAMESPACE);
}
public IQ handleIQ(IQ packet) throws UnauthorizedException {
......
......@@ -5,6 +5,8 @@ import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.xmpp.packet.IQ;
import com.reucon.openfire.plugin.archive.xep.AbstractIQHandler;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
......@@ -14,11 +16,13 @@ import java.util.List;
*/
public class IQPrefHandler extends AbstractIQHandler implements ServerFeaturesProvider
{
private static final String NAMESPACE = "urn:xmpp:archive";
private static final String NAMESPACE_PREF = "urn:xmpp:archive:pref";
public IQPrefHandler()
{
super("Message Archiving Preferences Handler", "pref");
super("Message Archiving Preferences Handler", "pref", NAMESPACE);
}
@SuppressWarnings("unchecked")
......
......@@ -3,6 +3,7 @@ package com.reucon.openfire.plugin.archive.xep0136;
import com.reucon.openfire.plugin.archive.model.ArchivedMessage;
import com.reucon.openfire.plugin.archive.model.Conversation;
import com.reucon.openfire.plugin.archive.util.XmppDateUtil;
import com.reucon.openfire.plugin.archive.xep.AbstractIQHandler;
import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
import org.dom4j.Element;
import org.jivesoftware.openfire.auth.UnauthorizedException;
......@@ -16,8 +17,11 @@ import java.util.List;
* Message Archiving Retrieve Handler.
*/
public class IQRetrieveHandler extends AbstractIQHandler {
private static final String NAMESPACE = "urn:xmpp:archive";
public IQRetrieveHandler() {
super("Message Archiving Retrieve Handler", "retrieve");
super("Message Archiving Retrieve Handler", "retrieve", NAMESPACE);
}
public IQ handleIQ(IQ packet) throws UnauthorizedException {
......
package com.reucon.openfire.plugin.archive.xep0136;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.jivesoftware.openfire.IQRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.disco.IQDiscoInfoHandler;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.plugin.MonitoringPlugin;
import org.jivesoftware.util.Log;
import org.xmpp.packet.IQ;
import org.xmpp.packet.PacketError;
import com.reucon.openfire.plugin.archive.xep.AbstractXepSupport;
/**
* Encapsulates support for <a
* href="http://www.xmpp.org/extensions/xep-0136.html">XEP-0136</a>.
*/
public class Xep0136Support {
private static final String NAMESPACE_AUTO = "urn:xmpp:archive:auto";
public class Xep0136Support extends AbstractXepSupport {
final XMPPServer server;
final Map<String, IQHandler> element2Handlers;
final IQHandler iqDispatcher;
final Collection<IQHandler> iqHandlers;
private static final String NAMESPACE = "urn:xmpp:archive:auto";
public Xep0136Support(XMPPServer server) {
this.server = server;
this.element2Handlers = Collections
.synchronizedMap(new HashMap<String, IQHandler>());
this.iqDispatcher = new AbstractIQHandler("XEP-0136 IQ Dispatcher",
null) {
public IQ handleIQ(IQ packet) throws UnauthorizedException {
if (!MonitoringPlugin.getInstance().isEnabled()) {
return error(packet,
PacketError.Condition.feature_not_implemented);
}
final IQHandler iqHandler = element2Handlers.get(packet
.getChildElement().getName());
if (iqHandler != null) {
return iqHandler.handleIQ(packet);
} else {
return error(packet,
PacketError.Condition.feature_not_implemented);
}
}
};
super(server, NAMESPACE, "XEP-0136 IQ Dispatcher");
iqHandlers = new ArrayList<IQHandler>();
......@@ -64,53 +29,4 @@ public class Xep0136Support {
// iqHandlers.add(new IQRemoveHandler());
}
public void start() {
for (IQHandler iqHandler : iqHandlers) {
try {
iqHandler.initialize(server);
iqHandler.start();
} catch (Exception e) {
Log.error("Unable to initialize and start "
+ iqHandler.getClass());
continue;
}
element2Handlers.put(iqHandler.getInfo().getName(), iqHandler);
if (iqHandler instanceof ServerFeaturesProvider) {
for (Iterator<String> i = ((ServerFeaturesProvider) iqHandler)
.getFeatures(); i.hasNext();) {
server.getIQDiscoInfoHandler().addServerFeature(i.next());
}
}
}
server.getIQDiscoInfoHandler().addServerFeature(NAMESPACE_AUTO);
server.getIQRouter().addHandler(iqDispatcher);
}
public void stop() {
IQRouter iqRouter = server.getIQRouter();
IQDiscoInfoHandler iqDiscoInfoHandler = server.getIQDiscoInfoHandler();
for (IQHandler iqHandler : iqHandlers) {
element2Handlers.remove(iqHandler.getInfo().getName());
try {
iqHandler.stop();
iqHandler.destroy();
} catch (Exception e) {
Log.warn("Unable to stop and destroy " + iqHandler.getClass());
}
if (iqHandler instanceof ServerFeaturesProvider) {
for (Iterator<String> i = ((ServerFeaturesProvider) iqHandler)
.getFeatures(); i.hasNext();) {
if (iqDiscoInfoHandler != null) {
iqDiscoInfoHandler.removeServerFeature(i.next());
}
}
}
}
if (iqRouter != null) {
iqRouter.removeHandler(iqDispatcher);
}
}
}
package com.reucon.openfire.plugin.archive.xep0313;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.TimeZone;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.disco.ServerFeaturesProvider;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.session.LocalClientSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.PacketError;
import com.reucon.openfire.plugin.archive.model.ArchivedMessage;
import com.reucon.openfire.plugin.archive.xep.AbstractIQHandler;
import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
/**
* XEP-0313 IQ Query Handler
*/
public class IQQueryHandler extends AbstractIQHandler implements
ServerFeaturesProvider {
private static final Logger Log = LoggerFactory.getLogger(IQHandler.class);
private static final String NAMESPACE = "urn:xmpp:mam:0";
private static final String MODULE_NAME = "Message Archive Management Query Handler";
private static final DateFormat XEP0082_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
protected IQQueryHandler() {
super(MODULE_NAME, "query", NAMESPACE);
}
public IQ handleIQ(IQ packet) throws UnauthorizedException {
LocalClientSession session = (LocalClientSession) sessionManager.getSession(packet.getFrom());
// If no session was found then answer with an error (if possible)
if (session == null) {
Log.error("Error during resource binding. Session not found in " +
sessionManager.getPreAuthenticatedKeys() +
" for key " +
packet.getFrom());
return buildErrorResponse(packet);
}
if(packet.getType().equals(IQ.Type.get)) {
sendSupportedFieldsResult(packet, session);
return null;
}
// Default to user's own archive
JID archiveJid = packet.getFrom();
if(packet.getElement().attribute("to") != null) {
archiveJid = new JID(packet.getElement().attribute("to").getStringValue());
// Only allow queries to users own archives
if(!archiveJid.toBareJID().equals(packet.getFrom().toBareJID())) {
return buildForbiddenResponse(packet);
}
}
sendAcknowledgementResult(packet, session);
final QueryRequest queryRequest = new QueryRequest(packet.getChildElement(), archiveJid);
Collection<ArchivedMessage> archivedMessages = retrieveMessages(queryRequest);
for(ArchivedMessage archivedMessage : archivedMessages) {
sendMessageResult(session, queryRequest, archivedMessage);
}
sendFinalMessage(session, queryRequest);
return null;
}
/**
* Create error response to send to client
* @param packet
* @return
*/
private IQ buildErrorResponse(IQ packet) {
IQ reply = IQ.createResultIQ(packet);
reply.setChildElement(packet.getChildElement().createCopy());
reply.setError(PacketError.Condition.internal_server_error);
return reply;
}
/**
* Create error response due to forbidden request
* @param packet Received request
* @return
*/
private IQ buildForbiddenResponse(IQ packet) {
IQ reply = IQ.createResultIQ(packet);
reply.setChildElement(packet.getChildElement().createCopy());
reply.setError(PacketError.Condition.forbidden);
return reply;
}
/**
* Retrieve messages matching query request from server archive
* @param queryRequest
* @return
*/
private Collection<ArchivedMessage> retrieveMessages(QueryRequest queryRequest) {
String withField = null;
String startField = null;
String endField = null;
DataForm dataForm = queryRequest.getDataForm();
if(dataForm != null) {
if(dataForm.getField("with") != null) {
withField = dataForm.getField("with").getFirstValue();
}
if(dataForm.getField("start") != null) {
startField = dataForm.getField("start").getFirstValue();
}
if(dataForm.getField("end") != null) {
endField = dataForm.getField("end").getFirstValue();
}
}
Date startDate = null;
Date endDate = null;
try {
if(startField != null) {
startDate = XEP0082_DATE_FORMAT.parse(startField);
}
if(endField != null) {
endDate = XEP0082_DATE_FORMAT.parse(endField);
}
} catch (ParseException e) {
Log.error("Error parsing query date filters.", e);
}
return getPersistenceManager().findMessages(
startDate,
endDate,
queryRequest.getArchive().toBareJID(),
withField,
queryRequest.getResultSet());
}
/**
* Send result packet to client acknowledging query.
* @param packet Received query packet
* @param session Client session to respond to
*/
private void sendAcknowledgementResult(IQ packet, LocalClientSession session) {
IQ result = IQ.createResultIQ(packet);
session.process(result);
}
/**
* Send final message back to client following query.
* @param session Client session to respond to
* @param queryRequest Received query request
*/
private void sendFinalMessage(LocalClientSession session,
final QueryRequest queryRequest) {
Message finalMessage = new Message();
Element fin = finalMessage.addChildElement("fin", NAMESPACE);
if(queryRequest.getQueryid() != null) {
fin.addAttribute("queryid", queryRequest.getQueryid());
}
XmppResultSet resultSet = queryRequest.getResultSet();
if (resultSet != null) {
fin.add(resultSet.createResultElement());
if(resultSet.isComplete()) {
fin.addAttribute("complete", "true");
}
}
session.process(finalMessage);
}
/**
* Send archived message to requesting client
* @param session Client session that send message to
* @param queryRequest Query request made by client
* @param archivedMessage Message to send to client
* @return
*/
private void sendMessageResult(LocalClientSession session,
QueryRequest queryRequest, ArchivedMessage archivedMessage) {
if(archivedMessage.getStanza() == null) {
// Don't send legacy archived messages (that have no stanza)
return;
}
Message messagePacket = new Message();
messagePacket.setTo(session.getAddress());
Element result = messagePacket.addChildElement("result", NAMESPACE);
result.addAttribute("id", archivedMessage.getId().toString());
if(queryRequest.getQueryid() != null) {
result.addAttribute("queryid", queryRequest.getQueryid());
}
Element forwarded = result.addElement("forwarded", "urn:xmpp:forward:0");
Element delay = forwarded.addElement("delay", "urn:xmpp:delay");
XEP0082_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
delay.addAttribute("stamp", XEP0082_DATE_FORMAT.format(archivedMessage.getTime()));
Document stanza;
try {
stanza = DocumentHelper.parseText(archivedMessage.getStanza());
forwarded.add(stanza.getRootElement());
} catch (DocumentException e) {
Log.error("Failed to parse message stanza.", e);
// If we can't parse stanza then we have no message to send to client, abort
return;
}
session.process(messagePacket);
}
/**
* Declare DataForm fields supported by the MAM implementation on this server
* @param packet Incoming query (form field request) packet
* @param session Session with client
*/
private void sendSupportedFieldsResult(IQ packet, LocalClientSession session) {
IQ result = IQ.createResultIQ(packet);
Element query = result.setChildElement("query", NAMESPACE);
DataForm form = new DataForm(DataForm.Type.form);
form.addField("FORM_TYPE", null, FormField.Type.hidden);
form.getField("FORM_TYPE").addValue(NAMESPACE);
form.addField("with", null, FormField.Type.jid_single);
form.addField("start", null, FormField.Type.text_single);
form.addField("end", null, FormField.Type.text_single);
query.add(form.getElement());
session.process(result);
}
@Override
public Iterator<String> getFeatures() {
return Collections.singleton(NAMESPACE).iterator();
}
}
package com.reucon.openfire.plugin.archive.xep0313;
import org.dom4j.Element;
import org.dom4j.QName;
import org.xmpp.forms.DataForm;
import org.xmpp.packet.JID;
import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
/**
* A request to query an archive
*/
public class QueryRequest {
private String queryid;
private DataForm dataForm;
private XmppResultSet resultSet;
private JID archive;
public QueryRequest(Element queryElement, JID archive) {
this.archive = archive;
if (queryElement.attribute("queryid") != null)
{
this.queryid = queryElement.attributeValue("queryid");
}
Element xElement = queryElement.element(QName.get("x", DataForm.NAMESPACE));
if(xElement != null) {
this.dataForm = new DataForm(xElement);
}
Element setElement = queryElement.element(QName.get("set", XmppResultSet.NAMESPACE));
if (setElement != null)
{
resultSet = new XmppResultSet(setElement);
}
}
public String getQueryid() {
return queryid;
}
public DataForm getDataForm() {
return dataForm;
}
public XmppResultSet getResultSet() {
return resultSet;
}
public JID getArchive() {
return archive;
}
}
package com.reucon.openfire.plugin.archive.xep0313;
import java.util.ArrayList;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.handler.IQHandler;
import com.reucon.openfire.plugin.archive.xep.AbstractXepSupport;
/**
* Encapsulates support for <a
* href="http://www.xmpp.org/extensions/xep-0313.html">XEP-0313</a>.
*/
public class Xep0313Support extends AbstractXepSupport {
private static final String NAMESPACE = "urn:xmpp:mam:0";
public Xep0313Support(XMPPServer server) {
super(server, NAMESPACE, "XEP-0313 IQ Dispatcher");
this.iqHandlers = new ArrayList<IQHandler>();
iqHandlers.add(new IQQueryHandler());
}
}
......@@ -72,7 +72,7 @@ public class ArchiveInterceptor implements PacketInterceptor, Startable {
if (conversationManager.isConversation(message)) {
// Process this event in the senior cluster member or local JVM when not in a cluster
if (ClusterManager.isSeniorClusterMember()) {
conversationManager.processMessage(message.getFrom(), message.getTo(), message.getBody(), new Date());
conversationManager.processMessage(message.getFrom(), message.getTo(), message.getBody(), message.toXML(), new Date());
}
else {
JID sender = message.getFrom();
......
......@@ -35,6 +35,7 @@ public class ArchivedMessage {
private JID toJID;
private Date sentDate;
private String body;
private String stanza;
private boolean roomEvent;
/**
......@@ -57,6 +58,11 @@ public class ArchivedMessage {
this.roomEvent = roomEvent;
}
public ArchivedMessage(long conversationID, JID fromJID, JID toJID, Date sentDate, String body, String stanza, boolean roomEvent) {
this(conversationID, fromJID, toJID, sentDate, body, roomEvent);
this.stanza = stanza;
}
/**
* The conversation ID that the message is associated with.
*
......@@ -102,6 +108,15 @@ public class ArchivedMessage {
return body;
}
/**
* String encoded message stanza.
*
* @return string encoded message stanza.
*/
public String getStanza() {
return stanza;
}
/**
* Returns true if the message belongs to a room event. Examples of room events are:
* user joined the room or user left the room.
......
......@@ -56,7 +56,7 @@ public class ConversationEvent implements Externalizable {
public void run(ConversationManager conversationManager) {
if (Type.chatMessageReceived == type) {
conversationManager.processMessage(sender, receiver, body, date);
conversationManager.processMessage(sender, receiver, body, "", date);
}
else if (Type.roomDestroyed == type) {
conversationManager.roomConversationEnded(roomJID, date);
......
......@@ -72,11 +72,11 @@ import org.xmpp.packet.Message;
* conversation data is enabled by default, but can be disabled by setting "conversation.metadataArchiving" to <tt>false</tt>. Archiving of messages
* in a conversation is disabled by default, but can be enabled by setting "conversation.messageArchiving" to <tt>true</tt>.
* <p>
*
*
* When running in a cluster only the senior cluster member will keep track of the active conversations. Other cluster nodes will forward conversation
* events that occurred in the local node to the senior cluster member. If the senior cluster member goes down then current conversations will be
* terminated and if users keep sending messages between them then new conversations will be created.
*
*
* @author Matt Tucker
*/
public class ConversationManager implements Startable, ComponentEventListener {
......@@ -85,8 +85,8 @@ public class ConversationManager implements Startable, ComponentEventListener {
private static final String UPDATE_CONVERSATION = "UPDATE ofConversation SET lastActivity=?, messageCount=? WHERE conversationID=?";
private static final String UPDATE_PARTICIPANT = "UPDATE ofConParticipant SET leftDate=? WHERE conversationID=? AND bareJID=? AND jidResource=? AND joinedDate=?";
private static final String INSERT_MESSAGE = "INSERT INTO ofMessageArchive(conversationID, fromJID, fromJIDResource, toJID, toJIDResource, sentDate, body) "
+ "VALUES (?,?,?,?,?,?,?)";
private static final String INSERT_MESSAGE = "INSERT INTO ofMessageArchive(messageID, conversationID, fromJID, fromJIDResource, toJID, toJIDResource, sentDate, body, stanza) "
+ "VALUES ((SELECT COUNT(*) FROM ofMessageArchive),?,?,?,?,?,?,?,?)";
private static final String CONVERSATION_COUNT = "SELECT COUNT(*) FROM ofConversation";
private static final String MESSAGE_COUNT = "SELECT COUNT(*) FROM ofMessageArchive";
private static final String DELETE_CONVERSATION_1 = "DELETE FROM ofMessageArchive WHERE conversationID=?";
......@@ -319,7 +319,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns true if metadata archiving is enabled. Conversation meta-data includes the participants, start date, last activity, and the count of
* messages sent. When archiving is enabled, all meta-data is written to the database.
*
*
* @return true if metadata archiving is enabled.
*/
public boolean isMetadataArchivingEnabled() {
......@@ -329,7 +329,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Sets whether metadata archiving is enabled. Conversation meta-data includes the participants, start date, last activity, and the count of
* messages sent. When archiving is enabled, all meta-data is written to the database.
*
*
* @param enabled
* true if archiving should be enabled.
*/
......@@ -340,7 +340,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns true if one-to-one chats or group chats messages are being archived.
*
*
* @return true if one-to-one chats or group chats messages are being archived.
*/
public boolean isArchivingEnabled() {
......@@ -351,7 +351,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
* Returns true if message archiving is enabled for one-to-one chats. When enabled, all messages in one-to-one conversations are stored in the
* database. Note: it's not possible for meta-data archiving to be disabled when message archiving is enabled; enabling message archiving
* automatically enables meta-data archiving.
*
*
* @return true if message archiving is enabled.
*/
public boolean isMessageArchivingEnabled() {
......@@ -361,7 +361,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Sets whether message archiving is enabled. When enabled, all messages in conversations are stored in the database. Note: it's not possible for
* meta-data archiving to be disabled when message archiving is enabled; enabling message archiving automatically enables meta-data archiving.
*
*
* @param enabled
* true if message should be enabled.
*/
......@@ -378,7 +378,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
* Returns true if message archiving is enabled for group chats. When enabled, all messages in group conversations are stored in the database
* unless a list of rooms was specified in {@link #getRoomsArchived()} . Note: it's not possible for meta-data archiving to be disabled when room
* archiving is enabled; enabling room archiving automatically enables meta-data archiving.
*
*
* @return true if room archiving is enabled.
*/
public boolean isRoomArchivingEnabled() {
......@@ -389,7 +389,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
* Sets whether message archiving is enabled for group chats. When enabled, all messages in group conversations are stored in the database unless
* a list of rooms was specified in {@link #getRoomsArchived()} . Note: it's not possible for meta-data archiving to be disabled when room
* archiving is enabled; enabling room archiving automatically enables meta-data archiving.
*
*
* @param enabled
* if room archiving is enabled.
*/
......@@ -405,7 +405,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns list of room names whose messages will be archived. When room archiving is enabled and this list is empty then messages of all local
* rooms will be archived. However, when name of rooms are defined in this list then only messages of those rooms will be archived.
*
*
* @return list of local room names whose messages will be archived.
*/
public Collection<String> getRoomsArchived() {
......@@ -415,7 +415,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Sets list of room names whose messages will be archived. When room archiving is enabled and this list is empty then messages of all local rooms
* will be archived. However, when name of rooms are defined in this list then only messages of those rooms will be archived.
*
*
* @param roomsArchived
* list of local room names whose messages will be archived.
*/
......@@ -426,7 +426,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns the number of minutes a conversation can be idle before it's ended.
*
*
* @return the conversation idle time.
*/
public int getIdleTime() {
......@@ -435,7 +435,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Sets the number of minutes a conversation can be idle before it's ended.
*
*
* @param idleTime
* the max number of minutes a conversation can be idle before it's ended.
* @throws IllegalArgumentException
......@@ -452,7 +452,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns the maximum number of minutes a conversation can last before it's ended. Any additional messages between the participants in the chat
* will be associated with a new conversation.
*
*
* @return the maximum number of minutes a conversation can last.
*/
public int getMaxTime() {
......@@ -462,7 +462,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Sets the maximum number of minutes a conversation can last before it's ended. Any additional messages between the participants in the chat will
* be associated with a new conversation.
*
*
* @param maxTime
* the maximum number of minutes a conversation can last.
* @throws IllegalArgumentException
......@@ -506,7 +506,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns the count of active conversations.
*
*
* @return the count of active conversations.
*/
public int getConversationCount() {
......@@ -518,7 +518,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns a conversation by ID.
*
*
* @param conversationID
* the ID of the conversation.
* @return the conversation.
......@@ -548,7 +548,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns the set of active conversations.
*
*
* @return the active conversations.
*/
public Collection<Conversation> getConversations() {
......@@ -571,7 +571,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns the total number of conversations that have been archived to the database. The archived conversation may only be the meta-data, or it
* might include messages as well if message archiving is turned on.
*
*
* @return the total number of archived conversations.
*/
public int getArchivedConversationCount() {
......@@ -596,7 +596,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns the total number of messages that have been archived to the database.
*
*
* @return the total number of archived messages.
*/
public int getArchivedMessageCount() {
......@@ -621,7 +621,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Adds a conversation listener, which will be notified of newly created conversations, conversations ending, and updates to conversations.
*
*
* @param listener
* the conversation listener.
*/
......@@ -631,7 +631,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Removes a conversation listener.
*
*
* @param listener
* the conversation listener.
*/
......@@ -642,17 +642,19 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Processes an incoming message of a one-to-one chat. The message will mapped to a conversation and then queued for storage if archiving is
* turned on.
*
*
* @param sender
* sender of the message.
* @param receiver
* receiver of the message.
* @param body
* body of the message.
* @param stanza
* String encoded message stanza
* @param date
* date when the message was sent.
*/
void processMessage(JID sender, JID receiver, String body, Date date) {
void processMessage(JID sender, JID receiver, String body, String stanza, Date date) {
String conversationKey = getConversationKey(sender, receiver);
synchronized (conversationKey.intern()) {
Conversation conversation = conversations.get(conversationKey);
......@@ -706,7 +708,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
if (messageArchivingEnabled) {
if (body != null) {
/* OF-677 - Workaround to prevent null messages being archived */
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, receiver, date, body, false));
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, receiver, date, body, stanza, false));
}
}
// Notify listeners of the conversation update.
......@@ -718,7 +720,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Processes an incoming message sent to a room. The message will mapped to a conversation and then queued for storage if archiving is turned on.
*
*
* @param roomJID
* the JID of the room where the group conversation is taking place.
* @param sender
......@@ -768,7 +770,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
JID jid = new JID(roomJID + "/" + nickname);
if (body != null) {
/* OF-677 - Workaround to prevent null messages being archived */
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, jid, date, body, false));
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, jid, date, body, "", false));
}
}
// Notify listeners of the conversation update.
......@@ -785,7 +787,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
* <p/>
* Eventually, when a new conversation will start in the room and if this user is still in the room then the new conversation will detect this
* user and mark like if the user joined the converstion from the beginning.
*
*
* @param room
* the room where the user joined.
* @param user
......@@ -805,7 +807,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Notification message indicating that a user left a groupchat conversation. If no groupchat conversation was taking place in the specified room
* then ignore this event.
*
*
* @param room
* the room where the user left.
* @param user
......@@ -839,7 +841,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns the group conversation taking place in the specified room or <tt>null</tt> if none.
*
*
* @param room
* JID of the room.
* @return the group conversation taking place in the specified room or null if none.
......@@ -856,7 +858,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns true if the specified message should be processed by the conversation manager. Only messages between two users, group chats, or
* gateways are processed.
*
*
* @param message
* the message to analyze.
* @return true if the specified message should be processed by the conversation manager.
......@@ -871,7 +873,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns true if the specified JID should be recorded in a conversation.
*
*
* @param jid
* the JID.
* @return true if the JID should be recorded in a conversation.
......@@ -903,7 +905,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
/**
* Returns a unique key for a coversation between two JID's. The order of two JID parameters is irrelevant; the same key will be returned.
*
*
* @param jid1
* the first JID.
* @param jid2
......@@ -987,6 +989,7 @@ public class ConversationManager implements Startable, ComponentEventListener {
pstmt.setString(5, message.getToJID().getResource());
pstmt.setLong(6, message.getSentDate().getTime());
DbConnectionManager.setLargeTextField(pstmt, 7, message.getBody());
DbConnectionManager.setLargeTextField(pstmt, 8, message.getStanza());
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.addBatch();
} else {
......
......@@ -124,11 +124,11 @@ public class GroupConversationInterceptor implements MUCEventListener, Startable
ConversationEvent.roomMessageReceived(roomJID, user, nickname, withBody ? message.getBody() : null, new Date()));
}
}
public void privateMessageRecieved(JID toJID, JID fromJID, Message message) {
if(message.getBody() != null) {
if (ClusterManager.isSeniorClusterMember()) {
conversationManager.processMessage(fromJID, toJID, message.getBody(), new Date());
conversationManager.processMessage(fromJID, toJID, message.getBody(), message.toXML(), new Date());
}
else {
ConversationEventsQueue eventsQueue = conversationManager.getConversationEventsQueue();
......
......@@ -50,10 +50,11 @@ import com.reucon.openfire.plugin.archive.PersistenceManager;
import com.reucon.openfire.plugin.archive.impl.ArchiveManagerImpl;
import com.reucon.openfire.plugin.archive.impl.JdbcPersistenceManager;
import com.reucon.openfire.plugin.archive.xep0136.Xep0136Support;
import com.reucon.openfire.plugin.archive.xep0313.Xep0313Support;
/**
* Openfire Monitoring plugin.
*
*
* @author Matt Tucker
*/
public class MonitoringPlugin implements Plugin {
......@@ -71,6 +72,7 @@ public class MonitoringPlugin implements Plugin {
private ArchiveManager archiveManager;
private IndexManager indexManager;
private Xep0136Support xep0136Support;
private Xep0313Support xep0313Support;
public MonitoringPlugin() {
instance = this;
......@@ -131,7 +133,7 @@ public class MonitoringPlugin implements Plugin {
/**
* Returns the instance of a module registered with the Monitoring plugin.
*
*
* @param clazz
* the module class.
* @return the instance of the module.
......@@ -157,6 +159,9 @@ public class MonitoringPlugin implements Plugin {
xep0136Support = new Xep0136Support(XMPPServer.getInstance());
xep0136Support.start();
xep0313Support = new Xep0313Support(XMPPServer.getInstance());
xep0313Support.start();
System.out.println("Starting Monitoring Plugin");
// Check if we Enterprise is installed and stop loading this plugin if
......@@ -189,6 +194,9 @@ public class MonitoringPlugin implements Plugin {
xep0136Support = new Xep0136Support(XMPPServer.getInstance());
xep0136Support.start();
xep0313Support = new Xep0313Support(XMPPServer.getInstance());
xep0313Support.start();
}
public void destroyPlugin() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment