Stream: Asynchronous Iteration
Learning Objectives
- Understand the Stream trait
- Master asynchronous iterators
- Use tokio-stream
Core Concepts
Stream trait
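trait Stream {
    // Type of the values the stream yields.
    type Item;
    // Ready(Some(item)): next value; Ready(None): stream finished; Pending: no value yet.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}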
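To make the `poll_next` contract concrete, here is a minimal sketch that uses only the standard library. It declares a local copy of the trait above (real code would normally use the `Stream` trait re-exported by `tokio_stream`), and the names `Counter`, `NoopWake`, and `drain` are hypothetical, introduced only for illustration. Because `Counter` never returns `Poll::Pending`, a waker that does nothing is enough to drive it by hand.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the trait shown above, so the sketch compiles with std only.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical stream that yields 1..=limit and then ends.
struct Counter {
    current: u32,
    limit: u32,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.current < self.limit {
            self.current += 1;
            Poll::Ready(Some(self.current))
        } else {
            // Ready(None) signals that the stream is finished.
            Poll::Ready(None)
        }
    }
}

// Waker that does nothing; sufficient because Counter never returns Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls the stream by hand and collects items until it reports the end.
fn drain(mut counter: Counter) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    let mut pinned = Pin::new(&mut counter);
    while let Poll::Ready(Some(item)) = pinned.as_mut().poll_next(&mut cx) {
        items.push(item);
    }
    items
}

fn main() {
    let items = drain(Counter { current: 0, limit: 3 });
    println!("{:?}", items); // prints [1, 2, 3]
}
```

In practice an async runtime does the polling, and calling `.next().await` from `StreamExt` replaces the manual loop in `drain`.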
Using tokio-stream
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
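use tokio::time::{interval, Duration};
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    // Turn an in-memory collection into a Stream and consume it item by item.
    let mut stream = tokio_stream::iter(vec![1, 2, 3, 4, 5]);
    while let Some(item) = stream.next().await {
        println!("{}", item);
    }

    // Interval is not itself a Stream; tokio_stream::wrappers::IntervalStream
    // can wrap it when one is needed. Here it is driven directly with tick().
    let mut ticker = interval(Duration::from_secs(1));
    for _ in 0..5 {
        // The first tick completes immediately; later ticks are one second apart.
        ticker.tick().await;
        println!("tick!");
    }
}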
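The stream! macro
// The stream! macro comes from the separate async-stream crate,
// which must be listed under [dependencies] next to tokio and tokio-stream.
use async_stream::stream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    // Each `yield` produces one item of the stream.
    let s = stream! {
        for i in 0..5 {
            yield i;
        }
    };
    // The generated stream must be pinned before next() can be called on it.
    tokio::pin!(s);
    while let Some(value) = s.next().await {
        println!("{}", value);
    }
}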
Common Operations
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let stream = tokio_stream::iter(1..=10);
    let doubled: Vec<_> = stream
        .map(|x| x * 2)      // 2, 4, 6, ..., 20
        .filter(|x| *x > 5)  // keep values greater than 5
        .take(3)             // stop after the first three
        .collect()
        .await;
    println!("{:?}", doubled); // prints [6, 8, 10]
}
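For comparison, the same pipeline can be written against a synchronous `Iterator` from the standard library. This is only an analogue, not part of tokio-stream, and the function name `doubled_first_three` is hypothetical. The adapters read the same; the difference is that the stream version ends with `.collect().await`, while the iterator version runs to completion without awaiting anything.

```rust
// Synchronous analogue of the stream pipeline above: same adapters, no .await.
fn doubled_first_three() -> Vec<i32> {
    (1..=10)
        .map(|x| x * 2)      // 2, 4, 6, ..., 20
        .filter(|x| *x > 5)  // 6, 8, 10, ..., 20
        .take(3)             // 6, 8, 10
        .collect()
}

fn main() {
    println!("{:?}", doubled_first_three()); // prints [6, 8, 10]
}
```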
Summary
| Concept | Description |
|---|---|
| Stream | Asynchronous iterator |
| .next().await | Gets the next value |
| tokio-stream | Stream extensions and utilities |
| stream! | Macro for creating a Stream |