Guidelines for Using SureStop

Chapter 18 - First-In-First-Out (FIFO) Queue

Java Thread Programming
Paul Hyde
  Copyright 1999 Sams Publishing

An Expanded FIFO Queue for Object References: ObjectFIFO
The class ObjectFIFO removes the printState() method from SimpleObjectFIFO and expands on its foundation to create a more feature-rich FIFO queue for holding object references. The public API for ObjectFIFO consists of the following:
public ObjectFIFO(int cap)
public int getCapacity()
public synchronized int getSize()
public synchronized boolean isEmpty()
public synchronized boolean isFull()
public synchronized void add(Object obj) throws InterruptedException
public synchronized void addEach(Object[] list) throws InterruptedException
public synchronized Object remove() throws InterruptedException
public synchronized Object[] removeAll() throws InterruptedException
public synchronized Object[] removeAtLeastOne() throws InterruptedException
public synchronized boolean waitUntilEmpty(long msTimeout)
               throws InterruptedException
public synchronized void waitUntilEmpty() throws InterruptedException
public synchronized void waitWhileEmpty() throws InterruptedException
public synchronized void waitUntilFull() throws InterruptedException
public synchronized void waitWhileFull() throws InterruptedException
All the methods (except getCapacity() ) are synchronized to ensure that multiple threads can safely and simultaneously interact with ObjectFIFO . The getCapacity() method does not have to be synchronized because the capacity never changes. The methods that might block waiting for something to change declare that they will throw an InterruptedException if interrupted while waiting. Listing 18.4 shows the code for ObjectFIFO . You should feel free to expand on this functionality by adding more methods especially consider adding timeout options to all the methods that can block!
Listing 18.4  ObjectFIFO.javaFuller Implementation of a FIFO for Objects
  1: public class ObjectFIFO extends Object {
  2:     private Object[] queue;
  3:     private int capacity;
  4:     private int size ;
  5:     private int head;
  6:     private int tail;
  7:
  8:     public ObjectFIFO(int cap) {
  9:         capacity = (cap > 0) ? cap : 1; // at least 1
10:         queue = new Object[capacity];
11:         head = 0;
12:         tail = 0;
13:         size = 0;
14:     }
15:
16:     public int getCapacity() {
17:         return capacity;
18:     }
19:
20:     public synchronized int getSize() {
21:         return size;
22:     }
23:
24:     public synchronized boolean isEmpty() {
25:         return (size == 0);
26:     }
27:
28:     public synchronized boolean isFull() {
29:         return (size == capacity);
30:     }
31:
32:     public synchronized void add(Object obj)
33:             throws InterruptedException {
34:
35:         waitWhileFull();
36:
37:         queue[head] = obj;
38:         head = (head + 1) % capacity;
39:         size++;
40:
41:         notifyAll(); // let any waiting threads know about change
42:     }
43:
44:     public synchronized void addEach(Object[] list)
45:             throws InterruptedException {
46:
47:         //
48:         // You might want to code a more efficient
49:         // implementation here ... (see ByteFIFO.java)
50:         //
51:
52:         for (int i = 0; i < list.length; i++) {
53:             add(list[i]);
54:         }
55:     }
56:
57:     public synchronized Object remove()
58:             throws InterruptedException {
59:
60:         waitWhileEmpty();
61:
62:         Object obj = queue[tail];
63:
64:         // dont block GC by keeping unnecessary reference
65:         queue[tail] = null;
66:
67:         tail = (tail + 1) % capacity;
68:         size;
69:
70:         notifyAll(); // let any waiting threads know about change
71:
72:         return obj;
73:     }
74:
75:     public synchronized Object[] removeAll()
76:             throws InterruptedException {
77:
78:         //
79:         // You might want to code a more efficient
80:         // implementation here ... (see ByteFIFO.java)
81:         //
82:
83:         Object[] list = new Object[size]; // use the current size
84:
85:         for (int i = 0; i < list.length; i++) {
86:             list[i] = remove();
87:         }
88:
89:         // if FIFO was empty, a zero-length array is returned
90:         return list;
91:     }
92:
93:     public synchronized Object[] removeAtLeastOne()
94:             throws InterruptedException {
95:
96:         waitWhileEmpty(); // wait for at least one to be in FIFO
97:         return removeAll();
98:     }
99:
Listing 18.4  Continued
100:     public synchronized boolean waitUntilEmpty(long msTimeout)
101:             throws InterruptedException {
102:
103:         if (msTimeout == 0L) {
104:             waitUntilEmpty();  // use other method
105:             return true;
106:         }
107:
108:         // wait only for the specified amount of time
109:         long endTime = System.currentTimeMillis() + msTimeout;
110:         long msRemaining = msTimeout;
111:
112:         while (!isEmpty() && (msRemaining > 0L)) {
113:             wait(msRemaining);
114:             msRemaining = endTime - System.currentTimeMillis();
115:         }
116:
117:         // May have timed out, or may have met condition,
118:         // calc return value.
119:         return isEmpty();
120:     }
121:
122:     public synchronized void waitUntilEmpty()
123:             throws InterruptedException {
124:
125:         while (!isEmpty()) {
126:             wait();
127:         }
128:     }
129:
130:     public synchronized void waitWhileEmpty()
131:             throws InterruptedException {
132:
133:         while (isEmpty()) {
134:             wait();
135:         }
136:     }
137:
138:     public synchronized void waitUntilFull()
139:             throws InterruptedException {
140:
141:         while (!isFull()) {
142:             wait();
143:         }
144:     }
145:
146:     public synchronized void waitWhileFull()
147:             throws InterruptedException {
148:
149:         while (isFull()) {
150:             wait();
151:         }
152:     }
153: }
In ObjectFIFO , the member variables , the constructor, and the getSize() and isFull() methods work the same as in SimpleObjectFIFO (described earlier in this chapter).
The getCapacity() method (lines 1618) has been added for convenience to determine the maximum number of object references that can be held in the FIFO queue. The isEmpty() method (lines 2426) returns true if the size is currently .
The add() method (lines 3242) has been changed slightly to call waitWhileFull() , rather than handle the wait() call directly. Otherwise , the add() in ObjectFIFO is the same as in SimpleObjectFIFO .
The addEach() method (lines 4455) supports adding each element in an Object[] as its own item. If an Object[] should be added as one item, add() should be called instead. Inside addEach() , the array is simply stepped through, and each element is individually put into the FIFO queue by invoking add() (lines 5254). This could be done in a more efficient manner that directly works with queue , head , tail , and size (see ByteFIFO for one technique).
The remove() method (lines 5773) has been changed slightly to call waitWhileEmpty() , rather than handle the wait() call directly. Otherwise, the remove() in ObjectFIFO is the same as in SimpleObjectFIFO .
The removeAll() method (lines 7591) supports removing all the items currently in the FIFO queue and returning them in an Object[] . This method does not blockeven if the current size is . If the queue is empty, a zero-length array will be returned. Inside removeAll() , an Object[] is created based on the current value of size (line 83). Then for each item, remove() is called, and the value returned is stored in this new array (line 86). Finally, the Object[] is returned (line 90). This could be done in a more efficient manner that directly works with queue , head , tail , and size (see ByteFIFO for one technique).
The removeAtLeastOne() method (lines 9398) is used to wait until at least one item is in the FIFO queue and then to remove and return all the items in the queue. This method will block as long as the queue is empty (line 96). As soon as it is not empty (or if it wasnt empty to start with), removeAll() is invoked, and the Object[] it generates is returned (line 97).
The rest of the methods ( waitUntil X and waitWhile X ) kindly encapsulate the wait-notify mechanism so that users of ObjectFIFO dont have to burden themselves with synchronized and wait() . One of them, waitUntilEmpty(long msTimeout) , takes a timeout value so that the waiting is not indefinite. You should consider extending this functionality to the others.
The waitUntilEmpty(long msTimeout) method (lines 100120) is used to block until either no more items are in the FIFO queue or until the specified number of milliseconds elapses. If the queue is empty, true is returned; otherwise, false is returned. If the timeout is , the indefinite waitUntilEmpty() is called, and after that returns, true is returned (lines 103106). Otherwise, the waiting for the full timeout technique from Chapter 14 is used (lines 109119).
The ObjectFIFOTest class, in Listing 18.5, is used to demonstrate the ObjectFIFO class.
Listing 18.5  ObjectFIFOTest.javaDemonstration Code for ObjectFIFO
  1: public class ObjectFIFOTest extends Object {
  2:    
  3:     private static void fullCheck(ObjectFIFO fifo) {
  4:         try {
  5:             // Syncd to allow messages to print while
  6:             // condition is still true.
  7:             synchronized (fifo) {
  8:                 while (true) {
  9:                     fifo.waitUntilFull();
10:                     print(FULL);
11:                     fifo.waitWhileFull();
12:                     print(NO LONGER FULL);
13:                 }
14:             }
15:         } catch (InterruptedException ix) {
16:             return;
17:         }
18:     }
19:
20:     private static void emptyCheck(ObjectFIFO fifo) {
21:         try {
22:             // Syncd to allow messages to print while
23:             // condition is still true.
24:             synchronized (fifo) {
25:                 while (true) {
26:                     fifo.waitUntilEmpty();
27:                     print(EMPTY);
28:                     fifo.waitWhileEmpty();
29:                     print(NO LONGER EMPTY);
30:                 }
31:             }
32:         } catch (InterruptedException ix) {
33:             return;
34:         }
35:     }
36:
37:     private static void consumer(ObjectFIFO fifo) {
38:         try {
39:             print(just entered consumer());
40:
41:             for (int i = 0; i < 3; i++) {
42:                 synchronized (fifo) {
43:                     Object obj = fifo.remove();
44:                     print(DATA-OUT - did remove(), obj= + obj);
45:                 }
46:                 Thread.sleep(3000);
47:             }
48:
49:             synchronized (fifo) {
50:                 boolean resultOfWait = fifo.waitUntilEmpty(500);
51:                 print(did waitUntilEmpty(500), resultOfWait= +
52:                         resultOfWait + , getSize()= +
53:                         fifo.getSize());
54:             }
55:
56:             for (int i = 0; i < 3; i++) {
57:                 synchronized (fifo) {
58:                     Object[] list = fifo.removeAll();
59:                     print(did removeAll(), list.length= +
60:                             list.length);
61:
62:                     for (int j = 0; j < list.length; j++) {
63:                         print(DATA-OUT - list[ + j + ]= +
64:                                 list[j]);
65:                     }
66:                 }
67:                 Thread.sleep(100);
68:             }
69:
70:             for (int i = 0; i < 3; i++) {
71:                 synchronized (fifo) {
72:                     Object[] list = fifo.removeAtLeastOne();
73:                     print(
74:                         did removeAtLeastOne(), list.length= +
75:                         list.length);
76:
77:                     for (int j = 0; j < list.length; j++) {
78:                         print(DATA-OUT - list[ + j + ]= +
79:                                 list[j]);
80:                     }
81:                 }
82:                 Thread.sleep(1000);
83:             }
84:
85:             while (!fifo.isEmpty()) {
86:                 synchronized (fifo) {
87:                     Object obj = fifo.remove();
88:                     print(DATA-OUT - did remove(), obj= + obj);
89:                 }
90:                 Thread.sleep(1000);
91:             }
92:
93:             print(leaving consumer());
94:         } catch (InterruptedException ix) {
95:             return;
96:         }
97:     }
98:
99:     private static void producer(ObjectFIFO fifo) {
100:         try {
Listing 18.5  Continued
101:             print(just entered producer());
102:             int count = 0;
103:
104:             Object obj0 = new Integer(count);
105:             count++;
106:             synchronized (fifo) {
107:                 fifo.add(obj0);
108:                 print(DATA-IN - did add(), obj0= + obj0);
109:
110:                 boolean resultOfWait = fifo.waitUntilEmpty(500);
111:                 print(did waitUntilEmpty(500), resultOfWait= +
112:                         resultOfWait + , getSize()= +
113:                         fifo.getSize());
114:             }
115:
116:             for (int i = 0; i < 10; i++) {
117:                 Object obj = new Integer(count);
118:                 count++;
119:                 synchronized (fifo) {
120:                     fifo.add(obj);
121:                     print(DATA-IN - did add(), obj= + obj);
122:                 }
123:                 Thread.sleep(1000);
124:             }
125:
126:             Thread.sleep(2000);
127:
128:             Object obj = new Integer(count);
129:             count++;
130:             synchronized (fifo) {
131:                 fifo.add(obj);
132:                 print(DATA-IN - did add(), obj= + obj);
133:             }
134:             Thread.sleep(500);
135:
136:             Integer[] list1 = new Integer[3];
137:             for (int i = 0; i < list1.length; i++) {
138:                 list1[i] = new Integer(count);
139:                 count++;
140:             }
141:
142:             synchronized (fifo) {
143:                 fifo.addEach(list1);
144:                 print(did addEach(), list1.length= +
145:                         list1.length);
146:             }
147:
148:             Integer[] list2 = new Integer[8];
149:             for (int i = 0; i < list2.length; i++) {
150:                 list2[i] = new Integer(count);
151:                 count++;
152:             }
153:
154:             synchronized (fifo) {
155:                 fifo.addEach(list2);
156:                 print(did addEach(), list2.length= +
157:                         list2.length);
158:             }
159:
160:             synchronized (fifo) {
161:                 fifo.waitUntilEmpty();
162:                 print(fifo.isEmpty()= + fifo.isEmpty());
163:             }
164:
165:             print(leaving producer());
166:         } catch (InterruptedException ix) {
167:             return;
168:         }
169:     }
170:
171:     private static synchronized void print(String msg) {
172:         System.out.println(
173:             Thread.currentThread().getName() + : + msg);
174:     }
175:
176:     public static void main(String[] args) {
177:         final ObjectFIFO fifo = new ObjectFIFO(5);
178:
179:         Runnable fullCheckRunnable = new Runnable() {
180:                 public void run() {
181:                     fullCheck(fifo);
182:                 }
183:             };
184:
185:         Thread fullCheckThread =
186:                 new Thread(fullCheckRunnable, fchk);
187:         fullCheckThread.setPriority(9);
188:         fullCheckThread.setDaemon(true); // die automatically
189:         fullCheckThread.start();
190:
191:         Runnable emptyCheckRunnable = new Runnable() {
192:                 public void run() {
193:                     emptyCheck(fifo);
194:                 }
195:             };
196:
197:         Thread emptyCheckThread =
198:                 new Thread(emptyCheckRunnable, echk);
199:         emptyCheckThread.setPriority(8);
200:         emptyCheckThread.setDaemon(true); // die automatically
201:         emptyCheckThread.start();
202:
203:         Runnable consumerRunnable = new Runnable() {
204:                 public void run() {
205:                     consumer(fifo);
206:                 }
207:             };
208:
209:         Thread consumerThread =
210:                 new Thread(consumerRunnable, cons);
211:         consumerThread.setPriority(7);
212:         consumerThread.start();
213:
214:         Runnable producerRunnable = new Runnable() {
215:                 public void run() {
216:                     producer(fifo);
217:                 }
218:             };
219:
220:         Thread producerThread =
221:                 new Thread(producerRunnable, prod);
222:         producerThread.setPriority(6);
223:         producerThread.start();
224:     }
225: }
ObjectFIFOTest starts four threads to simultaneously access an ObjectFIFO created with a capacity of 5 items (line 177). The fullCheckThread (lines 179189) is used to run the fullCheck() method (lines 318). The fullCheck() method first waits until the FIFO queue is full (line 9) and then prints a message (line 10). It then waits until the FIFO queue is no longer full (line 11) and then prints a message (line 12). It continues to loop through these two checks indefinitely (line 8).
Next, the main thread creates the emptyCheckThread (lines 191201) to run emptyCheck() (lines 2035). This method works just like fullCheck() but instead prints messages when the FIFO queue transitions to and from the empty state.
The consumerThread is then created (lines 203212) to run consumer() (lines 3797). Inside the consumer() method, the thread exercises all the remove X methods.
Finally the producerThread is created (lines 214223) to run producer() (lines 99169). Inside the producer() method, several of the add X methods are used to sporadically add items to the queue.
The print() method (lines 171174) is used to produce all the console output. It prefixes each message with the name of the thread that sent the message.
Listing 18.6 shows the output from a particular run of ObjectFIFOTest . Your output is likely to differ slightly because the four threads running will be scheduled somewhat randomly . The main thing to notice is that all the items added are eventually removed in the proper order.
Listing 18.6  Possible Output from ObjectFIFOTest
1: echk: EMPTY
2: cons: just entered consumer()
3: prod: just entered producer()
4: prod: DATA-IN - did add(), obj0=0
5: echk: NO LONGER EMPTY
6: cons: DATA-OUT - did remove(), obj=0
7: echk: EMPTY
8: prod: did waitUntilEmpty(500), resultOfWait=true, getSize()=0
9: prod: DATA-IN - did add(), obj=1
10: echk: NO LONGER EMPTY
11: prod: DATA-IN - did add(), obj=2
12: prod: DATA-IN - did add(), obj=3
13: cons: DATA-OUT - did remove(), obj=1
14: prod: DATA-IN - did add(), obj=4
15: prod: DATA-IN - did add(), obj=5
16: prod: DATA-IN - did add(), obj=6
17: fchk: FULL
18: cons: DATA-OUT - did remove(), obj=2
19: fchk: NO LONGER FULL
20: prod: DATA-IN - did add(), obj=7
21: fchk: FULL
22: cons: did waitUntilEmpty(500), resultOfWait=false, getSize()=5
23: cons: did removeAll(), list.length=5
24: cons: DATA-OUT - list[0]=3
25: cons: DATA-OUT - list[1]=4
26: cons: DATA-OUT - list[2]=5
27: cons: DATA-OUT - list[3]=6
28: cons: DATA-OUT - list[4]=7
29: fchk: NO LONGER FULL
30: echk: EMPTY
31: prod: DATA-IN - did add(), obj=8
32: echk: NO LONGER EMPTY
33: cons: did removeAll(), list.length=1
34: cons: DATA-OUT - list[0]=8
35: echk: EMPTY
36: cons: did removeAll(), list.length=0
37: prod: DATA-IN - did add(), obj=9
38: echk: NO LONGER EMPTY
39: cons: did removeAtLeastOne(), list.length=1
40: cons: DATA-OUT - list[0]=9
41: echk: EMPTY
42: prod: DATA-IN - did add(), obj=10
43: echk: NO LONGER EMPTY
44: cons: did removeAtLeastOne(), list.length=1
45: cons: DATA-OUT - list[0]=10
46: echk: EMPTY
47: prod: DATA-IN - did add(), obj=11
48: echk: NO LONGER EMPTY
49: cons: did removeAtLeastOne(), list.length=1
50: cons: DATA-OUT - list[0]=11
51: echk: EMPTY
52: prod: did addEach(), list1.length=3
53: echk: NO LONGER EMPTY
54: fchk: FULL
55: cons: DATA-OUT - did remove(), obj=12
56: fchk: NO LONGER FULL
57: fchk: FULL
58: cons: DATA-OUT - did remove(), obj=13
59: fchk: NO LONGER FULL
60: fchk: FULL
61: cons: DATA-OUT - did remove(), obj=14
62: fchk: NO LONGER FULL
63: fchk: FULL
64: cons: DATA-OUT - did remove(), obj=15
65: fchk: NO LONGER FULL
66: fchk: FULL
67: cons: DATA-OUT - did remove(), obj=16
68: fchk: NO LONGER FULL
69: fchk: FULL
70: cons: DATA-OUT - did remove(), obj=17
71: fchk: NO LONGER FULL
72: prod: did addEach(), list2.length=8
73: fchk: FULL
74: cons: DATA-OUT - did remove(), obj=18
75: fchk: NO LONGER FULL
76: cons: DATA-OUT - did remove(), obj=19
77: cons: DATA-OUT - did remove(), obj=20
78: cons: DATA-OUT - did remove(), obj=21
79: cons: DATA-OUT - did remove(), obj=22
80: echk: EMPTY
81: prod: fifo.isEmpty()=true
82: prod: leaving producer()
83: cons: leaving consumer()

Toc


Java Thread Programming
Java Thread Programming
ISBN: 0672315858
EAN: 2147483647
Year: 2005
Pages: 149
Authors: Paul Hyde

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net