Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rust 异步编程 - 我叫尤加利 #21

Open
hexiaowen opened this issue Jan 24, 2022 · 0 comments
Open

Rust 异步编程 - 我叫尤加利 #21

hexiaowen opened this issue Jan 24, 2022 · 0 comments

Comments

@hexiaowen
Copy link
Owner

Rust 异步编程稳定蛮久了,但我只偶尔用一下,对其中许多概念都不清楚,就学习下。

基本概念

按我之前其他语言的经验,异步编程核心是 event loop,基本模型都是单个 / 多个线程 / 进程阻塞在 epoll 这种 I/O 多路复用的系统调用,有事件就绪一般就在当前线程直接处理了,长时间的工作会扔到单独的线程池里执行,防止阻塞 event loop。Rust 把这一整套模型拆成了多个部分:

  • Future:一段计算逻辑,需要不断 poll 来驱动它执行,最终会返回一个值。
  • Executor:异步编程通常会存在很多并发执行的逻辑,比如管理成千上万个连接。Executor 的作用就是管理和执行 Futures。
  • Waker:Future 一般都有会阻塞的部分,为了高效的执行 Future,Executor 当然不可能一直 poll 阻塞状态中的 Future,所以需要有种机制能够在 Future 可以继续执行时才让 Executor 执行它。Waker 就是当 Future 不再阻塞时用来通知 Executor 的,每个 Future 都会和 Waker 绑定,每个 Executor 都会有自己的 Waker 类型。
  • Reactor:只有 Waker 还不够,还要有调用 Waker 的东西,也就是 Reactor。Reactor 的实现和 Future 紧密相关,只有 Future 阻塞时才需要注册 Waker 到 Reactor,也只有 Future 才知道需要什么样的 Reactor。当 Reactor 认为一个 Future 可以继续执行了,比如套接字可读了、有新连接了,就会调用 Waker。
  • Task:Future 只是逻辑,Executor 在管理 Future 的时候通常需要记录一些信息,比如它的状态是阻塞、就绪还是已经执行完成了。Task 就是 Future 和这些信息的封装,Waker 的实现一般也和 Task 实现相关,因为 Executor 需要找到 Waker 唤醒的 Task,为了避免重复唤醒也需要获取 Task 的状态。Task 和线程就比较像了,Future 是栈和代码,Task 是 context。

如果以 one loop per thread + thread pool 模型来对比的话,event loop 线程既是 Executor 也是 Reactor;Waker 就是通过 epoll_wait 返回的事件找到对应的 Task 并执行;每个套接字都是 Future,比如监听套接字的 Future 逻辑是接收新连接再 spawn 到 Executor,每个连接的 Future 逻辑是处理请求再返回响应;thread pool 也是 Executor,每个长时间执行的任务也是 Future。Waker 使得 Executor 和 Future、Reactor 解耦不再绑定,就可以灵活组合不同的实现。

async/.await

刚开始用 Future 的时候还是 combinator-based,各种 callback、clone,体验极差,async/.await 稳定后体验就好多了。async fn/block 返回的 Future 是用 Generator 实现的,内部实现为状态机,await point 就对应 yield point,不过是根据 Future::poll 的结果来决定要不要 yield。为了实现 zero-cost,状态机的实现至关重要:

  • 状态机的大小:最理想的实现当然是每个状态只需要保存它所需要的数据,所以整个 Future 的大小由占用最大的状态决定。这也涉及到哪些状态需要保存在状态机里,哪些在栈上就行。
  • 状态转换:
    • 状态机通常用 enum 来实现,能够解决大小问题,但如果有些数据存活于状态机的整个生命周期,在状态转换时就会频繁 move,这部分的开销不能忽视。
    • async fn/block 要支持跨 await point 的引用,但这会导致自引用结构,用 move 实现状态转换的话可能会出现野指针,所以状态机也需要解决这个问题。

只有跨 yield point 的变量才需要保存在状态机里,也就是在某 yield point 之前创建,但在它之后还会使用的变量,其它的保存在栈上就行。为了解决状态转换时的 move 开销和自引用结构的安全问题,Rust 会为每个变量分配固定的空间,变量可以被不同状态使用,而不会发生 move。为了避免状态机大小膨胀,新的变量会复用不再使用的变量的空间,需要注意的是,实现了 Drop 的变量会被保存到最后才被释放,可以用 block 提前 drop 来避免状态机变大。

Pin

async fn/block 创建的 Future 通常会是自引用结构,编译器只能保证它生成的代码不会有内存安全问题,但用户代码有可能会 move Future 导致内存不安全。move 自引用结构导致内存不安全的原因是新引用仍然指向了旧地址,如果把自引用结构的地址固定下来的话,比如用 Box,move 就不会有问题,因为 move Box 只是 move 指针而不是指针指向的内容,但是用户能通过 Box<T> 获取 &mut T,再用 mem::replace/swap 之类的方法或者 *Box 还是能够 move 自引用结构,并不能完全解决该问题,而且 Box 会带来额外开销,不符合 zero-cost。为了解决这个问题,Rust 通过类型系统 (Pin + Unpin) 杜绝了在 safe Rust 中 move 导致自引用结构内存不安全的可能,完整的 Future trait 实现如下:

trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
} 

不是所有类型都需要防止 move,只有 T: !Unpin 才需要,对于 T: Unpin 类型而言,Pin 是零开销的,等价于 &mut T。几乎所有类型都是 Unpin,想实现 !Unpin 可以用 PhantomPinnedasync fn/block 创建的 Future 均是 !Unpin,但只有调用过 Future::poll 后才需要 pin 住它,因为 Generator 是惰性的,在初始状态时不可能是自引用结构。Pin 不是不允许 move,poll 过的 Future 也可以 move,比如在支持 work-stealing 的 Executor 里,很有可能会发生 poll 过的 Future 在线程间 move 的情况。Pin 想要保证的是:

Concretely, for pinned data you have to maintain the invariant that its memory will not get invalidated or repurposed from the moment it gets pinned until when drop is called. Only once drop returns or panics, the memory may be reused.

Pin 也是种智能指针,move Pin 不会有问题,但无法在 safe Rust 里通过 Pin<T: !Unpin> 获取 &mut T,只能通过 Pin::get_unchecked_mut,这是 unsafe 的,不在编译器保障之内,需要用户来保证安全性。在 safe Rust 里构造 Pin<T: !Unpin> 需要用 Box::pin(注意,Pin 关注的是 P::Target 是不是 Unpin 而不是指针),也就是 pin 在堆上,安全的原因是 pinned 对象地址已经固定了,safe Rust 里已经不会再有内存安全问题,但这不是 zero-cost(Executor 要执行不同的 Future 就要用 trait object,这本身就需要 Box,所以这点通常不会带来额外开销),所以 Rust 也提供了 unsafe 的 Pin::new_unchecked,能通过引用 pin 在栈上,不安全的原因是还可以 move 原先的变量,所以一般会用创建出来的 Pin<T> 把原先变量 shadow 掉,而且对象地址和栈帧绑定,当从当前栈帧 move 出去时也会有安全问题。

Waker

Executor 执行 Future 传入的 Waker 是和 root-Future(Task) 绑定的,中间的 Future 只需要把 Waker 不断往下传直到 leaf-Future,因为只有 leaf-Future 才会阻塞,才需要注册 Waker 到 Reactor。因为 Task 可能会在线程间移动,poll 相同的 Task 可能会传入不同的 Waker,所以每次 poll 时都需要重新注册 Waker 到 Reactor,而 Reactor 可能会和 Executor 在不同的线程,更新 Waker 会有很多 race,比如在更新 Waker 时,Reactor 用旧 Waker 唤醒了,这时就可以用 AtomicWaker,它高效妥善地处理的相关逻辑,既能解决 race,还能提供 happend-before 关系。

Waker(RawWaker) 是用 vtable 实现的,而不是用 trait object,可能有这几个原因:

  • Waker 会在不同的 Future 间传递,所以 Future 要支持范型或者是动态分发的,而 Future 要能成为 trait object,就要符合 object-safety,就不能有类型参数,所以只能用动态分发。
  • 单个 Future 可能有多个事件源,就可能注册多个 Waker 到 Reactor,所以 Waker 要实现 Clone,这就不符合 object-safety 了,就不能用 trait object,只能用 vtable 来实现动态分发。
  • 用 vtable 也更灵活,也避免了 trait object 的内存分配。

Waker 通知 Executor 执行 Future 的实现和 Executor 如何管理 Task 有关,比如:

  • Executor 给每个 Task 分配 ID,阻塞的 Task 保存在 map 里,那 wake 就是发送 ID 给 Executor。
  • Task 用 Arc 封装,wake 直接发送 Arc<Task> 就行。因为这种实现比较常见,而创建 Waker 又比较麻烦,所以 future-rs 提供了 ArcWake,只要给 Arc<Task> 实现该 trait 就能作为 Waker 使用。

async-global-executor

async-global-executorasync-std 的 Runtime 实现,主要用到了 smol 的几个 subcrates:

  • async-task:Task 的抽象,用于实现 Executor。
  • async-executor:基于 async-task 实现的 Executor。
  • async-io:async I/O 组件和 Reactor 的实现。

async-task

Task 是 stateful Future,async-task 提供了通用的 Task 实现,用起来很简单:

pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
    S: Fn(Runnable) + Send + Sync + 'static, 

Runnable 封装了 future,隐藏了 Waker 和 poll Future 的实现,只要调用 Runnable::run 即可。Waker 的实现和 Executor 如何管理 Task 有关,所以需要 Executor 提供 schedule,Waker 唤醒就是调用该方法,通常会实现为把 Runnable 发送到 task queue 里。Task<F::Output> 是用于获取 Runnable 结果的 Future。

async-task 解决了以下几个问题:

  • 内存分配:Executor 需要用类似 Box<dyn Future> 方式保存不同类型的 Future,所以至少需要一次内存分配。async-task 为每个 Task 分配了一块连续的内存,保存了所有信息,RunnableWakerTask 只需要保存这块内存的指针即可,只需要一次内存分配。因为 Future 和它的结果不会同时存在,所以会共用内存。

  • Future 结果获取:有些 Runtime 只支持 Future::Output = () 的 Future,需要调用方自己用 futures::channel::oneshot 之类的发送结果,async-task 内置了 Task<F::Output> 用于获取结果,不需要额外的内存分配。

  • Task 生命周期和状态管理:RunnableWakerTask 共享了一块内存且各属于不同的部分,可能发生很多竞争问题,比如 Task 已经在 task queue 里了又被 Waker 唤醒了,可能会导致 poll 已经完成的 Future 或者不同的线程同时 poll 了相同的 Future;如果实现为 Task 正在 poll 就不会唤醒,那又有可能丢失唤醒,导致 Task 永远不会完成;获取 Future 结果时也可能出现丢失唤醒的情况,比如 Future 在 Task 检查结果和注册 Waker 之间完成了。async-task 使用 state 字段记录了 Task 当前状态和引用数量,每次状态转换后都会检查可能发生竞态条件的状态,比如说当 Waker 唤醒时发现 Future 处于 RUNNING 状态 (正在被 poll),就只会设置 SCHEDULED 状态,不会调用 schedule 函数;当 Future poll 执行完成后发现设置了 SCHEDULED,就会调用 schedule 函数。

async-executor

async-executor 是基于 async-task 实现的 Executor 组件库,它提供了 work-stealing multi-threaded Executor 和 thread-local LocalExecutorLocalExecutor 基于 Executor 实现,通过类型系统限制了 Future 只能在单个线程运行。async-executor 不是开箱即用的,需要每个 Executor 线程都 block_on 在 Executor::run,各 Runtime 可以基于它实现符合自己特点的 Executor。

Executor 实现的基本模型就是不断运行 task queue 里的 Task,spawn 和 wake 是往 queue 里扔 Task。

while let Some(task) = self.queue.pop() {
    task.run();
} 

对于多线程 Executor,有多种实现选择,比如:

  • 单队列多线程:所有线程都从单个 task queue 里争抢 Task,好处是调度非常公平、实现简单,坏处是 task queue 竞争严重可能会导致性能差。
  • 每个线程各有一个队列:Task 按一定规则分发到某个队列,并固定在单个线程上运行。好处是竞争少,坏处就是线程负载可能不均衡,有热点问题。
  • work-stealing:为了解决上面的不均衡问题,Task 不再固定由某个线程运行,空闲线程会从其他线程的 task queue 里 steal 任务来执行。work-stealing 的实现方式至关重要,如果每个线程都在不停尝试 steal 任务,这并不会减少竞争,还可能会降低性能。

async-executor 的 work-stealing 实现如下:

  • 单个 global queue: ConcurrentQueue<Runnable>, Task 直接扔到 global queue 中;每个 block_on 在 Executor::run 的线程 (Runner) 各有一个 local queue,会加入到 local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>> 来实现 work-stealing。
  • Runner 按照 local -> global -> random sibling 的顺序处理 Task,一次性会偷其他队列里一半的 Task。
  • 当有新 Task 时会唤醒 Runner,但同一时间只会有一个 Runner 处于 notified 状态,该状态的 Runner 会从其他 queue 里偷任务,发现 Task 后会变为 woken 状态再唤醒下一个 Runner,所以当 Task 很多时会一个个唤醒 Runner 而不是立刻唤醒所有的。

以上图为例,global queue 里有多个 Task,但只唤醒了一个 Runner。第一个 Runner 从 global queue 里偷了 4 个 Task 后唤醒下一个,下一个偷了 2 个后再唤醒下一个,最后一个发现 global queue 为空时,从第一个的 local queue 中偷了一半 Task。

对于任务无优先级且执行都很快的 Executor 来说,评价标准大概就是性能和调度公平性,性能包括吞吐和单个任务的延迟,调度公平性决定 .50 .99 max 延迟的差距,负载均衡也包含在这两点里了。async-executor 这种每次只唤醒一个 Runner 的方式能减少竞争,对 cache 友好,从而提高性能,各个 Runner 也会在不断 steal 的过程中实现负载均衡。假如负载完全均衡的话,很可能很多 Runner 同时消耗完了 local queue 里的任务,这时都会去其他 queue 里偷任务,对于这种情况下的竞争,async-executor 会随机选择 sibling queue 为起点来减少 steal 的竞争,但无法减少 global queue 上的竞争,一种优化方式是限制同时 steal 的 Runner 个数或者限制在单个 queue 上 steal 的个数。至于调度公平性,最理想的当然是 FIFO,async-executor 有可能会出现后到的请求被先处理的情况,因为一次偷一个 batch,且 steal 顺序是先 global 再 sibling,后唤醒的 Runner 会先处理 global queue 里后到的 Task。不过 local queue 容量上限是 512,一次也偷不了多少,而且 work-stealing 也能缓解这种问题,所以应该还好。如果真要优化的话,还可以在减小 batch、调整 steal 顺序上做文章,这需要压测来验证效果。

async-io

async-io 提供了异步 I/O 和 Timer 的实现。实现异步 I/O 的策略比较特殊,不是为标准库中各种类型的同步 I/O 接口都提供对应的异步版本,这样工作量太大也难以维护,而是提供了异步 adapter Async,它可以封装任意支持 non-blocking I/O 的类型,并实现了异步 I/O 最基础的功能,包括设置为 non-blocking、readable/writable 通知,再搭配上 read_with/write_with 方法就可以为各种同步 I/O 接口实现对应的异步版本。对于异步 I/O 来说,只有 Future trait 接口的话用起来非常不方便,而且异步 I/O 需求是非常普遍的,所以也需要有类似标准库中的 Read/Write trait,有了标准的 trait 后就能实现各种工具集了,但是目前并没有统一的 AsyncRead/AsyncWrite trait,tokiofutures 分别提供了自己的 trait 和工具集,有一定的割裂。Async 实现的是 futuresAsyncRead/AsyncWrite,所以可以用 futures 提供的工具集 AsyncReadExt/AsyncWriteExt

Async 实现很简单,就是不断读 / 写直到返回 EWOULDBLOCK,然后异步等待可读 / 可写,根据接口的不同可实现为 .await 如果不是 leaf-future 或者 poll 如果是的话。

Reactor 实现也很简单,Linux 下面用的是 epoll + LT + oneshot,它是 lazy-init 的,第一次用到时会创建一个 async-io 线程来驱动它,也就是调用 epoll_wait 之类的等待事件就绪。创建 Async 时会把 fd 添加 (EPOLL_CTL_ADD) 到兴趣列表里,阻塞时就会关注 (EPOLL_CTL_MOD) 对应的事件并把 Waker 保存到 Reactor 里,事件就绪时就会用 Waker 来唤醒对应的 Future。Timer 实现也很简单,用 BTreeMap 保存所有 Timer 触发的时间和对应的 Waker,最近的 Timer 触发时间就是 epoll_wait 的超时时间。Reactor 除了可以由 async-io 驱动,还可以用 block_onblock_on 其实就是只支持单个 Future 的 Executor,不过 async-io 提供的会在传入的 Future 阻塞时来处理 I/O 事件,不过每个进程只有一个 Reactor,用了锁也只有一个线程能调用 epoll_wait,不清楚为啥会这样设计。

async-io 为了更通用,用的是 LT + oneshot,epoll_ctl 的开销不会小。虽然只有一个 event loop 线程,但只用来等待 I/O 事件就绪和调用 Waker 唤醒 Future,单线程应该也可以支撑很高的并发,相比多个 event loop 线程且事件就绪时直接调用对应的 callback 的方式,感觉 async-io 的方式性能会差一点,毕竟是用 Waker 通知 Executor 执行 Future,而不是直接执行 Future。不过确实抽象的更好,Reactor 只做等待和通知的工作,而 event loop 既是 Reactor 也是 Executor。

async-global-executor

async-global-executor 默认由 async-ioasync-executor 构建,实现就是多个线程 async_io::block_onExecutor::run 上,每个线程会跑 2 个 Executor:一个是 global Executor,一个是 LocalExecutor,从而支持 spawnspawn_local。除此之外还提供了 spawn_blockingblocking 也是用 async-task 实现的 Runtime,功能是处理长时间执行的任务防止阻塞 Executor,实现就是提供 async 接口的线程池。

总结

Rust 异步编程概念很多,我感觉最有意思的设计是 Waker,它连接了 Executor 和 Future、Reactor。关于 Rust async 优缺点的讨论有很多,即使 Rust 提供了 async/.await,但相比阻塞式编程而言,还是非常困难和容易出错的,而且 async 是传染性的。async 带来的性能提升随着操作系统的不断优化也越来越小了,现在线程的 context swtich 开销已经很小了,而 async 要实现的非常好才能不影响性能。context-switch 对比了 Rust async 和 Linux thread 的 context switch 和 memory 开销,这里还有 sled 作者关于 async 的讨论。
https://youjiali1995.github.io/rust/async/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant