/*
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.jivesoftware.openfire.net;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.net.ssl.SSLPeerUnverifiedException;

import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.session.IncomingServerSession;
import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.ConnectionConfiguration;
import org.jivesoftware.openfire.spi.ConnectionManagerImpl;
import org.jivesoftware.openfire.spi.ConnectionType;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.Packet;

import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZOutputStream;

/**
 * An object to track the state of a XMPP client-server session.
 * Currently this class contains the socket channel connecting the
 * client and server.
 *
 * @author Iain Shigeoka
61
 * @deprecated Old, pre NIO / MINA code. Should not be used as NIO offers better performance. Currently only in use for s2s.
62 63 64
 */
public class SocketConnection implements Connection {

65 66
	private static final Logger Log = LoggerFactory.getLogger(SocketConnection.class);

67
    private static Map<SocketConnection, String> instances =
68
            new ConcurrentHashMap<>();
69 70 71 72 73 74 75 76 77 78 79 80

    /**
     * Milliseconds a connection has to be idle to be closed. Timeout is disabled by default. It's
     * up to the connection's owner to configure the timeout value. Sending stanzas to the client
     * is not considered as activity. We are only considering the connection active when the
     * client sends some data or hearbeats (i.e. whitespaces) to the server.
     * The reason for this is that sending data will fail if the connection is closed. And if
     * the thread is blocked while sending data (because the socket is closed) then the clean up
     * thread will close the socket anyway.
     */
    private long idleTimeout = -1;

81
    final private Map<ConnectionCloseListener, Object> listeners =
82
            new HashMap<>();
83 84 85 86 87

    private Socket socket;
    private SocketReader socketReader;

    private Writer writer;
88
    private AtomicBoolean writing = new AtomicBoolean(false);
89
    private AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
90

91 92 93 94 95
    /**
     * Deliverer to use when the connection is closed or was closed when delivering
     * a packet.
     */
    private PacketDeliverer backupDeliverer;
96

97
    private LocalSession session;
98
    private boolean secure;
99
    private boolean compressed;
100 101 102 103 104 105 106 107 108 109 110 111 112
    private org.jivesoftware.util.XMLWriter xmlSerializer;
    private boolean flashClient = false;
    private int majorVersion = 1;
    private int minorVersion = 0;
    private String language = null;
    private TLSStreamHandler tlsStreamHandler;

    private long writeStarted = -1;

    /**
     * TLS policy currently in use for this connection.
     */
    private TLSPolicy tlsPolicy = TLSPolicy.optional;
113
    private boolean usingSelfSignedCertificate;
114

115 116 117 118
    /**
     * Compression policy currently in use for this connection.
     */
    private CompressionPolicy compressionPolicy = CompressionPolicy.disabled;
119 120 121 122 123 124 125 126

    public static Collection<SocketConnection> getInstances() {
        return instances.keySet();
    }

    /**
     * Create a new session using the supplied socket.
     *
127
     * @param backupDeliverer the packet deliverer this connection will use when socket is closed.
128 129
     * @param socket the socket to represent.
     * @param isSecure true if this is a secure connection.
130
     * @throws java.io.IOException if there was a socket error while sending the packet.
131
     */
132
    public SocketConnection(PacketDeliverer backupDeliverer, Socket socket, boolean isSecure)
133 134 135 136 137 138 139
            throws IOException {
        if (socket == null) {
            throw new NullPointerException("Socket channel must be non-null");
        }

        this.secure = isSecure;
        this.socket = socket;
140 141
        // DANIELE: Modify socket to use channel
        if (socket.getChannel() != null) {
142
            writer = Channels.newWriter(
143
                    ServerTrafficCounter.wrapWritableChannel(socket.getChannel()), StandardCharsets.UTF_8.newEncoder(), -1);
144 145
        }
        else {
146
            writer = new BufferedWriter(new OutputStreamWriter(
147
                    ServerTrafficCounter.wrapOutputStream(socket.getOutputStream()), StandardCharsets.UTF_8));
148
        }
149
        this.backupDeliverer = backupDeliverer;
150 151 152
        xmlSerializer = new XMLSocketWriter(writer, this);

        instances.put(this, "");
153 154 155

        // Default this sensibly.
        this.tlsPolicy = this.getConfiguration().getTlsPolicy();
156 157 158 159 160 161 162 163 164 165 166 167 168
    }

    /**
     * Returns the stream handler responsible for securing the plain connection and providing
     * the corresponding input and output streams.
     *
     * @return the stream handler responsible for securing the plain connection and providing
     *         the corresponding input and output streams.
     */
    public TLSStreamHandler getTLSStreamHandler() {
        return tlsStreamHandler;
    }

169 170
    @Deprecated
    public void startTLS(boolean clientMode, String remoteServer, ClientAuth authentication) throws Exception {
171
        startTLS( clientMode );
172 173
    }

174
    public void startTLS(boolean clientMode) throws IOException {
175 176
        if (!secure) {
            secure = true;
177

178
            // Prepare for TLS
179 180 181 182 183 184 185 186 187
            final ClientAuth clientAuth;
            if (session instanceof IncomingServerSession)
            {
                clientAuth = ClientAuth.needed;
            }
            else
            {
                clientAuth = ClientAuth.wanted;
            }
188
            tlsStreamHandler = new TLSStreamHandler(socket, getConfiguration(), clientMode);
189 190 191 192 193 194 195
            if (!clientMode) {
                // Indicate the client that the server is ready to negotiate TLS
                deliverRawText("<proceed xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\"/>");
            }
            // Start handshake
            tlsStreamHandler.start();
            // Use new wrapped writers
Guus der Kinderen's avatar
Guus der Kinderen committed
196
            writer = new BufferedWriter(new OutputStreamWriter(tlsStreamHandler.getOutputStream(), StandardCharsets.UTF_8));
197 198 199 200
            xmlSerializer = new XMLSocketWriter(writer, this);
        }
    }

201
    @Override
202 203 204 205
    public void addCompression() {
        // WARNING: We do not support adding compression for incoming traffic but not for outgoing traffic.
    }

206
    @Override
207
    public void startCompression() {
208 209
        compressed = true;

210 211 212 213 214 215
        try {
            if (tlsStreamHandler == null) {
                ZOutputStream out = new ZOutputStream(
                        ServerTrafficCounter.wrapOutputStream(socket.getOutputStream()),
                        JZlib.Z_BEST_COMPRESSION);
                out.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
216
                writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
217 218 219 220 221
                xmlSerializer = new XMLSocketWriter(writer, this);
            }
            else {
                ZOutputStream out = new ZOutputStream(tlsStreamHandler.getOutputStream(), JZlib.Z_BEST_COMPRESSION);
                out.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
222
                writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
223 224 225 226 227 228
                xmlSerializer = new XMLSocketWriter(writer, this);
            }
        } catch (IOException e) {
            // TODO Would be nice to still be able to throw the exception and not catch it here
            Log.error("Error while starting compression", e);
            compressed = false;
229 230 231
        }
    }

232
    @Override
233 234 235 236 237 238
    public ConnectionConfiguration getConfiguration()
    {
        // This is an ugly hack to get backwards compatibility with the pre-MINA era. As this implementation is being
        // removed (it is marked as deprecated - at the time of writing, it is only used for S2S). The ugly hack: assume
        // S2S:
        final ConnectionManagerImpl connectionManager = ((ConnectionManagerImpl) XMPPServer.getInstance().getConnectionManager());
239
        return connectionManager.getListener( ConnectionType.SOCKET_S2S, false ).generateConnectionConfiguration();
240 241
    }

242 243 244 245
    public boolean validate() {
        if (isClosed()) {
            return false;
        }
246
        boolean allowedToWrite = false;
247
        try {
248 249 250 251 252 253
            requestWriting();
            allowedToWrite = true;
            // Register that we started sending data on the connection
            writeStarted();
            writer.write(" ");
            writer.flush();
254 255 256 257 258 259 260 261
        }
        catch (Exception e) {
            Log.warn("Closing no longer valid connection" + "\n" + this.toString(), e);
            close();
        }
        finally {
            // Register that we finished sending data on the connection
            writeFinished();
262 263 264
            if (allowedToWrite) {
                releaseWriting();
            }
265 266 267 268
        }
        return !isClosed();
    }

269
    @Override
270
    public void init(LocalSession owner) {
271 272 273
        session = owner;
    }

274
    @Override
275
    public void registerCloseListener(ConnectionCloseListener listener, Object handbackMessage) {
276 277 278 279
        if (isClosed()) {
            listener.onConnectionClose(handbackMessage);
        }
        else {
280
            listeners.put(listener, handbackMessage);
281 282 283
        }
    }

284
    @Override
285 286
    public void removeCloseListener(ConnectionCloseListener listener) {
        listeners.remove(listener);
287 288
    }

289
    @Override
290 291 292 293
    public byte[] getAddress() throws UnknownHostException {
        return socket.getInetAddress().getAddress();
    }

294
    @Override
295 296 297 298
    public String getHostAddress() throws UnknownHostException {
        return socket.getInetAddress().getHostAddress();
    }

299
    @Override
300 301
    public String getHostName() throws UnknownHostException {
        return socket.getInetAddress().getHostName();
302 303
    }

304 305 306 307 308
    /**
     * Returns the port that the connection uses.
     *
     * @return the port that the connection uses.
     */
309 310 311 312
    public int getPort() {
        return socket.getPort();
    }

313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
    /**
     * Returns the Writer used to send data to the connection. The writer should be
     * used with caution. In the majority of cases, the {@link #deliver(Packet)}
     * method should be used to send data instead of using the writer directly.
     * You must synchronize on the writer before writing data to it to ensure
     * data consistency:
     *
     * <pre>
     *  Writer writer = connection.getWriter();
     * synchronized(writer) {
     *     // write data....
     * }</pre>
     *
     * @return the Writer for this connection.
     */
328 329 330 331
    public Writer getWriter() {
        return writer;
    }

332
    @Override
333
    public boolean isClosed() {
334
    	return state.get() == State.CLOSED;
335 336
    }

337
    @Override
338 339 340 341
    public boolean isSecure() {
        return secure;
    }

342
    @Override
343 344 345 346
    public boolean isCompressed() {
        return compressed;
    }

347
    @Override
348 349 350 351
    public TLSPolicy getTlsPolicy() {
        return tlsPolicy;
    }

352 353 354 355 356 357 358 359 360
    /**
     * Sets whether TLS is mandatory, optional or is disabled. When TLS is mandatory clients
     * are required to secure their connections or otherwise their connections will be closed.
     * On the other hand, when TLS is disabled clients are not allowed to secure their connections
     * using TLS. Their connections will be closed if they try to secure the connection. in this
     * last case.
     *
     * @param tlsPolicy whether TLS is mandatory, optional or is disabled.
     */
361
    @Override
362 363 364 365
    public void setTlsPolicy(TLSPolicy tlsPolicy) {
        this.tlsPolicy = tlsPolicy;
    }

366
    @Override
367 368 369 370
    public CompressionPolicy getCompressionPolicy() {
        return compressionPolicy;
    }

371 372 373 374 375
    /**
     * Sets whether compression is enabled or is disabled.
     *
     * @param compressionPolicy whether Compression is enabled or is disabled.
     */
376
    @Override
377 378 379 380 381 382 383 384
    public void setCompressionPolicy(CompressionPolicy compressionPolicy) {
        this.compressionPolicy = compressionPolicy;
    }

    public long getIdleTimeout() {
        return idleTimeout;
    }

385 386 387 388 389 390 391 392
    /**
     * Sets the number of milliseconds a connection has to be idle to be closed. Sending
     * stanzas to the client is not considered as activity. We are only considering the
     * connection active when the client sends some data or hearbeats (i.e. whitespaces)
     * to the server.
     *
     * @param timeout the number of milliseconds a connection has to be idle to be closed.
     */
393 394 395 396
    public void setIdleTimeout(long timeout) {
        this.idleTimeout = timeout;
    }

397
    @Override
398 399 400 401
    public int getMajorXMPPVersion() {
        return majorVersion;
    }

402
    @Override
403 404 405 406 407 408 409 410 411 412 413 414
    public int getMinorXMPPVersion() {
        return minorVersion;
    }

    /**
     * Sets the XMPP version information. In most cases, the version should be "1.0".
     * However, older clients using the "Jabber" protocol do not set a version. In that
     * case, the version is "0.0".
     *
     * @param majorVersion the major version.
     * @param minorVersion the minor version.
     */
415
    @Override
416 417 418 419 420
    public void setXMPPVersion(int majorVersion, int minorVersion) {
        this.majorVersion = majorVersion;
        this.minorVersion = minorVersion;
    }

421
    @Override
422 423 424 425 426 427 428 429 430 431 432 433
    public boolean isFlashClient() {
        return flashClient;
    }

    /**
     * Sets whether the connected client is a flash client. Flash clients need to
     * receive a special character (i.e. \0) at the end of each xml packet. Flash
     * clients may send the character \0 in incoming packets and may start a
     * connection using another openning tag such as: "flash:client".
     *
     * @param flashClient true if the if the connection is a flash client.
     */
434
    @Override
435 436 437 438
    public void setFlashClient(boolean flashClient) {
        this.flashClient = flashClient;
    }

439
    @Override
440 441 442 443 444 445 446
    public Certificate[] getLocalCertificates() {
        if (tlsStreamHandler != null) {
            return tlsStreamHandler.getSSLSession().getLocalCertificates();
        }
        return new Certificate[0];
    }

447
    @Override
448
    public Certificate[] getPeerCertificates() {
449
        if (tlsStreamHandler != null) {
450 451 452
            try {
                return tlsStreamHandler.getSSLSession().getPeerCertificates();
            } catch (SSLPeerUnverifiedException e ) {
453 454
                // Perfectly valid when client-auth is 'want', a problem when it is 'need'.
                Log.debug( "Peer certificates have not been verified - there are no certificates to return for: {}", tlsStreamHandler.getSSLSession().getPeerHost(), e );
455
            }
456
        }
457
        return new Certificate[0];
458 459
    }

460
    @Override
461 462 463 464
    public void setUsingSelfSignedCertificate(boolean isSelfSigned) {
        this.usingSelfSignedCertificate = isSelfSigned;
    }

465
    @Override
466 467 468 469
    public boolean isUsingSelfSignedCertificate() {
        return usingSelfSignedCertificate;
    }

470
    @Override
471 472 473 474
    public PacketDeliverer getPacketDeliverer() {
        return backupDeliverer;
    }

475 476 477 478 479 480 481 482 483 484
    /**
     * Closes the connection without sending any data (not even a stream end-tag).
     */
    public void forceClose() {
        close( true );
    }

    /**
     * Closes the connection after trying to send a stream end tag.
     */
485
    @Override
486
    public void close() {
487
        close( false );
488 489 490 491 492 493 494 495
    }

    /**
     * Normal connection close will attempt to write the stream end tag. Otherwise this method
     * forces the connection closed immediately. This method will be called from {@link SocketSendingTracker} 
     * when sending data over the socket has taken a long time and we need to close the socket, discard
     * the connection and its session.
     */
496
    private void close(boolean force) {
497 498
    	if (state.compareAndSet(State.OPEN, State.CLOSED)) {
    		
499 500 501
            if (session != null) {
                session.setStatus(Session.STATUS_CLOSED);
            }
502 503 504 505 506 507 508 509 510 511 512 513 514 515 516

            if (!force) {
	            boolean allowedToWrite = false;
	            try {
	                requestWriting();
	                allowedToWrite = true;
	                // Register that we started sending data on the connection
	                writeStarted();
	                writer.write("</stream:stream>");
	                if (flashClient) {
	                    writer.write('\0');
	                }
	                writer.flush();
	            }
	            catch (Exception e) {
517
	                Log.debug("Failed to deliver stream close tag: " + e.getMessage());
518 519 520 521 522 523 524
	            }
	            
	            // Register that we finished sending data on the connection
	            writeFinished();
	            if (allowedToWrite) {
	                releaseWriting();
	            }
525
            }
526 527 528
                
            closeConnection();
            notifyCloseListeners();
529
            
530
    	}
531 532
    }

533
    @Override
534 535 536 537 538 539
    public void systemShutdown() {
        deliverRawText("<stream:error><system-shutdown " +
                "xmlns='urn:ietf:params:xml:ns:xmpp-streams'/></stream:error>");
        close();
    }

540 541 542 543 544 545 546 547
    void writeStarted() {
        writeStarted = System.currentTimeMillis();
    }

    void writeFinished() {
        writeStarted = -1;
    }

548 549 550 551 552 553 554 555 556
    /**
     * Returns true if the socket was closed due to a bad health. The socket is considered to
     * be in a bad state if a thread has been writing for a while and the write operation has
     * not finished in a long time or when the client has not sent a heartbeat for a long time.
     * In any of both cases the socket will be closed.
     *
     * @return true if the socket was closed due to a bad health.s
     */
    boolean checkHealth() {
557
        // Check that the sending operation is still active
558 559
        long writeTimestamp = writeStarted;
        if (writeTimestamp > -1 && System.currentTimeMillis() - writeTimestamp >
560 561 562 563
                JiveGlobals.getIntProperty("xmpp.session.sending-limit", 60000)) {
            // Close the socket
            if (Log.isDebugEnabled()) {
                Log.debug("Closing connection: " + this + " that started sending data at: " +
564
                        new Date(writeTimestamp));
565
            }
566
            forceClose();
567
            return true;
568 569 570 571 572 573 574 575 576 577 578
        }
        else {
            // Check if the connection has been idle. A connection is considered idle if the client
            // has not been receiving data for a period. Sending data to the client is not
            // considered as activity.
            if (idleTimeout > -1 && socketReader != null &&
                    System.currentTimeMillis() - socketReader.getLastActive() > idleTimeout) {
                // Close the socket
                if (Log.isDebugEnabled()) {
                    Log.debug("Closing connection that has been idle: " + this);
                }
579
                forceClose();
580
                return true;
581 582
            }
        }
583
        return false;
584 585
    }

586
    private void release() {
587 588
        writeStarted = -1;
        instances.remove(this);
589
    }
590 591

    private void closeConnection() {
592
        release();
593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609
        try {
            if (tlsStreamHandler == null) {
                socket.close();
            }
            else {
                // Close the channels since we are using TLS (i.e. NIO). If the channels implement
                // the InterruptibleChannel interface then any other thread that was blocked in
                // an I/O operation will be interrupted and an exception thrown
                tlsStreamHandler.close();
            }
        }
        catch (Exception e) {
            Log.error(LocaleUtils.getLocalizedString("admin.error.close")
                    + "\n" + this.toString(), e);
        }
    }

610
    @Override
611 612
    public void deliver(Packet packet) throws UnauthorizedException, PacketException {
        if (isClosed()) {
613
            backupDeliverer.deliver(packet);
614 615
        }
        else {
616 617
            boolean errorDelivering = false;
            boolean allowedToWrite = false;
618
            try {
619 620 621 622 623
                requestWriting();
                allowedToWrite = true;
                xmlSerializer.write(packet.getElement());
                if (flashClient) {
                    writer.write('\0');
624
                }
625 626 627 628 629 630 631 632 633
                xmlSerializer.flush();
            }
            catch (Exception e) {
                Log.debug("Error delivering packet" + "\n" + this.toString(), e);
                errorDelivering = true;
            }
            finally {
                if (allowedToWrite) {
                    releaseWriting();
634 635
                }
            }
636 637 638 639 640 641 642 643
            if (errorDelivering) {
                close();
                // Retry sending the packet again. Most probably if the packet is a
                // Message it will be stored offline
                backupDeliverer.deliver(packet);
            }
            else {
                session.incrementServerPacketCount();
644 645 646 647
            }
        }
    }

648
    @Override
649 650 651
    public void deliverRawText(String text) {
        if (!isClosed()) {
            boolean errorDelivering = false;
652 653 654 655 656 657 658 659 660
            boolean allowedToWrite = false;
            try {
                requestWriting();
                allowedToWrite = true;
                // Register that we started sending data on the connection
                writeStarted();
                writer.write(text);
                if (flashClient) {
                    writer.write('\0');
661
                }
662 663 664 665 666 667 668 669 670 671 672
                writer.flush();
            }
            catch (Exception e) {
                Log.debug("Error delivering raw text" + "\n" + this.toString(), e);
                errorDelivering = true;
            }
            finally {
                // Register that we finished sending data on the connection
                writeFinished();
                if (allowedToWrite) {
                    releaseWriting();
673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697
                }
            }
            if (errorDelivering) {
                close();
            }
        }
    }

    /**
     * Notifies all close listeners that the connection has been closed.
     * Used by subclasses to properly finish closing the connection.
     */
    private void notifyCloseListeners() {
        synchronized (listeners) {
            for (ConnectionCloseListener listener : listeners.keySet()) {
                try {
                    listener.onConnectionClose(listeners.get(listener));
                }
                catch (Exception e) {
                    Log.error("Error notifying listener: " + listener, e);
                }
            }
        }
    }

698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720
    private void requestWriting() throws Exception {
        for (;;) {
            if (writing.compareAndSet(false, true)) {
                // We are now in writing mode and only we can write to the socket
                return;
            }
            else {
                // Check health of the socket
                if (checkHealth()) {
                    // Connection was closed then stop
                    throw new Exception("Probable dead connection was closed");
                }
                else {
                    Thread.sleep(1);
                }
            }
        }
    }

    private void releaseWriting() {
        writing.compareAndSet(true, false);
    }

721 722
    @Override
	public String toString() {
723 724 725 726 727 728
        return super.toString() + " socket: " + socket + " session: " + session;
    }

    public void setSocketReader(SocketReader socketReader) {
        this.socketReader = socketReader;
    }
729
}