41·并发编程进阶

消息传递 mpsc

消息传递 mpsc

学习目标

  1. 掌握 channel 创建和使用
  2. 理解多生产者单消费者
  3. 掌握同步和异步 channel

核心概念

基本 Channel

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let messages = vec!["hello", "world", "from", "thread"];
        for msg in messages {
            tx.send(msg.to_string()).unwrap();
            thread::sleep(std::time::Duration::from_millis(200));
        }
    });

    // 接收消息(阻塞)
    for received in rx {
        println!("收到: {}", received);
    }
}

多生产者

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut handles = vec![];

    for i in 0..3 {
        let tx_clone = tx.clone();
        let handle = thread::spawn(move || {
            let msg = format!("线程 {} 的消息", i);
            tx_clone.send(msg).unwrap();
        });
        handles.push(handle);
    }

    drop(tx);  // 关闭原始发送端

    for received in rx {
        println!("{}", received);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

同步 Channel(有界)

use std::sync::mpsc;
use std::thread;

fn main() {
    // 最多缓冲 5 条消息
    let (tx, rx) = mpsc::sync_channel(5);

    thread::spawn(move || {
        for i in 0..10 {
            println!("发送 {}", i);
            tx.send(i).unwrap();  // 超过 5 条时会阻塞
        }
    });

    for received in rx {
        println!("收到: {}", received);
        thread::sleep(std::time::Duration::from_millis(200));
    }
}

try_recv 非阻塞

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_secs(1));
        tx.send("delayed message").unwrap();
    });

    loop {
        match rx.try_recv() {
            Ok(msg) => {
                println!("收到: {}", msg);
                break;
            }
            Err(mpsc::TryRecvError::Empty) => {
                println!("暂无消息,继续工作...");
                thread::sleep(Duration::from_millis(300));
            }
            Err(mpsc::TryRecvError::Disconnected) => {
                println!("发送端已关闭");
                break;
            }
        }
    }
}

实践练习

练习 1:日志收集器

use std::sync::mpsc;
use std::thread;

enum LogLevel {
    Info(String),
    Warn(String),
    Error(String),
}

fn logger(rx: mpsc::Receiver<LogLevel>) {
    for log in rx {
        match log {
            LogLevel::Info(msg) => println!("[INFO] {}", msg),
            LogLevel::Warn(msg) => println!("[WARN] {}", msg),
            LogLevel::Error(msg) => eprintln!("[ERROR] {}", msg),
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();

    let logger_handle = thread::spawn(move || logger(rx));

    for i in 0..5 {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(LogLevel::Info(format!("Worker {} 完成任务", i))).unwrap();
        });
    }

    drop(tx);
    logger_handle.join().unwrap();
}

小结

操作语法
创建 channelmpsc::channel()
发送tx.send(value)
接收(阻塞)rx.recv() / for x in rx
接收(非阻塞)rx.try_recv()
多生产者tx.clone()
同步 channelmpsc::sync_channel(capacity)

练习编辑器

rust
Loading...