Commit 54f14f24 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Improved logic when thread pool is exhausted. JM-867

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@5702 b35dd754-fafc-0310-a699-88a17e54d16e
parent 08109ba2
...@@ -13,6 +13,7 @@ package org.jivesoftware.wildfire.net; ...@@ -13,6 +13,7 @@ package org.jivesoftware.wildfire.net;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.PacketRouter; import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.RoutingTable; import org.jivesoftware.wildfire.RoutingTable;
import org.jivesoftware.wildfire.Session; import org.jivesoftware.wildfire.Session;
...@@ -29,9 +30,7 @@ import org.xmpp.packet.Presence; ...@@ -29,9 +30,7 @@ import org.xmpp.packet.Presence;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.*;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* A SocketReader specialized for connection manager connections. Connection managers may have * A SocketReader specialized for connection manager connections. Connection managers may have
...@@ -68,6 +67,17 @@ public class ConnectionMultiplexerSocketReader extends SocketReader { ...@@ -68,6 +67,17 @@ public class ConnectionMultiplexerSocketReader extends SocketReader {
* Pool of threads that are available for processing the requests. * Pool of threads that are available for processing the requests.
*/ */
private ThreadPoolExecutor threadPool; private ThreadPoolExecutor threadPool;
/**
* Queue used when thread pool is exhausted (i.e. core threads, queue and max threads are all busy). Once
* the thread pool is exhausted, incoming packets will be placed into this queue. Once the queue is emptied
* incoming packets will go to the thread pool.<p>
*
* Note that the queue is unbound so we may potentially consume all Java memory. A future version may make
* Connection Managers smarter and throttle traffic to the server to avoid this problem.
*/
private BlockingQueue<Runnable> overflowBuffer = new LinkedBlockingQueue<Runnable>();
/** /**
* Handler of IQ packets sent from the Connection Manager to the server. * Handler of IQ packets sent from the Connection Manager to the server.
*/ */
...@@ -85,7 +95,45 @@ public class ConnectionMultiplexerSocketReader extends SocketReader { ...@@ -85,7 +95,45 @@ public class ConnectionMultiplexerSocketReader extends SocketReader {
threadPool = threadPool =
new ThreadPoolExecutor(coreThreads, maxThreads, 60, TimeUnit.SECONDS, new ThreadPoolExecutor(coreThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueSize), new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadPoolExecutor.CallerRunsPolicy()); new RejectedExecutionHandler() {
/**
* Stores rejected tasks in the overflow queue.
* @param r the rejected task.
* @param executor thread pool executor.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
overflowBuffer.add(r);
}
}
});
// Thread that will consume packets present in the overflow queue. The thread will monitor the threadPool
// and when a thread in the pool is available it will remove the oldest packet in the overflow queue and
// process it with the idle threads.
Thread overflowThread = new Thread() {
public void run() {
while (!threadPool.isShutdown() && !threadPool.isTerminated()) {
try {
// Get the next task that has been rejected when the threadPool was exhausted
Runnable runnable = overflowBuffer.take();
// Wait until the pool has available threads
while (threadPool.getActiveCount() >= threadPool.getMaximumPoolSize()) {
Thread.sleep(100);
}
// Process the rejected task
threadPool.execute(runnable);
} catch (InterruptedException e) {
// Do nothing
}
catch(Exception e) {
Log.error("Error consuming overflow buffer", e);
}
}
}
};
overflowThread.start();
} }
boolean createSession(String namespace) boolean createSession(String namespace)
...@@ -142,11 +190,23 @@ public class ConnectionMultiplexerSocketReader extends SocketReader { ...@@ -142,11 +190,23 @@ public class ConnectionMultiplexerSocketReader extends SocketReader {
return; return;
} }
// Process the packet in another thread // Process the packet in another thread
threadPool.execute(new Runnable() { Runnable runnable = new Runnable() {
public void run() { public void run() {
packetHandler.route(packet); packetHandler.route(packet);
} }
}); };
if (!overflowBuffer.isEmpty()) {
// Thread pool is exhausted or we are still recoving from a recent exhausted state.
// Keep placing tasks in this queue until the queue is empty. The queue will help us
// keep the cronological order of incoming packets. Note that if we don't care about
// being cronologically correct then we should just add the task to the threadPool.
overflowBuffer.add(runnable);
}
else {
// Thread pool is not exhausted and we are not recovering from an exhausted state so just
// run the task using the thread pool
threadPool.execute(runnable);
}
} }
protected void processMessage(final Message packet) throws UnauthorizedException { protected void processMessage(final Message packet) throws UnauthorizedException {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment