46 · Advanced Concurrent Programming

Rayon Parallel Iterators

Learning Objectives

  1. Master the basic usage of Rayon
  2. Understand the parallel iterator API
  3. Master the fork-join model

Core Concepts

Basic Usage

# Cargo.toml
[dependencies]
rayon = "1.10"

// src/main.rs
use rayon::prelude::*;

fn main() {
    let numbers: Vec<i64> = (1..=1_000_000).collect();

    // Serial
    let sum_serial: i64 = numbers.iter().sum();

    // Parallel (the only change is .iter() -> .par_iter())
    let sum_parallel: i64 = numbers.par_iter().sum();

    assert_eq!(sum_serial, sum_parallel);
}

Parallel Iterators

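use rayon::prelude::*;

fn main() {
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    // par_iter: parallel iteration over shared references (&T)
    let squares: Vec<i32> = data.par_iter().map(|x| x * x).collect();
    // collect() keeps the original element order
    assert_eq!(squares, vec![1, 4, 9, 16, 25, 36, 49, 64, 81, 100]);

    // par_iter_mut: parallel iteration over mutable references (&mut T)
    let mut data = vec![1, 2, 3, 4, 5];
    data.par_iter_mut().for_each(|x| *x *= 2);
    assert_eq!(data, vec![2, 4, 6, 8, 10]);

    // into_par_iter: consuming parallel iteration (takes ownership of the Vec)
    let data = vec![1, 2, 3, 4, 5];
    let sum: i32 = data.into_par_iter().sum();
    assert_eq!(sum, 15);

    // par_sort: parallel stable sort
    let mut data = vec![5, 3, 1, 4, 2];
    data.par_sort();
    assert_eq!(data, vec![1, 2, 3, 4, 5]);
}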

Parallel map + reduce

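use rayon::prelude::*;

fn main() {
    let text = "hello world foo bar baz qux";

    // par_split_whitespace: split the string into words in parallel
    let word_count = text.par_split_whitespace()
        // fold: each split counts its own words into a local accumulator
        .fold(|| 0usize, |acc, _| acc + 1)
        // sum: reduce the per-split partial counts into one total
        .sum::<usize>();

    println!("Word count: {}", word_count);
}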

Custom Parallel Tasks

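use rayon::join;

/// Lomuto partition: uses the middle element as the pivot, moves everything
/// smaller than it to the left, and returns the pivot's final index.
fn partition<T: Ord>(v: &mut [T]) -> usize {
    let last = v.len() - 1;
    v.swap(v.len() / 2, last); // park the pivot at the end
    let mut store = 0;
    for i in 0..last {
        if v[i] < v[last] {
            v.swap(i, store);
            store += 1;
        }
    }
    v.swap(store, last); // move the pivot into its final position
    store
}

fn parallel_quick_sort<T: Ord + Send>(v: &mut [T]) {
    if v.len() <= 1 {
        return;
    }

    let mid = partition(v);
    // left = elements < pivot; right[0] = pivot (already in place); right[1..] = the rest
    let (left, right) = v.split_at_mut(mid);

    // fork-join: the two halves are independent, so join may sort them in parallel
    join(
        || parallel_quick_sort(left),
        || parallel_quick_sort(&mut right[1..]),
    );
}

fn main() {
    let mut data: Vec<i32> = (0..1000).rev().collect();
    parallel_quick_sort(&mut data);
    assert!(data.windows(2).all(|w| w[0] <= w[1]));
    println!("sorted: {}...{}", data[0], data[data.len() - 1]);
}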

Performance Comparison

use rayon::prelude::*;
use std::time::Instant;

fn is_prime(n: u64) -> bool {
    if n < 2 { return false; }
    let mut i = 2;
    while i * i <= n {
        if n % i == 0 { return false; }
        i += 1;
    }
    true
}

fn main() {
    // Build with `cargo run --release`; debug-build timings are not representative
    let range: Vec<u64> = (2..100_000).collect();

    let start = Instant::now();
    let serial: Vec<_> = range.iter().filter(|&&n| is_prime(n)).collect();
    println!("Serial: {} primes, {:?}", serial.len(), start.elapsed());

    let start = Instant::now();
    let parallel: Vec<_> = range.par_iter().filter(|&&n| is_prime(n)).collect();
    println!("Parallel: {} primes, {:?}", parallel.len(), start.elapsed());

    // Both versions find the same primes, in the same order
    assert_eq!(serial, parallel);
}

Practice Exercises

Exercise 1: Parallel File Processing

use rayon::prelude::*;
use std::fs;

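/// Counts the lines in one file; unreadable or missing files count as 0.
fn count_lines_in_file(path: &str) -> usize {
    fs::read_to_string(path)
        .map(|content| content.lines().count())
        .unwrap_or(0)
}

fn main() {
    let files = vec!["file1.txt", "file2.txt", "file3.txt"];
    // Each file is read and counted on a Rayon worker thread; sum() combines the counts
    let total: usize = files.par_iter()
        .map(|f| count_lines_in_file(f))
        .sum();
    println!("Total lines: {}", total);
}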

Summary

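| Method | Description |
|---|---|
| .par_iter() | Parallel iteration over shared references |
| .par_iter_mut() | Parallel iteration over mutable references |
| .into_par_iter() | Consuming parallel iteration (takes ownership) |
| rayon::join | Runs two closures, potentially in parallel, and waits for both |
| .par_sort() | Parallel stable sort |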
