NIOConnection.java 17.5 KB
Newer Older
1 2 3 4
/**
 * $Revision: $
 * $Date: $
 *
5
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
6
 *
7 8 9 10 11 12 13 14 15 16 17
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
18 19
 */

20
package org.jivesoftware.openfire.nio;
21

22
import static org.jivesoftware.openfire.spi.ConnectionManagerImpl.COMPRESSION_FILTER_NAME;
23 24 25
import static org.jivesoftware.openfire.spi.ConnectionManagerImpl.EXECUTOR_FILTER_NAME;
import static org.jivesoftware.openfire.spi.ConnectionManagerImpl.TLS_FILTER_NAME;

26
import java.io.IOException;
27 28 29 30
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
31
import java.nio.charset.CodingErrorAction;
32 33
import java.security.KeyStore;
import java.security.cert.Certificate;
34
import java.util.concurrent.locks.ReentrantLock;
35 36 37 38 39 40 41

import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;

42 43 44 45 46
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.compression.CompressionFilter;
import org.apache.mina.filter.ssl.SslFilter;
47
import org.dom4j.io.OutputFormat;
48 49 50 51
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.ConnectionCloseListener;
import org.jivesoftware.openfire.PacketDeliverer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
52 53 54 55 56
import org.jivesoftware.openfire.net.ClientTrustManager;
import org.jivesoftware.openfire.net.SSLConfig;
import org.jivesoftware.openfire.net.SSLJiveKeyManagerFactory;
import org.jivesoftware.openfire.net.SSLJiveTrustManagerFactory;
import org.jivesoftware.openfire.net.ServerTrustManager;
57
import org.jivesoftware.openfire.session.ConnectionSettings;
58
import org.jivesoftware.openfire.session.LocalSession;
59
import org.jivesoftware.openfire.session.Session;
60 61
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.XMLWriter;
62 63
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
64 65 66 67 68 69 70 71 72 73 74 75
import org.xmpp.packet.Packet;

/**
 * Implementation of the {@link Connection} interface specific for NIO connections when using
 * the MINA framework.<p>
 *
 * MINA project can be found at <a href="http://mina.apache.org">here</a>.
 *
 * @author Gaston Dombiak
 */
public class NIOConnection implements Connection {

76 77
	private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class);

78 79 80 81 82
    /**
     * The utf-8 charset for decoding and encoding XMPP packet streams.
     */
    public static final String CHARSET = "UTF-8";

83
    private LocalSession session;
84 85
    private IoSession ioSession;

86
    private ConnectionCloseListener closeListener;
Gaston Dombiak's avatar
Gaston Dombiak committed
87

88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
    /**
     * Deliverer to use when the connection is closed or was closed when delivering
     * a packet.
     */
    private PacketDeliverer backupDeliverer;
    private boolean flashClient = false;
    private int majorVersion = 1;
    private int minorVersion = 0;
    private String language = null;

    // TODO Should #checkHealth be used here?
    /**
     * TLS policy currently in use for this connection.
     */
    private TLSPolicy tlsPolicy = TLSPolicy.optional;
103
    private boolean usingSelfSignedCertificate;
104 105 106 107 108

    /**
     * Compression policy currently in use for this connection.
     */
    private CompressionPolicy compressionPolicy = CompressionPolicy.disabled;
109
    private static ThreadLocal<CharsetEncoder> encoder = new ThreadLocalEncoder();
110 111 112 113 114 115 116
    /**
     * Flag that specifies if the connection should be considered closed. Closing a NIO connection
     * is an asynch operation so instead of waiting for the connection to be actually closed just
     * keep this flag to avoid using the connection between #close was used and the socket is actually
     * closed.
     */
    private boolean closed;
117 118
    
    /**
119 120 121 122 123 124 125
     * Lock used to ensure the integrity of the underlying IoSession (refer to
     * https://issues.apache.org/jira/browse/DIRMINA-653 for details)
     * <p>
     * This lock can be removed once Openfire guarantees a stable delivery
     * order, in which case {@link #deliver(Packet)} won't be called
     * concurrently any more, which made this lock necessary in the first place.
     * </p>
126
     */
127
    private final ReentrantLock ioSessionLock = new ReentrantLock(true);
128 129 130 131

    /**
     * Creates a connection wrapper around an existing MINA session.
     *
     * @param session the MINA session that carries the traffic of this connection.
     * @param packetDeliverer deliverer kept as a fallback for packets that cannot be
     *        written to the session.
     */
    public NIOConnection(IoSession session, PacketDeliverer packetDeliverer) {
        // A freshly created connection is always considered open.
        this.closed = false;
        this.backupDeliverer = packetDeliverer;
        this.ioSession = session;
    }

    /**
     * Checks whether this connection is still usable by writing a single whitespace
     * character to it.
     *
     * @return {@code false} if the connection was already closed or became closed while
     *         the probe was being sent, {@code true} otherwise.
     */
    public boolean validate() {
        if (!isClosed()) {
            // A failed asynchronous write closes the connection, which the second check detects.
            deliverRawText(" ");
            return !isClosed();
        }
        return false;
    }

143 144 145 146
    public void registerCloseListener(ConnectionCloseListener listener, Object ignore) {
        if (closeListener != null) {
            throw new IllegalStateException("Close listener already configured");
        }
147
        if (isClosed()) {
148
            listener.onConnectionClose(session);
149 150
        }
        else {
151
            closeListener = listener;
152 153 154 155
        }
    }

    public void removeCloseListener(ConnectionCloseListener listener) {
156 157 158
        if (closeListener == listener) {
            closeListener = null;
        }
159 160
    }

161 162 163 164 165 166 167 168 169 170
    public byte[] getAddress() throws UnknownHostException {
        return ((InetSocketAddress) ioSession.getRemoteAddress()).getAddress().getAddress();
    }

    public String getHostAddress() throws UnknownHostException {
        return ((InetSocketAddress) ioSession.getRemoteAddress()).getAddress().getHostAddress();
    }

    public String getHostName() throws UnknownHostException {
        return ((InetSocketAddress) ioSession.getRemoteAddress()).getAddress().getHostName();
171 172
    }

173
    public Certificate[] getLocalCertificates() {
174
        SSLSession sslSession = (SSLSession) ioSession.getAttribute(SslFilter.SSL_SESSION);
175 176 177 178 179 180
        if (sslSession != null) {
            return sslSession.getLocalCertificates();
        }
        return new Certificate[0];
    }

181 182
    public Certificate[] getPeerCertificates() {
        try {
183
            SSLSession sslSession = (SSLSession) ioSession.getAttribute(SslFilter.SSL_SESSION);
184 185 186 187 188 189 190
            if (sslSession != null) {
                return sslSession.getPeerCertificates();
            }
        } catch (SSLPeerUnverifiedException e) {
            Log.warn("Error retrieving client certificates of: " + session, e);
        }
        return new Certificate[0];
191 192
    }

193 194 195 196 197 198 199 200
    /**
     * Records whether a self-signed certificate is in use on this connection.
     *
     * @param isSelfSigned the new value of the flag.
     */
    public void setUsingSelfSignedCertificate(boolean isSelfSigned) {
        this.usingSelfSignedCertificate = isSelfSigned;
    }

    /**
     * Returns the flag last stored through
     * {@link #setUsingSelfSignedCertificate(boolean)}.
     *
     * @return the stored self-signed certificate flag.
     */
    public boolean isUsingSelfSignedCertificate() {
        return usingSelfSignedCertificate;
    }

201 202 203 204 205
    /**
     * Returns the backup deliverer of this connection.
     *
     * @return the backup deliverer; {@code null} once {@link #deliver(Packet)} has
     *         consumed it for a closed connection.
     */
    public PacketDeliverer getPacketDeliverer() {
        return backupDeliverer;
    }

    public void close() {
206 207 208 209 210 211 212 213
    	synchronized(this) {
    		if (isClosed()) {
    			return;
    		}
            try {
                deliverRawText(flashClient ? "</flash:stream>" : "</stream:stream>", false);
            } catch (Exception e) {
                // Ignore
214
            }
215 216 217 218 219
            if (session != null) {
                session.setStatus(Session.STATUS_CLOSED);
            }
            closed = true;
    	}
220 221 222 223
    	
    	// OF-881: Notify any close listeners after the synchronized block has completed. 
    	notifyCloseListeners(); // clean up session, etc.
    	
224
        ioSession.close(false); // async via MINA
225 226 227 228 229 230 231 232 233 234 235 236 237
    }

    /**
     * Sends a {@code system-shutdown} stream error to the peer and then closes this
     * connection.
     */
    public void systemShutdown() {
        final String shutdownError =
                "<stream:error><system-shutdown xmlns='urn:ietf:params:xml:ns:xmpp-streams'/></stream:error>";
        deliverRawText(shutdownError);
        close();
    }

    /**
     * Notifies all close listeners that the connection has been closed.
     * Used by subclasses to properly finish closing the connection.
     */
    private void notifyCloseListeners() {
238
        if (closeListener != null) {
Gaston Dombiak's avatar
Gaston Dombiak committed
239
            try {
240 241 242
                closeListener.onConnectionClose(session);
            } catch (Exception e) {
                Log.error("Error notifying listener: " + closeListener, e);
243 244 245 246
            }
        }
    }

247
    public void init(LocalSession owner) {
248 249 250
        session = owner;
    }

251 252
    public synchronized boolean isClosed() {
        return closed;
253 254 255
    }

    public boolean isSecure() {
256
        return ioSession.getFilterChain().contains(TLS_FILTER_NAME);
257 258 259 260
    }

    public void deliver(Packet packet) throws UnauthorizedException {
        if (isClosed()) {
261 262 263 264 265 266 267 268 269
        	// OF-857: Do not allow the backup deliverer to recurse
        	if (backupDeliverer == null) {
        		Log.error("Failed to deliver packet: " + packet.toXML());
        		throw new IllegalStateException("Connection closed");
        	}
        	// attempt to deliver via backup only once
        	PacketDeliverer backup = backupDeliverer;
            backupDeliverer = null;
            backup.deliver(packet);
270 271 272
        }
        else {
            boolean errorDelivering = false;
273 274
            IoBuffer buffer = IoBuffer.allocate(4096);
            buffer.setAutoExpand(true);
275
            try {
276 277 278 279
            	// OF-464: if the connection has been dropped, fail over to backupDeliverer (offline)
            	if (!ioSession.isConnected()) {
            		throw new IOException("Connection reset/closed by peer");
            	}
280
                XMLWriter xmlSerializer =
281
                        new XMLWriter(new ByteBufferWriter(buffer, encoder.get()), new OutputFormat());
282 283 284 285 286 287
                xmlSerializer.write(packet.getElement());
                xmlSerializer.flush();
                if (flashClient) {
                    buffer.put((byte) '\0');
                }
                buffer.flip();
288 289 290 291 292 293 294
                
                ioSessionLock.lock();
                try {
                    ioSession.write(buffer);
                } finally {
                    ioSessionLock.unlock();
                }
295 296
            }
            catch (Exception e) {
297
                Log.debug("Error delivering packet:\n" + packet, e);
298 299 300 301 302 303 304 305
                errorDelivering = true;
            }
            if (errorDelivering) {
                close();
                // Retry sending the packet again. Most probably if the packet is a
                // Message it will be stored offline
                backupDeliverer.deliver(packet);
            }
306 307 308
            else {
                session.incrementServerPacketCount();
            }
309 310 311 312
        }
    }

    public void deliverRawText(String text) {
313 314 315 316 317
        // Deliver the packet in asynchronous mode
        deliverRawText(text, true);
    }

    private void deliverRawText(String text, boolean asynchronous) {
318 319
        if (!isClosed()) {
            boolean errorDelivering = false;
320 321
            IoBuffer buffer = IoBuffer.allocate(text.length());
            buffer.setAutoExpand(true);
322 323 324 325 326 327 328 329
            try {
                //Charset charset = Charset.forName(CHARSET);
                //buffer.putString(text, charset.newEncoder());
                buffer.put(text.getBytes(CHARSET));
                if (flashClient) {
                    buffer.put((byte) '\0');
                }
                buffer.flip();
330 331 332 333 334 335 336 337
                ioSessionLock.lock();
                try {
                    if (asynchronous) {
                        // OF-464: handle dropped connections (no backupDeliverer in this case?)
                        if (!ioSession.isConnected()) {
                            throw new IOException("Connection reset/closed by peer");
                        }
                        ioSession.write(buffer);
338
                    }
339 340 341 342 343 344 345 346 347 348 349
                    else {
                        // Send stanza and wait for ACK (using a 2 seconds default timeout)
                        boolean ok =
                                ioSession.write(buffer).awaitUninterruptibly(JiveGlobals.getIntProperty("connection.ack.timeout", 2000));
                        if (!ok) {
                            Log.warn("No ACK was received when sending stanza to: " + this.toString());
                        }
                    }
                } 
                finally {
                    ioSessionLock.unlock();
350
                }
351 352
            }
            catch (Exception e) {
353
                Log.debug("Error delivering raw text:\n" + text, e);
354 355
                errorDelivering = true;
            }
356

357 358
            // Close the connection if delivering text fails and we are already not closing the connection
            if (errorDelivering && asynchronous) {
359 360 361 362 363
                close();
            }
        }
    }

364
    public void startTLS(boolean clientMode, String remoteServer, ClientAuth authentication) throws Exception {
365
        boolean c2s = (remoteServer == null);
366 367 368
        KeyStore ksKeys = SSLConfig.getKeyStore();
        String keypass = SSLConfig.getKeyPassword();

369 370 371 372
        KeyStore ksTrust = (c2s ? SSLConfig.getc2sTrustStore() : SSLConfig.gets2sTrustStore() );
        String trustpass = (c2s ? SSLConfig.getc2sTrustPassword() : SSLConfig.gets2sTrustPassword() );
        if (c2s)  Log.debug("NIOConnection: startTLS: using c2s");
        else Log.debug("NIOConnection: startTLS: using s2s");
373 374 375 376 377
        // KeyManager's decide which key material to use.
        KeyManager[] km = SSLJiveKeyManagerFactory.getKeyManagers(ksKeys, keypass);

        // TrustManager's decide whether to allow connections.
        TrustManager[] tm = SSLJiveTrustManagerFactory.getTrustManagers(ksTrust, trustpass);
378

379
        if (clientMode || authentication == ClientAuth.needed || authentication == ClientAuth.wanted) {
380 381 382 383 384 385
            // We might need to verify a certificate from our peer, so get different TrustManager[]'s
            if(c2s) {
                // Check if we can trust certificates presented by the client
                tm = new TrustManager[]{new ClientTrustManager(ksTrust)};
            } else {
                // Check if we can trust certificates presented by the server
386
                tm = new TrustManager[]{new ServerTrustManager(remoteServer, ksTrust, this)};
387
            }
388 389
        }

390
        String algorithm = JiveGlobals.getProperty(ConnectionSettings.Client.TLS_ALGORITHM, "TLS");
391
        SSLContext tlsContext = SSLContext.getInstance(algorithm);
392 393 394

        tlsContext.init(km, tm, null);

395
        SslFilter filter = new SslFilter(tlsContext);
396
        filter.setUseClientMode(clientMode);
397 398
        // Disable SSLv3 due to POODLE vulnerability.
        filter.setEnabledProtocols(new String[]{"TLSv1", "TLSv1.1", "TLSv1.2"});
399 400 401 402 403 404 405 406
        if (authentication == ClientAuth.needed) {
            filter.setNeedClientAuth(true);
        }
        else if (authentication == ClientAuth.wanted) {
            // Just indicate that we would like to authenticate the client but if client
            // certificates are self-signed or have no certificate chain then we are still
            // good
            filter.setWantClientAuth(true);
407
        }
408 409
        ioSession.getFilterChain().addAfter(EXECUTOR_FILTER_NAME, TLS_FILTER_NAME, filter);
        ioSession.setAttribute(SslFilter.DISABLE_ENCRYPTION_ONCE, Boolean.TRUE);
410

411 412 413 414 415 416
        if (!clientMode) {
            // Indicate the client that the server is ready to negotiate TLS
            deliverRawText("<proceed xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\"/>");
        }
    }

417
    public void addCompression() {
418
        IoFilterChain chain = ioSession.getFilterChain();
419 420 421
        String baseFilter = EXECUTOR_FILTER_NAME;
        if (chain.contains(TLS_FILTER_NAME)) {
            baseFilter = TLS_FILTER_NAME;
422
        }
423
        chain.addAfter(baseFilter, COMPRESSION_FILTER_NAME, new CompressionFilter(true, false, CompressionFilter.COMPRESSION_MAX));
424 425 426
    }

    public void startCompression() {
427
        CompressionFilter ioFilter = (CompressionFilter) ioSession.getFilterChain().get(COMPRESSION_FILTER_NAME);
428
        ioFilter.setCompressOutbound(true);
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460
    }

    /**
     * Returns the flash-client flag. When set, data written to the peer is terminated
     * with a null byte and the stream is closed with a {@code flash:stream} end tag.
     *
     * @return {@code true} if the peer is flagged as a flash client; {@code false} by default.
     */
    public boolean isFlashClient() {
        return flashClient;
    }

    /**
     * Sets the flash-client flag.
     *
     * @param flashClient {@code true} to treat the peer as a flash client.
     */
    public void setFlashClient(boolean flashClient) {
        this.flashClient = flashClient;
    }

    /**
     * Returns the major XMPP version stored for this connection.
     *
     * @return the major version; 1 unless changed through {@link #setXMPPVersion(int, int)}.
     */
    public int getMajorXMPPVersion() {
        return majorVersion;
    }

    /**
     * Returns the minor XMPP version stored for this connection.
     *
     * @return the minor version; 0 unless changed through {@link #setXMPPVersion(int, int)}.
     */
    public int getMinorXMPPVersion() {
        return minorVersion;
    }

    /**
     * Stores the XMPP version of this connection.
     *
     * @param majorVersion the major version number.
     * @param minorVersion the minor version number.
     */
    public void setXMPPVersion(int majorVersion, int minorVersion) {
        this.majorVersion = majorVersion;
        this.minorVersion = minorVersion;
    }

    /**
     * Returns the language stored for this connection.
     *
     * @return the language, or {@code null} if none has been set.
     */
    public String getLanguage() {
        return language;
    }

    /**
     * Stores the language of this connection.
     * <p>
     * NOTE(review): the method name is misspelled ("Lanaguage"); it is presumably
     * dictated by the {@link Connection} interface, so it cannot be renamed here.
     * </p>
     *
     * @param language the language to store.
     */
    public void setLanaguage(String language) {
        this.language = language;
    }

    public boolean isCompressed() {
461
        return ioSession.getFilterChain().contains(COMPRESSION_FILTER_NAME);
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479
    }

    /**
     * Returns the compression policy currently stored for this connection.
     *
     * @return the compression policy; {@code CompressionPolicy.disabled} unless changed.
     */
    public CompressionPolicy getCompressionPolicy() {
        return compressionPolicy;
    }

    /**
     * Stores the compression policy for this connection.
     *
     * @param compressionPolicy the policy to store.
     */
    public void setCompressionPolicy(CompressionPolicy compressionPolicy) {
        this.compressionPolicy = compressionPolicy;
    }

    /**
     * Returns the TLS policy currently stored for this connection.
     *
     * @return the TLS policy; {@code TLSPolicy.optional} unless changed.
     */
    public TLSPolicy getTlsPolicy() {
        return tlsPolicy;
    }

    /**
     * Stores the TLS policy for this connection.
     *
     * @param tlsPolicy the policy to store.
     */
    public void setTlsPolicy(TLSPolicy tlsPolicy) {
        this.tlsPolicy = tlsPolicy;
    }

480 481
    @Override
	public String toString() {
482 483
        return super.toString() + " MINA Session: " + ioSession;
    }
484

485
    private static class ThreadLocalEncoder extends ThreadLocal<CharsetEncoder> {
486

487
        @Override
488 489 490 491
		protected CharsetEncoder initialValue() {
            return Charset.forName(CHARSET).newEncoder()
				.onMalformedInput(CodingErrorAction.REPORT)
				.onUnmappableCharacter(CodingErrorAction.REPORT);
492 493
        }
    }
494
}