长远看来,不建议使用面向机器的语言编写大型并发程序,因为面向机器的语言允许不受限制地使用存储位置及其地址。这就意味着,即使借助复杂的硬件机制,我们也根本没有能力确保程序的可靠性。 ——Per Brinch Hansen(1977 年)
“通信”的模式就是“并行”的模式。 ——Whit Morriss
如果你看待并发的态度在职业生涯中发生过变化,那你并不是特例。 这种现象太常见了。 起初,编写并发代码看起来既简单又有趣。线程、锁、队列等工具很容易上手,固然其中也有很多陷阱,但幸运的是我们知道都有哪些陷阱,并且多加小心就不会出错。 我们总有不得不调试其他人的多线程代码的情况,这时候,你只能得出以下结论:某些人确实不适合使用这些工具。 但迟早,你不得不调试自己的多线程代码。 过去的教训告诉你,如果还没有对多线程技术彻底失望,那么至少也应该对所有多线程代码保持适度的警惕。你偶尔会碰上几篇文章详细解释为什么一些看似正确的多线程惯用法却根本不起作用(与“内存模型”有关),这又进一步强化了这种警惕。但是,你最终会找到一种自己用起来顺手且不会经常出错的并发惯用法。你会把几乎所有经验都塞进那个惯用法中,并且,如果你真的很厉害,那么还能对凭空增加的复杂性说“不”。当然,还有很多惯用法。系统程序员常用的方法包括以下几种。 具有单一作业的后台线程,需要定期唤醒执行作业。 通过任务队列与客户端通信的通用工作池。 管道,数据在其中从一个线程流向下一个线程,每个线程只负责一部分工作。 数据并行处理,假设(无论对错)整个计算机只进行一次主要的大型计算,将这次计算分成 n 个部分且在 n 个线程上运行,并期望机器的所有 n 个核心都能立即开始工作。 同步复杂对象关系,其中多个线程可以访问相同的数据,并且使 用基于互斥锁等底层原语的临时加锁方案避免了竞争。(Java 内置了对此模型的支持,它曾在 20 世纪 90 年代和 21 世纪初非常流行。) 原子化整数操作允许多个核心借助一个机器字大小的字段传递信息来进行通信。(这种惯用法比其他所有方法更难正确使用,除非要交换的数据恰好是整数值,但实际上,数据通常是指针。) 随着时间的推移,你已经对其中的几种方法非常娴熟,还能彼此相安无事地组合使用它们——简直就是艺术大师!如果其他人不会以任何方式修改你的系统,那么就能岁月静好——然而,尽管这些程序可以很好地利用线程,但其中充满了“潜规则”。 Rust 提供了一种更好的并发处理方式,不是强制所有程序采用单一风格(对系统程序员来说这可算不上什么解决方案),而是安全地支持多种风格。通过代码把“潜规则”写出来并由编译器强制执行。 你可能听说过 Rust 能让你编写安全、快速、并发的程序。本章将向你展示它是如何做到的。我们将介绍 3 种使用 Rust 线程的方法。 分叉与合并(fork-join)并行通道 共享可变状态 在此过程中,你会用上迄今为止学过的有关 Rust 语言的所有知识。 Rust 对引用、可变性和生命周期的处理方式在单线程程序中已经足够有价值了,但在并发编程中,这些规则的意义才开始真正显现。它们会扩展你的工具箱,让快速而正确地编写各种风格的多线程代码成为可能——不再怀疑,不再愤世嫉俗,不再恐惧。
19.1 分叉与合并并行
当我们有几个完全独立的任务想要同时完成时,线程 简单的用例就出现了。假设我们正在对大量文档进行自然语言处理。可以写这样一个循环:
fn process_files(filenames: Vec<String>) -> io::Result<()> {
for document in filenames {
let text = load(&document)?; // 读取源文件 let results = process(text); // 计算统计信息 save(&document, results)?; // 写入输出文件
}
Ok(())
}
图 19-1 展示了这个程序的执行过程。

图 19-1:process_files() 的单线程执行 由于每个文档都是单独处理的,因此要想加快任务处理速度,可以将语料库分成多个块并在单独的线程上处理每个块,如图 19-2 所示。

图 19-2:使用分叉与合并方法的多线程文件处理 这种模式称为分叉与合并并行。fork(分叉)是启动一个新线程, join(合并)是等待线程完成。我们已经见过这种技术:第 2 章中曾用它来加速曼德博程序。 出于以下几个原因,分叉与合并并行很有吸引力。 非常简单。分叉与合并很容易实现,在 Rust 中更不容易写错。避免了瓶颈。分叉与合并中没有对共享资源的锁定。任何线程只会在 后一步才不得不等待另一个线程。同时,每个线程都可以自由运行。这有助于降低任务切换开销。 这种模式在性能方面的数学模型对程序员来说比较直观。在 好的情况下,通过启动 4 个线程,我们只花 1⁄4 的时间就能完成原本的工作。图 19-2 展示了不应该期望这种理想加速的一个原因:我们可能无法在所有线程之间平均分配工作。另一个需要注意的原因是,有时分叉与合并程序必须在线程联结后花费一些时间来组合各线程的计算结果。也就是说,完全隔离这些任务可能会产生一些额外的工作。不过,除了这两个原因,任何具有独立工作单元的 CPU 密集型程序都可以获得显著的性能提升。 很容易推断出程序是否正确。只要线程真正隔离了,分叉与合并程序就是确定性的,就像曼德博程序中的计算线程一样。无论线程速度如何变化,程序总会生成相同的结果。这是一个没有竞态条件的并发模型。 分叉与合并的主要缺点是要求工作单元彼此隔离。本章在后面会考虑一些无法完全隔离的问题。 现在,继续以自然语言处理为例。我们将展示几种将分叉与合并模式应用于 process_files 函数的方法。
19.1.1 启动与联结
函数 std::thread::spawn 会启动一个新线程:
use std::thread;
thread::spawn(|| {
println!("hello from a child thread");
});
它会接受一个参数,即一个 FnOnce 闭包或函数型的参数。Rust 会启动一个新线程来运行该闭包或函数的代码。新线程是一个真正的操作系统线程,有自己的栈,就像 C++、C#、Java 中的线程一样。 下面是一个更实际的例子,它使用 spawn 实现了之前的 process_files 函数的并行版本:
use std::{thread, io};
fn process_files_in_parallel(filenames: Vec<String>) ->
io::Result<()> {
// 把工作拆分成几块
const NTHREADS: usize = 8; let worklists = split_vec_into_chunks(filenames, NTHREADS);
分 启动 个线程来 每 个块
// 分叉:启动一个线程来处理每一个块
let mut thread_handles = vec![]; for worklist in worklists { thread_handles.push( thread::spawn(move || process_files(worklist))
);
}
// 联结:等待所有线程结束
for handle in thread_handles { handle.join().unwrap()?;
}
Ok(()) }
下面来逐行分析一下这个函数。
fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {
我们的新函数与原始 process_files 具有相同的类型签名,这样它就是一个方便的无缝替代品了。
// 把工作拆分成几块
const NTHREADS: usize = 8;
let worklists = split_vec_into_chunks(filenames, NTHREADS);
我们使用了尚未展示过的实用函数 split_vec_into_chunks 来拆分工作。它的返回值 worklists 是由向量组成的向量,其中包含从原始向量 filenames 中均分出来的 8 个部分。
// 分叉:启动一个线程来处理每一个块
let mut thread_handles = vec![];
for worklist in worklists {
thread_handles.push(thread::spawn(move || process_files(worklist)));
}
我们会为每个 worklist 启动一个线程。spawn() 会返回一个名为 JoinHandle 的值,稍后会用到。现在,先将所有 JoinHandle 放入一个向量中。
请注意我们是如何将文件名列表放入工作线程的。
在父线程中,通过 for 循环来定义和填充 worklist。
一旦创建了 move 闭包,worklist 就会被移动到此闭包中。
然后 spawn 会将闭包(内含 worklist 向量)转移给新的子线程。
这些操作开销很低。就像第 4 章中讨论过的 Vec
// 联结:等待所有线程结束
for handle in thread_handles {
handle.join().unwrap()?;
}
我们使用之前收集的 JoinHandle 的 .join() 方法来等待所有 8 个线程完成。联结这些线程对于保证程序的正确性是必要的,因为 Rust 程序会在 main 返回后立即退出,即使其他线程仍在运行。这些线程并不会调用析构器,而是直接被“杀死”了。如果这不是你想要的结果,请确保在从 main 返回之前联结了任何你关心的线程。 如果我们通过了这个循环,则意味着所有 8 个子线程都成功完成了。 因此,该函数会以返回 Ok(()) 结束。
Ok(())
}
19.1.2 跨线程错误处理
由于要做错误处理,我们在示例中用于联结子线程的代码比看起来更棘手。再重温一下那行代码:
handle.join().unwrap()?;
.join() 方法为我们做了两件事。 首先,handle.join() 会返回 std::thread::Result,如果子线程出现了 panic,就返回一个错误(Err)。这使得 Rust 中的线程比 C++ 中的线程更加健壮。在 C++ 中,越界数组访问是未定义行为,并且无法保护系统的其余部分免受后果的影响。在 Rust 中, panic 是安全且局限于每个线程的。线程之间的边界充当着 panic 的防火墙,panic 不会自动从一个线程传播到依赖它的其他线程。相 反,一个线程中的 panic 在其他线程中会报告为错误型 Result。程序整体而言很容易恢复。 不过,在本程序中,我们不会尝试任何花哨的 panic 处理,而是会立即在 Result 上使用 .unwrap(),断言它是一个 Ok 结果而不是 Err 结果。如果一个子线程确实发生了 panic,那么这个断言就会失败,所以父线程也会出现 panic。如此一来,我们就显式地将 panic 从子线程传播到了父线程。 其次,handle.join() 会将子线程的返回值传回父线程。我们传给 spawn 的闭包的返回类型是 io::Result<()>,因为它就是 process_files 返回值的类型。此返回值不会被丢弃。当子线程完成时,它的返回值会被保存下来,并且 JoinHandle::join() 会把该值传回父线程。 在这个程序中,handle.join() 返回的完整类型是 std::thread::Resultstd::io::Result<()>。 thread::Result 是 spawn/ joinAPI 的一部分,而 io::Result 是我们的应用程序的一部分。 在这个例子中,展开(unwrap)thread::Result 之后,我们就用 io::Result 上的 ? 运算符显式地将 I/O 错误从子线程传播到了父线程。所有这些看起来可能相当琐碎。但如果只把它当作一行代码,则可以与别的语言对比一下。Java 和 C# 中的默认行为是子线程中的异常会转储到终端,然后被遗忘。在 C++ 中,默认行为是中止进程。在 Rust 中,错误是 Result 值(数据)而不是异常(控制流)。它们会像其他值一样跨线程传递。每当你使用底层线程 API 时, 终都必 须仔细编写错误处理代码,但如果不得不编写错误处理代码,那么 Result 是非常合适的选择。
19.1.3 跨线程共享不可变数据
假设我们正在进行的分析需要一个大型的英语单词和短语的数据库:
// 之前
fn process_files(filenames: Vec<String>)
// 之后
fn process_files(filenames: Vec<String>, glossary: &GigabyteMap)
这个 glossary 会很大,所以要通过引用传递它。该如何修改 process_files_in_parallel 以便将词汇表传给工作线程呢?想当然的改法是不行的:
fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap) -> io::Result<()> {
for worklist in worklists {
thread_handles.push(
spawn(move || process_files(worklist, glossary)), // 错误
);
}
}
我们只给此函数添加了一个 glossary 参数并将其传给 process_files。Rust 报错说:
error: explicit lifetime required in the type of `glossary` |
38 | spawn(move || process_files(worklist, glossary)) // 错误
| ^^^^^ lifetime `'static` required
Rust 对传给 spawn 的闭包的生命周期报了错,而编译器在此处显示的“有用”消息实际上根本没有帮助。 spawn 会启动独立线程。Rust 无法知道子线程要运行多长时间,因此它假设了 坏的情况:即使在父线程完成并且父线程中的所有值都消失后,子线程仍可能继续运行。显然,如果子线程要持续那么久,那么它运行的闭包也需要持续那么久。但是这个闭包有一个有限的生命周期,它依赖于 glossary 引用,而此引用不需要永久存在。 请注意,Rust 拒绝编译此代码是对的。按照我们编写这个函数的方式,一个线程确实有可能遇到 I/O 错误,导致 process_files_in_parallel 在其他线程完成之前退出。在主线程释放词汇表后,子线程可能仍然会试图使用词汇表。这将是一场竞赛,如果主线程获胜,就会赢得“未定义行为”这份大奖。而 Rust 不允许发生这种事。 spawn 似乎过于开放了,无法支持跨线程共享引用。事实上,我们已经在 14.1.2 节中看到过这样的情况。那时候,解决方案是用 move 闭包将数据的所有权转移给新线程。但在这里行不通,因为有许多线程要使用同一份数据。一种安全的替代方案是为每个线程都克隆整个词汇表,但由于词汇表很大,我们不希望这么做。幸运的是,标准库提供了另一种方式:原子化引用计数。 4.4 节介绍过 Arc。是时候使用它了:
use std::sync::Arc;
fn process_files_in_parallel(filenames: Vec<String>, glossary: Arc<GigabyteMap>)
-> io::Result<()>
{ ...
for worklist in worklists {
对 的 会克隆 并增加引 数 并不会克隆
// 对.clone()的调用只会克隆Arc并增加引用计数,并不会克隆
GigabyteMap
let glossary_for_child = glossary.clone(); thread_handles.push( spawn(move || process_files(worklist,
&glossary_for_child))
);
} ... }
我们更改了 glossary 的类型:要执行并行分析,调用者就必须传入
Arc
19.1.4 rayon
标准库的 spawn 函数是一个重要的基础构件,但它并不是专门为分叉与合并的并行而设计的,基于它,我们可以封装出更好的分叉与合并式 API。例如,在第 2 章中,我们使用 crossbeam 库将一些工作拆分为 8 个线程。crossbeam 的作用域线程能非常自然地支持分叉与合并并行。 由 Niko Matsakis 和 Josh Stone 设计的 rayon1 库是另一个例子。它提供了两种运行并发任务的方式: 1rayon 意为“人造丝”,引申为“人造线程”。——译者注
use rayon::prelude::*;
并行做 件事
// “并行做两件事”
let (v1, v2) = rayon::join(fn1, fn2);
// “并行做N件事”
giant_vector.par_iter().for_each(|value| {
do_thing_with_value(value);
});
rayon::join(fn1, fn2) 只是调用这两个函数并返回两个结果。.par_iter() 方法会创建 ParallelIterator,这是一个带 有 map、filter 和其他方法的值,很像 Rust 的 Iterator。在这两种情况下,rayon 都会用自己的工作线程池来尽可能拆分工作。只要告诉 rayon 哪些任务可以并行完成就可以了,rayon 会管理这些线程并尽其所能地分派工作。 图 19-3 展示了对 giant_vector.par_iter().for_each(…) 调用的两种思考方式。(a) rayon 表现得就好像它为向量中的每个元 素启动了一个线程。(b) 在幕后,rayon 在每个 CPU 核心上都有一个工作线程,这样效率更高。这个工作线程池由程序中的所有线程共享。当成千上万个任务同时进来时,rayon 会拆分这些工作。

图 19-3:理论上与实践中的 rayon
下面是一个使用 rayon 的 process_files_in_parallel 版本和一个接受 Vec
use rayon::prelude::*;
fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap) -> io::Result<()> {
filenames
.par_iter()
.map(|filename| process_file(filename, glossary))
.reduce_with(|r1, r2| if r1.is_err() { r1 } else { r2 })
.unwrap_or(Ok(()))
}
比起使用 std::thread::spawn 的版本,这段代码更简短,也不需要很多技巧。我们一行一行地看。 首先,用 filenames.par_iter() 创建一个并行迭代器。 然后,用 .map() 在每个文件名上调用 process_file。这会在一系列 io::Result<()> 型的值上生成一个 ParallelIterator。 后,用 .reduce_with() 来合并结果。在这里,我们会保留第一个错误(如果有的话)并丢弃其余错误。如果想累积所有的错误或者打印它们,也可以在这里修改。 当传递一个能在成功时返回有用值的 .map() 闭包时,.reduce_with() 方法也非常好用。这时可以给 .reduce_with() 传入一个闭包,指定如何组合两个成功结果。
reduce_with 只有在 filenames 为空时才会返回一个为 None 的 Option。在这种情况下,我们会用 Option 的 .unwrap_or() 方法来生成结果 Ok(())。 在幕后,rayon 使用了一种叫作工作窃取的技术来动态平衡线程间的工作负载。相比于 19.1.1 节的手动预先分配工作的方式,这通常能更好地让所有 CPU 都处于忙碌状态。 另外,rayon 还支持跨线程共享引用。幕后发生的任何并行处理都能确保在 reduce_with 返回时完成。这解释了为什么即使该闭包会在多个线程上调用,也能安全地将 glossary 传给 process_file。(顺便说一句,这里使用 map 和 reduce 这两个方法名并非巧合。 由 Google 和 Apache Hadoop 推广的 MapReduce 编程模型与分叉与合并有很多共同点。可以将其看作查询分布式数据的分叉与合并方法。)
19.1.5 重温曼德博集
回想第 2 章,我们曾用分叉与合并并发来渲染曼德博集。这让渲染速度提升了 4 倍,令人印象非常深刻。但考虑到我们让程序在 8 核机器上启动了 8 个工作线程,因此这个速度还不够快。 问题的根源在于我们没有平均分配工作量。计算图像的一个像素相当于运行一个循环(参见 2.6.1 节)。事实上,图像的浅灰色部分(循环会快速退出的地方)比黑色部分(循环会运行整整 255 次迭代的地方)渲染速度要快得多。因此,虽然我们将整个区域划分成了大小相等的水平条带,但创建了不均等的工作负载,如图 19-4 所示。

图 19-4:曼德博程序中的工作分配不均等 使用 rayon 很容易解决这个问题。我们可以为输出中的每一行像素启动一个并行任务。这会创建数百个任务,而 rayon 可以在其线程 中分配这些任务。有了工作窃取机制,任务的规模是无关紧要的。 rayon 会对这些工作进行平衡。 下面是实现代码。第 1 行和 后一行是 2.6.6 节中展示过的 main 函数的一部分,但我们更改了这两行之间的渲染代码:
let mut pixels = vec![0; bounds.0 * bounds.1];
// 把`pixels`拆分成一些水平条带
{
let bands: Vec<(usize, &mut [u8])> = pixels.chunks_mut(bounds.0).enumerate().collect();
bands.into_par_iter().for_each(|(i, band)| {
let top = i;
let band_bounds = (bounds.0, 1);
let band_upper_left = pixel_to_point(bounds, (0, top), upper_left, lower_right);
let band_lower_right =
pixel_to_point(bounds, (bounds.0, top + 1), upper_left, lower_right);
render(band, band_bounds, band_upper_left, band_lower_right);
});
}
write_image(&args[1], &pixels, bounds).expect("error writing PNG file");
首先,创建 bands,也就是要传给 rayon 的任务集合。每个任务只是一个元组类型 (usize, &mut [u8]):第一个是计算所需的行 号,第二个是要填充的 pixels 切片。我们使用 chunks_mut 方法将图像缓冲区分成一些行,enumerate 则会给每一行添加行号,然后 collect 会将所有数值切片对放入一个向量中。(这里需要一个向量,因为 rayon 只能从数组和向量中创建并行迭代器。)接下来,将 bands 转成一个并行迭代器,并使用 .for_each() 方法告诉 rayon 我们想要完成的工作。 由于我们在使用 rayon,因此必须将下面这行代码添加到 main.rs 中:
use rayon::prelude::*;
下面是要添加到 Cargo.toml 中的内容:
[dependencies] rayon = "1"
通过这些更改,现在该程序在 8 核机器上使用了大约 7.75 个核心。速度比以前手动分配工作时快 75%,而且代码更简短。这体现出了让 crate 负责工作分配而不是我们自己去完成的好处。
19.2 通道
通道是一种单向管道,用于将值从一个线程发送到另一个线程。换句话说,通道是一个线程安全的队列。 图 19-5 说明了如何使用通道。通道有点儿像 Unix 管道:一端用于发送数据,另一端用于接收数据。两端通常由两个不同的线程拥有。 但是,Unix 管道用于发送字节,而通道用于发送 Rust 值。 sender.send(item) 会将单个值放入通道,receiver.recv() 则会移除一个值。值的所有权会从发送线程转移给接收线程。如果通道为空,则 receiver.recv() 会一直阻塞到有值发出为止。

图 19-5:String 的通道:字符串 msg 的所有权从线程 1 转移给线程 2 使用通道,线程可以通过彼此传值来进行通信。这是线程协同工作的一种非常简单的方法,无须使用锁或共享内存。这并不是一项新技术。Erlang 中的独立进程和消息传递已经有 30 年历史了。Unix 管道已经有将近 50 年历史了。我们一般会认为管道具 有灵活性和可组合性,而没有意识到它还具有并发的特性,但事实上,管道具有上述所有特性。图 19-6 展示了一个 Unix 管道的例子。当然,这 3 个程序也可以同时工作。

图 19-6:Unix 管道的执行过程 Rust 通道比 Unix 管道更快。发送值只是移动而不是复制,即使要移动的数据结构包含数兆字节数据速度也很快。
19.2.1 发送值
在接下来的几节中,我们将使用通道来构建一个创建倒排索引的并发程序,倒排索引是搜索引擎的关键组成部分之一。每个搜索引擎都会处理特定的文档集合。倒排索引是记录“哪些词出现在哪里”的数据库。 我们将展示与线程和通道有关的部分代码。完整的程序(参见本书在 GitHub 网站上的页面)也不长,大约 1000 行代码。 我们的程序结构是管道式的,如图 19-7 所示。管道只是使用通道的众多方法之一(稍后会讨论其他几种方式),但它们是将并发引入现有单线程程序的最直观方式。

图 19-7:索引构建器管道,其中箭头表示通过通道将值从一个线程发送到另一个线程(未展示磁盘 I/O)
这个程序使用总共 5 个线程分别执行了不同的任务。每个线程在程序的生命周期内不断地生成输出。例如,第一个线程只是将源文档从磁盘逐个读取到内存中。(之所以用一个线程来做这件事,是因为我们
想在这里编写尽可能简单的代码,该代码只会调用像
fs::read_to_string 这样的阻塞式 API。在磁盘工作时,我们不希望 CPU 闲置。)该阶段会为每个文档输出一个表示其内容的长
String,因此这个线程与下一个线程可以通过 String 型通道连接。
我们的程序将从启动读取文件的线程开始。假设 documents 是一个 Vec
use std::sync::mpsc;
use std::{fs, thread};
let (sender, receiver) = mpsc::channel();
let handle = thread::spawn(move || {
for filename in documents {
let text = fs::read_to_string(filename)?;
if sender.send(text).is_err() {
break;
}
}
Ok(())
});
通道是 std::sync::mpsc 模块的一部分,本章稍后会解释这个名字的含义。下面来看这段代码是如何工作的。先创建一个通道:
let (sender, receiver) = mpsc::channel();
channel 函数会返回一个值对:发送者和接收者。底层队列的数据结构是标准库的内部实现细节。通道是有类型的。我们要使用这个通道来发送每个文件的文本,因此 sender 和 receiver 的类型分别为 Sender
let handle = thread::spawn(move || {
和以前一样,使用 std::thread::spawn 来启动一个线程。 sender(而不是 receiver)的所有权会通过这个 move 闭包转移给新线程。接下来的几行代码只会从磁盘读取文件:
for filename in documents {
let text = fs::read_to_string(filename)?;
成功读取文件后,要将其文本发送到通道中:
if sender.send(text).is_err() { break;
} }
sender.send(text) 会将 text 值移动到通道中。最终,通道会再次把 text 值转交给接收到该值的任何对象。无论 text 包含 10 行文本还是 10 兆字节,此操作都只会复制 3 个机器字(String 结构体的大小),相应的 receiver.recv() 调用也只会复制 3 个机器字。 send 方法和 recv 方法都会返回 Result,这两种方法只有当通道的另一端已被丢弃时才会失败。如果 Receiver 已被丢弃,那么 send 调用就会失败,因为如果不失败,则该值会永远存在于通道中:没有 Receiver,任何线程都无法再接收它。同样,如果通道中没有值在等待并且 Sender 已被丢弃,则 recv 调用会失败,因为 如果不失败,recv 就只能永远等待: 没有 Sender,任何线程都无法再发出下一个值。丢弃通道的某一端是正常的“挂断”方式,完成后就会关闭连接。 在我们的代码中,只有当接收者的线程提前退出时, sender.send(text) 才会失败。这是使用通道的典型代码。无论接收者是故意退出还是出错退出,读取者线程都可以悄悄地自行关闭。 无论是发生了这种情况还是线程读取完了所有文档,程序都会返回 Ok(()):
Ok(())
});
请注意,这个闭包返回了一个 Result。如果线程遇到 I/O 错误,则会立即退出,错误会被存储在线程的 JoinHandle 中。 当然,就像其他编程语言一样,Rust 在错误处理方面也有许多其他选择。当发生错误时,可以使用 println! 将其打印出来,然后再处理下一个文件。还可以通过用于传递数据的同一通道传递错误,把它变成 Result 的通道——或者为传递错误创建第二个通道。我们在这里选择的方法既轻量又可靠:我们使用了 ? 运算符,这样就不会有一堆样板代码,甚至连 Java 中可能看到的显式 try/catch 都没有,而且也不会悄无声息地传递错误。 为便于使用,程序会把所有这些代码都包装在一个函数中,该函数会返回至今尚未用到的 receiver 和新线程的 JoinHandle:
fn start_file_reader_thread(
documents: Vec<PathBuf>,
) -> (mpsc::Receiver<String>, thread::JoinHandle<io::Result<()>>) {
let (sender, receiver) = mpsc::channel();
let handle = thread::spawn(move || {});
(receiver, handle)
}
请注意,这个函数会启动新线程并立即返回。我们会为管道的每个阶段编写一个类似的函数。 19.2.2 接收值 现在我们有了一个线程来运行发送值的循环。接下来可以启动第二个线程来运行调用 receiver.recv() 的循环:
while let Ok(text) = receiver.recv() {
do_something_with(text);
}
但 Receiver 是可迭代的,所以还有更好的写法:
for text in receiver {
do_something_with(text);
}
这两个循环是等效的。无论怎么写,当控制流到达循环顶部时,只要通道恰好为空,接收线程在其他线程发送值之前都会阻塞。当通道为空且 Sender 已被丢弃时,循环将正常退出。在我们的程序中,当读取者线程退出时,循环会自然而然地退出。该线程正在运行一个拥有变量 sender 的闭包,当闭包退出时,sender 会被丢弃。现在可以为管道的第二阶段编写代码了:
fn start_file_indexing_thread(
texts: mpsc::Receiver<String>,
) -> (mpsc::Receiver<InMemoryIndex>, thread::JoinHandle<()>) {
let (sender, receiver) = mpsc::channel();
let handle = thread::spawn(move || {
for (doc_id, text) in texts.into_iter().enumerate() {
let index = InMemoryIndex::from_single_document(doc_id, text);
if sender.send(index).is_err() {
break;
}
}
});
(receiver, handle)
}
这个函数会启动一个线程,该线程会从一个通道(texts)接收 String 值并将 InMemoryIndex 值发送给另一个通道(sender/receiver)。这个线程的工作是获取第一阶段加载的每个文件,并将每个文档变成一个小型单文件内存倒排索引。 这个线程的主循环很简单。索引文档的所有工作都是由函数 InMemoryIndex::from_single_document 完成的。我们不会在 这里展示它的源代码,你只要知道它会在单词边界处拆分输入字符串,然后生成从单词到位置列表的映射就可以了。 这个阶段不会执行 I/O,所以不必处理各种 io::Error。它会返回 () 而非 io::Result<()>。
19.2.3 运行管道
其余 3 个阶段的设计也是类似的。每个阶段都会使用上一阶段创建的 Receiver。对管道的其余部分,我们设定的目标是将所有小索引合并到磁盘上的单个大索引文件中。最快的方法是将这个任务分为 3 个阶段。我们不会在这里展示代码,只会展示这 3 个函数的类型签名。完整的源代码请参见在线文档。首先,合并内存中的索引,直到它们变得“笨重”(第三阶段):
fn start_in_memory_merge_thread(file_indexes:
mpsc::Receiver<InMemoryIndex>)
-> (mpsc::Receiver<InMemoryIndex>, thread::JoinHandle<()>)
然后,将这些大型索引写入磁盘(第四阶段):
fn start_index_writer_thread(big_indexes:
mpsc::Receiver<InMemoryIndex>, output_dir: &Path)
-> (mpsc::Receiver<PathBuf>, thread::JoinHandle<io::Result<()>>)
最后,如果有多个大文件,就用基于文件的合并算法合并它们(第五阶段):
fn merge_index_files(files: mpsc::Receiver<PathBuf>, output_dir:
&Path)
-> io::Result<()>
最后一个阶段不会返回 Receiver,因为它是此管道的末尾。这个阶段会在磁盘上生成单个输出文件。它也不会返回 JoinHandle,因为我们没有为这个阶段启动线程。这项工作是在调用者的线程上完成的。 现在来看一下启动线程和检查错误的代码:
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf) -> io::Result<()> {
// 启动管道的所有5个阶段
let (texts, h1) = start_file_reader_thread(documents);
let (pints, h2) = start_file_indexing_thread(texts);
let (gallons, h3) = start_in_memory_merge_thread(pints);
let (files, h4) = start_index_writer_thread(gallons, &output_dir);
let result = merge_index_files(files, &output_dir);
// 等待这些线程结束,保留它们遇到的任何错误
let r1 = h1.join().unwrap();
h2.join().unwrap();
h3.join().unwrap();
let r4 = h4.join().unwrap();
// 返回遇到的第一个错误(如果有的话)(如你所见,h2和h3
// 不会失败,因为这些线程都是纯粹的内存数据处理) r1?; r4?; result
}
和以前一样,使用 .join().unwrap() 显式地将 panic 从子线程传播到主线程。这里唯一不寻常的事情是:我们没有马上使用 ?,而是将 io::Result 值放在一边,直到所有 4 个线程都联结完成。这个管道比等效的单线程管道快 40%。这一下午的工作还算小有所成,但与曼德博程序曾获得的 675% 的提升相比就有点儿微不足道了。我们显然没有让系统的 I/O 容量或所有 CPU 核心的工作量饱和。这是怎么回事? 管道就像制造业工厂中的装配流水线,其性能受限于最慢阶段的吞吐量。一条全新的、未调整过的装配线可能和单元化生产一样慢,只有对装配流水线做针对性的调整才能获得回报。在这个例子中,测量表明第二阶段是瓶颈。我们的索引线程使用了 .to_lowercase() 和 .is_alphanumeric(),因此它会花费大量时间在 Unicode 表中查找。对于索引下游的其他阶段,它们大部分时间在 Receiver::recv 中休眠,等待输入。 这意味着应该还可以更快。只要解决了这些瓶颈,并行度就会提高。既然你已经知道如何使用通道,再加上我们的程序是由孤立的代码片段组成的,那么就很容易找到解决第一个瓶颈的方法。可以手动优化第二阶段的代码,就像优化其他代码一样,将工作拆分成两个或更多阶段,或同时运行多个文件索引线程。
19.2.4 通道的特性与性能
std::sync::mpsc 中的 mpsc 代表多生产者、单消费者(multiproducer, single-consumer),这是对 Rust 通道提供的通信类型的简洁描述。 这个示例程序中的通道会将值从单个发送者传送到单个接收者。这是相当普遍的案例。但是 Rust 通道也支持多个发送者,如果需要的话,你可以用一个线程来处理来自多个客户端线程的请求,如图 19-8 所示。

图 19-8:单个通道接收来自多个发送者的请求
Sender
use std::sync::mpsc;
let (sender, receiver) = mpsc::sync_channel(1000);
同步通道与常规通道非常像,但在创建时可以指定它能容纳多少个 值。对于同步通道,sender.send(value) 可能是一个阻塞操作。 毕竟,有时候阻塞也不是坏事。在我们的示例程序中,将 start_file_reader_thread 中的 channel 更改为具有 32 个值空间的 sync_channel 后,可将基准数据集上的内存使用量节省 2/3,却不会降低吞吐量。
19.2.5 线程安全:Send 与 Sync
迄今为止,我们一直假定所有值都可以在线程之间自由移动和共享。这基本正确,但 Rust 完整的线程安全故事取决于两个内置特型,即 std::marker::Send 和 std::marker::Sync。
实现了 Send 的类型可以安全地按值传给另一个线程。它们可以跨线程移动。
实现了 Sync 的类型可以安全地将一个值的不可变引用传给另一个线程。它们可以跨线程共享。
这里所说的安全,就是我们一直在强调的意思:没有数据竞争和其他未定义行为。
例如,在本章开头的 process_files_in_parallel 示例中,我们使用闭包将 Vec

图 19-9:Send 类型和 Sync 类型
如果 Rc

图 19-10:为什么 Rc
use std::rc::Rc;
use std::thread;
fn main() {
let rc1 = Rc::new("ouch".to_string());
let rc2 = rc1.clone();
thread::spawn(move || { // 错误 rc2.clone();
});
rc1.clone();
}
Rust 会拒绝编译这段代码,并给出详细的错误消息:
error: `Rc<String>` cannot be sent between threads safely
|
10 | thread::spawn(move || { // 错误
| ^^^^^ `Rc<String>` cannot be sent between threads safely
|
= help: the trait `std::marker::Send` is not implemented for `Rc<String>`
= note: required because it appears within the type
`[closure@...]`
= note: required by `std::thread::spawn`
现在可以看出 Send 和 Sync 如何帮助 Rust 加强线程安全了。对于跨线程边界传输数据的函数,Send 和 Sync 会作为函数类型签名中的限界。当你生成(spawn)一个线程时,传入的闭包必须实现了 Send 特型,这意味着它包含的所有值都必须是 Send 的。同样,如果要通过通道将值发送到另一个线程,则该值必须是 Send 的。 19.2.6 绝大多数迭代器能通过管道传给通道 我们的倒排索引构建器是作为管道构建的。虽然代码很清晰,但需要手动建立通道和启动线程。相比之下,我们在第 15 章中构建的迭代器流水线似乎将更多的工作打包到了几行代码中。可以为线程管道构建类似的东西吗? 如果能统一迭代器流水线和线程管道就好了。这样索引构建器就可以写成迭代器流水线了。它可能是这样开始的:
documents
.into_iter()
.map(read_whole_file)
.errors_to(error_sender) // 过滤出错误结果
.off_thread() // 为上面的工作生成线程
.map(make_single_file_index)
.off_thread() // 为第二阶段生成另一个线程
特型允许我们向标准库类型添加一些方法,所以确实可以这样做。首先,编写一个特型来声明自己想要的方法:
use std::sync::mpsc;
pub trait OffThreadExt: Iterator {
将这个迭代 转换为线程外迭代 发生在
/// 将这个迭代器转换为线程外迭代器:`next()`调用发生在
/// 单独的工作线程上,因此该迭代器和循环体会同时运行
fn off_thread(self) -> mpsc::IntoIter<Self::Item>; }
然后,为迭代器类型实现这个特型。mpsc::Receiver 已经是可迭代类型了,对于我们的实现很有帮助:
use std::thread;
impl<T> OffThreadExt for T
where
T: Iterator + Send + 'static,
T::Item: Send + 'static,
{
fn off_thread(self) -> mpsc::IntoIter<Self::Item> {
// 创建一个通道把条目从工作线程中传出去
let (sender, receiver) = mpsc::sync_channel(1024);
// 把这个迭代器转移给新的工作线程,并在那里运行它
thread::spawn(move || {
for item in self {
if sender.send(item).is_err() {
break;
}
}
});
// 返回一个从通道中拉取值的迭代器
receiver.into_iter()
}
}
此代码中的 where 子句是通过类似于 11.5 节描述的一个流程确定的。起初,我们只有如下内容:
impl<T> OffThreadExt for T
也就是说,我们希望此实现适用于所有迭代器。而 Rust 说不行。因为要用 spawn 将 T 类型的迭代器转移给新线程,所以必须指定 T: Iterator + Send + ‘static。因为要用通道发回条目,所以必 须指定 T::Item: Send + ‘static。做完这些改动,Rust 很满意。 简而言之,这就是 Rust 的特征:我们可以自由地为该语言中的几乎每个迭代器添加一个提供并发能力的工具,但前提是要理解并用代码说明它在安全使用方面的限制条件。 19.2.7 除管道之外的用法 本节会以管道作为示例,因为管道是使用通道的一种很好、很直白的方式。每个人都能理解它们。管道是具体、实用且具有确定性的。不过,通道不仅仅在管道中有用,它们也是向同一进程中的其他线程提供异步服务的快捷且简便的方法。 假设你想在自己的线程上进行日志记录,如图 19-8 所示。其他线程可以通过通道将日志消息发送到日志线程。由于你可以克隆通道的 Sender,因此许多客户端线程可以具有向同一个日志记录线程发送日志消息的发送器。 在独立线程上运行诸如记录日志之类的服务有一些优势。日志记录线程可以在需要时轮换日志文件。它不必与其他线程进行任何花哨的协调。这些线程也不会被阻塞。消息可以在通道中无害地累积片刻,直到日志线程恢复工作。 通道也可用于一个线程向另一个线程发送请求并要求返回某种响应的 情况。第一个线程的请求可以是一个结构体或元组,包含一个 Sender,这个 Sender 是第二个线程用来发送其回复的一种回邮信封。但这并不意味着这种交互必须是同步的。第一个线程可以自行决定是阻塞并等待响应,还是使用 .try_recv() 方法轮询结果。 迄今为止,我们介绍了用于高度并行计算的分叉与合并和用于松散连接组件的通道,这两种工具已经足以应对大部分应用程序。但本章内容还未结束,请接着往下看。
19.3 共享可变状态
自从你在第 8 章发布了 fern_sim crate,在之后的几个月里,你的蕨类植物模拟软件真的“火”了。现在你正在创建一个多人即时战略游戏,其中 8 名玩家在模拟的侏罗纪景观中竞相种植大部分真实的同时代蕨类植物。该游戏的服务器是一个大规模并行应用程序,要处理从很多线程涌入的大量请求。一旦有了 8 名待加入的玩家,这些线程该如何相互协调以开始游戏呢? 这里要解决的问题是,很多线程需要访问待加入游戏玩家的共享列表。这个数据必然是可变的,并且会在所有线程之间共享。如果 Rust 不支持共享的可变状态,那我们还能用它做些什么呢? 可以通过创建一个新线程来解决这个问题,该线程的全部工作就是管理这个列表。其他线程将通过通道与它通信。当然,这会多占用一个线程,从而产生一些操作系统开销。 另一种选择是使用 Rust 提供的工具来安全地共享可变数据。这种工具确实存在。它们就是任何使用过线程的系统程序员都熟悉的底层原语。本节将介绍互斥锁、读 / 写锁、条件变量和原子化整数。 后,我们将展示如何在 Rust 中实现全局可变变量。
19.3.1 什么是互斥锁
互斥锁(mutex)或锁(lock)用于强制多个线程在访问某些数据时轮流读写。19.3.2 节会介绍 Rust 的互斥锁。先回顾一下互斥锁在其他语言中的用法是有好处的。下面是互斥锁在 C++ 中的简单用法:
// C++代码,不是Rust代码
void FernEmpireApp::JoinWaitingList(PlayerId player) { mutex.Acquire();
waitingList.push_back(player);
// 如果有了足够的待进入玩家,就开始游戏
if (waitingList.size() >= GAME_SIZE) { vector<PlayerId> players;
waitingList.swap(players);
StartGame(players);
}
mutex.Release();
}
调用 mutex.Acquire() 和 mutex.Release() 会标记出此代码中临界区的开始和结束。对于程序中的每个 mutex,一次只能有一个 线程在临界区内运行。如果临界区中有一个线程,那么所有调用 mutex.Acquire() 的其他线程都将被阻塞,直到第一个线程到达 mutex.Release()。 我们说互斥锁能保护数据,在这个例子中就是 mutex 会保护 waitingList。不过,程序员有责任确保每个线程总是在访问数据之前获取互斥锁,并在之后释放它。 互斥锁很有用,原因如下。 它们可以防止数据竞争,即多个竞争线程同时读取和写入同一内存的情况。数据竞争是 C++ 和 Go 中的未定义行为。Java、C# 等托管语言承诺不会崩溃,但发生数据竞争时,产出的结果仍然没有意义。 即使不存在数据竞争,并且所有读取和写入在程序中都是按顺序一个接一个地发生,如果没有互斥锁,不同线程的操作也可能会以任意方式相互交错。想象一下,如何写出即使在运行期被其他线程修改了数据也能照常工作的代码。再想象一下你试图调试这个程序。那简直就像程序在“闹鬼”。 互斥锁支持使用不变条件进行编程,在初始化设置时,那些关于受保护数据的规则在刚构造出来时就是成立的,并且会让每个临界区负责维护这些规则。 当然,所有这些实际上都基于同一个原因:不受控的竞态条件会让编程变得非常棘手。互斥锁给混乱带来了一些秩序,尽管不如通道或分叉与合并那么有序。 然而,在大多数语言中,互斥锁很容易搞砸。例如,在 C++ 中,数据和锁是彼此独立的对象。理想情况下,可以通过注释来解释每个线程必须在接触数据之前获取互斥锁:
class FernEmpireApp {
... private:
// 等待加入游戏的玩家列表。通过`mutex`来保护
vector<PlayerId> waitingList;
// 请在读写`waitingList`之前获取互斥锁
Mutex mutex;
... };
但即使有这么好的注释,编译器也无法在此处强制执行安全访问。当一段代码忘了获取互斥锁时,就会得到未定义行为。现实中,这意味着极难重现和修复的 bug。 虽然在 Java 中对象和互斥锁之间存在某种概念上的关联,但这种关联也不是很紧密。编译器不会试图强制执行这种关联,实际上,受锁保护的数据在大多数时候不仅仅是相关对象中的几个字段,而是经常包含分布于多个对象中的数据。“锁定”方案依旧很棘手。注释仍然是执行这种关联的主要工具。
19.3.2 Mutex
现在我们将展示在 Rust 中如何实现等待列表。在我们的蕨类帝国游戏服务器中,每个玩家都有一个唯一的 ID:
type PlayerId = u32;
等待列表只是玩家的集合:
const GAME_SIZE: usize = 8;
/// 等候列表永远不会超过GAME_SIZE个玩家 type WaitingList = Vec<PlayerId>;
等待列表会被存储为 FernEmpireApp 中的一个字段,这是在服务器 启动期间在 Arc 中设置的一个单例。每个线程都有一个 Arc 指向它。它包含我们程序中所需的全部共享配置和其他“零件”,其中大 部分是只读的。由于等待列表既是共享的又是可变的,因此必须由 Mutex 提供保护:
use std::sync::Mutex;
/// 所有线程都可以共享对这个大型上下文结构体的访问
struct FernEmpireApp {
waiting_list: Mutex<WaitingList>,
}
与 C++ 不同,在 Rust 中,受保护的数据存储于 Mutex 内部。建立此 Mutex 的代码如下所示:
use std::sync::Arc;
let app = Arc::new(FernEmpireApp {
waiting_list: Mutex::new(vec![]),
});
创建新的 Mutex 看起来就像创建新的 Box 或 Arc,但是 Box 和 Arc 意味着堆分配,而 Mutex 仅与锁操作有关。如果希望在堆中分配 Mutex,则必须明确写出来,就像这里所做的这样:对整个应用程序使用 Arc::new,而仅对受保护的数据使用 Mutex::new。这两个类型经常一起使用,Arc 用于跨线程共享数据,而 Mutex 用于跨线程共享的可变数据。 现在可以实现使用互斥锁的 join_waiting_list 方法了:
impl FernEmpireApp {
/// 往下一个游戏的等候列表中添加一个玩家。如果有足够
/// 的待进入玩家,则立即启动一个新游戏
fn join_waiting_list(&self, player: PlayerId) {
// 锁定互斥锁,并授予内部数据的访问权。`guard`的作用域是一个临界区
let mut guard = self.waiting_list.lock().unwrap();
// 现在开始执行游戏逻辑
guard.push(player);
if guard.len() == GAME_SIZE {
let players = guard.split_off(0);
self.start_game(players);
}
}
}
获取数据的唯一方法就是调用 .lock() 方法:
let mut guard = self.waiting_list.lock().unwrap();
self.waiting_list.lock() 会阻塞,直到获得互斥锁。这个方法调用所返回的 MutexGuard
guard.push(player);
此守卫甚至允许我们借用对底层数据的直接引用。Rust 的生命周期体系会确保这些引用的生命周期不会超出守卫本身。如果不持有锁,就无法访问 Mutex 中的数据。 当 guard 被丢弃时,锁就被释放了。这通常会发生在块的末尾,但也可以手动丢弃。
if guard.len() == GAME_SIZE {
let players = guard.split_off(0);
drop(guard); // 启动游戏时就不必锁定列表了 self.start_game(players);
}
19.3.3 mut 与互斥锁
join_waiting_list 方法并没有通过可变引用获取 self,这可能看起来很奇怪,至少初看上去是这样。它的类型签名如下所示:
fn join_waiting_list(&self, player: PlayerId)
当你调用底层集合 Vec
pub fn push(&mut self, item: T)
然而这段代码不仅能编译而且运行良好。这是怎么回事? 在 Rust 中,&mut 表示独占访问。普通 & 表示共享访问。 我们习惯于把 &mut 访问从父级传到子级,从容器传到内容。只有一开始你就拥有对 starships 的 &mut 引用 [ 或者你也可能拥有这些 starships(星舰)?如果是这样,那么……恭喜你成了埃隆 • 马斯克。],才可以在 starships[id].engine 上调用 &mut self 方法。这是默认设置,因为如果没有对父项的独占访问权,那么 Rust 通常无法确保你对子项拥有独占访问权。 但是 Mutex 有办法确保这一点:锁。事实上,互斥锁只不过是提供 对内部数据的独占(mut)访问的一种方法,即使有许多线程也在共享 (非 mut)访问 Mutex 本身时,也能确保一切正常。 Rust 的类型系统会告诉我们 Mutex 在做什么。它在动态地强制执行独占访问,而这通常是由 Rust 编译器在编译期间静态完成的。 (你可能还记得 std::cell::RefCell 也是这么做的,但它没有试图支持多线程。Mutex 和 RefCell 是内部可变性的两种形式,详情请参见 9.11 节。)
19.3.4 为什么互斥锁不是“银弹”
在开始使用互斥锁之前,我们就介绍了一些并发方式,如果你是 C++ 用户,那么这些方法可能看起来非常容易正确使用。这并非巧合,因为这些方法本来就是为了给并发编程中 令人困惑的方面提供强有力的保证。专门使用分叉与合并并行的程序具有确定性,不会死锁。使用通道的程序几乎同样表现良好。那些专门供管道使用的通道(比如我们的索引构建器)也具有确定性:虽然消息传递的时机可能有所不同,但不会影响输出。这些关于多线程编程的保证都很好。 Rust 的 Mutex 设计几乎肯定会让你比以往任何时候都更系统、更明智地使用互斥锁。但也值得停下来思考一下 Rust 的安全保证可以帮你做什么,不能帮你做什么。 安全的 Rust 代码不会引发数据竞争,这是一种特定类型的 bug,其中多个线程会同时读写同一内存,并产生无意义的结果。这很好,因为数据竞争总会出 bug,这在真正的多线程程序中并不罕见。 但是,使用互斥锁的线程会遇到 Rust 无法为你修复的另一些问题。 有效的 Rust 程序不会有数据竞争,但仍然可能有其他竞态条件 ——程序的行为取决于各线程之间的运行时间长短,因此可能每次运行时都不一样。有些竞态条件是良性的,有些则表现为普遍的不稳定性和难以修复的 bug。以非结构化方式使用互斥锁会引发竞态条件。你需要确保竞态条件是良性的。 共享可变状态也会影响程序设计。通道作为代码中的抽象边界,可以轻松地拆出彼此隔离的组件以进行测试,而互斥锁则会鼓励一种“只要再添加一个方法就行了”的工作方式,这可能会导致彼此有联系的代码耦合成一个单体。 后,互斥锁也并不像 初看起来那么简单,接下来的 19.3.5 节和 19.3.6 节会详解介绍。 所有这些问题都是工具本身所固有的。要尽可能使用更结构化的方法,只在必要时使用 Mutex。
19.3.5 死锁
线程在尝试获取自己正持有的锁时会让自己陷入死锁:
let mut guard1 = self.waiting_list.lock().unwrap();
let mut guard2 = self.waiting_list.lock().unwrap(); // 死锁
假设第一次调用 self.waiting_list.lock() 成功,获得了锁。第二次调用时看到锁已被持有,所以线程就会阻塞自己,等待锁被释放。它会永远等下去,因为这个正等待的线程就是持有锁的线程。 换而言之,Mutex 中的锁并不是递归锁。 这里的 bug 是显而易见的。但在实际程序中,这两个 lock() 调用可能位于两个不同的方法中,其中一个会调用另一个。单独来看,每个方法的代码看起来都没什么问题。还有其他方式可以导致死锁,比如涉及多个线程或每个线程同时获取多个互斥锁。Rust 的借用系统不能保护你免于死锁。 好的保护是保持临界区尽可能小:进入,开始工作,完成后马上离开。 通道也有可能陷入死锁。例如,两个线程可能会互相阻塞,每个线程都在等待从另一个线程接收消息。然而,再次强调,良好的程序设计可以让你确信这在实践中不会发生。在管道中,就像我们的倒排索引构建器一样,数据流是非循环的。与 Unix shell 管道一样,这样的程序不可能发生死锁。
19.3.6 “中毒”的互斥锁
Mutex::lock() 返回 Result 的原因与 JoinHandle::join() 是一样的:如果另一个线程发生 panic,则可以优雅地失败。当我们编写 handle.join().unwrap() 时,就是在告诉 Rust 将 panic 从一个线程传播到另一个线程。mutex.lock().unwrap() 惯用法同样如此。 如果线程在持有 Mutex 期间出现 panic,则 Rust 会把 Mutex 标记为已“中毒”。之后每当试图锁住已“中毒”的 Mutex 时都会得到错误结果。如果发生这种情况,我们的 .unwrap() 调用就会告诉 Rust 发生了 panic,将 panic 从另一个线程传播到本线程。 “中毒”的互斥锁有多糟糕?中毒听起来很致命,但在这个场景中并不一定致命。正如我们在第 7 章中所说,panic 是安全的。一个发生了 panic 的线程能让程序的其余部分仍然留在安全状态。这样看来,互斥锁因 panic 而“中毒”的原因并非害怕出现未定义行为。相反,它真正的关注点在于你编程时一直在维护不变条件。由于你的程序在未完成其正在执行的操作的情况下发生 panic 并退出临界区,可能更新了受保护数据的某些字段但未更新其他字段,因此不变条件现在有可能已经被破坏了。于是 Rust 决定让这个互斥锁“中毒”,以防止其他线程无意中误入这种已破坏的场景并让情况变得更糟。你仍然可以锁定已“中毒”的互斥锁并访问其中的数据,完全强制运行互斥代码。具体请参阅 PoisonError::into_inner() 的文档。但你肯定不会希望这发生在自己的意料之外。
19.3.7 使用互斥锁的多消费者通道
我们之前提到过,Rust 的通道是多生产者、单一消费者。或者更具体地说,一个通道只能有一个 Receiver。如果有一个线程池,则不能让其中的多个线程使用单个 mpsc 通道作为共享工作列表。 其实有一种非常简单的解决方法,只要使用标准库的一点点“能力” 就可以。可以在 Receiver 周围包装一个 Mutex 然后再共享。下面就是这样做的一个模块:
pub mod shared_channel { use std::sync::{Arc, Mutex}; use std::sync::mpsc::{channel, Sender, Receiver};
/// 对`Receiver`的线程安全的包装
#[derive(Clone)]
pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>);
impl<T> Iterator for SharedReceiver<T> { type Item = T;
/// 从已包装的接收者中获取下一个条目
fn next(&mut self) -> Option<T> { let guard = self.0.lock().unwrap(); guard.recv().ok()
}
}
/// 创建一个新通道,它的接收者可以跨线程共享。这会返回一个发送者和一个
/// 接收者,就像标准库的 `channel()`,有时可以作为无缝替代品使用
pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
let (sender, receiver) = channel();
(sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
}
}
我们正在使用 Arc

图 19-11:如何阅读复杂类型
19.3.8 读/写锁(RwLock)
介绍完互斥锁,下面来看一下 std::sync 中提供的其他工具:Rust 标准库的线程同步工具包。我们将快速介绍,因为对这些工具的完整讨论超出了本书的范畴。 服务器程序通常都有一些只加载一次且很少更改的配置信息。大多数线程只会查询此配置,但由于配置可以更改(例如,可能要求服务器从磁盘重新加载其配置),所以无论如何都必须用锁进行保护。在这种情况下,可以使用互斥锁,但它会形成不必要的瓶颈。如果配置没有改变,那么各个线程就不应该轮流查询配置。这时就可以使用读 / 写锁或 RwLock。 互斥锁只有一个 lock 方法,而读 / 写锁有两个,即 read 和 write。RwLock::write 方法类似于 Mutex::lock。它会等待对 受保护数据的独占的 mut 访问。RwLock::read 方法提供了非 mut 访问,它的优点是可能不怎么需要等待,因为本就可以让许多线程同时安全地读取。使用互斥锁,在任何给定时刻,受保护的数据都只有一个读取者或写入者(或两者都没有)。使用读 / 写锁,则可以有一个写入者或多个读取者,就像一般的 Rust 引用一样。 FernEmpireApp 可能有一个用作配置的结构体,由 RwLock 提供保护:
use std::sync::RwLock;
struct FernEmpireApp {
config: RwLock<AppConfig>,
}
读取配置的方法会使用 RwLock::read():
/// 如果应该使用试验性的真菌代码,则为True
fn mushrooms_enabled(&self) -> bool {
let config_guard = self.config.read().unwrap();
config_guard.mushrooms_enabled
}
重新加载配置的方法就要使用 RwLock::write():
fn reload_config(&self) -> io::Result<()> {
let new_config = AppConfig::load()?;
let mut config_guard = self.config.write().unwrap();
*config_guard = new_config;
Ok(())
}
当然,Rust 特别适合在 RwLock 数据上执行安全规则。单写者或多读者的概念是 Rust 借用体系的核心。self.config.read() 会返回一个守卫,以提供对 AppConfig 的非 mut(共享)访问。 self.config.write() 会返回另一种类型的守卫,以提供 mut (独占)访问。
19.3.9 条件变量(Condvar)
通常线程需要一直等到某个条件变为真。 在关闭服务器的过程中,主线程可能需要等到所有其他线程都完成后才能退出。 当工作线程无事可做时,需要一直等待,直到有数据需要处理为止。 实现分布式共识协议的线程可能要等到一定数量的对等点给出响应为止。 有时,对于我们想要等待的确切条件,会有一个方便的阻塞式 API,比如服务器关闭示例中的 JoinHandle::join。其他情况下,则没有内置的阻塞式 API。程序可以使用条件变量来构建自己的 API。在 Rust 中,std::sync::Condvar 类型实现了条件变量。Condvar 中有方法 .wait() 和 .notify_all(),其中 .wait() 会阻塞线程,直到其他线程调用了 .notify_all()。 但条件变量的用途不止于此,因为说到底条件变量是关于受特定 Mutex 保护的某些数据的特定“真或假”条件。因此,Mutex 和 Condvar 是相关的。对条件变量的完整解释超出了本书的范畴,但为了让曾使用过条件变量的程序员更容易理解,我们将展示代码的两个关键部分。 当所需条件变为真时,就调用 Condvar::notify_all(或 notify_one)来唤醒所有等待的线程:
self.has_data_condvar.notify_all();
要进入睡眠状态并等待条件变为真,可以使用 Condvar::wait():
while !guard.has_data() {
guard = self.has_data_condvar.wait(guard).unwrap();
}
这个 while 循环是条件变量的标准用法。然而,Condvar::wait 的签名非比寻常。它会按值获取 MutexGuard 对象,消耗它,并在成功时返回新的 MutexGuard。这种签名给我们的直观感觉是 wait 方法会释放互斥锁并在返回之前重新获取它。按值传递 MutexGuard 要表达的意思是“我授予你通过 .wait() 方法释放互斥锁的独占权限。”
19.3.10 原子化类型
std::sync::atomic 模块包含用于无锁并发编程的原子化类型。这些类型与标准 C++ 原子化类型基本相同,但也有一些独特之处。
AtomicIsize 和 AtomicUsize 是与单线程 isize 类型和 usize 类型对应的共享整数类型。
AtomicI8、AtomicI16、AtomicI32、AtomicI64 及其无符号变体(如 AtomicU8)是共享整数类型,对应于单线程中的类型 i8、i16 等。
AtomicBool 是一个共享的 bool 值。
AtomicPtr
use std::sync::atomic::{AtomicIsize, Ordering};
let atom = AtomicIsize::new(0);
atom.fetch_add(1, Ordering::SeqCst);
这些方法可以编译成专门的机器语言指令。在 x86-64 架构上,这个 .fetch_add() 调用会编译为 lock incq 指令,而普通 n += 1 可以编译为简单的 incq 指令或其他各种与此相关的变体。Rust 编译器还必须放弃围绕原子化操作的一些优化,因为与正常的加载或存储不同,它可以立即合法地影响其他线程或被其他线程影响。 参数 Ordering::SeqCst 是指内存排序。内存排序类似于数据库中的事务隔离级别。它们告诉系统,相对于性能,你有多关心诸如对因果性的影响和不存在时间循环之类的哲学概念。内存排序对于程序的正确性至关重要,而且很难进行理解和推理。不过令人高兴的是,选择顺序一致性( 严格的内存排序类型)的性能损失通常很低,与将 SQL 数据库置于 SERIALIZABLE 模式时的性能损失截然不同。因 此,只要拿不准,就尽情使用 Ordering::SeqCst 吧。Rust 从标准 C++ 原子化机制继承了另外几种内存排序,分别对存续性和因果性提供了几种保证。我们就不在这里讨论它们了。 原子化的一个简单用途是中途取消。假设有一个线程正在执行一些长时间运行的计算(如渲染视频),我们希望能异步取消它。问题在于如何与希望关闭的线程进行通信。可以通过共享的 AtomicBool 来做到这一点:
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
let cancel_flag = Arc::new(AtomicBool::new(false));
let worker_cancel_flag = cancel_flag.clone();
上述代码会创建两个 Arc
use std::sync::atomic::Ordering;
use std::thread;
let worker_handle = thread::spawn(move || {
for pixel in animation.pixels_mut() {
render(pixel); // 光线跟踪,需要花几微秒时间
if worker_cancel_flag.load(Ordering::SeqCst) {
return None;
}
}
Some(animation)
});
渲染完每个像素后,线程会通过调用其 .load() 方法检查标志的值:
worker_cancel_flag.load(Ordering::SeqCst)
如果决定在主线程中取消工作线程,可以将 true 存储在 AtomicBool 中,然后等待线程退出:
// 取消渲染
cancel_flag.store(true, Ordering::SeqCst);
// 放弃结果,该结果有可能是`None` worker_handle.join().unwrap();
当然,还有其他实现方法。此处的 AtomicBool 可以替换为 Mutex
19.3.11 全局变量
假设我们正在编写网络代码。我们想要一个全局变量,即一个每当发出数据包时都会递增的计数器:
/// 服务器已成功处理的数据包的数量
static PACKETS_SERVED: usize = 0;
这可以正常编译,但有一个问题:PACKETS_SERVED 是不可变的,所以我们永远都不能改变它。 Rust 会尽其所能阻止全局可变状态。用 const 声明的常量当然是不 可变的。默认情况下,静态变量也是不可变的,因此无法获得一个 mut 引用。static 固然可以声明为 mut,但访问它是不安全的。所有这些规则的制定,出发点都是 Rust 对线程安全的坚持。 全局可变状态也有不幸的软件工程后果:它往往使程序的各个部分更紧密耦合,更难测试,以后更难更改。尽管如此,在某些情况下并没有合理的替代,所以 好找到一种安全的方法来声明可变静态变量。 支持递增 PACKETS_SERVED 并保持其线程安全的 简单方式是让它变成原子化整数:
use std::sync::atomic::AtomicUsize;
static PACKETS_SERVED: AtomicUsize = AtomicUsize::new(0);
一旦声明了这个静态变量,增加数据包计数就很简单了:
use std::sync::atomic::Ordering;
PACKETS_SERVED.fetch_add(1, Ordering::SeqCst);
原子化全局变量仅限于简单的整数和布尔值。不过,要创建任何其他类型的全局变量,就要解决以下两个问题。 首先,变量必须以某种方式成为线程安全的,否则它就不能是全局变量:为了安全起见,静态变量必须同时是 Sync 和非 mut 的。幸运的是,我们已经看到了这个问题的解决方案。Rust 具有用于安全地共享变化的值的类型:Mutex、RwLock 和原子化类型。即使声明为非 mut,也可以修改这些类型。这就是它们的用途。(参见 19.3.3 节。) 其次,静态初始化程序只能调用被专门标记为 const 的函数,编译器可以在编译期间对其进行求值。换句话说,它们的输出是确定性的,这些输出只会取决于它们的参数,而不取决于任何其他状态或 I/O。这样,编译器就可以将计算结果作为编译期常量嵌入了。这类似于 C++ 的 constexpr。 Atomic 类型(AtomicUsize、AtomicBool 等)的构造函数都是 const 函数,这使我们能够更早地创建 static AtomicUsize。一些其他类型,比如 String、Ipv4Addr 和 Ipv6Addr,同样有简单的 const 构造函数。 还可以直接在函数的签名前加上 const 来定义自己的 const 函数。Rust 将 const 函数可以做的事情限制在一小部分操作上,这些操作足够有用,同时仍然不会带来任何不确定的结果。const 函数不能以类型而只能以生命周期作为泛型参数,并且不能分配内存或对裸指针进行操作,即使在 unsafe 的块中也是如此。但是,我们可以使用算术运算[包括回绕型算术(wrapping arithmetic)和饱和型算术 (saturating arithmetic)]、非短路逻辑运算和其他 const 函 数。例如,可以创建便捷函数来更轻松地定义 static 和 const 并减少代码重复:
const fn mono_to_rgba(level: u8) -> Color {
Color {
red: level,
green: level,
blue: level,
alpha: 0xFF,
}
}
const WHITE: Color = mono_to_rgba(255);
const BLACK: Color = mono_to_rgba(000);
结合这些技术,我们可能会试着像下面这样写:
static HOSTNAME: Mutex<String> = Mutex::new(String::new()); // 错误:静态调用仅限于常量函数、常量元组、
// 常量结构体和常量元组变体
不过很遗憾,虽然 AtomicUsize::new() 和 String::new() 是 const fn,但 Mutex::new() 不是2。为了绕过这些限制,需要使用 lazy_static crate。
2
在 Rust 1.63 及以上版本中,Mutex::new() 已经是 const fn 了。——译者注 我们在 17.5.2 节介绍过 lazy_static crate。通过 lazy_static! 宏定义的变量允许你使用任何喜欢的表达式进行初始化,该表达式会在第一次解引用变量时运行,并保存该值以供后续操作使用。 可以像下面这样使用 lazy_static 声明一个全局 Mutex 控制的 HashMap:
use lazy_static::lazy_static;
use std::sync::Mutex;
lazy_static! {
static ref HOSTNAME: Mutex<String> = Mutex::new(String::new());
}
同样的技术也适用于其他复杂的数据结构,比如 HashMap 和 Deque。对于根本不可变、只是需要进行非平凡初始化3的静态变量,它也非常方便。
“非平凡初始化”是指初始化一个变量或对象时,需要进行一些复杂的计算或操作,而不是简单的赋值或默认初始化。在计算机编程中,有些变量或对象的初始化需要进行一些计算,比如读取配置文件、解析命令行参数、连接数据库等,这些初始化操作都可以称为“非平凡初始化”。与之相反,简单的赋值操作或者默认初始化可以称为“平凡初始化”。—— 译者注 使用 lazy_static! 会在每次访问静态数据时产生很小的性能成 本。该实现使用了 std::sync::Once,这是一种专为一次性初始化而设计的底层同步原语。在幕后,每次访问惰性静态数据时,程序都会执行原子化加载指令以检查初始化是否已然发生。(Once 有比较特殊的用途,这里不做详细介绍。通常使用 lazy_static! 更方便。但是,std::sync::Once 对于初始化非 Rust 库很有用,有关示例,请参阅 23.5 节。)
19.4 在 Rust 中编写并发代码的一点儿经验
本章介绍了在 Rust 中使用线程的 3 种技术:分叉与合并并行、通道和带锁的共享可变状态。我们的目标是好好介绍一下 Rust 提供的这些“零件”,重点在于如何将它们组合到实际程序中。 Rust 坚持安全性,因此从你决定编写多线程程序的那一刻起,重点就是构建安全、结构化的通信。保持线程近乎处于隔离态可以让 Rust 相信你的代码正在做的事是安全的。恰好,隔离也是确保你的代码正确且可维护的好办法。同样,Rust 也会引导你开发优秀的程序。 更重要的是,Rust 能让你组合多种技术并进行实验。你可以快速迭代。换句话说,“在编译器的督促下知错就改”肯定比“等出了问题后再调试数据竞争”能更快地开工并正确运行。