ThreadPoolExecutor provides the base implementation for the executors returned by the newCachedThreadPool, newFixedThreadPool, and newScheduledThreadPool factories in Executors. ThreadPoolExecutor is a flexible, robust pool implementation that allows a variety of customizations.
If the default execution policy does not meet your needs, you can instantiate a ThreadPoolExecutor through its constructor and customize it as you see fit; you can consult the source code for Executors to see the execution policies for the default configurations and use them as a starting point. ThreadPoolExecutor has several constructors, the most general of which is shown in Listing 8.2.
8.3.1. Thread Creation and Teardown
The core pool size, maximum pool size, and keep-alive time govern thread creation and teardown. The core size is the target size; the implementation attempts to maintain the pool at this size even when there are no tasks to execute,[2] and will not create more threads than this unless the work queue is full.[3] The maximum pool size is the upper bound on how many pool threads can be active at once. A thread that has been idle for longer than the keep-alive time becomes a candidate for reaping and can be terminated if the current pool size exceeds the core size.
[2] When a ThreadPoolExecutor is initially created, the core threads are not started immediately but instead as tasks are submitted, unless you call prestartAllCoreThreads.
[3] Developers are sometimes tempted to set the core size to zero so that the worker threads will eventually be torn down and therefore won't prevent the JVM from exiting, but this can cause some strange-seeming behavior in thread pools that don't use a SynchronousQueue for their work queue (as newCachedThreadPool does). If the pool is already at the core size, ThreadPoolExecutor creates a new thread only if the work queue is full. So tasks submitted to a thread pool with a work queue that has any capacity and a core size of zero will not execute until the queue fills up, which is usually not what is desired. In Java 6, allowCoreThreadTimeOut allows you to request that all pool threads be able to time out; enable this feature with a core size of zero if you want a bounded thread pool with a bounded work queue but still have all the threads torn down when there is no work to do.
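The lazy start-up described in footnote 2 can be observed directly. The following is a minimal sketch, not part of the original listings; the class name LazyStartDemo and the method observe are hypothetical, and only getPoolSize and prestartAllCoreThreads come from ThreadPoolExecutor itself.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that core threads are created on demand.
final class LazyStartDemo {

    // Returns {pool size after construction, threads started by prestart, pool size afterwards}.
    static int[] observe(int coreSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(coreSize, coreSize,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            int before = pool.getPoolSize();              // no task submitted yet
            int started = pool.prestartAllCoreThreads();  // eagerly start idle core threads
            int after = pool.getPoolSize();
            return new int[] { before, started, after };
        } finally {
            pool.shutdown(); // lets the idle workers exit
        }
    }

    public static void main(String[] args) {
        int[] r = observe(3);
        System.out.println(r[0] + " threads, then started " + r[1] + ", now " + r[2]);
    }
}
```

Prestarting trades a little start-up cost for lower latency on the first few tasks; whether that is worthwhile depends on the application.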
Listing 8.2. General Constructor for ThreadPoolExecutor.
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) { ... }
By tuning the core pool size and keep-alive times, you can encourage the pool to reclaim resources used by otherwise idle threads, making them available for more useful work. (Like everything else, this is a tradeoff: reaping idle threads incurs additional latency due to thread creation if threads must later be created when demand increases.)
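As an illustration of this tradeoff, the sketch below builds a bounded pool whose threads are all allowed to time out, so an idle pool shrinks to nothing. It is one possible configuration under stated assumptions, not a recommendation: ReapablePool, create, and becomesEmpty are hypothetical names, and the keep-alive and queue capacity are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a bounded pool that releases every thread when idle.
final class ReapablePool {

    static ThreadPoolExecutor create(int nThreads, int queueCapacity, long keepAliveMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(nThreads, nThreads,
                keepAliveMillis, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
        // Needs a keep-alive greater than zero; with zero this call
        // throws IllegalArgumentException.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    // Polls until every worker has been reaped or the timeout expires.
    static boolean becomesEmpty(ThreadPoolExecutor pool, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (pool.getPoolSize() > 0) {
            if (System.nanoTime() - deadline > 0) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create(2, 10, 100L);
        pool.execute(() -> System.out.println("ran in " + Thread.currentThread().getName()));
        System.out.println("idle pool emptied: " + becomesEmpty(pool, 5_000L));
        pool.shutdown();
    }
}
```

The cost is the one the text names: the next burst of work pays for thread creation again.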
The newFixedThreadPool factory sets both the core pool size and the maximum pool size to the requested pool size, creating the effect of infinite timeout; the newCachedThreadPool factory sets the maximum pool size to Integer.MAX_VALUE and the core pool size to zero with a timeout of one minute, creating the effect of an infinitely expandable thread pool that will contract again when demand decreases. Other combinations are possible using the explicit ThreadPoolExecutor constructor.
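Written out with the explicit constructor, the two factory configurations just described look roughly as follows. This is a sketch that mirrors the parameters named in the text; PoolConfigs, fixedLike, and cachedLike are hypothetical names, and the actual factory source may differ in detail.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class mirroring the factory configurations described above.
final class PoolConfigs {

    // Core size equals maximum size, so no thread is ever a candidate for reaping.
    static ThreadPoolExecutor fixedLike(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    // Core size zero, effectively unbounded maximum, one-minute keep-alive.
    static ThreadPoolExecutor cachedLike() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor fixed = fixedLike(4);
        ThreadPoolExecutor cached = cachedLike();
        System.out.println("fixed:  core=" + fixed.getCorePoolSize()
                + " max=" + fixed.getMaximumPoolSize());
        System.out.println("cached: core=" + cached.getCorePoolSize()
                + " max=" + cached.getMaximumPoolSize());
        fixed.shutdown();
        cached.shutdown();
    }
}
```

Starting from one of these and changing a single parameter (for example, a bounded queue in place of the unbounded one) is usually easier to reason about than designing a configuration from scratch.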
8.3.2. Managing Queued Tasks
Bounded thread pools limit the number of tasks that can be executed concurrently. (The single-threaded executors are a notable special case: they guarantee that no tasks will execute concurrently, offering the possibility of achieving thread safety through thread confinement.)
We saw in Section 6.1.2 how unbounded thread creation could lead to instability, and addressed this problem by using a fixed-sized thread pool instead of creating a new thread for every request. However, this is only a partial solution; it is still possible for the application to run out of resources under heavy load, just harder. If the arrival rate for new requests exceeds the rate at which they can be handled, requests will still queue up. With a thread pool, they wait in a queue of Runnables managed by the Executor instead of queueing up as threads contending for the CPU. Representing a waiting task with a Runnable and a list node is certainly a lot cheaper than with a thread, but the risk of resource exhaustion still remains if clients can throw requests at the server faster than it can handle them.
Requests often arrive in bursts even when the average request rate is fairly stable. Queues can help smooth out transient bursts of tasks, but if tasks continue to arrive too quickly you will eventually have to throttle the arrival rate to avoid running out of memory.[4] Even before you run out of memory, response time will get progressively worse as the task queue grows.
[4] This is analogous to flow control in communications networks: you may be willing to buffer a certain amount of data, but eventually you need to find a way to get the other side to stop sending you data, or throw the excess data on the floor and hope the sender retransmits it when you're not so busy.
ThreadPoolExecutor allows you to supply a BlockingQueue to hold tasks awaiting execution. There are three basic approaches to task queueing: unbounded queue, bounded queue, and synchronous handoff. The choice of queue interacts with other configuration parameters such as pool size.
The default for newFixedThreadPool and newSingleThreadExecutor is to use an unbounded LinkedBlockingQueue. Tasks will queue up if all worker threads are busy, but the queue could grow without bound if the tasks keep arriving faster than they can be executed.
A more stable resource management strategy is to use a bounded queue, such as an ArrayBlockingQueue or a bounded LinkedBlockingQueue or PriorityBlockingQueue. Bounded queues help prevent resource exhaustion but introduce the question of what to do with new tasks when the queue is full. (There are a number of possible saturation policies for addressing this problem; see Section 8.3.3.) With a bounded work queue, the queue size and pool size must be tuned together. A large queue coupled with a small pool can help reduce memory usage, CPU usage, and context switching, at the cost of potentially constraining throughput.
For very large or unbounded pools, you can also bypass queueing entirely and instead hand off tasks directly from producers to worker threads using a SynchronousQueue. A SynchronousQueue is not really a queue at all, but a mechanism for managing handoffs between threads. In order to put an element on a SynchronousQueue, another thread must already be waiting to accept the handoff. If no thread is waiting but the current pool size is less than the maximum, ThreadPoolExecutor creates a new thread; otherwise the task is rejected according to the saturation policy. Using a direct handoff is more efficient because the task can be handed right to the thread that will execute it, rather than first placing it on a queue and then having the worker thread fetch it from the queue. SynchronousQueue is a practical choice only if the pool is unbounded or if rejecting excess tasks is acceptable. The newCachedThreadPool factory uses a SynchronousQueue.
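The rejection behavior of a direct handoff is easy to provoke with a deliberately tiny pool. In the sketch below (HandoffDemo and its methods are hypothetical names), a one-thread pool backed by a SynchronousQueue is kept busy, so a second task has nowhere to go and the default saturation policy rejects it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a bounded pool with direct handoff rejects excess tasks.
final class HandoffDemo {

    static boolean secondTaskRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupies the only worker until the latch is released.
            pool.execute(() -> awaitQuietly(release));
            try {
                // No idle thread is waiting for a handoff and the pool is at its maximum.
                pool.execute(() -> { });
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task rejected: " + secondTaskRejected());
    }
}
```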
Using a FIFO queue like LinkedBlockingQueue or ArrayBlockingQueue causes tasks to be started in the order in which they arrived. For more control over task execution order, you can use a PriorityBlockingQueue, which orders tasks according to priority. Priority can be defined by natural order (if tasks implement Comparable) or by a Comparator.
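A small sketch of priority ordering follows, assuming tasks define their own natural order. PrioritizedTask and PriorityDemo are hypothetical names introduced for illustration. The single worker is held busy while three tasks are queued, so the order in which they later run reflects the queue's ordering rather than arrival order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PriorityDemo {

    // Hypothetical task wrapper: a lower number means "run sooner".
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        private final int priority;
        private final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }

        @Override public void run() { body.run(); }
    }

    static List<String> runOrder() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
        CountDownLatch gate = new CountDownLatch(1);
        List<String> order = Collections.synchronizedList(new ArrayList<String>());
        try {
            // Goes straight to the single worker and holds it until the gate opens.
            pool.execute(new PrioritizedTask(0, () -> awaitQuietly(gate)));
            // Queued while the worker is busy. Use execute, not submit: submit wraps
            // the task in a FutureTask, which is not Comparable.
            pool.execute(new PrioritizedTask(3, () -> order.add("low")));
            pool.execute(new PrioritizedTask(1, () -> order.add("high")));
            pool.execute(new PrioritizedTask(2, () -> order.add("medium")));
            gate.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // no effect if the pool has already terminated
        }
        return new ArrayList<String>(order);
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOrder()); // prints [high, medium, low]
    }
}
```

Note that PriorityBlockingQueue is unbounded, so this configuration carries the same resource-exhaustion risk as any other unbounded queue.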
The newCachedThreadPool factory is a good default choice for an Executor, providing better queuing performance than a fixed thread pool.[5] A fixed size thread pool is a good choice when you need to limit the number of concurrent tasks for resource-management purposes, as in a server application that accepts requests from network clients and would otherwise be vulnerable to overload.
[5] This performance difference comes from the use of SynchronousQueue instead of LinkedBlockingQueue. SynchronousQueue was replaced in Java 6 with a new nonblocking algorithm that improved throughput in Executor benchmarks by a factor of three over the Java 5.0 SynchronousQueue implementation (Scherer et al., 2006).
Bounding either the thread pool or the work queue is suitable only when tasks are independent. With tasks that depend on other tasks, bounded thread pools or queues can cause thread starvation deadlock; instead, use an unbounded pool configuration like newCachedThreadPool.[6]
[6] An alternative configuration for tasks that submit other tasks and wait for their results is to use a bounded thread pool, a SynchronousQueue as the work queue, and the caller-runs saturation policy.
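The configuration in footnote 6 can be sketched as follows. The names DependentTasksDemo and parentPlusChild are hypothetical. A parent task submits a child to the same one-thread pool and waits for it; because the handoff fails and the caller-runs policy applies, the child executes in the worker thread that submitted it, so the parent does not wait forever.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: dependent tasks in a bounded pool without starvation deadlock.
final class DependentTasksDemo {

    static int parentPlusChild() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Callable<Integer> child = () -> 1;
        // The parent runs in the only worker; its child cannot be handed off,
        // so the caller-runs policy executes the child in that same worker.
        Callable<Integer> parent = () -> pool.submit(child).get() + 1;
        try {
            return pool.submit(parent).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("dependent tasks did not complete", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(parentPlusChild()); // prints 2
    }
}
```

With an ordinary queue in place of the SynchronousQueue, the child would sit in the queue behind a worker that is waiting for it.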
8.3.3. Saturation Policies
When a bounded work queue fills up, the saturation policy comes into play. The saturation policy for a ThreadPoolExecutor can be modified by calling setRejectedExecutionHandler. (The saturation policy is also used when a task is submitted to an Executor that has been shut down.) Several implementations of RejectedExecutionHandler are provided, each implementing a different saturation policy: AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy.
The default policy, abort, causes execute to throw the unchecked RejectedExecutionException; the caller can catch this exception and implement its own overflow handling as it sees fit. The discard policy silently discards the newly submitted task if it cannot be queued for execution; the discard-oldest policy discards the task that would otherwise be executed next and tries to resubmit the new task. (If the work queue is a priority queue, this discards the highest-priority element, so the combination of a discard-oldest saturation policy and a priority queue is not a good one.)
The caller-runs policy implements a form of throttling that neither discards tasks nor throws an exception, but instead tries to slow down the flow of new tasks by pushing some of the work back to the caller. It executes the newly submitted task not in a pool thread, but in the thread that calls execute. If we modified our WebServer example to use a bounded queue and the caller-runs policy, after all the pool threads were occupied and the work queue filled up the next task would be executed in the main thread during the call to execute. Since this would probably take some time, the main thread could not submit any more tasks for at least a little while, giving the worker threads some time to catch up on the backlog. The main thread would also not be calling accept during this time, so incoming requests would queue up in the TCP layer instead of in the application. If the overload persisted, eventually the TCP layer would decide it had queued enough connection requests and begin discarding connection requests as well. As the server becomes overloaded, the overload is gradually pushed outward (from the pool threads to the work queue to the application to the TCP layer, and eventually to the client), enabling more graceful degradation under load.
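The push-back can be seen in miniature with a one-thread pool and a one-slot queue. This is an illustrative sketch; CallerRunsDemo and threadThatRanOverflowTask are hypothetical names. Once the worker is busy and the queue is full, the third task runs in the submitting thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: the overflow task executes in the thread that calls execute.
final class CallerRunsDemo {

    static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        AtomicReference<String> ranIn = new AtomicReference<String>();
        try {
            pool.execute(() -> awaitQuietly(gate)); // occupies the only worker
            pool.execute(() -> { });                // fills the one-slot queue
            // Pool and queue are both full, so this runs synchronously in the caller.
            pool.execute(() -> ranIn.set(Thread.currentThread().getName()));
            return ranIn.get();
        } finally {
            gate.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran in: " + threadThatRanOverflowTask());
    }
}
```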
Choosing a saturation policy or making other changes to the execution policy can be done when the Executor is created. Listing 8.3 illustrates creating a fixed-size thread pool with the caller-runs saturation policy.
Listing 8.3. Creating a Fixed-sized Thread Pool with a Bounded Queue and the Caller-runs Saturation Policy.
    ThreadPoolExecutor executor
        = new ThreadPoolExecutor(N_THREADS, N_THREADS,
              0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>(CAPACITY));
    executor.setRejectedExecutionHandler(
        new ThreadPoolExecutor.CallerRunsPolicy());
There is no predefined saturation policy to make execute block when the work queue is full. However, the same effect can be accomplished by using a Semaphore to bound the task injection rate, as shown in BoundedExecutor in Listing 8.4. In such an approach, use an unbounded queue (there's no reason to bound both the queue size and the injection rate) and set the bound on the semaphore to be equal to the pool size plus the number of queued tasks you want to allow, since the semaphore is bounding the number of tasks both currently executing and awaiting execution.
8.3.4. Thread Factories
Whenever a thread pool needs to create a thread, it does so through a thread factory (see Listing 8.5). The default thread factory creates a new, nondaemon thread with no special configuration. Specifying a thread factory allows you to customize the configuration of pool threads. ThreadFactory has a single method, newThread, that is called whenever a thread pool needs to create a new thread.
There are a number of reasons to use a custom thread factory. You might want to specify an UncaughtExceptionHandler for pool threads, or instantiate an instance of a custom Thread class, such as one that performs debug logging. You might want to modify the priority (generally not a very good idea; see Section 10.3.1) or set the daemon status (again, not all that good an idea; see Section 7.4.2) of pool threads. Or maybe you just want to give pool threads more meaningful names to simplify interpreting thread dumps and error logs.
Listing 8.4. Using a Semaphore to Throttle Task Submission.
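    import java.util.concurrent.Executor;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.Semaphore;
    import net.jcip.annotations.ThreadSafe;

    @ThreadSafe
    public class BoundedExecutor {
        private final Executor exec;
        private final Semaphore semaphore;

        public BoundedExecutor(Executor exec, int bound) {
            this.exec = exec;
            this.semaphore = new Semaphore(bound);
        }

        public void submitTask(final Runnable command)
                throws InterruptedException {
            // Blocks while 'bound' tasks are already executing or queued.
            semaphore.acquire();
            try {
                exec.execute(new Runnable() {
                    public void run() {
                        try {
                            command.run();
                        } finally {
                            semaphore.release();
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                // The task will never run, so return its permit.
                semaphore.release();
            }
        }
    }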
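A usage sketch for the semaphore-based throttle follows. To keep the example compilable on its own, it carries a condensed copy of BoundedExecutor from Listing 8.4 written with a lambda; BoundedExecutorDemo and maxInFlight are hypothetical names, and the 10 ms sleep merely stands in for real work. The demo records the largest number of tasks observed in flight at once, which should never exceed the bound.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedExecutorDemo {

    // Condensed copy of Listing 8.4, repeated only so this sketch is self-contained.
    static final class BoundedExecutor {
        private final Executor exec;
        private final Semaphore semaphore;

        BoundedExecutor(Executor exec, int bound) {
            this.exec = exec;
            this.semaphore = new Semaphore(bound);
        }

        void submitTask(Runnable command) throws InterruptedException {
            semaphore.acquire();
            try {
                exec.execute(() -> {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                });
            } catch (RejectedExecutionException e) {
                semaphore.release();
            }
        }
    }

    // Submits 'tasks' short jobs and returns the peak number running at once.
    static int maxInFlight(int bound, int tasks) {
        ExecutorService pool = Executors.newCachedThreadPool();
        BoundedExecutor bounded = new BoundedExecutor(pool, bound);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                bounded.submitTask(() -> {
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(10); // stand-in for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    inFlight.decrementAndGet();
                });
            }
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // no effect if the pool has already terminated
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent tasks: " + maxInFlight(2, 10));
    }
}
```

Even though the underlying pool here is unbounded, the submitting thread blocks in submitTask once the bound is reached, which is the blocking behavior that no predefined saturation policy provides.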
Listing 8.5. ThreadFactory Interface.
    public interface ThreadFactory {
        Thread newThread(Runnable r);
    }
MyThreadFactory in Listing 8.6 illustrates a custom thread factory. It instantiates a new MyAppThread, passing a pool-specific name to the constructor so that threads from each pool can be distinguished in thread dumps and error logs. MyAppThread can also be used elsewhere in the application so that all threads can take advantage of its debugging features.
Listing 8.6. Custom Thread Factory.
    import java.util.concurrent.ThreadFactory;

    public class MyThreadFactory implements ThreadFactory {
        private final String poolName;

        public MyThreadFactory(String poolName) {
            this.poolName = poolName;
        }

        public Thread newThread(Runnable runnable) {
            return new MyAppThread(runnable, poolName);
        }
    }
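A factory is handed to the pool when the pool is created, either through the ThreadPoolExecutor constructor or through the Executors factory overloads that accept a ThreadFactory. The sketch below shows the second route. Because MyAppThread is defined separately, it uses a simplified stand-in, the hypothetical NamedThreadFactory, which only assigns names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedFactoryDemo {

    // Hypothetical, simplified stand-in for a naming thread factory.
    static final class NamedThreadFactory implements ThreadFactory {
        private final String poolName;
        private final AtomicInteger created = new AtomicInteger();

        NamedThreadFactory(String poolName) {
            this.poolName = poolName;
        }

        @Override public Thread newThread(Runnable r) {
            return new Thread(r, poolName + "-" + created.incrementAndGet());
        }
    }

    // Runs one task in a pool built with the factory and reports the worker's name.
    static String workerName(String poolName) {
        ExecutorService pool =
                Executors.newFixedThreadPool(2, new NamedThreadFactory(poolName));
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            return pool.submit(whoAmI).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName("orders")); // prints orders-1
    }
}
```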
The interesting customization takes place in MyAppThread, shown in Listing 8.7, which lets you provide a thread name, sets a custom UncaughtExceptionHandler that writes a message to a Logger, maintains statistics on how many threads have been created and destroyed, and optionally writes a debug message to the log when a thread is created or terminates.
If your application takes advantage of security policies to grant permissions to particular codebases, you may want to use the privilegedThreadFactory factory method in Executors to construct your thread factory. It creates pool threads that have the same permissions, AccessControlContext, and contextClassLoader as the thread creating the privilegedThreadFactory. Otherwise, threads created by the thread pool inherit permissions from whatever client happens to be calling execute or submit at the time a new thread is needed, which could cause confusing security-related exceptions.
8.3.5. Customizing ThreadPoolExecutor After Construction
Most of the options passed to the ThreadPoolExecutor constructors can also be modified after construction via setters (such as the core thread pool size, maximum thread pool size, keep-alive time, thread factory, and rejected execution handler). If the Executor is created through one of the factory methods in Executors (except newSingleThreadExecutor), you can cast the result to ThreadPoolExecutor to access the setters as in Listing 8.8.
Executors includes a factory method, unconfigurableExecutorService, which takes an existing ExecutorService and wraps it with one exposing only the methods of ExecutorService so it cannot be further configured. Unlike the pooled implementations, newSingleThreadExecutor returns an ExecutorService wrapped in this manner, rather than a raw ThreadPoolExecutor. While a single-threaded executor is actually implemented as a thread pool with one thread, it also promises not to execute tasks concurrently. If some misguided code were to increase the pool size on a single-threaded executor, it would undermine the intended execution semantics.
Listing 8.7. Custom Thread Base Class.
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class MyAppThread extends Thread {
        public static final String DEFAULT_NAME = "MyAppThread";
        private static volatile boolean debugLifecycle = false;
        private static final AtomicInteger created = new AtomicInteger();
        private static final AtomicInteger alive = new AtomicInteger();
        private static final Logger log = Logger.getAnonymousLogger();

        public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); }

        public MyAppThread(Runnable runnable, String name) {
            super(runnable, name + "-" + created.incrementAndGet());
            setUncaughtExceptionHandler(
                new Thread.UncaughtExceptionHandler() {
                    public void uncaughtException(Thread t, Throwable e) {
                        log.log(Level.SEVERE,
                            "UNCAUGHT in thread " + t.getName(), e);
                    }
                });
        }

        public void run() {
            // Copy debug flag to ensure consistent value throughout.
            boolean debug = debugLifecycle;
            if (debug) log.log(Level.FINE, "Created " + getName());
            try {
                alive.incrementAndGet();
                super.run();
            } finally {
                alive.decrementAndGet();
                if (debug) log.log(Level.FINE, "Exiting " + getName());
            }
        }

        public static int getThreadsCreated() { return created.get(); }
        public static int getThreadsAlive() { return alive.get(); }
        public static boolean getDebug() { return debugLifecycle; }
        public static void setDebug(boolean b) { debugLifecycle = b; }
    }
Listing 8.8. Modifying an Executor Created with the Standard Factories.
    ExecutorService exec = Executors.newCachedThreadPool();
    if (exec instanceof ThreadPoolExecutor)
        ((ThreadPoolExecutor) exec).setCorePoolSize(10);
    else
        throw new AssertionError("Oops, bad assumption");
You can use this technique with your own executors to prevent the execution policy from being modified. If you will be exposing an ExecutorService to code you don't trust not to modify it, you can wrap it with an unconfigurableExecutorService.
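The effect of the wrapper can be checked with an instanceof test like the one in Listing 8.8. In this sketch, UnconfigurableDemo, exposesSetters, and check are hypothetical names; the results reflect the behavior of the standard Executors factories as described above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo: which executors can still be reconfigured through a cast?
final class UnconfigurableDemo {

    static boolean exposesSetters(ExecutorService exec) {
        return exec instanceof ThreadPoolExecutor;
    }

    // Returns {raw fixed pool, wrapped fixed pool, single-threaded executor}.
    static boolean[] check() {
        ExecutorService raw = Executors.newFixedThreadPool(2);
        ExecutorService wrapped = Executors.unconfigurableExecutorService(raw);
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            return new boolean[] {
                exposesSetters(raw), exposesSetters(wrapped), exposesSetters(single)
            };
        } finally {
            wrapped.shutdown(); // delegates to the underlying pool
            single.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = check();
        System.out.println("raw=" + r[0] + " wrapped=" + r[1] + " single=" + r[2]);
    }
}
```

Code that receives only the wrapped reference has no way to reach the setters, which is the point of handing it out instead of the raw pool.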