Commit bac4c600 authored by Guus der Kinderen's avatar Guus der Kinderen

OF-1028: Avoid potential concurrency issue.

Do not expose the collection of packets that our implementation
modifies only under a mutex (alternatively, we could return a
defensive copy instead, but there does not seem to be a need
to expose the collection in the first place).
parent 2889dfcd
......@@ -138,9 +138,8 @@ public class StreamManager {
if (ack.attribute("h") != null) {
long count = Long.valueOf(ack.attributeValue("h"));
// Remove stanzas from temporary storage as now acknowledged
Deque<UnackedPacket> unacknowledgedStanzas = getUnacknowledgedServerStanzas();
long i = getClientProcessedStanzas();
Log.debug("Ack: h={} mine={} length={}", count, i, unacknowledgedStanzas.size());
Log.debug("Ack: h={} mine={} length={}", count, i, unacknowledgedServerStanzas.size());
if (count < i) {
/* Consider rollover? */
Log.debug("Maybe rollover");
......@@ -152,9 +151,9 @@ public class StreamManager {
}
}
while (i < count) {
unacknowledgedStanzas.removeFirst();
unacknowledgedServerStanzas.removeFirst();
i++;
Log.debug("In Ack: h={} mine={} length={}", count, i, unacknowledgedStanzas.size());
Log.debug("In Ack: h={} mine={} length={}", count, i, unacknowledgedServerStanzas.size());
}
setClientProcessedStanzas(count);
......@@ -169,8 +168,8 @@ public class StreamManager {
synchronized (this) {
incrementServerSentStanzas();
// Temporarily store packet until delivery confirmed
getUnacknowledgedServerStanzas().addLast(new StreamManager.UnackedPacket(new Date(), packet.createCopy()));
Log.debug("Added stanza of type {}, now {} / {}", packet.getClass().getName(), getServerSentStanzas(), getUnacknowledgedServerStanzas().size());
unacknowledgedServerStanzas.addLast( new StreamManager.UnackedPacket( new Date(), packet.createCopy() ) );
Log.debug("Added stanza of type {}, now {} / {}", packet.getClass().getName(), getServerSentStanzas(), unacknowledgedServerStanzas.size());
}
if(getServerSentStanzas() % JiveGlobals.getLongProperty("stream.management.requestFrequency", 5) == 0) {
sendServerRequest();
......@@ -184,9 +183,8 @@ public class StreamManager {
if(isEnabled()) {
setEnabled(false); // Avoid concurrent usage.
synchronized (this) {
Deque<StreamManager.UnackedPacket> unacknowledgedStanzas = getUnacknowledgedServerStanzas();
if (!unacknowledgedStanzas.isEmpty()) {
for (StreamManager.UnackedPacket unacked : unacknowledgedStanzas) {
if (!unacknowledgedServerStanzas.isEmpty()) {
for (StreamManager.UnackedPacket unacked : unacknowledgedServerStanzas) {
if (unacked.packet instanceof Message) {
Message m = (Message) unacked.packet;
if (m.getExtension("delay", "urn:xmpp:delay") == null) {
......@@ -302,13 +300,4 @@ public class StreamManager {
clientProcessedStanzas = count;
}
}
/**
* Retrieves all unacknowledged stanzas sent to client from server.
* @return all unacknowledged stanzas sent to client from server.
*/
public Deque<UnackedPacket> getUnacknowledgedServerStanzas() {
return unacknowledgedServerStanzas;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment