Producer/Consumer Relationship: Circular Buffer

The program in Section 23.7 uses thread synchronization to guarantee that two threads manipulate data in a shared buffer correctly. However, the application may not perform optimally. If the two threads operate at different speeds, one of them will spend more (or even most) of its time waiting. For example, the program in Section 23.7 shares a single integer variable between the two threads. If the producer thread produces values faster than the consumer can consume them, the producer waits for the consumer, because there is no other location in memory in which to place the next value. Similarly, if the consumer consumes values faster than the producer produces them, the consumer waits until the producer places the next value in the shared location. Even when threads operate at the same relative speeds, they may occasionally become "out of sync" over a period of time, causing one of them to wait for the other. We cannot make assumptions about the relative speeds of concurrent threads: interactions with the operating system, the network, the user and other components can cause the threads to operate at different speeds. When this happens, threads wait. When threads wait excessively, programs become less efficient, user-interactive programs become less responsive and applications suffer longer delays.

To minimize the amount of waiting time for threads that share resources and operate at the same average speeds, we can implement a circular buffer that provides extra buffer space into which the producer can place values and from which the consumer can retrieve those values. Let us assume that the buffer is implemented as an array. The producer and consumer work from the beginning of the array. When either thread reaches the end of the array, it simply returns to the first element of the array to perform its next task. If the producer temporarily produces values faster than the consumer can consume them, the producer can write additional values into the extra buffer space (if any are available). This capability enables the producer to perform its task even though the consumer is not ready to receive the current value being produced. Similarly, if the consumer consumes faster than the producer produces new values, the consumer can read additional values (if there are any) from the buffer. This enables the consumer to keep busy even though the producer is not ready to produce additional values.
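The wraparound described above can be sketched with remainder arithmetic. This is only an illustration of the index movement: the class name WrapAroundDemo, the helper next and the three-slot array are assumptions made for the sketch, and it deliberately omits the full/empty checks and synchronization that a real shared buffer needs.

```java
// Hypothetical demo; the class name, helper name and buffer size are assumptions.
final class WrapAroundDemo
{
   // advance an index by one slot, returning to 0 after the last slot
   static int next( int index, int length )
   {
      return ( index + 1 ) % length;
   }

   public static void main( String[] args )
   {
      int[] buffer = new int[ 3 ];
      int index = 0;

      // write seven values into three slots; with no "full" check,
      // later values simply overwrite earlier ones
      for ( int value = 1; value <= 7; value++ )
      {
         buffer[ index ] = value;
         System.out.printf( "wrote %d at slot %d%n", value, index );
         index = next( index, buffer.length );
      }
   }
}
```

The values 1 through 7 land in slots 0, 1, 2, 0, 1, 2, 0, which is the "return to the first element" behavior applied to a single index.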

Note that the circular buffer would be inappropriate if the producer and the consumer operate consistently at different speeds. If the consumer always executes faster than the producer, then a buffer containing one location is enough. Additional locations would waste memory. If the producer always executes faster, only a buffer with an infinite number of locations would be able to absorb the extra production.

The key to using a circular buffer with a producer and consumer that operate at about the same speed is to provide it with enough locations to handle the anticipated "extra" production. If, over a period of time, we determine that the producer often produces as many as three more values than the consumer can consume, we can provide a buffer of at least three cells to handle the extra production. We do not want the buffer to be too small, because that would cause threads to wait longer. On the other hand, we do not want the buffer to be too large, because that would waste memory.
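One rough way to estimate the "extra" production is to record the order of produce and consume events and track the largest backlog. The sketch below assumes a hypothetical trace format in which 'P' marks a produce event and 'C' a consume event; the class BacklogEstimate and method peakBacklog are illustrative names, not part of the chapter's code. The trace in main follows the order of writes and reads in the sample output of Fig. 23.14.

```java
// Hypothetical helper for sizing a buffer from an event trace.
final class BacklogEstimate
{
   // return the largest number of produced-but-not-yet-consumed values
   // seen in a trace where 'P' is a produce event and 'C' a consume event
   static int peakBacklog( String trace )
   {
      int backlog = 0;
      int peak = 0;

      for ( char event : trace.toCharArray() )
      {
         if ( event == 'P' )
            backlog++;
         else if ( event == 'C' && backlog > 0 )
            backlog--;

         peak = Math.max( peak, backlog );
      }

      return peak;
   }

   public static void main( String[] args )
   {
      // order of writes (P) and reads (C) in the sample run of Fig. 23.14
      String trace = "PCPCPCPPCPPCPCCPCCPC";
      System.out.println( "Peak backlog: " + peakBacklog( trace ) );
   }
}
```

For this trace the peak is 3. Because that run already used a three-cell buffer, the peak could not have been higher, so a trace gathered this way gives a lower bound on the demand rather than an exact figure.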

Performance Tip 23.4

Even when using a circular buffer, it is possible that a producer thread could fill the buffer, which would force the producer thread to wait until a consumer consumes a value to free an element in the buffer. Similarly, if the buffer is empty at any given time, the consumer thread must wait until the producer produces another value. The key to using a circular buffer is to optimize the buffer size to minimize the amount of thread wait time.

The program in Figs. 23.13 and 23.14 demonstrates a producer and a consumer accessing a circular buffer (in this case, a shared array of three cells) with synchronization. In this version of the producer/consumer relationship, the consumer consumes a value only when the array is not empty and the producer produces a value only when the array is not full. The statements that created and started the thread objects in the main method of class SharedBufferTest2 (Fig. 23.12) now appear in class CircularBufferTest (Fig. 23.14).

Figure 23.13. CircularBuffer synchronizes access to a circular buffer containing three slots.

 1 // Fig. 23.13: CircularBuffer.java
 2 // CircularBuffer synchronizes access to a circular buffer of three integers.
 3 import java.util.concurrent.locks.Lock;
 4 import java.util.concurrent.locks.ReentrantLock;
 5 import java.util.concurrent.locks.Condition;
 6
 7 public class CircularBuffer implements Buffer
 8 {
 9 // Lock to control synchronization with this buffer
10 private Lock accessLock = new ReentrantLock(); 
11 
12 // conditions to control reading and writing 
13 private Condition canWrite = accessLock.newCondition();
14 private Condition canRead = accessLock.newCondition();  
15 
16 private int[] buffer = { -1, -1, -1 };
17 
18 private int occupiedBuffers = 0; // count number of buffers used
19 private int writeIndex = 0; // index to write next value
20 private int readIndex = 0; // index to read next value
21 
22 // place value into buffer
23 public void set( int value )
24 {
25 accessLock.lock(); // lock this object
26 
27 // output thread information and buffer information, then wait
28 try
29 {
30 // while no empty locations, place thread in waiting state
31 while ( occupiedBuffers == buffer.length )
32 {
33 System.out.printf( "All buffers full. Producer waits.\n" );
34 canWrite.await(); // await until a buffer element is free
35 } // end while
36 
37 buffer[ writeIndex ] = value; // set new buffer value
38 
39 // update circular write index 
40 writeIndex = ( writeIndex + 1 ) % buffer.length;
41 
42 occupiedBuffers++; // one more buffer element is full
43 displayState( "Producer writes " + value );
44 canRead.signal(); // signal threads waiting to read from buffer
45 } // end try
46 catch ( InterruptedException exception )
47 {
48 exception.printStackTrace();
49 } // end catch
50 finally
51 {
52 accessLock.unlock(); // unlock this object
53 } // end finally
54 } // end method set
55 
56 // return value from buffer
57 public int get()
58 {
59 int readValue = 0; // initialize value read from buffer
60 accessLock.lock(); // lock this object
61 
62 // wait until buffer has data, then read value
63 try
64 {
65 // while no data to read, place thread in waiting state
66 while ( occupiedBuffers == 0 )
67 {
68 System.out.printf( "All buffers empty. Consumer waits.\n" );
69 canRead.await(); // await until a buffer element is filled
70 } // end while
71 
72 readValue = buffer[ readIndex ]; // read value from buffer
73 
74 // update circular read index 
75 readIndex = ( readIndex + 1 ) % buffer.length;
76 
77 occupiedBuffers--; // one more buffer element is empty
78 displayState( "Consumer reads " + readValue );
79 canWrite.signal(); // signal threads waiting to write to buffer
80 } // end try
81 // if waiting thread interrupted, print stack trace
82 catch ( InterruptedException exception )
83 {
84 exception.printStackTrace();
85 } // end catch
86 finally
87 {
88 accessLock.unlock(); // unlock this object
89 } // end finally
90 
91 return readValue;
92 } // end method get
93 
94 // display current operation and buffer state
95 public void displayState( String operation )
96 {
97 // output operation and number of occupied buffers
98 System.out.printf( "%s%s%d)\n%s", operation,
99 " (buffers occupied: ", occupiedBuffers, "buffers: " );
100 
101 for ( int value : buffer )
102 System.out.printf( " %2d ", value ); // output values in buffer
103 
104 System.out.print( "\n " );
105 for ( int i = 0; i < buffer.length; i++ )
106 System.out.print( "---- " );
107 
108 System.out.print( "\n " );
109 for ( int i = 0; i < buffer.length; i++ )
110 {
111 if ( i == writeIndex && i == readIndex )
112 System.out.print( " WR" ); // both write and read index
113 else if ( i == writeIndex )
114 System.out.print( " W " ); // just write index
115 else if ( i == readIndex )
116 System.out.print( " R " ); // just read index
117 else
118 System.out.print( " " ); // neither index
119 } // end for
120 
121 System.out.println( "\n" );
122 } // end method displayState
123 } // end class CircularBuffer

Figure 23.14. CircularBufferTest sets up a producer/consumer application and instantiates producer and consumer threads.

 1 // Fig 23.14: CircularBufferTest.java
 2 // Application shows two threads manipulating a circular buffer.
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5
 6 public class CircularBufferTest
 7 {
 8 public static void main( String[] args )
 9 {
10 // create new thread pool with two threads
11 ExecutorService application = Executors.newFixedThreadPool( 2 );
12
13 // create CircularBuffer to store ints 
14 Buffer sharedLocation = new CircularBuffer();
15
16 try // try to start producer and consumer
17 {
18 application.execute( new Producer( sharedLocation ) );
19 application.execute( new Consumer( sharedLocation ) );
20 } // end try
21 catch ( Exception exception )
22 {
23 exception.printStackTrace();
24 } // end catch
25
26 application.shutdown();
27 } // end main
28 } // end class CircularBufferTest
 
Producer writes 1 (buffers occupied: 1)
buffers: 1 -1 -1
 ---- ---- ----
 R W

Consumer reads 1 (buffers occupied: 0)
buffers: 1 -1 -1
 ---- ---- ----
 WR

All buffers empty. Consumer waits.
Producer writes 2 (buffers occupied: 1)
buffers: 1 2 -1
 ---- ---- ----
 R W

Consumer reads 2 (buffers occupied: 0)
buffers: 1 2 -1
 ---- ---- ----
 WR

Producer writes 3 (buffers occupied: 1)
buffers: 1 2 3
 ---- ---- ----
 W R

Consumer reads 3 (buffers occupied: 0)
buffers: 1 2 3
 ---- ---- ----
 WR

Producer writes 4 (buffers occupied: 1)
buffers: 4 2 3
 ---- ---- ----
 R W

Producer writes 5 (buffers occupied: 2)
buffers: 4 5 3
 ---- ---- ----
 R W

Consumer reads 4 (buffers occupied: 1)
buffers: 4 5 3
 ---- ---- ----
 R W

Producer writes 6 (buffers occupied: 2)
buffers: 4 5 6
 ---- ---- ----
 W R

Producer writes 7 (buffers occupied: 3)
buffers: 7 5 6
 ---- ---- ----
 WR

Consumer reads 5 (buffers occupied: 2)
buffers: 7 5 6
 ---- ---- ----
 W R

Producer writes 8 (buffers occupied: 3)
buffers: 7 8 6
 ---- ---- ----
 WR

Consumer reads 6 (buffers occupied: 2)
buffers: 7 8 6
 ---- ---- ----
 R W

Consumer reads 7 (buffers occupied: 1)
buffers: 7 8 6
 ---- ---- ----
 R W

Producer writes 9 (buffers occupied: 2)
buffers: 7 8 9
 ---- ---- ----
 W R

Consumer reads 8 (buffers occupied: 1)
buffers: 7 8 9
 ---- ---- ----
 W R

Consumer reads 9 (buffers occupied: 0)
buffers: 7 8 9
 ---- ---- ----
 WR

Producer writes 10 (buffers occupied: 1)
buffers: 10 8 9
 ---- ---- ----
 R W

Producer done producing.
Terminating Producer.
Consumer reads 10 (buffers occupied: 0)
buffers: 10 8 9
 ---- ---- ----
 WR

Consumer read values totaling: 55.
Terminating Consumer.
 

The significant changes to the example in Section 23.7 occur in CircularBuffer (Fig. 23.13), which replaces SynchronizedBuffer (Fig. 23.11). Line 10 creates a new ReentrantLock object and assigns its reference to Lock variable accessLock. The ReentrantLock is created without a fairness policy because we have only two threads in this example and only one will ever be waiting. Lines 13-14 create two Conditions using Lock method newCondition. Condition canWrite contains a queue for threads waiting while the buffer is full. If the buffer is full, the Producer calls method await on this Condition; when the Consumer frees space in a full buffer, it calls method signal on this Condition. Condition canRead contains a queue for threads waiting while the buffer is empty. If the buffer is empty, the Consumer calls method await on this Condition; when the Producer writes to the buffer, it calls method signal on this Condition. Array buffer (line 16) is a three-element integer array that represents the circular buffer. Variable occupiedBuffers (line 18) counts the number of elements in buffer that are filled with data available to be read. When occupiedBuffers is 0, there is no data in the circular buffer and the Consumer must wait; when occupiedBuffers is 3 (the size of the circular buffer), the circular buffer is full and the Producer must wait. Variable writeIndex (line 19) indicates the next location in which a value can be placed by a Producer. Variable readIndex (line 20) indicates the position from which the next value can be read by a Consumer.
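For comparison, the fairness policy mentioned above is selected through a ReentrantLock constructor argument. The following minimal sketch (the class name FairnessDemo is an assumption) creates one lock each way and queries it with ReentrantLock method isFair.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only the ReentrantLock calls come from the Java API.
final class FairnessDemo
{
   public static void main( String[] args )
   {
      // no-argument constructor: no fairness policy, as in line 10 of Fig. 23.13
      ReentrantLock defaultLock = new ReentrantLock();

      // passing true requests a fairness policy that favors the longest-waiting thread
      ReentrantLock fairLock = new ReentrantLock( true );

      System.out.println( "default lock fair? " + defaultLock.isFair() );
      System.out.println( "fair lock fair? " + fairLock.isFair() );
   }
}
```

The first line of output reports false and the second reports true. With only one thread ever waiting, as in this example, a fairness policy would have no competing waiters to order.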

CircularBuffer method set (lines 23-54) performs the same tasks that it did in Fig. 23.11, with a few modifications. The while loop at lines 31-35 determines whether the Producer must wait (i.e., all buffers are full). If so, line 33 indicates that the Producer is waiting to perform its task. Then line 34 invokes Condition method await to place the Producer thread in the waiting state on the canWrite condition variable. When execution eventually continues at line 37 after the while loop, the value written by the Producer is placed in the circular buffer at location writeIndex. Then line 40 updates writeIndex for the next call to CircularBuffer method set. This line is the key to the circularity of the buffer. When writeIndex is incremented past the end of the buffer, this line sets it to 0. Line 42 increments occupiedBuffers, because there is now at least one value in the buffer that the Consumer can read. Next, line 43 invokes method displayState to update the output with the value produced, the number of occupied buffers, the contents of the buffers and the current writeIndex and readIndex. Line 44 invokes Condition method signal to indicate that a Consumer thread waiting on the canRead condition variable (if there is a waiting thread) should transition to the runnable state. Line 52 releases accessLock by calling method unlock inside a finally block.

Method get (lines 57-92) of class CircularBuffer also performs the same tasks as it did in Fig. 23.11, with a few minor modifications. The while loop at lines 66-70 determines whether the Consumer must wait (i.e., all buffers are empty). If the Consumer thread must wait, line 68 updates the output to indicate that the Consumer is waiting to perform its task. Then line 69 invokes Condition method await to place the current thread in the waiting state on the canRead condition variable. When execution eventually continues at line 72 after a signal call from the Producer, readValue is assigned the value at location readIndex in the circular buffer. Then line 75 updates readIndex for the next call to CircularBuffer method get. This line and line 40 create the circular effect of the buffer. Line 77 decrements occupiedBuffers, because there is now at least one open position in the buffer in which the Producer thread can place a value. Line 78 invokes method displayState to update the output with the consumed value, the number of occupied buffers, the contents of the buffers and the current writeIndex and readIndex. Line 79 invokes Condition method signal to transition the thread waiting to write into the CircularBuffer object into the runnable state. Line 88 releases accessLock inside a finally block to guarantee that the lock is released. Then line 91 returns the consumed value to the calling method.
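The bookkeeping in set and get keeps the three state variables consistent: writeIndex always sits occupiedBuffers slots ahead of readIndex, modulo the array length. The single-threaded sketch below models only that bookkeeping. The class IndexInvariantDemo and its methods put, take and invariantHolds are hypothetical names, and locking and waiting are deliberately omitted, so this is not a replacement for CircularBuffer.

```java
// Hypothetical single-threaded model of CircularBuffer's index bookkeeping.
// Locking and waiting are left out on purpose (an assumption of this sketch).
final class IndexInvariantDemo
{
   private final int[] buffer = { -1, -1, -1 };
   private int occupiedBuffers = 0;
   private int writeIndex = 0;
   private int readIndex = 0;

   // same index updates as set; returns false where set would wait
   boolean put( int value )
   {
      if ( occupiedBuffers == buffer.length )
         return false; // buffer full

      buffer[ writeIndex ] = value;
      writeIndex = ( writeIndex + 1 ) % buffer.length;
      occupiedBuffers++;
      return true;
   }

   // same index updates as get; throws where get would wait
   int take()
   {
      if ( occupiedBuffers == 0 )
         throw new IllegalStateException( "buffer empty" );

      int readValue = buffer[ readIndex ];
      readIndex = ( readIndex + 1 ) % buffer.length;
      occupiedBuffers--;
      return readValue;
   }

   // writeIndex is occupiedBuffers slots ahead of readIndex, with wraparound
   boolean invariantHolds()
   {
      return writeIndex == ( readIndex + occupiedBuffers ) % buffer.length;
   }

   public static void main( String[] args )
   {
      IndexInvariantDemo demo = new IndexInvariantDemo();

      for ( int value = 1; value <= 10; value++ )
      {
         boolean stored = demo.put( value );
         System.out.printf( "put %d stored=%b invariant=%b%n",
            value, stored, demo.invariantHolds() );

         if ( value % 2 == 0 ) // consume at half the production rate
            System.out.printf( "take -> %d invariant=%b%n",
               demo.take(), demo.invariantHolds() );
      }
   }
}
```

The invariant also explains why the W and R markers coincide in the output both when the buffer is empty and when it is full: in each case occupiedBuffers is a multiple of the array length.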

Method displayState (lines 95-122) outputs the state of the application. Lines 101-102 output the current buffers. Line 102 uses method printf with a %2d format specifier to print the contents of each buffer with a leading space if it is a single digit. Lines 109-119 output the current writeIndex and readIndex with the letters W and R respectively.
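The effect of the %2d specifier can be checked in isolation. This small sketch assumes the format string " %2d " (one space on each side of the specifier); the class FormatWidthDemo and the method cell are illustrative names only.

```java
// Hypothetical demo of the field width used when printing each buffer cell.
final class FormatWidthDemo
{
   // format one buffer value in a field at least two characters wide
   static String cell( int value )
   {
      return String.format( " %2d ", value );
   }

   public static void main( String[] args )
   {
      int[] samples = { -1, 7, 10 };

      // brackets make the padding visible
      for ( int value : samples )
         System.out.println( "[" + cell( value ) + "]" );
   }
}
```

The value 7 is padded to two characters, while -1 and 10 already fill the field, so all three cells come out four characters wide and the columns stay aligned.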

Class CircularBufferTest (Fig. 23.14) contains the main method that launches the application. Line 11 creates the ExecutorService with two threads, and line 14 creates a CircularBuffer object and assigns its reference to Buffer variable sharedLocation. Lines 18-19 execute the Producer and Consumer. Line 26 calls method shutdown to end the application when the Producer and Consumer complete their tasks.
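Method shutdown does not stop tasks that have already been submitted; it only prevents new ones from being accepted. The sketch below illustrates this with two trivial tasks in place of the Producer and Consumer. The class ShutdownDemo, the method runTasks and the counter are assumptions for the illustration, and the call to awaitTermination is an addition that Fig. 23.14 does not make; it is there only so the sketch can report the result after both tasks finish.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo; the two counting tasks stand in for Producer and Consumer.
final class ShutdownDemo
{
   // run two tasks, shut the pool down, and return how many tasks completed
   static int runTasks()
   {
      ExecutorService application = Executors.newFixedThreadPool( 2 );
      AtomicInteger completed = new AtomicInteger();

      for ( int i = 0; i < 2; i++ )
         application.execute( () -> completed.incrementAndGet() );

      application.shutdown(); // no new tasks accepted; submitted tasks still run

      try
      {
         // not in Fig. 23.14: block until the submitted tasks have finished
         application.awaitTermination( 5, TimeUnit.SECONDS );
      }
      catch ( InterruptedException exception )
      {
         Thread.currentThread().interrupt(); // preserve the interrupt status
      }

      return completed.get();
   }

   public static void main( String[] args )
   {
      System.out.println( "Tasks completed: " + runTasks() );
   }
}
```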

Each time the Producer writes a value or the Consumer reads a value, the program outputs the action performed (a read or a write) along with the contents of the buffer and the location of the write index and read index. In this output, the Producer first writes the value 1. The buffer then contains the value 1 in the first slot and the value -1 (the default value) in the other two slots. The write index is updated to the second slot, while the read index stays at the first slot. Next, the Consumer reads 1. The buffer contains the same values, but the read index has been updated to the second slot. The Consumer then tries to read again, but the buffer is empty and the Consumer is forced to wait. Note that only once in this execution of the program was it necessary for either thread to wait.

Java How to Program (6th Edition) (How to Program (Deitel))
ISBN: 0131483986
EAN: 2147483647
Year: 2003
Pages: 615