Commit ef338606 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gaston

1. Made asynchronic packets auditing. JM-25

2. Fixed bug while starting up AuditManager.


git-svn-id: http://svn.igniterealtime.org/svn/repos/messenger/trunk@435 b35dd754-fafc-0310-a699-88a17e54d16e
parent 558b097a
...@@ -88,6 +88,24 @@ public interface AuditManager { ...@@ -88,6 +88,24 @@ public interface AuditManager {
*/ */
void setMaxFileCount(int count); void setMaxFileCount(int count);
/**
* Returns the time in milliseconds between successive executions of the task that will save
* the queued audited packets to a permanent store.
*
* @return the time in milliseconds between successive executions of the task that will save
* the queued audited packets to a permanent store.
*/
int getLogTimeout();
/**
* Sets the time in milliseconds between successive executions of the task that will save
* the queued audited packets to a permanent store.
*
* @param logTimeout the time in milliseconds between successive executions of the task that will save
* the queued audited packets to a permanent store.
*/
void setLogTimeout(int logTimeout);
/** /**
* <p>Determines if the server will audit all message packets.</p> * <p>Determines if the server will audit all message packets.</p>
* <p>This is a speed optimization and convenience for logging all message packets * <p>This is a speed optimization and convenience for logging all message packets
......
...@@ -11,9 +11,6 @@ ...@@ -11,9 +11,6 @@
package org.jivesoftware.messenger.audit; package org.jivesoftware.messenger.audit;
import org.jivesoftware.messenger.IQ;
import org.jivesoftware.messenger.Message;
import org.jivesoftware.messenger.Presence;
import org.jivesoftware.messenger.XMPPPacket; import org.jivesoftware.messenger.XMPPPacket;
/** /**
...@@ -34,44 +31,30 @@ public interface Auditor { ...@@ -34,44 +31,30 @@ public interface Auditor {
*/ */
void audit(XMPPPacket packet); void audit(XMPPPacket packet);
/**
* Audit a message packet.
*
* @param packet the packet being audited.
*/
void audit(Message packet);
/**
* Audit a presence packet.
*
* @param packet the packet being audited.
* @param transition the presence transition type from AuditManager.
*/
void audit(Presence packet, int transition);
/**
* Audit an IQ packet.
*
* @param packet the packet being audited.
*/
void audit(IQ packet);
/** /**
* Audit any packet that was dropped (undeliverables, etc). * Audit any packet that was dropped (undeliverables, etc).
* *
* @param packet the packet that was dropped. * @param packet the packet that was dropped.
*/ */
void auditDroppedPacket(XMPPPacket packet); //void auditDroppedPacket(XMPPPacket packet);
/** /**
* Audit a non-packet event. * Audit a non-packet event.
* *
* @param event the event being audited. * @param event the event being audited.
*/ */
void audit(AuditEvent event); //void audit(AuditEvent event);
/** /**
* Prepares the auditor for system shutdown. * Prepares the auditor for system shutdown.
*/ */
void close(); void stop();
/**
* Returns the number of queued packets that are still in memory and need to be saved to a
* permanent store.
*
* @return the number of queued packets that are still in memory.
*/
int getQueuedPacketsNumber();
} }
\ No newline at end of file
...@@ -30,8 +30,10 @@ public class AuditManagerImpl extends BasicModule implements AuditManager { ...@@ -30,8 +30,10 @@ public class AuditManagerImpl extends BasicModule implements AuditManager {
private AuditorImpl auditor = null; private AuditorImpl auditor = null;
private int maxSize; private int maxSize;
private int maxCount; private int maxCount;
private int logTimeout;
private static final int MAX_FILE_SIZE = 10; private static final int MAX_FILE_SIZE = 10;
private static final int MAX_FILE_COUNT = 10; private static final int MAX_FILE_COUNT = 10;
private static final int DEFAULT_LOG_TIMEOUT = 120000;
public AuditManagerImpl() { public AuditManagerImpl() {
super("Audit Manager"); super("Audit Manager");
...@@ -63,6 +65,16 @@ public class AuditManagerImpl extends BasicModule implements AuditManager { ...@@ -63,6 +65,16 @@ public class AuditManagerImpl extends BasicModule implements AuditManager {
JiveGlobals.setProperty("xmpp.audit.maxsize", Integer.toString(size)); JiveGlobals.setProperty("xmpp.audit.maxsize", Integer.toString(size));
} }
public int getLogTimeout() {
return logTimeout;
}
public void setLogTimeout(int logTimeout) {
this.logTimeout = logTimeout;
auditor.setLogTimeout(logTimeout);
JiveGlobals.setProperty("xmpp.audit.logtimeout", Integer.toString(logTimeout));
}
public int getMaxFileCount() { public int getMaxFileCount() {
return maxCount; return maxCount;
} }
...@@ -135,11 +147,11 @@ public class AuditManagerImpl extends BasicModule implements AuditManager { ...@@ -135,11 +147,11 @@ public class AuditManagerImpl extends BasicModule implements AuditManager {
public void initialize(Container container) { public void initialize(Container container) {
super.initialize(container); super.initialize(container);
setEnabled(JiveGlobals.getBooleanProperty("xmpp.audit.active")); enabled = JiveGlobals.getBooleanProperty("xmpp.audit.active");
setEnabled(JiveGlobals.getBooleanProperty("xmpp.audit.message")); auditMessage = JiveGlobals.getBooleanProperty("xmpp.audit.message");
setEnabled(JiveGlobals.getBooleanProperty("xmpp.audit.presence")); auditPresence = JiveGlobals.getBooleanProperty("xmpp.audit.presence");
setEnabled(JiveGlobals.getBooleanProperty("xmpp.audit.iq")); auditIQ = JiveGlobals.getBooleanProperty("xmpp.audit.iq");
setEnabled(JiveGlobals.getBooleanProperty("xmpp.audit.xpath")); auditXPath = JiveGlobals.getBooleanProperty("xmpp.audit.xpath");
// TODO: load xpath values! // TODO: load xpath values!
// String[] filters = context.getProperties("xmpp.audit.filter.xpath"); // String[] filters = context.getProperties("xmpp.audit.filter.xpath");
// for (int i = 0; i < filters.length; i++) { // for (int i = 0; i < filters.length; i++) {
...@@ -147,13 +159,15 @@ public class AuditManagerImpl extends BasicModule implements AuditManager { ...@@ -147,13 +159,15 @@ public class AuditManagerImpl extends BasicModule implements AuditManager {
// } // }
maxSize = JiveGlobals.getIntProperty("xmpp.audit.maxsize", MAX_FILE_SIZE); maxSize = JiveGlobals.getIntProperty("xmpp.audit.maxsize", MAX_FILE_SIZE);
maxCount = JiveGlobals.getIntProperty("xmpp.audit.maxcount", MAX_FILE_COUNT); maxCount = JiveGlobals.getIntProperty("xmpp.audit.maxcount", MAX_FILE_COUNT);
logTimeout = JiveGlobals.getIntProperty("xmpp.audit.logtimeout", DEFAULT_LOG_TIMEOUT);
auditor = new AuditorImpl(this); auditor = new AuditorImpl(this);
auditor.setMaxValues(maxSize, maxCount); auditor.setMaxValues(maxSize, maxCount);
auditor.setLogTimeout(logTimeout);
} }
public void stop() { public void stop() {
if (auditor != null) { if (auditor != null) {
auditor.close(); auditor.stop();
} }
} }
} }
...@@ -22,9 +22,13 @@ import java.io.FileWriter; ...@@ -22,9 +22,13 @@ import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.Writer; import java.io.Writer;
import java.util.Date; import java.util.Date;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import javax.xml.stream.XMLStreamWriter;
import javax.xml.stream.XMLOutputFactory; import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
public class AuditorImpl implements Auditor { public class AuditorImpl implements Auditor {
...@@ -35,8 +39,20 @@ public class AuditorImpl implements Auditor { ...@@ -35,8 +39,20 @@ public class AuditorImpl implements Auditor {
private static final int MEGABYTE = 1024 * 1024; private static final int MEGABYTE = 1024 * 1024;
private int maxSize; private int maxSize;
private long maxCount; private long maxCount;
private int logTimeout;
private boolean closed = false; private boolean closed = false;
/**
* Queue that holds the audited packets that will be later saved to an XML file.
*/
private Queue<AuditPacket> logQueue = new LinkedBlockingQueue<AuditPacket>();
/**
* Timer to save queued logs to the XML file.
*/
private Timer timer = new Timer();
private SaveQueuedPacketsTask saveQueuedPacketsTask;
public AuditorImpl(AuditManager manager) { public AuditorImpl(AuditManager manager) {
auditManager = manager; auditManager = manager;
} }
...@@ -44,55 +60,40 @@ public class AuditorImpl implements Auditor { ...@@ -44,55 +60,40 @@ public class AuditorImpl implements Auditor {
public void audit(XMPPPacket packet) { public void audit(XMPPPacket packet) {
if (auditManager.isEnabled()) { if (auditManager.isEnabled()) {
if (packet instanceof Message) { if (packet instanceof Message) {
if (auditManager.isEnabled() && auditManager.isAuditMessage()) { if (auditManager.isAuditMessage()) {
writePacket(packet, false); writePacket(packet, false);
} }
} }
else if (packet instanceof Presence) { else if (packet instanceof Presence) {
if (auditManager.isEnabled() && auditManager.isAuditPresence()) { if (auditManager.isAuditPresence()) {
writePacket(packet, false); writePacket(packet, false);
} }
} }
else if (packet instanceof IQ) { else if (packet instanceof IQ) {
if (auditManager.isEnabled() && auditManager.isAuditIQ()) { if (auditManager.isAuditIQ()) {
writePacket(packet, false); writePacket(packet, false);
} }
} }
} }
} }
public synchronized void audit(Message packet) { /*public void auditDroppedPacket(XMPPPacket packet) {
if (auditManager.isEnabled() && auditManager.isAuditMessage()) {
writePacket(packet, false);
}
}
public synchronized void audit(Presence packet, int transition) {
if (auditManager.isEnabled() && auditManager.isAuditPresence()) {
writePacket(packet, false);
}
}
public synchronized void audit(IQ packet) {
if (auditManager.isEnabled() && auditManager.isAuditIQ()) {
writePacket(packet, false);
}
}
public synchronized void auditDroppedPacket(XMPPPacket packet) {
writePacket(packet, true); writePacket(packet, true);
} }
public synchronized void audit(AuditEvent event) { public void audit(AuditEvent event) {
try { // TODO Implement this functionality. Not used currently.
prepareAuditFile(); }*/
}
catch (Exception e) { public void stop() {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e); // Stop the scheduled task for saving queued packets to the XML file
} timer.cancel();
// Save all remaining queued packets to the XML file
saveQueuedPackets();
close();
} }
public void close() { private void close() {
if (xmlSerializer != null) { if (xmlSerializer != null) {
try { try {
xmlSerializer.writeEndElement(); xmlSerializer.writeEndElement();
...@@ -109,48 +110,9 @@ public class AuditorImpl implements Auditor { ...@@ -109,48 +110,9 @@ public class AuditorImpl implements Auditor {
private void writePacket(XMPPPacket packet, boolean dropped) { private void writePacket(XMPPPacket packet, boolean dropped) {
if (!closed) { if (!closed) {
try { // Add to the logging queue this new entry that will be saved later
prepareAuditFile(); logQueue.add(new AuditPacket((XMPPPacket) packet.createDeepCopy(), dropped));
xmlSerializer.writeStartElement("packet"); }
xmlSerializer.writeDefaultNamespace("http://jivesoftware.org");
Session session = packet.getOriginatingSession();
if (session != null) {
if (session.getStreamID() != null) {
xmlSerializer.writeAttribute("session", session.getStreamID().toString());
}
switch (session.getStatus()) {
case Session.STATUS_AUTHENTICATED:
xmlSerializer.writeAttribute("status", "auth");
break;
case Session.STATUS_CLOSED:
xmlSerializer.writeAttribute("status", "closed");
break;
case Session.STATUS_CONNECTED:
xmlSerializer.writeAttribute("status", "connected");
break;
case Session.STATUS_STREAMING:
xmlSerializer.writeAttribute("status", "stream");
break;
default:
xmlSerializer.writeAttribute("status", "unknown");
break;
}
}
xmlSerializer.writeAttribute("timestamp", new Date().toString());
if (packet.isSending()) {
xmlSerializer.writeAttribute("sending", "true");
}
if (dropped) {
xmlSerializer.writeAttribute("dropped", "true");
}
packet.send(xmlSerializer, 0);
xmlSerializer.writeEndElement();
xmlSerializer.flush();
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
} // closed
} }
private void prepareAuditFile() throws IOException, XMLStreamException { private void prepareAuditFile() throws IOException, XMLStreamException {
...@@ -164,6 +126,22 @@ public class AuditorImpl implements Auditor { ...@@ -164,6 +126,22 @@ public class AuditorImpl implements Auditor {
maxCount = count; maxCount = count;
} }
public void setLogTimeout(int newTimeout) {
// Cancel any existing task because the timeout has changed
if (saveQueuedPacketsTask != null) {
saveQueuedPacketsTask.cancel();
}
this.logTimeout = newTimeout;
// Create a new task and schedule it with the new timeout
saveQueuedPacketsTask = new SaveQueuedPacketsTask();
timer.schedule(saveQueuedPacketsTask, logTimeout, logTimeout);
}
public int getQueuedPacketsNumber() {
return logQueue.size();
}
private void rotateFiles() throws IOException, XMLStreamException { private void rotateFiles() throws IOException, XMLStreamException {
close(); close();
int i; int i;
...@@ -203,4 +181,117 @@ public class AuditorImpl implements Auditor { ...@@ -203,4 +181,117 @@ public class AuditorImpl implements Auditor {
xmlSerializer.writeStartElement("jive", "jive", "http://jivesoftware.org"); xmlSerializer.writeStartElement("jive", "jive", "http://jivesoftware.org");
xmlSerializer.writeNamespace("jive", "http://jivesoftware.org"); xmlSerializer.writeNamespace("jive", "http://jivesoftware.org");
} }
/**
* Saves the queued entries to an XML file.
*/
private class SaveQueuedPacketsTask extends TimerTask {
public void run() {
try {
saveQueuedPackets();
}
catch (Throwable e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
}
}
private void saveQueuedPackets() {
AuditPacket entry;
int batchSize = logQueue.size();
for (int index = 0; index < batchSize; index++) {
entry = logQueue.poll();
if (entry != null) {
try {
prepareAuditFile();
entry.send(xmlSerializer);
xmlSerializer.flush();
}
catch (IOException e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
// Add again the entry to the queue to save it later
logQueue.add(entry);
}
catch (XMLStreamException e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
// Add again the entry to the queue to save it later
logQueue.add(entry);
}
}
}
}
/**
* Wrapper on a Packet with information about the packet's status at the moment when the message
* was queued.<p>
*
* The idea is to wrap every packet that is needed to be audited and then add the wrapper to a
* queue that will be later processed (i.e. saved to the XML file).
*/
private class AuditPacket {
private XMPPPacket packet;
private String streamID;
private String sessionStatus;
private Date timestamp;
private boolean sending;
private boolean dropped;
public AuditPacket(XMPPPacket packet, boolean dropped) {
this.packet = packet;
this.dropped = dropped;
this.timestamp = new Date();
this.sending = packet.isSending();
Session session = packet.getOriginatingSession();
if (session != null) {
if (session.getStreamID() != null) {
this.streamID = session.getStreamID().toString();
}
switch (session.getStatus()) {
case Session.STATUS_AUTHENTICATED:
this.sessionStatus = "auth";
break;
case Session.STATUS_CLOSED:
this.sessionStatus = "closed";
break;
case Session.STATUS_CONNECTED:
this.sessionStatus = "connected";
break;
case Session.STATUS_STREAMING:
this.sessionStatus = "stream";
break;
default:
this.sessionStatus = "unknown";
break;
}
}
}
public void send(XMLStreamWriter xmlSerializer) {
try {
xmlSerializer.writeStartElement("packet");
xmlSerializer.writeDefaultNamespace("http://jivesoftware.org");
if (streamID != null) {
xmlSerializer.writeAttribute("session", streamID);
}
if (sessionStatus != null) {
xmlSerializer.writeAttribute("status", sessionStatus);
}
xmlSerializer.writeAttribute("timestamp", timestamp.toString());
if (sending) {
xmlSerializer.writeAttribute("sending", "true");
}
if (dropped) {
xmlSerializer.writeAttribute("dropped", "true");
}
packet.send(xmlSerializer, 0);
xmlSerializer.writeEndElement();
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
}
}
} }
...@@ -44,6 +44,7 @@ ...@@ -44,6 +44,7 @@
String[] xpathQuery = ParamUtils.getParameters(request,"xpathQuery"); String[] xpathQuery = ParamUtils.getParameters(request,"xpathQuery");
String maxCount = ParamUtils.getParameter(request,"maxCount"); String maxCount = ParamUtils.getParameter(request,"maxCount");
String maxSize = ParamUtils.getParameter(request,"maxSize"); String maxSize = ParamUtils.getParameter(request,"maxSize");
String logTimeout = ParamUtils.getParameter(request,"logTimeout");
// Get an audit manager: // Get an audit manager:
AuditManager auditManager = (AuditManager)admin.getServiceLookup().lookup(AuditManager.class); AuditManager auditManager = (AuditManager)admin.getServiceLookup().lookup(AuditManager.class);
...@@ -73,6 +74,11 @@ ...@@ -73,6 +74,11 @@
} catch (Exception e){ } catch (Exception e){
errors.put("maxSize","maxSize"); errors.put("maxSize","maxSize");
} }
try {
auditManager.setLogTimeout(Integer.parseInt(logTimeout) * 1000);
} catch (Exception e){
errors.put("logTimeout","logTimeout");
}
// All done, redirect // All done, redirect
if (errors.size() == 0){ if (errors.size() == 0){
%> %>
...@@ -101,6 +107,7 @@ ...@@ -101,6 +107,7 @@
auditXPath = auditManager.isAuditXPath(); auditXPath = auditManager.isAuditXPath();
maxCount = Integer.toString(auditManager.getMaxFileCount()); maxCount = Integer.toString(auditManager.getMaxFileCount());
maxSize = Integer.toString(auditManager.getMaxFileSize()); maxSize = Integer.toString(auditManager.getMaxFileSize());
logTimeout = Integer.toString(auditManager.getLogTimeout() / 1000);
} }
%> %>
...@@ -184,6 +191,24 @@ and IQ packets are primarily useful for tracing and troubleshooting XMPP deploym ...@@ -184,6 +191,24 @@ and IQ packets are primarily useful for tracing and troubleshooting XMPP deploym
</td> </td>
</tr> </tr>
<tr valign="top">
<td width="1%" nowrap class="c1">
Flush Interval (seconds):
</td>
<td width="99%">
<input type="text" size="15" maxlength="50" name="logTimeout"
value="<%= ((logTimeout != null) ? logTimeout : "") %>">
<% if (errors.get("logTimeout") != null) { %>
<span class="jive-error-text">
Please enter a valid number.
</span>
<% } %>
</td>
</tr>
<tr valign="top"> <tr valign="top">
<td width="1%" nowrap class="c1"> <td width="1%" nowrap class="c1">
Packets to audit: Packets to audit:
...@@ -230,6 +255,14 @@ and IQ packets are primarily useful for tracing and troubleshooting XMPP deploym ...@@ -230,6 +255,14 @@ and IQ packets are primarily useful for tracing and troubleshooting XMPP deploym
</table> </table>
</td> </td>
</tr> </tr>
<tr valign="top">
<td width="1%" nowrap class="c1">
Queued packets:
</td>
<td width="99%">
<%= auditManager.getAuditor().getQueuedPacketsNumber() %>
</td>
</tr>
</table> </table>
</td> </td>
</tr> </tr>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment