Commit 1a860d63 authored by Dave Cridland's avatar Dave Cridland

Merge pull request #259 from surevine/xep-0198

Initial xep-0198 message ack implementation
parents 94b1155e a6ea2b4b
......@@ -34,13 +34,25 @@
</network>
-->
<!-- SPDY Protocol is npn.
(note: npn does not work with Java 8)
<!-- SPDY Protocol is npn.
(note: npn does not work with Java 8)
add -Xbootclasspath/p:/OPENFIRE_HOME/lib/npn-boot.jar to .vmoptions file -->
<!--
<spdy>
<protocol>npn</protocol>
</spdy>
<!--
<spdy>
<protocol>npn</protocol>
</spdy>
-->
<!-- XEP-0198 properties -->
<stream>
<management>
<!-- Whether stream management is offered to clients by server. -->
<active>true</active>
<!-- Number of stanzas sent to client before a stream management
acknowledgement request is made. -->
<requestFrequency>5</requestFrequency>
</management>
</stream>
</jive>
......@@ -210,14 +210,18 @@ public class OfflineMessageStore extends BasicModule implements UserEventListene
xmlReader.read(new StringReader(msgXML)).getRootElement());
}
// Add a delayed delivery (XEP-0203) element to the message.
Element delay = message.addChildElement("delay", "urn:xmpp:delay");
delay.addAttribute("from", XMPPServer.getInstance().getServerInfo().getXMPPDomain());
delay.addAttribute("stamp", XMPPDateTimeFormat.format(creationDate));
// Add a legacy delayed delivery (XEP-0091) element to the message. XEP is obsolete and support should be dropped in future.
delay = message.addChildElement("x", "jabber:x:delay");
delay.addAttribute("from", XMPPServer.getInstance().getServerInfo().getXMPPDomain());
delay.addAttribute("stamp", XMPPDateTimeFormat.formatOld(creationDate));
// if there is already a delay stamp, we shouldn't add another.
Element delaytest = message.getChildElement("delay", "urn:xmpp:delay");
if (delaytest == null) {
// Add a delayed delivery (XEP-0203) element to the message.
Element delay = message.addChildElement("delay", "urn:xmpp:delay");
delay.addAttribute("from", XMPPServer.getInstance().getServerInfo().getXMPPDomain());
delay.addAttribute("stamp", XMPPDateTimeFormat.format(creationDate));
// Add a legacy delayed delivery (XEP-0091) element to the message. XEP is obsolete and support should be dropped in future.
delay = message.addChildElement("x", "jabber:x:delay");
delay.addAttribute("from", XMPPServer.getInstance().getServerInfo().getXMPPDomain());
delay.addAttribute("stamp", XMPPDateTimeFormat.formatOld(creationDate));
}
messages.add(message);
}
// Check if the offline messages loaded should be deleted, and that there are
......
......@@ -25,10 +25,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
......@@ -66,11 +69,13 @@ import org.jivesoftware.openfire.session.OutgoingServerSession;
import org.jivesoftware.openfire.session.RemoteSessionLocator;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.XMPPDateTimeFormat;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger;
......@@ -110,7 +115,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
* Counter of user connections. A connection is counted just after it was created and not
* after the user became available. This counter only considers sessions local to this JVM.
* That means that when running inside of a cluster you will need to add up this counter
* for each cluster node.
* for each cluster node.
*/
private final AtomicInteger connectionsCounter = new AtomicInteger(0);
......@@ -159,7 +164,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
* This same information is stored in {@link LocalIncomingServerSession} but the
* reason for this duplication is that when running in a cluster other nodes
* will have access to this clustered cache even in the case of this node going
* down.
* down.
*/
private Cache<String, Set<String>> validatedDomainsCache;
......@@ -171,7 +176,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
/**
* Local session manager responsible for keeping sessions connected to this JVM that are not
* present in the routing table.
* present in the routing table.
*/
private LocalSessionManager localSessionManager;
/**
......@@ -285,7 +290,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
* Creates a new <tt>ConnectionMultiplexerSession</tt>.
*
* @param conn the connection to create the session from.
* @param address the JID (may include a resource) of the connection manager's session.
* @param address the JID (may include a resource) of the connection manager's session.
* @return a newly created session.
*/
public LocalConnectionMultiplexerSession createMultiplexerSession(Connection conn, JID address) {
......@@ -1207,7 +1212,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
try {
// If the requesting entity is the user itself or the requesting entity can probe the presence of the user.
if (name != null && senderJID != null &&
server.getUserManager().isRegisteredUser(senderJID) &&
server.getUserManager().isRegisteredUser(senderJID) &&
(name.equals(senderJID.getNode()) || server.getPresenceManager().canProbePresence(senderJID, name))) {
Collection<DiscoItem> discoItems = new ArrayList<DiscoItem>();
for (ClientSession clientSession : getSessions(name)) {
......@@ -1241,6 +1246,26 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
presence.setFrom(session.getAddress());
router.route(presence);
}
// Re-deliver unacknowledged stanzas from broken stream (XEP-0198)
if(session.getStreamManager().isEnabled()) {
session.getStreamManager().setEnabled(false); // Avoid concurrent usage.
Deque<StreamManager.UnackedPacket> unacknowledgedStanzas = session.getStreamManager().getUnacknowledgedServerStanzas();
if(!unacknowledgedStanzas.isEmpty()) {
for(StreamManager.UnackedPacket unacked : unacknowledgedStanzas) {
if (unacked.packet instanceof Message) {
Message m = (Message)unacked.packet;
Element delayInformation = m.addChildElement("delay", "urn:xmpp:delay");
Element delayInformationOld = m.addChildElement("x", "jabber:x:delay");
delayInformation.addAttribute("stamp", XMPPDateTimeFormat.format(unacked.timestamp));
delayInformationOld.addAttribute("stamp", XMPPDateTimeFormat.formatOld(unacked.timestamp));
delayInformation.addAttribute("from", serverAddress.toBareJID());
delayInformationOld.addAttribute("from", serverAddress.toBareJID());
}
router.route(unacked.packet);
}
}
}
}
finally {
// Remove the session
......
......@@ -28,6 +28,7 @@ import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.http.FlashCrossDomainServlet;
import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.StringUtils;
......@@ -182,13 +183,28 @@ public abstract class StanzaHandler {
// resource binding and session establishment (to client sessions only)
waitingCompressionACK = true;
}
} else if(isStreamManagementStanza(doc)) {
switch(tag) {
case "enable":
session.enableStreamMangement(doc);
break;
case "r":
session.getStreamManager().sendServerAcknowledgement();
break;
case "a":
session.getStreamManager().processClientAcknowledgement(doc);
break;
default:
process(doc);
break;
}
}
else {
process(doc);
}
}
private void process(Element doc) throws UnauthorizedException {
private void process(Element doc) throws UnauthorizedException {
if (doc == null) {
return;
}
......@@ -536,6 +552,16 @@ public abstract class StanzaHandler {
connection.deliverRawText(sb.toString());
}
/**
* Determines whether stanza's namespace matches XEP-0198 namespace
* @param stanza Stanza to be checked
* @return whether stanza's namespace matches XEP-0198 namespace
*/
private boolean isStreamManagementStanza(Element stanza) {
return StreamManager.NAMESPACE_V2.equals(stanza.getNamespace().getStringValue()) ||
StreamManager.NAMESPACE_V3.equals(stanza.getNamespace().getStringValue());
}
private String geStreamHeader() {
StringBuilder sb = new StringBuilder(200);
sb.append("<?xml version='1.0' encoding='");
......
......@@ -21,6 +21,7 @@
package org.jivesoftware.openfire.session;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
......@@ -38,6 +39,7 @@ import org.jivesoftware.openfire.net.SSLConfig;
import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.privacy.PrivacyList;
import org.jivesoftware.openfire.privacy.PrivacyListManager;
import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.openfire.user.PresenceEventDispatcher;
import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.JiveGlobals;
......@@ -801,10 +803,16 @@ public class LocalClientSession extends LocalSession implements ClientSession {
}
}
else {
// If the session has been authenticated then offer resource binding
// If the session has been authenticated then offer resource binding,
// and session establishment
sb.append("<bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\"/>");
sb.append("<session xmlns=\"urn:ietf:params:xml:ns:xmpp-session\"><optional/></session>");
// Offer XEP-0198 stream management capabilities if enabled.
if(JiveGlobals.getBooleanProperty("stream.management.active", true)) {
sb.append(String.format("<sm xmlns='%s'/>", StreamManager.NAMESPACE_V2));
sb.append(String.format("<sm xmlns='%s'/>", StreamManager.NAMESPACE_V3));
}
}
return sb.toString();
}
......@@ -854,7 +862,17 @@ public class LocalClientSession extends LocalSession implements ClientSession {
@Override
public void deliver(Packet packet) throws UnauthorizedException {
conn.deliver(packet);
if(streamManager.isEnabled()) {
streamManager.incrementServerSentStanzas();
// Temporarily store packet until delivery confirmed
streamManager.getUnacknowledgedServerStanzas().addLast(new StreamManager.UnackedPacket(new Date(), packet.createCopy()));
if(getNumServerPackets() % JiveGlobals.getLongProperty("stream.management.requestFrequency", 5) == 0) {
streamManager.sendServerRequest();
}
}
}
@Override
......
......@@ -24,6 +24,7 @@ import java.util.Map;
import javax.net.ssl.SSLSession;
import org.dom4j.Element;
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
......@@ -33,7 +34,7 @@ import org.jivesoftware.openfire.interceptor.InterceptorManager;
import org.jivesoftware.openfire.interceptor.PacketRejectedException;
import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.net.TLSStreamHandler;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -94,6 +95,11 @@ public abstract class LocalSession implements Session {
*/
private final Map<String, Object> sessionData = new HashMap<String, Object>();
/**
* XEP-0198 Stream Manager
*/
protected final StreamManager streamManager;
/**
* Creates a session with an underlying connection and permission protection.
*
......@@ -111,6 +117,7 @@ public abstract class LocalSession implements Session {
String id = streamID.getID();
this.address = new JID(null, serverName, id, true);
this.sessionManager = SessionManager.getInstance();
this.streamManager = new StreamManager(conn);
}
/**
......@@ -209,6 +216,7 @@ public abstract class LocalSession implements Session {
public void incrementClientPacketCount() {
clientPacketCount++;
lastActiveDate = System.currentTimeMillis();
streamManager.incrementServerProcessedStanzas();
}
/**
......@@ -279,6 +287,14 @@ public abstract class LocalSession implements Session {
}
}
/**
* Get XEP-0198 Stream manager for session
* @return
*/
public StreamManager getStreamManager() {
return streamManager;
}
public void process(Packet packet) {
// Check that the requested packet can be processed
if (canProcess(packet)) {
......@@ -413,4 +429,27 @@ public abstract class LocalSession implements Session {
}
return "NONE";
}
/**
* Enables stream management for session
* @param enable XEP-0198 <enable/> element
*/
public void enableStreamMangement(Element enable) {
// Do nothing if already enabled
if(streamManager.isEnabled()) {
return;
}
streamManager.setNamespace(enable.getNamespace().getStringValue());
// Ensure that resource binding has occurred
if(getAddress().getResource() == null) {
streamManager.sendUnexpectedError();
return;
}
streamManager.setEnabled(true);
}
}
package org.jivesoftware.openfire.streammanagement;
import java.util.Date;
import java.util.Deque;
import java.util.LinkedList;
import org.dom4j.Element;
import org.jivesoftware.openfire.Connection;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
/**
* XEP-0198 Stream Manager.
* Handles client/server messages acknowledgement.
*
* @author jonnyheavey
*/
public class StreamManager {
public static class UnackedPacket {
public final Date timestamp;
public final Packet packet;
public UnackedPacket(Date date, Packet p) {
timestamp = date;
packet = p;
}
}
/**
* Stanza namespaces
*/
public static final String NAMESPACE_V2 = "urn:xmpp:sm:2";
public static final String NAMESPACE_V3 = "urn:xmpp:sm:3";
/**
* Connection (stream) to client for the session the manager belongs to
*/
private final Connection connection;
/**
* Whether Stream Management is enabled for session
* the manager belongs to.
*/
private boolean enabled;
/**
* Namespace to be used in stanzas sent to client (depending on XEP-0198 version used by client)
*/
private String namespace;
/**
* Count of how many stanzas/packets
* have been sent from the server to the client (not necessarily processed)
*/
private long serverSentStanzas = 0;
/**
* Count of how many stanzas/packets
* sent from the client that the server has processed
*/
private long serverProcessedStanzas = 0;
/**
* Count of how many stanzas/packets
* sent from the server that the client has processed
*/
private long clientProcessedStanzas = 0;
static private long mask = 0xFFFFFFFF; /* 2**32 - 1; this is used to emulate rollover */
/**
* Collection of stanzas/packets sent to client that haven't been acknowledged.
*/
private Deque<UnackedPacket> unacknowledgedServerStanzas = new LinkedList<UnackedPacket>();
public StreamManager(Connection connection) {
this.connection = connection;
}
/**
* Sends XEP-0198 acknowledgement <a /> to client from server
*/
public void sendServerAcknowledgement() {
if(isEnabled()) {
String ack = String.format("<a xmlns='%s' h='%s' />", getNamespace(), getServerProcessedStanzas() & mask);
getConnection().deliverRawText(ack);
}
}
/**
* Sends XEP-0198 request <r /> to client from server
*/
public void sendServerRequest() {
if(isEnabled()) {
String request = String.format("<r xmlns='%s' />", getNamespace());
getConnection().deliverRawText(request);
}
}
/**
* Send an error if a XEP-0198 stanza is received at an unexpected time.
* e.g. before resource-binding has completed.
*/
public void sendUnexpectedError() {
StringBuilder sb = new StringBuilder(340);
sb.append(String.format("<failed xmlns='%s'>", getNamespace()));
sb.append(new PacketError(PacketError.Condition.unexpected_request).toXML());
sb.append("</failed>");
getConnection().deliverRawText(sb.toString());
}
/**
* Receive and process acknowledgement packet from client
* @param ack XEP-0198 acknowledgement <a /> stanza to process
*/
public void processClientAcknowledgement(Element ack) {
if(isEnabled()) {
if(ack.attribute("h") != null) {
long count = Long.valueOf(ack.attributeValue("h"));
// Remove stanzas from temporary storage as now acknowledged
Deque<UnackedPacket> unacknowledgedStanzas = getUnacknowledgedServerStanzas();
long i = getClientProcessedStanzas();
if (count < i) {
/* Consider rollover? */
if (i > mask) {
while (count < i) {
count += mask + 1;
}
}
}
while(i < count) {
unacknowledgedStanzas.removeFirst();
i++;
}
setClientProcessedStanzas(count);
}
}
}
/**
* Get connection (stream) for the session
* @return
*/
public Connection getConnection() {
return connection;
}
/**
* Determines whether Stream Management enabled for session this
* manager belongs to.
* @return
*/
public boolean isEnabled() {
return enabled;
}
/**
* Sets whether Stream Management enabled for session this
* manager belongs to.
* @param enabled
*/
public void setEnabled(boolean enabled) {
this.enabled = enabled;
if(enabled) {
String enabledStanza = String.format("<enabled xmlns='%s'/>", getNamespace());
getConnection().deliverRawText(enabledStanza);
}
}
/**
* Retrieve configured XEP-0198 namespace
* @return
*/
public String getNamespace() {
return namespace;
}
/**
* Configure XEP-0198 namespace
* @param namespace
*/
public void setNamespace(String namespace) {
this.namespace = namespace;
}
/**
* Retrieves number of stanzas sent to client by server.
* @return
*/
public long getServerSentStanzas() {
return serverSentStanzas;
}
/**
* Increments the count of stanzas sent to client by server.
*/
public void incrementServerSentStanzas() {
this.serverSentStanzas++;
}
/**
* Retrieve the number of stanzas processed by the server since
* Stream Management was enabled.
* @return
*/
public long getServerProcessedStanzas() {
return serverProcessedStanzas;
}
/**
* Increments the count of stanzas processed by the server since
* Stream Management was enabled.
*/
public void incrementServerProcessedStanzas() {
if(isEnabled()) {
this.serverProcessedStanzas++;
}
}
/**
* Retrieve the number of stanzas processed by the client since
* Stream Management was enabled.
* @return
*/
public long getClientProcessedStanzas() {
return clientProcessedStanzas;
}
/**
* Sets the count of stanzas processed by the client since
* Stream Management was enabled.
*/
public void setClientProcessedStanzas(long count) {
if(count >= clientProcessedStanzas) {
clientProcessedStanzas = count;
}
}
/**
* Retrieves all unacknowledged stanzas sent to client from server.
* @return
*/
public Deque<UnackedPacket> getUnacknowledgedServerStanzas() {
return unacknowledgedServerStanzas;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment