Rust Async: async-stream源码分析

目前Rust的async/await语法避免了手动实现Future的负担。不过由于还没支持yield,因此要自定义一个Stream还是要构造状态机。async-stream通过提供了两个宏:streamtry_stream来支持yield语法,完全不依赖unstable的编译器特性,带来了不少便利。

使用方式

stream!返回一个实现Stream的匿名类型,Stream::Item是yield值的类型;try_stream类似,不过Stream::Item是Result<T, Error>,支持?语法,使用方式大致如下:

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

fn bind_and_accept(addr: SocketAddr)
    -> impl Stream<Item = io::Result<TcpStream>>
{
    try_stream! {
        let mut listener = TcpListener::bind(&addr)?;
        loop {
            let (stream, addr) = listener.accept().await?;
            println!("received on {:?}", addr);
            yield stream;
        }
    }
}

实现分析

由于async/await应该是目前stable版本的Rust编译器唯一能自动构建状态机的机制,因此内部实现只能是借用它;不过async/await生成的是Future,所以问题转化为寻找一种将Future包装为Stream的方式,也即:Future在poll的时候能够通过某种方式不断向外部吐出Item。由于Future的poll方法签名如下:

fn poll(self: Pin<&mut Self>, ctx: Context<'_>) -> Poll<Self::Output>;

Future通过poll的返回值只能吐出一个,显然不能通过返回值传递;而由于Context不能扩展,因此也不能通过将Item设置在Context中进行返回。因此只剩下通过副作用隐式传递:比如thread_local/task_local,并发队列等。最轻量的应该是task_local,不过需要和特定的异步runtime绑定,不通用;并发队列会有不必要的跨线程同步开销;thread_local是通用并且性能损耗较小的方案。

因此我们需要能够将Future转换成Stream的AsyncStream包装类,同时支持包装类和内部future通讯的channel,需要他们在使用上大致如下:

let (sender, receiver) = thread_local_channel();
let stream = AsyncStream::new(receiver, async move {
    for i in 0..10 {
        sender.send(i).await;
    }
});

while let Some(item) = stream.next().await {
    //...
}

下面具体看看async-stream的实现。

AsyncStream的实现

AsyncStream的包装类定义如下:

pub struct AsyncStream<T, U> {
    rx: Receiver<T>,
    done: bool,
    generator: U,
}

其中generator就是被包装的Future,内部会持有Sender,用于对外发送Item,Sender会将Item设置在thread_local中;Receiver是接收端,负责从thread_local里读取Item。

Stream的实现就比较简单直白了:

impl<T, U> Stream for AsyncStream<T, U>
where
    U: Future<Output = ()>,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        unsafe {
            let me = Pin::get_unchecked_mut(self);
            if me.done {
                return Poll::Ready(None);
            }
            let mut dst = None;
            let res = {
                // 设置dst的地址进thread_local中
                let _enter = me.rx.enter(&mut dst);
                Pin::new_unchecked(&mut me.generator).poll(cx)
            };
            me.done = res.is_ready();
            if dst.is_some() {
                return Poll::Ready(dst.take());
            }
            if me.done {
                Poll::Ready(None)
            } else {
                Poll::Pending
            }
        }
    }
}

基于thread_local的发送接收实现

为了做到通用,存储在thread_local中的指针抹除掉了具体类型,具体类型记录在Sender和Receiver中。为了允许嵌套使用thread_local,设计了一个Enter的RAII结构,在析构的时候将原先的值设置回thread_local中。

thread_local!(static STORE: Cell<*mut ()> = Cell::new(ptr::null_mut()));

pub(crate) struct Enter<'a, T> {
    _rx: &'a mut Receiver<T>,
    prev: *mut (),
}

struct Send<T> {
    value: Option<T>,
}

impl<T: Unpin> Future for Send<T> {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.value.is_none() {
            return Poll::Ready(());
        }

        STORE.with(|cell| unsafe {
            let ptr = cell.get() as *mut Option<T>;
            // receiver必须先调用enter设置接收指针。
            let option_ref = ptr.as_mut().expect("invalid usage");

            if option_ref.is_none() {
                *option_ref = self.value.take();
            }
            // 注意返回的是Pending,这样可以立即返回到AsyncStream中。
            Poll::Pending
        })
    }
}

impl<T> Receiver<T> {
    pub(crate) fn enter<'a>(&'a mut self, dst: &'a mut Option<T>) -> Enter<'a, T> {
        let prev = STORE.with(|cell| {
            let prev = cell.get();
            cell.set(dst as *mut _ as *mut ());
            prev
        });

        Enter { _rx: self, prev }
    }
}

impl<'a, T> Drop for Enter<'a, T> {
    fn drop(&mut self) {
        STORE.with(|cell| cell.set(self.prev));
    }
}

stream!宏的实现

显然AsyncStream并不能包装任意的Future,因为他约定了和Future通过Sender/Receiver传递Item的方式,因此不适合直接暴露给用户用,以避免用户随便拿个Future往AsyncStream上套。所以定义了一个stream!宏给用户用。其实现方式是将yield expr语句全部替换为sender.send(expr).await:

stream! {
    for i in 0..10 {
        yield i;
    }
}

转换为:

{
    let (sender, receiver) = yielder::pair();
    AsyncStream::new(receiver, async move {
        for i in 0..10 {
            sender.send(i).await;
        }
    })
}

因为syn库已经支持了包括yield语句在内的rust语法解析,因此proc宏的实现主要是将语法解析出来后对yield语句进行替换。try_stream的处理方式类似,只是将yield expr语句全部替换为sender.send(Ok(expr)).await

总结

async-stream的实现整体是轻量高效的:

  • 没有内存分配;
  • Stream每生成一个item,只需要一次thread_local的读写开销。

因此在Rust编译器对Stream原生支持前,这应该是第三库能够实现的最优方案。

编辑于 2020-10-16 18:51