Producer/Consumer Relationship: Circular Buffer


Figures 15.9 and 15.10 use thread synchronization to guarantee that two threads manipulate data in a shared buffer correctly. However, the application may not perform optimally. If the two threads operate at different speeds, one will spend more (or most) of its time waiting. For example, in Fig. 15.9 we shared a single integer between the two threads. If the producer thread produces values faster than the consumer can consume those values, then the producer thread waits for the consumer, because there are no other locations in memory to place the next value. Similarly, if the consumer consumes faster than the producer can produce values, the consumer waits until the producer places the next value in the shared location in memory. Even when we have threads that operate at the same relative speeds, those threads over a period of time may become "out of sync," causing one of the threads to wait for the other. We cannot make assumptions about the relative speeds of concurrently executing threads. There are too many interactions that occur with the operating system, the network, the user and other components, which can cause the threads to operate at different speeds. When this happens, threads wait. When threads wait, programs become less productive, user-interactive programs become less responsive and network applications suffer longer delays because the processor is not used efficiently.

To minimize the waiting for threads that share resources and operate at the same relative speeds, we can implement a circular buffer that provides extra locations in which the producer can place values (if it "gets ahead" of the consumer) and from which the consumer can retrieve those values (if it "catches up" to the producer). Let us assume the buffer is implemented as an array. The producer and consumer work from the beginning of the array. When either thread reaches the end of the array, it simply returns to the first element of the array to perform its next task. If the producer temporarily produces values faster than the consumer can consume them, the producer can write additional values in the extra buffers (if cells are available; otherwise, the producer must, once again, wait). This enables the producer to perform its task even though the consumer is not ready to receive the current value being produced. Similarly, if the consumer consumes faster than the producer produces new values, the consumer can read additional values from the buffer (if there are any; otherwise, the consumer must, once again, wait) and thus "catch up" to the producer. This enables the consumer to perform its task even though the producer is not ready to produce additional values.
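The wrap-around step described above can be sketched in a few lines. This is a minimal illustration, not code from the figures: the class name WrapAroundSketch, the method Next and the variables cells and writeIndex are hypothetical names chosen for the example, and no threads or synchronization are involved yet.

```csharp
using System;

public static class WrapAroundSketch
{
    // Advance an index by one cell, returning to 0 after the last cell.
    // (Hypothetical helper; the figures inline this arithmetic.)
    public static int Next(int index, int length)
    {
        return (index + 1) % length;
    }

    public static void Main()
    {
        int[] cells = new int[3]; // assumed three-cell buffer
        int writeIndex = 0;

        // Write five values into three cells; after the third write
        // the index returns to cell 0, so later writes reuse cells.
        for (int value = 1; value <= 5; value++)
        {
            cells[writeIndex] = value;
            Console.WriteLine("wrote {0} at cell {1}", value, writeIndex);
            writeIndex = Next(writeIndex, cells.Length);
        }
    }
}
```

The remainder operator is what makes the array behave as a ring: an index that reaches the array length is mapped back to 0. A real buffer must also track how many cells are occupied so that unread values are not overwritten.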

Note that a circular buffer would be inappropriate if the producer and consumer consistently operate at different speeds. If the consumer always executes faster than the producer, then a buffer with one location is enough; additional locations would waste memory. If the producer always executes faster, a buffer with an infinite number of locations would be required to absorb the extra production.

The key to using a circular buffer is to define it with enough extra cells to handle the anticipated "extra" production. If, over a period of time, we determine that the producer often produces as many as three more values than the consumer can consume, we can define a buffer of at least three cells to handle the extra production. We do not want the buffer to be too small, because that would cause threads to wait more. On the other hand, we do not want the buffer to be too large, because that would waste memory.

Performance Tip 15.3

Even when using a circular buffer, it is possible that a producer thread could fill the buffer, which would force the producer thread to wait until a consumer consumes a value to free an element in the buffer. Similarly, if the buffer is empty, the consumer thread must wait until the producer produces another value. The key to using a circular buffer is optimizing the buffer size to minimize the amount of thread wait time.
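One rough way to reason about buffer size is to replay a recorded or assumed sequence of produce and consume attempts against several capacities and count the blocked attempts. The sketch below does this single-threaded. Everything in it is an assumption made for illustration: the class BufferSizingSketch, the method CountWaits, the schedule string, and the simplification that a blocked attempt is counted once and then dropped rather than retried.

```csharp
using System;

public static class BufferSizingSketch
{
    // Count blocked attempts for a given capacity.
    // schedule: 'P' = producer tries to write, 'C' = consumer tries to read.
    // Simplification: a blocked attempt is counted and skipped, not retried.
    public static int CountWaits(string schedule, int capacity)
    {
        int occupied = 0;
        int waits = 0;

        foreach (char step in schedule)
        {
            if (step == 'P')
            {
                if (occupied == capacity)
                    waits++;      // buffer full: producer would wait
                else
                    occupied++;
            }
            else if (step == 'C')
            {
                if (occupied == 0)
                    waits++;      // buffer empty: consumer would wait
                else
                    occupied--;
            }
        }

        return waits;
    }

    public static void Main()
    {
        string schedule = "PPPCCCPPPPCCCC"; // hypothetical bursty workload

        for (int capacity = 1; capacity <= 4; capacity++)
            Console.WriteLine("capacity {0}: {1} blocked attempts",
                capacity, CountWaits(schedule, capacity));
    }
}
```

For this particular schedule the largest burst is four values, so a capacity of four removes all blocking and anything larger only adds unused cells. Real workloads are not this regular, so measurements from the running application are a better guide than a fixed schedule.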

Figures 15.11 and 15.12 demonstrate a producer and a consumer accessing a circular buffer (in this case, a shared array of three elements) with synchronization. In this version of the producer/consumer relationship, the consumer consumes a value only when the array is not empty, and the producer produces a value only when the array is not full. This example reuses interface Buffer (Fig. 15.4) and classes Producer (Fig. 15.5) and Consumer (Fig. 15.6). The statements that created and started the thread objects in the Main methods of class UnsynchronizedBufferTest in Fig. 15.8 and SynchronizedBufferTest in Fig. 15.10 now appear in class CircularBufferTest (Fig. 15.12).

Figure 15.11. CircularBuffer synchronizes access to a circular buffer containing three slots.


  1  // Fig. 15.11: CircularBuffer.cs
  2  // A circular shared buffer for the producer/consumer relationship.
  3  using System;
  4  using System.Threading;
  5
  6  // implement an array of shared integers with synchronization
  7  public class CircularBuffer : Buffer
  8  {
  9     // each array element is a buffer
 10     private int[] buffers = { -1, -1, -1 };
 11
 12     // occupiedBufferCount maintains count of occupied buffers
 13     private int occupiedBufferCount = 0;
 14
 15     private int readLocation = 0; // location of the next read
 16     private int writeLocation = 0; // location of the next write
 17
 18     // property Buffer
 19     public int Buffer
 20     {
 21        get
 22        {
 23           // lock this object while getting value
 24           // from buffers array
 25           lock ( this )
 26           {
 27              // if there is no data to read, place invoking
 28              // thread in WaitSleepJoin state
 29              if ( occupiedBufferCount == 0 )
 30              {
 31                 Console.Write( "\nAll buffers empty. {0} waits.",
 32                    Thread.CurrentThread.Name );
 33                 Monitor.Wait( this ); // enter the WaitSleepJoin state
 34              } // end if
 35
 36              // obtain value at current readLocation
 37              int readValue = buffers[ readLocation ];
 38
 39              Console.Write( "\n{0} reads {1} ",
 40                 Thread.CurrentThread.Name, buffers[ readLocation ] );
 41
 42              // just consumed a value, so decrement number of
 43              // occupied buffers
 44              --occupiedBufferCount;
 45
 46              // update readLocation for future read operation,
 47              // then add current state to output
 48              readLocation = ( readLocation + 1 ) % buffers.Length;
 49              Console.Write( CreateStateOutput() );
 50
 51              // return waiting thread (if there is one)
 52              // to Running state
 53              Monitor.Pulse( this );
 54
 55              return readValue;
 56           } // end lock
 57        } // end get
 58        set
 59        {
 60           // lock this object while setting value
 61           // in buffers array
 62           lock ( this )
 63           {
 64              // if there are no empty locations, place invoking
 65              // thread in WaitSleepJoin state
 66              if ( occupiedBufferCount == buffers.Length )
 67              {
 68                 Console.Write( "\nAll buffers full. {0} waits.",
 69                    Thread.CurrentThread.Name );
 70                 Monitor.Wait( this ); // enter the WaitSleepJoin state
 71              } // end if
 72
 73              // place value in writeLocation of buffers
 74              buffers[ writeLocation ] = value;
 75
 76              Console.Write( "\n{0} writes {1} ",
 77                 Thread.CurrentThread.Name, buffers[ writeLocation ] );
 78
 79              // just produced a value, so increment number of
 80              // occupied buffers
 81              ++occupiedBufferCount;
 82
 83              // update writeLocation for future write operation,
 84              // then add current state to output
 85              writeLocation = ( writeLocation + 1 ) % buffers.Length;
 86              Console.Write( CreateStateOutput() );
 87
 88              // return waiting thread (if there is one)
 89              // to Running state
 90              Monitor.Pulse( this );
 91           } // end lock
 92        } // end set
 93     } // end property Buffer
 94
 95     // create state output
 96     public string CreateStateOutput()
 97     {
 98        // display first line of state information
 99        string output = "(buffers occupied: " +
100           occupiedBufferCount + ")\nbuffers: ";
101
102        for ( int i = 0; i < buffers.Length; i++ )
103           output += " " + string.Format( "{0,2}", buffers[ i ] ) + "  ";
104
105        output += "\n";
106
107        // display second line of state information
108        output += "         ";
109
110        for ( int i = 0; i < buffers.Length; i++ )
111           output += "---- ";
112
113        output += "\n";
114
115        // display third line of state information
116        output += "         ";
117
118        // display readLocation (R) and writeLocation (W)
119        // indicators below appropriate buffer locations
120        for ( int i = 0; i < buffers.Length; i++ )
121        {
122           if ( i == writeLocation &&
123              writeLocation == readLocation )
124              output += " WR  ";
125           else if ( i == writeLocation )
126              output += " W   ";
127           else if ( i == readLocation )
128              output += "  R  ";
129           else
130              output += "     ";
131        } // end for
132
133        output += "\n";
134        return output;
135     } // end method CreateStateOutput
136  } // end class CircularBuffer

Figure 15.12. Producer and consumer threads accessing a circular buffer.


 1  // Fig. 15.12: CircularBufferTest.cs
 2  // Implementing the producer/consumer relationship with a
 3  // circular buffer.
 4  using System;
 5  using System.Threading;
 6
 7  class CircularBufferTest
 8  {
 9     // create producer and consumer threads and start them
10     static void Main( string[] args )
11     {
12        // create shared object used by threads
13        CircularBuffer shared = new CircularBuffer();
14
15        // Random object used by each thread
16        Random random = new Random();
17
18        // display shared state before producer
19        // and consumer threads begin execution
20        Console.Write( shared.CreateStateOutput() );
21
22        // create Producer and Consumer objects
23        Producer producer = new Producer( shared, random );
24        Consumer consumer = new Consumer( shared, random );
25
26        // create threads for producer and consumer and set
27        // delegates for each thread
28        Thread producerThread =
29           new Thread( new ThreadStart( producer.Produce ) );
30        producerThread.Name = "Producer";
31
32        Thread consumerThread =
33           new Thread( new ThreadStart( consumer.Consume ) );
34        consumerThread.Name = "Consumer";
35
36        // start each thread
37        producerThread.Start();
38        consumerThread.Start();
39     } // end Main
40  } // end class CircularBufferTest
(buffers occupied: 0)
buffers:  -1   -1   -1
         ---- ---- ----
          WR

All buffers empty. Consumer waits.
Producer writes 1 (buffers occupied: 1)
buffers:   1   -1   -1
         ---- ---- ----
           R   W

Consumer reads 1 (buffers occupied: 0)
buffers:   1   -1   -1
         ---- ---- ----
               WR

Producer writes 2 (buffers occupied: 1)
buffers:   1    2   -1
         ---- ---- ----
                R   W

Consumer reads 2 (buffers occupied: 0)
buffers:   1    2   -1
         ---- ---- ----
                    WR

All buffers empty. Consumer waits.
Producer writes 3 (buffers occupied: 1)
buffers:   1    2    3
         ---- ---- ----
          W          R

Consumer reads 3 (buffers occupied: 0)
buffers:   1    2    3
         ---- ---- ----
          WR

All buffers empty. Consumer waits.
Producer writes 4 (buffers occupied: 1)
buffers:   4    2    3
         ---- ---- ----
           R   W

Producer writes 5 (buffers occupied: 2)
buffers:   4    5    3
         ---- ---- ----
           R        W

Consumer reads 4 (buffers occupied: 1)
buffers:   4    5    3
         ---- ---- ----
                R   W

Producer writes 6 (buffers occupied: 2)
buffers:   4    5    6
         ---- ---- ----
          W     R

Producer writes 7 (buffers occupied: 3)
buffers:   7    5    6
         ---- ---- ----
               WR

All buffers full. Producer waits.
Consumer reads 5 (buffers occupied: 2)
buffers:   7    5    6
         ---- ---- ----
               W     R

Consumer reads 6 (buffers occupied: 1)
buffers:   7    5    6
         ---- ---- ----
           R   W

Producer writes 8 (buffers occupied: 2)
buffers:   7    8    6
         ---- ---- ----
           R        W

Consumer reads 7 (buffers occupied: 1)
buffers:   7    8    6
         ---- ---- ----
                R   W

Consumer reads 8 (buffers occupied: 0)
buffers:   7    8    6
         ---- ---- ----
                    WR

Producer writes 9 (buffers occupied: 1)
buffers:   7    8    9
         ---- ---- ----
          W          R

Producer writes 10 (buffers occupied: 2)
buffers:  10    8    9
         ---- ---- ----
               W     R
Producer done producing.
Terminating Producer.

Consumer reads 9 (buffers occupied: 1)
buffers:  10    8    9
         ---- ---- ----
           R   W

Consumer reads 10 (buffers occupied: 0)
buffers:  10    8    9
         ---- ---- ----
               WR
Consumer read values totaling: 55.
Terminating Consumer.

The most significant changes occur in class CircularBuffer (Fig. 15.11), which now contains four instance variables. Array buffers (line 10) is a three-element integer array that represents the circular buffer. Variable occupiedBufferCount is the condition variable that can be used to determine whether a producer can write to the circular buffer (i.e., occupiedBufferCount is less than the number of elements in array buffers) and whether a consumer can read from the circular buffer (i.e., occupiedBufferCount is greater than 0). Variable readLocation (line 15) indicates the position from which the next value can be read by a consumer. Variable writeLocation (line 16) indicates the next location in which a value can be placed by a producer.

The set accessor (lines 58–92) of property Buffer performs the same tasks that it did in Fig. 15.9, with a few modifications. Rather than using Monitor methods Enter and Exit to acquire and release the lock on the CircularBuffer object, we use a block of code preceded by keyword lock to lock the CircularBuffer object. As program control enters the lock block, the currently executing thread acquires the lock (assuming the lock is currently available) on the CircularBuffer object (i.e., this). When the lock block terminates, the thread releases the lock automatically.

Common Programming Error 15.5

When using class Monitor's Enter and Exit methods to manage an object's lock, Exit must be called explicitly to release the lock. If an exception occurs in a method before Exit can be called and that exception is not caught, the method could terminate without calling Exit. If so, the lock is not released. To avoid this error, place code that could throw exceptions in a try block, and place the call to Exit in the corresponding finally block to ensure that the lock is released.

Software Engineering Observation 15.1

Using a lock block to manage the lock on a synchronized object eliminates the possibility of forgetting to relinquish the lock with a call to Monitor method Exit. C# implicitly calls Monitor method Exit when a lock block terminates for any reason. Thus, even if an exception occurs in the block, the lock will be released.
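The two locking styles contrasted above can be compared side by side. The following sketch is illustrative only: the class LockReleaseSketch, its methods and the private sync object are hypothetical names, and it locks a private object rather than this purely to keep the example self-contained. It also assumes a runtime that provides Monitor.IsEntered (.NET Framework 4.5 or later), which is used only to check that the lock was released.

```csharp
using System;
using System.Threading;

public static class LockReleaseSketch
{
    // Hypothetical lock object for this example.
    private static readonly object sync = new object();

    // Explicit Enter/Exit: the finally block guarantees the release
    // even though the protected code throws.
    public static void WithEnterExit()
    {
        Monitor.Enter(sync);
        try
        {
            throw new InvalidOperationException("simulated failure");
        }
        finally
        {
            Monitor.Exit(sync);
        }
    }

    // lock block: the release happens implicitly when the block is
    // left, including when it is left because of an exception.
    public static void WithLockBlock()
    {
        lock (sync)
        {
            throw new InvalidOperationException("simulated failure");
        }
    }

    // Runs an action that throws, then reports whether the calling
    // thread no longer holds the lock.
    public static bool ReleasedAfter(Action action)
    {
        try
        {
            action();
        }
        catch (InvalidOperationException)
        {
            // expected: both methods throw on purpose
        }

        return !Monitor.IsEntered(sync);
    }

    public static void Main()
    {
        Console.WriteLine("Enter/Exit released: {0}",
            ReleasedAfter(WithEnterExit));
        Console.WriteLine("lock block released: {0}",
            ReleasedAfter(WithLockBlock));
    }
}
```

Both methods leave the lock free after the exception. The difference is that the first depends on the programmer remembering the try and finally blocks, while the second cannot omit the release.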

The if statement at lines 66–71 in the set accessor determines whether the producer must wait (i.e., all buffers are full). If so, lines 68–69 output text indicating that the producer is waiting to perform its task, and line 70 invokes Monitor method Wait to place the producer thread in the WaitSleepJoin state. When execution continues at line 74 after the if statement, the value from the producer is placed in the circular buffer at location writeLocation. Next, lines 76–77 output a message containing the value produced. Line 81 increments occupiedBufferCount, because there is now at least one value in the buffer that the consumer can read. Then, line 85 updates writeLocation for the next call to the set accessor of property Buffer. The output continues at line 86 by invoking method CreateStateOutput (declared in lines 96–135), which outputs the number of occupied buffers, the contents of the buffers and the current writeLocation and readLocation. Finally, line 90 invokes Monitor method Pulse to indicate that a thread waiting on the CircularBuffer object (if there is a waiting thread) should transition to the Running state. Note that reaching the closing right brace of the lock block at line 91 causes the thread to release the lock on the CircularBuffer object.

The get accessor (lines 21–57) of property Buffer also performs the same tasks in this example that it did in Fig. 15.9, with a few minor modifications. Once again, we use a lock block to acquire and release the lock on the CircularBuffer object, rather than using Monitor methods Enter and Exit. The if statement at lines 29–34 in the get accessor determines whether the consumer must wait (i.e., all buffers are empty). If the consumer thread must wait, lines 31–32 indicate that the consumer is waiting to perform its task, and line 33 invokes Monitor method Wait to place the consumer thread in the WaitSleepJoin state. When execution continues at line 37 after the if statement, readValue is assigned the value at location readLocation in the circular buffer. Lines 39–40 output the value consumed. Line 44 decrements the occupiedBufferCount, because the buffer now contains one more position in which the producer thread can place a value. Then, line 48 updates readLocation for the next call to the get accessor of Buffer. Line 49 invokes method CreateStateOutput to output the number of occupied buffers, the contents of the buffers and the current writeLocation and readLocation. Finally, line 53 invokes method Pulse to transition the next thread waiting for the CircularBuffer object to the Running state, and line 55 returns the consumed value to the calling method.
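The logic of the two accessors can be condensed into a pair of methods with the console output removed. The sketch below is a variation, not the code of Fig. 15.11: the names RingBufferSketch, Put, Take and RunOnce are hypothetical, it locks a private object instead of this, and it deliberately swaps the if guard and Pulse for a while guard and PulseAll. That substitution is a common precaution when more than one producer or consumer might share the buffer, because a thread that wakes up re-checks the condition before proceeding.

```csharp
using System;
using System.Threading;

public class RingBufferSketch
{
    private readonly int[] cells;
    private readonly object sync = new object(); // hypothetical lock object
    private int occupied = 0;   // number of cells holding unread values
    private int readIndex = 0;  // next cell to read
    private int writeIndex = 0; // next cell to write

    public RingBufferSketch(int capacity)
    {
        cells = new int[capacity];
    }

    public void Put(int value)
    {
        lock (sync)
        {
            // full: wait, and re-check the condition after every wake-up
            while (occupied == cells.Length)
                Monitor.Wait(sync);

            cells[writeIndex] = value;
            writeIndex = (writeIndex + 1) % cells.Length;
            ++occupied;
            Monitor.PulseAll(sync); // let waiting threads re-check
        }
    }

    public int Take()
    {
        lock (sync)
        {
            // empty: wait, and re-check the condition after every wake-up
            while (occupied == 0)
                Monitor.Wait(sync);

            int value = cells[readIndex];
            readIndex = (readIndex + 1) % cells.Length;
            --occupied;
            Monitor.PulseAll(sync);
            return value;
        }
    }

    // Produce 1..count on one thread, consume on another, return the sum.
    public static int RunOnce(int capacity, int count)
    {
        RingBufferSketch buffer = new RingBufferSketch(capacity);
        int sum = 0;

        Thread producer = new Thread(() =>
        {
            for (int i = 1; i <= count; i++)
                buffer.Put(i);
        });
        Thread consumer = new Thread(() =>
        {
            for (int i = 0; i < count; i++)
                sum += buffer.Take();
        });

        producer.Start();
        consumer.Start();
        producer.Join();
        consumer.Join();
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine("sum = {0}", RunOnce(3, 10));
    }
}
```

With a capacity of three and the values 1 through 10, the consumer's total is 55 regardless of how the threads interleave, which matches the total reported in the sample output of Fig. 15.12.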

In Fig. 15.12, line 13 now declares shared as a CircularBuffer object, and line 20 displays the initial state of the shared buffer space. The outputs for this example include the current occupiedBufferCount, the contents of the buffers and the current writeLocation and readLocation. In the output, the letters W and R represent the current writeLocation and readLocation, respectively. Notice that after the third value is placed in the third element of the buffer, the fourth value is inserted at the beginning of the array. This provides the circular buffer effect.

    Visual C# How to Program
    Visual C# 2005 How to Program (2nd Edition)
    ISBN: 0131525239
    Year: 2004
    Pages: 600
