Commit 957c54e4 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Refactoring wortk. JM-924

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@6534 b35dd754-fafc-0310-a699-88a17e54d16e
parent aa39b39a
/**
* $RCSfile$
* $Revision: 3187 $
* $Date: 2005-12-11 13:34:34 -0300 (Sun, 11 Dec 2005) $
*
* Copyright (C) 2004 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.auth.AuthToken;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.net.SASLAuthentication;
import org.jivesoftware.wildfire.net.SSLConfig;
import org.jivesoftware.wildfire.net.SocketConnection;
import org.jivesoftware.wildfire.privacy.PrivacyList;
import org.jivesoftware.wildfire.privacy.PrivacyListManager;
import org.jivesoftware.wildfire.user.PresenceEventDispatcher;
import org.jivesoftware.wildfire.user.UserNotFoundException;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
import org.xmpp.packet.Presence;
import org.xmpp.packet.StreamError;
import java.io.IOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;
/**
* Represents a session between the server and a client.
*
* @author Gaston Dombiak
*/
public class ClientSession extends Session {
private static final String ETHERX_NAMESPACE = "http://etherx.jabber.org/streams";
private static final String FLASH_NAMESPACE = "http://www.jabber.com/streams/flash";
/**
* Keep the list of IP address that are allowed to connect to the server. If the list is
* empty then anyone is allowed to connect to the server.<p>
*
* Note: Key = IP address or IP range; Value = empty string. A hash map is being used for
* performance reasons.
*/
private static Map<String,String> allowedIPs = new HashMap<String,String>();
private static Connection.TLSPolicy tlsPolicy;
private static Connection.CompressionPolicy compressionPolicy;
/**
* Milliseconds a connection has to be idle to be closed. Default is 30 minutes. Sending
* stanzas to the client is not considered as activity. We are only considering the connection
* active when the client sends some data or hearbeats (i.e. whitespaces) to the server.
* The reason for this is that sending data will fail if the connection is closed. And if
* the thread is blocked while sending data (because the socket is closed) then the clean up
* thread will close the socket anyway.
*/
private static long idleTimeout;
/**
* The authentication token for this session.
*/
protected AuthToken authToken;
/**
* Flag indicating if this session has been initialized yet (upon first available transition).
*/
private boolean initialized;
/**
* Flag that indicates if the session was available ever.
*/
private boolean wasAvailable = false;
/**
* Flag indicating if the user requested to not receive offline messages when sending
* an available presence. The user may send a disco request with node
* "http://jabber.org/protocol/offline" so that no offline messages are sent to the
* user when he becomes online. If the user is connected from many resources then
* if one of the sessions stopped the flooding then no session should flood the user.
*/
private boolean offlineFloodStopped = false;
private Presence presence = null;
private int conflictCount = 0;
/**
* Privacy list that overrides the default privacy list. This list affects only this
* session and only for the duration of the session.
*/
private PrivacyList activeList;
/**
* Default privacy list used for the session's user. This list is processed if there
* is no active list set for the session.
*/
private PrivacyList defaultList;
static {
// Fill out the allowedIPs with the system property
String allowed = JiveGlobals.getProperty("xmpp.client.login.allowed", "");
StringTokenizer tokens = new StringTokenizer(allowed, ", ");
while (tokens.hasMoreTokens()) {
String address = tokens.nextToken().trim();
allowedIPs.put(address, "");
}
// Set the TLS policy stored as a system property
String policyName = JiveGlobals.getProperty("xmpp.client.tls.policy",
Connection.TLSPolicy.optional.toString());
tlsPolicy = Connection.TLSPolicy.valueOf(policyName);
// Set the Compression policy stored as a system property
policyName = JiveGlobals.getProperty("xmpp.client.compression.policy",
Connection.CompressionPolicy.optional.toString());
compressionPolicy = Connection.CompressionPolicy.valueOf(policyName);
// Set the default read idle timeout. If none was set then assume 30 minutes
idleTimeout = JiveGlobals.getIntProperty("xmpp.client.idle", 30 * 60 * 1000);
}
/**
* Returns a newly created session between the server and a client. The session will
* be created and returned only if correct name/prefix (i.e. 'stream' or 'flash')
* and namespace were provided by the client.
*
* @param serverName the name of the server where the session is connecting to.
* @param reader the reader that is reading the provided XML through the connection.
* @param connection the connection with the client.
* @return a newly created session between the server and a client.
*/
public static Session createSession(String serverName, XMPPPacketReader reader,
SocketConnection connection) throws XmlPullParserException, UnauthorizedException,
IOException
{
XmlPullParser xpp = reader.getXPPParser();
boolean isFlashClient = xpp.getPrefix().equals("flash");
connection.setFlashClient(isFlashClient);
// Conduct error checking, the opening tag should be 'stream'
// in the 'etherx' namespace
if (!xpp.getName().equals("stream") && !isFlashClient) {
throw new XmlPullParserException(
LocaleUtils.getLocalizedString("admin.error.bad-stream"));
}
if (!xpp.getNamespace(xpp.getPrefix()).equals(ETHERX_NAMESPACE) &&
!(isFlashClient && xpp.getNamespace(xpp.getPrefix()).equals(FLASH_NAMESPACE)))
{
throw new XmlPullParserException(LocaleUtils.getLocalizedString(
"admin.error.bad-namespace"));
}
if (!allowedIPs.isEmpty()) {
// The server is using a whitelist so check that the IP address of the client
// is authorized to connect to the server
if (!allowedIPs.containsKey(connection.getInetAddress().getHostAddress())) {
byte[] address = connection.getInetAddress().getAddress();
String range1 = (address[0] & 0xff) + "." + (address[1] & 0xff) + "." +
(address[2] & 0xff) +
".*";
String range2 = (address[0] & 0xff) + "." + (address[1] & 0xff) + ".*.*";
String range3 = (address[0] & 0xff) + ".*.*.*";
if (!allowedIPs.containsKey(range1) && !allowedIPs.containsKey(range2) &&
!allowedIPs.containsKey(range3)) {
// Client cannot connect from this IP address so end the stream and
// TCP connection
Log.debug("Closed connection to client attempting to connect from: " +
connection.getInetAddress().getHostAddress());
// Include the not-authorized error in the response
StreamError error = new StreamError(StreamError.Condition.not_authorized);
connection.deliverRawText(error.toXML());
// Close the underlying connection
connection.close();
return null;
}
}
}
// Default language is English ("en").
String language = "en";
// Default to a version of "0.0". Clients written before the XMPP 1.0 spec may
// not report a version in which case "0.0" should be assumed (per rfc3920
// section 4.4.1).
int majorVersion = 0;
int minorVersion = 0;
for (int i = 0; i < xpp.getAttributeCount(); i++) {
if ("lang".equals(xpp.getAttributeName(i))) {
language = xpp.getAttributeValue(i);
}
if ("version".equals(xpp.getAttributeName(i))) {
try {
int[] version = decodeVersion(xpp.getAttributeValue(i));
majorVersion = version[0];
minorVersion = version[1];
}
catch (Exception e) {
Log.error(e);
}
}
}
// If the client supports a greater major version than the server,
// set the version to the highest one the server supports.
if (majorVersion > MAJOR_VERSION) {
majorVersion = MAJOR_VERSION;
minorVersion = MINOR_VERSION;
}
else if (majorVersion == MAJOR_VERSION) {
// If the client supports a greater minor version than the
// server, set the version to the highest one that the server
// supports.
if (minorVersion > MINOR_VERSION) {
minorVersion = MINOR_VERSION;
}
}
// Store language and version information in the connection.
connection.setLanaguage(language);
connection.setXMPPVersion(majorVersion, minorVersion);
// Indicate the TLS policy to use for this connection
if (!connection.isSecure()) {
boolean hasCertificates = false;
try {
hasCertificates = SSLConfig.getKeyStore().size() > 0;
}
catch (Exception e) {
Log.error(e);
}
if (Connection.TLSPolicy.required == tlsPolicy && !hasCertificates) {
Log.error("Client session rejected. TLS is required but no certificates " +
"were created.");
return null;
}
// Set default TLS policy
connection.setTlsPolicy(hasCertificates ? tlsPolicy : Connection.TLSPolicy.disabled);
} else {
// Set default TLS policy
connection.setTlsPolicy(Connection.TLSPolicy.disabled);
}
// Indicate the compression policy to use for this connection
connection.setCompressionPolicy(compressionPolicy);
// Set the max number of milliseconds the connection may not receive data from the
// client before closing the connection
connection.setIdleTimeout(idleTimeout);
// Create a ClientSession for this user.
Session session = SessionManager.getInstance().createClientSession(connection);
Writer writer = connection.getWriter();
// Build the start packet response
StringBuilder sb = new StringBuilder(200);
sb.append("<?xml version='1.0' encoding='");
sb.append(CHARSET);
sb.append("'?>");
if (isFlashClient) {
sb.append("<flash:stream xmlns:flash=\"http://www.jabber.com/streams/flash\" ");
}
else {
sb.append("<stream:stream ");
}
sb.append("xmlns:stream=\"http://etherx.jabber.org/streams\" xmlns=\"jabber:client\" from=\"");
sb.append(serverName);
sb.append("\" id=\"");
sb.append(session.getStreamID().toString());
sb.append("\" xml:lang=\"");
sb.append(language);
// Don't include version info if the version is 0.0.
if (majorVersion != 0) {
sb.append("\" version=\"");
sb.append(majorVersion).append(".").append(minorVersion);
}
sb.append("\">");
writer.write(sb.toString());
// If this is a "Jabber" connection, the session is now initialized and we can
// return to allow normal packet parsing.
if (majorVersion == 0) {
// If this is a flash client append a special caracter to the response.
if (isFlashClient) {
writer.write('\0');
}
writer.flush();
return session;
}
// Otherwise, this is at least XMPP 1.0 so we need to announce stream features.
sb = new StringBuilder(490);
sb.append("<stream:features>");
if (connection.getTlsPolicy() != Connection.TLSPolicy.disabled) {
sb.append("<starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\">");
if (connection.getTlsPolicy() == Connection.TLSPolicy.required) {
sb.append("<required/>");
}
sb.append("</starttls>");
}
// Include available SASL Mechanisms
sb.append(SASLAuthentication.getSASLMechanisms(session));
// Include Stream features
String specificFeatures = session.getAvailableStreamFeatures();
if (specificFeatures != null) {
sb.append(specificFeatures);
}
sb.append("</stream:features>");
writer.write(sb.toString());
if (isFlashClient) {
writer.write('\0');
}
writer.flush();
return session;
}
/**
* Returns the list of IP address that are allowed to connect to the server. If the list is
* empty then anyone is allowed to connect to the server.
*
* @return the list of IP address that are allowed to connect to the server.
*/
public static Map<String, String> getAllowedIPs() {
return allowedIPs;
}
/**
* Sets the list of IP address that are allowed to connect to the server. If the list is
* empty then anyone is allowed to connect to the server.
*
* @param allowed the list of IP address that are allowed to connect to the server.
*/
public static void setAllowedIPs(Map<String, String> allowed) {
allowedIPs = allowed;
if (allowedIPs.isEmpty()) {
JiveGlobals.deleteProperty("xmpp.client.login.allowed");
}
else {
// Iterate through the elements in the map.
StringBuilder buf = new StringBuilder();
Iterator<String> iter = allowedIPs.keySet().iterator();
if (iter.hasNext()) {
buf.append(iter.next());
}
while (iter.hasNext()) {
buf.append(", ").append(iter.next());
}
JiveGlobals.setProperty("xmpp.client.login.allowed", buf.toString());
}
}
/**
* Returns whether TLS is mandatory, optional or is disabled for clients. When TLS is
* mandatory clients are required to secure their connections or otherwise their connections
* will be closed. On the other hand, when TLS is disabled clients are not allowed to secure
* their connections using TLS. Their connections will be closed if they try to secure the
* connection. in this last case.
*
* @return whether TLS is mandatory, optional or is disabled.
*/
public static SocketConnection.TLSPolicy getTLSPolicy() {
return tlsPolicy;
}
/**
* Sets whether TLS is mandatory, optional or is disabled for clients. When TLS is
* mandatory clients are required to secure their connections or otherwise their connections
* will be closed. On the other hand, when TLS is disabled clients are not allowed to secure
* their connections using TLS. Their connections will be closed if they try to secure the
* connection. in this last case.
*
* @param policy whether TLS is mandatory, optional or is disabled.
*/
public static void setTLSPolicy(SocketConnection.TLSPolicy policy) {
tlsPolicy = policy;
JiveGlobals.setProperty("xmpp.client.tls.policy", tlsPolicy.toString());
}
/**
* Returns whether compression is optional or is disabled for clients.
*
* @return whether compression is optional or is disabled.
*/
public static SocketConnection.CompressionPolicy getCompressionPolicy() {
return compressionPolicy;
}
/**
* Sets whether compression is optional or is disabled for clients.
*
* @param policy whether compression is optional or is disabled.
*/
public static void setCompressionPolicy(SocketConnection.CompressionPolicy policy) {
compressionPolicy = policy;
JiveGlobals.setProperty("xmpp.client.compression.policy", compressionPolicy.toString());
}
/**
* Returns the Privacy list that overrides the default privacy list. This list affects
* only this session and only for the duration of the session.
*
* @return the Privacy list that overrides the default privacy list.
*/
public PrivacyList getActiveList() {
return activeList;
}
/**
* Sets the Privacy list that overrides the default privacy list. This list affects
* only this session and only for the duration of the session.
*
* @param activeList the Privacy list that overrides the default privacy list.
*/
public void setActiveList(PrivacyList activeList) {
this.activeList = activeList;
}
/**
* Returns the default Privacy list used for the session's user. This list is
* processed if there is no active list set for the session.
*
* @return the default Privacy list used for the session's user.
*/
public PrivacyList getDefaultList() {
return defaultList;
}
/**
* Sets the default Privacy list used for the session's user. This list is
* processed if there is no active list set for the session.
*
*
* @param defaultList the default Privacy list used for the session's user.
*/
public void setDefaultList(PrivacyList defaultList) {
this.defaultList = defaultList;
}
/**
* Returns the number of milliseconds a connection has to be idle to be closed. Default is
* 30 minutes. Sending stanzas to the client is not considered as activity. We are only
* considering the connection active when the client sends some data or hearbeats
* (i.e. whitespaces) to the server.
*
* @return the number of milliseconds a connection has to be idle to be closed.
*/
public static long getIdleTimeout() {
return idleTimeout;
}
/**
* Sets the number of milliseconds a connection has to be idle to be closed. Default is
* 30 minutes. Sending stanzas to the client is not considered as activity. We are only
* considering the connection active when the client sends some data or hearbeats
* (i.e. whitespaces) to the server.
*
* @param timeout the number of milliseconds a connection has to be idle to be closed.
*/
public static void setIdleTimeout(long timeout) {
idleTimeout = timeout;
JiveGlobals.setProperty("xmpp.client.idle", Long.toString(idleTimeout));
}
/**
* Creates a session with an underlying connection and permission protection.
*
* @param connection The connection we are proxying
*/
public ClientSession(String serverName, Connection connection, StreamID streamID) {
super(serverName, connection, streamID);
// Set an unavailable initial presence
presence = new Presence();
presence.setType(Presence.Type.unavailable);
}
/**
* Returns the username associated with this session. Use this information
* with the user manager to obtain the user based on username.
*
* @return the username associated with this session
* @throws UserNotFoundException if a user is not associated with a session
* (the session has not authenticated yet)
*/
public String getUsername() throws UserNotFoundException {
if (authToken == null) {
throw new UserNotFoundException();
}
return getAddress().getNode();
}
/**
* Sets the new Authorization Token for this session. The session is not yet considered fully
* authenticated (i.e. active) since a resource has not been binded at this point. This
* message will be sent after SASL authentication was successful but yet resource binding
* is required.
*
* @param auth the authentication token obtained from SASL authentication.
*/
public void setAuthToken(AuthToken auth) {
authToken = auth;
}
/**
* Initialize the session with a valid authentication token and
* resource name. This automatically upgrades the session's
* status to authenticated and enables many features that are not
* available until authenticated (obtaining managers for example).
*
* @param auth the authentication token obtained from the AuthFactory.
* @param resource the resource this session authenticated under.
*/
public void setAuthToken(AuthToken auth, String resource) {
setAddress(new JID(auth.getUsername(), getServerName(), resource));
authToken = auth;
sessionManager.addSession(this);
setStatus(Session.STATUS_AUTHENTICATED);
// Set default privacy list for this session
setDefaultList(PrivacyListManager.getInstance().getDefaultPrivacyList(auth.getUsername()));
}
/**
* Initialize the session as an anonymous login. This automatically upgrades the session's
* status to authenticated and enables many features that are not available until
* authenticated (obtaining managers for example).<p>
*/
public void setAnonymousAuth() {
// Anonymous users have a full JID. Use the random resource as the JID's node
String resource = getAddress().getResource();
setAddress(new JID(resource, getServerName(), resource));
sessionManager.addAnonymousSession(this);
setStatus(Session.STATUS_AUTHENTICATED);
}
/**
* Returns the authentication token associated with this session.
*
* @return the authentication token associated with this session (can be null).
*/
public AuthToken getAuthToken() {
return authToken;
}
/**
* Flag indicating if this session has been initialized once coming
* online. Session initialization occurs after the session receives
* the first "available" presence update from the client. Initialization
* actions include pushing offline messages, presence subscription requests,
* and presence statuses to the client. Initialization occurs only once
* following the first available presence transition.
*
* @return True if the session has already been initializsed
*/
public boolean isInitialized() {
return initialized;
}
/**
* Sets the initialization state of the session.
*
* @param isInit True if the session has been initialized
* @see #isInitialized
*/
public void setInitialized(boolean isInit) {
initialized = isInit;
}
/**
* Returns true if the session was available ever.
*
* @return true if the session was available ever.
*/
public boolean wasAvailable() {
return wasAvailable;
}
/**
* Returns true if the offline messages of the user should be sent to the user when
* the user becomes online. If the user sent a disco request with node
* "http://jabber.org/protocol/offline" before the available presence then do not
* flood the user with the offline messages. If the user is connected from many resources
* then if one of the sessions stopped the flooding then no session should flood the user.
*
* @return true if the offline messages of the user should be sent to the user when the user
* becomes online.
*/
public boolean canFloodOfflineMessages() {
if(offlineFloodStopped) {
return false;
}
String username = getAddress().getNode();
for (ClientSession session : sessionManager.getSessions(username)) {
if (session.isOfflineFloodStopped()) {
return false;
}
}
return true;
}
/**
* Returns true if the user requested to not receive offline messages when sending
* an available presence. The user may send a disco request with node
* "http://jabber.org/protocol/offline" so that no offline messages are sent to the
* user when he becomes online. If the user is connected from many resources then
* if one of the sessions stopped the flooding then no session should flood the user.
*
* @return true if the user requested to not receive offline messages when sending
* an available presence.
*/
public boolean isOfflineFloodStopped() {
return offlineFloodStopped;
}
/**
* Sets if the user requested to not receive offline messages when sending
* an available presence. The user may send a disco request with node
* "http://jabber.org/protocol/offline" so that no offline messages are sent to the
* user when he becomes online. If the user is connected from many resources then
* if one of the sessions stopped the flooding then no session should flood the user.
*
* @param offlineFloodStopped if the user requested to not receive offline messages when
* sending an available presence.
*/
public void setOfflineFloodStopped(boolean offlineFloodStopped) {
this.offlineFloodStopped = offlineFloodStopped;
}
/**
* Obtain the presence of this session.
*
* @return The presence of this session or null if not authenticated
*/
public Presence getPresence() {
return presence;
}
/**
* Set the presence of this session
*
* @param presence The presence for the session
* @return The old priority of the session or null if not authenticated
*/
public Presence setPresence(Presence presence) {
Presence oldPresence = this.presence;
this.presence = presence;
if (oldPresence.isAvailable() && !this.presence.isAvailable()) {
// The client is no longer available
sessionManager.sessionUnavailable(this);
// Mark that the session is no longer initialized. This means that if the user sends
// an available presence again the session will be initialized again thus receiving
// offline messages and offline presence subscription requests
setInitialized(false);
// Notify listeners that the session is no longer available
PresenceEventDispatcher.unavailableSession(this, presence);
}
else if (!oldPresence.isAvailable() && this.presence.isAvailable()) {
// The client is available
sessionManager.sessionAvailable(this);
wasAvailable = true;
// Notify listeners that the session is now available
PresenceEventDispatcher.availableSession(this, presence);
}
else if (this.presence.isAvailable() && oldPresence.getPriority() != this.presence.getPriority())
{
// The client has changed the priority of his presence
sessionManager.changePriority(getAddress(), this.presence.getPriority());
// Notify listeners that the priority of the session/resource has changed
PresenceEventDispatcher.presencePriorityChanged(this, presence);
}
else if (this.presence.isAvailable()) {
// Notify listeners that the show or status value of the presence has changed
PresenceEventDispatcher.presenceChanged(this, presence);
}
return oldPresence;
}
/**
* Returns the number of conflicts detected on this session.
* Conflicts typically occur when another session authenticates properly
* to the user account and requests to use a resource matching the one
* in use by this session. Administrators may configure the server to automatically
* kick off existing sessions when their conflict count exceeds some limit including
* 0 (old sessions are kicked off immediately to accommodate new sessions). Conflicts
* typically signify the existing (old) session is broken/hung.
*
* @return The number of conflicts detected for this session
*/
public int getConflictCount() {
return conflictCount;
}
public String getAvailableStreamFeatures() {
// Offer authenticate and registration only if TLS was not required or if required
// then the connection is already secured
if (conn.getTlsPolicy() == Connection.TLSPolicy.required && !conn.isSecure()) {
return null;
}
StringBuilder sb = new StringBuilder(200);
// Include Stream Compression Mechanism
if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled &&
!conn.isCompressed()) {
sb.append(
"<compression xmlns=\"http://jabber.org/features/compress\"><method>zlib</method></compression>");
}
if (getAuthToken() == null) {
// Advertise that the server supports Non-SASL Authentication
sb.append("<auth xmlns=\"http://jabber.org/features/iq-auth\"/>");
// Advertise that the server supports In-Band Registration
if (XMPPServer.getInstance().getIQRegisterHandler().isInbandRegEnabled()) {
sb.append("<register xmlns=\"http://jabber.org/features/iq-register\"/>");
}
}
else {
// If the session has been authenticated then offer resource binding
// and session establishment
sb.append("<bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\"/>");
sb.append("<session xmlns=\"urn:ietf:params:xml:ns:xmpp-session\"/>");
}
return sb.toString();
}
/**
* Increments the conflict by one.
*/
public void incrementConflictCount() {
conflictCount++;
}
/**
* Returns true if the specified packet must be blocked based on the active or default
* privacy list rules. The active list will be tried first. If none was found then the
* default list is going to be used. If no default list was defined for this user then
* allow the packet to flow.
*
* @param packet the packet to analyze if it must be blocked.
* @return true if the specified packet must be blocked.
*/
public boolean shouldBlockPacket(Packet packet) {
if (activeList != null) {
// If a privacy list is active then make sure that the packet is not blocked
return activeList.shouldBlockPacket(packet);
}
else if (defaultList != null) {
// There is no active list so check if there exists a default list and make
// sure that the packet is not blocked
return defaultList.shouldBlockPacket(packet);
}
return false;
}
public void process(Packet packet) {
if (shouldBlockPacket(packet)) {
// Communication is blocked. Drop packet.
return;
}
// Deliver packet to the client
deliver(packet);
}
private void deliver(Packet packet) {
if (conn != null && !conn.isClosed()) {
try {
conn.deliver(packet);
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
}
}
public String toString() {
return super.toString() + " presence: " + presence;
}
}
/**
* $RCSfile$
* $Revision: 3174 $
* $Date: 2005-12-08 17:41:00 -0300 (Thu, 08 Dec 2005) $
*
* Copyright (C) 2004 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire;
import org.jivesoftware.wildfire.auth.AuthToken;
import org.xmpp.packet.JID;
import java.util.Date;
import java.util.Map;
import java.util.TreeMap;
/**
* The session represents a connection between the server and a client (c2s) or
* another server (s2s) as well as a connection with a component. Authentication and
* user accounts are associated with c2s connections while s2s has an optional authentication
* association but no single user user.<p>
*
* Obtain object managers from the session in order to access server resources.
*
* @author Gaston Dombiak
*/
public abstract class Session implements RoutableChannelHandler {
/**
* Version of the XMPP spec supported as MAJOR_VERSION.MINOR_VERSION (e.g. 1.0).
*/
public static final int MAJOR_VERSION = 1;
public static final int MINOR_VERSION = 0;
/**
* The utf-8 charset for decoding and encoding Jabber packet streams.
*/
protected static String CHARSET = "UTF-8";
public static final int STATUS_CLOSED = -1;
public static final int STATUS_CONNECTED = 1;
public static final int STATUS_STREAMING = 2;
public static final int STATUS_AUTHENTICATED = 3;
/**
* The Address this session is authenticated as.
*/
private JID address;
/**
* The stream id for this session (random and unique).
*/
private StreamID streamID;
/**
* The current session status.
*/
protected int status = STATUS_CONNECTED;
/**
* The connection that this session represents.
*/
protected Connection conn;
/**
* The authentication token for this session.
*/
protected AuthToken authToken;
protected SessionManager sessionManager;
private String serverName;
private Date startDate = new Date();
private long lastActiveDate;
private long clientPacketCount = 0;
private long serverPacketCount = 0;
/**
* Session temporary data. All data stored in this <code>Map</code> disapear when session
* finishes.
*/
private Map<String, Object> sessionData = null;
/**
* Creates a session with an underlying connection and permission protection.
*
* @param connection The connection we are proxying
*/
public Session(String serverName, Connection connection, StreamID streamID) {
conn = connection;
this.streamID = streamID;
this.serverName = serverName;
String id = streamID.getID();
this.address = new JID(null, serverName, id);
this.sessionManager = SessionManager.getInstance();
sessionData = new TreeMap<String, Object>();
}
/**
* Obtain the address of the user. The address is used by services like the core
* server packet router to determine if a packet should be sent to the handler.
* Handlers that are working on behalf of the server should use the generic server
* hostname address (e.g. server.com).
*
* @return the address of the packet handler.
*/
public JID getAddress() {
return address;
}
/**
* Sets the new address of this session. The address is used by services like the core
* server packet router to determine if a packet should be sent to the handler.
* Handlers that are working on behalf of the server should use the generic server
* hostname address (e.g. server.com).
*/
public void setAddress(JID address){
this.address = address;
}
/**
* Returns the connection associated with this Session.
*
* @return The connection for this session
*/
public Connection getConnection() {
return conn;
}
/**
* Obtain the current status of this session.
*
* @return The status code for this session
*/
public int getStatus() {
return status;
}
/**
* Set the new status of this session. Setting a status may trigger
* certain events to occur (setting a closed status will close this
* session).
*
* @param status The new status code for this session
*/
public void setStatus(int status) {
this.status = status;
}
/**
* Obtain the stream ID associated with this sesison. Stream ID's are generated by the server
* and should be unique and random.
*
* @return This session's assigned stream ID
*/
public StreamID getStreamID() {
return streamID;
}
/**
* Obtain the name of the server this session belongs to.
*
* @return the server name.
*/
public String getServerName() {
return serverName;
}
/**
* Obtain the date the session was created.
*
* @return the session's creation date.
*/
public Date getCreationDate() {
return startDate;
}
/**
* Obtain the time the session last had activity.
*
* @return The last time the session received activity.
*/
public Date getLastActiveDate() {
return new Date(lastActiveDate);
}
/**
* Increments the number of packets sent from the client to the server.
*/
public void incrementClientPacketCount() {
clientPacketCount++;
lastActiveDate = System.currentTimeMillis();
}
/**
* Increments the number of packets sent from the server to the client.
*/
public void incrementServerPacketCount() {
serverPacketCount++;
lastActiveDate = System.currentTimeMillis();
}
/**
* Obtain the number of packets sent from the client to the server.
*
* @return The number of packets sent from the client to the server.
*/
public long getNumClientPackets() {
return clientPacketCount;
}
/**
* Obtain the number of packets sent from the server to the client.
*
* @return The number of packets sent from the server to the client.
*/
public long getNumServerPackets() {
return serverPacketCount;
}
/**
* Saves given session data. Data are saved to temporary storage only and are accessible during
* this session life only and only from this session instance.
*
* @param key a <code>String</code> value of stored data key ID.
* @param value a <code>Object</code> value of data stored in session.
* @see #getSessionData(String)
*/
public void setSessionData(String key, Object value) {
sessionData.put(key, value);
}
/**
* Retrieves session data. This method gives access to temporary session data only. You can
* retrieve earlier saved data giving key ID to receive needed value. Please see
* {@link #setSessionData(String, Object)} description for more details.
*
* @param key a <code>String</code> value of stored data ID.
* @return a <code>Object</code> value of data for given key.
* @see #setSessionData(String, Object)
*/
public Object getSessionData(String key) {
return sessionData.get(key);
}
/**
* Removes session data. Please see {@link #setSessionData(String, Object)} description
* for more details.
*
* @param key a <code>String</code> value of stored data ID.
* @see #setSessionData(String, Object)
*/
public void removeSessionData(String key) {
sessionData.remove(key);
}
/**
* Returns a text with the available stream features. Each subclass may return different
* values depending whether the session has been authenticated or not.
*
* @return a text with the available stream features or <tt>null</tt> to add nothing.
*/
public abstract String getAvailableStreamFeatures();
public String toString() {
return super.toString() + " status: " + status + " address: " + address + " id: " + streamID;
}
protected static int[] decodeVersion(String version) {
int[] answer = new int[] {0 , 0};
String [] versionString = version.split("\\.");
answer[0] = Integer.parseInt(versionString[0]);
answer[1] = Integer.parseInt(versionString[1]);
return answer;
}
}
\ No newline at end of file
/**
* $RCSfile: ComponentSession.java,v $
* $Revision: 3174 $
* $Date: 2005-12-08 17:41:00 -0300 (Thu, 08 Dec 2005) $
*
* Copyright (C) 2004 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.component;
import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.*;
import org.jivesoftware.wildfire.auth.AuthFactory;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.net.SocketConnection;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.component.Component;
import org.xmpp.component.ComponentManager;
import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
import org.xmpp.packet.StreamError;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* Represents a session between the server and a component.
*
* @author Gaston Dombiak
*/
public class ComponentSession extends Session {
private ExternalComponent component = new ExternalComponent();
/**
* Returns a newly created session between the server and a component. The session will be
* created and returned only if all the checkings were correct.<p>
*
* A domain will be binded for the new connecting component. This method is following
* the JEP-114 where the domain to bind is sent in the TO attribute of the stream header.
*
* @param serverName the name of the server where the session is connecting to.
* @param reader the reader that is reading the provided XML through the connection.
* @param connection the connection with the component.
* @return a newly created session between the server and a component.
*/
public static Session createSession(String serverName, XMPPPacketReader reader,
SocketConnection connection) throws UnauthorizedException, IOException,
XmlPullParserException
{
XmlPullParser xpp = reader.getXPPParser();
String domain = xpp.getAttributeValue("", "to");
Log.debug("[ExComp] Starting registration of new external component for domain: " + domain);
Writer writer = connection.getWriter();
// Default answer header in case of an error
StringBuilder sb = new StringBuilder();
sb.append("<?xml version='1.0' encoding='");
sb.append(CHARSET);
sb.append("'?>");
sb.append("<stream:stream ");
sb.append("xmlns:stream=\"http://etherx.jabber.org/streams\" ");
sb.append("xmlns=\"jabber:component:accept\" from=\"");
sb.append(domain);
sb.append("\">");
// Check that a domain was provided in the stream header
if (domain == null) {
Log.debug("[ExComp] Domain not specified in stanza: " + xpp.getText());
// Include the bad-format in the response
StreamError error = new StreamError(StreamError.Condition.bad_format);
sb.append(error.toXML());
writer.write(sb.toString());
writer.flush();
// Close the underlying connection
connection.close();
return null;
}
// Get the requested subdomain
String subdomain = domain;
int index = domain.indexOf(serverName);
if (index > -1) {
subdomain = domain.substring(0, index -1);
}
// Check that an external component for the specified subdomain may connect to this server
if (!ExternalComponentManager.canAccess(subdomain)) {
Log.debug("[ExComp] Component is not allowed to connect with subdomain: " + subdomain);
StreamError error = new StreamError(StreamError.Condition.host_unknown);
sb.append(error.toXML());
writer.write(sb.toString());
writer.flush();
// Close the underlying connection
connection.close();
return null;
}
// Check that a secret key was configured in the server
String secretKey = ExternalComponentManager.getSecretForComponent(subdomain);
if (secretKey == null) {
Log.debug("[ExComp] A shared secret for the component was not found.");
// Include the internal-server-error in the response
StreamError error = new StreamError(StreamError.Condition.internal_server_error);
sb.append(error.toXML());
writer.write(sb.toString());
writer.flush();
// Close the underlying connection
connection.close();
return null;
}
// Check that the requested subdomain is not already in use
if (InternalComponentManager.getInstance().getComponent(subdomain) != null) {
Log.debug("[ExComp] Another component is already using domain: " + domain);
// Domain already occupied so return a conflict error and close the connection
// Include the conflict error in the response
StreamError error = new StreamError(StreamError.Condition.conflict);
sb.append(error.toXML());
writer.write(sb.toString());
writer.flush();
// Close the underlying connection
connection.close();
return null;
}
// Create a ComponentSession for the external component
Session session = SessionManager.getInstance().createComponentSession(connection);
// Set the bind address as the address of the session
session.setAddress(new JID(null, domain , null));
try {
Log.debug("[ExComp] Send stream header with ID: " + session.getStreamID() +
" for component with domain: " +
domain);
// Build the start packet response
sb = new StringBuilder();
sb.append("<?xml version='1.0' encoding='");
sb.append(CHARSET);
sb.append("'?>");
sb.append("<stream:stream ");
sb.append("xmlns:stream=\"http://etherx.jabber.org/streams\" ");
sb.append("xmlns=\"jabber:component:accept\" from=\"");
sb.append(domain);
sb.append("\" id=\"");
sb.append(session.getStreamID().toString());
sb.append("\">");
writer.write(sb.toString());
writer.flush();
// Perform authentication. Wait for the handshake (with the secret key)
Element doc = reader.parseDocument().getRootElement();
String digest = "handshake".equals(doc.getName()) ? doc.getStringValue() : "";
String anticipatedDigest = AuthFactory.createDigest(session.getStreamID().getID(),
secretKey);
// Check that the provided handshake (secret key + sessionID) is correct
if (!anticipatedDigest.equalsIgnoreCase(digest)) {
Log.debug("[ExComp] Incorrect handshake for component with domain: " + domain);
// The credentials supplied by the initiator are not valid (answer an error
// and close the connection)
writer.write(new StreamError(StreamError.Condition.not_authorized).toXML());
writer.flush();
// Close the underlying connection
connection.close();
return null;
}
else {
// Component has authenticated fine
session.setStatus(Session.STATUS_AUTHENTICATED);
// Send empty handshake element to acknowledge success
writer.write("<handshake></handshake>");
writer.flush();
// Bind the domain to this component
ExternalComponent component = ((ComponentSession) session).getExternalComponent();
InternalComponentManager.getInstance().addComponent(subdomain, component);
Log.debug("[ExComp] External component was registered SUCCESSFULLY with domain: " +
domain);
return session;
}
}
catch (Exception e) {
Log.error("An error occured while creating a ComponentSession", e);
// Close the underlying connection
connection.close();
return null;
}
}
public ComponentSession(String serverName, Connection conn, StreamID id) {
super(serverName, conn, id);
}
public String getAvailableStreamFeatures() {
// Nothing special to add
return null;
}
public void process(Packet packet) throws PacketException {
// Since ComponentSessions are not being stored in the RoutingTable this messages is very
// unlikely to be sent
component.processPacket(packet);
}
public ExternalComponent getExternalComponent() {
return component;
}
/**
* The ExternalComponent acts as a proxy of the remote connected component. Any Packet that is
* sent to this component will be delivered to the real component on the other side of the
* connection.<p>
*
* An ExternalComponent will be added as a route in the RoutingTable for each connected
* external component. This implies that when the server receives a packet whose domain matches
* the external component services address then a route to the external component will be used
* and the packet will be forwarded to the component on the other side of the connection.
*/
public class ExternalComponent implements Component {
private String name = "";
private String type = "";
private String category = "";
/**
* List of subdomains that were binded for this component. The list will include
* the initial subdomain.
*/
private List<String> subdomains = new ArrayList<String>();
public void processPacket(Packet packet) {
if (conn != null && !conn.isClosed()) {
try {
conn.deliver(packet);
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
conn.close();
}
}
}
public String getName() {
return name;
}
public String getDescription() {
return category + " - " + type;
}
public void setName(String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getCategory() {
return category;
}
public void setCategory(String category) {
this.category = category;
}
public String getInitialSubdomain() {
if (subdomains.isEmpty()) {
return null;
}
return subdomains.get(0);
}
private void addSubdomain(String subdomain) {
subdomains.add(subdomain);
}
public Collection<String> getSubdomains() {
return subdomains;
}
public void initialize(JID jid, ComponentManager componentManager) {
addSubdomain(jid.toString());
}
public void start() {
}
public void shutdown() {
}
public String toString() {
return super.toString() + " - subdomains: " + subdomains;
}
}
}
\ No newline at end of file
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.multiplex;
import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.*;
import org.jivesoftware.wildfire.auth.AuthFactory;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.net.SASLAuthentication;
import org.jivesoftware.wildfire.net.SocketConnection;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
import org.xmpp.packet.StreamError;
import java.io.IOException;
import java.io.Writer;
import java.util.Collection;
/**
* Represents a session between the server and a connection manager.<p>
*
* Each Connection Manager has its own domain. Each connection from the same connection manager
* uses a different resource. Unlike any other session, connection manager sessions are not
* present in the routing table. This means that connection managers are not reachable entities.
* In other words, entities cannot send packets to connection managers but clients being hosted
* by them. The main reason behind this design decision is that connection managers are private
* components of the server so they can only be contacted by the server. Connection Manager
* sessions are present in {@link SessionManager} but not in {@link RoutingTable}. Use
* {@link SessionManager#getConnectionMultiplexerSessions(String)} to get all sessions or
* {@link ConnectionMultiplexerManager#getMultiplexerSession(String)}
* to get a random session to a given connection manager.
*
* @author Gaston Dombiak
*/
public class ConnectionMultiplexerSession extends Session {
private static Connection.TLSPolicy tlsPolicy;
private static Connection.CompressionPolicy compressionPolicy;
/**
* Milliseconds a connection has to be idle to be closed. Default is 30 minutes. Sending
* stanzas to the client is not considered as activity. We are only considering the connection
* active when the client sends some data or hearbeats (i.e. whitespaces) to the server.
* The reason for this is that sending data will fail if the connection is closed. And if
* the thread is blocked while sending data (because the socket is closed) then the clean up
* thread will close the socket anyway.
*/
private static long idleTimeout;
static {
// Set the TLS policy stored as a system property
String policyName = JiveGlobals.getProperty("xmpp.multiplex.tls.policy",
Connection.TLSPolicy.disabled.toString());
tlsPolicy = Connection.TLSPolicy.valueOf(policyName);
// Set the Compression policy stored as a system property
policyName = JiveGlobals.getProperty("xmpp.multiplex.compression.policy",
Connection.CompressionPolicy.disabled.toString());
compressionPolicy = Connection.CompressionPolicy.valueOf(policyName);
// Set the default read idle timeout. If none was set then assume 5 minutes
idleTimeout = JiveGlobals.getIntProperty("xmpp.multiplex.idle", 5 * 60 * 1000);
}
public static Session createSession(String serverName, XMPPPacketReader reader,
SocketConnection connection) throws XmlPullParserException, IOException,
UnauthorizedException {
XmlPullParser xpp = reader.getXPPParser();
String domain = xpp.getAttributeValue("", "to");
Log.debug("[ConMng] Starting registration of new connection manager for domain: " + domain);
Writer writer = connection.getWriter();
// Default answer header in case of an error
StringBuilder sb = new StringBuilder();
sb.append("<?xml version='1.0' encoding='");
sb.append(CHARSET);
sb.append("'?>");
sb.append("<stream:stream ");
sb.append("xmlns:stream=\"http://etherx.jabber.org/streams\" ");
sb.append("xmlns=\"jabber:connectionmanager\" from=\"");
sb.append(domain);
sb.append("\" version=\"1.0\">");
// Check that a domain was provided in the stream header
if (domain == null) {
Log.debug("[ConMng] Domain not specified in stanza: " + xpp.getText());
// Include the bad-format in the response
StreamError error = new StreamError(StreamError.Condition.bad_format);
sb.append(error.toXML());
writer.write(sb.toString());
writer.flush();
// Close the underlying connection
connection.close();
return null;
}
// Get the requested domain
JID address = new JID(domain);
// Check that a secret key was configured in the server
String secretKey = ConnectionMultiplexerManager.getDefaultSecret();
if (secretKey == null) {
Log.debug("[ConMng] A shared secret for connection manager was not found.");
// Include the internal-server-error in the response
StreamError error = new StreamError(StreamError.Condition.internal_server_error);
sb.append(error.toXML());
writer.write(sb.toString());
writer.flush();
// Close the underlying connection
connection.close();
return null;
}
// Check that the requested subdomain is not already in use
if (SessionManager.getInstance().getConnectionMultiplexerSession(address) != null) {
Log.debug("[ConMng] Another connection manager is already using domain: " + domain);
// Domain already occupied so return a conflict error and close the connection
// Include the conflict error in the response
StreamError error = new StreamError(StreamError.Condition.conflict);
sb.append(error.toXML());
writer.write(sb.toString());
writer.flush();
// Close the underlying connection
connection.close();
return null;
}
// Indicate the TLS policy to use for this connection
connection.setTlsPolicy(tlsPolicy);
// Indicate the compression policy to use for this connection
connection.setCompressionPolicy(compressionPolicy);
// Set the max number of milliseconds the connection may not receive data from the
// client before closing the connection
connection.setIdleTimeout(idleTimeout);
// Set the connection manager domain to use delivering a packet fails
((MultiplexerPacketDeliverer) connection.getPacketDeliverer())
.setConnectionManagerDomain(address.getDomain());
// Create a ConnectionMultiplexerSession for the new session originated
// from the connection manager
Session session =
SessionManager.getInstance().createMultiplexerSession(connection, address);
// Set the address of the new session
session.setAddress(address);
try {
Log.debug("[ConMng] Send stream header with ID: " + session.getStreamID() +
" for connection manager with domain: " +
domain);
// Build the start packet response
sb = new StringBuilder();
sb.append("<?xml version='1.0' encoding='");
sb.append(CHARSET);
sb.append("'?>");
sb.append("<stream:stream ");
sb.append("xmlns:stream=\"http://etherx.jabber.org/streams\" ");
sb.append("xmlns=\"jabber:connectionmanager\" from=\"");
sb.append(domain);
sb.append("\" id=\"");
sb.append(session.getStreamID().toString());
sb.append("\" version=\"1.0\" >");
writer.write(sb.toString());
writer.flush();
// Announce stream features.
sb = new StringBuilder(490);
sb.append("<stream:features>");
if (tlsPolicy != Connection.TLSPolicy.disabled) {
sb.append("<starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\">");
if (tlsPolicy == Connection.TLSPolicy.required) {
sb.append("<required/>");
}
sb.append("</starttls>");
}
// Include Stream features
String specificFeatures = session.getAvailableStreamFeatures();
if (specificFeatures != null) {
sb.append(specificFeatures);
}
sb.append("</stream:features>");
writer.write(sb.toString());
writer.flush();
return session;
}
catch (Exception e) {
Log.error("An error occured while creating a Connection Manager Session", e);
// Close the underlying connection
connection.close();
return null;
}
}
public ConnectionMultiplexerSession(String serverName, Connection connection, StreamID streamID) {
super(serverName, connection, streamID);
}
public String getAvailableStreamFeatures() {
if (conn.getTlsPolicy() == Connection.TLSPolicy.required && !conn.isSecure()) {
return null;
}
// Include Stream Compression Mechanism
if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled &&
!conn.isCompressed()) {
return "<compression xmlns=\"http://jabber.org/features/compress\"><method>zlib</method></compression>";
}
return null;
}
public void process(Packet packet) {
deliver(packet);
}
/**
* Authenticates the connection manager. Shared secret is validated with the one provided
* by the connection manager. If everything went fine then the session will have a status
* of "authenticated" and the connection manager will receive the client configuration
* options.
*
* @param digest the digest provided by the connection manager with the handshake stanza.
* @return true if the connection manager was sucessfully authenticated.
*/
public boolean authenticate(String digest) {
// Perform authentication. Wait for the handshake (with the secret key)
String anticipatedDigest = AuthFactory.createDigest(getStreamID().getID(),
ConnectionMultiplexerManager.getDefaultSecret());
// Check that the provided handshake (secret key + sessionID) is correct
if (!anticipatedDigest.equalsIgnoreCase(digest)) {
Log.debug("[ConMng] Incorrect handshake for connection manager with domain: " +
getAddress().getDomain());
// The credentials supplied by the initiator are not valid (answer an error
// and close the connection)
conn.deliverRawText(new StreamError(StreamError.Condition.not_authorized).toXML());
// Close the underlying connection
conn.close();
return false;
}
else {
// Component has authenticated fine
setStatus(Session.STATUS_AUTHENTICATED);
// Send empty handshake element to acknowledge success
conn.deliverRawText("<handshake></handshake>");
Log.debug("[ConMng] Connection manager was AUTHENTICATED with domain: " + getAddress());
sendClientOptions();
return true;
}
}
/**
* Send to the Connection Manager the connection options available for clients. The info
* to send includes:
* <ul>
* <li>if TLS is available, optional or required
* <li>SASL mechanisms available before TLS is negotiated
* <li>if compression is available
* <li>if Non-SASL authentication is available
* <li>if In-Band Registration is available
* </ul
*/
private void sendClientOptions() {
IQ options = new IQ(IQ.Type.set);
Element child = options.setChildElement("configuration",
"http://jabber.org/protocol/connectionmanager");
// Add info about TLS
if (ClientSession.getTLSPolicy() != Connection.TLSPolicy.disabled) {
Element tls = child.addElement("starttls", "urn:ietf:params:xml:ns:xmpp-tls");
if (ClientSession.getTLSPolicy() == Connection.TLSPolicy.required) {
tls.addElement("required");
}
}
// Add info about SASL mechanisms
Collection<String> mechanisms = SASLAuthentication.getSupportedMechanisms();
if (!mechanisms.isEmpty()) {
Element sasl = child.addElement("mechanisms", "urn:ietf:params:xml:ns:xmpp-sasl");
for (String mechanism : mechanisms) {
sasl.addElement("mechanism").setText(mechanism);
}
}
// Add info about Stream Compression
if (ClientSession.getCompressionPolicy() == Connection.CompressionPolicy.optional) {
Element comp = child.addElement("compression", "http://jabber.org/features/compress");
comp.addElement("method").setText("zlib");
}
// Add info about Non-SASL authentication
child.addElement("auth", "http://jabber.org/features/iq-auth");
// Add info about In-Band Registration
if (XMPPServer.getInstance().getIQRegisterHandler().isInbandRegEnabled()) {
child.addElement("register", "http://jabber.org/features/iq-register");
}
// Send the options
try {
conn.deliver(options);
}
catch (UnauthorizedException e) {
// Do nothing. Should never happen
}
}
void deliver(Packet packet) {
if (conn != null && !conn.isClosed()) {
try {
conn.deliver(packet);
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
}
}
/**
* Returns whether TLS is mandatory, optional or is disabled for clients. When TLS is
* mandatory clients are required to secure their connections or otherwise their connections
* will be closed. On the other hand, when TLS is disabled clients are not allowed to secure
* their connections using TLS. Their connections will be closed if they try to secure the
* connection. in this last case.
*
* @return whether TLS is mandatory, optional or is disabled.
*/
public static SocketConnection.TLSPolicy getTLSPolicy() {
return tlsPolicy;
}
/**
* Sets whether TLS is mandatory, optional or is disabled for clients. When TLS is
* mandatory clients are required to secure their connections or otherwise their connections
* will be closed. On the other hand, when TLS is disabled clients are not allowed to secure
* their connections using TLS. Their connections will be closed if they try to secure the
* connection. in this last case.
*
* @param policy whether TLS is mandatory, optional or is disabled.
*/
public static void setTLSPolicy(SocketConnection.TLSPolicy policy) {
tlsPolicy = policy;
JiveGlobals.setProperty("xmpp.multiplex.tls.policy", tlsPolicy.toString());
}
/**
* Returns whether compression is optional or is disabled for clients.
*
* @return whether compression is optional or is disabled.
*/
public static SocketConnection.CompressionPolicy getCompressionPolicy() {
return compressionPolicy;
}
/**
* Sets whether compression is optional or is disabled for clients.
*
* @param policy whether compression is optional or is disabled.
*/
public static void setCompressionPolicy(SocketConnection.CompressionPolicy policy) {
compressionPolicy = policy;
JiveGlobals.setProperty("xmpp.multiplex.compression.policy", compressionPolicy.toString());
}
/**
* Returns the number of milliseconds a connection has to be idle to be closed. Default is
* 30 minutes. Sending stanzas to the client is not considered as activity. We are only
* considering the connection active when the client sends some data or hearbeats
* (i.e. whitespaces) to the server.
*
* @return the number of milliseconds a connection has to be idle to be closed.
*/
public static long getIdleTimeout() {
return idleTimeout;
}
/**
* Sets the number of milliseconds a connection has to be idle to be closed. Default is
* 30 minutes. Sending stanzas to the client is not considered as activity. We are only
* considering the connection active when the client sends some data or hearbeats
* (i.e. whitespaces) to the server.
*
* @param timeout the number of milliseconds a connection has to be idle to be closed.
*/
public static void setIdleTimeout(long timeout) {
idleTimeout = timeout;
JiveGlobals.setProperty("xmpp.multiplex.idle", Long.toString(idleTimeout));
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.RoutingTable;
import org.jivesoftware.wildfire.Session;
import org.jivesoftware.wildfire.SessionManager;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.multiplex.ConnectionMultiplexerSession;
import org.jivesoftware.wildfire.multiplex.MultiplexerPacketHandler;
import org.jivesoftware.wildfire.multiplex.Route;
import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.IQ;
import org.xmpp.packet.Message;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.*;
/**
* A SocketReader specialized for connection manager connections. Connection managers may have
* one or more connections to the server. Each connection will have its own instance of this
* class. Each connection will send packets, sent from clients connected to the connection
* manager, to the server. Moreover, the server will use any of the available connections
* to the connection manager to send packets to connected clients through the connection manager.<p>
*
* Each socket reader has its own thread pool to process many packets in parallel. The thread pool
* by default will use 10 core threads, a queue of 50 elements and a max number of 100 threads.
* The pool will use the 10 core threads in parallel and queue packets. When the queue is full
* then more threads will be created until the max number is reached. Any created thread that
* exceeds the core number of threads will be killed when idle for 1 minute. The thread pool
* configuration can be modified by setting the system properties:
* <ul>
* <li>xmpp.multiplex.processing.core.threads
* <li>xmpp.multiplex.processing.max.threads
* <li>xmpp.multiplex.processing.queue
* </ul>
*
* Each Connection Manager has its own domain. Each connection from the same connection manager
* uses a different resource. Unlike any other session, connection manager sessions are not
* present in the routing table. This means that connection managers are not reachable entities.
* In other words, entities cannot send packets to connection managers but clients being hosted
* by them. The main reason behind this design decision is that connection managers are private
* components of the server so they can only be contacted by the server. Connection Manager
* sessions are present in {@link SessionManager} but not in {@link RoutingTable}.
*
* @author Gaston Dombiak
*/
public class ConnectionMultiplexerSocketReader extends SocketReader {
/**
* Pool of threads that are available for processing the requests.
*/
private ThreadPoolExecutor threadPool;
/**
* Queue used when thread pool is exhausted (i.e. core threads, queue and max threads are all busy). Once
* the thread pool is exhausted, incoming packets will be placed into this queue. Once the queue is emptied
* incoming packets will go to the thread pool.<p>
*
* Note that the queue is unbound so we may potentially consume all Java memory. A future version may make
* Connection Managers smarter and throttle traffic to the server to avoid this problem.
*/
private BlockingQueue<Runnable> overflowBuffer = new LinkedBlockingQueue<Runnable>();
/**
* Handler of IQ packets sent from the Connection Manager to the server.
*/
private MultiplexerPacketHandler packetHandler;
public ConnectionMultiplexerSocketReader(PacketRouter router, RoutingTable routingTable,
String serverName, Socket socket, SocketConnection connection,
boolean useBlockingMode) {
super(router, routingTable, serverName, socket, connection, useBlockingMode);
// Create a pool of threads that will process received packets. If more threads are
// required then the command will be executed on the SocketReader process
int coreThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.core.threads", 10);
int maxThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.max.threads", 100);
int queueSize = JiveGlobals.getIntProperty("xmpp.multiplex.processing.queue", 50);
threadPool =
new ThreadPoolExecutor(coreThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueSize),
new RejectedExecutionHandler() {
/**
* Stores rejected tasks in the overflow queue.
* @param r the rejected task.
* @param executor thread pool executor.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
overflowBuffer.add(r);
}
}
});
// Thread that will consume packets present in the overflow queue. The thread will monitor the threadPool
// and when a thread in the pool is available it will remove the oldest packet in the overflow queue and
// process it with the idle threads.
Thread overflowThread = new Thread() {
public void run() {
while (!threadPool.isShutdown() && !threadPool.isTerminated()) {
try {
// Get the next task that has been rejected when the threadPool was exhausted
Runnable runnable = overflowBuffer.take();
// Wait until the pool has available threads
while (threadPool.getActiveCount() >= threadPool.getMaximumPoolSize()) {
Thread.sleep(100);
}
// Process the rejected task
threadPool.execute(runnable);
} catch (InterruptedException e) {
// Do nothing
}
catch(Exception e) {
Log.error("Error consuming overflow buffer", e);
}
}
}
};
overflowThread.start();
}
boolean createSession(String namespace)
throws UnauthorizedException, XmlPullParserException, IOException {
if (getNamespace().equals(namespace)) {
// The connected client is a connection manager so create a ConnectionMultiplexerSession
session = ConnectionMultiplexerSession.createSession(serverName, reader, connection);
packetHandler = new MultiplexerPacketHandler(session.getAddress().getDomain());
return true;
}
return false;
}
String getNamespace() {
return "jabber:connectionmanager";
}
protected void processIQ(final IQ packet) throws UnauthorizedException {
if (session.getStatus() != Session.STATUS_AUTHENTICATED) {
// Session is not authenticated so return error
IQ reply = new IQ();
reply.setChildElement(packet.getChildElement().createCopy());
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.not_authorized);
session.process(reply);
return;
}
// Process the packet in another thread
threadPool.execute(new Runnable() {
public void run() {
packetHandler.handle(packet);
}
});
}
/**
* Process stanza sent by a client that is connected to a connection manager. The
* original stanza is wrapped in the route element. Only a single stanza must be
* wrapped in the route element.
*
* @param packet the route element.
*/
private void processRoute(final Route packet) throws UnauthorizedException {
if (session.getStatus() != Session.STATUS_AUTHENTICATED) {
// Session is not authenticated so return error
Route reply = new Route(packet.getStreamID());
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.not_authorized);
session.process(reply);
return;
}
// Process the packet in another thread
Runnable runnable = new Runnable() {
public void run() {
packetHandler.route(packet);
}
};
if (!overflowBuffer.isEmpty()) {
// Thread pool is exhausted or we are still recoving from a recent exhausted state.
// Keep placing tasks in this queue until the queue is empty. The queue will help us
// keep the cronological order of incoming packets. Note that if we don't care about
// being cronologically correct then we should just add the task to the threadPool.
overflowBuffer.add(runnable);
}
else {
// Thread pool is not exhausted and we are not recovering from an exhausted state so just
// run the task using the thread pool
threadPool.execute(runnable);
}
}
protected void processMessage(final Message packet) throws UnauthorizedException {
throw new UnauthorizedException("Message packets are not supported. Original packets " +
"should be wrapped by IQ packets.");
}
protected void processPresence(final Presence packet) throws UnauthorizedException {
throw new UnauthorizedException("Message packets are not supported. Original packets " +
"should be wrapped by IQ packets.");
}
boolean processUnknowPacket(Element doc) {
String tag = doc.getName();
if ("route".equals(tag)) {
// Process stanza wrapped by the route packet
try {
processRoute(new Route(doc));
return true;
}
catch (UnauthorizedException e) {
// Should never happen
}
}
else if ("handshake".equals(tag)) {
open = ((ConnectionMultiplexerSession)session).authenticate(doc.getStringValue());
return true;
}
else if ("error".equals(tag) && "stream".equals(doc.getNamespacePrefix())) {
session.getConnection().close();
open = false;
return true;
}
return false;
}
protected void shutdown() {
super.shutdown();
// Shutdown the pool of threads that are processing packets sent by
// the remote server
threadPool.shutdown();
}
String getName() {
return "ConnectionMultiplexer SR - " + hashCode();
}
boolean validateHost() {
return false;
}
}
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.ConnectionManager;
import org.jivesoftware.wildfire.ServerPort;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* Accepts new socket connections using a non-blocking model. A single selector is
* used for all connected clients and also for accepting new connections.
*
* @author Daniele Piras
*/
class NonBlockingAcceptingMode extends SocketAcceptingMode {
// Time (in ms) to sleep from a reading-cycle to another
private static final long CYCLE_TIME = 10;
// Selector to collect messages from client connections.
private Selector selector;
protected NonBlockingAcceptingMode(ConnectionManager connManager, ServerPort serverPort,
InetAddress bindInterface) throws IOException {
super(connManager, serverPort);
// Chaning server to use NIO
// Open selector...
selector = Selector.open();
// Create a new ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// Retrieve socket and bind socket with specified address
this.serverSocket = serverSocketChannel.socket();
this.serverSocket.bind(new InetSocketAddress(bindInterface, serverPort.getPort()));
// Configure Blocking to unblocking
serverSocketChannel.configureBlocking(false);
// Registering connection with selector.
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
AcceptConnection acceptConnection = new AcceptConnection();
sk.attach(acceptConnection);
}
/**
* DANIELE:
* This thread use the selector NIO features to retrieve client connections
* and messages.
*/
public void run() {
while (notTerminated && !Thread.interrupted()) {
try {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
it.remove();
SelectorAction action = (SelectorAction) key.attachment();
if (action == null) {
continue;
}
if (key.isAcceptable()) {
action.connect(key);
}
else if (key.isReadable()) {
action.read(key);
}
}
Thread.sleep(CYCLE_TIME);
}
catch (IOException ie) {
if (notTerminated) {
Log.error(LocaleUtils.getLocalizedString("admin.error.accept"),
ie);
}
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.accept"), e);
}
}
}
/*
* InnerClass that is use when a new client arrive.
* It's use the reactor pattern to register an abstract action
* to the selector.
*/
class AcceptConnection implements SelectorAction {
public void read(SelectionKey key) throws IOException {
}
/*
* A client arrive...
*/
public void connect(SelectionKey key) throws IOException {
// Retrieve the server socket channel...
ServerSocketChannel sChannel = (ServerSocketChannel) key.channel();
// Accept the connection
SocketChannel socketChannel = sChannel.accept();
// Retrieve socket for incoming connection
Socket sock = socketChannel.socket();
socketChannel.configureBlocking(false);
// Registering READING operation into the selector
SelectionKey sockKey = socketChannel.register(selector, SelectionKey.OP_READ);
if (sock != null) {
System.out.println("Connect " + sock.toString());
Log.debug("Connect " + sock.toString());
try {
SocketReader reader =
connManager.createSocketReader(sock, false, serverPort, false);
SelectorAction action = new ReadAction(reader);
sockKey.attach(action);
}
catch (Exception e) {
// There is an exception...
Log.error(LocaleUtils.getLocalizedString("admin.error.accept"), e);
}
}
}
}
class ReadAction implements SelectorAction {
SocketReader reader;
public ReadAction(SocketReader reader) {
this.reader = reader;
}
public void read(SelectionKey key) throws IOException {
// Socket reader (using non-blocking mode) will read the stream and process, in
// another thread, any number of stanzas found in the stream.
reader.run();
}
public void connect(SelectionKey key) throws IOException {
}
}
}
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZInputStream;
import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.SessionManager;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.SocketChannel;
/**
* Process incoming packets using a non-blocking model.
*
* @author Daniele Piras
*/
class NonBlockingReadingMode extends SocketReadingMode {
// DANIELE: Socket read timeout in milliseconds
private static int READ_TIMEOUT = 0;
private static String STREAM_START = "<stream:stream";
// DANIELE: Semaphore to avoid concurrent reading operation from different thread
private boolean isReading;
// DANIELE: lightweight xml parser.
private XMLLightweightParser xmlLightWeightParser;
// DANIELE: Channel for socket connection
private SocketChannel socketChannel;
// DANIELE: Indicate if the reading operations has been scheduled into the executor.
// this is very important because if all reading thread are busy is used to avoid
// to reinsert into the queue the reading operation.
private boolean isScheduled = false;
// DANIELE: Indicate if a session is already created
private boolean sessionCreated = false;
// DANIELE: Indicate if a stream:stream is arrived to complete a sals authentication
private boolean awaytingSasl = false;
// DANIELE: Indicate if a stream:stream is arrived to complete compression
private boolean awaitingForCompleteCompression = false;
private StreamReader streamReader;
public NonBlockingReadingMode(Socket socket, SocketReader socketReader) {
super(socket, socketReader);
// DANIELE: Initialization
// Setting timeout for reading operations.
try {
socket.setSoTimeout(READ_TIMEOUT);
}
catch (SocketException e) {
// There is an exception...
Log.warn(e);
}
socketChannel = socket.getChannel();
// Initialize XML light weight parser
xmlLightWeightParser = new XMLLightweightParser(socketChannel, CHARSET);
isReading = false;
socketReader.open = true;
streamReader = new StreamReader();
}
/* DANIELE:
* Method that verify if the client has data in the channel and in this case
* call an executor to perform reading operations.
*/
void run() {
try {
// Check if the socket is open
if (socketReader.open) {
// Verify semaphore and if there are data into the socket.
if (!isReading && !isScheduled) {
try {
// Semaphore to avoid concurrent schedule of the same read operation.
isScheduled = true;
// Schedule execution with executor
IOExecutor.execute(streamReader);
}
catch (Exception e) {
if (socketReader.session != null) {
Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") +
". Session: " +
socketReader.session, e);
}
}
}
}
}
catch (Exception e) {
socketReader.shutdown();
// There is an exception...
Log.error(e);
}
if (!socketReader.open) {
socketReader.shutdown();
}
}
protected void tlsNegotiated() throws XmlPullParserException, IOException {
XmlPullParser xpp = socketReader.reader.getXPPParser();
InputStream is = socketReader.connection.getTLSStreamHandler().getInputStream();
xpp.setInput(new InputStreamReader(is, CHARSET));
xmlLightWeightParser.setInput( is, CHARSET );
super.tlsNegotiated();
}
protected boolean compressClient(Element doc) throws IOException, XmlPullParserException {
boolean answer = super.compressClient(doc);
if (answer) {
XmlPullParser xpp = socketReader.reader.getXPPParser();
// Reset the parser since a new stream header has been sent from the client
if (socketReader.connection.getTLSStreamHandler() == null) {
InputStream is;
if (socketChannel != null) {
// DANIELE: Create an inputstream using the utility class ChannelInputStream.
is = new ChannelInputStream(socketChannel);
}
else {
is = socket.getInputStream();
}
is = ServerTrafficCounter.wrapInputStream(is);
ZInputStream in = new ZInputStream(is);
in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
xpp.setInput(new InputStreamReader(in, CHARSET));
}
else {
ZInputStream in = new ZInputStream(
socketReader.connection.getTLSStreamHandler().getInputStream());
in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
xpp.setInput(new InputStreamReader(in, CHARSET));
xmlLightWeightParser.setInput( in, CHARSET );
}
}
return answer;
}
class StreamReader implements Runnable {
/*
* This method is invoked when client send data to the channel.
*
*/
public void run() {
try {
// If no other reading operations are perform
if (!isReading) {
// Change the semaphore status
isReading = true;
// Call the XML light-wieght parser to read data...
xmlLightWeightParser.read();
// Check if the parser has found a complete message...
if (xmlLightWeightParser.areThereMsgs()) {
// Process every message found
String[] msgs = xmlLightWeightParser.getMsgs();
for (int i = 0; i < msgs.length; i++) {
//System.out.println( "Processing " + msgs[ i ] );
readStream(msgs[i]);
}
}
}
}
catch (IOException e) {
if (socketReader.session != null) {
// DANIELE: Remove session from SessionManager. I don't know if
// this is the easy way.
// TODO Review this. Closing the connection should be used???
SessionManager.getInstance().removeSession(
SessionManager.getInstance().getSession(
socketReader.session.getAddress()));
}
try {
xmlLightWeightParser.getChannel().close();
}
catch (IOException e1) {
}
// System.out.println( "Client disconnecting" );
}
catch (Exception e) {
if (socketReader.session != null) {
Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") + ". Session: " +
socketReader.session, e);
}
e.printStackTrace();
}
finally {
isReading = false;
isScheduled = false;
}
}
/**
* Process a single message
*/
private void readStream(String msg) throws Exception {
if (msg.trim().startsWith(STREAM_START)) {
// Found an stream:stream tag...
if (!sessionCreated) {
sessionCreated = true;
socketReader.reader.getXPPParser().setInput(new StringReader(
msg + ((msg.indexOf("</stream:stream") == -1) ? "</stream:stream>" :
"")));
socketReader.createSession();
}
else if (awaytingSasl) {
awaytingSasl = false;
saslSuccessful();
}
else if (awaitingForCompleteCompression) {
awaitingForCompleteCompression = false;
compressionSuccessful();
}
return;
}
// Create dom in base on the string.
Element doc = socketReader.reader.parseDocument(msg).getRootElement();
if (doc == null) {
// No document found.
return;
}
String tag = doc.getName();
if ("starttls".equals(tag)) {
// Negotiate TLS
if (negotiateTLS()) {
tlsNegotiated();
}
else {
socketReader.open = false;
socketReader.session = null;
}
}
else if ("auth".equals(tag)) {
// User is trying to authenticate using SASL
if (authenticateClient(doc)) {
// SASL authentication was successful so open a new stream and offer
// resource binding and session establishment (to client sessions only)
awaytingSasl = true;
}
else if (socketReader.connection.isClosed()) {
socketReader.open = false;
socketReader.session = null;
}
}
else if ("compress".equals(tag))
{
// Client is trying to initiate compression
if (compressClient(doc)) {
// Compression was successful so open a new stream and offer
// resource binding and session establishment (to client sessions only)
awaitingForCompleteCompression = true;
}
}
else {
socketReader.process(doc);
}
}
}
}
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
* This is a Light-Weight XML Parser.
* It read data from a channel and collect data until data are available in
* the channel.
* When a message is complete you can retrieve messages invoking the method
* getMsgs() and you can invoke the method areThereMsgs() to know if at least
* an message is presents.
*
* @author Daniele Piras
*
*/
class XMLLightweightParser
{
// Chars that rappresent CDATA section start
protected static char[] CDATA_START = {'<','!','[','C','D','A','T','A','['};
// Chars that rappresent CDATA section end
protected static char[] CDATA_END = {']',']','>'};
// Buffer with all data retrieved
protected StringBuilder buffer = new StringBuilder();
// ---- INTERNAL STATUS -------
// Initial status
protected static final int INIT = 0;
// Status used when the first tag name is retrieved
protected static final int HEAD = 2;
// Status used when robot is inside the xml and it looking for the tag conclusion
protected static final int INSIDE = 3;
// Status used when a '<' is found and try to find the conclusion tag.
protected static final int PRETAIL = 4;
// Status used when the ending tag is equal to the head tag
protected static final int TAIL = 5;
// Status used when robot is inside the main tag and found an '/' to check '/>'.
protected static final int VERIFY_CLOSE_TAG = 6;
// Status used when you are inside a parameter
protected static final int INSIDE_PARAM_VALUE = 7;
// Status used when you are inside a cdata section
protected static final int INSIDE_CDATA = 8;
// Current robot status
protected int status = INIT;
// Index to looking for a CDATA section start or end.
protected int cdataOffset = 0;
// Number of chars that machs with the head tag. If the tailCount is equal to
// the head length so a close tag is found.
protected int tailCount = 0;
// Indicate the starting point in the buffer for the next message.
protected int startLastMsg = 0;
// Flag used to discover tag in the form <tag />.
protected boolean insideRootTag = false;
// Object conteining the head tag
protected StringBuilder head = new StringBuilder( 5 );
// List with all finished messages found.
protected List<String> msgs = new ArrayList<String>();
private ReadableByteChannel inputChannel;
byte[] rawByteBuffer;
ByteBuffer byteBuffer;
Charset encoder;
public ReadableByteChannel getChannel()
{
return inputChannel;
}
public XMLLightweightParser( ReadableByteChannel channel, String charset )
{
rawByteBuffer = new byte[1024];
byteBuffer = ByteBuffer.wrap( rawByteBuffer );
setInput( channel, charset );
}
public XMLLightweightParser( InputStream is , String charset)
{
rawByteBuffer = new byte[1024];
byteBuffer = ByteBuffer.wrap( rawByteBuffer );
setInput( is, charset );
}
public void setInput( InputStream is, String charset )
{
inputChannel = Channels.newChannel( is );
encoder = Charset.forName( charset );
invalidateBuffer();
}
public void setInput( ReadableByteChannel channel, String charset )
{
inputChannel = channel;
encoder = Charset.forName( charset );
invalidateBuffer();
}
/*
* true if the parser has found some complete xml message.
*/
public boolean areThereMsgs()
{
return ( msgs.size() > 0 );
}
/*
* @return an array with all messages found
*/
public String[] getMsgs()
{
String[] res = new String[ msgs.size() ];
for ( int i = 0; i < res.length; i++ )
{
res[ i ] = msgs.get( i );
}
msgs.clear();
invalidateBuffer();
return res;
}
/*
* Method use to re-initialize the buffer
*/
protected void invalidateBuffer()
{
if ( buffer.length() > 0 )
{
String str = buffer.substring( startLastMsg ).toString().trim();
buffer.delete( 0, buffer.length() );
buffer.append( str );
buffer.trimToSize();
}
startLastMsg = 0;
}
/*
* Method that add a message to the list and reinit parser.
*/
protected void foundMsg( String msg )
{
// Add message to the complete message list
if ( msg != null )
{
msgs.add( msg.trim() );
}
// Move the position into the buffer
status = INIT;
tailCount = 0;
cdataOffset = 0;
head.setLength( 0 );
insideRootTag = false;
}
/*
* Main reading method
*/
public void read() throws Exception
{
// Reset buffer
byteBuffer.limit( rawByteBuffer.length );
byteBuffer.rewind();
int readByte = inputChannel.read( byteBuffer );
if ( readByte == -1 )
{
// ERROR ON SOCKET!!
throw new IOException( "ReadByte == -1.Socket Close" );
}
else if ( readByte <= 0 )
{
return;
}
else if ( readByte == 1 && rawByteBuffer[ 0 ] == ' ' )
{
// Heart bit! Ignore it.
return;
}
byteBuffer.flip();
byte[] bhs = byteBuffer.array();
byteBuffer.rewind();
CharBuffer charBuffer = encoder.decode( byteBuffer );
charBuffer.flip();
char[] buf = charBuffer.array();
buffer.append( buf );
// Robot.
char ch;
for ( int i = 0; i < readByte; i++ )
{
//ch = rawByteBuffer[ i ];
ch = buf[ i ];
if ( status == TAIL )
{
// Looking for the close tag
if ( ch == head.charAt( tailCount ) )
{
tailCount++;
if ( tailCount == head.length() )
{
// Close tag found!
// Calculate the correct start,end position of the message into the buffer
int end = buffer.length() - readByte + ( i + 1 );
String msg = buffer.substring( startLastMsg, end );
// Add message to the list
foundMsg( msg );
startLastMsg = end;
}
}
else
{
tailCount = 0;
status = INSIDE;
}
}
else if ( status == PRETAIL )
{
if ( ch == CDATA_START[ cdataOffset ] )
{
cdataOffset++;
if ( cdataOffset == CDATA_START.length )
{
status = INSIDE_CDATA;
cdataOffset = 0;
continue;
}
}
else
{
cdataOffset = 0;
status = INSIDE;
}
if ( ch == '/' )
{
status = TAIL;
}
}
else if ( status == VERIFY_CLOSE_TAG )
{
if ( ch == '>' )
{
// Found a tag in the form <tag />
int end = buffer.length() - readByte + ( i + 1 );
String msg = buffer.substring( startLastMsg, end );
// Add message to the list
foundMsg( msg );
startLastMsg = end;
}
else
{
status = INSIDE;
}
}
else if ( status == INSIDE_PARAM_VALUE )
{
if ( ch == '"' )
{
status = INSIDE;
continue;
}
}
else if ( status == INSIDE_CDATA )
{
if ( ch == CDATA_END[ cdataOffset ] )
{
cdataOffset++;
if ( cdataOffset == CDATA_END.length )
{
status = INSIDE;
cdataOffset = 0;
continue;
}
}
else
{
cdataOffset = 0;
}
}
else if ( status == INSIDE )
{
if ( ch == CDATA_START[ cdataOffset ] )
{
cdataOffset++;
if ( cdataOffset == CDATA_START.length )
{
status = INSIDE_CDATA;
cdataOffset = 0;
continue;
}
}
else
{
cdataOffset = 0;
}
if ( ch == '"' )
{
status = INSIDE_PARAM_VALUE;
}
else if ( ch == '>' )
{
if ( insideRootTag && "stream:stream>".equals( head.toString() ) )
{
// Found closing stream:stream
int end = buffer.length() - readByte + ( i + 1 );
String msg = buffer.substring( startLastMsg, end );
foundMsg( msg );
startLastMsg = end;
}
insideRootTag = false;
}
else if ( ch == '<' )
{
status = PRETAIL;
}
else if ( ch == '/' && insideRootTag )
{
status = VERIFY_CLOSE_TAG;
}
}
else if ( status == HEAD )
{
if ( ch == ' ' || ch == '>' )
{
// Append > to head to facility the research of </tag>
head.append( ">" );
status = INSIDE;
insideRootTag = true;
continue;
}
head.append( (char)ch );
}
else if ( status == INIT )
{
if ( ch != ' ' && ch != '\r' && ch != '\n' && ch != '<' )
{
invalidateBuffer();
return;
}
if ( ch == '<' )
{
status = HEAD;
}
}
}
}
}
/**
* $RCSfile: IncomingServerSession.java,v $
* $Revision: 3174 $
* $Date: 2005-12-08 17:41:00 -0300 (Thu, 08 Dec 2005) $
*
* Copyright (C) 2004 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.server;
import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.*;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.net.SASLAuthentication;
import org.jivesoftware.wildfire.net.SSLConfig;
import org.jivesoftware.wildfire.net.SocketConnection;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.Packet;
import org.xmpp.packet.StreamError;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
/**
* Server-to-server communication is done using two TCP connections between the servers. One
* connection is used for sending packets while the other connection is used for receiving packets.
* The <tt>IncomingServerSession</tt> represents the connection to a remote server that will only
* be used for receiving packets.<p>
*
* Currently only the Server Dialback method is being used for authenticating the remote server.
* Once the remote server has been authenticated incoming packets will be processed by this server.
* It is also possible for remote servers to authenticate more domains once the session has been
* established. For optimization reasons the existing connection is used between the servers.
* Therefore, the incoming server session holds the list of authenticated domains which are allowed
* to send packets to this server.<p>
*
* Using the Server Dialback method it is possible that this server may also act as the
* Authoritative Server. This implies that an incoming connection will be established with this
* server for authenticating a domain. This incoming connection will only last for a brief moment
* and after the domain has been authenticated the connection will be closed and no session will
* exist.
*
* @author Gaston Dombiak
*/
public class IncomingServerSession extends Session {
/**
* List of domains, subdomains and virtual hostnames of the remote server that were
* validated with this server. The remote server is allowed to send packets to this
* server from any of the validated domains.
*/
private Collection<String> validatedDomains = new ArrayList<String>();
/**
* Domains or subdomain of this server that was used by the remote server
* when validating the new connection. This information is useful to prevent
* many connections from the same remote server to the same local domain.
*/
private String localDomain = null;
/**
* Creates a new session that will receive packets. The new session will be authenticated
* before being returned. If the authentication process fails then the answer will be
* <tt>null</tt>.<p>
*
* Currently the Server Dialback method is the only way to authenticate a remote server. Since
* Server Dialback requires an Authoritative Server, it is possible for this server to receive
* an incoming connection that will only exist until the requested domain has been validated.
* In this case, this method will return <tt>null</tt> since the connection is closed after
* the domain was validated. See
* {@link ServerDialback#createIncomingSession(org.dom4j.io.XMPPPacketReader)} for more
* information.
*
* @param serverName hostname of this server.
* @param reader reader on the new established connection with the remote server.
* @param connection the new established connection with the remote server.
* @return a new session that will receive packets or null if a problem occured while
* authenticating the remote server or when acting as the Authoritative Server during
* a Server Dialback authentication process.
* @throws XmlPullParserException if an error occurs while parsing the XML.
* @throws IOException if an input/output error occurs while using the connection.
*/
public static Session createSession(String serverName, XMPPPacketReader reader,
SocketConnection connection) throws XmlPullParserException, IOException {
XmlPullParser xpp = reader.getXPPParser();
if (xpp.getNamespace("db") != null) {
// Server is trying to establish connection and authenticate using server dialback
if (ServerDialback.isEnabled()) {
ServerDialback method = new ServerDialback(connection, serverName);
return method.createIncomingSession(reader);
}
Log.debug("Server dialback is disabled. Rejecting connection: " + connection);
}
String version = xpp.getAttributeValue("", "version");
int[] serverVersion = version != null ? decodeVersion(version) : new int[] {0,0};
if (serverVersion[0] >= 1) {
// Remote server is XMPP 1.0 compliant so offer TLS and SASL to establish the connection
if (JiveGlobals.getBooleanProperty("xmpp.server.tls.enabled", true)) {
try {
return createIncomingSession(connection, serverName);
}
catch (Exception e) {
Log.error("Error establishing connection from remote server", e);
}
}
else {
connection.deliverRawText(
new StreamError(StreamError.Condition.invalid_namespace).toXML());
Log.debug("Server TLS is disabled. Rejecting connection: " + connection);
}
}
// Close the connection since remote server is not XMPP 1.0 compliant and is not
// using server dialback to establish and authenticate the connection
connection.close();
return null;
}
/**
* Returns a new incoming server session pending to be authenticated. The remote server
* will be notified that TLS and SASL are available. The remote server will be able to
* send packets to this server only after SASL authentication has been finished.
*
* @param connection the new established connection with the remote server.
* @param serverName hostname of this server.
* @return a new incoming server session pending to be authenticated.
* @throws UnauthorizedException if this server is being shutdown.
*/
private static Session createIncomingSession(SocketConnection connection, String serverName)
throws UnauthorizedException {
// Get the stream ID for the new session
StreamID streamID = SessionManager.getInstance().nextStreamID();
// Create a server Session for the remote server
IncomingServerSession session = SessionManager.getInstance()
.createIncomingServerSession(connection, streamID);
// Send the stream header
StringBuilder openingStream = new StringBuilder();
openingStream.append("<stream:stream");
openingStream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\"");
openingStream.append(" xmlns=\"jabber:server\"");
openingStream.append(" from=\"").append(serverName).append("\"");
openingStream.append(" id=\"").append(streamID).append("\"");
openingStream.append(" version=\"1.0\">");
connection.deliverRawText(openingStream.toString());
// Indicate the TLS policy to use for this connection
Connection.TLSPolicy tlsPolicy =
ServerDialback.isEnabled() ? Connection.TLSPolicy.optional :
Connection.TLSPolicy.required;
boolean hasCertificates = false;
try {
hasCertificates = SSLConfig.getKeyStore().size() > 0;
}
catch (Exception e) {
Log.error(e);
}
if (Connection.TLSPolicy.required == tlsPolicy && !hasCertificates) {
Log.error("Server session rejected. TLS is required but no certificates " +
"were created.");
return null;
}
connection.setTlsPolicy(hasCertificates ? tlsPolicy : Connection.TLSPolicy.disabled);
// Indicate the compression policy to use for this connection
String policyName = JiveGlobals.getProperty("xmpp.server.compression.policy",
Connection.CompressionPolicy.disabled.toString());
Connection.CompressionPolicy compressionPolicy =
Connection.CompressionPolicy.valueOf(policyName);
connection.setCompressionPolicy(compressionPolicy);
StringBuilder sb = new StringBuilder();
sb.append("<stream:features>");
sb.append("<starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\">");
if (!ServerDialback.isEnabled()) {
// Server dialback is disabled so TLS is required
sb.append("<required/>");
}
sb.append("</starttls>");
// Include available SASL Mechanisms
sb.append(SASLAuthentication.getSASLMechanisms(session));
sb.append("</stream:features>");
connection.deliverRawText(sb.toString());
// Set the domain or subdomain of the local server targeted by the remote server
session.setLocalDomain(serverName);
return session;
}
public IncomingServerSession(String serverName, Connection connection, StreamID streamID) {
super(serverName, connection, streamID);
}
public void process(Packet packet) throws UnauthorizedException, PacketException {
//TODO Should never be called? Should be passed to the outgoing connection?
}
/**
* Returns true if the request of a new domain was valid. Sessions may receive subsequent
* domain validation request. If the validation of the new domain fails then the session and
* the underlying TCP connection will be closed.<p>
*
* For optimization reasons, the same session may be servicing several domains of a
* remote server.
*
* @param dbResult the DOM stanza requesting the domain validation.
* @return true if the requested domain was valid.
*/
public boolean validateSubsequentDomain(Element dbResult) {
ServerDialback method = new ServerDialback(getConnection(), getServerName());
if (method.validateRemoteDomain(dbResult, getStreamID())) {
// Add the validated domain as a valid domain
addValidatedDomain(dbResult.attributeValue("from"));
return true;
}
return false;
}
/**
* Returns true if the specified domain has been validated for this session. The remote
* server should send a "db:result" packet for registering new subdomains or even
* virtual hosts.<p>
*
* In the spirit of being flexible we allow remote servers to not register subdomains
* and even so consider subdomains that include the server domain in their domain part
* as valid domains.
*
* @param domain the domain to validate.
* @return true if the specified domain has been validated for this session.
*/
public boolean isValidDomain(String domain) {
// Check if the specified domain is contained in any of the validated domains
for (String validatedDomain : getValidatedDomains()) {
if (domain.contains(validatedDomain)) {
return true;
}
}
return false;
}
/**
* Returns a collection with all the domains, subdomains and virtual hosts that where
* validated. The remote server is allowed to send packets from any of these domains,
* subdomains and virtual hosts.
*
* @return domains, subdomains and virtual hosts that where validated.
*/
public Collection<String> getValidatedDomains() {
return Collections.unmodifiableCollection(validatedDomains);
}
/**
* Adds a new validated domain, subdomain or virtual host to the list of
* validated domains for the remote server.
*
* @param domain the new validated domain, subdomain or virtual host to add.
*/
public void addValidatedDomain(String domain) {
if (validatedDomains.add(domain)) {
// Register the new validated domain for this server session in SessionManager
SessionManager.getInstance().registerIncomingServerSession(domain, this);
}
}
/**
* Removes the previously validated domain from the list of validated domains. The remote
* server will no longer be able to send packets from the removed domain, subdomain or
* virtual host.
*
* @param domain the domain, subdomain or virtual host to remove from the list of
* validated domains.
*/
public void removeValidatedDomain(String domain) {
validatedDomains.remove(domain);
// Unregister the validated domain for this server session in SessionManager
SessionManager.getInstance().unregisterIncomingServerSessions(domain);
}
/**
* Returns the domain or subdomain of the local server used by the remote server
* when validating the session. This information is only used to prevent many
* connections from the same remote server to the same domain or subdomain of
* the local server.
*
* @return the domain or subdomain of the local server used by the remote server
* when validating the session.
*/
public String getLocalDomain() {
return localDomain;
}
/**
* Sets the domain or subdomain of the local server used by the remote server when asking
* to validate the session. This information is only used to prevent many connections from
* the same remote server to the same domain or subdomain of the local server.
*
* @param domain the domain or subdomain of the local server used when validating the
* session.
*/
public void setLocalDomain(String domain) {
localDomain = domain;
}
/**
* Verifies the received key sent by the remote server. This server is trying to generate
* an outgoing connection to the remote server and the remote server is reusing an incoming
* connection for validating the key.
*
* @param doc the received Element that contains the key to verify.
*/
public void verifyReceivedKey(Element doc) {
ServerDialback.verifyReceivedKey(doc, getConnection());
}
public String getAvailableStreamFeatures() {
// Include Stream Compression Mechanism
if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled &&
!conn.isCompressed()) {
return "<compression xmlns=\"http://jabber.org/features/compress\"><method>zlib</method></compression>";
}
// Nothing special to add
return null;
}
}
/**
* $RCSfile: OutgoingServerSession.java,v $
* $Revision: 3188 $
* $Date: 2005-12-12 00:28:19 -0300 (Mon, 12 Dec 2005) $
*
* Copyright (C) 2004 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.server;
import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZInputStream;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.wildfire.*;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.net.DNSUtil;
import org.jivesoftware.wildfire.net.MXParser;
import org.jivesoftware.wildfire.net.SocketConnection;
import org.jivesoftware.wildfire.spi.BasicStreamIDFactory;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.*;
import javax.net.ssl.SSLHandshakeException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.regex.Pattern;
/**
* Server-to-server communication is done using two TCP connections between the servers. One
* connection is used for sending packets while the other connection is used for receiving packets.
* The <tt>OutgoingServerSession</tt> represents the connection to a remote server that will only
* be used for sending packets.<p>
*
* Currently only the Server Dialback method is being used for authenticating with the remote
* server. Use {@link #authenticateDomain(String, String)} to create a new connection to a remote
* server that will be used for sending packets to the remote server from the specified domain.
* Only the authenticated domains with the remote server will be able to effectively send packets
* to the remote server. The remote server will reject and close the connection if a
* non-authenticated domain tries to send a packet through this connection.<p>
*
* Once the connection has been established with the remote server and at least a domain has been
* authenticated then a new route will be added to the routing table for this connection. For
* optimization reasons the same outgoing connection will be used even if the remote server has
* several hostnames. However, different routes will be created in the routing table for each
* hostname of the remote server.
*
* @author Gaston Dombiak
*/
public class OutgoingServerSession extends Session {
/**
* Regular expression to ensure that the hostname contains letters.
*/
private static Pattern pattern = Pattern.compile("[a-zA-Z]");
private Collection<String> authenticatedDomains = new ArrayList<String>();
private Collection<String> hostnames = new ArrayList<String>();
private OutgoingServerSocketReader socketReader;
/**
* Flag that indicates if the session was created usign server-dialback.
*/
private boolean usingServerDialback = true;
/**
* Creates a new outgoing connection to the specified hostname if no one exists. The port of
* the remote server could be configured by setting the <b>xmpp.server.socket.remotePort</b>
* property or otherwise the standard port 5269 will be used. Either a new connection was
* created or already existed the specified hostname will be authenticated with the remote
* server. Once authenticated the remote server will start accepting packets from the specified
* domain.<p>
*
* The Server Dialback method is currently the only implemented method for server-to-server
* authentication. This implies that the remote server will ask the Authoritative Server
* to verify the domain to authenticate. Most probably this server will act as the
* Authoritative Server. See {@link IncomingServerSession} for more information.
*
* @param domain the local domain to authenticate with the remote server.
* @param hostname the hostname of the remote server.
* @return True if the domain was authenticated by the remote server.
*/
static boolean authenticateDomain(String domain, String hostname) {
if (hostname == null || hostname.length() == 0 || hostname.trim().indexOf(' ') > -1) {
// Do nothing if the target hostname is empty, null or contains whitespaces
return false;
}
try {
// Check if the remote hostname is in the blacklist
if (!RemoteServerManager.canAccess(hostname)) {
return false;
}
OutgoingServerSession session;
// Check if a session, that is using server dialback, already exists to the desired
// hostname (i.e. remote server). If no one exists then create a new session. The same
// session will be used for the same hostname for all the domains to authenticate
SessionManager sessionManager = SessionManager.getInstance();
session = sessionManager.getOutgoingServerSession(hostname);
if (session == null) {
// Try locating if the remote server has previously authenticated with this server
for (IncomingServerSession incomingSession : sessionManager
.getIncomingServerSessions(hostname)) {
for (String otherHostname : incomingSession.getValidatedDomains()) {
session = sessionManager.getOutgoingServerSession(otherHostname);
if (session != null) {
if (session.usingServerDialback) {
// A session to the same remote server but with different hostname
// was found. Use this session and add the new hostname to the
// session
session.addHostname(hostname);
break;
} else {
session = null;
}
}
}
}
}
if (session == null) {
int port = RemoteServerManager.getPortForServer(hostname);
// No session was found to the remote server so make sure that only one is created
session = sessionManager.getOutgoingServerSession(hostname);
if (session == null) {
session = createOutgoingSession(domain, hostname, port);
if (session != null) {
// Add the new hostname to the list of names that the server may have
session.addHostname(hostname);
// Add the validated domain as an authenticated domain
session.addAuthenticatedDomain(domain);
// Notify the SessionManager that a new session has been created
sessionManager.outgoingServerSessionCreated(session);
return true;
} else {
// Ensure that the hostname is not an IP address (i.e. contains chars)
if (!pattern.matcher(hostname).find()) {
return false;
}
// Check if hostname is a subdomain of an existing outgoing session
for (String otherHost : sessionManager.getOutgoingServers()) {
if (hostname.contains(otherHost)) {
session = sessionManager.getOutgoingServerSession(otherHost);
// Add the new hostname to the found session
session.addHostname(hostname);
return true;
}
}
// Try to establish a connection to candidate hostnames. Iterate on the
// substring after the . and try to establish a connection. If a
// connection is established then the same session will be used for
// sending packets to the "candidate hostname" as well as for the
// requested hostname (i.e. the subdomain of the candidate hostname)
// This trick is useful when remote servers haven't registered in their
// DNSs an entry for their subdomains
int index = hostname.indexOf('.');
while (index > -1 && index < hostname.length()) {
String newHostname = hostname.substring(index + 1);
String serverName = XMPPServer.getInstance().getServerInfo()
.getName();
if ("com".equals(newHostname) || "net".equals(newHostname) ||
"org".equals(newHostname) ||
"gov".equals(newHostname) ||
"edu".equals(newHostname) ||
serverName.equals(newHostname)) {
return false;
}
session = createOutgoingSession(domain, newHostname, port);
if (session != null) {
// Add the new hostname to the list of names that the server may have
session.addHostname(hostname);
// Add the validated domain as an authenticated domain
session.addAuthenticatedDomain(domain);
// Notify the SessionManager that a new session has been created
sessionManager.outgoingServerSessionCreated(session);
// Add the new hostname to the found session
session.addHostname(newHostname);
return true;
} else {
index = hostname.indexOf('.', index + 1);
}
}
return false;
}
}
}
// A session already exists. The session was established using server dialback so
// it is possible to do piggybacking to authenticate more domains
if (session.getAuthenticatedDomains().contains(domain)) {
// Do nothing since the domain has already been authenticated
return true;
}
// A session already exists so authenticate the domain using that session
return session.authenticateSubdomain(domain, hostname);
}
catch (Exception e) {
Log.error("Error authenticating domain with remote server: " + hostname, e);
}
return false;
}
/**
* Establishes a new outgoing session to a remote server. If the remote server supports TLS
* and SASL then the new outgoing connection will be secured with TLS and authenticated
* using SASL. However, if TLS or SASL is not supported by the remote server or if an
* error occured while securing or authenticating the connection using SASL then server
* dialback method will be used.
*
* @param domain the local domain to authenticate with the remote server.
* @param hostname the hostname of the remote server.
* @param port default port to use to establish the connection.
* @return new outgoing session to a remote server.
*/
private static OutgoingServerSession createOutgoingSession(String domain, String hostname,
int port) {
boolean useTLS = JiveGlobals.getBooleanProperty("xmpp.server.tls.enabled", true);
RemoteServerConfiguration configuration = RemoteServerManager.getConfiguration(hostname);
if (configuration != null) {
// TODO Use the specific TLS configuration for this remote server
//useTLS = configuration.isTLSEnabled();
}
if (useTLS) {
// Connect to remote server using TLS + SASL
SocketConnection connection = null;
String realHostname = null;
int realPort = port;
Socket socket = new Socket();
try {
// Get the real hostname to connect to using DNS lookup of the specified hostname
DNSUtil.HostAddress address = DNSUtil.resolveXMPPServerDomain(hostname, port);
realHostname = address.getHost();
realPort = address.getPort();
Log.debug("OS - Trying to connect to " + hostname + ":" + port +
"(DNS lookup: " + realHostname + ":" + realPort + ")");
// Establish a TCP connection to the Receiving Server
socket.connect(new InetSocketAddress(realHostname, realPort),
RemoteServerManager.getSocketTimeout());
Log.debug("OS - Plain connection to " + hostname + ":" + port + " successful");
}
catch (Exception e) {
Log.error("Error trying to connect to remote server: " + hostname +
"(DNS lookup: " + realHostname + ":" + realPort + ")", e);
return null;
}
try {
connection =
new SocketConnection(XMPPServer.getInstance().getPacketDeliverer(), socket,
false);
// Send the stream header
StringBuilder openingStream = new StringBuilder();
openingStream.append("<stream:stream");
openingStream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\"");
openingStream.append(" xmlns=\"jabber:server\"");
openingStream.append(" to=\"").append(hostname).append("\"");
openingStream.append(" version=\"1.0\">");
connection.deliverRawText(openingStream.toString());
// Set a read timeout (of 5 seconds) so we don't keep waiting forever
int soTimeout = socket.getSoTimeout();
socket.setSoTimeout(5000);
XMPPPacketReader reader = new XMPPPacketReader();
reader.getXPPParser().setInput(new InputStreamReader(socket.getInputStream(),
CHARSET));
// Get the answer from the Receiving Server
XmlPullParser xpp = reader.getXPPParser();
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
eventType = xpp.next();
}
String serverVersion = xpp.getAttributeValue("", "version");
// Check if the remote server is XMPP 1.0 compliant
if (serverVersion != null && decodeVersion(serverVersion)[0] >= 1) {
// Restore default timeout
socket.setSoTimeout(soTimeout);
// Get the stream features
Element features = reader.parseDocument().getRootElement();
// Check if TLS is enabled
if (features != null && features.element("starttls") != null) {
// Secure the connection with TLS and authenticate using SASL
OutgoingServerSession answer;
answer = secureAndAuthenticate(hostname, connection, reader, openingStream,
domain);
if (answer != null) {
// Everything went fine so return the secured and
// authenticated connection
return answer;
}
}
else {
Log.debug("OS - Error, <starttls> was not received");
}
}
// Something went wrong so close the connection and try server dialback over
// a plain connection
if (connection != null) {
connection.close();
}
}
catch (SSLHandshakeException e) {
Log.debug("Handshake error while creating secured outgoing session to remote " +
"server: " + hostname + "(DNS lookup: " + realHostname + ":" + realPort +
")", e);
// Close the connection
if (connection != null) {
connection.close();
}
}
catch (XmlPullParserException e) {
Log.warn("Error creating secured outgoing session to remote server: " + hostname +
"(DNS lookup: " + realHostname + ":" + realPort + ")", e);
// Close the connection
if (connection != null) {
connection.close();
}
}
catch (Exception e) {
Log.error("Error creating secured outgoing session to remote server: " + hostname +
"(DNS lookup: " + realHostname + ":" + realPort + ")", e);
// Close the connection
if (connection != null) {
connection.close();
}
}
}
if (ServerDialback.isEnabled()) {
Log.debug("OS - Going to try connecting using server dialback with: " + hostname);
// Use server dialback over a plain connection
return new ServerDialback().createOutgoingSession(domain, hostname, port);
}
return null;
}
private static OutgoingServerSession secureAndAuthenticate(String hostname,
SocketConnection connection, XMPPPacketReader reader, StringBuilder openingStream,
String domain) throws Exception {
Element features;
Log.debug("OS - Indicating we want TLS to " + hostname);
connection.deliverRawText("<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>");
MXParser xpp = reader.getXPPParser();
// Wait for the <proceed> response
Element proceed = reader.parseDocument().getRootElement();
if (proceed != null && proceed.getName().equals("proceed")) {
Log.debug("OS - Negotiating TLS with " + hostname);
connection.startTLS(true, hostname);
Log.debug("OS - TLS negotiation with " + hostname + " was successful");
// TLS negotiation was successful so initiate a new stream
connection.deliverRawText(openingStream.toString());
// Reset the parser to use the new secured reader
xpp.setInput(new InputStreamReader(connection.getTLSStreamHandler().getInputStream(),
CHARSET));
// Skip new stream element
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
eventType = xpp.next();
}
// Get new stream features
features = reader.parseDocument().getRootElement();
if (features != null && features.element("mechanisms") != null) {
// Check if we can use stream compression
String policyName = JiveGlobals.getProperty("xmpp.server.compression.policy",
Connection.CompressionPolicy.disabled.toString());
Connection.CompressionPolicy compressionPolicy =
Connection.CompressionPolicy.valueOf(policyName);
if (Connection.CompressionPolicy.optional == compressionPolicy) {
// Verify if the remote server supports stream compression
Element compression = features.element("compression");
if (compression != null) {
boolean zlibSupported = false;
Iterator it = compression.elementIterator("method");
while (it.hasNext()) {
Element method = (Element) it.next();
if ("zlib".equals(method.getTextTrim())) {
zlibSupported = true;
}
}
if (zlibSupported) {
// Request Stream Compression
connection.deliverRawText("<compress xmlns='http://jabber.org/protocol/compress'><method>zlib</method></compress>");
// Check if we are good to start compression
Element answer = reader.parseDocument().getRootElement();
if ("compressed".equals(answer.getName())) {
// Server confirmed that we can use zlib compression
connection.startCompression();
Log.debug("OS - Stream compression was successful with " + hostname);
// Stream compression was successful so initiate a new stream
connection.deliverRawText(openingStream.toString());
// Reset the parser to use stream compression over TLS
ZInputStream in = new ZInputStream(
connection.getTLSStreamHandler().getInputStream());
in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
xpp.setInput(new InputStreamReader(in, CHARSET));
// Skip the opening stream sent by the server
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;)
{
eventType = xpp.next();
}
// Get new stream features
features = reader.parseDocument().getRootElement();
if (features == null || features.element("mechanisms") == null) {
Log.debug("OS - Error, EXTERNAL SASL was not offered by " + hostname);
return null;
}
}
else {
Log.debug("OS - Stream compression was rejected by " + hostname);
}
}
else {
Log.debug(
"OS - Stream compression found but zlib method is not supported by" +
hostname);
}
}
else {
Log.debug("OS - Stream compression not supoprted by " + hostname);
}
}
Iterator it = features.element("mechanisms").elementIterator();
while (it.hasNext()) {
Element mechanism = (Element) it.next();
if ("EXTERNAL".equals(mechanism.getTextTrim())) {
Log.debug("OS - Starting EXTERNAL SASL with " + hostname);
if (doExternalAuthentication(domain, connection, reader)) {
Log.debug("OS - EXTERNAL SASL with " + hostname + " was successful");
// SASL was successful so initiate a new stream
connection.deliverRawText(openingStream.toString());
// Reset the parser
xpp.resetInput();
// Skip the opening stream sent by the server
for (int eventType = xpp.getEventType();
eventType != XmlPullParser.START_TAG;) {
eventType = xpp.next();
}
// SASL authentication was successful so create new
// OutgoingServerSession
String id = xpp.getAttributeValue("", "id");
StreamID streamID = new BasicStreamIDFactory().createStreamID(id);
OutgoingServerSession session = new OutgoingServerSession(domain,
connection, new OutgoingServerSocketReader(reader), streamID);
connection.init(session);
// Set the hostname as the address of the session
session.setAddress(new JID(null, hostname, null));
// Set that the session was created using TLS+SASL (no server dialback)
session.usingServerDialback = false;
return session;
}
else {
Log.debug("OS - Error, EXTERNAL SASL authentication with " + hostname +
" failed");
return null;
}
}
}
Log.debug("OS - Error, EXTERNAL SASL was not offered by " + hostname);
}
else {
Log.debug("OS - Error, no SASL mechanisms were offered by " + hostname);
}
}
else {
Log.debug("OS - Error, <proceed> was not received");
}
return null;
}
private static boolean doExternalAuthentication(String domain, SocketConnection connection,
XMPPPacketReader reader) throws DocumentException, IOException, XmlPullParserException {
StringBuilder sb = new StringBuilder();
sb.append("<auth xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\" mechanism=\"EXTERNAL\">");
sb.append(StringUtils.encodeBase64(domain));
sb.append("</auth>");
connection.deliverRawText(sb.toString());
Element response = reader.parseDocument().getRootElement();
if (response != null && "success".equals(response.getName())) {
return true;
}
return false;
}
OutgoingServerSession(String serverName, Connection connection,
OutgoingServerSocketReader socketReader, StreamID streamID) {
super(serverName, connection, streamID);
this.socketReader = socketReader;
socketReader.setSession(this);
}
public void process(Packet packet) throws UnauthorizedException, PacketException {
try {
String senderDomain = packet.getFrom().getDomain();
if (!getAuthenticatedDomains().contains(senderDomain)) {
synchronized (senderDomain.intern()) {
if (!getAuthenticatedDomains().contains(senderDomain) &&
!authenticateSubdomain(senderDomain, packet.getTo().getDomain())) {
// Return error since sender domain was not validated by remote server
returnErrorToSender(packet);
return;
}
}
}
if (conn != null && !conn.isClosed()) {
conn.deliver(packet);
}
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
}
/**
* Authenticates a subdomain of this server with the specified remote server over an exsiting
* outgoing connection. If the existing session was using server dialback then a new db:result
* is going to be sent to the remote server. But if the existing session was TLS+SASL based
* then just assume that the subdomain was authenticated by the remote server.
*
* @param domain the local subdomain to authenticate with the remote server.
* @param hostname the hostname of the remote server.
* @return True if the subdomain was authenticated by the remote server.
*/
private boolean authenticateSubdomain(String domain, String hostname) {
if (!usingServerDialback) {
// Using SASL so just assume that the domain was validated
// (note: this may not be correct)
addAuthenticatedDomain(domain);
return true;
}
ServerDialback method = new ServerDialback(getConnection(), domain);
if (method.authenticateDomain(socketReader, domain, hostname, getStreamID().getID())) {
// Add the validated domain as an authenticated domain
addAuthenticatedDomain(domain);
return true;
}
return false;
}
private void returnErrorToSender(Packet packet) {
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
try {
if (packet instanceof IQ) {
IQ reply = new IQ();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setChildElement(((IQ) packet).getChildElement().createCopy());
reply.setError(PacketError.Condition.remote_server_not_found);
ChannelHandler route = routingTable.getRoute(reply.getTo());
if (route != null) {
route.process(reply);
}
}
else if (packet instanceof Presence) {
Presence reply = new Presence();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.remote_server_not_found);
ChannelHandler route = routingTable.getRoute(reply.getTo());
if (route != null) {
route.process(reply);
}
}
else if (packet instanceof Message) {
Message reply = new Message();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(((Message)packet).getType());
reply.setThread(((Message)packet).getThread());
reply.setError(PacketError.Condition.remote_server_not_found);
ChannelHandler route = routingTable.getRoute(reply.getTo());
if (route != null) {
route.process(reply);
}
}
}
catch (UnauthorizedException e) {
// Do nothing
}
catch (Exception e) {
Log.warn("Error returning error to sender. Original packet: " + packet, e);
}
}
/**
* Returns a collection with all the domains, subdomains and virtual hosts that where
* authenticated. The remote server will accept packets sent from any of these domains,
* subdomains and virtual hosts.
*
* @return domains, subdomains and virtual hosts that where validated.
*/
public Collection<String> getAuthenticatedDomains() {
return Collections.unmodifiableCollection(authenticatedDomains);
}
/**
* Adds a new authenticated domain, subdomain or virtual host to the list of
* authenticated domains for the remote server. The remote server will accept packets
* sent from this new authenticated domain.
*
* @param domain the new authenticated domain, subdomain or virtual host to add.
*/
public void addAuthenticatedDomain(String domain) {
authenticatedDomains.add(domain);
}
/**
* Removes an authenticated domain from the list of authenticated domains. The remote
* server will no longer be able to accept packets sent from the removed domain, subdomain or
* virtual host.
*
* @param domain the domain, subdomain or virtual host to remove from the list of
* authenticated domains.
*/
public void removeAuthenticatedDomain(String domain) {
authenticatedDomains.remove(domain);
}
/**
* Returns the list of hostnames related to the remote server. This tracking is useful for
* reusing the same session for the same remote server even if the server has many names.
*
* @return the list of hostnames related to the remote server.
*/
public Collection<String> getHostnames() {
return Collections.unmodifiableCollection(hostnames);
}
/**
* Adds a new hostname to the list of known hostnames of the remote server. This tracking is
* useful for reusing the same session for the same remote server even if the server has
* many names.
*
* @param hostname the new known name of the remote server
*/
private void addHostname(String hostname) {
if (hostnames.add(hostname)) {
// Register the outgoing session in the SessionManager. If the session
// was already registered nothing happens
sessionManager.registerOutgoingServerSession(hostname, this);
// Add a new route for this new session
XMPPServer.getInstance().getRoutingTable().addRoute(new JID(hostname), this);
}
}
public String getAvailableStreamFeatures() {
// Nothing special to add
return null;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment