T - The type of the deferred result.

public final class Deferred<T> extends Object

This implementation is based on Twisted's Python Deferred API.

Deferred offers:
- A Deferred is like a Future with a dynamic Callback chain associated with it.
- When a callback returns another Deferred, the next callback in the chain doesn't get executed until that other Deferred result becomes available.
- A Callback that handles errors is called an "errback".
- Deferred is an important building block for writing easy-to-use asynchronous APIs in a thread-safe fashion.
A Deferred represents a result that's not yet available. An asynchronous operation (I/O, RPC, or anything else) has been started and will hand its result (successful or not) to the Deferred in the future. The key difference between a Deferred and a Future is that a Deferred has a callback chain associated with it, whereas with just a Future you need to get the result manually at some point, which raises questions such as: How do you know when the result is available? What if the result itself depends on another future?
When you start an asynchronous operation, you typically want to be called back when the operation completes. If the operation was successful, you want your callback to use its result to carry on what you were doing at the time you started the asynchronous operation. If there was an error, you want to trigger some error handling code.
But there's more to a Deferred than a single callback. You can add an arbitrary number of callbacks, which lets you build complex processing pipelines in a simple and elegant way.
Suppose you're writing a client library for a remote service. When a user calls the get method in your library, you want to retrieve some piece of data from your remote service and hand it back to the user, but you want to do so in an asynchronous fashion.

When the user of your client library invokes get, you assemble a request and send it out to the remote server through a socket. Before sending it to the socket, you create a Deferred and you store it somewhere, for example in a map, to keep an association between the request and this Deferred. You then return this Deferred to the user; this is how they will access the deferred result as soon as the RPC completes.
Sooner or later, the RPC will complete (successfully or not), and your socket will become readable (or maybe closed, in the event of a failure). Let's assume for now that everything works as expected, and thus the socket is readable, so you read the response from the socket. At this point you extract the result of the remote get call, and you hand it to the Deferred you created for this request (remember, you had to store it somewhere, so you could give it the deferred result once you have it). The Deferred then stores this result and triggers any callback that may have been added to it. The expectation is that the user of your client library, after calling your get method, will add a Callback to the Deferred you gave them. This way, when the deferred result becomes available, the Deferred will call that Callback with the result as its argument.
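The bookkeeping described above can be sketched as follows. To stay runnable without extra dependencies, this sketch uses the JDK's CompletableFuture as a stand-in for Deferred, so complete plays the role of callback(Object) and thenAccept the role of addCallback. The class PendingRequests, the integer request ids, and the onResponse hook are hypothetical names introduced for illustration, and the socket I/O is left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical client: tracks one pending future per in-flight request. */
class PendingRequests {
  private final Map<Integer, CompletableFuture<byte[]>> pending = new ConcurrentHashMap<>();
  private int nextId = 0;

  /** Called by the user: registers the request and returns immediately. */
  synchronized CompletableFuture<byte[]> get(String key) {
    CompletableFuture<byte[]> result = new CompletableFuture<>();
    pending.put(nextId++, result);
    // A real client would now serialize (id, key) and write it to the socket.
    return result;
  }

  /** Called by the I/O layer once the response for request `id` has been read. */
  void onResponse(int id, byte[] payload) {
    CompletableFuture<byte[]> result = pending.remove(id);
    if (result != null) {
      result.complete(payload);  // Triggers any callback the user attached.
    }
  }
}
```

A user would call get, attach a callback to the returned future, and carry on; the callback runs later, on whichever thread calls onResponse.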
So far what we've explained is nothing more than a Future with a callback associated with it. But there's more to Deferred than just this. Let's assume now that someone else wants to build a caching layer on top of your client library, to avoid repeatedly getting the same value over and over again through the network. Users who want to use the cache will invoke get on the caching library instead of directly calling your client library.
Let's assume that the caching library already has a result cached for a get call. It will create a Deferred, immediately hand it the cached result, and return this Deferred to the user. The user will add a Callback to it, which will be immediately invoked since the deferred result is already available. So the entire get call completed virtually instantaneously and entirely from the same thread. There was no context switch (no other thread involved, no I/O and whatnot), nothing ever blocked, everything just happened really quickly.
Now let's assume that the caching library has a cache miss and needs to do a remote get call using the original client library described earlier. The RPC is sent out to the remote server and the client library returns a Deferred to the caching library. This is where things become exciting. The caching library can then add its own callback to the Deferred before returning it to the user. This callback will take the result that came back from the remote server, add it to the cache and return it. As usual, the user then adds their own callback to process the result. So now the Deferred has 2 callbacks associated with it:

              1st callback       2nd callback
  Deferred:   add to cache  -->  user callback

When the RPC completes, the original client library will de-serialize the result from the wire and hand it to the Deferred. The first callback will be invoked, which will add the result to the cache of the caching library. Then whatever the first callback returns will be passed on to the second callback. It turns out that the caching callback returns the get response unchanged, so that will be passed on to the user callback.
Now it's very important to understand that the first callback could have returned another arbitrary value, and that's what would have been passed to the second callback. This may sound weird at first but it's actually the key behind Deferred.
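A minimal sketch of the caching layer described above, covering both the cache hit and the cache miss. It uses the JDK's CompletableFuture as a stand-in for Deferred so that it runs without extra dependencies: completedFuture corresponds roughly to fromResult and thenApply to addCallback, although thenApply returns a new future instead of extending the original one. CachingGetter and its backend function are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical caching layer over any asynchronous get function. */
class CachingGetter {
  private final Map<String, String> cache = new ConcurrentHashMap<>();
  private final Function<String, CompletableFuture<String>> backend;

  CachingGetter(Function<String, CompletableFuture<String>> backend) {
    this.backend = backend;
  }

  CompletableFuture<String> get(String key) {
    String cached = cache.get(key);
    if (cached != null) {
      // Cache hit: the result is available before the user adds a callback.
      return CompletableFuture.completedFuture(cached);
    }
    // Cache miss: our callback sits ahead of the user's in the chain.
    return backend.apply(key).thenApply(value -> {
      cache.put(key, value);
      return value;  // Passed on unchanged to the next callback.
    });
  }
}
```

Had the caching callback returned something other than value, the user's callback would have received that other value instead.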
To illustrate why, let's complicate things a bit more. Let's assume the remote service that serves those get requests is a fairly simple and low-level storage service (think memcached), so it only works with byte arrays and doesn't care what the contents are. So the original client library is only de-serializing the byte array from the network and handing that byte array to the Deferred.

Now you're writing a higher-level library that uses this storage system to store some of your custom objects. So when you get the byte array from the server, you need to further de-serialize it into some kind of an object. Users of your higher-level library don't care about what kind of remote storage system you use; the only thing they care about is getting those objects asynchronously. Your higher-level library is built on top of the original low-level library that does the RPC communication.
When the users of the higher-level library call get, you call get on the lower-level library, which issues an RPC call and returns a Deferred to the higher-level library. The higher-level library then adds a first callback to further de-serialize the byte array into an object. Then the user of the higher-level library adds their own callback that does something with that object. So now we have something that looks like this:

              1st callback                    2nd callback
  Deferred:   de-serialize to an object  -->  user callback

When the result comes in from the network, the byte array is de-serialized from the socket. The first callback is invoked and its argument is the initial result, the byte array. So the first callback further de-serializes it into some object that it returns. The second callback is then invoked and its argument is the result of the previous callback, that is the de-serialized object.
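As a rough illustration of a callback that changes the type of the result, the sketch below again substitutes the JDK's CompletableFuture for Deferred. TypedGetter and lowLevelGet are hypothetical, and the "object" is just an Integer parsed from UTF-8 bytes, an assumption made to keep the example short.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

class TypedGetter {
  /** Stand-in for the low-level library: completes with raw bytes. */
  static CompletableFuture<byte[]> lowLevelGet(String key) {
    return CompletableFuture.completedFuture("42".getBytes(StandardCharsets.UTF_8));
  }

  /** Higher-level get: the first callback turns the bytes into an object. */
  static CompletableFuture<Integer> get(String key) {
    return lowLevelGet(key)
        .thenApply(bytes -> Integer.valueOf(new String(bytes, StandardCharsets.UTF_8)));
  }
}
```

Callers of TypedGetter.get only ever see an Integer; the byte array never reaches their callback.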
Now back to the caching library, which has nothing to do with the higher-level library. All it does is, given an object that implements some interface with a get method, keep a map of whatever arguments get receives to an Object that was cached for this particular get call. Thanks to the way the callback chain works, it's possible to use the caching library together with the higher-level library transparently. Users who want to use caching simply need to use the caching library together with the higher-level library. Now when they call get on the caching library, and there's a cache miss, here's what happens, step by step:
- The caching library calls get on the higher-level library.
- The higher-level library calls get on the lower-level library.
- The lower-level library creates a Deferred, issues out the RPC call and returns its Deferred.
- The higher-level library adds its own de-serialization callback to the Deferred and returns it.
- The caching library adds its own cache-updating callback to the Deferred and returns it.
- The user gets the Deferred and adds their own callback to do something with the object retrieved from the data store.

              1st callback       2nd callback       3rd callback
  Deferred:   de-serialize  -->  add to cache  -->  user callback
  result: (none available)

Once the response comes back, the first callback is invoked; it de-serializes the object and returns it. The current result of the Deferred becomes the de-serialized object. The current state of the Deferred is as follows:

              2nd callback       3rd callback
  Deferred:   add to cache  -->  user callback
  result: de-serialized object

Because there are more callbacks in the chain, the Deferred invokes the next one and gives it the current result (the de-serialized object) as its argument. The callback adds that object to its cache and returns it unchanged.

              3rd callback
  Deferred:   user callback
  result: de-serialized object

Finally, the user's callback is invoked with the object as its argument.

  Deferred:   (no more callbacks)
  result: (whatever the user's callback returned)

If you think this is becoming interesting, read on; you haven't reached the most interesting thing about Deferred yet.
Now let's assume that the remote service serving those get calls is a distributed service that runs on many machines. The data is partitioned over many nodes and moves around as nodes come and go (due to machine failures and whatnot). In order to execute a get call, the low-level client library first needs to know which server is currently serving that piece of data. Let's assume that there's another server, which is part of that distributed service, that maintains an index and keeps track of where each piece of data is. The low-level client library first needs to look up the location of the data using that first server (that's a first RPC), then retrieve it from the storage node (that's another RPC). End users don't care that retrieving data involves a 2-step process; they just want to call get and be called back when the data (a byte array) is available.
This is where what's probably the most useful feature of Deferred comes in. When the user calls get, the low-level library will issue a first RPC to the index server to locate the piece of data requested by the user. When issuing this lookup RPC, a Deferred gets created. The low-level get code adds a first callback to process the lookup response and then returns the Deferred to the user.

              1st callback       2nd callback
  Deferred:   index lookup  -->  user callback
  result: (none available)

Eventually, the lookup RPC completes, and the Deferred is given the lookup response. So before triggering the first callback, the Deferred will be in this state:

              1st callback       2nd callback
  Deferred:   index lookup  -->  user callback
  result: lookup response

The first callback runs and now knows where to find the piece of data initially requested. It issues the get request to the right storage node. Doing so creates another Deferred, let's call it (B), which is then returned by the index lookup callback.
And this is where the magic happens. Now we're in this state:

  (A)        2nd callback     |  (B)
  Deferred:  user callback    |  Deferred:  (no more callbacks)
  result: Deferred (B)        |  result: (none available)

Because a callback returned a Deferred, we can't invoke the user callback just yet, since the user doesn't want their callback to receive a Deferred; they want it to receive a byte array. The current callback chain gets paused and stops being processed. This callback chain needs to be resumed whenever the Deferred of the get call [(B)] completes. In order to achieve that, a callback is added to that other Deferred that will resume the execution of the callback chain.

  (A)        2nd callback     |  (B)        1st callback
  Deferred:  user callback    |  Deferred:  resume (A)
  result: Deferred (B)        |  result: (none available)

Once (A) has added the callback on (B), it can return immediately; there's no need to wait, block a thread or anything like that. So the whole process of receiving the lookup response and sending out the get RPC happened really quickly, without blocking anything.
Now when the get response comes back from the network, the RPC layer de-serializes the byte array, as usual, and hands it to (B):

  (A)        2nd callback     |  (B)        1st callback
  Deferred:  user callback    |  Deferred:  resume (A)
  result: Deferred (B)        |  result: byte array

(B)'s first and only callback is going to set the result of (A) and resume (A)'s callback chain.

  (A)        2nd callback     |  (B)        1st callback
  Deferred:  user callback    |  Deferred:  resume (A)
  result: byte array          |  result: byte array

So now (A) resumes its callback chain, and invokes the user's callback with the byte array as its argument, which is what they wanted.

  (A)                         |  (B)        1st callback
  Deferred:  (no more cb)     |  Deferred:  resume (A)
  result: (return value of    |  result: byte array
           the user's cb)

Then (B) moves on to its next callback in the chain, but there are none, so (B) is done too.

  (A)                         |  (B)
  Deferred:  (no more cb)     |  Deferred:  (no more cb)
  result: (return value of    |  result: byte array
           the user's cb)

The whole process of reading the get response, resuming the initial Deferred and executing the second Deferred happened all in the same thread, sequentially, and without blocking anything (provided that the user's callback didn't block, as it must not).

What we've done is essentially equivalent to dynamically building an implicit finite state machine to handle the life cycle of the get request. This simple API allows you to build arbitrarily complex processing pipelines that make dynamic decisions at each stage of the pipeline as to what to do next.
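The pause-and-resume behavior walked through above has a close analogue in the JDK: CompletableFuture.thenCompose accepts a callback that itself returns a future and delays the rest of the chain until that inner future completes. The sketch below uses it in place of Deferred (where addCallbackDeferring would be the matching call). TwoStepGet, lookupRpc, getRpc and contactedNode are hypothetical names, and both RPCs are simulated by futures completed by hand.

```java
import java.util.concurrent.CompletableFuture;

class TwoStepGet {
  // Completed by the RPC layer with the location of the data: starts chain (A).
  final CompletableFuture<String> lookupRpc = new CompletableFuture<>();
  // Completed by the RPC layer with the data itself: this is (B).
  final CompletableFuture<byte[]> getRpc = new CompletableFuture<>();
  String contactedNode;  // Records which node the second RPC was sent to.

  CompletableFuture<byte[]> get(String key) {
    // The first callback returns another future, so the user's callbacks wait
    // until (B) completes instead of receiving a future as their argument.
    return lookupRpc.thenCompose(node -> {
      contactedNode = node;  // A real client would send the get RPC here.
      return getRpc;
    });
  }
}
```

Nothing blocks at any point: completing lookupRpc runs the first callback and returns, and completing getRpc later resumes the user's part of the chain.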
Deferred has in fact not one but two callback chains. The first chain is the "normal" processing chain, and the second is the error handling chain. Twisted calls an error handling callback an "errback", so we've kept that term here. When the asynchronous processing completes with an error, the Deferred must be given the Exception that was caught instead of giving it the result (or if no Exception was caught, one must be created and handed to the Deferred). When the current result of a Deferred is an instance of Exception, the next errback is invoked. As for normal callbacks, whatever the errback returns becomes the current result. If the current result is still an instance of Exception, the next errback is invoked. If the current result is no longer an Exception, the next callback is invoked.

When a callback or an errback itself throws an exception, it is caught by the Deferred and becomes the current result, which means that the next errback in the chain will be invoked with that exception as its argument. Note that Deferred will only catch Exceptions, not any Throwable or Error.
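The alternation between callbacks and errbacks can be approximated with the JDK's CompletableFuture, used here as a stand-in for Deferred: thenApply stages are skipped while the current result is an error, and exceptionally stages are skipped while it is not. Two differences to keep in mind: CompletableFuture catches any Throwable rather than only Exception, and it may wrap the original exception in a CompletionException before passing it on. ErrbackSketch and process are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class ErrbackSketch {
  static String process(CompletableFuture<String> rpc) {
    return rpc
        .thenApply(String::toUpperCase)   // Callback: skipped while the result is an error.
        .exceptionally(e -> "fallback")   // Errback: skipped while the result is normal.
        .thenApply(s -> s + "!")          // Callback: runs on whatever came out above.
        .join();
  }

  public static void main(String[] args) {
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("RPC failed"));
    System.out.println(process(CompletableFuture.completedFuture("ok")));  // prints OK!
    System.out.println(process(failed));                                   // prints fallback!
  }
}
```

Because the errback returns a normal value, the result stops being an error and the last callback runs in both cases.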
- A Deferred can receive only one initial result.
- Callbacks are invoked from whichever thread gives the initial result to the Deferred. This class does not create or manage any thread or executor.
- Once a callback has been executed, the Deferred will lose its reference to it.
- Every method that adds a callback to a Deferred does so in O(1).
- A Deferred cannot receive itself as an initial or intermediate result, as this would cause an infinite recursion.
- You must not build a cycle of mutually dependent Deferreds, as this would cause an infinite recursion (thankfully, it will quickly fail with a CallbackOverflowError).
- Callbacks and errbacks cannot receive a Deferred in argument. This is because they always receive the result of a previous callback, and when the result becomes a Deferred, we suspend the execution of the callback chain until the result of that other Deferred is available.
- Callbacks cannot receive an Exception in argument. This is because Exceptions are always given to the errbacks.
- Using the monitor of a Deferred can lead to a deadlock, so don't use it. In other words, writing synchronized (some_deferred) { ... } (or anything equivalent) voids your warranty.

Constructor and Description |
---|
Deferred(): Constructor. |

Modifier and Type | Method and Description |
---|---|
<R> Deferred<R> | addBoth(Callback<R,T> cb): Registers a callback both as a callback and as an "errback". |
<R,D extends Deferred<R>> Deferred<R> | addBothDeferring(Callback<D,T> cb): Registers a callback both as a callback and as an "errback". |
<R> Deferred<R> | addCallback(Callback<R,T> cb): Registers a callback. |
<R,D extends Deferred<R>> Deferred<R> | addCallbackDeferring(Callback<D,T> cb): Registers a callback. |
<R,R2,E> Deferred<R> | addCallbacks(Callback<R,T> cb, Callback<R2,E> eb): Registers a callback and an "errback". |
<R,E> Deferred<T> | addErrback(Callback<R,E> eb): Registers an "errback". |
void | callback(Object initresult): Starts running the callback chain. |
Deferred<T> | chain(Deferred<T> other): Chains another Deferred to this one. |
static <T> Deferred<T> | fromError(Exception error): Constructs a Deferred with an error that's readily available. |
static <T> Deferred<T> | fromResult(T result): Constructs a Deferred with a result that's readily available. |
static <T> Deferred<ArrayList<T>> | group(Collection<Deferred<T>> deferreds): Groups multiple Deferreds together in a single one. |
static <T> Deferred<ArrayList<T>> | group(Deferred<T> d1, Deferred<T> d2): Groups two Deferreds together in a single one. |
static <T> Deferred<ArrayList<T>> | group(Deferred<T> d1, Deferred<T> d2, Deferred<T> d3): Groups three Deferreds together in a single one. |
static <T> Deferred<ArrayList<T>> | groupInOrder(Collection<Deferred<T>> deferreds): Groups multiple Deferreds together in a single one. |
T | join(): Synchronously waits until this Deferred is called back. |
T | join(long timeout): Synchronously waits until this Deferred is called back or a timeout occurs. |
T | joinUninterruptibly(): Synchronously waits until this Deferred is called back. |
T | joinUninterruptibly(long timeout): Synchronously waits until this Deferred is called back or a timeout occurs. |
String | toString(): Returns a helpful string representation of this Deferred. |

public static <T> Deferred<T> fromResult(T result)

Constructs a Deferred with a result that's readily available.

This is equivalent to writing:

    Deferred<T> d = new Deferred<T>();
    d.callback(result);

Callbacks added to this Deferred will be immediately called.

Parameters:
result - The "deferred" result.
Returns:
A new Deferred.

public static <T> Deferred<T> fromError(Exception error)

Constructs a Deferred with an error that's readily available.

This is equivalent to writing:

    Deferred<T> d = new Deferred<T>();
    d.callback(error);

Errbacks added to this Deferred will be immediately called.

Parameters:
error - The error to use as a result.

public <R,R2,E> Deferred<R> addCallbacks(Callback<R,T> cb, Callback<R2,E> eb)

Registers a callback and an "errback".

If the deferred result is already available, the callback or the errback (depending on the nature of the result) is executed immediately from this thread.

Parameters:
cb - The callback to register.
eb - The errback to register.
Returns:
this with an "updated" type.
Throws:
CallbackOverflowError - if there are too many callbacks in this chain. The maximum number of callbacks allowed in a chain is set by the implementation. The limit is high enough that you shouldn't have to worry about this exception (which is why it's an Error actually). If you hit it, you probably did something wrong.

public <R> Deferred<R> addCallback(Callback<R,T> cb)

Registers a callback.

If the deferred result is already available and isn't an exception, the callback is executed immediately from this thread. If the deferred result is already available and is an exception, the callback is discarded. If the deferred result is not available, this callback is queued and will be invoked from whichever thread gives this deferred its initial result by calling callback(Object).

Parameters:
cb - The callback to register.
Returns:
this with an "updated" type.

public <R,D extends Deferred<R>> Deferred<R> addCallbackDeferring(Callback<D,T> cb)

Registers a callback.

This has the exact same effect as addCallback(Callback), but keeps the type information "correct" when the callback to add returns a Deferred.

Parameters:
cb - The callback to register.
Returns:
this with an "updated" type.

public <R,E> Deferred<T> addErrback(Callback<R,E> eb)

Registers an "errback".

If the deferred result is already available and is an exception, the errback is executed immediately from this thread. If the deferred result is already available and isn't an exception, the errback is discarded. If the deferred result is not available, this errback is queued and will be invoked from whichever thread gives this deferred its initial result by calling callback(Object).

Parameters:
eb - The errback to register.
Returns:
this with an "updated" type.

public <R> Deferred<R> addBoth(Callback<R,T> cb)

Registers a callback both as a callback and as an "errback".

If the deferred result is already available, the callback is executed immediately from this thread (regardless of whether or not the current result is an exception). If the deferred result is not available, this callback is queued and will be invoked from whichever thread gives this deferred its initial result by calling callback(Object).

Parameters:
cb - The callback to register.
Returns:
this with an "updated" type.

public <R,D extends Deferred<R>> Deferred<R> addBothDeferring(Callback<D,T> cb)

Registers a callback both as a callback and as an "errback".

This has the exact same effect as addBoth(Callback), but keeps the type information "correct" when the callback to add returns a Deferred.

Parameters:
cb - The callback to register.
Returns:
this with an "updated" type.

public Deferred<T> chain(Deferred<T> other)

Chains another Deferred to this one.

This method simply ensures that whenever the callback chain in this is run, then the callback chain in other gets run too. The result handed to other is whatever result this currently has.

One case where this is particularly useful is when you want to multiplex a result to multiple Deferreds, that is if multiple "listeners" want to have their callback chain triggered when a single event completes:

    public class ResultMultiplexer {
      private Deferred<Foo> listeners = new Deferred<Foo>();

      public void addListener(Deferred<Foo> d) {
        listeners.chain(d);
      }

      public void emitResult(Foo event) {
        listeners.callback(event);
        // Remember that a Deferred is a one-time callback chain.
        // Once emitResult is called, everyone interested in getting the
        // next event would need to call addListener again.  This isn't a
        // pub-sub system, it's just showing how to multiplex a result.
        listeners = new Deferred<Foo>();
      }
    }

Parameters:
other - The Deferred to chain to this one.
Returns:
this, always.
Throws:
AssertionError - if this == other.

public static <T> Deferred<ArrayList<T>> group(Collection<Deferred<T>> deferreds)

Groups multiple Deferreds together in a single one.

Conceptually, this does the opposite of chain(Deferred), in the sense that it demultiplexes multiple Deferreds into a single one, so that you can easily take an action once all the Deferreds in the group have been called back.

    public class ResultDemultiplexer {
      private ArrayList<Deferred<Foo>> deferreds = new ArrayList<Deferred<Foo>>();

      public void addDeferred(final Deferred<Foo> d) {
        deferreds.add(d);
      }

      public Deferred<ArrayList<Foo>> demultiplex() {
        // The element type follows the signature of group: ArrayList<T> with T = Foo.
        Deferred<ArrayList<Foo>> demultiplexed = Deferred.group(deferreds);
        deferreds.clear();
        return demultiplexed;
      }
    }

In the example above, any number of Deferreds can be added to the demultiplexer, and once demultiplex() is invoked, the Deferred returned will be invoked once all the Deferreds added to the demultiplexer have been called back.

Parameters:
deferreds - All the Deferreds to group together.
Returns:
A Deferred that will be called back once all the Deferreds given in argument have been called back. Each element in the list will be either of type <T> or an Exception. If any of the elements in the list is an Exception, the errback of the Deferred returned will be invoked with a DeferredGroupException in argument.

There's no guarantee on the order of the results in the deferred list returned; it depends on the order in which the Deferreds in the group complete. If you want to preserve the order, use groupInOrder(Collection) instead.

public static <T> Deferred<ArrayList<T>> groupInOrder(Collection<Deferred<T>> deferreds)

Groups multiple Deferreds together in a single one.

This is the same thing as group(Collection) except that we guarantee we preserve the order of the Deferreds.

Parameters:
deferreds - All the Deferreds to group together.
Returns:
A Deferred that will be called back once all the Deferreds given in argument have been called back.
See Also:
group(Collection)
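
To give a feel for what order-preserving grouping involves, here is a minimal sketch built on the JDK's CompletableFuture rather than on Deferred. It is not how this class implements groupInOrder; GroupSketch is a hypothetical name. Unlike the method documented above, a failure in any input makes the grouped future fail with that input's exception instead of a DeferredGroupException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class GroupSketch {
  /** Completes once every input has completed; results keep the input order. */
  static <T> CompletableFuture<List<T>> groupInOrder(List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
        .thenApply(ignored -> {
          List<T> results = new ArrayList<>(futures.size());
          for (CompletableFuture<T> f : futures) {
            results.add(f.join());  // Already complete here, so this never blocks.
          }
          return results;
        });
  }
}
```

The results are read back by walking the input list, which is why the output order matches the input order no matter which future completes first.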
public static <T> Deferred<ArrayList<T>> group(Deferred<T> d1, Deferred<T> d2)

Groups two Deferreds together in a single one.

This is semantically equivalent to:

    group(Arrays.asList(d1, d2));

except that it's type safe as it doesn't involve an unchecked generic array creation of type Deferred<T> for the varargs parameter passed to asList.

Parameters:
d1 - The first Deferred to put in the group.
d2 - The second Deferred to put in the group.
Returns:
A Deferred that will be called back once both Deferreds given in argument have been called back.
See Also:
group(Collection)

public static <T> Deferred<ArrayList<T>> group(Deferred<T> d1, Deferred<T> d2, Deferred<T> d3)

Groups three Deferreds together in a single one.

This is semantically equivalent to:

    group(Arrays.asList(d1, d2, d3));

except that it's type safe as it doesn't involve an unchecked generic array creation of type Deferred<T> for the varargs parameter passed to asList.

Parameters:
d1 - The first Deferred to put in the group.
d2 - The second Deferred to put in the group.
d3 - The third Deferred to put in the group.
Returns:
A Deferred that will be called back once all three Deferreds given in argument have been called back.
See Also:
group(Collection)

public void callback(Object initresult)

Starts running the callback chain.

This posts the initial result that will be passed to the first callback in the callback chain. If the argument is an Exception then the "errback" chain will be triggered instead.

This method will not let any Exception thrown by a callback propagate. You shouldn't try to catch any RuntimeException when you call this method, as this is unnecessary.

Parameters:
initresult - The initial result with which to start the 1st callback. The following must be true: initresult instanceof T || initresult instanceof Exception
Throws:
AssertionError - if this method was already called on this instance.
AssertionError - if initresult == this.

public T join() throws InterruptedException, Exception

Synchronously waits until this Deferred is called back.

This helps do synchronous operations using an asynchronous API. If this Deferred already completed, this method returns (or throws) immediately. Otherwise, the current thread will be blocked and will wait until the Deferred is called back.

Throws:
InterruptedException - if this thread was interrupted before the deferred result became available.
Exception - if the deferred result is an exception, this exception will be thrown.

public T join(long timeout) throws InterruptedException, Exception

Synchronously waits until this Deferred is called back or a timeout occurs.

This helps do synchronous operations using an asynchronous API. If this Deferred already completed, this method returns (or throws) immediately. Otherwise, the current thread will be blocked and will wait until the Deferred is called back or the specified amount of time has elapsed.

Parameters:
timeout - The maximum time to wait in milliseconds. A value of 0 means no timeout.
Throws:
InterruptedException - if this thread was interrupted before the deferred result became available.
IllegalArgumentException - if the value of timeout is negative.
TimeoutException - if there's a timeout.
Exception - if the deferred result is an exception, this exception will be thrown.

public T joinUninterruptibly() throws Exception

Synchronously waits until this Deferred is called back.

This helps do synchronous operations using an asynchronous API. If this Deferred already completed, this method returns (or throws) immediately. Otherwise, the current thread will be blocked and will wait until the Deferred is called back. If the current thread gets interrupted while waiting, it will keep waiting anyway until the callback chain terminates. Before this method returns (or throws an exception), the interrupted status on the thread will be set again.

Throws:
Exception - if the deferred result is an exception, this exception will be thrown.

public T joinUninterruptibly(long timeout) throws Exception

Synchronously waits until this Deferred is called back or a timeout occurs.

This helps do synchronous operations using an asynchronous API. If this Deferred already completed, this method returns (or throws) immediately. Otherwise, the current thread will be blocked and will wait until the Deferred is called back or the specified amount of time has elapsed. If the current thread gets interrupted while waiting, it will keep waiting anyway until the callback chain terminates. Before this method returns (or throws an exception), the interrupted status on the thread will be set again.

Parameters:
timeout - The maximum time to wait in milliseconds. A value of 0 means no timeout.
Throws:
IllegalArgumentException - if the value of timeout is negative.
TimeoutException - if there's a timeout.
Exception - if the deferred result is an exception, this exception will be thrown.

public String toString()

Returns a helpful string representation of this Deferred.

The string returned is built in O(N) where N is the number of callbacks currently in the chain. The string isn't built entirely atomically, so it can appear to show this Deferred in a slightly inconsistent state.

This method is not cheap. Avoid doing:

    Deferred<Foo> d = ..;
    LOG.debug("Got " + d);

The overhead of stringifying the Deferred can be significant, especially if this is in the fast-path of your application.