27.7. Key Terms

 
[Page 797 ( continued )]

24.12. (Optional) Case Study: Producer/Consumer

Consider the classic Consumer/Producer example. Suppose you use a buffer to store integers. The buffer size is limited. The buffer provides the method write(int) to add an int value to the buffer and the method read() to read and delete an int value from the buffer. To synchronize the operations, use a lock with two conditions: notEmpty (i.e., buffer is not empty) and notFull (i.e., buffer is not full). When a task adds an int to the buffer, if the buffer is full, the task will wait for the notFull condition. When a task deletes an int from the buffer, if the buffer is empty, the task will wait for the notEmpty condition. The interaction between the two tasks is shown in Figure 24.19.

Figure 24.19. The conditions notFull and notEmpty are used to coordinate task interactions.


Listing 24.10 presents the complete program. The program contains the Buffer class (lines 48 “95) and two tasks for repeatedly producing and consuming numbers to and from the buffer (lines 16 “45). The write(int) method (line 60) adds an integer to the buffer. The read() method (line 77) deletes and returns an integer from the buffer.

For simplicity, the buffer is implemented using a linked list (lines 50 “51). Recall that linked list implements the queue interface in JDK 1.5. The conditions notEmpty and notFull on the lock are created in lines 57 “58. The conditions are bound to a lock. A lock must be acquired before a condition can be applied. If you use the wait() and notify() methods to rewrite this example, you have to designate two objects as monitors .

Listing 24.10. ConsumerProducer.java
(This item is displayed on pages 797 - 799 in the print version)
 1   import   java.util.concurrent.*; 2   import   java.util.concurrent.locks.*; 3 4   public class   ConsumerProducer { 5    private static   Buffer buffer =   new   Buffer();  6 7   public static void   main(String[] args) { 8  // Create a thread pool with two threads  

[Page 798]
 9 ExecutorService executor = Executors.newFixedThreadPool(   2   ); 10 executor.execute(   new   ProducerTask()); 11 executor.execute(   new   ConsumerTask()); 12 executor.shutdown(); 13 } 14 15  // A task for adding an int to the buffer  16    private static class   ProducerTask   implements   Runnable  { 17   public void   run() { 18   try   { 19   int   i =   1   ; 20   while   (   true   ) { 21 System.out.println(   "Producer writes "   + i); 22 Buffer.write(i++);  // Add a value to the buffer  23  // Put the thread into sleep  24 Thread.sleep((   int   )(Math.random() *   10000   )); 25 } 26 }   catch   (InterruptedException ex) { 27 ex.printStackTrace(); 28 } 29 } 30 } 31 32  // A task for reading and deleting an int from the buffer  33    private static class   ConsumerTask   implements   Runnable  { 34   public void   run() { 35   try   { 36   while   (   true   ) { 37 System.out.println(   "\t\t\tConsumer reads "   + buffer.read()); 38  // Put the thread into sleep  39 Thread.sleep((   int   )(Math.random() *   10000   )); 40 } 41 }   catch   (InterruptedException ex) { 42 ex.printStackTrace(); 43 } 44 } 45 } 46 47  // An inner class for buffer  48   private static class   Buffer { 49   private static final int   CAPACITY =   1   ;  // buffer size  50   private   java.util.LinkedList<Integer> queue = 51   new   java.util.LinkedList<Integer>(); 52 53  // Create a new lock  54    private static   Lock lock =   new   ReentrantLock();  55 56  // Create two conditions  57    private static   Condition notEmpty = lock.newCondition();  58    private static   Condition notFull = lock.newCondition();  59 60   public void   write(   int   value) { 61  lock.lock();  // Acquire the lock   62   try   { 63   while   (queue.size() == CAPACITY) { 64 System.out.println(   "Wait for notFull condition"   ); 65  notFull.await();  66 } 67 

[Page 799]
 68 queue.offer(value); 69  notEmpty.signal();  // Signal notEmpty condition   70 }   catch   (InterruptedException ex) { 71 ex.printStackTrace(); 72 }   finally   { 73  lock.unlock();  // Release the lock   74 } 75 } 76 77   public int   read() { 78   int   value =     ; 79  lock.lock();  // Acquire the lock   80   try   { 81   while   (queue.isEmpty()) { 82 System.out.println(   "\t\t\tWait for notEmpty condition"   ); 83  notEmpty.await();  84 } 85 86 value = queue.remove(); 87  notFull.signal();  // Signal notFull condition   88 }   catch   (InterruptedException ex) { 89 ex.printStackTrace(); 90 }   finally   { 91  lock.unlock();  // Release the lock   92   return   value; 93 } 94 } 95 } 96 } 

A sample run of the program is shown in Figure 24.20.

Figure 24.20. Locks and conditions are used for communications between the Producer and Consumer threads.


 


Introduction to Java Programming-Comprehensive Version
Introduction to Java Programming-Comprehensive Version (6th Edition)
ISBN: B000ONFLUM
EAN: N/A
Year: 2004
Pages: 503

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net