Commit edb9c306 authored by guus's avatar guus

First stab at replacing Stream Compression with a pluggable system. Note that...

First stab at replacing Stream Compression with a pluggable system. Note that this commit undoes JM-1059. I'll re-add the fix for that later.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/branches/rsm@9179 b35dd754-fafc-0310-a699-88a17e54d16e
parent 9d609206
...@@ -1828,6 +1828,7 @@ compression.settings.server.enable=Available ...@@ -1828,6 +1828,7 @@ compression.settings.server.enable=Available
compression.settings.server.enable_info=Communication between servers will try to use compressed traffic. compression.settings.server.enable_info=Communication between servers will try to use compressed traffic.
compression.settings.server.disable=Not Available compression.settings.server.disable=Not Available
compression.settings.server.disable_info=Communication between servers will not use compressed traffic. compression.settings.server.disable_info=Communication between servers will not use compressed traffic.
compression.settings.available.methods=Available Compression Algorithms
# General user # General user
......
...@@ -300,14 +300,16 @@ public interface Connection { ...@@ -300,14 +300,16 @@ public interface Connection {
* that he can start compressing the traffic. After we sent the uncompresses stanza we can * that he can start compressing the traffic. After we sent the uncompresses stanza we can
* start compression outgoing traffic as well. * start compression outgoing traffic as well.
*/ */
void addCompression(); // TODO: enable me again! JM-1059 void addCompression();
/** /**
* Start compressing outgoing traffic for this connection. Compression will only be available after * Start compressing outgoing traffic for this connection. Compression will only be available after
* TLS has been negotiated. This means that a connection can never be using compression before * TLS has been negotiated. This means that a connection can never be using compression before
* TLS. However, it is possible to use compression without TLS. * TLS. However, it is possible to use compression without TLS.
*
* @param methodIdentifier The name of the compression algorithm to use.
*/ */
void startCompression(); void startCompression(final String methodIdentifier);
/** /**
* Enumeration of possible compression policies required to interact with the server. * Enumeration of possible compression policies required to interact with the server.
......
...@@ -21,6 +21,7 @@ import org.jivesoftware.openfire.StreamID; ...@@ -21,6 +21,7 @@ import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.multiplex.UnknownStanzaException; import org.jivesoftware.openfire.multiplex.UnknownStanzaException;
import org.jivesoftware.openfire.net.SASLAuthentication; import org.jivesoftware.openfire.net.SASLAuthentication;
import org.jivesoftware.openfire.net.StreamCompressionManager;
import org.jivesoftware.openfire.net.VirtualConnection; import org.jivesoftware.openfire.net.VirtualConnection;
import org.jivesoftware.openfire.session.LocalClientSession; import org.jivesoftware.openfire.session.LocalClientSession;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
...@@ -94,7 +95,9 @@ public class HttpSession extends LocalClientSession { ...@@ -94,7 +95,9 @@ public class HttpSession extends LocalClientSession {
// Include Stream Compression Mechanism // Include Stream Compression Mechanism
if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled && if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled &&
!conn.isCompressed()) { !conn.isCompressed() && StreamCompressionManager.isAvailable("zlib")) {
// TODO: we're only including zlib at this time. Maybe we should
// include every compression algorithm that's available.
Element compression = DocumentHelper.createElement(new QName("compression", Element compression = DocumentHelper.createElement(new QName("compression",
new Namespace("", "http://jabber.org/features/compress"))); new Namespace("", "http://jabber.org/features/compress")));
Element method = compression.addElement("method"); Element method = compression.addElement("method");
......
...@@ -11,8 +11,6 @@ ...@@ -11,8 +11,6 @@
package org.jivesoftware.openfire.net; package org.jivesoftware.openfire.net;
import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZInputStream;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
...@@ -185,19 +183,24 @@ class BlockingReadingMode extends SocketReadingMode { ...@@ -185,19 +183,24 @@ class BlockingReadingMode extends SocketReadingMode {
protected boolean compressClient(Element doc) throws XmlPullParserException, IOException { protected boolean compressClient(Element doc) throws XmlPullParserException, IOException {
boolean answer = super.compressClient(doc); boolean answer = super.compressClient(doc);
if (answer) { if (answer) {
final String method = doc.elementText("method");
if (!StreamCompressionManager.isAvailable(method)) {
Log.error("Although super.compressClient returned 'true', we're unable to apply stream compression!");
return false;
}
final StreamCompressor compressor = StreamCompressionManager
.getCompressor(method);
XmlPullParser xpp = socketReader.reader.getXPPParser(); XmlPullParser xpp = socketReader.reader.getXPPParser();
// Reset the parser since a new stream header has been sent from the client // Reset the parser since a new stream header has been sent from the
// client
if (socketReader.connection.getTLSStreamHandler() == null) { if (socketReader.connection.getTLSStreamHandler() == null) {
ZInputStream in = new ZInputStream( xpp.setInput(compressor.getReader(ServerTrafficCounter
ServerTrafficCounter.wrapInputStream(socket.getInputStream())); .wrapInputStream(socket.getInputStream())));
in.setFlushMode(JZlib.Z_PARTIAL_FLUSH); } else {
xpp.setInput(new InputStreamReader(in, CHARSET)); xpp.setInput(compressor.getReader(socketReader.connection
} .getTLSStreamHandler().getInputStream()));
else {
ZInputStream in = new ZInputStream(
socketReader.connection.getTLSStreamHandler().getInputStream());
in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
xpp.setInput(new InputStreamReader(in, CHARSET));
} }
} }
return answer; return answer;
......
...@@ -11,8 +11,6 @@ ...@@ -11,8 +11,6 @@
package org.jivesoftware.openfire.net; package org.jivesoftware.openfire.net;
import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZOutputStream;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.ConnectionCloseListener; import org.jivesoftware.openfire.ConnectionCloseListener;
import org.jivesoftware.openfire.PacketDeliverer; import org.jivesoftware.openfire.PacketDeliverer;
...@@ -173,28 +171,33 @@ public class SocketConnection implements Connection { ...@@ -173,28 +171,33 @@ public class SocketConnection implements Connection {
// WARNING: We do not support adding compression for incoming traffic but not for outgoing traffic. // WARNING: We do not support adding compression for incoming traffic but not for outgoing traffic.
} }
public void startCompression() { public void startCompression(final String methodIdentifier) {
if (!StreamCompressionManager.isAvailable(methodIdentifier)) {
compressed = false;
Log.warn("Stream compression was requested, but requested method is not available: " + methodIdentifier);
return;
}
compressed = true; compressed = true;
try { try {
final StreamCompressor compressor = StreamCompressionManager.getCompressor(methodIdentifier);
if (tlsStreamHandler == null) { if (tlsStreamHandler == null) {
ZOutputStream out = new ZOutputStream( writer = compressor.getWriter(socket.getOutputStream());
ServerTrafficCounter.wrapOutputStream(socket.getOutputStream()),
JZlib.Z_BEST_COMPRESSION);
out.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
writer = new BufferedWriter(new OutputStreamWriter(out, CHARSET));
xmlSerializer = new XMLSocketWriter(writer, this); xmlSerializer = new XMLSocketWriter(writer, this);
} }
else { else {
ZOutputStream out = new ZOutputStream(tlsStreamHandler.getOutputStream(), JZlib.Z_BEST_COMPRESSION); writer = compressor.getWriter(tlsStreamHandler.getOutputStream());
out.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
writer = new BufferedWriter(new OutputStreamWriter(out, CHARSET));
xmlSerializer = new XMLSocketWriter(writer, this); xmlSerializer = new XMLSocketWriter(writer, this);
} }
} catch (IOException e) { } catch (IOException e) {
// TODO Would be nice to still be able to throw the exception and not catch it here // TODO Would be nice to still be able to throw the exception and not catch it here
Log.error("Error while starting compression", e); Log.error("Error while starting compression", e);
compressed = false; compressed = false;
} catch (IllegalStateException e) {
Log.error("Stream compression was requested, but requested method is not available: " + methodIdentifier, e);
compressed = false;
} }
} }
......
...@@ -165,6 +165,7 @@ abstract class SocketReadingMode { ...@@ -165,6 +165,7 @@ abstract class SocketReadingMode {
*/ */
protected boolean compressClient(Element doc) throws IOException, XmlPullParserException { protected boolean compressClient(Element doc) throws IOException, XmlPullParserException {
String error = null; String error = null;
String method = doc.elementText("method");
if (socketReader.connection.getCompressionPolicy() == Connection.CompressionPolicy.disabled) { if (socketReader.connection.getCompressionPolicy() == Connection.CompressionPolicy.disabled) {
// Client requested compression but this feature is disabled // Client requested compression but this feature is disabled
error = "<failure xmlns='http://jabber.org/protocol/compress'><setup-failed/></failure>"; error = "<failure xmlns='http://jabber.org/protocol/compress'><setup-failed/></failure>";
...@@ -181,8 +182,7 @@ abstract class SocketReadingMode { ...@@ -181,8 +182,7 @@ abstract class SocketReadingMode {
} }
else { else {
// Check that the requested method is supported // Check that the requested method is supported
String method = doc.elementText("method"); if (!StreamCompressionManager.isAvailable(method)) {
if (!"zlib".equals(method)) {
error = "<failure xmlns='http://jabber.org/protocol/compress'><unsupported-method/></failure>"; error = "<failure xmlns='http://jabber.org/protocol/compress'><unsupported-method/></failure>";
// Log a warning so that admins can track this case from the server side // Log a warning so that admins can track this case from the server side
Log.warn("Requested compression method is not supported: " + method + Log.warn("Requested compression method is not supported: " + method +
...@@ -196,14 +196,11 @@ abstract class SocketReadingMode { ...@@ -196,14 +196,11 @@ abstract class SocketReadingMode {
return false; return false;
} }
else { else {
// Start using compression for incoming traffic
socketReader.connection.addCompression();
// Indicate client that he can proceed and compress the socket // Indicate client that he can proceed and compress the socket
socketReader.connection.deliverRawText("<compressed xmlns='http://jabber.org/protocol/compress'/>"); socketReader.connection.deliverRawText("<compressed xmlns='http://jabber.org/protocol/compress'/>");
// Start using compression for outgoing traffic // Start using compression
socketReader.connection.startCompression(); socketReader.connection.startCompression(method);
return true; return true;
} }
} }
......
...@@ -42,7 +42,7 @@ public abstract class StanzaHandler { ...@@ -42,7 +42,7 @@ public abstract class StanzaHandler {
/** /**
* The utf-8 charset for decoding and encoding Jabber packet streams. * The utf-8 charset for decoding and encoding Jabber packet streams.
*/ */
protected static String CHARSET = "UTF-8"; protected static final String CHARSET = "UTF-8";
private Connection connection; private Connection connection;
// DANIELE: Indicate if a session is already created // DANIELE: Indicate if a session is already created
...@@ -438,6 +438,8 @@ public abstract class StanzaHandler { ...@@ -438,6 +438,8 @@ public abstract class StanzaHandler {
*/ */
private boolean compressClient(Element doc) { private boolean compressClient(Element doc) {
String error = null; String error = null;
String method = doc.elementText("method");
if (connection.getCompressionPolicy() == Connection.CompressionPolicy.disabled) { if (connection.getCompressionPolicy() == Connection.CompressionPolicy.disabled) {
// Client requested compression but this feature is disabled // Client requested compression but this feature is disabled
error = "<failure xmlns='http://jabber.org/protocol/compress'><setup-failed/></failure>"; error = "<failure xmlns='http://jabber.org/protocol/compress'><setup-failed/></failure>";
...@@ -454,8 +456,7 @@ public abstract class StanzaHandler { ...@@ -454,8 +456,7 @@ public abstract class StanzaHandler {
} }
else { else {
// Check that the requested method is supported // Check that the requested method is supported
String method = doc.elementText("method"); if (!StreamCompressionManager.isAvailable(method)) {
if (!"zlib".equals(method)) {
error = "<failure xmlns='http://jabber.org/protocol/compress'><unsupported-method/></failure>"; error = "<failure xmlns='http://jabber.org/protocol/compress'><unsupported-method/></failure>";
// Log a warning so that admins can track this case from the server side // Log a warning so that admins can track this case from the server side
Log.warn("Requested compression method is not supported: " + method + Log.warn("Requested compression method is not supported: " + method +
...@@ -470,13 +471,13 @@ public abstract class StanzaHandler { ...@@ -470,13 +471,13 @@ public abstract class StanzaHandler {
} }
else { else {
// Start using compression for incoming traffic // Start using compression for incoming traffic
connection.addCompression(); // TODO: Enable me again: JM-1059 connection.addCompression();
// Indicate client that he can proceed and compress the socket // Indicate client that he can proceed and compress the socket
connection.deliverRawText("<compressed xmlns='http://jabber.org/protocol/compress'/>"); connection.deliverRawText("<compressed xmlns='http://jabber.org/protocol/compress'/>");
// Start using compression for outgoing traffic // Start using compression for outgoing traffic
connection.startCompression(); connection.startCompression(method);
return true; return true;
} }
} }
......
package org.jivesoftware.openfire.net;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.Namespace;
import org.dom4j.QName;
/**
* The StreamCompressionManager allows for run-time adjustable, pluggable stream
* compression management. This means that stream compression algorithms can be
* added or removed run-time.
*
* @see StreamCompressor
* @author Guus der Kinderen, guus@nimbuzz.com
*/
public class StreamCompressionManager
{
/**
* The collection of all available Stream Compressors.
*/
private static final Map<String, StreamCompressor> availableCompressors = new ConcurrentHashMap<String, StreamCompressor>();
/**
* The singleton object of this class
*/
private static final StreamCompressionManager instance = new StreamCompressionManager();
// TODO: To make the interface completely pluggable, this needs to be done
// runtime instead.
static
{
register(new ZLibStreamCompressor());
}
/**
* The constructor of this class has been made private to enforce singleton
* behavior. The singleton instance of this class can be retrieved by
* calling {@link #getInstance()}.
*
* @see #getInstance()
*/
private StreamCompressionManager()
{
// Isn't intended to do anything.
}
/**
* Returns the singleton instance of this class.
*
* @return The StreamCompressorManager instance.
*/
public StreamCompressionManager getInstance()
{
return instance;
}
/**
* Registers a new StreamCompressor with this Manager. The compression
* algorithm provided by the StreamCompressor will be made available for use
* in Openfire.
*
* If, prior to the invocation of this method, this manager contains a
* StreamCompressor that matches the method identifier of the compressor to
* be added, the old StreamCompressor instance will be overwritten.
*
* Note that this method cannot return null or an empty String.
*
* @param compressor
* The StreamCompressor to be added.
* @return The method identifier of the newly added StreamCompressor.
* @throws IllegalArgumentException
* If the compressor argument is null, or if the compressor
* argument is a StreamCompressor instance of which the
* getIdentifier method returns null.
*/
public static String register(final StreamCompressor compressor)
{
if (compressor == null)
{
throw new IllegalArgumentException(
"Parameter 'compressor' cannot be null.");
}
final String identifier = compressor.getMethodIdentifier();
if (identifier == null || identifier.length() == 0)
{
throw new IllegalArgumentException(
"Parameter 'compressor' must be a StreamCompressor instance of which the method #getIdentifier does not return null or an empty String.");
}
availableCompressors.put(identifier, compressor);
return identifier;
}
/**
* Deregisters the StreamCompressor that matches the provided identifier.
* This removes the StreamCompressor from the set of available
* StreamCompression algorithms broadcasted by Openfire.
*
* @param identifier
* The method identifier of the StreamCompressor to be removed.
* @throws IllegalArgumentException
* If provided 'identifier' argument is null.
* @throws IllegalStateException
* If no StreamCompressor matching the method identifier has
* been registered.
*/
public static void deregister(final String identifier)
{
if (identifier == null)
{
throw new IllegalArgumentException(
"Parameter 'identifier' cannot be null.");
}
if (availableCompressors.remove(identifier) == null)
{
throw new IllegalStateException(
"Cannot deregister compressor. No compressor identified by '"
+ identifier + "' has been registered.");
}
}
/**
* Checks if a StreamCompressor matching the provided identifer was
* registered with this Manager.
*
* @param methodIdentifier
* The identifier to check.
* @return ''true'' the provided compression method identifier is available
* for usage, ''false'' otherwise.
*/
public static boolean isAvailable(final String methodIdentifier)
{
return availableCompressors.containsKey(methodIdentifier);
}
/**
* Lists all available compressors, by returning an (unmodifiable) Set of
* all compression method identifiers. This method will return an empty Set
* if no StreamCompressors are available.
*
* @return A set of all compression method idenfiers.
*/
public static Set<String> allAvailableMethods()
{
return Collections.unmodifiableSet(availableCompressors.keySet());
}
/**
* Returns the StreamCompressor matching the provided method identifier.
* Note that this method will not return ''null'' values. If unavailable
* StreamCompressors are requested, RuntimeExceptions will be thrown.
*
* @param methodIdentifier
* The identifier that denotes the requested StreamCompressor
* instance.
* @return StreamCompressor matching the identifier.
*
* @throws IllegalArgumentException
* If provided 'methodIdentifier' argument is null.
* @throws IllegalStateException
* If no StreamCompressor matching the method identifier has
* been registered.
*
*/
public static StreamCompressor getCompressor(String methodIdentifier)
{
if (methodIdentifier == null)
{
throw new IllegalArgumentException(
"Argument 'methodIdentifier' cannot be null.");
}
final StreamCompressor compressor = availableCompressors.get(methodIdentifier);
if (compressor == null)
{
throw new IllegalStateException("No compressor identified by '"
+ methodIdentifier + "' has been registered.");
}
return compressor;
}
/**
* Creates and returns a 'compression' element suitable to be included as a
* child element of the stream element. The compression element will reflect
* all compression methods available from the manager.
*
* @return A compression XML element containing all compression methods, or
* 'null' if no compression methods are available.
*/
public static Element getStreamCompressionFeature()
{
// TODO: we shouldn't iterate over the entire Map all the time. We can
// cache the response, and change the cache if the map gets changed
// instead.
if (!availableCompressors.isEmpty())
{
final Element compression = DocumentHelper.createElement(new QName(
"compression", new Namespace("",
"http://jabber.org/features/compress")));
for (String method : availableCompressors.keySet())
{
compression.addElement("method").setText(method);
}
return compression;
}
return null;
}
}
package org.jivesoftware.openfire.net;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import org.apache.mina.common.IoFilter;
/**
* A new Stream Compression algorithm (as defined by XEP-0138) can be added to
* Openfire by implementing the StreamCompressor interface. All implementing
* classes should be registered to the StreamCompressionManager.
*
* Each class implementing this interface will return three objects primarily
* used for compression: on one side, there are a Writer and Reader object
* (returned by {@link #getWriter(OutputStream)} and
* {@link #getReader(InputStream)}, and on the other side is a
* CompressionFilter object (returned by {@link #getCompressingIOFilter()}. The
* former is used by the (old) blocking IO package, while the latter is used by
* the (newer) non-Blocking network IO package (Apache MINA).
*
* @author Guus der Kinderen, guus@nimbuzz.com
* @see StreamCompressionManager
*/
public interface StreamCompressor
{
/**
* The identifier used by the XMPP server to denote this compression
* algorithm (e.g. 'zlib' or 'lzw'). This method cannot return null or an
* empty String.
*
* The return value of this method is used in the compression feature
* negotiation, as described in XEP-0138.
*
* @return The identifier of the compression algorithm used.
*/
public String getMethodIdentifier();
/**
* This method is used to apply compression to an OutputStream object.
*
* @param outputStream
* The (uncompressed) OutputStream
* @return Writer object that writes compressed data.
* @throws IOException
*/
public Writer getWriter(final OutputStream outputStream) throws IOException;
/**
* This method is used to apply decompression to an InputStream object.
*
* @param inputStream
* The (compressed) OutputStream
* @return Reader object that reads uncompressed data.
* @throws IOException
*/
public Reader getReader(final InputStream inputStream) throws IOException;
/**
* Returns a MINA filter that applies the compression algorithm.
*
* @return MINA filter that applies compression.
*/
public IoFilter getCompressingIOFilter();
}
...@@ -116,7 +116,7 @@ public abstract class VirtualConnection implements Connection { ...@@ -116,7 +116,7 @@ public abstract class VirtualConnection implements Connection {
//Ignore //Ignore
} }
public void startCompression() { public void startCompression(final String methodIdentifier) {
//Ignore //Ignore
} }
......
package org.jivesoftware.openfire.net;
import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import org.apache.mina.filter.CompressionFilter;
import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZInputStream;
import com.jcraft.jzlib.ZOutputStream;
/**
* Implements ZLib Stream Compression.
*
* Note that much of this code was copy/pasted from various other Openfire
* classes. These classes always assumed the ZLIB algorithm, if compression was
* enabled.
*
* @author Guus der Kinderen, guus@nimbuzz.com
*/
public class ZLibStreamCompressor implements StreamCompressor
{
/*
* (non-Javadoc)
*
* @see org.jivesoftware.openfire.net.StreamCompressor#getMethodIdentifier()
*/
public String getMethodIdentifier()
{
return "zlib";
}
/*
* (non-Javadoc)
*
* @see org.jivesoftware.openfire.net.StreamCompressor#getWriter(java.io.OutputStream)
*/
public Writer getWriter(OutputStream outputStream)
throws UnsupportedEncodingException
{
// TODO: move traffic counting outside this class!
final ZOutputStream out = new ZOutputStream(
ServerTrafficCounter.wrapOutputStream(outputStream),
JZlib.Z_BEST_COMPRESSION);
out.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
return new BufferedWriter(new OutputStreamWriter(out,
SocketConnection.CHARSET));
}
/*
* (non-Javadoc)
*
* @see org.jivesoftware.openfire.net.StreamCompressor#getReader(java.io.InputStream)
*/
public Reader getReader(InputStream inputStream)
throws UnsupportedEncodingException
{
// TODO: move traffic counting outside this class!
final ZInputStream in = new ZInputStream(
ServerTrafficCounter.wrapInputStream(inputStream));
in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
return new InputStreamReader(in, SocketConnection.CHARSET);
}
/*
* (non-Javadoc)
*
* @see org.jivesoftware.openfire.net.StreamCompressor#getCompressingIOFilter()
*/
public CompressionFilter getCompressingIOFilter()
{
return new CompressionFilter(CompressionFilter.COMPRESSION_MAX);
}
}
...@@ -24,6 +24,8 @@ import org.jivesoftware.openfire.net.SSLConfig; ...@@ -24,6 +24,8 @@ import org.jivesoftware.openfire.net.SSLConfig;
import org.jivesoftware.openfire.net.SSLJiveKeyManagerFactory; import org.jivesoftware.openfire.net.SSLJiveKeyManagerFactory;
import org.jivesoftware.openfire.net.SSLJiveTrustManagerFactory; import org.jivesoftware.openfire.net.SSLJiveTrustManagerFactory;
import org.jivesoftware.openfire.net.ServerTrustManager; import org.jivesoftware.openfire.net.ServerTrustManager;
import org.jivesoftware.openfire.net.StreamCompressionManager;
import org.jivesoftware.openfire.net.StreamCompressor;
import org.jivesoftware.openfire.session.LocalSession; import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
...@@ -324,18 +326,15 @@ public class NIOConnection implements Connection { ...@@ -324,18 +326,15 @@ public class NIOConnection implements Connection {
} }
} }
public void addCompression() { public void startCompression(final String methodIdentifier) {
IoFilterChain chain = ioSession.getFilterChain(); IoFilterChain chain = ioSession.getFilterChain();
String baseFilter = "org.apache.mina.common.ExecutorThreadModel"; String baseFilter = "org.apache.mina.common.ExecutorThreadModel";
if (chain.contains("tls")) { if (chain.contains("tls")) {
baseFilter = "tls"; baseFilter = "tls";
} }
chain.addAfter(baseFilter, "compression", new CompressionFilter(true, false, CompressionFilter.COMPRESSION_MAX));
}
public void startCompression() { final StreamCompressor compressor = StreamCompressionManager.getCompressor(methodIdentifier);
CompressionFilter ioFilter = (CompressionFilter) ioSession.getFilterChain().get("compression"); chain.addAfter(baseFilter, "compression", compressor.getCompressingIOFilter());
ioFilter.setCompressOutbound(true);
} }
public boolean isFlashClient() { public boolean isFlashClient() {
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
package org.jivesoftware.openfire.session; package org.jivesoftware.openfire.session;
import org.dom4j.Element;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID; import org.jivesoftware.openfire.StreamID;
...@@ -20,6 +21,7 @@ import org.jivesoftware.openfire.auth.UnauthorizedException; ...@@ -20,6 +21,7 @@ import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.net.SASLAuthentication; import org.jivesoftware.openfire.net.SASLAuthentication;
import org.jivesoftware.openfire.net.SSLConfig; import org.jivesoftware.openfire.net.SSLConfig;
import org.jivesoftware.openfire.net.SocketConnection; import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.net.StreamCompressionManager;
import org.jivesoftware.openfire.privacy.PrivacyList; import org.jivesoftware.openfire.privacy.PrivacyList;
import org.jivesoftware.openfire.privacy.PrivacyListManager; import org.jivesoftware.openfire.privacy.PrivacyListManager;
import org.jivesoftware.openfire.user.PresenceEventDispatcher; import org.jivesoftware.openfire.user.PresenceEventDispatcher;
...@@ -696,10 +698,12 @@ public class LocalClientSession extends LocalSession implements ClientSession { ...@@ -696,10 +698,12 @@ public class LocalClientSession extends LocalSession implements ClientSession {
StringBuilder sb = new StringBuilder(200); StringBuilder sb = new StringBuilder(200);
// Include Stream Compression Mechanism // Include Stream Compression Mechanism
if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled && if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled
!conn.isCompressed()) { && !conn.isCompressed()) {
sb.append( final Element compression = StreamCompressionManager.getStreamCompressionFeature();
"<compression xmlns=\"http://jabber.org/features/compress\"><method>zlib</method></compression>"); if (compression != null) {
sb.append(compression.asXML());
}
} }
if (getAuthToken() == null) { if (getAuthToken() == null) {
......
...@@ -11,6 +11,7 @@ import org.jivesoftware.openfire.multiplex.ConnectionMultiplexerManager; ...@@ -11,6 +11,7 @@ import org.jivesoftware.openfire.multiplex.ConnectionMultiplexerManager;
import org.jivesoftware.openfire.multiplex.MultiplexerPacketDeliverer; import org.jivesoftware.openfire.multiplex.MultiplexerPacketDeliverer;
import org.jivesoftware.openfire.net.SASLAuthentication; import org.jivesoftware.openfire.net.SASLAuthentication;
import org.jivesoftware.openfire.net.SocketConnection; import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.net.StreamCompressionManager;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParser;
...@@ -187,9 +188,12 @@ public class LocalConnectionMultiplexerSession extends LocalSession implements C ...@@ -187,9 +188,12 @@ public class LocalConnectionMultiplexerSession extends LocalSession implements C
} }
// Include Stream Compression Mechanism // Include Stream Compression Mechanism
if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled && if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled
!conn.isCompressed()) { && !conn.isCompressed()) {
return "<compression xmlns=\"http://jabber.org/features/compress\"><method>zlib</method></compression>"; final Element compression = StreamCompressionManager.getStreamCompressionFeature();
if (compression != null) {
return compression.asXML();
}
} }
return null; return null;
} }
...@@ -262,8 +266,10 @@ public class LocalConnectionMultiplexerSession extends LocalSession implements C ...@@ -262,8 +266,10 @@ public class LocalConnectionMultiplexerSession extends LocalSession implements C
} }
// Add info about Stream Compression // Add info about Stream Compression
if (LocalClientSession.getCompressionPolicy() == Connection.CompressionPolicy.optional) { if (LocalClientSession.getCompressionPolicy() == Connection.CompressionPolicy.optional) {
Element comp = child.addElement("compression", "http://jabber.org/features/compress"); final Element compression = StreamCompressionManager.getStreamCompressionFeature();
comp.addElement("method").setText("zlib"); if (compression != null) {
child.add(compression);
}
} }
// Add info about Non-SASL authentication // Add info about Non-SASL authentication
child.addElement("auth", "http://jabber.org/features/iq-auth"); child.addElement("auth", "http://jabber.org/features/iq-auth");
......
...@@ -19,6 +19,7 @@ import org.jivesoftware.openfire.auth.UnauthorizedException; ...@@ -19,6 +19,7 @@ import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.net.SASLAuthentication; import org.jivesoftware.openfire.net.SASLAuthentication;
import org.jivesoftware.openfire.net.SSLConfig; import org.jivesoftware.openfire.net.SSLConfig;
import org.jivesoftware.openfire.net.SocketConnection; import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.net.StreamCompressionManager;
import org.jivesoftware.openfire.server.ServerDialback; import org.jivesoftware.openfire.server.ServerDialback;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
...@@ -331,7 +332,10 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming ...@@ -331,7 +332,10 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming
// Include Stream Compression Mechanism // Include Stream Compression Mechanism
if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled && if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled &&
!conn.isCompressed()) { !conn.isCompressed()) {
return "<compression xmlns=\"http://jabber.org/features/compress\"><method>zlib</method></compression>"; final Element compression = StreamCompressionManager.getStreamCompressionFeature();
if (compression != null) {
return compression.asXML();
}
} }
// Nothing special to add // Nothing special to add
return null; return null;
......
...@@ -11,8 +11,6 @@ ...@@ -11,8 +11,6 @@
package org.jivesoftware.openfire.session; package org.jivesoftware.openfire.session;
import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZInputStream;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader; import org.dom4j.io.XMPPPacketReader;
...@@ -21,6 +19,8 @@ import org.jivesoftware.openfire.auth.UnauthorizedException; ...@@ -21,6 +19,8 @@ import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.net.DNSUtil; import org.jivesoftware.openfire.net.DNSUtil;
import org.jivesoftware.openfire.net.MXParser; import org.jivesoftware.openfire.net.MXParser;
import org.jivesoftware.openfire.net.SocketConnection; import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.net.StreamCompressionManager;
import org.jivesoftware.openfire.net.StreamCompressor;
import org.jivesoftware.openfire.server.OutgoingServerSocketReader; import org.jivesoftware.openfire.server.OutgoingServerSocketReader;
import org.jivesoftware.openfire.server.RemoteServerConfiguration; import org.jivesoftware.openfire.server.RemoteServerConfiguration;
import org.jivesoftware.openfire.server.RemoteServerManager; import org.jivesoftware.openfire.server.RemoteServerManager;
...@@ -388,31 +388,34 @@ public class LocalOutgoingServerSession extends LocalSession implements Outgoing ...@@ -388,31 +388,34 @@ public class LocalOutgoingServerSession extends LocalSession implements Outgoing
// Verify if the remote server supports stream compression // Verify if the remote server supports stream compression
Element compression = features.element("compression"); Element compression = features.element("compression");
if (compression != null) { if (compression != null) {
boolean zlibSupported = false; boolean matchingCompressors = false;
String methodIdentifier = null;
Iterator it = compression.elementIterator("method"); Iterator it = compression.elementIterator("method");
while (it.hasNext()) { while (it.hasNext()) {
Element method = (Element) it.next(); Element method = (Element) it.next();
if ("zlib".equals(method.getTextTrim())) { methodIdentifier = method.getTextTrim();
zlibSupported = true; if (StreamCompressionManager.isAvailable(methodIdentifier)) {
// TODO: we now accept the first method that matches. It'd be
// nicer to prioritize this somehow.
matchingCompressors = true;
break;
} }
} }
if (zlibSupported) { if (matchingCompressors && methodIdentifier.length() > 0) {
// Request Stream Compression // Request Stream Compression
connection.deliverRawText("<compress xmlns='http://jabber.org/protocol/compress'><method>zlib</method></compress>"); connection.deliverRawText("<compress xmlns='http://jabber.org/protocol/compress'><method>"+methodIdentifier+"</method></compress>");
// Check if we are good to start compression // Check if we are good to start compression
Element answer = reader.parseDocument().getRootElement(); Element answer = reader.parseDocument().getRootElement();
if ("compressed".equals(answer.getName())) { if ("compressed".equals(answer.getName())) {
// Server confirmed that we can use zlib compression // Server confirmed that we can use compression
connection.addCompression(); connection.addCompression();
connection.startCompression(); connection.startCompression(methodIdentifier);
Log.debug("OS - Stream compression was successful with " + hostname); Log.debug("OS - Stream compression was successful with " + hostname);
// Stream compression was successful so initiate a new stream // Stream compression was successful so initiate a new stream
connection.deliverRawText(openingStream.toString()); connection.deliverRawText(openingStream.toString());
// Reset the parser to use stream compression over TLS // Reset the parser to use stream compression over TLS
ZInputStream in = new ZInputStream( final StreamCompressor compressor = StreamCompressionManager.getCompressor(methodIdentifier);
connection.getTLSStreamHandler().getInputStream()); xpp.setInput(compressor.getReader(connection.getTLSStreamHandler().getInputStream()));
in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
xpp.setInput(new InputStreamReader(in, CHARSET));
// Skip the opening stream sent by the server // Skip the opening stream sent by the server
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;)
{ {
...@@ -431,12 +434,12 @@ public class LocalOutgoingServerSession extends LocalSession implements Outgoing ...@@ -431,12 +434,12 @@ public class LocalOutgoingServerSession extends LocalSession implements Outgoing
} }
else { else {
Log.debug( Log.debug(
"OS - Stream compression found but zlib method is not supported by" + "OS - Stream compression found but no valid method is supported by " +
hostname); hostname);
} }
} }
else { else {
Log.debug("OS - Stream compression not supoprted by " + hostname); Log.debug("OS - Stream compression not supported by " + hostname);
} }
} }
......
...@@ -9,10 +9,12 @@ ...@@ -9,10 +9,12 @@
- a copy of which is included in this distribution. - a copy of which is included in this distribution.
--%> --%>
<%@ page import="org.jivesoftware.openfire.Connection, <%@ page import="java.util.Set,
org.jivesoftware.openfire.Connection,
org.jivesoftware.openfire.PrivateStorage, org.jivesoftware.openfire.PrivateStorage,
org.jivesoftware.openfire.session.LocalClientSession, org.jivesoftware.openfire.session.LocalClientSession,
org.jivesoftware.util.JiveGlobals" org.jivesoftware.util.JiveGlobals,
org.jivesoftware.openfire.net.StreamCompressionManager"
errorPage="error.jsp" errorPage="error.jsp"
%> %>
<%@ page import="org.jivesoftware.util.ParamUtils" %> <%@ page import="org.jivesoftware.util.ParamUtils" %>
...@@ -37,6 +39,9 @@ ...@@ -37,6 +39,9 @@
// Get an audit manager: // Get an audit manager:
PrivateStorage privateStorage = webManager.getPrivateStore(); PrivateStorage privateStorage = webManager.getPrivateStore();
// Get currently available compression methods.
Set<String> allMethods = StreamCompressionManager.allAvailableMethods();
if (update) { if (update) {
// Update c2s compression policy // Update c2s compression policy
LocalClientSession.setCompressionPolicy( LocalClientSession.setCompressionPolicy(
...@@ -136,6 +141,22 @@ ...@@ -136,6 +141,22 @@
</tr> </tr>
</tbody> </tbody>
</table> </table>
<br>
<br>
<h4><fmt:message key="compression.settings.available.methods" /></h4>
<% if (allMethods.isEmpty()) { %>
<i>(none)</i>
<% } else {%>
<ul>
<% for (final String method : allMethods) { %>
<li><%= method %></li>
<% } %>
<% } %>
</ul>
</div> </div>
<input type="submit" name="update" value="<fmt:message key="global.save_settings" />"> <input type="submit" name="update" value="<fmt:message key="global.save_settings" />">
</form> </form>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment