1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
/**
* $RCSfile$
* $Revision$
* $Date$
*
* Copyright (C) 2004 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.messenger;
import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.LocaleUtils;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
/**
* A channel provides a mechanism to queue work units for processing. Each work unit is
* encapsulated as a ChannelMessage, and processing of each message is performed by a
* ChannelHandler.<p>
*
* As a request is handled by the system, it will travel through a sequence of channels.
* This architecture has a number of advantages:
* <ul>
* <li> Each request doesn't need to correspond to a thread. Instead, a thread pool
* in each channel processes requests from a queue.
* <li> Due to the queue at each channel, the system is much better able to respond
* to load spikes.
* </ul><p>
*
* Channels are modeled after SEDA stages. For much much more in-depth architecture information,
* refer to the <a href="http://www.cs.berkeley.edu/~mdw/proj/sandstorm/">SEDA website</a>.
*
* @author Matt Tucker
*/
public class Channel<T extends XMPPPacket> {
private String name;
private ChannelHandler channelHandler;
ThreadPoolExecutor executor;
/**
* Creates a new channel. The channel should be registered after it's created.
*
* @param name the name of the channel.
* @param channelHandler the handler for this channel.
*/
public Channel(String name, ChannelHandler<T> channelHandler) {
this.name = name;
this.channelHandler = channelHandler;
executor = new ThreadPoolExecutor(1, 8, 15, TimeUnit.SECONDS, new LinkedBlockingQueue());
}
/**
* Returns the name of the channel.
*
* @return the name of the channel.
*/
public String getName() {
return name;
}
/**
* Enqueus a message to be handled by this channel. After the ChannelHandler is done
* processing the message, it will be sent to the next channel. Messages with a higher
* priority will be handled first.
*
* @param packet an XMPP packet to add to the channel for processing.
*/
public void add(final T packet) {
Runnable r = new Runnable() {
public void run() {
try {
channelHandler.process(packet);
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
try {
packet.getOriginatingSession().getConnection().close();
}
catch (UnauthorizedException e1) {
// do nothing
}
}
}
};
executor.execute(r);
}
/**
* Returns true if the channel is currently running. The channel can be started and
* stopped by calling the start() and stop() methods.
*
* @return true if the channel is running.
*/
public boolean isRunning() {
return !executor.isShutdown();
}
/**
* Starts the channel, which means that worker threads will start processing messages
* from the queue. If the server isn't running, messages can still be enqueued.
*/
public void start() {
}
/**
* Stops the channel, which means that worker threads will stop processing messages from
* the queue. If the server isn't running, messages can still be enqueued.
*/
public synchronized void stop() {
executor.shutdown();
}
/**
* Returns the number of currently active worker threads in the channel. This value
* will always fall in between the min a max thread count.
*
* @return the current number of worker threads.
*/
public int getThreadCount() {
return executor.getPoolSize();
}
/**
* Returns the min number of threads the channel will use for processing messages.
* The channel will automatically de-allocate worker threads as the queue load shrinks,
* down to the defined minimum. This lets the channel consume fewer resources when load
* is low.
*
* @return the min number of threads that can be used by the channel.
*/
public int getMinThreadCount() {
return executor.getCorePoolSize();
}
/**
* Sets the min number of threads the channel will use for processing messages.
* The channel will automatically de-allocate worker threads as the queue load shrinks,
* down to the defined minimum. This lets the channel consume fewer resources when load
* is low.
*
* @param minThreadCount the min number of threads that can be used by the channel.
*/
public void setMinThreadCount(int minThreadCount) {
executor.setCorePoolSize(minThreadCount);
}
/**
* Returns the max number of threads the channel will use for processing messages. The
* channel will automatically allocate new worker threads as the queue load grows, up to the
* defined maximum. This lets the channel meet higher concurrency needs, but prevents too
* many threads from being allocated, which decreases overall system performance.
*
* @return the max number of threads that can be used by the channel.
*/
public int getMaxThreadCount() {
return executor.getMaximumPoolSize();
}
/**
* Sets the max number of threads the channel will use for processing messages. The channel
* will automatically allocate new worker threads as the queue size grows, up to the defined
* maximum. This lets the channel meet higher concurrency needs, but prevents too many threads
* from being allocated, which decreases overall system performance.
*
* @param maxThreadCount the max number of threads that can be used by the channel.
*/
public void setMaxThreadCount(int maxThreadCount) {
executor.setMaximumPoolSize(maxThreadCount);
}
/**
* Returns the current number of ChannelMessage objects waiting to be processed by
* the channel.
*
* @return the current number of elements in the processing queue.
*/
public int getQueueSize() {
return executor.getQueue().size();
}
}