一直对 rust 的 async 有一些迷思,例如:
- async 函数为什么会返回一个 Future?这个 Future 什么东西?我们为什么需要一个 async runtime?
经过一段时间的大脑升级,终于理解了一些东西。
Future
是什么
Future
代表了一个未完成的异步操作的状态。
请循其本!当然要来看代码了:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
Future
是一个 trait。唯一的一个方法是 poll
,返回一个 Poll<Self::Output>
。直观上理解好像是轮询一样的东西,如果返回了 Ready(T)
就说明数据来了;反之则是还需要等待。
但是 Pin
和 Context
又是什么东西呢?以及,为什么 Future 会是一个 trait 呢?另一门语言的 std::future
只是一个普通的结构体而已。
另一方面,我们在写异步代码的时候,好像也没有去实现过什么 poll
之类的东西。是编译器帮我们生成的吗?编译器生成的结构体是怎么实现 Future
trait 的呢?这个结构体本身又长什么样呢?
带着诸如此类的问题,我们将从 Generator
开始,一步步理解 Future
.
Generator
s
在某些语言中,有一种特殊的函数,叫做 Generator
。与普通的函数不同,Generator
可以“返回”多次。
函数初次被调用时,会向正常的函数一样从函数的第一条语句开始执行。而当函数被再次调用时,会回到上一次 yield
的位置,并恢复当时的状态(局部变量等)。
int series() {
for (int i = 0; ; ++i) {
yield i;
}
}
例如,在上面的例子中,我们每次调用 series
的时候都会得到不一样的值。series
的返回值是一个递增的整数序列。实际上,还可以有更加复杂的例子:
int series() {
for (;;) {
for (int i = 0; i < 10; ++i) {
yield i;
}
for (int i = 10; i > 0; --i) {
yield i;
}
}
}
这样的 Generator
实际上就是一个状态机,它的状态是各个局部变量的值,以及函数的执行位置(或者说是 栈帧 + 寄存器堆 + PC)。状态的迁移是 yield
语句和指令的执行过程。yield
语句的作用则是暂停状态机的执行。(并把状态保存在某个地方,以供下次恢复)
async
函数的状态变迁
async
函数在某种意义上与 Generator
非常相似。考虑如下的例子:
async fn foo() {
let fd = open_some_file();
let content = fd.read_all().await;
let result = process(content);
result
}
该函数首先打开文件,然后从文件中读取内容,最后处理内容。在这个过程中,存在一个 await
点。我们可以感性理解一下这个函数的执行过程:
- 函数开始执行,打开文件
- 从文件中读取内容
- 读取文件是一个耗时操作,需要等待 IO 的完成。不如先让函数暂停,去做其他事情,等待 IO 完成后再继续执行。
- IO 完成后,重新回到该点,继续执行代码
- 处理文件内容,返回结果
在此处,await
的作用是暂停当前函数的执行,并让出执行流的控制权。这与 Generator
的 yield
是非常相似的。这样以来,我们的 foo
函数实际上被划分为了三个状态:初态、等待文件读入
终态。当执行到 await
时,程序会将当前的状态设置为等待文件读入,并让出控制权。等到文件读入完成后,程序会转移到终态,并继续执行代码。
Poorman’s Future
既然我们看不懂 Pin
和 Context
,不如假装他们不存在。来实现一个我们自己的 Future
trait。
pub trait Future {
type Output;
fn poll(self: &mut Self) -> Poll<Self::Output>;
}
enum MyFuture {
Start,
WaitingFileContent {
fd: impl Future<Output = File>,
},
End,
}
这代表了我们之前提到的三个状态。接下来我们来给他实现 poll
:
impl Future for MyFuture {
type Output = String;
fn poll(self: &mut Self) -> Poll<Self::Output> {
match self {
MyFuture::Start => {
let fd = open_some_file();
*self = MyFuture::WaitingFileContent { fd };
Poll::Pending
}
MyFuture::WaitingFileContent { fd } => {
let content_future = fd.read_all();
match content_future.poll() {
Poll::Ready(content) => {
*self = MyFuture::End;
Poll::Ready(process(content))
},
Poll::Pending => Poll::Pending,
}
}
MyFuture::End => panic!("poll after completion"),
}
}
}
似乎目前看来…除了需要外面一直轮询之外,并没有任何问题?
自引用与 Pin
async fn foo() {
let array = [1, 2, 3];
let element = &array[2];
let fd = open_some_file();
let content: Vec<u8> = fd.read_all().await;
let result = process(content);
result
}
假如我们对上面的示例代码稍作修改,就会发现,我们的结构体中,竟然出现了自引用!
enum MyFuture {
Start,
WaitingFileContent {
array: [usize; 3], // <-------| 自引用!
element: &usize, // ----------|
fd: impl Future<Output = File>,
},
End,
}
假如我们的 MyFuture
在内存上的位置出现了变动,那么我们的 element
就会变成了一个无效的指针,这是因为我们的 element
是一个指向 array
的指针,而 array
在内存上的位置发生了变动。这导致我们需要把这个结构体 Pin
住,禁止去 move 它。
因此,我们的 Future
trait 现在变为
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>) -> Poll<Self::Output>;
}
Async Runtime
我们为什么需要一个 async runtime?async runtime 都做了什么?
协作式多任务
之前我们提到,我们希望在进行 IO 操作时,能把 CPU 让给其他任务使用。那么“任务”究竟是个什么东西呢?我们在使用 tokio 的时候,可以使用 tokio::spawn
来创建新的“任务”。tokio 内部也有一些神秘的调度策略,当某个任务完成或被暂停时,tokio 就会去调度执行其他任务。
我们不妨先实现一个比较乞丐的 async runtime:task 里面只存储了一个在堆上动态分发的 Future,而执行器也只是一个任务队列罢了。
struct Task {
future: Pin<Box<dyn Future<Output = ()>>>,
}
pub struct MyExecutor {
task_queue: VecDeque<Task>,
}
当我们去 spawn
一个新的任务时,我们会把它放到队列的尾部。而当我们去执行任务时,我们会把队列的头部的任务取出来 poll
一下,如果完成了就好,没完成就放进队列里面去。
impl Executor {
pub fn run(&mut self) {
while let Some(mut task) = self.task_queue.pop_front() {
match task.poll() {
Poll::Ready(()) => {} // task done
Poll::Pending => self.task_queue.push_back(task),
}
}
}
}
Waker
轮询也太拉了,完全不符合我对异步的想象。我们不应该去频繁的 poll
一些 Future
。例如,正在等待 IO 的任务,我们不应该每次都去 poll
,而是应该等 IO 操作完成后再去 poll
它。更具体的说,当一个 Task
的叶子节点完成相关操作后,我们应该把它的父节点也 poll
一下,这样就可以继续执行其他的 Task
。
这时,我们不禁回想起了 Future::poll
的最后一个参数: cx: &mut Context<'_>
. 难道这就是我们需要的 Waker
吗?打开 rust 文档,发现 Context
竟然只有两个方法:
pub fn from_waker(waker: &'a Waker) -> Context<'a>
pub fn waker(&self) -> &'a Waker
原来这个东西就是个 waker 啊 正是如此。目前来看,Context
只是 Waker
的包装。
Waker 会和 runtime 提供的能力紧密配合。例如我们可以让 wake 被调用时,让相应的 task 被插入 ready 队列中。
Async Cancellation
Future
可能会在任意一个 await 点被取消掉,不过好在 Future
的取消是传播的。而 Task
的取消则大有不同。对于 tokio
和 async-std
来说,Task
的取消是不传播的,Task
的 JoinHandle
被 drop
掉之后竟然是默认 detach 的,令人大跌眼镜。
我愿称之为异步 rust 中的空指针异常。
–rapiz
这非常不结构化并发。可能会造成各种各样的问题,例如一些没有本应该取消掉的 Task 还在持续运行,或者是 channel 的一段已经关闭,而另一端还在尝试读取/发送。因此我们往往需要手工实现结构化并发的 guard. 让每一个子 Task
都活得比它的父 Task
短。
References
- https://doc.rust-lang.org/beta/unstable-book/language-features/generators.html
- https://www.chiark.greenend.org.uk/~sgtatham/coroutines.html
- https://os.phil-opp.com/async-await/
- https://doc.rust-lang.org/nightly/core/task/struct.Waker.html
- https://cfsamson.github.io/books-futures-explained/introduction.html
- https://doc.rust-lang.org/stable/std/pin/index.html#projections-and-structural-pinning
- https://www.skyzh.dev/posts/articles/2022-01-31-gat-async-trait
- https://rust-lang.github.io/async-book/02_execution/05_io.html
- https://rustmagazine.github.io/rust_magazine_2021/chapter_8/ant-futures-compat.html#std-%E4%B8%AD%E7%9A%84-rawwaker-rawwaker
- https://rustmagazine.github.io/rust_magazine_2021/chapter_12/async_cancel_propagation.html
- https://blog.yoshuawuyts.com/async-cancellation-1
- https://smallcultfollowing.com/babysteps/blog/2019/10/26/async-fn-in-traits-are-hard/
- https://boats.gitlab.io/blog/post/poll-drop/