6.10 A Multiplexed Network Client

We end this chapter with Example 6-14, a complex class named HttpDownloadManager. As its name implies, this class is a client-side utility that manages any number of concurrent HTTP downloads on a background thread. To download something, just call the download( ) method, passing a java.net.URI and an optional Listener object to be notified when the download is complete or has aborted with an error. download( ) does not block: it returns a Download object immediately, and you can monitor the status of the download by polling the methods of this object. The data that is downloaded is not saved to a file, but is available from the getData( ) method of the Download object.

The Listener and Download interfaces are defined as inner classes of HttpDownloadManager. The Status class is another inner class: it is a type-safe enumeration of download states returned by Download.getStatus( ). Two other inner classes used in this example are DownloadImpl, the package-private concrete implementation of the Download interface, and Test, a simple test program that demonstrates the usage of the HttpDownloadManager.

HttpDownloadManager extends Thread, and the downloads are handled in this background thread. Multiple downloads can be handled concurrently (this is useful when the client has more bandwidth available than the servers which are being downloaded from) because the thread uses a Selector to multiplex the SocketChannel objects from which data is being read: the select( ) call wakes up whenever data is ready to be read from one of the pending downloads. Most of the interesting code is found in the run( ) method, which is the body of the background thread.

We've seen basic channel multiplexing code with Selector.select( ) in previous examples. This one demonstrates three new features. The first is the call to wakeup( ) in the download( ) method. A new channel cannot be registered with a Selector while that selector is blocked in a select( ) call in the background thread. So the download( ) method creates a DownloadImpl object containing all the information about the download, places this object in a synchronized list of pending downloads, and then calls wakeup( ) and returns the Download object to the caller. The wakeup( ) call in download( ) causes the background thread to stop blocking in select( ). In other examples, we've immediately checked the selectedKeys( ) of a Selector when its select( ) method returns. In this case, we first look at the pendingDownloads list and create and register a SocketChannel for any DownloadImpl objects found there.

The second new feature of interest in this example is that it performs asynchronous connection. Before an HTTP GET request can be sent to a web server, the client must connect to the server. Establishing a TCP connection over the Internet can sometimes take a second or two, and we don't want the background thread to block while this connection is set up. So when the thread detects a pending download, it creates a SocketChannel in an unconnected state. It then puts the channel into nonblocking mode and registers it with the Selector object, indicating that the Selector should monitor the channel for readiness to connect as well as readiness to read. Only after registering the channel does the thread call the connect( ) method and supply the address and port to connect to. Since the channel is nonblocking, connect( ) returns immediately. When the connection is ready, select( ) wakes up and the thread calls finishConnect( ) on the channel to complete the connection. (After completing the connection, the thread immediately sends the HTTP GET request across the channel. It assumes that the channel is writable and that the complete text of the request can be written quickly; if this assumption fails, the thread will end up busy-waiting while it repeatedly attempts to send the request to the server.)

The third new feature demonstrated by this example is that when it registers a SocketChannel with the Selector, it uses a three-argument version of the register( ) call to associate the DownloadImpl object with the SelectionKey for the channel. Then, when the key becomes connectable or readable, the background thread can retrieve the channel and the state object associated with the key.

Finally, this example also demonstrates the logging API of java.util.logging. We'll discuss logging in the subsection that follows the example code.

Example 6-14. HttpDownloadManager.java
package je3.nio; import java.io.*; import java.nio.*; import java.nio.channels.*; import java.nio.charset.*; import java.net.*; import java.util.*; import java.util.logging.*; /**  * This class manages asynchronous HTTP GET downloads and demonstrates  * non-blocking I/O with SocketChannel and Selector and also demonstrates  * logging with the java.util.logging package.  This example uses a number  * of inner classes and interfaces.  *   * Call download( ) for each HTTP GET request you want to issue.  You may  * optionally pass a Listener object that will be notified when the download  * terminates or encounters an exception.  download( ) returns a Download object  * which holds the downloaded bytes (including HTTP headers) and which allows  * you to poll the Status of the download.  Call release( ) when there are   * no more downloads.  */ public class HttpDownloadManager extends Thread {     // An enumerated type.  Values are returned by Download.getStatus( )     public static class Status {         // We haven't connected to the server yet         public static final Status UNCONNECTED = new Status("Unconnected");         // We're connected to the server, sending request or receiving response         public static final Status CONNECTED = new Status("Connected");         // Response has been received.  Response may have been an HTTP error         public static final Status DONE = new Status("Done");         // Something went wrong: bad hostname, for example.         public static final Status ERROR = new Status("Error");         private final String name;         private Status(String name) { this.name = name; }         public String toString( ) { return name; }     }     // Everything you need to know about a pending download     public interface Download {         public String getHost( );   // Hostname we're downloading from         public int getPort( );      // Defaults to port 80         public String getPath( );   // includes query string as well         public Status getStatus( ); // Status of the download         public byte[  ] getData( );   // Download data, including response headers         public int getHttpStatus( );// Only call when status is DONE     }     // Implement this interface if you want to know when a download completes     public interface Listener {         public void done(Download download);         public void error(Download download, Throwable throwable);     }     Selector selector;          // For multiplexing non-blocking I/O.     ByteBuffer buffer;          // A shared buffer for downloads     List pendingDownloads;      // Downloads that don't have a Channel yet     boolean released = false;   // Set when the release( ) method is called.     Logger log;                 // Logging output goes here     // The HTTP protocol uses this character encoding     static final Charset LATIN1 = Charset.forName("ISO-8859-1");     public HttpDownloadManager(Logger log) throws IOException {         if (log == null) log = Logger.getLogger(this.getClass( ).getName( ));         this.log = log;         selector = Selector.open( );                  // create Selector         buffer = ByteBuffer.allocateDirect(64*1024); // allocate buffer         pendingDownloads = Collections.synchronizedList(new ArrayList( ));         this.start( );                                // start thread     }     // Ask the HttpDownloadManager to begin a download.  Returns a Download     // object that can be used to poll the progress of the download.  The     // optional Listener object will be notified when the download completes     // or aborts.     public Download download(URI uri, Listener l)             throws IOException     {         if (released)            throw new IllegalStateException("Can't download( ) after release( )");         // Get info from the URI         String scheme = uri.getScheme( );         if (scheme == null || !scheme.equals("http"))             throw new IllegalArgumentException("Must use 'http:' protocol");         String hostname = uri.getHost( );         int port = uri.getPort( );         if (port == -1) port = 80; // Use default port if none specified         String path = uri.getRawPath( );         if (path == null || path.length( ) == 0) path = "/";         String query = uri.getRawQuery( );         if (query != null) path += "?" + query;         // Create a Download object with the pieces of the URL         Download download = new DownloadImpl(hostname, port, path, l);         // Add it to the list of pending downloads. This is a synchronized list         pendingDownloads.add(download);         // And ask the thread to stop blocking in the select( ) call so that         // it will notice and process this new pending Download object.         selector.wakeup( );         // Return the Download so that the caller can monitor it if desired.         return download;     }     public void release( ) {         released = true; // The thread will terminate when it notices the flag.         try { selector.close( ); } // This will wake the thread up         catch(IOException e) {             log.log(Level.SEVERE, "Error closing selector", e);         }     }     public void run( ) {         log.info("HttpDownloadManager thread starting.");         // The download thread runs until release( ) is called         while(!released) {             // The thread blocks here waiting for something to happen             try { selector.select( ); }             catch(IOException e) {                 // This should never happen.                 log.log(Level.SEVERE, "Error in select( )", e);                 return;             }             // If release( ) was called, the thread should exit.             if (released) break;             // If any new Download objects are pending, deal with them first             if (!pendingDownloads.isEmpty( )) {                 // Although pendingDownloads is a synchronized list, we still                 // need to use a synchronized block to iterate through its                 // elements to prevent a concurrent call to download( ).                 synchronized(pendingDownloads) {                     Iterator iter = pendingDownloads.iterator( );                     while(iter.hasNext( )) {                         // Get the pending download object from the list                         DownloadImpl download = (DownloadImpl)iter.next( );                         iter.remove( );  // And remove it.                         // Now begin an asynchronous connection to the                          // specified host and port.  We don't block while                         // waiting to connect.                         SelectionKey key = null;                         SocketChannel channel = null;                         try {                             // Open an unconnected channel                             channel = SocketChannel.open( );                             // Put it in non-blocking mode                             channel.configureBlocking(false);                             // Register it with the selector, specifying that                             // we want to know when it is ready to connect                             // and when it is ready to read.                             key = channel.register(selector,                                                    SelectionKey.OP_READ |                                                     SelectionKey.OP_CONNECT,                                                    download);                             // Create the web server address                             SocketAddress address =                                  new InetSocketAddress(download.host,                                                       download.port);                             // Ask the channel to start connecting                             // Note that we don't send the HTTP request yet.                             // We'll do that when the connection completes.                             channel.connect(address);                         }                         catch(Exception e) {                             handleError(download, channel, key, e);                         }                     }                 }             }             // Now get the set of keys that are ready for connecting or reading             Set keys = selector.selectedKeys( );             if (keys == null) continue; // bug workaround; should not be needed             // Loop through the keys in the set             for(Iterator i = keys.iterator( ); i.hasNext( ); ) {                 SelectionKey key = (SelectionKey)i.next( );                 i.remove( );  // Remove the key from the set before handling                 // Get the Download object we attached to the key                 DownloadImpl download = (DownloadImpl) key.attachment( );                 // Get the channel associated with the key.                 SocketChannel channel = (SocketChannel)key.channel( );                 try {                     if (key.isConnectable( )) {                           // If the channel is ready to connect, complete the                         // connection and then send the HTTP GET request to it.                         if (channel.finishConnect( )) {                             download.status = Status.CONNECTED;                             // This is the HTTP request we send                             String request =                                 "GET " + download.path + " HTTP/1.1\r\n" +                                 "Host: " + download.host + "\r\n" +                                 "Connection: close\r\n" +                                 "\r\n";                             // Wrap in a CharBuffer and encode to a ByteBuffer                             ByteBuffer requestBytes =                                 LATIN1.encode(CharBuffer.wrap(request));                             // Send the request to the server.  If the bytes                             // aren't all written in one call, we busy loop!                             while(requestBytes.hasRemaining( ))                                 channel.write(requestBytes);                             log.info("Sent HTTP request: " + download.host +                                       ":" + download.port + ": " + request);                         }                     }                     if (key.isReadable( )) {                         // If the key indicates that there is data to be read,                         // then read it and store it in the Download object.                         int numbytes = channel.read(buffer);                                                  // If we read some bytes, store them, otherwise                         // the download is complete and we need to note this                         if (numbytes != -1) {                             buffer.flip( );  // Prepare to drain the buffer                             download.addData(buffer); // Store the data                             buffer.clear( ); // Prepare for another read                             log.info("Read " + numbytes + " bytes from " +                                      download.host + ":" + download.port);                         }                         else {                             // If there are no more bytes to read                             key.cancel( );     // We're done with the key                             channel.close( );  // And with the channel.                             download.status = Status.DONE;                              if (download.listener != null)  // notify listener                                 download.listener.done(download);                             log.info("Download complete from " +                                      download.host + ":" + download.port);                         }                     }                 }                 catch (Exception e) {                     handleError(download, channel, key, e);                 }             }         }         log.info("HttpDownloadManager thread exiting.");     }     // Error-handling code used by the run( ) method:      // set status, close channel, cancel key, log error, notify listener.     void handleError(DownloadImpl download, SocketChannel channel,                      SelectionKey key, Throwable throwable)     {         download.status = Status.ERROR;         try {if (channel != null) channel.close( );} catch(IOException e) {  }         if (key != null) key.cancel( );         log.log(Level.WARNING,                 "Error connecting to or downloading from " + download.host +                 ":" + download.port,                 throwable);         if (download.listener != null)             download.listener.error(download, throwable);     }     // This is the Download implementation we use internally.     static class DownloadImpl implements Download {         final String host;     // Final fields are immutable for thread-safety         final int port;         final String path;         final Listener listener;         volatile Status status; // Volatile fields may be changed concurrently         volatile byte[  ] data = new byte[0];         DownloadImpl(String host, int port, String path, Listener listener) {             this.host = host;             this.port = port;             this.path = path;             this.listener = listener;             this.status = Status.UNCONNECTED;  // Set initial status         }         // These are the basic getter methods         public String getHost( ) { return host; }         public int getPort( ) { return port; }         public String getPath( ) { return path; }         public Status getStatus( ) { return status; }         public byte[  ] getData( ) { return data; }         /**          * Return the HTTP status code for the download.          * Throws IllegalStateException if status is not Status.DONE          */         public int getHttpStatus( ) {             if (status != Status.DONE) throw new IllegalStateException( );             // In HTTP 1.1, the return code is in ASCII bytes 10-12.             return                 (data[9] - '0') * 100 +                 (data[10]- '0') * 10 +                 (data[11]- '0') * 1;         }         // Used internally when we read more data.         // This should use a larger buffer to prevent frequent re-allocation.         void addData(ByteBuffer buffer) {             assert status == Status.CONNECTED;  // only called during download             int oldlen = data.length;           // How many existing bytes             int numbytes = buffer.remaining( );  // How many new bytes             int newlen = oldlen + numbytes;             byte[  ] newdata = new byte[newlen];  // Create new array             System.arraycopy(data, 0, newdata, 0, oldlen); // Copy old bytes             buffer.get(newdata, oldlen, numbytes);         // Copy new bytes             data = newdata;                     // Save new array         }     }     // This class demonstrates a simple use of HttpDownloadManager.     public static class Test {         static int completedDownloads = 0;         public static void main(String args[  ])             throws IOException, URISyntaxException         {             // With a -v argument, our logger will display lots of messages             final boolean verbose = args[0].equals("-v");             int firstarg = 0;             Logger logger = Logger.getLogger(Test.class.getName( ));             if (verbose) {                 firstarg = 1;                 logger.setLevel(Level.INFO);             }             else                       // regular output                 logger.setLevel(Level.WARNING);                          // How many URLs are on the command line?             final int numDownloads = args.length - firstarg;             // Create the download manager             final HttpDownloadManager dm = new HttpDownloadManager(logger);             // Now loop through URLs and call download( ) for each one             // passing a listener object to receive notifications             for(int i = firstarg; i < args.length; i++) {                 URI uri = new URI(args[i]);                 dm.download(uri,                      new Listener( ) {                             public void done(Download d) {                                 System.err.println("DONE: " + d.getHost( ) +                                                    ": " + d.getHttpStatus( ));                                 // If all downloads are complete, we're done                                 // with the HttpDownloadManager thread.                                 if (++completedDownloads == numDownloads)                                     dm.release( );                             }                             public void error(Download d, Throwable t) {                                 System.err.println(d.getHost( ) + ": " + t);                                 if (++completedDownloads == numDownloads)                                     dm.release( );                             }                         });             }         }     } }

6.10.1 Logging in HttpDownloadManager

HttpDownloadManager uses a Logger object to log informational and error messages. It demonstrates the info( ) and log( ) methods of Logger; see the Logger documentation for descriptions of many other logging methods. HttpDownloadManager can use a Logger passed to it, or it can obtain its own. There are examples of obtaining a Logger object in both the constructor method and the inner Test class. The Test class also implements a -v "verbose" switch to demonstrate how to set the logging threshold of a Logger. Informational logging messages will be discarded unless the -v option is specified.

The java.util.logging package allows flexible runtime configuration of how logging is done. In most installations, the default is to print logging messages to the console. See the file jre/lib/logging.properties for the default configuration on your installation. You can override this default configuration with your own properties file: define the property java.util.logging.config.file with the -D option when you start the Java VM. For example, to run the HttpDownloadManager test program using a logging configuration specified in a file named log.props, you'd use a command line like this one (it has been word wrapped to fit on the page; you'd type it on one line):

java -Djava.util.logging.config.file=log.props je3.nio.HttpDownloadManager\$Test -v ...urls here...

Note that we use the -v switch so that the program actually generates log messages. Without this switch, you'd have to purposely specify bad URLs so that the program would log some errors. The following listing is a sample logging configuration file, which you'll find, along with the HttpDownloadManager source code, in a file named log.props:

# # A logging properties file for the HttpDownloadManager example # See also jre/lib/logging.properties in your Java installation. # Use this file by specifying it as the value of a property named # java.util.logging.config.file.  For example: # # java -Djava.util.logging.config.file je3.nio.Http... # # This property specifies the default logging level for log messages # sent by our program.  Note, however, that if you run the  # HttpDownloadManager\$Test class, this property will be overridden by # the presence or absence of the -v option. je3.nio.HttpDownloadManager.level: INFO # This property says where output should go.  The default configuration # is for it to go to the console.  This property sends log messages to # a FileHandler configured below instead handlers=java.util.logging.FileHandler # These properties configure the FileHandler # See java.util.logging.FileHandler for other available properties java.util.logging.FileHandler.pattern = %h/java%u.log java.util.logging.FileHandler.formatter = java.util.logging.XMLFormatter


Java Examples in a Nutshell
Java Examples in a Nutshell, 3rd Edition
ISBN: 0596006209
EAN: 2147483647
Year: 2003
Pages: 285

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net