15.8. Producer/Consumer Relationship: Circular Buffer

Figures 15.9–15.10 use thread synchronization to guarantee that two threads manipulate data in a shared buffer correctly. However, the application may not perform optimally. If the two threads operate at different speeds, one will spend more (or most) of its time waiting. For example, in Fig. 15.9 we shared a single integer between the two threads. If the producer thread produces values faster than the consumer can consume them, then the producer thread waits for the consumer, because there are no other locations in memory to place the next value. Similarly, if the consumer consumes faster than the producer can produce values, the consumer waits until the producer places the next value in the shared location in memory. Even when we have threads that operate at the same relative speeds, they may become "out of sync" over a period of time, causing one of the threads to wait for the other. We cannot make assumptions about the relative speeds of concurrently executing threads. There are too many interactions that occur with the operating system, the network, the user and other components, which can cause the threads to operate at different speeds. When this happens, threads wait. When threads wait, programs can become less productive, user-interactive programs can become less responsive and network applications can suffer longer delays because the processor is not used efficiently.

To minimize the waiting for threads that share resources and operate at the same relative speeds, we can implement a circular buffer that provides extra locations in which the producer can place values (if it "gets ahead" of the consumer) and from which the consumer can retrieve those values (while it is "catching up" to the producer). Let us assume the buffer is implemented as an array. The producer and consumer work from the beginning of the array. When either thread reaches the end of the array, it simply returns to the first element of the array to perform its next task. If the producer temporarily produces values faster than the consumer can consume them, the producer can write additional values in the extra buffers (if cells are available; otherwise, the producer must, once again, wait). This enables the producer to perform its task even though the consumer is not ready to receive the current value being produced. Similarly, if the consumer consumes faster than the producer produces new values, the consumer can read additional values from the buffer (if there are any; otherwise, the consumer must, once again, wait) and thus "catch up" to the producer. This enables the consumer to perform its task even though the producer is not ready to produce additional values.
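
The "return to the first element" step described above comes down to modulo arithmetic on the array index. The following is a minimal sketch rather than part of the book's listings; the module name WrapAroundDemo and the function NextLocation are hypothetical names chosen for illustration.

```vb
Module WrapAroundDemo
   ' Hypothetical helper: returns the index that follows "current"
   ' in a circular array containing "length" elements.
   Function NextLocation(ByVal current As Integer, _
      ByVal length As Integer) As Integer

      Return (current + 1) Mod length
   End Function

   Sub Main()
      Dim location As Integer = 0

      ' step through a three-element array five times;
      ' the locations visited are 0, 1, 2, 0, 1
      For count As Integer = 1 To 5
         Console.Write(location & " ")
         location = NextLocation(location, 3)
      Next

      Console.WriteLine()
   End Sub
End Module
```

Because the remainder of division by the array length is always between 0 and length - 1, the index can never run past the end of the array.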

A circular buffer would be inappropriate if the producer and consumer consistently operate at different speeds. If the consumer always executes faster than the producer, then a buffer with one location is sufficient, and additional locations would waste memory. If the producer always executes faster, a buffer with an infinite number of locations would be required to absorb the extra production.

The key to using a circular buffer is to define it with enough extra cells to handle the anticipated "extra" production. If, over a period of time, we determine that the producer often produces as many as three more values than the consumer can consume, we can define a buffer of at least three cells to handle the extra production. We do not want the buffer to be too small, because that would cause threads to wait more; we do not want the buffer to be too large, because that would waste memory.

Performance Tip 15.3

Even when using a circular buffer, it is possible that a producer thread could fill the buffer. This would force the producer thread to wait until a consumer consumes a value to free an element in the buffer. Similarly, if the buffer is empty, the consumer thread must wait until the producer produces another value. The key to using a circular buffer is to optimize the buffer size to minimize the amount of thread wait time.


Figures 15.11–15.12 demonstrate a producer and a consumer accessing a circular buffer (in this case, a shared array of three elements) with synchronization. In this version of the producer/consumer relationship, the consumer consumes a value only when the array is not empty, and the producer produces a value only when the array is not full. This example reuses interface IBuffer (Fig. 15.4) and classes Producer (Fig. 15.5) and Consumer (Fig. 15.6). The statements that created and started the thread objects in the Main methods of class UnsynchronizedBufferTest in Fig. 15.8 and SynchronizedBufferTest in Fig. 15.10 now appear in module CircularBufferTest (Fig. 15.12).

Figure 15.11. CircularBuffer synchronizes access to a circular buffer containing three slots.

  1  ' Fig. 15.11: CircularBuffer.vb
  2  ' A circular shared buffer for the producer/consumer relationship.
  3  Imports System.Threading
  4
  5  ' implement an array of shared integers with synchronization
  6  Public Class CircularBuffer
  7     Implements IBuffer
  8     ' each array element is a buffer
  9     Private buffers As Integer() = {-1, -1, -1}
 10
 11     ' occupiedBufferCount maintains count of occupied buffers
 12     Private occupiedBufferCount As Integer = 0
 13
 14     ' variables that maintain read and write buffer locations
 15     Private readLocation As Integer = 0
 16     Private writeLocation As Integer = 0
 17
 18     ' property Buffer
 19     Public Property Buffer() As Integer Implements IBuffer.Buffer
 20        Get
 21           ' lock this object while getting value from buffers array
 22           SyncLock (Me)
 23              ' if there is no data to read, place invoking thread
 24              ' in WaitSleepJoin state
 25              If occupiedBufferCount = 0 Then
 26                 Console.Write(vbCrLf & "All buffers empty. " & _
 27                    Thread.CurrentThread.Name & " waits.")
 28                 Monitor.Wait(Me) ' enter the WaitSleepJoin state
 29              End If
 30
 31              ' obtain value at current readLocation
 32              Dim readValue As Integer = buffers(readLocation)
 33
 34              Console.Write(vbCrLf & Thread.CurrentThread.Name & _
 35                 " reads " & buffers(readLocation) & " ")
 36
 37              ' just consumed a value, so decrement
 38              ' number of occupied buffers
 39              occupiedBufferCount -= 1
 40
 41              ' update readLocation for future read operation, then
 42              ' add current state to output
 43              readLocation = (readLocation + 1) Mod buffers.Length
 44              CreateStateOutput()
 45
 46              ' return waiting thread (if there is one) to Running state
 47              Monitor.Pulse(Me)
 48
 49              Return readValue
 50           End SyncLock
 51        End Get
 52        Set(ByVal value As Integer)
 53           ' lock this object while setting value in buffers array
 54           SyncLock (Me)
 55              ' if there are no empty locations, place invoking thread
 56              ' in WaitSleepJoin state
 57              If occupiedBufferCount = buffers.Length Then
 58                 Console.Write(vbCrLf & "All buffers full. " & _
 59                    Thread.CurrentThread.Name & " waits.")
 60                 Monitor.Wait(Me) ' enter the WaitSleepJoin state
 61              End If
 62
 63              ' place value in writeLocation of buffers
 64              buffers(writeLocation) = value
 65
 66              Console.Write(vbCrLf & Thread.CurrentThread.Name & _
 67                 " writes " & buffers(writeLocation) & " ")
 68
 69              ' just produced a value, so increment
 70              ' number of occupied buffers
 71              occupiedBufferCount += 1
 72
 73              ' update writeLocation for future write operation,
 74              ' then add current state to output
 75              writeLocation = (writeLocation + 1) Mod buffers.Length
 76              CreateStateOutput()
 77
 78              ' return waiting thread (if there is one) to Running state
 79              Monitor.Pulse(Me)
 80           End SyncLock
 81        End Set
 82     End Property ' Buffer
 83
 84     ' create state output
 85     Public Sub CreateStateOutput()
 86        ' display first line of state information
 87        Console.Write("(buffers occupied: " & _
 88           occupiedBufferCount & ")" & vbCrLf & "buffers: ")
 89
 90        For i As Integer = 0 To (buffers.Length - 1)
 91           Console.Write(" " & String.Format("{0,2}", buffers(i)) & "  ")
 92        Next i
 93
 94        Console.WriteLine()
 95
 96        ' display second line of state information
 97        Console.Write("         ")
 98
 99        For i As Integer = 0 To (buffers.Length - 1)
100           Console.Write("---- ")
101        Next i
102
103        Console.WriteLine()
104
105        ' display third line of state information
106        Console.Write("         ")
107
108        ' display readLocation (R) and writeLocation (W) indicators
109        ' below appropriate buffer locations
110        For i As Integer = 0 To (buffers.Length - 1)
111           If i = writeLocation And writeLocation = readLocation Then
112              Console.Write(" WR  ")
113           ElseIf i = writeLocation Then
114              Console.Write(" W   ")
115           ElseIf i = readLocation Then
116              Console.Write(" R   ")
117           Else
118              Console.Write("     ")
119           End If
120        Next i
121
122        Console.WriteLine()
123     End Sub ' CreateStateOutput
124  End Class ' CircularBuffer

Figure 15.12. Producer and consumer threads accessing a circular buffer.

 1  ' Fig. 15.12: CircularBufferTest.vb
 2  ' Implementing the producer/consumer relationship with a circular buffer.
 3  Imports System.Threading
 4
 5  Module CircularBufferTest
 6     ' create producer and consumer threads and start them
 7     Sub Main()
 8        ' create shared object used by threads
 9        Dim sharedBuffer As New CircularBuffer()
10
11        ' Random object used by each thread
12        Dim random As New Random()
13
14        ' display shared state before producer and consumer
15        ' threads begin execution
16        sharedBuffer.CreateStateOutput()
17
18        ' create Producer and Consumer objects
19        Dim producer As New Producer(sharedBuffer, random)
20        Dim consumer As New Consumer(sharedBuffer, random)
21
22        ' create threads for producer and consumer and set
23        ' delegates for each thread
24        Dim producerThread As New Thread(New ThreadStart( _
25           AddressOf producer.Produce))
26        producerThread.Name = "Producer"
27
28        Dim consumerThread As New Thread(New ThreadStart( _
29           AddressOf consumer.Consume))
30        consumerThread.Name = "Consumer"
31
32        ' start each thread
33        producerThread.Start()
34        consumerThread.Start()
35     End Sub ' Main
36  End Module ' CircularBufferTest

(buffers occupied: 0)
buffers:  -1   -1   -1
         ---- ---- ----
          WR

All buffers empty. Consumer waits.
Producer writes 1 (buffers occupied: 1)
buffers:   1   -1   -1
         ---- ---- ----
          R    W

Consumer reads 1 (buffers occupied: 0)
buffers:   1   -1   -1
         ---- ---- ----
               WR

Producer writes 2 (buffers occupied: 1)
buffers:   1    2   -1
         ---- ---- ----
               R    W

Consumer reads 2 (buffers occupied: 0)
buffers:   1    2   -1
         ---- ---- ----
                    WR

All buffers empty. Consumer waits.
Producer writes 3 (buffers occupied: 1)
buffers:   1    2    3
         ---- ---- ----
          W         R

Consumer reads 3 (buffers occupied: 0)
buffers:   1    2    3
         ---- ---- ----
          WR

All buffers empty. Consumer waits.
Producer writes 4 (buffers occupied: 1)
buffers:   4    2    3
         ---- ---- ----
          R    W

Producer writes 5 (buffers occupied: 2)
buffers:   4    5    3
         ---- ---- ----
          R         W

Consumer reads 4 (buffers occupied: 1)
buffers:   4    5    3
         ---- ---- ----
               R    W

Producer writes 6 (buffers occupied: 2)
buffers:   4    5    6
         ---- ---- ----
          W    R

Producer writes 7 (buffers occupied: 3)
buffers:   7    5    6
         ---- ---- ----
               WR

All buffers full. Producer waits.
Consumer reads 5 (buffers occupied: 2)
buffers:   7    5    6
         ---- ---- ----
               W    R

Consumer reads 6 (buffers occupied: 1)
buffers:   7    5    6
         ---- ---- ----
          R    W

Producer writes 8 (buffers occupied: 2)
buffers:   7    8    6
         ---- ---- ----
          R         W

Consumer reads 7 (buffers occupied: 1)
buffers:   7    8    6
         ---- ---- ----
               R    W

Consumer reads 8 (buffers occupied: 0)
buffers:   7    8    6
         ---- ---- ----
                    WR

Producer writes 9 (buffers occupied: 1)
buffers:   7    8    9
         ---- ---- ----
          W         R

Producer writes 10 (buffers occupied: 2)
buffers:  10    8    9
         ---- ---- ----
               W    R

Producer done producing.
Terminating Producer.

Consumer reads 9 (buffers occupied: 1)
buffers:  10    8    9
         ---- ---- ----
          R    W

Consumer reads 10 (buffers occupied: 0)
buffers:  10    8    9
         ---- ---- ----
               WR

Consumer read values totaling: 55.
Terminating Consumer.



The most significant changes occur in class CircularBuffer (Fig. 15.11), which now contains four instance variables. Array buffers (line 9) is a three-element integer array that represents the circular buffer. Variable occupiedBufferCount is the condition variable that can be used to determine whether a producer can write in the circular buffer (i.e., occupiedBufferCount is less than the number of elements in array buffers) and whether a consumer can read from the circular buffer (i.e., occupiedBufferCount is greater than 0). Variable readLocation (line 15) indicates the position from which the next value can be read by a consumer. Variable writeLocation (line 16) indicates the next location in which a value can be placed by a producer.
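
To see how the four variables cooperate before any synchronization is involved, consider the following single-threaded sketch. It is an illustration only: the class RingState and its methods TryWrite and TryRead are hypothetical names that do not appear in Fig. 15.11, and instead of waiting they simply return False when the corresponding condition on occupiedBufferCount is not met.

```vb
' Hypothetical, unsynchronized class for illustration only; it must
' not be shared between threads.
Public Class RingState
   Private buffers As Integer() = {-1, -1, -1}
   Private occupiedBufferCount As Integer = 0
   Private readLocation As Integer = 0
   Private writeLocation As Integer = 0

   ' store value and return True, or return False if all buffers are full
   Public Function TryWrite(ByVal value As Integer) As Boolean
      If occupiedBufferCount = buffers.Length Then
         Return False
      End If

      buffers(writeLocation) = value
      writeLocation = (writeLocation + 1) Mod buffers.Length
      occupiedBufferCount += 1
      Return True
   End Function

   ' copy the oldest value into result and return True, or return
   ' False if all buffers are empty
   Public Function TryRead(ByRef result As Integer) As Boolean
      If occupiedBufferCount = 0 Then
         Return False
      End If

      result = buffers(readLocation)
      readLocation = (readLocation + 1) Mod buffers.Length
      occupiedBufferCount -= 1
      Return True
   End Function
End Class

Module RingStateDemo
   Sub Main()
      Dim ring As New RingState()
      Dim item As Integer

      Console.WriteLine(ring.TryRead(item))  ' False: nothing to read yet
      Console.WriteLine(ring.TryWrite(1))    ' True
      Console.WriteLine(ring.TryWrite(2))    ' True
      Console.WriteLine(ring.TryWrite(3))    ' True
      Console.WriteLine(ring.TryWrite(4))    ' False: all three buffers full
      Console.WriteLine(ring.TryRead(item))  ' True: item now contains 1
   End Sub
End Module
```

CircularBuffer applies the same two tests, but a thread that fails a test waits on the object's monitor rather than returning immediately.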

The Set accessor (Fig. 15.11, lines 52–81) of property Buffer performs the same tasks that it did in Fig. 15.9, with a few modifications. Rather than using Monitor methods Enter and Exit to acquire and release the lock on the CircularBuffer object, we use a block of code preceded by keyword SyncLock to lock the CircularBuffer object. As program control enters the SyncLock block, the currently executing thread acquires the lock (assuming the lock is currently available) on the CircularBuffer object (i.e., Me). When program control reaches End SyncLock, the thread releases the lock automatically.

Common Programming Error 15.4

When using class Monitor's Enter and Exit methods to manage an object's lock, Exit must be called explicitly to release the lock. If an exception occurs in a method before Exit can be called and the exception is not caught, the method could terminate without calling Exit. If so, the lock is not released. To avoid this error, place code that could throw exceptions in a Try block, and place the call to Exit in the corresponding Finally block to ensure that the lock is released.
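
The pattern recommended in this tip might look like the following minimal sketch. The class SafeCounter, its field countValue and its members Increment and Count are hypothetical names introduced for illustration; only Monitor.Enter, Monitor.Exit and the Try...Finally statement come from the discussion above.

```vb
Imports System.Threading

' Hypothetical class used only to illustrate the Try...Finally pattern
Public Class SafeCounter
   Private countValue As Integer = 0

   ' increment countValue while holding the lock on this object
   Public Sub Increment()
      Monitor.Enter(Me)

      Try
         countValue += 1 ' code that could throw an exception belongs here
      Finally
         Monitor.Exit(Me) ' runs whether or not an exception occurred
      End Try
   End Sub

   ' return the current count while holding the lock
   Public ReadOnly Property Count() As Integer
      Get
         Monitor.Enter(Me)

         Try
            Return countValue
         Finally
            Monitor.Exit(Me)
         End Try
      End Get
   End Property
End Class
```

The call to Enter is placed immediately before the Try block so that the Finally block releases only a lock that was actually acquired.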


Software Engineering Observation 15.1

Using a SyncLock block to manage the lock on a synchronized object eliminates the possibility of forgetting to relinquish the lock with a call to Monitor method Exit. Visual Basic implicitly calls Monitor method Exit when a SyncLock block terminates for any reason. Thus, even if an exception occurs in the block, the lock will be released.
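
The behavior described in this observation can be checked with a small experiment. The sketch below is an assumption-laden illustration, not code from the book: the module SyncLockDemo, the object lockObject and the method FailInsideLock are hypothetical names. A worker thread throws an exception inside a SyncLock block, and the main thread then uses Monitor.TryEnter to confirm that the lock is free.

```vb
Imports System.Threading

Module SyncLockDemo
   ' hypothetical object whose lock is shared by both threads
   Private lockObject As New Object()

   ' throw an exception while holding the lock on lockObject
   Sub FailInsideLock()
      Try
         SyncLock lockObject
            Throw New InvalidOperationException("simulated failure")
         End SyncLock
      Catch ex As InvalidOperationException
         Console.WriteLine("Worker caught: " & ex.Message)
      End Try
   End Sub

   Sub Main()
      Dim worker As New Thread(New ThreadStart( _
         AddressOf FailInsideLock))
      worker.Start()
      worker.Join() ' wait for the worker thread to terminate

      ' TryEnter returns True here because the SyncLock block
      ' released the lock when the exception left the block
      If Monitor.TryEnter(lockObject) Then
         Console.WriteLine("Lock was released.")
         Monitor.Exit(lockObject)
      End If
   End Sub
End Module
```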


The If statement in lines 57–61 in the Set accessor determines whether the producer must wait (i.e., all buffers are full). If the producer thread must wait, lines 58–59 output text indicating that the producer is waiting to perform its task, and line 60 invokes Monitor method Wait to place the producer thread in the WaitSleepJoin state. When execution continues at line 64 after the If statement, the value from the producer is placed in the circular buffer at location writeLocation. Next, lines 66–67 output a message containing the value produced. Line 71 increments occupiedBufferCount, because there is now one more value in the buffer that the consumer can read. Then, line 75 updates writeLocation for the next call to the Set accessor of property Buffer. The output continues at line 76 by invoking method CreateStateOutput (declared in lines 85–123), which outputs the number of occupied buffers, the contents of the buffers and the current writeLocation and readLocation. Finally, line 79 invokes Monitor method Pulse to indicate that a thread waiting on the CircularBuffer object (if there is a waiting thread) should transition to the Running state. Note that End SyncLock (line 80) causes the thread to release the lock on the CircularBuffer object.

The Get accessor (lines 20–51) of property Buffer also performs the same tasks in this example that it did in Fig. 15.9, with a few minor modifications. Once again, we use a SyncLock block to acquire and release the lock on the CircularBuffer object, rather than using Monitor methods Enter and Exit. The If statement in lines 25–29 in the Get accessor determines whether the consumer must wait (i.e., all buffers are empty). If the consumer thread must wait, lines 26–27 indicate that the consumer is waiting to perform its task, and line 28 invokes Monitor method Wait to place the consumer thread in the WaitSleepJoin state. When execution continues in line 32 after the If statement, readValue is assigned the value at location readLocation in the circular buffer. Lines 34–35 output the value consumed. Line 39 decrements occupiedBufferCount, because the buffer now contains one more position in which the producer thread can place a value. Then, line 43 updates readLocation for the next call to the Get accessor of Buffer. Line 44 invokes method CreateStateOutput to output the number of occupied buffers, the contents of the buffers and the current writeLocation and readLocation. Finally, line 47 invokes method Pulse to transition the next thread waiting for the CircularBuffer object to the Running state, and line 49 returns the consumed value to the calling method.
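
Fig. 15.11 tests each wait condition once with an If statement (lines 25 and 57), which suits this example's single producer and single consumer. If several producers or consumers shared the buffer, a thread awakened by Pulse could find that another thread had already changed occupiedBufferCount, so the condition is commonly rechecked in a loop. The following sketch shows one way to do that. It is a variation on the figure, not the book's code: the class GuardedBuffer, its methods Put and Take and the module GuardedBufferDemo are hypothetical names, the output statements are omitted, and Monitor.PulseAll replaces Pulse so that every waiting thread rechecks its condition.

```vb
Imports System.Threading

' Hypothetical variant of CircularBuffer that rechecks each wait
' condition in a loop
Public Class GuardedBuffer
   Private buffers As Integer() = {-1, -1, -1}
   Private occupiedBufferCount As Integer = 0
   Private readLocation As Integer = 0
   Private writeLocation As Integer = 0

   ' place value in the buffer, waiting while all buffers are full
   Public Sub Put(ByVal value As Integer)
      SyncLock Me
         While occupiedBufferCount = buffers.Length
            Monitor.Wait(Me) ' releases the lock while waiting
         End While

         buffers(writeLocation) = value
         writeLocation = (writeLocation + 1) Mod buffers.Length
         occupiedBufferCount += 1

         Monitor.PulseAll(Me) ' waiting threads recheck their conditions
      End SyncLock
   End Sub

   ' remove the oldest value, waiting while all buffers are empty
   Public Function Take() As Integer
      SyncLock Me
         While occupiedBufferCount = 0
            Monitor.Wait(Me)
         End While

         Dim readValue As Integer = buffers(readLocation)
         readLocation = (readLocation + 1) Mod buffers.Length
         occupiedBufferCount -= 1

         Monitor.PulseAll(Me)
         Return readValue
      End SyncLock
   End Function
End Class

Module GuardedBufferDemo
   Private sharedBuffer As New GuardedBuffer()

   ' produce the values 1 through 10
   Sub ProduceValues()
      For number As Integer = 1 To 10
         sharedBuffer.Put(number)
      Next
   End Sub

   Sub Main()
      Dim producerThread As New Thread(New ThreadStart( _
         AddressOf ProduceValues))
      producerThread.Start()

      ' consume ten values on the main thread
      Dim sum As Integer = 0

      For count As Integer = 1 To 10
         sum += sharedBuffer.Take()
      Next

      producerThread.Join()
      Console.WriteLine("Sum of values read: " & sum) ' displays 55
   End Sub
End Module
```

With exactly one producer and one consumer, the If statements in Fig. 15.11 and the While loops in this sketch behave the same way; the loop matters only when more than one thread can wait on the same condition.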

In Fig. 15.12, line 9 now declares sharedBuffer as a CircularBuffer object, and line 16 displays the initial state of the shared buffer space. The outputs for this example include the current occupiedBufferCount, the contents of the buffers and the current writeLocation and readLocation. In the output, the letters W and R represent the current writeLocation and readLocation, respectively. Note that after the third value is placed in the third element of the buffer, the fourth value is inserted at the beginning of the array. This provides the circular buffer effect.



Visual Basic® 2005 for Programmers. Deitel® Developer Series
Visual Basic 2005 for Programmers (2nd Edition)
ISBN: 013225140X
Year: 2004
Pages: 435
