Skip to content
Published at:

19. Concurrency — 并发编程

Rust 的并发编程模型建立在所有权和类型系统之上,通过 SendSync trait 在编译期保证线程安全。本章介绍 Rust 中三种主要的并发范式:std::thread 的 fork-join 并行、mpsc 通道的消息传递、以及基于 Mutex/RwLock/Atomic 的共享可变状态。

线程基础:spawn 与 join

std::thread::spawn 启动一个新的操作系统线程,接受一个 FnOnce 闭包:

rust
use std::thread;

let handle = thread::spawn(|| {
    println!("Hello from a child thread");
});

// 等待子线程结束,获取返回值
let result = handle.join().unwrap();

spawn 返回 JoinHandle<T>,其中 T 是闭包的返回类型。join() 返回 thread::Result<T>——如果子线程 panic,join() 返回 Err,将 panic 隔离在线程边界内。

跨线程移动数据

spawn 要求闭包是 'static 的,因为编译器无法确定子线程会运行多久。这意味着必须通过 move 闭包转移数据的所有权:

rust
let data = vec![1, 2, 3];

let handle = thread::spawn(move || {
    println!("{:?}", data);  // data 的所有权移入子线程
});
// data 在此不再可用

跨线程共享不可变数据

多个线程共享只读数据时,使用 Arc(原子引用计数):

rust
use std::sync::Arc;

let glossary = Arc::new(load_large_database());

let mut handles = vec![];
for _ in 0..8 {
    let g = Arc::clone(&glossary);  // 只递增引用计数,不拷贝数据
    handles.push(thread::spawn(move || {
        process_with_glossary(&g);
    }));
}

ArcRc 的核心区别:Arc 使用原子操作维护引用计数,因此可以安全地跨线程共享;Rc 使用非原子操作,只能在单线程中使用。

Send 与 Sync

这两个 trait 是 Rust 线程安全的基础,由编译器自动推导:

Trait含义自动实现条件
Send可以安全地将所有权转移到另一个线程所有字段都是 Send
Sync可以安全地通过共享引用跨线程访问所有字段都是 Sync

大多数类型都是 Send + Sync。关键例外:

  • Rc<T> 既不是 Send 也不是 Sync(引用计数非原子)
  • RefCell<T>Send 但不是 Sync(运行时借用检查非线程安全)
  • *const T / *mut T 既不是 Send 也不是 Sync(raw pointer 不提供任何保证)
  • Mutex<T>Send + Sync(当 T: Send 时)

当类型不满足要求时,编译器会给出清晰的错误信息,阻止数据竞争在编译期发生。

Mutex<T> — 互斥锁

Rust 的 Mutex<T> 将受保护的数据存储于锁的内部,只有通过 lock() 方法才能访问:

rust
use std::sync::{Arc, Mutex};

let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
    let counter = Arc::clone(&counter);
    handles.push(thread::spawn(move || {
        let mut num = counter.lock().unwrap();
        *num += 1;
    }));
}

for handle in handles {
    handle.join().unwrap();
}

assert_eq!(*counter.lock().unwrap(), 10);

lock() 返回 LockResult<MutexGuard<T>>MutexGuard 实现了 DerefDerefMut,可以像引用一样操作内部数据。当 MutexGuard 离开作用域时,锁自动释放。

中毒(Poisoning)

如果线程在持有锁时 panic,Mutex 会被标记为"中毒"。后续 lock() 调用返回 Err(PoisonError),提醒调用者受保护数据的不变量可能已被破坏。你可以选择:

  • .unwrap() — 将 panic 传播到当前线程
  • .lock().unwrap_or_else(|e| e.into_inner()) — 忽略中毒,强制获取数据

Mutex 的注意事项

  • 不可重入:同一线程两次 lock() 同一个 Mutex 会导致死锁
  • 临界区尽量小:持有锁的时间越短越好
  • 避免在持有锁时调用外部代码:可能导致死锁或性能问题
  • Mutex 不是万能药:它解决了数据竞争,但不能解决所有并发问题(如逻辑上的竞争条件)

RwLock<T> — 读写锁

当读操作远多于写操作时,RwLockMutex 更高效:

rust
use std::sync::RwLock;

let config = RwLock::new(AppConfig::default());

// 多个线程可以同时读取
let enabled = config.read().unwrap().feature_enabled;

// 写操作独占访问
let mut cfg = config.write().unwrap();
cfg.reload_from_disk()?;

RwLock 的规则与 Rust 的借用规则一致:多个 read() 或一个 write(),但不能同时存在。区别在于 RwLock 在运行时执行检查,可能 panic 或死锁(取决于实现)。

Condvar — 条件变量

条件变量用于线程间的信号通知,通常与 Mutex 配合使用:

rust
use std::sync::{Arc, Mutex, Condvar};

let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);

// 工作线程
thread::spawn(move || {
    let (lock, cvar) = &*pair2;
    let mut started = lock.lock().unwrap();
    *started = true;
    cvar.notify_one();  // 通知等待者
});

// 主线程等待
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
    started = cvar.wait(started).unwrap();  // 释放锁并阻塞,被唤醒后重新获取锁
}

Condvar::wait 以值的形式接收 MutexGuard 并返回新的 MutexGuard——这保证了在等待期间锁被释放,唤醒后锁被重新获取。始终在 while 循环中检查条件,以处理虚假唤醒。

通道(mpsc)

std::sync::mpsc 提供多生产者、单消费者通道:

rust
use std::sync::mpsc;
use std::thread;

let (tx, rx) = mpsc::channel();

// 生产者线程
let tx1 = tx.clone();
thread::spawn(move || {
    tx1.send("message from thread 1").unwrap();
});

thread::spawn(move || {
    tx.send("message from thread 2").unwrap();
});

// 消费者
for received in rx {
    println!("Got: {}", received);
}
  • channel() 创建异步通道(无界缓冲)
  • sync_channel(n) 创建同步通道(缓冲区大小为 n,满时 send 阻塞),提供背压机制
  • Sender 可以 CloneReceiver 不能克隆
  • Receiver 实现了 Iterator,可以直接用 for 循环接收
  • 当所有 Sender 被 drop 时,迭代自动结束

原子类型

std::sync::atomic 模块提供无锁并发原语:

类型对应单线程类型
AtomicBoolbool
AtomicUsize / AtomicIsizeusize / isize
AtomicU8 ~ AtomicU64u8 ~ u64
AtomicPtr<T>*mut T

原子操作通过 Ordering 参数控制内存顺序:

rust
use std::sync::atomic::{AtomicUsize, Ordering};

static COUNTER: AtomicUsize = AtomicUsize::new(0);

COUNTER.fetch_add(1, Ordering::SeqCst);  // 原子递增
let value = COUNTER.load(Ordering::Acquire);  // 原子读取
COUNTER.store(0, Ordering::Release);  // 原子写入

内存顺序选项:

Ordering保证性能
Relaxed只保证原子性,不保证顺序最快
Acquire / Release成对使用,建立 happens-before 关系中等
SeqCst全局顺序一致性最慢(但通常可接受)

原子类型的典型应用:取消标志、全局计数器、无锁数据结构。

全局可变状态

Rust 默认禁止全局可变状态。声明 static mut 是可能的,但访问它需要 unsafe。安全的替代方案:

原子全局变量

rust
use std::sync::atomic::AtomicUsize;

static PACKETS_SERVED: AtomicUsize = AtomicUsize::new(0);
// 在任何地方安全地递增
PACKETS_SERVED.fetch_add(1, Ordering::SeqCst);

lazy_static + Mutex

rust
use lazy_static::lazy_static;
use std::sync::Mutex;
use std::collections::HashMap;

lazy_static! {
    static ref CACHE: Mutex<HashMap<String, Data>> = Mutex::new(HashMap::new());
}

Rayon:数据并行

Rayon crate 提供基于 work-stealing 的数据并行:

rust
use rayon::prelude::*;

// 并行迭代
let sum: u64 = (0..1_000_000u64)
    .into_par_iter()
    .map(|n| n * n)
    .sum();

// 并行执行两个独立任务
let (v1, v2) = rayon::join(
    || compute_heavy_1(),
    || compute_heavy_2(),
);

Rayon 自动管理线程池,通过 work-stealing 实现负载均衡——相比手动划分任务,通常能获得更好的 CPU 利用率。

Crossbeam

Crossbeam crate 提供了比标准库更丰富的并发工具:

  • Scoped thread:允许安全地借用栈上数据,无需 Arc
  • 无锁数据结构SegQueueArrayQueue
  • 增强的通道select! 多通道选择、after/tick 定时器
rust
use crossbeam::thread;

let mut data = vec![1, 2, 3, 4, 5, 6, 7, 8];

crossbeam::scope(|s| {
    for chunk in data.chunks_mut(2) {
        s.spawn(move |_| {
            for item in chunk {
                *item *= 2;
            }
        });
    }
}).unwrap();
// scope 保证所有线程在此处已结束,data 可安全使用

并发范式选择指南

场景推荐方案原因
CPU 密集型、可拆分计算Rayon / fork-join确定性、无锁、易推理
松耦合组件通信mpsc 通道清晰的边界,天然的背压
共享配置/缓存RwLock<T>读多写少时高效
简单共享计数器AtomicUsize最低开销
复杂共享状态Mutex<T>最通用的方案
跨线程借用栈数据crossbeam scope无需 Arc

小结

  • Rust 的并发安全不依赖运行时检查,而是通过 Send/Sync trait 在编译期保证——要么编译通过,要么得到清晰的错误信息。
  • 线程创建spawn/join)是 fork-join 并行的基础;线程间的 panic 被隔离,通过 JoinHandle 传播。
  • Mutex<T> 将数据封装在锁内部,强制"先获取锁再访问数据"的模式,从设计上杜绝了"忘记加锁"的 bug。
  • 通道mpsc)实现消息传递模型;sync_channel 提供背压机制防止内存无限增长。
  • 原子类型提供无锁的原语操作,适用于简单计数器和标志位场景。
  • RayonCrossbeam 等 crate 在标准库基础上提供了更高级、更易用的并发抽象。
  • Rust 引导你使用结构化的并发模式——隔离线程、明确所有权——这恰好也是编写可维护并发程序的最佳实践。