13.2. Synchronizing Threads

Why is synchronization necessary? It is because the "interleaving" of operations causes variables and other entities to be accessed in ways that are not obvious from reading the code of the individual threads. Two or more threads accessing the same variable may interact with each other in ways that are unforeseen and difficult to debug.

Let's take this simple piece of code as an example:

    x = 0

    t1 = Thread.new do
      1.upto(1000) do
        x = x + 1
      end
    end

    t2 = Thread.new do
      1.upto(1000) do
        x = x + 1
      end
    end

    t1.join
    t2.join
    puts x

The variable x starts at zero. Each thread increments it a thousand times. Logic tells us that x must be 2000 when it is printed out.

But you may find that the actual results contradict this logic. On one particular system, it prints 1044 as the result. What has gone wrong?

Our code assumes that the incrementing of an integer is an atomic (or indivisible) operation. But it isn't. Consider the logic flow in the following code example. We put thread t1 on the left side and t2 on the right. We put each separate timeslice on a separate line and assume that when we enter this piece of logic, x has the value 123.

    t1                            t2
    __________________________    __________________________
    Retrieve value of x (123)
                                  Retrieve value of x (123)
    Add one to value (124)
                                  Add one to value (124)
    Store result back in x
                                  Store result back in x

It's obvious that each thread is doing a simple increment from its own point of view. But it's also obvious that, in this case, x is only 124 after having been incremented by both threads.
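The lost update can be replayed deterministically without any threads, simply by performing the six steps in the interleaved order shown in the table. This is only a sketch; the names t1_copy and t2_copy are illustrative stand-ins for each thread's private working value.

```ruby
x = 123

t1_copy = x          # t1 retrieves the value of x (123)
t2_copy = x          # t2 retrieves the value of x (123)
t1_copy += 1         # t1 adds one (124)
t2_copy += 1         # t2 adds one (124)
x = t1_copy          # t1 stores its result back in x
x = t2_copy          # t2 stores its result, overwriting t1's update

puts x               # 124, although two increments were performed
```

Each "thread" works from a stale copy, so the second store silently discards the first.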

This is only the simplest of synchronization problems. The worst ones become truly difficult to manage and become genuine objects of study by computer scientists and mathematicians.

13.2.1. Performing Simple Synchronization with Critical Sections

The simplest form of synchronization is to use a critical section. When a thread enters a critical section of code, this technique guarantees that no other thread will run until the first thread has left its critical section.

The Thread.critical accessor, when set to true, will prevent other threads from being scheduled. In the following code, we revisit the previous example and use the critical accessor to define the critical section and protect the sensitive parts of the code.

    x = 0

    t1 = Thread.new do
      1.upto(1000) do
        Thread.critical = true
        x = x + 1
        Thread.critical = false
      end
    end

    t2 = Thread.new do
      1.upto(1000) do
        Thread.critical = true
        x = x + 1
        Thread.critical = false
      end
    end

    t1.join
    t2.join
    puts x

Now the logic flow is different; refer to the following description of how t1 and t2 now act. (Of course, outside the incrementing part, the threads are free to interleave operations more or less randomly.)

    t1                            t2
    __________________________    __________________________
    Retrieve value of x (123)
    Add one to value (124)
    Store result back in x
                                  Retrieve value of x (124)
                                  Add one to value (125)
                                  Store result back in x

It is possible to perform combinations of thread manipulation operations that will cause a thread to be scheduled even if another thread is in a critical section. In the simplest case, a newly created thread will be run immediately regardless of whether another thread is in a critical section. For this reason, this technique should be used only in the simplest of circumstances.

13.2.2. Synchronizing Access to Resources (mutex.rb)

Let's take a web-indexing application as an example. We retrieve words from multiple sources over the Net and store them in a hash. The word itself will be the key, and the value will be a string that identifies the document and the line number within the document.

This is a crude example. But we will make it even more crude with these simplifying assumptions:

  • We will represent the remote documents as simple strings.

  • We will limit it to three such strings (simple hard-coded data).

  • We will simulate the variability of Net access with random sleeps.

Let's look at the example in Listing 13.1. It doesn't even print out the data it collects but only a (non-unique) count of the number of words found. Note that every time the hash is examined or changed, we call the hesitate method to sleep for a random interval. This will cause the program to run in a less deterministic and more realistic way.

Listing 13.1. Flawed Indexing Example (with a Race Condition)

    @list = []
    @list[0]="shoes ships\nsealing-wax"
    @list[1]="cabbages kings"
    @list[2]="quarks\nships\ncabbages"

    def hesitate
      sleep rand(0)
    end

    @hash = {}

    def process_list(listnum)
      lnum = 0
      @list[listnum].each do |line|
        words = line.chomp.split
        words.each do |w|
          hesitate
          if @hash[w]
            hesitate
            @hash[w] += ["#{listnum}:#{lnum}"]
          else
            hesitate
            @hash[w] = ["#{listnum}:#{lnum}"]
          end
        end
        lnum += 1
      end
    end

    # Each thread receives the index of the list it should process
    t1 = Thread.new(0) {|num| process_list(num) }
    t2 = Thread.new(1) {|num| process_list(num) }
    t3 = Thread.new(2) {|num| process_list(num) }

    t1.join
    t2.join
    t3.join

    count = 0
    @hash.values.each {|v| count += v.size }

    puts "Total: #{count} words"     # May print 7 or 8!

But there is a problem. If your system behaves as ours have, there are two possible numbers this program can output! In our tests, it prints the answers 7 and 8 with approximately equal likelihood. In a situation with more words and more lists, there would be even more variation.

Let's try to fix this with a mutex, which controls access to a shared resource. (The term is derived, of course, from the words mutual exclusion.) See Listing 13.2.

The Mutex library allows us to create and manipulate a mutex. We can lock it when we are about to access the hash and unlock it when we have finished with it.

Listing 13.2. Mutex-Protected Indexing Example

    require 'thread.rb'

    @list = []
    @list[0]="shoes ships\nsealing-wax"
    @list[1]="cabbages kings"
    @list[2]="quarks\nships\ncabbages"

    def hesitate
      sleep rand(0)
    end

    @hash = {}

    @mutex = Mutex.new

    def process_list(listnum)
      lnum = 0
      @list[listnum].each do |line|
        words = line.chomp.split
        words.each do |w|
          hesitate
          @mutex.lock
            if @hash[w]
              hesitate
              @hash[w] += ["#{listnum}:#{lnum}"]
            else
              hesitate
              @hash[w] = ["#{listnum}:#{lnum}"]
            end
          @mutex.unlock
        end
        lnum += 1
      end
    end

    # Each thread receives the index of the list it should process
    t1 = Thread.new(0) {|num| process_list(num) }
    t2 = Thread.new(1) {|num| process_list(num) }
    t3 = Thread.new(2) {|num| process_list(num) }

    t1.join
    t2.join
    t3.join

    count = 0
    @hash.values.each {|v| count += v.size }

    puts "Total: #{count} words"     # Always prints 8!
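One detail the lock and unlock pair in Listing 13.2 leaves open is what happens if the protected code raises an exception: the mutex would stay locked. The following is a minimal sketch of the same update guarded with ensure, assuming a current Ruby version; the names index, guard, and words are illustrative and not part of the listing.

```ruby
index = {}
guard = Mutex.new
words = %w[shoes ships ships]

threads = words.each_with_index.map do |word, i|
  Thread.new do
    guard.lock
    begin
      # Check-then-update is safe here because only one thread
      # at a time can be between lock and unlock.
      index[word] = (index[word] || []) + ["0:#{i}"]
    ensure
      guard.unlock       # Runs even if the update raises
    end
  end
end

threads.each(&:join)
total = index.values.sum(&:size)
puts "Total: #{total} words"
```

With the ensure clause, an error inside the protected region cannot leave other threads blocked forever on the mutex.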

We should mention that in addition to lock, the Mutex class also has a try_lock method. It behaves the same as lock except that if another thread already has the lock, it will return false immediately rather than waiting.

    require 'thread'

    mutex = Mutex.new
    t1 = Thread.new { mutex.lock; sleep 30 }

    sleep 1

    t2 = Thread.new do
      if mutex.try_lock
        puts "Locked it"
      else
        puts "Could not lock"   # Prints immediately
      end
    end

    sleep 2

This feature is useful any time a thread doesn't want to be blocked.

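There is also a synchronize method that takes a block. It locks the mutex, runs the block, and unlocks the mutex when the block finishes, even if the block raises an exception.

    mutex = Mutex.new
    mutex.synchronize do
      # Whatever code needs to be
      #   protected...
    end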
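As an illustration of synchronize, here is the two-thread counter from the start of this section with the increment protected by a mutex. It is a sketch assuming a current Ruby version; the name lock is arbitrary.

```ruby
x = 0
lock = Mutex.new

threads = 2.times.map do
  Thread.new do
    1000.times do
      # Only one thread at a time may run the read-add-store sequence
      lock.synchronize { x += 1 }
    end
  end
end

threads.each(&:join)
puts x   # 2000
```

Because the whole read-add-store sequence runs inside the block, no increment can be lost.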

There is also a mutex_m library defining a Mutex_m module, which can be mixed into a class (or used to extend an object). Any such extended object has mutex methods so that the object itself can be treated as a mutex.

    require 'mutex_m'

    class MyClass
      include Mutex_m

      # Now any MyClass object can call
      # lock, unlock, synchronize, ...
      # or external objects can invoke
      # these methods on a MyClass object.
    end

13.2.3. Using the Predefined Synchronized Queue Classes

The thread library thread.rb has a couple of classes that will be useful from time to time. The class Queue is a thread-aware queue that synchronizes access to the ends of the queue; that is, different threads may use the same queue without fear of problems. The class SizedQueue is essentially the same, except that it allows a limit to be placed on the size of the queue (the number of elements it can contain).

These have much the same set of methods available because SizedQueue actually inherits from Queue. The descendant also has the accessor max, used to get or set the maximum size of the queue.

    buff = SizedQueue.new(25)
    upper1 = buff.max            # 25
    # Now raise it...
    buff.max = 50
    upper2 = buff.max            # 50

Listing 13.3 is a simple producer/consumer illustration. The consumer is delayed slightly longer on average (through a longer sleep) so that the items will "pile up" a little.

Listing 13.3. The Producer-Consumer Problem

    require 'thread'

    buffer = SizedQueue.new(2)   # Bounded buffer; the size 2 is assumed here

    producer = Thread.new do
      item = 0
      loop do
        sleep rand 0
        puts "Producer makes #{item}"
        buffer.enq item
        item += 1
      end
    end

    consumer = Thread.new do
      loop do
        sleep (rand 0)+0.9
        item = buffer.deq
        puts "Consumer retrieves #{item}"
        puts "  waiting = #{buffer.num_waiting}"
      end
    end

    sleep 60   # Run a minute, then die and kill threads
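Listing 13.3 runs until the main thread exits. For experimenting, a variant that terminates on its own can be easier to follow. The sketch below assumes a current Ruby version and uses a :done marker (a convention chosen for this example, not part of the library) to tell the consumer to stop.

```ruby
buffer = SizedQueue.new(2)    # Producer blocks when two items are pending
consumed = []

producer = Thread.new do
  5.times { |i| buffer.enq(i) }
  buffer.enq(:done)           # Sentinel: nothing more will arrive
end

consumer = Thread.new do
  while (item = buffer.deq) != :done
    consumed << item
  end
end

[producer, consumer].each(&:join)
p consumed   # [0, 1, 2, 3, 4]
```

The queue preserves order and handles all the blocking, so neither thread needs an explicit mutex.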

The methods enq and deq are the recommended way to get items into and out of the queue. We can also use push to add to the queue and pop or shift to remove items, but these names have somewhat less mnemonic value when we are explicitly using a queue.

The method empty? will test for an empty queue, and clear will cause a queue to be empty. The method size (or its alias length) will return the actual number of items in the queue.

    # Assume no other threads interfering...
    buff = Queue.new
    buff.enq "one"
    buff.enq "two"
    buff.enq "three"
    n1 = buff.size          # 3
    flag1 = buff.empty?     # false
    buff.clear
    n2 = buff.size          # 0
    flag2 = buff.empty?     # true

The num_waiting method returns the number of threads waiting to access the queue. In the nonsized queue, this is the number of threads waiting to remove elements; in the sized queue, this also includes the threads waiting to add elements to the queue.

An optional parameter non_block defaults to false for the deq method in the Queue class. If it is true, an empty queue will give a ThreadError rather than block the thread.
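A short sketch of the non-blocking form, assuming a current Ruby version; the variable names are illustrative.

```ruby
q = Queue.new
q.enq :job

first = q.deq(true)          # Item available: returned immediately

result =
  begin
    q.deq(true)              # Queue is now empty: raises instead of blocking
  rescue ThreadError => e
    "empty: #{e.class}"
  end

puts first
puts result
```

This lets a thread poll a queue and go on to other work when nothing is waiting.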

13.2.4. Using Condition Variables

And he called for his fiddlers three.

"Old King Cole" (traditional folk tune)

A condition variable is really just a queue of threads. It is used in conjunction with a mutex to provide a higher level of control when synchronizing threads.

A condition variable is always associated with a specific mutex; it is used to relinquish control of the mutex until a certain condition has been met. Imagine a situation in which a thread has a mutex locked but cannot continue because the circumstances aren't right. It can sleep on the condition variable and wait to be awakened when the condition is met.

It is important to understand that while a thread is waiting on a condition variable, the mutex is released so that other threads can gain access. It is also important to realize that when another thread does a signal operation (to awaken the waiting thread), the waiting thread reacquires the lock on the mutex.
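The release-and-reacquire behavior can be seen in a small sketch with one waiting thread and one signaling thread. It assumes a current Ruby version; the names ready, ready_cv, and log are invented for the illustration.

```ruby
mutex = Mutex.new
ready_cv = ConditionVariable.new
ready = false
log = []

waiter = Thread.new do
  mutex.synchronize do
    # wait releases the mutex while sleeping and reacquires it on wakeup;
    # the loop rechecks the condition in case of a spurious wakeup.
    ready_cv.wait(mutex) until ready
    log << "waiter saw ready"
  end
end

mutex.synchronize do
  ready = true
  log << "main set ready"
  ready_cv.signal
end

waiter.join
puts log
```

The waiter can only append to the log once it holds the mutex again, which is after the main thread has left its synchronize block.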

Let's look at a contrived example in the tradition of the Dining Philosophers. Imagine a table where three violinists are seated, all of whom want to take turns playing. However, there are only two violins and only one bow. Obviously a violinist can play only if she has one of the violins and the lone bow at the same time.

We keep a count of the violins and bows available. When a player wants a violin or a bow, she must wait for it. In our code, we protect the test with a mutex and do separate waits for the violin and the bow, both associated with that mutex. If a violin or a bow is not available, the thread sleeps. It loses the mutex until it is awakened by another thread signaling that the resource is available, whereupon the original thread wakes up and once again owns the lock on the mutex.

Listing 13.4 shows the code.

Listing 13.4. The Three Violinists

    require 'thread'

    @music  = Mutex.new
    @violin = ConditionVariable.new
    @bow    = ConditionVariable.new

    @violins_free = 2
    @bows_free    = 1

    def musician(n)
      loop do
        sleep rand(0)
        @music.synchronize do
          @violin.wait(@music) while @violins_free == 0
          @violins_free -= 1
          puts "#{n} has a violin"
          puts "violins #@violins_free, bows #@bows_free"

          @bow.wait(@music) while @bows_free == 0
          @bows_free -= 1
          puts "#{n} has a bow"
          puts "violins #@violins_free, bows #@bows_free"
        end

        sleep rand(0)
        puts "#{n}:  (...playing...)"
        sleep rand(0)
        puts "#{n}: Now I've finished."

        @music.synchronize do
          @violins_free += 1
          @violin.signal if @violins_free == 1
          @bows_free += 1
          @bow.signal if @bows_free == 1
        end
      end
    end

    threads = []
    3.times {|i| threads << Thread.new { musician(i) } }

    threads.each {|t| t.join }

We believe that this solution will never deadlock, though we've found it difficult to prove. But it is interesting to note that this algorithm is not a fair one. In our tests, the first player always got to play more often than the other two, and the second more often than the third. The cause and cure for this behavior are left as an interesting exercise.

13.2.5. Using Other Synchronization Techniques

Yet another synchronization mechanism is the monitor, implemented in Ruby in the form of the monitor.rb library. This technique is somewhat more advanced than the mutex; notably, a mutex lock cannot be nested, but a monitor lock can.

The trivial case of this would never occur. That is, no one would ever write the following:

    @mutex = Mutex.new

    @mutex.synchronize do
      @mutex.synchronize do
        #...
      end
    end

But it might happen this way (or through a recursive method call). The result is deadlock in any of these situations. Avoiding deadlock in this circumstance is one of the advantages of the Monitor mixin.

    @mutex = Mutex.new

    def some_method
      @mutex.synchronize do
        #...
        some_other_method    # Deadlock!
      end
    end

    def some_other_method
      @mutex.synchronize do
        #...
      end
    end
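The difference between the two kinds of lock can be checked directly. This sketch assumes a current Ruby version, where a thread that tries to lock a Mutex it already holds gets a ThreadError instead of hanging; the variable names are illustrative.

```ruby
require 'monitor'

# A plain Mutex refuses to be locked twice by the same thread.
mutex = Mutex.new
mutex_outcome =
  begin
    mutex.synchronize { mutex.synchronize { :nested_ok } }
  rescue ThreadError
    :thread_error
  end

# A Monitor keeps track of nested entries by the owning thread instead.
monitor = Monitor.new
monitor_outcome = monitor.synchronize { monitor.synchronize { :nested_ok } }

puts mutex_outcome     # thread_error
puts monitor_outcome   # nested_ok
```

The monitor therefore suits code in which one synchronized method calls another on the same object.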

The Monitor mixin is typically used to extend any object. The new_cond method can then be used to instantiate a condition variable.

The class ConditionVariable (in monitor.rb) is enhanced from the definition in the thread library. It has methods wait_until and wait_while, which will block a thread based on a condition. It also allows a timeout while waiting because the wait method has a timeout parameter, which is a number of seconds (defaulting to nil).
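Here is a minimal sketch of new_cond and wait_until, assuming a current Ruby version. The names items and nonempty are invented for the example.

```ruby
require 'monitor'

mon = Monitor.new
items = []
nonempty = mon.new_cond      # Condition variable tied to this monitor

consumer = Thread.new do
  mon.synchronize do
    # Blocks (releasing the monitor) until the block returns true
    nonempty.wait_until { !items.empty? }
    items.shift
  end
end

mon.synchronize do
  items << :ticket
  nonempty.signal
end

got = consumer.value         # Joins the thread and returns its result
puts got
```

Because wait_until tests the condition before waiting, the example behaves the same whether the consumer or the main thread reaches the monitor first.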

Because we are rapidly running out of thread examples, Listing 13.5 presents to you a rewrite of the Queue and SizedQueue classes using the monitor technique. The code is by Shugo Maeda, used with permission.

Listing 13.5. Implementing a Queue with a Monitor

    # Author:  Shugo Maeda

    require 'monitor'

    class Queue
      def initialize
        @que = []
        @monitor = Monitor.new
        @empty_cond = @monitor.new_cond
      end

      def enq(obj)
        @monitor.synchronize do
          @que.push(obj)
          @empty_cond.signal
        end
      end

      def deq
        @monitor.synchronize do
          while @que.empty?
            @empty_cond.wait
          end
          return @que.shift
        end
      end
    end

    class SizedQueue < Queue
      attr :max

      def initialize(max)
        super()
        @max = max
        @full_cond = @monitor.new_cond
      end

      def enq(obj)
        @monitor.synchronize do
          while @que.length >= @max
            @full_cond.wait
          end
          super(obj)
        end
      end

      def deq
        @monitor.synchronize do
          obj = super
          if @que.length < @max
            @full_cond.signal
          end
          return obj
        end
      end

      def max=(max)
        @monitor.synchronize do
          @max = max
          @full_cond.broadcast
        end
      end
    end

The sync.rb library is one more way of performing thread synchronization (using a two-phase lock with a counter). It defines a Sync_m module used in an include or an extend (much like Mutex_m). This module makes available methods such as locked?, shared?, exclusive?, lock, unlock, and try_lock.

13.2.6. Allowing Timeout of an Operation

There are many situations in which we want to allow a maximum length of time for an action to be performed. This avoids infinite loops and allows an additional level of control over processing. A feature like this is useful in the environment of the Net, where we may or may not get a response from a distant server, and in other circumstances.

The timeout.rb library is a thread-based solution to this problem (see Listing 13.6). The timeout method executes the block associated with the method call; if the block has not finished when the specified number of seconds has elapsed, it raises a TimeoutError, which can be caught with a rescue clause.

Listing 13.6. A Timeout Example

    require 'timeout.rb'

    flag = false
    answer = nil

    begin
      timeout(5) do
        puts "I want a cookie!"
        answer = gets.chomp
        flag = true
      end
    rescue TimeoutError
      flag = false
    end

    if flag
      if answer == "cookie"
        puts "Thank you! Chomp, chomp, ..."
      else
        puts "That's not a cookie!"
        exit
      end
    else
      puts "Hey, too slow!"
      exit
    end

    puts "Bye now..."
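In current Ruby versions the same facility is called as Timeout.timeout, and the exception class is Timeout::Error. The sketch below assumes such a version and replaces the keyboard input with a sleep so that it runs unattended; the name outcome is illustrative.

```ruby
require 'timeout'

outcome =
  begin
    Timeout.timeout(0.2) do
      sleep 5              # Stands in for a slow operation
      :finished
    end
  rescue Timeout::Error
    :timed_out
  end

puts outcome   # timed_out
```

If the block finishes in time, Timeout.timeout simply returns the block's value.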

13.2.7. Waiting for an Event

In many situations we might want to have one or more threads monitoring the "outside world" while other threads are doing other things. The examples here are all rather contrived, but they do illustrate the general principle.

In the following example we see three threads doing the "work" of an application. Another thread simply wakes up every five seconds, checks the global variable $flag, and wakes up two other threads when it sees the flag set. This saves the three worker threads from interacting directly with the two other threads and possibly making multiple attempts to awaken them.

    $flag = false
    work1 = Thread.new { job1() }
    work2 = Thread.new { job2() }
    work3 = Thread.new { job3() }

    thread4 = Thread.new { Thread.stop; job4() }
    thread5 = Thread.new { Thread.stop; job5() }

    watcher = Thread.new do
      loop do
        sleep 5
        if $flag
          thread4.wakeup
          thread5.wakeup
          Thread.exit
        end
      end
    end

If at any point during the execution of the job methods the variable $flag becomes true, thread4 and thread5 are guaranteed to start within five seconds. After that, the watcher thread terminates.
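A scaled-down version of this pattern can be run as is. The sketch assumes a current Ruby version, uses a local variable instead of a global, polls every 0.05 seconds rather than every five, and checks stop? before calling wakeup so that the wakeup cannot arrive before the sleeper has actually stopped. All names are illustrative.

```ruby
flag = false
results = []

sleeper = Thread.new do
  Thread.stop                # Sleep until another thread wakes us
  results << :job_ran
end

watcher = Thread.new do
  loop do
    sleep 0.05
    # Only wake the sleeper once it has really gone to sleep
    if flag && sleeper.stop?
      sleeper.wakeup
      break
    end
  end
end

sleep 0.1
flag = true                  # The "outside world" event

watcher.join
sleeper.join
p results   # [:job_ran]
```

The worker that sets the flag never touches the sleeping thread directly; only the watcher does.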

In this next example we are waiting for a file to be created. We check every 30 seconds for it, and start another thread if we see it; meanwhile, other threads can be doing anything at all. Actually, we are watching for three separate files here.

    def waitfor(filename)
      loop do
        if File.exist? filename
          file_processor = Thread.new { process_file(filename) }
          Thread.exit
        else
          sleep 30
        end
      end
    end

    waiter1 = Thread.new { waitfor("Godot") }
    sleep 10
    waiter2 = Thread.new { waitfor("Guffman") }
    sleep 10
    headwaiter = Thread.new { waitfor("head") }

    # Main thread goes off to do other things...

There are many other situations in which a thread might wait for an outside event, such as a networked application where the server at the other end of a socket is slow or unreliable.

13.2.8. Continuing Processing During I/O

Frequently an application may have one or more I/O operations that are lengthy or time-consuming. This is especially true in the case of user input because a user typing at a keyboard is slower even than any disk operation. We can make use of this time by using threads.

Consider the case of a chess program that must wait for the human player to make his or her move. Of course, we present here only the barest outline of this concept.

We assume that the iterator predict_move will repeatedly generate likely moves that the person might make (and then determine the program's own responses to those moves). Then when the person moves, it is possible that the move has already been anticipated.

    scenario = {}    # move-response hash
    humans_turn = true

    thinking_ahead = Thread.new do
      predict_move do |m|
        scenario[m] = my_response(board,m)
        Thread.exit if humans_turn == false
      end
    end

    human_move = get_human_move(board)
    humans_turn = false   # Stop the thread gracefully

    # Now we can access scenario which may contain the
    # move the person just made...

Of course, real chess programs don't usually work this way.

13.2.9. Implementing Parallel Iterators

Imagine that you wanted to iterate in parallel over more than one object. That is, for each of n objects, you want the first item of each of them, then the second item of each, then the third, and so on.

To make this a little more concrete, look at the following example. Here we assume that compose is the name of the magic method that provides a composition of iterators. We also assume that every object specified has a default iterator each that will be used and that each object contributes one item at a time.

    arr1 = [1, 2, 3, 4]
    arr2 = [5, 10, 15, 20]

    compose(arr1, arr2) {|a,b| puts "#{a} and #{b}" }

    # Should output:
    # 1 and 5
    # 2 and 10
    # 3 and 15
    # 4 and 20

We could use zip for this, of course. But if we want a more elegant solution, one that does not actually store all the items, threads are the only easy way.

We offer our solution in Listing 13.7.

Listing 13.7. Iterating in Parallel

    def compose(*objects)
      threads = []
      for obj in objects do
        threads << Thread.new(obj) do |myobj|
          me = Thread.current
          me[:queue] = []
          myobj.each {|x| me[:queue].push(x) }
        end
      end

      list = [0]                           # Dummy non-nil value
      while list.nitems > 0 do          # Still some non-nils
        list = []
        for thr in threads
          list << thr[:queue].shift        # Remove one from each
        end
        yield list if list.nitems > 0   # Don't yield all nils
      end
    end

    x = [1, 2, 3, 4, 5, 6, 7, 8]
    y = "  first\n second\n  third\n fourth\n  fifth\n"
    z = %w[a b c d e f]

    compose(x, y, z) {|a,b,c| p [a, b, c] }

    # Output:
    #
    # [1, "  first\n", "a"]
    # [2, " second\n", "b"]
    # [3, "  third\n", "c"]
    # [4, " fourth\n", "d"]
    # [5, "  fifth\n", "e"]
    # [6, nil, "f"]
    # [7, nil, nil]
    # [8, nil, nil]

Notice that we do not assume that the objects all have the same number of items over which to iterate. If an iterator "runs out" before the others, it will generate nil values until the longest-running iterator has exhausted itself.

Of course, it is possible to write a more general method that will grab more than one value from each iteration. (After all, not all iterators return just one value at a time.) We could let the first parameter specify the number of values per iterator.

It would also be possible to use arbitrary iterators (rather than the default each). We might pass in their names as strings and use send to invoke them. Other tricks also could be performed.

However, we think that the example given here is adequate for most circumstances. The other variations we will leave as an exercise for the reader.
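For comparison, current Ruby versions also provide external enumerators (to_enum and Enumerator#next), which can draw one item at a time from several objects without explicit threads. This is a different technique from Listing 13.7, sketched here only as an alternative; the method name compose_enum is hypothetical.

```ruby
def compose_enum(*objects)
  # One external enumerator per object, each driven by its default each
  enums = objects.map { |obj| obj.to_enum }

  loop do
    row = enums.map do |e|
      begin
        e.next               # Next item from this object
      rescue StopIteration
        nil                  # This object has run out
      end
    end
    break if row.compact.empty?   # Every object is exhausted
    yield row
  end
end

rows = []
compose_enum([1, 2, 3], %w[a b]) { |a, b| rows << [a, b] }
p rows   # [[1, "a"], [2, "b"], [3, nil]]
```

As in the threaded version, shorter objects contribute nil once they are exhausted, and a row consisting only of nil values ends the iteration.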

13.2.10. Recursive Deletion in Parallel

Just for fun, let's write some code to delete an entire directory tree, and let's make it concurrent. The recursive deletion routine appears here in a threaded form. When we find that a directory entry is itself a directory, we start a new thread to traverse that directory and delete its contents.

As we go along, we keep track of the threads we've created in an array called threads; because this is a local variable, each thread will have its own copy of the array. It can be accessed by only one thread at a time, and there is no need to synchronize access to it.

Note also that we pass fullname into the thread block so that we don't have to worry about the thread accessing a variable that is changing. The thread uses fn as a local copy of the same variable.

When we have traversed an entire directory, we want to wait on the threads we have created before deleting the directory we've just finished working on.

    def delete_all(dir)
      threads = []
      Dir.foreach(dir) do |e|
        next if [".",".."].include? e          # Skip . and ..
        fullname = dir + "/" + e
        if File.directory?(fullname)
          threads << Thread.new(fullname) {|fn| delete_all(fn) }
        else
          File.delete(fullname)
        end
      end
      threads.each { |t| t.join }
      Dir.delete(dir)
    end

    delete_all("/tmp/stuff")
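To try the technique without risking real data, the routine can be pointed at a throwaway tree. The sketch below assumes a current Ruby version with the tmpdir and fileutils libraries; delete_tree is a renamed copy of the routine above (with a guard so that symbolic links are unlinked rather than followed), and the directory layout is invented for the test.

```ruby
require 'tmpdir'
require 'fileutils'

def delete_tree(dir)
  threads = []
  Dir.foreach(dir) do |entry|
    next if entry == "." || entry == ".."
    path = File.join(dir, entry)
    if File.directory?(path) && !File.symlink?(path)
      # One thread per subdirectory; path is passed in as a block argument
      threads << Thread.new(path) { |sub| delete_tree(sub) }
    else
      File.delete(path)
    end
  end
  threads.each(&:join)       # Subdirectories must be gone before rmdir
  Dir.rmdir(dir)
end

root = Dir.mktmpdir("delete_tree_demo")
FileUtils.mkdir_p(File.join(root, "a", "b"))
File.write(File.join(root, "a", "b", "leaf.txt"), "x")
File.write(File.join(root, "top.txt"), "y")

delete_tree(root)
puts File.exist?(root)   # false
```

Joining the child threads before removing the directory is what keeps the deletion order correct.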

Is this actually faster than the nonthreaded version? We've found that the answer is not consistent. It probably depends on your operating system as well as on the actual directory structure being deleted; that is, its depth, size of files, and so on.

The Ruby Way, Second Edition: Solutions and Techniques in Ruby Programming (2nd Edition)
ISBN: 0672328844
EAN: 2147483647
Year: 2004
Pages: 269
Authors: Hal Fulton
