24.12. (Optional) Case Study: Producer/Consumer |
Consider the classic Consumer/Producer example. Suppose you use a buffer to store integers. The buffer size is limited. The buffer provides the method write(int) to add an int value to the buffer and the method read() to read and delete an int value from the buffer. To synchronize the operations, use a lock with two conditions: notEmpty (i.e., buffer is not empty) and notFull (i.e., buffer is not full). When a task adds an int to the buffer, if the buffer is full, the task will wait for the notFull condition. When a task deletes an int from the buffer, if the buffer is empty, the task will wait for the notEmpty condition. The interaction between the two tasks is shown in Figure 24.19.
Listing 24.10 presents the complete program. The program contains the Buffer class (lines 48 “95) and two tasks for repeatedly producing and consuming numbers to and from the buffer (lines 16 “45). The write(int) method (line 60) adds an integer to the buffer. The read() method (line 77) deletes and returns an integer from the buffer.
For simplicity, the buffer is implemented using a linked list (lines 50 “51). Recall that linked list implements the queue interface in JDK 1.5. The conditions notEmpty and notFull on the lock are created in lines 57 “58. The conditions are bound to a lock. A lock must be acquired before a condition can be applied. If you use the wait() and notify() methods to rewrite this example, you have to designate two objects as monitors .
1 import java.util.concurrent.*; 2 import java.util.concurrent.locks.*; 3 4 public class ConsumerProducer { 5 private static Buffer buffer = new Buffer(); 6 7 public static void main(String[] args) { 8 // Create a thread pool with two threads 9 ExecutorService executor = Executors.newFixedThreadPool( 2 ); 10 executor.execute( new ProducerTask()); 11 executor.execute( new ConsumerTask()); 12 executor.shutdown(); 13 } 14 15 // A task for adding an int to the buffer 16 private static class ProducerTask implements Runnable { 17 public void run() { 18 try { 19 int i = 1 ; 20 while ( true ) { 21 System.out.println( "Producer writes " + i); 22 Buffer.write(i++); // Add a value to the buffer 23 // Put the thread into sleep 24 Thread.sleep(( int )(Math.random() * 10000 )); 25 } 26 } catch (InterruptedException ex) { 27 ex.printStackTrace(); 28 } 29 } 30 } 31 32 // A task for reading and deleting an int from the buffer 33 private static class ConsumerTask implements Runnable { 34 public void run() { 35 try { 36 while ( true ) { 37 System.out.println( "\t\t\tConsumer reads " + buffer.read()); 38 // Put the thread into sleep 39 Thread.sleep(( int )(Math.random() * 10000 )); 40 } 41 } catch (InterruptedException ex) { 42 ex.printStackTrace(); 43 } 44 } 45 } 46 47 // An inner class for buffer 48 private static class Buffer { 49 private static final int CAPACITY = 1 ; // buffer size 50 private java.util.LinkedList<Integer> queue = 51 new java.util.LinkedList<Integer>(); 52 53 // Create a new lock 54 private static Lock lock = new ReentrantLock(); 55 56 // Create two conditions 57 private static Condition notEmpty = lock.newCondition(); 58 private static Condition notFull = lock.newCondition(); 59 60 public void write( int value) { 61 lock.lock(); // Acquire the lock 62 try { 63 while (queue.size() == CAPACITY) { 64 System.out.println( "Wait for notFull condition" ); 65 notFull.await(); 66 } 67 68 queue.offer(value); 69 notEmpty.signal(); // Signal notEmpty condition 70 } catch (InterruptedException ex) { 71 ex.printStackTrace(); 72 } finally { 73 lock.unlock(); // Release the lock 74 } 75 } 76 77 public int read() { 78 int value = ; 79 lock.lock(); // Acquire the lock 80 try { 81 while (queue.isEmpty()) { 82 System.out.println( "\t\t\tWait for notEmpty condition" ); 83 notEmpty.await(); 84 } 85 86 value = queue.remove(); 87 notFull.signal(); // Signal notFull condition 88 } catch (InterruptedException ex) { 89 ex.printStackTrace(); 90 } finally { 91 lock.unlock(); // Release the lock 92 return value; 93 } 94 } 95 } 96 } |
A sample run of the program is shown in Figure 24.20.