A queue is a data structure with two fundamental operations: adding an element to the tail of the queue and removing an element from the head. That is, the queue manages the data in a first-in/first-out discipline. A blocking queue causes a thread to block when it tries to add an element to a full queue or to remove an element from an empty queue.

Blocking queues are a useful tool for coordinating the work of multiple threads. Worker threads can periodically deposit intermediate results in a blocking queue. Other worker threads remove the intermediate results and modify them further. The queue automatically balances the workload. If the first set of threads runs slower than the second, the second set blocks while waiting for the results. If the first set of threads runs faster, the queue fills up until the second set catches up. Table 1-1 shows the operations for blocking queues.
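The balancing behavior described above can be seen in a minimal sketch. It assumes a single producer and a single consumer that share an ArrayBlockingQueue with room for only two elements, and it uses the blocking put and take operations of BlockingQueue. The class name HandoffSketch and the method name transfer are hypothetical, chosen only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class HandoffSketch
{
   /**
      Pushes the numbers 1..count through a bounded queue and returns the sum
      that the consumer received (a partial sum if the caller is interrupted).
   */
   static long transfer(final int count, int capacity)
   {
      final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

      Thread producer = new Thread(new Runnable()
         {
            public void run()
            {
               try
               {
                  // put blocks whenever the queue already holds `capacity` elements
                  for (int i = 1; i <= count; i++) queue.put(i);
               }
               catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            }
         });
      producer.start();

      long sum = 0;
      try
      {
         // take blocks whenever the queue is empty
         for (int i = 1; i <= count; i++) sum += queue.take();
         producer.join();
      }
      catch (InterruptedException e) { Thread.currentThread().interrupt(); }
      return sum;
   }

   public static void main(String[] args)
   {
      System.out.println(transfer(1000, 2)); // 1 + 2 + ... + 1000 = 500500
   }
}
```

Neither thread contains any explicit locking or waiting code. Whichever side runs ahead is simply held up by the queue until the other side catches up.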
The blocking queue operations fall into three categories, depending on their response. The add, remove, and element operations throw an exception when you try to add to a full queue or get the head of an empty queue. Of course, in a multithreaded program, the queue might become full or empty at any time, so you will instead want to use the offer, poll, and peek methods. These methods simply return with a failure indicator (false for offer, null for poll and peek) instead of throwing an exception if they cannot carry out their tasks.

NOTE: The poll and peek methods return null to indicate failure. Therefore, it is illegal to insert null values into these queues.
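The difference between the first two categories can be checked with a short experiment. The sketch below assumes an ArrayBlockingQueue of capacity 1, so that the queue is easy to observe both empty and full. The class name QueueResponseSketch and the method name probe are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueResponseSketch
{
   /** Records how each operation responds on an empty and then on a full queue. */
   static List<String> probe()
   {
      List<String> log = new ArrayList<String>();
      BlockingQueue<String> q = new ArrayBlockingQueue<String>(1);

      // Empty queue: poll and peek report failure by returning null ...
      log.add("poll on empty: " + q.poll());
      log.add("peek on empty: " + q.peek());
      // ... whereas remove and element throw an exception
      try { q.remove(); log.add("remove on empty: ok"); }
      catch (NoSuchElementException e) { log.add("remove on empty: NoSuchElementException"); }
      try { q.element(); log.add("element on empty: ok"); }
      catch (NoSuchElementException e) { log.add("element on empty: NoSuchElementException"); }

      // The first offer succeeds and fills the queue
      log.add("offer: " + q.offer("first"));

      // Full queue: offer reports failure by returning false ...
      log.add("offer on full: " + q.offer("second"));
      // ... whereas add throws an exception
      try { q.add("second"); log.add("add on full: ok"); }
      catch (IllegalStateException e) { log.add("add on full: IllegalStateException"); }

      return log;
   }

   public static void main(String[] args)
   {
      for (String entry : probe()) System.out.println(entry);
   }
}
```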
There are also variants of the offer and poll methods with a timeout. For example, the call

    boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS);

tries for 100 milliseconds to insert an element at the tail of the queue. If it succeeds, it immediately returns true; otherwise, it returns false when it times out. Similarly, the call

    Object head = q.poll(100, TimeUnit.MILLISECONDS);

tries for 100 milliseconds to remove the head of the queue. If it succeeds, it immediately returns the head; otherwise, it returns null when it times out.

Finally, we have the blocking operations put and take. The put method blocks if the queue is full, and the take method blocks if the queue is empty. These are the equivalents of offer and poll with no timeout.

The java.util.concurrent package supplies four variations of blocking queues. By default, the LinkedBlockingQueue has no upper bound on its capacity, but a maximum capacity can optionally be specified. The ArrayBlockingQueue is constructed with a given capacity and an optional parameter to require fairness. If fairness is specified, then the longest-waiting threads are given preferential treatment. As always, fairness exacts a significant performance penalty, and you should only use it if your problem specifically requires it.

The PriorityBlockingQueue is a priority queue, not a first-in/first-out queue. Elements are removed in order of their priority. The queue has unbounded capacity, but retrieval will block if the queue is empty. (We discuss priority queues in greater detail in Chapter 2.)

Finally, a DelayQueue contains objects that implement the Delayed interface:

    interface Delayed extends Comparable<Delayed>
    {
       long getDelay(TimeUnit unit);
    }

The getDelay method returns the remaining delay of the object. A zero or negative value indicates that the delay has elapsed. Elements can only be removed from a DelayQueue if their delay has elapsed. You also need to implement the compareTo method.
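To make the Delayed contract concrete, here is a minimal sketch of an element class together with a DelayQueue that holds two such elements. The names DelayedMessage, DelayQueueSketch, and drain are hypothetical, and the delays of 50 and 200 milliseconds are arbitrary values chosen for the illustration. The sketch assumes that ordering elements by their remaining delay is the desired comparison.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedMessage implements Delayed
{
   private final String text;
   private final long dueNanos; // System.nanoTime() value at which the delay elapses

   DelayedMessage(String text, long delayMillis)
   {
      this.text = text;
      this.dueNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
   }

   String getText() { return text; }

   // remaining delay in the requested unit; zero or negative once elapsed
   public long getDelay(TimeUnit unit)
   {
      return unit.convert(dueNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
   }

   // elements with the shorter remaining delay sort first
   public int compareTo(Delayed other)
   {
      long diff = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
      return diff < 0 ? -1 : diff > 0 ? 1 : 0;
   }
}

class DelayQueueSketch
{
   /** Inserts "slow" before "fast" and returns the texts in removal order. */
   static String drain()
   {
      DelayQueue<DelayedMessage> queue = new DelayQueue<DelayedMessage>();
      queue.put(new DelayedMessage("slow", 200));
      queue.put(new DelayedMessage("fast", 50));

      StringBuilder order = new StringBuilder();
      try
      {
         // take blocks until the delay of the head element has elapsed
         order.append(queue.take().getText());
         order.append(' ');
         order.append(queue.take().getText());
      }
      catch (InterruptedException e) { Thread.currentThread().interrupt(); }
      return order.toString();
   }

   public static void main(String[] args)
   {
      System.out.println(drain()); // fast slow
   }
}
```

The removal order depends only on the delays, not on the insertion order, which is why "fast" comes out first even though it was added second.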
The DelayQueue uses the compareTo method to sort the entries.

The program in Example 1-6 shows how to use a blocking queue to control a set of threads. The program searches through all files in a directory and its subdirectories, printing lines that contain a given keyword.

A producer thread enumerates all files in all subdirectories and places them in a blocking queue. This operation is fast, and the queue would quickly fill up with all files in the file system if it were not bounded.

We also start a large number of search threads. Each search thread takes a file from the queue, opens it, prints all lines containing the keyword, and then takes the next file. We use a trick to terminate the application when no further work is required. In order to signal completion, the enumeration thread places a dummy object into the queue. (This is similar to a dummy suitcase with a label "last bag" on a baggage claim belt.) When a search thread takes the dummy, it puts it back and terminates.

Note that no explicit thread synchronization is required. In this application, we use the queue data structure as a synchronization mechanism.

Example 1-6. BlockingQueueTest.java

    import java.io.*;
    import java.util.*;
    import java.util.concurrent.*;

    public class BlockingQueueTest
    {
       public static void main(String[] args)
       {
          Scanner in = new Scanner(System.in);
          System.out.print("Enter base directory (e.g. /usr/local/jdk5.0/src): ");
          String directory = in.nextLine();
          System.out.print("Enter keyword (e.g. volatile): ");
          String keyword = in.nextLine();

          final int FILE_QUEUE_SIZE = 10;
          final int SEARCH_THREADS = 100;

          // the bound keeps the enumerator from running far ahead of the searchers
          BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);

          FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
          new Thread(enumerator).start();
          for (int i = 1; i <= SEARCH_THREADS; i++)
             new Thread(new SearchTask(queue, keyword)).start();
       }
    }

    /**
       This task enumerates all files in a directory and its subdirectories.
    */
    class FileEnumerationTask implements Runnable
    {
       /**
          Constructs a FileEnumerationTask.
          @param queue the blocking queue to which the enumerated files are added
          @param startingDirectory the directory in which to start the enumeration
       */
       public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)
       {
          this.queue = queue;
          this.startingDirectory = startingDirectory;
       }

       public void run()
       {
          try
          {
             enumerate(startingDirectory);
             // signal completion to the search threads
             queue.put(DUMMY);
          }
          catch (InterruptedException e) { Thread.currentThread().interrupt(); }
       }

       /**
          Recursively enumerates all files in a given directory and its subdirectories.
          @param directory the directory in which to start
       */
       public void enumerate(File directory) throws InterruptedException
       {
          File[] files = directory.listFiles();
          // listFiles returns null if the path is not a readable directory
          if (files == null) return;
          for (File file : files)
          {
             if (file.isDirectory()) enumerate(file);
             else queue.put(file);
          }
       }

       /** The dummy file that marks the end of the enumeration. */
       public static final File DUMMY = new File("");

       private BlockingQueue<File> queue;
       private File startingDirectory;
    }

    /**
       This task searches files for a given keyword.
    */
    class SearchTask implements Runnable
    {
       /**
          Constructs a SearchTask.
          @param queue the queue from which to take files
          @param keyword the keyword to look for
       */
       public SearchTask(BlockingQueue<File> queue, String keyword)
       {
          this.queue = queue;
          this.keyword = keyword;
       }

       public void run()
       {
          try
          {
             boolean done = false;
             while (!done)
             {
                File file = queue.take();
                if (file == FileEnumerationTask.DUMMY)
                {
                   // put the dummy back so that the other search threads see it too
                   queue.put(file);
                   done = true;
                }
                else
                {
                   // report an unreadable file and continue with the next one
                   try { search(file); }
                   catch (IOException e) { e.printStackTrace(); }
                }
             }
          }
          catch (InterruptedException e) { Thread.currentThread().interrupt(); }
       }

       /**
          Searches a file for a given keyword and prints all matching lines.
          @param file the file to search
       */
       public void search(File file) throws IOException
       {
          Scanner in = new Scanner(new FileInputStream(file));
          try
          {
             int lineNumber = 0;
             while (in.hasNextLine())
             {
                lineNumber++;
                String line = in.nextLine();
                if (line.contains(keyword))
                   System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber, line);
             }
          }
          finally
          {
             in.close();
          }
       }

       private BlockingQueue<File> queue;
       private String keyword;
    }

java.util.concurrent.ArrayBlockingQueue<E> 5.0
java.util.concurrent.LinkedBlockingQueue<E> 5.0
java.util.concurrent.DelayQueue<E extends Delayed> 5.0
java.util.concurrent.Delayed 5.0
java.util.concurrent.PriorityBlockingQueue<E> 5.0
java.util.concurrent.BlockingQueue<E> 5.0
java.util.Queue<E> 5.0