Commit c3ed050a authored by Guus der Kinderen's avatar Guus der Kinderen

OF-1028: Store X with each unack'ed stanza

To safely determine what stanza has and has not been ack'ed by the peer,
store the current value of 'x' with each stanza that's kept. This also
allows for proper handling when the peer rolls over X (at x=2^32).
parent 231e73ac
package org.jivesoftware.openfire.streammanagement; package org.jivesoftware.openfire.streammanagement;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Date; import java.util.*;
import java.util.Deque;
import java.util.LinkedList;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.Connection;
...@@ -26,11 +24,12 @@ import org.xmpp.packet.PacketError; ...@@ -26,11 +24,12 @@ import org.xmpp.packet.PacketError;
public class StreamManager { public class StreamManager {
private final Logger Log; private final Logger Log;
public static class UnackedPacket { public static class UnackedPacket {
public final Date timestamp; public final long x;
public final Date timestamp = new Date();
public final Packet packet; public final Packet packet;
public UnackedPacket(Date date, Packet p) { public UnackedPacket(long x, Packet p) {
timestamp = date; this.x = x;
packet = p; packet = p;
} }
} }
...@@ -74,7 +73,7 @@ public class StreamManager { ...@@ -74,7 +73,7 @@ public class StreamManager {
* sent from the server that the client has processed * sent from the server that the client has processed
*/ */
private long clientProcessedStanzas = 0; private long clientProcessedStanzas = 0;
static private long mask = 0xFFFFFFFF; /* 2**32 - 1; this is used to emulate rollover */ static private long mask = 0xFFFFFFFF; /* 2**32 - 1; this is used to emulate rollover */
/** /**
...@@ -188,43 +187,54 @@ public class StreamManager { ...@@ -188,43 +187,54 @@ public class StreamManager {
private void processClientAcknowledgement(Element ack) { private void processClientAcknowledgement(Element ack) {
if(isEnabled()) { if(isEnabled()) {
if (ack.attribute("h") != null) { if (ack.attribute("h") != null) {
long count = Long.valueOf(ack.attributeValue("h")); final long h = Long.valueOf(ack.attributeValue("h"));
synchronized (this) { synchronized (this) {
// Remove stanzas from temporary storage as now acknowledged
Log.debug("Ack: h={} mine={} length={}", count, clientProcessedStanzas, unacknowledgedServerStanzas.size()); if ( !unacknowledgedServerStanzas.isEmpty() && h > unacknowledgedServerStanzas.getLast().x ) {
if (count < clientProcessedStanzas) { Log.warn( "Client acknowledges stanzas that we didn't sent! Client Ack h: {}, our last stanza: {}", h, unacknowledgedServerStanzas.getLast().x );
/* Consider rollover? */ clientProcessedStanzas = h; // Correct the bookkeeping on our end.
Log.debug("Maybe rollover");
if (clientProcessedStanzas > mask) {
while (count < clientProcessedStanzas) {
Log.debug("Rolling...");
count += mask + 1;
}
}
} }
while (clientProcessedStanzas < count) {
// Remove stanzas from temporary storage as now acknowledged
Log.debug("Before processing client Ack (h={}): {} unacknowledged stanzas.", h, unacknowledgedServerStanzas.size());
// Pop all acknowledged stanzas.
while( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getFirst().x <= h )
{
unacknowledgedServerStanzas.removeFirst(); unacknowledgedServerStanzas.removeFirst();
clientProcessedStanzas++;
Log.debug("In Ack: h={} mine={} length={}", count, clientProcessedStanzas, unacknowledgedServerStanzas.size());
} }
if(count >= clientProcessedStanzas) { // Ensure that unacknowledged stanzas are purged after the client rolled over 'h' which occurs at h= (2^32)-1
clientProcessedStanzas = count; final boolean clientHadRollOver = h < 10000 && unacknowledgedServerStanzas.getLast().x > mask - 10000;
if (clientHadRollOver )
{
Log.info( "Client rolled over 'h'. Purging high-numbered unacklowledged stanzas." );
while ( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - 10000)
{
unacknowledgedServerStanzas.removeLast();
}
} }
Log.debug("After processing client Ack (h={}): {} unacknowledged stanzas.", h, unacknowledgedServerStanzas.size());
} }
} }
} }
} }
/**
* Registers that Openfire sends a stanza to the client (which is expected to be acknowledged later).
* @param packet The stanza that is sent.
*/
public void sentStanza(Packet packet) { public void sentStanza(Packet packet) {
if(isEnabled()) { if(isEnabled()) {
synchronized (this) { synchronized (this) {
this.serverSentStanzas++; this.serverSentStanzas++;
// Temporarily store packet until delivery confirmed // The next ID is one higher than the last stanza that was sent (which might be unacknowledged!)
unacknowledgedServerStanzas.addLast( new StreamManager.UnackedPacket( new Date(), packet.createCopy() ) ); final long x = 1 + ( unacknowledgedServerStanzas.isEmpty() ? clientProcessedStanzas : unacknowledgedServerStanzas.getLast().x );
Log.debug("Added stanza of type {}, now {} / {}", packet.getClass().getName(), serverSentStanzas, unacknowledgedServerStanzas.size()); unacknowledgedServerStanzas.addLast( new StreamManager.UnackedPacket( x, packet.createCopy() ) );
Log.debug("Added stanza of type {} to collection of unacknowledged stanzas (x={}). Collection size is now {} / {}", packet.getElement().getName(), x, serverSentStanzas, unacknowledgedServerStanzas.size());
} }
if(serverSentStanzas % JiveGlobals.getLongProperty("stream.management.requestFrequency", 5) == 0) { if(serverSentStanzas % JiveGlobals.getLongProperty("stream.management.requestFrequency", 5) == 0) {
sendServerRequest(); sendServerRequest();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment