Recipe 9.4. Working with a Thread Pool

Credit: John Nielsen, Justin A

Problem

You want your main thread to be able to farm out processing tasks to a pool of worker threads.

Solution

The Queue.Queue type is the simplest and most effective way to coordinate a pool of worker threads. We could group all the needed data structures and functions into a class, but there's no real need to. So, here they are, shown as globals instead:

import threading, Queue, sys

# Globals (start with a capital letter)
Qin  = Queue.Queue()
Qout = Queue.Queue()
Qerr = Queue.Queue()
Pool = []

def report_error():
    ''' we "report" errors by adding error information to Qerr '''
    Qerr.put(sys.exc_info()[:2])

def get_all_from_queue(Q):
    ''' generator to yield one after the other all items currently
        in the Queue Q, without any waiting
    '''
    try:
        while True:
            yield Q.get_nowait()
    except Queue.Empty:
        raise StopIteration

def do_work_from_queue():
    ''' the get-some-work, do-some-work main loop of worker threads '''
    while True:
        command, item = Qin.get()        # implicitly stops and waits
        if command == 'stop':
            break
        try:
            # simulated work functionality of a worker thread
            if command == 'process':
                result = 'new' + item
            else:
                raise ValueError('Unknown command %r' % command)
        except:
            # unconditional except is right, since we report _all_ errors
            report_error()
        else:
            Qout.put(result)

def make_and_start_thread_pool(number_of_threads_in_pool=5, daemons=True):
    ''' make a pool of N worker threads, daemonize, and start all of them '''
    for i in range(number_of_threads_in_pool):
        new_thread = threading.Thread(target=do_work_from_queue)
        new_thread.setDaemon(daemons)
        Pool.append(new_thread)
        new_thread.start()

def request_work(data, command='process'):
    ''' work requests are posted as (command, data) pairs to Qin '''
    Qin.put((command, data))

def get_result():
    return Qout.get()                    # implicitly stops and waits

def show_all_results():
    for result in get_all_from_queue(Qout):
        print 'Result:', result

def show_all_errors():
    for etyp, err in get_all_from_queue(Qerr):
        print 'Error:', etyp, err

def stop_and_free_thread_pool():
    # order is important: first, request all threads to stop...:
    for i in range(len(Pool)):
        request_work(None, 'stop')
    # ...then, wait for each of them to terminate:
    for existing_thread in Pool:
        existing_thread.join()
    # clean up the pool from now-unused thread objects
    del Pool[:]

Discussion

It is generally a mistake to architect a multithreaded program on the premise that it can spawn arbitrarily many threads as needed. Most often, the best architecture for such a program is based on farming out work to a fixed and relatively small number of worker threads, an arrangement known as a thread pool. This recipe shows a very simple example of a thread pool, focusing on the use of Queue.Queue instances as the simplest and most effective way to handle inter-thread communication and synchronization.

In this recipe, worker threads run function do_work_from_queue, which has the right structure for a typical worker thread but performs only minimal "processing", purely as an example. Here, the worker thread computes a "result" by prepending the string 'new' to each arriving item (note that this implicitly assumes that arriving items are strings). In your applications, of course, the equivalent of this do_work_from_queue function will perform more substantial processing, quite possibly with different kinds of processing depending on the value of the command parameter.
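For instance, here is a minimal sketch of a worker loop that dispatches to a different operation depending on command. It reuses this recipe's Qin, Qout, and report_error globals; the 'upper' and 'reverse' commands are invented purely for illustration:

def do_varied_work_from_queue():
    ''' variant worker-thread main loop dispatching on the value of command '''
    while True:
        command, item = Qin.get()        # implicitly stops and waits
        if command == 'stop':
            break
        try:
            # dispatch to a different kind of processing per command
            if command == 'upper':
                result = item.upper()
            elif command == 'reverse':
                result = item[::-1]
            else:
                raise ValueError('Unknown command %r' % command)
        except:
            report_error()
        else:
            Qout.put(result)

A worker started with target=do_varied_work_from_queue would then serve requests such as request_work('_ba', 'upper') as well as request_work('_ba', 'reverse'), since request_work posts (command, data) pairs.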

In addition to the worker threads in the pool, a multithreaded program often has other specialized threads for various purposes, such as interfacing to entities external to the program (a GUI, a database, or a library that is not guaranteed to be thread-safe). This recipe does not show any such specialized threads. However, it does include at least a "main thread", which starts and stops the thread pool, determines the units of work to be farmed out, and eventually gathers all results and any errors that may have been reported.

In your applications, you may or may not want to start and stop the thread pool repeatedly. Most typically, you start the pool as part of your program's initialization, leave it running throughout, and stop it, if at all, only as part of your program's final cleanup. If you set your worker threads as "daemons", as this recipe's function make_and_start_thread_pool sets them by default, your program will not continue running when only worker threads are left; rather, it terminates as soon as the main thread terminates. Again, this is typically an advisable architecture. At any rate, the recipe also provides a function stop_and_free_thread_pool, just in case you do want to terminate and clean up your thread pool at some point (and possibly later make and start another one with another call to make_and_start_thread_pool).
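Here is one possible sketch (not part of the recipe itself) of wrapping the pool's lifetime around a program's main logic with try/finally, so that the pool is stopped and joined even if the work-posting code raises an exception:

def main():
    make_and_start_thread_pool()              # program initialization
    try:
        for word in ('_ba', '_be', '_bo'):    # post the units of work
            request_work(word)
        # ...whatever else the main thread needs to do...
    finally:
        # final cleanup: the 'stop' requests queue up behind the work
        # requests, so all work completes before the workers terminate
        stop_and_free_thread_pool()
    show_all_results()
    show_all_errors()

if __name__ == '__main__':
    main()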

An example use of the functionality in this recipe might be:

for i in ('_ba', '_be', '_bo'):
    request_work(i)
make_and_start_thread_pool()
stop_and_free_thread_pool()
show_all_results()
show_all_errors()

The output from this snippet should normally be:

Result: new_ba
Result: new_be
Result: new_bo

although it's possible (but quite unlikely) that two of the results might end up exchanged. (If ordering of results is important to you, be sure to add a progressive number to the work requests you post from the main thread, and report it back to the main thread as part of each result or error.)
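Here is a minimal sketch of one way to do that (an illustrative variant, not part of the recipe): each work item carries its sequence number, the worker echoes the number back with the result or with the error information, and the main thread sorts the gathered results back into request order:

def do_numbered_work_from_queue():
    ''' variant worker loop: each item is a (number, data) pair, and the
        number is echoed back with the result or the error information '''
    while True:
        command, item = Qin.get()        # implicitly stops and waits
        if command == 'stop':
            break
        n, data = item
        try:
            result = 'new' + data
        except:
            Qerr.put((n,) + sys.exc_info()[:2])
        else:
            Qout.put((n, result))

def show_all_results_in_request_order():
    for n, result in sorted(get_all_from_queue(Qout)):
        print 'Result %d: %s' % (n, result)

# the main thread numbers its work requests progressively:
for n, item in enumerate(('_ba', '_be', '_bo')):
    request_work((n, item))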

Here is a case where an error occurs and gets reported:

for i in ('_ba', 7, '_bo'):
    request_work(i)
make_and_start_thread_pool()
stop_and_free_thread_pool()
show_all_results()
show_all_errors()

The output from this snippet should normally be (net of an extremely unlikely, but not impossible, exchange between the two "Result" lines):

Result: new_ba
Result: new_bo
Error: exceptions.TypeError cannot concatenate 'str' and 'int' objects

The worker thread that gets the item 7 reports a TypeError because it tries to concatenate the string 'new' with this item, which is an int, an invalid operation. Not to worry: we have the try/except statement in function do_work_from_queue exactly to catch any kind of error, and Queue Qerr and functions report_error and show_all_errors exactly to ensure that errors do not pass silently unless explicitly silenced, which is a key point of Python's general approach to programming.
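Note that report_error stores only the exception's type and value, not the traceback object (keeping traceback objects alive across threads also keeps their frames and local variables alive longer than needed). If type and value alone are not enough for debugging, here is a possible variant, not part of the recipe, that also captures the traceback as an already-formatted string via the standard traceback module:

import traceback

def report_error_with_traceback():
    ''' variant of report_error: also capture the formatted traceback '''
    etyp, err = sys.exc_info()[:2]
    Qerr.put((etyp, err, traceback.format_exc()))

def show_all_errors_with_tracebacks():
    for etyp, err, tb in get_all_from_queue(Qerr):
        print 'Error:', etyp, err
        print tb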

See Also

Library Reference docs on the threading and Queue modules; Python in a Nutshell chapter on threads.


