Rust Async: async-stream源码分析
目前Rust的async/await
语法避免了手动实现Future
的负担。不过由于还没支持yield
,因此要自定义一个Stream
还是要构造状态机。async-stream通过提供了两个宏:stream
和try_stream
来支持yield
语法,完全不依赖unstable的编译器特性,带来了不少便利。
使用方式
stream!
返回一个实现Stream的匿名类型,Stream::Item是yield值的类型;try_stream
类似,不过Stream::Item是Result<T, Error>
,支持?
语法,使用方式大致如下:
fn zero_to_three() -> impl Stream<Item = u32> {
stream! {
for i in 0..3 {
yield i;
}
}
}
fn bind_and_accept(addr: SocketAddr)
-> impl Stream<Item = io::Result<TcpStream>>
{
try_stream! {
let mut listener = TcpListener::bind(&addr)?;
loop {
let (stream, addr) = listener.accept().await?;
println!("received on {:?}", addr);
yield stream;
}
}
}
实现分析
由于async/await应该是目前stable版本的Rust编译器唯一能自动构建状态机的机制,因此内部实现只能是借用它;不过async/await生成的是Future,所以问题转化为寻找一种将Future包装为Stream的方式,也即:Future在poll的时候能够通过某种方式不断向外部吐出Item。由于Future的poll方法签名如下:
fn poll(self: Pin<&mut Self>, ctx: Context<'_>) -> Poll<Self::Output>;
Future通过poll的返回值只能吐出一个,显然不能通过返回值传递;而由于Context不能扩展,因此也不能通过将Item设置在Context中进行返回。因此只剩下通过副作用隐式传递:比如thread_local/task_local,并发队列等。最轻量的应该是task_local,不过需要和特定的异步runtime绑定,不通用;并发队列会有不必要的跨线程同步开销;thread_local是通用并且性能损耗较小的方案。
因此我们需要能够将Future转换成Stream的AsyncStream包装类,同时支持包装类和内部future通讯的channel,需要他们在使用上大致如下:
let (sender, receiver) = thread_local_channel();
let stream = AsyncStream::new(receiver, async move {
for i in 0..10 {
sender.send(i).await;
}
});
while let Some(item) = stream.next().await {
//...
}
下面具体看看async-stream的实现。
AsyncStream的实现
AsyncStream的包装类定义如下:
pub struct AsyncStream<T, U> {
rx: Receiver<T>,
done: bool,
generator: U,
}
其中generator就是被包装的Future,内部会持有Sender,用于对外发送Item,Sender会将Item设置在thread_local中;Receiver是接收端,负责从thread_local里读取Item。
Stream的实现就比较简单直白了:
impl<T, U> Stream for AsyncStream<T, U>
where
U: Future<Output = ()>,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe {
let me = Pin::get_unchecked_mut(self);
if me.done {
return Poll::Ready(None);
}
let mut dst = None;
let res = {
// 设置dst的地址进thread_local中
let _enter = me.rx.enter(&mut dst);
Pin::new_unchecked(&mut me.generator).poll(cx)
};
me.done = res.is_ready();
if dst.is_some() {
return Poll::Ready(dst.take());
}
if me.done {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}
}
基于thread_local的发送接收实现
为了做到通用,存储在thread_local中的指针抹除掉了具体类型,具体类型记录在Sender和Receiver中。为了允许嵌套使用thread_local,设计了一个Enter的RAII结构,在析构的时候将原先的值设置回thread_local中。
thread_local!(static STORE: Cell<*mut ()> = Cell::new(ptr::null_mut()));
pub(crate) struct Enter<'a, T> {
_rx: &'a mut Receiver<T>,
prev: *mut (),
}
struct Send<T> {
value: Option<T>,
}
impl<T: Unpin> Future for Send<T> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
if self.value.is_none() {
return Poll::Ready(());
}
STORE.with(|cell| unsafe {
let ptr = cell.get() as *mut Option<T>;
// receiver必须先调用enter设置接收指针。
let option_ref = ptr.as_mut().expect("invalid usage");
if option_ref.is_none() {
*option_ref = self.value.take();
}
// 注意返回的是Pending,这样可以立即返回到AsyncStream中。
Poll::Pending
})
}
}
impl<T> Receiver<T> {
pub(crate) fn enter<'a>(&'a mut self, dst: &'a mut Option<T>) -> Enter<'a, T> {
let prev = STORE.with(|cell| {
let prev = cell.get();
cell.set(dst as *mut _ as *mut ());
prev
});
Enter { _rx: self, prev }
}
}
impl<'a, T> Drop for Enter<'a, T> {
fn drop(&mut self) {
STORE.with(|cell| cell.set(self.prev));
}
}
stream!
宏的实现
显然AsyncStream并不能包装任意的Future,因为他约定了和Future通过Sender/Receiver传递Item的方式,因此不适合直接暴露给用户用,以避免用户随便拿个Future往AsyncStream上套。所以定义了一个stream!
宏给用户用。其实现方式是将yield expr
语句全部替换为sender.send(expr).await
:
stream! {
for i in 0..10 {
yield i;
}
}
转换为:
{
let (sender, receiver) = yielder::pair();
AsyncStream::new(receiver, async move {
for i in 0..10 {
sender.send(i).await;
}
})
}
因为syn库已经支持了包括yield
语句在内的rust语法解析,因此proc宏的实现主要是将语法解析出来后对yield语句进行替换。try_stream
的处理方式类似,只是将yield expr
语句全部替换为sender.send(Ok(expr)).await
。
总结
async-stream的实现整体是轻量高效的:
- 没有内存分配;
- Stream每生成一个item,只需要一次thread_local的读写开销。
因此在Rust编译器对Stream原生支持前,这应该是第三库能够实现的最优方案。