52 · Advanced Async Programming

Stream: Asynchronous Iteration

Learning objectives

  1. Understand the Stream trait
  2. Get comfortable with asynchronous iterators
  3. Use tokio-stream

Core concepts

Stream trait

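use std::pin::Pin;
use std::task::{Context, Poll};

// Stream is the asynchronous counterpart of Iterator:
// poll_next returns Poll::Pending while the next item is not ready yet,
// Poll::Ready(Some(item)) for each item, and Poll::Ready(None) at the end.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}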
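
To make the poll_next contract concrete, here is a minimal sketch that implements the trait by hand and drives it without any runtime. It redeclares the trait locally so that it compiles with the standard library alone; in real code the trait would normally come from a crate such as futures-core or tokio-stream. Counter, NoopWaker, and drain are hypothetical names introduced only for this illustration.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the trait shape shown above, so the sketch needs no external crates.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical stream that yields 1..=limit and then ends.
struct Counter {
    current: u32,
    limit: u32,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.current < self.limit {
            self.current += 1;
            Poll::Ready(Some(self.current))
        } else {
            // None signals the end of the stream, just like Iterator::next.
            Poll::Ready(None)
        }
    }
}

// Waker that does nothing; sufficient here because Counter never returns Pending.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls a stream by hand until it reports the end, collecting every item.
fn drain<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => break,
            // A real executor would wait for the waker to fire; this sketch just stops.
            Poll::Pending => break,
        }
    }
    items
}

fn main() {
    let items = drain(Counter { current: 0, limit: 3 });
    println!("{:?}", items); // prints [1, 2, 3]
}
```

In practice you rarely call poll_next yourself: calling .next().await from an extension trait such as StreamExt does the polling, and the runtime supplies a real waker.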

Using tokio-stream

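[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"

use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    // Turn any iterator into a Stream and consume it item by item.
    let mut stream = tokio_stream::iter(vec![1, 2, 3, 4, 5]);

    while let Some(item) = stream.next().await {
        println!("{}", item);
    }

    // Timer stream: tokio's Interval is not itself a Stream,
    // so wrap it in IntervalStream and stop after five ticks.
    let mut ticks = IntervalStream::new(interval(Duration::from_secs(1))).take(5);
    while let Some(_instant) = ticks.next().await {
        println!("tick!");
    }
}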

The stream! macro

[dependencies]
async-stream = "0.3"

use async_stream::stream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let s = stream! {
        for i in 0..5 {
            yield i;
        }
    };

    tokio::pin!(s);
    while let Some(value) = s.next().await {
        println!("{}", value);
    }
}

Common operations

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let stream = tokio_stream::iter(1..=10);

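    // map, filter, take: double each value, keep those above 5,
    // then stop after the first three matches.
    let doubled: Vec<_> = stream
        .map(|x| x * 2)
        .filter(|x| *x > 5)
        .take(3)
        .collect()
        .await;

    println!("{:?}", doubled); // prints [6, 8, 10]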
}

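Summary

Concept          Description
Stream           Asynchronous iterator
.next().await    Fetches the next value
tokio-stream     Stream extensions (adapters and utilities)
stream!          Macro for creating a Stream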
