Commit d574fed4 authored by Tom Evans's avatar Tom Evans

OF-933: Initial WebSocket implementation

Adds websocket capabilities compliant with the latest specifications
(RFC 7395).

Note that the new component is implemented as a plugin, but I have
included a few small modifications to core classes to improve
extensibility.

I have tested these changes using the stanza.io client library on both
Chrome and Firefox.
parent ee22466c
...@@ -35,7 +35,7 @@ import java.io.UnsupportedEncodingException; ...@@ -35,7 +35,7 @@ import java.io.UnsupportedEncodingException;
*/ */
public class SessionPacketRouter implements PacketRouter { public class SessionPacketRouter implements PacketRouter {
private LocalClientSession session; protected LocalClientSession session;
private PacketRouter router; private PacketRouter router;
private boolean skipJIDValidation = false; private boolean skipJIDValidation = false;
......
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
<html>
<head>
<title>Openfire WebSocket Plugin Changelog</title>
<style type="text/css">
BODY {
font-size : 100%;
}
BODY, TD, TH {
font-family : tahoma, verdana, arial, helvetica, sans-serif;
font-size : 0.8em;
}
H2 {
font-size : 10pt;
font-weight : bold;
padding-left : 1em;
}
A:hover {
text-decoration : none;
}
H1 {
font-family : tahoma, arial, helvetica, sans-serif;
font-size : 1.4em;
font-weight: bold;
border-bottom : 1px #ccc solid;
padding-bottom : 2px;
}
TT {
font-family : courier new;
font-weight : bold;
color : #060;
}
PRE {
font-family : courier new;
font-size : 100%;
}
</style>
</head>
<body>
<h1>
Openfire WebSocket Plugin Changelog
</h1>
<p><b>1.0</b> -- July 28, 2015</p>
<ul>
<li>Initial release. </li>
</ul>
</body>
</html>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
Plugin configuration for the WebSocket plugin.
-->
<plugin>
<class>org.jivesoftware.openfire.websocket.WebSocketPlugin</class>
<name>Openfire WebSocket</name>
<description>Provides WebSocket support for Openfire.</description>
<author>Tom Evans</author>
<version>1.0.0</version>
<date>07/28/2015</date>
<url>https://tools.ietf.org/html/rfc7395</url>
<minServerVersion>3.10.0</minServerVersion>
</plugin>
\ No newline at end of file
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
<html>
<head>
<title>Openfire WebSocket Plugin Readme</title>
<style type="text/css">
BODY {
font-size : 100%;
}
BODY, TD, TH {
font-family : tahoma, verdana, arial, helvetica, sans-serif;
font-size : 0.8em;
}
H2 {
font-size : 10pt;
font-weight : bold;
}
A:hover {
text-decoration : none;
}
H1 {
font-family : tahoma, arial, helvetica, sans-serif;
font-size : 1.4em;
font-weight: bold;
border-bottom : 1px #ccc solid;
padding-bottom : 2px;
}
TT {
font-family : courier new;
font-weight : bold;
color : #060;
}
PRE {
font-family : courier new;
font-size : 100%;
}
</style>
</head>
<body>
<h1>
Openfire WebSocket Plugin Readme
</h1>
<h2>Overview</h2>
<p>
This plugin extends Openfire to support WebSocket. The implementation follows the XMPP WebSocket subprotocol
(<a href="https://tools.ietf.org/html/rfc7395">RFC 7395</a>) specification, which is a standard extension of the
WebSocket protocol specification (<a href="https://tools.ietf.org/html/rfc6455">RFC 6455</a>).
</p>
<p>
Note that the BOSH (http-bind) capabilities of Openfire must be enabled and correctly configured as a
prerequisite before installing this plugin. The WebSocket servlet is installed within the same context
as the BOSH component, and will reuse the same HTTP/S port(s) when establishing the WebSocket connection.
</p>
<h2>Installation</h2>
<p>Copy websocket.jar into the plugins directory of your Openfire installation. The
plugin will then be automatically deployed. To upgrade to a new version, copy the new
websocket.jar file over the existing file.</p>
<p>
Upon installation, the WebSocket URI path will be /ws/ on the same server/port as the BOSH
connector. To establish a secure WebSocket, modify the following URL as appropriate:
</p>
<pre>
wss://your.openfire.host:7443/ws/
</pre>
</body>
</html>
package org.jivesoftware.openfire.websocket;
import java.io.UnsupportedEncodingException;
import org.dom4j.Element;
import org.jivesoftware.openfire.SessionPacketRouter;
import org.jivesoftware.openfire.multiplex.UnknownStanzaException;
import org.jivesoftware.openfire.session.LocalClientSession;
import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.util.JiveGlobals;
/**
* This class extends Openfire's session packet router with the ACK capabilities
* specified by XEP-0198: Stream Management.
*
* NOTE: This class does NOT support the XEP-0198 stream resumption capabilities.
*
* XEP-0198 allows either party (client or server) to send unsolicited ack/answer
* stanzas on a periodic basis. This implementation approximates BOSH ack behavior
* by sending unsolicited <a /> stanzas from the server to the client after a
* configurable number of stanzas have been received from the client.
*
* Setting the system property to "1" would indicate that each client packet should
* be ack'd by the server when stream management is enabled for a particular stream.
* To disable unsolicited server acks, use the default value for system property
* "stream.management.unsolicitedAckFrequency" ("0"). This setting does not affect
* server responses to explicit ack requests from the client.
*/
public class StreamManagementPacketRouter extends SessionPacketRouter {
public static final String SM_UNSOLICITED_ACK_FREQUENCY = "stream.management.unsolicitedAckFrequency";
static {
JiveGlobals.migrateProperty(SM_UNSOLICITED_ACK_FREQUENCY);
}
private int unsolicitedAckFrequency = JiveGlobals.getIntProperty(SM_UNSOLICITED_ACK_FREQUENCY, 0);
public StreamManagementPacketRouter(LocalClientSession session) {
super(session);
}
@Override
public void route(Element wrappedElement) throws UnsupportedEncodingException, UnknownStanzaException {
String tag = wrappedElement.getName();
if (StreamManager.NAMESPACE_V3.equals(wrappedElement.getNamespace().getStringValue())) {
switch(tag) {
case "enable":
session.enableStreamMangement(wrappedElement);
break;
case "r":
session.getStreamManager().sendServerAcknowledgement();
break;
case "a":
session.getStreamManager().processClientAcknowledgement(wrappedElement);
break;
default:
session.getStreamManager().sendUnexpectedError();
}
} else {
super.route(wrappedElement);
if (isUnsolicitedAckExpected()) {
session.getStreamManager().sendServerAcknowledgement();
}
}
}
private boolean isUnsolicitedAckExpected() {
if (!session.getStreamManager().isEnabled()) {
return false;
}
return unsolicitedAckFrequency > 0 && session.getNumClientPackets() % unsolicitedAckFrequency == 0;
}
}
package org.jivesoftware.openfire.websocket;
import java.net.InetSocketAddress;
import org.dom4j.Namespace;
import org.jivesoftware.openfire.PacketDeliverer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.net.VirtualConnection;
import org.jivesoftware.openfire.nio.OfflinePacketDeliverer;
import org.xmpp.packet.Packet;
import org.xmpp.packet.StreamError;
/**
* Following the conventions of the BOSH implementation, this class extends {@link VirtualConnection}
* and delegates the expected XMPP connection behaviors to the corresponding {@link XmppWebSocket}.
*/
public class WebSocketConnection extends VirtualConnection
{
private static final String CLIENT_NAMESPACE = "jabber:client";
private InetSocketAddress remotePeer;
private XmppWebSocket socket;
private PacketDeliverer backupDeliverer;
public WebSocketConnection(XmppWebSocket socket, InetSocketAddress remotePeer) {
this.socket = socket;
this.remotePeer = remotePeer;
}
@Override
public void closeVirtualConnection()
{
socket.closeSession();
}
@Override
public byte[] getAddress() {
return remotePeer.getAddress().getAddress();
}
@Override
public String getHostAddress() {
return remotePeer.getAddress().getHostAddress();
}
@Override
public String getHostName() {
return remotePeer.getHostName();
}
@Override
public void systemShutdown() {
deliverRawText(new StreamError(StreamError.Condition.system_shutdown).toXML());
close();
}
@Override
public void deliver(Packet packet) throws UnauthorizedException
{
if (Namespace.NO_NAMESPACE.equals(packet.getElement().getNamespace())) {
packet.getElement().add(Namespace.get(CLIENT_NAMESPACE));
}
if (validate()) {
deliverRawText(packet.toXML());
} else {
// use fallback delivery mechanism (offline)
getPacketDeliverer().deliver(packet);
}
}
@Override
public void deliverRawText(String text)
{
socket.deliver(text);
}
@Override
public boolean validate() {
return socket.isWebSocketOpen();
}
@Override
public boolean isSecure() {
return socket.isWebSocketSecure();
}
@Override
public PacketDeliverer getPacketDeliverer() {
if (backupDeliverer == null) {
backupDeliverer = new OfflinePacketDeliverer();
}
return backupDeliverer;
}
@Override
public boolean isCompressed() {
return XmppWebSocket.isCompressionEnabled();
}
}
package org.jivesoftware.openfire.websocket;
import java.io.File;
import java.text.MessageFormat;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.container.Plugin;
import org.jivesoftware.openfire.container.PluginClassLoader;
import org.jivesoftware.openfire.container.PluginManager;
import org.jivesoftware.openfire.http.HttpBindManager;
import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.util.JiveGlobals;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This plugin enables XMPP over WebSocket (RFC 7395) for Openfire.
*
* The Jetty WebSocketServlet serves as a base class and enables easy integration into the
* BOSH (http-bind) web context. Each WebSocket request received at the "/ws/" URI will be
* forwarded to this plugin/servlet, which will in turn create a new {@link XmppWebSocket}
* for each new connection.
*/
public class WebSocketPlugin extends WebSocketServlet implements Plugin {
private static final long serialVersionUID = 7281841492829464603L;
private static final Logger Log = LoggerFactory.getLogger(WebSocketPlugin.class);
private ServletContextHandler contextHandler;
protected PluginClassLoader pluginClassLoader = null;
@Override
public void initializePlugin(final PluginManager manager, final File pluginDirectory) {
if (Boolean.valueOf(JiveGlobals.getBooleanProperty(HttpBindManager.HTTP_BIND_ENABLED, true))) {
Log.info(String.format("Initializing websocket plugin"));
try {
ContextHandlerCollection contexts = HttpBindManager.getInstance().getContexts();
contextHandler = new ServletContextHandler(contexts, "/ws", ServletContextHandler.SESSIONS);
contextHandler.addServlet(new ServletHolder(this), "/*");
} catch (Exception e) {
Log.error("Failed to start websocket plugin", e);
}
} else {
Log.warn("Failed to start websocket plugin; http-bind is disabled");
}
}
@Override
public void destroyPlugin() {
// terminate any active websocket sessions
SessionManager sm = XMPPServer.getInstance().getSessionManager();
for (ClientSession session : sm.getSessions()) {
if (session instanceof LocalSession) {
Object ws = ((LocalSession) session).getSessionData("ws");
if (ws != null && (Boolean) ws) {
session.close();
}
}
}
ContextHandlerCollection contexts = HttpBindManager.getInstance().getContexts();
contexts.removeHandler(contextHandler);
contextHandler = null;
pluginClassLoader = null;
}
@Override
public void configure(WebSocketServletFactory factory)
{
if (XmppWebSocket.isCompressionEnabled()) {
factory.getExtensionFactory().register("permessage-deflate", PerMessageDeflateExtension.class);
}
factory.setCreator(new WebSocketCreator() {
@Override
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
{
ClassLoader ccl = Thread.currentThread().getContextClassLoader();
try {
ClassLoader pcl = getPluginClassLoader();
Thread.currentThread().setContextClassLoader(pcl == null ? ccl : pcl);
for (String subprotocol : req.getSubProtocols())
{
if ("xmpp".equals(subprotocol))
{
resp.setAcceptedSubProtocol(subprotocol);
return new XmppWebSocket();
}
}
} catch (Exception e) {
Log.warn(MessageFormat.format("Unable to load websocket factory: {0} ({1})", e.getClass().getName(), e.getMessage()));
} finally {
Thread.currentThread().setContextClassLoader(ccl);
}
Log.warn("Failed to create websocket: " + req);
return null;
}
});
}
protected synchronized PluginClassLoader getPluginClassLoader() {
PluginManager pm = XMPPServer.getInstance().getPluginManager();
if (pluginClassLoader == null) {
pluginClassLoader = pm.getPluginClassloader(this);
}
// report error if plugin is unavailable
if (pluginClassLoader == null) {
Log.error("Unable to find class loader for websocket plugin");
}
return pluginClassLoader;
}
}
package org.jivesoftware.openfire.websocket;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.openfire.net.MXParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory;
public class XMPPPPacketReaderFactory extends BasePooledObjectFactory<XMPPPacketReader> {
private static Logger Log = LoggerFactory.getLogger( XMPPPPacketReaderFactory.class );
private static XmlPullParserFactory xppFactory = null;
static {
try {
xppFactory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null);
xppFactory.setNamespaceAware(true);
}
catch (XmlPullParserException e) {
Log.error("Error creating a parser factory", e);
}
}
//-- BasePooledObjectFactory implementation
@Override
public XMPPPacketReader create() throws Exception {
XMPPPacketReader parser = new XMPPPacketReader();
parser.setXPPFactory( xppFactory );
return parser;
}
@Override
public PooledObject<XMPPPacketReader> wrap(XMPPPacketReader reader) {
return new DefaultPooledObject<XMPPPacketReader>(reader);
}
@Override
public boolean validateObject(PooledObject<XMPPPacketReader> po) {
// reset the input for the pooled parser
try {
po.getObject().getXPPParser().resetInput();
return true;
} catch (XmlPullParserException xppe) {
Log.error("Failed to reset pooled parser; evicting from pool", xppe);
return false;
}
}
}
package org.jivesoftware.openfire.websocket;
import java.io.IOException;
import java.io.StringReader;
import java.util.TimerTask;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.SessionPacketRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.multiplex.UnknownStanzaException;
import org.jivesoftware.openfire.net.SASLAuthentication;
import org.jivesoftware.openfire.net.SASLAuthentication.Status;
import org.jivesoftware.openfire.session.ConnectionSettings;
import org.jivesoftware.openfire.session.LocalClientSession;
import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.TaskEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.StreamError;
/**
* This class handles all WebSocket events for the corresponding connection with a remote peer.
* Specifically the XMPP session is managed concurrently with the WebSocket session, including all
* framing and authentication requirements. Packets received from the remote peer are forwarded as
* needed via a {@link SessionPacketRouter}, and packets destined for the remote peer are forwarded
* via the corresponding {@link RemoteEndpoint}.
*/
@WebSocket
public class XmppWebSocket {
private static final String STREAM_HEADER = "open";
private static final String STREAM_FOOTER = "close";
private static final String FRAMING_NAMESPACE = "urn:ietf:params:xml:ns:xmpp-framing";
private static Logger Log = LoggerFactory.getLogger( XmppWebSocket.class );
private static GenericObjectPool<XMPPPacketReader> readerPool;
private SessionPacketRouter router;
private Session wsSession;
private WebSocketConnection wsConnection;
private LocalClientSession xmppSession;
private boolean startedSASL = false;
private Status saslStatus;
private TimerTask pingTask;
public XmppWebSocket() {
if (readerPool == null) {
initializePool();
}
}
// WebSocket event handlers
@OnWebSocketConnect
public void onConnect(Session session)
{
wsSession = session;
wsConnection = new WebSocketConnection(this, session.getRemoteAddress());
pingTask = new PingTask();
TaskEngine.getInstance().schedule(pingTask, JiveConstants.MINUTE, JiveConstants.MINUTE);
}
@OnWebSocketClose
public void onClose(int statusCode, String reason)
{
closeSession();
}
@OnWebSocketMessage
public void onTextMethod(String stanza)
{
try {
XMPPPacketReader reader = readerPool.borrowObject();
Document doc = reader.read(new StringReader(stanza));
readerPool.returnObject(reader);
if (xmppSession == null) {
initiateSession(doc.getRootElement());
} else {
processStanza(doc.getRootElement());
}
} catch (Exception ex) {
Log.error("Failed to initiate XMPP session", ex);
}
}
@OnWebSocketError
public void onError(Throwable error)
{
Log.error("Error detected; session: " + wsSession, error);
closeStream(new StreamError(StreamError.Condition.internal_server_error));
try {
wsSession.disconnect();
} catch ( Exception e ) {
Log.error("Error disconnecting websocket", e);
}
}
// local (package) visibility
boolean isWebSocketOpen() {
return wsSession != null && wsSession.isOpen();
}
boolean isWebSocketSecure() {
return wsSession != null && wsSession.isSecure();
}
void closeWebSocket()
{
if (isWebSocketOpen())
{
wsSession.close();
}
wsSession = null;
}
void closeSession() {
if (isWebSocketOpen()) {
closeStream(null);
}
if (xmppSession != null) {
xmppSession.close();
SessionManager.getInstance().removeSession(xmppSession);
xmppSession = null;
}
}
/**
* Send an XML packet to the remote peer
*
* @param packet XML to be sent to client
*/
void deliver(String packet)
{
if (isWebSocketOpen())
{
try {
xmppSession.incrementServerPacketCount();
wsSession.getRemote().sendStringByFuture(packet);
} catch (Exception e) {
Log.error("Packet delivery failed; session: " + wsSession, e);
Log.warn("Failed to deliver packet:\n" + packet );
}
} else {
Log.warn("Failed to deliver packet; socket is closed:\n" + packet);
}
}
static boolean isCompressionEnabled() {
return JiveGlobals.getProperty(
ConnectionSettings.Client.COMPRESSION_SETTINGS, Connection.CompressionPolicy.optional.toString())
.equalsIgnoreCase(Connection.CompressionPolicy.optional.toString());
}
// helper/utility methods
/*
* Process stream headers/footers and authentication stanzas locally;
* otherwise delegate stanza handling to the session packet router.
*/
private void processStanza(Element stanza) {
try {
String tag = stanza.getName();
if (STREAM_FOOTER.equals(tag)) {
closeStream(null);
} else if ("auth".equals(tag)) {
// User is trying to authenticate using SASL
startedSASL = true;
// Process authentication stanza
xmppSession.incrementClientPacketCount();
saslStatus = SASLAuthentication.handle(xmppSession, stanza);
} else if (startedSASL && "response".equals(tag) || "abort".equals(tag)) {
// User is responding to SASL challenge. Process response
xmppSession.incrementClientPacketCount();
saslStatus = SASLAuthentication.handle(xmppSession, stanza);
} else if (STREAM_HEADER.equals(tag)) {
// restart the stream
openStream(stanza.attributeValue("lang", "en"), stanza.attributeValue("from"));
configureStream();
} else if (Status.authenticated.equals(saslStatus)) {
if (router == null) {
router = new StreamManagementPacketRouter(xmppSession);
}
router.route(stanza);
} else {
// require authentication
Log.warn("Not authorized: " + stanza.asXML());
sendPacketError(stanza, PacketError.Condition.not_authorized);
}
} catch (UnknownStanzaException use) {
Log.warn("Received invalid stanza: " + stanza.asXML());
sendPacketError(stanza, PacketError.Condition.bad_request);
} catch (Exception ex) {
Log.error("Failed to process incoming stanza: " + stanza.asXML(), ex);
closeStream(new StreamError(StreamError.Condition.internal_server_error));
}
}
/*
* Initiate the stream and corresponding XMPP session.
*/
private void initiateSession(Element stanza) {
String host = stanza.attributeValue("to");
StreamError streamError = null;
if (STREAM_FOOTER.equals(stanza.getName())) {
// an error occurred while setting up the session
closeStream(null);
} else if (!STREAM_HEADER.equals(stanza.getName())) {
streamError = new StreamError(StreamError.Condition.unsupported_stanza_type);
Log.warn("Closing session due to incorrect stream header. Tag: " + stanza.getName());
} else if (!FRAMING_NAMESPACE.equals(stanza.getNamespace().getURI())) {
// Validate the stream namespace (https://tools.ietf.org/html/rfc7395#section-3.3.2)
streamError = new StreamError(StreamError.Condition.invalid_namespace);
Log.warn("Closing session due to invalid namespace in stream header. Namespace: " + stanza.getNamespace().getURI());
} else if (STREAM_FOOTER.equals(stanza.getName())) {
closeStream(null);
} else if (!validateHost(host)) {
streamError = new StreamError(StreamError.Condition.host_unknown);
Log.warn("Closing session due to incorrect hostname in stream header. Host: " + host);
} else {
xmppSession = SessionManager.getInstance().createClientSession(wsConnection);
xmppSession.setSessionData("ws", Boolean.TRUE);
}
if (streamError == null) {
openStream(stanza.attributeValue("lang", "en"), stanza.attributeValue("from"));
configureStream();
} else {
closeStream(streamError);
}
}
private boolean validateHost(String host) {
boolean result = true;
if (JiveGlobals.getBooleanProperty("xmpp.client.validate.host", false)) {
result = XMPPServer.getInstance().getServerInfo().getXMPPDomain().equals(host);
}
return result;
}
/*
* Prepare response for stream initiation (sasl) or stream restart (features).
*/
private void configureStream() {
StringBuilder sb = new StringBuilder(250);
sb.append("<stream:features xmlns:stream='http://etherx.jabber.org/streams'>");
if (saslStatus == null) {
// Include available SASL Mechanisms
sb.append(SASLAuthentication.getSASLMechanisms(xmppSession));
sb.append("<auth xmlns='http://jabber.org/features/iq-auth'/>");
} else if (saslStatus.equals(Status.authenticated)) {
// Include Stream features
sb.append(String.format("<bind xmlns='%s'/>", "urn:ietf:params:xml:ns:xmpp-bind"));
sb.append(String.format("<session xmlns='%s'><optional/></session>", "urn:ietf:params:xml:ns:xmpp-session"));
if (JiveGlobals.getBooleanProperty("stream.management.active", true)) {
sb.append(String.format("<sm xmlns='%s'/>", StreamManager.NAMESPACE_V3));
}
}
sb.append("</stream:features>");
deliver(sb.toString());
}
private void openStream(String lang, String jid) {
xmppSession.incrementClientPacketCount();
StringBuilder sb = new StringBuilder(250);
sb.append("<open ");
if (jid != null) {
sb.append("to='").append(jid).append("' ");
}
sb.append("from='").append(XMPPServer.getInstance().getServerInfo().getXMPPDomain()).append("' ");
sb.append("id='").append(xmppSession.getStreamID().toString()).append("' ");
sb.append("xmlns='").append(FRAMING_NAMESPACE).append("' ");
sb.append("lang='").append(lang).append("' ");
sb.append("version='1.0' />");
deliver(sb.toString());
}
private void closeStream(StreamError streamError)
{
if (isWebSocketOpen()) {
if (streamError != null) {
deliver(streamError.toXML());
}
StringBuilder sb = new StringBuilder(250);
sb.append("<close ");
sb.append("xmlns='").append(FRAMING_NAMESPACE).append("' ");
sb.append(" />");
deliver(sb.toString());
closeWebSocket();
}
}
private void sendPacketError(Element stanza, PacketError.Condition condition) {
Element reply = stanza.createCopy();
reply.addAttribute("type", "error");
reply.addAttribute("to", stanza.attributeValue("from"));
reply.addAttribute("from", stanza.attributeValue("to"));
reply.add(new PacketError(condition).getElement());
deliver(reply.asXML());
}
private synchronized void initializePool() {
if (readerPool == null) {
readerPool = new GenericObjectPool<XMPPPacketReader>(new XMPPPPacketReaderFactory());
readerPool.setMaxTotal(32);
readerPool.setTestOnReturn(true);
readerPool.setNumTestsPerEvictionRun(-2); // evict half of the idle instances
readerPool.setTimeBetweenEvictionRunsMillis(JiveConstants.HOUR);
}
}
//-- Keep-alive ping for idle peers
private final class PingTask extends TimerTask {
private boolean lastPingFailed = false;
@Override
public void run() {
if (!isWebSocketOpen()) {
TaskEngine.getInstance().cancelScheduledTask(pingTask);
} else {
long idleTime = System.currentTimeMillis() - JiveConstants.MINUTE;
if (xmppSession.getLastActiveDate().getTime() >= idleTime) {
return;
}
try {
// see https://tools.ietf.org/html/rfc6455#section-5.5.2
wsSession.getRemote().sendPing(null);
lastPingFailed = false;
} catch (IOException ioe) {
Log.error("Failed to ping remote peer: " + wsSession, ioe);
if (lastPingFailed) {
closeSession();
TaskEngine.getInstance().cancelScheduledTask(pingTask);
} else {
lastPingFailed = true;
}
}
}
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment