Interthread Communication

I l @ ve RuBoard

We've examined some basic synchronization techniques, but they are not appropriate for every situation when you're handling multiple threads. Let's look at some additional techniques you can use for implementing more advanced cooperation and communication between threads.

Thread Notification

You've learned how to use the Monitor class to perform exclusive locking using a sync object. But consider a common scenario from the world of computer science: that of producers and consumers .

A producer is an object that creates resources. A consumer is an object that obtains resources from a producer. A simple producer might implement an iterative process, creating one resource after another. A consumer will wait until a resource is ready, and then retrieve it from the producer and exercise it in some way. When the consumer has finished, it will wait for the next resource to become available, and so on.

The producer and the consumer should both run concurrently, but the consumer cannot access a resource until the producer has finished building it, and the producer cannot modify the resource once the consumer has started to use it. The producer and consumer could run at different speeds ”the producer might be able to create resources faster than the consumer can gobble them up. In this situation, the resources must not be lost, but somehow queued for processing. There might also be a variable number of consumers, and no resource can be accessed by more than one consumer. Figure 8-8 shows the architecture of a typical producer-consumer system.

Figure 8-8. The producer-consumer architecture

To manage access to resources, you could guarantee exclusive access to the resource queue by locking it with Monitor.Enter . However, if a consumer locks the queue and there is nothing for the consumer to process, the consumer will have to unlock the queue immediately; otherwise , the producer will not be able to place anything in it. The consumer could poll for items on the queue, but this would be expensive ”the consumer would have to lock the queue, examine it, and then release the lock if nothing is there. Another problem is how long the consumer should sleep between polls . If the interval is too short, the system will spend its time obtaining and releasing locks and not doing any real work. If the interval is too long, the consumer might be overwhelmed with data when it processes the queue, the producer might have to wait for a long time while the consumer uses the queue, and the system as a whole might become fitful.

A better solution is to use the Monitor.Wait and Monitor.Pulse methods .

Waiting

The Wait method of the Monitor class is used in conjunction with Monitor.Enter and Monitor.Exit . As you'll recall, Monitor.Enter exclusively locks a resource. Monitor.Wait releases the lock on the resource but puts the thread into the WaitSleepJoin state. The thread will be woken up when another thread executes Monitor.Pulse over the same resource, and it will attempt to reacquire the lock (effectively automatically performing another Monitor.Enter command).

The sample program ProducerConsumer.jsl in the ProducerConsumer project shows an implementation of a producer and a consumer based on this model. The producer is the Calculator class. Calculator exposes a public System.Collections.Queue object called power2Queue and a method called calcPowersOfTwo . The purpose of calcPowersOfTwo is to calculate powers of two for all integers from 1 to 20. Each result is posted to power2Queue as it is generated. The method initially locks the queue:

 Monitor.Enter(Calculator.power2Queue); 

The program then enters a while loop to generate the various powers of two. As each value is generated, it is placed on the queue:

 power2Queue.Enqueue(newInteger(currPos*currPos)); 

Note

You can queue any type that descends from Object . However, unlike the hierarchy defined in .NET Framework Class Library, the Java primitive types (such as int ) are not freely convertible to Object . The solution is to use an appropriate wrapper ” Integer , in this case.


Having queued the value, the method signals to any waiting consumer that some data is ready, using Monitor.Pulse :

 Monitor.Pulse(Calculator.power2Queue); 

Before the consumer can continue, the producer must release the lock on the queue:

 Monitor.Exit(Calculator.power2Queue); 

If a consumer is waiting and is scheduled to run, it can obtain the lock. The producer calls Monitor.Enter again and blocks until the lock is granted before generating the next power of two.

The consumer is the Printer class. The printPowersOfTwo method does the work ”it grabs the lock on the queue:

 Monitor.Enter(Calculator.power2Queue); 

The method then executes Monitor.Wait on the power2Queue , which releases the lock and puts the thread into a WaitSleepJoin state. The reason for calling Monitor.Enter first is that you cannot execute Monitor.Wait on an object unless you currently hold a lock on that object, which is obtained using Monitor.Enter . (If you attempt to wait without getting the lock first, the common language runtime will throw a SynchronizationLockException .) The Wait method specifies the object to wait on, and optionally a timeout parameter. The example will wait for a maximum of 10 seconds (approximately). The value returned by Wait indicates whether the wait was successful (it was released by a pulse from another thread) or not (a timeout occurred). The while loop shown in the printPowersOfTwo method terminates if the Wait operation times out:

 while(Monitor.Wait(Calculator.power2Queue,newTimeSpan(0,0,10))) { } 

Inside the loop, the code iterates through all the available items on the queue, extracting them and printing them. Remember that you cannot control when threads will execute, and there is no guarantee that when the Calculator object executes the Pulse method, the Printer will get a chance to run before the Calculator gets the lock again and adds another value to the queue. So, when the Printer runs, more than one item might be waiting to be processed .

Returning to the calcPowersOfTwo method in the Calculator class, after all the powers of two have been generated and dispatched, the method executes the PulseAll command:

 Monitor.PulseAll(Calculator.power2Queue); 

If several threads are in the WaitSleepJoin state, Pulse will wake just the first thread that happens to be waiting. The others will remain waiting. Another method, PulseAll , is available that wakes all threads waiting on the object, and each one will attempt to obtain the lock when it is released. (Only one will succeed ”the others will block.) This is useful in a multithreaded environment if you're not sure how many threads are waiting on an object and you want to release them all. An illustration of when you might want to use PulseAll is shown in the example.

Monitor Queues

Internally, a monitor contains two queues ”the Waiting queue and the Ready queue. (Do not confuse these with the System.Collections.Queue object used by the ProducerConsumer example.) When a thread executes Monitor.Enter and gets blocked, it is placed in the Ready queue. When the lock is released, a subsequent thread switch will cause the next available thread to be extracted from the Ready queue and run. If a thread executes Monitor.Wait , it will be placed instead in the Waiting queue. When the Pulse method is executed by another thread, the first thread in the Waiting queue will be moved to the Ready queue and become available to run. If the PulseAll method is invoked, all threads in the Waiting queue will be moved to the Ready queue.

To make life more interesting, the main method in the Runner class in the ProducerConsumer program creates two Printer threads, which compete for output from the Calculator . The Printer class exposes the printerName field, which the Runner class sets to a different value for each thread. Each Printer thread displays its name as it prints data. If you run this program several times, you'll observe the nondeterministic nature of thread scheduling. One thread might dominate another, or the output might be shared evenly. Figure 8-9 shows the output of one run on a single-processor computer. Occasionally, you might find that the Printer threads do not output anything ”they just sit idle for 10 seconds and then terminate. We'll explain the reason for this in a moment.

Figure 8-9. The output of the ProducerConsumer program

Using Events

The reason you occasionally get no output from the Printer objects is due to a race condition that can occur when you use Pulse and Wait . The problem is that if you execute a Pulse when there are no waiting objects, the pulse will be lost. Pulse and Wait rely on the fact that a thread is waiting before another thread sends a Pulse . A more reliable approach to using a monitor to pass signals between threads is to employ events. The common language runtime provides two specialized event classes for signaling threads: System.Threading.ManualResetEvent and System.Threading.AutoResetEvent . An event object can be in one of two states: signaled or unsignaled. The event classes provide methods that allow a thread to wait until an event enters the signaled state, blocking while it is unsignaled. Additional methods are available that you can use to change the state.

The technique for using a ManualResetEvent object is quite straightforward. When you create one, you specify an initial state ” true means signaled and false means unsignaled:

 importSystem.Threading.*; ManualResetEventdataReady=newManualResetEvent(false); 

To wait for a ManualResetEvent object to become signaled, you invoke the WaitOne method. This method will block the current thread until the event is signaled. You can specify an optional timeout parameter: WaitOne returns true if the wait was successful, false on a timeout:

 dataReady.WaitOne(); 

Another thread can set the state of the event to signaled by executing the Set method:

 dataReady.Set(); 

When the event is signaled, it will release all threads that are waiting for it. What's more, the event will remain in a signaled state ”if you wait again, the wait operation will complete immediately. If a thread has successfully waited for an event, it should set the event back to the unsignaled state using the Reset method:

 dataReady.Reset(); 

The following code shows an implementation of the Calculator class based on ManualResetEvent objects rather than on monitors . This version is available in the EventProducerConsumer project. The class creates two ManualResetEvent objects called dataReady and queueReady , initializing them both to the unsignaled state. The calcPowersOfTwo method simply signals the dataReady event when the next power of two is available on the queue, and then it waits for the queueReady event before continuing. The Printer class signals this event when it has retrieved the data from the queue. After receiving the signal, the calcPowersOfTwo method resets the queueReady signal before looping and generating the next power of two.

 publicclassCalculator { publicstaticQueuepower2Queue=newQueue(); publicstaticManualResetEventdataReady= newManualResetEvent(false); publicstaticManualResetEventqueueReady= newManualResetEvent(false); publicvoidcalcPowersOfTwo() { intcurrPos=1; //Calculatepowersof2upto20 while(currPos<=20) { power2Queue.Enqueue(newInteger(currPos*currPos)); //Informthenextconsumerthatsomedataisavailable dataReady.Set(); //Waitfortheconsumertosignalthatthequeueisempty queueReady.WaitOne(); queueReady.Reset(); currPos+=1; } dataReady.Set(); } } 

The Printer class makes use of the two ManualResetEvent objects defined in the Calculator class. The printPowersOfTwo method waits for the dataReady event to be signaled, resets it, and then retrieves the next item from the queue if there is one. (The queue might be empty because the calcPowersOfTwo method also signals the dataReady event when it has finished.) Once it has extracted the item from the queue, the method signals the queueReady event, releasing the Calculator thread to generate the next value:

 publicclassPrinter { publicStringprinterName; publicvoidprintPowersOfTwo() { //Waitforthedatareadysignalfromtheproducer Calculator.dataReady.WaitOne(); Calculator.dataReady.Reset(); //Retrieveanddisplayasmanyvaluesasareonthequeue while(Calculator.power2Queue.get_Count()>0) { Integerdata=(Integer)Calculator.power2Queue.Dequeue(); Console.WriteLine(printerName+" ":" +data); //Signalthatthequeueisempty Calculator.queueReady.Set(); //Waitformoredata Calculator.dataReady.WaitOne(); Calculator.dataReady.Reset(); } Console.WriteLine(printerName+" " finishing"); } } 

This solution appears to be more natural than using a monitor, and it works well for a single producer with a single consumer. However, the inherent nature of ManualResetEvents can lead to race conditions, mainly because when an event is signaled it releases all threads that are waiting on it. Both threads can run concurrently. Applying the Reset method immediately after waiting, as both classes shown above do, might not prevent this. You can solve this problem using an AutoResetEvent .

The AutoResetEvent class is remarkably similar to the ManualResetEvent class, but with one vital difference: When an AutoResetEvent object is signaled, it releases only one waiting object and resets itself to the unsignaled state automatically. This eliminates the need to perform separate Reset calls, and the operation is atomic, removing the race condition. You can use AutoResetEvents in the Calculator and Printer class ”you just change the types of dataReady and queueReady in the Calculator class:

 publicstaticAutoResetEventdataReady=newAutoResetEvent(false); publicstaticAutoResetEventqueueReady=newAutoResetEvent(false); 

You should also hunt down every call to Reset and remove it. With this modification, the code will look like this:

 publicvoidcalcPowersOfTwo() { intcurrPos=1; //Calculatepowersof2upto20 while(currPos<=20) { power2Queue.Enqueue(newInteger(currPos*currPos)); //Informthenextconsumerthatsomedataisavailable dataReady.Set(); //Waitfortheconsumertosignalthatthequeueisempty queueReady.WaitOne(); currPos+=1; } dataReady.Set(); } publicvoidprintPowersOfTwo() { //Waitforthedatareadysignalfromtheproducer Calculator.dataReady.WaitOne(); //Retrieveanddisplayasmanyvaluesasareonthequeue while(Calculator.power2Queue.get_Count()>0) { Integerdata=(Integer)Calculator.power2Queue.Dequeue(); Console.WriteLine(printerName+ ":" +data); //Signalthatthequeueisempty Calculator.queueReady.Set(); //Waitformoredata Calculator.dataReady.WaitOne(); } Console.WriteLine(printerName+ " finishing"); } 

Again, this works fine with a single consumer thread, but a different problem arises if two or more consumers are required. You'll find that one consumer terminates normally, but the remaining consumers hang. This is because they're all waiting for the dataReady event. At the end of the calcPowersOfTwo method, the Calculator class signals this event, but as it is now an AutoResetEvent it will release only one waiting thread. The others will remain blocked indefinitely. The solution is to use a ManualResetEvent , called finished , to indicate that the calcPowersOfTwo method has finished rather than overloading the dataReady event. Being a ManualResetEvent , it will release all threads that are waiting for it. The printPowersOfTwo method in the consumer should then wait for the dataReady or finished event to be signaled. Fortunately, this is possible as I will now describe.

The AutoResetEvent and the ManualResetEvent classes both inherit directly from a third class called WaitHandle (where much of their common processing is implemented). The WaitHandle class has a static method called WaitAny , which takes an array of WaitHandle objects (which can be a mixture of the two event types) and waits until one of them is signaled:

 publicstaticAutoResetEventdataReady=newAutoResetEvent(false); publicstaticManualResetEventfinished=newManualResetEvent(false); WaitHandle.WaitAny(newWaitHandle[]{Calculator.dataReady, Calculator.finished}); 

Another method, WaitAll , takes an array of WaitHandle objects but completes only when they have all been signaled. Completed event-based versions of the Calculator and Printer classes are shown here (the completed code is available in the AutoResetProducerConsumer project):

 publicclassCalculator { publicstaticQueuepower2Queue=newQueue(); publicstaticAutoResetEventdataReady=newAutoResetEvent(false); publicstaticAutoResetEventqueueReady=newAutoResetEvent(false); publicstaticManualResetEventfinished= newManualResetEvent(false); publicvoidcalcPowersOfTwo() { intcurrPos=1; //Calculatepowersof2upto20 while(currPos<=20) { power2Queue.Enqueue(newInteger(currPos*currPos)); //Informthenextconsumerthatsomedataisavailable dataReady.Set(); //Waitfortheconsumertosignalthatthequeueisempty queueReady.WaitOne(); currPos+=1; } finished.Set(); } } publicclassPrinter { publicStringprinterName; publicvoidprintPowersOfTwo() { //Waitforthedatareadyorfinishedsignalsfromtheproducer WaitHandle.WaitAny(newWaitHandle[]{Calculator.dataReady, Calculator.finished}); //Retrieveanddisplayasmanyvaluesasareonthequeue while(Calculator.power2Queue.get_Count()>0) { Integerdata=(Integer)Calculator.power2Queue.Dequeue(); Console.WriteLine(printerName+ ":" +data); //Signalthatthequeueisempty Calculator.queueReady.Set(); //Waitformoredata WaitHandle.WaitAny(newWaitHandle[]{Calculator.dataReady, Calculator.finished}); } Console.WriteLine(printerName+ " finishing"); } } 
Mutexes

Mutexes are lightweight objects that guarantee mutually exclusive access to a resource. (The name mutex is a contraction of the phrase "mutual exclusion"). Like the event classes, the Mutex class inherits from WaitHandle , so much of its functionality should now be familiar to you. Semantically, mutexes operate on the concept of ownership rather than events, but the signaling mechanism is the same as that used by events. The general idea is that to access a shared resource, a thread should first issue the WaitOne method of a mutex:

 MutexresourceMutex=newMutex(); resourceMutex.WaitOne(); 

If the mutex is free, access to it is granted to the requesting thread, and it is said to be owned by that thread. Another thread that issues WaitOne over the same mutex will be blocked. A thread relinquishes ownership of a mutex using the ReleaseMutex method:

 resourceMutex.ReleaseMutex(); 

If any other threads are waiting, one of them will be unblocked and granted ownership. The others will remain blocked. If a thread terminates while it owns a mutex, ownership will be granted to the next waiting thread.

Perhaps the main reason for using a mutex instead of some of the other mechanisms you've seen in this chapter is that mutexes can be used across processes. All the other synchronization mechanisms you've seen so far operate only inside a single process. When you create a mutex, you can give it a name:

 MutexresourceMutex=newMutex(true,"MyResource"); 

The Boolean parameter to the constructor indicates whether the thread owns the named mutex. Only the thread that actually creates the mutex should specify a value of true . Other threads in other processes can attach to the same mutex by specifying the same name (you must adopt a naming scheme that prevents unintentional clashes of mutex names ) but setting the ownership parameter to false in the constructor.

Timers

Besides allowing you to create threads manually, the common language runtime will create threads for you automatically when you employ various .NET features ”including Web services, Remoting, and timers. We'll cover Web services and Remoting later in this book, but it's worth taking a brief look at timers here.

The System.Threading.Timer class allows you to execute a method at defined intervals. To do this, it makes use of a TimerCallback delegate that refers to a method that you specify, and it creates a thread to run this method whenever the timer expires .

The ClockTick project offers a simple example of using the Timer class to execute a method every five seconds. The Timer constructor expects a TimerCallback delegate that points to the method to be invoked every time the timer is triggered. This method must return void and take a single Object parameter containing user -defined state information. The second parameter to the Timer method is the data that will be passed as the parameter to the callback method every time it runs. This should be an Object (that is, almost anything can be passed in) or null if you don't want to use this parameter. The third argument indicates when the timer should first run. It is specified as an interval relative to the current time. The final parameter is the period to wait between invocations of the timer. Here's an example:

 System.Threading.Timerticker= newSystem.Threading.Timer(newTimerCallback(tickTock),null, newTimeSpan(0,0,0),newTimeSpan(0,0,5)); 

Each time the indicated period has elapsed, a new thread is created that runs the tickTock method. The method itself prints out the current date and time:

 privatevoidtickTock(Objectstate) { Console.WriteLine("Itisnow" +System.DateTime.get_Now()); } 

Once the thread is created, you cannot change the timer interval or suspend the timer. You can kill the thread by calling its Dispose method or by terminating the entire program. The ClockTick program runs until you press the Enter key.

Figure 8-10 shows the output generated by this program.

Figure 8-10. The output of the ClockTick program

I l @ ve RuBoard


Microsoft Visual J# .NET (Core Reference)
Microsoft Visual J# .NET (Core Reference) (Pro-Developer)
ISBN: 0735615500
EAN: 2147483647
Year: 2002
Pages: 128

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net