19. Concurrency — 并发编程
Rust 的并发编程模型建立在所有权和类型系统之上,通过 Send 和 Sync trait 在编译期保证线程安全。本章介绍 Rust 中三种主要的并发范式:std::thread 的 fork-join 并行、mpsc 通道的消息传递、以及基于 Mutex/RwLock/Atomic 的共享可变状态。
线程基础:spawn 与 join
std::thread::spawn 启动一个新的操作系统线程,接受一个 FnOnce 闭包:
use std::thread;
let handle = thread::spawn(|| {
println!("Hello from a child thread");
});
// 等待子线程结束,获取返回值
let result = handle.join().unwrap();spawn 返回 JoinHandle<T>,其中 T 是闭包的返回类型。join() 返回 thread::Result<T>——如果子线程 panic,join() 返回 Err,将 panic 隔离在线程边界内。
跨线程移动数据
spawn 要求闭包是 'static 的,因为编译器无法确定子线程会运行多久。这意味着必须通过 move 闭包转移数据的所有权:
let data = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("{:?}", data); // data 的所有权移入子线程
});
// data 在此不再可用跨线程共享不可变数据
多个线程共享只读数据时,使用 Arc(原子引用计数):
use std::sync::Arc;
let glossary = Arc::new(load_large_database());
let mut handles = vec![];
for _ in 0..8 {
let g = Arc::clone(&glossary); // 只递增引用计数,不拷贝数据
handles.push(thread::spawn(move || {
process_with_glossary(&g);
}));
}Arc 与 Rc 的核心区别:Arc 使用原子操作维护引用计数,因此可以安全地跨线程共享;Rc 使用非原子操作,只能在单线程中使用。
Send 与 Sync
这两个 trait 是 Rust 线程安全的基础,由编译器自动推导:
| Trait | 含义 | 自动实现条件 |
|---|---|---|
Send | 可以安全地将所有权转移到另一个线程 | 所有字段都是 Send |
Sync | 可以安全地通过共享引用跨线程访问 | 所有字段都是 Sync |
大多数类型都是 Send + Sync。关键例外:
Rc<T>既不是Send也不是Sync(引用计数非原子)RefCell<T>是Send但不是Sync(运行时借用检查非线程安全)*const T/*mut T既不是Send也不是Sync(raw pointer 不提供任何保证)Mutex<T>是Send + Sync(当T: Send时)
当类型不满足要求时,编译器会给出清晰的错误信息,阻止数据竞争在编译期发生。
Mutex<T> — 互斥锁
Rust 的 Mutex<T> 将受保护的数据存储于锁的内部,只有通过 lock() 方法才能访问:
use std::sync::{Arc, Mutex};
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
handles.push(thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
}));
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(*counter.lock().unwrap(), 10);lock() 返回 LockResult<MutexGuard<T>>。MutexGuard 实现了 Deref 和 DerefMut,可以像引用一样操作内部数据。当 MutexGuard 离开作用域时,锁自动释放。
中毒(Poisoning)
如果线程在持有锁时 panic,Mutex 会被标记为"中毒"。后续 lock() 调用返回 Err(PoisonError),提醒调用者受保护数据的不变量可能已被破坏。你可以选择:
.unwrap()— 将 panic 传播到当前线程.lock().unwrap_or_else(|e| e.into_inner())— 忽略中毒,强制获取数据
Mutex 的注意事项
- 不可重入:同一线程两次
lock()同一个Mutex会导致死锁 - 临界区尽量小:持有锁的时间越短越好
- 避免在持有锁时调用外部代码:可能导致死锁或性能问题
- Mutex 不是万能药:它解决了数据竞争,但不能解决所有并发问题(如逻辑上的竞争条件)
RwLock<T> — 读写锁
当读操作远多于写操作时,RwLock 比 Mutex 更高效:
use std::sync::RwLock;
let config = RwLock::new(AppConfig::default());
// 多个线程可以同时读取
let enabled = config.read().unwrap().feature_enabled;
// 写操作独占访问
let mut cfg = config.write().unwrap();
cfg.reload_from_disk()?;RwLock 的规则与 Rust 的借用规则一致:多个 read() 或一个 write(),但不能同时存在。区别在于 RwLock 在运行时执行检查,可能 panic 或死锁(取决于实现)。
Condvar — 条件变量
条件变量用于线程间的信号通知,通常与 Mutex 配合使用:
use std::sync::{Arc, Mutex, Condvar};
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
// 工作线程
thread::spawn(move || {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one(); // 通知等待者
});
// 主线程等待
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap(); // 释放锁并阻塞,被唤醒后重新获取锁
}Condvar::wait 以值的形式接收 MutexGuard 并返回新的 MutexGuard——这保证了在等待期间锁被释放,唤醒后锁被重新获取。始终在 while 循环中检查条件,以处理虚假唤醒。
通道(mpsc)
std::sync::mpsc 提供多生产者、单消费者通道:
use std::sync::mpsc;
use std::thread;
let (tx, rx) = mpsc::channel();
// 生产者线程
let tx1 = tx.clone();
thread::spawn(move || {
tx1.send("message from thread 1").unwrap();
});
thread::spawn(move || {
tx.send("message from thread 2").unwrap();
});
// 消费者
for received in rx {
println!("Got: {}", received);
}channel()创建异步通道(无界缓冲)sync_channel(n)创建同步通道(缓冲区大小为n,满时send阻塞),提供背压机制Sender可以Clone,Receiver不能克隆Receiver实现了Iterator,可以直接用for循环接收- 当所有
Sender被 drop 时,迭代自动结束
原子类型
std::sync::atomic 模块提供无锁并发原语:
| 类型 | 对应单线程类型 |
|---|---|
AtomicBool | bool |
AtomicUsize / AtomicIsize | usize / isize |
AtomicU8 ~ AtomicU64 | u8 ~ u64 |
AtomicPtr<T> | *mut T |
原子操作通过 Ordering 参数控制内存顺序:
use std::sync::atomic::{AtomicUsize, Ordering};
static COUNTER: AtomicUsize = AtomicUsize::new(0);
COUNTER.fetch_add(1, Ordering::SeqCst); // 原子递增
let value = COUNTER.load(Ordering::Acquire); // 原子读取
COUNTER.store(0, Ordering::Release); // 原子写入内存顺序选项:
| Ordering | 保证 | 性能 |
|---|---|---|
Relaxed | 只保证原子性,不保证顺序 | 最快 |
Acquire / Release | 成对使用,建立 happens-before 关系 | 中等 |
SeqCst | 全局顺序一致性 | 最慢(但通常可接受) |
原子类型的典型应用:取消标志、全局计数器、无锁数据结构。
全局可变状态
Rust 默认禁止全局可变状态。声明 static mut 是可能的,但访问它需要 unsafe。安全的替代方案:
原子全局变量:
use std::sync::atomic::AtomicUsize;
static PACKETS_SERVED: AtomicUsize = AtomicUsize::new(0);
// 在任何地方安全地递增
PACKETS_SERVED.fetch_add(1, Ordering::SeqCst);lazy_static + Mutex:
use lazy_static::lazy_static;
use std::sync::Mutex;
use std::collections::HashMap;
lazy_static! {
static ref CACHE: Mutex<HashMap<String, Data>> = Mutex::new(HashMap::new());
}Rayon:数据并行
Rayon crate 提供基于 work-stealing 的数据并行:
use rayon::prelude::*;
// 并行迭代
let sum: u64 = (0..1_000_000u64)
.into_par_iter()
.map(|n| n * n)
.sum();
// 并行执行两个独立任务
let (v1, v2) = rayon::join(
|| compute_heavy_1(),
|| compute_heavy_2(),
);Rayon 自动管理线程池,通过 work-stealing 实现负载均衡——相比手动划分任务,通常能获得更好的 CPU 利用率。
Crossbeam
Crossbeam crate 提供了比标准库更丰富的并发工具:
- Scoped thread:允许安全地借用栈上数据,无需
Arc - 无锁数据结构:
SegQueue、ArrayQueue等 - 增强的通道:
select!多通道选择、after/tick定时器
use crossbeam::thread;
let mut data = vec![1, 2, 3, 4, 5, 6, 7, 8];
crossbeam::scope(|s| {
for chunk in data.chunks_mut(2) {
s.spawn(move |_| {
for item in chunk {
*item *= 2;
}
});
}
}).unwrap();
// scope 保证所有线程在此处已结束,data 可安全使用并发范式选择指南
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| CPU 密集型、可拆分计算 | Rayon / fork-join | 确定性、无锁、易推理 |
| 松耦合组件通信 | mpsc 通道 | 清晰的边界,天然的背压 |
| 共享配置/缓存 | RwLock<T> | 读多写少时高效 |
| 简单共享计数器 | AtomicUsize | 最低开销 |
| 复杂共享状态 | Mutex<T> | 最通用的方案 |
| 跨线程借用栈数据 | crossbeam scope | 无需 Arc |
小结
- Rust 的并发安全不依赖运行时检查,而是通过
Send/Synctrait 在编译期保证——要么编译通过,要么得到清晰的错误信息。 - 线程创建(
spawn/join)是 fork-join 并行的基础;线程间的 panic 被隔离,通过JoinHandle传播。 Mutex<T>将数据封装在锁内部,强制"先获取锁再访问数据"的模式,从设计上杜绝了"忘记加锁"的 bug。- 通道(
mpsc)实现消息传递模型;sync_channel提供背压机制防止内存无限增长。 - 原子类型提供无锁的原语操作,适用于简单计数器和标志位场景。
- Rayon 和 Crossbeam 等 crate 在标准库基础上提供了更高级、更易用的并发抽象。
- Rust 引导你使用结构化的并发模式——隔离线程、明确所有权——这恰好也是编写可维护并发程序的最佳实践。