| ||||
Copyright 1999 Sams Publishing |
|
An Expanded FIFO Queue for Object References: ObjectFIFO |
The class ObjectFIFO removes the printState() method from SimpleObjectFIFO and expands on its foundation to create a more feature-rich FIFO queue for holding object references. The public API for ObjectFIFO consists of the following: |
public ObjectFIFO(int cap) |
public int getCapacity() |
public synchronized int getSize() |
public synchronized boolean isEmpty() |
public synchronized boolean isFull() |
public synchronized void add(Object obj) throws InterruptedException |
public synchronized void addEach(Object[] list) throws InterruptedException |
public synchronized Object remove() throws InterruptedException |
public synchronized Object[] removeAll() throws InterruptedException |
public synchronized Object[] removeAtLeastOne() throws InterruptedException |
public synchronized boolean waitUntilEmpty(long msTimeout) |
throws InterruptedException |
public synchronized void waitUntilEmpty() throws InterruptedException |
public synchronized void waitWhileEmpty() throws InterruptedException |
public synchronized void waitUntilFull() throws InterruptedException |
public synchronized void waitWhileFull() throws InterruptedException |
All the methods (except getCapacity()) are synchronized to ensure that multiple threads can safely and simultaneously interact with ObjectFIFO. The getCapacity() method does not have to be synchronized because the capacity never changes. The methods that might block waiting for something to change declare that they will throw an InterruptedException if interrupted while waiting. Listing 18.4 shows the code for ObjectFIFO. You should feel free to expand on this functionality by adding more methods—especially consider adding timeout options to all the methods that can block! |
Listing 18.4 ObjectFIFO.java—Fuller Implementation of a FIFO for Objects |
1: public class ObjectFIFO extends Object { |
2: private Object[] queue; |
3: private int capacity; |
4: private int size ; |
5: private int head; |
6: private int tail; |
7: |
8: public ObjectFIFO(int cap) { |
9: capacity = (cap > 0) ? cap : 1; // at least 1 |
10: queue = new Object[capacity]; |
11: head = 0; |
12: tail = 0; |
13: size = 0; |
14: } |
15: |
16: public int getCapacity() { |
17: return capacity; |
18: } |
19: |
20: public synchronized int getSize() { |
21: return size; |
22: } |
23: |
24: public synchronized boolean isEmpty() { |
25: return (size == 0); |
26: } |
27: |
28: public synchronized boolean isFull() { |
29: return (size == capacity); |
30: } |
31: |
32: public synchronized void add(Object obj) |
33: throws InterruptedException { |
34: |
35: waitWhileFull(); |
36: |
37: queue[head] = obj; |
38: head = (head + 1) % capacity; |
39: size++; |
40: |
41: notifyAll(); // let any waiting threads know about change |
42: } |
43: |
44: public synchronized void addEach(Object[] list) |
45: throws InterruptedException { |
46: |
47: // |
48: // You might want to code a more efficient |
49: // implementation here ... (see ByteFIFO.java) |
50: // |
51: |
52: for (int i = 0; i < list.length; i++) { |
53: add(list[i]); |
54: } |
55: } |
56: |
57: public synchronized Object remove() |
58: throws InterruptedException { |
59: // block until there is at least one item to take |
60: waitWhileEmpty(); |
61: |
62: Object obj = queue[tail]; |
63: |
64: // don't block GC by keeping unnecessary reference |
65: queue[tail] = null; |
66: // advance the removal index, wrapping around at capacity |
67: tail = (tail + 1) % capacity; |
68: size--; |
69: |
70: notifyAll(); // let any waiting threads know about change |
71: |
72: return obj; |
73: } |
74: |
75: public synchronized Object[] removeAll() |
76: throws InterruptedException { |
77: |
78: // |
79: // You might want to code a more efficient |
80: // implementation here ... (see ByteFIFO.java) |
81: // |
82: |
83: Object[] list = new Object[size]; // use the current size |
84: |
85: for (int i = 0; i < list.length; i++) { |
86: list[i] = remove(); |
87: } |
88: |
89: // if FIFO was empty, a zero-length array is returned |
90: return list; |
91: } |
92: |
93: public synchronized Object[] removeAtLeastOne() |
94: throws InterruptedException { |
95: |
96: waitWhileEmpty(); // wait for at least one to be in FIFO |
97: return removeAll(); |
98: } |
99: |
Listing 18.4 Continued |
100: public synchronized boolean waitUntilEmpty(long msTimeout) |
101: throws InterruptedException { |
102: |
103: if (msTimeout == 0L) { |
104: waitUntilEmpty(); // use other method |
105: return true; |
106: } |
107: |
108: // wait only for the specified amount of time |
109: long endTime = System.currentTimeMillis() + msTimeout; |
110: long msRemaining = msTimeout; |
111: |
112: while (!isEmpty() && (msRemaining > 0L)) { |
113: wait(msRemaining); |
114: msRemaining = endTime - System.currentTimeMillis(); |
115: } |
116: |
117: // May have timed out, or may have met condition, |
118: // calc return value. |
119: return isEmpty(); |
120: } |
121: |
122: public synchronized void waitUntilEmpty() |
123: throws InterruptedException { |
124: |
125: while (!isEmpty()) { |
126: wait(); |
127: } |
128: } |
129: |
130: public synchronized void waitWhileEmpty() |
131: throws InterruptedException { |
132: |
133: while (isEmpty()) { |
134: wait(); |
135: } |
136: } |
137: |
138: public synchronized void waitUntilFull() |
139: throws InterruptedException { |
140: |
141: while (!isFull()) { |
142: wait(); |
143: } |
144: } |
145: |
146: public synchronized void waitWhileFull() |
147: throws InterruptedException { |
148: |
149: while (isFull()) { |
150: wait(); |
151: } |
152: } |
153: } |
In ObjectFIFO, the member variables, the constructor, and the getSize() and isFull() methods work the same as in SimpleObjectFIFO (described earlier in this chapter). |
The getCapacity() method (lines 16–18) has been added for convenience to determine the maximum number of object references that can be held in the FIFO queue. The isEmpty() method (lines 24–26) returns true if the size is currently 0. |
The add() method (lines 32–42) has been changed slightly to call waitWhileFull(), rather than handle the wait() call directly. Otherwise, the add() in ObjectFIFO is the same as in SimpleObjectFIFO. |
The addEach() method (lines 44–55) supports adding each element in an Object[] as its own item. If an Object[] should be added as one item, add() should be called instead. Inside addEach(), the array is simply stepped through, and each element is individually put into the FIFO queue by invoking add() (lines 52–54). This could be done in a more efficient manner that directly works with queue, head, tail, and size (see ByteFIFO for one technique). |
The remove() method (lines 57–73) has been changed slightly to call waitWhileEmpty(), rather than handle the wait() call directly. Otherwise, the remove() in ObjectFIFO is the same as in SimpleObjectFIFO. |
The removeAll() method (lines 75–91) supports removing all the items currently in the FIFO queue and returning them in an Object[]. This method does not block—even if the current size is 0. If the queue is empty, a zero-length array will be returned. Inside removeAll(), an Object[] is created based on the current value of size (line 83). Then for each item, remove() is called, and the value returned is stored in this new array (line 86). Finally, the Object[] is returned (line 90). This could be done in a more efficient manner that directly works with queue, head, tail, and size (see ByteFIFO for one technique). |
The removeAtLeastOne() method (lines 93–98) is used to wait until at least one item is in the FIFO queue and then to remove and return all the items in the queue. This method will block as long as the queue is empty (line 96). As soon as it is not empty (or if it wasn't empty to start with), removeAll() is invoked, and the Object[] it generates is returned (line 97). |
The rest of the methods (waitUntilX and waitWhileX) kindly encapsulate the wait-notify mechanism so that users of ObjectFIFO don't have to burden themselves with synchronized and wait(). One of them, waitUntilEmpty(long msTimeout), takes a timeout value so that the waiting is not indefinite. You should consider extending this functionality to the others. |
The waitUntilEmpty(long msTimeout) method (lines 100–120) is used to block until either no more items are in the FIFO queue or until the specified number of milliseconds elapses. If the queue is empty, true is returned; otherwise, false is returned. If the timeout is 0, the indefinite waitUntilEmpty() is called, and after that returns, true is returned (lines 103–106). Otherwise, the "waiting for the full timeout" technique from Chapter 14 is used (lines 109–119). |
The ObjectFIFOTest class, in Listing 18.5, is used to demonstrate the ObjectFIFO class. |
Listing 18.5 ObjectFIFOTest.java—Demonstration Code for ObjectFIFO |
1: public class ObjectFIFOTest extends Object { |
2: |
3: private static void fullCheck(ObjectFIFO fifo) { |
4: try { |
5: // Sync'd to allow messages to print while |
6: // condition is still true. |
7: synchronized (fifo) { |
8: while (true) { |
9: fifo.waitUntilFull(); |
10: print("FULL"); |
11: fifo.waitWhileFull(); |
12: print("NO LONGER FULL"); |
13: } |
14: } |
15: } catch (InterruptedException ix) { |
16: return; |
17: } |
18: } |
19: |
20: private static void emptyCheck(ObjectFIFO fifo) { |
21: try { |
22: // Sync'd to allow messages to print while |
23: // condition is still true. |
24: synchronized (fifo) { |
25: while (true) { |
26: fifo.waitUntilEmpty(); |
27: print("EMPTY"); |
28: fifo.waitWhileEmpty(); |
29: print("NO LONGER EMPTY"); |
30: } |
31: } |
32: } catch (InterruptedException ix) { |
33: return; |
34: } |
35: } |
36: |
37: private static void consumer(ObjectFIFO fifo) { |
38: try { |
39: print("just entered consumer()"); |
40: |
41: for (int i = 0; i < 3; i++) { |
42: synchronized (fifo) { |
43: Object obj = fifo.remove(); |
44: print("DATA-OUT - did remove(), obj=" + obj); |
45: } |
46: Thread.sleep(3000); |
47: } |
48: |
49: synchronized (fifo) { |
50: boolean resultOfWait = fifo.waitUntilEmpty(500); |
51: print("did waitUntilEmpty(500), resultOfWait=" + |
52: resultOfWait + ", getSize()=" + |
53: fifo.getSize()); |
54: } |
55: |
56: for (int i = 0; i < 3; i++) { |
57: synchronized (fifo) { |
58: Object[] list = fifo.removeAll(); |
59: print("did removeAll(), list.length=" + |
60: list.length); |
61: |
62: for (int j = 0; j < list.length; j++) { |
63: print("DATA-OUT - list[" + j + "]=" + |
64: list[j]); |
65: } |
66: } |
67: Thread.sleep(100); |
68: } |
69: |
70: for (int i = 0; i < 3; i++) { |
71: synchronized (fifo) { |
72: Object[] list = fifo.removeAtLeastOne(); |
73: print( |
74: "did removeAtLeastOne(), list.length=" + |
75: list.length); |
76: |
77: for (int j = 0; j < list.length; j++) { |
78: print("DATA-OUT - list[" + j + "]=" + |
79: list[j]); |
80: } |
81: } |
82: Thread.sleep(1000); |
83: } |
84: |
85: while (!fifo.isEmpty()) { |
86: synchronized (fifo) { |
87: Object obj = fifo.remove(); |
88: print("DATA-OUT - did remove(), obj=" + obj); |
89: } |
90: Thread.sleep(1000); |
91: } |
92: |
93: print("leaving consumer()"); |
94: } catch (InterruptedException ix) { |
95: return; |
96: } |
97: } |
98: |
99: private static void producer(ObjectFIFO fifo) { |
100: try { |
Listing 18.5 Continued |
101: print("just entered producer()"); |
102: int count = 0; |
103: |
104: Object obj0 = new Integer(count); |
105: count++; |
106: synchronized (fifo) { |
107: fifo.add(obj0); |
108: print("DATA-IN - did add(), obj0=" + obj0); |
109: |
110: boolean resultOfWait = fifo.waitUntilEmpty(500); |
111: print("did waitUntilEmpty(500), resultOfWait=" + |
112: resultOfWait + ", getSize()=" + |
113: fifo.getSize()); |
114: } |
115: |
116: for (int i = 0; i < 10; i++) { |
117: Object obj = new Integer(count); |
118: count++; |
119: synchronized (fifo) { |
120: fifo.add(obj); |
121: print("DATA-IN - did add(), obj=" + obj); |
122: } |
123: Thread.sleep(1000); |
124: } |
125: |
126: Thread.sleep(2000); |
127: |
128: Object obj = new Integer(count); |
129: count++; |
130: synchronized (fifo) { |
131: fifo.add(obj); |
132: print("DATA-IN - did add(), obj=" + obj); |
133: } |
134: Thread.sleep(500); |
135: |
136: Integer[] list1 = new Integer[3]; |
137: for (int i = 0; i < list1.length; i++) { |
138: list1[i] = new Integer(count); |
139: count++; |
140: } |
141: |
142: synchronized (fifo) { |
143: fifo.addEach(list1); |
144: print("did addEach(), list1.length=" + |
145: list1.length); |
146: } |
147: |
148: Integer[] list2 = new Integer[8]; |
149: for (int i = 0; i < list2.length; i++) { |
150: list2[i] = new Integer(count); |
151: count++; |
152: } |
153: |
154: synchronized (fifo) { |
155: fifo.addEach(list2); |
156: print("did addEach(), list2.length=" + |
157: list2.length); |
158: } |
159: |
160: synchronized (fifo) { |
161: fifo.waitUntilEmpty(); |
162: print("fifo.isEmpty()=" + fifo.isEmpty()); |
163: } |
164: |
165: print("leaving producer()"); |
166: } catch (InterruptedException ix) { |
167: return; |
168: } |
169: } |
170: |
171: private static synchronized void print(String msg) { |
172: System.out.println( |
173: Thread.currentThread().getName() + ": " + msg); |
174: } |
175: |
176: public static void main(String[] args) { |
177: final ObjectFIFO fifo = new ObjectFIFO(5); |
178: |
179: Runnable fullCheckRunnable = new Runnable() { |
180: public void run() { |
181: fullCheck(fifo); |
182: } |
183: }; |
184: |
185: Thread fullCheckThread = |
186: new Thread(fullCheckRunnable, "fchk"); |
187: fullCheckThread.setPriority(9); |
188: fullCheckThread.setDaemon(true); // die automatically |
189: fullCheckThread.start(); |
190: |
191: Runnable emptyCheckRunnable = new Runnable() { |
192: public void run() { |
193: emptyCheck(fifo); |
194: } |
195: }; |
196: |
197: Thread emptyCheckThread = |
198: new Thread(emptyCheckRunnable, "echk"); |
199: emptyCheckThread.setPriority(8); |
200: emptyCheckThread.setDaemon(true); // die automatically |
201: emptyCheckThread.start(); |
202: |
203: Runnable consumerRunnable = new Runnable() { |
204: public void run() { |
205: consumer(fifo); |
206: } |
207: }; |
208: |
209: Thread consumerThread = |
210: new Thread(consumerRunnable, "cons"); |
211: consumerThread.setPriority(7); |
212: consumerThread.start(); |
213: |
214: Runnable producerRunnable = new Runnable() { |
215: public void run() { |
216: producer(fifo); |
217: } |
218: }; |
219: |
220: Thread producerThread = |
221: new Thread(producerRunnable, "prod"); |
222: producerThread.setPriority(6); |
223: producerThread.start(); |
224: } |
225: } |
ObjectFIFOTest starts four threads to simultaneously access an ObjectFIFO created with a capacity of 5 items (line 177). The fullCheckThread (lines 179–189) is used to run the fullCheck() method (lines 3–18). The fullCheck() method first waits until the FIFO queue is full (line 9) and then prints a message (line 10). It then waits until the FIFO queue is no longer full (line 11) and then prints a message (line 12). It continues to loop through these two checks indefinitely (line 8). |
Next, the main thread creates the emptyCheckThread (lines 191–201) to run emptyCheck() (lines 20–35). This method works just like fullCheck() but instead prints messages when the FIFO queue transitions to and from the empty state. |
The consumerThread is then created (lines 203–212) to run consumer() (lines 37–97). Inside the consumer() method, the thread exercises all the removeX methods. |
Finally, the producerThread is created (lines 214–223) to run producer() (lines 99–169). Inside the producer() method, several of the addX methods are used to sporadically add items to the queue. |
The print() method (lines 171–174) is used to produce all the console output. It prefixes each message with the name of the thread that sent the message. |
Listing 18.6 shows the output from a particular run of ObjectFIFOTest. Your output is likely to differ slightly because the four threads running will be scheduled somewhat randomly. The main thing to notice is that all the items added are eventually removed in the proper order. |
Listing 18.6 Possible Output from ObjectFIFOTest |
1: echk: EMPTY |
2: cons: just entered consumer() |
3: prod: just entered producer() |
4: prod: DATA-IN - did add(), obj0=0 |
5: echk: NO LONGER EMPTY |
6: cons: DATA-OUT - did remove(), obj=0 |
7: echk: EMPTY |
8: prod: did waitUntilEmpty(500), resultOfWait=true, getSize()=0 |
9: prod: DATA-IN - did add(), obj=1 |
10: echk: NO LONGER EMPTY |
11: prod: DATA-IN - did add(), obj=2 |
12: prod: DATA-IN - did add(), obj=3 |
13: cons: DATA-OUT - did remove(), obj=1 |
14: prod: DATA-IN - did add(), obj=4 |
15: prod: DATA-IN - did add(), obj=5 |
16: prod: DATA-IN - did add(), obj=6 |
17: fchk: FULL |
18: cons: DATA-OUT - did remove(), obj=2 |
19: fchk: NO LONGER FULL |
20: prod: DATA-IN - did add(), obj=7 |
21: fchk: FULL |
22: cons: did waitUntilEmpty(500), resultOfWait=false, getSize()=5 |
23: cons: did removeAll(), list.length=5 |
24: cons: DATA-OUT - list[0]=3 |
25: cons: DATA-OUT - list[1]=4 |
26: cons: DATA-OUT - list[2]=5 |
27: cons: DATA-OUT - list[3]=6 |
28: cons: DATA-OUT - list[4]=7 |
29: fchk: NO LONGER FULL |
30: echk: EMPTY |
31: prod: DATA-IN - did add(), obj=8 |
32: echk: NO LONGER EMPTY |
33: cons: did removeAll(), list.length=1 |
34: cons: DATA-OUT - list[0]=8 |
35: echk: EMPTY |
36: cons: did removeAll(), list.length=0 |
37: prod: DATA-IN - did add(), obj=9 |
38: echk: NO LONGER EMPTY |
39: cons: did removeAtLeastOne(), list.length=1 |
40: cons: DATA-OUT - list[0]=9 |
41: echk: EMPTY |
42: prod: DATA-IN - did add(), obj=10 |
43: echk: NO LONGER EMPTY |
44: cons: did removeAtLeastOne(), list.length=1 |
45: cons: DATA-OUT - list[0]=10 |
46: echk: EMPTY |
47: prod: DATA-IN - did add(), obj=11 |
48: echk: NO LONGER EMPTY |
49: cons: did removeAtLeastOne(), list.length=1 |
50: cons: DATA-OUT - list[0]=11 |
51: echk: EMPTY |
52: prod: did addEach(), list1.length=3 |
53: echk: NO LONGER EMPTY |
54: fchk: FULL |
55: cons: DATA-OUT - did remove(), obj=12 |
56: fchk: NO LONGER FULL |
57: fchk: FULL |
58: cons: DATA-OUT - did remove(), obj=13 |
59: fchk: NO LONGER FULL |
60: fchk: FULL |
61: cons: DATA-OUT - did remove(), obj=14 |
62: fchk: NO LONGER FULL |
63: fchk: FULL |
64: cons: DATA-OUT - did remove(), obj=15 |
65: fchk: NO LONGER FULL |
66: fchk: FULL |
67: cons: DATA-OUT - did remove(), obj=16 |
68: fchk: NO LONGER FULL |
69: fchk: FULL |
70: cons: DATA-OUT - did remove(), obj=17 |
71: fchk: NO LONGER FULL |
72: prod: did addEach(), list2.length=8 |
73: fchk: FULL |
74: cons: DATA-OUT - did remove(), obj=18 |
75: fchk: NO LONGER FULL |
76: cons: DATA-OUT - did remove(), obj=19 |
77: cons: DATA-OUT - did remove(), obj=20 |
78: cons: DATA-OUT - did remove(), obj=21 |
79: cons: DATA-OUT - did remove(), obj=22 |
80: echk: EMPTY |
81: prod: fifo.isEmpty()=true |
82: prod: leaving producer() |
83: cons: leaving consumer() |
| |||
Toc |