Simple Self-Running Class

Chapter 13 - Thread Pooling

Java Thread Programming
Paul Hyde
  Copyright 1999 Sams Publishing

A Generic Thread Pool: ThreadPool
The class ThreadPool , shown in Listing 13.1 , is used to pool a set of threads for generic tasks . The worker threads are running inside ThreadPoolWorker objects, shown in Listing 13.2 . When a ThreadPool object is constructed , it constructs as many ThreadPoolWorker objects as are specified. To run a task, ThreadPool is passed a Runnable object through its execute() method. If a ThreadPoolWorker object is available, the execute() method removes it from the pool and hands off the Runnable to it for execution. If the pool is empty, the execute() method blocks until a worker becomes available. When the run() method of the Runnable task passed in returns, the ThreadPoolWorker has completed the task and puts itself back into the pool of available workers. There is no other signal that the task has been completed. If a signal is necessary, it should be coded in the tasks run() method just before it returns.
  Note The Runnable interface is being used here in a slightly different manner than youve seen before. Earlier in the book, it was required that a Runnable object reference be passed to the constructor of Thread , and the run() method was the entry point for the new thread. The run() method was never called directly.
  Here, instead of creating a new interface for thread pooling, the use of the existing Runnable interface is being expanded a little. Now, one of the worker threads will invoke the run() method directly (see line 72 of ThreadPoolWorker in Listing 13.2 ) when it is assigned to execute the Runnable task. I chose to use Runnable in this design so that passing a task to execute() would cause the run() method to be called by another thread in much the same way as Thread s start() method causes a new thread to invoke run() .
Listing 13.1  ThreadPool.javaA Thread Pool Used to Run Generic Tasks
1: // uses ObjectFIFO from chapter 18
2:
3: public class ThreadPool extends Object {
4:     private ObjectFIFO idleWorkers;
5:     private ThreadPoolWorker[] workerList;
6:
7:     public ThreadPool(int numberOfThreads) {
8:         // make sure that its at least one
9:         numberOfThreads = Math.max(1, numberOfThreads);
10:
11:         idleWorkers = new ObjectFIFO(numberOfThreads);
12:         workerList = new ThreadPoolWorker[numberOfThreads];
13:
14:         for (int i = 0; i < workerList.length; i++) {
15:             workerList[i] = new ThreadPoolWorker(idleWorkers);
16:         }
17:     }
18:
19:     public void execute(Runnable target) throws InterruptedException {
20:         // block (forever) until a worker is available
21:         ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.remove();
22:         worker.process(target);
23:     }
24:
25:     public void stopRequestIdleWorkers() {
26:         try {
27:             Object[] idle = idleWorkers.removeAll();
28:             for (int i = 0; i < idle.length; i++) {
29:                 ((ThreadPoolWorker) idle[i]).stopRequest();
30:             }
31:         } catch (InterruptedException x) {
32:             Thread.currentThread().interrupt(); // re-assert
33:         }
34:     }
35:
36:     public void stopRequestAllWorkers() {
37:         // Stop the idle ones first
38:         // productive.
39:         stopRequestIdleWorkers();
40:
41:         // give the idle workers a quick chance to die
42:         try { Thread.sleep(250); } catch (InterruptedException x) { }
43:
44:         // Step through the list of ALL workers.
45:         for (int i = 0; i < workerList.length; i++) {
46:             if (workerList[i].isAlive()) {
47:                 workerList[i].stopRequest();
48:             }
49:         }
50:     }
51: }
ThreadPool serves as the central point of control for managing the worker threads. It holds a list of all the workers created in workerList (line 5). The current pool of idle ThreadPoolWorker objects is kept in a FIFO queue, idleWorkers (line 4).
  Note First-In-First-Out (FIFO) queues allow items to be added to one end of the queue and removed from the other end. Items are removed in the exact same order as they were added (the first item in is the first item out ). A FIFO queue has a fixed capacity. If a thread invokes the add() method when the FIFO is full, it blocks waiting until another thread removes an item. If a thread invokes the remove() method when the FIFO is empty, it blocks waiting until another thread adds an item.
  FIFO queues are explained and demonstrated in Chapter 18, First-In-First-Out (FIFO) Queue. You can skip ahead to look at that technique at this time if you want to know more.
The constructor (lines 717) takes as its only parameter an int specifying the number of worker threads that should be created for this pool (line 7). The number of threads is silently forced to be at least 1 (line 9). A new ObjectFIFO is created with a capacity large enough to hold the entire pool of worker threads (line 11). This queue holds all the workers currently available for assignment to new tasks. A ThreadPoolWorker[] is created to keep a handle on all the workersregardless of whether they are currently idle (line 12). The for loop (lines 1416) is used to construct each of the ThreadPoolWorker objects. Each has a reference to the pool of available workers passed to its constructor (line 15). Each one will use this reference to add itself back to the pool when it is ready to service a new task.
When an external thread wants to run a task using one of the threads in the pool, it invokes the execute() method (lines 1923). The execute() method takes a Runnable object as a parameter. This object will have its run() method invoked by the next available worker thread. The external thread blocks waiting until an idle ThreadPoolWorker becomes available (line 21). When one is ready, the external thread passes the Runnable to the workers process() method (line 22), which returns right away. The external thread returns from execute() and is free to continue with whatever else it has to do while the worker thread runs the target .
The stopRequestIdleWorkers() method (lines 2534) is used to request that the internal threads of the idle workers stop as soon as possible. First, all the currently idle workers are removed from the queue (line 27). Each worker then has its stopRequest() method invoked (line 29). You should keep in mind that as other tasks finish, more idle workers could be added to the pool and will not be stopped until another stopRequestIdleWorkers() invocation occurs.
The stopRequestAllWorkers() method (lines 3650) is used to request that all the workers stop as soon as possible, regardless of whether they are currently idle. First, a call to stopRequestIdleWorkers() is done because they can be stopped right away with negligible impact (line 39). A quarter-second break is taken to give the idle workers a chance to shut down. Next, the list of all the workers is stepped through using a for loop (lines 4549). Each worker that is still alive (line 46) has its stopRequest() method invoked (line 47). Its possible that one or more of the idle threads will not have a chance to die before the isAlive() check. In this case, the stopRequest() method will be called twice, which should be harmless.
The ThreadPoolWorker class, shown in Listing 13.2, is in charge of providing the thread to run the specified task. In a real-world setting, this class should probably not be public , but should have package scope or be an inner class to ThreadPool . It is never accessed directly because ThreadPool acts as the sole interface to external code.
Listing 13.2  ThreadPoolWorker.javaThe Internal Assistant to ThreadPool Used to Run a Task
1: // uses class ObjectFIFO from chapter 18
2:
3: public class ThreadPoolWorker extends Object {
4:     private static int nextWorkerID = 0;
5:
6:     private ObjectFIFO idleWorkers;
7:     private int workerID;
8:     private ObjectFIFO handoffBox;
9:
10:     private Thread internalThread;
11:     private volatile boolean noStopRequested;
12:
13:     public ThreadPoolWorker(ObjectFIFO idleWorkers) {
14:         this.idleWorkers = idleWorkers;
15:
16:         workerID = getNextWorkerID();
17:         handoffBox = new ObjectFIFO(1); // only one slot
18:
19:         // just before returning, the thread should be created.
20:         noStopRequested = true;
21:
22:         Runnable r = new Runnable() {
23:                 public void run() {
24:                     try {
25:                         runWork();
26:                     } catch (Exception x) {
27:                         // in case ANY exception slips through
28:                         x.printStackTrace();
29:                     }
30:                 }
31:             };
32:
33:         internalThread = new Thread(r);
34:         internalThread.start();
35:     }
36:
37:     public static synchronized int getNextWorkerID() {
38:         // notice: syncd at the class level to ensure uniqueness
39:         int id = nextWorkerID;
40:         nextWorkerID++;
41:         return id;
42:     }
43:
44:     public void process(Runnable target) throws InterruptedException {
45:         handoffBox.add(target);
46:     }
47:
48:     private void runWork() {
49:         while (noStopRequested) {
50:             try {
51:                 System.out.println(workerID= + workerID +
52:                         , ready for work);
53:                 // Worker is ready work. This will never block
54:                 // because the idleWorker FIFO queue has
55:                 // enough capacity for all the workers.
56:                 idleWorkers.add(this);
57:
58:                 // wait here until the server adds a request
59:                 Runnable r = (Runnable) handoffBox.remove();
60:
61:                 System.out.println(workerID= + workerID +
62:                    , starting execution of new Runnable: + r);
63:                 runIt(r); // catches all exceptions
64:             } catch (InterruptedException x) {
65:                 Thread.currentThread().interrupt(); // re-assert
66:             }
67:         }
68:     }
69:
70:     private void runIt(Runnable r) {
71:         try {
72:             r.run();
73:         } catch (Exception runex) {
74:             // catch any and all exceptions
75:             System.err.println(Uncaught exception fell through from run());
76:             runex.printStackTrace();
77:         } finally {
78:             // Clear the interrupted flag (in case it comes back
79:             // set) so that if the loop goes again, the
80:             // handoffBox.remove() does not mistakenly
81:             // throw an InterruptedException.
82:             Thread.interrupted();
83:         }
84:     }
85:
86:     public void stopRequest() {
87:         System.out.println(workerID= + workerID +
88:                 , stopRequest() received.);
89:         noStopRequested = false;
90:         internalThread.interrupt();
91:     }
92:
93:     public boolean isAlive() {
94:         return internalThread.isAlive();
95:     }
96: }
ThreadPoolWorker uses the active object technique discussed in Chapter 11, Self-Running Objects. Each worker constructed is assigned a unique workerID (line 7) to help clarify the output messages. In a real-world setting, individual identity tracking is not always necessary.
At the class level, the next worker ID is held in a static member variable, nextWorkerID (line 4). This variable is retrieved and incremented inside the getNextWorkerID() method (lines 3742). It is static and synchronized so that the class-level lock is acquired before changes are made (line 37). This ensures that no two instances of ThreadPoolWorker are accidentally assigned the same workerID value.
A reference to the list of currently unused workers is held in idleWorkers (line 6). This is a reference to an ObjectFIFO queue, and the worker adds itself back to idleWorkers when it is available for assignment. The handoffBox FIFO queue (line 8) is used to pass Runnable objects to the worker in a thread-safe manner.
In the constructor (lines 1335), the passed reference to the pool of available workers is assigned to a member variable for later access (line 14). The getNextWorkerID() method is used to obtain a unique int to store in workerID (line 16). An ObjectFIFO with a capacity of only 1 is created to be used for handing off the next Runnable task to the internal thread. The rest of the code in the constructor uses the standard pattern for an active object (see Chapter 11 ).
The process() method (lines 4446) is invoked by code inside the execute() method of ThreadPool . It is used to pass the Runnable task in to the worker for processing. It is put into the handoff box to be noticed and picked up by the internal thread (line 45). Although add() declares that it will throw an InterruptedException if it is interrupted while waiting for space, this should never happen in this scenario. The handoffBox FIFO queue should be empty when the worker is available and waiting for another assignment. I chose to use an ObjectFIFO here to encapsulate the wait-notify mechanism that is necessary to signal the internal thread that a new task has arrived. Its a simpler approach and uses well- tested code.
The runWork() method  (lines 4868) follows the active object pattern of looping using the internal thread as long as no stop has been requested (line 49). Each time through the loop, the internal thread adds itself to the pool of available workers (line 56). It then waits indefinitely for an external thread to invoke the process() method and put a Runnable into the handoff box. When assigned a request, the internal thread removes it from handoffBox and casts it down from Object to Runnable (line 59). The internal thread then passes the task to the runIt() method.
The private method runIt() (lines 7084) takes the Runnable passed (line 70) and invokes its run() method (line 72). If any exceptions slip through especially RuntimeException s such as NullPointerException that can occur unexpectedly just about anywhere they are caught to protect the worker thread (line 73). Instances of Error (and its subclasses, such as OutOfMemoryError ) will break the worker, but all instances of Exception (and its subclasses) will be safely caught. If one is caught, a message and a stack trace are printed to the console (lines 7576). Regardless of how the internal thread returns from run() , the finally clause (lines 7783) ensures that the threads interrupted flag is cleared (line 82) before returning to runWork() . This is important because if the flag comes back set, and noStopRequested is still true , an erroneous InterruptedException will be thrown by the remove() method on line 59.
If the interrupted flag was set by stopRequest() , no harm will be done by clearing it. This is because, after runIt() returns (line 63), the very next action is a check of the noStopRequested flag (line 49). Because stopRequest() sets this false , runWork() will return (line 25), and the worker thread will die quietly as requested. I give a full explanation of stopRequest() and isAlive() in Chapter 11 .
ThreadPoolMain , shown in Listing 13.3, is used to demonstrate how ThreadPool can be used to run several tasks and then recycle its threads to run more tasks.
Listing 13.3  ThreadPoolMain.javaUsed to Demonstrate ThreadPool
1: public class ThreadPoolMain extends Object {
2:
3:     public static Runnable makeRunnable(
4:                 final String name ,
5:                 final long firstDelay
6:            ) {
7:
8:         return new Runnable() {
9:                 public void run() {
10:                     try {
11:                         System.out.println(name +: starting up);
12:                         Thread.sleep(firstDelay);
13:                         System.out.println(name + : doing some stuff);
14:                         Thread.sleep(2000);
15:                         System.out.println(name + : leaving);
16:                     } catch (InterruptedException ix) {
17:                         System.out.println(name + : got interrupted!);
18:                         return;
19:                     } catch (Exception x) {
20:                         x.printStackTrace();
21:                     }
22:                 }
23:
24:                 public String toString() {
25:                     return name;
26:                 }
27:             };
28:     }
29:
30:     public static void main(String[] args) {
31:         try {
32:             ThreadPool pool = new ThreadPool(3);
33:
34:             Runnable ra = makeRunnable(RA, 3000);
35:             pool.execute(ra);
36:
37:             Runnable rb = makeRunnable(RB, 1000);
38:             pool.execute(rb);
39:
40:             Runnable rc = makeRunnable(RC, 2000);
41:             pool.execute(rc);
42:
43:             Runnable rd = makeRunnable(RD, 60000);
44:             pool.execute(rd);
45:
46:             Runnable re = makeRunnable(RE, 1000);
47:             pool.execute(re);
48:
49:             pool.stopRequestIdleWorkers();
50:             Thread.sleep(2000);
51:             pool.stopRequestIdleWorkers();
52:
53:             Thread.sleep(5000);
54:             pool.stopRequestAllWorkers();
55:         } catch (InterruptedException ix) {
56:             ix.printStackTrace();
57:         }
58:     }
59: }
ThreadPoolMain creates five Runnable objects and passes them to the execute() method of ThreadPool . The static method makeRunnable() (lines 328) is used to manufacture Runnable objects that are similar. It takes two parameters, the first being the name to use in output messages to differentiate the Runnable from the others (line 4). The second is the number of milliseconds to wait between printing the first and second messages (line 5). These two parameters are declared final so that they can be accessed from the anonymous inner class that is created (lines 8-27).
The Runnable interface is implemented on-the-fly . The two methods that are defined are toString() (lines 2426) and run() (lines 922). The toString() method simply prints out name . The run() method prints several messages, all of which include the name to clarify the output (lines 11, 13, 15, and 17). The delay factor passed in is used to control the length of the first sleep() (line 12). If either sleep() is interrupted, a message is printed and the method returns (lines 1618). If any other exception occurs, a stack trace is printed and the method returns (lines 1921).
In main() , a ThreadPool object is constructed with the specification that it should create 3 instances of ThreadPoolWorker (line 32). The makeRunnable() method is invoked 5 times, and the results of each are passed to the execute() method (lines 3447).  All 5 will not be able to run at the same time because the pool has only 3 workers. The fourth and fifth calls to execute() will block briefly until a worker becomes available. After all 5 have been started (and at least 2 will have finished), the stopRequestIdleWorkers() method is invoked (line 49) on the pool to remove and shut down any and all workers that are currently not processing a request. After 2 seconds (line 50), another request is issued to stop all idle workers (line 51). After an additional 5 seconds have elapsed, the stopRequestAllWorkers() method is called to shut down any and all remaining workers, regardless of whether they are currently busy servicing a request (line 54).
Listing 13.4 shows possible output from running ThreadPoolMain . Your output should differ a bit because of the whims of the thread scheduler.
Listing 13.4  Possible Output from ThreadPoolMain
1: workerID=0, ready for work
2: workerID=2, ready for work
3: workerID=1, ready for work
4: workerID=0, starting execution of new Runnable: RA
5: RA: starting up
6: workerID=2, starting execution of new Runnable: RB
7: RB: starting up
8: workerID=1, starting execution of new Runnable: RC
9: RC: starting up
10: RB: doing some stuff
11: RC: doing some stuff
12: RA: doing some stuff
13: RB: leaving
14: workerID=2, ready for work
15: workerID=2, starting execution of new Runnable: RD
16: RD: starting up
17: RC: leaving
18: workerID=1, ready for work
19: workerID=1, starting execution of new Runnable: RE
20: RE: starting up
21: RA: leaving
22: workerID=0, ready for work
23: RE: doing some stuff
24: workerID=0, stopRequest() received.
25: RE: leaving
26: workerID=1, ready for work
27: workerID=1, stopRequest() received.
28: workerID=2, stopRequest() received.
29: RD: got interrupted!
Notice that the workers add themselves to the idle list in just about any order (output lines 13). However, the tasks are started in the requested order (lines 49). When the RB task is done (line 13), the worker that was running it, 2 , adds itself back to the idle queue (line 14). Task RD was blocked inside execute() , waiting for a worker to become available. As soon as 2 puts itself on the idle queue, it is recycled and removed to run task RD (line 15). When worker 1 finishes running task RC (line 17), it is recycled to run task RE (lines 1819). Next, worker finishes task RA and adds itself to the idle queue (line 22).
The first request to stop the currently idle threads gets idle worker to stop (line 24). The next request gets idle worker 1 to stop. Task RD was started with a 60 -second delay and is still running. When the request to stop all the threads comes in (line 28), task RD is interrupted during its long sleep (line 29), but then returns to allow the thread to die.

Toc


Java Thread Programming
Java Thread Programming
ISBN: 0672315858
EAN: 2147483647
Year: 2005
Pages: 149
Authors: Paul Hyde

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net