Commit 3bb6ab49 authored by Guus der Kinderen's avatar Guus der Kinderen

OF-1028: Restrict the amount of data in memory.

When a peer refuses to ack data, we should not keep it in memory
indefinately. This commit introduces a configurable upper bound,
and when reached causes the queue to be cleared (dropped silently)
while disabling the functionality for that connection.
parent 422e7ff6
......@@ -23,6 +23,7 @@ import org.xmpp.packet.PacketError;
* @author jonnyheavey
*/
public class StreamManager {
private final Logger Log;
public static class UnackedPacket {
public final long x;
......@@ -200,11 +201,12 @@ public class StreamManager {
}
// Ensure that unacknowledged stanzas are purged after the client rolled over 'h' which occurs at h= (2^32)-1
final boolean clientHadRollOver = h < 10000 && !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - 10000;
final int maxUnacked = getMaximumUnacknowledgedStanzas();
final boolean clientHadRollOver = h < maxUnacked && !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - maxUnacked;
if ( clientHadRollOver )
{
Log.info( "Client rolled over 'h'. Purging high-numbered unacklowledged stanzas." );
while ( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - 10000)
Log.info( "Client rolled over 'h'. Purging high-numbered unacknowledged stanzas." );
while ( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - maxUnacked)
{
unacknowledgedServerStanzas.removeLast();
}
......@@ -224,18 +226,30 @@ public class StreamManager {
if(isEnabled()) {
final long requestFrequency = JiveGlobals.getLongProperty( "stream.management.requestFrequency", 5 );
final boolean requestAck;
final int size;
synchronized (this) {
synchronized (this)
{
// The next ID is one higher than the last stanza that was sent (which might be unacknowledged!)
final long x = 1 + ( unacknowledgedServerStanzas.isEmpty() ? clientProcessedStanzas : unacknowledgedServerStanzas.getLast().x );
unacknowledgedServerStanzas.addLast( new StreamManager.UnackedPacket( x, packet.createCopy() ) );
requestAck = unacknowledgedServerStanzas.size() >= requestFrequency;
Log.debug("Added stanza of type {} to collection of unacknowledged stanzas (x={}). Collection size is now {} / {}", packet.getElement().getName(), x, unacknowledgedServerStanzas.size());
size = unacknowledgedServerStanzas.size();
Log.debug( "Added stanza of type {} to collection of unacknowledged stanzas (x={}). Collection size is now {}.", packet.getElement().getName(), x, size );
// Prevent keeping to many stanzas in memory.
if ( size > getMaximumUnacknowledgedStanzas() )
{
Log.warn( "To many stanzas go unacknowledged for this connection. Clearing queue and disabling functionality." );
namespace = null;
unacknowledgedServerStanzas.clear();
return;
}
}
if(requestAck) {
// When we have a sizable amount of unacknowledged stanzas, request acknowledgement.
if ( size >= requestFrequency ) {
sendServerRequest();
}
}
......@@ -282,4 +296,13 @@ public class StreamManager {
this.serverProcessedStanzas++;
}
}
/**
* The maximum amount of stanzas we keep, waiting for ack.
* @return The maximum number of stanzas.
*/
private int getMaximumUnacknowledgedStanzas()
{
return JiveGlobals.getIntProperty( "stream.management.max-unacked", 10000 );
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment