Section 13.2. Synchronizing Threads


13.1. Creating and Manipulating Threads

The most basic operations on threads include creating a thread, passing information in and out, stopping a thread, and so on. We can also obtain lists of threads, check the state of a thread, and check various other information.

We present an overview of these basic operations here.

13.1.1. Creating Threads

Creating a thread is easy. Simply call the new method and attach a block that will be the body of the thread.

thread = Thread.new do   # Statements comprising   #   the thread... end


The value returned is obviously an object of type Thread. This is usable by the main thread to control the thread it has created.

What if we want to pass parameters into a thread? We can do this by passing parameters into Thread.new, which in turn will pass them into the block.

a = 4 b = 5 c = 6 thread2 = Thread.new(a,b,c) do |a, x, y|   # Manipulate a, x, and y as needed. end # Note that if a is changed in the new thread, it will #   change suddenly and without warning in the main #   thread.


Like any other block parameters, any of these that correspond to existing variables will be effectively identical to those variables. The variable a in the preceding code fragment is a "dangerous" variable in this sense, as the comment points out.

Threads may also access variables from the scope in which they were created. Obviously without synchronization this can be problematic. The main thread and one or more other threads may modify the variable independently of each other, and the results may be unpredictable.

x = 1 y = 2 thread3 = Thread.new do   # This thread can manipulate x and y from the outer scope,   #   but this is not always safe.   sleep(rand(0))  # Sleep a random fraction of a second.   x = 3 end sleep(rand(0)) puts x # Running this code repeatedly, x may be 1 or 3 when it #   is printed here!


The method fork is an alias for new; this name is derived from the well-known UNIX system call of the same name.

13.1.2. Accessing Thread-local Variables

We know that it can be dangerous for a thread to use variables from outside its scope; we know also that a thread can have local data of its own. But what if a thread wants to "make public" some of the data that it owns?

There is a special mechanism for this purpose. If a thread object is treated as a hash, thread-local data can be accessed from anywhere within the scope of that thread object. We don't mean that actual local variables can be accessed in this way, but only that we have access to named data on a per-thread basis.

There is also a method called key? that will tell us whether a given name is in use for this thread.

Within the thread, we must refer to the data in the same hashlike way. Using Thread.current will make this a little less unwieldy.

thread = Thread.new do   t = Thread.current   t[:var1] = "This is a string"   t[:var2] = 365 end # Access the thread-local data from outside... x = thread[:var1]               # "This is a string" y = thread[:var2]               # 365 has_var2 = thread.key?("var2")  # true has_var3 = thread.key?("var3")  # false


Note that these data are accessible from other threads even after the thread that owned them is dead (as in this case).

Besides a symbol (as we just saw) we can also use a string to identify the thread-local variable.

thread = Thread.new do   t = Thread.current   t["var3"] = 25   t[:var4] = "foobar" end a = thread[:var3] = 25 b = thread["var4"] = "foobar"


Don't confuse these special names with actual local variables. The following code fragment illustrates the difference a little more clearly:

thread = Thread.new do   t = Thread.current   t["var3"] = 25   t[:var4] = "foobar"   var3 = 99            # True local variables (not   var4 = "zorch"       # accessible from outside) end a = thread[:var3]      # 25 b = thread["var4"]     # "foobar"


Finally, note that an object reference (to a true local variable) can be used as a sort of shorthand within the thread. This is true as long as you are careful to preserve the same object reference rather than create a new one.

thread = Thread.new do   t = Thread.current   x = "nXxeQPdMdxiBAxh"   t[:my_message] = x   x.reverse!   x.delete! "x"   x.gsub!(/[A-Z]/,"")   # On the other hand, assignment would create a new   # object and make this shorthand useless... end a = thread[:my_message]   # "hidden"


Also, this shortcut will obviously not work when you are dealing with values such as Fixnums, which are stored as immediate values rather than object references.

13.1.3. Querying and Changing Thread Status

The THRead class has several class methods that serve various purposes. The list method returns an array of all living threads; the main method returns a reference to the main thread, which spawns the others; and the current method allows a thread to find its own identity.

t1 = Thread.new { sleep 100 } t2 = Thread.new do   if Thread.current == Thread.main     puts "This is the main thread."   # Does NOT print   end   1.upto(1000) { sleep 0.1 } end count = Thread.list.size              # 3 if Thread.list.include?(Thread.main)   puts "Main thread is alive."        # Always prints! end if Thread.current == Thread.main   puts "I'm the main thread."         # Prints here... end


The exit, pass, start, stop, and kill methods are used to control the execution of threads (often from inside or outside):

# In the main thread... Thread.kill(t1)          # Kill this thread now Thread.pass              # Pass execution to t2 now t3 = Thread.new do   sleep 20   Thread.exit            # Exit the thread   puts "Can't happen!"   # Never reached end Thread.kill(t2)          # Now kill t2 # Now exit the main thread (killing any others) Thread.exit


Note that there is no instance method stop, so a thread can stop itself but not another thread.

There are various methods for checking the state of a thread. The instance method alive? will tell whether the thread is still "living" (not exited), and stop? will tell whether the thread is in a stopped state.

count = 0 t1 = Thread.new { loop { count += 1 } } t2 = Thread.new { Thread.stop } sleep 1 flags = [t1.alive?,    # true          t1.stop?,     # false          t2.alive?,    # true          t2.stop?]     # true


The status of a thread may be determined using the status method. The value returned will be "run" if the thread is currently running; sleep if it is stopped, sleeping, or waiting on I/O; false if it terminated normally; and nil if it terminated with an exception.

t1 = Thread.new { loop {} } t2 = Thread.new { sleep 5 } t3 = Thread.new { Thread.stop } t4 = Thread.new { Thread.exit } t5 = Thread.new { raise "exception" } s1 = t1.status    # "run" s2 = t2.status    # "sleep" s3 = t3.status    # "sleep" s4 = t4.status    # false s5 = t5.status    # nil


The global variable $SAFE may be set differently in different threads. In this sense, it is not truly a global variable at all, but we should not complain because this allows us to have threads run with different levels of safety. The safe_level method will tell us at what level a thread is running.

t1 = Thread.new { $SAFE = 1; sleep 5 } t2 = Thread.new { $SAFE = 3; sleep 5 } sleep 1 lev0 = Thread.main.safe_level    # 0 lev1 = t1.safe_level             # 1 lev2 = t2.safe_level             # 3


The priority of a thread may be examined and changed using the priority accessor:

t1 = Thread.new { loop { sleep 1 } } t2 = Thread.new { loop { sleep 1 } } t2.priority = 3    # Set t2 at priority 3 p1 = t1.priority   # 0 p2 = t2.priority   # 3


A thread with higher (numerically greater) priority will be scheduled more often.

The special method pass is used when a thread wants to yield control to the scheduler. The thread merely yields its current timeslice; it doesn't actually stop or go to sleep.

t1 = Thread.new do   puts "alpha"   Thread.pass   puts "beta" end t2 = Thread.new do   puts "gamma"   puts "delta" end t1.join t2.join


In this contrived example, we get output in the order alpha gamma delta beta when Thread.pass is called as shown. Without it we get alpha beta gamma delta as the order. Of course, this feature should not be used for synchronization but only for the thrifty allocation of timeslices.

A thread that is stopped may be awakened by use of the run or wakeup methods:

t1 = Thread.new do   Thread.stop   puts "There is an emerald here." end t2 = Thread.new do   Thread.stop   puts "You're at Y2." end sleep 1 t1.wakeup t2.run


The difference in these is subtle. The wakeup call will change the state of the thread so that it is runnable but will not schedule it to be run; on the other hand, run will wake up the thread and schedule it for immediate running.

In this particular case, the result is that t1 wakes up before t2, but t2 gets scheduled first, producing the following output:

You're at Y2. There is an emerald here.


Of course, it would be unwise to attempt true synchronization by depending on this subtlety.

The raise instance method will raise an exception in the thread specified as the receiver. (The call does not have to originate within the thread.)

factorial1000 = Thread.new do   begin     prod = 1     1.upto(1000) {|n| prod *= n }     puts "1000! = #{prod}"   rescue     # Do nothing...   end end sleep 0.01               # Your mileage may vary. if factorial1000.alive?   factorial1000.raise("Stop!")   puts "Calculation was interrupted!" else   puts "Calculation was successful." end


The thread spawned in the preceding example tries to calculate the factorial of 1,000; if it doesn't succeed within a hundredth of a second, the main thread will kill it. Thus on a relatively slow machine, this code fragment will print the message Calculation was interrupted! As for the rescue clause inside the thread, obviously we could have put any code there that we wanted, as with any other such clause.

13.1.4. Achieving a Rendezvous (and Capturing a Return Value)

Sometimes the main thread wants to wait for another thread to finish. The instance method join will accomplish this.

t1 = Thread.new { do_something_long() } do_something_brief() t1.join              # Wait for t1


Note that a join is necessary if the main thread is to wait on another thread. When the main thread exits, a thread is killed otherwise. For example, the following code fragment would never give us its final answer without the join at the end:

meaning_of_life = Thread.new do   puts "The answer is..."   sleep 10   puts 42 end sleep 9 meaning_of_life.join


Here is a useful little idiom. It will call join on every living thread except the main one. (It is an error for any thread, even the main thread, to call join on itself.)

Thread.list.each { |t| t.join if t != Thread.main }


It is, of course, possible for one thread to do a join on another when neither is the main thread. If the main thread and another attempt to join each other, a deadlock results; the interpreter will detect this case and exit the program.

thr = Thread.new { sleep 1; Thread.main.join } thr.join         # Deadlock results!


A thread has an associated block, and a block can have a return value. This implies that a thread can return a value. The value method will implicitly do a join operation and wait for the thread to complete; then it will return the value of the last evaluated expression in the thread.

max = 10000 thr = Thread.new do   sum = 0   1.upto(max) { |i| sum += i }   sum end guess = (max*(max+1))/2 print "Formula is " if guess == thr.value   puts "right." else   puts "wrong." end


13.1.5. Dealing with Exceptions

What happens if an exception occurs within a thread? As it turns out, the behavior is configurable.

There is a flag called abort_on_exception that operates both at the class and instance levels. This is implemented as an accessor at both levels (that is, it is readable and writable).

In short, if abort_on_exception is true for a thread, an exception in that thread will terminate all the other threads also.

Thread.abort_on_exception = true t1 = Thread.new do   puts "Hello"   sleep 2   raise "some exception"   puts "Goodbye" end t2 = Thread.new { sleep 100 } sleep 2 puts "The End"


In the preceding code, the systemwide abort_on_exception is set to TRue (overriding the default). Thus when t1 gets an exception, t1 and the main thread are also killed. The word Hello is the only output generated.

In this next example, the effect is the same:

t1 = Thread.new do   puts "Hello"   sleep 2   raise "some exception"   puts "Goodbye" end t1.abort_on_exception = true t2 = Thread.new { sleep 100 } sleep 2 puts "The End"


In the following example, the default of false is assumed, and we finally get to see the output The End from the main thread. (We never see Goodbye because t1 is always terminated when the exception is raised.)

t1 = Thread.new do   puts "Hello"   sleep 2   raise "some exception"   puts "Goodbye" end t2 = Thread.new { sleep 100 } sleep 2 puts "The End" # Output: Hello The End


13.1.6. Using a Thread Group

A thread group is a way of managing threads that are logically related to each other. Normally all threads belong to the Default tHRead group (which is a class constant). But if a new thread group is created, new threads may be added to it.

A thread may be in only one thread group at a time. When a thread is added to a thread group, it is automatically removed from whatever group it was in previously.

The ThreadGroup.new class method will create a new thread group, and the add instance method will add a thread to the group.

f1 = Thread.new("file1") { |file| waitfor(file) } f2 = Thread.new("file2") { |file| waitfor(file) } file_threads = ThreadGroup.new file_threads.add f1 file_threads.add f2


The instance method list will return an array of all the threads in the thread group.

# Count living threads in this_group count = 0 this_group.list.each {|x| count += 1 if x.alive? } if count < this_group.list.size   puts "Some threads in this_group are not living." else   puts "All threads in this_group are alive." end


There is plenty of room for useful methods to be added to THReadGroup. The following example shows methods to wake up every thread in a group, to wait for all threads to catch up (via join), and to kill all threads in a group:

class ThreadGroup   def wakeup     list.each { |t| t.wakeup }   end   def join     list.each { |t| t.join if t != Thread.current }   end   def kill     list.each { |t| t.kill }   end end





The Ruby Way(c) Solutions and Techniques in Ruby Programming
The Ruby Way, Second Edition: Solutions and Techniques in Ruby Programming (2nd Edition)
ISBN: 0672328844
EAN: 2147483647
Year: 2004
Pages: 269
Authors: Hal Fulton

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net