Stopping a Thread-based Service

Applications commonly create services that own threads, such as thread pools, and the lifetime of these services is usually longer than that of the method that creates them. If the application is to shut down gracefully, the threads owned by these services need to be terminated. Since there is no preemptive way to stop a thread, they must instead be persuaded to shut down on their own.

Sensible encapsulation practices dictate that you should not manipulate a thread (interrupt it, modify its priority, etc.) unless you own it. The thread API has no formal concept of thread ownership: a thread is represented with a Thread object that can be freely shared like any other object. However, it makes sense to think of a thread as having an owner, and this is usually the class that created the thread. So a thread pool owns its worker threads, and if those threads need to be interrupted, the thread pool should take care of it.

As with any other encapsulated object, thread ownership is not transitive: the application may own the service and the service may own the worker threads, but the application doesn't own the worker threads and therefore should not attempt to stop them directly. Instead, the service should provide lifecycle methods for shutting itself down that also shut down the owned threads; then the application can shut down the service, and the service can shut down the threads. ExecutorService provides the shutdown and shutdownNow methods; other thread-owning services should provide a similar shutdown mechanism.

Provide lifecycle methods whenever a thread-owning service has a lifetime longer than that of the method that created it.
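As a minimal sketch of this advice, the class below owns a single worker thread and exposes start and stop as its lifecycle methods. HeartbeatService and its members are hypothetical names chosen for illustration, not part of the text; the sketch assumes the worker spends its time in a blocking call that responds to interruption.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical service (not from the text) that owns exactly one worker thread.
class HeartbeatService {
    private final AtomicInteger beats = new AtomicInteger();
    private final Thread worker = new Thread(this::runLoop, "heartbeat");

    private void runLoop() {
        try {
            while (true) {
                beats.incrementAndGet();
                Thread.sleep(10); // blocking call that responds to interruption
            }
        } catch (InterruptedException e) {
            // Interruption is this service's shutdown signal: let the thread exit.
        }
    }

    public void start() { worker.start(); }

    // Only the owner interrupts the worker; callers go through stop().
    public void stop() throws InterruptedException {
        worker.interrupt();
        worker.join();
    }

    public boolean isRunning() { return worker.isAlive(); }

    public int beatCount() { return beats.get(); }
}
```

The application calls stop on the service and never touches the worker thread directly, which keeps the ownership chain intact.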

 

7.2.1. Example: A Logging Service

Most server applications use logging, which can be as simple as inserting println statements into the code. Stream classes like PrintWriter are thread-safe, so this simple approach would require no explicit synchronization.[3] However, as we'll see in Section 11.6, inline logging can have some performance costs in high-volume applications. Another alternative is to have the log call queue the log message for processing by another thread.

[3] If you are logging multiple lines as part of a single log message, you may need to use additional client-side locking to prevent undesirable interleaving of output from multiple threads. If two threads logged multiline stack traces to the same stream with one println call per line, the results would be interleaved unpredictably, and could easily look like one large but meaningless stack trace.

Listing 7.12. Encapsulating Nonstandard Cancellation in a Task with newTaskFor.

public interface CancellableTask<T> extends Callable<T> {
    void cancel();
    RunnableFuture<T> newTask();
}

@ThreadSafe
public class CancellingExecutor extends ThreadPoolExecutor {
    ...
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof CancellableTask)
            return ((CancellableTask<T>) callable).newTask();
        else
            return super.newTaskFor(callable);
    }
}

public abstract class SocketUsingTask<T>
        implements CancellableTask<T> {
    @GuardedBy("this") private Socket socket;

    protected synchronized void setSocket(Socket s) { socket = s; }

    // Nonstandard cancellation: closing the socket unblocks socket I/O.
    public synchronized void cancel() {
        try {
            if (socket != null)
                socket.close();
        } catch (IOException ignored) { }
    }

    public RunnableFuture<T> newTask() {
        return new FutureTask<T>(this) {
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();
                } finally {
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}

LogWriter in Listing 7.13 shows a simple logging service in which the logging activity is moved to a separate logger thread. Instead of having the thread that produces the message write it directly to the output stream, LogWriter hands it off to the logger thread via a BlockingQueue and the logger thread writes it out. This is a multiple-producer, single-consumer design: any activity calling log is acting as a producer, and the background logger thread is the consumer. If the logger thread falls behind, the BlockingQueue eventually blocks the producers until the logger thread catches up.

Listing 7.13. Producer-Consumer Logging Service with No Shutdown Support.

public class LogWriter {
    private final BlockingQueue<String> queue;
    private final LoggerThread logger;

    public LogWriter(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>(CAPACITY);
        this.logger = new LoggerThread(writer);
    }

    public void start() { logger.start(); }

    public void log(String msg) throws InterruptedException {
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        private final PrintWriter writer;
        ...
        public void run() {
            try {
                while (true)
                    writer.println(queue.take());
            } catch (InterruptedException ignored) {
            } finally {
                writer.close();
            }
        }
    }
}

For a service like LogWriter to be useful in production, we need a way to terminate the logger thread so it does not prevent the JVM from shutting down normally. Stopping the logger thread is easy enough, since it repeatedly calls take, which is responsive to interruption; if the logger thread is modified to exit on catching InterruptedException, then interrupting the logger thread stops the service.

However, simply making the logger thread exit is not a very satisfying shutdown mechanism. Such an abrupt shutdown discards log messages that might be waiting to be written to the log, but, more importantly, threads blocked in log because the queue is full will never become unblocked. Cancelling a producer-consumer activity requires cancelling both the producers and the consumers. Interrupting the logger thread deals with the consumer, but because the producers in this case are not dedicated threads, cancelling them is harder.

Another approach to shutting down LogWriter would be to set a "shutdown requested" flag to prevent further messages from being submitted, as shown in Listing 7.14. The consumer could then drain the queue upon being notified that shutdown has been requested, writing out any pending messages and unblocking any producers blocked in log. However, this approach has race conditions that make it unreliable. The implementation of log is a check-then-act sequence: producers could observe that the service has not yet been shut down but still queue messages after the shutdown, again with the risk that the producer might get blocked in log and never become unblocked. There are tricks that reduce the likelihood of this (like having the consumer wait several seconds before declaring the queue drained), but these do not change the fundamental problem, merely the likelihood that it will cause a failure.

Listing 7.14. Unreliable Way to Add Shutdown Support to the Logging Service.

public void log(String msg) throws InterruptedException {
    if (!shutdownRequested)
        queue.put(msg);
    else
        throw new IllegalStateException("logger is shut down");
}

The way to provide reliable shutdown for LogWriter is to fix the race condition, which means making the submission of a new log message atomic. But we don't want to hold a lock while trying to enqueue the message, since put could block. Instead, we can atomically check for shutdown and conditionally increment a counter to "reserve" the right to submit a message, as shown in LogService in Listing 7.15.

7.2.2. ExecutorService Shutdown

In Section 6.2.4, we saw that ExecutorService offers two ways to shut down: graceful shutdown with shutdown, and abrupt shutdown with shutdownNow. In an abrupt shutdown, shutdownNow returns the list of tasks that had not yet started after attempting to cancel all actively executing tasks.

Listing 7.15. Adding Reliable Cancellation to LogWriter.

public class LogService {
    private final BlockingQueue<String> queue;
    private final LoggerThread loggerThread;
    private final PrintWriter writer;
    @GuardedBy("this") private boolean isShutdown;
    @GuardedBy("this") private int reservations;

    public void start() { loggerThread.start(); }

    public void stop() {
        synchronized (this) { isShutdown = true; }
        loggerThread.interrupt();
    }

    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            if (isShutdown)
                throw new IllegalStateException(...);
            ++reservations;
        }
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    try {
                        // Lock the enclosing LogService, which guards isShutdown
                        // and reservations; a bare "this" here is the LoggerThread.
                        synchronized (LogService.this) {
                            if (isShutdown && reservations == 0)
                                break;
                        }
                        String msg = queue.take();
                        synchronized (LogService.this) { --reservations; }
                        writer.println(msg);
                    } catch (InterruptedException e) { /* retry */ }
                }
            } finally {
                writer.close();
            }
        }
    }
}

The two different termination options offer a tradeoff between safety and responsiveness: abrupt termination is faster but riskier because tasks may be interrupted in the middle of execution, and normal termination is slower but safer because the ExecutorService does not shut down until all queued tasks are processed. Other thread-owning services should consider providing a similar choice of shutdown modes.
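One common way to combine the two modes is to try a graceful shutdown first and fall back to an abrupt one if the executor does not terminate in time. The sketch below illustrates that idea; ShutdownHelper and shutdownGracefully are hypothetical names, and the choice to fall back after a single timeout is an assumption rather than something the text prescribes.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not from the text): graceful shutdown with an abrupt fallback.
class ShutdownHelper {
    // Returns the tasks that never started if shutdownNow was needed,
    // or an empty list if the executor drained its queue in time.
    static List<Runnable> shutdownGracefully(ExecutorService exec,
                                             long timeout, TimeUnit unit) {
        exec.shutdown(); // stop accepting new tasks, let queued tasks run
        try {
            if (exec.awaitTermination(timeout, unit))
                return Collections.emptyList();
            return exec.shutdownNow(); // interrupt running tasks, drain the queue
        } catch (InterruptedException e) {
            List<Runnable> pending = exec.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupted status
            return pending;
        }
    }
}
```

A caller that gets back a non-empty list can log those tasks or save them for later, as discussed for shutdownNow.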

Simple programs can get away with starting and shutting down a global ExecutorService from main. More sophisticated programs are likely to encapsulate an ExecutorService behind a higher-level service that provides its own lifecycle methods, such as the variant of LogService in Listing 7.16 that delegates to an ExecutorService instead of managing its own threads. Encapsulating an ExecutorService extends the ownership chain from application to service to thread by adding another link; each member of the chain manages the lifecycle of the services or threads it owns.

Listing 7.16. Logging Service that Uses an ExecutorService.

public class LogService {
    private final ExecutorService exec = newSingleThreadExecutor();
    ...
    public void start() { }

    public void stop() throws InterruptedException {
        try {
            exec.shutdown();
            exec.awaitTermination(TIMEOUT, UNIT);
        } finally {
            writer.close();
        }
    }

    public void log(String msg) {
        try {
            exec.execute(new WriteTask(msg));
        } catch (RejectedExecutionException ignored) { }
    }
}

7.2.3. Poison Pills

Another way to convince a producer-consumer service to shut down is with a poison pill: a recognizable object placed on the queue that means "when you get this, stop." With a FIFO queue, poison pills ensure that consumers finish the work on their queue before shutting down, since any work submitted prior to submitting the poison pill will be retrieved before the pill; producers should not submit any work after putting a poison pill on the queue. IndexingService in Listings 7.17, 7.18, and 7.19 shows a single-producer, single-consumer version of the desktop search example from Listing 5.8 on page 91 that uses a poison pill to shut down the service.

Listing 7.17. Shutdown with Poison Pill.

public class IndexingService {
    private static final File POISON = new File("");
    private final IndexerThread consumer = new IndexerThread();
    private final CrawlerThread producer = new CrawlerThread();
    private final BlockingQueue<File> queue;
    private final FileFilter fileFilter;
    private final File root;

    class CrawlerThread extends Thread { /* Listing 7.18 */ }
    class IndexerThread extends Thread { /* Listing 7.19 */ }

    public void start() {
        producer.start();
        consumer.start();
    }

    public void stop() { producer.interrupt(); }

    public void awaitTermination() throws InterruptedException {
        consumer.join();
    }
}

Poison pills work only when the number of producers and consumers is known. The approach in IndexingService can be extended to multiple producers by having each producer place a pill on the queue and having the consumer stop only when it receives N_producers pills. It can be extended to multiple consumers by having each producer place N_consumers pills on the queue, though this can get unwieldy with large numbers of producers and consumers. Poison pills work reliably only with unbounded queues.
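The multiple-producer extension can be sketched as follows: each producer enqueues one pill when it finishes, and the single consumer exits only after it has seen one pill per producer. MultiProducerPoisonPill and its run method are hypothetical names for illustration; the sketch assumes an unbounded LinkedBlockingQueue and String work items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical example (not from the text): N producers, one consumer.
class MultiProducerPoisonPill {
    // A distinct object compared by identity, so no real item can be mistaken for it.
    private static final String POISON = new String("POISON");

    static List<String> run(int nProducers, int itemsPerProducer)
            throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // unbounded
        List<String> consumed = new ArrayList<>(); // touched only by the consumer until join

        Thread consumer = new Thread(() -> {
            int pillsSeen = 0;
            try {
                while (pillsSeen < nProducers) { // stop after one pill per producer
                    String item = queue.take();
                    if (item == POISON)
                        pillsSeen++;
                    else
                        consumed.add(item);
                }
            } catch (InterruptedException e) { /* allow the thread to exit */ }
        });
        consumer.start();

        List<Thread> producers = new ArrayList<>();
        for (int p = 0; p < nProducers; p++) {
            final int id = p;
            Thread t = new Thread(() -> {
                for (int i = 0; i < itemsPerProducer; i++)
                    queue.add("p" + id + "-" + i);
                queue.add(POISON); // this producer's last submission
            });
            producers.add(t);
            t.start();
        }

        for (Thread t : producers)
            t.join();
        consumer.join(); // makes the consumer's writes to the list visible here
        return consumed;
    }
}
```

Because each producer submits its pill last and the queue is FIFO, every item from a producer is retrieved before that producer's pill, so the consumer does not stop while real work is still queued.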

7.2.4. Example: A One-shot Execution Service

If a method needs to process a batch of tasks and does not return until all the tasks are finished, it can simplify service lifecycle management by using a private Executor whose lifetime is bounded by that method. (The invokeAll and invokeAny methods can often be useful in such situations.)
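As a rough illustration of the parenthetical remark, invokeAll can submit a whole batch and block until every task has completed, so the executor never outlives the method. BatchSquares and sumOfSquares are hypothetical names, and the fixed pool size of 4 is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example (not from the text): an executor scoped to one method call.
class BatchSquares {
    static int sumOfSquares(List<Integer> inputs)
            throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newFixedThreadPool(4); // private to this call
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : inputs)
                tasks.add(() -> n * n);

            int total = 0;
            // invokeAll returns only after every task has completed.
            for (Future<Integer> f : exec.invokeAll(tasks))
                total += f.get();
            return total;
        } finally {
            exec.shutdown(); // nothing is left running, so this terminates promptly
        }
    }
}
```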

The checkMail method in Listing 7.20 checks for new mail in parallel on a number of hosts. It creates a private executor and submits a task for each host: it then shuts down the executor and waits for termination, which occurs when all the mail-checking tasks have completed.[4]

[4] The reason an AtomicBoolean is used instead of a volatile boolean is that in order to access the hasNewMail flag from the inner Runnable, it would have to be final, which would preclude modifying it.

Listing 7.18. Producer Thread for IndexingService.

public class CrawlerThread extends Thread {
    public void run() {
        try {
            crawl(root);
        } catch (InterruptedException e) { /* fall through */ }
        finally {
            // Keep retrying until the pill is on the queue.
            while (true) {
                try {
                    queue.put(POISON);
                    break;
                } catch (InterruptedException e1) { /* retry */ }
            }
        }
    }

    private void crawl(File root) throws InterruptedException {
        ...
    }
}

Listing 7.19. Consumer Thread for IndexingService.

public class IndexerThread extends Thread {
    public void run() {
        try {
            while (true) {
                File file = queue.take();
                if (file == POISON)
                    break;
                else
                    indexFile(file);
            }
        } catch (InterruptedException consumed) { }
    }
}

Listing 7.20. Using a Private Executor Whose Lifetime is Bounded by a Method Call.

boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
        throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    final AtomicBoolean hasNewMail = new AtomicBoolean(false);
    try {
        for (final String host : hosts)
            exec.execute(new Runnable() {
                public void run() {
                    if (checkMail(host))
                        hasNewMail.set(true);
                }
            });
    } finally {
        exec.shutdown();
        exec.awaitTermination(timeout, unit);
    }
    return hasNewMail.get();
}

7.2.5. Limitations of shutdownNow

When an ExecutorService is shut down abruptly with shutdownNow, it attempts to cancel the tasks currently in progress and returns a list of tasks that were submitted but never started so that they can be logged or saved for later processing.[5]

[5] The Runnable objects returned by shutdownNow might not be the same objects that were submitted to the ExecutorService: they might be wrapped instances of the submitted tasks.

However, there is no general way to find out which tasks started but did not complete. This means that there is no way of knowing the state of the tasks in progress at shutdown time unless the tasks themselves perform some sort of checkpointing. To know which tasks have not completed, you need to know not only which tasks didn't start, but also which tasks were in progress when the executor was shut down.[6]

[6] Unfortunately, there is no shutdown option in which tasks not yet started are returned to the caller but tasks in progress are allowed to complete; such an option would eliminate this uncertain intermediate state.
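The behavior described above can be observed with a small experiment: on a single-threaded executor, one task is blocked in progress while three more wait in the queue. The sketch below is illustrative only; ShutdownNowDemo and its Result holder are hypothetical names. shutdownNow hands back the queued tasks, while the task in progress is merely interrupted and is not reported to the caller.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demonstration (not from the text) of what shutdownNow reports.
class ShutdownNowDemo {
    static final class Result {
        final int neverStarted;               // size of the list shutdownNow returned
        final boolean runningTaskInterrupted; // whether the in-progress task saw the interrupt
        Result(int neverStarted, boolean runningTaskInterrupted) {
            this.neverStarted = neverStarted;
            this.runningTaskInterrupted = runningTaskInterrupted;
        }
    }

    static Result run() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);

        // This task occupies the only worker thread until it is interrupted.
        exec.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                interrupted.countDown();
            }
        });
        // These three tasks can only wait in the queue.
        for (int i = 0; i < 3; i++)
            exec.execute(() -> { });

        started.await(); // make sure the first task is really in progress
        List<Runnable> neverStarted = exec.shutdownNow();
        boolean sawInterrupt = interrupted.await(5, TimeUnit.SECONDS);
        exec.awaitTermination(5, TimeUnit.SECONDS);
        return new Result(neverStarted.size(), sawInterrupt);
    }
}
```

The returned list contains only the three queued tasks; nothing in it indicates that a fourth task was cut short.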

TrackingExecutor in Listing 7.21 shows a technique for determining which tasks were in progress at shutdown time. By encapsulating an ExecutorService and instrumenting execute (and similarly submit, not shown) to remember which tasks were cancelled after shutdown, TrackingExecutor can identify which tasks started but did not complete normally. After the executor terminates, getCancelledTasks returns the list of cancelled tasks. In order for this technique to work, the tasks must preserve the thread's interrupted status when they return, which well-behaved tasks will do anyway.

Listing 7.21. ExecutorService that Keeps Track of Cancelled Tasks After Shutdown.

public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown =
        Collections.synchronizedSet(new HashSet<Runnable>());
    ...
    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated())
            throw new IllegalStateException(...);
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                    // Record tasks that return while interrupted after shutdown.
                    if (isShutdown()
                            && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }

    // delegate other ExecutorService methods to exec
}

WebCrawler in Listing 7.22 shows an application of TrackingExecutor. The work of a web crawler is often unbounded, so if a crawler must be shut down we might want to save its state so it can be restarted later. CrawlTask provides a getPage method that identifies what page it is working on. When the crawler is shut down, both the tasks that did not start and those that were cancelled are scanned and their URLs recorded, so that page-crawling tasks for those URLs can be added to the queue when the crawler restarts.

TrackingExecutor has an unavoidable race condition that could make it yield false positives: tasks that are identified as cancelled but actually completed. This arises because the thread pool could be shut down between when the last instruction of the task executes and when the pool records the task as complete. This is not a problem if tasks are idempotent (if performing them twice has the same effect as performing them once), as they typically are in a web crawler. Otherwise, the application retrieving the cancelled tasks must be aware of this risk and be prepared to deal with false positives.

Listing 7.22. Using TrackingExecutor to Save Unfinished Tasks for Later Execution.

public abstract class WebCrawler {
    private volatile TrackingExecutor exec;
    @GuardedBy("this")
    private final Set<URL> urlsToCrawl = new HashSet<URL>();
    ...
    public synchronized void start() {
        exec = new TrackingExecutor(
                Executors.newCachedThreadPool());
        for (URL url : urlsToCrawl) submitCrawlTask(url);
        urlsToCrawl.clear();
    }

    public synchronized void stop() throws InterruptedException {
        try {
            // Save tasks that never started, then those cancelled in progress.
            saveUncrawled(exec.shutdownNow());
            if (exec.awaitTermination(TIMEOUT, UNIT))
                saveUncrawled(exec.getCancelledTasks());
        } finally {
            exec = null;
        }
    }

    protected abstract List<URL> processPage(URL url);

    private void saveUncrawled(List<Runnable> uncrawled) {
        for (Runnable task : uncrawled)
            urlsToCrawl.add(((CrawlTask) task).getPage());
    }

    private void submitCrawlTask(URL u) {
        exec.execute(new CrawlTask(u));
    }

    private class CrawlTask implements Runnable {
        private final URL url;
        ...
        public void run() {
            for (URL link : processPage(url)) {
                if (Thread.currentThread().isInterrupted())
                    return;
                submitCrawlTask(link);
            }
        }

        public URL getPage() { return url; }
    }
}


Java Concurrency in Practice
ISBN: 0321349601
EAN: 2147483647
Year: 2004
Pages: 141
