Commit b1501e09 authored by Dave Cridland's avatar Dave Cridland

Merge pull request #432 from guusdk/OF-993

OF-993: Replace thread factory boilerplate code.
parents eb1f971e d054c47f
...@@ -26,10 +26,8 @@ import java.util.Map; ...@@ -26,10 +26,8 @@ import java.util.Map;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper; import org.dom4j.DocumentHelper;
...@@ -40,6 +38,7 @@ import org.jivesoftware.openfire.StreamID; ...@@ -40,6 +38,7 @@ import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.util.JiveConstants; import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.NamedThreadFactory;
import org.jivesoftware.util.TaskEngine; import org.jivesoftware.util.TaskEngine;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -115,16 +114,8 @@ public class HttpSessionManager { ...@@ -115,16 +114,8 @@ public class HttpSessionManager {
sendPacketPool = new ThreadPoolExecutor(getCorePoolSize(maxPoolSize), maxPoolSize, keepAlive, TimeUnit.SECONDS, sendPacketPool = new ThreadPoolExecutor(getCorePoolSize(maxPoolSize), maxPoolSize, keepAlive, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), // unbounded task queue new LinkedBlockingQueue<Runnable>(), // unbounded task queue
new ThreadFactory() { // custom thread factory for BOSH workers new NamedThreadFactory( "httpbind-worker-", true, null, Thread.currentThread().getThreadGroup(), null )
final AtomicInteger counter = new AtomicInteger(1); );
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(Thread.currentThread().getThreadGroup(), runnable,
"httpbind-worker-" + counter.getAndIncrement());
thread.setDaemon(true);
return thread;
}
});
sendPacketPool.prestartCoreThread(); sendPacketPool.prestartCoreThread();
......
...@@ -17,6 +17,7 @@ import org.jivesoftware.openfire.JMXManager; ...@@ -17,6 +17,7 @@ import org.jivesoftware.openfire.JMXManager;
import org.jivesoftware.openfire.net.StalledSessionsFilter; import org.jivesoftware.openfire.net.StalledSessionsFilter;
import org.jivesoftware.openfire.nio.*; import org.jivesoftware.openfire.nio.*;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.NamedThreadFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -29,7 +30,6 @@ import java.net.InetSocketAddress; ...@@ -29,7 +30,6 @@ import java.net.InetSocketAddress;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* This class is responsible for accepting new (socket) connections, using Java NIO implementation provided by the * This class is responsible for accepting new (socket) connections, using Java NIO implementation provided by the
...@@ -97,7 +97,7 @@ class MINAConnectionAcceptor extends ConnectionAcceptor ...@@ -97,7 +97,7 @@ class MINAConnectionAcceptor extends ConnectionAcceptor
final int initialSize = ( configuration.getMaxThreadPoolSize() / 4 ) + 1; final int initialSize = ( configuration.getMaxThreadPoolSize() / 4 ) + 1;
final ExecutorFilter executorFilter = new ExecutorFilter( initialSize, configuration.getMaxThreadPoolSize(), 60, TimeUnit.SECONDS ); final ExecutorFilter executorFilter = new ExecutorFilter( initialSize, configuration.getMaxThreadPoolSize(), 60, TimeUnit.SECONDS );
final ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) executorFilter.getExecutor(); final ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) executorFilter.getExecutor();
final ThreadFactory threadFactory = new DelegatingThreadFactory( name + "-thread-", eventExecutor.getThreadFactory() ); final ThreadFactory threadFactory = new NamedThreadFactory( name + "-thread-", eventExecutor.getThreadFactory(), true, null );
eventExecutor.setThreadFactory( threadFactory ); eventExecutor.setThreadFactory( threadFactory );
// Construct a new socket acceptor, and configure it. // Construct a new socket acceptor, and configure it.
...@@ -290,25 +290,4 @@ class MINAConnectionAcceptor extends ConnectionAcceptor ...@@ -290,25 +290,4 @@ class MINAConnectionAcceptor extends ConnectionAcceptor
} ); } );
} }
} }
// TODO this is a utility class that can be pulled out. There are several similar implementations throughout the codebase.
private static class DelegatingThreadFactory implements ThreadFactory {
private final AtomicInteger threadId;
private final ThreadFactory originalThreadFactory;
private String threadNamePrefix;
public DelegatingThreadFactory(String threadNamePrefix, ThreadFactory originalThreadFactory) {
this.originalThreadFactory = originalThreadFactory;
threadId = new AtomicInteger(0);
this.threadNamePrefix = threadNamePrefix;
}
public Thread newThread(Runnable runnable)
{
Thread t = originalThreadFactory.newThread(runnable);
t.setName(threadNamePrefix + threadId.incrementAndGet());
t.setDaemon(true);
return t;
}
}
} }
package org.jivesoftware.util;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A thread factory that allows threads to be named.
*
* An instance will either create new Threads, or use use a delegate Thread Factory.
* When a new thread is generated, the name of the generated thread is replaced by concatenation of the provided thread
* name prefix (which is an other argument of the constructor) and a sequence number. Sequence numbers are guaranteed to
* be unique for threads generated by the same instance of this class.
*
* Optionally, this implementation allows the priority, isDaemon and threadGroup and stackSize value of the generated
* threads to be set/overridden. Note that the threadGroup and stackSize value cannot be overridden when a delegate
* thread factory is used.
*
* This implementation is thread safe when the provided delegate is thread safe.
*
* @author Guus der Kinderen, guus.der.kinderen@gmail.com
*/
public class NamedThreadFactory implements ThreadFactory
{
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String threadNamePrefix;
private final ThreadFactory delegate;
private final Boolean daemon;
private final Integer priority;
private final Long stackSize;
private final ThreadGroup threadGroup;
/**
* Constructs an instance that delegates thread creation to the thread factory passed as an argument. When the
* delegate argument is null, the instance will instantiate threads itself (similar to the functionality of the
* other constructor of this class).
*
* When null is provided for the optional arguments of this method, the values as defined by the delegate factory
* are used.
*
* @param threadNamePrefix The prefix of the name for new threads (cannot be null or an empty string).
* @param delegate The factory to which this implementation delegates to (null when no override is desired).
* @param daemon override for the isDaemon value for new threads (null when no override is desired).
* @param priority override for the priority value for new threads (null when no override is desired).
*/
public NamedThreadFactory( String threadNamePrefix, ThreadFactory delegate, Boolean daemon, Integer priority )
{
if ( threadNamePrefix == null || threadNamePrefix.isEmpty() )
{
throw new IllegalArgumentException( "Argument 'threadNamePrefix' cannot be null or an empty string." );
}
this.threadNamePrefix = threadNamePrefix;
this.delegate = delegate;
this.daemon = daemon;
this.priority = priority;
this.threadGroup = null;
this.stackSize = null;
}
/**
* Constructs a thread factory that will create new Thread instances (as opposed to using a delegate thread
* factory).
*
* When null is provided for the optional arguments of this method, default values as provided by the Thread class
* implementation will be used.
*
* @param threadNamePrefix The prefix of the name for new threads (cannot be null or an empty string).
* @param daemon the isDaemon value for new threads (null to use default value).
* @param priority override for the priority value for new threads (null to use default value).
* @param threadGroup override for the thread group (null to use default value).
* @param stackSize override for the stackSize value for new threads (null to use default value).
*/
public NamedThreadFactory( String threadNamePrefix, Boolean daemon, Integer priority, ThreadGroup threadGroup, Long stackSize )
{
if ( threadNamePrefix == null || threadNamePrefix.isEmpty() )
{
throw new IllegalArgumentException( "Argument 'threadNamePrefix' cannot be null or an empty string." );
}
this.delegate = null;
this.threadNamePrefix = threadNamePrefix;
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
this.stackSize = stackSize;
}
@Override
public Thread newThread( Runnable runnable )
{
final String name = threadNamePrefix + threadNumber.incrementAndGet();
final Thread thread;
if ( delegate != null )
{
thread = delegate.newThread( runnable );
thread.setName( name );
}
else
{
if ( stackSize != null )
{
thread = new Thread( threadGroup, runnable, name, stackSize );
}
else
{
thread = new Thread( threadGroup, runnable, name );
}
}
if ( daemon != null && thread.isDaemon() != daemon )
{
thread.setDaemon( daemon );
}
if ( priority != null && thread.getPriority() != priority )
{
thread.setPriority( priority );
}
return thread;
}
}
...@@ -24,7 +24,6 @@ import java.util.Map; ...@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Performs tasks using worker threads. It also allows tasks to be scheduled to be * Performs tasks using worker threads. It also allows tasks to be scheduled to be
...@@ -58,23 +57,8 @@ public class TaskEngine { ...@@ -58,23 +57,8 @@ public class TaskEngine {
*/ */
private TaskEngine() { private TaskEngine() {
timer = new Timer("TaskEngine-timer", true); timer = new Timer("TaskEngine-timer", true);
executor = Executors.newCachedThreadPool(new ThreadFactory() { final ThreadFactory threadFactory = new NamedThreadFactory( "TaskEngine-pool-", true, Thread.NORM_PRIORITY, Thread.currentThread().getThreadGroup(), 0L );
executor = Executors.newCachedThreadPool( threadFactory );
final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable runnable) {
// Use our own naming scheme for the threads.
Thread thread = new Thread(Thread.currentThread().getThreadGroup(), runnable,
"TaskEngine-pool-" + threadNumber.getAndIncrement(), 0);
// Make workers daemon threads.
thread.setDaemon(true);
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
});
} }
/** /**
......
...@@ -19,11 +19,14 @@ ...@@ -19,11 +19,14 @@
package org.jivesoftware.openfire.plugin.spark; package org.jivesoftware.openfire.plugin.spark;
import org.jivesoftware.util.NamedThreadFactory;
import java.util.Date; import java.util.Date;
import java.util.Map; import java.util.Map;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
...@@ -58,22 +61,8 @@ public class TaskEngine { ...@@ -58,22 +61,8 @@ public class TaskEngine {
*/ */
private TaskEngine() { private TaskEngine() {
timer = new Timer("timer-openfire", true); timer = new Timer("timer-openfire", true);
executor = Executors.newCachedThreadPool(new ThreadFactory() { final ThreadFactory threadFactory = new NamedThreadFactory( "pool-openfire", true, Thread.NORM_PRIORITY, Thread.currentThread().getThreadGroup(), 0L );
executor = Executors.newCachedThreadPool( threadFactory );
final AtomicInteger threadNumber = new AtomicInteger(1);
public Thread newThread(Runnable runnable) {
// Use our own naming scheme for the threads.
Thread thread = new Thread(Thread.currentThread().getThreadGroup(), runnable,
"pool-openfire" + threadNumber.getAndIncrement(), 0);
// Make workers daemon threads.
thread.setDaemon(true);
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
});
} }
/** /**
......
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
package org.jivesoftware.openfire.fastpath.util; package org.jivesoftware.openfire.fastpath.util;
import org.jivesoftware.util.NamedThreadFactory;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
...@@ -55,22 +57,8 @@ public class TaskEngine { ...@@ -55,22 +57,8 @@ public class TaskEngine {
*/ */
private TaskEngine() { private TaskEngine() {
timer = new Timer("timer-fastpath", true); timer = new Timer("timer-fastpath", true);
executor = Executors.newCachedThreadPool(new ThreadFactory() { final ThreadFactory threadFactory = new NamedThreadFactory( "pool-fastpath", true, Thread.NORM_PRIORITY, Thread.currentThread().getThreadGroup(), 0L );
executor = Executors.newCachedThreadPool( threadFactory );
final AtomicInteger threadNumber = new AtomicInteger(1);
public Thread newThread(Runnable runnable) {
// Use our own naming scheme for the threads.
Thread thread = new Thread(Thread.currentThread().getThreadGroup(), runnable,
"pool-fastpath" + threadNumber.getAndIncrement(), 0);
// Make workers daemon threads.
thread.setDaemon(true);
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
});
} }
/** /**
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.jivesoftware.openfire.reporting.util; package org.jivesoftware.openfire.reporting.util;
import org.jivesoftware.util.NamedThreadFactory;
import org.picocontainer.Disposable; import org.picocontainer.Disposable;
import java.util.*; import java.util.*;
...@@ -57,22 +58,8 @@ public class TaskEngine implements Disposable { ...@@ -57,22 +58,8 @@ public class TaskEngine implements Disposable {
*/ */
private TaskEngine() { private TaskEngine() {
timer = new Timer("timer-monitoring", true); timer = new Timer("timer-monitoring", true);
executor = Executors.newCachedThreadPool(new ThreadFactory() { final ThreadFactory threadFactory = new NamedThreadFactory( "pool-monitoring", true, Thread.NORM_PRIORITY, Thread.currentThread().getThreadGroup(), 0L );
executor = Executors.newCachedThreadPool( threadFactory );
final AtomicInteger threadNumber = new AtomicInteger(1);
public Thread newThread(Runnable runnable) {
// Use our own naming scheme for the threads.
Thread thread = new Thread(Thread.currentThread().getThreadGroup(), runnable,
"pool-monitoring" + threadNumber.getAndIncrement(), 0);
// Make workers daemon threads.
thread.setDaemon(true);
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
});
} }
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment