HttpSessionManager.java 18.7 KB
Newer Older
Alex Wenckus's avatar
Alex Wenckus committed
1 2 3 4
/**
 * $Revision: $
 * $Date: $
 *
5
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
Alex Wenckus's avatar
Alex Wenckus committed
6
 *
7 8 9 10 11 12 13 14 15 16 17
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
Alex Wenckus's avatar
Alex Wenckus committed
18
 */
Matt Tucker's avatar
Matt Tucker committed
19

20
package org.jivesoftware.openfire.http;
Alex Wenckus's avatar
Alex Wenckus committed
21

22 23 24 25 26 27
import java.net.InetAddress;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
28
import java.util.concurrent.ThreadFactory;
29 30
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
31
import java.util.concurrent.atomic.AtomicInteger;
32

33 34 35
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
36
import org.dom4j.QName;
37 38 39
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.auth.UnauthorizedException;
40 41
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals;
42
import org.jivesoftware.util.Log;
43
import org.jivesoftware.util.TaskEngine;
44 45
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Alex Wenckus's avatar
Alex Wenckus committed
46 47

/**
48
 * Manages sessions for all users connecting to Openfire using the HTTP binding protocol,
49
 * <a href="http://www.xmpp.org/extensions/xep-0124.html">XEP-0124</a>.
Alex Wenckus's avatar
Alex Wenckus committed
50 51
 */
public class HttpSessionManager {
52 53 54
	
	private static final Logger Log = LoggerFactory.getLogger(HttpSessionManager.class);

Alex Wenckus's avatar
Alex Wenckus committed
55
    private SessionManager sessionManager;
56 57
    private Map<String, HttpSession> sessionMap = new ConcurrentHashMap<String, HttpSession>(
    		JiveGlobals.getIntProperty("xmpp.httpbind.session.initial.count", 16));
58
    private TimerTask inactivityTask;
59
    private ThreadPoolExecutor sendPacketPool;
60 61 62 63 64 65
    private SessionListener sessionListener = new SessionListener() {
        public void connectionOpened(HttpSession session, HttpConnection connection) {
        }

        public void connectionClosed(HttpSession session, HttpConnection connection) {
        }
Alex Wenckus's avatar
Alex Wenckus committed
66

67 68 69 70 71 72 73 74
        public void sessionClosed(HttpSession session) {
            sessionMap.remove(session.getStreamID().getID());
        }
    };

    /**
     * Creates a new HttpSessionManager instance and builds the worker pool that routes
     * inbound packets asynchronously.
     * <p>
     * The pool's maximum size is read from "xmpp.httpbind.worker.threads" and its keep-alive
     * (in seconds) from "xmpp.httpbind.worker.timeout". Values that
     * {@link ThreadPoolExecutor} would reject (a non-positive size or a negative keep-alive)
     * are clamped to the nearest legal value instead of aborting construction.
     */
    public HttpSessionManager() {

        JiveGlobals.migrateProperty("xmpp.httpbind.worker.threads");
        JiveGlobals.migrateProperty("xmpp.httpbind.worker.timeout");

        this.sessionManager = SessionManager.getInstance();

        // Configure a pooled executor to handle async routing for incoming packets.
        // The maximum size comes from "xmpp.httpbind.worker.threads", falling back to the
        // deprecated "xmpp.client.processing.threads" property (shared with
        // ConnectionManagerImpl) and finally to 8. The pool uses an unbounded task queue
        // and a configurable keep-alive (default: 60 secs).

        // Note: server supports up to 254 client threads by default (@see HttpBindManager)
        // BOSH installations expecting heavy loads may want to allocate additional threads
        // to this worker pool to ensure timely delivery of inbound packets

        int configuredThreads = JiveGlobals.getIntProperty("xmpp.httpbind.worker.threads",
                JiveGlobals.getIntProperty("xmpp.client.processing.threads", 8));
        // ThreadPoolExecutor throws IllegalArgumentException for a maximum size below 1.
        int maxPoolSize = Math.max(1, configuredThreads);
        if (maxPoolSize != configuredThreads) {
            Log.warn("Invalid value for xmpp.httpbind.worker.threads (" + configuredThreads
                    + "); using " + maxPoolSize + " instead");
        }

        int configuredKeepAlive = JiveGlobals.getIntProperty("xmpp.httpbind.worker.timeout", 60);
        // ThreadPoolExecutor throws IllegalArgumentException for a negative keep-alive.
        int keepAlive = Math.max(0, configuredKeepAlive);
        if (keepAlive != configuredKeepAlive) {
            Log.warn("Invalid value for xmpp.httpbind.worker.timeout (" + configuredKeepAlive
                    + "); using " + keepAlive + " instead");
        }

        // NOTE(review): with an unbounded work queue, ThreadPoolExecutor queues tasks rather
        // than growing beyond its core size, so maxPoolSize only influences the pool through
        // getCorePoolSize(). Confirm whether that is the intended sizing.
        sendPacketPool = new ThreadPoolExecutor(getCorePoolSize(maxPoolSize), maxPoolSize, keepAlive, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(), // unbounded task queue
                new ThreadFactory() { // custom thread factory for BOSH workers
                    final AtomicInteger counter = new AtomicInteger(1);
                    public Thread newThread(Runnable runnable) {
                        Thread thread = new Thread(Thread.currentThread().getThreadGroup(), runnable,
                                            "httpbind-worker-" + counter.getAndIncrement());
                        // Daemon threads so that idle workers never keep the JVM alive.
                        thread.setDaemon(true);
                        return thread;
                    }
                });
    }

108 109 110 111
	/**
	 * Derives the core size of the worker pool from its maximum size: one quarter of the
	 * maximum (integer division) plus one.
	 *
	 * @param maxPoolSize the maximum number of worker threads.
	 * @return the core pool size to use for that maximum.
	 */
	private int getCorePoolSize(int maxPoolSize) {
		final int quarter = maxPoolSize / 4;
		return quarter + 1;
	}

112 113 114
    /**
     * Starts the services used by the HttpSessionManager.
     */
115
    public void start() {
116 117
        inactivityTask = new HttpSessionReaper();
        TaskEngine.getInstance().schedule(inactivityTask, 30 * JiveConstants.SECOND,
118
                30 * JiveConstants.SECOND);
119
        sendPacketPool.prestartCoreThread();
120 121
    }

122 123 124
    /**
     * Stops any services and cleans up any resources used by the HttpSessionManager.
     */
125
    public void stop() {
126
        inactivityTask.cancel();
127
        for (HttpSession session : sessionMap.values()) {
128 129 130
            session.close();
        }
        sessionMap.clear();
131
        sendPacketPool.shutdown();
132 133
    }

134 135 136 137 138 139
    /**
     * Returns the session related to a stream id.
     *
     * @param streamID the stream id to retrieve the session.
     * @return the session related to the provided stream id.
     */
Alex Wenckus's avatar
Alex Wenckus committed
140 141 142 143
    public HttpSession getSession(String streamID) {
        return sessionMap.get(streamID);
    }

144
    /**
145
     * Creates an HTTP binding session which will allow a user to exchange packets with Openfire.
146 147 148
     *
     * @param address the internet address that was used to bind to Wildfie.
     * @param rootNode the body element that was sent containing the request for a new session.
149
     * @param connection the HTTP connection object which abstracts the individual connections to
150
     * Openfire over the HTTP binding protocol. The initial session creation response is returned to
151
     * this connection.
152
     * @return the created HTTP session.
153
     *
154
     * @throws UnauthorizedException if the Openfire server is currently in an uninitialized state.
155 156 157 158
     * Either shutting down or starting up.
     * @throws HttpBindException when there is an internal server error related to the creation of
     * the initial session creation response.
     */
Alex Wenckus's avatar
Alex Wenckus committed
159
    public HttpSession createSession(InetAddress address, Element rootNode,
160 161
                                     HttpConnection connection)
            throws UnauthorizedException, HttpBindException {
Alex Wenckus's avatar
Alex Wenckus committed
162 163 164 165
        // TODO Check if IP address is allowed to connect to the server

        // Default language is English ("en").
        String language = rootNode.attributeValue("xml:lang");
166
        if (language == null || "".equals(language)) {
Alex Wenckus's avatar
Alex Wenckus committed
167 168 169 170 171
            language = "en";
        }

        int wait = getIntAttribute(rootNode.attributeValue("wait"), 60);
        int hold = getIntAttribute(rootNode.attributeValue("hold"), 1);
172 173 174 175 176
        
        String version = rootNode.attributeValue("ver");
        if (version == null || "".equals(version)) {
        	version = "1.5";
        }
Alex Wenckus's avatar
Alex Wenckus committed
177

178
        HttpSession session = createSession(connection.getRequestId(), address, connection);
179
        session.setWait(Math.min(wait, getMaxWait()));
180
        session.setHold(hold);
Alex Wenckus's avatar
Alex Wenckus committed
181
        session.setSecure(connection.isSecure());
182 183
        session.setMaxPollingInterval(getPollingInterval());
        session.setMaxRequests(getMaxRequests());
184 185 186 187 188 189 190 191 192 193
        session.setMaxPause(getMaxPause());
        
        if(session.isPollingSession()) {
        	session.setDefaultInactivityTimeout(getPollingInactivityTimeout());
        }
        else {
        	session.setDefaultInactivityTimeout(getInactivityTimeout());
        }
    	session.resetInactivityTimeout();
        
Alex Wenckus's avatar
Alex Wenckus committed
194
        // Store language and version information in the connection.
195 196 197 198 199 200
        session.setLanguage(language);
        
        String [] versionString = version.split("\\.");
        session.setMajorVersion(Integer.parseInt(versionString[0]));
        session.setMinorVersion(Integer.parseInt(versionString[1]));
        
Alex Wenckus's avatar
Alex Wenckus committed
201 202 203 204 205 206 207 208
        try {
            connection.deliverBody(createSessionCreationResponse(session));
        }
        catch (HttpConnectionClosedException e) {
            /* This won't happen here. */
        }
        catch (DocumentException e) {
            Log.error("Error creating document", e);
209 210
            throw new HttpBindException("Internal server error",
                    BoshBindingError.internalServerError);
Alex Wenckus's avatar
Alex Wenckus committed
211 212 213 214
        }
        return session;
    }

215

216 217 218 219 220 221 222 223 224 225 226
    /**
     * Returns the maximum length of a temporary session pause (in seconds) that the client
     * MAY request. The value is read from the "xmpp.httpbind.client.maxpause" property, with
     * 300 supplied as the default.
     *
     * @return the maximum pause length, in seconds.
     */
    public int getMaxPause() {
        final int defaultMaxPause = 300;
        return JiveGlobals.getIntProperty("xmpp.httpbind.client.maxpause", defaultMaxPause);
    }

227
    /**
     * Returns the longest time (in seconds) that Openfire is allowed to wait before responding
     * to any request during the session. Session creation uses it as an upper bound on the
     * wait requested by the client. The value is read from the
     * "xmpp.httpbind.client.requests.wait" property, with {@link Integer#MAX_VALUE} supplied
     * as the default.
     *
     * @return the longest permitted wait, in seconds.
     */
    public int getMaxWait() {
        final String propertyName = "xmpp.httpbind.client.requests.wait";
        return JiveGlobals.getIntProperty(propertyName, Integer.MAX_VALUE);
    }

    /**
     * Returns the polling interval (in seconds) that is assigned to new sessions and
     * advertised to the client in the 'polling' attribute of the session creation response.
     * Communicating it lets the client avoid sending empty requests more often than desired.
     * The value is read from the "xmpp.httpbind.client.requests.polling" property, with 5
     * supplied as the default.
     *
     * @return the polling interval, in seconds.
     */
    public int getPollingInterval() {
        final int defaultInterval = 5;
        return JiveGlobals.getIntProperty("xmpp.httpbind.client.requests.polling", defaultInterval);
    }

    /**
     * Returns the number of simultaneous requests a client may make, as advertised in the
     * 'requests' attribute of the session creation response. Clients MUST NOT make more
     * simultaneous requests than specified by Openfire. The value is read from the
     * "xmpp.httpbind.client.requests.max" property, with 2 supplied as the default.
     *
     * @return the number of simultaneous requests allowable.
     */
    public int getMaxRequests() {
        final int defaultMaxRequests = 2;
        return JiveGlobals.getIntProperty("xmpp.httpbind.client.requests.max", defaultMaxRequests);
    }

    /**
     * Returns the number of seconds a (non-polling) session has to be idle before it is
     * closed. The value is read from the "xmpp.httpbind.client.idle" property, with 30
     * supplied as the default.
     * <p>
     * Sending stanzas to the client is not considered activity; a connection only counts as
     * active when the client sends data or heartbeats (i.e. whitespace) to the server.
     *
     * @return seconds a session has to be idle to be closed.
     */
    public int getInactivityTimeout() {
        final int defaultIdleSeconds = 30;
        return JiveGlobals.getIntProperty("xmpp.httpbind.client.idle", defaultIdleSeconds);
    }

282 283 284 285 286 287 288 289 290 291 292 293 294 295
    /**
     * Returns the number of seconds a polling session has to be idle before it is closed.
     * The value is read from the "xmpp.httpbind.client.idle.polling" property, with 60
     * supplied as the default.
     * <p>
     * Sending stanzas to the client is not considered activity; a connection only counts as
     * active when the client sends data or heartbeats (i.e. whitespace) to the server.
     *
     * @return seconds a polling session has to be idle to be closed.
     */
    public int getPollingInactivityTimeout() {
        final int defaultIdleSeconds = 60;
        return JiveGlobals.getIntProperty("xmpp.httpbind.client.idle.polling", defaultIdleSeconds);
    }

296 297 298 299 300 301 302 303 304 305 306 307
    /**
     * Forwards a client request, which is related to a session, to the server. A connection is
     * created and queued up in the provided session. When a connection reaches the top of a queue
     * any pending packets bound for the client will be forwarded to the client through the
     * connection.
     *
     * @param rid the unique, sequential, requestID sent from the client.
     * @param session the HTTP session of the client that made the request.
     * @param isSecure true if the request was made over a secure channel, HTTPS, and false if it
     * was not.
     * @param rootNode the XML body of the request.
     * @return the created HTTP connection.
308
     *
309 310 311 312 313 314 315 316 317 318
     * @throws HttpBindException for several reasons: if the encoding inside of an auth packet is
     * not recognized by the server, or if the packet type is not recognized.
     * @throws HttpConnectionClosedException if the session is no longer available.
     */
    public HttpConnection forwardRequest(long rid, HttpSession session, boolean isSecure,
                                         Element rootNode) throws HttpBindException,
            HttpConnectionClosedException
    {
        //noinspection unchecked
        List<Element> elements = rootNode.elements();
319 320 321 322 323 324 325 326
    	boolean isPoll = (elements.size() == 0);
    	if ("terminate".equals(rootNode.attributeValue("type")))
    		isPoll = false;
    	else if ("true".equals(rootNode.attributeValue(new QName("restart", rootNode.getNamespaceForPrefix("xmpp")))))
    		isPoll = false;
    	else if (rootNode.attributeValue("pause") != null)
    		isPoll = false;
        HttpConnection connection = session.createConnection(rid, elements, isSecure, isPoll);
327 328 329
        if (elements.size() > 0) {
            // creates the runnable to forward the packets
            new HttpPacketSender(session).init();
330 331 332 333
        }
        return connection;
    }

334
    private HttpSession createSession(long rid, InetAddress address, HttpConnection connection) throws UnauthorizedException {
Alex Wenckus's avatar
Alex Wenckus committed
335 336 337
        // Create a ClientSession for this user.
        StreamID streamID = SessionManager.getInstance().nextStreamID();
        // Send to the server that a new client session has been created
338
        HttpSession session = sessionManager.createClientHttpSession(rid, address, streamID, connection);
Alex Wenckus's avatar
Alex Wenckus committed
339 340
        // Register that the new session is associated with the specified stream ID
        sessionMap.put(streamID.getID(), session);
341
        session.addSessionCloseListener(sessionListener);
Alex Wenckus's avatar
Alex Wenckus committed
342 343 344 345
        return session;
    }

    private static int getIntAttribute(String value, int defaultValue) {
346
        if (value == null || "".equals(value.trim())) {
Alex Wenckus's avatar
Alex Wenckus committed
347 348 349 350 351 352 353 354 355 356
            return defaultValue;
        }
        try {
            return Integer.valueOf(value);
        }
        catch (Exception ex) {
            return defaultValue;
        }
    }

357 358 359 360 361 362 363 364 365 366 367 368
    /**
     * Parses a floating-point attribute value.
     *
     * @param doubleValue the raw attribute value; may be <tt>null</tt>.
     * @param defaultValue the value to return when <tt>doubleValue</tt> is <tt>null</tt>,
     *        blank or cannot be parsed.
     * @return the parsed number, or <tt>defaultValue</tt> when parsing is not possible.
     */
    private double getDoubleAttribute(String doubleValue, double defaultValue) {
        final boolean blank = (doubleValue == null) || doubleValue.trim().length() == 0;
        if (!blank) {
            try {
                return Double.parseDouble(doubleValue);
            }
            catch (Exception ex) {
                // Unparsable input: fall through to the default.
            }
        }
        return defaultValue;
    }

Alex Wenckus's avatar
Alex Wenckus committed
369 370 371 372
    private String createSessionCreationResponse(HttpSession session) throws DocumentException {
        Element response = DocumentHelper.createElement("body");
        response.addNamespace("", "http://jabber.org/protocol/httpbind");
        response.addNamespace("stream", "http://etherx.jabber.org/streams");
csh's avatar
csh committed
373
        response.addAttribute("from", session.getServerName());
Alex Wenckus's avatar
Alex Wenckus committed
374 375 376
        response.addAttribute("authid", session.getStreamID().getID());
        response.addAttribute("sid", session.getStreamID().getID());
        response.addAttribute("secure", Boolean.TRUE.toString());
377
        response.addAttribute("requests", String.valueOf(session.getMaxRequests()));
Alex Wenckus's avatar
Alex Wenckus committed
378
        response.addAttribute("inactivity", String.valueOf(session.getInactivityTimeout()));
379
        response.addAttribute("polling", String.valueOf(session.getMaxPollingInterval()));
Alex Wenckus's avatar
Alex Wenckus committed
380
        response.addAttribute("wait", String.valueOf(session.getWait()));
381 382 383 384 385 386 387
        if ((session.getMajorVersion() == 1 && session.getMinorVersion() >= 6) ||
        	session.getMajorVersion() > 1) {
            response.addAttribute("hold", String.valueOf(session.getHold()));
            response.addAttribute("ack", String.valueOf(session.getLastAcknowledged()));
            response.addAttribute("maxpause", String.valueOf(session.getMaxPause()));
            response.addAttribute("ver", String.valueOf(session.getMajorVersion())
            		+ "." + String.valueOf(session.getMinorVersion()));
388
        }
Alex Wenckus's avatar
Alex Wenckus committed
389 390

        Element features = response.addElement("stream:features");
391
        for (Element feature : session.getAvailableStreamFeaturesElements()) {
Alex Wenckus's avatar
Alex Wenckus committed
392 393 394 395 396 397
            features.add(feature);
        }

        return response.asXML();
    }

398
    private class HttpSessionReaper extends TimerTask {
Alex Wenckus's avatar
Alex Wenckus committed
399

400 401
        @Override
		public void run() {
402 403
            long currentTime = System.currentTimeMillis();
            for (HttpSession session : sessionMap.values()) {
404 405 406 407 408 409 410 411 412 413 414 415
            	try {
                    long lastActive = currentTime - session.getLastActivity();
                    if (Log.isDebugEnabled()) {
                    	Log.debug("Session was last active " + lastActive + " ms ago: " + session.getAddress());
                    }
                    if (lastActive > session.getInactivityTimeout() * JiveConstants.SECOND) {
                    	Log.info("Closing idle session: " + session.getAddress());
                        session.close();
                    }
            	} catch (Exception e) {
            		Log.error("Failed to determine idle state for session: " + session, e);
            	}
416
            }
Alex Wenckus's avatar
Alex Wenckus committed
417 418
        }
    }
419 420

    /**
421
     * A runner that guarantees that the packets per a session will be sent and
422
     * processed in the order in which they were received.
423 424 425 426 427 428 429 430 431
     */
    private class HttpPacketSender implements Runnable {
        private HttpSession session;

        HttpPacketSender(HttpSession session) {
            this.session = session;
        }

        public void run() {
432
            session.sendPendingPackets();
433 434 435 436 437 438
        }

        private void init() {
            sendPacketPool.execute(this);
        }
    }
Alex Wenckus's avatar
Alex Wenckus committed
439
}