Benefits of Thread Pooling

Chapter 15 - Breaking Out of a Blocked I/O State

Java Thread Programming
Paul Hyde
  Copyright 1999 Sams Publishing

Throwing InterruptedIOException When Interrupted
The read() method on PipedInputStream will throw a subclass of IOException called InterruptedIOException if the blocked thread is interrupted while waiting for bytes to arrive. This is very useful functionality that I would like to see implemented across the whole java.io package. Until that happens, other techniques have to be used to get out of the blocked state. The next example illustrates such a technique.
Class ThreadedInputStream
The class ThreadedInputStream (see Listing 15.7) is a subclass of FilterInputStream and responds to interrupts by throwing an InterruptedIOException. It uses an internal thread to read from the underlying stream and loads the bytes into a ByteFIFO (discussed in Chapter 18, "First-In-First-Out (FIFO) Queue"). This read-ahead mechanism can help speed performance but does carry the cost of an extra thread running in the VM. Although very useful, instances of ThreadedInputStream are not particularly lightweight and should be used sparingly in an application.
Listing 15.7  ThreadedInputStream.java—Interruptible read() Capability
  1: import java.io.*;
  2:
  3: // uses SureStop from chapter 16
  4: // uses ByteFIFO from chapter 18
  5:
  6: public class ThreadedInputStream extends FilterInputStream {
  7:     private ByteFIFO buffer;
  8:
  9:     private volatile boolean closeRequested;
10:     private volatile boolean eofDetected;
11:     private volatile boolean ioxDetected;
12:     private volatile String ioxMessage;
13:
14:     private Thread internalThread;
15:     private volatile boolean noStopRequested;
16:
17:     public ThreadedInputStream(InputStream in, int bufferSize) {
18:         super(in);
19:
20:         buffer = new ByteFIFO(bufferSize);
21:
22:         closeRequested = false;
23:         eofDetected = false;
24:         ioxDetected = false;
25:         ioxMessage = null;
26:
27:         noStopRequested = true;
28:         Runnable r = new Runnable() {
29:                 public void run() {
30:                     try {
31:                         runWork();
32:                     } catch (Exception x) {
33:                         // in case ANY exception slips through
34:                         x.printStackTrace();
35:                     }
36:                 }
37:             };
38:
39:         internalThread = new Thread(r);
40:         internalThread.setDaemon(true);
41:         internalThread.start();
42:     }
43:
44:     public ThreadedInputStream(InputStream in) {
45:         this(in, 2048);
46:     }
47:
48:     private void runWork() {
49:         byte[] workBuf = new byte[buffer.getCapacity()];
50:
51:         try {
52:             while (noStopRequested) {
53:                 int readCount = in.read(workBuf);
54:
55:                 if (readCount == -1) {
56:                     signalEOF();
57:                     stopRequest();
58:                 } else if (readCount > 0) {
59:                     addToBuffer(workBuf, readCount);
60:                 }
61:             }
62:         } catch (IOException iox) {
63:             if (!closeRequested) {
64:                 ioxMessage = iox.getMessage();
65:                 signalIOX();
66:             }
67:         } catch (InterruptedException x) {
68:             // ignore
69:         } finally {
70:             // no matter what, make sure that eofDetected is set
71:             signalEOF();
72:         }
73:     }
74:
75:     private void signalEOF() {
76:         synchronized (buffer) {
77:             eofDetected = true;
78:             buffer.notifyAll();
79:         }
80:     }
81:
82:     private void signalIOX() {
83:         synchronized (buffer) {
84:             ioxDetected = true;
85:             buffer.notifyAll();
86:         }
87:     }
88:    
89:     private void signalClose() {
90:         synchronized (buffer) {
91:             closeRequested = true;
92:             buffer.notifyAll();
93:         }
94:     }
95:
96:     private void addToBuffer(byte[] workBuf, int readCount)
97:             throws InterruptedException {
98:
99:         // Create an array exactly as large as the number of
100:         // bytes read and copy the data into it.
101:         byte[] addBuf = new byte[readCount];
102:         System.arraycopy(workBuf, 0, addBuf, 0, addBuf.length);
103:
104:         buffer.add(addBuf);
105:     }
106:
107:     private void stopRequest() {
108:         if (noStopRequested) {
109:             noStopRequested = false;
110:             internalThread.interrupt();
111:         }
Listing 15.7  Continued
112:     }
113:
114:     public void close() throws IOException {
115:         if (closeRequested) {
116:             // already closeRequested, just return
117:             return;
118:         }
119:         signalClose();
120:
121:         SureStop.ensureStop(internalThread, 10000);
122:         stopRequest();
123:
124:         // Use a new thread to close "in" in case it blocks
125:         final InputStream localIn = in;
126:         Runnable r = new Runnable() {
127:                 public void run() {
128:                     try {
129:                         localIn.close();
130:                     } catch (IOException iox) {
131:                         // ignore
132:                     }
133:                 }
134:             };
135:
136:         Thread t = new Thread(r, "in-close");
137:         // give up when all other non-daemon threads die
138:         t.setDaemon(true); 
139:         t.start();
140:     }
141:
142:     private void throwExceptionIfClosed() throws IOException {
143:         if (closeRequested) {
144:             throw new IOException("stream is closed");
145:         }
146:     }
147:
148:     // Throws InterruptedIOException if the thread blocked on
149:     // read() is interrupted while waiting for data to arrive.
150:     public int read()
151:             throws InterruptedIOException, IOException {
152:
153:         // Using read(byte[]) to keep code in one place makes
154:         // single-byte read less efficient, but simplifies
155:         // the coding.
156:         byte[] data = new byte[1];
157:         int ret = read(data, 0, 1);
158:
159:         if (ret != 1) {
160:             return -1;
161:         }
162:
163:         return data[0] & 0x000000FF;
164:     }
165:
166:     // Throws InterruptedIOException if the thread blocked on
167:     // read() is interrupted while waiting for data to arrive.
168:     public int read(byte[] dest)
169:             throws InterruptedIOException, IOException {
170:
171:         return read(dest, 0, dest.length);
172:     }
173:
174:     // Throws InterruptedIOException if the thread blocked on
175:     // read() is interrupted while waiting for data to arrive.
176:     public int read(
177:                 byte[] dest,
178:                 int offset,
179:                 int length
180:            ) throws InterruptedIOException, IOException {
181:
182:         throwExceptionIfClosed();
183:
184:         if (length < 1) {
185:             return 0;
186:         }
187:
188:         if ((offset < 0) ||
189:              ((offset + length) > dest.length)
190:           ) {
191:
192:             throw new IllegalArgumentException(
193:                 "offset must be at least 0, and " +
194:                 "(offset + length) must be less than or " +
195:                 "equal to dest.length. " +
196:                 "offset=" + offset +
197:                 ", (offset + length)=" + (offset + length) +
198:                 ", dest.length=" + dest.length);
Listing 15.7  Continued
199:         }
200:
201:         byte[] data = removeUpTo(length);
202:
203:         if (data.length > 0) {
204:             System.arraycopy(data, 0, dest, offset, data.length);
205:             return data.length;
206:         }
207:
208:         // no data
209:         if (eofDetected) {
210:             return -1;
211:         }
212:
213:         // no data and not end of file, must be exception
214:         stopRequest();
215:
216:         if (ioxMessage == null) {
217:             ioxMessage = "stream cannot be read";
218:         }
219:
220:         throw new IOException(ioxMessage);
221:     }
222:
223:     private byte[] removeUpTo(int maxRead) throws IOException {
224:         // Convenience method to assist read(byte[], int, int).
225:         // Waits until at least one byte is ready, EOF is
226:         // detected, an IOException is thrown, or the
227:         // stream is closed.
228:         try {
229:             synchronized (buffer) {
230:                 while (buffer.isEmpty() &&
231:                         !eofDetected &&
232:                         !ioxDetected &&
233:                         !closeRequested
234:                      ) {
235:    
236:                     buffer.wait();
237:                 }
238:    
239:                 // If stream was closed while waiting,
240:                 // get out right away.
241:                 throwExceptionIfClosed();
242:
243:                 // Ignore eof and exception flags for now, see
244:                 // if any data remains.
245:                 byte[] data = buffer.removeAll();
246:    
247:                 if (data.length > maxRead) {
248:                     // Pulled out too many bytes,
249:                     // put excess back.
250:                     byte[] putBackData =
251:                             new byte[data.length - maxRead];
252:                     System.arraycopy(data, maxRead,
253:                             putBackData, 0, putBackData.length);
254:                     buffer.add(putBackData);
255:    
256:                     byte[] keepData = new byte[maxRead];
257:                     System.arraycopy(data, 0,
258:                             keepData, 0, keepData.length);
259:                     data = keepData;
260:                 }
261:    
262:                 return data;
263:             }
264:         } catch (InterruptedException ix) {
265:             // convert to an IOException
266:             throw new InterruptedIOException("interrupted " +
267:                 "while waiting for data to arrive for reading");
268:         }
269:     }
270:
271:     public long skip(long n) throws IOException {
272:         throwExceptionIfClosed();
273:
274:         if (n <= 0) {
275:             return 0;
276:         }
277:
278:         int skipLen = (int) Math.min(n, Integer.MAX_VALUE);
279:         int readCount = read(new byte[skipLen]);
280:
281:         if (readCount < 0) {
282:             return 0;
283:         }
284:
285:         return readCount;
286:     }
287:
288:     public int available() throws IOException {
289:         throwExceptionIfClosed();
290:         return buffer.getSize();
291:     }
292:
293:     public boolean markSupported() {
294:         return false;
295:     }
296:
297:     public synchronized void mark(int readLimit) {
298:         // ignore method calls, mark not supported
299:     }
300:
301:     public synchronized void reset() throws IOException {
302:         throw new IOException(
303:                 "mark-reset not supported on this stream");
304:     }
305: }
ThreadedInputStream extends FilterInputStream (line 6) and passes the InputStream handed to the constructor up to the constructor of the superclass (line 18). The superclass has a protected member variable called "in" that holds a reference to the underlying InputStream; this reference is used throughout ThreadedInputStream.
The internal thread basically just reads as much data as it can and loads it into the ByteFIFO buffer (lines 52–61). If the buffer is full, the internal thread blocks waiting for some data to be read out of the buffer (line 104). When the internal thread gets to the end of the file (EOF), it sets a flag (lines 75–80). If the internal thread encounters an IOException while reading into the buffer, it sets a flag (lines 82–87). The ByteFIFO is used for all of the wait-notify signaling.
When an external thread comes in to read some data, it gets the data from the ByteFIFO buffer (lines 201–220, 223–269). The external thread pays attention to the EOF and exception flags only if there is no more data in the buffer. This delays the reporting until the external thread catches up to the internal thread. If the external thread is blocked waiting for some data to arrive (line 236) and is then interrupted, it will jump to the catch block (lines 264–268). There, the InterruptedException is caught, and a new InterruptedIOException is thrown in its place (lines 266–267). This means that a thread blocked on a read() will now respond to interrupts!
The close() method (lines 114–140) is used to shut down the internal thread (notice that in this class, stopRequest() is private). The close() method can be safely invoked more than once because it simply ignores subsequent requests. It starts by signaling that a close has been requested (line 119), which will cause any blocked read() calls on ThreadedInputStream to throw an IOException (line 241). It then uses SureStop to make sure that even if all else fails, the internal thread will be stopped in 10 seconds (line 121). Inside close(), a new thread is created to invoke close() on the underlying stream (lines 125–139). This step is necessary in case the call blocks for quite a while—or even forever if a deadlock scenario occurs. ThreadedInputStream can't control what kind of InputStream it is passed in its constructor, so this extra thread is just added insurance.
Class BufferedThreadedInputStream
The class ThreadedInputStream just focuses on the task of splitting up the transfer of data between two threads. It performs poorly on single-byte reads and does not have any support for the mark-reset mechanism. BufferedThreadedInputStream (see Listing 15.8) makes up for these shortcomings by using a ThreadedInputStream with a BufferedInputStream added on both ends to smooth data flow.
Listing 15.8  BufferedThreadedInputStream.java—ThreadedInputStream with Buffering
1: import java.io.*;
2:
3: // uses ThreadedInputStream
4:
5: public class BufferedThreadedInputStream
6:         extends FilterInputStream {
7:
8:     // fixed class that does *not* have a synchronized close()
9:     private static class BISFix extends BufferedInputStream {
10:         public BISFix(InputStream rawIn, int buffSize) {
11:             super(rawIn, buffSize);
12:         }
13:
14:         public void close() throws IOException {
15:             if (in != null) {
16:                 try {
17:                     in.close();
18:                 } finally {
19:                     in = null;
20:                 }
21:             }
22:         }
23:     }
24:
25:     public BufferedThreadedInputStream(
26:                 InputStream rawIn,
27:                 int bufferSize
28:            ) {
29:
30:         super(rawIn); // super-class 'in' is set below
31:
32:         // rawIn -> BufferedIS -> ThreadedIS ->
33:         //       BufferedIS -> read()
34:
35:         BISFix bis = new BISFix(rawIn, bufferSize);
36:         ThreadedInputStream tis =
37:                 new ThreadedInputStream(bis, bufferSize);
38:
39:         // Change the protected variable 'in' from the
40:         // superclass from rawIn to the correct stream.
41:         in = new BISFix(tis, bufferSize);
42:     }
43:
44:     public BufferedThreadedInputStream(InputStream rawIn) {
45:         this(rawIn, 2048);
46:     }
47:
48:     // Overridden to show that InterruptedIOException might
49:     // be thrown.
50:     public int read()
51:             throws InterruptedIOException, IOException {
52:
53:         return in.read();
54:     }
55:
56:     // Overridden to show that InterruptedIOException might
57:     // be thrown.
58:     public int read(byte[] b)
59:             throws InterruptedIOException, IOException {
60:
61:         return in.read(b);
62:     }
63:
64:     // Overridden to show that InterruptedIOException might
65:     // be thrown.
66:     public int read(byte[] b, int off, int len)
67:             throws InterruptedIOException, IOException {
68:
69:         return in.read(b, off, len);
70:     }
71:
72:     // Overridden to show that InterruptedIOException might
73:     // be thrown.
74:     public long skip(long n)
75:             throws InterruptedIOException, IOException {
76:
77:         return in.skip(n);
78:     }
79:
80:     // The remainder of the methods are directly inherited from
81:     // FilterInputStream and access 'in' in much the same
82:     // way as the methods above do.
83: }
BufferedThreadedInputStream has a nested class (lines 8–23) called BISFix that simply extends BufferedInputStream and overrides close() so that it is not synchronized. This is a critical difference that is needed so that close() can be executed while another thread is blocked inside read().
In the constructor (lines 25–42), the raw input stream is wrapped in a BISFix (modified BufferedInputStream), which is wrapped in a ThreadedInputStream, which is wrapped in another BISFix. This provides buffering for both the internal thread and any external thread that does some reading.

Toc


Java Thread Programming
Java Thread Programming
ISBN: 0672315858
EAN: 2147483647
Year: 2005
Pages: 149
Authors: Paul Hyde

flylib.com © 2008-2017.
If you have any questions, please contact us: flylib@qtcs.net