ConnectionMultiplexerSocketReader.java 11.2 KB
Newer Older
Gaston Dombiak's avatar
Gaston Dombiak committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/**
 * $RCSfile: $
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2006 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution.
 */

package org.jivesoftware.wildfire.net;

import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals;
16
import org.jivesoftware.util.Log;
Gaston Dombiak's avatar
Gaston Dombiak committed
17 18 19 20
import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.RoutingTable;
import org.jivesoftware.wildfire.Session;
import org.jivesoftware.wildfire.SessionManager;
Gaston Dombiak's avatar
Gaston Dombiak committed
21 22 23
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.multiplex.ConnectionMultiplexerSession;
import org.jivesoftware.wildfire.multiplex.MultiplexerPacketHandler;
Gaston Dombiak's avatar
Gaston Dombiak committed
24
import org.jivesoftware.wildfire.multiplex.Route;
Gaston Dombiak's avatar
Gaston Dombiak committed
25
import org.xmlpull.v1.XmlPullParserException;
Gaston Dombiak's avatar
Gaston Dombiak committed
26 27 28 29
import org.xmpp.packet.IQ;
import org.xmpp.packet.Message;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
Gaston Dombiak's avatar
Gaston Dombiak committed
30 31 32

import java.io.IOException;
import java.net.Socket;
33
import java.util.concurrent.*;
Gaston Dombiak's avatar
Gaston Dombiak committed
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69

/**
 * A SocketReader specialized for connection manager connections. Connection managers may have
 * one or more connections to the server. Each connection will have its own instance of this
 * class. Each connection will send packets, sent from clients connected to the connection
 * manager, to the server. Moreover, the server will use any of the available connections
 * to the connection manager to send packets to connected clients through the connection manager.<p>
 *
 * Each socket reader has its own thread pool to process many packets in parallel. The thread pool
 * by default will use 10 core threads, a queue of 50 elements and a max number of 100 threads.
 * The pool will use the 10 core threads in parallel and queue packets. When the queue is full
 * then more threads will be created until the max number is reached. Any created thread that
 * exceeds the core number of threads will be killed when idle for 1 minute. The thread pool
 * configuration can be modified by setting the system properties:
 * <ul>
 *  <li>xmpp.multiplex.processing.core.threads
 *  <li>xmpp.multiplex.processing.max.threads
 *  <li>xmpp.multiplex.processing.queue
 * </ul>
 *
 * Each Connection Manager has its own domain. Each connection from the same connection manager
 * uses a different resource. Unlike any other session, connection manager sessions are not
 * present in the routing table. This means that connection managers are not reachable entities.
 * In other words, entities cannot send packets to connection managers but clients being hosted
 * by them. The main reason behind this design decision is that connection managers are private
 * components of the server so they can only be contacted by the server. Connection Manager
 * sessions are present in {@link SessionManager} but not in {@link RoutingTable}.
 *
 * @author Gaston Dombiak
 */
public class ConnectionMultiplexerSocketReader extends SocketReader {

    /**
     * Pool of threads that are available for processing the requests.
     */
    private ThreadPoolExecutor threadPool;
70 71 72 73 74 75 76 77 78 79 80

    /**
     * Queue used when thread pool is exhausted (i.e. core threads, queue and max threads are all busy). Once
     * the thread pool is exhausted, incoming packets will be placed into this queue. Once the queue is emptied
     * incoming packets will go to the thread pool.<p>
     * 
     * Note that the queue is unbound so we may potentially consume all Java memory. A future version may make
     * Connection Managers smarter and throttle traffic to the server to avoid this problem.
     */
    private BlockingQueue<Runnable> overflowBuffer = new LinkedBlockingQueue<Runnable>();

Gaston Dombiak's avatar
Gaston Dombiak committed
81 82 83 84 85
    /**
     * Handler of IQ packets sent from the Connection Manager to the server.
     */
    private MultiplexerPacketHandler packetHandler;

Gaston Dombiak's avatar
Gaston Dombiak committed
86 87 88 89
    public ConnectionMultiplexerSocketReader(PacketRouter router, RoutingTable routingTable,
            String serverName, Socket socket, SocketConnection connection,
            boolean useBlockingMode) {
        super(router, routingTable, serverName, socket, connection, useBlockingMode);
Gaston Dombiak's avatar
Gaston Dombiak committed
90 91 92 93 94 95 96 97
        // Create a pool of threads that will process received packets. If more threads are
        // required then the command will be executed on the SocketReader process
        int coreThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.core.threads", 10);
        int maxThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.max.threads", 100);
        int queueSize = JiveGlobals.getIntProperty("xmpp.multiplex.processing.queue", 50);
        threadPool =
                new ThreadPoolExecutor(coreThreads, maxThreads, 60, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(queueSize),
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
                        new RejectedExecutionHandler() {
                            /**
                             * Stores rejected tasks in the overflow queue.
                             * @param r the rejected task.
                             * @param executor thread pool executor.
                             */
                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                if (!executor.isShutdown()) {
                                    overflowBuffer.add(r);
                                }
                            }
                        });
        // Thread that will consume packets present in the overflow queue. The thread will monitor the threadPool
        // and when a thread in the pool is available it will remove the oldest packet in the overflow queue and
        // process it with the idle threads.
        Thread overflowThread = new Thread() {

            public void run() {
                while (!threadPool.isShutdown() && !threadPool.isTerminated()) {
                    try {
                        // Get the next task that has been rejected when the threadPool was exhausted
                        Runnable runnable = overflowBuffer.take();
                        // Wait until the pool has available threads
                        while (threadPool.getActiveCount() >= threadPool.getMaximumPoolSize()) {
                            Thread.sleep(100);
                        }
                        // Process the rejected task
                        threadPool.execute(runnable);

                    } catch (InterruptedException e) {
                        // Do nothing
                    }
                    catch(Exception e) {
                        Log.error("Error consuming overflow buffer", e);
                    }
                }
            }
        };
        overflowThread.start();
Gaston Dombiak's avatar
Gaston Dombiak committed
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
    }

    boolean createSession(String namespace)
            throws UnauthorizedException, XmlPullParserException, IOException {
        if (getNamespace().equals(namespace)) {
            // The connected client is a connection manager so create a ConnectionMultiplexerSession
            session = ConnectionMultiplexerSession.createSession(serverName, reader, connection);
            packetHandler = new MultiplexerPacketHandler(session.getAddress().getDomain());
            return true;
        }
        return false;
    }

    String getNamespace() {
        return "jabber:connectionmanager";
    }

    protected void processIQ(final IQ packet) throws UnauthorizedException {
        if (session.getStatus() != Session.STATUS_AUTHENTICATED) {
Gaston Dombiak's avatar
Gaston Dombiak committed
156
            // Session is not authenticated so return error
Gaston Dombiak's avatar
Gaston Dombiak committed
157 158 159 160 161 162 163 164 165 166 167 168
            IQ reply = new IQ();
            reply.setChildElement(packet.getChildElement().createCopy());
            reply.setID(packet.getID());
            reply.setTo(packet.getFrom());
            reply.setFrom(packet.getTo());
            reply.setError(PacketError.Condition.not_authorized);
            session.process(reply);
            return;
        }
        // Process the packet in another thread
        threadPool.execute(new Runnable() {
            public void run() {
Gaston Dombiak's avatar
Gaston Dombiak committed
169
                packetHandler.handle(packet);
Gaston Dombiak's avatar
Gaston Dombiak committed
170 171 172 173
            }
        });
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
174 175 176 177 178 179 180 181
    /**
     * Process stanza sent by a client that is connected to a connection manager. The
     * original stanza is wrapped in the route element. Only a single stanza must be
     * wrapped in the route element.
     *
     * @param packet the route element.
     */
    private void processRoute(final Route packet) throws UnauthorizedException {
Gaston Dombiak's avatar
Gaston Dombiak committed
182
        if (session.getStatus() != Session.STATUS_AUTHENTICATED) {
Gaston Dombiak's avatar
Gaston Dombiak committed
183 184
            // Session is not authenticated so return error
            Route reply = new Route(packet.getStreamID());
Gaston Dombiak's avatar
Gaston Dombiak committed
185 186 187 188 189 190 191 192
            reply.setID(packet.getID());
            reply.setTo(packet.getFrom());
            reply.setFrom(packet.getTo());
            reply.setError(PacketError.Condition.not_authorized);
            session.process(reply);
            return;
        }
        // Process the packet in another thread
193
        Runnable runnable = new Runnable() {
Gaston Dombiak's avatar
Gaston Dombiak committed
194
            public void run() {
Gaston Dombiak's avatar
Gaston Dombiak committed
195
                packetHandler.route(packet);
Gaston Dombiak's avatar
Gaston Dombiak committed
196
            }
197 198 199 200 201 202 203 204 205 206 207 208 209
        };
        if (!overflowBuffer.isEmpty()) {
            // Thread pool is exhausted or we are still recoving from a recent exhausted state.
            // Keep placing tasks in this queue until the queue is empty. The queue will help us
            // keep the cronological order of incoming packets. Note that if we don't care about
            // being cronologically correct then we should just add the task to the threadPool.
            overflowBuffer.add(runnable);
        }
        else {
            // Thread pool is not exhausted and we are not recovering from an exhausted state so just
            // run the task using the thread pool
            threadPool.execute(runnable);
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
210 211
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
212 213 214 215 216
    protected void processMessage(final Message packet) throws UnauthorizedException {
        throw new UnauthorizedException("Message packets are not supported. Original packets " +
                "should be wrapped by IQ packets.");
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
217
    protected void processPresence(final Presence packet) throws UnauthorizedException {
Gaston Dombiak's avatar
Gaston Dombiak committed
218 219
        throw new UnauthorizedException("Message packets are not supported. Original packets " +
                "should be wrapped by IQ packets.");
Gaston Dombiak's avatar
Gaston Dombiak committed
220 221 222
    }

    boolean processUnknowPacket(Element doc) {
Gaston Dombiak's avatar
Gaston Dombiak committed
223 224 225 226 227 228 229 230 231 232 233 234
        String tag = doc.getName();
        if ("route".equals(tag)) {
            // Process stanza wrapped by the route packet
            try {
                processRoute(new Route(doc));
                return true;
            }
            catch (UnauthorizedException e) {
                // Should never happen
            }
        }
        else if ("handshake".equals(tag)) {
Gaston Dombiak's avatar
Gaston Dombiak committed
235 236 237
            open = ((ConnectionMultiplexerSession)session).authenticate(doc.getStringValue());
            return true;
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
238 239 240 241
        else if ("error".equals(tag) && "stream".equals(doc.getNamespacePrefix())) {
            session.getConnection().close();
            open = false;
            return true;
Gaston Dombiak's avatar
Gaston Dombiak committed
242
        }
Gaston Dombiak's avatar
Gaston Dombiak committed
243
        return false;
Gaston Dombiak's avatar
Gaston Dombiak committed
244 245
    }

246 247 248 249 250 251 252
    protected void shutdown() {
        super.shutdown();
        // Shutdown the pool of threads that are processing packets sent by
        // the remote server
        threadPool.shutdown();
    }

253 254 255 256
    String getName() {
        return "ConnectionMultiplexer SR - " + hashCode();
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
257 258 259 260
    boolean validateHost() {
        return false;
    }
}