Limiting Multithreading with a Thread Pool

Problem

You want to process multiple requests in parallel, but you don't necessarily want to run all the requests simultaneously. Using a technique like the one in Recipe 20.6 can create a huge number of threads running at once, slowing down the average response time. You want to set a limit on the number of simultaneously running threads.

Solution

You want a thread pool. If you're writing an Internet server and you want to service requests in parallel, you should build your code on top of the gserver module, as seen in Recipe 14.14: it has a thread pool and many TCP/IP-specific features. Otherwise, here's a generic ThreadPool class, based on code from gserver.

The instance variable @pool contains the active threads. The Mutex and the ConditionVariable are used to control the addition of threads to the pool, so that the pool never contains more than @max_size threads:

	require 'thread'

	class ThreadPool
	  def initialize(max_size)
	    @pool = []
	    @max_size = max_size
	    @pool_mutex = Mutex.new
	    @pool_cv = ConditionVariable.new
	  end

When a thread wants to enter the pool, but the pool is full, the thread puts itself to sleep by calling ConditionVariable#wait. When a thread in the pool finishes executing, it removes itself from the pool and calls ConditionVariable#signal to wake up the first sleeping thread:

	  def dispatch(*args)
	    Thread.new do
	      # Wait for space in the pool.
	      @pool_mutex.synchronize do
	        while @pool.size >= @max_size
	          print "Pool is full; waiting to run #{args.join(',')}...\n" if $DEBUG
	          # Sleep until some other thread calls @pool_cv.signal.
	          @pool_cv.wait(@pool_mutex)
	        end
	      end

The newly-awakened thread adds itself to the pool, runs its code, and then calls ConditionVariable#signal to wake up the next sleeping thread:

	      @pool << Thread.current
	      begin
	        yield(*args)
	      rescue => e
	        exception(self, e, *args)
	      ensure
	        @pool_mutex.synchronize do
	          # Remove the thread from the pool.
	          @pool.delete(Thread.current)
	          # Signal the next waiting thread that there's a space in the pool.
	          @pool_cv.signal
	        end
	      end
	    end
	  end

	  def shutdown
	    @pool_mutex.synchronize { @pool_cv.wait(@pool_mutex) until @pool.empty? }
	  end

	  def exception(thread, exception, *original_args)
	    # Subclass this method to handle an exception within a thread.
	    puts "Exception in thread #{thread}: #{exception}"
	  end
	end
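Before moving on, here is a minimal, standalone sketch of the wait/signal handshake the pool relies on. The variable names are illustrative, not part of the recipe; Mutex and ConditionVariable are built into modern Ruby, while very old versions need require 'thread':

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
ready = false
woken = false

waiter = Thread.new do
  mutex.synchronize do
    # wait releases the mutex while asleep and reacquires it on wakeup.
    cv.wait(mutex) until ready
    woken = true
  end
end

mutex.synchronize do
  ready = true
  cv.signal   # Wakes the first thread waiting on cv, if any.
end
waiter.join
```

Note that wait must be called with the mutex held, and always inside a loop that rechecks the condition, because a thread can be woken before the condition it cares about is actually true.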

Here's a simulation of five incoming jobs that take different times to run. The pool ensures that no more than three jobs run at a time. The job code doesn't need to know anything about threads or thread pools; that's all handled by ThreadPool#dispatch.

	$DEBUG = true
	pool = ThreadPool.new(3)

	1.upto(5) do |i|
	  pool.dispatch(i) do |i|
	    print "Job #{i} started.\n"
	    sleep(5-i)
	    print "Job #{i} complete.\n"
	  end
	end
	# Job 1 started.
	# Job 3 started.
	# Job 2 started.
	# Pool is full; waiting to run 4...
	# Pool is full; waiting to run 5...
	# Job 3 complete.
	# Job 4 started.
	# Job 2 complete.
	# Job 5 started.
	# Job 5 complete.
	# Job 4 complete.
	# Job 1 complete.

	pool.shutdown

Discussion

When should you use a thread pool, and when should you just send a swarm of threads after the problem? Consider why this pattern is so common in Internet servers that it's built into Ruby's gserver library. Internet server requests are usually I/O-bound, because most servers operate on the filesystem or a database. If you run high-latency requests in parallel (like requests for files on the filesystem), you can complete multiple requests in about the same time it would take to complete a single request.
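A quick sketch of this effect, using sleep as a stand-in for a high-latency I/O request (timings are approximate and assume an otherwise idle machine):

```ruby
require 'benchmark'

# Three simulated I/O waits of 0.1 seconds each, run one after another.
serial = Benchmark.realtime do
  3.times { sleep(0.1) }
end

# The same waits overlapped in threads; Ruby threads can sleep concurrently.
parallel = Benchmark.realtime do
  3.times.map { Thread.new { sleep(0.1) } }.each(&:join)
end

# serial comes to roughly 0.3 seconds, parallel to roughly 0.1.
```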

But Internet server requests can use a lot of memory, and any random user on the Internet can trigger a job on your server. If you create and start a thread for every incoming request, it's easy to run out of resources. You need to find a tradeoff between the performance benefit of multithreading and the performance hazard of thrashing due to insufficient resources. The simplest way to do this is to limit the number of requests that can be processed at a given time.

A thread pool isn't a connection pool, like you might see with a database. Database connections are often pooled because they're expensive to create. Threads are pretty cheap; we just don't want a lot of them actively running at once. The example in the Solution creates five threads at once, but only three of them can be active at any one time. The rest are asleep, waiting for a notification from the condition variable @pool_cv.

Calling ThreadPool#dispatch with a code block creates a new thread that runs the code block, but not until it finds a free slot in the thread pool. Until then, the thread waits on the condition variable @pool_cv. When one of the threads in the pool completes its code block, it calls signal on the condition variable, waking up the first thread currently waiting on it.
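Another common way to cap concurrency, shown here only as a hedged alternative sketch and not as part of the recipe, is to pre-start a fixed set of worker threads that pull jobs from a shared Queue:

```ruby
jobs = Queue.new
results = Queue.new

# Three workers mean at most three jobs run at once.
workers = Array.new(3) do
  Thread.new do
    # :shutdown is a sentinel value telling a worker to exit.
    while (job = jobs.pop) != :shutdown
      results << job.call
    end
  end
end

1.upto(5) { |i| jobs << lambda { i * i } }
3.times { jobs << :shutdown }
workers.each(&:join)
```

Here the excess jobs queue up as data rather than as sleeping threads, which avoids creating one thread per job, at the cost of losing the per-job Thread object that dispatch returns.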

The shutdown method makes sure all the jobs complete by repeatedly waiting on the condition variable until no other threads want access to the pool.

See Also

  • Recipe 14.14, "Writing an Internet Server"



Ruby Cookbook (O'Reilly), ISBN: 0596523696
