| ||||
Copyright 1999 Sams Publishing |
|
A FIFO Queue for Bytes: ByteFIFO |
The class ByteFIFO is very similar to ObjectFIFO , except that it holds byte values instead of object references. It is much more efficient to store bytes directly, rather than wrap them in Byte instances and store them as references. The public API for ByteFIFO consists of the following: |
public ByteFIFO(int cap) |
public int getCapacity() |
public synchronized int getSize() |
public synchronized boolean isEmpty() |
public synchronized boolean isFull() |
public synchronized void add(byte b) throws InterruptedException |
public synchronized void add(byte[] list) throws InterruptedException |
public synchronized byte remove() throws InterruptedException |
public synchronized byte[] removeAll() |
public synchronized byte[] removeAtLeastOne() throws InterruptedException |
public synchronized boolean waitUntilEmpty(long msTimeout) |
throws InterruptedException |
public synchronized void waitUntilEmpty() throws InterruptedException |
public synchronized void waitWhileEmpty() throws InterruptedException |
public synchronized void waitUntilFull() throws InterruptedException |
public synchronized void waitWhileFull() throws InterruptedException |
This is pretty much the same API as ObjectFIFO , except that byte and byte[] are passed and returned instead of Object and Object[] . In addition, the addEach() method is gone; it is replaced with an overloaded add() method that takes a byte[] as the parameter. |
All the methods (except getCapacity() ) are synchronized to ensure that multiple threads can safely and simultaneously interact with ByteFIFO . The getCapacity() method does not have to be synchronized because the capacity never changes. The methods that might block, waiting for something to change, declare that they will throw an InterruptedException if interrupted while waiting. Listing 18.7 shows the code for ByteFIFO . |
Listing 18.7 ByteFIFO.javaA FIFO Queue That Holds Bytes |
1: public class ByteFIFO extends Object { |
2: private byte[] queue; |
3: private int capacity; |
4: private int size ; |
5: private int head; |
6: private int tail; |
7: |
8: public ByteFIFO(int cap) { |
9: capacity = (cap > 0) ? cap : 1; // at least 1 |
10: queue = new byte[capacity]; |
11: head = 0; |
12: tail = 0; |
13: size = 0; |
14: } |
15: |
16: public int getCapacity() { |
17: return capacity; |
18: } |
19: |
20: public synchronized int getSize() { |
21: return size; |
22: } |
23: |
24: public synchronized boolean isEmpty() { |
25: return (size == 0); |
26: } |
27: |
28: public synchronized boolean isFull() { |
29: return (size == capacity); |
30: } |
31: |
32: public synchronized void add(byte b) |
33: throws InterruptedException { |
34: |
35: waitWhileFull(); |
36: |
37: queue[head] = b; |
38: head = (head + 1) % capacity; |
39: size++; |
40: |
41: notifyAll(); // let any waiting threads know about change |
42: } |
43: |
44: public synchronized void add(byte[] list) |
45: throws InterruptedException { |
46: |
47: // For efficiency, the bytes are copied in blocks |
48: // instead of one at a time. As space becomes available, |
49: // more bytes are copied until all of them have been |
50: // added. |
51: |
52: int ptr = 0; |
53: |
54: while (ptr < list.length) { |
55: // If full, the lock will be released to allow |
56: // another thread to come in and remove bytes. |
57: waitWhileFull(); |
58: |
59: int space = capacity - size; |
60: int distToEnd = capacity - head; |
61: int blockLen = Math.min(space, distToEnd); |
62: |
63: int bytesRemaining = list.length - ptr; |
64: int copyLen = Math.min(blockLen, bytesRemaining); |
65: |
66: System.arraycopy(list, ptr, queue, head, copyLen); |
67: head = (head + copyLen) % capacity; |
68: size += copyLen; |
69: ptr += copyLen; |
70: |
71: // Keep the lock, but let any waiting threads |
72: // know that something has changed. |
73: notifyAll(); |
74: } |
75: } |
76: |
77: public synchronized byte remove() |
78: throws InterruptedException { |
79: |
80: waitWhileEmpty(); |
81: |
82: byte b = queue[tail]; |
83: tail = (tail + 1) % capacity; |
84: size; |
85: |
86: notifyAll(); // let any waiting threads know about change |
87: |
88: return b; |
89: } |
90: |
91: public synchronized byte[] removeAll() { |
92: // For efficiency, the bytes are copied in blocks |
93: // instead of one at a time. |
94: |
95: if (isEmpty()) { |
96: // Nothing to remove, return a zero-length |
97: // array and do not bother with notification |
98: // since nothing was removed. |
99: return new byte[0]; |
100: } |
Listing 18.7 Continued |
101: |
102: // based on the current size |
103: byte[] list = new byte[size]; |
104: |
105: // copy in the block from tail to the end |
106: int distToEnd = capacity - tail; |
107: int copyLen = Math.min(size, distToEnd); |
108: System.arraycopy(queue, tail, list, 0, copyLen); |
109: |
110: // If data wraps around, copy the remaining data |
111: // from the front of the array. |
112: if (size > copyLen) { |
113: System.arraycopy( |
114: queue, 0, list, copyLen, size - copyLen); |
115: } |
116: |
117: tail = (tail + size) % capacity; |
118: size = 0; // everything has been removed |
119: |
120: // Signal any and all waiting threads that |
121: // something has changed. |
122: notifyAll(); |
123: |
124: return list; |
125: } |
126: |
127: public synchronized byte[] removeAtLeastOne() |
128: throws InterruptedException { |
129: |
130: waitWhileEmpty(); // wait for at least one to be in FIFO |
131: return removeAll(); |
132: } |
133: |
134: public synchronized boolean waitUntilEmpty(long msTimeout) |
135: throws InterruptedException { |
136: |
137: if (msTimeout == 0L) { |
138: waitUntilEmpty(); // use other method |
139: return true; |
140: } |
141: |
142: // wait only for the specified amount of time |
143: long endTime = System.currentTimeMillis() + msTimeout; |
144: long msRemaining = msTimeout; |
145: |
146: while (!isEmpty() && (msRemaining > 0L)) { |
147: wait(msRemaining); |
148: msRemaining = endTime - System.currentTimeMillis(); |
149: } |
150: |
151: // May have timed out, or may have met condition, |
152: // calc return value. |
153: return isEmpty(); |
154: } |
155: |
156: public synchronized void waitUntilEmpty() |
157: throws InterruptedException { |
158: |
159: while (!isEmpty()) { |
160: wait(); |
161: } |
162: } |
163: |
164: public synchronized void waitWhileEmpty() |
165: throws InterruptedException { |
166: |
167: while (isEmpty()) { |
168: wait(); |
169: } |
170: } |
171: |
172: public synchronized void waitUntilFull() |
173: throws InterruptedException { |
174: |
175: while (!isFull()) { |
176: wait(); |
177: } |
178: } |
179: |
180: public synchronized void waitWhileFull() |
181: throws InterruptedException { |
182: |
183: while (isFull()) { |
184: wait(); |
185: } |
186: } |
187: } |
The getCapacity(), getSize(), isEmpty(), isFull(), waitUntilEmpty(long msTimeout), waitUntilEmpty(), waitWhileEmpty(), waitUntilFull() , and waitWhileFull() methods work exactly the same as in ObjectFIFO ( see the descriptions earlier in this chapter ). The removeAtLeastOne() method (lines 127132) differs only in that it returns a byte ]. The add( byte b ) method (lines 3242) differs only in that it is passed a byte (line 32) and stores a byte (line 37). |
The remove() method (lines 7789) is much the same as before but instead handles bytes. In addition, it no longer has to set the vacated cell to null (or any other value) because the values in a byte[] dont interfere with garbage collection. |
The add(byte[] list) method (lines 4475) efficiently stores the values directly into the queue, rather than repeatedly invoking add(byte b) . If list has a length greater than , the while loop is entered. Inside the while , waitWhileFull() is invoked (line 57) to block and wait, if necessary, for more space to become available. The number of open cells ( space ) is calculated (line 59). The number of cells ( distToEnd ) between the current position of head and the end of the array is calculated (line 60). The lesser of space and distToEnd is used to calculate blockLen , which is the largest block that can be copied in one operation (line 61). |
Next, the number of bytes that still have to be copied ( bytesRemaining ) from list into queue is calculated (line 63). The number of bytes that will be copied this time through the loop ( copyLen ) is the lesser of blockLen and bytesRemaining (line 64). The actual copying from list into queue is performed for copyLen bytes (line 66). The values for head , size , and ptr are all adjusted based on copyLen (lines 6769). Any and all waiting threads are notified that the state of the FIFO queue has changed (line 73). The while loop continues until all the bytes in list have been copied into queue . Each time through the while loop, the thread may block in waitWhileFull() to allow another thread to come in and remove some bytes to make more space. |
The removeAll() method (lines 91125) efficiently copies bytes from queue into a new byte array. Note that it has no need to throw an InterruptedException because it never blocks. This byte array has a length equal to the current number of bytes in the FIFO queuepossibly a length of if the FIFO queue is currently empty (lines 95100). If the queue is not empty, a byte[] is created, with a length equal to the current number of items in the queue (line 103). The number of cells from the tail pointer to the end of the array is calculated and stored in distToEnd (line 106). The number of bytes to copy ( copyLen ) is the minimum of distToEnd and size (line 107). These bytes are copied into list , which will be returned (line 108). If more bytes have to be copied from the beginning of the queue because the data wrapped around, they are copied based on the difference between size and copyLen (lines 112115). The tail pointer is adjusted by size to reflect the removal of the bytes (line 117). The size is set to because the FIFO queue is now empty (line 118). Any and all waiting threads are notified of the changes to the FIFO queue (line 122). Finally, the array containing the copied bytes is returned (line 124). |
ByteFIFOTest , in Listing 18.8, is used to demonstrate some of the functionality of ByteFIFO . Basically, a set of strings is serialized and passed by one thread through a ByteFIFO to another thread. This other thread gathers up the bytes as they come through the FIFO queue and reconstructs the strings. |
Listing 18.8 ByteFIFOTest.javaCode to Demonstrate ByteFIFO |
1: import java.io.*; |
2: |
3: public class ByteFIFOTest extends Object { |
4: private ByteFIFO fifo; |
5: private byte[] srcData; |
6: |
7: public ByteFIFOTest() throws IOException { |
8: fifo = new ByteFIFO(20); |
9: |
10: makeSrcData(); |
11: System.out.println(srcData.length= + srcData.length); |
12: |
13: Runnable srcRunnable = new Runnable() { |
14: public void run() { |
15: src(); |
16: } |
17: }; |
18: Thread srcThread = new Thread(srcRunnable); |
19: srcThread.start(); |
20: |
21: Runnable dstRunnable = new Runnable() { |
22: public void run() { |
23: dst(); |
24: } |
25: }; |
26: Thread dstThread = new Thread(dstRunnable); |
27: dstThread.start(); |
28: } |
29: |
30: private void makeSrcData() throws IOException { |
31: String[] list = { |
32: The first string is right here, |
33: The second string is a bit longer and also right here, |
34: The third string, |
35: ABCDEFGHIJKLMNOPQRSTUVWXYZ, |
36: 0123456789, |
37: The last string in the list |
38: }; |
39: |
40: ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
41: ObjectOutputStream oos = new ObjectOutputStream(baos); |
42: oos.writeObject(list); |
43: oos.flush(); |
44: oos.close(); |
45: |
46: srcData = baos.toByteArray(); |
47: } |
48: |
49: private void src() { |
50: try { |
51: boolean justAddOne = true; |
52: int count = 0; |
53: |
54: while (count < srcData.length) { |
55: if (!justAddOne) { |
56: int writeSize = (int) (40.0 * Math.random()); |
57: writeSize = Math.min(writeSize, |
srcData.length - count); |
58: |
59: byte[] buf = new byte[writeSize]; |
60: System.arraycopy(srcData, count, buf, 0, writeSize); |
61: fifo.add(buf); |
62: count += writeSize; |
63: |
64: System.out.println(just added + writeSize + bytes); |
65: } else { |
66: fifo.add(srcData[count]); |
67: count++; |
68: |
69: System.out.println(just added exactly 1 byte); |
70: } |
71: |
72: justAddOne = !justAddOne; |
73: } |
74: } catch (InterruptedException x) { |
75: x.printStackTrace(); |
76: } |
77: } |
78: |
79: private void dst() { |
80: try { |
81: boolean justAddOne = true; |
82: int count = 0; |
83: byte[] dstData = new byte[srcData.length]; |
84: |
85: while (count < dstData.length) { |
86: if (!justAddOne) { |
87: byte[] buf = fifo.removeAll(); |
88: if (buf.length > 0) { |
89: System.arraycopy(buf, 0, dstData, count, buf.length); |
90: count += buf.length; |
91: } |
92: |
93: System.out.println( |
94: just removed + buf.length + bytes); |
95: } else { |
96: byte b = fifo.remove(); |
97: dstData[count] = b; |
98: count++; |
99: |
100: System.out.println( |
101: just removed exactly 1 byte); |
102: } |
103: |
104: justAddOne = !justAddOne; |
105: } |
106: |
107: System.out.println(received all data, count= + count); |
108: |
109: ObjectInputStream ois = new ObjectInputStream( |
110: new ByteArrayInputStream(dstData)); |
111: |
112: String[] line = (String[]) ois.readObject(); |
113: |
114: for (int i = 0; i < line.length; i++) { |
115: System.out.println(line[ + i + ]= + line[i]); |
116: } |
117: } catch (ClassNotFoundException x1) { |
118: x1.printStackTrace(); |
119: } catch (IOException iox) { |
120: iox.printStackTrace(); |
121: } catch (InterruptedException x) { |
122: x.printStackTrace(); |
123: } |
124: } |
125: |
126: public static void main(String[] args) { |
127: try { |
128: new ByteFIFOTest(); |
129: } catch (IOException iox) { |
130: iox.printStackTrace(); |
131: } |
132: } |
133: } |
The constructor for ByteFIFOTest creates a relatively small ByteFIFO with a capacity of 20 (line 8) for transferring data from one thread to another. The makeSrcData() method is called (line 10) to load srcData (line 5) with the bytes that will be pushed through the ByteFIFO . Next, a thread is created to run the src() method, and another thread is created to run the dst() method. |
Inside makeSrcData() (lines 3047), an array of strings (lines 3138) is created and written to an ObjectOutputStream (lines 4144). The ObjectOutputStream() passes the serialized data on to a ByteArrayOutputStream() (line 40). The bytes collected are turned into a byte[] and stored in srcData (line 46). |
The src() method (lines 4977) takes the bytes from srcData and pushes them into the FIFO queue. It alternates between adding a single byte and adding a byte array each time through the while loop by toggling the justAddOne variable (lines 51, 55, and 72). The size of the byte[] to be added is randomly determined (line 56) to keep things interesting. As data is added to the ByteFIFO , messages are printed (lines 64 and 69). This method completes when all the bytes in srcData have been added to the FIFO queue. |
The dst() method (lines 79124) removes the bytes from the ByteFIFO , stores them in a local array, and then de-serializes the object to confirm its successful transmission. The dst() method alternates between remove() and removeAll() each time through the while loop. The looping continues until the specified number of bytes has been removed (lines 8385). As data is removed from the ByteFIFO , messages are printed (lines 9394, 100, and 101). When all the bytes have been retrieved, they are used to create a ByteArrayInputStream that is, in turn , used to create an ObjectInputStream (lines 109110). The one object that is serialized is a String[] , and an attempt to read it back and cast it into its proper type is made (line 112). The String[] is then iterated through, and each string is printed to confirm uncorrupted delivery (lines 114116). |
Listing 18.9 shows possible output when ByteFIFOTest is run. Your output is likely to differ a bit, but ultimately, the list of strings printed at the end should match exactly. |
Listing 18.9 Possible Output from Running ByteFIFOTest |
1: srcData.length=224 |
2: just added exactly 1 byte |
3: just removed exactly 1 byte |
4: just removed 19 bytes |
5: just added 26 bytes |
6: just added exactly 1 byte |
7: just added 7 bytes |
8: just added exactly 1 byte |
9: just removed exactly 1 byte |
10: just removed 20 bytes |
11: just added 20 bytes |
12: just removed exactly 1 byte |
13: just added exactly 1 byte |
14: just removed 15 bytes |
15: just added 18 bytes |
16: just removed exactly 1 byte |
17: just added exactly 1 byte |
18: just removed 18 bytes |
19: just removed exactly 1 byte |
20: just added 21 bytes |
21: just removed 20 bytes |
22: just added exactly 1 byte |
23: just removed exactly 1 byte |
24: just added 0 bytes |
25: just removed 0 bytes |
26: just added exactly 1 byte |
27: just removed exactly 1 byte |
28: just removed 20 bytes |
29: just added 33 bytes |
30: just removed exactly 1 byte |
31: just added exactly 1 byte |
32: just removed 13 bytes |
33: just removed exactly 1 byte |
34: just removed 20 bytes |
35: just added 23 bytes |
36: just added exactly 1 byte |
37: just removed exactly 1 byte |
38: just removed 19 bytes |
39: just added 24 bytes |
40: just added exactly 1 byte |
41: just added 5 bytes |
42: just added exactly 1 byte |
43: just removed exactly 1 byte |
44: just added 6 bytes |
45: just removed 19 bytes |
46: just added exactly 1 byte |
47: just removed exactly 1 byte |
48: just added 20 bytes |
49: just removed 20 bytes |
50: just added exactly 1 byte |
51: just removed exactly 1 byte |
52: just added 8 bytes |
53: just removed 8 bytes |
54: received all data, count=224 |
55: line[0]=The first string is right here |
56: line[1]=The second string is a bit longer and also right here |
57: line[2]=The third string |
58: line[3]=ABCDEFGHIJKLMNOPQRSTUVWXYZ |
59: line[4]=0123456789 |
60: line[5]=The last string in the list |
| |||
Toc |