5.6 Thread Pools


Adding multiple threads to a program dramatically improves performance, especially for I/O-bound programs such as most network programs. However, threads are not without overhead of their own. Starting a thread and cleaning up after a thread that has died takes a noticeable amount of work from the virtual machine, especially if a program spawns hundreds of threadsnot an unusual occurrence for even a low- to medium-volume network server. Even if the threads finish quickly, this can overload the garbage collector or other parts of the VM and hurt performance, just like allocating thousands of any other kind of object every minute. Even more importantly, switching between running threads carries overhead. If the threads are blocking naturallyfor instance, by waiting for data from the networkthere's no real penalty to this, but if the threads are CPU-bound, then the total task may finish more quickly if you can avoid a lot of switching between threads. Finally, and most importantly, although threads help make more efficient use of a computer's limited CPU resources, there are still only a finite amount of resources to go around. Once you've spawned enough threads to use all the computer's available idle time, spawning more threads just wastes MIPS and memory on thread management.

Fortunately, you can get the best of both worlds by reusing threads. You cannot restart a thread once it's died, but you can engineer threads so that they don't die as soon as they've finished one task. Instead, put all the tasks you need to accomplish in a queue or other data structure and have each thread retrieve a new task from the queue when it's completed its previous task. This is called thread pooling , and the data structure in which the tasks are kept is called the pool .

The simplest way to implement a thread pool is by allotting a fixed number of threads when the pool is first created. When the pool is empty, each thread waits on the pool. When a task is added to the pool, all waiting threads are notified. When a thread finishes its assigned task, it goes back to the pool for a new task. If it doesn't get one, it waits until a new task is added to the pool.

An alternative is to put the threads themselves in the pool and have the main program pull threads out of the pool and assign them tasks. If no thread is in the pool when a task becomes necessary, the main program can spawn a new thread. As each thread finishes a task, it returns to the pool. (Imagine this scheme as a union hall in which new workers join the union only when full employment of current members is achieved.)

There are many data structures you can use for a pool, although a queue is probably the most efficient for ensuring that tasks are performed in a first-in, first-out order. Whichever data structure you use to implement the pool, however, you have to be extremely careful about synchronization, since many threads will interact with it very close together in time. The simplest way to avoid problems is to use either a java.util.Vector (which is fully synchronized) or a synchronized Collection from the Java Collections API.

Let's look at an example. Suppose you want to gzip every file in the current directory using a java.util.zip.GZIPOutputStream . On the one hand, this is an I/O-heavy operation because all the files have to be read and written. On the other hand, data compression is a very CPU- intensive operation, so you don't want too many threads running at once. This is a good opportunity to use a thread pool. Each client thread will compress files while the main program will determine which files to compress. In this example, the main program is likely to significantly outpace the compressing threads since all it has to do is list the files in a directory. Therefore, it's not out of the question to fill the pool first, then start the threads that compress the files in the pool. However, to make this example as general as possible, we'll allow the main program to run in parallel with the zipping threads.

Example 5-15 shows the GZipThread class. It contains a private field called pool containing a reference to the pool. Here that field is declared to have List type, but it's always accessed in a strictly queue-like first-in, first-out order. The run( ) method removes File objects from the pool and gzips each one. If the pool is empty when the thread is ready to get something new from the pool, then the thread waits on the pool object.

Example 5-15. The GZipThread class
 import java.io.*; import java.util.*; import java.util.zip.*; public class GZipThread extends Thread {   private List pool;   private static int filesCompressed = 0;   public GZipThread(List pool) {     this.pool = pool;   }      private static synchronized void incrementFilesCompressed( ) {     filesCompressed++;   }   public void run( ) {          while (filesCompressed != GZipAllFiles.                               getNumberOfFilesToBeCompressed( )) {            File input = null;              synchronized (pool) {                  while (pool.isEmpty( )) {           if (filesCompressed == GZipAllFiles.                                  getNumberOfFilesToBeCompressed( )) {             System.out.println("Thread ending");             return;           }           try {             pool.wait( );           }           catch (InterruptedException ex) {           }         }         input = (File) pool.remove(pool.size( )-1);          incrementFilesCompressed( );       }            // don't compress an already compressed file       if (!input.getName( ).endsWith(".gz")) {                try {           InputStream in = new FileInputStream(input);           in = new BufferedInputStream(in);                      File output = new File(input.getParent( ), input.getName( ) + ".gz");           if (!output.exists( )) { // Don't overwrite an existing file             OutputStream out = new FileOutputStream(output);             out = new GZIPOutputStream(out);             out = new BufferedOutputStream(out);             int b;             while ((b = in.read( )) != -1) out.write(b);             out.flush( );             out.close( );             in.close( );           }         }         catch (IOException ex) {           System.err.println(ex);         }                } // end if         } // end while        } // end run } // end ZipThread 

Example 5-16 is the main program. It constructs the pool as a Vector object, passes this to four newly constructed GZipThread objects, starts all four threads, and iterates through all the files and directories listed on the command line. Those files and files in those directories are added to the pool for eventual processing by the four threads.

Example 5-16. The GZipThread user interface class
 import java.io.*; import java.util.*; public class GZipAllFiles {      public final static int THREAD_COUNT = 4;   private static int filesToBeCompressed = -1;   public static void main(String[] args) {     Vector pool = new Vector( );     GZipThread[] threads = new GZipThread[THREAD_COUNT];          for (int i = 0; i < threads.length; i++) {       threads[i] = new GZipThread(pool);        threads[i].start( );     }     int totalFiles = 0;     for (int i = 0; i < args.length; i++) {              File f = new File(args[i]);       if (f.exists( )) {         if (f.isDirectory( )) {           File[] files = f.listFiles( );           for (int j = 0; j < files.length; j++) {             if (!files[j].isDirectory( )) { // don't recurse directories               totalFiles++;               synchronized (pool) {                 pool.add(0, files[j]);                 pool.notifyAll( );               }             }           }         }          else {           totalFiles++;           synchronized (pool) {             pool.add(0, f);             pool.notifyAll( );           }         }                } // end if            } // end for          filesToBeCompressed = totalFiles;          // make sure that any waiting thread knows that no      // more files will be added to the pool     for (int i = 0; i < threads.length; i++) {       threads[i].interrupt( );     }   }   public static int getNumberOfFilesToBeCompressed( ) {     return filesToBeCompressed;   } } 

The big question here is how to tell the program that it's done and should exit. You can't simply exit when all files have been added to the pool, because at that point most of the files haven't been processed. Neither can you exit when the pool is empty, because that may occur at the start of the program (before any files have been placed in the pool) or at various intermediate times when not all files have yet been put in the pool but all files that have been put there are processed . The latter possibility also prevents the use of a simple counter scheme.

The solution adopted here is to separately track the number of files that need to be processed ( GZipAllFiles.filesToBeCompressed ) and the number of files actually processed ( GZipThread.filesCompressed ). When these two values match, all threads' run( ) methods return. Checks are made at the start of each of the while loops in the run( ) method to see whether it's necessary to continue. This scheme is preferred to the deprecated stop( ) method, because it won't suddenly stop the thread while it's halfway through compressing a file. This gives us much more fine-grained control over exactly when and where the thread stops.

Initially, GZipAllFiles.filesToBeCompressed is set to the impossible value -1. Only when the final number is known is it set to its real value. This prevents early coincidental matches between the number of files processed and the number of files to be processed. It's possible that when the final point of the main( ) method is reached, one or more of the threads will be waiting. Thus, we interrupt each of the threads (an action that has no effect if the thread is merely processing and not waiting or sleeping) to make sure it checks one last time.

And finally, the last element of this program is the private GZipThread.incrementFilesCompressed( ) method. This method is synchronized to ensure that if two threads try to update the filesCompressed field at the same time, one will wait. Otherwise, the GZipThread.filesCompressed field could end up one short of the true value and the program would never exit. Since the method is static, all threads synchronize on the same Class object. A synchronized instance method wouldn't be sufficient here.

The complexity of determining when to stop this program is mostly atypical of the more heavily threaded programs you'll write because it does have such a definite ending point: the point at which all files are processed. Most network servers continue indefinitely until some part of the user interface shuts them down. The real solution here is to provide some sort of simple user interfacesuch as typing a period on a line by itselfthat ends the program.

This chapter has been a whirlwind tour of threading in Java, covering the bare minimum you need to know to write multithreaded network programs. For a more detailed and comprehensive look with many more examples, check out Java Threads , by Scott Oaks and Henry Wong (O'Reilly). Once you've mastered that book, Doug Lea's Concurrent Programming in Java (Addison Wesley) provides a comprehensive look at the traps and pitfalls of concurrent programming from a design patterns perspective.

Java Network Programming
Java Network Programming, Third Edition
ISBN: 0596007213
EAN: 2147483647
Year: 2003
Pages: 164

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net