Commit 56562bb7 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Initial version. JM-666

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@3872 b35dd754-fafc-0310-a699-88a17e54d16e
parent 101cfab6
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.multiplex;
import org.dom4j.Element;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.wildfire.XMPPServer;
import org.jivesoftware.wildfire.net.VirtualConnection;
import org.xmpp.packet.IQ;
import org.xmpp.packet.Packet;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* Represents a connection of a Client Session that was established to a Connection Manager.
* Connection Managers have their own physical connections to the server that are multiplexed
* among connected clients. Each created {@link org.jivesoftware.wildfire.ClientSession} will
* use an instance of this class as its connection.
*
* @author Gaston Dombiak
*/
public class ClientSessionConnection extends VirtualConnection {
private String connectionManagerName;
private String serverName;
private ConnectionMultiplexerManager multiplexerManager;
public ClientSessionConnection(String connectionManagerName) {
this.connectionManagerName = connectionManagerName;
multiplexerManager = ConnectionMultiplexerManager.getInstance();
serverName = XMPPServer.getInstance().getServerInfo().getName();
}
/**
* Delivers the packet to the Connection Manager that in turn will forward it to the
* target user. Connection Managers may have one or many connections to the server so
* just get any connection to the Connection Manager (uses a random) and use it.<p>
*
* If the packet to send does not have a TO attribute then wrap the packet with a
* special IQ packet. The wrapper IQ packet will be sent to the Connection Manager
* and the stream ID of this Client Session will be used for identifying that the wrapped
* packet must be sent to the connected user. Since some packets can be exchanged before
* the user has a binded JID we need to use the stream ID as the unique identifier.
*
* @param packet the packet to send to the user.
*/
public void deliver(Packet packet) {
ConnectionMultiplexerSession multiplexerSession =
multiplexerManager.getMultiplexerSession(connectionManagerName);
if (multiplexerSession != null) {
// If TO is null then wrap packet so that the connection manager can
// figure out the target session
if (packet.getTo() == null) {
IQ wrapper = new IQ(IQ.Type.set);
wrapper.setFrom(serverName);
wrapper.setTo(connectionManagerName);
Element child = wrapper.setChildElement("session",
"http://jabber.org/protocol/connectionmanager");
child.addAttribute("id", session.getStreamID().getID());
Element send = child.addElement("send");
send.add(packet.getElement().createCopy());
// Deliver wrapper
multiplexerSession.deliver(wrapper);
}
else {
// Deliver original packet
multiplexerSession.deliver(packet);
}
session.incrementServerPacketCount();
}
}
/**
* Delivers the stanza to the Connection Manager that in turn will forward it to the
* target user. Connection Managers may have one or many connections to the server so
* just get any connection to the Connection Manager (uses a random) and use it.<p>
*
* The stanza to send wrapped with a special IQ packet. The wrapper IQ packet will be
* sent to the Connection Manager and the stream ID of this Client Session will be used
* for identifying that the wrapped stanza must be sent to the connected user.
*
* @param text the stanza to send to the user.
*/
public void deliverRawText(String text) {
ConnectionMultiplexerSession multiplexerSession =
multiplexerManager.getMultiplexerSession(connectionManagerName);
if (multiplexerSession != null) {
// Wrap packet so that the connection manager can figure out the target session
StringBuilder sb = new StringBuilder(200 + text.length());
sb.append("<iq type=\"set\" from=\"").append(serverName);
sb.append("\" to=\"").append(connectionManagerName);
sb.append("\" id=\"").append(StringUtils.randomString(10));
sb.append("\"><session xmlns=\"http://jabber.org/protocol/connectionmanager\" id=\"");
sb.append(session.getStreamID().getID()).append("\"><send>");
sb.append(text);
sb.append("</send></session></iq>");
// Deliver the wrapped stanza
multiplexerSession.getConnection().deliverRawText(sb.toString());
}
}
public InetAddress getInetAddress() throws UnknownHostException {
//TODO Future version may return actual IP client address. We would need to pass this info
// Return IP address of the connection manager that the client used to log in
ConnectionMultiplexerSession multiplexerSession =
multiplexerManager.getMultiplexerSession(connectionManagerName);
if (multiplexerSession != null) {
return multiplexerSession.getConnection().getInetAddress();
}
return null;
}
/**
* If the Connection Manager or the Client requested to close the connection then just do
* nothing. But if the server originated the request to close the connection then we need
* to send to the Connection Manager a packet letting him know that the Client Session needs
* to be terminated.
*/
public void closeVirtualConnection() {
// Figure out who requested the connection to be closed
String streamID = session.getStreamID().getID();
if (multiplexerManager.getClientSession(connectionManagerName, streamID) == null) {
// Client or Connection manager requested to close the session
// Do nothing since it has already been removed and closed
}
else {
// Server requested to close the client session so let the connection manager
// know that he has to finish the client session
IQ closeRequest = new IQ(IQ.Type.set);
closeRequest.setFrom(serverName);
closeRequest.setTo(connectionManagerName);
Element child = closeRequest.setChildElement("session",
"http://jabber.org/protocol/connectionmanager");
child.addAttribute("id", streamID);
child.addElement("close");
deliver(closeRequest);
}
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.multiplex;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.*;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.event.SessionEventDispatcher;
import org.jivesoftware.wildfire.event.SessionEventListener;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
/**
* A ConnectionMultiplexerManager is responsible for keeping track of the connected
* Connection Managers and the sessions that were established with the Connection
* Managers. Moreover, a ConnectionMultiplexerManager is able to create, get and close
* client sessions based on Connection requests.
*
* @author Gaston Dombiak
*/
public class ConnectionMultiplexerManager implements SessionEventListener {
private static final ConnectionMultiplexerManager instance = new ConnectionMultiplexerManager();
/**
* Pseudo-random number generator object for use with getMultiplexerSession(String).
*/
private static Random randGen = new Random();
static {
// Add the unique instance of this class as a session listener. We need to react
// when sessions are closed so we can clean up the registry of client sessions.
SessionEventDispatcher.addListener(instance);
}
/**
* Map that keeps track of connection managers and hosted connections.
* Key: stream ID; Value: Domain of connection manager hosting connection
*/
private Map<String, String> streamIDs = new ConcurrentHashMap<String, String>();
/**
* Map that keeps track of connection managers and hosted sessions.
* Key: Domain of connection manager; Value: Map with Key: stream ID; Value: Client session
*/
private Map<String, Map<String, ClientSession>> sessionsByManager =
new ConcurrentHashMap<String, Map<String, ClientSession>>();
private SessionManager sessionManager;
/**
* Returns the unique instance of this class.
*
* @return the unique instance of this class.
*/
public static ConnectionMultiplexerManager getInstance() {
return instance;
}
private ConnectionMultiplexerManager() {
sessionManager = XMPPServer.getInstance().getSessionManager();
}
/**
* Creates a new client session that was established to the specified connection manager.
* The new session will not be findable through its stream ID.
*
* @param connectionManagerDomain the connection manager that is handling the connection
* of the session.
* @param streamID the stream ID created by the connection manager for the new session.
*/
public void createClientSession(String connectionManagerDomain, String streamID) {
try {
Connection connection = new ClientSessionConnection(connectionManagerDomain);
ClientSession session = SessionManager.getInstance()
.createClientSession(connection, new BasicStreamID(streamID));
// Register that this streamID belongs to the specified connection manager
streamIDs.put(streamID, connectionManagerDomain);
// Register which sessions are being hosted by the speicifed connection manager
Map<String, ClientSession> sessions = sessionsByManager.get(connectionManagerDomain);
if (sessions == null) {
synchronized (connectionManagerDomain.intern()) {
sessions = sessionsByManager.get(connectionManagerDomain);
if (sessions == null) {
sessions = new ConcurrentHashMap<String, ClientSession>();
sessionsByManager.put(connectionManagerDomain, sessions);
}
}
}
sessions.put(streamID, session);
}
catch (UnauthorizedException e) {
Log.error("Error creating virtual client session", e);
}
}
/**
* Closes an existing client session that was established through a connection manager.
*
* @param connectionManagerDomain the connection manager that is handling the connection
* of the session.
* @param streamID the stream ID created by the connection manager for the session.
*/
public void closeClientSession(String connectionManagerDomain, String streamID) {
Map<String, ClientSession> sessions = sessionsByManager.get(connectionManagerDomain);
if (sessions != null) {
Session session = sessions.remove(streamID);
if (session != null) {
// Close the session
session.getConnection().close();
}
}
}
/**
* Close client sessions that were established to the specified connection manager. This
* action is usually required when the connection manager was stopped or suddenly went
* down.
*
* @param connectionManagerDomain the connection manager that is no longer available.
*/
public void closeClientSessions(String connectionManagerDomain) {
// Remove the connection manager and the hosted sessions
Map<String, ClientSession> sessions = sessionsByManager.remove(connectionManagerDomain);
if (sessions != null) {
for (String streamID : sessions.keySet()) {
// Remove inverse track of connection manager hosting streamIDs
streamIDs.remove(streamID);
// Close the session
sessions.get(streamID).getConnection().close();
}
}
}
/**
* Returns the ClientSession with the specified stream ID that is being hosted by the
* specified connection manager.
*
* @param connectionManagerDomain the connection manager that is handling the connection
* of the session.
* @param streamID the stream ID created by the connection manager for the session.
* @return the ClientSession with the specified stream ID.
*/
public ClientSession getClientSession(String connectionManagerDomain, String streamID) {
Map<String, ClientSession> sessions = sessionsByManager.get(connectionManagerDomain);
if (sessions != null) {
return sessions.get(streamID);
}
return null;
}
/**
* Returns a {@link ConnectionMultiplexerSession} for the specified connection manager
* domain or <tt>null</tt> if none was found. In case the connection manager has many
* connections established with the server then one of them will be selected randomly.
*
* @param connectionManagerDomain the domain of the connection manager to get a session.
* @return a session to the specified connection manager domain or null if none was found.
*/
public ConnectionMultiplexerSession getMultiplexerSession(String connectionManagerDomain) {
List<ConnectionMultiplexerSession> sessions =
sessionManager.getConnectionMultiplexerSessions(connectionManagerDomain);
if (sessions.isEmpty()) {
return null;
}
else if (sessions.size() == 1) {
return sessions.get(0);
}
else {
// Pick a random session so we can distribute traffic evenly
return sessions.get(randGen.nextInt(sessions.size()));
}
}
public void anonymousSessionCreated(Session session) {
// Do nothing.
}
public void anonymousSessionDestroyed(Session session) {
removeSession(session);
}
public void sessionCreated(Session session) {
// Do nothing.
}
public void sessionDestroyed(Session session) {
removeSession(session);
}
private void removeSession(Session session) {
// Remove trace indicating that a connection manager is hosting a connection
String streamID = session.getStreamID().getID();
String connectionManagerDomain = streamIDs.remove(streamID);
// Remove trace indicating that a connection manager is hosting a session
if (connectionManagerDomain != null) {
Map<String, ClientSession> sessions = sessionsByManager.get(connectionManagerDomain);
if (sessions != null) {
sessions.remove(streamID);
}
}
}
/**
* Simple implementation of the StreamID interface to hold the stream ID assigned by
* the Connection Manager to the Session.
*/
private class BasicStreamID implements StreamID {
String id;
public BasicStreamID(String id) {
this.id = id;
}
public String getID() {
return id;
}
public String toString() {
return id;
}
public int hashCode() {
return id.hashCode();
}
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.multiplex;
import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.*;
import org.jivesoftware.wildfire.auth.AuthFactory;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.component.ExternalComponentManager;
import org.jivesoftware.wildfire.net.SASLAuthentication;
import org.jivesoftware.wildfire.net.SocketConnection;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
import org.xmpp.packet.StreamError;
import java.io.IOException;
import java.io.Writer;
import java.util.Collection;
/**
* Represents a session between the server and a connection manager.<p>
*
* Each Connection Manager has its own domain. Each connection from the same connection manager
* uses a different resource. Unlike any other session, connection manager sessions are not
* present in the routing table. This means that connection managers are not reachable entities.
* In other words, entities cannot send packets to connection managers but clients being hosted
* by them. The main reason behind this design decision is that connection managers are private
* components of the server so they can only be contacted by the server. Connection Manager
* sessions are present in {@link SessionManager} but not in {@link RoutingTable}. Use
* {@link SessionManager#getConnectionMultiplexerSessions(String)} to get all sessions or
* {@link ConnectionMultiplexerManager#getMultiplexerSession(String)}
* to get a random session to a given connection manager.
*
* @author Gaston Dombiak
*/
public class ConnectionMultiplexerSession extends Session {
private static Connection.TLSPolicy tlsPolicy;
private static Connection.CompressionPolicy compressionPolicy;
/**
* Milliseconds a connection has to be idle to be closed. Default is 30 minutes. Sending
* stanzas to the client is not considered as activity. We are only considering the connection
* active when the client sends some data or hearbeats (i.e. whitespaces) to the server.
* The reason for this is that sending data will fail if the connection is closed. And if
* the thread is blocked while sending data (because the socket is closed) then the clean up
* thread will close the socket anyway.
*/
private static long idleTimeout;
static {
// Set the TLS policy stored as a system property
String policyName = JiveGlobals.getProperty("xmpp.multiplex.tls.policy",
Connection.TLSPolicy.optional.toString());
tlsPolicy = Connection.TLSPolicy.valueOf(policyName);
// Set the Compression policy stored as a system property
policyName = JiveGlobals.getProperty("xmpp.multiplex.compression.policy",
Connection.CompressionPolicy.disabled.toString());
compressionPolicy = Connection.CompressionPolicy.valueOf(policyName);
// Set the default read idle timeout. If none was set then assume 30 minutes
idleTimeout = JiveGlobals.getIntProperty("xmpp.multiplex.idle", 30 * 60 * 1000);
}
public static Session createSession(String serverName, XMPPPacketReader reader,
SocketConnection connection) throws XmlPullParserException, IOException,
UnauthorizedException {
XmlPullParser xpp = reader.getXPPParser();
String domain = xpp.getAttributeValue("", "to");
Log.debug("[ConMng] Starting registration of new connection manager for domain: " + domain);
Writer writer = connection.getWriter();
// Default answer header in case of an error
StringBuilder sb = new StringBuilder();
sb.append("<?xml version='1.0' encoding='");
sb.append(CHARSET);
sb.append("'?>");
sb.append("<stream:stream ");
sb.append("xmlns:stream=\"http://etherx.jabber.org/streams\" ");
sb.append("xmlns=\"jabber:connectionmanager\" from=\"");
sb.append(domain);
sb.append("\">");
// Check that a domain was provided in the stream header
if (domain == null) {
Log.debug("[ConMng] Domain not specified in stanza: " + xpp.getText());
// Include the bad-format in the response
StreamError error = new StreamError(StreamError.Condition.bad_format);
sb.append(error.toXML());
writer.write(sb.toString());
writer.flush();
// Close the underlying connection
connection.close();
return null;
}
// Get the requested domain
JID address = new JID(domain);
// Check that a secret key was configured in the server
String secretKey = getSecretKey();
if (secretKey == null) {
Log.debug("[ConMng] A shared secret for connection manager was not found.");
// Include the internal-server-error in the response
StreamError error = new StreamError(StreamError.Condition.internal_server_error);
sb.append(error.toXML());
writer.write(sb.toString());
writer.flush();
// Close the underlying connection
connection.close();
return null;
}
// Check that the requested subdomain is not already in use
if (SessionManager.getInstance().getConnectionMultiplexerSession(address) != null) {
Log.debug("[ConMng] Another connection manager is already using domain: " + domain);
// Domain already occupied so return a conflict error and close the connection
// Include the conflict error in the response
StreamError error = new StreamError(StreamError.Condition.conflict);
sb.append(error.toXML());
writer.write(sb.toString());
writer.flush();
// Close the underlying connection
connection.close();
return null;
}
// Indicate the TLS policy to use for this connection
connection.setTlsPolicy(tlsPolicy);
// Indicate the compression policy to use for this connection
connection.setCompressionPolicy(compressionPolicy);
// Set the max number of milliseconds the connection may not receive data from the
// client before closing the connection
connection.setIdleTimeout(idleTimeout);
// Set the connection manager domain to use delivering a packet fails
((MultiplexerPacketDeliverer) connection.getPacketDeliverer())
.setConnectionManagerDomain(address.getDomain());
// Create a ConnectionMultiplexerSession for the new session originated
// from the connection manager
Session session =
SessionManager.getInstance().createMultiplexerSession(connection, address);
// Set the address of the new session
session.setAddress(address);
try {
Log.debug("[ConMng] Send stream header with ID: " + session.getStreamID() +
" for connection manager with domain: " +
domain);
// Build the start packet response
sb = new StringBuilder();
sb.append("<?xml version='1.0' encoding='");
sb.append(CHARSET);
sb.append("'?>");
sb.append("<stream:stream ");
sb.append("xmlns:stream=\"http://etherx.jabber.org/streams\" ");
sb.append("xmlns=\"jabber:connectionmanager\" from=\"");
sb.append(domain);
sb.append("\" id=\"");
sb.append(session.getStreamID().toString());
sb.append("\">");
writer.write(sb.toString());
writer.flush();
// Announce stream features.
sb = new StringBuilder(490);
sb.append("<stream:features>");
if (tlsPolicy != Connection.TLSPolicy.disabled) {
sb.append("<starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\">");
if (tlsPolicy == Connection.TLSPolicy.required) {
sb.append("<required/>");
}
sb.append("</starttls>");
}
// Include Stream features
String specificFeatures = session.getAvailableStreamFeatures();
if (specificFeatures != null) {
sb.append(specificFeatures);
}
sb.append("</stream:features>");
writer.write(sb.toString());
writer.flush();
return session;
}
catch (Exception e) {
Log.error("An error occured while creating a ComponentSession", e);
// Close the underlying connection
connection.close();
return null;
}
}
private static String getSecretKey() {
// TODO Use another shared secret (?)
return ExternalComponentManager.getDefaultSecret();
}
public ConnectionMultiplexerSession(String serverName, Connection connection, StreamID streamID) {
super(serverName, connection, streamID);
}
public String getAvailableStreamFeatures() {
if (conn.getTlsPolicy() == Connection.TLSPolicy.required && !conn.isSecure()) {
return null;
}
// Include Stream Compression Mechanism
if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled &&
!conn.isCompressed()) {
return "<compression xmlns=\"http://jabber.org/features/compress\"><method>zlib</method></compression>";
}
return null;
}
public void process(Packet packet) {
deliver(packet);
}
/**
* Authenticates the connection manager. Shared secret is validated with the one provided
* by the connection manager. If everything went fine then the session will have a status
* of "authenticated" and the connection manager will receive the client configuration
* options.
*
* @param digest the digest provided by the connection manager with the handshake stanza.
* @return true if the connection manager was sucessfully authenticated.
*/
public boolean authenticate(String digest) {
// Perform authentication. Wait for the handshake (with the secret key)
String anticipatedDigest = AuthFactory.createDigest(getStreamID().getID(), getSecretKey());
// Check that the provided handshake (secret key + sessionID) is correct
if (!anticipatedDigest.equalsIgnoreCase(digest)) {
Log.debug("[ConMng] Incorrect handshake for connection manager with domain: " +
getAddress().getDomain());
// The credentials supplied by the initiator are not valid (answer an error
// and close the connection)
conn.deliverRawText(new StreamError(StreamError.Condition.not_authorized).toXML());
// Close the underlying connection
conn.close();
return false;
}
else {
// Component has authenticated fine
setStatus(Session.STATUS_AUTHENTICATED);
// Send empty handshake element to acknowledge success
conn.deliverRawText("<handshake></handshake>");
Log.debug("[ConMng] Connection manager was AUTHENTICATED with domain: " + getAddress());
sendClientOptions();
return true;
}
}
/**
* Send to the Connection Manager the connection options available for clients. The info
* to send includes:
* <ul>
* <li>if TLS is available, optional or required
* <li>SASL mechanisms available before TLS is negotiated
* <li>if compression is available
* <li>if Non-SASL authentication is available
* <li>if In-Band Registration is available
* </ul
*/
private void sendClientOptions() {
IQ options = new IQ(IQ.Type.set);
Element child = options.setChildElement("configuration",
"http://jabber.org/protocol/connectionmanager");
// Add info about TLS
if (ClientSession.getTLSPolicy() != Connection.TLSPolicy.disabled) {
Element tls = child.addElement("starttls", "urn:ietf:params:xml:ns:xmpp-tls");
if (ClientSession.getTLSPolicy() != Connection.TLSPolicy.required) {
tls.addElement("required");
}
}
// Add info about SASL mechanisms
Collection<String> mechanisms = SASLAuthentication.getSupportedMechanisms();
if (!mechanisms.isEmpty()) {
Element sasl = child.addElement("mechanisms", "urn:ietf:params:xml:ns:xmpp-sasl");
for (String mechanism : mechanisms) {
sasl.addElement("mechanism").setText(mechanism);
}
}
// Add info about Stream Compression
if (ClientSession.getCompressionPolicy() == Connection.CompressionPolicy.optional) {
Element comp = child.addElement("compression", "http://jabber.org/features/compress");
comp.addElement("method").setText("zlib");
}
// Add info about Non-SASL authentication
if (XMPPServer.getInstance().getIQAuthHandler().isAllowAnonymous()) {
child.addElement("auth", "http://jabber.org/features/iq-auth");
}
// Add info about In-Band Registration
if (XMPPServer.getInstance().getIQRegisterHandler().isInbandRegEnabled()) {
child.addElement("register", "http://jabber.org/features/iq-register");
}
// Send the options
try {
conn.deliver(options);
}
catch (UnauthorizedException e) {
// Do nothing. Should never happen
}
}
void deliver(Packet packet) {
if (conn != null && !conn.isClosed()) {
try {
conn.deliver(packet);
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
}
}
}
/**
* Returns whether TLS is mandatory, optional or is disabled for clients. When TLS is
* mandatory clients are required to secure their connections or otherwise their connections
* will be closed. On the other hand, when TLS is disabled clients are not allowed to secure
* their connections using TLS. Their connections will be closed if they try to secure the
* connection. in this last case.
*
* @return whether TLS is mandatory, optional or is disabled.
*/
public static SocketConnection.TLSPolicy getTLSPolicy() {
return tlsPolicy;
}
/**
* Sets whether TLS is mandatory, optional or is disabled for clients. When TLS is
* mandatory clients are required to secure their connections or otherwise their connections
* will be closed. On the other hand, when TLS is disabled clients are not allowed to secure
* their connections using TLS. Their connections will be closed if they try to secure the
* connection. in this last case.
*
* @param policy whether TLS is mandatory, optional or is disabled.
*/
public static void setTLSPolicy(SocketConnection.TLSPolicy policy) {
tlsPolicy = policy;
JiveGlobals.setProperty("xmpp.multiplex.tls.policy", tlsPolicy.toString());
}
/**
* Returns whether compression is optional or is disabled for clients.
*
* @return whether compression is optional or is disabled.
*/
public static SocketConnection.CompressionPolicy getCompressionPolicy() {
return compressionPolicy;
}
/**
* Sets whether compression is optional or is disabled for clients.
*
* @param policy whether compression is optional or is disabled.
*/
public static void setCompressionPolicy(SocketConnection.CompressionPolicy policy) {
compressionPolicy = policy;
JiveGlobals.setProperty("xmpp.multiplex.compression.policy", compressionPolicy.toString());
}
/**
* Returns the number of milliseconds a connection has to be idle to be closed. Default is
* 30 minutes. Sending stanzas to the client is not considered as activity. We are only
* considering the connection active when the client sends some data or hearbeats
* (i.e. whitespaces) to the server.
*
* @return the number of milliseconds a connection has to be idle to be closed.
*/
public static long getIdleTimeout() {
return idleTimeout;
}
/**
* Sets the number of milliseconds a connection has to be idle to be closed. Default is
* 30 minutes. Sending stanzas to the client is not considered as activity. We are only
* considering the connection active when the client sends some data or hearbeats
* (i.e. whitespaces) to the server.
*
* @param timeout the number of milliseconds a connection has to be idle to be closed.
*/
public static void setIdleTimeout(long timeout) {
idleTimeout = timeout;
JiveGlobals.setProperty("xmpp.multiplex.idle", Long.toString(idleTimeout));
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.multiplex;
import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.OfflineMessageStrategy;
import org.jivesoftware.wildfire.PacketDeliverer;
import org.jivesoftware.wildfire.PacketException;
import org.jivesoftware.wildfire.XMPPServer;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.xmpp.packet.IQ;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.Presence;
/**
* Fallback method used by {@link org.jivesoftware.wildfire.net.SocketConnection} when
* connected to a connection manager. The fallback method will be used when a SocketConnection
* fails to send a {@link Packet} (probably because the socket was closed).<p>
*
* The first attempt will be to send the packet using another connection to the same connection
* manager (since managers may have a pool of connections to the server). And if that fails then
* instances of {@link Message} may be stored offline for later retrieval. Since packets may be
* wrapped by special IQ packets (read the Connection Manager JEP for more information) we need
* to unwrap the packet and store the wrapped packet offline.
*
* @author Gaston Dombiak
*/
public class MultiplexerPacketDeliverer implements PacketDeliverer {
private OfflineMessageStrategy messageStrategy;
private String connectionManagerDomain;
private ConnectionMultiplexerManager multiplexerManager;
public MultiplexerPacketDeliverer() {
this.messageStrategy = XMPPServer.getInstance().getOfflineMessageStrategy();
multiplexerManager = ConnectionMultiplexerManager.getInstance();
}
void setConnectionManagerDomain(String connectionManagerDomain) {
this.connectionManagerDomain = connectionManagerDomain;
}
public void deliver(Packet packet) throws UnauthorizedException, PacketException {
// Check if we can send the packet using another session
if (connectionManagerDomain == null) {
// Packet deliverer has not yet been configured so handle unprocessed packet
handleUnprocessedPacket(packet);
}
else {
// Try getting another session to the same connection manager
ConnectionMultiplexerSession session =
multiplexerManager.getMultiplexerSession(connectionManagerDomain);
if (session == null || session.getConnection().isClosed()) {
// No other session was found so handle unprocessed packet
handleUnprocessedPacket(packet);
}
else {
// Send the packet using this other session to the same connection manager
session.deliver(packet);
}
}
}
private void handleUnprocessedPacket(Packet packet) {
if (packet instanceof Message) {
messageStrategy.storeOffline((Message) packet);
}
else if (packet instanceof Presence) {
// presence packets are dropped silently
//dropPacket(packet);
}
else if (packet instanceof IQ) {
IQ iq = (IQ) packet;
// Check if we need to unwrap the packet
Element child = iq.getChildElement();
if (child != null && "session".equals(child.getName()) &&
"http://jabber.org/protocol/connectionmanager"
.equals(child.getNamespacePrefix())) {
Element send = child.element("send");
if (send != null) {
// Unwrap packet
Element wrappedElement = (Element) send.elements().get(0);
if ("message".equals(wrappedElement.getName())) {
handleUnprocessedPacket(new Message(wrappedElement));
}
}
}
else {
// IQ packets are logged but dropped
Log.warn(LocaleUtils.getLocalizedString("admin.error.routing") + "\n" +
packet.toString());
}
}
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.multiplex;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.ClientSession;
import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.XMPPServer;
import org.jivesoftware.wildfire.interceptor.InterceptorManager;
import org.jivesoftware.wildfire.interceptor.PacketRejectedException;
import org.jivesoftware.wildfire.net.SASLAuthentication;
import org.xmpp.packet.*;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* IQ packets sent from Connection Managers themselves to the server will be handled by
* instances of this class. Each instance of
* {@link org.jivesoftware.wildfire.net.ConnectionMultiplexerSocketReader} will have an instance
* of this class so that IQ packets can be routed to this handler.<p>
* <p/>
* This class will interact with {@link ConnectionMultiplexerManager} to create, close or
* get client sessions.
*
* @author Gaston Dombiak
*/
public class MultiplexerPacketHandler {
private String connectionManagerDomain;
private PacketRouter router;
private final ConnectionMultiplexerManager multiplexerManager;
public MultiplexerPacketHandler(String connectionManagerDomain) {
this.connectionManagerDomain = connectionManagerDomain;
router = XMPPServer.getInstance().getPacketRouter();
multiplexerManager = ConnectionMultiplexerManager.getInstance();
}
public void handle(Packet packet) {
if (packet instanceof IQ) {
IQ iq = (IQ) packet;
if (iq.getType() == IQ.Type.result) {
// Do nothing with result packets
}
else if (iq.getType() == IQ.Type.error) {
// Log the IQ error packet that the connection manager failed to process
Log.warn("Connection Manager failed to process IQ packet: " + packet.toXML());
}
else if (iq.getType() == IQ.Type.set) {
Element child = iq.getChildElement();
String streamID = child.attributeValue("id");
if (streamID == null) {
// No stream ID was included so return a bad_request error
Element extraError = DocumentHelper.createElement(QName.get(
"id-required", "http://jabber.org/protocol/connectionmanager#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, extraError);
}
else if ("session".equals(child.getName())) {
if (child.element("create") != null) {
// Connection Manager wants to create a Client Session
multiplexerManager.createClientSession(connectionManagerDomain, streamID);
sendResultPacket(iq);
}
else {
ClientSession session = multiplexerManager
.getClientSession(connectionManagerDomain, streamID);
if (session == null) {
// Specified Client Session does not exist
sendErrorPacket(iq, PacketError.Condition.item_not_found, null);
}
else if (child.element("close") != null) {
// Connection Manager wants to close a Client Session
multiplexerManager
.closeClientSession(connectionManagerDomain, streamID);
sendResultPacket(iq);
}
else if (child.element("send") != null) {
// Connection Manager wrapped a packet from a Client Session.
List wrappedElements = child.element("send").elements();
if (wrappedElements.size() != 1) {
// Wrapper element is wrapping 0 or many items
Element extraError = DocumentHelper.createElement(QName.get(
"invalid-payload",
"http://jabber.org/protocol/connectionmanager#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, extraError);
}
else {
Element wrappedElement = (Element) wrappedElements.get(0);
String tag = wrappedElement.getName();
try {
if ("auth".equals(tag) || "response".equals(tag)) {
SASLAuthentication.handle(session, wrappedElement);
}
else if ("iq".equals(tag)) {
processIQ(session, getIQ(wrappedElement));
}
else if ("message".equals(tag)) {
processMessage(session, new Message(wrappedElement));
}
else if ("presence".equals(tag)) {
processPresence(session, new Presence(wrappedElement));
}
else {
Element extraError = DocumentHelper.createElement(QName.get(
"unknown-stanza",
"http://jabber.org/protocol/connectionmanager#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request,
extraError);
return;
}
}
catch (UnsupportedEncodingException e) {
Log.error("Error processing wrapped packet: " +
wrappedElement.asXML(), e);
sendErrorPacket(iq, PacketError.Condition.internal_server_error,
null);
}
}
// The wrapped packet does not contain information about the sender
// so the streamID helps identify the Client Session
sendResultPacket(iq);
}
else if (child.element("failed") != null) {
// Connection Manager failed to deliver a message
// Connection Manager wrapped a packet from a Client Session.
List wrappedElements = child.element("failed").elements();
if (wrappedElements.size() != 1) {
// Wrapper element is wrapping 0 or many items
Element extraError = DocumentHelper.createElement(QName.get(
"invalid-payload",
"http://jabber.org/protocol/connectionmanager#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, extraError);
}
else {
Element wrappedElement = (Element) wrappedElements.get(0);
String tag = wrappedElement.getName();
if ("message".equals(tag)) {
XMPPServer.getInstance().getOfflineMessageStrategy()
.storeOffline(new Message(wrappedElement));
sendResultPacket(iq);
}
else {
Element extraError = DocumentHelper.createElement(QName.get(
"unknown-stanza",
"http://jabber.org/protocol/connectionmanager#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request,
extraError);
}
}
}
else {
// Unknown IQ packet received so return error to sender
sendErrorPacket(iq, PacketError.Condition.bad_request, null);
}
}
}
else {
// Unknown IQ packet received so return error to sender
sendErrorPacket(iq, PacketError.Condition.bad_request, null);
}
}
else {
// Unknown IQ packet received so return error to sender
sendErrorPacket(iq, PacketError.Condition.bad_request, null);
}
}
}
private void processIQ(ClientSession session, IQ packet) {
try {
// Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
false);
router.route(packet);
// Invoke the interceptors after we have processed the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
true);
session.incrementClientPacketCount();
}
catch (PacketRejectedException e) {
// An interceptor rejected this packet so answer a not_allowed error
IQ reply = new IQ();
reply.setChildElement(packet.getChildElement().createCopy());
reply.setID(packet.getID());
reply.setTo(session.getAddress());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.not_allowed);
session.process(reply);
// Check if a message notifying the rejection should be sent
if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) {
// A message for the rejection will be sent to the sender of the rejected packet
Message notification = new Message();
notification.setTo(session.getAddress());
notification.setFrom(packet.getTo());
notification.setBody(e.getRejectionMessage());
session.process(notification);
}
}
}
private void processPresence(ClientSession session, Presence packet) {
try {
// Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
false);
router.route(packet);
// Invoke the interceptors after we have processed the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
true);
session.incrementClientPacketCount();
}
catch (PacketRejectedException e) {
// An interceptor rejected this packet so answer a not_allowed error
Presence reply = new Presence();
reply.setID(packet.getID());
reply.setTo(session.getAddress());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.not_allowed);
session.process(reply);
// Check if a message notifying the rejection should be sent
if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) {
// A message for the rejection will be sent to the sender of the rejected packet
Message notification = new Message();
notification.setTo(session.getAddress());
notification.setFrom(packet.getTo());
notification.setBody(e.getRejectionMessage());
session.process(notification);
}
}
}
private void processMessage(ClientSession session, Message packet) {
try {
// Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
false);
router.route(packet);
// Invoke the interceptors after we have processed the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
true);
session.incrementClientPacketCount();
}
catch (PacketRejectedException e) {
// An interceptor rejected this packet
if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) {
// A message for the rejection will be sent to the sender of the rejected packet
Message reply = new Message();
reply.setID(packet.getID());
reply.setTo(session.getAddress());
reply.setFrom(packet.getTo());
reply.setType(packet.getType());
reply.setThread(packet.getThread());
reply.setBody(e.getRejectionMessage());
session.process(reply);
}
}
}
private IQ getIQ(Element doc) {
Element query = doc.element("query");
if (query != null && "jabber:iq:roster".equals(query.getNamespaceURI())) {
return new Roster(doc);
}
else {
return new IQ(doc);
}
}
/**
* Sends an IQ error with the specified condition to the sender of the original
* IQ packet.
*
* @param packet the packet to be bounced.
* @param extraError application specific error or null if none.
*/
private void sendErrorPacket(IQ packet, PacketError.Condition error, Element extraError) {
IQ reply = IQ.createResultIQ(packet);
reply.setChildElement(packet.getChildElement().createCopy());
reply.setError(error);
if (extraError != null) {
// Add specific application error if available
reply.getError().getElement().add(extraError);
}
deliver(reply);
}
/**
* Sends an IQ result packet confirming that the operation was successful.
*
* @param packet the original IQ packet.
*/
private void sendResultPacket(IQ packet) {
IQ reply = IQ.createResultIQ(packet);
reply.setChildElement(packet.getChildElement().createCopy());
deliver(reply);
}
private void deliver(IQ reply) {
// Get any session of the connection manager to deliver the packet
ConnectionMultiplexerSession session =
multiplexerManager.getMultiplexerSession(connectionManagerDomain);
if (session != null) {
session.deliver(reply);
}
else {
Log.warn("No multiplexer session found. Packet not delivered: " + reply.toXML());
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment