StreamManager.java 9.3 KB
Newer Older
1 2
package org.jivesoftware.openfire.streammanagement;

3
import java.math.BigInteger;
4
import java.net.UnknownHostException;
5
import java.util.*;
6 7 8

import org.dom4j.Element;
import org.jivesoftware.openfire.Connection;
9 10 11 12 13 14 15
import org.jivesoftware.openfire.PacketRouter;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.XMPPDateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
16
import org.xmpp.packet.Packet;
17
import org.xmpp.packet.PacketError;
18 19 20 21 22 23 24 25

/**
 * XEP-0198 Stream Manager.
 * Handles client/server messages acknowledgement.
 *
 * @author jonnyheavey
 */
public class StreamManager {
26

27
	private final Logger Log;
28
    public static class UnackedPacket {
29 30
		public final long x;
        public final Date timestamp = new Date();
31 32
        public final Packet packet;
        
33 34
        public UnackedPacket(long x, Packet p) {
			this.x = x;
35 36 37
            packet = p;
        }
    }
Tom Evans's avatar
Tom Evans committed
38 39
    
    public static final String SM_ACTIVE = "stream.management.active";
40 41 42 43 44 45 46 47 48 49

    /**
     * Stanza namespaces
     */
    public static final String NAMESPACE_V2 = "urn:xmpp:sm:2";
    public static final String NAMESPACE_V3 = "urn:xmpp:sm:3";

	/**
	 * Connection (stream) to client for the session the manager belongs to
	 */
50
	private final Connection connection;
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67

	/**
     * Namespace to be used in stanzas sent to client (depending on XEP-0198 version used by client)
     */
    private String namespace;

    /**
     * Count of how many stanzas/packets
     * sent from the client that the server has processed
     */
    private long serverProcessedStanzas = 0;

    /**
 	 * Count of how many stanzas/packets
     * sent from the server that the client has processed
     */
    private long clientProcessedStanzas = 0;
68

69
    static private long mask = new BigInteger("2").pow(32).longValue() - 1; // This is used to emulate rollover.
70 71 72 73

    /**
     * Collection of stanzas/packets sent to client that haven't been acknowledged.
     */
74
    private Deque<UnackedPacket> unacknowledgedServerStanzas = new LinkedList<>();
75 76

    public StreamManager(Connection connection) {
77 78 79 80 81 82 83 84 85 86
		String address;
		try {
			address = connection.getHostAddress();
		}
		catch ( UnknownHostException e )
		{
			address = null;
		}

		this.Log = LoggerFactory.getLogger(StreamManager.class + "["+ (address == null ? "(unknown address)" : address) +"]" );
87
    	this.connection = connection;
88 89
    }

90 91 92 93 94 95 96 97 98 99
	/**
	 * Processes a stream management element.
	 *
	 * @param element The stream management element to be processed.
	 * @param onBehalfOf The (full) JID of the entity for which the element is processed.
	 */
	public void process( Element element, JID onBehalfOf )
	{
		switch(element.getName()) {
			case "enable":
100
				enable( onBehalfOf, element.getNamespace().getStringValue() );
101 102 103 104 105 106 107 108 109 110 111 112
				break;
			case "r":
				sendServerAcknowledgement();
				break;
			case "a":
				processClientAcknowledgement( element);
				break;
			default:
				sendUnexpectedError();
		}
	}

113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
	/**
	 * Attempts to enable Stream Management for the entity identified by the provided JID.
	 *
	 * @param onBehalfOf The address of the entity for which SM is to be enabled.
	 * @param namespace The namespace that defines what version of SM is to be enabled.
	 */
	private void enable( JID onBehalfOf, String namespace )
	{
		// Ensure that resource binding has occurred.
		if( onBehalfOf.getResource() == null ) {
			sendUnexpectedError();
			return;
		}

		synchronized ( this )
		{
			// Do nothing if already enabled
			if ( isEnabled() )
			{
				return;
			}

135
			this.namespace = namespace;
136 137 138
		}

		// Send confirmation to the requestee.
139
		connection.deliverRawText( String.format( "<enabled xmlns='%s'/>", namespace ) );
140 141
	}

142
	/**
akrherz's avatar
akrherz committed
143
     * Sends XEP-0198 acknowledgement &lt;a /&gt; to client from server
144 145 146
     */
	public void sendServerAcknowledgement() {
		if(isEnabled()) {
147 148
			String ack = String.format("<a xmlns='%s' h='%s' />", namespace, serverProcessedStanzas & mask);
			connection.deliverRawText( ack );
149 150 151 152
		}
	}

	/**
153
	 * Sends XEP-0198 request <r /> to client from server
154
	 */
155
	private void sendServerRequest() {
156
		if(isEnabled()) {
157 158
			String request = String.format("<r xmlns='%s' />", namespace);
			connection.deliverRawText( request );
159 160 161 162 163 164 165
		}
	}

	/**
	 * Send an error if a XEP-0198 stanza is received at an unexpected time.
	 * e.g. before resource-binding has completed.
	 */
166 167 168 169 170
	private void sendUnexpectedError() {
		connection.deliverRawText(
				String.format( "<failed xmlns='%s'>", namespace )
						+ new PacketError( PacketError.Condition.unexpected_request ).toXML()
						+ "</failed>"
171
		);
172 173 174 175 176 177
	}

	/**
	 * Receive and process acknowledgement packet from client
	 * @param ack XEP-0198 acknowledgement <a /> stanza to process
	 */
178
	private void processClientAcknowledgement(Element ack) {
179
		if(isEnabled()) {
180
			if (ack.attribute("h") != null) {
181
				final long h = Long.valueOf(ack.attributeValue("h"));
182 183

				Log.debug( "Received acknowledgement from client: h={}", h );
184
				synchronized (this) {
185 186

					if ( !unacknowledgedServerStanzas.isEmpty() && h > unacknowledgedServerStanzas.getLast().x ) {
Guus der Kinderen's avatar
Guus der Kinderen committed
187
						Log.warn( "Client acknowledges stanzas that we didn't send! Client Ack h: {}, our last stanza: {}", h, unacknowledgedServerStanzas.getLast().x );
188
					}
189

190 191
					clientProcessedStanzas = h;

192
					// Remove stanzas from temporary storage as now acknowledged
193
					Log.trace( "Before processing client Ack (h={}): {} unacknowledged stanzas.", h, unacknowledgedServerStanzas.size() );
194 195 196 197

					// Pop all acknowledged stanzas.
					while( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getFirst().x <= h )
					{
198
						unacknowledgedServerStanzas.removeFirst();
199 200
					}

201
					// Ensure that unacknowledged stanzas are purged after the client rolled over 'h' which occurs at h= (2^32)-1
202 203
					final int maxUnacked = getMaximumUnacknowledgedStanzas();
					final boolean clientHadRollOver = h < maxUnacked && !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - maxUnacked;
204
					if ( clientHadRollOver )
205
					{
206 207
						Log.info( "Client rolled over 'h'. Purging high-numbered unacknowledged stanzas." );
						while ( !unacknowledgedServerStanzas.isEmpty() && unacknowledgedServerStanzas.getLast().x > mask - maxUnacked)
208 209 210
						{
							unacknowledgedServerStanzas.removeLast();
						}
211
					}
212

213
					Log.trace( "After processing client Ack (h={}): {} unacknowledged stanzas.", h, unacknowledgedServerStanzas.size());
214
				}
215 216 217
			}
		}
	}
218

219 220 221 222
	/**
	 * Registers that Openfire sends a stanza to the client (which is expected to be acknowledged later).
	 * @param packet The stanza that is sent.
	 */
223 224 225
	public void sentStanza(Packet packet) {

		if(isEnabled()) {
226
			final long requestFrequency = JiveGlobals.getLongProperty( "stream.management.requestFrequency", 5 );
227
			final int size;
228

229 230
			synchronized (this)
			{
231 232 233
				// The next ID is one higher than the last stanza that was sent (which might be unacknowledged!)
				final long x = 1 + ( unacknowledgedServerStanzas.isEmpty() ? clientProcessedStanzas : unacknowledgedServerStanzas.getLast().x );
				unacknowledgedServerStanzas.addLast( new StreamManager.UnackedPacket( x, packet.createCopy() ) );
234

235 236
				size = unacknowledgedServerStanzas.size();

237
				Log.trace( "Added stanza of type '{}' to collection of unacknowledged stanzas (x={}). Collection size is now {}.", packet.getElement().getName(), x, size );
238 239 240 241 242 243 244 245 246

				// Prevent keeping to many stanzas in memory.
				if ( size > getMaximumUnacknowledgedStanzas() )
				{
					Log.warn( "To many stanzas go unacknowledged for this connection. Clearing queue and disabling functionality." );
					namespace = null;
					unacknowledgedServerStanzas.clear();
					return;
				}
247
			}
248

249
			// When we have a sizable amount of unacknowledged stanzas, request acknowledgement.
250 251
			if ( size % requestFrequency == 0 ) {
				Log.debug( "Requesting acknowledgement from peer, as we have {} or more unacknowledged stanzas.", requestFrequency );
252
				sendServerRequest();
253 254
			}
		}
255 256 257 258 259

	}

	public void onClose(PacketRouter router, JID serverAddress) {
		// Re-deliver unacknowledged stanzas from broken stream (XEP-0198)
260
		synchronized (this) {
261
			if(isEnabled()) {
262
				namespace = null; // disable stream management.
263 264 265 266
				for (StreamManager.UnackedPacket unacked : unacknowledgedServerStanzas) {
					if (unacked.packet instanceof Message) {
						Message m = (Message) unacked.packet;
						if (m.getExtension("delay", "urn:xmpp:delay") == null) {
267 268 269
							Element delayInformation = m.addChildElement("delay", "urn:xmpp:delay");
							delayInformation.addAttribute("stamp", XMPPDateTimeFormat.format(unacked.timestamp));
							delayInformation.addAttribute("from", serverAddress.toBareJID());
270 271 272 273 274 275 276
						}
						router.route(unacked.packet);
					}
				}
			}
		}

277 278 279 280 281
	}

	/**
	 * Determines whether Stream Management enabled for session this
	 * manager belongs to.
282
	 * @return true when stream management is enabled, otherwise false.
283 284
	 */
	public boolean isEnabled() {
285
		return namespace != null;
286 287 288 289 290 291 292 293 294 295 296
	}

	/**
	 * Increments the count of stanzas processed by the server since
	 * Stream Management was enabled.
	 */
	public void incrementServerProcessedStanzas() {
		if(isEnabled()) {
			this.serverProcessedStanzas++;
		}
	}
297 298 299 300 301 302 303 304 305

	/**
	 * The maximum amount of stanzas we keep, waiting for ack.
	 * @return The maximum number of stanzas.
	 */
	private int getMaximumUnacknowledgedStanzas()
	{
		return JiveGlobals.getIntProperty( "stream.management.max-unacked", 10000 );
	}
306
}