消息传递 mpsc
学习目标
- 掌握 channel 创建和使用
- 理解多生产者单消费者
- 掌握同步和异步 channel
核心概念
基本 Channel
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let messages = vec!["hello", "world", "from", "thread"];
for msg in messages {
tx.send(msg.to_string()).unwrap();
thread::sleep(std::time::Duration::from_millis(200));
}
});
for received in rx {
println!("收到: {}", received);
}
}
多生产者
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let mut handles = vec![];
for i in 0..3 {
let tx_clone = tx.clone();
let handle = thread::spawn(move || {
let msg = format!("线程 {} 的消息", i);
tx_clone.send(msg).unwrap();
});
handles.push(handle);
}
drop(tx);
for received in rx {
println!("{}", received);
}
for handle in handles {
handle.join().unwrap();
}
}
同步 Channel(有界)
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::sync_channel(5);
thread::spawn(move || {
for i in 0..10 {
println!("发送 {}", i);
tx.send(i).unwrap();
}
});
for received in rx {
println!("收到: {}", received);
thread::sleep(std::time::Duration::from_millis(200));
}
}
try_recv 非阻塞
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_secs(1));
tx.send("delayed message").unwrap();
});
loop {
match rx.try_recv() {
Ok(msg) => {
println!("收到: {}", msg);
break;
}
Err(mpsc::TryRecvError::Empty) => {
println!("暂无消息,继续工作...");
thread::sleep(Duration::from_millis(300));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("发送端已关闭");
break;
}
}
}
}
实践练习
练习 1:日志收集器
use std::sync::mpsc;
use std::thread;
enum LogLevel {
Info(String),
Warn(String),
Error(String),
}
fn logger(rx: mpsc::Receiver<LogLevel>) {
for log in rx {
match log {
LogLevel::Info(msg) => println!("[INFO] {}", msg),
LogLevel::Warn(msg) => println!("[WARN] {}", msg),
LogLevel::Error(msg) => eprintln!("[ERROR] {}", msg),
}
}
}
fn main() {
let (tx, rx) = mpsc::channel();
let logger_handle = thread::spawn(move || logger(rx));
for i in 0..5 {
let tx = tx.clone();
thread::spawn(move || {
tx.send(LogLevel::Info(format!("Worker {} 完成任务", i))).unwrap();
});
}
drop(tx);
logger_handle.join().unwrap();
}
小结
| 操作 | 语法 |
|---|
| 创建 channel | mpsc::channel() |
| 发送 | tx.send(value) |
| 接收(阻塞) | rx.recv() / for x in rx |
| 接收(非阻塞) | rx.try_recv() |
| 多生产者 | tx.clone() |
| 同步 channel | mpsc::sync_channel(capacity) |