Producer/Consumer Relationship with Synchronization

The application in Fig. 23.11 and Fig. 23.12 demonstrates a producer and a consumer accessing a shared buffer with synchronization. In this case, the consumer correctly consumes only after the producer produces a value, and the producer correctly produces a new value only after the consumer consumes the previous value produced. We reuse interface Buffer (Fig. 23.6) and use classes Producer (Fig. 23.7 modified to remove line 28) and Consumer (Fig. 23.8 modified to remove line 28) from the example in Section 23.6. This approach enables us to demonstrate that the threads accessing the shared object are unaware that they are being synchronized. The code that performs the synchronization is placed in the set and get methods of class SynchronizedBuffer (Fig. 23.11), which implements interface Buffer (line 7). Thus, the Producer's and Consumer's run methods simply call the shared object's set and get methods, as in the example in Section 23.6.

Figure 23.11. SynchronizedBuffer synchronizes access to a shared integer.

  1  // Fig. 23.11: SynchronizedBuffer.java
  2  // SynchronizedBuffer synchronizes access to a single shared integer.
  3  import java.util.concurrent.locks.Lock;
  4  import java.util.concurrent.locks.ReentrantLock;
  5  import java.util.concurrent.locks.Condition;
  6
  7  public class SynchronizedBuffer implements Buffer
  8  {
  9     // Lock to control synchronization with this buffer
 10     private Lock accessLock = new ReentrantLock();
 11
 12     // conditions to control reading and writing
 13     private Condition canWrite = accessLock.newCondition();
 14     private Condition canRead = accessLock.newCondition();
 15
 16     private int buffer = -1; // shared by producer and consumer threads
 17     private boolean occupied = false; // whether buffer is occupied
 18
 19     // place int value into buffer
 20     public void set( int value )
 21     {
 22        accessLock.lock(); // lock this object
 23
 24        // output thread information and buffer information, then wait
 25        try
 26        {
 27           // while buffer is not empty, place thread in waiting state
 28           while ( occupied )
 29           {
 30              System.out.println( "Producer tries to write." );
 31              displayState( "Buffer full. Producer waits." );
 32              canWrite.await(); // wait until buffer is empty
 33           } // end while
 34
 35           buffer = value; // set new buffer value
 36
 37           // indicate producer cannot store another value
 38           // until consumer retrieves current buffer value
 39           occupied = true;
 40
 41           displayState( "Producer writes " + buffer );
 42
 43           // signal thread waiting to read from buffer
 44           canRead.signal();
 45        } // end try
 46        catch ( InterruptedException exception )
 47        {
 48           exception.printStackTrace();
 49        } // end catch
 50        finally
 51        {
 52           accessLock.unlock(); // unlock this object
 53        } // end finally
 54     } // end method set
 55
 56     // return value from buffer
 57     public int get()
 58     {
 59        int readValue = 0; // initialize value read from buffer
 60        accessLock.lock(); // lock this object
 61
 62        // output thread information and buffer information, then wait
 63        try
 64        {
 65           // while no data to read, place thread in waiting state
 66           while ( !occupied )
 67           {
 68              System.out.println( "Consumer tries to read." );
 69              displayState( "Buffer empty. Consumer waits." );
 70              canRead.await(); // wait until buffer is full
 71           } // end while
 72
 73           // indicate that producer can store another value
 74           // because consumer just retrieved buffer value
 75           occupied = false;
 76
 77           readValue = buffer; // retrieve value from buffer
 78           displayState( "Consumer reads " + readValue );
 79
 80           // signal thread waiting for buffer to be empty
 81           canWrite.signal();
 82        } // end try
 83        // if waiting thread interrupted, print stack trace
 84        catch ( InterruptedException exception )
 85        {
 86           exception.printStackTrace();
 87        } // end catch
 88        finally
 89        {
 90           accessLock.unlock(); // unlock this object
 91        } // end finally
 92
 93        return readValue;
 94     } // end method get
 95
 96     // display current operation and buffer state
 97     public void displayState( String operation )
 98     {
 99        System.out.printf( "%-40s%d\t\t%b\n\n", operation, buffer,
100           occupied );
101     } // end method displayState
102  } // end class SynchronizedBuffer

Figure 23.12. SharedBufferTest2 sets up a producer/consumer application that uses a synchronized buffer.

  1  // Fig. 23.12: SharedBufferTest2.java
  2  // Application shows two threads manipulating a synchronized buffer.
  3  import java.util.concurrent.ExecutorService;
  4  import java.util.concurrent.Executors;
  5
  6  public class SharedBufferTest2
  7  {
  8     public static void main( String[] args )
  9     {
 10        // create new thread pool with two threads
 11        ExecutorService application = Executors.newFixedThreadPool( 2 );
 12
 13        // create SynchronizedBuffer to store ints
 14        Buffer sharedLocation = new SynchronizedBuffer();
 15
 16        System.out.printf( "%-40s%s\t\t%s\n%-40s%s\n\n", "Operation",
 17           "Buffer", "Occupied", "---------", "------\t\t--------" );
 18
 19        try // try to start producer and consumer
 20        {
 21           application.execute( new Producer( sharedLocation ) );
 22           application.execute( new Consumer( sharedLocation ) );
 23        } // end try
 24        catch ( Exception exception )
 25        {
 26           exception.printStackTrace();
 27        } // end catch
 28
 29        application.shutdown();
 30     } // end main
 31  } // end class SharedBufferTest2
 
Operation                               Buffer          Occupied
---------                               ------          --------

Producer writes 1                       1               true

Producer tries to write.
Buffer full. Producer waits.            1               true

Consumer reads 1                        1               false

Producer writes 2                       2               true

Producer tries to write.
Buffer full. Producer waits.            2               true

Consumer reads 2                        2               false

Producer writes 3                       3               true

Consumer reads 3                        3               false

Producer writes 4                       4               true

Consumer reads 4                        4               false

Consumer tries to read.
Buffer empty. Consumer waits.           4               false

Producer writes 5                       5               true

Consumer reads 5                        5               false

Consumer tries to read.
Buffer empty. Consumer waits.           5               false

Producer writes 6                       6               true

Consumer reads 6                        6               false

Producer writes 7                       7               true

Consumer reads 7                        7               false

Producer writes 8                       8               true

Consumer reads 8                        8               false

Producer writes 9                       9               true

Consumer reads 9                        9               false

Producer writes 10                      10              true


Producer done producing.
Terminating Producer.
Consumer reads 10                       10              false


Consumer read values totaling 55.
Terminating Consumer.
 

Class SynchronizedBuffer (Fig. 23.11) contains five fields. Line 10 creates a new object of type ReentrantLock and assigns its reference to Lock variable accessLock. The ReentrantLock is created without the fairness policy because only a single Producer or Consumer will be waiting to acquire the Lock in this example. Lines 13-14 create two Conditions using Lock method newCondition. Condition canWrite contains a queue for threads waiting while the buffer is full (i.e., there is data in the buffer that the Consumer has not read yet). If the buffer is full, the Producer calls method await on this Condition. When the Consumer reads data from a full buffer, it calls method signal on this Condition. Condition canRead contains a queue for threads waiting while the buffer is empty (i.e., there is no data in the buffer for the Consumer to read). If the buffer is empty, the Consumer calls method await on this Condition. When the Producer writes to the empty buffer, it calls method signal on this Condition. The int buffer (line 16) holds the shared data, and the boolean variable occupied (line 17) keeps track of whether the buffer currently holds data (that the Consumer should read) or not.
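For comparison, the following minimal sketch shows how a ReentrantLock with the fairness policy could be requested. Class FairnessSketch and method createLock are hypothetical names used only for illustration; the ReentrantLock constructor that takes a boolean and method isFair are part of the standard API.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of the chapter's figures.
class FairnessSketch
{
   // creates a lock with or without the fairness policy
   public static ReentrantLock createLock( boolean fair )
   {
      // the no-argument constructor used in Fig. 23.11 is
      // equivalent to passing false here
      return new ReentrantLock( fair );
   } // end method createLock

   public static void main( String[] args )
   {
      System.out.println( new ReentrantLock().isFair() ); // prints false
      System.out.println( createLock( true ).isFair() ); // prints true
   } // end main
} // end class FairnessSketch
```

A fair lock favors the longest-waiting thread, typically at some cost in throughput, which is why it is unnecessary when at most one thread waits at a time.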

Line 22 in method set calls the lock method of the SynchronizedBuffer's accessLock. If the lock is available (i.e., no other thread has acquired this lock), method lock will return immediately (this thread now owns the lock) and the thread will continue. If the lock is unavailable (i.e., the lock is held by another thread), this method will wait until the lock is released by the other thread. After the lock is acquired, the try block in lines 25-45 executes. Line 28 tests occupied to determine whether the buffer is full. If it is, lines 30-31 output that the thread will wait. Line 32 calls Condition method await on the canWrite condition variable, which temporarily releases the SynchronizedBuffer's lock and waits for a signal from the Consumer that the buffer is available for writing. When the buffer is available for writing, the method proceeds, writing to the buffer (line 35), setting occupied to true (line 39) and outputting that the producer wrote a value (line 41). Line 44 calls Condition method signal on condition variable canRead to notify the waiting Consumer that the buffer has new data to be read. Line 52 calls method unlock within a finally block to release the lock and allow the Consumer to proceed.

Common Programming Error 23.3

Place calls to Lock method unlock in a finally block. If an exception is thrown, unlock must still be called or deadlock could occur.
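The following minimal sketch illustrates this point under simple assumptions. Class UnlockInFinallySketch and its methods are hypothetical; the exception is thrown deliberately to show that the finally block still releases the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of the chapter's figures.
class UnlockInFinallySketch
{
   private final ReentrantLock accessLock = new ReentrantLock();

   // acquires the lock, then fails; finally still releases the lock
   public void failWhileLocked()
   {
      accessLock.lock(); // acquire before entering the try block

      try
      {
         throw new IllegalStateException( "simulated failure" );
      } // end try
      finally
      {
         accessLock.unlock(); // runs even though an exception was thrown
      } // end finally
   } // end method failWhileLocked

   // reports whether any thread currently holds the lock
   public boolean isLocked()
   {
      return accessLock.isLocked();
   } // end method isLocked

   public static void main( String[] args )
   {
      UnlockInFinallySketch sketch = new UnlockInFinallySketch();

      try
      {
         sketch.failWhileLocked();
      } // end try
      catch ( IllegalStateException exception )
      {
         System.out.println( "Caught: " + exception.getMessage() );
      } // end catch

      System.out.println( "Lock still held? " + sketch.isLocked() );
   } // end main
} // end class UnlockInFinallySketch
```

Had the call to unlock been placed after the throwing statement rather than in a finally block, the lock would remain held and any other thread calling lock would wait indefinitely.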

Line 60 of method get (lines 57-94) calls method lock to acquire the lock for this object. This method will wait until the lock is available. Once the lock is acquired, line 66 tests whether occupied is false, indicating that the buffer has no data. If the buffer is empty, line 70 calls method await on condition variable canRead. Recall that method signal is called on variable canRead in the set method (line 44). When the condition variable is signaled, the get method continues. Line 75 sets occupied to false, line 77 stores the value of buffer in readValue and line 78 outputs the readValue. Then line 81 signals the condition variable canWrite. This will awaken the Producer if it is indeed waiting for the buffer to be emptied. Line 90 calls method unlock in a finally block to release the lock, and line 93 returns the value of the buffer to the calling method.

Software Engineering Observation 23.2

Always invoke method await in a loop that tests an appropriate condition. It is possible that a thread will reenter the runnable state before the condition it was waiting on is satisfied. Testing the condition again ensures that the thread will not erroneously execute if it was signaled early.
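As a rough illustration, the sketch below signals a waiting thread once before its condition is true and once after. Class GuardedWaitSketch, its fields and its methods are hypothetical names; the polling in method demo exists only to make the scenario repeatable and would not appear in production code.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of the chapter's figures.
class GuardedWaitSketch
{
   private final ReentrantLock lock = new ReentrantLock();
   private final Condition changed = lock.newCondition();
   private boolean ready = false; // condition the waiter cares about
   private int wakeUps = 0; // number of times await returned

   // blocks until ready is true, retesting after every wake-up
   public void awaitReady() throws InterruptedException
   {
      lock.lock();
      try
      {
         while ( !ready ) // loop, not if: a wake-up proves nothing
         {
            changed.await();
            ++wakeUps;
         } // end while
      } // end try
      finally { lock.unlock(); }
   } // end method awaitReady

   // signals the condition; sets ready only when asked to
   public void signal( boolean makeReady )
   {
      lock.lock();
      try
      {
         if ( makeReady )
            ready = true;
         changed.signal();
      } // end try
      finally { lock.unlock(); }
   } // end method signal

   // true when a thread is waiting and wakeUps has reached atLeast
   public boolean waitingAfter( int atLeast )
   {
      lock.lock();
      try { return wakeUps >= atLeast && lock.hasWaiters( changed ); }
      finally { lock.unlock(); }
   } // end method waitingAfter

   // runs the scenario and returns the number of wake-ups observed
   public static int demo()
   {
      GuardedWaitSketch sketch = new GuardedWaitSketch();
      Thread waiter = new Thread( () -> {
         try { sketch.awaitReady(); }
         catch ( InterruptedException e ) { Thread.currentThread().interrupt(); }
      } );
      waiter.start();

      try
      {
         while ( !sketch.waitingAfter( 0 ) ) Thread.sleep( 1 );
         sketch.signal( false ); // early signal: ready is still false
         while ( !sketch.waitingAfter( 1 ) ) Thread.sleep( 1 );
         sketch.signal( true ); // condition now true
         waiter.join();
      } // end try
      catch ( InterruptedException e ) { Thread.currentThread().interrupt(); }

      return sketch.wakeUps; // safe to read: waiter has terminated
   } // end method demo

   public static void main( String[] args )
   {
      System.out.println( "Wake-ups before proceeding: " + demo() );
   } // end main
} // end class GuardedWaitSketch
```

The waiter wakes at least twice but proceeds only after ready becomes true. With an if statement in place of the while loop, the first (early) signal would have let it continue with the condition still false.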

Common Programming Error 23.4

Forgetting to signal a thread that is waiting for a condition is a logic error. The thread will remain in the waiting state, which will prevent the thread from doing any further work. Such waiting can lead to indefinite postponement or deadlock.
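The sketch below shows one way this error might look in practice. Class MissedSignalSketch and its methods are hypothetical; the 200-millisecond pause is an arbitrary choice that gives the waiting thread a chance to notice the change, which it normally does not.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of the chapter's figures.
class MissedSignalSketch
{
   private final ReentrantLock accessLock = new ReentrantLock();
   private final Condition canRead = accessLock.newCondition();
   private boolean occupied = false;

   // waits, as method get does, until the buffer is marked occupied
   public void awaitOccupied() throws InterruptedException
   {
      accessLock.lock();
      try
      {
         while ( !occupied )
            canRead.await();
      } // end try
      finally { accessLock.unlock(); }
   } // end method awaitOccupied

   // marks the buffer occupied; signals only when asked to
   public void markOccupied( boolean signal )
   {
      accessLock.lock();
      try
      {
         occupied = true;
         if ( signal )
            canRead.signal();
      } // end try
      finally { accessLock.unlock(); }
   } // end method markOccupied

   // true when a thread is waiting on canRead
   public boolean hasWaiter()
   {
      accessLock.lock();
      try { return accessLock.hasWaiters( canRead ); }
      finally { accessLock.unlock(); }
   } // end method hasWaiter

   // returns true if the waiter finished once a signal was finally sent
   public static boolean demo()
   {
      MissedSignalSketch sketch = new MissedSignalSketch();
      Thread waiter = new Thread( () -> {
         try { sketch.awaitOccupied(); }
         catch ( InterruptedException e ) { Thread.currentThread().interrupt(); }
      } );
      waiter.start();

      try
      {
         while ( !sketch.hasWaiter() ) Thread.sleep( 1 );
         sketch.markOccupied( false ); // logic error: state changed, no signal
         waiter.join( 200 ); // waiter normally remains in await
         System.out.println( "Still waiting: " + waiter.isAlive() );
         sketch.markOccupied( true ); // corrected call signals the waiter
         waiter.join( 5000 );
      } // end try
      catch ( InterruptedException e ) { Thread.currentThread().interrupt(); }

      return !waiter.isAlive();
   } // end method demo

   public static void main( String[] args )
   {
      System.out.println( "Waiter finished after signal: " + demo() );
   } // end main
} // end class MissedSignalSketch
```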

Class SharedBufferTest2 (Fig. 23.12) is similar to class SharedBufferTest (Fig. 23.10). SharedBufferTest2 contains method main (lines 8-30), which launches the application. Line 11 creates an ExecutorService with two threads to run the Producer and Consumer. Line 14 creates a SynchronizedBuffer object and assigns its reference to Buffer variable sharedLocation. This object stores the data that will be shared between the Producer and Consumer threads. Lines 16-17 display the column heads for the output. Lines 21-22 execute a Producer and a Consumer. Finally, line 29 calls method shutdown to end the application when the Producer and Consumer complete their tasks. When method main ends (line 30), the main thread of execution terminates.
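Method shutdown does not block; it stops the ExecutorService from accepting new tasks while letting submitted tasks finish. If main needed to wait for those tasks, it could also call ExecutorService method awaitTermination, which Fig. 23.12 does not do. The following sketch shows that variation with two placeholder tasks. Class ShutdownSketch, method runTasksAndWait and the 10-second timeout are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the chapter's figures.
class ShutdownSketch
{
   // runs two placeholder tasks and returns how many completed,
   // or -1 if they did not finish within the timeout
   public static int runTasksAndWait()
   {
      AtomicInteger completed = new AtomicInteger();
      ExecutorService application = Executors.newFixedThreadPool( 2 );

      // stand-ins for the Producer and Consumer tasks
      application.execute( () -> completed.incrementAndGet() );
      application.execute( () -> completed.incrementAndGet() );

      application.shutdown(); // no new tasks; submitted tasks still run

      try
      {
         // block until both tasks finish or the timeout elapses
         if ( !application.awaitTermination( 10, TimeUnit.SECONDS ) )
            return -1;
      } // end try
      catch ( InterruptedException exception )
      {
         Thread.currentThread().interrupt();
         return -1;
      } // end catch

      return completed.get();
   } // end method runTasksAndWait

   public static void main( String[] args )
   {
      System.out.println( "Tasks completed: " + runTasksAndWait() );
   } // end main
} // end class ShutdownSketch
```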

Study the outputs in Fig. 23.12. Observe that every integer produced is consumed exactly once: no values are lost, and no values are consumed more than once. The synchronization and condition variables ensure that the Producer and Consumer cannot perform their tasks unless it is their turn. The Producer must go first, the Consumer must wait if the Producer has not produced since the Consumer last consumed, and the Producer must wait if the Consumer has not yet consumed the value that the Producer most recently produced. Execute this program several times to confirm that every integer produced is consumed exactly once. In the sample output, note the lines indicating when the Producer and Consumer must wait to perform their respective tasks.
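The exactly-once property can also be checked programmatically. The sketch below is a condensed, hypothetical stand-in for Figs. 23.11 and 23.12: class ExactlyOnceSketch keeps the same lock, conditions and occupied flag, but omits the output statements and random delays, and it lets InterruptedException propagate instead of printing a stack trace. It is not the chapter's Producer and Consumer classes, which are not reproduced in this section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Condensed, hypothetical stand-in for Figs. 23.11-23.12.
class ExactlyOnceSketch
{
   private final Lock accessLock = new ReentrantLock();
   private final Condition canWrite = accessLock.newCondition();
   private final Condition canRead = accessLock.newCondition();
   private int buffer = -1;
   private boolean occupied = false;

   // place value into buffer once the previous value has been read
   public void set( int value ) throws InterruptedException
   {
      accessLock.lock();
      try
      {
         while ( occupied )
            canWrite.await();
         buffer = value;
         occupied = true;
         canRead.signal();
      } // end try
      finally { accessLock.unlock(); }
   } // end method set

   // return value from buffer once a new value has been written
   public int get() throws InterruptedException
   {
      accessLock.lock();
      try
      {
         while ( !occupied )
            canRead.await();
         occupied = false;
         canWrite.signal();
         return buffer;
      } // end try
      finally { accessLock.unlock(); }
   } // end method get

   // produces 1-10 on a second thread, consumes on the calling thread
   public static List< Integer > produceAndConsume()
   {
      ExactlyOnceSketch shared = new ExactlyOnceSketch();
      List< Integer > consumed = new ArrayList< Integer >();

      Thread producer = new Thread( () -> {
         try
         {
            for ( int count = 1; count <= 10; count++ )
               shared.set( count );
         } // end try
         catch ( InterruptedException e ) { Thread.currentThread().interrupt(); }
      } );
      producer.start();

      try
      {
         for ( int count = 1; count <= 10; count++ )
            consumed.add( shared.get() );
         producer.join();
      } // end try
      catch ( InterruptedException e ) { Thread.currentThread().interrupt(); }

      return consumed;
   } // end method produceAndConsume

   public static void main( String[] args )
   {
      // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
      System.out.println( produceAndConsume() );
   } // end main
} // end class ExactlyOnceSketch
```

Because each call to set waits until the previous value has been read and each call to get waits until a new value has been written, the consumed list holds the values 1 through 10 in order on every run, regardless of how the threads are scheduled.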

Java How to Program (6th Edition) (How to Program (Deitel))
ISBN: 0131483986
Year: 2003
Pages: 615