SocketSendingTracker.java 5.67 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
package org.jivesoftware.messenger.net;

import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;

import java.io.IOException;
import java.net.Socket;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * A SocketSendingTracker keeps track of all the sockets that are currently sending data and
 * checks the health of the sockets to detect hung connections. If a sending operation takes
 * too much time (i.e. exceeds a time limit) then it is assumed that the connection has been
 * lost and for some reason the JVM has not been notified of the dead connection. Once a dead
 * connection has been detected it will be closed so that the thread that was writing to the
 * socket can resume. Resuming locked threads is important since otherwise a complete system halt
 * may occur.<p>
 *
 * The time limit to wait before considering a connection dead can be configured by changing
 * the property <b>xmpp.session.sending-limit</b> (expressed in milliseconds). If the property
 * is not defined then a default time limit of 60 seconds will be assumed. This means that by
 * default if a sending operation takes longer than 60 seconds then the connection will be
 * closed and the client disconnected.
 * Therefore, it is important to not set a very low time limit since active clients may be
 * incorrectly considered as dead clients.
 *
 * @author Gaston Dombiak
 */
public class SocketSendingTracker {

    /**
     * Time, in milliseconds, that the checking thread sleeps between two health checks.
     */
    private static final long CHECK_INTERVAL = 10000;

    private static final SocketSendingTracker instance = new SocketSendingTracker();

    /**
     * Map that holds the sockets that are currently sending information together with the date
     * when the sending operation started. Declared with the concrete type because
     * {@link #checkHealth()} relies on the atomic <tt>remove(key, value)</tt> operation.
     */
    private final ConcurrentHashMap<Socket, Date> sockets =
            new ConcurrentHashMap<Socket, Date>();

    /**
     * Monitor used to put the checking thread to sleep between checks and to wake it up when
     * a shutdown is requested. A dedicated object is used instead of the thread itself since
     * the JVM uses the monitor of Thread instances internally (e.g. for <tt>join()</tt>).
     */
    private final Object lock = new Object();

    /**
     * Flag that indicates if the tracker should shutdown the tracking process. The flag is
     * volatile since it is written by the thread calling {@link #shutdown()} and read by the
     * checking thread.
     */
    private volatile boolean shutdown = false;

    /**
     * Thread used for checking periodically the health of the sockets involved in sending
     * operations. It is <tt>null</tt> until {@link #start()} is called for the first time.
     */
    private volatile Thread checkingThread;

    /**
     * Returns the unique instance of this class.
     *
     * @return the unique instance of this class.
     */
    public static SocketSendingTracker getInstance() {
        return instance;
    }

    /**
     * Hide the constructor so that only one instance of this class can exist.
     */
    private SocketSendingTracker() {
    }

    /**
     * Register that the specified socket has started sending information. The registration will
     * include the timestamp when the sending operation started so that if it has not finished
     * within the configured time limit then the socket will be closed.
     *
     * @param socket the socket that started sending data.
     */
    public void socketStartedSending(Socket socket) {
        sockets.put(socket, new Date());
    }

    /**
     * Register that the specified socket has finished sending information. The socket will
     * be removed from the tracking list.
     *
     * @param socket the socket that finished sending data.
     */
    public void socketFinishedSending(Socket socket) {
        sockets.remove(socket);
    }

    /**
     * Start up the daemon thread that will check for the health of the sockets that are
     * currently sending data. Calling this method while the tracker is already running has
     * no effect. The tracker can be started again after {@link #shutdown()} was called.
     */
    public void start() {
        synchronized (lock) {
            if (!shutdown && checkingThread != null && checkingThread.isAlive()) {
                // Already running. Do not create a second checking thread
                return;
            }
            shutdown = false;
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    runHealthChecks();
                }
            }, "SocketSendingTracker");
            thread.setDaemon(true);
            // Publish the thread before starting it. The checking loop compares itself with
            // this field, so a thread left over from a previous start() stops on its own
            checkingThread = thread;
            thread.start();
        }
    }

    /**
     * Indicates that the checking thread should be stopped. The thread will be woken up
     * so that it can stop immediately. Calling this method when the tracker was never
     * started has no effect.
     */
    public void shutdown() {
        synchronized (lock) {
            // The flag is changed while holding the lock so the checking thread either sees
            // the new value before going to sleep or is already waiting and gets notified
            shutdown = true;
            lock.notifyAll();
        }
    }

    /**
     * Main loop of the checking thread. Checks the health of the tracked sockets and then
     * sleeps for {@link #CHECK_INTERVAL} milliseconds until the tracker is shutdown or the
     * running thread is replaced by a newer checking thread.
     */
    private void runHealthChecks() {
        Thread self = Thread.currentThread();
        while (isActive(self)) {
            checkHealth();
            synchronized (lock) {
                // Check again while holding the lock. A shutdown requested while
                // checkHealth() was running would otherwise be missed and the thread would
                // sleep for a whole interval before stopping
                if (!isActive(self)) {
                    break;
                }
                try {
                    lock.wait(CHECK_INTERVAL);
                }
                catch (InterruptedException e) {
                    // Deliberately ignored. The tracker has to keep running until
                    // shutdown() is called so an interrupt only triggers an earlier check
                }
            }
        }
    }

    /**
     * Returns true if the specified thread is the current checking thread and no shutdown
     * has been requested.
     *
     * @param thread the thread to verify.
     * @return true if the specified thread should keep checking the health of the sockets.
     */
    private boolean isActive(Thread thread) {
        return !shutdown && checkingThread == thread;
    }

    /**
     * Checks if a socket has been trying to send data for a given amount of time. If it has
     * exceeded a limit of time then the socket will be closed.<p>
     *
     * The time limit is read once per check from the property
     * <b>xmpp.session.sending-limit</b> (60000 milliseconds if the property is not set).<p>
     *
     * It is expected that sending operations will not take too much time so the checking will
     * be very fast since very few sockets will be present in the Map and most or all of them
     * will not exceed the time limit. Therefore, it is expected the overhead of this class to be
     * quite small.
     */
    private void checkHealth() {
        long limit = JiveGlobals.getIntProperty("xmpp.session.sending-limit", 60000);
        for (Map.Entry<Socket, Date> entry : sockets.entrySet()) {
            Socket socket = entry.getKey();
            Date startDate = entry.getValue();
            if (System.currentTimeMillis() - startDate.getTime() <= limit) {
                // The sending operation is still within the time limit
                continue;
            }
            // Stop tracking the socket only if it is still registered with the same start
            // date. If the sending operation finished in the meantime (and maybe a new one
            // started) then the removal fails and the healthy socket is left untouched
            if (!sockets.remove(socket, startDate)) {
                continue;
            }
            // Close the socket
            try {
                Log.debug("Closing socket: " + socket + " that started sending data at: " +
                        startDate);
                socket.close();
            }
            catch (IOException e) {
                Log.error("Error closing socket", e);
            }
        }
    }
}