Pipe channels move data between two threads, one of which writes and one of which reads. The basic ideas are essentially the same as for piped streams. Data is written onto a Pipe.SinkChannel and then read from the connected Pipe.SourceChannel.
The details are about as simple as they can be. Pipe.SinkChannel is a subclass of AbstractSelectableChannel and implements WritableByteChannel and GatheringByteChannel:
public abstract static class Pipe.SinkChannel extends AbstractSelectableChannel implements WritableByteChannel, GatheringByteChannel
Pipe.SourceChannel is also a subclass of AbstractSelectableChannel and implements ReadableByteChannel and ScatteringByteChannel:
public abstract static class Pipe.SinkChannel extends AbstractSelectableChannel implements ReadableByteChannel, ScatteringByteChannel
Both are public inner classes in the java.nio.channels.Pipe class. Setting up a pipe between two threads is accomplished as follows:
Of course, since this is multithreaded, steps 4 and 5 happen in parallel. If anything, this is a little simpler than using PipedInputStream and PipedOutputStream to do the same job. Furthermore, because both channels can be put into nonblocking mode, each thread can do other things if it's running ahead of its partner. The speeds of the two channels aren't locked together.
As an example of this, I'll reproduce the Fibonacci producers and consumers from Chapter 9, this time implemented with channels and buffers instead of streams. To make this a little more interesting, I'll do it with BigIntegers instead of plain ints, since Fibonacci numbers grow exponentially. This is going to require a protocol for recognizing number boundaries in the stream. The protocol I chose is that the producing thread first writes the number of integers it plans to write onto the stream as a 4-byte int. It then writes the length (in bytes) of each number as a 4-byte int, then writes the bytes that make up that number.
The consuming thread first reads the number of numbers to read. For each such number, it then reads the size of the number from the channel, reads exactly that many bytes from the channel, and converts that to a BigInteger. This does set a theoretical upper limit on the size of the numbers that can be calculated, but in practice you'd run out of heap space long before you hit that limit.
This program has three classes: FibonacciProducer and FibonacciConsumer, which are subclasses of THRead, and NewIOFibonacciDriver, which sets up and runs the threads.Example 16-3 shows the driver class. It opens a pipe and retrieves its source and sink channels, which it uses to construct FibonacciProducer and FibonacciConsumer objects. It then starts those two threads.
Example 16-3. The NewIOFibonacciDriver class
import java.io.IOException; import java.nio.channels.*; public class NewIOFibonacciDriver { public static void main (String[] args) throws IOException { Pipe pipe = Pipe.open( ); WritableByteChannel out = pipe.sink( ); ReadableByteChannel in = pipe.source( ); FibonacciProducer producer = new FibonacciProducer(out, 200); FibonacciConsumer consumer = new FibonacciConsumer(in); producer.start( ); consumer.start( ); } } |
Example 16-4 shows the FibonacciProducer class, a subclass of Thread. This class does not directly use a sink channel; it just writes data onto the channel it's given in its constructor. After it has finished writing the requested amount of numbers, the channel is closed.
Example 16-4. The FibonacciProducer class
import java.io.IOException; import java.math.BigInteger; import java.nio.*; import java.nio.channels.*; public class FibonacciProducer extends Thread { private WritableByteChannel out; private int howMany; public FibonacciProducer(WritableByteChannel out, int howMany) { this.out = out; this.howMany = howMany; } public void run( ) { BigInteger low = BigInteger.ONE; BigInteger high = BigInteger.ONE; try { ByteBuffer buffer = ByteBuffer.allocate(4); buffer.putInt(this.howMany); buffer.flip( ); while (buffer.hasRemaining( )) out.write(buffer); for (int i = 0; i < howMany; i++) { byte[] data = low.toByteArray( ); // These numbers can become arbitrarily large, and they grow // exponentially so no fixed size buffer will suffice. buffer = ByteBuffer.allocate(4 + data.length); buffer.putInt(data.length); buffer.put(data); buffer.flip( ); while (buffer.hasRemaining( )) out.write(buffer); // find the next number in the series BigInteger temp = high; high = high.add(low); low = temp; } out.close( ); System.err.println("Closed"); } catch (IOException ex) { System.err.println(ex); } } } |
Example 16-5 shows the FibonacciConsumer class. It could just as well have been called the BigIntegerConsumer class, since it doesn't know anything about Fibonacci numbers. Its run( ) method merely reads the size of the BigInteger from the source channel, reads that many bytes, and converts those bytes into a BigInteger, which it then prints. It repeats this until the channel is exhausted.
Example 16-5. The FibonacciConsumer class
import java.io.IOException; import java.math.BigInteger; import java.nio.channels.*; import java.nio.*; public class FibonacciConsumer extends Thread{ private ReadableByteChannel in; public FibonacciConsumer(ReadableByteChannel in) { this.in = in; } public void run( ) { ByteBuffer sizeb = ByteBuffer.allocate(4); try { while (sizeb.hasRemaining( )) in.read(sizeb); sizeb.flip( ); int howMany = sizeb.getInt( ); sizeb.clear( ); for (int i = 0; i < howMany; i++) { while (sizeb.hasRemaining( )) in.read(sizeb); sizeb.flip( ); int length = sizeb.getInt( ); sizeb.clear( ); ByteBuffer data = ByteBuffer.allocate(length); while (data.hasRemaining( )) in.read(data); BigInteger result = new BigInteger(data.array( )); System.out.println(result); } } catch (IOException ex) { System.err.println(ex); } finally { try { in.close( ); } catch (Exception ex) { // We tried } } } } |
One thing that's a little unusual about this example is the use of two buffers to read the channel. This is necessary because the first buffer has to read the size of the second buffer. The first buffer can be reused. However, because the size of the numbers increases as we read further, new buffers are necessary to read the Fibonacci numbers themselves. It would probably be possible to contrive a way to reuse the same buffer repeatedly if it were too small for the numbers, but that seemed unnecessarily complex for no particular benefit.
There's not a lot of call for nonblocking mode in this example because the producer thread only writes and the consumer thread only reads, and both on only one channel. If either thread had something else to do it might make sense to use a Selector and put these channels into nonblocking mode.
Basic I/O
Introducing I/O
Output Streams
Input Streams
Data Sources
File Streams
Network Streams
Filter Streams
Filter Streams
Print Streams
Data Streams
Streams in Memory
Compressing Streams
JAR Archives
Cryptographic Streams
Object Serialization
New I/O
Buffers
Channels
Nonblocking I/O
The File System
Working with Files
File Dialogs and Choosers
Text
Character Sets and Unicode
Readers and Writers
Formatted I/O with java.text
Devices
The Java Communications API
USB
The J2ME Generic Connection Framework
Bluetooth
Character Sets