Introduction §
Rust is breaking new ground with unboxed futures — trying to build an
async/await system that doesn’t mandate pointing into the heap.
As such, it’s not unusual to run into problems where what would be a
single Promise<T> interface type in other languages is actually a
slew of different implementation types in Rust. When this situation
occurs it’s tempting to stick them all in a box and be done with it.
Unfortunately, due to the way Rust handles trait objects, this can
make things more complicated down the line. Let’s look in detail at
the consequences of that decision, and explore other patterns we can
use instead.
The problem §
Let’s say, for the purposes of illustration, that we have a
background task with a FuturesUnordered and we want to push a bunch
of different futures into it, then wait for them and handle the
results as they arise:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
use futures::prelude::*;
enum Outcome {
Foo,
Bar,
Baz,
}
async fn foo() -> Outcome { Outcome::Foo }
async fn bar() -> Outcome { Outcome::Bar }
async fn baz() -> Outcome { Outcome::Baz }
async fn run() {
let mut futures = futures::stream::FuturesUnordered::new();
futures.push(foo());
futures.push(bar()); (1)
futures.push(baz());
while let Some(result) = futures.next().await {
match result {
Outcome::Foo => println!("foo!"),
Outcome::Bar => println!("bar!"),
Outcome::Baz => println!("baz!"),
}
}
}
async fn do_something_else() {
// some long-running async code
}
#[tokio::main]
async fn main() {
let task = tokio::spawn(run());
do_something_else().await;
task.await.unwrap();
}
| 1 | Error: expected future, found a different future |
The error here arises because the future returned by foo() is not
the same type as the future returned by bar(), and
FuturesUnordered requires that all futures inside it be of the same
type.
Just box it? §
A simple solution is to box all the futures:
15
16
17
futures.push(foo().boxed());
futures.push(bar().boxed());
futures.push(baz().boxed());
That compiles and runs, and we commit it and forget about it. I guess this article is shorter than I expected!
Except… a few months later, we want to generalize run() to also take
a user-supplied future:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async fn run(future: impl Future<Output = Outcome>) {
let mut futures = futures::stream::FuturesUnordered::new();
futures.push(foo().boxed());
futures.push(bar().boxed());
futures.push(baz().boxed());
futures.push(future.boxed()); (1)
while let Some(result) = futures.next().await {
use Outcome::*;
match result {
Foo => println!("foo!"),
Bar => println!("bar!"),
Baz => println!("baz!"),
}
}
}
| 1 | Error: `impl Future<Output = Outcome>` cannot be sent between threads safely |
The problem here is that we have made a choice: by writing boxed(),
or other equivalent methods of creating a Box<dyn Future + Send>, we
have committed to requiring that the future being boxed is Send —
otherwise it wouldn’t be safe to create a Box<dyn Future + Send>
from it! From here we have two ways forward:
Require that future: Send: this allows us to use boxed(), but
restricts our function to only take Send futures.
Use boxed_local(): this doesn’t require any additional bounds on
future, but now our resulting future is never Send, and so our
tokio::spawn will fail (even if the future we pass in is in fact
Send!).
What we’d like to do is to have the future returned by run be Send
if and only if future is Send. Rust has considered this case:
Send is an
auto
trait, and so every type — including the future generated by run —
containing only Send types is also a Send type. Unfortunately, if
we are to box the future to add it to our FuturesUnordered, we have
to make a choice within the function whether we want a Box<dyn
Future> or a Box<dyn Future + Send>, and never the twain shall meet
— boxing the future negates the benefits of auto traits. So how can
we do this without boxing?
Either, or… §
The futures library’s response to this is the Either future.
Either allows constructing a future that behaves as either one
future or another:
3
4
5
6
futures.push(Either::Left(Either::Left(foo())));
futures.push(Either::Left(Either::Right(bar())));
futures.push(Either::Right(Either::Left(baz())));
futures.push(Either::Right(Either::Right(future))));
This works, but the noise of the Either tags is rather unpleasant,
especially as we add more cases. In general, to write \(n\)
different futures, we now need to write down \(\log_2 n\) Either
tags — and keep track of what they all mean! Just as with deeply
nested Option or Result types, our code becomes much more
maintainable if we flatten the nested enums into a structure that is
specialized for our use case.
enum MyFuture<Foo, Bar, Baz, User> {
Foo(Foo),
Bar(Bar),
Baz(Baz),
User(User),
}
Then, when it comes to constructing the future, we have some
meaningful names to use instead of Either::Left(Either::Right(…)):
3
4
5
6
futures.push(MyFuture::Foo(foo()));
futures.push(MyFuture::Bar(bar()));
futures.push(MyFuture::Baz(baz()));
futures.push(MyFuture::User(future));
The only downside is that we now have to manually implement our future:
use std::{pin::Pin, task::{Context, Poll}};
impl<Foo, Bar, Baz, User> Future for MyFuture<Foo, Bar, Baz, User>
where
Foo: Future,
Bar: Future<Output = Foo::Output>,
Baz: Future<Output = Foo::Output>,
User: Future<Output = Foo::Output>,
{
type Output = Foo::Output;
fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
use MyFuture::*;
match self.project() { (1)
Foo(foo) => foo.poll(context),
Bar(bar) => bar.poll(context),
Baz(baz) => baz.poll(context),
User(user) => user.poll(context),
}
}
}
| 1 | and don’t forget pin-project! |
Don’t we have computers for this? §
I don’t know about you, but I hate manually writing futures. In general, the compiler is capable of generating this boilerplate for us: that’s why it’s a type error to write
fn one_or_the_other(maybe: bool) -> impl Future<Output = Outcome> {
if maybe { (1)
foo()
} else {
bar() (2)
}
}
| 1 | `if` and `else` have incompatible types |
| 2 | expected future, found a different future |
but it isn’t a type error to write
async fn one_or_the_other(maybe: bool) -> Outcome {
if maybe {
foo().await
} else {
bar().await
}
}
Namely, the compiler will take the if in the async function above
and use it to generate an enum very much like the one we wrote
manually, along with a Future implementation. Can’t we get it to do
the same thing for our MyFuture enum? In fact, we can!
Although not for the actual type, which doesn’t contain enough intermediate state for all the different futures: we have to add a level of indirection.
impl<Foo, Bar, Baz, User> MyFuture<Foo, Bar, Baz, User>
where
Foo: Future,
Bar: Future<Output = Foo::Output>,
Baz: Future<Output = Foo::Output>,
User: Future<Output = Foo::Output>,
{
async fn into_future(self) -> Foo::Output { (1)
use MyFuture::*;
match self {
Foo(foo) => foo.await,
Bar(bar) => bar.await,
Baz(baz) => baz.await,
User(user) => user.await,
}
}
}
| 1 | Note that we can’t use the IntoFuture trait, as that would
require us to write down the type of this anonymous future. |
The trick here is that since into_future is a single function (with
the same type parameters), the future type it generates is the same
every time. So now we can call:
futures.push(MyFuture::Foo(foo()).into_future());
futures.push(MyFuture::Bar(bar()).into_future());
futures.push(MyFuture::Baz(baz()).into_future());
futures.push(MyFuture::User(future).into_future());
In fact we can simplify this further, removing the known future type
parameters from MyFuture, by pushing the construction of the futures
we know how to construct inside the function as well:
enum Job<User> {
Foo,
Bar,
Baz,
User(User),
}
impl<User: Future<Output = Outcome>> Job<User> {
async fn run(self) -> Outcome {
match self {
Job::Foo => foo().await,
Job::Bar => bar().await,
Job::Baz => baz().await,
Job::User(user) => user.await,
}
}
}
Then our function becomes just:
3
4
5
6
futures.push(Job::Foo.run());
futures.push(Job::Bar.run());
futures.push(Job::Baz.run());
futures.push(Job::User(future).run());
Wrapping up §
It’s been a long journey, but I hope the path to get here made the final result more comprehensible — and maybe the real code was the tradeoffs we made along the way. So, putting it all together, we end up with the code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
use futures::prelude::*;
enum Outcome {
Foo,
Bar,
Baz,
}
async fn foo() -> Outcome { Outcome::Foo }
async fn bar() -> Outcome { Outcome::Bar }
async fn baz() -> Outcome { Outcome::Baz }
async fn run(future: impl Future<Output = Outcome>) {
enum Job<User> {
Foo,
Bar,
Baz,
User(User),
}
impl<User: Future<Output = Outcome>> Job<User> {
async fn run(self) -> Outcome {
match self {
Job::Foo => foo().await,
Job::Bar => bar().await,
Job::Baz => baz().await,
Job::User(user) => user.await,
}
}
}
let mut futures = futures::stream::FuturesUnordered::new();
futures.push(Job::Foo.run());
futures.push(Job::Bar.run());
futures.push(Job::Baz.run());
futures.push(Job::User(future).run());
while let Some(result) = futures.next().await {
match result {
Outcome::Foo => println!("foo!"),
Outcome::Bar => println!("bar!"),
Outcome::Baz => println!("baz!"),
}
}
}
async fn do_something_else() {
// some long-running async code
}
#[tokio::main]
async fn main() {
let task = tokio::spawn(run(foo()));
do_something_else().await;
task.await.unwrap();
}
With this, we’ve avoided boxing the futures in the FuturesUnordered,
which means we no longer have to make a decision about whether they’re
Send or not — the run() future inherits its Sendness from
future and the other component futures, and will automatically be
Send if they all are; if one of them is not Send then run() will
also not be Send, but will continue to compile and run in contexts
that don’t require it to be Send.
At the same time, by implementing an interpreter for our tasks as an
async function we’ve managed to avoid manually implementing Future
for our unboxed future, letting the compiler do the work. As usual,
if we need to write down the name of the future for some reason, using
async functions will not work and we’ll have to resort to naming all
the futures involved, then combining them as we did with the
MyFuture implementation above.
The structure of this technique resembles
defunctionalization,
but is applied to futures rather than functions. By carefully
de-future-izing our tasks, we are able to consolidate the generation
of the futures into a single async function, which the Rust compiler
can then use to generate a combined future.