Synchronizing Threads

 
   

Ruby Way
By Hal Fulton
 


Why is synchronization necessary? It is because the "interleaving" of operations causes variables and other entities to be accessed in ways that aren't obvious from reading the code of the individual threads. Two or more threads accessing the same variable might interact with each other in ways that are unforeseen and difficult to debug.

Let's take this simple piece of code as an example:

 

    x = 0
    t1 = Thread.new do
      1.upto(1000) do
        x = x + 1
      end
    end
    t2 = Thread.new do
      1.upto(1000) do
        x = x + 1
      end
    end
    t1.join
    t2.join
    puts x

The variable x starts at 0. Each thread increments it a thousand times. Logic tells us that x must be 2000 when it is printed out.

But what have we here? On one particular system, it prints 1044 as the result. What has gone wrong?

Our code assumes that the incrementing of an integer is an atomic (or indivisible) operation. But it isn't. Consider the following logic flow. We put thread t1 on the left side and t2 on the right. Each separate timeslice is on a separate line, and we assume that when we enter this piece of logic, x has the value 123.

 

    t1                            t2
    __________________________    __________________________
    Retrieve value of x (123)
                                  Retrieve value of x (123)
    Add one to value (124)
                                  Add one to value (124)
    Store result back in x
                                  Store result back in x

From its own point of view, each thread is doing a simple increment. It is equally obvious that x is only 124 after having been incremented by both threads: one of the two updates has been lost.

This is only the simplest of synchronization problems. The worst ones become truly difficult to manage and also become genuine objects of study by computer scientists and mathematicians.

Performing Simple Synchronization with Critical Sections

The simplest form of synchronization is to use a critical section. When a thread enters a critical section of code, this technique guarantees that no other thread will run until the first thread has left its critical section.

The Thread.critical accessor, when set to true, will prevent other threads from being scheduled. Here we look at the example we just mentioned and use this technique to fix it.

 

    x = 0
    t1 = Thread.new do
      1.upto(1000) do
        Thread.critical = true
        x = x + 1
        Thread.critical = false
      end
    end
    t2 = Thread.new do
      1.upto(1000) do
        Thread.critical = true
        x = x + 1
        Thread.critical = false
      end
    end
    t1.join
    t2.join
    puts x

Now the logic flow is forced to resemble the following. (Of course, outside of the incrementing part, the threads are free to interleave operations more or less randomly.)

 

    t1                            t2
    __________________________    __________________________
    Retrieve value of x (123)
    Add one to value (124)
    Store result back in x
                                  Retrieve value of x (124)
                                  Add one to value (125)
                                  Store result back in x

It is possible to perform combinations of thread manipulation operations that cause a thread to be scheduled even if another thread is in a critical section. In the simplest case, a thread that is newly created will be run immediately regardless of whether another thread is in a critical section. For this reason, this technique should be used only in the simplest of circumstances.
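Thread.critical was removed in Ruby 1.9, so on a current interpreter the same protection has to come from a lock. What follows is a minimal sketch rather than a drop-in replacement: it guards the increment with a Mutex (the mechanism covered in the next section), and the helper name counter_with_mutex and its keyword parameters are our own invention for illustration.

```ruby
# Sketch for current Ruby, where Thread.critical no longer exists.
# counter_with_mutex is a hypothetical helper name.
def counter_with_mutex(threads: 2, increments: 1000)
  x = 0
  lock = Mutex.new
  workers = Array.new(threads) do
    Thread.new do
      increments.times do
        # Only one thread at a time may run the read-add-store sequence.
        lock.synchronize { x = x + 1 }
      end
    end
  end
  workers.each(&:join)
  x
end

puts counter_with_mutex   # 2000
```

Unlike a critical section, the mutex does not stop other threads from being scheduled; it only keeps them out of the protected block.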

Synchronizing Access to Resources (mutex.rb)

Let's take a Web indexing application as an example. We retrieve words from multiple sources over the Net and store them in a hash. The word itself will be the key and the value will be a string that identifies the document and the line number within the document.

This is a very crude example. But we will make it even more crude with these simplifying assumptions:

  • We will represent the remote documents as simple strings.

  • We will limit it to three such strings (simple hard-coded data).

  • We will simulate the variability of Net access with random sleeps.

So let's look at Listing 7.1. It doesn't even print out the data it collects, but only a (non-unique) count of the number of words found. Note that every time the hash is examined or changed, we call the hesitate method to sleep for a random interval. This will cause the program to run in a less deterministic and more realistic way.

Listing 7.1 Flawed Indexing Example (with a Race Condition)
    $list = []
    $list[0] = "shoes ships\nsealing-wax"
    $list[1] = "cabbages kings"
    $list[2] = "quarks\nships\ncabbages"

    def hesitate
      sleep rand(0)
    end

    $hash = {}

    def process_list(listnum)
      lnum = 0
      $list[listnum].each_line do |line|
        words = line.chomp.split
        words.each do |w|
          hesitate
          if $hash[w]
            hesitate
            $hash[w] += ["#{listnum}:#{lnum}"]
          else
            hesitate
            $hash[w] = ["#{listnum}:#{lnum}"]
          end
        end
        lnum += 1
      end
    end

    t1 = Thread.new(0) { |list| process_list(list) }
    t2 = Thread.new(1) { |list| process_list(list) }
    t3 = Thread.new(2) { |list| process_list(list) }

    t1.join
    t2.join
    t3.join

    count = 0
    $hash.values.each do |v|
      count += v.size
    end

    puts "Total: #{count} words"     # May print 7 or 8!

But there is a problem. If your system behaves as ours has, there are two possible numbers this program can output! In our tests, it prints the answers 7 and 8 with approximately equal likelihood. In a situation with more words and more lists, there would be even more variation.

Let's try to fix this with a mutex that controls access to a shared resource. (The term is derived, of course, from the words mutual exclusion.) The Mutex library will allow us to create and manipulate a mutex. We can lock it when we are about to access the hash and unlock it when we have finished with it (see Listing 7.2).

Listing 7.2 Mutex Protected Indexing Example
    require "thread.rb"

    $list = []
    $list[0] = "shoes ships\nsealing-wax"
    $list[1] = "cabbages kings"
    $list[2] = "quarks\nships\ncabbages"

    def hesitate
      sleep rand(0)
    end

    $hash = {}
    $mutex = Mutex.new

    def process_list(listnum)
      lnum = 0
      $list[listnum].each_line do |line|
        words = line.chomp.split
        words.each do |w|
          hesitate
          $mutex.lock
            if $hash[w]
              hesitate
              $hash[w] += ["#{listnum}:#{lnum}"]
            else
              hesitate
              $hash[w] = ["#{listnum}:#{lnum}"]
            end
          $mutex.unlock
        end
        lnum += 1
      end
    end

    t1 = Thread.new(0) { |list| process_list(list) }
    t2 = Thread.new(1) { |list| process_list(list) }
    t3 = Thread.new(2) { |list| process_list(list) }

    t1.join
    t2.join
    t3.join

    count = 0
    $hash.values.each do |v|
      count += v.size
    end

    puts "Total: #{count} words"     # Always prints 8!

We should mention that in addition to lock, the Mutex class also has a try_lock method. It behaves the same as lock except that if another thread already has the lock, it will return false immediately rather than waiting.

 

    $mutex = Mutex.new
    t1 = Thread.new { $mutex.lock; sleep 30 }

    sleep 1

    t2 = Thread.new do
      if $mutex.try_lock
        puts "Locked it"
      else
        puts "Could not lock"   # Prints immediately
      end
    end

    sleep 2

This feature is useful any time a thread doesn't want to be blocked.

Using the Predefined Synchronized Queue Classes

The thread library thread.rb has a couple of classes that will be useful from time to time. The class Queue is a thread-aware queue that synchronizes access to the ends of the queue; that is, different threads can use the same queue without fear of problems. The class SizedQueue is essentially the same, except that it allows a limit to be placed on the size of the queue (the number of elements it can contain).

These have much the same set of methods available because SizedQueue actually inherits from Queue. The descendant also has the accessor max, used to get or set the maximum size of the queue.

 

    buff = SizedQueue.new(25)
    upper1 = buff.max            # 25
    # Now raise it...
    buff.max = 50
    upper2 = buff.max            # 50

Listing 7.3 shows a simple producer-consumer illustration. The consumer is delayed slightly longer on the average (through a longer sleep) so that the items will pile up a little.

Listing 7.3 The Producer-Consumer Problem
    require "thread"

    buffer = SizedQueue.new(2)

    producer = Thread.new do
      item = 0
      loop do
        sleep rand(0)
        puts "Producer makes #{item}"
        buffer.enq item
        item += 1
      end
    end

    consumer = Thread.new do
      loop do
        sleep(rand(0) + 0.9)
        item = buffer.deq
        puts "Consumer retrieves #{item}"
        puts "  waiting = #{buffer.num_waiting}"
      end
    end

    sleep 60   # Run a minute, then die and kill threads

The methods enq and deq are the recommended way to get items into and out of the queue. We can also use push to add to the queue and pop or shift to remove items, but these names have somewhat less mnemonic value when we are explicitly using a queue.

The method empty? tests for an empty queue, and clear causes a queue to be empty. The method size (or its alias length) returns the actual number of items in the queue.

 

    # Assume no other threads interfering...
    buff = Queue.new
    buff.enq "one"
    buff.enq "two"
    buff.enq "three"
    n1 = buff.size          # 3
    flag1 = buff.empty?     # false
    buff.clear
    n2 = buff.size          # 0
    flag2 = buff.empty?     # true

The num_waiting method returns the number of threads waiting to access the queue. In the non-sized queue, this is the number of threads waiting to remove elements; in the sized queue, it also includes the threads waiting to add elements to the queue.

An optional parameter non_block defaults to false for the deq method in the Queue class. If it is true, an empty queue will give a ThreadError rather than blocking the thread.
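To make the non_block behavior concrete, here is a small sketch. The wrapper try_deq and the :empty marker are hypothetical names of our own; only deq and ThreadError come from the library.

```ruby
# try_deq is a hypothetical helper: it never blocks, and returns
# the symbol :empty (our own marker) when the queue has nothing in it.
def try_deq(queue)
  queue.deq(true)          # non_block = true
rescue ThreadError         # raised instead of blocking on an empty queue
  :empty
end

q = Queue.new
q.enq "one"
p try_deq(q)   # "one"
p try_deq(q)   # :empty
```

A marker value such as :empty is only safe if it can never be a legitimate queue item; otherwise it is better to let the ThreadError propagate.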

Using Condition Variables

And he called for his fiddlers three.

"Old King Cole" (traditional folk tune)

A condition variable is really just a queue of threads. It is used in conjunction with a mutex to provide a higher level of control when synchronizing threads.

A condition variable is always associated with a specific mutex; it is used to relinquish control of the mutex until a certain condition has been met. Imagine a situation in which a thread has a mutex locked but cannot continue because the circumstances aren't right. It can sleep on the condition variable and wait to be awakened when the condition is met.

It is important to understand that while a thread is waiting on a condition variable, the mutex is released so that other threads can gain access. It is also important to realize that when another thread does a signal operation (to awaken the waiting thread), the waiting thread reacquires the lock on the mutex.
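The release-and-reacquire behavior just described can be seen in a very small sketch. The method name wait_for_ready, the ready flag, and the log array are all assumptions made for this illustration.

```ruby
# wait_for_ready is a hypothetical helper used only for illustration.
def wait_for_ready
  mutex = Mutex.new
  cond  = ConditionVariable.new
  ready = false
  log   = []

  waiter = Thread.new do
    mutex.synchronize do
      # wait releases the mutex while sleeping and reacquires it
      # before returning, so the test of ready is always protected.
      cond.wait(mutex) until ready
      log << :waiter_resumed
    end
  end

  signaler = Thread.new do
    mutex.synchronize do     # obtainable even while the waiter sleeps
      ready = true
      log << :signaled
      cond.signal
    end
  end

  [waiter, signaler].each(&:join)
  log
end

p wait_for_ready   # [:signaled, :waiter_resumed]
```

The waiter rechecks the flag in a loop rather than assuming that being awakened means the condition holds; the same idiom works whichever thread happens to run first.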

Here, we present a very contrived example in the tradition of the dining philosophers. Imagine a table in which three violinists are seated, all of whom want to take turns playing. However, there are only two violins and only one bow. Obviously, a violinist can play only if she has one of the violins and the lone bow at the same time.

We keep a count of the violins and bows available. When a player wants a violin or a bow, she must wait for it. In our code, we protect the test with a mutex and do separate waits for the violin and the bow, both associated with that mutex. If a violin or a bow isn't available, the thread sleeps. It loses the mutex until it is awakened by another thread signaling that the resource is available, whereupon the original thread wakes up and once again owns the lock on the mutex.

Let's take a look at Listing 7.4.

Listing 7.4 The Three Violinists
    require "thread"

    $music  = Mutex.new
    $violin = ConditionVariable.new
    $bow    = ConditionVariable.new

    $violinsFree = 2
    $bowsFree    = 1

    def musician(n)
      loop do
        sleep rand(0)
        $music.synchronize do
          $violin.wait($music) while $violinsFree == 0
          $violinsFree -= 1
          puts "#{n} has a violin"
          puts "violins #$violinsFree, bows #$bowsFree"

          $bow.wait($music) while $bowsFree == 0
          $bowsFree -= 1
          puts "#{n} has a bow"
          puts "violins #$violinsFree, bows #$bowsFree"
        end

        sleep rand(0)
        puts "#{n}: (...playing...)"
        sleep rand(0)
        puts "#{n}: Now I've finished."

        $music.synchronize do
          $violinsFree += 1
          $violin.signal if $violinsFree == 1
          $bowsFree += 1
          $bow.signal if $bowsFree == 1
        end
      end
    end

    threads = []
    3.times do |i|
      threads << Thread.new { musician(i) }
    end

    threads.each { |t| t.join }

We believe that this solution will never deadlock, although we've found it difficult to prove. But it is interesting to note that this algorithm isn't a fair one. In our tests, the first player always got to play more often than the other two, and the second more often than the third. The cause and cure for this behavior are left as an interesting exercise.

Using Other Synchronization Techniques

Yet another synchronization mechanism is the monitor, implemented in Ruby in the form of the monitor.rb library. This technique is somewhat more advanced than the mutex; notably, a mutex lock cannot be nested, but a monitor lock can.

The trivial case of this would never occur. That is, no one would ever write the following:

 

    $mutex = Mutex.new

    $mutex.synchronize do
      $mutex.synchronize do
        #...
      end
    end

But it might happen this way (or through a recursive method call). The result is a deadlock in any of these situations. Avoiding deadlock in this circumstance is one of the advantages of the Monitor mixin.

 

    $mutex = Mutex.new

    def some_method
      $mutex.synchronize do
        #...
        some_other_method    # Deadlock!
      end
    end

    def some_other_method
      $mutex.synchronize do
        #...
      end
    end
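For comparison, here is a sketch of the same call pattern using a Monitor in place of the mutex. The method names outer_step and inner_step and the global $monitor are hypothetical; the point is only that the thread already holding the monitor may enter it again.

```ruby
require "monitor"

$monitor = Monitor.new

# outer_step and inner_step are hypothetical names for illustration.
def outer_step
  $monitor.synchronize do
    inner_step             # re-enters the monitor this thread holds
  end
end

def inner_step
  $monitor.synchronize do
    :done
  end
end

p outer_step   # :done
```

The monitor keeps track of which thread owns it, so the lock is given up only when the outermost synchronize block exits.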

The Monitor mixin is typically used to extend any object. The new_cond method can then be used to instantiate a condition variable.

The class ConditionVariable in monitor.rb is enhanced from the definition in the thread library. It has methods wait_until and wait_while, which block a thread based on a condition. It also allows a timeout while waiting because the wait method has a timeout parameter that is a number of seconds (defaulting to nil).
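As a rough illustration of extending an object and then using new_cond and wait_until, consider the following sketch. The method monitored_handoff and the variable names are our own assumptions; it hands a single item from one thread to another through an ordinary array.

```ruby
require "monitor"

# monitored_handoff is a hypothetical helper for illustration.
def monitored_handoff
  items = []
  items.extend(MonitorMixin)     # the array itself becomes the monitor
  nonempty = items.new_cond      # condition variable tied to that monitor

  consumer = Thread.new do
    items.synchronize do
      # Blocks (releasing the monitor) until the block returns true.
      nonempty.wait_until { !items.empty? }
      items.shift
    end
  end

  producer = Thread.new do
    items.synchronize do
      items.push "fiddle"
      nonempty.signal
    end
  end

  producer.join
  consumer.value                 # the value returned by the consumer block
end

p monitored_handoff   # "fiddle"
```

Because wait_until rechecks its block each time the thread wakes, it does not matter whether the producer or the consumer enters the monitor first.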

Because we are rapidly running out of thread examples, we present to you a rewrite of the Queue and SizedQueue classes using the monitor technique in Listing 7.5. The code is by Shugo Maeda, used with permission.

Listing 7.5 Implementing a Queue with a Monitor
    # Author:  Shugo Maeda

    require "monitor"

    class Queue
      def initialize
        @que = []
        @monitor = Monitor.new
        @empty_cond = @monitor.new_cond
      end

      def enq(obj)
        @monitor.synchronize do
          @que.push(obj)
          @empty_cond.signal
        end
      end

      def deq
        @monitor.synchronize do
          while @que.empty?
            @empty_cond.wait
          end
          return @que.shift
        end
      end
    end

    class SizedQueue < Queue
      attr :max

      def initialize(max)
        super()
        @max = max
        @full_cond = @monitor.new_cond
      end

      def enq(obj)
        @monitor.synchronize do
          while @que.length >= @max
            @full_cond.wait
          end
          super(obj)
        end
      end

      def deq
        @monitor.synchronize do
          obj = super
          if @que.length < @max
            @full_cond.signal
          end
          return obj
        end
      end

      def max=(max)
        @monitor.synchronize do
          @max = max
          @full_cond.broadcast
        end
      end
    end

The sync.rb library is one more way of performing thread synchronization. For those who know and care about such things, it implements a two-phase lock with a counter. At the time of this writing, the only documentation is inside the library itself.

Allowing Timeout of an Operation

There are many situations in which we want to allow a maximum length of time for an action to be performed. This avoids infinite loops and allows an additional level of control over processing. A feature such as this is useful in the environment of the Net, where we might or might not get a response from a distant server, and in other circumstances.

The timeout.rb library is a thread-based solution to this problem. The timeout method executes the block associated with the method call; if the specified number of seconds elapses before the block finishes, it raises a TimeoutError that can be caught with a rescue clause (see Listing 7.6).

Listing 7.6 A Timeout Example
    require "timeout.rb"

    flag = false
    answer = nil

    begin
      timeout(5) do
        puts "I want a cookie!"
        answer = gets.chomp
        flag = true
      end
    rescue TimeoutError
      flag = false
    end

    if flag
      if answer == "cookie"
        puts "Thank you! Chomp, chomp, ..."
      else
        puts "That's not a cookie!"
        exit
      end
    else
      puts "Hey, too slow!"
      exit
    end

    puts "Bye now..."
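Recent versions of Ruby spell these names Timeout.timeout and Timeout::Error; as far as we know, the bare timeout method and the TimeoutError constant used in the listing are deprecated or absent there. The following sketch shows the same idea in the newer spelling. The wrapper with_deadline and the :timed_out marker are hypothetical names chosen for the example.

```ruby
require "timeout"

# with_deadline is a hypothetical helper: it runs the block and returns
# its value, or :timed_out (our own marker) if the deadline passes first.
def with_deadline(seconds)
  Timeout.timeout(seconds) { yield }
rescue Timeout::Error
  :timed_out
end

p with_deadline(1)   { :answered }         # :answered
p with_deadline(0.1) { sleep 2; :late }    # :timed_out
```

In the cookie example, the block would contain the call to gets, and the :timed_out result would take the place of the flag variable.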

Waiting for an Event

There are many situations in which we might want to have one or more threads monitoring the outside world while other threads are doing other things. The examples here are all rather contrived, but they illustrate the general principle.

Here, we see three threads doing the work of an application. Another thread simply wakes up every five seconds, checks the global variable $flag, and wakes up two other threads when it sees the flag set. This saves the three worker threads from interacting directly with the two other threads and possibly making multiple attempts to awaken them.

 

    $flag = false

    work1 = Thread.new { job1() }
    work2 = Thread.new { job2() }
    work3 = Thread.new { job3() }

    thread5 = Thread.new { Thread.stop; job4() }
    thread6 = Thread.new { Thread.stop; job5() }

    watcher = Thread.new do
      loop do
        sleep 5
        if $flag
          thread5.wakeup
          thread6.wakeup
          Thread.exit
        end
      end
    end

If at any point during the execution of the job methods the variable $flag becomes true, thread5 and thread6 are guaranteed to start within five seconds. After that, the watcher thread terminates.

In this next example, we are waiting for a file to be created. We check every 30 seconds for it, and start another thread if we see it; meanwhile, other threads can be doing anything at all. Actually, we are watching for three separate files here.

 

    def waitfor(filename)
      loop do
        if File.exist? filename
          file_processor = Thread.new do
            process_file(filename)
          end
          Thread.exit
        else
          sleep 30
        end
      end
    end

    waiter1 = Thread.new { waitfor("Godot") }
    sleep 10
    waiter2 = Thread.new { waitfor("Guffman") }
    sleep 10
    headwaiter = Thread.new { waitfor("head") }

    # Main thread goes off to do other things...

There are many other situations in which a thread might wait for an outside event, such as a networked application where the server at the other end of a socket is slow or unreliable.

Continuing Processing During I/O

Frequently, an application might have one or more I/O operations that are lengthy or time-consuming. This is especially true in the case of user input because a user typing at a keyboard is slower even than any disk operation. We can make use of this time by using threads.

Consider the case of a chess program that must wait for the human player to make her move. Of course, we present here only the barest outline of this concept.

We assume that the iterator predictMove will repeatedly generate likely moves that the person might make (and then determine the program's own responses to those moves). Then when the person moves, it is possible that the move has already been anticipated.

 

    scenario = {}     # move-response hash
    humans_turn = true

    thinking_ahead = Thread.new(board) do
      predictMove do |m|
        scenario[m] = myResponse(board,m)
        Thread.exit if humans_turn == false
      end
    end

    human_move = getHumanMove(board)
    humans_turn = false   # Stop the thread gracefully

    # Now we can access scenario which may contain the
    # move the person just made...

We have to make the disclaimer that real chess programs don't usually work this way. The concern is usually to search quickly and thoroughly to a certain depth; in real life, a better solution would be to store partial state information obtained during the thinking thread, and then continue in the same vein until the program finds a good response or time runs out for its turn.

Implementing Parallel Iterators

Imagine that you wanted to iterate in parallel over more than one object. That is, for each of n objects, you want the first item of each of them, the second item of each, the third, and so on.

To make this a little more concrete, look at the following example. Here we assume that compose is the name of the magic method that provides a composition of iterators. We also assume that every object specified has a default iterator each that will be used, and that each object contributes one item at a time.

 

    arr1 = [1, 2, 3, 4]
    arr2 = [5, 10, 15, 20]

    compose(arr1, arr2) do |a,b|
      puts "#{a} and #{b}"
    end

    # Should output:
    # 1 and 5
    # 2 and 10
    # 3 and 15
    # 4 and 20

We could take the most simple-minded approach and iterate over the objects to completion, one after another, storing the results. But if we want a more elegant solution, one that doesn't actually store all the items, threads are the only easy solution. Our solution is shown in Listing 7.7.

Listing 7.7 Iterating in Parallel
    def compose(*objects)
      threads = []
      for obj in objects do
        threads << Thread.new(obj) do |myobj|
          me = Thread.current
          me[:queue] = []
          myobj.each do |element|
            me[:queue].push element
          end
        end
      end

      list = [0]                       # Dummy non-nil value
      while list.nitems > 0 do         # Still some non-nils
        list = []
        for thr in threads
          list << thr[:queue].shift    # Remove one from each
        end
        yield list if list.nitems > 0  # Don't yield all nils
      end
    end

    x = [1, 2, 3, 4, 5, 6, 7, 8]
    y = "  first\n second\n  third\n fourth\n  fifth\n"
    z = %w[a b c d e f]

    compose(x, y, z) do |a,b,c|
      p [a, b, c]
    end

    # Output:
    #
    # [1, "  first\n", "a"]
    # [2, " second\n", "b"]
    # [3, "  third\n", "c"]
    # [4, " fourth\n", "d"]
    # [5, "  fifth\n", "e"]
    # [6, nil, "f"]
    # [7, nil, nil]
    # [8, nil, nil]

Notice that we do not assume that the objects all have the same number of items over which to iterate. If an iterator "runs out" before the others, it will generate nil values until the longest-running iterator has exhausted itself.

Of course, it is possible to write a more general method that will grab more than one value from each iteration. (After all, not all iterators return just one value at a time.) We could let the first parameter specify the number of values per iterator.

It would also be possible to use arbitrary iterators (rather than the default each). We might pass in their names as strings and use send to invoke them. Doubtless there are other tricks that could be performed.
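One way the arbitrary-iterator variation might look on a current Ruby is sketched below. Note that it swaps in a different technique: instead of threads, it uses external enumerators obtained with enum_for, and it takes the iterator names as symbols. The method name compose_with and the convention of passing [object, iterator_name] pairs are assumptions of ours.

```ruby
# Hypothetical helper: each argument is an [object, iterator_name] pair.
def compose_with(*pairs)
  done  = Object.new                         # marks an exhausted iterator
  enums = pairs.map { |obj, name| obj.enum_for(name) }

  loop do
    row = enums.map do |e|
      begin
        e.next                               # pull one item on demand
      rescue StopIteration
        done
      end
    end
    break if row.all? { |v| v.equal?(done) } # every iterator has run out
    yield row.map { |v| v.equal?(done) ? nil : v }
  end
end

compose_with([[1, 2, 3], :each], ["a\nb\n", :each_line]) do |n, line|
  p [n, line]
end
# [1, "a\n"]
# [2, "b\n"]
# [3, nil]
```

As in Listing 7.7, an iterator that runs out early contributes nil values until the longest one is exhausted; here each item is fetched only when it is needed.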

However, we think that the example given here is adequate for most circumstances. We will leave the other variations as an exercise for you.

Recursive Deletion in Parallel

Just for fun, let's take an example from Chapter 4, "External Data Manipulation," and "parallelize" it. (No, we don't mean parallel in the sense of using multiple processors.) The recursive deletion routine appears here in a threaded form. When we find that a directory entry is itself a directory, we start a new thread to traverse that directory and delete its contents.

As we go along, we keep track of the threads we've created in an array called threads; because this is a local variable, each thread will have its own copy of the array. It can be accessed by only one thread at a time, and there is no need to synchronize access to it.

Note also that we pass fullname into the thread block so that we don't have to worry about the thread accessing a changing value. The thread uses fn as a local copy of the same variable.

When we have traversed an entire directory, we want to wait on the threads we have created before deleting the directory we've just finished working on.

 

    def delete_all(dir)
      threads = []
      Dir.foreach(dir) do |e|
        # Don't bother with . and ..
        next if [".",".."].include? e
        fullname = dir + File::Separator + e
        if FileTest::directory?(fullname)
          threads << Thread.new(fullname) do |fn|
            delete_all(fn)
          end
        else
          File.delete(fullname)
        end
      end
      threads.each { |t| t.join }
      Dir.delete(dir)
    end

    delete_all("/tmp/stuff")

Is this actually faster than the non-threaded version? We've found that the answer isn't consistent. It probably depends on your operating system as well as on the actual directory structure being deleted, that is, its depth, size of files, and so on.


   

 

 



The Ruby Way
The Ruby Way, Second Edition: Solutions and Techniques in Ruby Programming (2nd Edition)
ISBN: 0672328844
EAN: 2147483647
Year: 2000
Pages: 119
Authors: Hal Fulton
