Pipe Channels

Pipe channels move data between two threads, one of which writes and one of which reads. The basic ideas are essentially the same as for piped streams. Data is written onto a Pipe.SinkChannel and then read from the connected Pipe.SourceChannel.

The details are about as simple as they can be. Pipe.SinkChannel is a subclass of AbstractSelectableChannel and implements WritableByteChannel and GatheringByteChannel:

public abstract static class Pipe.SinkChannel extends AbstractSelectableChannel
 implements WritableByteChannel, GatheringByteChannel

Pipe.SourceChannel is also a subclass of AbstractSelectableChannel and implements ReadableByteChannel and ScatteringByteChannel:

public abstract static class Pipe.SinkChannel extends AbstractSelectableChannel
 implements ReadableByteChannel, ScatteringByteChannel

Both are public inner classes in the java.nio.channels.Pipe class. Setting up a pipe between two threads is accomplished as follows:

  1. Open a pipe with the static Pipe.open( ) method.
  2. Get the SinkChannel from the pipe and pass it to the producing thread.
  3. Get the SourceChannel from the pipe and pass it to the consuming thread.
  4. The producing thread writes data onto the SinkChannel using the usual WritableByteChannel channel methods.
  5. The consuming thread reads data from the SourceChannel using the usual ReadableByteChannel methods.

Of course, since this is multithreaded, steps 4 and 5 happen in parallel. If anything, this is a little simpler than using PipedInputStream and PipedOutputStream to do the same job. Furthermore, because both channels can be put into nonblocking mode, each thread can do other things if it's running ahead of its partner. The speeds of the two channels aren't locked together.

As an example of this, I'll reproduce the Fibonacci producers and consumers from Chapter 9, this time implemented with channels and buffers instead of streams. To make this a little more interesting, I'll do it with BigIntegers instead of plain ints, since Fibonacci numbers grow exponentially. This is going to require a protocol for recognizing number boundaries in the stream. The protocol I chose is that the producing thread first writes the number of integers it plans to write onto the stream as a 4-byte int. It then writes the length (in bytes) of each number as a 4-byte int, then writes the bytes that make up that number.

The consuming thread first reads the number of numbers to read. For each such number, it then reads the size of the number from the channel, reads exactly that many bytes from the channel, and converts that to a BigInteger. This does set a theoretical upper limit on the size of the numbers that can be calculated, but in practice you'd run out of heap space long before you hit that limit.

This program has three classes: FibonacciProducer and FibonacciConsumer, which are subclasses of THRead, and NewIOFibonacciDriver, which sets up and runs the threads.Example 16-3 shows the driver class. It opens a pipe and retrieves its source and sink channels, which it uses to construct FibonacciProducer and FibonacciConsumer objects. It then starts those two threads.

Example 16-3. The NewIOFibonacciDriver class

import java.io.IOException;
import java.nio.channels.*;
public class NewIOFibonacciDriver {
 public static void main (String[] args) throws IOException {
 Pipe pipe = Pipe.open( );
 WritableByteChannel out = pipe.sink( );
 ReadableByteChannel in = pipe.source( );
 FibonacciProducer producer = new FibonacciProducer(out, 200);
 FibonacciConsumer consumer = new FibonacciConsumer(in);
 producer.start( );
 consumer.start( );
 }
}

Example 16-4 shows the FibonacciProducer class, a subclass of Thread. This class does not directly use a sink channel; it just writes data onto the channel it's given in its constructor. After it has finished writing the requested amount of numbers, the channel is closed.

Example 16-4. The FibonacciProducer class

import java.io.IOException;
import java.math.BigInteger;
import java.nio.*;
import java.nio.channels.*;
public class FibonacciProducer extends Thread {
 private WritableByteChannel out;
 private int howMany;
 public FibonacciProducer(WritableByteChannel out, int howMany) {
 this.out = out;
 this.howMany = howMany;
 }
 public void run( ) {
 BigInteger low = BigInteger.ONE;
 BigInteger high = BigInteger.ONE;
 try {
 ByteBuffer buffer = ByteBuffer.allocate(4);
 buffer.putInt(this.howMany);
 buffer.flip( );
 while (buffer.hasRemaining( )) out.write(buffer);
 for (int i = 0; i < howMany; i++) {
 byte[] data = low.toByteArray( );
 // These numbers can become arbitrarily large, and they grow
 // exponentially so no fixed size buffer will suffice.
 buffer = ByteBuffer.allocate(4 + data.length);
 buffer.putInt(data.length);
 buffer.put(data);
 buffer.flip( );
 while (buffer.hasRemaining( )) out.write(buffer);
 // find the next number in the series
 BigInteger temp = high;
 high = high.add(low);
 low = temp;
 }
 out.close( );
 System.err.println("Closed");
 }
 catch (IOException ex) {
 System.err.println(ex);
 }
 }
}

Example 16-5 shows the FibonacciConsumer class. It could just as well have been called the BigIntegerConsumer class, since it doesn't know anything about Fibonacci numbers. Its run( ) method merely reads the size of the BigInteger from the source channel, reads that many bytes, and converts those bytes into a BigInteger, which it then prints. It repeats this until the channel is exhausted.

Example 16-5. The FibonacciConsumer class

import java.io.IOException;
import java.math.BigInteger;
import java.nio.channels.*;
import java.nio.*;
public class FibonacciConsumer extends Thread{
 private ReadableByteChannel in;
 public FibonacciConsumer(ReadableByteChannel in) {
 this.in = in;
 }
 public void run( ) {
 ByteBuffer sizeb = ByteBuffer.allocate(4);
 try {
 while (sizeb.hasRemaining( )) in.read(sizeb);
 sizeb.flip( );
 int howMany = sizeb.getInt( );
 sizeb.clear( );
 for (int i = 0; i < howMany; i++) {
 while (sizeb.hasRemaining( )) in.read(sizeb);
 sizeb.flip( );
 int length = sizeb.getInt( );
 sizeb.clear( );
 ByteBuffer data = ByteBuffer.allocate(length);
 while (data.hasRemaining( )) in.read(data);
 BigInteger result = new BigInteger(data.array( ));
 System.out.println(result);
 }
 }
 catch (IOException ex) {
 System.err.println(ex);
 }
 finally {
 try {
 in.close( );
 }
 catch (Exception ex) {
 // We tried
 }
 }
 }
}

One thing that's a little unusual about this example is the use of two buffers to read the channel. This is necessary because the first buffer has to read the size of the second buffer. The first buffer can be reused. However, because the size of the numbers increases as we read further, new buffers are necessary to read the Fibonacci numbers themselves. It would probably be possible to contrive a way to reuse the same buffer repeatedly if it were too small for the numbers, but that seemed unnecessarily complex for no particular benefit.

There's not a lot of call for nonblocking mode in this example because the producer thread only writes and the consumer thread only reads, and both on only one channel. If either thread had something else to do it might make sense to use a Selector and put these channels into nonblocking mode.

Basic I/O

Introducing I/O

Output Streams

Input Streams

Data Sources

File Streams

Network Streams

Filter Streams

Filter Streams

Print Streams

Data Streams

Streams in Memory

Compressing Streams

JAR Archives

Cryptographic Streams

Object Serialization

New I/O

Buffers

Channels

Nonblocking I/O

The File System

Working with Files

File Dialogs and Choosers

Text

Character Sets and Unicode

Readers and Writers

Formatted I/O with java.text

Devices

The Java Communications API

USB

The J2ME Generic Connection Framework

Bluetooth

Character Sets



Java I/O
Java I/O
ISBN: 0596527500
EAN: 2147483647
Year: 2004
Pages: 244

Flylib.com © 2008-2020.
If you may any questions please contact us: flylib@qtcs.net