Problem

You need to control producer-consumer implementations involving multiple threads.

Solution

Use JDK 1.5's new Queue interface or its BlockingQueue subinterface.

Discussion

As an example of the simplifications possible with 1.5's java.util.concurrent package, consider the producer-consumer program in Recipe 24.7. Example 24-13, ProdCons15.java, uses the new java.util.concurrent.BlockingQueue (itself a subinterface of the new-in-1.5 java.util.Queue interface) to reimplement the program ProdCons2 from Example 24-12 in about two-thirds as many lines of code. With these new features, the application need not be concerned with wait() or with the vagaries of notify() and the need to use notifyAll() in its place. The application simply puts items into a queue and takes them from it.

In the example, I have (as before) 4 producers and only 3 consumers, so the producers steadily get ahead of the consumers. Because the LinkedBlockingQueue is created without a capacity limit, put() does not actually block here and the backlog simply grows; with a bounded queue, the producers would wait once it filled. Running the application on one of my older notebooks, the producers' lead over the consumers increases to about 350 over the ten seconds or so of running it.

Example 24-13. ProdCons15.java

import java.util.concurrent.*;

/** Producer-Consumer in Java, for J2SE 1.5 using java.util.concurrent. */
public class ProdCons15 {

    /** Set by main() to stop the workers; volatile so other threads see the change. */
    protected volatile boolean done = false;

    /** Inner class representing the Producer side */
    class Producer implements Runnable {

        protected BlockingQueue<Object> queue;

        Producer(BlockingQueue<Object> theQueue) {
            this.queue = theQueue;
        }

        public void run() {
            try {
                while (true) {
                    Object justProduced = getRequestFromNetwork();
                    queue.put(justProduced);
                    System.out.println("Produced 1 object; List size now " + queue.size());
                    if (done) {
                        return;
                    }
                }
            } catch (InterruptedException ex) {
                System.out.println("Producer INTERRUPTED");
            }
        }

        Object getRequestFromNetwork() {    // Simulation of reading from client
            try {
                Thread.sleep(10);   // simulate time passing during read
            } catch (InterruptedException ex) {
                System.out.println("Producer Read INTERRUPTED");
            }
            return new Object();
        }
    }

    /** Inner class representing the Consumer side */
    class Consumer implements Runnable {

        protected BlockingQueue<Object> queue;

        Consumer(BlockingQueue<Object> theQueue) {
            this.queue = theQueue;
        }

        public void run() {
            try {
                while (true) {
                    Object obj = queue.take();
                    int len = queue.size();
                    System.out.println("List size now " + len);
                    process(obj);
                    if (done) {
                        return;
                    }
                }
            } catch (InterruptedException ex) {
                System.out.println("CONSUMER INTERRUPTED");
            }
        }

        void process(Object obj) {
            // Thread.sleep(xxx) // Simulate time passing
            System.out.println("Consuming object " + obj);
        }
    }

    ProdCons15(int nP, int nC) {
        // One queue shared by all producers and consumers
        BlockingQueue<Object> myQueue = new LinkedBlockingQueue<Object>();
        for (int i = 0; i < nP; i++)
            new Thread(new Producer(myQueue)).start();
        for (int i = 0; i < nC; i++)
            new Thread(new Consumer(myQueue)).start();
    }

    public static void main(String[] args) throws InterruptedException {
        // Start producers and consumers
        int numProducers = 4;
        int numConsumers = 3;
        ProdCons15 pc = new ProdCons15(numProducers, numConsumers);

        // Let the simulation run for, say, 10 seconds
        Thread.sleep(10 * 1000);

        // End of simulation - shut down gracefully
        pc.done = true;
    }
}
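
The listing above creates its LinkedBlockingQueue with no capacity argument and stops its threads by testing a done flag after each item. As a variation that is not part of the original recipe, the following minimal sketch assumes a bounded ArrayBlockingQueue, so that put() really does block when the queue is full, and a sentinel object (a so-called poison pill) so that a consumer blocked in take() is always woken for shutdown. The class name BoundedQueueSketch, the DONE sentinel, and the helper methods offerWhenFull() and runOnce() are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Sketch only: bounded queue plus a sentinel-based shutdown (all names are hypothetical). */
public class BoundedQueueSketch {

    /** Hypothetical "poison pill": the consumer stops when it takes this object. */
    static final Object DONE = new Object();

    /** Returns false: offer() refuses a second item when the capacity is 1. */
    static boolean offerWhenFull() {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
        queue.offer("first");          // accepted; the queue is now full
        return queue.offer("second");  // returns false at once instead of blocking
    }

    /** One producer and one consumer share a queue of the given capacity. */
    static List<Object> runOnce(int capacity, final int count) {
        final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(capacity);
        final List<Object> consumed = new ArrayList<Object>();

        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < count; i++) {
                        queue.put(Integer.valueOf(i)); // blocks while the queue is full
                    }
                    queue.put(DONE);                   // tell the consumer to finish
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        Object obj = queue.take();     // blocks while the queue is empty
                        if (obj == DONE) {
                            return;
                        }
                        consumed.add(obj);
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producer.start();
        consumer.start();
        try {
            // join() also makes the consumer's writes to the list visible here
            producer.join();
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", ex);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("offer on a full queue: " + offerWhenFull());
        System.out.println("consumed: " + runOnce(2, 5));
    }
}
```

With several consumers, one sentinel per consumer would be needed, since each take() removes the object it returns. The sketch uses anonymous Runnable classes and explicit type arguments so that it stays within the language level of the listing it accompanies.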