24.13. (Optional) Blocking Queues |
The discussion in §22.8 introduced queues and priority queues. A blocking queue causes a thread to block when you try to add an element to a full queue or to remove an element from an empty queue. The BlockingQueue interface extends java.util.Queue and provides the synchronized put and take methods for adding an element to the head of the queue and for removing an element from the tail of the queue, as shown in Figure 24.21.
Three concrete blocking queues, ArrayBlockingQueue , LinkedBlockingQueue , and PriorityBlockingQueue , are supported in JDK 1.5, as shown in Figure 24.22. All are in the java.util.concurrent package. ArrayBlockingQueue implements a blocking queue using an array. You have to specify a capacity or an optional fairness to construct an ArrayBlockingQueue . LinkedBlockingQueue implements a blocking queue using a linked list. You may create an unbounded or bounded LinkedBlockingQueue . PriorityBlockingQueue is a priority queue. You may create an unbounded or bounded priority queue.
Note
You may create an unbounded LinkedBlockingQueue or PriorityBlockingQueue . For an unbounded queue , the put method will never block. |
Listing 24.11 gives an example of using an ArrayBlockingQueue to simplify the Consumer/Producer example in Listing 24.10. Line 5 creates an ArrayBlockingQueue to store integers. The Producer thread puts an integer into the queue (line 22), and the Consumer thread takes an integer from the queue (line 37).
1 import java.util.concurrent.*; 2 3 public class ConsumerProducerUsingBlockingQueue { 4 private static ArrayBlockingQueue<Integer> buffer = 5 new ArrayBlockingQueue<Integer>( 2 ); 6 7 public static void main(String[] args) { 8 // Create a thread pool with two threads 9 ExecutorService executor = Executors.newFixedThreadPool( 2 ); 10 executor.execute( new ProducerTask()); 11 executor.execute( new ConsumerTask()); 12 executor.shutdown(); 13 } 14 15 // A task for adding an int to the buffer 16 private static class ProducerTask implements Runnable { 17 public void run() { 18 try { 19 int i = 1 ; 20 while ( true ) { 21 System.out.println( "Producer writes " + i); 22 buffer.put(i++); // Add any value to the buffer, say, 1 23 // Put the thread into sleep 24 Thread.sleep(( int )(Math.random() * 10000 )); 25 } 26 } catch (InterruptedException ex) { 27 ex.printStackTrace(); 28 } 29 } 30 } 31 32 // A task for reading and deleting an int from the buffer 33 private static class ConsumerTask implements Runnable { 34 public void run() { 35 try { 36 while ( true ) { 37 System.out.println( "\t\t\tConsumer reads " + buffer.take() ); 38 // Put the thread into sleep 39 Thread.sleep(( int )(Math.random() * 10000 )); 40 } 41 } catch (InterruptedException ex) { 42 ex.printStackTrace(); 43 } 44 } 45 } 46 } |
In Listing 24.10, you used locks and conditions to synchronize the Producer and Consumer threads. In this program, there is no need to hand-code it, because synchronization is already implemented in ArrayBlockingQueue .