NIOConnection.java 15.3 KB
Newer Older
1
/*
2
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
3
 *
4 5 6 7 8 9 10 11 12 13 14
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
15 16
 */

17
package org.jivesoftware.openfire.nio;
18

19
import static org.jivesoftware.openfire.spi.ConnectionManagerImpl.COMPRESSION_FILTER_NAME;
20 21 22
import static org.jivesoftware.openfire.spi.ConnectionManagerImpl.EXECUTOR_FILTER_NAME;
import static org.jivesoftware.openfire.spi.ConnectionManagerImpl.TLS_FILTER_NAME;

23
import java.io.IOException;
24
import java.net.InetAddress;
25
import java.net.InetSocketAddress;
26
import java.net.SocketAddress;
27
import java.net.UnknownHostException;
28
import java.nio.charset.Charset;
29
import java.nio.charset.CharsetEncoder;
30
import java.nio.charset.CodingErrorAction;
Guus der Kinderen's avatar
Guus der Kinderen committed
31
import java.nio.charset.StandardCharsets;
32
import java.security.cert.Certificate;
Guus der Kinderen's avatar
Guus der Kinderen committed
33
import java.util.concurrent.atomic.AtomicReference;
34
import java.util.concurrent.locks.ReentrantLock;
35

36
import javax.net.ssl.*;
37

38 39 40 41 42
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.compression.CompressionFilter;
import org.apache.mina.filter.ssl.SslFilter;
43
import org.dom4j.io.OutputFormat;
44 45 46 47
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.ConnectionCloseListener;
import org.jivesoftware.openfire.PacketDeliverer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
48
import org.jivesoftware.openfire.keystore.*;
49
import org.jivesoftware.openfire.net.*;
50
import org.jivesoftware.openfire.session.LocalSession;
51
import org.jivesoftware.openfire.session.Session;
52 53
import org.jivesoftware.openfire.spi.ConnectionConfiguration;
import org.jivesoftware.openfire.spi.ConnectionType;
54
import org.jivesoftware.openfire.spi.EncryptionArtifactFactory;
55
import org.jivesoftware.util.XMLWriter;
56 57
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
58 59 60
import org.xmpp.packet.Packet;

/**
61
 * Implementation of {@link Connection} interface specific for NIO connections when using the Apache MINA framework.
62 63
 *
 * @author Gaston Dombiak
64
 * @see <a href="http://mina.apache.org">Apache MINA</a>
65 66 67
 */
public class NIOConnection implements Connection {

68
	private static final Logger Log = LoggerFactory.getLogger(NIOConnection.class);
69
    private ConnectionConfiguration configuration;
70

71 72 73 74
    /**
     * The utf-8 charset for decoding and encoding XMPP packet streams.
     */
    public static final String CHARSET = "UTF-8";
75

76
    private LocalSession session;
77 78
    private IoSession ioSession;

79
    private ConnectionCloseListener closeListener;
Gaston Dombiak's avatar
Gaston Dombiak committed
80

81 82 83 84 85 86 87 88 89 90 91 92 93 94
    /**
     * Deliverer to use when the connection is closed or was closed when delivering
     * a packet.
     */
    private PacketDeliverer backupDeliverer;
    private boolean flashClient = false;
    private int majorVersion = 1;
    private int minorVersion = 0;
    private String language = null;

    /**
     * TLS policy currently in use for this connection.
     */
    private TLSPolicy tlsPolicy = TLSPolicy.optional;
95
    private boolean usingSelfSignedCertificate;
96 97 98 99 100

    /**
     * Compression policy currently in use for this connection.
     */
    private CompressionPolicy compressionPolicy = CompressionPolicy.disabled;
101
    private static final ThreadLocal<CharsetEncoder> encoder = new ThreadLocalEncoder();
102

103 104 105 106 107 108
    /**
     * Flag that specifies if the connection should be considered closed. Closing a NIO connection
     * is an asynch operation so instead of waiting for the connection to be actually closed just
     * keep this flag to avoid using the connection between #close was used and the socket is actually
     * closed.
     */
Guus der Kinderen's avatar
Guus der Kinderen committed
109
    private AtomicReference<State> state = new AtomicReference<>(State.OPEN);
110 111
    
    /**
112 113 114 115 116 117 118
     * Lock used to ensure the integrity of the underlying IoSession (refer to
     * https://issues.apache.org/jira/browse/DIRMINA-653 for details)
     * <p>
     * This lock can be removed once Openfire guarantees a stable delivery
     * order, in which case {@link #deliver(Packet)} won't be called
     * concurrently any more, which made this lock necessary in the first place.
     * </p>
119
     */
120
    private final ReentrantLock ioSessionLock = new ReentrantLock(true);
121

122
    public NIOConnection( IoSession session, PacketDeliverer packetDeliverer, ConnectionConfiguration configuration ) {
123 124
        this.ioSession = session;
        this.backupDeliverer = packetDeliverer;
125
        this.configuration = configuration;
126 127
    }

128
    @Override
129 130 131 132 133 134 135 136
    public boolean validate() {
        if (isClosed()) {
            return false;
        }
        deliverRawText(" ");
        return !isClosed();
    }

137
    @Override
138 139 140 141
    public void registerCloseListener(ConnectionCloseListener listener, Object ignore) {
        if (closeListener != null) {
            throw new IllegalStateException("Close listener already configured");
        }
142
        if (isClosed()) {
143
            listener.onConnectionClose(session);
144 145
        }
        else {
146
            closeListener = listener;
147 148 149
        }
    }

150
    @Override
151
    public void removeCloseListener(ConnectionCloseListener listener) {
152 153 154
        if (closeListener == listener) {
            closeListener = null;
        }
155 156
    }

157
    @Override
158
    public byte[] getAddress() throws UnknownHostException {
159
        final SocketAddress remoteAddress = ioSession.getRemoteAddress();
160
        if (remoteAddress == null) throw new UnknownHostException();
161 162 163
        final InetSocketAddress socketAddress = (InetSocketAddress) remoteAddress;
        final InetAddress address = socketAddress.getAddress();
        return address.getAddress();
164 165
    }

166
    @Override
167
    public String getHostAddress() throws UnknownHostException {
168
        final SocketAddress remoteAddress = ioSession.getRemoteAddress();
169
        if (remoteAddress == null) throw new UnknownHostException();
170 171 172
        final InetSocketAddress socketAddress = (InetSocketAddress) remoteAddress;
        final InetAddress inetAddress = socketAddress.getAddress();
        return inetAddress.getHostAddress();
173 174
    }

175
    @Override
176
    public String getHostName() throws UnknownHostException {
177
        final SocketAddress remoteAddress = ioSession.getRemoteAddress();
178
        if (remoteAddress == null) throw new UnknownHostException();
179 180 181
        final InetSocketAddress socketAddress = (InetSocketAddress) remoteAddress;
        final InetAddress inetAddress = socketAddress.getAddress();
        return inetAddress.getHostName();
182 183
    }

184
    @Override
185
    public Certificate[] getLocalCertificates() {
186
        SSLSession sslSession = (SSLSession) ioSession.getAttribute(SslFilter.SSL_SESSION);
187 188 189 190 191 192
        if (sslSession != null) {
            return sslSession.getLocalCertificates();
        }
        return new Certificate[0];
    }

193
    @Override
194 195
    public Certificate[] getPeerCertificates() {
        try {
196
            SSLSession sslSession = (SSLSession) ioSession.getAttribute(SslFilter.SSL_SESSION);
197 198 199 200
            if (sslSession != null) {
                return sslSession.getPeerCertificates();
            }
        } catch (SSLPeerUnverifiedException e) {
201 202 203 204
            if (Log.isTraceEnabled()) {
                // This is perfectly acceptable when mutual authentication is not enforced by Openfire configuration.
                Log.trace( "Peer does not offer certificates in session: " + session, e);
            }
205 206
        }
        return new Certificate[0];
207 208
    }

209
    @Override
210 211 212 213
    public void setUsingSelfSignedCertificate(boolean isSelfSigned) {
        this.usingSelfSignedCertificate = isSelfSigned;
    }

214
    @Override
215 216 217 218
    public boolean isUsingSelfSignedCertificate() {
        return usingSelfSignedCertificate;
    }

219
    @Override
220 221 222 223
    public PacketDeliverer getPacketDeliverer() {
        return backupDeliverer;
    }

224
    @Override
225
    public void close() {
226
    	if (state.compareAndSet(State.OPEN, State.CLOSED)) {
227

228 229 230 231
            // Ensure that the state of this connection, its session and the MINA context are eventually closed.

    		if ( session != null ) {
                session.setStatus( Session.STATUS_CLOSED );
232
                }
233 234

            try {
235
                            deliverRawText( flashClient ? "</flash:stream>" : "</stream:stream>" );
236 237
            } catch ( Exception e ) {
                Log.error("Failed to deliver stream close tag: " + e.getMessage());
238 239
                }

240
            try {
241
                ioSession.close( true );
242 243 244 245
            } catch (Exception e) {
                Log.error("Exception while closing MINA session", e);
            }
            notifyCloseListeners(); // clean up session, etc.
246
        }
247 248
    }

249
    @Override
250 251 252 253 254 255 256 257 258 259 260
    public void systemShutdown() {
        deliverRawText("<stream:error><system-shutdown " +
                "xmlns='urn:ietf:params:xml:ns:xmpp-streams'/></stream:error>");
        close();
    }

    /**
     * Notifies all close listeners that the connection has been closed.
     * Used by subclasses to properly finish closing the connection.
     */
    private void notifyCloseListeners() {
261
        if (closeListener != null) {
Gaston Dombiak's avatar
Gaston Dombiak committed
262
            try {
263 264 265
                closeListener.onConnectionClose(session);
            } catch (Exception e) {
                Log.error("Error notifying listener: " + closeListener, e);
266 267 268 269
            }
        }
    }

270
    @Override
271
    public void init(LocalSession owner) {
272 273 274
        session = owner;
    }

275
    @Override
276
    public boolean isClosed() {
277
    	return state.get() == State.CLOSED;
278 279
    }

280
    @Override
281
    public boolean isSecure() {
282
        return ioSession.getFilterChain().contains(TLS_FILTER_NAME);
283 284
    }

285
    @Override
286
    public void deliver(Packet packet) throws UnauthorizedException {
287
        if (isClosed()) {
288
        	backupDeliverer.deliver(packet);
289 290 291
        }
        else {
            boolean errorDelivering = false;
292 293
            IoBuffer buffer = IoBuffer.allocate(4096);
            buffer.setAutoExpand(true);
294
            try {
295
                buffer.putString(packet.getElement().asXML(), encoder.get());
296 297 298 299
                if (flashClient) {
                    buffer.put((byte) '\0');
                }
                buffer.flip();
300 301 302 303 304 305 306
                
                ioSessionLock.lock();
                try {
                    ioSession.write(buffer);
                } finally {
                    ioSessionLock.unlock();
                }
307 308
            }
            catch (Exception e) {
309
                Log.debug("Error delivering packet:\n" + packet, e);
310 311 312 313 314 315 316 317
                errorDelivering = true;
            }
            if (errorDelivering) {
                close();
                // Retry sending the packet again. Most probably if the packet is a
                // Message it will be stored offline
                backupDeliverer.deliver(packet);
            }
318 319 320
            else {
                session.incrementServerPacketCount();
            }
321 322 323
        }
    }

324
    @Override
325
    public void deliverRawText(String text) {
326
        if (!isClosed()) {
327
            boolean errorDelivering = false;
328 329
            IoBuffer buffer = IoBuffer.allocate(text.length());
            buffer.setAutoExpand(true);
330 331 332
            try {
                //Charset charset = Charset.forName(CHARSET);
                //buffer.putString(text, charset.newEncoder());
333
                buffer.put(text.getBytes(StandardCharsets.UTF_8));
334 335 336 337
                if (flashClient) {
                    buffer.put((byte) '\0');
                }
                buffer.flip();
338 339
                ioSessionLock.lock();
                try {
340 341
                    ioSession.write(buffer);
                }
342 343
                finally {
                    ioSessionLock.unlock();
344
                }
345 346
            }
            catch (Exception e) {
347
                Log.debug("Error delivering raw text:\n" + text, e);
348 349
                errorDelivering = true;
            }
350

351 352
            // Attempt to close the connection if delivering text fails.
            if (errorDelivering) {
353 354 355 356 357
                close();
            }
        }
    }

358 359
    @Deprecated
	@Override
360
    public void startTLS(boolean clientMode, String remoteServer, ClientAuth authentication) throws Exception {
361
        startTLS( clientMode );
362
    }
363

364
    public void startTLS(boolean clientMode) throws Exception {
365

366
        final EncryptionArtifactFactory factory = new EncryptionArtifactFactory( configuration );
367
        final SslFilter filter;
368 369
        if ( clientMode )
        {
370
            filter = factory.createClientModeSslFilter();
371
        }
372 373
        else
        {
374
            filter = factory.createServerModeSslFilter();
375 376
        }

377
        ioSession.getFilterChain().addBefore(EXECUTOR_FILTER_NAME, TLS_FILTER_NAME, filter);
378
        ioSession.setAttribute(SslFilter.DISABLE_ENCRYPTION_ONCE, Boolean.TRUE);
379

380
        if ( !clientMode ) {
381
            // Indicate the client that the server is ready to negotiate TLS
382
            deliverRawText( "<proceed xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\"/>" );
383 384 385
        }
    }

386
    @Override
387
    public void addCompression() {
388
        IoFilterChain chain = ioSession.getFilterChain();
389 390 391
        String baseFilter = EXECUTOR_FILTER_NAME;
        if (chain.contains(TLS_FILTER_NAME)) {
            baseFilter = TLS_FILTER_NAME;
392
        }
393
        chain.addAfter(baseFilter, COMPRESSION_FILTER_NAME, new CompressionFilter(true, false, CompressionFilter.COMPRESSION_MAX));
394 395
    }

396
    @Override
397
    public void startCompression() {
398
        CompressionFilter ioFilter = (CompressionFilter) ioSession.getFilterChain().get(COMPRESSION_FILTER_NAME);
399
        ioFilter.setCompressOutbound(true);
400 401
    }

402
    @Override
403 404 405 406 407
    public ConnectionConfiguration getConfiguration()
    {
        return configuration;
    }

408 409 410 411
    public boolean isFlashClient() {
        return flashClient;
    }

412
    @Override
413 414 415 416
    public void setFlashClient(boolean flashClient) {
        this.flashClient = flashClient;
    }

417
    @Override
418 419 420 421
    public int getMajorXMPPVersion() {
        return majorVersion;
    }

422
    @Override
423 424 425 426
    public int getMinorXMPPVersion() {
        return minorVersion;
    }

427
    @Override
428 429 430 431 432
    public void setXMPPVersion(int majorVersion, int minorVersion) {
        this.majorVersion = majorVersion;
        this.minorVersion = minorVersion;
    }

433
    @Override
434
    public boolean isCompressed() {
435
        return ioSession.getFilterChain().contains(COMPRESSION_FILTER_NAME);
436 437
    }

438
    @Override
439 440 441 442
    public CompressionPolicy getCompressionPolicy() {
        return compressionPolicy;
    }

443
    @Override
444 445 446 447
    public void setCompressionPolicy(CompressionPolicy compressionPolicy) {
        this.compressionPolicy = compressionPolicy;
    }

448
    @Override
449 450 451 452
    public TLSPolicy getTlsPolicy() {
        return tlsPolicy;
    }

453
    @Override
454 455 456 457
    public void setTlsPolicy(TLSPolicy tlsPolicy) {
        this.tlsPolicy = tlsPolicy;
    }

458 459
    @Override
	public String toString() {
460 461
        return super.toString() + " MINA Session: " + ioSession;
    }
462

463
    private static class ThreadLocalEncoder extends ThreadLocal<CharsetEncoder> {
464

465
        @Override
466
		protected CharsetEncoder initialValue() {
467
            return StandardCharsets.UTF_8.newEncoder()
468 469
				.onMalformedInput(CodingErrorAction.REPORT)
				.onUnmappableCharacter(CodingErrorAction.REPORT);
470 471
        }
    }
472
}