ConnectionMultiplexerManager.java 15 KB
Newer Older
Gaston Dombiak's avatar
Gaston Dombiak committed
1 2 3 4 5
/**
 * $RCSfile: $
 * $Revision: $
 * $Date: $
 *
6
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
Gaston Dombiak's avatar
Gaston Dombiak committed
7
 *
8 9 10 11 12 13 14 15 16 17 18
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
Gaston Dombiak's avatar
Gaston Dombiak committed
19 20
 */

21
package org.jivesoftware.openfire.multiplex;
Gaston Dombiak's avatar
Gaston Dombiak committed
22

23 24 25 26 27 28 29 30
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

31 32 33 34 35 36 37
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.event.SessionEventDispatcher;
import org.jivesoftware.openfire.event.SessionEventListener;
import org.jivesoftware.openfire.session.ConnectionMultiplexerSession;
38
import org.jivesoftware.openfire.session.LocalClientSession;
39
import org.jivesoftware.openfire.session.Session;
40 41 42
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.TaskEngine;
43 44
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Gaston Dombiak's avatar
Gaston Dombiak committed
45 46 47 48 49 50 51 52 53 54 55

/**
 * A ConnectionMultiplexerManager is responsible for keeping track of the connected
 * Connection Managers and the sessions that were established with the Connection
 * Managers. Moreover, a ConnectionMultiplexerManager is able to create, get and close
 * client sessions based on Connection requests.
 *
 * @author Gaston Dombiak
 */
public class ConnectionMultiplexerManager implements SessionEventListener {

56 57
	private static final Logger Log = LoggerFactory.getLogger(ConnectionMultiplexerManager.class);

Gaston Dombiak's avatar
Gaston Dombiak committed
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
    private static final ConnectionMultiplexerManager instance = new ConnectionMultiplexerManager();

    /**
     * Pseudo-random number generator object for use with getMultiplexerSession(String).
     */
    private static Random randGen = new Random();

    static {
        // Add the unique instance of this class as a session listener. We need to react
        // when sessions are closed so we can clean up the registry of client sessions.
        SessionEventDispatcher.addListener(instance);
    }

    /**
     * Map that keeps track of connection managers and hosted connections.
     * Key: stream ID; Value: Domain of connection manager hosting connection
     */
    private Map<String, String> streamIDs = new ConcurrentHashMap<String, String>();
    /**
     * Map that keeps track of connection managers and hosted sessions.
     * Key: Domain of connection manager; Value: Map with Key: stream ID; Value: Client session
     */
80 81
    private Map<String, Map<String, LocalClientSession>> sessionsByManager =
            new ConcurrentHashMap<String, Map<String, LocalClientSession>>();
Gaston Dombiak's avatar
Gaston Dombiak committed
82 83 84 85 86 87 88 89 90 91 92 93

    private SessionManager sessionManager;

    /**
     * Returns the unique instance of this class.
     *
     * @return the unique instance of this class.
     */
    public static ConnectionMultiplexerManager getInstance() {
        return instance;
    }

94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
    /**
     * Returns the default secret key that connection managers should present while trying to
     * establish a new connection.
     *
     * @return the default secret key that connection managers should present while trying to
     *         establish a new connection.
     */
    public static String getDefaultSecret() {
        return JiveGlobals.getProperty("xmpp.multiplex.defaultSecret");
    }

    /**
     * Sets the default secret key that connection managers should present while trying to
     * establish a new connection.
     *
     * @param defaultSecret the default secret key that connection managers should present
     *        while trying to establish a new connection.
     */
    public static void setDefaultSecret(String defaultSecret) {
        JiveGlobals.setProperty("xmpp.multiplex.defaultSecret", defaultSecret);
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
116 117
    private ConnectionMultiplexerManager() {
        sessionManager = XMPPServer.getInstance().getSessionManager();
118 119
        // Start thread that will send heartbeats to Connection Managers every 30 seconds
        // to keep connections open.
120
        TimerTask heartbeatTask = new TimerTask() {
121 122
            @Override
			public void run() {
123
                try {
124 125
                    for (ConnectionMultiplexerSession session : sessionManager.getConnectionMultiplexerSessions()) {
                        session.deliverRawText(" ");
126 127
                    }
                }
128
                catch(Exception e) {
129
                    Log.error(e.getMessage(), e);
130
                }
131 132
            }
        };
133
        TaskEngine.getInstance().schedule(heartbeatTask, 30*JiveConstants.SECOND, 30*JiveConstants.SECOND);
Gaston Dombiak's avatar
Gaston Dombiak committed
134 135 136 137 138 139 140 141 142
    }

    /**
     * Creates a new client session that was established to the specified connection manager.
     * The new session will not be findable through its stream ID.
     *
     * @param connectionManagerDomain the connection manager that is handling the connection
     *        of the session.
     * @param streamID the stream ID created by the connection manager for the new session.
143 144 145
     * @param hostName the address's hostname of the client or null if using old connection manager.
     * @param hostAddress the textual representation of the address of the client or null if using old CM.
     * @return true if a session was created or false if the client should disconnect.
Gaston Dombiak's avatar
Gaston Dombiak committed
146
     */
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
    public boolean createClientSession(String connectionManagerDomain, String streamID, String hostName, String hostAddress) {
        Connection connection = new ClientSessionConnection(connectionManagerDomain, hostName, hostAddress);
        // Check if client is allowed to connect from the specified IP address. Ignore the checking if connection
        // manager is old version and is not passing client's address
        byte[] address = null;
        try {
            address = connection.getAddress();
        } catch (UnknownHostException e) {
            // Ignore
        }
        if (address == null || LocalClientSession.isAllowed(connection)) {
            LocalClientSession session =
                    SessionManager.getInstance().createClientSession(connection, new BasicStreamID(streamID));
            // Register that this streamID belongs to the specified connection manager
            streamIDs.put(streamID, connectionManagerDomain);
            // Register which sessions are being hosted by the speicifed connection manager
            Map<String, LocalClientSession> sessions = sessionsByManager.get(connectionManagerDomain);
            if (sessions == null) {
                synchronized (connectionManagerDomain.intern()) {
                    sessions = sessionsByManager.get(connectionManagerDomain);
                    if (sessions == null) {
                        sessions = new ConcurrentHashMap<String, LocalClientSession>();
                        sessionsByManager.put(connectionManagerDomain, sessions);
                    }
Gaston Dombiak's avatar
Gaston Dombiak committed
171 172
                }
            }
173 174
            sessions.put(streamID, session);
            return true;
Gaston Dombiak's avatar
Gaston Dombiak committed
175
        }
176
        return false;
Gaston Dombiak's avatar
Gaston Dombiak committed
177 178 179 180 181 182 183 184 185 186
    }

    /**
     * Closes an existing client session that was established through a connection manager.
     *
     * @param connectionManagerDomain the connection manager that is handling the connection
     *        of the session.
     * @param streamID the stream ID created by the connection manager for the session.
     */
    public void closeClientSession(String connectionManagerDomain, String streamID) {
187
        Map<String, LocalClientSession> sessions = sessionsByManager.get(connectionManagerDomain);
Gaston Dombiak's avatar
Gaston Dombiak committed
188 189 190 191
        if (sessions != null) {
            Session session = sessions.remove(streamID);
            if (session != null) {
                // Close the session
192
                session.close();
Gaston Dombiak's avatar
Gaston Dombiak committed
193 194 195 196 197
            }
        }
    }

    /**
198 199 200 201 202 203 204 205
     * A connection manager has become available. Clients can now connect to the server through
     * the connection manager.
     *
     * @param connectionManagerName the connection manager that has become available.
     */
    public void multiplexerAvailable(String connectionManagerName) {
        // Add a new entry in the list of available managers. Here is where we are going to store
        // which clients were connected through which connection manager
206
        Map<String, LocalClientSession> sessions = sessionsByManager.get(connectionManagerName);
207 208 209 210
        if (sessions == null) {
            synchronized (connectionManagerName.intern()) {
                sessions = sessionsByManager.get(connectionManagerName);
                if (sessions == null) {
211
                    sessions = new ConcurrentHashMap<String, LocalClientSession>();
212 213 214 215 216 217 218 219 220
                    sessionsByManager.put(connectionManagerName, sessions);
                }
            }
        }
    }

    /**
     * A connection manager has gone unavailable. Close client sessions that were established
     * to the specified connection manager.
Gaston Dombiak's avatar
Gaston Dombiak committed
221
     *
222
     * @param connectionManagerName the connection manager that is no longer available.
Gaston Dombiak's avatar
Gaston Dombiak committed
223
     */
224
    public void multiplexerUnavailable(String connectionManagerName) {
Gaston Dombiak's avatar
Gaston Dombiak committed
225
        // Remove the connection manager and the hosted sessions
226
        Map<String, LocalClientSession> sessions = sessionsByManager.remove(connectionManagerName);
Gaston Dombiak's avatar
Gaston Dombiak committed
227 228 229 230 231
        if (sessions != null) {
            for (String streamID : sessions.keySet()) {
                // Remove inverse track of connection manager hosting streamIDs
                streamIDs.remove(streamID);
                // Close the session
232
                sessions.get(streamID).close();
Gaston Dombiak's avatar
Gaston Dombiak committed
233 234 235 236 237 238 239 240 241 242 243 244 245
            }
        }
    }

    /**
     * Returns the ClientSession with the specified stream ID that is being hosted by the
     * specified connection manager.
     *
     * @param connectionManagerDomain the connection manager that is handling the connection
     *        of the session.
     * @param streamID the stream ID created by the connection manager for the session.
     * @return the ClientSession with the specified stream ID.
     */
246 247
    public LocalClientSession getClientSession(String connectionManagerDomain, String streamID) {
        Map<String, LocalClientSession> sessions = sessionsByManager.get(connectionManagerDomain);
Gaston Dombiak's avatar
Gaston Dombiak committed
248 249 250 251 252 253 254 255
        if (sessions != null) {
            return sessions.get(streamID);
        }
        return null;
    }

    /**
     * Returns a {@link ConnectionMultiplexerSession} for the specified connection manager
256 257
     * domain or <tt>null</tt> if none was found. If a StreamID is passed in, the same connection
     * will always be used for that StreamID. Otherwise, if the connection manager has many
Gaston Dombiak's avatar
Gaston Dombiak committed
258 259 260
     * connections established with the server then one of them will be selected randomly.
     *
     * @param connectionManagerDomain the domain of the connection manager to get a session.
261
     * @param streamID if provided, the same connection will always be used for a given streamID
Gaston Dombiak's avatar
Gaston Dombiak committed
262 263
     * @return a session to the specified connection manager domain or null if none was found.
     */
264
    public ConnectionMultiplexerSession getMultiplexerSession(String connectionManagerDomain,String streamID) {
Gaston Dombiak's avatar
Gaston Dombiak committed
265 266 267 268 269 270 271 272
        List<ConnectionMultiplexerSession> sessions =
                sessionManager.getConnectionMultiplexerSessions(connectionManagerDomain);
        if (sessions.isEmpty()) {
            return null;
        }
        else if (sessions.size() == 1) {
            return sessions.get(0);
        }
273 274 275 276 277
        else if (streamID != null) {
            // Always use the same connection for a given streamID
            int connectionIndex = Math.abs(streamID.hashCode()) % sessions.size();
            return sessions.get(connectionIndex);
        } else {
Gaston Dombiak's avatar
Gaston Dombiak committed
278 279 280 281 282
            // Pick a random session so we can distribute traffic evenly
            return sessions.get(randGen.nextInt(sessions.size()));
        }
    }

283 284 285 286 287 288 289 290 291 292 293 294 295
    /**
     * Returns a {@link ConnectionMultiplexerSession} for the specified connection manager
     * domain or <tt>null</tt> if none was found. In case the connection manager has many
     * connections established with the server then one of them will be selected randomly.
     *
     * @param connectionManagerDomain the domain of the connection manager to get a session.
     * @return a session to the specified connection manager domain or null if none was found.
     */
    public ConnectionMultiplexerSession getMultiplexerSession(String connectionManagerDomain) {
        return getMultiplexerSession(connectionManagerDomain,null);
    }


296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
    /**
     * Returns the names of the connected connection managers to this server.
     *
     * @return the names of the connected connection managers to this server.
     */
    public Collection<String> getMultiplexers() {
        return sessionsByManager.keySet();
    }

    /**
     * Returns the number of connected clients to a specific connection manager.
     *
     * @param managerName the name of the connection manager.
     * @return the number of connected clients to a specific connection manager.
     */
    public int getNumConnectedClients(String managerName) {
312
        Map<String, LocalClientSession> clients = sessionsByManager.get(managerName);
313 314 315 316 317 318 319 320
        if (clients == null) {
            return 0;
        }
        else {
            return clients.size();
        }
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
    public void anonymousSessionCreated(Session session) {
        // Do nothing.
    }

    public void anonymousSessionDestroyed(Session session) {
        removeSession(session);
    }

    public void sessionCreated(Session session) {
        // Do nothing.
    }

    public void sessionDestroyed(Session session) {
        removeSession(session);
    }

337 338 339 340
    public void resourceBound(Session session) {
    	// Do nothing.
    }

Gaston Dombiak's avatar
Gaston Dombiak committed
341 342 343 344 345 346
    private void removeSession(Session session) {
        // Remove trace indicating that a connection manager is hosting a connection
        String streamID = session.getStreamID().getID();
        String connectionManagerDomain = streamIDs.remove(streamID);
        // Remove trace indicating that a connection manager is hosting a session
        if (connectionManagerDomain != null) {
347
            Map<String, LocalClientSession> sessions = sessionsByManager.get(connectionManagerDomain);
Gaston Dombiak's avatar
Gaston Dombiak committed
348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
            if (sessions != null) {
                sessions.remove(streamID);
            }
        }
    }

    /**
     * Simple implementation of the StreamID interface to hold the stream ID assigned by
     * the Connection Manager to the Session.
     */
    private class BasicStreamID implements StreamID {
        String id;

        public BasicStreamID(String id) {
            this.id = id;
        }

        public String getID() {
            return id;
        }

369 370
        @Override
		public String toString() {
Gaston Dombiak's avatar
Gaston Dombiak committed
371 372 373
            return id;
        }

374 375
        @Override
		public int hashCode() {
Gaston Dombiak's avatar
Gaston Dombiak committed
376 377 378 379
            return id.hashCode();
        }
    }
}