Blocking Queues


A queue is a data structure with two fundamental operations: to add an element to the tail of the queue and to remove an element from the head. That is, the queue manages the data in a first-in/first-out discipline. A blocking queue causes a thread to block when you try to add an element when the queue is currently full or to remove an element when the queue is empty. Blocking queues are a useful tool for coordinating the work of multiple threads. Worker threads can periodically deposit intermediate results in a blocking queue. Other worker threads remove the intermediate results and modify them further. The queue automatically balances the workload. If the first set of threads runs slower than the second, the second set blocks while waiting for the results. If the first set of threads runs faster, the queue fills up until the second set catches up. Table 1-1 shows the operations for blocking queues.

Table 1-1. Blocking Queue Operations

Method

Normal Action

Failure Action

add

Adds an element

Throws an IllegalStateException if the queue is full

remove

Removes and returns the head element

Throws a NoSuchElementException if the queue is empty

element

Returns the head element

Throws a NoSuchElementException if the queue is empty

offer

Adds an element and returns true

Returns false if the queue is full

poll

Removes and returns the head element

Returns null if the queue was empty

peek

Returns the head element

Returns null if the queue was empty

put

Adds an element

Blocks if the queue is full

take

Removes and returns the head element

Blocks if the queue is empty


The blocking queue operations fall into three categories, depending on their response. The add, remove, and element operations throw an exception when you try to add to a full queue or get the head of an empty queue. Of course, in a multithreaded program, the queue might become full or empty at any time, so you will instead want to use the offer, poll, and peek methods. These methods simply return with a failure indicator instead of throwing an exception if they cannot carry out their tasks.

NOTE

The poll and peek methods return null to indicate failure. Therefore, it is illegal to insert null values into these queues.


There are also variants of the offer and poll methods with a timeout. For example, the call

 boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS); 

tries for 100 milliseconds to insert an element to the tail of the queue. If it succeeds, it immediately returns TRue; otherwise, it returns false when it times out. Similarly, the call

 Object head = q.poll(100, TimeUnit.MILLISECONDS) 

returns TRue for 100 milliseconds to remove the head of the queue. If it succeeds, it immediately returns the head; otherwise, it returns null when it times out.

Finally, we have blocking operations put and take. The put method blocks if the queue is full, and the take method blocks if the queue is empty. These are the equivalents of offer and poll with no timeout.

The java.util.concurrent package supplies four variations of blocking queues. By default, the LinkedBlockingQueue has no upper bound on its capacity, but a maximum capacity can be optionally specified. The ArrayBlockingQueue is constructed with a given capacity and an optional parameter to require fairness. If fairness is specified, then the longest-waiting threads are given preferential treatment. As always, fairness exacts a significant performance penalty, and you should only use it if your problem specifically requires it.

The PriorityBlockingQueue is a priority queue, not a first-in/first-out queue. Elements are removed in order of their priority. The queue has unbounded capacity, but retrieval will block if the queue is empty. (We discuss priority queues in greater detail in Chapter 2.)

Finally, a DelayQueue contains objects that implement the Delayed interface:

 interface Delayed extends Comparable<Delayed> {    long getDelay(TimeUnit unit); } 

The getdelay method returns the remaining delay of the object. A negative value indicates that the delay has elapsed. Elements can only be removed from a DelayQueue if their delay has elapsed. You also need to implement the compareTo method. The DelayQueue uses that method to sort the entries.

The program in Example 1-6 shows how to use a blocking queue to control a set of threads. The program searches through all files in a directory and its subdirectories, printing lines that contain a given keyword.

A producer thread enumerates all files in all subdirectories and places them in a blocking queue. This operation is fast, and the queue would quickly fill up with all files in the file system if it was not bounded.

We also start a large number of search threads. Each search thread takes a file from the queue, opens it, prints all lines containing the keyword, and then takes the next file. We use a trick to terminate the application when no further work is required. In order to signal completion, the enumeration thread places a dummy object into the queue. (This is similar to a dummy suitcase with a label "last bag" in a baggage claim belt.) When a search thread takes the dummy, it puts it back and terminates.

Note that no explicit thread synchronization is required. In this application, we use the queue data structure as a synchronization mechanism.

Example 1-6. BlockingQueueTest.java

[View full width]

   1. import java.io.*;   2. import java.util.*;   3. import java.util.concurrent.*;   4.   5. public class BlockingQueueTest   6. {   7.    public static void main(String[] args)   8.    {   9.       Scanner in = new Scanner(System.in);  10.       System.out.print("Enter base directory (e.g. /usr/local/jdk5.0/src): ");  11.       String directory = in.nextLine();  12.       System.out.print("Enter keyword (e.g. volatile): ");  13.       String keyword = in.nextLine();  14.  15.       final int FILE_QUEUE_SIZE = 10;  16.       final int SEARCH_THREADS = 100;  17.  18.       BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);  19.  20.       FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File (directory));  21.       new Thread(enumerator).start();  22.       for (int i = 1; i <= SEARCH_THREADS; i++)  23.          new Thread(new SearchTask(queue, keyword)).start();  24.    }  25. }  26.  27. /**  28.    This task enumerates all files in a directory and its subdirectories.  29. */  30. class FileEnumerationTask implements Runnable  31. {  32.    /**  33.       Constructs a FileEnumerationTask.  34.       @param queue the blocking queue to which the enumerated files are added  35.       @param startingDirectory the directory in which to start the enumeration  36.    */  37.    public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)  38.    {  39.       this.queue = queue;  40.       this.startingDirectory = startingDirectory;  41.    }  42.  43.    public void run()  44.    {  45.       try  46.       {  47.          enumerate(startingDirectory);  48.          queue.put(DUMMY);  49.       }  50.       catch (InterruptedException e) {}  51.    }  52.  53.    /**  54.       Recursively enumerates all files in a given directory and its subdirectories  55.       @param directory the directory in which to start  56.    */  57.    public void enumerate(File directory) throws InterruptedException  58.    {  59.       File[] files = directory.listFiles();  60.       for (File file : files)      {  61.          if (file.isDirectory()) enumerate(file);  62.          else queue.put(file);  63.       }  64.    }  65.  66.    public static File DUMMY = new File("");  67.  68.    private BlockingQueue<File> queue;  69.    private File startingDirectory;  70. }  71.  72. /**  73.    This task searches files for a given keyword.  74. */  75. class SearchTask implements Runnable  76. {  77.    /**  78.       Constructs a SearchTask.  79.       @param queue the queue from which to take files  80.       @param keyword the keyword to look for  81.    */  82.    public SearchTask(BlockingQueue<File> queue, String keyword)  83.    {  84.       this.queue = queue;  85.       this.keyword = keyword;  86.    }  87.  88.    public void run()  89.    {  90.       try  91.       {  92.          boolean done = false;  93.          while (!done)  94.          {  95.             File file = queue.take();  96.             if (file == FileEnumerationTask.DUMMY) { queue.put(file); done = true; }  97.             else search(file);  98.          }  99.       } 100.       catch (IOException e) { e.printStackTrace(); } 101.       catch (InterruptedException e) {} 102.    } 103. 104.    /** 105.       Searches a file for a given keyword and prints all matching lines. 106.       @param file the file to search 107.    */ 108.    public void search(File file) throws IOException 109.    { 110.       Scanner in = new Scanner(new FileInputStream(file)); 111.       int lineNumber = 0; 112.       while (in.hasNextLine()) 113.       { 114.          lineNumber++; 115.          String line = in.nextLine(); 116.          if (line.contains(keyword)) 117.             System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber, line); 118.       } 119.       in.close(); 120.    } 121. 122.    private BlockingQueue<File> queue; 123.    private String keyword; 124. } 


 java.util.concurrent.ArrayBlockingQueue<E> 5.0 

  • ArrayBlockingQueue(int capacity)

  • ArrayBlockingQueue(int capacity, boolean fair)

    construct a blocking queue with the given capacity and fairness settings. The queue is implemented as a circular array.


 java.util.concurrent.LinkedBlockingQueue<E> 5.0 

  • LinkedBlockingQueue()

    constructs an unbounded blocking queue, implemented as a linked list.

  • LinkedBlockingQueue(int capacity)

    constructs a bounded blocking queue with the given capacity, implemented as a linked list.


 java.util.concurrent.DelayQueue<E extends Delayed> 5.0 

  • DelayQueue()

    constructs an unbounded bounded blocking queue of Delayed elements. Only elements whose delay has expired can be removed from the queue.


 java.util.concurrent.Delayed 5.0 

  • long getDelay(TimeUnit unit)

    gets the delay for this object, measured in the given time unit.


 java.util.concurrent.PriorityBlockingQueue<E> 5.0 

  • PriorityBlockingQueue()

  • PriorityBlockingQueue(int initialCapacity)

  • PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)

    constructs an unbounded blocking priority queue implemented as a heap.

    Parameters

    initialCapacity

    The initial capacity of the priority queue. Default is 11

     

    comparator

    The comparator used to compare elements. If not specified, the elements must implement the Comparable interface



 java.util.concurrent.BlockingQueue<E> 5.0 

  • void put(E element)

    adds the element, blocking if necessary.

  • boolean offer(E element)

  • boolean offer(E element, long time, TimeUnit unit)

    adds the given element and returns true if successful, or returns without adding the element and returns false if the queue is full. The second method blocks if necessary, until the element has been added or the time has elapsed.

  • E take()

    removes and returns the head element, blocking if necessary.

  • E poll(long time, TimeUnit unit)

    removes and returns the head element, blocking if necessary until an element is available or the time has elapsed. Returns null upon failure.


 java.util.Queue<E> 5.0 

  • E poll()

    removes and returns the head element or null if the queue is empty.

  • E peek()

    returns the head element or null if the queue is empty.



    Core JavaT 2 Volume II - Advanced Features
    Building an On Demand Computing Environment with IBM: How to Optimize Your Current Infrastructure for Today and Tomorrow (MaxFacts Guidebook series)
    ISBN: 193164411X
    EAN: 2147483647
    Year: 2003
    Pages: 156
    Authors: Jim Hoskins

    flylib.com © 2008-2017.
    If you may any questions please contact us: flylib@qtcs.net