Commit 44af3b5b authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Modified to support non-blocking connections. JM-687

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@3904 b35dd754-fafc-0310-a699-88a17e54d16e
parent 1ff31a5a
...@@ -9,18 +9,14 @@ ...@@ -9,18 +9,14 @@
package org.dom4j.io; package org.dom4j.io;
import java.io.*; import org.dom4j.*;
import java.net.URL; import org.jivesoftware.wildfire.net.MXParser;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentFactory;
import org.dom4j.Element;
import org.dom4j.ElementHandler;
import org.dom4j.QName;
import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory; import org.xmlpull.v1.XmlPullParserFactory;
import org.jivesoftware.wildfire.net.MXParser;
import java.io.*;
import java.net.URL;
/** /**
* <p><code>XMPPPacketReader</code> is a Reader of DOM4J documents that * <p><code>XMPPPacketReader</code> is a Reader of DOM4J documents that
...@@ -277,6 +273,23 @@ public class XMPPPacketReader { ...@@ -277,6 +273,23 @@ public class XMPPPacketReader {
return lastActive > lastHeartbeat ? lastActive : lastHeartbeat; return lastActive > lastHeartbeat ? lastActive : lastHeartbeat;
} }
/*
* DANIELE: Add parse document by string
*/
public Document parseDocument(String xml) throws DocumentException {
/*
// Long way with reuse of DocumentFactory.
DocumentFactory df = getDocumentFactory();
SAXReader reader = new SAXReader( df );
Document document = reader.read( new StringReader( xml );*/
// Simple way
// TODO Optimize. Do not create a sax reader for each parsing
Document document = DocumentHelper.parseText(xml);
return document;
}
// Implementation methods // Implementation methods
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
public Document parseDocument() throws DocumentException, IOException, XmlPullParserException { public Document parseDocument() throws DocumentException, IOException, XmlPullParserException {
......
...@@ -11,9 +11,11 @@ ...@@ -11,9 +11,11 @@
package org.jivesoftware.wildfire; package org.jivesoftware.wildfire;
import org.jivesoftware.wildfire.net.SocketReader;
import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.util.Iterator; import java.util.Iterator;
import org.xmlpull.v1.XmlPullParserException;
/** /**
* Coordinates connections (accept, read, termination) on the server. * Coordinates connections (accept, read, termination) on the server.
...@@ -31,15 +33,17 @@ public interface ConnectionManager { ...@@ -31,15 +33,17 @@ public interface ConnectionManager {
public Iterator<ServerPort> getPorts(); public Iterator<ServerPort> getPorts();
/** /**
* Adds a socket to be managed by the connection manager. * Creates a new socket reader for the new accepted socket to be managed
* by the connection manager.
* *
* @param socket the socket to add to this manager for management. * @param socket the new accepted socket by this manager.
* @param isSecure true if the connection is secure. * @param isSecure true if the connection is secure.
* @param serverPort holds information about the port on which the server is listening for * @param serverPort holds information about the port on which the server is listening for
* connections. * connections.
* @param useBlockingMode true means that the server will use a thread per connection.
*/ */
public void addSocket(Socket socket, boolean isSecure, ServerPort serverPort) public SocketReader createSocketReader(Socket socket, boolean isSecure, ServerPort serverPort,
throws XmlPullParserException; boolean useBlockingMode) throws IOException;
/** /**
* Sets if the port listener for unsecured clients will be available or not. When disabled * Sets if the port listener for unsecured clients will be available or not. When disabled
......
...@@ -12,10 +12,10 @@ ...@@ -12,10 +12,10 @@
package org.jivesoftware.wildfire.net; package org.jivesoftware.wildfire.net;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.wildfire.ClientSession; import org.jivesoftware.wildfire.ClientSession;
import org.jivesoftware.wildfire.PacketRouter; import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.auth.UnauthorizedException; import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.util.JiveGlobals;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.IQ; import org.xmpp.packet.IQ;
import org.xmpp.packet.Message; import org.xmpp.packet.Message;
...@@ -40,8 +40,8 @@ import java.net.Socket; ...@@ -40,8 +40,8 @@ import java.net.Socket;
public class ClientSocketReader extends SocketReader { public class ClientSocketReader extends SocketReader {
public ClientSocketReader(PacketRouter router, String serverName, Socket socket, public ClientSocketReader(PacketRouter router, String serverName, Socket socket,
SocketConnection connection) { SocketConnection connection, boolean useBlockingMode) {
super(router, serverName, socket, connection); super(router, serverName, socket, connection, useBlockingMode);
} }
protected void processIQ(IQ packet) throws UnauthorizedException { protected void processIQ(IQ packet) throws UnauthorizedException {
...@@ -87,6 +87,10 @@ public class ClientSocketReader extends SocketReader { ...@@ -87,6 +87,10 @@ public class ClientSocketReader extends SocketReader {
return "jabber:client"; return "jabber:client";
} }
String getName() {
return "Client SR - " + hashCode();
}
boolean validateHost() { boolean validateHost() {
return JiveGlobals.getBooleanProperty("xmpp.client.validate.host",false); return JiveGlobals.getBooleanProperty("xmpp.client.validate.host",false);
} }
......
...@@ -33,8 +33,8 @@ import java.net.Socket; ...@@ -33,8 +33,8 @@ import java.net.Socket;
public class ComponentSocketReader extends SocketReader { public class ComponentSocketReader extends SocketReader {
public ComponentSocketReader(PacketRouter router, String serverName, Socket socket, public ComponentSocketReader(PacketRouter router, String serverName, Socket socket,
SocketConnection connection) { SocketConnection connection, boolean useBlockingMode) {
super(router, serverName, socket, connection); super(router, serverName, socket, connection, useBlockingMode);
} }
/** /**
...@@ -122,6 +122,10 @@ public class ComponentSocketReader extends SocketReader { ...@@ -122,6 +122,10 @@ public class ComponentSocketReader extends SocketReader {
return "jabber:component:accept"; return "jabber:component:accept";
} }
String getName() {
return "Component SR - " + hashCode();
}
boolean validateHost() { boolean validateHost() {
return false; return false;
} }
......
...@@ -69,8 +69,8 @@ public class ConnectionMultiplexerSocketReader extends SocketReader { ...@@ -69,8 +69,8 @@ public class ConnectionMultiplexerSocketReader extends SocketReader {
private MultiplexerPacketHandler packetHandler; private MultiplexerPacketHandler packetHandler;
public ConnectionMultiplexerSocketReader(PacketRouter router, String serverName, Socket socket, public ConnectionMultiplexerSocketReader(PacketRouter router, String serverName, Socket socket,
SocketConnection connection) { SocketConnection connection, boolean useBlockingMode) {
super(router, serverName, socket, connection); super(router, serverName, socket, connection, useBlockingMode);
// Create a pool of threads that will process received packets. If more threads are // Create a pool of threads that will process received packets. If more threads are
// required then the command will be executed on the SocketReader process // required then the command will be executed on the SocketReader process
int coreThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.core.threads", 10); int coreThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.core.threads", 10);
...@@ -205,6 +205,10 @@ public class ConnectionMultiplexerSocketReader extends SocketReader { ...@@ -205,6 +205,10 @@ public class ConnectionMultiplexerSocketReader extends SocketReader {
} }
} }
String getName() {
return "ConnectionMultiplexer SR - " + hashCode();
}
boolean validateHost() { boolean validateHost() {
return false; return false;
} }
......
...@@ -26,7 +26,9 @@ import java.net.UnknownHostException; ...@@ -26,7 +26,9 @@ import java.net.UnknownHostException;
/** /**
* Implements a network front end with a dedicated thread reading * Implements a network front end with a dedicated thread reading
* each incoming socket. * each incoming socket. The old SSL method always uses a blocking model.
*
* @author Gaston Dombiak
*/ */
public class SSLSocketAcceptThread extends Thread { public class SSLSocketAcceptThread extends Thread {
...@@ -139,7 +141,12 @@ public class SSLSocketAcceptThread extends Thread { ...@@ -139,7 +141,12 @@ public class SSLSocketAcceptThread extends Thread {
try { try {
Socket sock = serverSocket.accept(); Socket sock = serverSocket.accept();
Log.debug("SSL Connect " + sock.toString()); Log.debug("SSL Connect " + sock.toString());
connManager.addSocket(sock, true, serverPort); SocketReader reader = connManager.createSocketReader(sock, true, serverPort, true);
// Create a new reading thread for each new connected client
Thread thread = new Thread(reader, reader.getName());
thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY);
thread.start();
} }
catch (SSLException se) { catch (SSLException se) {
long exceptionTime = System.currentTimeMillis(); long exceptionTime = System.currentTimeMillis();
......
...@@ -50,8 +50,8 @@ public class ServerSocketReader extends SocketReader { ...@@ -50,8 +50,8 @@ public class ServerSocketReader extends SocketReader {
private ThreadPoolExecutor threadPool; private ThreadPoolExecutor threadPool;
public ServerSocketReader(PacketRouter router, String serverName, Socket socket, public ServerSocketReader(PacketRouter router, String serverName, Socket socket,
SocketConnection connection) { SocketConnection connection, boolean useBlockingMode) {
super(router, serverName, socket, connection); super(router, serverName, socket, connection, useBlockingMode);
// Create a pool of threads that will process received packets. If more threads are // Create a pool of threads that will process received packets. If more threads are
// required then the command will be executed on the SocketReader process // required then the command will be executed on the SocketReader process
int coreThreads = JiveGlobals.getIntProperty("xmpp.server.processing.core.threads", 2); int coreThreads = JiveGlobals.getIntProperty("xmpp.server.processing.core.threads", 2);
...@@ -217,6 +217,10 @@ public class ServerSocketReader extends SocketReader { ...@@ -217,6 +217,10 @@ public class ServerSocketReader extends SocketReader {
return "jabber:server"; return "jabber:server";
} }
String getName() {
return "Server SR - " + hashCode();
}
boolean validateHost() { boolean validateHost() {
return true; return true;
} }
......
...@@ -12,21 +12,20 @@ ...@@ -12,21 +12,20 @@
package org.jivesoftware.wildfire.net; package org.jivesoftware.wildfire.net;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.ConnectionManager; import org.jivesoftware.wildfire.ConnectionManager;
import org.jivesoftware.wildfire.ServerPort; import org.jivesoftware.wildfire.ServerPort;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
/** /**
* Implements a network front end with a dedicated thread reading * Implements a network front end with a dedicated thread reading
* each incoming socket. * each incoming socket. Blocking and non-blocking modes are supported.
* By default blocking mode is used. Use the <i>xmpp.socket.blocking</i>
* system property to change the blocking mode. Restart the server after making
* changes to the system property.
* *
* @author Iain Shigeoka * @author Gaston Dombiak
*/ */
public class SocketAcceptThread extends Thread { public class SocketAcceptThread extends Thread {
...@@ -55,22 +54,11 @@ public class SocketAcceptThread extends Thread { ...@@ -55,22 +54,11 @@ public class SocketAcceptThread extends Thread {
*/ */
private ServerPort serverPort; private ServerPort serverPort;
/** private SocketAcceptingMode acceptingMode;
* True while this thread should continue running.
*/
private boolean notTerminated = true;
/**
* socket that listens for connections.
*/
ServerSocket serverSocket;
private ConnectionManager connManager;
public SocketAcceptThread(ConnectionManager connManager, ServerPort serverPort) public SocketAcceptThread(ConnectionManager connManager, ServerPort serverPort)
throws IOException { throws IOException {
super("Socket Listener at port " + serverPort.getPort()); super("Socket Listener at port " + serverPort.getPort());
this.connManager = connManager;
this.serverPort = serverPort; this.serverPort = serverPort;
// Listen on a specific network interface if it has been set. // Listen on a specific network interface if it has been set.
String interfaceName = JiveGlobals.getXMLProperty("network.interface"); String interfaceName = JiveGlobals.getXMLProperty("network.interface");
...@@ -80,7 +68,14 @@ public class SocketAcceptThread extends Thread { ...@@ -80,7 +68,14 @@ public class SocketAcceptThread extends Thread {
bindInterface = InetAddress.getByName(interfaceName); bindInterface = InetAddress.getByName(interfaceName);
} }
} }
serverSocket = new ServerSocket(serverPort.getPort(), -1, bindInterface); // Set the blocking reading mode to use
boolean useBlockingMode = JiveGlobals.getBooleanProperty("xmpp.socket.blocking", true);
if (useBlockingMode) {
acceptingMode = new BlockingAcceptingMode(connManager, serverPort, bindInterface);
}
else {
acceptingMode = new NonBlockingAcceptingMode(connManager, serverPort, bindInterface);
}
} }
/** /**
...@@ -105,19 +100,7 @@ public class SocketAcceptThread extends Thread { ...@@ -105,19 +100,7 @@ public class SocketAcceptThread extends Thread {
* Unblock the thread and force it to terminate. * Unblock the thread and force it to terminate.
*/ */
public void shutdown() { public void shutdown() {
notTerminated = false; acceptingMode.shutdown();
try {
ServerSocket sSock = serverSocket;
serverSocket = null;
if (sSock != null) {
sSock.close();
}
}
catch (IOException e) {
// we don't care, no matter what, the socket should be dead
}
} }
/** /**
...@@ -125,34 +108,8 @@ public class SocketAcceptThread extends Thread { ...@@ -125,34 +108,8 @@ public class SocketAcceptThread extends Thread {
* call getting sockets and handing them to the SocketManager. * call getting sockets and handing them to the SocketManager.
*/ */
public void run() { public void run() {
while (notTerminated) { acceptingMode.run();
try { // We stopped accepting new connections so close the listener
Socket sock = serverSocket.accept(); shutdown();
if (sock != null) {
Log.debug("Connect " + sock.toString());
connManager.addSocket(sock, false, serverPort);
}
}
catch (IOException ie) {
if (notTerminated) {
Log.error(LocaleUtils.getLocalizedString("admin.error.accept"),
ie);
}
}
catch (Throwable e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.accept"), e);
}
}
try {
ServerSocket sSock = serverSocket;
serverSocket = null;
if (sSock != null) {
sSock.close();
}
}
catch (IOException e) {
// we don't care, no matter what, the socket should be dead
}
} }
} }
...@@ -30,6 +30,7 @@ import java.io.OutputStreamWriter; ...@@ -30,6 +30,7 @@ import java.io.OutputStreamWriter;
import java.io.Writer; import java.io.Writer;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
import java.nio.channels.Channels;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
...@@ -122,7 +123,13 @@ public class SocketConnection implements Connection { ...@@ -122,7 +123,13 @@ public class SocketConnection implements Connection {
this.secure = isSecure; this.secure = isSecure;
this.socket = socket; this.socket = socket;
// DANIELE: Modify socket to use channel
if (socket.getChannel() != null) {
writer = Channels.newWriter(socket.getChannel(), CHARSET);
}
else {
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), CHARSET)); writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), CHARSET));
}
this.backupDeliverer = backupDeliverer; this.backupDeliverer = backupDeliverer;
xmlSerializer = new XMLSocketWriter(writer, this); xmlSerializer = new XMLSocketWriter(writer, this);
......
...@@ -11,9 +11,6 @@ ...@@ -11,9 +11,6 @@
package org.jivesoftware.wildfire.net; package org.jivesoftware.wildfire.net;
import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZInputStream;
import org.dom4j.DocumentException;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader; import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
...@@ -29,12 +26,8 @@ import org.xmlpull.v1.XmlPullParserException; ...@@ -29,12 +26,8 @@ import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory; import org.xmlpull.v1.XmlPullParserFactory;
import org.xmpp.packet.*; import org.xmpp.packet.*;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.AsynchronousCloseException;
/** /**
* A SocketReader creates the appropriate {@link Session} based on the defined namespace in the * A SocketReader creates the appropriate {@link Session} based on the defined namespace in the
...@@ -53,7 +46,6 @@ public abstract class SocketReader implements Runnable { ...@@ -53,7 +46,6 @@ public abstract class SocketReader implements Runnable {
*/ */
private static XmlPullParserFactory factory = null; private static XmlPullParserFactory factory = null;
private Socket socket;
protected Session session; protected Session session;
protected SocketConnection connection; protected SocketConnection connection;
protected String serverName; protected String serverName;
...@@ -62,6 +54,7 @@ public abstract class SocketReader implements Runnable { ...@@ -62,6 +54,7 @@ public abstract class SocketReader implements Runnable {
* Router used to route incoming packets to the correct channels. * Router used to route incoming packets to the correct channels.
*/ */
private PacketRouter router; private PacketRouter router;
private SocketReadingMode readingMode;
XMPPPacketReader reader = null; XMPPPacketReader reader = null;
protected boolean open; protected boolean open;
private RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable(); private RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
...@@ -82,100 +75,39 @@ public abstract class SocketReader implements Runnable { ...@@ -82,100 +75,39 @@ public abstract class SocketReader implements Runnable {
* @param serverName the name of the server this socket is working for. * @param serverName the name of the server this socket is working for.
* @param socket the socket to read from. * @param socket the socket to read from.
* @param connection the connection being read. * @param connection the connection being read.
* @param useBlockingMode true means that the server will use a thread per connection.
*/ */
public SocketReader(PacketRouter router, String serverName, Socket socket, public SocketReader(PacketRouter router, String serverName, Socket socket,
SocketConnection connection) { SocketConnection connection, boolean useBlockingMode) {
this.serverName = serverName; this.serverName = serverName;
this.router = router; this.router = router;
this.connection = connection; this.connection = connection;
this.socket = socket;
connection.setSocketReader(this); connection.setSocketReader(this);
}
/** // Reader is associated with a new XMPPPacketReader
* A dedicated thread loop for reading the stream and sending incoming
* packets to the appropriate router.
*/
public void run() {
try {
reader = new XMPPPacketReader(); reader = new XMPPPacketReader();
reader.setXPPFactory(factory); reader.setXPPFactory(factory);
reader.getXPPParser().setInput(new InputStreamReader(socket.getInputStream(), // Set the blocking reading mode to use
CHARSET)); if (useBlockingMode) {
readingMode = new BlockingReadingMode(socket, this);
// Read in the opening tag and prepare for packet stream
try {
createSession();
}
catch (IOException e) {
Log.debug("Error creating session", e);
throw e;
}
// Read the packet stream until it ends
if (session != null) {
readStream();
}
}
catch (EOFException eof) {
// Normal disconnect
}
catch (SocketException se) {
// The socket was closed. The server may close the connection for several
// reasons (e.g. user requested to remove his account). Do nothing here.
}
catch (AsynchronousCloseException ace) {
// The socket was closed.
}
catch (XmlPullParserException ie) {
// It is normal for clients to abruptly cut a connection
// rather than closing the stream document. Since this is
// normal behavior, we won't log it as an error.
// Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie);
}
catch (Exception e) {
if (session != null) {
Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") + ". Session: " +
session, e);
}
}
finally {
if (session != null) {
if (Log.isDebugEnabled()) {
Log.debug("Logging off " + session.getAddress() + " on " + connection);
}
try {
session.getConnection().close();
}
catch (Exception e) {
Log.warn(LocaleUtils.getLocalizedString("admin.error.connection")
+ "\n" + socket.toString());
}
} }
else { else {
// Close and release the created connection readingMode = new NonBlockingReadingMode(socket, this);
connection.close();
Log.error(LocaleUtils.getLocalizedString("admin.error.connection")
+ "\n" + socket.toString());
}
shutdown();
} }
} }
/** /**
* Read the incoming stream until it ends. * A dedicated thread loop for reading the stream and sending incoming
* packets to the appropriate router.
*/ */
private void readStream() throws Exception { public void run() {
open = true; readingMode.run();
while (open) { }
Element doc = reader.parseDocument().getRootElement();
protected void process(Element doc) throws Exception {
if (doc == null) { if (doc == null) {
// Stop reading the stream since the client has sent an end of
// stream element and probably closed the connection.
return; return;
} }
...@@ -193,7 +125,7 @@ public abstract class SocketReader implements Runnable { ...@@ -193,7 +125,7 @@ public abstract class SocketReader implements Runnable {
reply.getElement().addAttribute("from", doc.attributeValue("to")); reply.getElement().addAttribute("from", doc.attributeValue("to"));
reply.setError(PacketError.Condition.jid_malformed); reply.setError(PacketError.Condition.jid_malformed);
session.process(reply); session.process(reply);
continue; return;
} }
processMessage(packet); processMessage(packet);
} }
...@@ -210,7 +142,7 @@ public abstract class SocketReader implements Runnable { ...@@ -210,7 +142,7 @@ public abstract class SocketReader implements Runnable {
reply.getElement().addAttribute("from", doc.attributeValue("to")); reply.getElement().addAttribute("from", doc.attributeValue("to"));
reply.setError(PacketError.Condition.jid_malformed); reply.setError(PacketError.Condition.jid_malformed);
session.process(reply); session.process(reply);
continue; return;
} }
// Check that the presence type is valid. If not then assume available type // Check that the presence type is valid. If not then assume available type
try { try {
...@@ -237,7 +169,7 @@ public abstract class SocketReader implements Runnable { ...@@ -237,7 +169,7 @@ public abstract class SocketReader implements Runnable {
// session may have buffered data pending to be processes so we want to ignore // session may have buffered data pending to be processes so we want to ignore
// just Presences of type available // just Presences of type available
Log.warn("Ignoring available presence packet of closed session: " + packet); Log.warn("Ignoring available presence packet of closed session: " + packet);
continue; return;
} }
processPresence(packet); processPresence(packet);
} }
...@@ -259,41 +191,10 @@ public abstract class SocketReader implements Runnable { ...@@ -259,41 +191,10 @@ public abstract class SocketReader implements Runnable {
} }
reply.setError(PacketError.Condition.jid_malformed); reply.setError(PacketError.Condition.jid_malformed);
session.process(reply); session.process(reply);
continue; return;
} }
processIQ(packet); processIQ(packet);
} }
else if ("starttls".equals(tag)) {
// Negotiate TLS
if (negotiateTLS()) {
tlsNegotiated();
}
else {
open = false;
session = null;
}
}
else if ("auth".equals(tag)) {
// User is trying to authenticate using SASL
if (authenticateClient(doc)) {
// SASL authentication was successful so open a new stream and offer
// resource binding and session establishment (to client sessions only)
saslSuccessful();
}
else if (connection.isClosed()) {
open = false;
session = null;
}
}
else if ("compress".equals(tag))
{
// Client is trying to initiate compression
if (compressClient(doc)) {
// Compression was successful so open a new stream and offer
// resource binding and session establishment (to client sessions only)
compressionSuccessful();
}
}
else else
{ {
if (!processUnknowPacket(doc)) { if (!processUnknowPacket(doc)) {
...@@ -303,90 +204,6 @@ public abstract class SocketReader implements Runnable { ...@@ -303,90 +204,6 @@ public abstract class SocketReader implements Runnable {
} }
} }
} }
}
private boolean authenticateClient(Element doc) throws DocumentException, IOException,
XmlPullParserException {
// Ensure that connection was secured if TLS was required
if (connection.getTlsPolicy() == Connection.TLSPolicy.required &&
!connection.isSecure()) {
closeNeverSecuredConnection();
return false;
}
boolean isComplete = false;
boolean success = false;
while (!isComplete) {
SASLAuthentication.Status status = SASLAuthentication.handle(session, doc);
if (status == SASLAuthentication.Status.needResponse) {
// Get the next answer since we are not done yet
doc = reader.parseDocument().getRootElement();
if (doc == null) {
// Nothing was read because the connection was closed or dropped
isComplete = true;
}
}
else {
isComplete = true;
success = status == SASLAuthentication.Status.authenticated;
}
}
return success;
}
/**
* Start using compression but first check if the connection can and should use compression.
* The connection will be closed if the requested method is not supported, if the connection
* is already using compression or if client requested to use compression but this feature
* is disabled.
*
* @param doc the element sent by the client requesting compression. Compression method is
* included.
* @return true if it was possible to use compression.
* @throws IOException if an error occurs while starting using compression.
*/
private boolean compressClient(Element doc) throws IOException {
String error = null;
if (connection.getCompressionPolicy() == Connection.CompressionPolicy.disabled) {
// Client requested compression but this feature is disabled
error = "<failure xmlns='http://jabber.org/protocol/compress'><setup-failed/></failure>";
// Log a warning so that admins can track this case from the server side
Log.warn("Client requested compression while compression is disabled. Closing " +
"connection : " + connection);
}
else if (connection.isCompressed()) {
// Client requested compression but connection is already compressed
error = "<failure xmlns='http://jabber.org/protocol/compress'><setup-failed/></failure>";
// Log a warning so that admins can track this case from the server side
Log.warn("Client requested compression and connection is already compressed. Closing " +
"connection : " + connection);
}
else {
// Check that the requested method is supported
String method = doc.elementText("method");
if (!"zlib".equals(method)) {
error = "<failure xmlns='http://jabber.org/protocol/compress'><unsupported-method/></failure>";
// Log a warning so that admins can track this case from the server side
Log.warn("Requested compression method is not supported: " + method +
". Closing connection : " + connection);
}
}
if (error != null) {
// Deliver stanza
connection.deliverRawText(error);
return false;
}
else {
// Indicate client that he can proceed and compress the socket
connection.deliverRawText("<compressed xmlns='http://jabber.org/protocol/compress'/>");
// Start using compression
connection.startCompression();
return true;
}
}
/** /**
* Process the received IQ packet. Registered * Process the received IQ packet. Registered
...@@ -549,11 +366,18 @@ public abstract class SocketReader implements Runnable { ...@@ -549,11 +366,18 @@ public abstract class SocketReader implements Runnable {
return reader.getLastActive(); return reader.getLastActive();
} }
/**
* Returns a name that identifies the type of reader and the unique instance.
*
* @return a name that identifies the type of reader and the unique instance.
*/
abstract String getName();
/** /**
* Close the connection since TLS was mandatory and the entity never negotiated TLS. Before * Close the connection since TLS was mandatory and the entity never negotiated TLS. Before
* closing the connection a stream error will be sent to the entity. * closing the connection a stream error will be sent to the entity.
*/ */
private void closeNeverSecuredConnection() { void closeNeverSecuredConnection() {
// Set the not_authorized error // Set the not_authorized error
StreamError error = new StreamError(StreamError.Condition.not_authorized); StreamError error = new StreamError(StreamError.Condition.not_authorized);
// Deliver stanza // Deliver stanza
...@@ -584,7 +408,8 @@ public abstract class SocketReader implements Runnable { ...@@ -584,7 +408,8 @@ public abstract class SocketReader implements Runnable {
* first packet. A call to next() should result in an START_TAG state with * first packet. A call to next() should result in an START_TAG state with
* the first packet in the stream. * the first packet in the stream.
*/ */
private void createSession() throws UnauthorizedException, XmlPullParserException, IOException { protected void createSession()
throws UnauthorizedException, XmlPullParserException, IOException {
XmlPullParser xpp = reader.getXPPParser(); XmlPullParser xpp = reader.getXPPParser();
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) { for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
eventType = xpp.next(); eventType = xpp.next();
...@@ -667,149 +492,6 @@ public abstract class SocketReader implements Runnable { ...@@ -667,149 +492,6 @@ public abstract class SocketReader implements Runnable {
} }
} }
/**
* Tries to secure the connection using TLS. If the connection is secured then reset
* the parser to use the new secured reader. But if the connection failed to be secured
* then send a <failure> stanza and close the connection.
*
* @return true if the connection was secured.
* @throws IOException if an I/O error occures while parsing the input stream.
* @throws XmlPullParserException if an error occures while parsing.
*/
private boolean negotiateTLS() throws IOException, XmlPullParserException {
if (connection.getTlsPolicy() == Connection.TLSPolicy.disabled) {
// Set the not_authorized error
StreamError error = new StreamError(StreamError.Condition.not_authorized);
// Deliver stanza
connection.deliverRawText(error.toXML());
// Close the underlying connection
connection.close();
// Log a warning so that admins can track this case from the server side
Log.warn("TLS requested by initiator when TLS was never offered by server. " +
"Closing connection : " + connection);
return false;
}
// Client requested to secure the connection using TLS. Negotiate TLS.
try {
connection.startTLS(false, null);
}
catch (IOException e) {
Log.error("Error while negotiating TLS", e);
connection.deliverRawText("<failure xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\">");
connection.close();
return false;
}
XmlPullParser xpp = reader.getXPPParser();
// Reset the parser to use the new reader
xpp.setInput(new InputStreamReader(connection.getTLSStreamHandler().getInputStream(), CHARSET));
// Skip new stream element
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
eventType = xpp.next();
}
return true;
}
/**
* TLS negotiation was successful so open a new stream and offer the new stream features.
* The new stream features will include available SASL mechanisms and specific features
* depending on the session type such as auth for Non-SASL authentication and register
* for in-band registration.
*/
private void tlsNegotiated() {
// Offer stream features including SASL Mechanisms
StringBuilder sb = new StringBuilder(620);
sb.append(geStreamHeader());
sb.append("<stream:features>");
// Include available SASL Mechanisms
sb.append(SASLAuthentication.getSASLMechanisms(session));
// Include specific features such as auth and register for client sessions
String specificFeatures = session.getAvailableStreamFeatures();
if (specificFeatures != null) {
sb.append(specificFeatures);
}
sb.append("</stream:features>");
connection.deliverRawText(sb.toString());
}
/**
* After SASL authentication was successful we should open a new stream and offer
* new stream features such as resource binding and session establishment. Notice that
* resource binding and session establishment should only be offered to clients (i.e. not
* to servers or external components)
*/
private void saslSuccessful() throws XmlPullParserException, IOException {
MXParser xpp = reader.getXPPParser();
// Reset the parser since a new stream header has been sent from the client
xpp.resetInput();
// Skip the opening stream sent by the client
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;)
{
eventType = xpp.next();
}
StringBuilder sb = new StringBuilder(420);
sb.append(geStreamHeader());
sb.append("<stream:features>");
// Include specific features such as resource binding and session establishment
// for client sessions
String specificFeatures = session.getAvailableStreamFeatures();
if (specificFeatures != null) {
sb.append(specificFeatures);
}
sb.append("</stream:features>");
connection.deliverRawText(sb.toString());
}
/**
* After compression was successful we should open a new stream and offer
* new stream features such as resource binding and session establishment. Notice that
* resource binding and session establishment should only be offered to clients (i.e. not
* to servers or external components)
*/
private void compressionSuccessful() throws XmlPullParserException, IOException
{
XmlPullParser xpp = reader.getXPPParser();
// Reset the parser since a new stream header has been sent from the client
if (connection.getTLSStreamHandler() == null)
{
ZInputStream in = new ZInputStream(socket.getInputStream());
in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
xpp.setInput(new InputStreamReader(in, CHARSET));
}
else
{
ZInputStream in = new ZInputStream(connection.getTLSStreamHandler().getInputStream());
in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
xpp.setInput(new InputStreamReader(in, CHARSET));
}
// Skip the opening stream sent by the client
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;)
{
eventType = xpp.next();
}
StringBuilder sb = new StringBuilder(340);
sb.append(geStreamHeader());
sb.append("<stream:features>");
// Include SASL mechanisms only if client has not been authenticated
if (session.getStatus() != Session.STATUS_AUTHENTICATED) {
// Include available SASL Mechanisms
sb.append(SASLAuthentication.getSASLMechanisms(session));
}
// Include specific features such as resource binding and session establishment
// for client sessions
String specificFeatures = session.getAvailableStreamFeatures();
if (specificFeatures != null)
{
sb.append(specificFeatures);
}
sb.append("</stream:features>");
connection.deliverRawText(sb.toString());
}
/** /**
* Returns the stream namespace. (E.g. jabber:client, jabber:server, etc.). * Returns the stream namespace. (E.g. jabber:client, jabber:server, etc.).
* *
...@@ -827,30 +509,6 @@ public abstract class SocketReader implements Runnable { ...@@ -827,30 +509,6 @@ public abstract class SocketReader implements Runnable {
*/ */
abstract boolean validateHost(); abstract boolean validateHost();
private String geStreamHeader() {
StringBuilder sb = new StringBuilder(200);
sb.append("<?xml version='1.0' encoding='");
sb.append(CHARSET);
sb.append("'?>");
if (connection.isFlashClient()) {
sb.append("<flash:stream xmlns:flash=\"http://www.jabber.com/streams/flash\" ");
} else {
sb.append("<stream:stream ");
}
sb.append("xmlns:stream=\"http://etherx.jabber.org/streams\" xmlns=\"");
sb.append(getNamespace());
sb.append("\" from=\"");
sb.append(session.getServerName());
sb.append("\" id=\"");
sb.append(session.getStreamID().toString());
sb.append("\" xml:lang=\"");
sb.append(connection.getLanguage());
sb.append("\" version=\"");
sb.append(Session.MAJOR_VERSION).append(".").append(Session.MINOR_VERSION);
sb.append("\">");
return sb.toString();
}
/** /**
* Notification message indicating that the SocketReader is shutting down. The thread will * Notification message indicating that the SocketReader is shutting down. The thread will
* stop reading and processing new requests. Subclasses may want to redefine this message * stop reading and processing new requests. Subclasses may want to redefine this message
......
...@@ -177,8 +177,15 @@ public class TLSStreamHandler { ...@@ -177,8 +177,15 @@ public class TLSStreamHandler {
reader = new TLSStreamReader(wrapper, socket); reader = new TLSStreamReader(wrapper, socket);
writer = new TLSStreamWriter(wrapper, socket); writer = new TLSStreamWriter(wrapper, socket);
// DANIELE: Add code to use directly the socket-channel.
if (socket.getChannel() != null) {
rbc = socket.getChannel();
wbc = socket.getChannel();
}
else {
rbc = Channels.newChannel(socket.getInputStream()); rbc = Channels.newChannel(socket.getInputStream());
wbc = Channels.newChannel(socket.getOutputStream()); wbc = Channels.newChannel(socket.getOutputStream());
}
initialHSStatus = HandshakeStatus.NEED_UNWRAP; initialHSStatus = HandshakeStatus.NEED_UNWRAP;
initialHSComplete = false; initialHSComplete = false;
......
...@@ -47,7 +47,13 @@ public class TLSStreamReader { ...@@ -47,7 +47,13 @@ public class TLSStreamReader {
public TLSStreamReader(TLSWrapper tlsWrapper, Socket socket) throws IOException { public TLSStreamReader(TLSWrapper tlsWrapper, Socket socket) throws IOException {
wrapper = tlsWrapper; wrapper = tlsWrapper;
// DANIELE: Add code to use directly the socket channel
if (socket.getChannel() != null) {
rbc = socket.getChannel();
}
else {
rbc = Channels.newChannel(socket.getInputStream()); rbc = Channels.newChannel(socket.getInputStream());
}
inNetBB = ByteBuffer.allocate(wrapper.getNetBuffSize()); inNetBB = ByteBuffer.allocate(wrapper.getNetBuffSize());
inAppBB = ByteBuffer.allocate(wrapper.getAppBuffSize()); inAppBB = ByteBuffer.allocate(wrapper.getAppBuffSize());
} }
......
...@@ -38,7 +38,13 @@ public class TLSStreamWriter { ...@@ -38,7 +38,13 @@ public class TLSStreamWriter {
public TLSStreamWriter(TLSWrapper tlsWrapper, Socket socket) throws IOException { public TLSStreamWriter(TLSWrapper tlsWrapper, Socket socket) throws IOException {
wrapper = tlsWrapper; wrapper = tlsWrapper;
// DANIELE: Add code to use directly the socket channel
if (socket.getChannel() != null) {
wbc = socket.getChannel();
}
else {
wbc = Channels.newChannel(socket.getOutputStream()); wbc = Channels.newChannel(socket.getOutputStream());
}
outAppData = ByteBuffer.allocate(tlsWrapper.getAppBuffSize()); outAppData = ByteBuffer.allocate(tlsWrapper.getAppBuffSize());
} }
......
...@@ -256,25 +256,19 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -256,25 +256,19 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
return ports.iterator(); return ports.iterator();
} }
public void addSocket(Socket sock, boolean isSecure, ServerPort serverPort) { public SocketReader createSocketReader(Socket sock, boolean isSecure, ServerPort serverPort,
try { boolean useBlockingMode) throws IOException {
// the order of these calls is critical (stupid huh?)
SocketReader reader;
String threadName;
if (serverPort.isClientPort()) { if (serverPort.isClientPort()) {
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure); SocketConnection conn = new SocketConnection(deliverer, sock, isSecure);
reader = new ClientSocketReader(router, serverName, sock, conn); return new ClientSocketReader(router, serverName, sock, conn, useBlockingMode);
threadName = "Client SR - " + reader.hashCode();
} }
else if (serverPort.isComponentPort()) { else if (serverPort.isComponentPort()) {
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure); SocketConnection conn = new SocketConnection(deliverer, sock, isSecure);
reader = new ComponentSocketReader(router, serverName, sock, conn); return new ComponentSocketReader(router, serverName, sock, conn, useBlockingMode);
threadName = "Component SR - " + reader.hashCode();
} }
else if (serverPort.isServerPort()) { else if (serverPort.isServerPort()) {
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure); SocketConnection conn = new SocketConnection(deliverer, sock, isSecure);
reader = new ServerSocketReader(router, serverName, sock, conn); return new ServerSocketReader(router, serverName, sock, conn, useBlockingMode);
threadName = "Server SR - " + reader.hashCode();
} }
else { else {
// Use the appropriate packeet deliverer for connection managers. The packet // Use the appropriate packeet deliverer for connection managers. The packet
...@@ -282,16 +276,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -282,16 +276,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// the connection manager has finished the handshake. // the connection manager has finished the handshake.
SocketConnection conn = SocketConnection conn =
new SocketConnection(new MultiplexerPacketDeliverer(), sock, isSecure); new SocketConnection(new MultiplexerPacketDeliverer(), sock, isSecure);
reader = new ConnectionMultiplexerSocketReader(router, serverName, sock, conn); return new ConnectionMultiplexerSocketReader(router, serverName, sock, conn,
threadName = "ConnectionMultiplexer SR - " + reader.hashCode(); useBlockingMode);
}
Thread thread = new Thread(reader, threadName);
thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY);
thread.start();
}
catch (IOException e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment