Blocking queues are unique among the collections classes: not only do they act as containers for objects, but they can also coordinate the control flow of producer and consumer threads because take and put block until the queue enters the desired state (not empty or not full).
A synchronizer is any object that coordinates the control flow of threads based on its state. Blocking queues can act as synchronizers; other types of synchronizers include semaphores, barriers, and latches. There are a number of synchronizer classes in the platform library; if these do not meet your needs, you can also create your own using the mechanisms described in Chapter 14.
All synchronizers share certain structural properties: they encapsulate state that determines whether threads arriving at the synchronizer should be allowed to pass or forced to wait, provide methods to manipulate that state, and provide methods to wait efficiently for the synchronizer to enter the desired state.
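The three structural properties above can be seen in even the smallest hand-written synchronizer. The following is a minimal sketch, assuming intrinsic locking with wait and notifyAll; SimpleGate is a hypothetical name used only for illustration and is not a platform class.

```java
// Hypothetical illustration; SimpleGate is not part of the platform library.
class SimpleGate {
    private boolean open = false;   // encapsulated state: may arriving threads pass?

    // Manipulates the state and wakes every thread waiting for it to change.
    public synchronized void open() {
        open = true;
        notifyAll();
    }

    // Waits (without spinning) until the gate enters the desired state.
    public synchronized void await() throws InterruptedException {
        while (!open)
            wait();
    }

    public synchronized boolean isOpen() {
        return open;
    }
}
```

The platform synchronizers follow the same outline but add timeouts, counts, and other features that are easy to get wrong when written by hand.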
5.5.1. Latches
A latch is a synchronizer that can delay the progress of threads until it reaches its terminal state [CPJ 3.4.2]. A latch acts as a gate: until the latch reaches the terminal state the gate is closed and no thread can pass, and in the terminal state the gate opens, allowing all threads to pass. Once the latch reaches the terminal state, it cannot change state again, so it remains open forever. Latches can be used to ensure that certain activities do not proceed until other one-time activities complete, such as:
CountDownLatch is a flexible latch implementation that can be used in any of these situations; it allows one or more threads to wait for a set of events to occur. The latch state consists of a counter initialized to a positive number, representing the number of events to wait for. The countDown method decrements the counter, indicating that an event has occurred, and the await methods wait for the counter to reach zero, which happens when all the events have occurred. If the counter is nonzero on entry, await blocks until the counter reaches zero, the waiting thread is interrupted, or the wait times out.
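As a small illustration of the counter semantics just described, the sketch below waits for a set of events with a timed await. The class and method names (LatchDemo, awaitEvents, awaitNeverSignalled) are hypothetical; only CountDownLatch and TimeUnit come from the platform library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchDemo {
    // Starts nEvents threads that each signal one event, then waits for all of them.
    // Returns true if the counter reached zero before the timeout elapsed.
    static boolean awaitEvents(int nEvents, long timeoutMillis)
            throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(nEvents);
        for (int i = 0; i < nEvents; i++) {
            new Thread(new Runnable() {
                public void run() {
                    done.countDown();   // one event has occurred
                }
            }).start();
        }
        return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Nobody ever counts down, so the timed await gives up and returns false.
    static boolean awaitNeverSignalled(long timeoutMillis)
            throws InterruptedException {
        CountDownLatch never = new CountDownLatch(1);
        return never.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

The timed form of await reports a timeout through its boolean return value rather than by throwing, so callers should check the result.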
TestHarness in Listing 5.11 illustrates two common uses for latches. TestHarness creates a number of threads that run a given task concurrently. It uses two latches, a "starting gate" and an "ending gate". The starting gate is initialized with a count of one; the ending gate is initialized with a count equal to the number of worker threads. The first thing each worker thread does is wait on the starting gate; this ensures that none of them starts working until they all are ready to start. The last thing each does is count down on the ending gate; this allows the master thread to wait efficiently until the last of the worker threads has finished, so it can calculate the elapsed time.
Why did we bother with the latches in TestHarness instead of just starting the threads immediately after they are created? Presumably, we wanted to measure how long it takes to run a task n times concurrently. If we simply created and started the threads, the threads started earlier would have a "head start" on the later threads, and the degree of contention would vary over time as the number of active threads increased or decreased. Using a starting gate allows the master thread to release all the worker threads at once, and the ending gate allows the master thread to wait for the last thread to finish rather than waiting sequentially for each thread to finish.
5.5.2. FutureTask
FutureTask also acts like a latch. (FutureTask implements Future, which describes an abstract result-bearing computation [CPJ 4.3.3].) A computation represented by a FutureTask is implemented with a Callable, the result-bearing equivalent of Runnable, and can be in one of three states: waiting to run, running, or completed. Completion subsumes all the ways a computation can complete, including normal completion, cancellation, and exception. Once a FutureTask enters the completed state, it stays in that state forever.
The behavior of Future.get depends on the state of the task. If the task is completed, get returns the result immediately; otherwise it blocks until the task transitions to the completed state and then returns the result or throws an exception. FutureTask conveys the result from the thread executing the computation to the thread(s) retrieving the result; the specification of FutureTask guarantees that this transfer constitutes a safe publication of the result.
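The state-dependent behavior of get can be sketched as follows. FutureTaskDemo and its methods are hypothetical names, and the trivial multiplication merely stands in for a lengthy computation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {
    // Runs a task in another thread and retrieves its result with get.
    static int computeViaFutureTask()
            throws InterruptedException, ExecutionException {
        FutureTask<Integer> future = new FutureTask<Integer>(new Callable<Integer>() {
            public Integer call() {
                return 6 * 7;   // stand-in for a lengthy computation
            }
        });
        if (future.isDone())    // still waiting to run, so not completed
            throw new IllegalStateException("task completed before it was run");
        new Thread(future).start();
        return future.get();    // blocks until the task reaches the completed state
    }

    // A task that fails also completes; get reports the failure by throwing.
    static Throwable failureCause() throws InterruptedException {
        FutureTask<Integer> future = new FutureTask<Integer>(new Callable<Integer>() {
            public Integer call() {
                throw new IllegalArgumentException("bad input");
            }
        });
        future.run();           // run in the calling thread for simplicity
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();   // the exception thrown by call
        }
    }
}
```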
Listing 5.11. Using CountDownLatch for Starting and Stopping Threads in Timing Tests.
import java.util.concurrent.CountDownLatch;

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task)
            throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        startGate.await();       // wait for the starting gate to open
                        try {
                            task.run();
                        } finally {
                            endGate.countDown(); // signal that this worker has finished
                        }
                    } catch (InterruptedException ignored) { }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();                   // release all workers at once
        endGate.await();                         // wait for the last worker to finish
        long end = System.nanoTime();
        return end - start;
    }
}
FutureTask is used by the Executor framework to represent asynchronous tasks, and can also be used to represent any potentially lengthy computation that can be started before the results are needed. Preloader in Listing 5.12 uses FutureTask to perform an expensive computation whose results are needed later; by starting the computation early, you reduce the time you would have to wait later when you actually need the results.
Listing 5.12. Using FutureTask to Preload Data that is Needed Later.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class Preloader {
    private final FutureTask<ProductInfo> future =
        new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
            public ProductInfo call() throws DataLoadException {
                return loadProductInfo();
            }
        });
    private final Thread thread = new Thread(future);

    public void start() { thread.start(); }

    public ProductInfo get()
            throws DataLoadException, InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof DataLoadException)
                throw (DataLoadException) cause;   // known checked exception
            else
                throw launderThrowable(cause);     // unchecked; see Listing 5.13
        }
    }
}
Preloader creates a FutureTask that describes the task of loading product information from a database and a thread in which the computation will be performed. It provides a start method to start the thread, since it is inadvisable to start a thread from a constructor or static initializer. When the program later needs the ProductInfo, it can call get, which returns the loaded data if it is ready, or waits for the load to complete if not.
Tasks described by Callable can throw checked and unchecked exceptions, and any code can throw an Error. Whatever the task code may throw, it is wrapped in an ExecutionException and rethrown from Future.get. This complicates code that calls get, not only because it must deal with the possibility of ExecutionException (and the unchecked CancellationException), but also because the cause of the ExecutionException is returned as a Throwable, which is inconvenient to deal with.
When get throws an ExecutionException in Preloader, the cause will fall into one of three categories: a checked exception thrown by the Callable, a RuntimeException, or an Error. We must handle each of these cases separately, but we will use the launderThrowable utility method in Listing 5.13 to encapsulate some of the messier exception-handling logic. Before calling launderThrowable, Preloader tests for the known checked exceptions and rethrows them. That leaves only unchecked exceptions, which Preloader handles by calling launderThrowable and throwing the result. If the Throwable passed to launderThrowable is an Error, launderThrowable rethrows it directly; if it is not a RuntimeException, it throws an IllegalStateException to indicate a logic error. That leaves only RuntimeException, which launderThrowable returns to its caller, and which the caller generally rethrows.
Listing 5.13. Coercing an Unchecked Throwable to a RuntimeException.
/**
 * If the Throwable is an Error, throw it; if it is a
 * RuntimeException return it, otherwise throw IllegalStateException
 */
public static RuntimeException launderThrowable(Throwable t) {
    if (t instanceof RuntimeException)
        return (RuntimeException) t;
    else if (t instanceof Error)
        throw (Error) t;
    else
        throw new IllegalStateException("Not unchecked", t);
}
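To make the three outcomes concrete, the sketch below feeds one Throwable of each category to launderThrowable and reports what happened. The method body repeats the logic of Listing 5.13 so the example is self-contained; LaunderDemo and classify are hypothetical names introduced for illustration.

```java
class LaunderDemo {
    // Same logic as Listing 5.13, repeated here so the sketch stands alone.
    static RuntimeException launderThrowable(Throwable t) {
        if (t instanceof RuntimeException)
            return (RuntimeException) t;
        else if (t instanceof Error)
            throw (Error) t;
        else
            throw new IllegalStateException("Not unchecked", t);
    }

    // Describes what launderThrowable does with the given cause.
    static String classify(Throwable cause) {
        try {
            RuntimeException re = launderThrowable(cause);
            return "returned " + re.getClass().getSimpleName();
        } catch (Error e) {
            return "rethrew " + e.getClass().getSimpleName();
        } catch (IllegalStateException e) {
            // a checked exception reached launderThrowable: a logic error in the caller
            return "rejected " + e.getCause().getClass().getSimpleName();
        }
    }
}
```

Under these assumptions, classify(new IllegalArgumentException()) yields "returned IllegalArgumentException", classify(new StackOverflowError()) yields "rethrew StackOverflowError", and classify(new java.io.IOException()) yields "rejected IOException".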
5.5.3. Semaphores
Counting semaphores are used to control the number of activities that can access a certain resource or perform a given action at the same time [CPJ 3.4.1]. Counting semaphores can be used to implement resource pools or to impose a bound on a collection.
A Semaphore manages a set of virtual permits; the initial number of permits is passed to the Semaphore constructor. Activities can acquire permits (as long as some remain) and release permits when they are done with them. If no permit is available, acquire blocks until one is (or until interrupted or the operation times out). The release method returns a permit to the semaphore. [4] A degenerate case of a counting semaphore is a binary semaphore, a Semaphore with an initial count of one. A binary semaphore can be used as a mutex with nonreentrant locking semantics; whoever holds the sole permit holds the mutex.
[4] The implementation has no actual permit objects, and Semaphore does not associate dispensed permits with threads, so a permit acquired in one thread can be released from another thread. You can think of acquire as consuming a permit and release as creating one; a Semaphore is not limited to the number of permits it was created with.
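The permit behavior described above and in the footnote can be checked directly. This is a minimal sketch; PermitDemo and its method names are hypothetical, and only the Semaphore calls come from the platform library.

```java
import java.util.concurrent.Semaphore;

class PermitDemo {
    // release "creates" a permit even if none was acquired first.
    static int releaseWithoutAcquire() {
        Semaphore sem = new Semaphore(1);
        sem.release();
        return sem.availablePermits();          // now exceeds the initial count
    }

    // A binary semaphore is nonreentrant: the holder cannot acquire it again.
    static boolean reacquireOnBinarySemaphore() throws InterruptedException {
        Semaphore mutex = new Semaphore(1);
        mutex.acquire();                        // take the sole permit
        boolean second = mutex.tryAcquire();    // non-blocking second attempt
        mutex.release();
        return second;
    }

    // A permit acquired in one thread may be released from another.
    static int releaseFromAnotherThread() throws InterruptedException {
        final Semaphore sem = new Semaphore(1);
        sem.acquire();
        Thread other = new Thread(new Runnable() {
            public void run() {
                sem.release();
            }
        });
        other.start();
        other.join();
        return sem.availablePermits();
    }
}
```

Because a second acquire by the same thread would block forever, code that uses a binary semaphore as a mutex must never call back into a section guarded by the same semaphore.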
Semaphores are useful for implementing resource pools such as database connection pools. While it is easy to construct a fixed-sized pool that fails if you request a resource from an empty pool, what you really want is to block if the pool is empty and unblock when it becomes nonempty again. If you initialize a Semaphore to the pool size, acquire a permit before trying to fetch a resource from the pool, and release the permit after putting a resource back in the pool, acquire blocks until the pool becomes nonempty. This technique is used in the bounded buffer class in Chapter 12. (An easier way to construct a blocking object pool would be to use a BlockingQueue to hold the pooled resources.)
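The acquire-before-fetch, release-after-return protocol can be sketched as follows. SemaphorePool is a hypothetical class written for illustration (it is not the bounded buffer from Chapter 12), and it assumes the pooled resources are non-null.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class SemaphorePool<T> {
    private final Deque<T> items = new ArrayDeque<T>();
    private final Semaphore available;   // one permit per resource currently in the pool

    SemaphorePool(Collection<T> resources) {
        items.addAll(resources);
        available = new Semaphore(resources.size());
    }

    // Blocks until a resource is available.
    public T take() throws InterruptedException {
        available.acquire();
        synchronized (items) {
            return items.removeFirst();
        }
    }

    // Waits up to the timeout; returns null if the pool stayed empty.
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        if (!available.tryAcquire(timeout, unit))
            return null;
        synchronized (items) {
            return items.removeFirst();
        }
    }

    // Puts the resource back first, then releases the permit that advertises it.
    public void giveBack(T resource) {
        synchronized (items) {
            items.addLast(resource);
        }
        available.release();
    }
}
```

The ordering matters: a permit is released only after the resource is back in the deque, so a thread that obtains a permit always finds something to remove.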
Similarly, you can use a Semaphore to turn any collection into a blocking bounded collection, as illustrated by BoundedHashSet in Listing 5.14. The semaphore is initialized to the desired maximum size of the collection. The add operation acquires a permit before adding the item into the underlying collection. If the underlying add operation does not actually add anything, it releases the permit immediately. Similarly, a successful remove operation releases a permit, enabling more elements to be added. The underlying Set implementation knows nothing about the bound; this is handled by BoundedHashSet.
5.5.4. Barriers
We have seen how latches can facilitate starting a group of related activities or waiting for a group of related activities to complete. Latches are single-use objects; once a latch enters the terminal state, it cannot be reset.
Barriers are similar to latches in that they block a group of threads until some event has occurred [CPJ 4.4.3]. The key difference is that with a barrier, all the threads must come together at a barrier point at the same time in order to proceed. Latches are for waiting for events; barriers are for waiting for other threads. A barrier implements the protocol some families use to rendezvous during a day at the mall: "Everyone meet at McDonald's at 6:00; once you get there, stay there until everyone shows up, and then we'll figure out what we're doing next."
CyclicBarrier allows a fixed number of parties to rendezvous repeatedly at a barrier point and is useful in parallel iterative algorithms that break down a problem into a fixed number of independent subproblems. Threads call await when they reach the barrier point, and await blocks until all the threads have reached the barrier point. If all threads meet at the barrier point, the barrier has been successfully passed, in which case all threads are released and the barrier is reset so it can be used again. If a call to await times out or a thread blocked in await is interrupted, then the barrier is considered broken and all outstanding calls to await terminate with BrokenBarrierException. If the barrier is successfully passed, await returns a unique arrival index for each thread, which can be used to "elect" a leader that takes some special action in the next iteration. CyclicBarrier also lets you pass a barrier action to the constructor; this is a Runnable that is executed (in one of the subtask threads) when the barrier is successfully passed but before the blocked threads are released.
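The reuse of the barrier, the barrier action, and the arrival index can be seen together in a short sketch. BarrierDemo and its run method are hypothetical names; the counters exist only so the behavior can be observed.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierDemo {
    // Sends `parties` threads through the barrier `rounds` times.
    // Returns {number of barrier-action runs, number of threads that saw index 0}.
    static int[] run(int parties, final int rounds) throws InterruptedException {
        final AtomicInteger trips = new AtomicInteger();
        final AtomicInteger leaders = new AtomicInteger();
        final CyclicBarrier barrier = new CyclicBarrier(parties, new Runnable() {
            public void run() {
                trips.incrementAndGet();   // runs once each time the barrier is passed
            }
        });
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        for (int r = 0; r < rounds; r++) {
                            int index = barrier.await();   // unique per thread per pass
                            if (index == 0)                // the last thread to arrive
                                leaders.incrementAndGet();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        // another party timed out or was interrupted; give up
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads)
            t.join();
        return new int[] { trips.get(), leaders.get() };
    }
}
```

With no interruptions or timeouts, both counters should equal the number of rounds: the action runs once per pass, and exactly one thread per pass receives arrival index 0.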
Listing 5.14. Using Semaphore to Bound a Collection.
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded)
                sem.release();   // nothing was added, so give the permit back
        }
    }

    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved)
            sem.release();       // make room for another element
        return wasRemoved;
    }
}
Barriers are often used in simulations, where the work to calculate one step can be done in parallel but all the work associated with a given step must complete before advancing to the next step. For example, in n-body particle simulations, each step calculates an update to the position of each particle based on the locations and other attributes of the other particles. Waiting on a barrier between each update ensures that all updates for step k have completed before moving on to step k + 1.
CellularAutomata in Listing 5.15 demonstrates using a barrier to compute a cellular automata simulation, such as Conway's Life game (Gardner, 1970). When parallelizing a simulation, it is generally impractical to assign a separate thread to each element (in the case of Life, a cell); this would require too many threads, and the overhead of coordinating them would dwarf the computation. Instead, it makes sense to partition the problem into a number of subparts, let each thread solve a subpart, and then merge the results. CellularAutomata partitions the board into Ncpu parts, where Ncpu is the number of CPUs available, and assigns each part to a thread. [5] At each step, the worker threads calculate new values for all the cells in their part of the board. When all worker threads have reached the barrier, the barrier action commits the new values to the data model. After the barrier action runs, the worker threads are released to compute the next step of the calculation, which includes consulting an isDone method to determine whether further iterations are required.
[5] For computational problems like this that do no I/O and access no shared data, Ncpu or Ncpu + 1 threads yield optimal throughput; more threads do not help, and may in fact degrade performance as the threads compete for CPU and memory resources.
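The partition, compute, commit cycle described above can be sketched on a much simpler model. The following is not Listing 5.15: StepSimulation is a hypothetical class that applies a one-dimensional XOR rule (each cell becomes the XOR of its two neighbors, with wraparound) purely to keep the example short. The number of workers is a parameter here rather than being derived from the CPU count.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class StepSimulation {
    private int[] current;    // values read during a step
    private int[] next;       // values written during a step
    private int stepsLeft;    // changed only by the barrier action
    private final int nWorkers;
    private final CyclicBarrier barrier;

    StepSimulation(int[] initial, int steps, int nWorkers) {
        this.current = initial.clone();
        this.next = new int[initial.length];
        this.stepsLeft = steps;
        this.nWorkers = nWorkers;
        this.barrier = new CyclicBarrier(nWorkers, new Runnable() {
            public void run() {
                // Commit: the freshly computed values become the current state.
                int[] tmp = current; current = next; next = tmp;
                stepsLeft--;
            }
        });
    }

    int[] run() throws InterruptedException {
        final int n = current.length;
        Thread[] workers = new Thread[nWorkers];
        for (int w = 0; w < nWorkers; w++) {
            final int from = w * n / nWorkers;        // this worker's slice
            final int to = (w + 1) * n / nWorkers;
            workers[w] = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (stepsLeft > 0) {       // plays the role of isDone
                            for (int i = from; i < to; i++)
                                next[i] = current[(i + n - 1) % n] ^ current[(i + 1) % n];
                            barrier.await();          // wait until the whole step is done
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        // another worker failed; abandon the simulation
                    }
                }
            });
            workers[w].start();
        }
        for (Thread t : workers)
            t.join();
        return current.clone();
    }
}
```

The fields are deliberately not volatile: they are written only by the barrier action and read only after await returns, and CyclicBarrier documents that actions in the barrier action happen-before actions following a successful return from await in the other threads.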
Another form of barrier is Exchanger, a two-party barrier in which the parties exchange data at the barrier point [CPJ 3.4.3]. Exchangers are useful when the parties perform asymmetric activities, for example when one thread fills a buffer with data and the other thread consumes the data from the buffer; these threads could use an Exchanger to meet and exchange a full buffer for an empty one. When two threads exchange objects via an Exchanger, the exchange constitutes a safe publication of both objects to the other party.
The timing of the exchange depends on the responsiveness requirements of the application. The simplest approach is that the filling task exchanges when the buffer is full, and the emptying task exchanges when the buffer is empty; this minimizes the number of exchanges but can delay processing of some data if the arrival rate of new data is unpredictable. Another approach would be that the filler exchanges when the buffer is full, but also when the buffer is partially filled and a certain amount of time has elapsed.
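The simplest policy, exchanging only when the buffer is full or empty, can be sketched as follows. ExchangerDemo and transfer are hypothetical names, and the sketch assumes the number of exchanges is agreed in advance so that both threads know when to stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

class ExchangerDemo {
    // Moves rounds * bufferSize integers from a filler thread to an emptier thread.
    static List<Integer> transfer(final int rounds, final int bufferSize)
            throws InterruptedException {
        final Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();
        final List<Integer> consumed = new ArrayList<Integer>();

        Thread filler = new Thread(new Runnable() {
            public void run() {
                List<Integer> buffer = new ArrayList<Integer>();
                int value = 0;
                try {
                    for (int r = 0; r < rounds; r++) {
                        while (buffer.size() < bufferSize)
                            buffer.add(value++);
                        buffer = exchanger.exchange(buffer);   // full buffer for an empty one
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        Thread emptier = new Thread(new Runnable() {
            public void run() {
                List<Integer> buffer = new ArrayList<Integer>();
                try {
                    for (int r = 0; r < rounds; r++) {
                        buffer = exchanger.exchange(buffer);   // empty buffer for a full one
                        consumed.addAll(buffer);
                        buffer.clear();                        // ready to hand back
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        filler.start();
        emptier.start();
        filler.join();
        emptier.join();
        return consumed;   // touched only by the emptier, then read after join
    }
}
```

A time-based policy could use the timed form of exchange instead, at the cost of handling TimeoutException when the other party is not ready.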