Commit 33b5e8c0 authored by Leon Roy's avatar Leon Roy Committed by leonroy

OF-651 - Monitoring plugin should have an option to purge and restrict

Restriction implemented, purging left

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@13650 b35dd754-fafc-0310-a699-88a17e54d16e
parent 1a4803a6
......@@ -76,6 +76,10 @@ archive.settings.idle.time = Idle Time
archive.settings.idle.time.description = The number of minutes a conversation can be idle before it's ended.
archive.settings.max.time = Max Time
archive.settings.max.time.description = The maximum number of minutes a conversation can last before it's ended.
archive.settings.max.age = Max Message Age
archive.settings.max.age.description = The maximum number of days to keep messages before purging them from the database.
archive.settings.max.retrievable = Retrievable Messages
archive.settings.max.retrievable.description = The number of days worth of messages a user is allowed to retrieve.
archive.settings.index.settings = Index Settings
archive.settings.index.settings.description = View and/or rebuild the current Search Index.
archive.settings.current.index = Current Search Index
......
......@@ -50,6 +50,10 @@ archive.settings.idle.time = Doba ne\u010dinnosti
archive.settings.idle.time.description = Po\u010det minut b\u011bhem kter\u00fdch m\u016f\u017ee b\u00fdt konverzace ne\u010dinn\u00e1 ne\u017e bude ukon\u010dena.
archive.settings.max.time = Maxim\u00e1ln\u00ed \u010das
archive.settings.max.time.description = Maxim\u00e1ln\u00ed po\u010det minut trv\u00e1n\u00ed konverzace ne\u017e bude ukon\u010dena.
archive.settings.max.age = Max Message Age
archive.settings.max.age.description = The maximum number of days to keep messages before purging them from the database.
archive.settings.max.retrievable = Retrievable Messages
archive.settings.max.retrievable.description = The number of days worth of messages a user is allowed to retrieve.
archive.settings.index.settings = Nastaven\u00ed index\u016f
archive.settings.index.settings.description = Prohl\u00ed\u017een\u00ed a/nebo obnova aktu\u00e1ln\u00edho vyhled\u00e1vac\u00edho indexu.
archive.settings.current.index = Aktu\u00e1ln\u00ed vyhled\u00e1vac\u00ed index
......
......@@ -50,6 +50,10 @@ archive.settings.idle.time = Tiempo Ocioso
archive.settings.idle.time.description = El n\u00famero de minutos una conversaci\u00f3n puede estar libre antes de ser finalizada.
archive.settings.max.time = Tiempo M\u00e1ximo
archive.settings.max.time.description = El m\u00e1ximo n\u00famero de minutos una conversacion puede durar antes de ser finalizada.
archive.settings.max.age = Max Message Age
archive.settings.max.age.description = The maximum number of days to keep messages before purging them from the database.
archive.settings.max.retrievable = Max Retrievable Messages
archive.settings.max.retrievable.description = The number of days worth of messages a user is allowed to retrieve.
archive.settings.index.settings = Seteos de Indices
archive.settings.index.settings.description = Ver y/o reconstruir el \u00edndice de b\u00fasqueda actual.
archive.settings.current.index = Indice de B\u00fasqueda Actual
......
......@@ -110,6 +110,10 @@ archive.settings.index.settings=Param\u00e8tres d'Indexage
archive.settings.index.settings.description=Voir et/ou reconstruire les indexes de recherche.
archive.settings.max.time=Temps Max
archive.settings.max.time.description=Le nombre de minutes maximum d'une conversation avant qu'elle ne soit coup\u00e9e automatiquement.
archive.settings.max.age = Max Message Age
archive.settings.max.age.description = The maximum number of days to keep messages before purging them from the database.
archive.settings.max.retrievable = Retrievable Messages
archive.settings.max.retrievable.description = The number of days worth of messages a user is allowed to retrieve.
archive.settings.message.count=Nombre de messages archiv\u00e9s
archive.settings.message.count.description=Le nombre total de messages archiv\u00e9s.
archive.settings.message.metadata.description=Activer ou D\u00e9sactiver l'archivage des messages et/ou metadata.
......
......@@ -74,6 +74,10 @@ archive.settings.idle.time = Tempo Ocioso
archive.settings.idle.time.description = N\u00famero de minutos que a conversa pode estar ociosa antes de ser encerrada.
archive.settings.max.time = Tempo M\u00e1ximo
archive.settings.max.time.description = N\u00famero m\u00e1ximo de minutos que a conversa pode durar antes de ser encerrada.
archive.settings.max.age = Max Message Age
archive.settings.max.age.description = The maximum number of days to keep messages before purging them from the database.
archive.settings.max.retrievable = Retrievable Messages
archive.settings.max.retrievable.description = The number of days worth of messages a user is allowed to retrieve.
archive.settings.index.settings = Ajustes de \u00edndice
archive.settings.index.settings.description = Ver e/ou refazer o \u00edndice de Buscas atual.
archive.settings.current.index = \u00edndice de Buscas Atual
......
......@@ -71,6 +71,10 @@ archive.settings.idle.time = \u7a7a\u95f2\u65f6\u95f4
archive.settings.idle.time.description = \u4f1a\u8bdd\u7ed3\u675f\u540e\u7a7a\u95f2\u7684\u65f6\u95f4
archive.settings.max.time = \u6700\u5927\u65f6\u95f4
archive.settings.max.time.description = \u4f1a\u8bdd\u7ed3\u675f\u524d\u53ef\u6301\u7eed\u7684\u6700\u5927\u65f6\u95f4
archive.settings.max.age = Max Message Age
archive.settings.max.age.description = The maximum number of days to keep messages before purging them from the database.
archive.settings.max.retrievable = Retrievable Messages
archive.settings.max.retrievable.description = The number of days worth of messages a user is allowed to retrieve.
archive.settings.index.settings = \u7d22\u5f15\u8bbe\u7f6e
archive.settings.index.settings.description = \u67e5\u770b\u6216\u91cd\u5efa\u5f53\u524d\u7684\u641c\u7d22\u7d22\u5f15
archive.settings.current.index = \u5f53\u524d\u7684\u641c\u7d22\u7d22\u5f15
......
......@@ -4,6 +4,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
......@@ -13,6 +14,9 @@ import java.util.HashSet;
import java.util.List;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.openfire.archive.ConversationManager;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.xmpp.packet.JID;
......@@ -30,23 +34,11 @@ import com.reucon.openfire.plugin.archive.xep0059.XmppResultSet;
public class JdbcPersistenceManager implements PersistenceManager {
public static final int DEFAULT_MAX = 1000;
public static final String SELECT_MESSAGES_BY_CONVERSATION = "SELECT DISTINCT "
+ "ofConversation.conversationID, "
+ "ofConversation.room, "
+ "ofConversation.isExternal, "
+ "ofConversation.startDate, "
+ "ofConversation.lastActivity, "
+ "ofConversation.messageCount, "
+ "ofConParticipant.joinedDate, "
+ "ofConParticipant.leftDate, "
+ "ofConParticipant.bareJID, "
+ "ofConParticipant.jidResource, "
+ "ofConParticipant.nickname, "
+ "ofMessageArchive.fromJID, "
+ "ofMessageArchive.toJID, "
+ "ofMessageArchive.sentDate, "
+ "ofMessageArchive.body "
+ "FROM ofConversation "
public static final String SELECT_MESSAGES_BY_CONVERSATION = "SELECT DISTINCT " + "ofConversation.conversationID, " + "ofConversation.room, "
+ "ofConversation.isExternal, " + "ofConversation.startDate, " + "ofConversation.lastActivity, " + "ofConversation.messageCount, "
+ "ofConParticipant.joinedDate, " + "ofConParticipant.leftDate, " + "ofConParticipant.bareJID, " + "ofConParticipant.jidResource, "
+ "ofConParticipant.nickname, " + "ofMessageArchive.fromJID, " + "ofMessageArchive.toJID, " + "ofMessageArchive.sentDate, "
+ "ofMessageArchive.body " + "FROM ofConversation "
+ "INNER JOIN ofConParticipant ON ofConversation.conversationID = ofConParticipant.conversationID "
+ "INNER JOIN ofMessageArchive ON ofConParticipant.conversationID = ofMessageArchive.conversationID "
+ "WHERE ofConversation.conversationID = ? AND ofConParticipant.bareJID = ? ORDER BY ofMessageArchive.sentDate";
......@@ -55,23 +47,11 @@ public class JdbcPersistenceManager implements PersistenceManager {
// "SELECT messageId,time,direction,type,subject,body "
// + "FROM archiveMessages WHERE conversationId = ? ORDER BY time";
public static final String SELECT_CONVERSATIONS = "SELECT DISTINCT "
+ "ofConversation.conversationID, "
+ "ofConversation.room, "
+ "ofConversation.isExternal, "
+ "ofConversation.startDate, "
+ "ofConversation.lastActivity, "
+ "ofConversation.messageCount, "
+ "ofConParticipant.joinedDate, "
+ "ofConParticipant.leftDate, "
+ "ofConParticipant.bareJID, "
+ "ofConParticipant.jidResource, "
+ "ofConParticipant.nickname, "
+ "ofMessageArchive.fromJID, "
+ "ofMessageArchive.toJID, "
+ "ofMessageArchive.sentDate, "
+ "ofMessageArchive.body "
+ "FROM ofConversation "
public static final String SELECT_CONVERSATIONS = "SELECT DISTINCT " + "ofConversation.conversationID, " + "ofConversation.room, "
+ "ofConversation.isExternal, " + "ofConversation.startDate, " + "ofConversation.lastActivity, " + "ofConversation.messageCount, "
+ "ofConParticipant.joinedDate, " + "ofConParticipant.leftDate, " + "ofConParticipant.bareJID, " + "ofConParticipant.jidResource, "
+ "ofConParticipant.nickname, " + "ofMessageArchive.fromJID, " + "ofMessageArchive.toJID, " + "ofMessageArchive.sentDate, "
+ "ofMessageArchive.body " + "FROM ofConversation "
+ "INNER JOIN ofConParticipant ON ofConversation.conversationID = ofConParticipant.conversationID "
+ "INNER JOIN ofMessageArchive ON ofConParticipant.conversationID = ofMessageArchive.conversationID";
......@@ -101,23 +81,11 @@ public class JdbcPersistenceManager implements PersistenceManager {
public static final String CONVERSATION_WITH_JID = "(ofMessageArchive.toJID = ? OR ofMessageArchive.fromJID = ?)";
// public static final String CONVERSATION_WITH_JID = "c.withJid";
public static final String SELECT_ACTIVE_CONVERSATIONS = "SELECT DISTINCT "
+ "ofConversation.conversationID, "
+ "ofConversation.room, "
+ "ofConversation.isExternal, "
+ "ofConversation.startDate, "
+ "ofConversation.lastActivity, "
+ "ofConversation.messageCount, "
+ "ofConParticipant.joinedDate, "
+ "ofConParticipant.leftDate, "
+ "ofConParticipant.bareJID, "
+ "ofConParticipant.jidResource, "
+ "ofConParticipant.nickname, "
+ "ofMessageArchive.fromJID, "
+ "ofMessageArchive.toJID, "
+ "ofMessageArchive.sentDate, "
+ "ofMessageArchive.body "
+ "FROM ofConversation "
public static final String SELECT_ACTIVE_CONVERSATIONS = "SELECT DISTINCT " + "ofConversation.conversationID, " + "ofConversation.room, "
+ "ofConversation.isExternal, " + "ofConversation.startDate, " + "ofConversation.lastActivity, " + "ofConversation.messageCount, "
+ "ofConParticipant.joinedDate, " + "ofConParticipant.leftDate, " + "ofConParticipant.bareJID, " + "ofConParticipant.jidResource, "
+ "ofConParticipant.nickname, " + "ofMessageArchive.fromJID, " + "ofMessageArchive.toJID, " + "ofMessageArchive.sentDate, "
+ "ofMessageArchive.body " + "FROM ofConversation "
+ "INNER JOIN ofConParticipant ON ofConversation.conversationID = ofConParticipant.conversationID "
+ "INNER JOIN ofMessageArchive ON ofConParticipant.conversationID = ofMessageArchive.conversationID "
+ "WHERE ofConversation.lastActivity > ?";
......@@ -127,12 +95,8 @@ public class JdbcPersistenceManager implements PersistenceManager {
// + " c.subject,c.thread "
// + "FROM archiveConversations AS c WHERE c.endTime > ?";
public static final String SELECT_PARTICIPANTS_BY_CONVERSATION = "SELECT DISTINCT "
+ "ofConversation.conversationID, "
+ "ofConversation.startDate, "
+ "ofConversation.lastActivity, "
+ "ofConParticipant.bareJID "
+ "FROM ofConversation "
public static final String SELECT_PARTICIPANTS_BY_CONVERSATION = "SELECT DISTINCT " + "ofConversation.conversationID, "
+ "ofConversation.startDate, " + "ofConversation.lastActivity, " + "ofConParticipant.bareJID " + "FROM ofConversation "
+ "INNER JOIN ofConParticipant ON ofConversation.conversationID = ofConParticipant.conversationID "
+ "INNER JOIN ofMessageArchive ON ofConParticipant.conversationID = ofMessageArchive.conversationID "
+ "WHERE ofConversation.conversationID = ? ORDER BY ofConversation.startDate";
......@@ -159,20 +123,32 @@ public class JdbcPersistenceManager implements PersistenceManager {
return false;
}
public boolean createParticipant(Participant participant,
Long conversationId) {
public boolean createParticipant(Participant participant, Long conversationId) {
return false;
}
public List<Conversation> findConversations(String[] participants,
Date startDate, Date endDate) {
public List<Conversation> findConversations(String[] participants, Date startDate, Date endDate) {
final List<Conversation> conversations = new ArrayList<Conversation>();
return conversations;
}
public Collection<Conversation> findConversations(Date startDate,
Date endDate, String ownerJid, String withJid,
XmppResultSet xmppResultSet) {
public Date getAuditedStartDate(Date startDate) {
long maxRetrievable = JiveGlobals.getIntProperty("conversation.maxRetrievable", ConversationManager.DEFAULT_MAX_RETRIEVABLE)
* JiveConstants.DAY;
Date result = null;
if (maxRetrievable > 0) {
Date now = new Date();
Date maxRetrievableDate = new Date(now.getTime() - maxRetrievable);
if (startDate == null) {
result = maxRetrievableDate;
} else if (startDate.before(maxRetrievableDate)) {
result = maxRetrievableDate;
}
}
return result;
}
public Collection<Conversation> findConversations(Date startDate, Date endDate, String ownerJid, String withJid, XmppResultSet xmppResultSet) {
final HashMap<Long, Conversation> conversations;
final StringBuilder querySB;
final StringBuilder whereSB;
......@@ -184,6 +160,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
whereSB = new StringBuilder();
limitSB = new StringBuilder();
startDate = getAuditedStartDate(startDate);
if (startDate != null) {
appendWhere(whereSB, CONVERSATION_START_TIME, " >= ?");
}
......@@ -199,22 +176,16 @@ public class JdbcPersistenceManager implements PersistenceManager {
if (xmppResultSet != null) {
Integer firstIndex = null;
int max = xmppResultSet.getMax() != null ? xmppResultSet.getMax()
: DEFAULT_MAX;
int max = xmppResultSet.getMax() != null ? xmppResultSet.getMax() : DEFAULT_MAX;
xmppResultSet.setCount(countConversations(startDate, endDate,
ownerJid, withJid, whereSB.toString()));
xmppResultSet.setCount(countConversations(startDate, endDate, ownerJid, withJid, whereSB.toString()));
if (xmppResultSet.getIndex() != null) {
firstIndex = xmppResultSet.getIndex();
} else if (xmppResultSet.getAfter() != null) {
firstIndex = countConversationsBefore(startDate, endDate,
ownerJid, withJid, xmppResultSet.getAfter(),
whereSB.toString());
firstIndex = countConversationsBefore(startDate, endDate, ownerJid, withJid, xmppResultSet.getAfter(), whereSB.toString());
firstIndex += 1;
} else if (xmppResultSet.getBefore() != null) {
firstIndex = countConversationsBefore(startDate, endDate,
ownerJid, withJid, xmppResultSet.getBefore(),
whereSB.toString());
firstIndex = countConversationsBefore(startDate, endDate, ownerJid, withJid, xmppResultSet.getBefore(), whereSB.toString());
firstIndex -= max;
if (firstIndex < 0) {
firstIndex = 0;
......@@ -239,11 +210,9 @@ public class JdbcPersistenceManager implements PersistenceManager {
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(querySB.toString());
bindConversationParameters(startDate, endDate, ownerJid, withJid,
pstmt);
bindConversationParameters(startDate, endDate, ownerJid, withJid, pstmt);
rs = pstmt.executeQuery();
Log.debug("findConversations: SELECT_CONVERSATIONS: "
+ pstmt.toString());
Log.debug("findConversations: SELECT_CONVERSATIONS: " + pstmt.toString());
while (rs.next()) {
Conversation conv = extractConversation(rs);
conversations.put(conv.getId(), conv);
......@@ -255,12 +224,10 @@ public class JdbcPersistenceManager implements PersistenceManager {
}
if (xmppResultSet != null && conversations.size() > 0) {
ArrayList<Long> sortedConvKeys = new ArrayList<Long>(
conversations.keySet());
ArrayList<Long> sortedConvKeys = new ArrayList<Long>(conversations.keySet());
Collections.sort(sortedConvKeys);
xmppResultSet.setFirst(sortedConvKeys.get(0));
xmppResultSet
.setLast(sortedConvKeys.get(sortedConvKeys.size() - 1));
xmppResultSet.setLast(sortedConvKeys.get(sortedConvKeys.size() - 1));
}
return conversations.values();
}
......@@ -275,8 +242,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
}
}
private int countConversations(Date startDate, Date endDate,
String ownerJid, String withJid, String whereClause) {
private int countConversations(Date startDate, Date endDate, String ownerJid, String withJid, String whereClause) {
StringBuilder querySB;
querySB = new StringBuilder(COUNT_CONVERSATIONS);
......@@ -290,8 +256,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(querySB.toString());
bindConversationParameters(startDate, endDate, ownerJid, withJid,
pstmt);
bindConversationParameters(startDate, endDate, ownerJid, withJid, pstmt);
rs = pstmt.executeQuery();
if (rs.next()) {
return rs.getInt(1);
......@@ -306,8 +271,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
}
}
private int countConversationsBefore(Date startDate, Date endDate,
String ownerJid, String withJid, Long before, String whereClause) {
private int countConversationsBefore(Date startDate, Date endDate, String ownerJid, String withJid, Long before, String whereClause) {
StringBuilder querySB;
querySB = new StringBuilder(COUNT_CONVERSATIONS);
......@@ -325,8 +289,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
int parameterIndex;
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(querySB.toString());
parameterIndex = bindConversationParameters(startDate, endDate,
ownerJid, withJid, pstmt);
parameterIndex = bindConversationParameters(startDate, endDate, ownerJid, withJid, pstmt);
pstmt.setLong(parameterIndex, before);
rs = pstmt.executeQuery();
if (rs.next()) {
......@@ -342,8 +305,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
}
}
private int bindConversationParameters(Date startDate, Date endDate,
String ownerJid, String withJid, PreparedStatement pstmt)
private int bindConversationParameters(Date startDate, Date endDate, String ownerJid, String withJid, PreparedStatement pstmt)
throws SQLException {
int parameterIndex = 1;
......@@ -363,8 +325,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
return parameterIndex;
}
public Collection<Conversation> getActiveConversations(
int conversationTimeout) {
public Collection<Conversation> getActiveConversations(int conversationTimeout) {
final Collection<Conversation> conversations;
final long now = System.currentTimeMillis();
......@@ -438,8 +399,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
return conversations;
}
public Conversation getConversation(String ownerJid, String withJid,
Date start) {
public Conversation getConversation(String ownerJid, String withJid, Date start) {
return getConversation(null, ownerJid, withJid, start);
}
......@@ -447,8 +407,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
return getConversation(conversationId, null, null, null);
}
private Conversation getConversation(Long conversationId, String ownerJid,
String withJid, Date start) {
private Conversation getConversation(Long conversationId, String ownerJid, String withJid, Date start) {
Conversation conversation = null;
StringBuilder querySB;
......@@ -490,8 +449,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
}
}
rs = pstmt.executeQuery();
Log.debug("getConversation: SELECT_CONVERSATIONS: "
+ pstmt.toString());
Log.debug("getConversation: SELECT_CONVERSATIONS: " + pstmt.toString());
if (rs.next()) {
conversation = extractConversation(rs);
} else {
......@@ -505,8 +463,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
pstmt.setLong(1, conversation.getId());
rs = pstmt.executeQuery();
Log.debug("getConversation: SELECT_PARTICIPANTS_BY_CONVERSATION: "
+ pstmt.toString());
Log.debug("getConversation: SELECT_PARTICIPANTS_BY_CONVERSATION: " + pstmt.toString());
while (rs.next()) {
for (Participant participant : extractParticipant(rs)) {
......@@ -522,8 +479,7 @@ public class JdbcPersistenceManager implements PersistenceManager {
pstmt.setString(2, conversation.getOwnerJid());
rs = pstmt.executeQuery();
Log.debug("getConversation: SELECT_MESSAGES_BY_CONVERSATION: "
+ pstmt.toString());
Log.debug("getConversation: SELECT_MESSAGES_BY_CONVERSATION: " + pstmt.toString());
while (rs.next()) {
ArchivedMessage message;
......@@ -567,14 +523,12 @@ public class JdbcPersistenceManager implements PersistenceManager {
if (bareJid != null && fromJid != null && toJid != null) {
if (bareJid.equals(fromJid)) {
/*
* if message from me to withJid then it is to the withJid
* participant
* if message from me to withJid then it is to the withJid participant
*/
direction = Direction.to;
} else {
/*
* if message to me from withJid then it is from the withJid
* participant
* if message to me from withJid then it is from the withJid participant
*/
direction = Direction.from;
}
......@@ -594,14 +548,12 @@ public class JdbcPersistenceManager implements PersistenceManager {
String subject = null;
String thread = String.valueOf(id);
conversation = new Conversation(startDate, ownerJid, ownerResource,
withJid, withResource, subject, thread);
conversation = new Conversation(startDate, ownerJid, ownerResource, withJid, withResource, subject, thread);
conversation.setId(id);
return conversation;
}
private Collection<Participant> extractParticipant(ResultSet rs)
throws SQLException {
private Collection<Participant> extractParticipant(ResultSet rs) throws SQLException {
Collection<Participant> participants = new HashSet<Participant>();
Date startDate = millisToDate(rs.getLong("startDate"));
......
......@@ -67,1003 +67,1044 @@ import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
/**
* Manages all conversations in the system. Optionally, conversations (messages plus
* meta-data) can be archived to the database. Archiving of conversation data is
* enabled by default, but can be disabled by setting "conversation.metadataArchiving" to
* <tt>false</tt>. Archiving of messages in a conversation is disabled by default, but
* can be enabled by setting "conversation.messageArchiving" to <tt>true</tt>.<p>
*
* When running in a cluster only the senior cluster member will keep track of the active
* conversations. Other cluster nodes will forward conversation events that occurred in the
* local node to the senior cluster member. If the senior cluster member goes down then
* current conversations will be terminated and if users keep sending messages between them
* then new conversations will be created.
*
* Manages all conversations in the system. Optionally, conversations (messages plus meta-data) can be archived to the database. Archiving of
* conversation data is enabled by default, but can be disabled by setting "conversation.metadataArchiving" to <tt>false</tt>. Archiving of messages
* in a conversation is disabled by default, but can be enabled by setting "conversation.messageArchiving" to <tt>true</tt>.
* <p>
*
* When running in a cluster only the senior cluster member will keep track of the active conversations. Other cluster nodes will forward conversation
* events that occurred in the local node to the senior cluster member. If the senior cluster member goes down then current conversations will be
* terminated and if users keep sending messages between them then new conversations will be created.
*
* @author Matt Tucker
*/
public class ConversationManager implements Startable, ComponentEventListener {
private static final Logger Log = LoggerFactory.getLogger(ConversationManager.class);
private static final String UPDATE_CONVERSATION =
"UPDATE ofConversation SET lastActivity=?, messageCount=? WHERE conversationID=?";
private static final String UPDATE_PARTICIPANT =
"UPDATE ofConParticipant SET leftDate=? WHERE conversationID=? AND bareJID=? AND jidResource=? AND joinedDate=?";
private static final String INSERT_MESSAGE =
"INSERT INTO ofMessageArchive(conversationID, fromJID, fromJIDResource, toJID, toJIDResource, sentDate, body) " +
"VALUES (?,?,?,?,?,?,?)";
private static final String CONVERSATION_COUNT =
"SELECT COUNT(*) FROM ofConversation";
private static final String MESSAGE_COUNT =
"SELECT COUNT(*) FROM ofMessageArchive";
private static final int DEFAULT_IDLE_TIME = 10;
private static final int DEFAULT_MAX_TIME = 60;
public static final String CONVERSATIONS_KEY = "conversations";
private ConversationEventsQueue conversationEventsQueue;
private TaskEngine taskEngine;
private Map<String, Conversation> conversations = new ConcurrentHashMap<String, Conversation>();
private boolean metadataArchivingEnabled;
/**
* Flag that indicates if messages of one-to-one chats should be archived.
*/
private boolean messageArchivingEnabled;
/**
* Flag that indicates if messages of group chats (in MUC rooms) should be archived.
*/
private boolean roomArchivingEnabled;
/**
* List of room names to archive. When list is empty then all rooms are archived (if
* roomArchivingEnabled is enabled).
*/
private Collection<String> roomsArchived;
private long idleTime;
private long maxTime;
private PropertyEventListener propertyListener;
private Queue<Conversation> conversationQueue;
private Queue<ArchivedMessage> messageQueue;
/**
* Queue of participants that joined or left a conversation. This queue is processed by the
* ArchivingTask.
*/
private Queue<RoomParticipant> participantQueue;
private boolean archivingRunning = false;
private TimerTask archiveTask;
private TimerTask cleanupTask;
private Collection<ConversationListener> conversationListeners;
/**
* Keeps the address of those components that provide the gateway service.
*/
private List<String> gateways;
private XMPPServerInfo serverInfo;
public ConversationManager(TaskEngine taskEngine) {
this.taskEngine = taskEngine;
this.gateways = new CopyOnWriteArrayList<String>();
this.serverInfo = XMPPServer.getInstance().getServerInfo();
this.conversationEventsQueue = new ConversationEventsQueue(this, taskEngine);
}
public void start() {
metadataArchivingEnabled = JiveGlobals.getBooleanProperty("conversation.metadataArchiving", true);
messageArchivingEnabled = JiveGlobals.getBooleanProperty("conversation.messageArchiving", false);
if (messageArchivingEnabled && !metadataArchivingEnabled) {
Log.warn("Metadata archiving must be enabled when message archiving is enabled. Overriding setting.");
metadataArchivingEnabled = true;
}
roomArchivingEnabled = JiveGlobals.getBooleanProperty("conversation.roomArchiving", false);
roomsArchived = StringUtils.stringToCollection(JiveGlobals.getProperty("conversation.roomsArchived", ""));
if (roomArchivingEnabled && !metadataArchivingEnabled) {
Log.warn("Metadata archiving must be enabled when room archiving is enabled. Overriding setting.");
metadataArchivingEnabled = true;
}
idleTime = JiveGlobals.getIntProperty("conversation.idleTime", DEFAULT_IDLE_TIME) *
JiveConstants.MINUTE;
maxTime = JiveGlobals.getIntProperty("conversation.maxTime",
DEFAULT_MAX_TIME) * JiveConstants.MINUTE;
// Listen for any changes to the conversation properties.
propertyListener = new ConversationPropertyListener();
PropertyEventDispatcher.addListener(propertyListener);
conversationQueue = new ConcurrentLinkedQueue<Conversation>();
messageQueue = new ConcurrentLinkedQueue<ArchivedMessage>();
participantQueue = new ConcurrentLinkedQueue<RoomParticipant>();
conversationListeners = new CopyOnWriteArraySet<ConversationListener>();
// Schedule a task to do conversation archiving.
archiveTask = new TimerTask() {
@Override
private static final String UPDATE_CONVERSATION = "UPDATE ofConversation SET lastActivity=?, messageCount=? WHERE conversationID=?";
private static final String UPDATE_PARTICIPANT = "UPDATE ofConParticipant SET leftDate=? WHERE conversationID=? AND bareJID=? AND jidResource=? AND joinedDate=?";
private static final String INSERT_MESSAGE = "INSERT INTO ofMessageArchive(conversationID, fromJID, fromJIDResource, toJID, toJIDResource, sentDate, body) "
+ "VALUES (?,?,?,?,?,?,?)";
private static final String CONVERSATION_COUNT = "SELECT COUNT(*) FROM ofConversation";
private static final String MESSAGE_COUNT = "SELECT COUNT(*) FROM ofMessageArchive";
private static final int DEFAULT_IDLE_TIME = 10;
private static final int DEFAULT_MAX_TIME = 60;
public static final int DEFAULT_MAX_RETRIEVABLE = 0;
private static final int DEFAULT_MAX_AGE = 0;
public static final String CONVERSATIONS_KEY = "conversations";
private ConversationEventsQueue conversationEventsQueue;
private TaskEngine taskEngine;
private Map<String, Conversation> conversations = new ConcurrentHashMap<String, Conversation>();
private boolean metadataArchivingEnabled;
/**
* Flag that indicates if messages of one-to-one chats should be archived.
*/
private boolean messageArchivingEnabled;
/**
* Flag that indicates if messages of group chats (in MUC rooms) should be archived.
*/
private boolean roomArchivingEnabled;
/**
* List of room names to archive. When list is empty then all rooms are archived (if roomArchivingEnabled is enabled).
*/
private Collection<String> roomsArchived;
private long idleTime;
private long maxTime;
private long maxAge;
private long maxRetrievable;
private PropertyEventListener propertyListener;
private Queue<Conversation> conversationQueue;
private Queue<ArchivedMessage> messageQueue;
/**
* Queue of participants that joined or left a conversation. This queue is processed by the ArchivingTask.
*/
private Queue<RoomParticipant> participantQueue;
private boolean archivingRunning = false;
private TimerTask archiveTask;
private TimerTask cleanupTask;
private TimerTask maxAgeTask;
private Collection<ConversationListener> conversationListeners;
/**
* Keeps the address of those components that provide the gateway service.
*/
private List<String> gateways;
private XMPPServerInfo serverInfo;
public ConversationManager(TaskEngine taskEngine) {
this.taskEngine = taskEngine;
this.gateways = new CopyOnWriteArrayList<String>();
this.serverInfo = XMPPServer.getInstance().getServerInfo();
this.conversationEventsQueue = new ConversationEventsQueue(this, taskEngine);
}
public void start() {
metadataArchivingEnabled = JiveGlobals.getBooleanProperty("conversation.metadataArchiving", true);
messageArchivingEnabled = JiveGlobals.getBooleanProperty("conversation.messageArchiving", false);
if (messageArchivingEnabled && !metadataArchivingEnabled) {
Log.warn("Metadata archiving must be enabled when message archiving is enabled. Overriding setting.");
metadataArchivingEnabled = true;
}
roomArchivingEnabled = JiveGlobals.getBooleanProperty("conversation.roomArchiving", false);
roomsArchived = StringUtils.stringToCollection(JiveGlobals.getProperty("conversation.roomsArchived", ""));
if (roomArchivingEnabled && !metadataArchivingEnabled) {
Log.warn("Metadata archiving must be enabled when room archiving is enabled. Overriding setting.");
metadataArchivingEnabled = true;
}
idleTime = JiveGlobals.getIntProperty("conversation.idleTime", DEFAULT_IDLE_TIME) * JiveConstants.MINUTE;
maxTime = JiveGlobals.getIntProperty("conversation.maxTime", DEFAULT_MAX_TIME) * JiveConstants.MINUTE;
maxAge = JiveGlobals.getIntProperty("conversation.maxAge", DEFAULT_MAX_AGE) * JiveConstants.DAY;
maxRetrievable = JiveGlobals.getIntProperty("conversation.maxRetrievable", DEFAULT_MAX_RETRIEVABLE) * JiveConstants.DAY;
// Listen for any changes to the conversation properties.
propertyListener = new ConversationPropertyListener();
PropertyEventDispatcher.addListener(propertyListener);
conversationQueue = new ConcurrentLinkedQueue<Conversation>();
messageQueue = new ConcurrentLinkedQueue<ArchivedMessage>();
participantQueue = new ConcurrentLinkedQueue<RoomParticipant>();
conversationListeners = new CopyOnWriteArraySet<ConversationListener>();
// Schedule a task to do conversation archiving.
archiveTask = new TimerTask() {
@Override
public void run() {
new ArchivingTask().run();
}
};
taskEngine.scheduleAtFixedRate(archiveTask, JiveConstants.MINUTE, JiveConstants.MINUTE);
// Schedule a task to do conversation cleanup.
cleanupTask = new TimerTask() {
@Override
public void run() {
new ArchivingTask().run();
}
};
taskEngine.scheduleAtFixedRate(archiveTask, JiveConstants.MINUTE, JiveConstants.MINUTE);
// Schedule a task to do conversation cleanup.
cleanupTask = new TimerTask() {
@Override
for (String key : conversations.keySet()) {
Conversation conversation = conversations.get(key);
long now = System.currentTimeMillis();
if ((now - conversation.getLastActivity().getTime() > idleTime) || (now - conversation.getStartDate().getTime() > maxTime)) {
removeConversation(key, conversation, new Date(now));
}
}
}
};
taskEngine.scheduleAtFixedRate(cleanupTask, JiveConstants.MINUTE * 5, JiveConstants.MINUTE * 5);
// Schedule a task to do conversation purging.
maxAgeTask = new TimerTask() {
@Override
public void run() {
for (String key : conversations.keySet()) {
Conversation conversation = conversations.get(key);
long now = System.currentTimeMillis();
if ((now - conversation.getLastActivity().getTime() > idleTime) ||
(now - conversation.getStartDate().getTime() > maxTime)) {
removeConversation(key, conversation, new Date(now));
}
}
}
};
taskEngine.scheduleAtFixedRate(cleanupTask, JiveConstants.MINUTE * 5, JiveConstants.MINUTE * 5);
// Register a statistic.
Statistic conversationStat = new Statistic() {
public String getName() {
return LocaleUtils.getLocalizedString("stat.conversation.name", MonitoringConstants.NAME);
}
public Type getStatType() {
return Type.count;
}
public String getDescription() {
return LocaleUtils.getLocalizedString("stat.conversation.desc", MonitoringConstants.NAME);
}
public String getUnits() {
return LocaleUtils.getLocalizedString("stat.conversation.units", MonitoringConstants.NAME);
}
public double sample() {
return getConversationCount();
}
public boolean isPartialSample() {
return false;
}
};
StatisticsManager.getInstance().addStatistic(CONVERSATIONS_KEY, conversationStat);
InternalComponentManager.getInstance().addListener(this);
}
public void stop() {
archiveTask.cancel();
archiveTask = null;
cleanupTask.cancel();
cleanupTask = null;
// Remove the statistics.
StatisticsManager.getInstance().removeStatistic(CONVERSATIONS_KEY);
PropertyEventDispatcher.removeListener(propertyListener);
propertyListener = null;
conversations.clear();
conversations = null;
// Archive anything remaining in the queue before quitting.
new ArchivingTask().run();
conversationQueue.clear();
conversationQueue = null;
messageQueue.clear();
messageQueue = null;
conversationListeners.clear();
conversationListeners = null;
serverInfo = null;
InternalComponentManager.getInstance().removeListener(this);
}
/**
* Returns true if metadata archiving is enabled. Conversation meta-data includes
* the participants, start date, last activity, and the count of messages sent.
* When archiving is enabled, all meta-data is written to the database.
*
* @return true if metadata archiving is enabled.
*/
public boolean isMetadataArchivingEnabled() {
return metadataArchivingEnabled;
}
/**
* Sets whether metadata archiving is enabled. Conversation meta-data includes
* the participants, start date, last activity, and the count of messages sent.
* When archiving is enabled, all meta-data is written to the database.
*
* @param enabled true if archiving should be enabled.
*/
public void setMetadataArchivingEnabled(boolean enabled) {
this.metadataArchivingEnabled = enabled;
JiveGlobals.setProperty("conversation.metadataArchiving", Boolean.toString(enabled));
}
/**
* Returns true if one-to-one chats or group chats messages are being archived.
*
* @return true if one-to-one chats or group chats messages are being archived.
*/
public boolean isArchivingEnabled() {
return isMessageArchivingEnabled() || isRoomArchivingEnabled();
}
/**
* Returns true if message archiving is enabled for one-to-one chats. When enabled, all messages
* in one-to-one conversations are stored in the database. Note: it's not possible for
* meta-data archiving to be disabled when message archiving is enabled; enabling
* message archiving automatically enables meta-data archiving.
*
* @return true if message archiving is enabled.
*/
public boolean isMessageArchivingEnabled() {
return messageArchivingEnabled;
}
/**
* Sets whether message archiving is enabled. When enabled, all messages
* in conversations are stored in the database. Note: it's not possible for
* meta-data archiving to be disabled when message archiving is enabled; enabling
* message archiving automatically enables meta-data archiving.
*
* @param enabled true if message should be enabled.
*/
public void setMessageArchivingEnabled(boolean enabled) {
this.messageArchivingEnabled = enabled;
JiveGlobals.setProperty("conversation.messageArchiving", Boolean.toString(enabled));
// Force metadata archiving enabled.
if (enabled) {
this.metadataArchivingEnabled = true;
}
}
/**
* Returns true if message archiving is enabled for group chats. When enabled, all messages
* in group conversations are stored in the database unless a list of rooms was specified
* in {@link #getRoomsArchived()} . Note: it's not possible for meta-data archiving to be
* disabled when room archiving is enabled; enabling room archiving automatically
* enables meta-data archiving.
*
* @return true if room archiving is enabled.
*/
public boolean isRoomArchivingEnabled() {
return roomArchivingEnabled;
}
/**
* Sets whether message archiving is enabled for group chats. When enabled, all messages
* in group conversations are stored in the database unless a list of rooms was specified
* in {@link #getRoomsArchived()} . Note: it's not possible for meta-data archiving to be
* disabled when room archiving is enabled; enabling room archiving automatically
* enables meta-data archiving.
*
* @param enabled if room archiving is enabled.
*/
public void setRoomArchivingEnabled(boolean enabled) {
this.roomArchivingEnabled = enabled;
JiveGlobals.setProperty("conversation.roomArchiving", Boolean.toString(enabled));
// Force metadata archiving enabled.
if (enabled) {
this.metadataArchivingEnabled = true;
}
}
/**
* Returns list of room names whose messages will be archived. When room archiving is enabled and
* this list is empty then messages of all local rooms will be archived. However, when name of
* rooms are defined in this list then only messages of those rooms will be archived.
*
* @return list of local room names whose messages will be archived.
*/
public Collection<String> getRoomsArchived() {
return roomsArchived;
}
/**
* Sets list of room names whose messages will be archived. When room archiving is enabled and
* this list is empty then messages of all local rooms will be archived. However, when name of
* rooms are defined in this list then only messages of those rooms will be archived.
*
* @param roomsArchived list of local room names whose messages will be archived.
*/
public void setRoomsArchived(Collection<String> roomsArchived) {
this.roomsArchived = roomsArchived;
JiveGlobals.setProperty("conversation.roomsArchived", StringUtils.collectionToString(roomsArchived));
}
/**
* Returns the number of minutes a conversation can be idle before it's ended.
*
* @return the conversation idle time.
*/
public int getIdleTime() {
return (int)(idleTime / JiveConstants.MINUTE);
}
/**
* Sets the number of minutes a conversation can be idle before it's ended.
*
* @param idleTime the max number of minutes a conversation can be idle before it's ended.
* @throws IllegalArgumentException if idleTime is less than 1.
*/
public void setIdleTime(int idleTime) {
if (idleTime < 1) {
throw new IllegalArgumentException("Idle time less than 1 is not valid: " + idleTime);
}
JiveGlobals.setProperty("conversation.idleTime", Integer.toString(idleTime));
this.idleTime = idleTime * JiveConstants.MINUTE;
}
/**
* Returns the maximum number of minutes a conversation can last before it's ended.
* Any additional messages between the participants in the chat will be associated
* with a new conversation.
*
* @return the maximum number of minutes a conversation can last.
*/
public int getMaxTime() {
return (int)(maxTime / JiveConstants.MINUTE);
}
/**
* Sets the maximum number of minutes a conversation can last before it's ended.
* Any additional messages between the participants in the chat will be associated
* with a new conversation.
*
* @param maxTime the maximum number of minutes a conversation can last.
* @throws IllegalArgumentException if maxTime is less than 1.
*/
public void setMaxTime(int maxTime) {
if (maxTime < 1) {
throw new IllegalArgumentException("Max time less than 1 is not valid: " + maxTime);
}
JiveGlobals.setProperty("conversation.maxTime", Integer.toString(maxTime));
this.maxTime = maxTime * JiveConstants.MINUTE;
}
public ConversationEventsQueue getConversationEventsQueue() {
return conversationEventsQueue;
}
/**
* Returns the count of active conversations.
*
* @return the count of active conversations.
*/
public int getConversationCount() {
if (ClusterManager.isSeniorClusterMember()) {
return conversations.size();
}
return (Integer) CacheFactory.doSynchronousClusterTask(new GetConversationCountTask(),
ClusterManager.getSeniorClusterMember().toByteArray());
}
/**
* Returns a conversation by ID.
*
* @param conversationID the ID of the conversation.
* @return the conversation.
* @throws NotFoundException if the conversation could not be found.
*/
public Conversation getConversation(long conversationID) throws NotFoundException {
if (ClusterManager.isSeniorClusterMember()) {
// Search through the currently active conversations.
for (Conversation conversation : conversations.values()) {
if (conversation.getConversationID() == conversationID) {
return conversation;
}
}
// Otherwise, it might be an archived conversation, so attempt to load it.
return new Conversation(this, conversationID);
}
else {
// Get this info from the senior cluster member when running in a cluster
Conversation conversation = (Conversation) CacheFactory.doSynchronousClusterTask(
new GetConversationTask(conversationID), ClusterManager.getSeniorClusterMember().toByteArray());
if (conversation == null) {
throw new NotFoundException("Conversation not found: " + conversationID);
}
return conversation;
}
}
/**
* Returns the set of active conversations.
*
* @return the active conversations.
*/
public Collection<Conversation> getConversations() {
if (ClusterManager.isSeniorClusterMember()) {
List<Conversation> conversationList = new ArrayList<Conversation>(conversations.values());
// Sort the conversations by creation date.
Collections.sort(conversationList, new Comparator<Conversation>() {
public int compare(Conversation c1, Conversation c2) {
return c1.getStartDate().compareTo(c2.getStartDate());
}
});
return conversationList;
}
else {
// Get this info from the senior cluster member when running in a cluster
return (Collection<Conversation>) CacheFactory.doSynchronousClusterTask(new GetConversationsTask(),
ClusterManager.getSeniorClusterMember().toByteArray());
}
}
/**
* Returns the total number of conversations that have been archived to the database.
* The archived conversation may only be the meta-data, or it might include messages
* as well if message archiving is turned on.
*
* @return the total number of archived conversations.
*/
public int getArchivedConversationCount() {
int conversationCount = 0;
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(CONVERSATION_COUNT);
rs = pstmt.executeQuery();
if (rs.next()) {
conversationCount = rs.getInt(1);
}
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
}
finally {
DbConnectionManager.closeConnection(rs, pstmt, con);
}
return conversationCount;
}
/**
* Returns the total number of messages that have been archived to the database.
*
* @return the total number of archived messages.
*/
public int getArchivedMessageCount() {
int messageCount = 0;
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(MESSAGE_COUNT);
rs = pstmt.executeQuery();
if (rs.next()) {
messageCount = rs.getInt(1);
}
}
catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
}
finally {
DbConnectionManager.closeConnection(rs, pstmt, con);
}
return messageCount;
}
/**
* Adds a conversation listener, which will be notified of newly created conversations,
* conversations ending, and updates to conversations.
*
* @param listener the conversation listener.
*/
public void addConversationListener(ConversationListener listener) {
conversationListeners.add(listener);
}
/**
* Removes a conversation listener.
*
* @param listener the conversation listener.
*/
public void removeConversationListener(ConversationListener listener) {
conversationListeners.remove(listener);
}
/**
* Processes an incoming message of a one-to-one chat. The message will mapped to a
* conversation and then queued for storage if archiving is turned on.
*
* @param sender sender of the message.
* @param receiver receiver of the message.
* @param body body of the message.
* @param date date when the message was sent.
*/
void processMessage(JID sender, JID receiver, String body, Date date) {
String conversationKey = getConversationKey(sender, receiver);
synchronized (conversationKey.intern()) {
Conversation conversation = conversations.get(conversationKey);
// Create a new conversation if necessary.
if (conversation == null) {
Collection<JID> participants = new ArrayList<JID>(2);
participants.add(sender);
participants.add(receiver);
XMPPServer server = XMPPServer.getInstance();
// Check to see if this is an external conversation; i.e. one of the participants
// is on a different server. We can use XOR since we know that both JID's can't
// be external.
boolean external = isExternal(server, sender) ^ isExternal(server, receiver);
// Make sure that the user joined the conversation before a message was received
Date start = new Date(date.getTime() - 1);
conversation = new Conversation(this, participants, external, start);
conversations.put(conversationKey, conversation);
// Notify listeners of the newly created conversation.
for (ConversationListener listener : conversationListeners) {
listener.conversationCreated(conversation);
}
}
// Check to see if the current conversation exceeds either the max idle time
// or max conversation time.
else if ((date.getTime() - conversation.getLastActivity().getTime() > idleTime) ||
(date.getTime() - conversation.getStartDate().getTime() > maxTime)) {
removeConversation(conversationKey, conversation, conversation.getLastActivity());
Collection<JID> participants = new ArrayList<JID>(2);
participants.add(sender);
participants.add(receiver);
XMPPServer server = XMPPServer.getInstance();
// Check to see if this is an external conversation; i.e. one of the participants
// is on a different server. We can use XOR since we know that both JID's can't
// be external.
boolean external = isExternal(server, sender) ^ isExternal(server, receiver);
// Make sure that the user joined the conversation before a message was received
Date start = new Date(date.getTime() - 1);
conversation = new Conversation(this, participants, external, start);
conversations.put(conversationKey, conversation);
// Notify listeners of the newly created conversation.
for (ConversationListener listener : conversationListeners) {
listener.conversationCreated(conversation);
}
}
// Record the newly received message.
conversation.messageReceived(sender, date);
if (metadataArchivingEnabled) {
conversationQueue.add(conversation);
}
if (messageArchivingEnabled) {
messageQueue
.add(new ArchivedMessage(conversation.getConversationID(), sender, receiver, date, body, false));
}
// Notify listeners of the conversation update.
for (ConversationListener listener : conversationListeners) {
listener.conversationUpdated(conversation, date);
}
}
}
/**
* Processes an incoming message sent to a room. The message will mapped to a conversation and then
* queued for storage if archiving is turned on.
*
* @param roomJID the JID of the room where the group conversation is taking place.
* @param sender the JID of the entity that sent the message.
* @param nickname nickname of the user in the room when the message was sent.
* @param body the message sent to the room.
* @param date date when the message was sent.
*/
void processRoomMessage(JID roomJID, JID sender, String nickname, String body, Date date) {
String conversationKey = getRoomConversationKey(roomJID);
synchronized (conversationKey.intern()) {
Conversation conversation = conversations.get(conversationKey);
// Create a new conversation if necessary.
if (conversation == null) {
// Make sure that the user joined the conversation before a message was received
Date start = new Date(date.getTime() - 1);
conversation = new Conversation(this, roomJID, false, start);
conversations.put(conversationKey, conversation);
// Notify listeners of the newly created conversation.
for (ConversationListener listener : conversationListeners) {
listener.conversationCreated(conversation);
}
}
// Check to see if the current conversation exceeds either the max idle time
// or max conversation time.
else if ((date.getTime() - conversation.getLastActivity().getTime() > idleTime) ||
(date.getTime() - conversation.getStartDate().getTime() > maxTime)) {
removeConversation(conversationKey, conversation, conversation.getLastActivity());
// Make sure that the user joined the conversation before a message was received
Date start = new Date(date.getTime() - 1);
conversation = new Conversation(this, roomJID, false, start);
conversations.put(conversationKey, conversation);
// Notify listeners of the newly created conversation.
for (ConversationListener listener : conversationListeners) {
listener.conversationCreated(conversation);
}
}
// Record the newly received message.
conversation.messageReceived(sender, date);
if (metadataArchivingEnabled) {
conversationQueue.add(conversation);
}
if (roomArchivingEnabled && (roomsArchived.isEmpty() || roomsArchived.contains(roomJID.getNode()))) {
JID jid = new JID(roomJID + "/" + nickname);
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, jid, date, body, false));
}
// Notify listeners of the conversation update.
for (ConversationListener listener : conversationListeners) {
listener.conversationUpdated(conversation, date);
}
}
}
/**
* Notification message indicating that a user joined a groupchat conversation. If
* no groupchat conversation was taking place in the specified room then ignore this
* event.<p>
* <p/>
* Eventually, when a new conversation will start in the room and if this user is
* still in the room then the new conversation will detect this user and mark like
* if the user joined the converstion from the beginning.
*
* @param room the room where the user joined.
* @param user the user that joined the room.
* @param nickname nickname of the user in the room.
* @param date date when the user joined the group coversation.
*/
void joinedGroupConversation(JID room, JID user, String nickname, Date date) {
Conversation conversation = getRoomConversation(room);
if (conversation != null) {
conversation.participantJoined(user, nickname, date.getTime());
}
}
/**
* Notification message indicating that a user left a groupchat conversation. If
* no groupchat conversation was taking place in the specified room then ignore this
* event.
*
* @param room the room where the user left.
* @param user the user that left the room.
* @param date date when the user left the group coversation.
*/
void leftGroupConversation(JID room, JID user, Date date) {
Conversation conversation = getRoomConversation(room);
if (conversation != null) {
conversation.participantLeft(user, date.getTime());
}
}
void roomConversationEnded(JID room, Date date) {
Conversation conversation = getRoomConversation(room);
if (conversation != null) {
removeConversation(room.toString(), conversation, date);
}
}
private void removeConversation(String key, Conversation conversation, Date date) {
conversations.remove(key);
// Notify conversation that it has ended
conversation.conversationEnded(date);
// Notify listeners of the conversation ending.
for (ConversationListener listener : conversationListeners) {
listener.conversationEnded(conversation);
}
}
/**
* Returns the group conversation taking place in the specified room or <tt>null</tt> if none.
*
* @param room JID of the room.
* @return the group conversation taking place in the specified room or null if none.
*/
private Conversation getRoomConversation(JID room) {
String conversationKey = room.toString();
return conversations.get(conversationKey);
}
private boolean isExternal(XMPPServer server, JID jid) {
return !server.isLocal(jid) || gateways.contains(jid.getDomain());
}
/**
* Returns true if the specified message should be processed by the conversation manager.
* Only messages between two users, group chats, or gateways are processed.
*
* @param message the message to analyze.
* @return true if the specified message should be processed by the conversation manager.
*/
boolean isConversation(Message message) {
if (Message.Type.normal == message.getType() || Message.Type.chat == message.getType()) {
// TODO: how should conversations with components on other servers be handled?
return isConversationJID(message.getFrom()) && isConversationJID(message.getTo());
}
return false;
}
/**
* Returns true if the specified JID should be recorded in a conversation.
*
* @param jid the JID.
* @return true if the JID should be recorded in a conversation.
*/
private boolean isConversationJID(JID jid) {
// Ignore conversations when there is no jid
if (jid == null) {
return false;
}
XMPPServer server = XMPPServer.getInstance();
if (jid.getNode() == null) {
return false;
}
// Always accept local JIDs or JIDs related to gateways
// (this filters our components, MUC, pubsub, etc. except gateways).
if (server.isLocal(jid) || gateways.contains(jid.getDomain())) {
return true;
}
// If not a local JID, always record it.
if (!jid.getDomain().endsWith(serverInfo.getXMPPDomain())) {
return true;
}
// Otherwise return false.
return false;
}
/**
* Returns a unique key for a coversation between two JID's. The order of two JID parameters
* is irrelevant; the same key will be returned.
*
* @param jid1 the first JID.
* @param jid2 the second JID.
* @return a unique key.
*/
String getConversationKey(JID jid1, JID jid2) {
StringBuilder builder = new StringBuilder();
if (jid1.compareTo(jid2) < 0) {
builder.append(jid1.toBareJID()).append("_").append(jid2.toBareJID());
}
else {
builder.append(jid2.toBareJID()).append("_").append(jid1.toBareJID());
}
return builder.toString();
}
String getRoomConversationKey(JID roomJID) {
return roomJID.toString();
}
public void componentInfoReceived(IQ iq) {
//Check if the component is a gateway
boolean gatewayFound = false;
Element childElement = iq.getChildElement();
for (Iterator<Element> it = childElement.elementIterator("identity"); it.hasNext();) {
Element identity = it.next();
if ("gateway".equals(identity.attributeValue("category"))) {
gatewayFound = true;
}
}
// If component is a gateway then keep track of the component
if (gatewayFound) {
gateways.add(iq.getFrom().getDomain());
}
}
public void componentRegistered(JID componentJID) {
//Do nothing
}
public void componentUnregistered(JID componentJID) {
// Remove stored information about this component
gateways.remove(componentJID.getDomain());
}
void queueParticipantLeft(Conversation conversation, JID user, ConversationParticipation participation) {
RoomParticipant updatedParticipant = new RoomParticipant();
updatedParticipant.conversationID = conversation.getConversationID();
updatedParticipant.user = user;
updatedParticipant.joined = participation.getJoined();
updatedParticipant.left = participation.getLeft();
participantQueue.add(updatedParticipant);
}
/**
* A task that persists conversation meta-data and messages to the database.
*/
private class ArchivingTask implements Runnable {
public void run() {
synchronized (this) {
if (archivingRunning) {
return;
}
archivingRunning = true;
}
if (!messageQueue.isEmpty() || !conversationQueue.isEmpty() || !participantQueue.isEmpty()) {
Connection con = null;
PreparedStatement pstmt = null;
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(INSERT_MESSAGE);
ArchivedMessage message;
int count = 0;
while ((message = messageQueue.poll()) != null) {
pstmt.setLong(1, message.getConversationID());
pstmt.setString(2, message.getFromJID().toBareJID());
pstmt.setString(3, message.getFromJID().getResource());
pstmt.setString(4, message.getToJID().toBareJID());
pstmt.setString(5, message.getToJID().getResource());
pstmt.setLong(6, message.getSentDate().getTime());
DbConnectionManager.setLargeTextField(pstmt, 7, message.getBody());
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.addBatch();
}
else {
pstmt.execute();
}
// Only batch up to 500 items at a time.
if (count % 500 == 0 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
count++;
}
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
pstmt = con.prepareStatement(UPDATE_CONVERSATION);
Conversation conversation;
count = 0;
while ((conversation = conversationQueue.poll()) != null) {
pstmt.setLong(1, conversation.getLastActivity().getTime());
pstmt.setInt(2, conversation.getMessageCount());
pstmt.setLong(3, conversation.getConversationID());
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.addBatch();
}
else {
pstmt.execute();
}
// Only batch up to 500 items at a time.
if (count % 500 == 0 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
count++;
}
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
pstmt = con.prepareStatement(UPDATE_PARTICIPANT);
RoomParticipant particpiant;
count = 0;
while ((particpiant = participantQueue.poll()) != null) {
pstmt.setLong(1, particpiant.left.getTime());
pstmt.setLong(2, particpiant.conversationID);
pstmt.setString(3, particpiant.user.toBareJID());
pstmt.setString(4, particpiant.user.getResource() == null ? " " : particpiant.user.getResource());
pstmt.setLong(5, particpiant.joined.getTime());
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.addBatch();
}
else {
pstmt.execute();
}
// Only batch up to 500 items at a time.
if (count % 500 == 0 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
count++;
}
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
}
catch (Exception e) {
Log.error(e.getMessage(), e);
}
finally {
DbConnectionManager.closeConnection(pstmt, con);
}
}
// Set archiving running back to false.
archivingRunning = false;
}
}
/**
* A PropertyEventListener that tracks updates to Jive properties that are related
* to conversation tracking and archiving.
*/
private class ConversationPropertyListener implements PropertyEventListener {
public void propertySet(String property, Map<String, Object> params) {
if (property.equals("conversation.metadataArchiving")) {
String value = (String)params.get("value");
metadataArchivingEnabled = Boolean.valueOf(value);
}
else if (property.equals("conversation.messageArchiving")) {
String value = (String)params.get("value");
messageArchivingEnabled = Boolean.valueOf(value);
// Force metadata archiving enabled on if message archiving on.
if (messageArchivingEnabled) {
metadataArchivingEnabled = true;
}
}
else if (property.equals("conversation.roomArchiving")) {
String value = (String)params.get("value");
roomArchivingEnabled = Boolean.valueOf(value);
// Force metadata archiving enabled on if message archiving on.
if (roomArchivingEnabled) {
metadataArchivingEnabled = true;
}
}
else if (property.equals("conversation.roomsArchived")) {
String value = (String)params.get("value");
roomsArchived = StringUtils.stringToCollection(value);
}
else if (property.equals("conversation.idleTime")) {
String value = (String)params.get("value");
try {
idleTime = Integer.parseInt(value) * JiveConstants.MINUTE;
}
catch (Exception e) {
Log.error(e.getMessage(), e);
idleTime = DEFAULT_IDLE_TIME * JiveConstants.MINUTE;
}
}
else if (property.equals("conversation.maxTime")) {
String value = (String)params.get("value");
try {
maxTime = Integer.parseInt(value) * JiveConstants.MINUTE;
}
catch (Exception e) {
Log.error(e.getMessage(), e);
maxTime = DEFAULT_MAX_TIME * JiveConstants.MINUTE;
}
}
}
public void propertyDeleted(String property, Map<String, Object> params) {
if (property.equals("conversation.metadataArchiving")) {
metadataArchivingEnabled = true;
}
else if (property.equals("conversation.messageArchiving")) {
messageArchivingEnabled = false;
}
else if (property.equals("conversation.roomArchiving")) {
roomArchivingEnabled = false;
}
else if (property.equals("conversation.roomsArchived")) {
roomsArchived = Collections.emptyList();
}
else if (property.equals("conversation.idleTime")) {
idleTime = DEFAULT_IDLE_TIME * JiveConstants.MINUTE;
}
else if (property.equals("conversation.maxTime")) {
maxTime = DEFAULT_MAX_TIME * JiveConstants.MINUTE;
}
}
public void xmlPropertySet(String property, Map<String, Object> params) {
// Ignore.
}
public void xmlPropertyDeleted(String property, Map<String, Object> params) {
// Ignore.
}
}
private static class RoomParticipant {
private long conversationID = -1;
private JID user;
private Date joined;
private Date left;
}
// Delete conversations older than maxAge days
// TODO
}
};
taskEngine.scheduleAtFixedRate(maxAgeTask, JiveConstants.MINUTE, JiveConstants.DAY);
// Register a statistic.
Statistic conversationStat = new Statistic() {
public String getName() {
return LocaleUtils.getLocalizedString("stat.conversation.name", MonitoringConstants.NAME);
}
public Type getStatType() {
return Type.count;
}
public String getDescription() {
return LocaleUtils.getLocalizedString("stat.conversation.desc", MonitoringConstants.NAME);
}
public String getUnits() {
return LocaleUtils.getLocalizedString("stat.conversation.units", MonitoringConstants.NAME);
}
public double sample() {
return getConversationCount();
}
public boolean isPartialSample() {
return false;
}
};
StatisticsManager.getInstance().addStatistic(CONVERSATIONS_KEY, conversationStat);
InternalComponentManager.getInstance().addListener(this);
}
public void stop() {
archiveTask.cancel();
archiveTask = null;
cleanupTask.cancel();
cleanupTask = null;
// Remove the statistics.
StatisticsManager.getInstance().removeStatistic(CONVERSATIONS_KEY);
PropertyEventDispatcher.removeListener(propertyListener);
propertyListener = null;
conversations.clear();
conversations = null;
// Archive anything remaining in the queue before quitting.
new ArchivingTask().run();
conversationQueue.clear();
conversationQueue = null;
messageQueue.clear();
messageQueue = null;
conversationListeners.clear();
conversationListeners = null;
serverInfo = null;
InternalComponentManager.getInstance().removeListener(this);
}
/**
* Returns true if metadata archiving is enabled. Conversation meta-data includes the participants, start date, last activity, and the count of
* messages sent. When archiving is enabled, all meta-data is written to the database.
*
* @return true if metadata archiving is enabled.
*/
public boolean isMetadataArchivingEnabled() {
return metadataArchivingEnabled;
}
/**
* Sets whether metadata archiving is enabled. Conversation meta-data includes the participants, start date, last activity, and the count of
* messages sent. When archiving is enabled, all meta-data is written to the database.
*
* @param enabled
* true if archiving should be enabled.
*/
public void setMetadataArchivingEnabled(boolean enabled) {
this.metadataArchivingEnabled = enabled;
JiveGlobals.setProperty("conversation.metadataArchiving", Boolean.toString(enabled));
}
/**
* Returns true if one-to-one chats or group chats messages are being archived.
*
* @return true if one-to-one chats or group chats messages are being archived.
*/
public boolean isArchivingEnabled() {
return isMessageArchivingEnabled() || isRoomArchivingEnabled();
}
/**
* Returns true if message archiving is enabled for one-to-one chats. When enabled, all messages in one-to-one conversations are stored in the
* database. Note: it's not possible for meta-data archiving to be disabled when message archiving is enabled; enabling message archiving
* automatically enables meta-data archiving.
*
* @return true if message archiving is enabled.
*/
public boolean isMessageArchivingEnabled() {
return messageArchivingEnabled;
}
/**
* Sets whether message archiving is enabled. When enabled, all messages in conversations are stored in the database. Note: it's not possible for
* meta-data archiving to be disabled when message archiving is enabled; enabling message archiving automatically enables meta-data archiving.
*
* @param enabled
* true if message should be enabled.
*/
public void setMessageArchivingEnabled(boolean enabled) {
this.messageArchivingEnabled = enabled;
JiveGlobals.setProperty("conversation.messageArchiving", Boolean.toString(enabled));
// Force metadata archiving enabled.
if (enabled) {
this.metadataArchivingEnabled = true;
}
}
/**
* Returns true if message archiving is enabled for group chats. When enabled, all messages in group conversations are stored in the database
* unless a list of rooms was specified in {@link #getRoomsArchived()} . Note: it's not possible for meta-data archiving to be disabled when room
* archiving is enabled; enabling room archiving automatically enables meta-data archiving.
*
* @return true if room archiving is enabled.
*/
public boolean isRoomArchivingEnabled() {
return roomArchivingEnabled;
}
/**
* Sets whether message archiving is enabled for group chats. When enabled, all messages in group conversations are stored in the database unless
* a list of rooms was specified in {@link #getRoomsArchived()} . Note: it's not possible for meta-data archiving to be disabled when room
* archiving is enabled; enabling room archiving automatically enables meta-data archiving.
*
* @param enabled
* if room archiving is enabled.
*/
public void setRoomArchivingEnabled(boolean enabled) {
this.roomArchivingEnabled = enabled;
JiveGlobals.setProperty("conversation.roomArchiving", Boolean.toString(enabled));
// Force metadata archiving enabled.
if (enabled) {
this.metadataArchivingEnabled = true;
}
}
/**
* Returns list of room names whose messages will be archived. When room archiving is enabled and this list is empty then messages of all local
* rooms will be archived. However, when name of rooms are defined in this list then only messages of those rooms will be archived.
*
* @return list of local room names whose messages will be archived.
*/
public Collection<String> getRoomsArchived() {
return roomsArchived;
}
/**
* Sets list of room names whose messages will be archived. When room archiving is enabled and this list is empty then messages of all local rooms
* will be archived. However, when name of rooms are defined in this list then only messages of those rooms will be archived.
*
* @param roomsArchived
* list of local room names whose messages will be archived.
*/
public void setRoomsArchived(Collection<String> roomsArchived) {
this.roomsArchived = roomsArchived;
JiveGlobals.setProperty("conversation.roomsArchived", StringUtils.collectionToString(roomsArchived));
}
/**
* Returns the number of minutes a conversation can be idle before it's ended.
*
* @return the conversation idle time.
*/
public int getIdleTime() {
return (int) (idleTime / JiveConstants.MINUTE);
}
/**
* Sets the number of minutes a conversation can be idle before it's ended.
*
* @param idleTime
* the max number of minutes a conversation can be idle before it's ended.
* @throws IllegalArgumentException
* if idleTime is less than 1.
*/
public void setIdleTime(int idleTime) {
if (idleTime < 1) {
throw new IllegalArgumentException("Idle time less than 1 is not valid: " + idleTime);
}
JiveGlobals.setProperty("conversation.idleTime", Integer.toString(idleTime));
this.idleTime = idleTime * JiveConstants.MINUTE;
}
/**
* Returns the maximum number of minutes a conversation can last before it's ended. Any additional messages between the participants in the chat
* will be associated with a new conversation.
*
* @return the maximum number of minutes a conversation can last.
*/
public int getMaxTime() {
return (int) (maxTime / JiveConstants.MINUTE);
}
/**
* Sets the maximum number of minutes a conversation can last before it's ended. Any additional messages between the participants in the chat will
* be associated with a new conversation.
*
* @param maxTime
* the maximum number of minutes a conversation can last.
* @throws IllegalArgumentException
* if maxTime is less than 1.
*/
public void setMaxTime(int maxTime) {
if (maxTime < 1) {
throw new IllegalArgumentException("Max time less than 1 is not valid: " + maxTime);
}
JiveGlobals.setProperty("conversation.maxTime", Integer.toString(maxTime));
this.maxTime = maxTime * JiveConstants.MINUTE;
}
public int getMaxAge() {
return (int) (maxAge / JiveConstants.DAY);
}
public void setMaxAge(int maxAge) {
if (maxAge < 0) {
throw new IllegalArgumentException("Max age less than 0 is not valid: " + maxAge);
}
JiveGlobals.setProperty("conversation.maxAge", Integer.toString(maxAge));
this.maxAge = maxAge * JiveConstants.DAY;
}
public int getMaxRetrievable() {
return (int) (maxRetrievable / JiveConstants.DAY);
}
public void setMaxRetrievable(int maxRetrievable) {
if (maxRetrievable < 0) {
throw new IllegalArgumentException("Max retrievable less than 0 is not valid: " + maxRetrievable);
}
JiveGlobals.setProperty("conversation.maxRetrievable", Integer.toString(maxRetrievable));
this.maxRetrievable = maxRetrievable * JiveConstants.DAY;
}
public ConversationEventsQueue getConversationEventsQueue() {
return conversationEventsQueue;
}
/**
* Returns the count of active conversations.
*
* @return the count of active conversations.
*/
public int getConversationCount() {
if (ClusterManager.isSeniorClusterMember()) {
return conversations.size();
}
return (Integer) CacheFactory.doSynchronousClusterTask(new GetConversationCountTask(), ClusterManager.getSeniorClusterMember().toByteArray());
}
/**
* Returns a conversation by ID.
*
* @param conversationID
* the ID of the conversation.
* @return the conversation.
* @throws NotFoundException
* if the conversation could not be found.
*/
public Conversation getConversation(long conversationID) throws NotFoundException {
if (ClusterManager.isSeniorClusterMember()) {
// Search through the currently active conversations.
for (Conversation conversation : conversations.values()) {
if (conversation.getConversationID() == conversationID) {
return conversation;
}
}
// Otherwise, it might be an archived conversation, so attempt to load it.
return new Conversation(this, conversationID);
} else {
// Get this info from the senior cluster member when running in a cluster
Conversation conversation = (Conversation) CacheFactory.doSynchronousClusterTask(new GetConversationTask(conversationID), ClusterManager
.getSeniorClusterMember().toByteArray());
if (conversation == null) {
throw new NotFoundException("Conversation not found: " + conversationID);
}
return conversation;
}
}
/**
* Returns the set of active conversations.
*
* @return the active conversations.
*/
public Collection<Conversation> getConversations() {
if (ClusterManager.isSeniorClusterMember()) {
List<Conversation> conversationList = new ArrayList<Conversation>(conversations.values());
// Sort the conversations by creation date.
Collections.sort(conversationList, new Comparator<Conversation>() {
public int compare(Conversation c1, Conversation c2) {
return c1.getStartDate().compareTo(c2.getStartDate());
}
});
return conversationList;
} else {
// Get this info from the senior cluster member when running in a cluster
return (Collection<Conversation>) CacheFactory.doSynchronousClusterTask(new GetConversationsTask(), ClusterManager
.getSeniorClusterMember().toByteArray());
}
}
/**
* Returns the total number of conversations that have been archived to the database. The archived conversation may only be the meta-data, or it
* might include messages as well if message archiving is turned on.
*
* @return the total number of archived conversations.
*/
public int getArchivedConversationCount() {
int conversationCount = 0;
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(CONVERSATION_COUNT);
rs = pstmt.executeQuery();
if (rs.next()) {
conversationCount = rs.getInt(1);
}
} catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
} finally {
DbConnectionManager.closeConnection(rs, pstmt, con);
}
return conversationCount;
}
/**
* Returns the total number of messages that have been archived to the database.
*
* @return the total number of archived messages.
*/
public int getArchivedMessageCount() {
int messageCount = 0;
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(MESSAGE_COUNT);
rs = pstmt.executeQuery();
if (rs.next()) {
messageCount = rs.getInt(1);
}
} catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
} finally {
DbConnectionManager.closeConnection(rs, pstmt, con);
}
return messageCount;
}
/**
* Adds a conversation listener, which will be notified of newly created conversations, conversations ending, and updates to conversations.
*
* @param listener
* the conversation listener.
*/
public void addConversationListener(ConversationListener listener) {
conversationListeners.add(listener);
}
/**
* Removes a conversation listener.
*
* @param listener
* the conversation listener.
*/
public void removeConversationListener(ConversationListener listener) {
conversationListeners.remove(listener);
}
/**
* Processes an incoming message of a one-to-one chat. The message will mapped to a conversation and then queued for storage if archiving is
* turned on.
*
* @param sender
* sender of the message.
* @param receiver
* receiver of the message.
* @param body
* body of the message.
* @param date
* date when the message was sent.
*/
void processMessage(JID sender, JID receiver, String body, Date date) {
String conversationKey = getConversationKey(sender, receiver);
synchronized (conversationKey.intern()) {
Conversation conversation = conversations.get(conversationKey);
// Create a new conversation if necessary.
if (conversation == null) {
Collection<JID> participants = new ArrayList<JID>(2);
participants.add(sender);
participants.add(receiver);
XMPPServer server = XMPPServer.getInstance();
// Check to see if this is an external conversation; i.e. one of the participants
// is on a different server. We can use XOR since we know that both JID's can't
// be external.
boolean external = isExternal(server, sender) ^ isExternal(server, receiver);
// Make sure that the user joined the conversation before a message was received
Date start = new Date(date.getTime() - 1);
conversation = new Conversation(this, participants, external, start);
conversations.put(conversationKey, conversation);
// Notify listeners of the newly created conversation.
for (ConversationListener listener : conversationListeners) {
listener.conversationCreated(conversation);
}
}
// Check to see if the current conversation exceeds either the max idle time
// or max conversation time.
else if ((date.getTime() - conversation.getLastActivity().getTime() > idleTime)
|| (date.getTime() - conversation.getStartDate().getTime() > maxTime)) {
removeConversation(conversationKey, conversation, conversation.getLastActivity());
Collection<JID> participants = new ArrayList<JID>(2);
participants.add(sender);
participants.add(receiver);
XMPPServer server = XMPPServer.getInstance();
// Check to see if this is an external conversation; i.e. one of the participants
// is on a different server. We can use XOR since we know that both JID's can't
// be external.
boolean external = isExternal(server, sender) ^ isExternal(server, receiver);
// Make sure that the user joined the conversation before a message was received
Date start = new Date(date.getTime() - 1);
conversation = new Conversation(this, participants, external, start);
conversations.put(conversationKey, conversation);
// Notify listeners of the newly created conversation.
for (ConversationListener listener : conversationListeners) {
listener.conversationCreated(conversation);
}
}
// Record the newly received message.
conversation.messageReceived(sender, date);
if (metadataArchivingEnabled) {
conversationQueue.add(conversation);
}
if (messageArchivingEnabled) {
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, receiver, date, body, false));
}
// Notify listeners of the conversation update.
for (ConversationListener listener : conversationListeners) {
listener.conversationUpdated(conversation, date);
}
}
}
/**
* Processes an incoming message sent to a room. The message will mapped to a conversation and then queued for storage if archiving is turned on.
*
* @param roomJID
* the JID of the room where the group conversation is taking place.
* @param sender
* the JID of the entity that sent the message.
* @param nickname
* nickname of the user in the room when the message was sent.
* @param body
* the message sent to the room.
* @param date
* date when the message was sent.
*/
void processRoomMessage(JID roomJID, JID sender, String nickname, String body, Date date) {
String conversationKey = getRoomConversationKey(roomJID);
synchronized (conversationKey.intern()) {
Conversation conversation = conversations.get(conversationKey);
// Create a new conversation if necessary.
if (conversation == null) {
// Make sure that the user joined the conversation before a message was received
Date start = new Date(date.getTime() - 1);
conversation = new Conversation(this, roomJID, false, start);
conversations.put(conversationKey, conversation);
// Notify listeners of the newly created conversation.
for (ConversationListener listener : conversationListeners) {
listener.conversationCreated(conversation);
}
}
// Check to see if the current conversation exceeds either the max idle time
// or max conversation time.
else if ((date.getTime() - conversation.getLastActivity().getTime() > idleTime)
|| (date.getTime() - conversation.getStartDate().getTime() > maxTime)) {
removeConversation(conversationKey, conversation, conversation.getLastActivity());
// Make sure that the user joined the conversation before a message was received
Date start = new Date(date.getTime() - 1);
conversation = new Conversation(this, roomJID, false, start);
conversations.put(conversationKey, conversation);
// Notify listeners of the newly created conversation.
for (ConversationListener listener : conversationListeners) {
listener.conversationCreated(conversation);
}
}
// Record the newly received message.
conversation.messageReceived(sender, date);
if (metadataArchivingEnabled) {
conversationQueue.add(conversation);
}
if (roomArchivingEnabled && (roomsArchived.isEmpty() || roomsArchived.contains(roomJID.getNode()))) {
JID jid = new JID(roomJID + "/" + nickname);
messageQueue.add(new ArchivedMessage(conversation.getConversationID(), sender, jid, date, body, false));
}
// Notify listeners of the conversation update.
for (ConversationListener listener : conversationListeners) {
listener.conversationUpdated(conversation, date);
}
}
}
/**
* Notification message indicating that a user joined a groupchat conversation. If no groupchat conversation was taking place in the specified
* room then ignore this event.
* <p>
* <p/>
* Eventually, when a new conversation will start in the room and if this user is still in the room then the new conversation will detect this
* user and mark like if the user joined the converstion from the beginning.
*
* @param room
* the room where the user joined.
* @param user
* the user that joined the room.
* @param nickname
* nickname of the user in the room.
* @param date
* date when the user joined the group coversation.
*/
void joinedGroupConversation(JID room, JID user, String nickname, Date date) {
Conversation conversation = getRoomConversation(room);
if (conversation != null) {
conversation.participantJoined(user, nickname, date.getTime());
}
}
/**
* Notification message indicating that a user left a groupchat conversation. If no groupchat conversation was taking place in the specified room
* then ignore this event.
*
* @param room
* the room where the user left.
* @param user
* the user that left the room.
* @param date
* date when the user left the group coversation.
*/
void leftGroupConversation(JID room, JID user, Date date) {
Conversation conversation = getRoomConversation(room);
if (conversation != null) {
conversation.participantLeft(user, date.getTime());
}
}
void roomConversationEnded(JID room, Date date) {
Conversation conversation = getRoomConversation(room);
if (conversation != null) {
removeConversation(room.toString(), conversation, date);
}
}
private void removeConversation(String key, Conversation conversation, Date date) {
conversations.remove(key);
// Notify conversation that it has ended
conversation.conversationEnded(date);
// Notify listeners of the conversation ending.
for (ConversationListener listener : conversationListeners) {
listener.conversationEnded(conversation);
}
}
/**
* Returns the group conversation taking place in the specified room or <tt>null</tt> if none.
*
* @param room
* JID of the room.
* @return the group conversation taking place in the specified room or null if none.
*/
private Conversation getRoomConversation(JID room) {
String conversationKey = room.toString();
return conversations.get(conversationKey);
}
private boolean isExternal(XMPPServer server, JID jid) {
return !server.isLocal(jid) || gateways.contains(jid.getDomain());
}
/**
* Returns true if the specified message should be processed by the conversation manager. Only messages between two users, group chats, or
* gateways are processed.
*
* @param message
* the message to analyze.
* @return true if the specified message should be processed by the conversation manager.
*/
boolean isConversation(Message message) {
if (Message.Type.normal == message.getType() || Message.Type.chat == message.getType()) {
// TODO: how should conversations with components on other servers be handled?
return isConversationJID(message.getFrom()) && isConversationJID(message.getTo());
}
return false;
}
/**
* Returns true if the specified JID should be recorded in a conversation.
*
* @param jid
* the JID.
* @return true if the JID should be recorded in a conversation.
*/
private boolean isConversationJID(JID jid) {
// Ignore conversations when there is no jid
if (jid == null) {
return false;
}
XMPPServer server = XMPPServer.getInstance();
if (jid.getNode() == null) {
return false;
}
// Always accept local JIDs or JIDs related to gateways
// (this filters our components, MUC, pubsub, etc. except gateways).
if (server.isLocal(jid) || gateways.contains(jid.getDomain())) {
return true;
}
// If not a local JID, always record it.
if (!jid.getDomain().endsWith(serverInfo.getXMPPDomain())) {
return true;
}
// Otherwise return false.
return false;
}
/**
* Returns a unique key for a coversation between two JID's. The order of two JID parameters is irrelevant; the same key will be returned.
*
* @param jid1
* the first JID.
* @param jid2
* the second JID.
* @return a unique key.
*/
String getConversationKey(JID jid1, JID jid2) {
StringBuilder builder = new StringBuilder();
if (jid1.compareTo(jid2) < 0) {
builder.append(jid1.toBareJID()).append("_").append(jid2.toBareJID());
} else {
builder.append(jid2.toBareJID()).append("_").append(jid1.toBareJID());
}
return builder.toString();
}
String getRoomConversationKey(JID roomJID) {
return roomJID.toString();
}
public void componentInfoReceived(IQ iq) {
// Check if the component is a gateway
boolean gatewayFound = false;
Element childElement = iq.getChildElement();
for (Iterator<Element> it = childElement.elementIterator("identity"); it.hasNext();) {
Element identity = it.next();
if ("gateway".equals(identity.attributeValue("category"))) {
gatewayFound = true;
}
}
// If component is a gateway then keep track of the component
if (gatewayFound) {
gateways.add(iq.getFrom().getDomain());
}
}
public void componentRegistered(JID componentJID) {
// Do nothing
}
public void componentUnregistered(JID componentJID) {
// Remove stored information about this component
gateways.remove(componentJID.getDomain());
}
void queueParticipantLeft(Conversation conversation, JID user, ConversationParticipation participation) {
RoomParticipant updatedParticipant = new RoomParticipant();
updatedParticipant.conversationID = conversation.getConversationID();
updatedParticipant.user = user;
updatedParticipant.joined = participation.getJoined();
updatedParticipant.left = participation.getLeft();
participantQueue.add(updatedParticipant);
}
/**
* A task that persists conversation meta-data and messages to the database.
*/
private class ArchivingTask implements Runnable {
public void run() {
synchronized (this) {
if (archivingRunning) {
return;
}
archivingRunning = true;
}
if (!messageQueue.isEmpty() || !conversationQueue.isEmpty() || !participantQueue.isEmpty()) {
Connection con = null;
PreparedStatement pstmt = null;
try {
con = DbConnectionManager.getConnection();
pstmt = con.prepareStatement(INSERT_MESSAGE);
ArchivedMessage message;
int count = 0;
while ((message = messageQueue.poll()) != null) {
pstmt.setLong(1, message.getConversationID());
pstmt.setString(2, message.getFromJID().toBareJID());
pstmt.setString(3, message.getFromJID().getResource());
pstmt.setString(4, message.getToJID().toBareJID());
pstmt.setString(5, message.getToJID().getResource());
pstmt.setLong(6, message.getSentDate().getTime());
DbConnectionManager.setLargeTextField(pstmt, 7, message.getBody());
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.addBatch();
} else {
pstmt.execute();
}
// Only batch up to 500 items at a time.
if (count % 500 == 0 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
count++;
}
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
pstmt = con.prepareStatement(UPDATE_CONVERSATION);
Conversation conversation;
count = 0;
while ((conversation = conversationQueue.poll()) != null) {
pstmt.setLong(1, conversation.getLastActivity().getTime());
pstmt.setInt(2, conversation.getMessageCount());
pstmt.setLong(3, conversation.getConversationID());
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.addBatch();
} else {
pstmt.execute();
}
// Only batch up to 500 items at a time.
if (count % 500 == 0 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
count++;
}
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
pstmt = con.prepareStatement(UPDATE_PARTICIPANT);
RoomParticipant particpiant;
count = 0;
while ((particpiant = participantQueue.poll()) != null) {
pstmt.setLong(1, particpiant.left.getTime());
pstmt.setLong(2, particpiant.conversationID);
pstmt.setString(3, particpiant.user.toBareJID());
pstmt.setString(4, particpiant.user.getResource() == null ? " " : particpiant.user.getResource());
pstmt.setLong(5, particpiant.joined.getTime());
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.addBatch();
} else {
pstmt.execute();
}
// Only batch up to 500 items at a time.
if (count % 500 == 0 && DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
count++;
}
if (DbConnectionManager.isBatchUpdatesSupported()) {
pstmt.executeBatch();
}
} catch (Exception e) {
Log.error(e.getMessage(), e);
} finally {
DbConnectionManager.closeConnection(pstmt, con);
}
}
// Set archiving running back to false.
archivingRunning = false;
}
}
/**
* A PropertyEventListener that tracks updates to Jive properties that are related to conversation tracking and archiving.
*/
private class ConversationPropertyListener implements PropertyEventListener {
public void propertySet(String property, Map<String, Object> params) {
if (property.equals("conversation.metadataArchiving")) {
String value = (String) params.get("value");
metadataArchivingEnabled = Boolean.valueOf(value);
} else if (property.equals("conversation.messageArchiving")) {
String value = (String) params.get("value");
messageArchivingEnabled = Boolean.valueOf(value);
// Force metadata archiving enabled on if message archiving on.
if (messageArchivingEnabled) {
metadataArchivingEnabled = true;
}
} else if (property.equals("conversation.roomArchiving")) {
String value = (String) params.get("value");
roomArchivingEnabled = Boolean.valueOf(value);
// Force metadata archiving enabled on if message archiving on.
if (roomArchivingEnabled) {
metadataArchivingEnabled = true;
}
} else if (property.equals("conversation.roomsArchived")) {
String value = (String) params.get("value");
roomsArchived = StringUtils.stringToCollection(value);
} else if (property.equals("conversation.idleTime")) {
String value = (String) params.get("value");
try {
idleTime = Integer.parseInt(value) * JiveConstants.MINUTE;
} catch (Exception e) {
Log.error(e.getMessage(), e);
idleTime = DEFAULT_IDLE_TIME * JiveConstants.MINUTE;
}
} else if (property.equals("conversation.maxTime")) {
String value = (String) params.get("value");
try {
maxTime = Integer.parseInt(value) * JiveConstants.MINUTE;
} catch (Exception e) {
Log.error(e.getMessage(), e);
maxTime = DEFAULT_MAX_TIME * JiveConstants.MINUTE;
}
} else if (property.equals("conversation.maxRetrievable")) {
String value = (String) params.get("value");
try {
maxRetrievable = Integer.parseInt(value) * JiveConstants.DAY;
} catch (Exception e) {
Log.error(e.getMessage(), e);
maxRetrievable = DEFAULT_MAX_RETRIEVABLE * JiveConstants.DAY;
}
} else if (property.equals("conversation.maxAge")) {
String value = (String) params.get("value");
try {
maxAge = Integer.parseInt(value) * JiveConstants.DAY;
} catch (Exception e) {
Log.error(e.getMessage(), e);
maxAge = DEFAULT_MAX_AGE * JiveConstants.DAY;
}
}
}
public void propertyDeleted(String property, Map<String, Object> params) {
if (property.equals("conversation.metadataArchiving")) {
metadataArchivingEnabled = true;
} else if (property.equals("conversation.messageArchiving")) {
messageArchivingEnabled = false;
} else if (property.equals("conversation.roomArchiving")) {
roomArchivingEnabled = false;
} else if (property.equals("conversation.roomsArchived")) {
roomsArchived = Collections.emptyList();
} else if (property.equals("conversation.idleTime")) {
idleTime = DEFAULT_IDLE_TIME * JiveConstants.MINUTE;
} else if (property.equals("conversation.maxTime")) {
maxTime = DEFAULT_MAX_TIME * JiveConstants.MINUTE;
} else if (property.equals("conversation.maxAge")) {
maxAge = DEFAULT_MAX_AGE * JiveConstants.DAY;
} else if (property.equals("conversation.maxRetrievable")) {
maxRetrievable = DEFAULT_MAX_RETRIEVABLE * JiveConstants.DAY;
}
}
public void xmlPropertySet(String property, Map<String, Object> params) {
// Ignore.
}
public void xmlPropertyDeleted(String property, Map<String, Object> params) {
// Ignore.
}
}
private static class RoomParticipant {
private long conversationID = -1;
private JID user;
private Date joined;
private Date left;
}
}
\ No newline at end of file
......@@ -163,6 +163,10 @@
boolean roomArchiving = conversationManager.isRoomArchivingEnabled();
int idleTime = ParamUtils.getIntParameter(request, "idleTime", conversationManager.getIdleTime());
int maxTime = ParamUtils.getIntParameter(request, "maxTime", conversationManager.getMaxTime());
int maxAge = ParamUtils.getIntParameter(request, "maxAge", conversationManager.getMaxAge());
int maxRetrievable = ParamUtils.getIntParameter(request, "maxRetrievable", conversationManager.getMaxRetrievable());
boolean rebuildIndex = request.getParameter("rebuild") != null;
if (request.getParameter("cancel") != null) {
......@@ -197,6 +201,14 @@
errors.put("roomsArchived", "");
errorMessage = "Only name of local rooms should be specified.";
}
if (maxAge < 0) {
errors.put("maxAge", "");
errorMessage = "Max Age must be greater than or equal to 0.";
}
if (maxRetrievable < 1) {
errors.put("maxRetrievable", "");
errorMessage = "Max Retrievable must be greater than or equal to 0.";
}
// If no errors, continue:
if (errors.size() == 0) {
conversationManager.setMetadataArchivingEnabled(metadataArchiving);
......@@ -205,6 +217,9 @@
conversationManager.setRoomsArchived(StringUtils.stringToCollection(roomsArchived));
conversationManager.setIdleTime(idleTime);
conversationManager.setMaxTime(maxTime);
conversationManager.setMaxAge(maxAge);
conversationManager.setMaxRetrievable(maxRetrievable);
%>
<div class="success">
......@@ -285,6 +300,21 @@
<td><input type="text" name="maxTime" size="10" maxlength="10" value="<%= conversationManager.getMaxTime()%>" /></td>
<td></td>
</tr>
<tr>
<td><label class="jive-label"><fmt:message key="archive.settings.max.age"/>:</label><br>
<fmt:message key="archive.settings.max.age.description"/><br><br></td>
<td><input type="text" name="maxAge" size="10" maxlength="10" value="<%= conversationManager.getMaxAge()%>" /></td>
<td></td>
</tr>
<tr>
<td><label class="jive-label"><fmt:message key="archive.settings.max.retrievable"/>:</label><br>
<fmt:message key="archive.settings.max.retrievable.description"/><br><br></td>
<td><input type="text" name="maxRetrievable" size="10" maxlength="10" value="<%= conversationManager.getMaxRetrievable()%>" /></td>
<td></td>
</tr>
</tbody>
</table>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment