rust-async
Anyway. Async is way more complicated than sync and it's just for the fucking performance
本文希望厘清Rust异步系统的原理,一些基础概念(building block)以及他们是如何协作的(How they connect).
当我们在谈论异步时我们在谈论什么
我们希望最大化的利用CPU来执行我们的业务逻辑. 因此之前的基于多线程等待的实现中,通过操作系统原生的实现多线程的方式(时间片切换)我们使得CPU能够同时处理多个执行流,这样的话当一个执行流发生阻塞(CPU空闲)时,可以去执行其他执行流,从而提升了CPU的使用率.
但问题在于操作系统的线程切换是有代价的.如果使用成千上万的线程,操作系统的实现反而导致了其大部分的CPU不是花在业务上,而是花在了切换执行流上.因此我们不能寄希望于无脑的开线程来解决问题.线程只是我们的执行业务逻辑的容器罢了.一般来讲线程的数量等同于CPU的核数.
对此Rust的解决方案是将一个完整的执行流视作一个状态机.每一个非CPU逻辑(基本上就是文件读写,网络请求的IO)就是一个await.通过操作系统提供的对IO事件的订阅能力(epoll,iocp etc),使得所有的CPU时间都在执行我们的业务逻辑.从而提高了CPU的使用率.
那么对此就必须有某种DSL,我们使用他来描述业务,这种DSL的特点是 必须能够明确的区分出CPU逻辑和非CPU逻辑,也就是每次调用await的那些地方.
在rust中这种DSL就是新的async函数和await语法.编译器会将async函数编译成一个状态,交给后面的runtime去执行.本文便是希望梳理出runtime是如何工作的,其中涉及到哪些基础概念(building block),以及他们是如何协作的(How they connect).
Future
一个future就是上文所描述的一个执行流.我们通过对一个future不停的应用变化(将其与其他future组合)最终得到了一个大的future.实际上观察js的代码和rust的代码特别使用aync/await的部分,会发现他们惊人的一致.因为他们都是通过这种语法来标记CPU逻辑和非CPU逻辑.作为程序来来将我们通过写await来在业务逻辑的happy path中显示的告诉了编译器.可以被切换的点在哪里.在JS中这些await点被实现成回调函数,在Rust中被实现成状态机的一个状态.所谓的future实际上就是一个多出来的可以让我们有空间有所指之物来表示这个await点的一个东西罢了. 在rust中对于这样的一个await点 我们需要描述的是 1. 是否已经结束 2. 类型是什么.描述清楚这两个东西后我们就可在后面继续写代码了,实际上在JS中类型也不用描述.
通过组合来构造Future
和通过组合来构造Js中的Promise一样,在可以组合时,Rust的异步代码同Js一样简单明了.
async fn main() {
let a = async_read_file_to_string("./a.txt").await;
println!("a {}",a);
}
readfiletostring 函数返回的是一个Future.main函数通过组合readfile_string返回的future来构造出来自己的业务逻辑.
但当我们深入底层时,一切都变得不一样了.
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output>;
}
上述代码定义了Future.现在我们希望自己实现asyncreadfiletostring.
fn async_read_file_to_string()->impl Future<String> {
}
这个trait就是在描述状态机.他可以被poll,在每次的poll中他的状态会不停的变化,直到达到最终的状态也就是Output.
Executor为执行器,没有任何阻塞的等待,循环执行一系列就绪的Future,当Future返回pending的时候,会将Future转移到Reactor上等待进一步的唤醒。
Reactor为反应器(唤醒器),轮询并唤醒挂载的事件,并执行对应的wake方法,通常来说,wake会将Future的状态变更为就绪,同时将Future放到Executor的队列中等待执行。
Reactor会不断的poll就绪的事件,然后依次唤醒绑定在事件上的waker,waker唤醒的时候会把对应的task移动到Executor的就绪队列上安排执行。
Mio: Rust封装的底层(主要是网络)事件库,屏蔽操作系统区别,提供了操作系统层面的异步网络事件的能力.
从Waker开始
当仅仅是使用Rust提供的异步组合子,实际上不会涉及到Waker的概念.
// those code copy from https://zhuanlan.zhihu.com/p/66028983
pub trait Future {
/// future 结束时产生的结果类型
type Output;
/// 返回 `Poll::Pending`表示需要等待
/// 返回`Poll::Ready(val)`表示异步已完成
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
// Context结构目前其实就当作Waker用,主要是考虑向前兼容性,以防后面需要增加其他内容。
pub struct Context<'a> {
Waker: &'a Waker,
//...
}
Future本身定义比较简单:实现一个poll的方法,参数包含了Executor传递的Waker。如果整个异步完成了,则返回相应的结果,如果需要等待, 则将Context中的 Waker注册到底层的Reactor中。
Waker的wake到底做了什么
这个实际上要看不同的Runtime的实现,思路最起码有下面两种
ThreadNotify
Waker实际上是ThreadNotify,当Executor执行某个Task发现其NotReady时,就直接调用thread::park();
沉睡当前的线程,而等到对应的事件发生时,例如在Rust Async Book中TimeFuture的实现中另一个线程调用了waker的wake方法此时实际上就是将这个Executor的线程给arc_self.thread.unpark();
了
future-rs自带的local_pool executor大概是这种实现.
将Task重新发送到Executor的执行队列中
等到某个Executor执行时窃取到这个Task并调用Poll. 这里的Executor可以用类似channel的机制来操作,没有Task来时也是park的状态 AsyncBook中的示例executor大概是这种实现.
asyn-std 实现分析
async-std使用async-executor 而async-excutor使用 mutli-task
ref
Rust异步浅谈
Futures Explained in 200 Lines of Rust
Rust异步与并发
Rust Async Book
stjepang's blog
How does async work in async-std?