Skip to content
Published at:

20. Async Programming — 异步编程

异步编程是 Rust 处理高并发 I/O 密集型任务的核心方案。与操作系统线程不同,异步任务的内存开销极小(数百字节 vs 数十 KB),切换成本几乎为零,使得在单台机器上同时运行数十万个连接成为可能。本章从 Future trait 出发,介绍 async/await 语法、tokio 运行时、异步 I/O 以及常见的异步模式。

为什么需要异步

考虑一个简单的 TCP echo 服务器——为每个连接创建一个线程:

rust
let listener = TcpListener::bind("127.0.0.1:8080")?;
for stream in listener.incoming() {
    let stream = stream?;
    thread::spawn(|| handle_client(stream));
}

这在几千个连接时工作良好。但面对数万个并发连接时,问题显现:

  • 内存:每个线程栈约 8KB~2MB(取决于 OS 配置),数万线程消耗数 GB 内存
  • 创建开销:线程创建约 15us,异步任务约 300ns(约 50 倍差距)
  • 上下文切换:线程切换约 1.7us,异步任务切换约 0.2us

异步任务让单个线程在等待 I/O 时处理其他任务,用更少的资源服务更多的连接。

Future Trait

异步编程的核心是 std::future::Future trait:

rust
trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
  • poll 不会阻塞,总是立即返回
  • 如果操作完成,返回 Poll::Ready(value)
  • 如果尚未完成,返回 Poll::Pending,并通过 Context 中的 waker 注册"值得再次 poll"的回调
  • 一旦 future 返回 Ready,不应再被 poll

所有异步函数的返回值都是一个实现了 Future 的匿名类型。当你调用 async fn foo() -> T 时,实际上获得的是 impl Future<Output = T>

async 与 await

async 将一个函数或代码块标记为异步;.await 等待一个 future 完成:

rust
use tokio::net::TcpStream;

async fn fetch_url(url: &str) -> Result<String, std::io::Error> {
    let mut stream = TcpStream::connect(url).await?;
    let mut buf = String::new();
    stream.read_to_string(&mut buf).await?;
    Ok(buf)
}

当执行到 .await 时:

  1. 如果 future 已经 ready,直接获取值并继续
  2. 如果 future 是 pending,当前异步函数挂起,控制权交还给运行时
  3. 当 future 就绪时,运行时从挂起点恢复执行

关键区别:普通函数调用会一直执行到返回;异步函数在 .await 处可能让出执行权,稍后恢复。

async 块

除了 async fn,Rust 还支持 async 块表达式:

rust
let future = async {
    let result = fetch_data().await?;
    process(result)
};

async move 块获取捕获变量的所有权,类似于 move 闭包:

rust
let data = vec![1, 2, 3];
let future = async move {
    // data 的所有权移入 future
    process(data).await
};

异步运行时:Tokio

Rust 本身只定义了 Future trait 和 async/await 语法,不提供运行时。异步代码需要一个运行时来驱动 future 的执行。Tokio 是最广泛使用的异步运行时。

基本组件

rust
#[tokio::main]
async fn main() {
    // 启动一个异步任务
    let handle = tokio::spawn(async {
        "hello from task"
    });

    let result = handle.await.unwrap();
    println!("{}", result);
}

#[tokio::main] 宏将 async fn main() 转换为使用 tokio 运行时的普通 main 函数。

异步 I/O

Tokio 提供了标准库 I/O 类型的异步版本:

rust
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, addr) = listener.accept().await?;
        tokio::spawn(async move {
            let mut buf = [0; 1024];
            loop {
                let n = socket.read(&mut buf).await?;
                if n == 0 { break; }
                socket.write_all(&buf[..n]).await?;
            }
        });
    }
}

异步 I/O 的关键:当 readwrite 需要等待时,tokio 会让当前任务休眠,转而处理其他就绪的任务。

异步 Mutex 与 RwLock

标准库的 Mutex 在异步代码中可能导致问题——如果持锁期间跨越了 .await,可能导致死锁或长时间阻塞。Tokio 提供了异步版本:

rust
use tokio::sync::Mutex;

let data = Mutex::new(HashMap::new());

// 异步获取锁——如果不可用则让出任务
let mut map = data.lock().await;
map.insert("key", "value");

与标准库 Mutex 的关键区别:

特性std::sync::Mutextokio::sync::Mutex
阻塞行为阻塞线程挂起任务
.await 持有不推荐允许
guard 的 SendMutexGuard 不实现 SendMutexGuard 实现 Send
适用场景临界区短、不跨越 await临界区可能包含 await

异步通道

rust
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(32);  // 缓冲区大小 32

// 生产者
tokio::spawn(async move {
    tx.send("message").await.unwrap();
});

// 消费者
while let Some(msg) = rx.recv().await {
    println!("{}", msg);
}

并发组合器

join! — 并发执行多个 future

rust
use tokio::join;

let (result1, result2) = tokio::join!(
    fetch_url("https://example.com"),
    fetch_url("https://rust-lang.org"),
);

join! 并发 poll 所有 future,全部完成后返回所有结果的元组。

select! — 竞速多个 future

rust
use tokio::select;

tokio::select! {
    result = long_running_task() => {
        println!("Task completed: {:?}", result);
    }
    _ = tokio::time::sleep(Duration::from_secs(5)) => {
        println!("Timeout!");
    }
}

select! 同时 poll 多个 future,第一个完成的胜出,其余被取消。

try_join! — 带错误处理的并发

rust
use tokio::try_join;

let (data1, data2) = tokio::try_join!(
    fetch_and_parse("url1"),
    fetch_and_parse("url2"),
)?;

任何一个 future 返回 Err 时,try_join! 立即返回该错误。

Stream Trait — 异步迭代器

StreamIterator 的异步对应物:

rust
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}

消费 stream 的惯用方式:

rust
use tokio_stream::StreamExt;

let mut stream = /* ... */;

while let Some(item) = stream.next().await {
    process(item);
}

常见的使用场景:TCP 连接的 incoming stream、消息队列的消费、文件逐行读取。

异步 vs 多线程对比

维度异步(async/await)多线程
内存开销~数百字节/任务~8KB~2MB/线程
创建速度~300ns~15us
上下文切换~0.2us(用户态协作式)~1.7us(内核态抢占式)
并发上限数十万任务数千线程
CPU 密集型不擅长(会阻塞运行时)擅长
I/O 密集型擅长可行但资源消耗大
编程模型协作式调度,需手动插入 yield 点抢占式调度,OS 自动切换
调试调用栈信息有限调用栈完整
学习曲线较陡较平缓

常见陷阱

在异步代码中调用阻塞函数

在异步任务中调用 std::thread::sleep 或同步 I/O 会阻塞整个 worker 线程:

rust
// 错误示范
async fn bad_example() {
    std::thread::sleep(Duration::from_secs(1));  // 阻塞整个 worker 线程!
}

// 正确做法
async fn good_example() {
    tokio::time::sleep(Duration::from_secs(1)).await;  // 只挂起当前任务
}

对于必须执行的阻塞操作,使用 tokio::task::spawn_blocking

rust
let result = tokio::task::spawn_blocking(|| {
    // 阻塞操作在自己的线程中运行
    heavy_computation()
}).await.unwrap();

跨 .await 持有非 Send 类型

如果 future 需要被 spawn 到线程池(tokio::spawn),它必须是 Send 的。在 .await 点前后仍然存活的值都必须是 Send

rust
// 错误:Rc 不是 Send
async fn bad() {
    let data = Rc::new(42);
    some_async_fn().await;  // data 跨 await 存活,future 不是 Send
}

// 修正:使用 Arc
async fn good() {
    let data = Arc::new(42);
    some_async_fn().await;  // OK
}

长计算占用 worker 线程

rust
async fn cpu_bound() {
    // 在计算密集的循环中插入 yield 点
    for chunk in data.chunks(1000) {
        process_chunk(chunk);
        tokio::task::yield_now().await;  // 给其他任务执行机会
    }
}

异步生态关键 crate

Crate用途
tokio最广泛使用的异步运行时,提供 I/O、同步原语、定时器
async-std对标标准库 API 的异步运行时
futuresFuture 组合器、Stream trait 定义、工具函数
reqwest异步 HTTP 客户端
hyper底层 HTTP 库
tonicgRPC 框架
sqlx异步数据库驱动

小结

  • Future 是异步编程的基石,通过 poll 方法驱动执行而不阻塞线程。
  • async/await 让异步代码看起来像同步代码,编译器自动将函数体转换为状态机。
  • 异步运行时(如 tokio)负责调度任务、管理 I/O 事件、驱动 future 完成。
  • 异步 I/O 使用非阻塞系统调用 + epoll/iocp/kqueue,让单个线程管理数万连接。
  • select!join! 是并发的核心组合器,分别用于竞速和等待全部完成。
  • 异步擅长 I/O 密集型负载;CPU 密集型任务应使用 spawn_blocking 或专用线程池。
  • 牢记两个关键限制:跨 .await 的值必须是 Send(如果使用 tokio::spawn),且不要在异步代码中调用阻塞函数。