java.lang.Object
    com.stumbleupon.async.Deferred<T>

Type Parameters:
T - The type of the deferred result.

public final class Deferred<T>
A thread-safe implementation of a deferred result for easy asynchronous processing.
This implementation is based on Twisted's Python Deferred API.
Deferred offers:
- Deferred is like a Future with a dynamic Callback chain associated to it.
- When a callback returns another Deferred, the next callback in the chain
  doesn't get executed until that other Deferred result becomes available.
- There are two callback chains: one for normal processing and one for
  error handling. A Callback that handles errors is called an "errback".
- Deferred is an important building block for writing easy-to-use
  asynchronous APIs in a thread-safe fashion.

A Deferred represents a result that's not yet available. An asynchronous
operation (I/O, RPC, whatever) has been started and will hand its result
(be it successful or not) to the Deferred in the future. The key
difference between a Deferred and a Future is that a Deferred has a
callback chain associated to it, whereas with just a Future you need to
get the result manually at some point, which poses problems such as: How
do you know when the result is available? What if the result itself
depends on another future?
When you start an asynchronous operation, you typically want to be called back when the operation completes. If the operation was successful, you want your callback to use its result to carry on what you were doing at the time you started the asynchronous operation. If there was an error, you want to trigger some error handling code.
But there's more to a Deferred than a single callback. You can add an
arbitrary number of callbacks, which lets you build complex processing
pipelines in a simple and elegant way.
Suppose you're writing a client library for a remote service. In the
get method of your library, you want to retrieve some piece of data from
your remote service and hand it back to the user, but you want to do so in
an asynchronous fashion.
When the user of your client library invokes get, you assemble a
request and send it out to the remote server through a socket. Before
sending it to the socket, you create a Deferred and you store it
somewhere, for example in a map, to keep an association between the request
and this Deferred. You then return this Deferred to the
user; this is how they will access the deferred result as soon as the RPC
completes.
Sooner or later, the RPC will complete (successfully or not), and your
socket will become readable (or maybe closed, in the event of a failure).
Let's assume for now that everything works as expected, and thus the socket
is readable, so you read the response from the socket. At this point you
extract the result of the remote get call, and you hand it out to
the Deferred you created for this request (remember, you had to
store it somewhere, so you could give it the deferred result once you have
it). The Deferred then stores this result and triggers any
callback that may have been added to it. The expectation is that the user
of your client library, after calling your get method, will add a
Callback to the Deferred you gave them. This way, when the
deferred result becomes available, their callback will be invoked with the
result as its argument.
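A minimal sketch of the bookkeeping described above, assuming the JDK's java.util.concurrent.CompletableFuture as a stand-in for Deferred so that it runs without the library. ToyClient, get, onResponse, lastRequestId and pendingCount are hypothetical names, and the socket I/O is simulated by calling onResponse directly.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical client library; CompletableFuture stands in for Deferred.
class ToyClient {
    // Pending requests keyed by request id, so the I/O side can find the
    // future that belongs to each response.
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();
    volatile long lastRequestId = -1; // exposed only so the demo can fake a response

    CompletableFuture<String> get(String key) {
        long id = nextId.incrementAndGet();
        CompletableFuture<String> result = new CompletableFuture<>();
        pending.put(id, result); // store the association before "sending"
        lastRequestId = id;
        // A real client would serialize (id, key) and write it to a socket here.
        return result;
    }

    // Called by the (simulated) I/O layer once a response has been read.
    void onResponse(long id, String value) {
        CompletableFuture<String> result = pending.remove(id);
        if (result != null) {
            result.complete(value); // runs the callbacks the user added
        }
    }

    int pendingCount() {
        return pending.size();
    }
}

class ToyClientDemo {
    static String run() {
        ToyClient client = new ToyClient();
        StringBuilder seen = new StringBuilder();
        CompletableFuture<String> f = client.get("user:42");
        f.thenAccept(v -> seen.append("got ").append(v)); // the user's callback
        client.onResponse(client.lastRequestId, "Alice"); // the response arrives
        return seen + " (pending: " + client.pendingCount() + ")";
    }
}
```

The map lookup in onResponse is what lets the reading side find the right future; a real client would likely also need to fail every pending future if the socket gets closed.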
So far what we've explained is nothing more than a Future with a
callback associated to it. But there's more to Deferred than just
this. Let's assume now that someone else wants to build a caching layer on
top of your client library, to avoid fetching the same
value over and over again through the network. Users who want to use the
cache will invoke get on the caching library instead of directly
calling your client library.
Let's assume that the caching library already has a result cached for a
get call. It will create a Deferred, and immediately hand
it the cached result, and it will return this Deferred to the user.
The user will add a Callback to it, which will be immediately
invoked since the deferred result is already available. So the entire
get call completed virtually instantaneously and entirely from the
same thread. There was no context switch (no other thread involved, no
I/O and whatnot), nothing ever blocked, everything just happened really
quickly.
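The cache-hit path can be sketched with an already-completed future, again assuming CompletableFuture as a stand-in for Deferred (an analogy, not the library's API). With the JDK's current implementation, a non-async callback added to a completed future runs right away on the caller's thread, which is the behavior the paragraph describes. CacheHitDemo is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

class CacheHitDemo {
    // Returns true if the callback ran on the calling thread before
    // thenAccept() returned, i.e. with no hand-off to another thread.
    static boolean callbackRanImmediately() {
        // Analog of creating a Deferred and immediately handing it a cached value.
        CompletableFuture<String> cached = CompletableFuture.completedFuture("cached value");
        Thread caller = Thread.currentThread();
        boolean[] ranHere = {false};
        cached.thenAccept(v -> ranHere[0] = (Thread.currentThread() == caller));
        return ranHere[0]; // already set: nothing was queued and nothing blocked
    }
}
```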
Now let's assume that the caching library has a cache miss and needs to do
a remote get call using the original client library described
earlier. The RPC is sent out to the remote server and the client library
returns a Deferred to the caching library. This is where things
become exciting. The caching library can then add its own callback to the
Deferred before returning it to the user. This callback will take
the result that came back from the remote server, add it to the cache and
return it. As usual, the user then adds their own callback to process the
result. So now the Deferred has 2 callbacks associated to it:
                1st callback       2nd callback
    Deferred:   add to cache  -->  user callback
When the RPC completes, the original client library will de-serialize the
result from the wire and hand it out to the Deferred. The first
callback will be invoked, which will add the result to the cache of the
caching library. Then whatever the first callback returns will be passed
on to the second callback. It turns out that the caching callback returns
the get response unchanged, so that will be passed on to the user
callback.
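A sketch of this two-callback chain on a cache miss, with CompletableFuture standing in for Deferred. One difference is worth keeping in mind: thenApply returns a new stage, whereas Deferred.addCallback extends the same chain and returns this. The cache map, the key and CacheMissDemo are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class CacheMissDemo {
    // Hypothetical cache owned by the caching library.
    static final Map<String, String> cache = new ConcurrentHashMap<>();

    static String run() {
        // Stands in for the Deferred the client library returns on a cache miss.
        CompletableFuture<String> rpc = new CompletableFuture<>();
        StringBuilder userSaw = new StringBuilder();

        rpc.thenApply(value -> {              // 1st callback: add to cache
                cache.put("key", value);
                return value;                 // passed on unchanged
            })
            .thenAccept(v -> userSaw.append(v)); // 2nd callback: the user's

        rpc.complete("value-from-server");   // the RPC completes
        return cache.get("key") + " / " + userSaw;
    }
}
```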
Now it's very important to understand that the first callback could have
returned another arbitrary value, and that's what would have been passed
to the second callback. This may sound weird at first but it's actually
the key behind Deferred.
To illustrate why, let's complicate things a bit more. Let's assume the
remote service that serves those get requests is a fairly simple
and low-level storage service (think memcached), so it only works
with byte arrays; it doesn't care what the contents are. So the original
client library is only de-serializing the byte array from the network and
handing that byte array to the Deferred.
Now you're writing a higher-level library that uses this storage system
to store some of your custom objects. So when you get the byte array from
the server, you need to further de-serialize it into some kind of an object.
Users of your higher-level library don't care about what kind of remote
storage system you use, the only thing they care about is getting
those objects asynchronously. Your higher-level library is built on top
of the original low-level library that does the RPC communication.
When the users of the higher-level library call get, you call
get on the lower-level library, which issues an RPC call and
returns a Deferred to the higher-level library. The higher-level
library then adds a first callback to further de-serialize the byte array
into an object. Then the user of the higher-level library adds their own
callback that does something with that object. So now we have something
that looks like this:
                1st callback                    2nd callback
    Deferred:   de-serialize to an object  -->  user callback
When the result comes in from the network, the byte array is de-serialized from the socket. The first callback is invoked and its argument is the initial result, the byte array. So the first callback further de-serializes it into some object that it returns. The second callback is then invoked and its argument is the result of the previous callback, that is the de-serialized object.
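The change of type along the chain, from byte array to object, can be sketched the same way, assuming CompletableFuture in place of Deferred. UserRecord and DeserializeDemo are hypothetical, and decoding the bytes as UTF-8 only stands in for whatever serialization format the higher-level library would really use.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

// Hypothetical object type produced by the higher-level library.
class UserRecord {
    final String name;

    UserRecord(String name) {
        this.name = name;
    }
}

class DeserializeDemo {
    static String run() {
        // Low-level layer: only deals in byte arrays.
        CompletableFuture<byte[]> raw = new CompletableFuture<>();

        // Higher-level layer, 1st callback: byte[] -> UserRecord (UTF-8 as a stand-in format).
        CompletableFuture<UserRecord> users =
            raw.thenApply(bytes -> new UserRecord(new String(bytes, StandardCharsets.UTF_8)));

        // 2nd callback, the user's: receives the de-serialized object, not the bytes.
        StringBuilder out = new StringBuilder();
        users.thenAccept(u -> out.append("hello ").append(u.name));

        raw.complete("alice".getBytes(StandardCharsets.UTF_8)); // the RPC layer hands over bytes
        return out.toString();
    }
}
```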
Now back to the caching library, which has nothing to do with the higher
level library. All it does is, given an object that implements some
interface with a get method, it keeps a map of whatever arguments
get receives to an Object that was cached for this
particular get call. Thanks to the way the callback chain works,
it's possible to use the caching library together with the higher-level
library transparently. Users who want to use caching simply need to
use the caching library together with the higher level library. Now when
they call get on the caching library, and there's a cache miss,
here's what happens, step by step:
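1. The user calls get on the caching library.
2. The caching library calls get on the higher-level library.
3. The higher-level library calls get on the lower-level library.
4. The lower-level library creates a Deferred, sends out the RPC call and
   returns its Deferred.
5. The higher-level library adds its own object de-serialization callback
   to the Deferred and returns it.
6. The caching library adds its own cache-updating callback to the
   Deferred and returns it.
7. The user gets the Deferred and adds their own callback to do something
   with the object retrieved from the data store.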
                1st callback       2nd callback       3rd callback
    Deferred:   de-serialize  -->  add to cache  -->  user callback
    result:     (none available)
Once the response comes back, the first callback is invoked; it
de-serializes the object and returns it. The current result of the
Deferred becomes the de-serialized object. The current state of
the Deferred is as follows:
                2nd callback       3rd callback
    Deferred:   add to cache  -->  user callback
    result:     de-serialized object
Because there are more callbacks in the chain, the Deferred invokes
the next one and gives it the current result (the de-serialized object) as
its argument. The callback adds that object to its cache and returns it
unchanged.
                3rd callback
    Deferred:   user callback
    result:     de-serialized object
Finally, the user's callback is invoked with the object as its argument.

    Deferred:   (no more callbacks)
    result:     (whatever the user's callback returned)

If you think this is becoming interesting, read on: you haven't reached
the most interesting thing about Deferred yet.
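Because the caching layer only passes values through unchanged, it can wrap any asynchronous get, whatever the values are. This sketch assumes a hypothetical AsyncGetter interface and uses CompletableFuture instead of Deferred; the fake backend stands in for the higher-level library (RPC plus de-serialization) by returning the key's length.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical interface shared by the higher-level library and the cache.
interface AsyncGetter<K, V> {
    CompletableFuture<V> get(K key);
}

// Knows nothing about what V is or how it is fetched.
class CachingGetter<K, V> implements AsyncGetter<K, V> {
    private final AsyncGetter<K, V> delegate;
    private final Map<K, V> cache = new ConcurrentHashMap<>();

    CachingGetter(AsyncGetter<K, V> delegate) {
        this.delegate = delegate;
    }

    @Override
    public CompletableFuture<V> get(K key) {
        V hit = cache.get(key);
        if (hit != null) {
            return CompletableFuture.completedFuture(hit); // hit: result already available
        }
        // Miss: add a callback that caches the value and passes it on unchanged.
        return delegate.get(key).thenApply(value -> {
            cache.put(key, value);
            return value;
        });
    }
}

class CachingDemo {
    static int fetches = 0;

    static String run() {
        AsyncGetter<String, Integer> backend = key -> {
            fetches++;                                  // counts "RPCs"
            return CompletableFuture.completedFuture(key.length());
        };
        AsyncGetter<String, Integer> cached = new CachingGetter<>(backend);
        StringBuilder out = new StringBuilder();
        cached.get("hello").thenAccept(v -> out.append(v).append(' ')); // miss
        cached.get("hello").thenAccept(v -> out.append(v));             // hit
        return out + " fetches=" + fetches;
    }
}
```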
Now let's assume that the remote service that serves those get calls is a
distributed service that runs on many machines. The data is partitioned
over many nodes and moves around as nodes come and go (due to machine
failures and whatnot). In order to execute a get call, the
low-level client library first needs to know which server is currently
serving that piece of data. Let's assume that there's another server,
which is part of that distributed service, that maintains an index and
keeps track of where each piece of data is. The low-level client library
first needs to look up the location of the data using that index server
(that's a first RPC), then retrieve it from the storage node (that's
another RPC). End users don't care that retrieving data involves a 2-step
process, they just want to call get and be called back when the
data (a byte array) is available.
This is where probably the most useful feature of Deferred
comes in. When the user calls get, the low-level library will issue
a first RPC to the index server to locate the piece of data requested by the
user. When issuing this lookup RPC, a Deferred gets
created. The low-level get code adds a first callback to process
the lookup response and then returns it to the user.
                1st callback       2nd callback
    Deferred:   index lookup  -->  user callback
    result:     (none available)
Eventually, the lookup RPC completes, and the Deferred is
given the lookup response. So before triggering the first
callback, the Deferred will be in this state:
                1st callback       2nd callback
    Deferred:   index lookup  -->  user callback
    result:     lookup response
The first callback runs and now knows where to find the piece of data
initially requested. It issues the get request to the right storage
node. Doing so creates another Deferred, let's call it
(B), which is then returned by the index lookup callback.
And this is where the magic happens. Now we're in this state:
    (A)         2nd callback        |  (B)
                                    |
    Deferred:   user callback       |  Deferred:  (no more callbacks)
    result:     Deferred (B)        |  result:    (none available)
Because a callback returned a Deferred, we can't invoke the user
callback just yet: the user doesn't want their callback to receive a
Deferred, they want it to receive a byte array. So (A) pauses and stops
processing its callback chain.
This callback chain needs to be resumed whenever the Deferred of
the get call [(B)] completes. In order to achieve that, a
callback is added to that other Deferred that will resume
the execution of the callback chain.
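To make the pause/resume mechanism concrete, here is a deliberately tiny sketch. ToyDeferred is a hypothetical class, not com.stumbleupon.async.Deferred: it is single-threaded, has no errbacks and uses raw Function<Object, Object> callbacks. It only mirrors the behavior described above: when a callback returns another deferred, the chain pauses and a "resume" callback is registered on that other deferred. The demo replays the (A)/(B) scenario.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

// Hypothetical, single-threaded, errback-free toy. NOT the library's code:
// it only mirrors the pause/resume behavior described in the text.
class ToyDeferred {
    private final Deque<Function<Object, Object>> callbacks = new ArrayDeque<>();
    private Object result;
    private boolean hasResult;
    private boolean paused;

    ToyDeferred addCallback(Function<Object, Object> cb) {
        callbacks.addLast(cb);
        if (hasResult && !paused) {
            runCallbacks(); // result already available: run now, on this thread
        }
        return this;
    }

    void callback(Object initialResult) {
        if (hasResult) {
            throw new IllegalStateException("already called back");
        }
        result = initialResult;
        hasResult = true;
        runCallbacks();
    }

    Object currentResult() {
        return result;
    }

    private void runCallbacks() {
        while (!callbacks.isEmpty()) {
            // The callback is removed before it runs, so we drop our reference to it.
            result = callbacks.pollFirst().apply(result);
            if (result instanceof ToyDeferred) {
                // A callback returned another deferred (B): pause this chain and
                // add a callback on (B) that resumes us with (B)'s result.
                ToyDeferred other = (ToyDeferred) result;
                paused = true;
                other.addCallback(value -> {
                    paused = false;
                    result = value;  // (A) takes (B)'s result...
                    runCallbacks();  // ...and resumes its own chain
                    return value;    // (B)'s result is passed on unchanged
                });
                return;
            }
        }
    }
}

class ToyDeferredDemo {
    static String run() {
        ToyDeferred b = new ToyDeferred(); // deferred of the storage "get" RPC
        ToyDeferred a = new ToyDeferred(); // deferred of the index lookup RPC
        StringBuilder userSaw = new StringBuilder();
        a.addCallback(lookupResponse -> b); // index lookup callback returns (B)
        a.addCallback(bytes -> {            // user callback
            userSaw.append(bytes);
            return "user result";
        });
        a.callback("lookup response");      // (A) pauses: its current result is (B)
        boolean pausedOnB = a.currentResult() == b;
        b.callback("byte array");           // (B) resumes (A)
        return pausedOnB + " | " + userSaw + " | " + a.currentResult() + " | " + b.currentResult();
    }
}
```

The final state matches the last diagram: (A) holds the user callback's return value, and (B) still holds the byte array.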
    (A)         2nd callback        |  (B)        1st callback
                                    |
    Deferred:   user callback       |  Deferred:  resume (A)
    result:     Deferred (B)        |  result:    (none available)
Once (A) has added the callback to (B), it can return
immediately; there's no need to wait, block a thread or anything like that.
So the whole process of receiving the lookup response and sending
out the get RPC happened really quickly, without blocking anything.
Now when the get response comes back from the network, the RPC
layer de-serializes the byte array, as usual, and hands it to (B):
    (A)         2nd callback        |  (B)        1st callback
                                    |
    Deferred:   user callback       |  Deferred:  resume (A)
    result:     Deferred (B)        |  result:    byte array
(B)'s first and only callback is going to set the result of
(A) and resume (A)'s callback chain.
    (A)         2nd callback        |  (B)        1st callback
                                    |
    Deferred:   user callback       |  Deferred:  resume (A)
    result:     byte array          |  result:    byte array
So now (A) resumes its callback chain and invokes the user's
callback with the byte array as its argument, which is what they wanted.
    (A)                             |  (B)        1st callback
                                    |
    Deferred:   (no more cb)        |  Deferred:  resume (A)
    result:     (return value of    |  result:    byte array
                 the user's cb)     |
Then (B) moves on to its next callback in the chain, but there are
none, so (B) is done too.
    (A)                             |  (B)
                                    |
    Deferred:   (no more cb)        |  Deferred:  (no more cb)
    result:     (return value of    |  result:    byte array
                 the user's cb)     |
The whole process of reading the get response, resuming the initial
Deferred and executing the second Deferred all happened in
the same thread, sequentially, and without blocking anything (provided that
the user's callback didn't block, as it must not).
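With CompletableFuture, the closest JDK analog of a callback that returns a Deferred is thenCompose. This sketch (a hypothetical TwoStepClient whose two RPCs are completed by hand) shows that the user's callback does not run when the lookup completes, only when the second future does, and that everything runs on the thread that completes the futures.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical low-level client whose get() needs two RPCs.
class TwoStepClient {
    final CompletableFuture<String> lookupRpc = new CompletableFuture<>();  // index lookup
    final CompletableFuture<byte[]> storageRpc = new CompletableFuture<>(); // (B)
    String sentTo; // records where the 2nd RPC was "sent"

    CompletableFuture<byte[]> get(String key) {
        // thenCompose plays the role of a callback that returns a Deferred:
        // the outer chain waits until the returned future completes.
        return lookupRpc.thenCompose(node -> {
            sentTo = key + "@" + node; // pretend to send the get RPC to that node
            return storageRpc;
        });
    }
}

class TwoStepDemo {
    static String run() {
        TwoStepClient client = new TwoStepClient();
        StringBuilder userSaw = new StringBuilder();
        client.get("k1").thenAccept(bytes -> userSaw.append(bytes.length).append(" bytes"));

        client.lookupRpc.complete("node-7");     // lookup done: user callback still waiting
        String afterLookup = userSaw.toString();
        client.storageRpc.complete(new byte[3]); // get done: user callback runs now
        return client.sentTo + " [" + afterLookup + "] " + userSaw;
    }
}
```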
What we've done is essentially equivalent to dynamically building an
implicit finite state machine to handle the life cycle of the get
request. This simple API allows you to build arbitrarily complex
processing pipelines that make dynamic decisions at each stage of the
pipeline as to what to do next.
Deferred has in fact not one but two callback chains. The first
chain is the "normal" processing chain, and the second is the error
handling chain. Twisted calls an error handling callback an "errback", so
we've kept that term here. When the asynchronous processing completes with
an error, the Deferred must be given the Exception that was
caught instead of giving it the result (or if no Exception was
caught, one must be created and handed to the Deferred). When the
current result of a Deferred is an instance of Exception,
the next errback is invoked. As with normal callbacks, whatever the errback
returns becomes the current result. If the current result is still an
instance of Exception, the next errback is invoked. If the current
result is no longer an Exception, the next callback is invoked.
When a callback or an errback itself throws an exception, it is
caught by the Deferred and becomes the current result, which means
that the next errback in the chain will be invoked with that exception as
its argument. Note that Deferred will only catch Exceptions,
not arbitrary Throwables such as Errors.
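A sketch of the error path, using CompletableFuture as a stand-in: exceptionally plays the role of an errback that recovers, after which the normal chain resumes. Two differences from Deferred matter here: CompletableFuture captures any Throwable (not just Exceptions), and an exception thrown by an earlier stage reaches later stages wrapped in a CompletionException. ErrbackDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ErrbackDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> rpc = new CompletableFuture<>();

        rpc.thenApply(s -> {                      // callback that throws on bad input
                if (s.isEmpty()) {
                    throw new IllegalArgumentException("empty response");
                }
                return s.toUpperCase();
            })
            .exceptionally(e -> {                 // "errback" that recovers
                // Exceptions thrown by an earlier stage arrive wrapped.
                Throwable cause = (e instanceof CompletionException) ? e.getCause() : e;
                log.add("errback saw " + cause.getMessage());
                return "DEFAULT";                 // no longer an error...
            })
            .thenAccept(s -> log.add("callback saw " + s)); // ...so callbacks resume

        rpc.complete("");                         // makes the first callback throw
        return log;
    }
}
```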
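Contract and invariants:
- A Deferred can receive only one initial result.
- Callbacks queued before the result is available are executed by the
  thread that hands the initial result to the Deferred; callbacks added
  afterwards run immediately in the thread that adds them. This class does
  not create or manage any thread or executor.
- As soon as a callback is executed, the Deferred will lose its
  reference to it.
- Every method that adds a callback to a Deferred does so in O(1).
- A Deferred cannot receive itself as an initial or intermediate result,
  as this would cause an infinite recursion.
- You must not build a cycle of mutually dependent Deferreds, as this
  would cause an infinite recursion (thankfully, it will quickly fail with
  a StackOverflowError).
- Callbacks and errbacks cannot receive a Deferred as an argument. This is
  because they always receive the result of a previous callback, and when
  the result becomes a Deferred, we suspend the execution of the callback
  chain until the result of that other Deferred is available.
- Callbacks cannot receive an Exception as an argument. This is because
  exceptions are always given to the errbacks.
- Using the monitor of a Deferred can lead to a deadlock, so don't use it.
  In other words, writing
      synchronized (some_deferred) { ... }
  (or anything equivalent) voids your warranty.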
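Two of these rules, which thread runs a queued callback and the single initial result, can be observed with CompletableFuture standing in for Deferred. This is only an analogy: a second complete() is silently ignored, while Deferred.callback documents an AssertionError in that case. CompletingThreadDemo and the thread name are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class CompletingThreadDemo {
    static String run() {
        CompletableFuture<String> f = new CompletableFuture<>();
        AtomicReference<String> ranOn = new AtomicReference<>();
        // Queued before any result exists, so it runs on the completing thread.
        f.thenAccept(v -> ranOn.set(Thread.currentThread().getName()));

        Thread io = new Thread(() -> f.complete("result"), "io-thread");
        io.start();
        try {
            io.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // A second result is silently ignored here (complete() returns false).
        boolean secondAccepted = f.complete("another result");
        return ranOn.get() + " " + secondAccepted;
    }
}
```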
| Constructor Summary | |
|---|---|
| Deferred() | Constructor. |
Method Summary

| Modifier and Type | Method | Description |
|---|---|---|
| <R> Deferred<R> | addBoth(Callback<R,T> cb) | Registers a callback both as a callback and as an "errback". |
| <R,D extends Deferred<R>> Deferred<R> | addBothDeferring(Callback<D,T> cb) | Registers a callback both as a callback and as an "errback". |
| <R> Deferred<R> | addCallback(Callback<R,T> cb) | Registers a callback. |
| <R,D extends Deferred<R>> Deferred<R> | addCallbackDeferring(Callback<D,T> cb) | Registers a callback. |
| <R,R2,E> Deferred<R> | addCallbacks(Callback<R,T> cb, Callback<R2,E> eb) | Registers a callback and an "errback". |
| <R,E> Deferred<T> | addErrback(Callback<R,E> eb) | Registers an "errback". |
| void | callback(Object initresult) | Starts running the callback chain. |
| Deferred<T> | chain(Deferred<T> other) | Chains another Deferred to this one. |
| static <T> Deferred<T> | fromError(Exception error) | Constructs a Deferred with an error that's readily available. |
| static <T> Deferred<T> | fromResult(T result) | Constructs a Deferred with a result that's readily available. |
| static <T> Deferred<ArrayList<Object>> | group(Collection<Deferred<T>> deferreds) | Groups multiple Deferreds together in a single one. |
| static <T> Deferred<ArrayList<Object>> | group(Deferred<T> d1, Deferred<T> d2) | Groups two Deferreds together in a single one. |
| static <T> Deferred<ArrayList<Object>> | group(Deferred<T> d1, Deferred<T> d2, Deferred<T> d3) | Groups three Deferreds together in a single one. |
| T | join() | Synchronously waits until this Deferred is called back. |
| T | joinUninterruptibly() | Synchronously waits until this Deferred is called back. |
| String | toString() | Returns a helpful string representation of this Deferred. |
| Methods inherited from class java.lang.Object |
|---|
| clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait |
| Constructor Detail |
|---|
public Deferred()
| Method Detail |
|---|
public static <T> Deferred<T> fromResult(T result)
Constructs a Deferred with a result that's readily available.
This is equivalent to writing:
    Deferred<T> d = new Deferred<T>();
    d.callback(result);
Callbacks added to this Deferred will be immediately called.
Parameters:
result - The "deferred" result.
Returns:
a new Deferred.

public static <T> Deferred<T> fromError(Exception error)
Constructs a Deferred with an error that's readily available.
This is equivalent to writing:
    Deferred<T> d = new Deferred<T>();
    d.callback(error);
Errbacks added to this Deferred will be immediately called.
Parameters:
error - The error to use as a result.
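The JDK offers close analogs of these two factory methods: CompletableFuture.completedFuture and (Java 9 and later) CompletableFuture.failedFuture. This sketch assumes those as stand-ins and shows a callback and an errback firing immediately when added. ReadyResultsDemo is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

class ReadyResultsDemo {
    static String run() {
        // Analog of Deferred.fromResult(42): the result is already there.
        CompletableFuture<Integer> ok = CompletableFuture.completedFuture(42);
        // Analog of Deferred.fromError(...): already failed (Java 9+).
        CompletableFuture<Integer> failed =
            CompletableFuture.failedFuture(new IllegalStateException("boom"));

        StringBuilder out = new StringBuilder();
        ok.thenAccept(v -> out.append("callback:").append(v)); // runs right away
        failed.exceptionally(e -> {                             // runs right away
            out.append(" errback:").append(e.getMessage());
            return null;
        });
        return out.toString();
    }
}
```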
public <R,R2,E> Deferred<R> addCallbacks(Callback<R,T> cb,
Callback<R2,E> eb)
Registers a callback and an "errback".
If the deferred result is already available, the callback or the errback (depending on the nature of the result) is executed immediately from this thread.
Parameters:
cb - The callback to register.
eb - The errback to register.
Returns:
this with an "updated" type.
Throws:
StackOverflowError - if there are too many callbacks in this chain.
The maximum number of callbacks allowed in a chain is set by the
implementation. The limit is high enough that you shouldn't have to worry
about this exception (which is why it's an Error actually). If
you hit it, you probably did something wrong.

public <R> Deferred<R> addCallback(Callback<R,T> cb)
Registers a callback.
If the deferred result is already available and isn't an exception, the
callback is executed immediately from this thread.
If the deferred result is already available and is an exception, the
callback is discarded.
If the deferred result is not available, this callback is queued and will
be invoked from whichever thread gives this deferred its initial result
by calling callback(java.lang.Object).
Parameters:
cb - The callback to register.
Returns:
this with an "updated" type.

public <R,D extends Deferred<R>> Deferred<R> addCallbackDeferring(Callback<D,T> cb)
Registers a callback.
This has the exact same effect as addCallback(com.stumbleupon.async.Callback), but keeps the type
information "correct" when the callback to add returns a Deferred.
Parameters:
cb - The callback to register.
Returns:
this with an "updated" type.

public <R,E> Deferred<T> addErrback(Callback<R,E> eb)
Registers an "errback".
If the deferred result is already available and is an exception, the
errback is executed immediately from this thread.
If the deferred result is already available and isn't an exception, the
errback is discarded.
If the deferred result is not available, this errback is queued and will
be invoked from whichever thread gives this deferred its initial result
by calling callback(java.lang.Object).
Parameters:
eb - The errback to register.
Returns:
this with an "updated" type.

public <R> Deferred<R> addBoth(Callback<R,T> cb)
Registers a callback both as a callback and as an "errback".
If the deferred result is already available, the callback is executed
immediately from this thread (regardless of whether or not the current
result is an exception).
If the deferred result is not available, this callback is queued and will
be invoked from whichever thread gives this deferred its initial result
by calling callback(java.lang.Object).
Parameters:
cb - The callback to register.
Returns:
this with an "updated" type.

public <R,D extends Deferred<R>> Deferred<R> addBothDeferring(Callback<D,T> cb)
Registers a callback both as a callback and as an "errback".
This has the exact same effect as addBoth(com.stumbleupon.async.Callback), but keeps the type
information "correct" when the callback to add returns a Deferred.
Parameters:
cb - The callback to register.
Returns:
this with an "updated" type.

public Deferred<T> chain(Deferred<T> other)
Chains another Deferred to this one.
This method simply ensures that whenever the callback chain in
this is run, then the callback chain in other
gets run too. The result handed to other is whatever
result this currently has.
One case where this is particularly useful is when you want to multiplex
a result to multiple Deferreds, that is if multiple "listeners"
want to have their callback chain triggered when a single event completes:
    import com.stumbleupon.async.Deferred;

    public class ResultMultiplexer {
      private Deferred<Foo> listeners = new Deferred<Foo>();

      public void addListener(Deferred<Foo> d) {
        listeners.chain(d);
      }

      public void emitResult(Foo event) {
        listeners.callback(event);
        // Remember that a Deferred is a one-time callback chain.
        // Once emitResult is called, everyone interested in getting the
        // next event would need to call addListener again. This isn't a
        // pub-sub system, it's just showing how to multiplex a result.
        listeners = new Deferred<Foo>();
      }
    }
Parameters:
other - The Deferred to chain to this one.
Returns:
this, always.

public static <T> Deferred<ArrayList<Object>> group(Collection<Deferred<T>> deferreds)
Groups multiple Deferreds together in a single one.
Conceptually, this does the opposite of chain(com.stumbleupon.async.Deferred), in the sense that
it demultiplexes multiple Deferreds into a single one, so that you
can easily take an action once all the Deferreds in the group have
been called back.
    import java.util.ArrayList;
    import com.stumbleupon.async.Deferred;

    public class ResultDemultiplexer {
      private ArrayList<Deferred<Foo>> deferreds = new ArrayList<Deferred<Foo>>();

      public void addDeferred(final Deferred<Foo> d) {
        deferreds.add(d);
      }

      public Deferred<ArrayList<Object>> demultiplex() {
        Deferred<ArrayList<Object>> demultiplexed = Deferred.group(deferreds);
        deferreds.clear();
        return demultiplexed;
      }
    }
In the example above, any number of Deferreds can be added to
the demultiplexer, and once demultiplex() is invoked, the
Deferred returned will be invoked once all the Deferreds
added to the demultiplexer have been called back.
Parameters:
deferreds - All the Deferreds to group together.
Returns:
A Deferred that will be called back once all the
Deferreds given in argument have been called back. Each element
in the list will be either of type <T> or an Exception.
If any of the elements in the list is an Exception,
the errback of the Deferred returned will be invoked
with a DeferredGroupException as its argument.
There's no guarantee on the order of the results in the deferred list
returned; it depends on the order in which the Deferreds in the
group complete.
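A sketch of the documented group semantics on top of CompletableFuture, assuming a hypothetical Groups.group helper and a hypothetical GroupFailure exception that stands in for DeferredGroupException. Results are collected in completion order, matching the note above, and any exception fails the whole group. The JDK's own CompletableFuture.allOf is simpler but does not collect the values.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for DeferredGroupException: carries every result.
class GroupFailure extends RuntimeException {
    final List<Object> results;

    GroupFailure(List<Object> results) {
        super("at least one member of the group failed");
        this.results = results;
    }
}

class Groups {
    // One element per member, either its value or its exception, in
    // completion order; any exception makes the whole group fail.
    static <T> CompletableFuture<List<Object>> group(Collection<CompletableFuture<T>> members) {
        CompletableFuture<List<Object>> grouped = new CompletableFuture<>();
        List<Object> results = new ArrayList<>(members.size());
        if (members.isEmpty()) {             // assumption: an empty group completes at once
            grouped.complete(results);
            return grouped;
        }
        AtomicInteger remaining = new AtomicInteger(members.size());
        for (CompletableFuture<T> member : members) {
            member.whenComplete((value, error) -> {
                synchronized (results) {
                    results.add(error != null ? error : value);
                }
                if (remaining.decrementAndGet() == 0) { // the last member to complete
                    boolean anyFailed;
                    synchronized (results) {
                        anyFailed = results.stream().anyMatch(r -> r instanceof Throwable);
                    }
                    if (anyFailed) {
                        grouped.completeExceptionally(new GroupFailure(results));
                    } else {
                        grouped.complete(results);
                    }
                }
            });
        }
        return grouped;
    }
}

class GroupDemo {
    static String run() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        List<CompletableFuture<String>> members = new ArrayList<>();
        members.add(a);
        members.add(b);
        StringBuilder out = new StringBuilder();
        Groups.group(members).thenAccept(list -> out.append(list));
        b.complete("second"); // completes first, so it is listed first
        a.complete("first");
        return out.toString();
    }

    static String runWithFailure() {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> bad = new CompletableFuture<>();
        List<CompletableFuture<String>> members = new ArrayList<>();
        members.add(ok);
        members.add(bad);
        CompletableFuture<List<Object>> grouped = Groups.group(members);
        bad.completeExceptionally(new IllegalStateException("boom"));
        StringBuilder out = new StringBuilder();
        grouped.exceptionally(e -> {
            out.append(e.getClass().getSimpleName()).append(' ')
               .append(((GroupFailure) e).results.size());
            return null;
        });
        return out.toString();
    }
}
```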
public static <T> Deferred<ArrayList<Object>> group(Deferred<T> d1,
Deferred<T> d2)
Groups two Deferreds together in a single one.
This is semantically equivalent to:
    group(Arrays.asList(d1, d2));
except that it's type safe as it doesn't involve an unchecked generic array
creation of type Deferred<T> for the varargs parameter passed to asList.
Parameters:
d1 - The first Deferred to put in the group.
d2 - The second Deferred to put in the group.
Returns:
A Deferred that will be called back once both
Deferreds given in argument have been called back.
See Also:
group(Collection)
public static <T> Deferred<ArrayList<Object>> group(Deferred<T> d1,
Deferred<T> d2,
Deferred<T> d3)
Groups three Deferreds together in a single one.
This is semantically equivalent to:
    group(Arrays.asList(d1, d2, d3));
except that it's type safe as it doesn't involve an unchecked generic array
creation of type Deferred<T> for the varargs parameter passed to asList.
Parameters:
d1 - The first Deferred to put in the group.
d2 - The second Deferred to put in the group.
d3 - The third Deferred to put in the group.
Returns:
A Deferred that will be called back once all three
Deferreds given in argument have been called back.
See Also:
group(Collection)

public void callback(Object initresult)
Starts running the callback chain.
This posts the initial result that will be passed to the first callback
in the callback chain. If the argument is an Exception then
the "errback" chain will be triggered instead.
This method will not let any Exception thrown by a callback
propagate. You shouldn't try to catch any RuntimeException when
you call this method, as this is unnecessary.
Parameters:
initresult - The initial result with which to start the 1st callback.
The following must be true:
    initresult instanceof T || initresult instanceof Exception
Throws:
AssertionError - if this method was already called on this instance.
AssertionError - if initresult == this.
public T join()
throws InterruptedException,
Exception
Synchronously waits until this Deferred is called back.
This helps do synchronous operations using an asynchronous API. If this Deferred already completed, this method returns (or throws) immediately. Otherwise, the current thread will be blocked and will wait until the Deferred is called back.
Throws:
InterruptedException - if this thread was interrupted before the
deferred result became available.
Exception - if the deferred result is an exception, this exception
will be thrown.
public T joinUninterruptibly()
throws Exception
Synchronously waits until this Deferred is called back.
This helps do synchronous operations using an asynchronous API. If this Deferred already completed, this method returns (or throws) immediately. Otherwise, the current thread will be blocked and will wait until the Deferred is called back. If the current thread gets interrupted while waiting, it will keep waiting anyway until the callback chain terminates. Before returning (or throwing an exception), the interrupted status on the thread will be set again.
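The "keep waiting, then set the interrupted status again" behavior is a common JDK idiom. A minimal sketch over CompletableFuture.get(), assuming a hypothetical Joins.joinUninterruptibly helper (not the library's code); like the method described above, it rethrows the underlying exception rather than the JDK's ExecutionException wrapper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class Joins {
    // Hypothetical helper: ignore interrupts while waiting, then set the
    // interrupted status again before returning or throwing.
    static <T> T joinUninterruptibly(CompletableFuture<T> future) throws Exception {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return future.get();           // blocks until completed
                } catch (InterruptedException e) {
                    interrupted = true;            // remember it, keep waiting
                } catch (ExecutionException e) {
                    // Rethrow the original exception instead of the JDK wrapper.
                    Throwable cause = e.getCause();
                    if (cause instanceof Exception) {
                        throw (Exception) cause;
                    }
                    throw e;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the interrupted status
            }
        }
    }
}

class JoinDemo {
    static String run() {
        try {
            CompletableFuture<String> f = new CompletableFuture<>();
            Thread completer = new Thread(() -> f.complete("done"));
            Thread.currentThread().interrupt();    // an interrupt arrives early
            completer.start();
            String value = Joins.joinUninterruptibly(f);
            boolean flagSetAgain = Thread.interrupted(); // reads and clears the flag
            completer.join();
            return value + " " + flagSetAgain;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```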
Throws:
Exception - if the deferred result is an exception, this exception
will be thrown.

public String toString()
Returns a helpful string representation of this Deferred.
The string returned is built in O(N) where N is the
number of callbacks currently in the chain. The string isn't built
entirely atomically, so it can appear to show this Deferred
in a slightly inconsistent state.
This method is not cheap. Avoid doing:
    Deferred<Foo> d = ..;
    LOG.debug("Got " + d);
The overhead of stringifying the Deferred can be significant,
especially if this is in the fast-path of your application.
Overrides:
toString in class Object