Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
O
Openfire
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Administrator
Openfire
Commits
ea7fdc80
Commit
ea7fdc80
authored
Jul 30, 2015
by
Dave Cridland
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #263 from tevans/OF-933
OF-933: Initial WebSocket implementation
parents
c2308294
189773d3
Changes
12
Show whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
962 additions
and
1 deletion
+962
-1
SessionPacketRouter.java
src/java/org/jivesoftware/openfire/SessionPacketRouter.java
+1
-1
changelog.html
src/plugins/websocket/changelog.html
+55
-0
commons-pool2-2.3.jar
src/plugins/websocket/lib/commons-pool2-2.3.jar
+0
-0
logo_large.gif
src/plugins/websocket/logo_large.gif
+0
-0
logo_small.gif
src/plugins/websocket/logo_small.gif
+0
-0
plugin.xml
src/plugins/websocket/plugin.xml
+15
-0
readme.html
src/plugins/websocket/readme.html
+75
-0
StreamManagementPacketRouter.java
...ware/openfire/websocket/StreamManagementPacketRouter.java
+90
-0
WebSocketConnection.java
.../jivesoftware/openfire/websocket/WebSocketConnection.java
+113
-0
WebSocketPlugin.java
.../org/jivesoftware/openfire/websocket/WebSocketPlugin.java
+142
-0
XMPPPPacketReaderFactory.java
...software/openfire/websocket/XMPPPPacketReaderFactory.java
+70
-0
XmppWebSocket.java
...va/org/jivesoftware/openfire/websocket/XmppWebSocket.java
+401
-0
No files found.
src/java/org/jivesoftware/openfire/SessionPacketRouter.java
View file @
ea7fdc80
...
...
@@ -35,7 +35,7 @@ import java.io.UnsupportedEncodingException;
*/
public
class
SessionPacketRouter
implements
PacketRouter
{
pr
ivate
LocalClientSession
session
;
pr
otected
LocalClientSession
session
;
private
PacketRouter
router
;
private
boolean
skipJIDValidation
=
false
;
...
...
src/plugins/websocket/changelog.html
0 → 100644
View file @
ea7fdc80
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
<html>
<head>
<title>
Openfire WebSocket Plugin Changelog
</title>
<style
type=
"text/css"
>
BODY
{
font-size
:
100%
;
}
BODY
,
TD
,
TH
{
font-family
:
tahoma
,
verdana
,
arial
,
helvetica
,
sans-serif
;
font-size
:
0.8em
;
}
H2
{
font-size
:
10pt
;
font-weight
:
bold
;
padding-left
:
1em
;
}
A
:hover
{
text-decoration
:
none
;
}
H1
{
font-family
:
tahoma
,
arial
,
helvetica
,
sans-serif
;
font-size
:
1.4em
;
font-weight
:
bold
;
border-bottom
:
1px
#ccc
solid
;
padding-bottom
:
2px
;
}
TT
{
font-family
:
courier
new
;
font-weight
:
bold
;
color
:
#060
;
}
PRE
{
font-family
:
courier
new
;
font-size
:
100%
;
}
</style>
</head>
<body>
<h1>
Openfire WebSocket Plugin Changelog
</h1>
<p><b>
1.0
</b>
-- July 28, 2015
</p>
<ul>
<li>
Initial release.
</li>
</ul>
</body>
</html>
\ No newline at end of file
src/plugins/websocket/lib/commons-pool2-2.3.jar
0 → 100644
View file @
ea7fdc80
File added
src/plugins/websocket/logo_large.gif
0 → 100644
View file @
ea7fdc80
1.01 KB
src/plugins/websocket/logo_small.gif
0 → 100644
View file @
ea7fdc80
917 Bytes
src/plugins/websocket/plugin.xml
0 → 100644
View file @
ea7fdc80
<?xml version="1.0" encoding="UTF-8"?>
<!--
Plugin configuration for the WebSocket plugin.
-->
<plugin>
<class>
org.jivesoftware.openfire.websocket.WebSocketPlugin
</class>
<name>
Openfire WebSocket
</name>
<description>
Provides WebSocket support for Openfire.
</description>
<author>
Tom Evans
</author>
<version>
1.0.0
</version>
<date>
07/28/2015
</date>
<url>
https://tools.ietf.org/html/rfc7395
</url>
<minServerVersion>
3.10.0
</minServerVersion>
</plugin>
\ No newline at end of file
src/plugins/websocket/readme.html
0 → 100644
View file @
ea7fdc80
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
<html>
<head>
<title>
Openfire WebSocket Plugin Readme
</title>
<style
type=
"text/css"
>
BODY
{
font-size
:
100%
;
}
BODY
,
TD
,
TH
{
font-family
:
tahoma
,
verdana
,
arial
,
helvetica
,
sans-serif
;
font-size
:
0.8em
;
}
H2
{
font-size
:
10pt
;
font-weight
:
bold
;
}
A
:hover
{
text-decoration
:
none
;
}
H1
{
font-family
:
tahoma
,
arial
,
helvetica
,
sans-serif
;
font-size
:
1.4em
;
font-weight
:
bold
;
border-bottom
:
1px
#ccc
solid
;
padding-bottom
:
2px
;
}
TT
{
font-family
:
courier
new
;
font-weight
:
bold
;
color
:
#060
;
}
PRE
{
font-family
:
courier
new
;
font-size
:
100%
;
}
</style>
</head>
<body>
<h1>
Openfire WebSocket Plugin Readme
</h1>
<h2>
Overview
</h2>
<p>
This plugin extends Openfire to support WebSocket. The implementation follows the XMPP WebSocket subprotocol
(
<a
href=
"https://tools.ietf.org/html/rfc7395"
>
RFC 7395
</a>
) specification, which is a standard extension of the
WebSocket protocol specification (
<a
href=
"https://tools.ietf.org/html/rfc6455"
>
RFC 6455
</a>
).
</p>
<p>
Note that the BOSH (http-bind) capabilities of Openfire must be enabled and correctly configured as a
prerequisite before installing this plugin. The WebSocket servlet is installed within the same context
as the BOSH component, and will reuse the same HTTP/S port(s) when establishing the WebSocket connection.
</p>
<h2>
Installation
</h2>
<p>
Copy websocket.jar into the plugins directory of your Openfire installation. The
plugin will then be automatically deployed. To upgrade to a new version, copy the new
websocket.jar file over the existing file.
</p>
<p>
Upon installation, the WebSocket URI path will be /ws/ on the same server/port as the BOSH
connector. To establish a secure WebSocket, modify the following URL as appropriate:
</p>
<pre>
wss://your.openfire.host:7443/ws/
</pre>
</body>
</html>
src/plugins/websocket/src/java/org/jivesoftware/openfire/websocket/StreamManagementPacketRouter.java
0 → 100644
View file @
ea7fdc80
/**
* Copyright (C) 2015 Tom Evans. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
jivesoftware
.
openfire
.
websocket
;
import
java.io.UnsupportedEncodingException
;
import
org.dom4j.Element
;
import
org.jivesoftware.openfire.SessionPacketRouter
;
import
org.jivesoftware.openfire.multiplex.UnknownStanzaException
;
import
org.jivesoftware.openfire.session.LocalClientSession
;
import
org.jivesoftware.openfire.streammanagement.StreamManager
;
import
org.jivesoftware.util.JiveGlobals
;
/**
* This class extends Openfire's session packet router with the ACK capabilities
* specified by XEP-0198: Stream Management.
*
* NOTE: This class does NOT support the XEP-0198 stream resumption capabilities.
*
* XEP-0198 allows either party (client or server) to send unsolicited ack/answer
* stanzas on a periodic basis. This implementation approximates BOSH ack behavior
* by sending unsolicited <a /> stanzas from the server to the client after a
* configurable number of stanzas have been received from the client.
*
* Setting the system property to "1" would indicate that each client packet should
* be ack'd by the server when stream management is enabled for a particular stream.
* To disable unsolicited server acks, use the default value for system property
* "stream.management.unsolicitedAckFrequency" ("0"). This setting does not affect
* server responses to explicit ack requests from the client.
*/
public
class
StreamManagementPacketRouter
extends
SessionPacketRouter
{
public
static
final
String
SM_UNSOLICITED_ACK_FREQUENCY
=
"stream.management.unsolicitedAckFrequency"
;
static
{
JiveGlobals
.
migrateProperty
(
SM_UNSOLICITED_ACK_FREQUENCY
);
}
private
int
unsolicitedAckFrequency
=
JiveGlobals
.
getIntProperty
(
SM_UNSOLICITED_ACK_FREQUENCY
,
0
);
public
StreamManagementPacketRouter
(
LocalClientSession
session
)
{
super
(
session
);
}
@Override
public
void
route
(
Element
wrappedElement
)
throws
UnsupportedEncodingException
,
UnknownStanzaException
{
String
tag
=
wrappedElement
.
getName
();
if
(
StreamManager
.
NAMESPACE_V3
.
equals
(
wrappedElement
.
getNamespace
().
getStringValue
()))
{
switch
(
tag
)
{
case
"enable"
:
session
.
enableStreamMangement
(
wrappedElement
);
break
;
case
"r"
:
session
.
getStreamManager
().
sendServerAcknowledgement
();
break
;
case
"a"
:
session
.
getStreamManager
().
processClientAcknowledgement
(
wrappedElement
);
break
;
default
:
session
.
getStreamManager
().
sendUnexpectedError
();
}
}
else
{
super
.
route
(
wrappedElement
);
if
(
isUnsolicitedAckExpected
())
{
session
.
getStreamManager
().
sendServerAcknowledgement
();
}
}
}
private
boolean
isUnsolicitedAckExpected
()
{
if
(!
session
.
getStreamManager
().
isEnabled
())
{
return
false
;
}
return
unsolicitedAckFrequency
>
0
&&
session
.
getNumClientPackets
()
%
unsolicitedAckFrequency
==
0
;
}
}
src/plugins/websocket/src/java/org/jivesoftware/openfire/websocket/WebSocketConnection.java
0 → 100644
View file @
ea7fdc80
/**
* Copyright (C) 2015 Tom Evans. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
jivesoftware
.
openfire
.
websocket
;
import
java.net.InetSocketAddress
;
import
org.dom4j.Namespace
;
import
org.jivesoftware.openfire.PacketDeliverer
;
import
org.jivesoftware.openfire.auth.UnauthorizedException
;
import
org.jivesoftware.openfire.net.VirtualConnection
;
import
org.jivesoftware.openfire.nio.OfflinePacketDeliverer
;
import
org.xmpp.packet.Packet
;
import
org.xmpp.packet.StreamError
;
/**
* Following the conventions of the BOSH implementation, this class extends {@link VirtualConnection}
* and delegates the expected XMPP connection behaviors to the corresponding {@link XmppWebSocket}.
*/
public
class
WebSocketConnection
extends
VirtualConnection
{
private
static
final
String
CLIENT_NAMESPACE
=
"jabber:client"
;
private
InetSocketAddress
remotePeer
;
private
XmppWebSocket
socket
;
private
PacketDeliverer
backupDeliverer
;
public
WebSocketConnection
(
XmppWebSocket
socket
,
InetSocketAddress
remotePeer
)
{
this
.
socket
=
socket
;
this
.
remotePeer
=
remotePeer
;
}
@Override
public
void
closeVirtualConnection
()
{
socket
.
closeSession
();
}
@Override
public
byte
[]
getAddress
()
{
return
remotePeer
.
getAddress
().
getAddress
();
}
@Override
public
String
getHostAddress
()
{
return
remotePeer
.
getAddress
().
getHostAddress
();
}
@Override
public
String
getHostName
()
{
return
remotePeer
.
getHostName
();
}
@Override
public
void
systemShutdown
()
{
deliverRawText
(
new
StreamError
(
StreamError
.
Condition
.
system_shutdown
).
toXML
());
close
();
}
@Override
public
void
deliver
(
Packet
packet
)
throws
UnauthorizedException
{
if
(
Namespace
.
NO_NAMESPACE
.
equals
(
packet
.
getElement
().
getNamespace
()))
{
packet
.
getElement
().
add
(
Namespace
.
get
(
CLIENT_NAMESPACE
));
}
if
(
validate
())
{
deliverRawText
(
packet
.
toXML
());
}
else
{
// use fallback delivery mechanism (offline)
getPacketDeliverer
().
deliver
(
packet
);
}
}
@Override
public
void
deliverRawText
(
String
text
)
{
socket
.
deliver
(
text
);
}
@Override
public
boolean
validate
()
{
return
socket
.
isWebSocketOpen
();
}
@Override
public
boolean
isSecure
()
{
return
socket
.
isWebSocketSecure
();
}
@Override
public
PacketDeliverer
getPacketDeliverer
()
{
if
(
backupDeliverer
==
null
)
{
backupDeliverer
=
new
OfflinePacketDeliverer
();
}
return
backupDeliverer
;
}
@Override
public
boolean
isCompressed
()
{
return
XmppWebSocket
.
isCompressionEnabled
();
}
}
src/plugins/websocket/src/java/org/jivesoftware/openfire/websocket/WebSocketPlugin.java
0 → 100644
View file @
ea7fdc80
/**
* Copyright (C) 2015 Tom Evans. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
jivesoftware
.
openfire
.
websocket
;
import
java.io.File
;
import
java.text.MessageFormat
;
import
org.eclipse.jetty.server.handler.ContextHandlerCollection
;
import
org.eclipse.jetty.servlet.ServletContextHandler
;
import
org.eclipse.jetty.servlet.ServletHolder
;
import
org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension
;
import
org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest
;
import
org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse
;
import
org.eclipse.jetty.websocket.servlet.WebSocketCreator
;
import
org.eclipse.jetty.websocket.servlet.WebSocketServlet
;
import
org.eclipse.jetty.websocket.servlet.WebSocketServletFactory
;
import
org.jivesoftware.openfire.SessionManager
;
import
org.jivesoftware.openfire.XMPPServer
;
import
org.jivesoftware.openfire.container.Plugin
;
import
org.jivesoftware.openfire.container.PluginClassLoader
;
import
org.jivesoftware.openfire.container.PluginManager
;
import
org.jivesoftware.openfire.http.HttpBindManager
;
import
org.jivesoftware.openfire.session.ClientSession
;
import
org.jivesoftware.openfire.session.LocalSession
;
import
org.jivesoftware.util.JiveGlobals
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* This plugin enables XMPP over WebSocket (RFC 7395) for Openfire.
*
* The Jetty WebSocketServlet serves as a base class and enables easy integration into the
* BOSH (http-bind) web context. Each WebSocket request received at the "/ws/" URI will be
* forwarded to this plugin/servlet, which will in turn create a new {@link XmppWebSocket}
* for each new connection.
*/
public
class
WebSocketPlugin
extends
WebSocketServlet
implements
Plugin
{
private
static
final
long
serialVersionUID
=
7281841492829464603L
;
private
static
final
Logger
Log
=
LoggerFactory
.
getLogger
(
WebSocketPlugin
.
class
);
private
ServletContextHandler
contextHandler
;
protected
PluginClassLoader
pluginClassLoader
=
null
;
@Override
public
void
initializePlugin
(
final
PluginManager
manager
,
final
File
pluginDirectory
)
{
if
(
Boolean
.
valueOf
(
JiveGlobals
.
getBooleanProperty
(
HttpBindManager
.
HTTP_BIND_ENABLED
,
true
)))
{
Log
.
info
(
String
.
format
(
"Initializing websocket plugin"
));
try
{
ContextHandlerCollection
contexts
=
HttpBindManager
.
getInstance
().
getContexts
();
contextHandler
=
new
ServletContextHandler
(
contexts
,
"/ws"
,
ServletContextHandler
.
SESSIONS
);
contextHandler
.
addServlet
(
new
ServletHolder
(
this
),
"/*"
);
}
catch
(
Exception
e
)
{
Log
.
error
(
"Failed to start websocket plugin"
,
e
);
}
}
else
{
Log
.
warn
(
"Failed to start websocket plugin; http-bind is disabled"
);
}
}
@Override
public
void
destroyPlugin
()
{
// terminate any active websocket sessions
SessionManager
sm
=
XMPPServer
.
getInstance
().
getSessionManager
();
for
(
ClientSession
session
:
sm
.
getSessions
())
{
if
(
session
instanceof
LocalSession
)
{
Object
ws
=
((
LocalSession
)
session
).
getSessionData
(
"ws"
);
if
(
ws
!=
null
&&
(
Boolean
)
ws
)
{
session
.
close
();
}
}
}
ContextHandlerCollection
contexts
=
HttpBindManager
.
getInstance
().
getContexts
();
contexts
.
removeHandler
(
contextHandler
);
contextHandler
=
null
;
pluginClassLoader
=
null
;
}
@Override
public
void
configure
(
WebSocketServletFactory
factory
)
{
if
(
XmppWebSocket
.
isCompressionEnabled
())
{
factory
.
getExtensionFactory
().
register
(
"permessage-deflate"
,
PerMessageDeflateExtension
.
class
);
}
factory
.
setCreator
(
new
WebSocketCreator
()
{
@Override
public
Object
createWebSocket
(
ServletUpgradeRequest
req
,
ServletUpgradeResponse
resp
)
{
ClassLoader
ccl
=
Thread
.
currentThread
().
getContextClassLoader
();
try
{
ClassLoader
pcl
=
getPluginClassLoader
();
Thread
.
currentThread
().
setContextClassLoader
(
pcl
==
null
?
ccl
:
pcl
);
for
(
String
subprotocol
:
req
.
getSubProtocols
())
{
if
(
"xmpp"
.
equals
(
subprotocol
))
{
resp
.
setAcceptedSubProtocol
(
subprotocol
);
return
new
XmppWebSocket
();
}
}
}
catch
(
Exception
e
)
{
Log
.
warn
(
MessageFormat
.
format
(
"Unable to load websocket factory: {0} ({1})"
,
e
.
getClass
().
getName
(),
e
.
getMessage
()));
}
finally
{
Thread
.
currentThread
().
setContextClassLoader
(
ccl
);
}
Log
.
warn
(
"Failed to create websocket: "
+
req
);
return
null
;
}
});
}
protected
synchronized
PluginClassLoader
getPluginClassLoader
()
{
PluginManager
pm
=
XMPPServer
.
getInstance
().
getPluginManager
();
if
(
pluginClassLoader
==
null
)
{
pluginClassLoader
=
pm
.
getPluginClassloader
(
this
);
}
// report error if plugin is unavailable
if
(
pluginClassLoader
==
null
)
{
Log
.
error
(
"Unable to find class loader for websocket plugin"
);
}
return
pluginClassLoader
;
}
}
src/plugins/websocket/src/java/org/jivesoftware/openfire/websocket/XMPPPPacketReaderFactory.java
0 → 100644
View file @
ea7fdc80
/**
* Copyright (C) 2015 Tom Evans. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
jivesoftware
.
openfire
.
websocket
;
import
org.apache.commons.pool2.BasePooledObjectFactory
;
import
org.apache.commons.pool2.PooledObject
;
import
org.apache.commons.pool2.impl.DefaultPooledObject
;
import
org.dom4j.io.XMPPPacketReader
;
import
org.jivesoftware.openfire.net.MXParser
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.xmlpull.v1.XmlPullParserException
;
import
org.xmlpull.v1.XmlPullParserFactory
;
public
class
XMPPPPacketReaderFactory
extends
BasePooledObjectFactory
<
XMPPPacketReader
>
{
private
static
Logger
Log
=
LoggerFactory
.
getLogger
(
XMPPPPacketReaderFactory
.
class
);
private
static
XmlPullParserFactory
xppFactory
=
null
;
static
{
try
{
xppFactory
=
XmlPullParserFactory
.
newInstance
(
MXParser
.
class
.
getName
(),
null
);
xppFactory
.
setNamespaceAware
(
true
);
}
catch
(
XmlPullParserException
e
)
{
Log
.
error
(
"Error creating a parser factory"
,
e
);
}
}
//-- BasePooledObjectFactory implementation
@Override
public
XMPPPacketReader
create
()
throws
Exception
{
XMPPPacketReader
parser
=
new
XMPPPacketReader
();
parser
.
setXPPFactory
(
xppFactory
);
return
parser
;
}
@Override
public
PooledObject
<
XMPPPacketReader
>
wrap
(
XMPPPacketReader
reader
)
{
return
new
DefaultPooledObject
<
XMPPPacketReader
>(
reader
);
}
@Override
public
boolean
validateObject
(
PooledObject
<
XMPPPacketReader
>
po
)
{
// reset the input for the pooled parser
try
{
po
.
getObject
().
getXPPParser
().
resetInput
();
return
true
;
}
catch
(
XmlPullParserException
xppe
)
{
Log
.
error
(
"Failed to reset pooled parser; evicting from pool"
,
xppe
);
return
false
;
}
}
}
src/plugins/websocket/src/java/org/jivesoftware/openfire/websocket/XmppWebSocket.java
0 → 100644
View file @
ea7fdc80
/**
* Copyright (C) 2015 Tom Evans. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
jivesoftware
.
openfire
.
websocket
;
import
java.io.IOException
;
import
java.io.StringReader
;
import
java.util.TimerTask
;
import
org.apache.commons.pool2.impl.GenericObjectPool
;
import
org.dom4j.Document
;
import
org.dom4j.Element
;
import
org.dom4j.io.XMPPPacketReader
;
import
org.eclipse.jetty.websocket.api.RemoteEndpoint
;
import
org.eclipse.jetty.websocket.api.Session
;
import
org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose
;
import
org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect
;
import
org.eclipse.jetty.websocket.api.annotations.OnWebSocketError
;
import
org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage
;
import
org.eclipse.jetty.websocket.api.annotations.WebSocket
;
import
org.jivesoftware.openfire.Connection
;
import
org.jivesoftware.openfire.SessionManager
;
import
org.jivesoftware.openfire.SessionPacketRouter
;
import
org.jivesoftware.openfire.XMPPServer
;
import
org.jivesoftware.openfire.multiplex.UnknownStanzaException
;
import
org.jivesoftware.openfire.net.SASLAuthentication
;
import
org.jivesoftware.openfire.net.SASLAuthentication.Status
;
import
org.jivesoftware.openfire.session.ConnectionSettings
;
import
org.jivesoftware.openfire.session.LocalClientSession
;
import
org.jivesoftware.util.JiveConstants
;
import
org.jivesoftware.util.JiveGlobals
;
import
org.jivesoftware.util.TaskEngine
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.xmpp.packet.PacketError
;
import
org.xmpp.packet.StreamError
;
/**
* This class handles all WebSocket events for the corresponding connection with a remote peer.
* Specifically the XMPP session is managed concurrently with the WebSocket session, including all
* framing and authentication requirements. Packets received from the remote peer are forwarded as
* needed via a {@link SessionPacketRouter}, and packets destined for the remote peer are forwarded
* via the corresponding {@link RemoteEndpoint}.
*/
@WebSocket
public
class
XmppWebSocket
{
private
static
final
String
STREAM_HEADER
=
"open"
;
private
static
final
String
STREAM_FOOTER
=
"close"
;
private
static
final
String
FRAMING_NAMESPACE
=
"urn:ietf:params:xml:ns:xmpp-framing"
;
private
static
Logger
Log
=
LoggerFactory
.
getLogger
(
XmppWebSocket
.
class
);
private
static
GenericObjectPool
<
XMPPPacketReader
>
readerPool
;
private
SessionPacketRouter
router
;
private
Session
wsSession
;
private
WebSocketConnection
wsConnection
;
private
LocalClientSession
xmppSession
;
private
boolean
startedSASL
=
false
;
private
Status
saslStatus
;
private
TimerTask
pingTask
;
public
XmppWebSocket
()
{
if
(
readerPool
==
null
)
{
initializePool
();
}
}
// WebSocket event handlers
@OnWebSocketConnect
public
void
onConnect
(
Session
session
)
{
wsSession
=
session
;
wsConnection
=
new
WebSocketConnection
(
this
,
session
.
getRemoteAddress
());
pingTask
=
new
PingTask
();
TaskEngine
.
getInstance
().
schedule
(
pingTask
,
JiveConstants
.
MINUTE
,
JiveConstants
.
MINUTE
);
}
@OnWebSocketClose
public
void
onClose
(
int
statusCode
,
String
reason
)
{
closeSession
();
}
@OnWebSocketMessage
public
void
onTextMethod
(
String
stanza
)
{
try
{
XMPPPacketReader
reader
=
readerPool
.
borrowObject
();
Document
doc
=
reader
.
read
(
new
StringReader
(
stanza
));
readerPool
.
returnObject
(
reader
);
if
(
xmppSession
==
null
)
{
initiateSession
(
doc
.
getRootElement
());
}
else
{
processStanza
(
doc
.
getRootElement
());
}
}
catch
(
Exception
ex
)
{
Log
.
error
(
"Failed to initiate XMPP session"
,
ex
);
}
}
@OnWebSocketError
public
void
onError
(
Throwable
error
)
{
Log
.
error
(
"Error detected; session: "
+
wsSession
,
error
);
closeStream
(
new
StreamError
(
StreamError
.
Condition
.
internal_server_error
));
try
{
wsSession
.
disconnect
();
}
catch
(
Exception
e
)
{
Log
.
error
(
"Error disconnecting websocket"
,
e
);
}
}
// local (package) visibility
boolean
isWebSocketOpen
()
{
return
wsSession
!=
null
&&
wsSession
.
isOpen
();
}
boolean
isWebSocketSecure
()
{
return
wsSession
!=
null
&&
wsSession
.
isSecure
();
}
void
closeWebSocket
()
{
if
(
isWebSocketOpen
())
{
wsSession
.
close
();
}
wsSession
=
null
;
}
void
closeSession
()
{
if
(
isWebSocketOpen
())
{
closeStream
(
null
);
}
if
(
xmppSession
!=
null
)
{
xmppSession
.
close
();
SessionManager
.
getInstance
().
removeSession
(
xmppSession
);
xmppSession
=
null
;
}
}
/**
* Send an XML packet to the remote peer
*
* @param packet XML to be sent to client
*/
void
deliver
(
String
packet
)
{
if
(
isWebSocketOpen
())
{
try
{
xmppSession
.
incrementServerPacketCount
();
wsSession
.
getRemote
().
sendStringByFuture
(
packet
);
}
catch
(
Exception
e
)
{
Log
.
error
(
"Packet delivery failed; session: "
+
wsSession
,
e
);
Log
.
warn
(
"Failed to deliver packet:\n"
+
packet
);
}
}
else
{
Log
.
warn
(
"Failed to deliver packet; socket is closed:\n"
+
packet
);
}
}
static
boolean
isCompressionEnabled
()
{
return
JiveGlobals
.
getProperty
(
ConnectionSettings
.
Client
.
COMPRESSION_SETTINGS
,
Connection
.
CompressionPolicy
.
optional
.
toString
())
.
equalsIgnoreCase
(
Connection
.
CompressionPolicy
.
optional
.
toString
());
}
// helper/utility methods
/*
* Process stream headers/footers and authentication stanzas locally;
* otherwise delegate stanza handling to the session packet router.
*/
private
void
processStanza
(
Element
stanza
)
{
try
{
String
tag
=
stanza
.
getName
();
if
(
STREAM_FOOTER
.
equals
(
tag
))
{
closeStream
(
null
);
}
else
if
(
"auth"
.
equals
(
tag
))
{
// User is trying to authenticate using SASL
startedSASL
=
true
;
// Process authentication stanza
xmppSession
.
incrementClientPacketCount
();
saslStatus
=
SASLAuthentication
.
handle
(
xmppSession
,
stanza
);
}
else
if
(
startedSASL
&&
"response"
.
equals
(
tag
)
||
"abort"
.
equals
(
tag
))
{
// User is responding to SASL challenge. Process response
xmppSession
.
incrementClientPacketCount
();
saslStatus
=
SASLAuthentication
.
handle
(
xmppSession
,
stanza
);
}
else
if
(
STREAM_HEADER
.
equals
(
tag
))
{
// restart the stream
openStream
(
stanza
.
attributeValue
(
"lang"
,
"en"
),
stanza
.
attributeValue
(
"from"
));
configureStream
();
}
else
if
(
Status
.
authenticated
.
equals
(
saslStatus
))
{
if
(
router
==
null
)
{
if
(
isStreamManagementAvailable
())
{
router
=
new
StreamManagementPacketRouter
(
xmppSession
);
}
else
{
// fall back for older Openfire installations
router
=
new
SessionPacketRouter
(
xmppSession
);
}
}
router
.
route
(
stanza
);
}
else
{
// require authentication
Log
.
warn
(
"Not authorized: "
+
stanza
.
asXML
());
sendPacketError
(
stanza
,
PacketError
.
Condition
.
not_authorized
);
}
}
catch
(
UnknownStanzaException
use
)
{
Log
.
warn
(
"Received invalid stanza: "
+
stanza
.
asXML
());
sendPacketError
(
stanza
,
PacketError
.
Condition
.
bad_request
);
}
catch
(
Exception
ex
)
{
Log
.
error
(
"Failed to process incoming stanza: "
+
stanza
.
asXML
(),
ex
);
closeStream
(
new
StreamError
(
StreamError
.
Condition
.
internal_server_error
));
}
}
/*
* Initiate the stream and corresponding XMPP session.
*/
private
void
initiateSession
(
Element
stanza
)
{
String
host
=
stanza
.
attributeValue
(
"to"
);
StreamError
streamError
=
null
;
if
(
STREAM_FOOTER
.
equals
(
stanza
.
getName
()))
{
// an error occurred while setting up the session
closeStream
(
null
);
}
else
if
(!
STREAM_HEADER
.
equals
(
stanza
.
getName
()))
{
streamError
=
new
StreamError
(
StreamError
.
Condition
.
unsupported_stanza_type
);
Log
.
warn
(
"Closing session due to incorrect stream header. Tag: "
+
stanza
.
getName
());
}
else
if
(!
FRAMING_NAMESPACE
.
equals
(
stanza
.
getNamespace
().
getURI
()))
{
// Validate the stream namespace (https://tools.ietf.org/html/rfc7395#section-3.3.2)
streamError
=
new
StreamError
(
StreamError
.
Condition
.
invalid_namespace
);
Log
.
warn
(
"Closing session due to invalid namespace in stream header. Namespace: "
+
stanza
.
getNamespace
().
getURI
());
}
else
if
(
STREAM_FOOTER
.
equals
(
stanza
.
getName
()))
{
closeStream
(
null
);
}
else
if
(!
validateHost
(
host
))
{
streamError
=
new
StreamError
(
StreamError
.
Condition
.
host_unknown
);
Log
.
warn
(
"Closing session due to incorrect hostname in stream header. Host: "
+
host
);
}
else
{
xmppSession
=
SessionManager
.
getInstance
().
createClientSession
(
wsConnection
);
xmppSession
.
setSessionData
(
"ws"
,
Boolean
.
TRUE
);
}
if
(
streamError
==
null
)
{
openStream
(
stanza
.
attributeValue
(
"lang"
,
"en"
),
stanza
.
attributeValue
(
"from"
));
configureStream
();
}
else
{
closeStream
(
streamError
);
}
}
private
boolean
validateHost
(
String
host
)
{
boolean
result
=
true
;
if
(
JiveGlobals
.
getBooleanProperty
(
"xmpp.client.validate.host"
,
false
))
{
result
=
XMPPServer
.
getInstance
().
getServerInfo
().
getXMPPDomain
().
equals
(
host
);
}
return
result
;
}
/*
* Prepare response for stream initiation (sasl) or stream restart (features).
*/
private
void
configureStream
()
{
StringBuilder
sb
=
new
StringBuilder
(
250
);
sb
.
append
(
"<stream:features xmlns:stream='http://etherx.jabber.org/streams'>"
);
if
(
saslStatus
==
null
)
{
// Include available SASL Mechanisms
sb
.
append
(
SASLAuthentication
.
getSASLMechanisms
(
xmppSession
));
sb
.
append
(
"<auth xmlns='http://jabber.org/features/iq-auth'/>"
);
}
else
if
(
saslStatus
.
equals
(
Status
.
authenticated
))
{
// Include Stream features
sb
.
append
(
String
.
format
(
"<bind xmlns='%s'/>"
,
"urn:ietf:params:xml:ns:xmpp-bind"
));
sb
.
append
(
String
.
format
(
"<session xmlns='%s'><optional/></session>"
,
"urn:ietf:params:xml:ns:xmpp-session"
));
if
(
isStreamManagementAvailable
())
{
sb
.
append
(
String
.
format
(
"<sm xmlns='%s'/>"
,
"urn:xmpp:sm:3"
));
}
}
sb
.
append
(
"</stream:features>"
);
deliver
(
sb
.
toString
());
}
private
void
openStream
(
String
lang
,
String
jid
)
{
xmppSession
.
incrementClientPacketCount
();
StringBuilder
sb
=
new
StringBuilder
(
250
);
sb
.
append
(
"<open "
);
if
(
jid
!=
null
)
{
sb
.
append
(
"to='"
).
append
(
jid
).
append
(
"' "
);
}
sb
.
append
(
"from='"
).
append
(
XMPPServer
.
getInstance
().
getServerInfo
().
getXMPPDomain
()).
append
(
"' "
);
sb
.
append
(
"id='"
).
append
(
xmppSession
.
getStreamID
().
toString
()).
append
(
"' "
);
sb
.
append
(
"xmlns='"
).
append
(
FRAMING_NAMESPACE
).
append
(
"' "
);
sb
.
append
(
"lang='"
).
append
(
lang
).
append
(
"' "
);
sb
.
append
(
"version='1.0' />"
);
deliver
(
sb
.
toString
());
}
private
void
closeStream
(
StreamError
streamError
)
{
if
(
isWebSocketOpen
())
{
if
(
streamError
!=
null
)
{
deliver
(
streamError
.
toXML
());
}
StringBuilder
sb
=
new
StringBuilder
(
250
);
sb
.
append
(
"<close "
);
sb
.
append
(
"xmlns='"
).
append
(
FRAMING_NAMESPACE
).
append
(
"' "
);
sb
.
append
(
" />"
);
deliver
(
sb
.
toString
());
closeWebSocket
();
}
}
private
void
sendPacketError
(
Element
stanza
,
PacketError
.
Condition
condition
)
{
Element
reply
=
stanza
.
createCopy
();
reply
.
addAttribute
(
"type"
,
"error"
);
reply
.
addAttribute
(
"to"
,
stanza
.
attributeValue
(
"from"
));
reply
.
addAttribute
(
"from"
,
stanza
.
attributeValue
(
"to"
));
reply
.
add
(
new
PacketError
(
condition
).
getElement
());
deliver
(
reply
.
asXML
());
}
private
synchronized
void
initializePool
()
{
if
(
readerPool
==
null
)
{
readerPool
=
new
GenericObjectPool
<
XMPPPacketReader
>(
new
XMPPPPacketReaderFactory
());
readerPool
.
setMaxTotal
(
32
);
readerPool
.
setTestOnReturn
(
true
);
readerPool
.
setNumTestsPerEvictionRun
(-
2
);
// evict half of the idle instances
readerPool
.
setTimeBetweenEvictionRunsMillis
(
JiveConstants
.
HOUR
);
}
}
private
boolean
isStreamManagementAvailable
()
{
try
{
// use reflection to determine whether stream management is supported
Class
.
forName
(
"org.jivesoftware.openfire.streammanagement.StreamManager"
);
return
JiveGlobals
.
getBooleanProperty
(
"stream.management.active"
,
true
);
}
catch
(
ClassNotFoundException
cnfe
)
{
return
false
;
}
}
//-- Keep-alive ping for idle peers
private
final
class
PingTask
extends
TimerTask
{
private
boolean
lastPingFailed
=
false
;
@Override
public
void
run
()
{
if
(!
isWebSocketOpen
())
{
TaskEngine
.
getInstance
().
cancelScheduledTask
(
pingTask
);
}
else
{
long
idleTime
=
System
.
currentTimeMillis
()
-
JiveConstants
.
MINUTE
;
if
(
xmppSession
.
getLastActiveDate
().
getTime
()
>=
idleTime
)
{
return
;
}
try
{
// see https://tools.ietf.org/html/rfc6455#section-5.5.2
wsSession
.
getRemote
().
sendPing
(
null
);
lastPingFailed
=
false
;
}
catch
(
IOException
ioe
)
{
Log
.
error
(
"Failed to ping remote peer: "
+
wsSession
,
ioe
);
if
(
lastPingFailed
)
{
closeSession
();
TaskEngine
.
getInstance
().
cancelScheduledTask
(
pingTask
);
}
else
{
lastPingFailed
=
true
;
}
}
}
}
}
}
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment