Implementing a Distributed Queue

Credit: James Edward Gray II

Problem

You want to use a central server as a workhorse, queueing up requests from remote clients and handling them one at a time.

Solution

Here's a method that shares a Queue object with clients. Clients put job objects into the queue, and the server handles them by yielding them to a code block.

	#!/usr/bin/ruby
	# queue_server.rb

	require 'thread' # For Ruby's thread-safe Queue
	require 'drb'

	# Minimum acceptable paranoia level when sharing code!
	# (On Ruby 3.0 and later, $SAFE is an ordinary global with no effect.)
	$SAFE = 1

	def run_queue(url='druby://127.0.0.1:61676')
	  queue = Queue.new # Containing the jobs to be processed

	  # Start up DRb with URI and object to share
	  DRb.start_service(url, queue)
	  puts 'Listening for connection…'
	  # Block until a job arrives, then hand it to the caller's code block
	  while job = queue.deq
	    yield job
	  end
	end

Have your server call run_queue, passing in a code block that handles a single job. Every time one of your clients puts a job into the server queue, the server passes the job into the code block. Here's a sample code block that can handle a fast-running job ("Report") or a slow-running job ("Process"):

	run_queue do |job|
	  case job['request']
	  when 'Report'
	    puts "Reporting for #{job['from']}… Done."
	  when 'Process'
	    puts "Processing for #{job['from']}…"
	    sleep 3 # Simulate real work
	    puts 'Processing complete.'
	  end
	end
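
The dequeue-and-yield loop can be tried without any networking. The following is a minimal local sketch, not part of the recipe: drain_queue is a hypothetical stand-in for run_queue with the DRb call removed, and the nil pushed at the end is an assumed shutdown sentinel (it makes `while job = queue.deq` stop, which the real server never does).

```ruby
# Hypothetical local variant of run_queue: same loop, no DRb.
def drain_queue(queue)
  handled = []
  # deq blocks until a job is available; a nil job ends the loop
  while job = queue.deq
    handled << yield(job)
  end
  handled
end

queue = Queue.new
queue.enq({ 'request' => 'Report',  'from' => 'Client 1' })
queue.enq({ 'request' => 'Process', 'from' => 'Client 2' })
queue.enq(nil) # assumed shutdown sentinel, not used by the recipe

results = drain_queue(queue) do |job|
  case job['request']
  when 'Report'  then "Reporting for #{job['from']}"
  when 'Process' then "Processing for #{job['from']}"
  end
end

puts results
```

Jobs come out in the order they went in, so a slow job delays every job queued behind it, which is the one-at-a-time behavior the Problem asks for.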

If we get a couple of clients sending in requests, output might look like this:

	$ ruby queue_server.rb
	Listening for connection…

	Processing for Client 1…
	Processing complete.
	Processing for Client 2…
	Processing complete.
	Reporting for Client 1… Done.
	Reporting for Client 2… Done.
	Processing for Client 1…
	Processing complete.
	Reporting for Client 2… Done.
	…

Discussion

A client for the queue server defined in the Solution simply needs to connect to the DRb server and add a mix of "Report" and "Process" jobs to the queue. Here's a client that connects to the DRb server and adds 20 jobs to the queue, choosing the job type at random:

	#!/usr/bin/ruby
	# queue_client.rb

	require 'thread'
	require 'drb'

	# Get a unique name for this client
	NAME = ARGV.shift or raise "Usage: #{File.basename($0)} CLIENT_NAME"

	DRb.start_service
	# Proxy for the Queue object shared by queue_server.rb
	queue = DRbObject.new_with_uri("druby://127.0.0.1:61676")

	20.times do
	  queue.enq('request' => ['Report', 'Process'][rand(2)], 'from' => NAME)
	  sleep 1 # simulating network delays
	end
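
To see the proxy and the shared queue working together without opening two terminals, both ends can live in one script. This is a sketch under assumptions: port 0 (letting the operating system pick a free port) replaces the recipe's fixed port 61676, and because the proxy and the served object share a process, DRb may dispatch the calls locally rather than over a socket.

```ruby
require 'drb'

queue = Queue.new

# Share the queue. Port 0 is an assumption for this demo: it asks the OS
# for any free port instead of the recipe's 61676.
DRb.start_service('druby://127.0.0.1:0', queue)
uri = DRb.uri # the URI actually in use, including the chosen port

# Client side: a proxy that forwards method calls to the shared Queue
remote_queue = DRbObject.new_with_uri(uri)
3.times do |i|
  remote_queue.enq({ 'request' => 'Report', 'from' => "Client #{i + 1}" })
end

# Server side: pull the jobs back off the real Queue object
jobs = Array.new(3) { queue.deq }
jobs.each { |job| puts "Reporting for #{job['from']}… Done." }

DRb.stop_service
```

With separate server and client processes, as in the recipe, each job hash is marshaled and copied across the connection, so the server works on its own copy of the job.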

Everything from Recipe 16.10 applies here. The major difference is that Ruby ships with a thread-safe Queue. That saves us the trouble of building our own.
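
As a small illustration of what the thread-safe Queue buys you, here is a sketch in which several producer threads, standing in for concurrent clients, push jobs with no explicit locking. The thread count, job strings, and the :done end marker are all made up for the example.

```ruby
queue = Queue.new

# Three producer threads enqueue five jobs each; Queue does its own locking
producers = 3.times.map do |n|
  Thread.new do
    5.times { |i| queue.enq("client #{n + 1}, job #{i + 1}") }
  end
end
producers.each(&:join)

queue.enq(:done) # hypothetical end marker so the consumer knows when to stop

# A single consumer handles jobs one at a time
count = 0
count += 1 while queue.deq != :done

puts "Handled #{count} jobs"
```

No job is lost or duplicated even though the producers run concurrently, which is the property a hand-built queue would have to guarantee with its own Mutex.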

See Also

  • Recipe 16.10


Ruby Cookbook (O'Reilly Cookbooks)
ISBN: 0596523696