20. Async Programming — 异步编程
异步编程是 Rust 处理高并发 I/O 密集型任务的核心方案。与操作系统线程不同,异步任务的内存开销极小(数百字节 vs 数十 KB),切换成本几乎为零,使得在单台机器上同时运行数十万个连接成为可能。本章从 Future trait 出发,介绍 async/await 语法、tokio 运行时、异步 I/O 以及常见的异步模式。
为什么需要异步
考虑一个简单的 TCP echo 服务器——为每个连接创建一个线程:
let listener = TcpListener::bind("127.0.0.1:8080")?;
for stream in listener.incoming() {
let stream = stream?;
thread::spawn(|| handle_client(stream));
}这在几千个连接时工作良好。但面对数万个并发连接时,问题显现:
- 内存:每个线程栈约 8KB~2MB(取决于 OS 配置),数万线程消耗数 GB 内存
- 创建开销:线程创建约 15us,异步任务约 300ns(约 50 倍差距)
- 上下文切换:线程切换约 1.7us,异步任务切换约 0.2us
异步任务让单个线程在等待 I/O 时处理其他任务,用更少的资源服务更多的连接。
Future Trait
异步编程的核心是 std::future::Future trait:
trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}poll不会阻塞,总是立即返回- 如果操作完成,返回
Poll::Ready(value) - 如果尚未完成,返回
Poll::Pending,并通过Context中的 waker 注册"值得再次 poll"的回调 - 一旦 future 返回
Ready,不应再被 poll
所有异步函数的返回值都是一个实现了 Future 的匿名类型。当你调用 async fn foo() -> T 时,实际上获得的是 impl Future<Output = T>。
async 与 await
async 将一个函数或代码块标记为异步;.await 等待一个 future 完成:
use tokio::net::TcpStream;
async fn fetch_url(url: &str) -> Result<String, std::io::Error> {
let mut stream = TcpStream::connect(url).await?;
let mut buf = String::new();
stream.read_to_string(&mut buf).await?;
Ok(buf)
}当执行到 .await 时:
- 如果 future 已经 ready,直接获取值并继续
- 如果 future 是 pending,当前异步函数挂起,控制权交还给运行时
- 当 future 就绪时,运行时从挂起点恢复执行
关键区别:普通函数调用会一直执行到返回;异步函数在 .await 处可能让出执行权,稍后恢复。
async 块
除了 async fn,Rust 还支持 async 块表达式:
let future = async {
let result = fetch_data().await?;
process(result)
};async move 块获取捕获变量的所有权,类似于 move 闭包:
let data = vec![1, 2, 3];
let future = async move {
// data 的所有权移入 future
process(data).await
};异步运行时:Tokio
Rust 本身只定义了 Future trait 和 async/await 语法,不提供运行时。异步代码需要一个运行时来驱动 future 的执行。Tokio 是最广泛使用的异步运行时。
基本组件
#[tokio::main]
async fn main() {
// 启动一个异步任务
let handle = tokio::spawn(async {
"hello from task"
});
let result = handle.await.unwrap();
println!("{}", result);
}#[tokio::main] 宏将 async fn main() 转换为使用 tokio 运行时的普通 main 函数。
异步 I/O
Tokio 提供了标准库 I/O 类型的异步版本:
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, addr) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = socket.read(&mut buf).await?;
if n == 0 { break; }
socket.write_all(&buf[..n]).await?;
}
});
}
}异步 I/O 的关键:当 read 或 write 需要等待时,tokio 会让当前任务休眠,转而处理其他就绪的任务。
异步 Mutex 与 RwLock
标准库的 Mutex 在异步代码中可能导致问题——如果持锁期间跨越了 .await,可能导致死锁或长时间阻塞。Tokio 提供了异步版本:
use tokio::sync::Mutex;
let data = Mutex::new(HashMap::new());
// 异步获取锁——如果不可用则让出任务
let mut map = data.lock().await;
map.insert("key", "value");与标准库 Mutex 的关键区别:
| 特性 | std::sync::Mutex | tokio::sync::Mutex |
|---|---|---|
| 阻塞行为 | 阻塞线程 | 挂起任务 |
跨 .await 持有 | 不推荐 | 允许 |
| guard 的 Send | MutexGuard 不实现 Send | MutexGuard 实现 Send |
| 适用场景 | 临界区短、不跨越 await | 临界区可能包含 await |
异步通道
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(32); // 缓冲区大小 32
// 生产者
tokio::spawn(async move {
tx.send("message").await.unwrap();
});
// 消费者
while let Some(msg) = rx.recv().await {
println!("{}", msg);
}并发组合器
join! — 并发执行多个 future
use tokio::join;
let (result1, result2) = tokio::join!(
fetch_url("https://example.com"),
fetch_url("https://rust-lang.org"),
);join! 并发 poll 所有 future,全部完成后返回所有结果的元组。
select! — 竞速多个 future
use tokio::select;
tokio::select! {
result = long_running_task() => {
println!("Task completed: {:?}", result);
}
_ = tokio::time::sleep(Duration::from_secs(5)) => {
println!("Timeout!");
}
}select! 同时 poll 多个 future,第一个完成的胜出,其余被取消。
try_join! — 带错误处理的并发
use tokio::try_join;
let (data1, data2) = tokio::try_join!(
fetch_and_parse("url1"),
fetch_and_parse("url2"),
)?;任何一个 future 返回 Err 时,try_join! 立即返回该错误。
Stream Trait — 异步迭代器
Stream 是 Iterator 的异步对应物:
trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}消费 stream 的惯用方式:
use tokio_stream::StreamExt;
let mut stream = /* ... */;
while let Some(item) = stream.next().await {
process(item);
}常见的使用场景:TCP 连接的 incoming stream、消息队列的消费、文件逐行读取。
异步 vs 多线程对比
| 维度 | 异步(async/await) | 多线程 |
|---|---|---|
| 内存开销 | ~数百字节/任务 | ~8KB~2MB/线程 |
| 创建速度 | ~300ns | ~15us |
| 上下文切换 | ~0.2us(用户态协作式) | ~1.7us(内核态抢占式) |
| 并发上限 | 数十万任务 | 数千线程 |
| CPU 密集型 | 不擅长(会阻塞运行时) | 擅长 |
| I/O 密集型 | 擅长 | 可行但资源消耗大 |
| 编程模型 | 协作式调度,需手动插入 yield 点 | 抢占式调度,OS 自动切换 |
| 调试 | 调用栈信息有限 | 调用栈完整 |
| 学习曲线 | 较陡 | 较平缓 |
常见陷阱
在异步代码中调用阻塞函数
在异步任务中调用 std::thread::sleep 或同步 I/O 会阻塞整个 worker 线程:
// 错误示范
async fn bad_example() {
std::thread::sleep(Duration::from_secs(1)); // 阻塞整个 worker 线程!
}
// 正确做法
async fn good_example() {
tokio::time::sleep(Duration::from_secs(1)).await; // 只挂起当前任务
}对于必须执行的阻塞操作,使用 tokio::task::spawn_blocking:
let result = tokio::task::spawn_blocking(|| {
// 阻塞操作在自己的线程中运行
heavy_computation()
}).await.unwrap();跨 .await 持有非 Send 类型
如果 future 需要被 spawn 到线程池(tokio::spawn),它必须是 Send 的。在 .await 点前后仍然存活的值都必须是 Send:
// 错误:Rc 不是 Send
async fn bad() {
let data = Rc::new(42);
some_async_fn().await; // data 跨 await 存活,future 不是 Send
}
// 修正:使用 Arc
async fn good() {
let data = Arc::new(42);
some_async_fn().await; // OK
}长计算占用 worker 线程
async fn cpu_bound() {
// 在计算密集的循环中插入 yield 点
for chunk in data.chunks(1000) {
process_chunk(chunk);
tokio::task::yield_now().await; // 给其他任务执行机会
}
}异步生态关键 crate
| Crate | 用途 |
|---|---|
tokio | 最广泛使用的异步运行时,提供 I/O、同步原语、定时器 |
async-std | 对标标准库 API 的异步运行时 |
futures | Future 组合器、Stream trait 定义、工具函数 |
reqwest | 异步 HTTP 客户端 |
hyper | 底层 HTTP 库 |
tonic | gRPC 框架 |
sqlx | 异步数据库驱动 |
小结
- Future 是异步编程的基石,通过
poll方法驱动执行而不阻塞线程。 - async/await 让异步代码看起来像同步代码,编译器自动将函数体转换为状态机。
- 异步运行时(如 tokio)负责调度任务、管理 I/O 事件、驱动 future 完成。
- 异步 I/O 使用非阻塞系统调用 + epoll/iocp/kqueue,让单个线程管理数万连接。
select!和join!是并发的核心组合器,分别用于竞速和等待全部完成。- 异步擅长 I/O 密集型负载;CPU 密集型任务应使用
spawn_blocking或专用线程池。 - 牢记两个关键限制:跨
.await的值必须是Send(如果使用tokio::spawn),且不要在异步代码中调用阻塞函数。