com.stumbleupon.async
Class Deferred<T>

java.lang.Object
  extended by com.stumbleupon.async.Deferred<T>
Type Parameters:
T - The type of the deferred result.

public final class Deferred<T>
extends Object

A thread-safe implementation of a deferred result for easy asynchronous processing.

This implementation is based on Twisted's Python Deferred API.

This API is a simple and elegant way of managing asynchronous and dynamic "pipelines" (processing chains) without having to explicitly define a finite state machine.

The tl;dr version

We're all busy and don't always have time to RTFM in detail. Please pay special attention to the invariants you must respect. Other than that, here's an executive summary of what Deferred offers:

Understanding the concept of Deferred

The idea is that a Deferred represents a result that's not yet available. An asynchronous operation (I/O, RPC, whatever) has been started and will hand its result (be it successful or not) to the Deferred in the future. The key difference between a Deferred and a Future is that a Deferred has a callback chain associated with it, whereas with just a Future you need to get the result manually at some point, which poses problems such as: How do you know when the result is available? What if the result itself depends on another future?

When you start an asynchronous operation, you typically want to be called back when the operation completes. If the operation was successful, you want your callback to use its result to carry on what you were doing at the time you started the asynchronous operation. If there was an error, you want to trigger some error handling code.

But there's more to a Deferred than a single callback. You can add an arbitrary number of callbacks, which lets you build complex processing pipelines in a simple and elegant way.

Understanding the callback chain

Let's take a typical example. You're writing a client library for others to use your simple remote storage service. When your users call the get method in your library, you want to retrieve some piece of data from your remote service and hand it back to the user, but you want to do so in an asynchronous fashion.

When the user of your client library invokes get, you assemble a request and send it out to the remote server through a socket. Before sending it to the socket, you create a Deferred and you store it somewhere, for example in a map, to keep an association between the request and this Deferred. You then return this Deferred to the user; this is how they will access the deferred result as soon as the RPC completes.

Sooner or later, the RPC will complete (successfully or not), and your socket will become readable (or maybe closed, in the event of a failure). Let's assume for now that everything works as expected, and thus the socket is readable, so you read the response from the socket. At this point you extract the result of the remote get call, and you hand it out to the Deferred you created for this request (remember, you had to store it somewhere, so you could give it the deferred result once you have it). The Deferred then stores this result and triggers any callback that may have been added to it. The expectation is that the user of your client library, after calling your get method, will add a Callback to the Deferred you gave them. This way, when the deferred result becomes available, the Deferred invokes that Callback with the result as its argument.

So far what we've explained is nothing more than a Future with a callback associated to it. But there's more to Deferred than just this. Let's assume now that someone else wants to build a caching layer on top of your client library, to avoid repeatedly getting the same value over and over again through the network. Users who want to use the cache will invoke get on the caching library instead of directly calling your client library.

Let's assume that the caching library already has a result cached for a get call. It will create a Deferred, and immediately hand it the cached result, and it will return this Deferred to the user. The user will add a Callback to it, which will be immediately invoked since the deferred result is already available. So the entire get call completed virtually instantaneously and entirely from the same thread. There was no context switch (no other thread involved, no I/O and whatnot), nothing ever blocked, everything just happened really quickly.

Now let's assume that the caching library has a cache miss and needs to do a remote get call using the original client library described earlier. The RPC is sent out to the remote server and the client library returns a Deferred to the caching library. This is where things become exciting. The caching library can then add its own callback to the Deferred before returning it to the user. This callback will take the result that came back from the remote server, add it to the cache and return it. As usual, the user then adds their own callback to process the result. So now the Deferred has 2 callbacks associated to it:

              1st callback       2nd callback

   Deferred:  add to cache  -->  user callback
 
When the RPC completes, the original client library will de-serialize the result from the wire and hand it out to the Deferred. The first callback will be invoked, which will add the result to the cache of the caching library. Then whatever the first callback returns will be passed on to the second callback. It turns out that the caching callback returns the get response unchanged, so that will be passed on to the user callback.

Now it's very important to understand that the first callback could have returned another arbitrary value, and that's what would have been passed to the second callback. This may sound weird at first but it's actually the key behind Deferred.
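The two-callback chain described above can be sketched in a few lines. The code below is only an illustration: TinyDeferred is a hypothetical, single-threaded stand-in written for this example (it is not the Deferred class documented here, and it ignores errors and thread safety), and CachingClient and pendingRpc are likewise made-up names standing in for the caching library and an in-flight RPC.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical single-threaded stand-in for Deferred; not the library class. */
class TinyDeferred {
    private final ArrayDeque<Function<Object, Object>> chain = new ArrayDeque<>();
    private Object result;
    private boolean fired;

    /** Queues a callback, or runs it right away if the result is already available. */
    TinyDeferred addCallback(Function<Object, Object> cb) {
        chain.add(cb);
        if (fired) run();
        return this;
    }

    /** Hands over the initial result and starts running the chain. */
    void callback(Object initialResult) {
        if (fired) throw new IllegalStateException("already called back");
        fired = true;
        result = initialResult;
        run();
    }

    private void run() {
        while (!chain.isEmpty()) {
            result = chain.poll().apply(result); // each return value feeds the next callback
        }
    }
}

/** Hypothetical caching layer sitting on top of a pretend RPC client. */
class CachingClient {
    final Map<String, Object> cache = new HashMap<>();
    TinyDeferred pendingRpc; // stands in for the low-level client's in-flight request

    TinyDeferred get(String key) {
        TinyDeferred d = new TinyDeferred();
        if (cache.containsKey(key)) {
            d.callback(cache.get(key)); // cache hit: the result is available immediately
            return d;
        }
        pendingRpc = d; // cache miss: pretend an RPC was sent
        return d.addCallback(value -> { // 1st callback: update the cache
            cache.put(key, value);
            return value; // pass the response on unchanged
        });
    }
}

class CachingChainDemo {
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        CachingClient client = new CachingClient();
        client.get("k").addCallback(v -> { seen.add("miss:" + v); return v; }); // 2nd callback
        client.pendingRpc.callback("payload"); // the RPC layer delivers the response
        client.get("k").addCallback(v -> { seen.add("hit:" + v); return v; }); // runs at once
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [miss:payload, hit:payload]
    }
}
```

On the cache miss the user's callback only runs once the pretend RPC layer calls callback; on the cache hit it runs immediately, from the calling thread, as soon as it is added.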

To illustrate why, let's complicate things a bit more. Let's assume the remote service that serves those get requests is a fairly simple and low-level storage service (think memcached), so it only works with byte arrays and doesn't care what the contents are. So the original client library is only de-serializing the byte array from the network and handing that byte array to the Deferred.

Now you're writing a higher-level library that uses this storage system to store some of your custom objects. So when you get the byte array from the server, you need to further de-serialize it into some kind of an object. Users of your higher-level library don't care about what kind of remote storage system you use, the only thing they care about is getting those objects asynchronously. Your higher-level library is built on top of the original low-level library that does the RPC communication.

When the users of the higher-level library call get, you call get on the lower-level library, which issues an RPC call and returns a Deferred to the higher-level library. The higher-level library then adds a first callback to further de-serialize the byte array into an object. Then the user of the higher-level library adds their own callback that does something with that object. So now we have something that looks like this:

              1st callback                    2nd callback

   Deferred:  de-serialize to an object  -->  user callback
 

When the result comes in from the network, the byte array is de-serialized from the socket. The first callback is invoked and its argument is the initial result, the byte array. So the first callback further de-serializes it into some object that it returns. The second callback is then invoked and its argument is the result of the previous callback, that is the de-serialized object.
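A rough sketch of how a callback can change the type seen by the rest of the chain follows. TypedDeferred is a hypothetical stand-in written for this example, not the library class; it only mimics the idea that adding a callback returns the same object with an "updated" type parameter. Decoding the bytes as a UTF-8 String is an assumption chosen to keep the example short.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.function.Function;

/** Hypothetical stand-in, not the library class: one object whose static type tracks the chain. */
class TypedDeferred<T> {
    private final ArrayDeque<Function<Object, Object>> chain = new ArrayDeque<>();
    private Object result;
    private boolean fired;

    /** Adds a callback from T to R and returns this same object, now viewed as TypedDeferred<R>. */
    @SuppressWarnings("unchecked")
    <R> TypedDeferred<R> addCallback(Function<T, R> cb) {
        chain.add(current -> cb.apply((T) current));
        if (fired) run();
        return (TypedDeferred<R>) (TypedDeferred<?>) this;
    }

    void callback(Object initialResult) {
        if (fired) throw new IllegalStateException("already called back");
        fired = true;
        result = initialResult;
        run();
    }

    private void run() {
        while (!chain.isEmpty()) {
            result = chain.poll().apply(result);
        }
    }
}

class DeserializeDemo {
    static String run() {
        StringBuilder seen = new StringBuilder();
        // What the low-level library would return: a deferred byte array.
        TypedDeferred<byte[]> raw = new TypedDeferred<>();
        // 1st callback, added by the higher-level library: bytes -> object (here a String).
        TypedDeferred<String> decoded =
            raw.addCallback(bytes -> new String(bytes, StandardCharsets.UTF_8));
        // 2nd callback, added by the user: it receives the String, never the raw bytes.
        decoded.addCallback(text -> seen.append(text.toUpperCase()));
        // The response arrives from the network.
        raw.callback("hello".getBytes(StandardCharsets.UTF_8));
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // HELLO
    }
}
```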

Now back to the caching library, which has nothing to do with the higher level library. All it does is, given an object that implements some interface with a get method, it keeps a map of whatever arguments get receives to an Object that was cached for this particular get call. Thanks to the way the callback chain works, it's possible to use the caching library together with the higher-level library transparently. Users who want to use caching simply need to use the caching library together with the higher level library. Now when they call get on the caching library, and there's a cache miss, here's what happens, step by step:

  1. The caching library calls get on the higher-level library.
  2. The higher-level library calls get on the lower-level library.
  3. The lower-level library creates a Deferred, issues out the RPC call and returns its Deferred.
  4. The higher-level library adds its own object de-serialization callback to the Deferred and returns it.
  5. The caching library adds its own cache-updating callback to the Deferred and returns it.
  6. The user gets the Deferred and adds their own callback to do something with the object retrieved from the data store.
              1st callback       2nd callback       3rd callback

   Deferred:  de-serialize  -->  add to cache  -->  user callback
   result: (none available)
 
Once the response comes back, the first callback is invoked, it de-serializes the object, returns it. The current result of the Deferred becomes the de-serialized object. The current state of the Deferred is as follows:
              2nd callback       3rd callback

   Deferred:  add to cache  -->  user callback
   result: de-serialized object
 
Because there are more callbacks in the chain, the Deferred invokes the next one and gives it the current result (the de-serialized object) in argument. The callback adds that object to its cache and returns it unchanged.
              3rd callback

   Deferred:  user callback
   result: de-serialized object
 
Finally, the user's callback is invoked with the object in argument.
   Deferred:  (no more callbacks)
   result: (whatever the user's callback returned)
 
If you think this is becoming interesting, read on: you haven't reached the most interesting thing about Deferred yet.

Building dynamic processing pipelines with Deferred

Let's complicate the previous example a little bit more. Let's assume that the remote storage service that serves those get calls is a distributed service that runs on many machines. The data is partitioned over many nodes and moves around as nodes come and go (due to machine failures and whatnot). In order to execute a get call, the low-level client library first needs to know which server is currently serving that piece of data. Let's assume that there's another server, which is part of that distributed service, that maintains an index and keeps track of where each piece of data is. The low-level client library first needs to look up the location of the data using that first server (that's a first RPC), then retrieves it from the storage node (that's another RPC). End users don't care that retrieving data involves a 2-step process; they just want to call get and be called back when the data (a byte array) is available.

This is where what's probably the most useful feature of Deferred comes in. When the user calls get, the low-level library will issue a first RPC to the index server to locate the piece of data requested by the user. When issuing this lookup RPC, a Deferred gets created. The low-level get code adds a first callback to process the lookup response and then returns the Deferred to the user, who adds their own callback.

              1st callback       2nd callback

   Deferred:  index lookup  -->  user callback
   result: (none available)
 
Eventually, the lookup RPC completes, and the Deferred is given the lookup response. So before triggering the first callback, the Deferred will be in this state:
              1st callback       2nd callback

   Deferred:  index lookup  -->  user callback
   result: lookup response
 
The first callback runs and now knows where to find the piece of data initially requested. It issues the get request to the right storage node. Doing so creates another Deferred, let's call it (B), which is then returned by the index lookup callback. And this is where the magic happens. Now we're in this state:
   (A)        2nd callback    |   (B)
                              |
   Deferred:  user callback   |   Deferred:  (no more callbacks)
   result: Deferred (B)       |   result: (none available)
 
Because a callback returned a Deferred, we can't invoke the user callback just yet, since the user doesn't want their callback to receive a Deferred; they want it to receive a byte array. Deferred (A) therefore pauses and stops processing its callback chain. This callback chain needs to be resumed whenever the Deferred of the get call [(B)] completes. In order to achieve that, a callback is added to that other Deferred that will resume the execution of the callback chain.
   (A)        2nd callback    |   (B)        1st callback
                              |
   Deferred:  user callback   |   Deferred:  resume (A)
   result: Deferred (B)       |   result: (none available)
 
Once (A) has added the callback to (B), it can return immediately; there's no need to wait, block a thread or anything like that. So the whole process of receiving the lookup response and sending out the get RPC happened really quickly, without blocking anything.

Now when the get response comes back from the network, the RPC layer de-serializes the byte array, as usual, and hands it to (B):

   (A)        2nd callback    |   (B)        1st callback
                              |
   Deferred:  user callback   |   Deferred:  resume (A)
   result: Deferred (B)       |   result: byte array
 
(B)'s first and only callback is going to set the result of (A) and resume (A)'s callback chain.
   (A)        2nd callback    |   (B)        1st callback
                              |
   Deferred:  user callback   |   Deferred:  resume (A)
   result: byte array         |   result: byte array
 
So now (A) resumes its callback chain, and invokes the user's callback with the byte array in argument, which is what they wanted.
   (A)                        |   (B)        1st callback
                              |
   Deferred:  (no more cb)    |   Deferred:  resume (A)
   result: (return value of   |   result: byte array
            the user's cb)
 
Then (B) moves on to its next callback in the chain, but there are none, so (B) is done too.
   (A)                        |   (B)
                              |
   Deferred:  (no more cb)    |   Deferred:  (no more cb)
   result: (return value of   |   result: byte array
            the user's cb)
 
The whole process of reading the get response, resuming the initial Deferred and executing the second Deferred happened all in the same thread, sequentially, and without blocking anything (provided that the user's callback didn't block, as it must not).

What we've done is essentially equivalent to dynamically building an implicit finite state machine to handle the life cycle of the get request. This simple API allows you to build arbitrarily complex processing pipelines that make dynamic decisions at each stage of the pipeline as to what to do next.
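The pause-and-resume mechanism walked through above can be approximated as follows. PausingDeferred is a hypothetical, single-threaded stand-in written for this illustration (not the library class): when a callback returns another PausingDeferred, the outer chain stops and registers a "resume" callback on the inner one. For brevity, (B) is created up front rather than inside the lookup callback, and the strings "node-7" and "data" are placeholder values.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in, not the library class: pauses when a callback returns another one. */
class PausingDeferred {
    private final ArrayDeque<Function<Object, Object>> chain = new ArrayDeque<>();
    private Object result;
    private boolean fired;
    private boolean paused;

    PausingDeferred addCallback(Function<Object, Object> cb) {
        chain.add(cb);
        if (fired && !paused) run();
        return this;
    }

    void callback(Object initialResult) {
        if (fired) throw new IllegalStateException("already called back");
        fired = true;
        result = initialResult;
        run();
    }

    private void run() {
        while (!chain.isEmpty()) {
            result = chain.poll().apply(result);
            if (result instanceof PausingDeferred) {
                paused = true; // stop here; the remaining callbacks must not see a Deferred
                ((PausingDeferred) result).addCallback(inner -> {
                    result = inner; // (B)'s result becomes (A)'s current result
                    paused = false;
                    run();          // resume (A)'s chain
                    return inner;
                });
                return; // nothing blocks: we simply return to the caller
            }
        }
    }
}

class TwoStepGetDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        PausingDeferred a = new PausingDeferred(); // (A): created for the lookup RPC
        PausingDeferred b = new PausingDeferred(); // (B): created for the get RPC
        a.addCallback(location -> {                // 1st callback: index lookup
            log.add("lookup -> " + location);
            return b;                              // returning a Deferred pauses (A)
        });
        a.addCallback(bytes -> {                   // 2nd callback: the user
            log.add("user got " + bytes);
            return bytes;
        });
        a.callback("node-7");                      // lookup response arrives
        log.add("A paused");
        b.callback("data");                        // get response arrives, (A) resumes
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [lookup -> node-7, A paused, user got data]
    }
}
```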

Handling errors

A Deferred has in fact not one but two callback chains. The first chain is the "normal" processing chain, and the second is the error handling chain. Twisted calls an error handling callback an "errback", so we've kept that term here. When the asynchronous processing completes with an error, the Deferred must be given the Exception that was caught instead of giving it the result (or if no Exception was caught, one must be created and handed to the Deferred). When the current result of a Deferred is an instance of Exception, the next errback is invoked. As with normal callbacks, whatever the errback returns becomes the current result. If the current result is still an instance of Exception, the next errback is invoked. If the current result is no longer an Exception, the next callback is invoked.

When a callback or an errback itself throws an exception, it is caught by the Deferred and becomes the current result, which means that the next errback in the chain will be invoked with that exception in argument. Note that Deferred will only catch Exceptions, not any Throwable or Error.
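The switching between the two chains can be sketched like this. ErrbackChain is a hypothetical stand-in written for this example, not the library class; it assumes that each addCallback pairs the callback with a pass-through errback (and vice versa), which is one simple way to obtain the behaviour described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in, not the library class: two chains advanced in lockstep. */
class ErrbackChain {
    private final ArrayDeque<Function<Object, Object>> callbacks = new ArrayDeque<>();
    private final ArrayDeque<Function<Object, Object>> errbacks = new ArrayDeque<>();
    private Object result;
    private boolean fired;

    ErrbackChain addCallbacks(Function<Object, Object> cb, Function<Object, Object> eb) {
        callbacks.add(cb);
        errbacks.add(eb);
        if (fired) run();
        return this;
    }

    ErrbackChain addCallback(Function<Object, Object> cb) {
        return addCallbacks(cb, Function.identity()); // errors pass through untouched
    }

    ErrbackChain addErrback(Function<Object, Object> eb) {
        return addCallbacks(Function.identity(), eb); // normal results pass through untouched
    }

    void callback(Object initialResult) {
        if (fired) throw new IllegalStateException("already called back");
        fired = true;
        result = initialResult;
        run();
    }

    private void run() {
        while (!callbacks.isEmpty()) {
            Function<Object, Object> cb = callbacks.poll();
            Function<Object, Object> eb = errbacks.poll();
            try {
                // An Exception as current result selects the errback, anything else the callback.
                result = (result instanceof Exception ? eb : cb).apply(result);
            } catch (Exception e) {
                result = e; // a thrown exception becomes the current result
            }
        }
    }
}

class ErrbackDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ErrbackChain d = new ErrbackChain();
        d.addCallback(v -> { throw new IllegalStateException("parse failed"); });
        d.addCallback(v -> { log.add("never reached"); return v; }); // skipped: result is an error
        d.addErrback(e -> {
            log.add("errback: " + ((Exception) e).getMessage());
            return "fallback"; // recovering: the result is no longer an Exception
        });
        d.addCallback(v -> { log.add("callback: " + v); return v; });
        d.callback("raw");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [errback: parse failed, callback: fallback]
    }
}
```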

Contract and Invariants

Read this carefully as this is your warranty.


Constructor Summary
Deferred()
          Constructor.
 
Method Summary
<R> Deferred<R>
addBoth(Callback<R,T> cb)
          Registers a callback both as a callback and as an "errback".
<R,D extends Deferred<R>>
Deferred<R>
addBothDeferring(Callback<D,T> cb)
          Registers a callback both as a callback and as an "errback".
<R> Deferred<R>
addCallback(Callback<R,T> cb)
          Registers a callback.
<R,D extends Deferred<R>>
Deferred<R>
addCallbackDeferring(Callback<D,T> cb)
          Registers a callback.
<R,R2,E> Deferred<R>
addCallbacks(Callback<R,T> cb, Callback<R2,E> eb)
          Registers a callback and an "errback".
<R,E> Deferred<T>
addErrback(Callback<R,E> eb)
          Registers an "errback".
 void callback(Object initresult)
          Starts running the callback chain.
 Deferred<T> chain(Deferred<T> other)
          Chains another Deferred to this one.
static
<T> Deferred<T>
fromError(Exception error)
          Constructs a Deferred with an error that's readily available.
static
<T> Deferred<T>
fromResult(T result)
          Constructs a Deferred with a result that's readily available.
static
<T> Deferred<ArrayList<Object>>
group(Collection<Deferred<T>> deferreds)
          Groups multiple Deferreds together in a single one.
static
<T> Deferred<ArrayList<Object>>
group(Deferred<T> d1, Deferred<T> d2)
          Groups two Deferreds together in a single one.
static
<T> Deferred<ArrayList<Object>>
group(Deferred<T> d1, Deferred<T> d2, Deferred<T> d3)
          Groups three Deferreds together in a single one.
 T join()
          Synchronously waits until this Deferred is called back.
 T joinUninterruptibly()
          Synchronously waits until this Deferred is called back.
 String toString()
          Returns a helpful string representation of this Deferred.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Constructor Detail

Deferred

public Deferred()
Constructor.

Method Detail

fromResult

public static <T> Deferred<T> fromResult(T result)
Constructs a Deferred with a result that's readily available.

This is equivalent to writing:

Deferred<T> d = new Deferred<T>();
d.callback(result);
 
Callbacks added to this Deferred will be immediately called.

Parameters:
result - The "deferred" result.
Returns:
a new Deferred.

fromError

public static <T> Deferred<T> fromError(Exception error)
Constructs a Deferred with an error that's readily available.

This is equivalent to writing:

Deferred<T> d = new Deferred<T>();
d.callback(error);
 
Errbacks added to this Deferred will be immediately called.

Parameters:
error - The error to use as a result.

addCallbacks

public <R,R2,E> Deferred<R> addCallbacks(Callback<R,T> cb,
                                         Callback<R2,E> eb)
Registers a callback and an "errback".

If the deferred result is already available, the callback or the errback (depending on the nature of the result) is executed immediately from this thread.

Parameters:
cb - The callback to register.
eb - The errback to register.
Returns:
this with an "updated" type.
Throws:
StackOverflowError - if there are too many callbacks in this chain. The maximum number of callbacks allowed in a chain is set by the implementation. The limit is high enough that you shouldn't have to worry about this exception (which is why it's an Error actually). If you hit it, you probably did something wrong.

addCallback

public <R> Deferred<R> addCallback(Callback<R,T> cb)
Registers a callback.

If the deferred result is already available and isn't an exception, the callback is executed immediately from this thread. If the deferred result is already available and is an exception, the callback is discarded. If the deferred result is not available, this callback is queued and will be invoked from whichever thread gives this deferred its initial result by calling callback(java.lang.Object).

Parameters:
cb - The callback to register.
Returns:
this with an "updated" type.

addCallbackDeferring

public <R,D extends Deferred<R>> Deferred<R> addCallbackDeferring(Callback<D,T> cb)
Registers a callback.

This has the exact same effect as addCallback(com.stumbleupon.async.Callback), but keeps the type information "correct" when the callback to add returns a Deferred.

Parameters:
cb - The callback to register.
Returns:
this with an "updated" type.

addErrback

public <R,E> Deferred<T> addErrback(Callback<R,E> eb)
Registers an "errback".

If the deferred result is already available and is an exception, the errback is executed immediately from this thread. If the deferred result is already available and isn't an exception, the errback is discarded. If the deferred result is not available, this errback is queued and will be invoked from whichever thread gives this deferred its initial result by calling callback(java.lang.Object).

Parameters:
eb - The errback to register.
Returns:
this with an "updated" type.

addBoth

public <R> Deferred<R> addBoth(Callback<R,T> cb)
Registers a callback both as a callback and as an "errback".

If the deferred result is already available, the callback is executed immediately from this thread (regardless of whether or not the current result is an exception). If the deferred result is not available, this callback is queued and will be invoked from whichever thread gives this deferred its initial result by calling callback(java.lang.Object).

Parameters:
cb - The callback to register.
Returns:
this with an "updated" type.

addBothDeferring

public <R,D extends Deferred<R>> Deferred<R> addBothDeferring(Callback<D,T> cb)
Registers a callback both as a callback and as an "errback".

This has the exact same effect as addBoth(com.stumbleupon.async.Callback), but keeps the type information "correct" when the callback to add returns a Deferred.

Parameters:
cb - The callback to register.
Returns:
this with an "updated" type.

chain

public Deferred<T> chain(Deferred<T> other)
Chains another Deferred to this one.

This method simply ensures that whenever the callback chain in this is run, then the callback chain in other gets run too. The result handed to other is whatever result this currently has.

One case where this is particularly useful is when you want to multiplex a result to multiple Deferreds, that is if multiple "listeners" want to have their callback chain triggered when a single event completes:

public class ResultMultiplexer {
   private Deferred<Foo> listeners = new Deferred<Foo>();

   public void addListener(Deferred<Foo> d) {
     listeners.chain(d);
   }

   public void emitResult(Foo event) {
     listeners.callback(event);
     // Remember that a Deferred is a one-time callback chain.
     // Once emitResult is called, everyone interested in getting the
     // next event would need to call addListener again.  This isn't a
     // pub-sub system, it's just showing how to multiplex a result.
     listeners = new Deferred<Foo>();
   }
 }
 

Parameters:
other - The Deferred to chain to this one.
Returns:
this, always.

group

public static <T> Deferred<ArrayList<Object>> group(Collection<Deferred<T>> deferreds)
Groups multiple Deferreds together in a single one.

Conceptually, this does the opposite of chain(com.stumbleupon.async.Deferred), in the sense that it demultiplexes multiple Deferreds into a single one, so that you can easily take an action once all the Deferreds in the group have been called back.

public class ResultDemultiplexer {
   private ArrayList<Deferred<Foo>> deferreds = new ArrayList<Deferred<Foo>>();

   public void addDeferred(final Deferred<Foo> d) {
     deferreds.add(d);
   }

   public Deferred<ArrayList<Object>> demultiplex() {
     Deferred<ArrayList<Object>> demultiplexed = Deferred.group(deferreds);
     deferreds.clear();
     return demultiplexed;
   }
 }
 
In the example above, any number of Deferreds can be added to the demultiplexer, and once demultiplex() is invoked, the Deferred returned will be called back once all the Deferreds added to the demultiplexer have been called back.

Parameters:
deferreds - All the Deferreds to group together.
Returns:
A new Deferred that will be called back once all the Deferreds given in argument have been called back. Each element in the list will be either of type <T> or an Exception. If any of the elements in the list is an Exception, the errback of the Deferred returned will be invoked with a DeferredGroupException in argument.

There's no guarantee on the order of the results in the deferred list returned; it depends on the order in which the Deferreds in the group complete.


group

public static <T> Deferred<ArrayList<Object>> group(Deferred<T> d1,
                                                    Deferred<T> d2)
Groups two Deferreds together in a single one.

This is semantically equivalent to:

group(Arrays.asList(d1, d2));
except that it's type safe as it doesn't involve an unchecked generic array creation of type Deferred<T> for the varargs parameter passed to asList.

Parameters:
d1 - The first Deferred to put in the group.
d2 - The second Deferred to put in the group.
Returns:
A new Deferred that will be called back once both Deferreds given in argument have been called back.
See Also:
group(Collection)

group

public static <T> Deferred<ArrayList<Object>> group(Deferred<T> d1,
                                                    Deferred<T> d2,
                                                    Deferred<T> d3)
Groups three Deferreds together in a single one.

This is semantically equivalent to:

group(Arrays.asList(d1, d2, d3));
except that it's type safe as it doesn't involve an unchecked generic array creation of type Deferred<T> for the varargs parameter passed to asList.

Parameters:
d1 - The first Deferred to put in the group.
d2 - The second Deferred to put in the group.
d3 - The third Deferred to put in the group.
Returns:
A new Deferred that will be called back once all three Deferreds given in argument have been called back.
See Also:
group(Collection)

callback

public void callback(Object initresult)
Starts running the callback chain.

This posts the initial result that will be passed to the first callback in the callback chain. If the argument is an Exception then the "errback" chain will be triggered instead.

This method will not let any Exception thrown by a callback propagate. You shouldn't try to catch any RuntimeException when you call this method, as this is unnecessary.

Parameters:
initresult - The initial result with which to start the 1st callback. The following must be true: initresult instanceof T || initresult instanceof Exception
Throws:
AssertionError - if this method was already called on this instance.
AssertionError - if initresult == this.

join

public T join()
       throws InterruptedException,
              Exception
Synchronously waits until this Deferred is called back.

This helps do synchronous operations using an asynchronous API. If this Deferred already completed, this method returns (or throws) immediately. Otherwise, the current thread will be blocked and will wait until the Deferred is called back.

Returns:
The deferred result, at this point in the callback chain.
Throws:
InterruptedException - if this thread was interrupted before the deferred result became available.
Exception - if the deferred result is an exception, this exception will be thrown.

joinUninterruptibly

public T joinUninterruptibly()
                      throws Exception
Synchronously waits until this Deferred is called back.

This helps do synchronous operations using an asynchronous API. If this Deferred already completed, this method returns (or throws) immediately. Otherwise, the current thread will be blocked and will wait until the Deferred is called back. If the current thread gets interrupted while waiting, it will keep waiting anyway until the callback chain terminates. Before returning (or throwing an exception), the interrupted status on the thread will be set again.

Returns:
The deferred result, at this point in the callback chain.
Throws:
Exception - if the deferred result is an exception, this exception will be thrown.
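The "keep waiting, then set the interrupted status again" behaviour is a common Java idiom. The sketch below shows that idiom on a plain BlockingQueue from the JDK; it is not the implementation of joinUninterruptibly, and takeUninterruptibly is a hypothetical helper name used only here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class UninterruptibleWaitDemo {
    /** Waits for a value, remembering (but not acting on) any interrupt that arrives meanwhile. */
    static <V> V takeUninterruptibly(BlockingQueue<V> queue) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return queue.take();
                } catch (InterruptedException e) {
                    interrupted = true; // keep waiting anyway
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // set the interrupted status again
            }
        }
    }

    static boolean run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("result");
        Thread.currentThread().interrupt();          // simulate an interrupt around the wait
        String value = takeUninterruptibly(queue);
        boolean flagStillSet = Thread.interrupted(); // reads and clears the flag
        return flagStillSet && "result".equals(value);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The caller still gets the value, and can check the thread's interrupted status afterwards to find out that an interrupt happened during the wait.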

toString

public String toString()
Returns a helpful string representation of this Deferred.

The string returned is built in O(N) where N is the number of callbacks currently in the chain. The string isn't built entirely atomically, so it can appear to show this Deferred in a slightly inconsistent state.

This method is not cheap. Avoid doing:

Deferred<Foo> d = ..;
 LOG.debug("Got " + d);
 
The overhead of stringifying the Deferred can be significant, especially if this is in the fast-path of your application.
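One way to avoid paying for the string when debug output is off is to guard the call or defer the concatenation. The sketch below uses java.util.logging from the JDK as an assumption, since the logging framework behind LOG is not specified here; Expensive is a hypothetical class that merely counts how often its toString() runs.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class LazyLogDemo {
    /** Hypothetical object with a costly toString(); counts how often it is stringified. */
    static class Expensive {
        int calls;

        @Override
        public String toString() {
            calls++;
            return "Expensive";
        }
    }

    static int run() {
        Logger log = Logger.getLogger("LazyLogDemo");
        log.setLevel(Level.INFO); // FINE (debug-level) messages are disabled
        Expensive d = new Expensive();

        // Option 1: explicit guard, the concatenation only happens if FINE is enabled.
        if (log.isLoggable(Level.FINE)) {
            log.fine("Got " + d);
        }

        // Option 2: pass a Supplier, which the logger only evaluates if FINE is enabled.
        log.fine(() -> "Got " + d);

        return d.calls; // 0: neither call stringified d
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```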

Overrides:
toString in class Object