Channel.java 7.36 KB
Newer Older
1 2 3 4 5
/**
 * $RCSfile$
 * $Revision: 617 $
 * $Date: 2004-12-03 05:59:50 -0300 (Fri, 03 Dec 2004) $
 *
6
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
7
 *
8 9 10 11 12 13 14 15 16 17 18
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
19 20
 */

21
package org.jivesoftware.openfire;
22

23 24 25 26
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

27 28 29 30 31 32
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.Packet;

33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
/**
 * A channel provides a mechanism to queue work units for processing. Each work unit is
 * encapsulated as a ChannelMessage, and processing of each message is performed by a
 * ChannelHandler.<p>
 *
 * As a request is handled by the system, it will travel through a sequence of channels.
 * This architecture has a number of advantages:
 * <ul>
 *      <li> Each request doesn't need to correspond to a thread. Instead, a thread pool
 *          in each channel processes requests from a queue.
 *      <li> Due to the queue at each channel, the system is much better able to respond
 *          to load spikes.
 * </ul><p>
 *
 * Channels are modeled after SEDA stages. For much much more in-depth architecture information,
 * refer to the <a href="http://www.cs.berkeley.edu/~mdw/proj/sandstorm/">SEDA website</a>.
 *
 * @author Matt Tucker
 */
public class Channel<T extends Packet> {

54 55
	private static final Logger Log = LoggerFactory.getLogger(Channel.class);

56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
    private String name;
    private ChannelHandler channelHandler;

    ThreadPoolExecutor executor;

    /**
     * Creates a new channel. The channel should be registered after it's created.
     *
     * @param name the name of the channel.
     * @param channelHandler the handler for this channel.
     */
    public Channel(String name, ChannelHandler<T> channelHandler) {
        this.name = name;
        this.channelHandler = channelHandler;

        executor = new ThreadPoolExecutor(1, 8, 15, TimeUnit.SECONDS, new LinkedBlockingQueue());
    }

    /**
     * Returns the name of the channel.
     *
     * @return the name of the channel.
     */
    public String getName() {
        return name;
    }

    /**
     * Enqueus a message to be handled by this channel. After the ChannelHandler is done
     * processing the message, it will be sent to the next channel. Messages with a higher
     * priority will be handled first.
     *
     * @param packet an XMPP packet to add to the channel for processing.
     */
    public void add(final T packet) {
        Runnable r = new Runnable() {
            public void run() {
                try {
                    channelHandler.process(packet);
                }
                catch (Exception e) {
                    Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
                   
                        try {
                            Session session = SessionManager.getInstance().getSession(packet.getFrom());
101
                            session.close();
102 103
                        }
                        catch (Exception e1) {
104
                           Log.error(e1.getMessage(), e1);
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
                        }
                }
            }
        };
        executor.execute(r);
    }

    /**
     * Returns true if the channel is currently running. The channel can be started and
     * stopped by calling the start() and stop() methods.
     *
     * @return true if the channel is running.
     */
    public boolean isRunning() {
        return !executor.isShutdown();
    }

    /**
     * Starts the channel, which means that worker threads will start processing messages
     * from the queue. If the server isn't running, messages can still be enqueued.
     */
    public void start() {

    }

    /**
     * Stops the channel, which means that worker threads will stop processing messages from
     * the queue. If the server isn't running, messages can still be enqueued.
     */
    public synchronized void stop() {
        executor.shutdown();
    }

    /**
     * Returns the number of currently active worker threads in the channel. This value
     * will always fall in between the min a max thread count.
     *
     * @return the current number of worker threads.
     */
    public int getThreadCount() {
        return executor.getPoolSize();
    }

    /**
     * Returns the min number of threads the channel will use for processing messages.
     * The channel will automatically de-allocate worker threads as the queue load shrinks,
     * down to the defined minimum. This lets the channel consume fewer resources when load
     * is low.
     *
     * @return the min number of threads that can be used by the channel.
     */
    public int getMinThreadCount() {
        return executor.getCorePoolSize();
    }

    /**
     * Sets the min number of threads the channel will use for processing messages.
     * The channel will automatically de-allocate worker threads as the queue load shrinks,
     * down to the defined minimum. This lets the channel consume fewer resources when load
     * is low.
     *
     * @param minThreadCount the min number of threads that can be used by the channel.
     */
    public void setMinThreadCount(int minThreadCount) {
        executor.setCorePoolSize(minThreadCount);
    }

    /**
     * Returns the max number of threads the channel will use for processing messages. The
     * channel will automatically allocate new worker threads as the queue load grows, up to the
     * defined maximum. This lets the channel meet higher concurrency needs, but prevents too
     * many threads from being allocated, which decreases overall system performance.
     *
     * @return the max number of threads that can be used by the channel.
     */
    public int getMaxThreadCount() {
        return executor.getMaximumPoolSize();
    }

    /**
     * Sets the max number of threads the channel will use for processing messages. The channel
     * will automatically allocate new worker threads as the queue size grows, up to the defined
     * maximum. This lets the channel meet higher concurrency needs, but prevents too many threads
     * from being allocated, which decreases overall system performance.
     *
     * @param maxThreadCount the max number of threads that can be used by the channel.
     */
    public void setMaxThreadCount(int maxThreadCount) {
        executor.setMaximumPoolSize(maxThreadCount);
    }

    /**
     * Returns the current number of ChannelMessage objects waiting to be processed by
     * the channel.
     *
     * @return the current number of elements in the processing queue.
     */
    public int getQueueSize() {
        return executor.getQueue().size();
    }
}