Commit 7768e4fb authored by Dave Cridland's avatar Dave Cridland

Merge pull request #493 from guusdk/OF-1028

OF-1028 Stream Management refactoring
parents 8d2f0722 c8269f66
......@@ -190,21 +190,8 @@ public abstract class StanzaHandler {
// resource binding and session establishment (to client sessions only)
waitingCompressionACK = true;
}
} else if(isStreamManagementStanza(doc)) {
switch(tag) {
case "enable":
session.enableStreamMangement(doc);
break;
case "r":
session.getStreamManager().sendServerAcknowledgement();
break;
case "a":
session.getStreamManager().processClientAcknowledgement(doc);
break;
default:
process(doc);
break;
}
} else if (isStreamManagementStanza(doc)) {
session.getStreamManager().process( doc, session.getAddress() );
}
else {
process(doc);
......
......@@ -453,28 +453,6 @@ public abstract class LocalSession implements Session {
return "NONE";
}
/**
* Enables stream management for session
* @param enable XEP-0198 <enable/> element
*/
public void enableStreamMangement(Element enable) {
// Do nothing if already enabled
if(streamManager.isEnabled()) {
return;
}
streamManager.setNamespace(enable.getNamespace().getStringValue());
// Ensure that resource binding has occurred
if(getAddress().getResource() == null) {
streamManager.sendUnexpectedError();
return;
}
streamManager.setEnabled(true);
}
@Override
public final Locale getLanguage() {
return language;
......
package org.jivesoftware.openfire.streammanagement;
import java.util.Date;
import java.util.Deque;
import java.util.LinkedList;
import java.math.BigInteger;
import java.net.UnknownHostException;
import java.util.*;
import org.dom4j.Element;
import org.jivesoftware.openfire.Connection;
......@@ -23,13 +23,15 @@ import org.xmpp.packet.PacketError;
* @author jonnyheavey
*/
public class StreamManager {
private static final Logger Log = LoggerFactory.getLogger(StreamManager.class);
private final Logger Log;
public static class UnackedPacket {
public final Date timestamp;
public final long x;
public final Date timestamp = new Date();
public final Packet packet;
public UnackedPacket(Date date, Packet p) {
timestamp = date;
public UnackedPacket(long x, Packet p) {
this.x = x;
packet = p;
}
}
......@@ -46,22 +48,10 @@ public class StreamManager {
private final Connection connection;
/**
* Whether Stream Management is enabled for session
* the manager belongs to.
*/
private boolean enabled;
/**
* Namespace to be used in stanzas sent to client (depending on XEP-0198 version used by client)
*/
private String namespace;
/**
* Count of how many stanzas/packets
* have been sent from the server to the client (not necessarily processed)
*/
private long serverSentStanzas = 0;
/**
* Count of how many stanzas/packets
* sent from the client that the server has processed
......@@ -73,8 +63,8 @@ public class StreamManager {
* sent from the server that the client has processed
*/
private long clientProcessedStanzas = 0;
static private long mask = 0xFFFFFFFF; /* 2**32 - 1; this is used to emulate rollover */
static private long mask = new BigInteger("2").pow(32).longValue() - 1; // This is used to emulate rollover.
/**
* Collection of stanzas/packets sent to client that haven't been acknowledged.
......@@ -82,26 +72,88 @@ public class StreamManager {
private Deque<UnackedPacket> unacknowledgedServerStanzas = new LinkedList<>();
public StreamManager(Connection connection) {
String address;
try {
address = connection.getHostAddress();
}
catch ( UnknownHostException e )
{
address = null;
}
this.Log = LoggerFactory.getLogger(StreamManager.class + "["+ (address == null ? "(unknown address)" : address) +"]" );
this.connection = connection;
}
/**
/**
* Processes a stream management element.
*
* @param element The stream management element to be processed.
* @param onBehalfOf The (full) JID of the entity for which the element is processed.
*/
public void process( Element element, JID onBehalfOf )
{
switch(element.getName()) {
case "enable":
enable( onBehalfOf, element.getNamespace().getStringValue() );
break;
case "r":
sendServerAcknowledgement();
break;
case "a":
processClientAcknowledgement( element);
break;
default:
sendUnexpectedError();
}
}
/**
* Attempts to enable Stream Management for the entity identified by the provided JID.
*
* @param onBehalfOf The address of the entity for which SM is to be enabled.
* @param namespace The namespace that defines what version of SM is to be enabled.
*/
private void enable( JID onBehalfOf, String namespace )
{
// Ensure that resource binding has occurred.
if( onBehalfOf.getResource() == null ) {
sendUnexpectedError();
return;
}
synchronized ( this )
{
// Do nothing if already enabled
if ( isEnabled() )
{
return;
}
this.namespace = namespace;
}
// Send confirmation to the requestee.
connection.deliverRawText( String.format( "<enabled xmlns='%s'/>", namespace ) );
}
/**
* Sends XEP-0198 acknowledgement <a /> to client from server
*/
public void sendServerAcknowledgement() {
if(isEnabled()) {
String ack = String.format("<a xmlns='%s' h='%s' />", getNamespace(), getServerProcessedStanzas() & mask);
getConnection().deliverRawText(ack);
String ack = String.format("<a xmlns='%s' h='%s' />", namespace, serverProcessedStanzas & mask);
connection.deliverRawText( ack );
}
}
/**
* Sends XEP-0198 request <r /> to client from server
* Sends XEP-0198 request <r /> to client from server
*/
private void sendServerRequest() {
if(isEnabled()) {
String request = String.format("<r xmlns='%s' />", getNamespace());
getConnection().deliverRawText(request);
String request = String.format("<r xmlns='%s' />", namespace);
connection.deliverRawText( request );
}
}
......@@ -109,59 +161,92 @@ public class StreamManager {
* Send an error if a XEP-0198 stanza is received at an unexpected time.
* e.g. before resource-binding has completed.
*/
public void sendUnexpectedError() {
StringBuilder sb = new StringBuilder(340);
sb.append(String.format("<failed xmlns='%s'>", getNamespace()));
sb.append(new PacketError(PacketError.Condition.unexpected_request).toXML());
sb.append("</failed>");
getConnection().deliverRawText(sb.toString());
private void sendUnexpectedError() {
connection.deliverRawText(
String.format( "<failed xmlns='%s'>", namespace )
+ new PacketError( PacketError.Condition.unexpected_request ).toXML()
+ "</failed>"
);
}
/**
* Receive and process acknowledgement packet from client
* @param ack XEP-0198 acknowledgement <a /> stanza to process
*/
public void processClientAcknowledgement(Element ack) {
private void processClientAcknowledgement(Element ack) {
if(isEnabled()) {
synchronized (this) {
if (ack.attribute("h") != null) {
long count = Long.valueOf(ack.attributeValue("h"));
if (ack.attribute("h") != null) {
final long h = Long.valueOf(ack.attributeValue("h"));
Log.debug( "Received acknowledgement from client: h={}", h );
synchronized (this) {
if ( !unacknowledgedServerStanzas.isEmpty() && h > unacknowledgedServerStanzas.getLast().x ) {
Log.warn( "Client acknowledges stanzas that we didn't sent! Client Ack h: {}, our last stanza: {}", h, unacknowledgedServerStanzas.getLast().x );
}
clientProcessedStanzas = h;
// Remove stanzas from temporary storage as now acknowledged
Deque<UnackedPacket> unacknowledgedStanzas = getUnacknowledgedServerStanzas();
long i = getClientProcessedStanzas();
Log.debug("Ack: h={} mine={} length={}", count, i, unacknowledgedStanzas.size());
if (count < i) {
/* Consider rollover? */
Log.debug("Maybe rollover");
if (i > mask) {
while (count < i) {
Log.debug("Rolling...");
count += mask + 1;
}
}
Log.trace( "Before processing client Ack (h={}): {} unacknowledged stanzas.", h, unacknowledgedServerStanzas.size() );
// Pop all acknowledged stanzas.
while( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getFirst().x <= h )
{
unacknowledgedServerStanzas.removeFirst();
}
while (i < count) {
unacknowledgedStanzas.removeFirst();
i++;
Log.debug("In Ack: h={} mine={} length={}", count, i, unacknowledgedStanzas.size());
// Ensure that unacknowledged stanzas are purged after the client rolled over 'h' which occurs at h= (2^32)-1
final int maxUnacked = getMaximumUnacknowledgedStanzas();
final boolean clientHadRollOver = h < maxUnacked && !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - maxUnacked;
if ( clientHadRollOver )
{
Log.info( "Client rolled over 'h'. Purging high-numbered unacknowledged stanzas." );
while ( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - maxUnacked)
{
unacknowledgedServerStanzas.removeLast();
}
}
setClientProcessedStanzas(count);
Log.trace( "After processing client Ack (h={}): {} unacknowledged stanzas.", h, unacknowledgedServerStanzas.size());
}
}
}
}
/**
* Registers that Openfire sends a stanza to the client (which is expected to be acknowledged later).
* @param packet The stanza that is sent.
*/
public void sentStanza(Packet packet) {
if(isEnabled()) {
synchronized (this) {
incrementServerSentStanzas();
// Temporarily store packet until delivery confirmed
getUnacknowledgedServerStanzas().addLast(new StreamManager.UnackedPacket(new Date(), packet.createCopy()));
Log.debug("Added stanza of type {}, now {} / {}", packet.getClass().getName(), getServerSentStanzas(), getUnacknowledgedServerStanzas().size());
final long requestFrequency = JiveGlobals.getLongProperty( "stream.management.requestFrequency", 5 );
final int size;
synchronized (this)
{
// The next ID is one higher than the last stanza that was sent (which might be unacknowledged!)
final long x = 1 + ( unacknowledgedServerStanzas.isEmpty() ? clientProcessedStanzas : unacknowledgedServerStanzas.getLast().x );
unacknowledgedServerStanzas.addLast( new StreamManager.UnackedPacket( x, packet.createCopy() ) );
size = unacknowledgedServerStanzas.size();
Log.trace( "Added stanza of type '{}' to collection of unacknowledged stanzas (x={}). Collection size is now {}.", packet.getElement().getName(), x, size );
// Prevent keeping to many stanzas in memory.
if ( size > getMaximumUnacknowledgedStanzas() )
{
Log.warn( "To many stanzas go unacknowledged for this connection. Clearing queue and disabling functionality." );
namespace = null;
unacknowledgedServerStanzas.clear();
return;
}
}
if(getServerSentStanzas() % JiveGlobals.getLongProperty("stream.management.requestFrequency", 5) == 0) {
// When we have a sizable amount of unacknowledged stanzas, request acknowledgement.
if ( size % requestFrequency == 0 ) {
Log.debug( "Requesting acknowledgement from peer, as we have {} or more unacknowledged stanzas.", requestFrequency );
sendServerRequest();
}
}
......@@ -170,19 +255,16 @@ public class StreamManager {
public void onClose(PacketRouter router, JID serverAddress) {
// Re-deliver unacknowledged stanzas from broken stream (XEP-0198)
if(isEnabled()) {
setEnabled(false); // Avoid concurrent usage.
synchronized (this) {
Deque<StreamManager.UnackedPacket> unacknowledgedStanzas = getUnacknowledgedServerStanzas();
if (!unacknowledgedStanzas.isEmpty()) {
for (StreamManager.UnackedPacket unacked : unacknowledgedStanzas) {
if (unacked.packet instanceof Message) {
Message m = (Message) unacked.packet;
if (m.getExtension("delay", "urn:xmpp:delay") == null) {
Element delayInformation = m.addChildElement("delay", "urn:xmpp:delay");
delayInformation.addAttribute("stamp", XMPPDateTimeFormat.format(unacked.timestamp));
delayInformation.addAttribute("from", serverAddress.toBareJID());
}
synchronized (this) {
if(isEnabled()) {
namespace = null; // disable stream management.
for (StreamManager.UnackedPacket unacked : unacknowledgedServerStanzas) {
if (unacked.packet instanceof Message) {
Message m = (Message) unacked.packet;
if (m.getExtension("delay", "urn:xmpp:delay") == null) {
Element delayInformation = m.addChildElement("delay", "urn:xmpp:delay");
delayInformation.addAttribute("stamp", XMPPDateTimeFormat.format(unacked.timestamp));
delayInformation.addAttribute("from", serverAddress.toBareJID());
}
router.route(unacked.packet);
}
......@@ -192,75 +274,13 @@ public class StreamManager {
}
/**
* Get connection (stream) for the session
* @return
*/
public Connection getConnection() {
return connection;
}
/**
* Determines whether Stream Management enabled for session this
* manager belongs to.
* @return
* @return true when stream management is enabled, otherwise false.
*/
public boolean isEnabled() {
return enabled;
}
/**
* Sets whether Stream Management enabled for session this
* manager belongs to.
* @param enabled
*/
synchronized public void setEnabled(boolean enabled) {
this.enabled = enabled;
if(enabled) {
String enabledStanza = String.format("<enabled xmlns='%s'/>", getNamespace());
getConnection().deliverRawText(enabledStanza);
}
}
/**
* Retrieve configured XEP-0198 namespace
* @return
*/
public String getNamespace() {
return namespace;
}
/**
* Configure XEP-0198 namespace
* @param namespace
*/
public void setNamespace(String namespace) {
this.namespace = namespace;
}
/**
* Retrieves number of stanzas sent to client by server.
* @return
*/
public long getServerSentStanzas() {
return serverSentStanzas;
}
/**
* Increments the count of stanzas sent to client by server.
*/
public void incrementServerSentStanzas() {
this.serverSentStanzas++;
}
/**
* Retrieve the number of stanzas processed by the server since
* Stream Management was enabled.
* @return
*/
public long getServerProcessedStanzas() {
return serverProcessedStanzas;
return namespace != null;
}
/**
......@@ -274,30 +294,11 @@ public class StreamManager {
}
/**
* Retrieve the number of stanzas processed by the client since
* Stream Management was enabled.
* @return
* The maximum amount of stanzas we keep, waiting for ack.
* @return The maximum number of stanzas.
*/
public long getClientProcessedStanzas() {
return clientProcessedStanzas;
private int getMaximumUnacknowledgedStanzas()
{
return JiveGlobals.getIntProperty( "stream.management.max-unacked", 10000 );
}
/**
* Sets the count of stanzas processed by the client since
* Stream Management was enabled.
*/
public void setClientProcessedStanzas(long count) {
if(count >= clientProcessedStanzas) {
clientProcessedStanzas = count;
}
}
/**
* Retrieves all unacknowledged stanzas sent to client from server.
* @return
*/
public Deque<UnackedPacket> getUnacknowledgedServerStanzas() {
return unacknowledgedServerStanzas;
}
}
......@@ -57,21 +57,8 @@ public class StreamManagementPacketRouter extends SessionPacketRouter {
@Override
public void route(Element wrappedElement) throws UnknownStanzaException {
String tag = wrappedElement.getName();
if (StreamManager.NAMESPACE_V3.equals(wrappedElement.getNamespace().getStringValue())) {
switch(tag) {
case "enable":
session.enableStreamMangement(wrappedElement);
break;
case "r":
session.getStreamManager().sendServerAcknowledgement();
break;
case "a":
session.getStreamManager().processClientAcknowledgement(wrappedElement);
break;
default:
session.getStreamManager().sendUnexpectedError();
}
session.getStreamManager().process( wrappedElement, session.getAddress() );
} else {
super.route(wrappedElement);
if (isUnsolicitedAckExpected()) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment