flag
mode_edit

一直对 rust 的 async 有一些迷思,例如:

  • async 函数为什么会返回一个 Future?这个 Future 什么东西?我们为什么需要一个 async runtime?

经过一段时间的大脑升级,终于理解了一些东西。

Future 是什么

Future 代表了一个未完成的异步操作的状态。

请循其本!当然要来看代码了:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
    Ready(T),
    Pending,
}

Future 是一个 trait。唯一的一个方法是 poll,返回一个 Poll<Self::Output>。直观上理解好像是轮询一样的东西,如果返回了 Ready(T) 就说明数据来了;反之则是还需要等待。

但是 PinContext 又是什么东西呢?以及,为什么 Future 会是一个 trait 呢?另一门语言的 std::future 只是一个普通的结构体而已。

另一方面,我们在写异步代码的时候,好像也没有去实现过什么 poll 之类的东西。是编译器帮我们生成的吗?编译器生成的结构体是怎么实现 Future trait 的呢?这个结构体本身又长什么样呢?

带着诸如此类的问题,我们将从 Generator 开始,一步步理解 Future.

Generators

在某些语言中,有一种特殊的函数,叫做 Generator。与普通的函数不同,Generator 可以“返回”多次。

函数初次被调用时,会向正常的函数一样从函数的第一条语句开始执行。而当函数被再次调用时,会回到上一次 yield 的位置,并恢复当时的状态(局部变量等)。

int series() {
  for (int i = 0; ; ++i) {
    yield i;
  }
}

例如,在上面的例子中,我们每次调用 series 的时候都会得到不一样的值。series 的返回值是一个递增的整数序列。实际上,还可以有更加复杂的例子:

int series() {
  for (;;) {
    for (int i = 0; i < 10; ++i) {
      yield i;
    }
    for (int i = 10; i > 0; --i) {
      yield i;
    }
  }
}

这样的 Generator 实际上就是一个状态机,它的状态是各个局部变量的值,以及函数的执行位置(或者说是 栈帧 + 寄存器堆 + PC)。状态的迁移是 yield 语句和指令的执行过程。yield 语句的作用则是暂停状态机的执行。(并把状态保存在某个地方,以供下次恢复)

async 函数的状态变迁

async 函数在某种意义上与 Generator 非常相似。考虑如下的例子:

async fn foo() {
  let fd = open_some_file();
  let content = fd.read_all().await;
  let result = process(content);
  result
}

该函数首先打开文件,然后从文件中读取内容,最后处理内容。在这个过程中,存在一个 await 点。我们可以感性理解一下这个函数的执行过程:

  • 函数开始执行,打开文件
  • 从文件中读取内容
    • 读取文件是一个耗时操作,需要等待 IO 的完成。不如先让函数暂停,去做其他事情,等待 IO 完成后再继续执行。
    • IO 完成后,重新回到该点,继续执行代码
  • 处理文件内容,返回结果

在此处,await 的作用是暂停当前函数的执行,并让出执行流的控制权。这与 Generatoryield 是非常相似的。这样以来,我们的 foo 函数实际上被划分为了三个状态:初态、等待文件读入 终态。当执行到 await 时,程序会将当前的状态设置为等待文件读入,并让出控制权。等到文件读入完成后,程序会转移到终态,并继续执行代码。

Poorman’s Future

既然我们看不懂 PinContext,不如假装他们不存在。来实现一个我们自己的 Future trait。

pub trait Future {
    type Output;
    fn poll(self: &mut Self) -> Poll<Self::Output>;
}
enum MyFuture {
  Start,
  WaitingFileContent {
    fd: impl Future<Output = File>,
  },
  End,
}

这代表了我们之前提到的三个状态。接下来我们来给他实现 poll:

impl Future for MyFuture {
  type Output = String;
  fn poll(self: &mut Self) -> Poll<Self::Output> {
    match self {
      MyFuture::Start => {
        let fd = open_some_file();
        *self = MyFuture::WaitingFileContent { fd };
        Poll::Pending
      }
      MyFuture::WaitingFileContent { fd } => {
        let content_future = fd.read_all();
        match content_future.poll() {
          Poll::Ready(content) => {
            *self = MyFuture::End;
            Poll::Ready(process(content))
          },
          Poll::Pending => Poll::Pending,
        }
      }
      MyFuture::End => panic!("poll after completion"),
    }
  }
}

似乎目前看来…除了需要外面一直轮询之外,并没有任何问题?

自引用与 Pin

async fn foo() {
  let array = [1, 2, 3];
  let element = &array[2];
  let fd = open_some_file();
  let content: Vec<u8> = fd.read_all().await;
  let result = process(content);
  result
}

假如我们对上面的示例代码稍作修改,就会发现,我们的结构体中,竟然出现了自引用!

enum MyFuture {
  Start,
  WaitingFileContent {
    array: [usize; 3], // <-------| 自引用!
    element: &usize, // ----------|
    fd: impl Future<Output = File>,
  },
  End,
}

假如我们的 MyFuture 在内存上的位置出现了变动,那么我们的 element 就会变成了一个无效的指针,这是因为我们的 element 是一个指向 array 的指针,而 array 在内存上的位置发生了变动。这导致我们需要把这个结构体 Pin 住,禁止去 move 它。

因此,我们的 Future trait 现在变为

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>) -> Poll<Self::Output>;
}

Async Runtime

我们为什么需要一个 async runtime?async runtime 都做了什么?

协作式多任务

之前我们提到,我们希望在进行 IO 操作时,能把 CPU 让给其他任务使用。那么“任务”究竟是个什么东西呢?我们在使用 tokio 的时候,可以使用 tokio::spawn 来创建新的“任务”。tokio 内部也有一些神秘的调度策略,当某个任务完成或被暂停时,tokio 就会去调度执行其他任务。

我们不妨先实现一个比较乞丐的 async runtime:task 里面只存储了一个在堆上动态分发的 Future,而执行器也只是一个任务队列罢了。

struct Task {
    future: Pin<Box<dyn Future<Output = ()>>>,
}

pub struct MyExecutor {
    task_queue: VecDeque<Task>,
}

当我们去 spawn 一个新的任务时,我们会把它放到队列的尾部。而当我们去执行任务时,我们会把队列的头部的任务取出来 poll 一下,如果完成了就好,没完成就放进队列里面去。

impl Executor {
    pub fn run(&mut self) {
        while let Some(mut task) = self.task_queue.pop_front() {
            match task.poll() {
                Poll::Ready(()) => {} // task done
                Poll::Pending => self.task_queue.push_back(task),
            }
        }
    }
}

Waker

轮询也太拉了,完全不符合我对异步的想象。我们不应该去频繁的 poll 一些 Future。例如,正在等待 IO 的任务,我们不应该每次都去 poll,而是应该等 IO 操作完成后再去 poll 它。更具体的说,当一个 Task 的叶子节点完成相关操作后,我们应该把它的父节点也 poll 一下,这样就可以继续执行其他的 Task

这时,我们不禁回想起了 Future::poll 的最后一个参数: cx: &mut Context<'_>. 难道这就是我们需要的 Waker 吗?打开 rust 文档,发现 Context 竟然只有两个方法:

pub fn from_waker(waker: &'a Waker) -> Context<'a>
pub fn waker(&self) -> &'a Waker

原来这个东西就是个 waker 啊 正是如此。目前来看,Context 只是 Waker 的包装。

Waker 会和 runtime 提供的能力紧密配合。例如我们可以让 wake 被调用时,让相应的 task 被插入 ready 队列中。

Async Cancellation

Future 可能会在任意一个 await 点被取消掉,不过好在 Future 的取消是传播的。而 Task 的取消则大有不同。对于 tokioasync-std 来说,Task 的取消是不传播的,TaskJoinHandledrop 掉之后竟然是默认 detach 的,令人大跌眼镜。

我愿称之为异步 rust 中的空指针异常。

–rapiz

这非常不结构化并发。可能会造成各种各样的问题,例如一些没有本应该取消掉的 Task 还在持续运行,或者是 channel 的一段已经关闭,而另一端还在尝试读取/发送。因此我们往往需要手工实现结构化并发的 guard. 让每一个子 Task 都活得比它的父 Task 短。

References