12.2 An Example Server

     

Clients are well and good, but channels and buffers are really intended for server systems that need to process many simultaneous connections efficiently. Handling servers requires a third new piece in addition to the buffers and channels used for the client. Specifically, you need selectors that allow the server to find all the connections that are ready to be written to or read from.

To demonstrate the basics, I'll implement a simple server for the character generator protocol. When implementing a server that takes advantage of Java 1.4's new I/O APIs, begin by calling the static factory method ServerSocketChannel.open() to create a new ServerSocketChannel object:

 ServerSocketChannel serverChannel = ServerSocketChannel.open();

Initially this channel is not actually listening on any port. To bind it to a port, retrieve its ServerSocket peer object with the socket( ) method and then use the bind( ) method on that peer. For example, this code fragment binds the channel to a server socket on port 19:

 ServerSocket ss = serverChannel.socket();
 ss.bind(new InetSocketAddress(19));

As with regular server sockets, binding to port 19 requires you to be root on Unix (including Linux and Mac OS X). Nonroot users can only bind to ports 1024 and higher.
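If you don't have root access, you can experiment on an unprivileged port instead. The following is a minimal sketch, not part of the chargen server itself: it assumes Java 7 or later, binds to port 0 so that the operating system picks a free port, and uses the hypothetical names BindSketch and bindToFreePort.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;

class BindSketch {

    // Opens a server channel, binds it to an OS-chosen port on the loopback
    // interface, and returns the port number that was actually assigned.
    static int bindToFreePort() throws IOException {
        try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            ServerSocket ss = serverChannel.socket();
            // Port 0 asks the operating system for any free port.
            ss.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return ss.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Bound to port " + bindToFreePort());
    }
}
```

A real server would keep the channel open after binding; the sketch closes it immediately only so that it cleans up after itself.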

The server socket channel is now listening for incoming connections on port 19. To accept one, call the ServerSocketChannel accept() method, which returns a SocketChannel object:

 SocketChannel clientChannel = serverChannel.accept( ); 

On the server side, you'll definitely want to make the client channel non-blocking to allow the server to process multiple simultaneous connections:

 clientChannel.configureBlocking(false); 

You may also want to make the ServerSocketChannel non-blocking. By default, its accept() method blocks until there's an incoming connection, like the accept() method of ServerSocket. To change this, simply call configureBlocking(false) before calling accept():

 serverChannel.configureBlocking(false); 

A non-blocking accept( ) returns null almost immediately if there are no incoming connections. Be sure to check for that or you'll get a nasty NullPointerException when trying to use the socket.
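The null check can be sketched as follows. This is an illustration under stated assumptions rather than a fragment of the final server: it assumes Java 7 or later, binds to an OS-chosen loopback port that no client ever connects to, and the names NonBlockingAcceptSketch and acceptReturnsNullWhenIdle are hypothetical.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingAcceptSketch {

    // Returns true if a non-blocking accept() on a freshly bound channel
    // with no pending connections came back null.
    static boolean acceptReturnsNullWhenIdle() throws IOException {
        try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            serverChannel.socket().bind(
                new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            serverChannel.configureBlocking(false);

            SocketChannel clientChannel = serverChannel.accept();
            if (clientChannel == null) {
                // Nobody is connecting right now; a server would try again later.
                return true;
            }
            // A connection did arrive, so it is safe to use the channel.
            clientChannel.close();
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accept() returned null: " + acceptReturnsNullWhenIdle());
    }
}
```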

There are now two open channels: a server channel and a client channel. Both need to be processed. Both can run indefinitely. Furthermore, processing the server channel will create more open client channels. In the traditional approach, you assign each connection a thread, and the number of threads climbs rapidly as clients connect. Instead, in the new I/O API, you create a Selector that enables the program to iterate over all the connections that are ready to be processed. To construct a new Selector, just call the static Selector.open() factory method:

 Selector selector = Selector.open( ); 

Next you need to register each channel with the selector that monitors it using the channel's register() method. A channel must be in non-blocking mode before it is registered; otherwise, register() throws an IllegalBlockingModeException. When registering, specify the operation you're interested in using a named constant from the SelectionKey class. For the server socket, the only operation of interest is OP_ACCEPT; that is, is the server socket channel ready to accept a new connection?

 serverChannel.register(selector, SelectionKey.OP_ACCEPT); 
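Two details of registration are easy to check in isolation: a channel that is still in blocking mode cannot be registered, and a server socket channel supports no operation other than OP_ACCEPT. The following sketch assumes Java 7 or later; the class and method names are hypothetical, and the channel is never bound because registration does not require it.

```java
import java.io.IOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class RegisterSketch {

    // Returns true if registering a channel that is still in blocking mode
    // is rejected with an IllegalBlockingModeException.
    static boolean blockingRegistrationFails() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            // Channels start out in blocking mode.
            try {
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                return false;
            } catch (IllegalBlockingModeException ex) {
                return true;
            }
        }
    }

    // Returns true if OP_ACCEPT is the only operation a server channel supports.
    static boolean serverSupportsOnlyAccept() throws IOException {
        try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            return serverChannel.validOps() == SelectionKey.OP_ACCEPT;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("blocking registration fails: " + blockingRegistrationFails());
        System.out.println("server supports only accept: " + serverSupportsOnlyAccept());
    }
}
```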

For the client channels, you want to know something a little different: whether they're ready to have data written onto them. For this, use the OP_WRITE constant:

 SelectionKey key = clientChannel.register(selector, SelectionKey.OP_WRITE); 

Both calls to register() return a SelectionKey object. However, we're only going to need to use that key for the client channels, because there can be more than one of them. Each SelectionKey has an attachment of arbitrary Object type. This is normally used to hold an object that indicates the current state of the connection. In this case, we can store the buffer that the channel writes onto the network. Once the buffer is fully drained, we'll refill it.

Fill an array with the data that will be copied into each buffer. Rather than writing to the end of the buffer, then rewinding to the beginning of the buffer and writing again, it's easier just to start with two sequential copies of the data so every line is available as a contiguous sequence in the array:

 byte[] rotation = new byte[95*2];
 for (byte i = ' '; i <= '~'; i++) {
   rotation[i-' '] = i;
   rotation[i+95-' '] = i;
 }
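To see why the doubled array helps, the following sketch prints the first and last of the 95 possible lines. It assumes Java 7 or later for StandardCharsets; the class RotationSketch and its helper methods are hypothetical names introduced for illustration.

```java
import java.nio.charset.StandardCharsets;

class RotationSketch {

    // Builds two back-to-back copies of the 95 printable ASCII characters.
    static byte[] buildRotation() {
        byte[] rotation = new byte[95 * 2];
        for (byte i = ' '; i <= '~'; i++) {
            rotation[i - ' '] = i;
            rotation[i + 95 - ' '] = i;
        }
        return rotation;
    }

    // Returns the 72-character line that starts at the given offset (0 to 94).
    static String line(byte[] rotation, int start) {
        return new String(rotation, start, 72, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        byte[] rotation = buildRotation();
        // The first line starts with a space; the last one starts with '~'
        // and runs straight on into the second copy without any wraparound logic.
        System.out.println(line(rotation, 0));
        System.out.println(line(rotation, 94));
    }
}
```

Even the line that starts at offset 94 ends at index 165, well inside the 190-byte array, so a single bulk copy always suffices.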

Because this array will only be read from after it's been initialized, you can reuse it for multiple channels. However, each channel will get its own buffer filled with the contents of this array. We'll stuff the buffer with the first 72 bytes of the rotation array, then add a carriage return/linefeed pair to break the line. Then we'll flip the buffer so it's ready for draining, and attach it to the channel's key:

 ByteBuffer buffer = ByteBuffer.allocate(74);
 buffer.put(rotation, 0, 72);
 buffer.put((byte) '\r');
 buffer.put((byte) '\n');
 buffer.flip();
 key.attach(buffer);
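The effect of the flip is easiest to see by looking at the buffer's position and limit. This is a small standalone sketch; FlipSketch, buildRotation, and firstLine are hypothetical names, and the key attachment step is left out so that no channel is needed.

```java
import java.nio.ByteBuffer;

class FlipSketch {

    // Builds two back-to-back copies of the 95 printable ASCII characters.
    static byte[] buildRotation() {
        byte[] rotation = new byte[95 * 2];
        for (byte i = ' '; i <= '~'; i++) {
            rotation[i - ' '] = i;
            rotation[i + 95 - ' '] = i;
        }
        return rotation;
    }

    // Fills a 74-byte buffer with the first line and flips it for draining.
    static ByteBuffer firstLine(byte[] rotation) {
        ByteBuffer buffer = ByteBuffer.allocate(74);
        buffer.put(rotation, 0, 72);
        buffer.put((byte) '\r');
        buffer.put((byte) '\n');
        // Before the flip: position == 74 and limit == 74, so nothing is readable.
        buffer.flip();
        // After the flip: position == 0 and limit == 74, so all 74 bytes are readable.
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = firstLine(buildRotation());
        System.out.println("position=" + buffer.position()
            + " limit=" + buffer.limit()
            + " remaining=" + buffer.remaining());
    }
}
```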

To check whether anything is ready to be acted on, call the selector's select( ) method. For a long-running server, this normally goes in an infinite loop:

 while (true) {
   selector.select();
   // process selected keys...
 }

Assuming the selector does find a ready channel, its selectedKeys() method returns a java.util.Set containing one SelectionKey object for each ready channel. Otherwise it returns an empty set. In either case, you can loop through this with a java.util.Iterator:

 Set readyKeys = selector.selectedKeys();
 Iterator iterator = readyKeys.iterator();
 while (iterator.hasNext()) {
   SelectionKey key = (SelectionKey) (iterator.next());
   // Remove key from set so we don't process it twice
   iterator.remove();
   // operate on the channel...
 }

The selector only ever adds keys to the selected set; it never removes them on its own. Removing the key yourself tells the selector that you've dealt with it, so it doesn't keep handing the same key back every time you call select(). If the channel becomes ready again, the selector adds its key back to the set on a later call to select(). If you forget to remove the key, the loop processes it again even when its channel has nothing left to do.
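The following sketch shows what happens when the key is not removed. It is an illustration under stated assumptions: it assumes Java 7 or later, uses a loopback connection on an OS-chosen port, and the names SelectedKeysSketch and staleKeyCount are hypothetical. The pending connection is accepted, so nothing is left to do, yet the key is still in the selected set after the next select call.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class SelectedKeysSketch {

    // Returns how many keys are still in the selected set after the pending
    // connection has been accepted but the key was never removed.
    static int staleKeyCount() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (Selector selector = Selector.open();
             ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            serverChannel.socket().bind(new InetSocketAddress(loopback, 0));
            serverChannel.configureBlocking(false);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            int port = serverChannel.socket().getLocalPort();
            try (SocketChannel client =
                     SocketChannel.open(new InetSocketAddress(loopback, port))) {
                // Wait (up to five seconds) for the pending connection to show up.
                selector.select(5000);
                SocketChannel accepted = serverChannel.accept();
                if (accepted != null) {
                    accepted.close();
                }
                // Deliberately skip removing the key from selectedKeys().
                selector.selectNow();   // no new connection is pending now
                return selector.selectedKeys().size();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("keys still selected: " + staleKeyCount());
    }
}
```

In a real event loop, that leftover key would lead to another accept() call, which returns null on a non-blocking channel when no connection is pending.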

If the ready channel is the server channel, the program accepts a new socket channel and adds it to the selector. If the ready channel is a socket channel, the program writes as much of the buffer as it can onto the channel. If no channels are ready, the selector waits for one. One thread, the main thread, processes multiple simultaneous connections.

In this case, it's easy to tell whether a client or a server channel has been selected because the server channel will only be ready for accepting and the client channels will only be ready for writing. Both of these are I/O operations, and both can throw IOExceptions for a variety of reasons, so you'll want to wrap this all in a try block.

 try {
   if (key.isAcceptable()) {
     ServerSocketChannel server = (ServerSocketChannel) key.channel();
     SocketChannel connection = server.accept();
     connection.configureBlocking(false);
     connection.register(selector, SelectionKey.OP_WRITE);
     // set up the buffer for the client...
   }
   else if (key.isWritable()) {
     SocketChannel client = (SocketChannel) key.channel();
     // write data to client...
   }
 }

Writing the data onto the channel is easy. Retrieve the key's attachment, cast it to ByteBuffer, and call hasRemaining() to check whether there's any unwritten data left in the buffer. If there is, write it. Otherwise, refill the buffer with the next line of data from the rotation array and write that.

 ByteBuffer buffer = (ByteBuffer) key.attachment();
 if (!buffer.hasRemaining()) {
   // Refill the buffer with the next line
   // Figure out where the last line started
   buffer.rewind();
   int first = buffer.get();
   // Increment to the next character
   buffer.rewind();
   int position = first - ' ' + 1;
   buffer.put(rotation, position, 72);
   buffer.put((byte) '\r');
   buffer.put((byte) '\n');
   buffer.flip();
 }
 client.write(buffer);

The algorithm that figures out where to grab the next line of data relies on the characters being stored in the rotation array in ASCII order. It should be familiar to anyone who learned C from Kernighan and Ritchie, but for the rest of us it needs a little explanation. buffer.get() reads the first byte of data from the buffer. From this number we subtract the space character (32) because that's the first character in the rotation array. This tells us which index in the array the buffer currently starts at. We add 1 to find the start of the next line and refill the buffer.
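The refill step can be exercised without any network code. The sketch below wraps the same arithmetic in a hypothetical refill method (the class name NextLineSketch is also an assumption) and shows that the line starting with '~' is followed by one starting with a space, thanks to the second copy of the data in the rotation array.

```java
import java.nio.ByteBuffer;

class NextLineSketch {

    // Builds two back-to-back copies of the 95 printable ASCII characters.
    static byte[] buildRotation() {
        byte[] rotation = new byte[95 * 2];
        for (byte i = ' '; i <= '~'; i++) {
            rotation[i - ' '] = i;
            rotation[i + 95 - ' '] = i;
        }
        return rotation;
    }

    // Overwrites the buffer with the line after the one it currently holds
    // and leaves it flipped, ready to be drained again.
    static void refill(ByteBuffer buffer, byte[] rotation) {
        buffer.rewind();
        int first = buffer.get();            // first character of the old line
        buffer.rewind();
        int position = first - ' ' + 1;      // index of the next line's first character
        buffer.put(rotation, position, 72);
        buffer.put((byte) '\r');
        buffer.put((byte) '\n');
        buffer.flip();
    }

    public static void main(String[] args) {
        byte[] rotation = buildRotation();
        ByteBuffer buffer = ByteBuffer.allocate(74);
        buffer.put(rotation, 0, 72).put((byte) '\r').put((byte) '\n');
        buffer.flip();
        // Print the first character of each of the next few lines.
        for (int i = 0; i < 3; i++) {
            refill(buffer, rotation);
            System.out.println("line starts with '" + (char) buffer.get(0) + "'");
        }
    }
}
```

When the old line starts with '~' (index 94), the new position is 95, which is the space character at the start of the second copy, so the sequence wraps around without a special case.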

In the chargen protocol, the server never closes the connection. It waits for the client to break the socket. When this happens, an exception will be thrown. Cancel the key and close the corresponding channel:

 catch (IOException ex) {
   key.cancel();
   try {
     // You can still get the channel from the key after cancelling the key.
     key.channel().close();
   }
   catch (IOException cex) {
   }
 }

Example 12-2 puts this all together in a complete chargen server that processes multiple connections efficiently in a single thread.

Example 12-2. A non-blocking chargen server
 import java.nio.*;
 import java.nio.channels.*;
 import java.net.*;
 import java.util.*;
 import java.io.IOException;

 public class ChargenServer {

   public static int DEFAULT_PORT = 19;

   public static void main(String[] args) {

     int port;
     try {
       port = Integer.parseInt(args[0]);
     }
     catch (Exception ex) {
       port = DEFAULT_PORT;
     }
     System.out.println("Listening for connections on port " + port);

     byte[] rotation = new byte[95*2];
     for (byte i = ' '; i <= '~'; i++) {
       rotation[i-' '] = i;
       rotation[i+95-' '] = i;
     }

     ServerSocketChannel serverChannel;
     Selector selector;
     try {
       serverChannel = ServerSocketChannel.open();
       ServerSocket ss = serverChannel.socket();
       InetSocketAddress address = new InetSocketAddress(port);
       ss.bind(address);
       serverChannel.configureBlocking(false);
       selector = Selector.open();
       serverChannel.register(selector, SelectionKey.OP_ACCEPT);
     }
     catch (IOException ex) {
       ex.printStackTrace();
       return;
     }

     while (true) {

       try {
         selector.select();
       }
       catch (IOException ex) {
         ex.printStackTrace();
         break;
       }

       Set readyKeys = selector.selectedKeys();
       Iterator iterator = readyKeys.iterator();
       while (iterator.hasNext()) {

         SelectionKey key = (SelectionKey) iterator.next();
         iterator.remove();
         try {
           if (key.isAcceptable()) {
             ServerSocketChannel server = (ServerSocketChannel) key.channel();
             SocketChannel client = server.accept();
             System.out.println("Accepted connection from " + client);
             client.configureBlocking(false);
             SelectionKey key2 = client.register(selector, SelectionKey.OP_WRITE);
             ByteBuffer buffer = ByteBuffer.allocate(74);
             buffer.put(rotation, 0, 72);
             buffer.put((byte) '\r');
             buffer.put((byte) '\n');
             buffer.flip();
             key2.attach(buffer);
           }
           else if (key.isWritable()) {
             SocketChannel client = (SocketChannel) key.channel();
             ByteBuffer buffer = (ByteBuffer) key.attachment();
             if (!buffer.hasRemaining()) {
               // Refill the buffer with the next line
               buffer.rewind();
               // Get the old first character
               int first = buffer.get();
               // Get ready to change the data in the buffer
               buffer.rewind();
               // Find the new first character's position in rotation
               int position = first - ' ' + 1;
               // Copy the data from rotation into the buffer
               buffer.put(rotation, position, 72);
               // Store a line break at the end of the buffer
               buffer.put((byte) '\r');
               buffer.put((byte) '\n');
               // Prepare the buffer for writing
               buffer.flip();
             }
             client.write(buffer);
           }
         }
         catch (IOException ex) {
           key.cancel();
           try {
             key.channel().close();
           }
           catch (IOException cex) {}
         }

       }
     }

   }
 }

This example only uses one thread. There are situations where you might still want to use multiple threads, especially if different operations have different priorities. For instance, you might want to accept new connections in one high priority thread and service existing connections in a lower priority thread. However, you're no longer required to have a 1:1 ratio between threads and connections, which dramatically improves the scalability of servers written in Java.

It may also be important to use multiple threads for maximum performance. Multiple threads allow the server to take advantage of multiple CPUs. Even with a single CPU, it's often a good idea to separate the accepting thread from the processing threads. The thread pools discussed in Chapter 5 are still relevant even with the new I/O model. The thread that accepts the connections can add the connections it's accepted into the queue for processing by the threads in the pool. This is still faster than doing the same thing without selectors because select() ensures you're never wasting any time on connections that aren't ready to receive data. On the other hand, the synchronization issues here are quite tricky, so don't attempt this solution until profiling proves there is a bottleneck.
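As a rough illustration of the hand-off between an accepting thread and a pool, here is a deliberately simplified sketch. It is not the selector-based design described above: it assumes Java 8 or later, uses blocking channels and java.util.concurrent's ExecutorService in place of the Chapter 5 thread pool, and replaces real per-connection work with a hypothetical sendGreeting method. All class and method names are assumptions made for the example.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AcceptorPoolSketch {

    // Hypothetical stand-in for real per-connection work:
    // send one short line, then close the connection.
    static void sendGreeting(SocketChannel client) {
        try (SocketChannel channel = client) {
            ByteBuffer line = ByteBuffer.wrap(
                "hello\r\n".getBytes(StandardCharsets.US_ASCII));
            while (line.hasRemaining()) {
                channel.write(line);
            }
        } catch (IOException ex) {
            // The client went away; nothing more to do for this connection.
        }
    }

    // Accepts the given number of connections on the calling thread and
    // queues each one for the pool's worker threads.
    static void acceptAndDispatch(ServerSocketChannel serverChannel,
                                  ExecutorService pool,
                                  int connections) throws IOException {
        for (int i = 0; i < connections; i++) {
            SocketChannel client = serverChannel.accept();  // blocks until a client connects
            pool.execute(() -> sendGreeting(client));
        }
    }

    // Connects one loopback client and returns what the pool sent it.
    static String demo() throws IOException, InterruptedException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            serverChannel.socket().bind(new InetSocketAddress(loopback, 0));
            int port = serverChannel.socket().getLocalPort();
            try (SocketChannel client =
                     SocketChannel.open(new InetSocketAddress(loopback, port))) {
                acceptAndDispatch(serverChannel, pool, 1);
                ByteBuffer received = ByteBuffer.allocate(16);
                // Read until the worker closes the connection or the buffer fills.
                while (client.read(received) != -1 && received.hasRemaining()) {
                    // keep reading
                }
                received.flip();
                return StandardCharsets.US_ASCII.decode(received).toString();
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.print(demo());
    }
}
```

The only state shared between the accepting thread and the workers here is the pool's internal queue, which is what keeps the sketch free of explicit synchronization. A design in which the workers also share a selector would need considerably more care.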



Java Network Programming, Third Edition
ISBN: 0596007213
Year: 2003
Pages: 164
