Recipe 24.8 Simplifying Producer-Consumer with the 1.5 Queue Interface


Problem

You need to control producer-consumer implementations involving multiple threads.

Solution

Use JDK 1.5's new Queue interface or the BlockingQueue subinterface.

Discussion

As an example of the simplifications possible with 1.5's java.util.Concurrent package, consider the producer-consumer program in Recipe 24.7. Example 24-13, ProdCons15.java, uses the new java.util.BlockingQueue (itself a subinterface of the new-in-1.5 java.util.Queue interface) to reimplement the program ProdCons2 from Example 24-12 in about two-thirds of the number of lines of code. With these new features, the application need not be concerned with wait( ) or the vagaries of notify( ) and the use of notifyAll( ) in its place.

The application simply puts items into a queue and takes them from it. In the example, I have (as before) 4 producers and only 3 consumers, so the producers eventually wait. Running the application on one of my older notebooks, the producers' lead over the consumers increases to about 350 over the ten seconds or so of running it.

Example 24-13. Prodcons15.java
import java.util.*; import java.io.*; import java.util.concurrent.*; /** Producer-Consumer in Java, for J2SE 1.5 using concurrent.  */ public class ProdCons15 { protected boolean done = false; /** Inner class representing the Producer side */ class Producer implements Runnable {      protected BlockingQueue queue;      Producer(BlockingQueue theQueue) { this.queue = theQueue; }         public void run( ) {             try {                 while (true) {                     Object justProduced = getRequestFromNetwork( );                     queue.put(justProduced);                     System.out.println("Produced 1 object; List size now " + queue.size( ));                     if (done) {                     return;                     }                  }              } catch (InterruptedException ex) {                  System.out.println("Producer INTERRUPTED");              }         }         Object getRequestFromNetwork( ) {        // Simulation of reading from client             try {                       Thread.sleep(10); // simulate time passing during read             } catch (InterruptedException ex) {                 System.out.println("Producer Read INTERRUPTED");             }             return(new Object( ));         } } /** Inner class representing the Consumer side */ class Consumer implements Runnable {     protected BlockingQueue queue;     Consumer(BlockingQueue theQueue) { this.queue = theQueue; }     public void run( ) {          try {              while (true) {                   Object obj = queue.take( );                   int len = queue.size( );                   System.out.println("List size now " + len);                   process(obj);                   if (done) {                       return;                   }               }           } catch (InterruptedException ex) {                    System.out.println("CONSUMER INTERRUPTED");           }     }     void process(Object obj) {          // Thread.sleep(xxx) // Simulate time passing          System.out.println("Consuming object " + obj);     } } ProdCons15(int nP, int nC) {      BlockingQueue myQueue = new LinkedBlockingQueue( );      for (int i=0; i<nP; i++)           new Thread(new Producer(myQueue)).start( );      for (int i=0; i<nC; i++)           new Thread(new Consumer(myQueue)).start( ); } public static void main(String[] args) throws IOException, InterruptedException {       // Start producers and consumers       int numProducers = 4;       int numConsumers = 3;       ProdCons15 pc = new ProdCons15(numProducers, numConsumers);       // Let the simulation run for, say, 10 seconds       Thread.sleep(10*1000);        // End of simulation - shut down gracefully       pc.done = true; }



Java Cookbook
Java Cookbook, Second Edition
ISBN: 0596007019
EAN: 2147483647
Year: 2003
Pages: 409
Authors: Ian F Darwin

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net