Executors


Constructing a new thread is somewhat expensive because it involves interaction with the operating system. If your program creates a large number of short-lived threads, then it should instead use a thread pool. A thread pool contains a number of idle threads that are ready to run. You give a Runnable to the pool, and one of the threads calls the run method. When the run method exits, the thread doesn't die but stays around to serve the next request.

Another reason to use a thread pool is to throttle the number of concurrent threads. Creating a huge number of threads can greatly degrade performance and even crash the virtual machine. If you have an algorithm that creates lots of threads, then you should use a "fixed" thread pool that bounds the total number of concurrent threads.

The Executors class has a number of static factory methods for constructing thread pools; see Table 1-2 for a summary.

Table 1-2. Executors Factory Methods

Method

Description

newCachedThreadPool

New threads are created as needed; idle threads are kept for 60 seconds.

newFixedThreadPool

The pool contains a fixed set of threads; idle threads are kept indefinitely.

newSingleThreadExecutor

A "pool" with a single thread that executes the submitted tasks sequentially.

newScheduledThreadPool

A fixed-thread pool for scheduled execution.

newSingleThreadScheduledExecutor

A single-thread "pool" for scheduled execution.


Thread Pools

Let us look at the first three methods in Table 1-2. We discuss the remaining methods on page 63. The newCachedThreadPool method constructs a thread pool that executes each task immediately, using an existing idle thread when available and creating a new thread otherwise. The newFixedThreadPool method constructs a thread pool with a fixed size. If more tasks are submitted than there are idle threads, then the unserved tasks are placed on a queue. They are run when other tasks have completed. The newSingleThreadExecutor is a degenerate pool of size 1: A single thread executes the submitted tasks, one after another. These three methods return an object of the ThreadPoolExecutor class that implements the ExecutorService interface.

You can submit a Runnable or Callable to an ExecutorService with one of the following methods:

 Future<?> submit(Runnable task) Future<T> submit(Runnable task, T result) Future<T> submit(Callable<T> task) 

The pool will run the submitted task at its earliest convenience. When you call submit, you get back a Future object that you can use to query the state of the task.

The first submit method returns an odd-looking Future<?>. You can use such an object to call isDone, cancel, or isCancelled. But the get method simply returns null upon completion.

The second version of submit also submits a Runnable, and the get method of the Future returns the given result object upon completion.

The third version submits a Callable, and the returned Future gets the result of the computation when it is ready.

When you are done with a connection pool, call shutdown. This method initiates the shutdown sequence for the pool. An executor that is shut down accepts no new tasks. When all tasks are finished, the threads in the pool die. Alternatively, you can call shutdownNow. The pool then cancels all tasks that have not yet begun and attempts to interrupt the running threads.

Here, in summary, is what you do to use a connection pool:

  1. Call the static newCachedThreadPool or newFixedThreadPool method of the Executors class.

  2. Call submit to submit Runnable or Callable objects.

  3. If you want to be able to cancel a task or if you submit Callable objects, hang on to the returned Future objects.

  4. Call shutdown when you no longer want to submit any tasks.

For example, the preceding example program produced a large number of short-lived threads, one per directory. The program in Example 1-8 uses a thread pool to launch the tasks instead.

For informational purposes, this program prints out the largest pool size during execution. This information is not available through the ExecutorService interface. For that reason, we had to cast the pool object to the ThreadPoolExecutor class.

Example 1-8. ThreadPoolTest.java

[View full width]

   1. import java.io.*;   2. import java.util.*;   3. import java.util.concurrent.*;   4.   5. public class ThreadPoolTest   6. {   7.    public static void main(String[] args) throws Exception   8.    {   9.       Scanner in = new Scanner(System.in);  10.       System.out.print("Enter base directory (e.g. /usr/local/jdk5.0/src): ");  11.       String directory = in.nextLine();  12.       System.out.print("Enter keyword (e.g. volatile): ");  13.       String keyword = in.nextLine();  14.  15.       ExecutorService pool = Executors.newCachedThreadPool();  16.  17.       MatchCounter counter = new MatchCounter(new File(directory), keyword, pool);  18.       Future<Integer> result = pool.submit(counter);  19.  20.       try  21.       {  22.          System.out.println(result.get() + " matching files.");  23.       }  24.       catch (ExecutionException e)  25.       {  26.          e.printStackTrace();  27.       }  28.       catch (InterruptedException e) {}  29.       pool.shutdown();  30.  31.       int largestPoolSize = ((ThreadPoolExecutor) pool).getLargestPoolSize();  32.       System.out.println("largest pool size=" + largestPoolSize);  33.    }  34. }  35.  36. /**  37.    This task counts the files in a directory and its subdirectories that contain a  given keyword.  38. */  39. class MatchCounter implements Callable<Integer>  40. {  41.    /**  42.       Constructs a MatchCounter.  43.       @param directory the directory in which to start the search  44.       @param keyword the keyword to look for  45.       @param pool the thread pool for submitting subtasks  46.    */  47.    public MatchCounter(File directory, String keyword, ExecutorService pool)  48.    {  49.       this.directory = directory;  50.       this.keyword = keyword;  51.       this.pool = pool;  52.    }  53.  54.    public Integer call()  55.    {  56.       count = 0;  57.       try  58.       {  59.          File[] files = directory.listFiles();  60.          ArrayList<Future<Integer>> results = new ArrayList<Future<Integer>>();  61.  62.          for (File file : files)  63.             if (file.isDirectory())  64.             {  65.                MatchCounter counter = new MatchCounter(file, keyword, pool);  66.                Future<Integer> result = pool.submit(counter);  67.                results.add(result);  68.             }  69.             else  70.             {  71.                if (search(file)) count++;  72.             }  73.  74.          for (Future<Integer> result : results)  75.             try  76.             {  77.                count += result.get();  78.             }  79.             catch (ExecutionException e)  80.             {  81.                e.printStackTrace();  82.             }  83.       }  84.       catch (InterruptedException e) {}  85.       return count;  86.    }  87.  88.    /**  89.       Searches a file for a given keyword.  90.       @param file the file to search  91.       @return true if the keyword is contained in the file  92.    */  93.    public boolean search(File file)  94.    {  95.       try  96.       {  97.          Scanner in = new Scanner(new FileInputStream(file));  98.          boolean found = false;  99.          while (!found && in.hasNextLine()) 100.          { 101.             String line = in.nextLine(); 102.             if (line.contains(keyword)) found = true; 103.          } 104.          in.close(); 105.          return found; 106.       } 107.       catch (IOException e) 108.       { 109.          return false; 110.       } 111.    } 112. 113.    private File directory; 114.    private String keyword; 115.    private ExecutorService pool; 116.    private int count; 117. } 


 java.util.concurrent.Executors 5.0 

  • ExecutorService newCachedThreadPool()

    returns a cached thread pool that creates threads as needed and terminates threads that have been idle for 60 seconds.

  • ExecutorService newFixedThreadPool(int threads)

    returns a thread pool that uses the given number of threads to execute tasks.

  • ExecutorService newSingleThreadExecutor()

    returns an executor that executes tasks sequentially in a single thread.


 java.util.concurrent.ExecutorService 5.0 

  • Future<T> submit(Callable<T> task)

  • Future<T> submit(Runnable task, T result)

  • Future<?> submit(Runnable task)

    submits the given task for execution.

  • void shutdown()

    shuts down the service, completing the already submitted tasks but not accepting new submissions.


 java.util.concurrent.ThreadPoolExecutor 5.0 

  • int getLargestPoolSize()

    returns the largest size of the thread pool during the life of this executor.

Scheduled Execution

The ScheduledExecutorService interface has methods for scheduled or repeated execution of tasks. It is a generalization of java.util.Timer that allows for thread pooling. The newScheduledThreadPool and newSingleThreadScheduledExecutor methods of the Executors class return objects that implement the ScheduledExecutorService interface.

You can schedule a Runnable or Callable to run once, after an initial delay. You can also schedule a Runnable to run periodically. See the API notes for details.


 java.util.concurrent.Executors 5.0 

  • ScheduledExecutorService newScheduledThreadPool(int threads)

    returns a thread pool that uses the given number of threads to schedule tasks.

  • ScheduledExecutorService newSingleThreadScheduledExecutor()

    returns an executor that schedules tasks in a single thread.


 java.util.concurrent.ScheduledExecutorService 5.0 

  • ScheduledFuture<V> schedule(Callable<V> task, long time, TimeUnit unit)

  • ScheduledFuture<?> schedule(Runnable task, long time, TimeUnit unit)

    schedules the given task after the given time has elapsed.

  • ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit)

    schedules the given task to run periodially, every period units, after the initial delay has elapsed.

  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit)

    schedules the given task to run periodially, with delay units between completion of one invocation and the start of the next, after the initial delay has elapsed.

Controlling Groups of Threads

You have seen how to use an executor service as a thread pool to increase the efficiency of task execution. Sometimes, an executor is used for a more tactical reason, simply to control a group of related tasks. For example, you can cancel all tasks in an executor with the shutdownNow method.

The invokeAny method submits all objects in a collection of Callable objects and returns the result of a completed task. You don't know which task that ispresumably, it was the one that finished most quickly. You would use this method for a search problem in which you are willing to accept any solution. For example, suppose that you need to factor a large integera computation that is required for breaking the RSA cipher. You could submit a number of tasks, each of which attempts a factorization by using numbers in a different range. As soon as one of these tasks has an answer, your computation can stop.

The invokeAll method submits all objects in a collection of Callable objects and returns a list of Future objects that represent the solutions to all tasks. You can combine the results of the computation when they are available, like this:

 ArrayList<Callable<Integer>> tasks = . . .; List<Future<Integer>> results = executor.invokeAll(tasks); for (Future<Integer> result : results)    count += result.get(); 

A disadvantage with this approach is that you may wait needlessly if the first task happens to take a long time. It would make more sense to obtain the results in the order in which they are available. This can be arranged with the ExecutorCompletionService.

Start with an executor, obtained in the usual way. Then construct an ExecutorCompletionService. Submit tasks to the completion service. The service manages a blocking queue of Future objects, containing the results of the submitted tasks as they become available. Thus, a more efficient organization for the preceding computation is the following:

 ExecutorCompletionService service = new ExecutorCompletionService(executor); for (Callable<Integer> task : tasks) service.submit(task); for (int i = 0; i < taks.size(); i++)    count += service.take().get(); 


 java.util.concurrent.ExecutorService 5.0 

  • T invokeAny(Collection<Callable<T>> tasks)

  • T invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)

    executes the given tasks and returns the result of one of them. The second method throws a TimeoutException if a timeout occurs.

  • List<Future<T>> invokeAll(Collection<Callable<T>> tasks)

  • List<Future<T>> invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)

    executes the given tasks and returns the results of all of them. The second method throws a TimeoutException if a timeout occurs.


 java.util.concurrent.ExecutorCompletionService 5.0 

  • ExecutorCompletionService(Executor e)

    constructs an executor completion service that collects the results of the given executor.

  • Future<T> submit(Callable<T> task)

  • Future<T> submit(Runnable task, T result)

    submits a task to the underlying executor.

  • Future<T> take()

    removes the next completed result, blocking if no completed results are available.

  • Future<T> poll()

  • Future<T> poll(long time, TimeUnit unit)

    removes the next completed result or null if no completed results are available. The second method waits for the given time.



    Core JavaT 2 Volume II - Advanced Features
    Building an On Demand Computing Environment with IBM: How to Optimize Your Current Infrastructure for Today and Tomorrow (MaxFacts Guidebook series)
    ISBN: 193164411X
    EAN: 2147483647
    Year: 2003
    Pages: 156
    Authors: Jim Hoskins

    flylib.com © 2008-2017.
    If you may any questions please contact us: flylib@qtcs.net