Running a Code Block on Many Objects Simultaneously

Problem

Rather than iterating over the elements of a data structure one at a time, you want to run some function on all of them simultaneously.

Solution

Spawn a thread to handle each element of the data structure.

Here's a simple equivalent of Enumerable#each that runs a code block against every element of a data structure simultaneously.[1] It returns the Thread objects it spawned so that you can pause them, kill them, or join them and wait for them to finish:

[1] Well, more or less. The thread for the first element will start running before the thread for the last element does.

	module Enumerable
	  def each_simultaneously
	    threads = []
	    each { |e| threads << Thread.new { yield e } }
	    return threads
	  end
	end

Running the following high-latency code with Enumerable#each would take 15 seconds. With our new Enumerable#each_simultaneously, it takes only five seconds:

	start_time = Time.now
	threads = [7,8,9].each_simultaneously do |e|
	  sleep(5) # Simulate a long, high-latency operation
	  print "Completed operation for #{e}!\n"
	end
	threads.each { |t| t.join } # Wait for all three threads to finish
	# Completed operation for 8!
	# Completed operation for 7!
	# Completed operation for 9!
	Time.now - start_time # => 5.009334

Discussion

You can save time by doing high-latency operations in parallel, since it often means you pay the latency price only once. If you're doing nameserver lookups, and the nameserver takes five seconds to respond to a request, you're going to be waiting at least five seconds. If you need to do 10 nameserver lookups, doing them in series will take 50 seconds, but doing them all at once might take only five.
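To see that arithmetic in miniature, here is a sketch that times ten simulated lookups run one after another and then all at once. fake_lookup is a hypothetical stand-in that only sleeps for 0.2 seconds instead of contacting a real nameserver, so you should expect roughly 2 seconds for the serial run and roughly 0.2 seconds for the threaded one:

```ruby
# Measure how long a block takes, using the monotonic clock.
def elapsed_seconds
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

# Hypothetical stand-in for a nameserver lookup: it only sleeps for
# 0.2 seconds to simulate network latency, then returns a fake answer.
def fake_lookup(name)
  sleep 0.2
  "#{name} => 192.0.2.1"
end

names = (1..10).map { |i| "host#{i}.example.com" }

# One lookup after another: the latencies add up.
serial = elapsed_seconds do
  names.each { |name| fake_lookup(name) }
end

# One thread per lookup: the latencies overlap, so we pay roughly once.
parallel = elapsed_seconds do
  threads = names.map { |name| Thread.new { fake_lookup(name) } }
  threads.each { |t| t.join }
end

printf("serial: %.2fs, parallel: %.2fs\n", serial, parallel)
```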

This technique can also be applied to the other methods of Enumerable. You could write a collect_simultaneously, a find_all_simultaneously, and so on. But that's a lot of methods to write. All the methods of Enumerable are based on each. What if we could just convince those methods to use each_simultaneously instead of each?

It would be too much work to replace all the existing methods of Enumerable, but we can swap out an individual Enumerable object's each implementation for another by wrapping it in an Enumerable::Enumerator. Here's how it would work:

	require 'enumerator'

	array = [7, 8, 9]
	simultaneous_array = array.enum_for(:each_simultaneously)
	simultaneous_array.each do |e|
	  sleep(5) # Simulate a long, high-latency operation
	  print "Completed operation for #{e}!\n"
	end
	# Completed operation for 7!
	# Completed operation for 9!
	# Completed operation for 8!

That call to enum_for returns an Enumerable::Enumerator object. The Enumerator implements all of the methods of Enumerable as the original array would, but its each method uses each_simultaneously under the covers.
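The wrapping trick does not depend on threads at all. As a minimal sketch, assuming a hypothetical each_backwards iterator added to Array purely for illustration, enum_for produces an object whose Enumerable methods all walk the array from last to first:

```ruby
require 'enumerator' # needed on Ruby 1.8; a harmless no-op on later versions

# Hypothetical iterator, defined only for this illustration:
# it yields the elements from last to first.
class Array
  def each_backwards
    (length - 1).downto(0) { |i| yield self[i] }
    self
  end
end

backwards = [7, 8, 9].enum_for(:each_backwards)

# Every Enumerable method on the wrapper goes through each_backwards.
p backwards.to_a                     # [9, 8, 7]
p backwards.collect { |x| x * 10 }   # [90, 80, 70]
p backwards.find { |x| x < 9 }       # 8
```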

Do we now have simultaneous versions of all the Enumerable methods? Not quite. Look at this code:

	simultaneous_array.collect { |x| sleep 5; x * -1 } # => []

What happened? The collect method returns before the threads have a chance to complete their tasks. When we were using each_simultaneously on its own, this was a nice feature. Consider the following idealized code, which starts three infinite loops in separate threads and then goes on to other things:

	[SSHServer, HTTPServer, IRCServer].each_simultaneously do |server|
	  server.serve_forever
	end

	# More code goes here…
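Because each_simultaneously returns its threads, the "other things" the main program goes on to do can include shutting those loops down again. Here is a runnable sketch of that pattern; Counter is a hypothetical stand-in for the server classes above, and its serve_forever simply counts loop iterations until its thread is killed:

```ruby
module Enumerable
  # The recipe's each_simultaneously, repeated so this sketch runs on its own.
  def each_simultaneously
    threads = []
    each { |e| threads << Thread.new { yield e } }
    threads
  end
end

# Hypothetical stand-in for SSHServer, HTTPServer, and IRCServer.
Counter = Struct.new(:name, :ticks) do
  def serve_forever
    loop do
      self.ticks += 1
      sleep 0.01
    end
  end
end

servers = %w[ssh http irc].map { |name| Counter.new(name, 0) }
threads = servers.each_simultaneously { |server| server.serve_forever }

# each_simultaneously returned immediately, so the main thread is free.
sleep 0.1

# Thread#kill asks each thread to terminate; join then waits until it has.
threads.each { |t| t.kill }
threads.each { |t| t.join }
servers.each { |s| puts "#{s.name}: #{s.ticks} iterations" }
```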

This is not such a good feature when we're calling an Enumerable method with a return value. We need an equivalent of each_simultaneously that doesn't return until all of the threads have run:

	require 'enumerator'
	module Enumerable
	  def all_simultaneously
	    if block_given?
	      collect { |e| Thread.new { yield(e) } }.each { |t| t.join }
	      self
	    else
	      enum_for :all_simultaneously
	    end
	  end
	end
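A quick way to confirm that all_simultaneously really waits for its threads is to time it. In this sketch, which repeats the method so it runs on its own, three 0.3-second sleeps should finish in about 0.3 seconds in total, and the call should hand back the receiver itself:

```ruby
module Enumerable
  # The recipe's all_simultaneously: one thread per element, and the
  # method does not return until every thread has been joined.
  def all_simultaneously
    if block_given?
      collect { |e| Thread.new { yield(e) } }.each { |t| t.join }
      self
    else
      enum_for :all_simultaneously
    end
  end
end

array = [7, 8, 9]
returned = nil

start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
returned = array.all_simultaneously { |e| sleep 0.3 }
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start

printf("took %.2fs\n", elapsed)  # about 0.3s, since the sleeps overlap
p returned.equal?(array)         # true: the receiver comes back
```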

You wouldn't use this method to spawn infinite loops (they'd all spawn, but you'd never regain control of your code). But you can use it to create multithreaded versions of collect and other Enumerable methods:

	array.all_simultaneously.collect { |x| sleep 5; x * -1 }
	# => [-7, -9, -8]

That's better, but the elements are in the wrong order: after all, there's no guarantee which thread will complete first. This doesn't usually matter for Enumerable methods like find_all, grep, or reject, but it matters a lot for collect. And each_with_index is simply broken:

	array.all_simultaneously.each_with_index { |x, i| sleep 5; puts "#{i}=>#{x}" }
	# 0=>8
	# 0=>7
	# 0=>9
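The collect ordering problem can be reproduced reliably by giving the first element the longest delay. In this sketch (which repeats all_simultaneously so it runs on its own), the threads should finish in reverse order, and the results come back in that completion order rather than in element order:

```ruby
require 'enumerator'

module Enumerable
  # The recipe's all_simultaneously, repeated so this sketch runs on its own.
  def all_simultaneously
    if block_given?
      collect { |e| Thread.new { yield(e) } }.each { |t| t.join }
      self
    else
      enum_for :all_simultaneously
    end
  end
end

# Element 1 sleeps 0.3s, element 2 sleeps 0.2s, element 3 sleeps 0.1s.
result = [1, 2, 3].all_simultaneously.collect do |x|
  sleep((4 - x) * 0.1)
  x * -1
end

p result  # most likely [-3, -2, -1]: completion order, not element order
```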

Here are thread-agnostic implementations of Enumerable#collect and Enumerable#each_with_index, which will work on normal Enumerable objects, but will also work in conjunction with all_simultaneously:

	module Enumerable
	  def collect
	    results = []
	    each_with_index { |e, i| results[i] = yield(e) }
	    results
	  end

	  def each_with_index
	    i = -1
	    each { |e| yield e, i += 1 }
	  end
	end

Now it all works:

	array.all_simultaneously.collect { |x| sleep 5; x * -1 }
	# => [-7, -8, -9]

	array.all_simultaneously.each_with_index { |x, i| sleep 5; puts "#{i}=>#{x}" }
	# 1=>8
	# 0=>7
	# 2=>9
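Note that the each_with_index above increments i inside the spawned threads, so the indices follow the order in which the threads happen to execute that increment. That is normally element order, but thread scheduling does not guarantee it. If all you need is an order-preserving parallel collect, one alternative (not the approach used in this recipe) is the collect_simultaneously mentioned earlier, written as a minimal sketch: keep the threads in element order and read each result with Thread#value, which waits for a thread to finish and returns the value of its block:

```ruby
module Enumerable
  # Sketch of collect_simultaneously: process every element in its own
  # thread, then gather the results in the original element order.
  def collect_simultaneously
    # Threads are created here, in the calling thread, in element order...
    threads = collect { |e| Thread.new { yield(e) } }
    # ...and Thread#value joins each one and returns its block's result,
    # so the output order matches the input order, not completion order.
    threads.collect { |t| t.value }
  end
end

# Element 1 sleeps longest and finishes last, yet its result stays first.
result = [1, 2, 3].collect_simultaneously { |x| sleep((4 - x) * 0.1); x * -1 }
p result  # [-1, -2, -3]
```

Because no shared index or results array is touched by the threads, there is nothing for them to race on. Thread#value also re-raises any exception that ended a thread, so a failure on one element surfaces in the calling thread instead of being silently lost.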

See Also

  • Recipe 7.9, "Looping Through Multiple Iterables in Parallel"


Ruby Cookbook
Ruby Cookbook (Cookbooks (O'Reilly))
ISBN: 0596523696