Producer/Consumer Relationship without Synchronization

Producer Consumer Relationship without Synchronization

In a producer/consumer relationship, the producer portion of an application generates data and stores it in a shared object, and the consumer portion of an application reads data from the shared object. One example of a common producer/consumer relationship is print spooling. A word processor spools data to a buffer (typically a file) and that data is subsequently consumed by the printer as it prints the document. Similarly, an application that copies data onto compact discs places data in a fixed-size buffer that is emptied as the CD-RW drive burns the data onto the compact disc.

In a multithreaded producer/consumer relationship, a producer thread generates data and places it in a shared object called a buffer. A consumer thread reads data from the buffer. If the producer waiting to put the next data into the buffer determines that the consumer has not yet read the previous data from the buffer, the producer thread should call await so that the consumer can read the data before further updatesotherwise the consumer never sees the previous data and that data is lost to the application. When the consumer thread reads the data, it should call signal to allow a waiting producer to store the next value. If a consumer thread finds the buffer empty or finds that the previous data has already been read, the consumer should call awaitotherwise the consumer might read old data again from the buffer. When the producer places the next data into the buffer, the producer should call signal to allow the consumer thread to proceed, so that the consumer can read the new data.

Let us consider how logic errors can arise if we do not synchronize access among multiple threads manipulating shared data. Our next example (Fig. 23.6Fig. 23.10) implements a producer/consumer relationship in which a producer thread writes the numbers 1 through 10 into a shared buffera memory location shared between two threads (a single int variable called buffer in line 6 of Fig. 23.9 in this example). The consumer thread reads this data from the shared buffer and displays the data. The program's output shows the values that the producer writes (produces) into the shared buffer and the values that the consumer reads (consumes) from the shared buffer.

Figure 23.6. Buffer interface used in producer/consumer examples.

(This item is displayed on page 1065 in the print version)

 1 // Fig. 23.6: Buffer.java
 2 // Buffer interface specifies methods called by Producer and Consumer.
 3
 4 public interface Buffer
 5 {
 6 public void set( int value ); // place int value into Buffer
 7 public int get(); // return int value from Buffer
 8 } // end interface Buffer

Each value the producer thread writes to the shared buffer must be consumed exactly once by the consumer thread. However, the threads in this example are not synchronized. Therefore, data can be lost if the producer places new data into the shared buffer before the consumer consumes the previous data. Also, data can be incorrectly duplicated if the consumer consumes data again before the producer produces the next value. To show these possibilities, the consumer thread in the following example keeps a total of all the values it reads. The producer thread produces values from 1 through 10. If the consumer reads each value produced once and only once, the total will be 55. However, if you execute this program several times, you will see that the total is not always 55 (as shown in the outputs in Fig. 23.10). To emphasize the point, the producer and consumer threads in the example each sleep for random intervals of up to three seconds between performing their tasks. Thus, we do not know exactly when the producer thread will attempt to write a new value, nor do we know when the consumer thread will attempt to read a value.

The program consists of interface Buffer (Fig. 23.6) and four classesProducer (Fig. 23.7), Consumer (Fig. 23.8), UnsynchronizedBuffer (Fig. 23.9) and SharedBufferTest (Fig. 23.10). Interface Buffer declares methods set and get that a Buffer must implement to enable the Producer tHRead to place a value in the Buffer and the Consumer tHRead to retrieve a value from the Buffer. We will see the implementation of this interface in Fig. 23.9.

Figure 23.7. Producer represents the producer thread in a producer/consumer relationship.

(This item is displayed on page 1066 in the print version)

 1 // Fig. 23.7: Producer.java
 2 // Producer's run method stores the values 1 to 10 in buffer.
 3 import java.util.Random;
 4
 5 public class Producer implements Runnable
 6 {
 7 private static Random generator = new Random();
 8 private Buffer sharedLocation; // reference to shared object
 9
10 // constructor
11 public Producer( Buffer shared )
12 {
13 sharedLocation = shared;
14 } // end Producer constructor
15
16 // store values from 1 to 10 in sharedLocation
17 public void run() 
18 {
19 int sum = 0;
20
21 for ( int count = 1; count <= 10; count++ )
22 {
23 try // sleep 0 to 3 seconds, then place value in Buffer
24 {
25 Thread.sleep( generator.nextInt( 3000 ) ); // sleep thread
26 sharedLocation.set( count ); // set value in buffer
27 sum += count; // increment sum of values
28 System.out.printf( "	%2d
", sum );
29 } // end try
30 // if sleeping thread interrupted, print stack trace
31 catch ( InterruptedException exception )
32 {
33 exception.printStackTrace();
34 } // end catch
35 } // end for
36
37 System.out.printf( "
%s
%s
", "Producer done producing.",
38 "Terminating Producer." );
39 } // end method run
40 } // end class Producer

Figure 23.8. Consumer represents the consumer thread in a producer/consumer relationship.

(This item is displayed on page 1067 in the print version)

 1 // Fig. 23.8: Consumer.java
 2 // Consumer's run method loops ten times reading a value from buffer.
 3 import java.util.Random;
 4
 5 public class Consumer implements Runnable
 6 {
 7 private static Random generator = new Random();
 8 private Buffer sharedLocation; // reference to shared object
 9
10 // constructor
11 public Consumer( Buffer shared )
12 {
13 sharedLocation = shared;
14 } // end Consumer constructor
15
16 // read sharedLocation's value four times and sum the values
17 public void run() 
18 {
19 int sum = 0;
20
21 for ( int count = 1; count <= 10; count++ )
22 {
23 // sleep 0 to 3 seconds, read value from buffer and add to sum
24 try
25 {
26 Thread.sleep( generator.nextInt( 3000 ) );
27 sum += sharedLocation.get();
28 System.out.printf( "			%2d
", sum );
29 } // end try
30 // if sleeping thread interrupted, print stack trace
31 catch ( InterruptedException exception )
32 {
33 exception.printStackTrace();
34 } // end catch
35 } // end for
36
37 System.out.printf( "
%s %d.
%s
",
38 "Consumer read values totaling", sum, "Terminating Consumer." );
39 } // end method run
40 } // end class Consumer

Figure 23.9. UnsynchronizedBuffer maintains the shared integer that is accessed by a producer thread and a consumer thread via methods set and get.

(This item is displayed on page 1068 in the print version)

 1 // Fig. 23.9: UnsynchronizedBuffer.java
 2 // UnsynchronizedBuffer represents a single shared integer.
 3
 4 public class UnsynchronizedBuffer implements Buffer
 5 {
 6 private int buffer = -1; // shared by producer and consumer threads
 7
 8 // place value into buffer
 9 public void set( int value )
10 {
11 System.out.printf( "Producer writes	%2d", value );
12 buffer = value;
13 } // end method set
14
15 // return value from buffer
16 public int get()
17 {
18 System.out.printf( "Consumer reads	%2d", buffer );
19 return buffer;
20 } // end method get
21 } // end class UnsynchronizedBuffer

Figure 23.10. SharedBufferTest sets up a producer/consumer application that uses an unsynchronized buffer.

(This item is displayed on pages 1069 - 1071 in the print version)

 1 // Fig 23.10: SharedBufferTest.java
 2 // Application shows two threads manipulating an unsynchronized buffer.
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5
 6 public class SharedBufferTest
 7 {
 8 public static void main( String[] args )
 9 {
10 // create new thread pool with two threads
11 ExecutorService application = Executors.newFixedThreadPool( 2 );
12
13 // create UnsynchronizedBuffer to store ints 
14 Buffer sharedLocation = new UnsynchronizedBuffer();
15
16 System.out.println( "Action		Value	Produced	Consumed" );
17 System.out.println( "------		-----	--------	--------
" );
18
19 // try to start producer and consumer giving each of them access
20 // to sharedLocation
21 try
22 {
23 application.execute( new Producer( sharedLocation ) );
24 application.execute( new Consumer( sharedLocation ) );
25 } // end try
26 catch ( Exception exception )
27 {
28 exception.printStackTrace();
29 } // end catch
30
31 application.shutdown(); // terminate application when threads end
32 } // end main
33 } // end class SharedBufferTest
 
Action Value Produced Consumed
------ ----- -------- --------

Producer writes 1 1
Producer writes 2 3 
Producer writes 3 6 
Consumer reads 3 3
Producer writes 4 10
Consumer reads 4 7
Producer writes 5 15
Producer writes 6 21 
Producer writes 7 28 
Consumer reads 7 14
Consumer reads 7 21
Producer writes 8 36
Consumer reads 8 29
Consumer reads 8 37
Producer writes 9 45
Producer writes 10 55 

Producer done producing.
Terminating Producer.
Consumer reads 10 47
Consumer reads 10 57
Consumer reads 10 67
Consumer reads 10 77

Consumer read values totaling 77.
Terminating Consumer.

Action Value Produced Consumed
------ ----- -------- --------

Consumer reads -1 -1
Producer writes 1 1
Consumer reads 1 0
Consumer reads 1 1
Consumer reads 1 2
Consumer reads 1 3
Consumer reads 1 4
Producer writes 2 3
Consumer reads 2 6
Producer writes 3 6
Consumer reads 3 9
Producer writes 4 10
Consumer reads 4 13
Producer writes 5 15
Producer writes 6 21 
Consumer reads 6 19

Consumer read values totaling 19.
Terminating Consumer.
Producer writes 7 28 
Producer writes 8 36 
Producer writes 9 45 
Producer writes 10 55 
Producer done producing.
Terminating Producer.
 

Class Producer (Fig. 23.7) implements the Runnable interface, allowing it to be executed in a separate thread. The constructor (lines 1114) initializes Buffer reference sharedLocation with an object created in main (line 14 of Fig. 23.10) and passed to the constructor in the parameter shared. As we will see, this is an UnsynchronizedBuffer object that implements interface Buffer without synchronizing access to the shared object. The Producer tHRead in this program executes the tasks specified in method run (lines 1739). Each iteration of the loop (lines 2135) invokes THRead method sleep (line 25) to place the Producer tHRead into the timed waiting state for a random time interval between 0 and 3 seconds. When the thread awakens, line 26 passes the value of control variable count to the Buffer object's set method to set the shared buffer's value. Line 27 keeps a total of all the values produced so far and line 28 outputs that value. When the loop completes, lines 3738 display a message indicating that the thread has finished producing data and is terminating. Next, method run terminates which indicates that the Producer completed its task. It is important to note that any method called from a thread's run method (e.g., Buffer method set) executes as part of that thread of execution. In fact, each thread has its own method call stack. This fact becomes important in Section 23.7 when we add synchronization to the producer/consumer relationship.

Class Consumer (Fig. 23.8) also implements interface Runnable, allowing the Consumer to be executed concurrently with the Producer. The constructor (lines 1114) initializes Buffer reference sharedLocation with an object that implements the Buffer interface created in main (Fig. 23.10) and passed to the constructor as the parameter shared. As we will see, this is the same UnsynchronizedBuffer object that is used to initialize the Producer objectthus, the two threads share the same object. The Consumer thread in this program performs the tasks specified in method run (lines 1739). The loop at lines 2135 loops ten times. Each iteration of the loop invokes THRead method sleep (line 26) to put the Consumer thread into the timed waiting state for between 0 and 3 seconds. Next, line 27 uses the Buffer's get method to retrieve the value in the shared buffer, then adds the value to variable sum. Line 28 displays the total of all the values consumed so far. When the loop completes, lines 3738 display a line indicating the sum of the consumed values. Then method run terminates, which indicates that the Consumer completed its task. Once both threads enter the terminated state, the program ends.

[Note: We use method sleep in method run of the Producer and Consumer classes to emphasize the fact that in multithreaded applications, it is unpredictable when each thread will perform its task and for how long it will perform the task when it has a processor. Normally, these thread-scheduling issues are the job of the computer's operating system and therefore beyod the control of the Java developer. In this program, our thread's tasks are quite simplefor the Producer, write the values 1 to 10 to the buffer, and for the Consumer, read 10 values from the buffer and add each value to variable sum. Without the sleep method call, and if the Producer executes first, given today's phenomenally fast processors, the Producer would likely complete its task before the Consumer gets a chance to execute. If the Consumer executes first, it would likely consume -1 ten times, then terminate before the Producer could produce the first real value.]

Class UnsynchronizedBuffer (Fig. 23.9) implements interface Buffer (line 4). An object of this class is shared between the Producer and the Consumer. Line 6 declares instance variable buffer and initializes it with the value -1. This value is used to demonstrate the case in which the Consumer attempts to consume a value before the Producer ever places a value in buffer. Methods set (lines 913) and get (lines 1620) do not synchronize access to field buffer. Method set simply assigns its argument to buffer (line 12), and method get simply returns the value of buffer (line 19).

Class SharedBufferTest contains method main (lines 832), which launches the application. Line 11 creates an ExecutorService with two threadsone to execute the Producer and one to execute the Consumer. Line 14 creates an UnsynchronizedBuffer object and assigns its reference to Buffer variable sharedLocation. This object stores the data that will be shared between the Producer and Consumer threads. Lines 2324 create and execute the Producer and Consumer. Note that the Producer and Consumer constructors are each passed the same Buffer object (sharedLocation), so each object is initialized with a reference to the same Buffer. These lines also implicitly launch the threads and call each Runnable's run method. Finally, line 31 calls method shutdown so that the application can terminate when the Producer and Consumer threads complete their tasks. When method main terminates (line 32), the main thread of execution enters the terminated state.

Recall from the overview of this example that we would like the Producer thread to execute first and every value produced by the Producer to be consumed exactly once by the Consumer. However, when we study the first output of Fig. 23.10, we see that the Producer writes a value three times before the Consumer reads its first value (3). Therefore, the values 1 and 2 are lost. Later, the values 5, 6 and 9 are lost, while 7 and 8 are read twice and 10 is read four times. So the first output produced an incorrect total of 77, instead of a correct total of 55. In the second output, note that the Consumer reads before the Producer ever writes a value. Also note that the Consumer has already read five times before the Producer writes the value 2. Meanwhile, the values 5, 7, 8, 9 and 10 are all lost. An incorrect output of 19 is produced. (Lines in the output where the Producer or Consumer has acted out of order are highlighted.) This example clearly demonstrates that access to a shared object by concurrent threads must be controlled carefully, for otherwise a program may produce incorrect results.

To solve the problems of lost and duplicated data, Section 23.7 presents an example in which we use a Lock and Condition methods await and signal to synchronize access to the code that manipulates the shared object, guaranteeing that each and every value will be processed once and only once. When a thread acquires a lock, no other threads can acquire that same lock until the original thread releases it.

Introduction to Computers, the Internet and the World Wide Web

Introduction to Java Applications

Introduction to Classes and Objects

Control Statements: Part I

Control Statements: Part 2

Methods: A Deeper Look

Arrays

Classes and Objects: A Deeper Look

Object-Oriented Programming: Inheritance

Object-Oriented Programming: Polymorphism

GUI Components: Part 1

Graphics and Java 2D™

Exception Handling

Files and Streams

Recursion

Searching and Sorting

Data Structures

Generics

Collections

Introduction to Java Applets

Multimedia: Applets and Applications

GUI Components: Part 2

Multithreading

Networking

Accessing Databases with JDBC

Servlets

JavaServer Pages (JSP)

Formatted Output

Strings, Characters and Regular Expressions

Appendix A. Operator Precedence Chart

Appendix B. ASCII Character Set

Appendix C. Keywords and Reserved Words

Appendix D. Primitive Types

Appendix E. (On CD) Number Systems

Appendix F. (On CD) Unicode®

Appendix G. Using the Java API Documentation

Appendix H. (On CD) Creating Documentation with javadoc

Appendix I. (On CD) Bit Manipulation

Appendix J. (On CD) ATM Case Study Code

Appendix K. (On CD) Labeled break and continue Statements

Appendix L. (On CD) UML 2: Additional Diagram Types

Appendix M. (On CD) Design Patterns

Appendix N. Using the Debugger

Inside Back Cover



Java(c) How to Program
Java How to Program (6th Edition) (How to Program (Deitel))
ISBN: 0131483986
EAN: 2147483647
Year: 2003
Pages: 615

Flylib.com © 2008-2020.
If you may any questions please contact us: flylib@qtcs.net