15.8. Producer/Consumer Relationship: Circular BufferFigures 15.915.10 use thread synchronization to guarantee that two threads manipulate data in a shared buffer correctly. However, the application may not perform optimally. If the two threads operate at different speeds, one will spend more (or most) of its time waiting. For example, in Fig. 15.9 we shared a single integer between the two threads. If the producer thread produces values faster than the consumer can consume them, then the producer thread waits for the consumer, because there are no other locations in memory to place the next value. Similarly, if the consumer consumes faster than the producer can produce values, the consumer waits until the producer places the next value in the shared location in memory. Even when we have threads that operate at the same relative speeds, they may become "out of sync" over a period of time, causing one of the threads to wait for the other. We cannot make assumptions about the relative speeds of concurrently executing threads. There are too many interactions that occur with the operating system, the network, the user and other components, which can cause the threads to operate at different speeds. When this happens, threads wait. When threads wait, programs can become less productive, user-interactive programs can become less responsive and network applications can suffer longer delays because the processor is not used efficiently. To minimize the waiting for threads that share resources and operate at the same relative speeds, we can implement a circular buffer that provides extra locations in which the producer can place values (if it "gets ahead" of the consumer) and from which the consumer can retrieve those values (while it is "catching up" to the producer). Let us assume the buffer is implemented as an array. The producer and consumer work from the beginning of the array. When either thread reaches the end of the array, it simply returns to the first element of the array to perform its next task. If the producer temporarily produces values faster than the consumer can consume them, the producer can write additional values in the extra buffers (if cells are available; otherwise, the producer must, once again, wait). This enables the producer to perform its task even though the consumer is not ready to receive the current value being produced. Similarly, if the consumer consumes faster than the producer produces new values, the consumer can read additional values from the buffer (if there are any; otherwise, the consumer must, once again, wait) and thus "catch up" to the producer. This enables the consumer to perform its task even though the producer is not ready to produce additional values. A circular buffer would be inappropriate if the producer and consumer operate at different speeds. If the consumer always executes faster than the producer, then a buffer with one location is sufficient. Additional locations would waste memory. If the producer always executes faster, a buffer with an infinite number of locations would be required to absorb the extra production. The key to using a circular buffer is to define it with enough extra cells to handle the anticipated "extra" production. If, over a period of time, we determine that the producer often produces as many as three more values than the consumer can consume, we can define a buffer of at least three cells to handle the extra production. We do not want the buffer to be too small, because that would cause threads to wait more; we do not want the buffer to be too large, because that would waste memory. Performance Tip 15.3
Figures 15.1115.12 demonstrate a producer and a consumer accessing a circular buffer (in this case, a shared array of three elements) with synchronization. In this version of the producer/consumer relationship, the consumer consumes a value only when the array is not empty, and the producer produces a value only when the array is not full. This example reuses interface IBuffer (Fig. 15.4) and classes Producer (Fig. 15.5) and Consumer (Fig. 15.6). The statements that created and started the thread objects in the Main methods of class UnsynchronizedBufferTest in Fig. 15.8 and SynchronizedBufferTest in Fig. 15.10 now appear in module CircularBufferTest (Fig. 15.12). Figure 15.11. CircularBuffer synchronizes access to a circular buffer containing three slots.
Figure 15.12. Producer and consumer threads accessing a circular buffer.
The most significant changes occur in class CircularBuffer (Fig. 15.11), which now contains four instance variables. Array buffers (line 9) is a three-element integer array that represents the circular buffer. Variable occupiedBufferCount is the condition variable that can be used to determine whether a producer can write in the circular buffer (i.e., occupiedBufferCount is less than the number of elements in array buffers) and whether a consumer can read from the circular buffer (i.e., occupiedBufferCount is greater than 0). Variable readLocation (line 15) indicates the position from which the next value can be read by a consumer. Variable writeLocation (line 16) indicates the next location in which a value can be placed by a producer. The Set accessor (Fig. 15.11, lines 5281) of property Buffer performs the same tasks that it did in Fig. 15.9, with a few modifications. Rather than using Monitor methods Enter and Exit to acquire and release the lock on the CircularBuffer object, we use a block of code preceded by keyword SyncLock to lock the CircularBuffer object. As program control enters the SyncLock block, the currently executing thread acquires the lock (assuming the lock is currently available) on the CircularBuffer object (i.e., Me). When program control reaches End SyncLock, the thread releases the lock automatically. Common Programming Error 15.4
Software Engineering Observation 15.1
The If statement in lines 5761 in the Set accessor determines whether the producer must wait (i.e., all buffers are full). If the producer thread must wait, lines 5859 output text indicating that the producer is waiting to perform its task, and line 60 invokes Monitor method Wait to place the producer thread in the WaitSleepJoin state. When execution continues at line 64 after the If statement, the value from the producer is placed in the circular buffer at location writeLocation. Next, lines 6667 output a message containing the value produced. Line 71 increments occupiedBufferCount, because there is now one more value in the buffer that the consumer can read. Then, line 75 updates writeLocation for the next call to the Set accessor of property Buffer. The output continues at line 76 by invoking method CreateStateOutput (declared in lines 85123), which outputs the number of occupied buffers, the contents of the buffers and the current writeLocation and readLocation. Finally, line 79 invokes Monitor method Pulse to indicate that a thread waiting on the CircularBuffer object (if there is a waiting thread) should transition to the Running state. Note that End SyncLock (line 80) causes the thread to release the lock on the CircularBuffer object. The Get accessor (lines 2051) of property Buffer also performs the same tasks in this example that it did in Fig. 15.9, with a few minor modifications. Once again, we use a SyncLock block to acquire and release the lock on the CircularBuffer object, rather than using Monitor methods Enter and Exit. The If statement in lines 2529 in the Get accessor determines whether the consumer must wait (i.e., all buffers are empty). If the consumer thread must wait, lines 2627 indicate that the consumer is waiting to perform its task, and line 28 invokes Monitor method Wait to place the consumer thread in the WaitSleepJoin state. When execution continues in line 32 after the If statement, readValue is assigned the value at location readLocation in the circular buffer. Lines 3435 output the value consumed. Line 39 decrements the occupiedBufferCount, because the buffer now contains one more position in which the producer thread can place a value. Then, line 43 updates readLocation for the next call to the Get accessor of Buffer. Line 44 invokes method CreateStateOutput to output the number of occupied buffers, the contents of the buffers and the current writeLocation and readLocation. Finally, line 47 invokes method Pulse to transition the next thread waiting for the CircularBuffer object in the Running state, and line 49 returns the consumed value to the calling method. In Fig. 15.12, line 9 now declares sharedBuffer as a CircularBuffer object, and line 16 displays the initial state of the shared buffer space. The outputs for this example include the current occupiedBufferCount, the contents of the buffers and the current writeLocation and readLocation. In the output, the letters W and R represent the current writeLocation and readLocation, respectively. Note that after the third value is placed in the third element of the buffer, the fourth value is inserted at the beginning of the array. This provides the circular buffer effect. |