Under-appreciated Java Classes Part I: CompletionService

Java’s concurrency library, java.util.concurrent, has a variety of useful classes to make concurrent code easier to write and more reliable. (If you’re not familiar with that library, you’ll find this post a lot more comprehensible if you read Sun’s tutorial on concurrency.)

There are many ways to combine the building blocks that java.util.concurrent (j.u.c from now on) provides to create concurrent systems. Before we begin, here’s a short reminder of what the most basic j.u.c classes and interfaces are.

Runnable
The basic unit of work, and the same interface you hand to a Thread to run. It defines a method that does not return anything and cannot throw checked exceptions.
Callable
A more flexible variant of the same idea. It defines a method that can return a value and throw a checked exception.
Future
Represents the result of an asynchronous computation (e.g. a submitted Callable). It is used in place of a value that may not have been computed yet.
Executor
Defines a single method, execute(), that accepts a Runnable and returns void.
ExecutorService
Extends Executor with submit(), which accepts a Callable and returns a Future, plus lifecycle methods such as shutdown().
Executors
Provides static factory methods that create common types of ExecutorService implementations.
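To make these definitions concrete, here is a minimal sketch of how the pieces fit together: Executors supplies an ExecutorService, a Runnable is handed to execute() with no way to get a result back, and a Callable is handed to submit(), whose Future later yields the value. The class name BasicBlocks and the summing task are purely illustrative.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch; class and method names are hypothetical.
class BasicBlocks {
    // Adds 1..n on a pool thread and returns the sum through a Future
    static int sumTo(int n) throws InterruptedException, ExecutionException {
        // Executors: a factory for common ExecutorService implementations
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Runnable: no result, no checked exceptions; execute() is fire-and-forget
            Runnable hello = () -> System.out.println("hello from " + Thread.currentThread().getName());
            executor.execute(hello);

            // Callable: returns a value and may throw a checked exception
            Callable<Integer> sum = () -> {
                int total = 0;
                for (int i = 1; i <= n; i++) {
                    total += i;
                }
                return total;
            };

            // ExecutorService.submit() returns a Future standing in for the result
            Future<Integer> future = executor.submit(sum);

            // get() blocks until the value has been computed
            return future.get();
        } finally {
            // lets the pool's threads exit once queued work is done
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // the "hello" line may print before or after this one
        System.out.println(sumTo(10)); // prints 55
    }
}
```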

Basic Concurrent Tasks

Let’s assume that you have a parallelizable task to complete. For the sake of clarity, let’s further assume that the parallelizable parts of the task are all independent (that is, they don’t have to be started or completed in any particular order relative to each other). Here are just a few of the many different ways you might use the basic tools to satisfy the domain requirements you are operating under.

  • Submit some Callables to an ExecutorService and assume they complete normally (i.e. don’t use the resulting Future objects). This is the simplest option, but it provides only very limited ways of handling any errors that may occur. This is mostly equivalent to submitting Runnables to an Executor, with one difference: a Runnable that dies will trigger the worker thread’s UncaughtExceptionHandler, while an exception thrown by a Callable is captured in its Future and never reaches that handler.
  • Submit some Callables to an ExecutorService. As soon as any of the Callables finishes normally, your task is complete and the rest of the results will be ignored.
  • Submit some Callables to an ExecutorService. The entire computation should fail eventually if one or more of the Callables does not execute normally.
  • Submit some Callables to an ExecutorService with the restriction that if any Callable fails, Callables that haven’t yet finished should be canceled.
  • Submit some Callables to an ExecutorService. As each Callable completes (or fails), an action should be taken as soon as possible (e.g. updating a GUI display of task progress).
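The difference noted in the first scenario can be checked directly. The sketch below gives a single-thread pool a hypothetical ThreadFactory whose threads count down a latch from their UncaughtExceptionHandler, then compares a failing Runnable passed to execute() with a failing Callable passed to submit(). The class and method names are made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Illustrative sketch; class and method names are hypothetical.
class FailureVisibility {
    // A single-thread pool whose threads report uncaught exceptions via the latch
    static ExecutorService newPool(CountDownLatch handled) {
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, e) -> handled.countDown());
            return t;
        };
        return Executors.newSingleThreadExecutor(factory);
    }

    // execute(Runnable): the exception escapes the worker thread,
    // so the thread's UncaughtExceptionHandler runs
    static boolean runnableReachesHandler() throws InterruptedException {
        CountDownLatch handled = new CountDownLatch(1);
        ExecutorService pool = newPool(handled);
        try {
            pool.execute(() -> { throw new IllegalStateException("runnable died"); });
            return handled.await(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    // submit(Callable): the exception is stored in the Future instead,
    // so the handler is never called; returns true if that is what happened
    static boolean callableFailureStaysInFuture() throws InterruptedException {
        CountDownLatch handled = new CountDownLatch(1);
        ExecutorService pool = newPool(handled);
        Callable<String> failing = () -> { throw new IllegalStateException("callable died"); };
        try {
            Future<String> future = pool.submit(failing);
            try {
                future.get();
                return false; // not expected: the task always throws
            } catch (ExecutionException e) {
                boolean causeKept = e.getCause() instanceof IllegalStateException;
                return causeKept && handled.getCount() == 1;
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Runnable reached handler: " + runnableReachesHandler());
        System.out.println("Callable failure stayed in Future: " + callableFailureStaysInFuture());
    }
}
```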

Note: from here on, we will use Callable and ExecutorService instead of their less capable cousins Runnable and Executor.

All of these share the same basic structure: Callable objects are submitted to an ExecutorService, which then returns Future objects. The question now is how to manage the Future objects to achieve the desired effect. Let’s say that you want to do the last scenario from the list above. You could keep a Set or List of the Futures and iterate over them periodically, but this forces a trade-off between wasted polling and responsiveness: check often and you burn CPU re-examining unfinished tasks, check rarely and you react late. What we want is to be able to treat the group of Futures like a queue that provides Futures as their corresponding Callables complete. Unfortunately, the aforementioned Java Concurrency tutorial does not explain how to do this efficiently. This is a shame, especially when there’s already an interface that does exactly what we need: CompletionService. ExecutorCompletionService is the only implementation of that interface in the JDK.
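For comparison, the polling approach described above might look like the following sketch. The pollIntervalMillis parameter is a hypothetical knob that embodies the trade-off: a large value delays handling of finished tasks, a small one wakes the loop up repeatedly for nothing. All names are illustrative.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch of the polling approach; names are hypothetical.
class PollingLoop {
    // Collects results roughly in completion order by repeatedly
    // scanning the outstanding Futures
    static <T> List<T> pollAll(List<Callable<T>> tasks, long pollIntervalMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<T>> pending = new ArrayList<>();
            for (Callable<T> task : tasks) {
                pending.add(executor.submit(task));
            }
            List<T> results = new ArrayList<>();
            while (!pending.isEmpty()) {
                Iterator<Future<T>> it = pending.iterator();
                while (it.hasNext()) {
                    Future<T> f = it.next();
                    if (f.isDone()) {
                        it.remove();
                        results.add(f.get()); // does not block: f is already done
                    }
                }
                if (!pending.isEmpty()) {
                    // too long: results handled late; too short: wasted wake-ups
                    Thread.sleep(pollIntervalMillis);
                }
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }
}
```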

Simplifying with CompletionService

Let’s take a look at some sample code. The goal is to produce Widget objects and do some arbitrary task with them as they are created.

 
// keep a reference to the pool; CompletionService has no way to shut it down
ExecutorService executor = Executors.newFixedThreadPool(5);
// service that wraps the thread pool with 5 threads
CompletionService<Widget> compService =
    new ExecutorCompletionService<Widget>(executor);

// how many futures there are to check
int remainingFutures = 0;

for (Callable<Widget> c: getCallables()) {
    remainingFutures++;

    compService.submit(c);
}

Future<Widget> completedFuture;
Widget newWidget;

// take() and get() can throw InterruptedException, which the
// enclosing method is assumed to declare
while (remainingFutures > 0) {
    // block until a callable completes
    completedFuture = compService.take();
    remainingFutures--;

    // get the Widget, if the Callable was able to create it
    try {
        newWidget = completedFuture.get();
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        logger.warn("Widget creation failed", cause);
        continue;
    }

    // a Widget was created, so do something with it
    processCompletedWidget(newWidget);
}

// every result has been handled, so let the pool's threads exit
executor.shutdown();

This way, you avoid the inefficiency of polling and also improve responsiveness, since each Widget is processed as soon as its Callable finishes. This is far from the only way to use CompletionService, though. Just as an example, you could modify this code to cancel all waiting or in-progress Callables if any Callable fails.

// keep a reference to the pool so it can be shut down afterwards
ExecutorService executor = Executors.newFixedThreadPool(5);
// service that wraps the thread pool with 5 threads
CompletionService<Widget> compService =
    new ExecutorCompletionService<Widget>(executor);

// Futures for all submitted Callables that have not yet been checked
Set<Future<Widget>> futures = new HashSet<Future<Widget>>();

for (Callable<Widget> c: getCallables()) {
    // keep track of the futures that get created so we can cancel them if necessary
    futures.add(compService.submit(c));
}

Future<Widget> completedFuture;
Widget newWidget;

while (!futures.isEmpty()) {
    // block until a callable completes; take() hands back the same Future
    // object that submit() returned, so it can be removed from the set
    completedFuture = compService.take();
    futures.remove(completedFuture);

    // get the Widget, if the Callable was able to create it
    try {
        newWidget = completedFuture.get();
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        logger.warn("Widget creation failed", cause);

        for (Future<Widget> f: futures) {
            // true also interrupts Callables that are already running;
            // false only stops pending Callables from starting
            f.cancel(true);
        }

        break;
    }

    // a Widget was created, so do something with it
    processCompletedWidget(newWidget);
}

executor.shutdown();
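To exercise the cancellation logic without Widget, getCallables(), or a logger (none of which are defined in this post), here is a self-contained sketch with one task that fails immediately and one that blocks until it is interrupted. The names are illustrative. Because the blocked task can only finish by being interrupted, take() has to return the failed task first, and cancel(true) should then leave the blocked task cancelled.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch; class and method names are hypothetical.
class CancelOnFailure {
    // Returns the Future of the task that should end up cancelled
    static Future<String> runDemo() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CompletionService<String> compService = new ExecutorCompletionService<>(executor);
        Set<Future<String>> futures = new HashSet<>();

        // A task that blocks until interrupted (the latch is never released)
        CountDownLatch neverReleased = new CountDownLatch(1);
        Future<String> blocked = compService.submit(() -> {
            neverReleased.await();
            return "never returned";
        });
        futures.add(blocked);

        // A task that fails straight away
        futures.add(compService.submit(() -> {
            throw new IllegalStateException("widget creation failed");
        }));

        try {
            while (!futures.isEmpty()) {
                Future<String> completed = compService.take();
                futures.remove(completed);
                try {
                    System.out.println("completed: " + completed.get());
                } catch (ExecutionException e) {
                    System.out.println("failed: " + e.getCause().getMessage());
                    for (Future<String> f : futures) {
                        f.cancel(true); // interrupts the blocked task
                    }
                    break;
                }
            }
        } finally {
            executor.shutdown();
        }
        return blocked;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("blocked task cancelled: " + runDemo().isCancelled());
    }
}
```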

As you can see, a CompletionService gives you the flexibility to use it much like a standard ExecutorService, handing you Futures as you submit Callables, while also providing a handy queue-like interface to the very same Futures as their corresponding Callables complete. The one important thing you can’t do via the CompletionService interface is shut down the underlying ExecutorService, so you will usually want to keep a reference to that ExecutorService and shut it down when you’re done with it. Nonetheless, CompletionService is a useful interface for concurrent systems, and it deserves to be part of every Java programmer’s toolbox.
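Putting those points together, here is a self-contained sketch of the first example that keeps its own reference to the ExecutorService and shuts it down in a finally block. Since Widget, getCallables(), and processCompletedWidget() are not defined in this post, the sketch assumes String results and a caller-supplied list of Callables, and simply collects successful results into a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch; String stands in for the hypothetical Widget type.
class CompletionOrderDemo {
    // Returns successful results in the order their tasks completed;
    // failed tasks are reported and skipped
    static List<String> runInCompletionOrder(List<Callable<String>> tasks)
            throws InterruptedException {
        // Keep our own reference: CompletionService cannot shut the pool down
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            CompletionService<String> compService =
                new ExecutorCompletionService<>(executor);
            for (Callable<String> task : tasks) {
                compService.submit(task);
            }
            List<String> results = new ArrayList<>();
            for (int remaining = tasks.size(); remaining > 0; remaining--) {
                Future<String> completed = compService.take(); // blocks
                try {
                    results.add(completed.get());
                } catch (ExecutionException e) {
                    System.err.println("task failed: " + e.getCause());
                }
            }
            return results;
        } finally {
            executor.shutdown(); // lets the pool's threads exit
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> { Thread.sleep(200); return "slow"; });
        tasks.add(() -> "fast");
        System.out.println(runInCompletionOrder(tasks));
    }
}
```

With the timings in main, "fast" will usually come out ahead of "slow" even though it was submitted second, but the exact order depends on scheduling.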


  • kedar

    You are right. This is more comprehensible than the java.sun.com tutorial. Great job!

  • JeetendraS

    Thanks a lot ! I found this article very useful.

  • harry

    Thanks, a good comprehensive tutorial that covers all the basics. I was looking for this among the clutter on the web and in the Sun documentation.

  • George

    Nice and clear. Thanks for taking the time to share this.

  • Scott

    Nicely written, very clear, and helped me solve my problem. Thank you !

  • Trejkaz

    There’s a gotcha not mentioned here, which is that the order the values are returned in is not necessarily the same order as the tasks you submit. I was writing what’s supposed to be an even simpler parallel utility which *does* have this feature, and to simulate it with CompletionService I had to store the index of each result so that the array of results could be constructed in the right order.
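The workaround Trejkaz describes could be sketched as follows: each task is wrapped so that its result travels together with the index at which it was submitted, and each result is written into its slot as it completes. The IndexedResult holder and the method name are illustrative, not part of j.u.c.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch; IndexedResult and runOrdered are hypothetical names.
class OrderedResults {
    // Pairs a result with the position of the task that produced it
    static final class IndexedResult<T> {
        final int index;
        final T value;

        IndexedResult(int index, T value) {
            this.index = index;
            this.value = value;
        }
    }

    // Receives results in completion order, but returns them in submission order
    static <T> List<T> runOrdered(List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            CompletionService<IndexedResult<T>> compService =
                new ExecutorCompletionService<>(executor);
            for (int i = 0; i < tasks.size(); i++) {
                final int index = i;
                final Callable<T> task = tasks.get(i);
                compService.submit(() -> new IndexedResult<T>(index, task.call()));
            }
            List<T> ordered = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                ordered.add(null); // one placeholder slot per task
            }
            for (int i = 0; i < tasks.size(); i++) {
                // take() yields completion order; the stored index restores submission order
                IndexedResult<T> r = compService.take().get();
                ordered.set(r.index, r.value);
            }
            return ordered;
        } finally {
            // if a task failed, this also interrupts any tasks still running
            executor.shutdownNow();
        }
    }
}
```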

  • http://progys.blogas.lt progys

    Great example, thanks :)

  • Quidam7

    Clear and helpful :)
    Thanks for taking the time to share this.

  • Durga Prasad

    Good one … Thanks

    As mentioned already, we have to maintain the index to get the results back in the same order (if the order of results is important).
    Is there any other service that gives back the results in the order of submission?

    • Marshall Pierce

      If you want to process Futures in the order you submit your Runnables/Callables, just put your Future objects in a Queue as you submit.

      The whole point of a CompletionService is to do something completely different: to get Futures in the order they complete, not the order they’re submitted in.
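Marshall’s suggestion needs no CompletionService at all. In the sketch below, each Future is appended to a Queue at submission time and the queue is drained from the head, so results come out in submission order. The method name is illustrative. Note that get() on a slow task at the head of the queue delays handling of later tasks that may already be finished, which is exactly the behavior CompletionService exists to avoid.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch; class and method names are hypothetical.
class SubmissionOrder {
    static <T> List<T> runInSubmissionOrder(List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Futures are queued in the order their tasks were submitted
            Queue<Future<T>> queue = new ArrayDeque<>();
            for (Callable<T> task : tasks) {
                queue.add(executor.submit(task));
            }
            List<T> results = new ArrayList<>();
            while (!queue.isEmpty()) {
                // get() blocks on the head even if later Futures are already done
                results.add(queue.poll().get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }
}
```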