Commit 9b5d3a49 authored by Guus der Kinderen's avatar Guus der Kinderen Committed by akrherz

OF-1509: Add thread safety when incrementing counter.

parent 2513ee66
...@@ -22,6 +22,7 @@ import java.util.Date; ...@@ -22,6 +22,7 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
...@@ -86,8 +87,8 @@ public abstract class LocalSession implements Session { ...@@ -86,8 +87,8 @@ public abstract class LocalSession implements Session {
private long startDate = System.currentTimeMillis(); private long startDate = System.currentTimeMillis();
private long lastActiveDate; private long lastActiveDate;
private long clientPacketCount = 0; private AtomicLong clientPacketCount = new AtomicLong( 0 );
private long serverPacketCount = 0; private AtomicLong serverPacketCount = new AtomicLong( 0 );
/** /**
* Session temporary data. All data stored in this <code>Map</code> disapear when session * Session temporary data. All data stored in this <code>Map</code> disapear when session
...@@ -272,7 +273,7 @@ public abstract class LocalSession implements Session { ...@@ -272,7 +273,7 @@ public abstract class LocalSession implements Session {
* Increments the number of packets sent from the client to the server. * Increments the number of packets sent from the client to the server.
*/ */
public void incrementClientPacketCount() { public void incrementClientPacketCount() {
clientPacketCount++; clientPacketCount.incrementAndGet();
lastActiveDate = System.currentTimeMillis(); lastActiveDate = System.currentTimeMillis();
streamManager.incrementServerProcessedStanzas(); streamManager.incrementServerProcessedStanzas();
} }
...@@ -281,7 +282,7 @@ public abstract class LocalSession implements Session { ...@@ -281,7 +282,7 @@ public abstract class LocalSession implements Session {
* Increments the number of packets sent from the server to the client. * Increments the number of packets sent from the server to the client.
*/ */
public void incrementServerPacketCount() { public void incrementServerPacketCount() {
serverPacketCount++; serverPacketCount.incrementAndGet();
lastActiveDate = System.currentTimeMillis(); lastActiveDate = System.currentTimeMillis();
} }
...@@ -292,7 +293,7 @@ public abstract class LocalSession implements Session { ...@@ -292,7 +293,7 @@ public abstract class LocalSession implements Session {
*/ */
@Override @Override
public long getNumClientPackets() { public long getNumClientPackets() {
return clientPacketCount; return clientPacketCount.get();
} }
/** /**
...@@ -302,7 +303,7 @@ public abstract class LocalSession implements Session { ...@@ -302,7 +303,7 @@ public abstract class LocalSession implements Session {
*/ */
@Override @Override
public long getNumServerPackets() { public long getNumServerPackets() {
return serverPacketCount; return serverPacketCount.get();
} }
/** /**
......
...@@ -4,6 +4,7 @@ import java.math.BigInteger; ...@@ -4,6 +4,7 @@ import java.math.BigInteger;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.QName; import org.dom4j.QName;
...@@ -67,15 +68,18 @@ public class StreamManager { ...@@ -67,15 +68,18 @@ public class StreamManager {
* Count of how many stanzas/packets * Count of how many stanzas/packets
* sent from the client that the server has processed * sent from the client that the server has processed
*/ */
private long serverProcessedStanzas = 0; private AtomicLong serverProcessedStanzas = new AtomicLong( 0 );
/** /**
* Count of how many stanzas/packets * Count of how many stanzas/packets
* sent from the server that the client has processed * sent from the server that the client has processed
*/ */
private long clientProcessedStanzas = 0; private AtomicLong clientProcessedStanzas = new AtomicLong( 0 );
static private long mask = new BigInteger("2").pow(32).longValue() - 1; // This is used to emulate rollover. /**
* The value (2^32)-1, used to emulate roll-over
*/
private static final long MASK = new BigInteger( "2" ).pow( 32 ).longValue() - 1;
/** /**
* Collection of stanzas/packets sent to client that haven't been acknowledged. * Collection of stanzas/packets sent to client that haven't been acknowledged.
...@@ -301,7 +305,7 @@ public class StreamManager { ...@@ -301,7 +305,7 @@ public class StreamManager {
Log.debug("Session is detached, won't request an ack."); Log.debug("Session is detached, won't request an ack.");
return; return;
} }
String ack = String.format("<a xmlns='%s' h='%s' />", namespace, serverProcessedStanzas & mask); String ack = String.format("<a xmlns='%s' h='%s' />", namespace, serverProcessedStanzas.get() & MASK );
session.deliverRawText( ack ); session.deliverRawText( ack );
} }
} }
...@@ -350,7 +354,7 @@ public class StreamManager { ...@@ -350,7 +354,7 @@ public class StreamManager {
* @return false if we sent less stanzas to the client than the number it is acknowledging. * @return false if we sent less stanzas to the client than the number it is acknowledging.
*/ */
private synchronized boolean validateClientAcknowledgement(long h) { private synchronized boolean validateClientAcknowledgement(long h) {
return h <= ( unacknowledgedServerStanzas.isEmpty() ? clientProcessedStanzas : unacknowledgedServerStanzas.getLast().x ); return h <= ( unacknowledgedServerStanzas.isEmpty() ? clientProcessedStanzas.get() : unacknowledgedServerStanzas.getLast().x );
} }
/** /**
...@@ -366,7 +370,7 @@ public class StreamManager { ...@@ -366,7 +370,7 @@ public class StreamManager {
throw new IllegalStateException( "Client acknowledges stanzas that we didn't send! Client Ack h: "+h+", our last stanza: " + unacknowledgedServerStanzas.getLast().x ); throw new IllegalStateException( "Client acknowledges stanzas that we didn't send! Client Ack h: "+h+", our last stanza: " + unacknowledgedServerStanzas.getLast().x );
} }
clientProcessedStanzas = h; clientProcessedStanzas.set( h );
// Remove stanzas from temporary storage as now acknowledged // Remove stanzas from temporary storage as now acknowledged
Log.trace( "Before processing client Ack (h={}): {} unacknowledged stanzas.", h, unacknowledgedServerStanzas.size() ); Log.trace( "Before processing client Ack (h={}): {} unacknowledged stanzas.", h, unacknowledgedServerStanzas.size() );
...@@ -379,11 +383,11 @@ public class StreamManager { ...@@ -379,11 +383,11 @@ public class StreamManager {
// Ensure that unacknowledged stanzas are purged after the client rolled over 'h' which occurs at h= (2^32)-1 // Ensure that unacknowledged stanzas are purged after the client rolled over 'h' which occurs at h= (2^32)-1
final int maxUnacked = getMaximumUnacknowledgedStanzas(); final int maxUnacked = getMaximumUnacknowledgedStanzas();
final boolean clientHadRollOver = h < maxUnacked && !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - maxUnacked; final boolean clientHadRollOver = h < maxUnacked && !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > MASK - maxUnacked;
if ( clientHadRollOver ) if ( clientHadRollOver )
{ {
Log.info( "Client rolled over 'h'. Purging high-numbered unacknowledged stanzas." ); Log.info( "Client rolled over 'h'. Purging high-numbered unacknowledged stanzas." );
while ( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - maxUnacked) while ( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > MASK - maxUnacked)
{ {
unacknowledgedServerStanzas.removeLast(); unacknowledgedServerStanzas.removeLast();
} }
...@@ -430,7 +434,7 @@ public class StreamManager { ...@@ -430,7 +434,7 @@ public class StreamManager {
synchronized (this) synchronized (this)
{ {
// The next ID is one higher than the last stanza that was sent (which might be unacknowledged!) // The next ID is one higher than the last stanza that was sent (which might be unacknowledged!)
final long x = 1 + ( unacknowledgedServerStanzas.isEmpty() ? clientProcessedStanzas : unacknowledgedServerStanzas.getLast().x ); final long x = 1 + ( unacknowledgedServerStanzas.isEmpty() ? clientProcessedStanzas.get() : unacknowledgedServerStanzas.getLast().x );
unacknowledgedServerStanzas.addLast( new StreamManager.UnackedPacket( x, packet.createCopy() ) ); unacknowledgedServerStanzas.addLast( new StreamManager.UnackedPacket( x, packet.createCopy() ) );
size = unacknowledgedServerStanzas.size(); size = unacknowledgedServerStanzas.size();
...@@ -481,7 +485,7 @@ public class StreamManager { ...@@ -481,7 +485,7 @@ public class StreamManager {
Log.debug("Agreeing to resume"); Log.debug("Agreeing to resume");
Element resumed = new DOMElement(QName.get("resumed", namespace)); Element resumed = new DOMElement(QName.get("resumed", namespace));
resumed.addAttribute("previd", StringUtils.encodeBase64( session.getAddress().getResource() + "\0" + session.getStreamID().getID())); resumed.addAttribute("previd", StringUtils.encodeBase64( session.getAddress().getResource() + "\0" + session.getStreamID().getID()));
resumed.addAttribute("h", Long.toString(serverProcessedStanzas)); resumed.addAttribute("h", Long.toString(serverProcessedStanzas.get()));
session.getConnection().deliverRawText(resumed.asXML()); session.getConnection().deliverRawText(resumed.asXML());
Log.debug("Resuming session: Ack for {}", h); Log.debug("Resuming session: Ack for {}", h);
processClientAcknowledgement(h); processClientAcknowledgement(h);
...@@ -535,7 +539,7 @@ public class StreamManager { ...@@ -535,7 +539,7 @@ public class StreamManager {
*/ */
public void incrementServerProcessedStanzas() { public void incrementServerProcessedStanzas() {
if(isEnabled()) { if(isEnabled()) {
this.serverProcessedStanzas++; this.serverProcessedStanzas.incrementAndGet();
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment