The SureStop Class

Chapter 18 - First-In-First-Out (FIFO) Queue

Java Thread Programming
Paul Hyde
  Copyright 1999 Sams Publishing

A FIFO Queue for Bytes: ByteFIFO
The class ByteFIFO is very similar to ObjectFIFO , except that it holds byte values instead of object references. It is much more efficient to store bytes directly, rather than wrap them in Byte instances and store them as references. The public API for ByteFIFO consists of the following:
public ByteFIFO(int cap)
public int getCapacity()
public synchronized int getSize()
public synchronized boolean isEmpty()
public synchronized boolean isFull()
public synchronized void add(byte b) throws InterruptedException
public synchronized void add(byte[] list) throws InterruptedException
public synchronized byte remove() throws InterruptedException
public synchronized byte[] removeAll()
public synchronized byte[] removeAtLeastOne() throws InterruptedException
public synchronized boolean waitUntilEmpty(long msTimeout)
               throws InterruptedException
public synchronized void waitUntilEmpty() throws InterruptedException
public synchronized void waitWhileEmpty() throws InterruptedException
public synchronized void waitUntilFull() throws InterruptedException
public synchronized void waitWhileFull() throws InterruptedException
This is pretty much the same API as ObjectFIFO , except that byte and byte[] are passed and returned instead of Object and Object[] . In addition, the addEach() method is gone; it is replaced with an overloaded add() method that takes a byte[] as the parameter.
All the methods (except getCapacity() ) are synchronized to ensure that multiple threads can safely and simultaneously interact with ByteFIFO . The getCapacity() method does not have to be synchronized because the capacity never changes. The methods that might block, waiting for something to change, declare that they will throw an InterruptedException if interrupted while waiting. Listing 18.7 shows the code for ByteFIFO .
Listing 18.7  ByteFIFO.javaA FIFO Queue That Holds Bytes
  1: public class ByteFIFO extends Object {
  2:     private byte[] queue;
  3:     private int capacity;
  4:     private int size ;
  5:     private int head;
  6:     private int tail;
  7:
  8:     public ByteFIFO(int cap) {
  9:         capacity = (cap > 0) ? cap : 1; // at least 1
10:         queue = new byte[capacity];
11:         head = 0;
12:         tail = 0;
13:         size = 0;
14:     }
15:
16:     public int getCapacity() {
17:         return capacity;
18:     }
19:
20:     public synchronized int getSize() {
21:         return size;
22:     }
23:
24:     public synchronized boolean isEmpty() {
25:         return (size == 0);
26:     }
27:
28:     public synchronized boolean isFull() {
29:         return (size == capacity);
30:     }
31:
32:     public synchronized void add(byte b)
33:             throws InterruptedException {
34:
35:         waitWhileFull();
36:
37:         queue[head] = b;
38:         head = (head + 1) % capacity;
39:         size++;
40:
41:         notifyAll(); // let any waiting threads know about change
42:     }
43:
44:     public synchronized void add(byte[] list)
45:             throws InterruptedException {
46:
47:         // For efficiency, the bytes are copied in blocks
48:         // instead of one at a time. As space becomes available,
49:         // more bytes are copied until all of them have been
50:         // added.
51:
52:         int ptr = 0;
53:
54:         while (ptr < list.length) {
55:             // If full, the lock will be released to allow
56:             // another thread to come in and remove bytes.
57:             waitWhileFull();
58:
59:             int space = capacity - size;
60:             int distToEnd = capacity - head;
61:             int blockLen = Math.min(space, distToEnd);
62:
63:             int bytesRemaining = list.length - ptr;
64:             int copyLen = Math.min(blockLen, bytesRemaining);
65:
66:             System.arraycopy(list, ptr, queue, head, copyLen);
67:             head = (head + copyLen) % capacity;
68:             size += copyLen;
69:             ptr += copyLen;
70:
71:             // Keep the lock, but let any waiting threads
72:             // know that something has changed.
73:             notifyAll();
74:         }
75:     }
76:
77:     public synchronized byte remove()
78:             throws InterruptedException {
79:
80:         waitWhileEmpty();
81:
82:         byte b = queue[tail];
83:         tail = (tail + 1) % capacity;
84:         size;
85:
86:         notifyAll(); // let any waiting threads know about change
87:
88:         return b;
89:     }
90:
91:     public synchronized byte[] removeAll() {
92:         // For efficiency, the bytes are copied in blocks
93:         // instead of one at a time.
94:
95:         if (isEmpty()) {
96:             // Nothing to remove, return a zero-length
97:             // array and do not bother with notification
98:             // since nothing was removed.
99:             return new byte[0];
100:         }
Listing 18.7  Continued
101:
102:         // based on the current size
103:         byte[] list = new byte[size];
104:
105:         // copy in the block from tail to the end
106:         int distToEnd = capacity - tail;
107:         int copyLen = Math.min(size, distToEnd);
108:         System.arraycopy(queue, tail, list, 0, copyLen);
109:
110:         // If data wraps around, copy the remaining data
111:         // from the front of the array.
112:         if (size > copyLen) {
113:             System.arraycopy(
114:                     queue, 0, list, copyLen, size - copyLen);
115:         }
116:
117:         tail = (tail + size) % capacity;
118:         size = 0; // everything has been removed
119:
120:         // Signal any and all waiting threads that
121:         // something has changed.
122:         notifyAll();
123:
124:         return list;
125:     }
126:
127:     public synchronized byte[] removeAtLeastOne()
128:             throws InterruptedException {
129:
130:         waitWhileEmpty(); // wait for at least one to be in FIFO
131:         return removeAll();
132:     }
133:
134:     public synchronized boolean waitUntilEmpty(long msTimeout)
135:             throws InterruptedException {
136:
137:         if (msTimeout == 0L) {
138:             waitUntilEmpty();  // use other method
139:             return true;
140:         }
141:
142:         // wait only for the specified amount of time
143:         long endTime = System.currentTimeMillis() + msTimeout;
144:         long msRemaining = msTimeout;
145:
146:         while (!isEmpty() && (msRemaining > 0L)) {
147:             wait(msRemaining);
148:             msRemaining = endTime - System.currentTimeMillis();
149:         }
150:
151:         // May have timed out, or may have met condition,
152:         // calc return value.
153:         return isEmpty();
154:     }
155:
156:     public synchronized void waitUntilEmpty()
157:             throws InterruptedException {
158:
159:         while (!isEmpty()) {
160:             wait();
161:         }
162:     }
163:
164:     public synchronized void waitWhileEmpty()
165:             throws InterruptedException {
166:
167:         while (isEmpty()) {
168:             wait();
169:         }
170:     }
171:
172:     public synchronized void waitUntilFull()
173:             throws InterruptedException {
174:
175:         while (!isFull()) {
176:             wait();
177:         }
178:     }
179:
180:     public synchronized void waitWhileFull()
181:             throws InterruptedException {
182:
183:         while (isFull()) {
184:             wait();
185:         }
186:     }
187: }
The getCapacity(), getSize(), isEmpty(), isFull(), waitUntilEmpty(long msTimeout), waitUntilEmpty(), waitWhileEmpty(), waitUntilFull() , and waitWhileFull() methods work exactly the same as in ObjectFIFO ( see the descriptions earlier in this chapter ). The removeAtLeastOne() method (lines 127132) differs only in that it returns a byte ]. The add( byte b ) method (lines 3242) differs only in that it is passed a byte (line 32) and stores a byte (line 37).
The remove() method (lines 7789) is much the same as before but instead handles bytes. In addition, it no longer has to set the vacated cell to null (or any other value) because the values in a byte[] dont interfere with garbage collection.
The add(byte[] list) method (lines 4475) efficiently stores the values directly into the queue, rather than repeatedly invoking add(byte b) . If list has a length greater than , the while loop is entered. Inside the while , waitWhileFull() is invoked (line 57) to block and wait, if necessary, for more space to become available. The number of open cells ( space ) is calculated (line 59). The number of cells ( distToEnd ) between the current position of head and the end of the array is calculated (line 60). The lesser of space and distToEnd is used to calculate blockLen , which is the largest block that can be copied in one operation (line 61).
Next, the number of bytes that still have to be copied ( bytesRemaining ) from list into queue is calculated (line 63). The number of bytes that will be copied this time through the loop ( copyLen ) is the lesser of blockLen and bytesRemaining (line 64). The actual copying from list into queue is performed for copyLen bytes (line 66). The values for head , size , and ptr are all adjusted based on copyLen (lines 6769). Any and all waiting threads are notified that the state of the FIFO queue has changed (line 73). The while loop continues until all the bytes in list have been copied into queue . Each time through the while loop, the thread may block in waitWhileFull() to allow another thread to come in and remove some bytes to make more space.
The removeAll() method (lines 91125) efficiently copies bytes from queue into a new byte array. Note that it has no need to throw an InterruptedException because it never blocks. This byte array has a length equal to the current number of bytes in the FIFO queuepossibly a length of if the FIFO queue is currently empty (lines 95100). If the queue is not empty, a byte[] is created, with a length equal to the current number of items in the queue (line 103). The number of cells from the tail pointer to the end of the array is calculated and stored in distToEnd (line 106). The number of bytes to copy ( copyLen ) is the minimum of distToEnd and size (line 107). These bytes are copied into list , which will be returned (line 108). If more bytes have to be copied from the beginning of the queue because the data wrapped around, they are copied based on the difference between size and copyLen (lines 112115). The tail pointer is adjusted by size to reflect the removal of the bytes (line 117). The size is set to because the FIFO queue is now empty (line 118). Any and all waiting threads are notified of the changes to the FIFO queue (line 122). Finally, the array containing the copied bytes is returned (line 124).
ByteFIFOTest , in Listing 18.8, is used to demonstrate some of the functionality of ByteFIFO . Basically, a set of strings is serialized and passed by one thread through a ByteFIFO to another thread. This other thread gathers up the bytes as they come through the FIFO queue and reconstructs the strings.
Listing 18.8  ByteFIFOTest.javaCode to Demonstrate ByteFIFO
  1: import java.io.*;
  2:
  3: public class ByteFIFOTest extends Object {
  4:     private ByteFIFO fifo;
  5:     private byte[] srcData;
  6:
  7:     public ByteFIFOTest() throws IOException {
  8:         fifo = new ByteFIFO(20);
  9:
10:         makeSrcData();
11:         System.out.println(srcData.length= + srcData.length);
12:
13:         Runnable srcRunnable = new Runnable() {
14:                 public void run() {
15:                     src();
16:                 }
17:             };
18:         Thread srcThread = new Thread(srcRunnable);
19:         srcThread.start();
20:
21:         Runnable dstRunnable = new Runnable() {
22:                 public void run() {
23:                     dst();
24:                 }
25:             };
26:         Thread dstThread = new Thread(dstRunnable);
27:         dstThread.start();
28:     }
29:
30:     private void makeSrcData() throws IOException {
31:         String[] list = {
32:                 The first string is right here,
33:          The second string is a bit longer and also right here,
34:                 The third string,
35:                 ABCDEFGHIJKLMNOPQRSTUVWXYZ,
36:                 0123456789,
37:                 The last string in the list
38:             };
39:
40:         ByteArrayOutputStream baos = new ByteArrayOutputStream();
41:         ObjectOutputStream oos = new ObjectOutputStream(baos);
42:         oos.writeObject(list);
43:         oos.flush();
44:         oos.close();
45:
46:         srcData = baos.toByteArray();
47:     }
48:
49:     private void src() {
50:         try {
51:             boolean justAddOne = true;
52:             int count = 0;
53:
54:             while (count < srcData.length) {
55:                 if (!justAddOne) {
56:                     int writeSize = (int) (40.0 * Math.random());
57:                     writeSize = Math.min(writeSize,
                                        srcData.length - count);
58:
59:                     byte[] buf = new byte[writeSize];
60:                     System.arraycopy(srcData, count, buf, 0, writeSize);
61:                     fifo.add(buf);
62:                     count += writeSize;
63:
64:                     System.out.println(just added + writeSize + bytes);
65:                 } else {
66:                     fifo.add(srcData[count]);
67:                     count++;
68:
69:                     System.out.println(just added exactly 1 byte);
70:                 }
71:
72:                 justAddOne = !justAddOne;
73:             }
74:         } catch (InterruptedException x) {
75:             x.printStackTrace();
76:         }
77:     }
78:
79:     private void dst() {
80:         try {
81:             boolean justAddOne = true;
82:             int count = 0;
83:             byte[] dstData = new byte[srcData.length];
84:
85:             while (count < dstData.length) {
86:                 if (!justAddOne) {
87:                     byte[] buf = fifo.removeAll();
88:                     if (buf.length > 0) {
89:                         System.arraycopy(buf, 0, dstData, count, buf.length);
90:                         count += buf.length;
91:                     }
92:
93:                     System.out.println(
94:                         just removed + buf.length + bytes);
95:                 } else {
96:                     byte b = fifo.remove();
97:                     dstData[count] = b;
98:                     count++;
99:
100:                     System.out.println(
101:                         just removed exactly 1 byte);
102:                 }
103:
104:                 justAddOne = !justAddOne;
105:             }
106:
107:             System.out.println(received all data, count= + count);
108:
109:             ObjectInputStream ois = new ObjectInputStream(
110:                     new ByteArrayInputStream(dstData));
111:
112:             String[] line = (String[]) ois.readObject();
113:
114:             for (int i = 0; i < line.length; i++) {
115:                 System.out.println(line[ + i + ]= + line[i]);
116:             }
117:         } catch (ClassNotFoundException x1) {
118:             x1.printStackTrace();
119:         } catch (IOException iox) {
120:             iox.printStackTrace();
121:         } catch (InterruptedException x) {
122:             x.printStackTrace();
123:         }
124:     }
125:
126:     public static void main(String[] args) {
127:         try {
128:             new ByteFIFOTest();
129:         } catch (IOException iox) {
130:             iox.printStackTrace();
131:         }
132:     }
133: }
The constructor for ByteFIFOTest creates a relatively small ByteFIFO with a capacity of 20 (line 8) for transferring data from one thread to another. The makeSrcData() method is called (line 10) to load srcData (line 5) with the bytes that will be pushed through the ByteFIFO . Next, a thread is created to run the src() method, and another thread is created to run the dst() method.
Inside makeSrcData() (lines 3047), an array of strings (lines 3138) is created and written to an ObjectOutputStream (lines 4144). The ObjectOutputStream() passes the serialized data on to a ByteArrayOutputStream() (line 40). The bytes collected are turned into a byte[] and stored in srcData (line 46).
The src() method  (lines 4977) takes the bytes from srcData and pushes them into the FIFO queue. It alternates between adding a single byte and adding a byte array each time through the while loop by toggling the justAddOne variable (lines 51, 55, and 72). The size of the byte[] to be added is randomly determined (line 56) to keep things interesting. As data is added to the ByteFIFO , messages are printed (lines 64 and 69). This method completes when all the bytes in srcData have been added to the FIFO queue.
The dst() method  (lines 79124) removes the bytes from the ByteFIFO , stores them in a local array, and then de-serializes the object to confirm its successful transmission. The dst() method alternates between remove() and removeAll() each time through the while loop. The looping continues until the specified number of bytes has been removed (lines 8385). As data is removed from the ByteFIFO , messages are printed (lines 9394, 100, and 101). When all the bytes have been retrieved, they are used to create a ByteArrayInputStream that is, in turn , used to create an ObjectInputStream (lines 109110). The one object that is serialized is a String[] , and an attempt to read it back and cast it into its proper type is made (line 112). The String[] is then iterated through, and each string is printed to confirm uncorrupted delivery (lines 114116).
Listing 18.9 shows possible output when ByteFIFOTest is run. Your output is likely to differ a bit, but ultimately, the list of strings printed at the end should match exactly.
Listing 18.9  Possible Output from Running ByteFIFOTest
1: srcData.length=224
2: just added exactly 1 byte
3: just removed exactly 1 byte
4: just removed 19 bytes
5: just added 26 bytes
6: just added exactly 1 byte
7: just added 7 bytes
8: just added exactly 1 byte
9: just removed exactly 1 byte
10: just removed 20 bytes
11: just added 20 bytes
12: just removed exactly 1 byte
13: just added exactly 1 byte
14: just removed 15 bytes
15: just added 18 bytes
16: just removed exactly 1 byte
17: just added exactly 1 byte
18: just removed 18 bytes
19: just removed exactly 1 byte
20: just added 21 bytes
21: just removed 20 bytes
22: just added exactly 1 byte
23: just removed exactly 1 byte
24: just added 0 bytes
25: just removed 0 bytes
26: just added exactly 1 byte
27: just removed exactly 1 byte
28: just removed 20 bytes
29: just added 33 bytes
30: just removed exactly 1 byte
31: just added exactly 1 byte
32: just removed 13 bytes
33: just removed exactly 1 byte
34: just removed 20 bytes
35: just added 23 bytes
36: just added exactly 1 byte
37: just removed exactly 1 byte
38: just removed 19 bytes
39: just added 24 bytes
40: just added exactly 1 byte
41: just added 5 bytes
42: just added exactly 1 byte
43: just removed exactly 1 byte
44: just added 6 bytes
45: just removed 19 bytes
46: just added exactly 1 byte
47: just removed exactly 1 byte
48: just added 20 bytes
49: just removed 20 bytes
50: just added exactly 1 byte
51: just removed exactly 1 byte
52: just added 8 bytes
53: just removed 8 bytes
54: received all data, count=224
55: line[0]=The first string is right here
56: line[1]=The second string is a bit longer and also right here
57: line[2]=The third string
58: line[3]=ABCDEFGHIJKLMNOPQRSTUVWXYZ
59: line[4]=0123456789
60: line[5]=The last string in the list

Toc


Java Thread Programming
Java Thread Programming
ISBN: 0672315858
EAN: 2147483647
Year: 2005
Pages: 149
Authors: Paul Hyde

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net