15.6. Producer/Consumer Relationship without Thread Synchronization

In a producer/consumer relationship, the producer portion of an application generates data and the consumer portion of the application uses that data. In a multithreaded producer/consumer relationship, a producer thread calls a produce method to generate data and place it in a shared region of memory called a buffer. A consumer thread calls a consume method to read the data. If the producer wishes to put the next data in the buffer but determines that the consumer has not yet read the previous data from the buffer, the producer thread should call Wait. Otherwise, the consumer would never see the previous data, which would be lost to that application. When the consumer thread reads the data, it should call Pulse to allow a waiting producer to proceed, since there is now free space in the buffer. If a consumer thread finds the buffer empty or finds that the previous data has already been read, the consumer should call Wait. Otherwise, the consumer might read "garbage" from the buffer or might process a previous data item more than once. Each of these possibilities results in a logic error in the application. When the producer places the next data into the buffer, the producer should call Pulse to allow the consumer thread to proceed and read that data.

Let us consider how logic errors can arise if we do not synchronize access among multiple threads manipulating shared data. Consider a producer/consumer relationship in which a producer thread writes a sequence of numbers (we use 1 to 10) into a shared buffer, a memory location shared between multiple threads. The consumer thread reads this data from the shared buffer, then displays the data. We display in the program's output the values that the producer writes (produces) and the consumer reads (consumes). Figures 15.4 through 15.8 demonstrate a producer thread and a consumer thread accessing a single shared Integer variable without any synchronization. The producer thread writes to the variable; the consumer thread reads from it. We would like each value the producer thread writes to the shared variable to be consumed exactly once by the consumer thread. However, the threads in this example are not synchronized. Therefore, data can be lost if the producer places new data in the variable before the consumer consumes the previous data. Also, data can be incorrectly repeated if the consumer consumes data again before the producer produces the next value. If the consumer attempts to read before the producer produces the first value, the consumer reads garbage (in this example, the buffer's initial value of -1).

To show these possibilities, the consumer thread in the example keeps a total of all the values it reads. The producer thread produces values from 1 to 10. If the consumer reads each value produced once and only once, the total will be 55. However, when you execute this program several times, you'll see that the total is rarely, if ever, 55. Also, to emphasize our point, the producer and consumer threads in the example each sleep for random intervals of up to three seconds between performing their tasks. Thus, we do not know exactly when the producer thread will attempt to write a new value, nor do we know when the consumer thread will attempt to read a value.

Figure 15.4. IBuffer interface used in producer/consumer examples.

1  ' Fig. 15.4: IBuffer.vb
2  ' Interface for a shared Integer buffer.
3  Public Interface IBuffer
4     Property Buffer() As Integer ' property to access the buffer
5  End Interface ' IBuffer

The program consists of interface IBuffer (Fig. 15.4), three classes, Producer (Fig. 15.5), Consumer (Fig. 15.6) and UnsynchronizedBuffer (Fig. 15.7), and one module, UnsynchronizedBufferTest (Fig. 15.8). Interface IBuffer declares an Integer property called Buffer. Any implementation of IBuffer must provide a Get and a Set accessor for this property to allow the producer and consumer to access the shared data stored in the buffer.

Figure 15.5. Producer represents the producer thread in a producer/consumer relationship.

 1  ' Fig. 15.5: Producer.vb
 2  ' Producer produces ten integer values in the shared buffer.
 3  Imports System.Threading
 4
 5  ' class Producer's Produce method controls a thread that
 6  ' stores values from 1 to 10 in sharedLocation
 7  Public Class Producer
 8     Private sharedLocation As IBuffer
 9     Private randomSleepTime As Random
10
11     ' constructor
12     Public Sub New(ByVal sharedBuffer As IBuffer, ByVal random As Random)
13        sharedLocation = sharedBuffer
14        randomSleepTime = random
15     End Sub ' New
16
17     ' store values 1-10 in object sharedLocation
18     Public Sub Produce()
19        ' sleep for random interval up to 3000 milliseconds
20        ' then set sharedLocation's Buffer property
21        For count As Integer = 1 To 10
22           Thread.Sleep(randomSleepTime.Next(1, 3001))
23           sharedLocation.Buffer = count
24        Next count
25
26        Console.WriteLine(Thread.CurrentThread.Name & " done producing." & _
27           vbCrLf & "Terminating " & Thread.CurrentThread.Name & ".")
28     End Sub ' Produce
29  End Class ' Producer

Figure 15.6. Consumer represents the consumer thread in a producer/consumer relationship.

 1  ' Fig. 15.6: Consumer.vb
 2  ' Consumer consumes ten integer values from the shared buffer.
 3  Imports System.Threading
 4
 5  ' class Consumer's Consume method controls a thread that
 6  ' loops ten times and reads a value from sharedLocation
 7  Public Class Consumer
 8     Private sharedLocation As IBuffer
 9     Private randomSleepTime As Random
10
11     ' constructor
12     Public Sub New(ByVal sharedBuffer As IBuffer, ByVal random As Random)
13        sharedLocation = sharedBuffer
14        randomSleepTime = random
15     End Sub ' New
16
17     ' read sharedLocation's value ten times
18     Public Sub Consume()
19        Dim sum As Integer = 0
20
21        ' sleep for random interval up to 3000 milliseconds then
22        ' add sharedLocation's Buffer property value to sum
23        For count As Integer = 1 To 10
24           Thread.Sleep(randomSleepTime.Next(1, 3001))
25           sum += sharedLocation.Buffer ' consume
26        Next count
27
28        Console.WriteLine(Thread.CurrentThread.Name & _
29           " read values totaling: " & sum & "." & vbCrLf & _
30           "Terminating " & Thread.CurrentThread.Name & ".")
31     End Sub ' Consume
32  End Class ' Consumer

Figure 15.7. UnsynchronizedBuffer maintains the shared integer variable accessed by a producer thread and a consumer thread via property Buffer.

 1  ' Fig. 15.7: UnsynchronizedBuffer.vb
 2  ' An unsynchronized shared buffer implementation.
 3  Imports System.Threading
 4
 5  ' this class represents a single shared Integer
 6  Public Class UnsynchronizedBuffer
 7     Implements IBuffer
 8
 9     ' buffer shared by producer and consumer threads
10     Private bufferValue As Integer = -1
11
12     ' property Buffer
13     Public Property Buffer() As Integer Implements IBuffer.Buffer
14        Get
15           Console.WriteLine(Thread.CurrentThread.Name & _
16              " reads " & bufferValue)
17           Return bufferValue
18        End Get
19        Set(ByVal value As Integer)
20           Console.WriteLine(Thread.CurrentThread.Name & _
21              " writes " & value)
22           bufferValue = value
23        End Set
24     End Property ' Buffer
25  End Class ' UnsynchronizedBuffer

Figure 15.8. Producer and consumer threads accessing a shared object without synchronization.

 1  ' Fig. 15.8: UnsynchronizedBufferTest.vb
 2  ' Showing multiple threads modifying a shared object
 3  ' without synchronization.
 4  Imports System.Threading
 5
 6  ' this module creates producer and consumer threads
 7  Module UnsynchronizedBufferTest
 8     ' create producer and consumer threads and start them
 9     Sub Main()
10        ' create shared object used by threads
11        Dim sharedBuffer As New UnsynchronizedBuffer
12
13        ' Random object used by each thread
14        Dim random As New Random
15
16        ' create Producer and Consumer objects
17        Dim producer As New Producer(sharedBuffer, random)
18        Dim consumer As New Consumer(sharedBuffer, random)
19
20        ' create threads for producer and consumer and set
21        ' delegates for each thread
22        Dim producerThread As New Thread(New ThreadStart( _
23           AddressOf producer.Produce))
24        producerThread.Name = "Producer"
25
26        Dim consumerThread As New Thread(New ThreadStart( _
27           AddressOf consumer.Consume))
28        consumerThread.Name = "Consumer"
29
30        ' start each thread
31        producerThread.Start()
32        consumerThread.Start()
33     End Sub ' Main
34  End Module ' UnsynchronizedBufferTest

Consumer reads -1
Producer writes 1
Consumer reads 1
Producer writes 2
Consumer reads 2
Consumer reads 2
Producer writes 3
Consumer reads 3
Consumer reads 3
Producer writes 4
Consumer reads 4
Producer writes 5
Consumer reads 5
Consumer reads 5
Producer writes 6
Consumer reads 6
Consumer read values totaling: 30.
Terminating Consumer.
Producer writes 7
Producer writes 8
Producer writes 9
Producer writes 10
Producer done producing.
Terminating Producer.



Producer writes 1
Producer writes 2
Consumer reads 2
Consumer reads 2
Producer writes 3
Producer writes 4
Consumer reads 4
Consumer reads 4
Producer writes 5
Consumer reads 5
Producer writes 6
Consumer reads 6
Producer writes 7
Consumer reads 7
Producer writes 8
Consumer reads 8
Producer writes 9
Producer writes 10
Producer done producing.
Terminating Producer.
Consumer reads 10
Consumer reads 10
Consumer read values totaling: 58.
Terminating Consumer.



Class Producer (Figure 15.5) consists of instance variable sharedLocation (line 8) of type IBuffer, instance variable randomSleepTime (line 9) of type Random, a constructor (lines 12-15) to initialize the instance variables and a Produce method (lines 18-28). The constructor initializes instance variable sharedLocation to refer to the IBuffer object received from method Main as the parameter sharedBuffer. The producer thread in this program executes the tasks specified in method Produce of class Producer. The For statement in method Produce (lines 21-24) loops ten times. Each iteration of the loop first invokes Thread method Sleep to place the producer thread in the WaitSleepJoin state for a random time interval of up to 3 seconds (1 to 3000 milliseconds). When the thread awakens, line 23 assigns the value of control variable count to sharedLocation's Buffer property. When the loop completes, lines 26-27 display a line of text in the console window indicating that the thread has finished producing data and the thread is terminating. The Produce method then terminates, and the producer thread enters the Stopped state.

Class Consumer (Figure 15.6) consists of instance variable sharedLocation (line 8) of type IBuffer, instance variable randomSleepTime (line 9) of type Random, a constructor (lines 12-15) to initialize the instance variables and a Consume method (lines 18-31). The constructor initializes sharedLocation to refer to the IBuffer object received from Main as the parameter sharedBuffer. The consumer thread in this program performs the tasks specified in class Consumer's Consume method. The method contains a For statement (lines 23-26) that loops ten times. Each iteration of the loop invokes Thread method Sleep to put the consumer thread into the WaitSleepJoin state for a random time interval of up to 3 seconds (1 to 3000 milliseconds). Next, line 25 gets the value of sharedLocation's Buffer property and adds the value to variable sum. When the loop completes, lines 28-30 display a line in the console window indicating the sum of all values read. Again, ideally the total should be 55, but because access to the shared data is not synchronized, this sum will almost never appear. The Consume method then terminates, and the consumer thread enters the Stopped state.

We use method Sleep in this example's threads to emphasize the fact that in multithreaded applications, it is unclear when each thread will perform its task and for how long it will perform that task when it has the processor. Normally, these thread-scheduling issues are the job of the computer's operating system. In this program, our threads' tasks are quite simple: for the producer, loop ten times and perform an assignment statement; for the consumer, loop ten times and add a value to variable sum. Without the Sleep method call, and if the producer executes first, the producer would most likely complete its task before the consumer ever gets a chance to execute. If the consumer executes first, it would consume -1 ten times, then terminate before the producer could produce the first real value.

Class UnsynchronizedBuffer (Figure 15.7) implements interface IBuffer (line 7) and consists of instance variable bufferValue (line 10) and property Buffer (lines 13-24), which provides Get and Set accessors. Property Buffer's accessors do not synchronize access to instance variable bufferValue. Note that each accessor uses class Thread's Shared property CurrentThread to obtain a reference to the currently executing thread, then uses that thread's Name property to obtain the thread's name for output purposes.

Module UnsynchronizedBufferTest (Figure 15.8) defines a Main method (lines 9-33) that instantiates a shared UnsynchronizedBuffer object (line 11) and a Random object (line 14) for generating random sleep times. These are used as arguments to the constructors for the objects of classes Producer (line 17) and Consumer (line 18). The UnsynchronizedBuffer object contains the data that will be shared between the producer and consumer threads. Because UnsynchronizedBuffer implements the IBuffer interface, the Producer and Consumer constructors can each take an UnsynchronizedBuffer object and assign it to their respective IBuffer variables named sharedLocation. Lines 22-24 create and name producerThread. The ThreadStart delegate for producerThread specifies that the thread will execute method Produce of object producer. Lines 26-28 create and name consumerThread. The ThreadStart delegate for consumerThread specifies that the thread will execute method Consume of object consumer. Finally, lines 31-32 place the two threads in the Running state by invoking each thread's Start method, then the thread executing Main terminates.

Ideally, we would like every value produced by the Producer object to be consumed exactly once by the Consumer object. However, when we study the first output of Fig. 15.8, we see that the consumer retrieved a value (-1) before the producer ever placed a value in the shared buffer and that the values 2, 3 and 5 were consumed twice each. The consumer finished executing before the producer had an opportunity to produce the values 7, 8, 9 and 10. Therefore, those four values were lost, and an incorrect sum (30) resulted. In the second output, we see that the value 1 was lost, because the values 1 and 2 were produced before the consumer thread could read the value 1. The values 3 and 9 were also lost and the values 2, 4 and 10 were consumed twice each, also resulting in an incorrect sum (58). This example clearly demonstrates that access to shared data by concurrent threads must be controlled carefully; otherwise, a program may produce incorrect results.
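The totals discussed above can be checked by hand or with a few lines of code. The following is only a sketch for verifying the arithmetic; the module name SumCheck and the function Total are hypothetical and are not part of the chapter's figures. The arrays simply list the values the consumer read in each sample output of Fig. 15.8, alongside the ideal sequence.

```vb
Imports System

' Hypothetical helper module (not one of the chapter's figures).
Module SumCheck
   ' add up every element of the array
   Function Total(ByVal values() As Integer) As Integer
      Dim sum As Integer = 0

      For Each value As Integer In values
         sum += value
      Next

      Return sum
   End Function ' Total

   Sub Main()
      ' values the consumer would read if each were consumed exactly once
      Dim ideal() As Integer = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

      ' values the consumer actually read in the two sample outputs
      Dim firstRun() As Integer = {-1, 1, 2, 2, 3, 3, 4, 5, 5, 6}
      Dim secondRun() As Integer = {2, 2, 4, 4, 5, 6, 7, 8, 10, 10}

      Console.WriteLine(Total(ideal))     ' displays 55
      Console.WriteLine(Total(firstRun))  ' displays 30
      Console.WriteLine(Total(secondRun)) ' displays 58
   End Sub ' Main
End Module ' SumCheck
```

Note that a total of 55 would not by itself prove correct behavior, because lost and repeated values could cancel out; the sum is only a quick indicator.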

To solve the problems of lost data and data consumed more than once in the previous example, we will (in Figs. 15.9 and 15.10) synchronize the access of the concurrent producer and consumer threads to the code that manipulates the shared data by using Monitor class methods Enter, Wait, Pulse and Exit. When a thread uses synchronization to access a shared object, the object is locked, and thus no other thread can acquire the lock for that shared object at the same time.
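Before looking at the synchronized buffer, it may help to see the locking part of this pattern in isolation. The following is a minimal sketch, not code from the chapter: the class LockedCounter and the module LockedCounterDemo are hypothetical names used only for illustration. It shows Monitor.Enter paired with Monitor.Exit inside a Try...Finally block, so the lock is released even if the protected code throws an exception, and the equivalent SyncLock statement, which acquires and releases the same lock.

```vb
Imports System
Imports System.Threading

' Hypothetical example class (not one of the chapter's figures).
Public Class LockedCounter
   Private total As Integer = 0

   ' explicit Monitor calls; Finally guarantees the lock is released
   Public Sub AddWithMonitor(ByVal amount As Integer)
      Monitor.Enter(Me)

      Try
         total += amount
      Finally
         Monitor.Exit(Me)
      End Try
   End Sub ' AddWithMonitor

   ' SyncLock acquires and releases the lock on Me automatically
   Public Sub AddWithSyncLock(ByVal amount As Integer)
      SyncLock Me
         total += amount
      End SyncLock
   End Sub ' AddWithSyncLock

   ' read the total while holding the lock
   Public ReadOnly Property Value() As Integer
      Get
         SyncLock Me
            Return total
         End SyncLock
      End Get
   End Property ' Value
End Class ' LockedCounter

Module LockedCounterDemo
   Private counter As New LockedCounter()

   ' each thread adds 1 to the shared counter 1000 times
   Private Sub AddThousand()
      For i As Integer = 1 To 1000
         counter.AddWithMonitor(1)
      Next
   End Sub ' AddThousand

   Sub Main()
      Dim first As New Thread(New ThreadStart(AddressOf AddThousand))
      Dim second As New Thread(New ThreadStart(AddressOf AddThousand))

      first.Start()
      second.Start()

      ' wait for both threads to finish before reading the total
      first.Join()
      second.Join()

      Console.WriteLine(counter.Value) ' displays 2000
   End Sub ' Main
End Module ' LockedCounterDemo
```

Because every update happens while the lock is held, no increment is lost, regardless of how the two threads are scheduled.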

Figure 15.9. SynchronizedBuffer synchronizes access to a shared integer.

 1  ' Fig. 15.9: SynchronizedBuffer.vb
 2  ' A synchronized shared buffer implementation.
 3  Imports System.Threading
 4
 5  ' this class represents a single shared int
 6  Public Class SynchronizedBuffer
 7     Implements IBuffer
 8     ' buffer shared by producer and consumer threads
 9     Private bufferValue As Integer = -1
10
11     ' occupiedBufferCount maintains count of occupied buffers
12     Private occupiedBufferCount As Integer = 0
13
14     ' property Buffer
15     Property Buffer() As Integer Implements IBuffer.Buffer
16        Get
17           ' obtain lock on this object
18           Monitor.Enter(Me)
19
20           ' if there is no data to read, place invoking
21           ' thread in WaitSleepJoin state
22           If occupiedBufferCount = 0 Then
23              Console.WriteLine( _
24                 Thread.CurrentThread.Name & " tries to read.")
25              DisplayState("Buffer empty. " & _
26                 Thread.CurrentThread.Name & " waits.")
27              Monitor.Wait(Me) ' enter WaitSleepJoin state
28           End If
29
30           ' indicate that producer can store another value
31           ' because consumer is about to retrieve a buffer value
32           occupiedBufferCount -= 1
33
34           DisplayState(Thread.CurrentThread.Name & " reads " & bufferValue)
35
36           ' tell waiting thread (if there is one) to
37           ' become ready to execute (Running state)
38           Monitor.Pulse(Me)
39
40           ' Get copy of buffer before releasing lock.
41           ' It is possible that the producer could be
42           ' assigned the processor immediately after the
43           ' monitor is released and before the return
44           ' statement executes. In this case, the producer
45           ' would assign a new value to buffer before the
46           ' return statement returns the value to the
47           ' consumer. Thus, the consumer would receive the
48           ' new value. Making a copy of buffer and
49           ' returning the copy ensures that the
50           ' consumer receives the proper value.
51           Dim bufferCopy As Integer = bufferValue
52
53           ' release lock on this object
54           Monitor.Exit(Me)
55
56           Return bufferCopy
57        End Get
58        Set(ByVal value As Integer)
59           ' acquire lock for this object
60           Monitor.Enter(Me)
61
62           ' if there are no empty locations, place invoking
63           ' thread in WaitSleepJoin state
64           If occupiedBufferCount = 1 Then
65              Console.WriteLine( _
66                 Thread.CurrentThread.Name & " tries to write.")
67              DisplayState("Buffer full. " & _
68                 Thread.CurrentThread.Name & " waits.")
69              Monitor.Wait(Me) ' enter WaitSleepJoin state
70           End If
71
72           ' set new buffer value
73           bufferValue = value
74
75           ' indicate consumer can retrieve another value
76           ' because producer has just stored a buffer value
77           occupiedBufferCount += 1
78
79           DisplayState(Thread.CurrentThread.Name & _
80              " writes " & bufferValue)
81
82           ' tell waiting thread (if there is one) to
83           ' become ready to execute (Running state)
84           Monitor.Pulse(Me)
85
86           ' release lock on this object
87           Monitor.Exit(Me)
88        End Set
89     End Property ' Buffer
90
91     ' display current operation and buffer state
92     Public Sub DisplayState(ByVal operation As String)
93        Console.WriteLine("{0,-35}{1,-9}{2}" & vbCrLf, operation, _
94           bufferValue, occupiedBufferCount)
95     End Sub ' DisplayState
96  End Class ' SynchronizedBuffer
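Fig. 15.9 tests the buffer's state with an If statement before calling Monitor.Wait, which is sufficient when exactly one producer and one consumer share the buffer. A common defensive variation rechecks the condition in a loop each time the waiting thread wakes up, which matters when several producers or consumers may wait on the same lock. The following is a sketch of that variation under our own assumptions, not the chapter's implementation: the class LoopingBuffer, its Put and Take methods, the occupied flag and the module LoopingBufferDemo are all hypothetical names.

```vb
Imports System
Imports System.Threading

' Hypothetical variation on Fig. 15.9; all names are illustrative only.
Public Class LoopingBuffer
   Private bufferValue As Integer = -1
   Private occupied As Boolean = False

   ' store a value, waiting while the buffer is still full
   Public Sub Put(ByVal value As Integer)
      SyncLock Me
         ' recheck the condition every time this thread wakes up
         While occupied
            Monitor.Wait(Me)
         End While

         bufferValue = value
         occupied = True
         Monitor.PulseAll(Me) ' wake any waiting threads
      End SyncLock
   End Sub ' Put

   ' retrieve a value, waiting while the buffer is empty
   Public Function Take() As Integer
      SyncLock Me
         While Not occupied
            Monitor.Wait(Me)
         End While

         occupied = False
         Monitor.PulseAll(Me) ' wake any waiting threads

         ' the return value is read while the lock is still held
         Return bufferValue
      End SyncLock
   End Function ' Take
End Class ' LoopingBuffer

Module LoopingBufferDemo
   Private sharedBuffer As New LoopingBuffer()

   ' producer task: store the values 1 to 10
   Private Sub ProduceTen()
      For count As Integer = 1 To 10
         sharedBuffer.Put(count)
      Next
   End Sub ' ProduceTen

   Sub Main()
      Dim producerThread As New Thread( _
         New ThreadStart(AddressOf ProduceTen))
      producerThread.Start()

      ' the main thread acts as the consumer
      Dim sum As Integer = 0

      For count As Integer = 1 To 10
         sum += sharedBuffer.Take()
      Next

      producerThread.Join()
      Console.WriteLine("Total: " & sum) ' displays Total: 55
   End Sub ' Main
End Module ' LoopingBufferDemo
```

Returning the value from inside the SyncLock block serves the same purpose as the bufferCopy variable in Fig. 15.9: the value is read before the lock is released.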

Figure 15.10. Producer and consumer threads accessing a shared object with synchronization.

 1  ' Fig. 15.10: SynchronizedBufferTest.vb
 2  ' Showing multiple threads modifying a shared object with synchronization.
 3  Imports System.Threading
 4
 5  ' this module creates producer and consumer threads
 6  Module SynchronizedBufferTest
 7     ' create producer and consumer threads and start them
 8     Sub Main()
 9        ' create shared object used by threads
10        Dim sharedBuffer As New SynchronizedBuffer
11
12        ' Random object used by each thread
13        Dim random As New Random
14
15        ' output column heads and initial buffer state
16        Console.WriteLine("{0,-35}{1,-9}{2}" & vbCrLf, _
17           "Operation", "Buffer", "Occupied Count")
18        sharedBuffer.DisplayState("Initial state")
19
20        ' create Producer and Consumer objects
21        Dim producer As New Producer(sharedBuffer, random)
22        Dim consumer As New Consumer(sharedBuffer, random)
23
24        ' create threads for producer and consumer and set
25        ' delegates for each thread
26        Dim producerThread As New Thread(New ThreadStart( _
27           AddressOf producer.Produce))
28        producerThread.Name = "Producer"
29
30        Dim consumerThread As New Thread(New ThreadStart( _
31           AddressOf consumer.Consume))
32        consumerThread.Name = "Consumer"
33
34        ' start each thread
35        producerThread.Start()
36        consumerThread.Start()
37     End Sub ' Main
38  End Module ' SynchronizedBufferTest

Operation                          Buffer   Occupied Count
Initial state                      -1       0
Producer writes 1                  1        1
Consumer reads 1                   1        0
Consumer tries to read.
Buffer empty. Consumer waits.      1        0
Producer writes 2                  2        1
Consumer reads 2                   2        0
Producer writes 3                  3        1
Producer tries to write.
Buffer full. Producer waits.       3        1
Consumer reads 3                   3        0
Producer writes 4                  4        1
Consumer reads 4                   4        0
Consumer tries to read.
Buffer empty. Consumer waits.      4        0
Producer writes 5                  5        1
Consumer reads 5                   5        0
Producer writes 6                  6        1
Consumer reads 6                   6        0
Producer writes 7                  7        1
Consumer reads 7                   7        0
Producer writes 8                  8        1
Producer tries to write.
Buffer full. Producer waits.       8        1
Consumer reads 8                   8        0
Producer writes 9                  9        1
Consumer reads 9                   9        0
Consumer tries to read.
Buffer empty. Consumer waits.      9        0
Producer writes 10                 10       1
Producer done producing.
Terminating Producer.
Consumer reads 10                  10       0
Consumer read values totaling: 55.
Terminating Consumer.

Initial state                      -1       0
Consumer tries to read.
Buffer empty. Consumer waits.      -1       0
Producer writes 1                  1        1
Consumer reads 1                   1        0
Consumer tries to read.
Buffer empty. Consumer waits.      1        0
Producer writes 2                  2        1
Consumer reads 2                   2        0
Producer writes 3                  3        1
Consumer reads 3                   3        0
Producer writes 4                  4        1
Producer tries to write.
Buffer full. Producer waits.       4        1
Consumer reads 4                   4        0
Producer writes 5                  5        1
Producer tries to write.
Buffer full. Producer waits.       5        1
Consumer reads 5                   5        0
Producer writes 6                  6        1
Consumer reads 6                   6        0
Producer writes 7                  7        1
Consumer reads 7                   7        0
Producer writes 8                  8        1
Consumer reads 8                   8        0
Consumer tries to read.
Buffer empty. Consumer waits.      8        0
Producer writes 9                  9        1
Consumer reads 9                   9        0
Consumer tries to read.
Buffer empty. Consumer waits.      9        0
Producer writes 10                 10       1
Consumer reads 10                  10       0
Producer done producing.
Terminating Producer.
Consumer read values totaling: 55.
Terminating Consumer.





Visual Basic® 2005 for Programmers. Deitel® Developer Series
Visual Basic 2005 for Programmers (2nd Edition)
ISBN: 013225140X
EAN: 2147483647
Year: 2004
Pages: 435
