Section 9.7. Concurrency Utilities


9.7. Concurrency Utilities

So far in this chapter we've demonstrated how to create and synchronize threads at a low level, using Java language primitives. The java.util.concurrent package and subpackages introduced with Java 5.0 build on this functionality, adding important threading utilities and codifying some common design patterns by supplying standard implementations. Roughly in order of generality, these areas include:


Thread-aware Collections implementations

The java.util.concurrent package augments the Java Collections API with several implementations for specific threading models. These include timed wait, blocking implementations of the Queue interface, as well as nonblocking, concurrent-access optimized implementations of the Queue and Map interfaces. The package also adds "copy on write" List and Set implementations for extremely efficient "almost always read" cases. These may sound complex, but actually cover some fairly simple cases very well. We'll cover the Collections API in Chapter 11.


Executors

Executors run tasks, including Runnables, and abstract the concept of thread creation and pooling from the user. Executors are intended to be a high-level replacement for the idiom of creating new threads to service a series of jobs. Along with Executors, the Callable and Future interfaces are introduced, which expand upon Runnable to allow management, value return, and exception handling.


Locks

The java.util.concurrent.locks package holds a set of classes including Lock and Condition that parallels the Java language-level synchronization primitives and promotes them to the level of a concrete API. The locks package also adds the concept of nonexclusive reader/writer locks, allowing for greater concurrency in synchronized data access.


High-level synchronization constructs

This includes the classes CyclicBarrier, CountDownLatch, Semaphore, and Exchanger. These classes implement common synchronization patterns drawn from other languages and systems and can serve as the basis for new high-level tools.


Atomic operations (sounds very James Bond, doesn't it?)

The java.util.concurrent.atomic package provides wrappers and utilities for atomic, "all-or-nothing" operations on primitive types and references. This includes simple combination atomic operations like testing a value before setting it and getting and incrementing a number in one operation.

With the possible exception of optimizations done by the Java VM for the atomic operations package, all of these utilities are implemented in pure Java, on top of the standard Java language synchronization constructs. This means that they are really just convenience utilities and don't truly add new capabilities to the language. Their main role is to offer standard patterns and idioms in Java threading and make them safer and more efficient to use. A good example of this is the Executor, which allows a user to manage a set of tasks in a predefined threading model without having to delve into creating threads at all. Higher level APIs like this both simplify coding and allow for greater optimization of the common cases.

We'll look at each of these areas in the remainder of this chapter, with the exception of the Collections implementations. We'll discuss those when we cover the Java Collections APIs in Chapter 11.

Before we dive in, we should mention that, while newly standardized in Java 5.0, the concurrency package that we'll describe has been around for some time as a product of the Java Community Process (JSR-223). The vast majority of it was originally written by Doug Lea, the author of Concurrent Programming in Java (Addison-Wesley), who deserves most of the credit for it.

9.7.1. Executors

In this chapter, we've created a lot of Threads and hopefully shown how to use them effectively. But in the grand scheme of things, threads are a fairly low-level programming tool and, without care, can be error-prone. When we recognize certain common patterns that developers reproduce over and over again using threads, it's natural to want to elevate a pattern to the level of an API. One such related pair of patterns is the concept of an executor service that manages tasks and that of a thread pool that services the tasks in an efficient way. The thread pool has been implemented and reimplemented by vast numbers of developers in one way or another over the years and when you add in features like scheduling different threading models, things can get quite complex.

To address these issues, the java.util.concurrent package includes interfaces for many default implementations of the executor pattern for common threading models. This includes sophisticated scheduling as well as asynchronous collection of results from the tasks, if they require it. In general, you can use an Executor as a replacement for creating one-off threads anywhere you need to execute Runnable objects. The advantage is that understanding and modifying the behavior of your code later is a lot easier when you work at this level.

For the simple case of running a number of tasks to completion, we can consider the base Executor interface, which executes Runnable objects for us. The neatest thing about Executor is that its companion utility class Executors is a factory for creating different kinds of Executor implementations. We'll talk about the various types it can produce in a bit, but for now let's use one called newFixedThreadPool( ), which, as its name suggests, returns an Executor that is implemented using a thread pool of a fixed size:

     Executor executor = Executors.newFixedThreadPool( 3 ) ; // 3 threads           List<Runnable> runnables = ... ;     for( Runnable task : runnables )         executor.execute( task );

Here, we are submitting a number of Runnable tasks to our Executor, which executes them using a pool with a maximum of three threads. If our list contains more than three tasks, then some of them will have to wait until a thread is free to service it. So, what happens when we submit the fourth item? The Executor interface doesn't specify that. It's up to the particular implementation to decide. Executors don't have to queue tasks, much less use a pool to service them. Some Executor implementations may block, or even execute the Runnable right on the execute( ) call in the caller's thread. In this case (and for all Executor implementations created for us by the Executors factory methods), tasks are effectively put onto an unbounded queue. In the example, our loop submits all of the tasks immediately and they are queued by the executor until the threads have serviced them.

With just a line or two of code in our example, we've been able to throttle the concurrency of our task list and avoid the details of constructing any threads ourselves. Later, if we decide we'd rather execute the tasks one at a time, the change is trivial. Next, we'll take a step up and look at manageable tasks that produce values and executors that can schedule tasks for us.

9.7.1.1 Tasks with results: Callable and Future

Since the Runnable interface was created for Threads to consume, its API doesn't really allow for feedback to the caller. The new Callable interface, which is effectively a replacement for Runnable, rectifies this situation by providing a call( ) method that both returns a result and can throw exceptions. Callable is a generic class that is parameterized by the type it returns. The following examples create a Callable that returns an integer:

     class MyCallable implements Callable<Integer> {         public Integer call(  ) { return 2+2; }     }     // or anonymously     Callable<Integer> callable = new Callable<Integer>(  ) {         public Integer call(  ) { return 2+2; }     };

There is also a convenience method for bridging Runnables to Callables in the Executors class. It takes a Runnable and a fixed value to return as a value when it completes:

     Callable<Integer> callable = Executors.callable( runnable, 42/*return value*/ );

The new Future class is used with Callable and serves as a handle to wait for and retrieve the result of the task or cancel the task before it is executed. A Future is returned by the submit( ) methods of an ExecutorService, which is essentially a beefed up Executor. We'll discuss ExecutorServices in the next section.

     Future<Integer> result = executorService.submit( callable );     int val = result.get(  );  // blocks until ready

Future is also a generic interface, which is parameterized by its return type. This explains the somewhat cute name. For example a Future<Integer> could be read as "a future integer." Future has blocking and timed wait get( ) methods to retrieve the result when it is ready as well as an isDone( ) test method and a cancel( ) method to stop the task if it hasn't started yet. If the task has been cancelled, you get a CancellationException if you attempt to retrieve the result.

Enough said about these interfaces. Next, we'll look at the ExecutorService, which uses them.

9.7.1.2 ExecutorService

Our first Executor was little more than a sinkhole for Runnables and, as we described, required knowledge of the implementation to know how it would handle tasks at all. By contrast, an ExecutorService is intended to be an asynchronous task handler. Instead of an execute( ) method, it has submit( ) methods that accept a Callable (or Runnable) and return immediately with a Future object that can be used to manage the task and collect the result later. In addition to that, an ExecutorService has a lifecycle defined by its shutdown( ) method and related methods that can be used to stop the service gracefully after tasks are completed.

ExecutorService extends Executor and, in fact, all of the implementations returned by the Executors factory methods are actually ExecutorServicesincluding the one we used in our first example. We'll look at these factory methods to see what kind of services are offered.

Executors offers three types of ExecutorService implementations:


newFixedThreadPool(int)

This is the classic thread pool with a specified maximum pool size and an unbounded queue for task submission. If a thread dies for some reason while handling a task, a new one will be created to replace it. Threads are never removed from the pool until the service is shut down.


newCachedThreadPool( )

This pool uses an open-ended number of threads that grows and shrinks with demand. The main advantage of this service is that threads are cached for a period of time and reused, eliminating the overhead of creating new threads for short-lived tasks. Threads that are not used for one minute are removed. Tasks are submitted directly to threads; there is no real queuing.


newSingleThreadExecutor( )

This ExecutorService uses a single thread to execute tasks from an unbounded queue. In this sense, it is identical to a fixed thread pool with a pool size of 1.

Let's look at a more realistic usage of an ExecutorService, drawn from the TinyHttpd example in Chapter 13. In that chapter, we create a mini web server to illustrate features of the networking APIs. Here, we won't show the networking details, but we'll implement the main request dispatching loop for the example using a thread pool executor service. (Flip to Chapter 13 to see the implementation of the Runnable client-connection handler class. But that class works equally well with both examples.) Here we go:

     public class ExecutorHttpd     {       ExecutorService executor = Executors.newFixedThreadPool(3);       public void start( int port ) throws IOException       {         final ServerSocket ss = new ServerSocket( port );         while ( !executor.isShutdown(  ) )           executor.submit( new TinyHttpdConnection( ss.accept(  ) ) );       }       public void shutdown(  ) throws InterruptedException {         executor.shutdown(  );         executor.awaitTermination( 30, TimeUnit.SECONDS );         executor.shutdownNow(  );       }       public static void main( String argv[] ) throws Exception       {         new ExecutorHttpd(  ).start( Integer.parseInt(argv[0]) );       }     }

The ExecutorHttpd class holds an instance of a fixed thread pool ExecutorService with three threads to service client connections. In the start( ) method of our class, we create a ServerSocket that accepts incoming network connections. We then enter a loop that runs as long as our service is not flagged to shut down. Inside the loop, we create a new connection handler (a Runnable instance of TinyHttpdConnection) for each connection and submit it to the executor. The shutdown( ) method of our class illustrates a graceful termination. First, we call shutdown( ) on the executor, which causes the service to stop accepting new tasks and allow the currently running ones to complete. Then we wait a reasonable period of time for all web requests to finish (30 seconds) using the awaitTermination( ) method before trying a less graceful ending with shutdownNow( ). shutdownNow( ) attempts to interrupt or otherwise stop threads as quickly as possible. We leave things there, but the method actually returns a List of tasks that remain hung after the attempt. Finally, we have a main( ) method that exercises our example by creating an instance of ExecutorHttpd on a port specified as an argument to the program.

9.7.1.3 Collective tasks

In addition to its individual task submit( ) methods, ExecutorService also offers a set of collective invokeAll( ) and invokeAny( ) executor methods that submit multiple tasks as a group and return results either when they are all complete or when the first one completes, respectively. With this, we could reproduce our first example using a List of Callables like this:

     List<Callable<Integer>> taskList = ...;     ExecutorService execService = Executors.newFixedThreadPool(3);     List<Future<Integer>> resultList = execService.invokeAll( taskList );

By contrast, the invokeAny( ) method returns just the first successfully completed task's result (canceling all the remaining unexecuted tasks):

     int result = execService.invokeAny( taskList );

Both methods also offer timed wait versions that time out after a specified period of time.

9.7.1.4 Scheduled tasks

For tasks that you'd like to run at a future time or on a periodic basis, use the ScheduledExecutorService. ScheduledExecutorService is an ExecutorService with additional "schedule" methods that take a delay for a Runnable or Callable or a period specification for a Runnable. Two additional factory methods of Executors produce scheduled executors:

     Executors.newScheduledThreadPool(int);     Executors.newSingleThreadScheduledExecutor(  );

These are exactly like the similarly named methods for regular executor services.

To execute a task in the future, you specify a delay from the current time. For example:

     ScheduledExecutorService exec = Executors.newScheduledThreadPool(3);     exec.schedule( runnable, 60, TimeUnit.SECONDS ); // run one minute in the future     // run at specified date and time     Calendar futureDate = ...; // convert calendar     Date date = futureDate.getTime(  ); // to Date     long delay  = date.getTime(  ) - System.currentTimeMillis(  ); // to relative millis     exec.schedule( runnable, delay, TimeUnit.MILLISECONDS ); // run at specified date

For periodic work, there are two kinds of recurring schedulesfixed delay and fixed rate. Fixed delay means that a fixed amount of time elapses between the end of the task's execution and the beginning of the next execution. Fixed rate means that the task should begin execution at fixed time intervals, regardless of how long the task takes. The difference comes into play when the time to execute the task is long relative to the interval. The following snippet schedules a log file cleanup to occur in 12 hours and every 12 hours thereafter:

     Runnable cleanup = new Runnable(  ) {         public void run(  ) { cleanUpLogFiles(  ); }     };     long period = 12*60*60, delay = period; // seconds     Future<?> logService = executionService.scheduleAtFixedRate(         cleanup, delay, period, TimeUnit.SECONDS );

Since the task for periodic schedules is a Runnable, the Future object does not return a useful value (it returns null) so we don't specify a parameter type in its generic type instantiation. The Future is still useful for canceling the task at a later time if we wish:

     logService.cancel(  );

We should mention that the ScheduledExecutorService bears a great deal of similarity to the java.util.Timer class that we'll discuss in Chapter 11, especially with regard to the periodic schedules. A java.util.Timer is always single-threaded, however.

9.7.1.5 CompletionService

A CompletionService is a queue-like front end to an executor service. The CompletionService provides submit( ) methods, which delegate their tasks to a particular instance of Executor, and then provides take( ) and poll( ) methods for retrieving Future results for completed tasks. Think of a CompletionService as a babysitter for the Futures, allowing you to easily gather up only completed results. ExecutorCompletionService is a concrete implementation of CompletionService that takes an Executor in its constructor:

     Executor executor = Executors.newFixedThreadPool(3);     CompletionService<Integer> completionService =         new ExecutorCompletionService<Integer>( executor );     completionService.submit( callable );     completionService.submit( runnable, resultValue );     // poll for result     Future<Integer> result = completionService.poll(  );     if ( result != null )         // use value...     // block, waiting for result     Future<Integer> result = completionService.take(  );

9.7.1.6 The ThreadPoolExecutor implementation

At various times in this chapter, we've referred to the different executor services produced by the Executors factory as different implementations of ExecutorService. But these implementations are just different configurations of a single, highly flexible implementation of ExecutorService called ThreadPoolExecutorService. You can use this implementation directly if you want; it offers some additional features. The primary constructor for THReadPoolExecutorService allows you to specify both a "core" thread pool size and a maximum size, as well as a thread timeout value for removing idle threads. The core size is a minimum number of threads which, once created, are allowed to live indefinitely. The constructor also allows you to provide the task queue (an implementation of BlockingQueue) on which new tasks are placed. This last feature allows you to govern the queuing policy yourself. You could specify a queue with a limited capacity:

         ExecutorService executorService = new ThreadPoolExecutor(         corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, taskQueue );

The ThreadPoolExecutor implementation also has methods that allow you to change the core and maximum pool size while the service is active or to "prestart" the core threads before the service is used.

Actually, these last features bring up an interesting issue. If we know that our executor service is an implementation of THReadPoolExecutor, we can cast it at runtime to get access to these extra methods and do things like change the pool size. This may not be what the designers of some services had in mind; in fact, it could be downright dangerous in the wrong hands. For this reason, Executors offers a number of "unconfigurable" wrapper methods that act something like the "unmodifiable" collection methods we'll see in the Java Collections API. These methods wrap an executor service in a delegator object that does not expose the implementation to the caller:

     ExecutorService tweakable = Executors.newFixedThreadPool(  );     ExecutorService safe = Executors.unconfigurableExecutorService( tweakable );

An application server might, for example, wrap a service to protect itself from individual applications modifying a global service to suit their own needs.

9.7.1.7 Thread production

We said that the Executor pattern is a general replacement for using Threads to run simple tasks. Although Executors shield us from Thread creation, there still may be cases where we want some control over how the threads used in our various thread pool implementations are constructed or set up. For this reason and to standardize Thread production in general, the concurrency package adds an explicit, factory API for thread creation.

The THReadFactory interface provides a newTHRead( ) method. One of these factories is used by all service implementations that create threads. All of the factory methods of Executors have an additional form that accepts an explicit ThreadFactory as an argument. You can get the default thread factory used by these with the Executors.defaultThreadFactory( ) method. You could use your own ThreadFactory to perform custom setup, such as ThreadLocal values or priorities.

9.7.2. Locks

The java.util.concurrent.locks package holds classes that mimic and expand upon the built-in Java language synchronization primitives, adding "read/write" locks among other things. As we mentioned, these classes are just utilities written in Java and don't strictly add anything new to the language semantics. However, they do provide more flexible usage at the expense of some of the built-in safety of Java language synchronization.

At the heart of the locks package are the Lock and Condition interfaces. Lock represents the same concept as a Java lock (monitor) that is associated with each object and class for use with synchronized methods and blocks. The Lock class provides for exclusive access by the owner of the lock by allowing only one party to hold the lock at a time through the lock( ) and unlock( ) methods. In Java language synchronization, this is accomplished implicitly with the synchronized keyword:

     // synchronized method     synchronized void writeData(  ) { ... }     // synchronized block     synchronized ( someObject ) {       ...     }

Upon entry to the synchronized method or block, Java acquires the lock and automatically releases it upon exiting. Even if an exception is thrown or the thread dies unexpectedly, Java automatically releases all of the locks it acquired. Using the Lock class instead requires us (or allows us, depending on how you look at it) to explicitly lock when we want the resource and remember to unlock it when we are through. The locking is not tied to any particular scope, such as a single method or code block. To reproduce the effect of the synchronized method in the example, we'd use something like:

     Lock lock = new ReentrantLock(  );     // method or block     lock.lock(  );     try {          // body of method or block ...     } finally {         lock.unlock(  )     }

The first caller to lock( ) acquires the lock and proceeds. Subsequent calls by other threads block until the lock is released. We perform the body of our locked operation in a try/finally block. This is generally important to ensure that no matter what happenswe always unlock before we leave, but you are free to implement arbitrary protocols at your own risk.

The lock implementation in this example is called ReentrantLock. The name implies that this kind of lock acts like Java locks do in that the lock is associated with the caller's thread. The owner of a lock may reacquire ("relock") the lock as many times as it wishes. For example, a recursive method that locks a resource upon entry is fine.

In addition to the standard-looking lock( ) method, the Lock interface has tryLock( ) methods that do not block or block for a specified period of time to acquire the lock. These conditional and timed wait locking forms are something that ordinary Java locks do not provide. The ReentrantLock implementation also has a notion of "fairness" that can be turned on or off when it is constructed. When fair is on, the lock attempts to hand out the lock to callers in the order in which they request it. Normal Java locks (and the default, unfair, policy of ReentrantLock) do not make this guarantee.

9.7.2.1 Read and write locks

The ReadWriteLock interface is a gateway to two different locks, one for reading and one for writing. The idea behind read/write locks is that for most resources it is okay for many "readers" to be viewing data, as long as it is not changing. Conversely, a writer of the data generally requires exclusive access to it. This is just what read/write locks do. Any number of readers may acquire the read lock as long as no write lock is in place. Only one party may hold the write lock, and no readers may hold read locks while the write lock is out. A writer may have to wait for readers to finish before acquiring the write lock, and readers may have to wait for a writer to finish before they are allowed to acquire read locks:

     ReadWriteLock rwLock = new ReentrantReadWriteLock( fair );     // reader thread 1     rwLock.readLock(  ).lock(  );     // reader thread 2     rwLock.readLock(  ).lock(  );     // writer thread     rwLock.writeLock(  ).lock(  ); // blocks on threads 1 & 2

In this code snippet, two readers hold read locks while a writer blocks waiting on the write lock. When both readers have unlock( )ed their read locks, the writer gains exclusive access to the lock and any subsequent readers block until the writer is finished.

The owner of a write lock can acquire a read lock, too, if it wishes, but not vice-versa. Acquiring a read lock and then releasing the write lock is called downgrading the lock. Trying to acquire a write lock while holding a read lock (upgrading) is not allowed and causes a deadlock.

9.7.2.2 Conditions

To complete the picture of Lock as a parallel for Java language synchronization, we need an analogue to the wait( ), notify( ), and notifyAll( )mechanism. The Condition interface represents this functionality with its await( ), signal( ), and signalAll( ) methods. A Condition is associated with a Lock by the lock's newCondition( ) method. Unlike a normal Java lock, a Lock may have multiple Condition objects that represent multiple wait sets of threads.

The Condition await( ) method is used just like the wait( ) method of a Java object within a synchronized block:

     Lock lock = ...     Condition condition = lock.newCondition(  );     lock.lock(  );     condition.await(  ); // block, waiting for signal(  )     lock.unlock(  );     // meanwhile, in another thread...     lock.lock(  );     condition.signal(  );     lock.unlock(  );

Like wait( ), the Condition await( ) method can be called only when the thread is the owner of the lock associated with the condition and the signal( ) method may be called only by another thread that has acquired the lock. Interestingly, though, in this case, these restrictions are implementation details of the java.util.concurrent package; some other implementation of these classes could conceivably change those restrictions in some way.

With the exception of the new reader/writer locks and some timed wait lock methods, it may not seem that the Locks package adds a great deal to Java. However, if you delve into it deeper, you'll find that it's also a toolkit for building new kinds of synchronization primitives and higher-level constructs. The locks package opens up a concrete implementation of Java's synchronization mechanism for all to tinker with and extend. A brief look at the implementation classes reveals nifty methods like getOwner( ) to tell you which thread owns a lock or getreadLockCount( ) to tell you how many readers are working on your data. Lots of things are possible with an open implementation like this, including specialized synchronization packages that do things like automatically detect deadlocks or tune themselves based on external information. There may also be cases where using the explicit lock API provided by this package performs better than language-level synchronization. But that probably doesn't justify the additional burden on developers except in special cases. Next, we'll move up a bit and look at some higher-level synchronization tools.

9.7.3. Synchronization Constructs

The java.util.concurrent package adds several high- and low-level synchronization utilities borrowed from other languages, including CountDownLatch, Semaphore, CyclicBarrier, and Exchanger.

9.7.3.1 CountDownLatch

The CountDownLatch is a very simple synchronization utility that allows any number of threads to block, waiting for a countdown value to reach 0 before being "released" to continue their activities. The CountDownLatch is initialized with the count when constructed. Thereafter, threads may block using the await( ) method or block for a limited period of time using the timed wait version of await( ). Any running thread may decrement the counter at any time, whether threads are blocked or not. Once the counter reaches 0, all waiting threads unblock and continue. Thereafter, any calls to await( ) do not block and the await( ) method returns false, indicating that the count has passed. The count cannot be reset.

     CountDownLatch latch = new CountDownLatch( 2 ); // count from 2     // thread 1     latch.await(  ); // blocks thread 1     // thread 2     latch.countDown(  ); // count is 1     latch.countDown(  ); // count is 0, thread 1 proceeds

Countdown latches are used in a wide variety of synchronization schemes to coordinate a number of threads on one result or cause a thread to wait for a number of other threads to produce results. Later we'll talk about a related utility, CyclicBarrier, that explicitly waits for a number of threads to synchronize to coordinate an action.

9.7.3.2 Semaphore

Semaphores are a very old synchronization construct that has been used in many other languages. Conceptually, a semaphore is a pool of permitsintangible permission slips to perform some activity. The semaphore is initialized with a specified number of permits. Callers can then use the acquire( ) and release( ) methods to take and return these permits. Calling acquire( ) when no permits are available causes the caller to block until one is released. In this way, for example, a semaphore could be used to limit access to some resource to a specified number of threads:

     int concurrentReaders = 5;     boolean fair = true;     Semaphore sem = new Semaphore( concurrentReaders, fair );     Data readData(  ) throws InterruptedException {         sem.acquire(  );         // read data ...         sem.release(  );         return data;     }

In this code snippet, readData( ) effectively limits itself to five concurrent reading threads at any given time. Additional threads are blocked in the acquire( ) method until a permit is free. In this sense, a semaphore is vaguely like a lock with multiple owners. This is where the similarity ends, however.

In actuality, a semaphore differs from a lock in several ways. First, the "pool of permits" is really just a number. No actual value is returned by acquire( ) and no association is made with the acquirer of the lock. This means that any actual locking behavior is strictly cooperative (by convention in the application). It also means that "permits" can be acquired and released by different callers without respect to who actually "acquired" them. It's really just incrementing or decrementing the number. Also, because there is no real association with an "owner," semaphores are not reentrant like real locks are. That is, if a thread calls acquire( ) multiple times, it simply decrements the counter multiple times. This behavior could be useful in some cases to count levels of recursion for security APIs, for example, but is not like a lock, in which one caller "owns" multiple permits. Finally, since the permits pool is really just a number, calling acquire( ) and release( ) out of sync can increase the permit pool beyond its starting point or decrement it below zero. It can even be initialized with a negative number if you wish to require releases before anyone may acquire a permit.

In addition to acquire( ), Semaphore has a tryAcquire( ) method that parallels the tryLock( ) method of Lock. It returns immediately, acquiring a permit if one was available and returning false otherwise. Another form of TRyAcquire( ) accepts a timed wait period. Semaphores also have a notion of "fairness" in the ordering of acquire requests. By default, requests are not guaranteed to be ordered, but if the "fair" flag is set when the Semaphore is constructed, acquire( ) doles out permits in first-in-first-out (FIFO) order. The tradeoff is that ordering may impact performance a bit, depending on the implementation.

9.7.3.3 CyclicBarrier

The CyclicBarrier class is a synchronization point where a specified number of related threads meet after finishing their activities. When all of the threads have arrived, an optional, shared barrier action can be executed and then all of the threads are "released" to continue. The class is termed "cyclic" because it can then be used again in the case where the threads repeat their activities in this manner. CyclicBarrier is an alternative to using the join( ) method, which collects threads only when they have completed and returned from their run( ) method.

The following example, SiteTimer, accepts a number of URLs on the command line and times how long it takes to connect to each one, printing the results in sorted order. It performs the connections in parallel using a dedicated thread per site and uses a CyclicBarrier for the threads to rendezvous after each timing cycle and prints the results before they begin again. This example is a little longer than we'd normally present for a minor utility like this, but it illustrates a number of Java 5.0 features, such as generics, collections, formatted printing, autoboxing, as well as an inner class. Although we haven't yet discussed collections or the network portion of the example, the usage is fairly simple and you can return to the example after reading the relevant chapters later in this book.

     import java.util.*;     import java.util.concurrent.*;     import java.net.*;     import java.io.IOException;     public class SiteTimer     {        CyclicBarrier barrier;        List<Result> results = new ArrayList<Result>(  );        private class Result implements Comparable<Result>        {           Long time;           String site;           Result( Long time, String site ) {              this.time = time;              this.site = site;           }           public int compareTo( Result r ) { return time.compareTo( r.time ); }        }        static long timeConnect( String site ) {           long start = System.currentTimeMillis(  );           try {              new URL( site ).openConnection(  ).connect(  );           } catch ( IOException e ) {              return -1;           }           return System.currentTimeMillis(  ) - start;        }        void showResults(  ) {           Collections.sort( results );           for( Result result : results )              System.out.printf( "%-30.30s : %d\n", result.site, result.time );           System.out.println("------------------");        }        public void start( String [] args )        {           Runnable showResultsAction = new Runnable(  ) {              public void run(  ) {                 showResults(  );                 results.clear(  );              } };           barrier = new CyclicBarrier( args.length, showResultsAction );           for ( final String site : args )              new Thread(  ) {                 public void run(  ) {                    while( true ) {                       long time = timeConnect( site );                       results.add( new Result( time, site ) );                       try {                          barrier.await(  );                       } catch ( BrokenBarrierException e ) { return;                       } catch ( InterruptedException e ) { return; }                    }                 }              }.start(  );        }        public static void main( String [] args ) throws IOException {           new SiteTimer(  ).start( args );        }     }

The start( ) method constructs the barrier, specifying the number of threads that must be present before the group has fully arrived and the action to perform when all of the threads are ready. For each site, a thread is created that loops, timing the connection to the site and adding a result object to the list before blocking on the barrier's await( ) method. When all of the threads reach the await( ) method, the barrier action fires, printing the results. All of the threads are then released to begin the next cycle.

If any of the waiting threads is interrupted or times out (using the timed wait version of the await( ) method) the barrier is said to be "broken" and all of the waiting threads receive a BrokenBarrierException. In theory, the barrier can be "fixed" by calling its reset( ) method, but this is complicated because only one thread from the group can reset the barrier properly. A reset( ) while any other thread is waiting causes the barrier to be broken and the waiting threads to receive the exception again, so it is probably best to start over at this point.

One more detail: the await( ) method returns an integer that indicates the order in which the threads arrived at the barrier. This can be used to divide up work for the next iteration of the threads. For example, if the threads' jobs are not identical, you could use the number to "elect" a leader thread or divide the threads into two or more groups.

9.7.3.4 Exchanger

The Exchanger is a synchronization point for a pair of threads to exchange data items. An item of the same type is passed in each direction using the exchange( ) method. The first method to arrive at the Exchanger blocks, waiting for its mate. When the second method arrives, they each receive the other's argument to the exchange( ) method. Any number of actual threads may be using the Exchanger and they are simply paired in some order when they arrive. Exchanger is a generic class that is parameterized by the type of object to be passed:

     Exchanger<ByteBuffer> xchange = new Exchanger<ByteBuffer>(  );     // thread 1     Buffer nextBuf = xchange.exchange( buffer1 ); // blocks     // thread 2     Buffer nextBuf = xchange.exchange( buffer2 );     // buffers exchanged, both threads continue...

The Exchanger pattern is primarily useful for reusing data objects or buffers between threads, as indicated in this code snippet. Say that you have a reader thread filling buffers with data and a writer thread writing the contents of the buffers somewhere. Using an Exchanger, the reader and writer can trade a pair of buffers back and forth without creating new ones. This may seem a bit arcane, but it has applications when using the NIO advanced I/O package, which we discuss in Chapters 12 and 13.

We should note that the Exchanger is similar to the SynchronousQueue, which we'll discuss in Chapter 11 when we cover the Collections API. The Exchanger, however, passes data in both directions, whereas SynchronousQueue simply passes elements in one direction.

9.7.4. Atomic Operations

The java.util.concurrent.atomic package holds an interesting set of wrapper classes for atomic, "all-or-nothing" operations on certain primitive types and reference values. An atomic operation is a kind of transaction where some sequence of events either completes or fails as a unit and there is no potential for any intermediate state to be seen. In this case, the transactions we're talking about are very simple operations that either set or get a value, possibly in combination with a simple test or mathematical operation. There are atomic wrapper classes for the following types: Booleans, integers, and long values as well as arrays of integers and longs and object references:

     AtomicBoolean.java     AtomicInteger.java     AtomicIntegerArray.java     AtomicLong.java     AtomicLongArray.java     AtomicReference.java     AtomicReferenceArray.java

The AtomicBoolean class (which, by the way, has to compete with java.awt.Robot for coolest class name in Java) serves as a good example. At first glance, it seems like an oxymoron. After all, normal operations on Booleans in Java are atomic already. There is supposed to be no possible "in-between" state for a Boolean to be misread by any horribly mangled multithreaded code (as there theoretically could be for long and double values). Instead, the usefulness of the AtomicBoolean wrapper is in its combination operations: compareAndSet( ) and getAndSet( ):

     AtomicBoolean bool = new AtomicBoolean( true );     bool.compareAndSet( expectedValue, newValue );

The compareAndSet( ) method first performs a comparison to an expected value (true or false in the case of a Boolean) and only if the value matches does it assign the new value. The interesting thing is that both of these operations happen "atomically," together. This means that there is no possibility of someone changing the value between the time that we checked it and assigned the new value. That may sound like a slim chance anyway, but it's very important for guaranteeing the semantics of flags. For example, suppose we have a master "shutdown" switch in our application and the thread that sets it wants to perform cleanup on the way out. Using compareAndSet( ) to test first, we can guarantee that only one thread can possibly set the flag and perform the procedure.

The getAndSet( ) method simply assigns the new value and returns the old value in the same, safe way. It's a little harder to see how this applies to a Boolean, so let's move on to AtomicInteger and AtomicLong. These numeric types have additional arithmetic combination operations:

     int getAndIncrement(  )     int getAndDecrement(  )     int getAndAdd(int delta)     int incrementAndGet(  )     int decrementAndGet(  )     int addAndGet(int delta)

getAndIncrement( ) increments the value and then returns the previous value. incrementAndGet( ) does the converse, returning the new value. These operations are very useful for generating unique serial numbers. For example:

     AtomicInteger serialNum = new AtomicInteger(0);     public int nextSerialNumber(  ) {         return serialNum.getAndIncrement(  );     }

We could have accomplished the same thing by synchronizing the method, but this is, in some ways, simpler and may be much faster.

Object-type references can also be wrapped for atomic operations, including compareAndSet( ) and getAndSet( ). The AtomicReference class is generic and parameterized by the type of reference it wraps:

     AtomicReference<Node> ref = new AtomicReference<Node>( node );     ref.compareAndSet( null, newNode );

9.7.4.1 Weak implementations

The compareAndSet( ) method has a strange twin named weakCompareAndSet( ), which has the dubious distinction that it simply may not work when called. It is, however, nice enough to tell you when it doesn't work by returning false. What's the point of this? Well, by allowing this fuzziness, Java may be able to make the implementation of the weak method much faster than the "certain" one. You can loop and retry the weak method instead and it may improve performance on some architectures. This is all because the Java VM may be able to map these kinds of atomic operations all the way down to the hardware level for performance, but restrictions may apply that make it difficult to guarantee.

9.7.4.2 Field updaters

The atomic package also supplies a set of "field update" utilities for each of the types that it can wrap. These utilities use reflection (see Chapter 7) to perform the kinds of atomic operations we described previously on "naked" primitive types that are not already wrapped in their atomic wrapper classes. The field updaters work on variables in an object by name and type. The catch is that atomicity is guaranteed only with respect to other callers that use the field updaters or the regular atomic wrapper classes. No guarantees are made with respect to other threads that address the variables in arbitrary ways.



    Learning Java
    Learning Java
    ISBN: 0596008732
    EAN: 2147483647
    Year: 2005
    Pages: 262

    flylib.com © 2008-2017.
    If you may any questions please contact us: flylib@qtcs.net