假设你要编写一个聊天服务器。对于每个网络连接,都会有一些要解析的传入数据包、要组装的传出数据包、要管理的安全参数、要跟踪的聊天组订阅等。要想同时管理这么多连接,就得进行一定的组织工作。 理论上,可以为传入的每个连接启动一个单独的线程:
use std::{net, thread};
let listener = net::TcpListener::bind(address)?;
for socket_result in listener.incoming() {
let socket = socket_result?;
let groups = chat_group_table.clone();
thread::spawn(|| {
log_error(serve(socket, groups));
});
}
对于每个新连接,这都会启动一个运行 serve 函数的新线程,此线程专注于管理单个连接所需的一切。 这确实很好,好得远远超出了预期,直到有一天突然涌入了数万个用户。每个线程都拥有 100 KiB 以上的栈,这很常见,但这可不是你花费数 GB 服务器内存的理由。如果要在多个处理器之间分配工作,那么线程固然好用,而且确有必要。但现在它们的内存需求已经太大了,所以通常在使用线程的同时,还要用一些补充手段来完成这些工作。 可以使用 Rust 异步任务在单个线程或工作线程池中交替执行许多彼此独立的活动。异步任务类似于线程,但其创建速度更快,在它们之间可以更有效地传递控制权,并且其内存开销比线程少一个数量级。在单个程序中同时运行数十万个异步任务是完全可行的。当然,你的应用程序可能仍会受到其他因素的制约,比如网络带宽、数据库速度、算力,或此工作的固有内存需求,但与线程的开销相比,这些异步任务的固有内存开销只是九牛一毛。 一般来说,异步 Rust 代码看上去很像普通的多线程代码,但实际上那些可能导致阻塞的操作(如 I/O 或获取互斥锁)会以略有不同的方式处理。通过对这些操作进行特殊处理,Rust 能够获得关于这段代码行为的更多信息以辅助优化,这就是它能提高性能的原因。前面代码的异步版本如下所示:
use async_std::{net, task};
let listener = net::TcpListener::bind(address).await?;
let mut new_connections = listener.incoming();
while let Some(socket_result) = new_connections.next().await {
let socket = socket_result?;
let groups = chat_group_table.clone();
task::spawn(async {
log_error(serve(socket, groups).await);
});
}
这里用的是 async_std 这个 crate 的网络模块和任务模块,并在可能发生阻塞的调用之后添加了 .await。但这段代码的整体结构与基于线程的版本无异。 本章的目标不仅是帮你编写异步代码,还要尽可能详细地展示它的工作原理,以便你可以预知如何在应用程序中执行异步代码以及把它用在哪里 能发挥出其价值。 为了展示异步编程的机制,我们会列举一组涵盖所有核心概念的小语言特性集:Future(未来值)、异步函数、await 表达式、任务以及 block_on 执行器和 spawn_local 执行器。 然后,我们会介绍异步块和 spawn 执行器。这些在实际工作中非常重要,但从概念上讲,它们只是刚才提过的那些特性的变体。在此过程中,我们会指出你可能会遇到的一些异步编程特有的问题,并解释该如何处理这些问题。 为了展示所有这些“零件”是如何协同工作的,我们还会浏览一遍聊天服务器和客户端的完整代码,前面的代码片段只是其中的一部分。 为了说明原生 Future 和执行器的工作原理,我们会展示 spawn_blocking 和 block_on 的简单而实用的实现。 后,我们会解释 Pin 类型,该类型在异步接口中会不时出现,以保证异步函数和异步式 Future 的安全使用。
20.1 从同步到异步
考虑调用以下(非异步,而是完全传统的)函数时会发生什么:
use std::io::prelude::*;
use std::net;
fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String> {
let mut socket = net::TcpStream::connect((host, port))?;
let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
socket.write_all(request.as_bytes())?;
socket.shutdown(net::Shutdown::Write)?;
let mut response = String::new();
socket.read_to_string(&mut response)?;
Ok(response)
}
这段代码会打开到 Web 服务器的 TCP 连接,以过时的协议向其发送一个极简的 HTTP 请求[ 如果确实需要 HTTP 客户端,请考虑使用像 surf 或 request 这样的众多优秀的 crate 中的任何一个,它们可以正确且异步地完成工作。这里的客户端最多只能得到 HTTPS 重定向的返回。],然后读取其响应。图 20-1 展示了随着时间推移这个函数的执行情况。

图 20-1:同步 HTTP 请求的进度(深灰色区域表示正在等待操作系统) 图 20-1 展示了当时间从左到右流逝时,函数的调用栈的情况。每个 函数调用都是一个方框,叠放在其调用者上方。显然, cheapo_request 函数贯穿了整个执行过程。它会调用 Rust 标准库中的函数(如 TcpStream::connect)以及由 TcpStream 实现的 write_all 和 read_to_string 这两个特型方法。它们又会依次调用其他函数,但此程序最终会进行一些系统调用,请求操作系统实际完成某些操作,比如打开 TCP 连接,读取或写入一些数据。 深灰色背景表示程序正在等待操作系统完成系统调用的时间。我们没有按比例绘制这些时间。如果按比例绘制,则整张图都会变成深灰色:事实上,这个函数在几乎所有时间里都在等待操作系统。而前面代码的执行时间是系统调用之间的小窄条。 当这个函数正在等待系统调用返回时,它的单个线程是阻塞的,也就是说,在系统调用完成之前,它不能做任何其他事情。一个线程的栈大小有数十或数百 KB 的情况并不罕见,因此如果这是某个更大系统中的一小部分,那么就会有许多线程在同时做类似的事情,如果仅仅为了等待而锁定这些线程资源则可能会让开销变得相当高。 为了解决这个问题,就要允许线程在等待系统调用完成期间进行其他工作。但要做到这一点并非易事。例如,我们用来从套接字读取响应的函数签名如下所示:
fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize>;
它直接在类型签名里表明,这个函数在完成工作或出现问题之前不会返回。因此这个函数是同步的:调用者在操作完成后才会继续。如果想在操作系统工作时将此线程用于其他任务,就需要一个新的 I/O 库来提供这个函数的异步版本。
20.1.1 Future
Rust 支持异步操作的方法是引入特型 std::future::Future:
trait Future {
type Output;
// 现在,暂时把`Pin<&mut Self>`当作`&mut Self`
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
Future 代表一个你可以测试其完成情况的操作。Future 的 poll (轮询)方法从来不会等待操作完成,它总是立即返回。如果操作已完成,则 poll 会返回 Poll::Ready(output),其中 output 是它的最终结果。否则,它会返回 Pending。如果 Future 值得再次轮询,它承诺会通过调用 Context 中提供的回调函数 waker 来通知我们。我们将这种实现方式称为异步编程的“皮纳塔[ 皮纳塔(piñata,西班牙语)是一种纸糊的容器,其中装满玩具与糖果,于节庆或生日宴会上悬挂起来,让人用棍棒敲打,一旦打破,玩具和糖果就会掉落下来。——译者注 所有现代操作系统都包含其系统调用的一些变体,我们可以使用它们来实现这种轮询接口。例如,在 Unix 和 Windows 上,如果将网络套]模型”:对于 Future,你唯一能做的就是通过轮询来“敲打”它,直到某个值 “掉”出来。接字设置为非阻塞模式,那么一旦这些读写发生阻塞,就会返回某种错误。你必须稍后再试。 因此,异步版本的 read_to_string 的签名大致如下所示:
fn read_to_string(&mut self, buf: &mut String) -> impl Future<Output = Result<usize>>;
除了返回类型,这与我们之前展示过的签名基本相同:异步版本会返回携带 Result
fn read_to_string<'a>(
&'a mut self,
buf: &'a mut String,
) -> impl Future<Output = Result<usize>> + 'a;
这增加了生命周期以表明返回的 Future 的生存期只能与 self 和 buf 借用的值一样长。 async-std crate 提供了所有 std 中 I/O 设施的异步版本,包括 带有 read_to_string 方法的异步 Read 特型。async-std 选择紧紧跟随 std 的设计,尽可能在它自己的接口中重用 std 的类型,因此 Error、Result、网络地址和大多数其他相关数据在“两个世界”之间是兼容的。熟悉 std 有助于使用 async-std,反之亦然。 Future 特型的一个规则是,一旦 Future 返回了 Poll::Ready,它就会假定自己永远不会再被轮询(poll)。当某些 Future 被过度轮询时,它们只会永远返回 Poll::Pending,而其他 Future 则可能会 panic 或被挂起。(但是,它们绝不会违反内存安全或线程安全规则,或以其他方式导致未定义行为。)Future 特型上的 fuse 适配器方法能把任何 Future 变成被过度轮询时总会返回 Poll::Pending 的 Future。但所有常用的 Future 消耗方式都会遵守这一规则,因此通常不必动用 fuse。 完全没必要一听到轮询就觉得效率低下。Rust 的异步架构是经过精心设计的,只要你正确实现了基本的 I/O 函数(如 read_to_string),就只会在值得尝试时才轮询 Future。每当调用 poll 时,必然有某个地方的某些代码返回了 Ready,或者至少朝着那个目标前进了一步。20.3 节会对此工作原理进行解释。 但使用 Future 似乎很具挑战性:当轮询时,如果得到了 Poll::Pending,应该做些什么呢?你必须四处寻找这个线程暂时可以做的其他工作,还不能忘记稍后回到这个 Future 并再次轮询它。 整个程序将充斥着辅助性代码,以跟踪谁在等待处理以及一旦就绪应该做些什么之类的事情。cheapo_request 函数的简单性被破坏了。 好消息是,你大可不必这样做。
20.1.2 异步函数与 await 表达式
下面是一个写成异步函数的 cheapo_request 版本:
use async_std::io::prelude::*;
use async_std::net;
async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String> {
let mut socket = net::TcpStream::connect((host, port)).await?;
let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
socket.write_all(request.as_bytes()).await?;
socket.shutdown(net::Shutdown::Write)?;
let mut response = String::new();
socket.read_to_string(&mut response).await?;
Ok(response)
}
除了以下几点,这段程序跟我们的原始版本几乎是每个字母都一样。 函数以 async fn 而不是 fn 开头。使用 async_std crate 的异步版本的 TcpStream::connect、write_all 和 read_to_string。 这些都会返回其结果的 Future。(本节中的示例使用了 async_std 的 1.7 版。) 每次返回 Future 的调用之后,代码都会 .await。虽然这看起来像是在引用结构体中名为 await 的字段,但它实际上是语言中内置的特殊语法,用于等待 Future 就绪。await 表达式的计算结果为 Future 的最终值。这就是函数从 connect、 write_all 和 read_to_string 获取结果的方式。 与普通函数不同,当你调用异步函数时,它会在函数体开始执行之前立即返回。显然,调用的最终返回值还没有计算出来,你得到的只是承载它最终值的 Future。所以如果执行下面这段代码:
let response = cheapo_request(host, port, path);
那么 response 将是 std::io::Result
{
// 注意:这是伪代码,不是有效的Rust
let connect_future = TcpStream::connect(...);
'retry_point:
match connect_future.poll(cx) {
Poll::Ready(value) => value,
Poll::Pending => {
// 安排对`cheapo_request`返回的Future进行 // 下一次`poll`,以便在'retry_point处恢复执行
... return Poll::Pending;
}
}
}
await 表达式会获取 Future 的所有权,然后轮询它。如果已就绪,那么 Future 的最终值就是 await 表达式的值,然后继续执行。否则,此 Future 返回 Poll::Pending。 但至关重要的是,下一次对 cheapo_request 返回的 Future 进行轮询时不会再从函数的顶部开始,而是会在即将轮询 connect_future 的中途时间点恢复执行函数。直到 Future 就绪之前,我们都不会继续处理异步函数的其余部分。 随着对其返回的 Future 继续进行轮询,cheapo_request 将通过函数体从一个 await 走到下一个,仅当它等待的子 Future 就绪时才会继续。因此,要对 cheapo_request 返回的 Future 进行多少次轮询,既取决于子 Future 的行为,也取决于该函数自己的控制流。cheapo_request 返回的 Future 会跟踪下一次 poll 应该恢复的点,以及恢复该点所需的所有本地状态,比如变量、参数和临时变量。 在函数中间暂停执行稍后再恢复,这种能力是异步函数所独有的。当一个普通函数返回时,它的栈帧就永远消失了。由于 await 表达式依赖于这种恢复能力,因此只能在异步函数中使用它们。 在撰写本章时,Rust 还不允许特型具有异步方法。只有自由函数以及从属于具体类型的函数才能是异步的。要解除此限制就要对语言进行一些更改。同时,如果确实需要定义包含异步函数的特型,请考虑使用 async-trait crate,它提供了基于宏的解决方案。
20.1.3 从同步代码调用异步函数:block_on
从某种意义上说,异步函数就是在转移责任。的确,在异步函数中很 容易获得 Future 的值:只要使用 await 就可以。但是异步函数本身也会返回 Future,所以现在调用者的工作是以某种方式进行轮询。但最终还是得有人实际等待一个值。 可以使用 async_std 的 task::block_on 函数从普通的同步函数 (如 main)调用 cheapo_request,这会接受一个 Future 并轮询,直到它生成一个值:
fn main() -> std::io::Result<()> {
use async_std::task;
let response = task::block_on(cheapo_request("example.com", 80, "/"))?;
println!("{}", response);
Ok(())
}
由于 block_on 是一个会生成异步函数最终值的同步函数,因此可以将其视为从异步世界到同步世界的适配器。但 block_on 的阻塞式特征意味着我们不应该在异步函数中使用它,因为在值被准备好之前它会一直阻塞整个线程。异步函数中请改用 await。 图 20-2 展示了 main 的一种可能的执行方式。

图 20-2:阻塞异步函数上方的时间线(“简化过的视图”)部分展示了程序异步调用的抽象
视图:cheapo_request 会首先调用 TcpStream::connect 以获得套接字,然后在该套接字上调用 write_all 和 read_to_string。接下来会返回。这与本章前面的 cheapo_request 同步版本的时间线非常相似。
但是其中的每一个异步调用都是一个多步骤的过程:创建一个 Future,然后轮询直到它就绪,也许在这个过程中创建并轮询了其他子 Future。下方的时间线(“实现”)部分展示了实现此异步行为的实际同步调用。这是了解普通异步执行中究竟发生了什么的一个好机会。
首先,main 会调用 cheapo_request,返回其最终结果的 Future A。然后 main 会将此 Future 传给 async_std::block_on,由后者对其进行轮询。
轮询 Future A 让 cheapo_request 的主体开始执行。它会调用 TcpStream::connect 来获取套接字的 Future B,然后对其进行等待。更准确地说,由于 TcpStream::connect 可能会遇到错误,因此 B 其实是 Result
20.1.4 启动异步任务
在 Future 的值就绪之前,async_std::task::block_on 函数 会一直阻塞。但是把线程完全阻塞在单个 Future 上并不比同步调用好:本章的目标是让线程在等待的同时做其他工作。 为此,可以使用 async_std::task::spawn_local。该函数会接受一个 Future 并将其添加到任务池中,只要正阻塞着 block_on 的 Future 还未就绪,就会尝试轮询。因此,如果你将一堆 Future 传给 spawn_local,然后将 block_on 应用于最终结果的 Future,那么 block_on 就会在可以向前推进时轮询每个启动(spawn)后的 Future,并行执行整个任务池,直到你想要的结果就绪。 在撰写本章时,要想在 async-std 中使用 spawn_local,就必须启用该 crate 的 unstable 特性。为此,需要在 Cargo.toml 中使用下面这行代码去引用 async-std:
async-std = { version = "1", features = ["unstable"] }
spawn_local 函数是标准库的 std::thread::spawn 函数的异步模拟,用于启动线程。 std::thread::spawn© 会接受闭包 c 并启动线程来运行它,然后返回 std::thread::JoinHandle,其中 std::thread::JoinHandle 的 join 方法会等待线程完成并返回 c 中返回的任何内容。 async_std::task::spawn_local(f) 会接受 Future f 并将其添加到当前线程在调用 block_on 时要轮询的池中。 spawn_local 会返回自己的 async_std::task::JoinHandle 类型,它本身就是一个 Future,你可以等待(await)它以获取 f 的最终值。假设我们想同时发出一整套 HTTP 请求。下面是第一次尝试:
pub async fn many_requests(
requests: Vec<(String, u16, String)>,
) -> Vec<std::io::Result<String>> {
use async_std::task;
let mut handles = vec![];
for (host, port, path) in requests {
handles.push(task::spawn_local(cheapo_request(&host, port, &path)));
}
let mut results = vec![];
for handle in handles {
results.push(handle.await);
}
results
}
该函数会在 requests 的每个元素上调用 cheapo_request,并将每个调用返回的 Future 传给 spawn_local。该函数还会将生成的 JoinHandle 收集到一个向量中,然后等待每一个 JoinHandle。可以用任意顺序等待这些 JoinHandle:由于请求已经发出,因此只要此线程调用了 block_on 并且没有更有价值的事情可做,请求的 Future 就会根据需要进行轮询。所有请求都将并行执行。一旦完成操作,many_requests 就会把结果返回给它的调用者。 前面的代码几乎是正确的,但 Rust 的借用检查器报错说它很担心 cheapo_request 返回的 Future 的生命周期:
error: `host` does not live long enough
handles.push(task::spawn_local(cheapo_request(&host, port,
&path)));
---------------^^^^^------------
--
| |
| borrowed value does not
| live long enough argument requires that `host` is borrowed for
`'static`
}
- `host` dropped here while still borrowed
path 也会出现类似的错误。 自然,如果将引用传给一个异步函数,那么它返回的 Future 就必须持有这些引用,因此,安全起见,Future 的生命周期不能超出它们借来的值。这和任何包含引用的值所受的限制是一样的。 问题是 spawn_local 无法确定你会在 host 和 path 被丢弃之前等待任务完成。事实上,spawn_local 只会接受生命周期为 ‘static 的 Future,因为你也可以简单地忽略它返回的 JoinHandle,并在程序执行其他部分时让此任务继续运行。这不是异步任务独有的问题:如果尝试使用 std::thread::spawn 启动一个线程,那么该线程的闭包也会捕获对局部变量的引用,并得到类似的错误。 解决此问题的方法是创建另一个接受这些参数的拥有型版本的异步函数:
async fn cheapo_owning_request(
host: String,
port: u16,
path: String,
) -> std::io::Result<String> {
cheapo_request(&host, port, &path).await
}
此函数会接受 String 引用而不是 &str 引用,因此它的 Future 拥有 host 字符串和 path 字符串本身,并且其生命周期为 ‘static。通过借用检查器可以发现它立即开始等待 cheapo_request 返回的 Future,因此,如果该 Future 被轮询,那么它借用的 host 变量和 path 变量必然仍旧存在。一切顺利。 可以使用 cheapo_owning_request 像下面这样分发所有请求:
for (host, port, path) in requests {
handles.push(task::spawn_local(cheapo_owning_request(host, port, path)));
}
可以借助 block_on 从同步 main 函数中调用 many_requests:
let requests = vec![
("example.com".to_string(), 80, "/".to_string()),
("www.red-bean.com".to_string(), 80, "/".to_string()),
("en.wikipedia.org".to_string(), 80, "/".to_string()),
];
let results = async_std::task::block_on(many_requests(requests));
for result in results {
match result {
Ok(response) => println!("{}", response),
Err(err) => eprintln!("error: {}", err),
}
}
上述代码会在对 block_on 的调用中同时运行所有 3 个请求。每一个都会在某种时机取得进展,而其他的则会被阻塞,所有这些都发生在调用线程上。图 20-3 展示了对 cheapo_request 的 3 个调用的一种可能的执行方式。

图 20-3:在单个线程上运行 3 个异步任务 (我们鼓励你尝试自己运行此代码,在 cheapo_request 的顶部和每个 await 表达式之后添加 eprintln! 调用,以便看出这些调用在一次执行与下一次执行之间的交错方式有何不同。) 对 many_requests 的调用(为简单起见,图 20-3 中未展示)启动了 3 个异步任务,我们将其标记为 A、B 和 C。block_on 首先轮询 A,这样 A 会连接到 example.com。一旦返回了 Poll::Pending,block_on 就会将注意力转向下一个异步任务,轮询 B,并最终轮询 C,这样每个任务都会连接到各自的服务器。 当所有可轮询的 Future 都返回了 Poll::Pending 时, block_on 就会进入休眠状态,直到某个 TcpStream::connect 返回的 Future 表明它的任务值得再次轮询时才唤醒。 在本次执行中,服务器 en.wikipedia.org 比其他服务器响应更快,因此该任务首先完成。当启动的任务完成后,它会将值保存在 JoinHandle 中并标记为就绪,以便正在等候的 many_requests 可以继续处理。最终,对 cheapo_request 的其他调用要么成功了,要么返回了错误,而 many_requests 本身也可以返回了。最后,main 会从 block_on 接收到结果向量。 上述操作发生在同一个线程上,对 cheapo_request 的 3 个调用会通过对它们的 Future 的连续轮询交错进行。虽然异步调用看起来 是单个函数调用一直运行到完成为止,但这种调用其实是通过对 Future 的 poll 方法的一系列同步调用实现的。每个单独的 poll 调用都会快速返回,让进程空闲,以便轮询另一个异步调用。 我们终于达成了本章开头设定的目标:让线程在等待 I/O 完成时承担其他工作,这样线程的资源就不会在无所事事中浪费掉。更妙的是,此目标是通过与普通 Rust 代码非常相似的代码实现的:一些函数被标记为 async,一些函数调用后面跟着 .await,并且改用来自 async_std 而不是 std 的函数,除此之外,就和普通的 Rust 代码一模一样。 异步任务与线程的一个重要区别是:从一个异步任务到另一个异步任 务的切换只会出现在 await 表达式处,且只有当等待的 Future 返回了 Poll::Pending 时才会发生。这意味着如果在 cheapo_request 中放置了一个长时间运行的计算,那么传给 spawn_local 的其他任务在它完成之前全都没有机会运行。使用线程则不会出现这个问题:操作系统可以在任何时候挂起任何线程,并设置定时器以确保没有哪个线程会独占处理器。异步代码要求共享同一个线程的各个 Future 自愿合作。如果想让长时间运行的计算与异步代码共存,可以参考 20.1.9 节讲到的一些选项。
20.1.5 异步块
除了异步函数,Rust 还支持异步块。普通的块语句会返回其最后一个表达式的值,而异步块会返回其最后一个表达式值的 Future。可以在异步块中使用 await 表达式。 异步块看起来就像普通的块语句,但其前面有 async 关键字:
let serve_one = async {
use async_std::net;
// 监听连接并接受其中一个
let listener = net::TcpListener::bind("localhost:8087").await?;
let (mut socket, _addr) = listener.accept().await?;
// 在`socket`上与客户端对话
};
上述代码会将 serve_one 初始化为一个 Future(当被轮询时),以侦听并处理单个 TCP 连接。直到轮询 serve_one 时才会开始执行代码块的主体,就像直到轮询 Future 时才会开始执行异步函数的主体一样。 如果在异步块中使用 ? 运算符处理错误,那么它只会从块中而不是围绕它的函数中返回。如果前面的 bind 调用返回了错误,则 ? 运算符会将其作为 serve_one 的最终值返回。同样,return 表达式也会从异步块而不是其所在函数中返回。如果异步块引用了围绕它的代码中定义的变量,那么它的 Future 就会捕获这些变量的值,就像闭包所做的那样。与 move 闭包(参见 14.1.2 节)的用法一样,也可以用 async move 启动该块以获取捕获的值的所有权,而不仅仅持有对它们的引用。 为了将你想要异步运行的那部分代码分离出去,异步块提供了一种简洁的方法。例如,在 20.1.4 节中,spawn_local 需要一个 ‘static 的 Future,因此我们定义了包装函数 cheapo_owning_request 来为我们提供一个拥有其参数所有权的 Future。只需从异步块中调用 cheapo_request 即可获得相同的效果,不用花心思去写包装函数:
pub async fn many_requests(
requests: Vec<(String, u16, String)>,
) -> Vec<std::io::Result<String>> {
use async_std::task;
let mut handles = vec![];
for (host, port, path) in requests {
handles.push(task::spawn_local(async move {
cheapo_request(&host, port, &path).await
}));
}
}
由于这是一个 async move 块,因此它的 Future 获取了 String 值 host 和 path 的所有权,和 move 闭包一样。然后该 Future 会传递对 cheapo_request 的引用。借用检查器可以看到块的 await 表达式接手了 cheapo_request 返回的 Future 的所有 权,因此对 host 和 path 的引用的生命周期不能比它们借来的已捕获变量的生命周期长。对于 cheapo_owning_request 所能做的事,async 块也能完成,且使用的样板代码更少。 你可能会遇到的一个棘手问题是,与异步函数不同,没有任何语法可用于指定异步块的返回类型。这在使用 ? 运算符时会导致问题:
let input = async_std::io::stdin();
let future = async {
let mut line = String::new();
// 这会返回`std::io::Result<usize>` input.read_line(&mut line).await?;
println!("Read line: {}", line);
Ok(())
};
运行失败并出现以下错误:
error: type annotations needed
|
48 | let future = async {
| ------ consider giving `future` a type
...
60 | Ok(())
| ^^ cannot infer type for type parameter `E` declared
| on the enum `Result`
Rust 无法判断异步块的返回类型是什么。read_line 方法会返回 Result<(), std::io::Error>,但是因为 ? 运算符会使用 From 特型将手头的错误类型转换为场景要求的任何类型,所以异步块的返回类型 Result<(), E> 中的 E 可以是实现了 Fromstd::io::Error 的任意类型。 Rust 的未来版本中可能会新增相应的语法来指出 async 块的返回类型。目前,可以通过明确写出块的最终 Ok 的类型来解决这个问题:
let future = async { Ok::<(), std::io::Error>(()) };
由于 Result 是一个希望以成功类型和错误类型作为其参数的泛型类型,因此,如上例所示,可以在使用 Ok 或 Err 时指定这些类型参数。
20.1.6 从异步块构建异步函数
异步块为我们提供了另一种实现与异步函数相同效果的方式,并且这种方式更加灵活。例如,可以将我们的 cheapo_request 示例改写为一个普通的同步函数,该函数会返回一个异步块的 Future:
use std::io; use std::future::Future;
fn cheapo_request<'a>(host: &'a str, port: u16, path: &'a str)
-> impl Future<Output = io::Result<String>> + 'a
{
async move {
……函数体……
}
}
当你调用这个版本的函数时,它会立即返回异步块返回值的 Future。这会捕获该函数的参数表,并且表现得就像异步函数返回的 Future 一样。由于没有使用 async fn 语法,因此需要在返回类型中写上 impl Future。但就调用者而言,这两个定义是具有相同函数签名的可互换实现。 如果想在调用函数时立即进行一些计算,然后再创建其结果的 Future,那么第二种方法会很有用。例如,另一种让 cheapo_request 和 spawn_local 协同工作的方法是将其变成一个返回 ‘static Future 的同步函数,这会捕获由其参数完全拥有的副本:
fn cheapo_request(host: &str, port: u16, path: &str)
-> impl Future<Output = io::Result<String>> + 'static
{
let host = host.to_string(); let path = path.to_string();
async move {
……使用&*host、port和path……
}
}
这个版本允许异步块将 host 和 path 捕获为拥有型 String 值,而不是 &str 引用。由于 Future 拥有其运行所需的全部数据,因此它会在整个 ‘static 生命周期内有效。(在前面所展示的签名中 我们明确写出了 + ‘static,但 ‘static 本来就是各种 -> impl 返回类型的默认值,因此将其省略也不会有任何影响。) 由于这个版本的 cheapo_request 返回的是 ‘static Future,因此可以将它们直接传给 spawn_local。
let join_handle = async_std::task::spawn_local(cheapo_request("areweasyncyet.rs", 80, "/"));
……其他工作……
let response = join_handle.await?;
20.1.7 在线程池中启动异步任务
迄今为止,我们展示的这些示例把几乎所有时间都花在了等待 I/O 上,但某些工作负载主要是 CPU 任务和阻塞的混合体。当计算量繁重到无法仅靠单个 CPU 满足时,可以使用 async_std::task::spawn 在工作线程池中启动 Future,线程池专门用于轮询那些已准备好向前推进的 Future。 async_std::task::spawn 用起来很像 async_std::task::spawn_local:
use async_std::task;
let mut handles = vec![];
for (host, port, path) in requests {
handles.push(task::spawn(async move {
cheapo_request(&host, port, &path).await
}));
}
与 spawn_local 一样,spawn 也会返回一个 JoinHandle 值,你可以等待它,以获得 Future 的最终值。但与 spawn_local 不同,Future 不必等到调用 block_on 才进行轮询。一旦线程池中的某个线程空闲了,该线程就会试着轮询它。 在实践中,spawn 比 spawn_local 用得多。这只是因为人们更希望看到他们的工作负载在机器资源上均匀分配,而不关心工作负载的计算和阻塞是如何混杂的。 使用 spawn 时要记住一点:线程池倾向于保持忙碌。因此无论哪个线程率先得到轮询的机会,都会轮询到你的 Future。异步调用可能在一个线程上开始执行,阻塞在 await 表达式上,然后在另一个线程中恢复。因此,虽然将异步函数调用视为单一的、连续的代码执行是一种合理的简化(实际上,异步函数和 await 表达式的设计目标就是鼓励你以这种方式思考),但实际上可能会通过许多不同的线程来承载此次调用。 如果你正在使用线程本地存储,可能会惊讶地看到你在 await 表达式之前放置的数据后来被换成了完全不同的东西。这是因为你的任务现在正由线程池中的不同线程轮询。如果你觉得这是一个问题,就应该改用任务本地存储,具体请参阅 async-std crate 的 task_local! 宏的详细信息。
20.1.8 你的 Future 实现 Send 了吗
spawn 具有 spawn_local 所没有的一项限制。由于 Future 会被发送到另一个线程运行,因此它必须实现标记特型 Send(参见 19.2.5 节)。只有当 Future 包含的所有值都符合 Send 要求时,它自己才符合 Send 要求:所有函数参数、局部变量,甚至匿名临时值都必须安全地转移给另一个线程。 和生命周期方面的限制一样,这项要求也不是异步任务独有的:如果尝试用 std::thread::spawn 启动其闭包以捕获非 Send 值的线程,那么也会遇到类似的错误。不同点在于,虽然传给 std::thread::spawn 的闭包会留在创建并运行它的线程上,但在线程池中启动的 Future 可以在等待期间的任意时刻从一个线程转移给另一个线程。这项限制很容易意外触发。例如,下面的代码乍看起来没问题:
use async_std::task;
use std::rc::Rc;
async fn reluctant() -> String {
let string = Rc::new("ref-counted string".to_string());
some_asynchronous_thing().await;
format!("Your splendid string: {}", string)
}
task::spawn(reluctant());
异步函数的 Future 需要保存足够的信息,以便此函数能从 await 表达式继续。在这种情况下,reluctant 返回的 Future 必须在 await 之后使用 string 的值,因此 Future(至少在某些时刻)会包含一个 Rc
error: future cannot be sent between threads safely |
17 | task::spawn(reluctant());
| ^^^^^^^^^^^ future returned by `reluctant` is not `Send`
|
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `async_std::task::spawn`
|
= help: within `impl Future`, the trait `Send` is not implemented for `Rc<String>` note: future is not `Send` as this value is used across an await |
10 | let string = Rc::new("ref-counted string".to_string());
| ------ has type `Rc<String>` which is not `Send` 11 |
12 | some_asynchronous_thing().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
await occurs here, with `string` maybe used later
...
15 | }
| - `string` is later dropped here
此错误消息很长,包含很多有用的详细信息。
解释了为什么 Future 需要符合 Send 的要求:task::spawn 需要它。
解释了哪个值不符合 Send 的要求:局部变量 string,其类型是 Rc
3
await 。
3
由于 await 之前和之后不属于同一个作用域,因此需要保存上下文。——译者注 有两种方法可以解决此问题。一种方法是限制非 Send 值的作用域,使其不跨越任何 await 表达式的作用域,因此也不需要保存在函数的 Future 中:
async fn reluctant() -> String {
let return_value = {
let string = Rc::new("ref-counted string".to_string());
format!("Your splendid string: {}", string)
// `Rc<String>`在此离开了作用域……
};
// ……因此当我们在这里暂停时,它不在周边环境里
some_asynchronous_thing().await;
return_value
}
另一种方法是简单地使用 std::sync::Arc 而非 Rc。Arc 使用原子更新来管理引用计数,这会让它略慢,但 Arc 指针是符合 Send 要求的。 虽然最终你将学会识别和避免非 Send 类型,但一开始它们可能有点儿令人吃惊。[ 至少,我们(本书作者)曾感到惊讶。] 例如,旧的 Rust 代码有时会使用下面这样的泛型结果类型:
// 别这样做!
type GenericError = Box<dyn std::error::Error>;
type GenericResult<T> = Result<T, GenericError>;
这个 GenericError 类型使用了装箱过的特型对象来保存实现了 std::error::Error 的任意类型的值,但没有对它施加任何进一步的限制:如果有某个非 Send 类型实现了 Error,那么就可以将该类型的装箱值转换为 GenericError。由于这种可能性, GenericError 不符合 Send 要求,并且下面的代码无法工作:
fn some_fallible_thing() -> GenericResult<i32> {
...
}
// 这个函数的Future不符合`Send`要求…… async fn unfortunate() { // ……因为此调用的值……
match some_fallible_thing() { Err(error) => {
report_error(error);
}
Ok(output) => {
// ……其生命周期跨越了这个await…… use_output(output).await;
}
}
}
// ……因此这个`spawn`会出错
async_std::task::spawn(unfortunate());
与前面的示例一样,编译器的错误消息解释了正在发生的事情,并指 出 Result 类型是罪魁祸首。由于 Rust 认为 some_fallible_thing 的结果存在于整个 match 语句(包括 await 表达式)中,所以它确定 unfortunate 返回的 Future 不符合 Send 的要求。对于这个错误,Rust 过于谨慎了:虽然 GenericError 确实不能安全地发送到另一个线程,但 await 只有在结果为 Ok 时才会发生,因此当我们等待 use_output 返回的 Future 时其实并不存在错误值。理想的解决方案是使用更严格的泛型错误类型,比如 7.2.5 节提到的错误类型:
type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;
type GenericResult<T> = Result<T, GenericError>;
这个特型对象会明确要求底层错误类型实现 Send。一切顺利。 即使你的 Future 不符合 Send 要求,而且不容易把它变成符合形式,仍然可以使用 spawn_local 在当前线程上运行它。当然,你需要确保此线程会在某个时刻调用 block_on 以便让它有机会运行,并且你无法受益于跨多个处理器分派工作的能力。
20.1.9 长时间运行的计算:yield_now 与 spawn_blocking
为了让 Future 更好地与其他任务共享线程,它的 poll 方法应该总是尽可能快地返回。但是,如果你正在进行长时间的计算,就可能需要很长时间才能到达下一个 await,从而让其他异步任务等待的时间比你预想的更久些。 避免这种情况的一种方法是偶尔等待某些事情。 async_std::task::yield_now 函数会返回一个为此而设计的简单的 Future:
while computation_not_done() {
……完成一个中等规模的计算步骤……
async_std::task::yield_now().await; }
当 yield_now 返回的 Future 第一次被轮询时,它会返回 Poll::Pending,但表示自己很快就值得再次轮询。因此你的异步调 用放弃了线程,以使其他任务有机会运行,但很快会再次轮到它。第 二次轮询 yield_now 返回的 Future 时,它会返回 Poll::Ready(()),让你的异步函数恢复执行。然而,这种方法并不总是可行。如果你使用外部 crate 进行长时间运行的计算或者调用 C 或 C++,那么将上述代码更改为异步友好型代码可能并不方便。或者很难确保计算所经过的每条路径一定会时不时地等待一下。 对于这种情况,可以使用 async_std::task::spawn_blocking。该函数会接受一个闭包,开始在独立的线程上运行它,并返回携带其返回值的 Future。异步代码可以等待那个 Future,将其线程让给其他任务,直到本次计算就绪。通过将繁重的工作放在单独的线程上,可以委托给操作系统去负责,让它更友善地分享处理器。 假设我们要根据存储在身份验证数据库中的密码哈希值来检查用户提供的密码。为安全起见,验证密码需要进行大量计算,这样即使攻击者获得了数据库的副本,也无法简单地通过尝试数万亿个可能的密码来查看是否有匹配项。argonautica crate 提供了一个专为存储密码而设计的哈希函数:正确生成的 argonautica 哈希需要相当一部 分时间才能验证。可以在异步应用程序中使用 argonautica(0.2 版),如下所示:
async fn verify_password(
password: &str,
hash: &str,
key: &str,
) -> Result<bool, argonautica::Error> {
// 制作参数的副本,以使闭包的生命周期是'static let password = password.to_string(); let hash = hash.to_string(); let key = key.to_string();
async_std::task::spawn_blocking(move || {
argonautica::Verifier::default()
.with_hash(hash)
.with_password(password)
.with_secret_key(key)
.verify()
})
.await
}
如果 password 与 hash 匹配,则返回 Ok(true),给定的 key 是整个数据库的键。通过在传给 spawn_blocking 的闭包中进行验证,可以将昂贵的计算推给其各自的线程,确保它不会影响我们对其他用户请求的响应。
20.1.10 对几种异步设计进行比较
在许多方面,Rust 的异步编程方式与其他语言所采用的方法相似。例如,JavaScript、C#和 Rust 都有带 await 表达式的异步函数。所有这些语言都有代表未完成计算的值:Rust 中叫作“Future”, JavaScript 中叫作“承诺”(Promise),C# 中叫作“任务” (Task),但它们都代表一种你可能不得不等待的值。 然而,Rust 对轮询的使用独树一帜。在 JavaScript 和 C# 中,异步函数在调用后会立即开始运行,并且系统库中内置了一个全局事件循环,可在等待的值可用时恢复挂起的异步函数调用。不过,在 Rust 中,异步调用什么都不会做,直到你将它传给 block_on、spawn 或 spawn_local 之类的函数,这些函数将轮询它并驱动此事直到完成。我们称这些函数为执行器,它们承担着与其他语言中全局事件循环类似的职责。 因为 Rust 会让你(程序员)选择一个执行器来轮询你的 Future,所以它并不需要在系统中内置全局事件循环。async-std crate 提供了迄今为止本章使用过的这些执行器函数,但是 tokio crate(本章稍后会用到)自己定义了一组类似的执行器函数。在本章的末尾,我们将实现自己的执行器。你可以在同一个程序中使用这 3 种执行器。
20.1.11 一个真正的异步 HTTP 客户端
如果不展示一个正确使用异步 HTTP 客户端 crate 的例子,那本章就 是不完整的,因为它非常简单,并且确有几个不错的 crate 可供选择,包括 request 和 surf。 下面是对 many_requests 的重写,它甚至比基于 cheapo_request 的重写更简单,而且会用 surf 同时运行一系列请求。你需要在 Cargo.toml 文件中添加如下依赖项:
[dependencies] async-std = "1.7" surf = "1.0"
然后,可以像下面这样定义 many_requests:
pub async fn many_requests(urls: &[String]) -> Vec<Result<String, surf::Exception>> {
let client = surf::Client::new();
let mut handles = vec![];
for url in urls {
let request = client.get(&url).recv_string();
handles.push(async_std::task::spawn(request));
}
let mut results = vec![];
for handle in handles {
results.push(handle.await);
}
results
}
fn main() {
let requests = &[
"http://example.com".to_string(),
"https://www.red-bean.com".to_string(),
"https://en.wikipedia.org/wiki/Main_Page".to_string(),
];
let results = async_std::task::block_on(many_requests(requests));
for result in results {
match result {
Ok(response) => println!("*** {}\n", response),
Err(err) => eprintln!("error: {}\n", err),
}
}
}
使用单个 surf::Client 发出所有请求可以让我们重用 HTTP 连接 (如果其中有多个请求指向同一台服务器的话),并且不需要异步 块:因为 recv_string 是一个返回 Send + ‘static 型 Future 的异步方法,所以可以将它返回的 Future 直接传给 spawn。
20.2 异步客户端与服务器
现在,我们要把这些已讨论过的关键思想组合成一个真正可用的程序。在很大程度上,异步应用程序和普通的多线程应用程序非常相似,但在某些需要紧凑而且富有表现力的代码的场合,异步编程可以大显身手。 本节的示例是聊天服务器和客户端。真正的聊天系统是很复杂的,涉及从安全、重新连接到隐私和内部审核的各种问题,但我们已将此系统缩减为一组非常基础的特性,来把注意力聚焦于少数我们感兴趣的要点上。 特别是,我们希望能好好处理背压。也就是说,即使一个客户端的网络连接速度较慢或完全断开连接,也绝不能影响其他客户端按照自己的节奏交换消息。由于“龟速”客户端不应该让服务器花费无限的内存来保存其不断增长的积压消息,因此我们的服务器应该丢弃那些发给掉队客户端的消息,但也有义务提醒他们其信息流不完整。(一个真正的聊天服务器会将消息记录到磁盘并允许客户端检索他们错过的消息,但这里不考虑那样做。) 使用命令 cargo new –lib async-chat 启动项目,并将以下文本放入 async-chat/Cargo.toml 中:
[package] name = "async-chat" version = "0.1.0" authors = ["You <you@example.com>"] edition = "2021"
[dependencies]
async-std = { version = "1.7", features = ["unstable"] } tokio = { version = "1.0", features = ["sync"] } serde = { version = "1.0", features = ["derive", "rc"] } serde_json = "1.0"
我们依赖于 4 个 crate。 async-std crate 是本章中一直在用的异步 I/O 基础构件和实用工具的集合。 tokio crate 是类似于 async-std crate 的另一个异步基础构件集合,它也是 古老且 成熟的 crate 之一。tokio crate 应用广泛,设计和实现的标准都很高,但使用时需要比 asyncstd crate 更加小心。 tokio 是一个大型 crate,但我们只需要其中的一个组件,因此 Cargo.toml 依赖行中的 features = [“sync”] 字段将 tokio 缩减为了我们需要的部分,使其成为一种轻型依赖。当异步库生态系统还不太成熟时,人们会避免在同一个程序中同时使用 tokio 和 async-std。不过,只要遵循这两个项目各自 crate 文档中的规则,就可以在同一个程序中使用。
serde 和 serde_json 是第 18 章中介绍过的两个 crate。它们为我们提供了方便且高效的工具来生成和解析 JSON,我们的聊天协议使用 JSON 来表示网络上的数据。我们想使用 serde 的一些可选特性,因此会在提供依赖项时选择它们。 我们的聊天应用程序、客户端和服务器的整体结构如下所示:
async-chat
├── Cargo.toml └── src ├── lib.rs ├── utils.rs └── bin ├── client.rs └── server ├── main.rs ├── connection.rs ├── group.rs └── group_table.rs 这个包的布局使用了 8.4 节中提到的一项 Cargo 特性:除了主库 crate src/lib.rs 及其子模块 src/utils.rs,还包括两个可执行文件。 src/bin/client.rs 是聊天客户端的单文件可执行文件。 src/bin/server 是服务端的可执行文件,分布在 4 个文件中: main.rs 包含 main 函数,另外 3 个子模块分别是 connection.rs、group.rs 和 group_table.rs。 我们将在本章中展示每个源文件的内容,如果它们都就位了,那么一旦在此目录树中键入 cargo build,就会编译库的 crate,然后构建出两个可执行文件。Cargo 会自动包含库的 crate 作为依赖项,使其成为放置客户端和服务器共享定义的约定位置。同样,cargo check 会检查整棵源代码树。要运行任何一个可执行文件,可以使用如下命令:
$ cargo run --release --bin server -- localhost:8088
$ cargo run --release --bin client -- localhost:8088
–bin 选项会指出要运行哪个可执行文件,而 – 选项后面的任何参数都会传给可执行文件本身。我们的客户端和服务器只希望知道服务器的地址和 TCP 端口。
20.2.1 Error 类型与 Result 类型
库 crate 的 utils 模块定义了要在整个应用程序中使用的 Error 类型与 Result 类型。以下来自 src/utils.rs:
use std::error::Error;
pub type ChatError = Box<dyn Error + Send + Sync + 'static>;
pub type ChatResult<T> = Result<T, ChatError>;
这些是我们在 7.2.5 节中建议的泛型错误类型。async_std
crate、serde_json crate 和 tokio crate 也分别定义了自己的
错误类型,但是 ? 运算符可以自动将它们全部转换为 ChatError,这是借助标准库的 From 特型实现的,该特型可以将任何合适的错误
类型转换为 Box
20.2.2 协议
库 crate 以下面这两种类型来支持整个聊天协议,这是在 lib.rs 中定义的:
use serde::{Deserialize, Serialize};
use std::sync::Arc;
pub mod utils;
#[derive(Debug, Deserialize, Serialize, PartialEq)] pub enum FromClient {
Join { group_name: Arc<String> }, Post {
group_name: Arc<String>, message: Arc<String>,
},
}
#[derive(Debug, Deserialize, Serialize, PartialEq)] pub enum FromServer { Message {
group_name: Arc<String>, message: Arc<String>,
},
Error(String),
}
#[test]
fn test_fromclient_json() {
use std::sync::Arc;
let from_client = FromClient::Post {
group_name: Arc::new("Dogs".to_string()),
message: Arc::new("Samoyeds rock!".to_string()),
};
let json = serde_json::to_string(&from_client).unwrap();
assert_eq!(
json,
r#"{"Post":{"group_name":"Dogs","message":"Samoyeds rock!"}}"#
);
assert_eq!(
serde_json::from_str::<FromClient>(&json).unwrap(),
from_client
);
}
FromClient 枚举表示可以从客户端发送到服务器的数据包:它可以请求加入一个组或向已加入的任何组发布消息。FromServer 表示可以由服务器发回的内容,即发布到某个组的消息或错误消息。可以使
用带引用计数的 Arc
{"Post":{"group_name":"Dogs","message":"Samoyeds rock!"}}
然后,派生出的 Deserialize 实现会将其解析回等效的 FromClient 值。请注意,FromClient 中的 Arc 指针对其序列化形式没有任何影响:引用计数字符串会直接显示为 JSON 对象成员的值。
20.2.3 获取用户输入:异步流
我们的聊天客户端的首要职责是读取用户的命令并将相应的数据包发送到服务器。管理一个合适的用户界面超出了本章的范围,所以我们 将做 简单可行的事情:直接从标准输入中读取行。以下代码位于 src/bin/client.rs 中:
use async_std::prelude::*; use async_chat::utils::{self, ChatResult}; use async_std::io; use async_std::net;
async fn send_commands(mut to_server: net::TcpStream) -> ChatResult<()> { println!("Commands:\n\ join GROUP\n\ post GROUP MESSAGE...\n\
Type Control-D (on Unix) or Control-Z (on Windows) \ to close the connection.");
let mut command_lines = io::BufReader::new(io::stdin()).lines(); while let Some(command_result) = command_lines.next().await { let command = command_result?;
// 参见GitHub存储库中对`parse_command`的定义 let request = match parse_command(&command) {
Some(request) => request,
None => continue,
};
utils::send_as_json(&mut to_server, &request).await?; to_server.flush().await?;
}
Ok(())
}
这会调用 async_std::io::stdin 来获取客户端标准输入的异步句柄,并包装在 async_std::io::BufReader 中对其进行缓冲,然后调用 lines 逐行处理用户的输入。它会尝试将每一行解析为与某个 FromClient 值相对应的命令,如果成功,就将该值发送到服务器。如果用户输入了无法识别的命令,那么 parse_command 就会打印一条错误消息并返回 None,以便 send_commands 可以重新开始循环。如果用户键入了文件结束(EOF)指示符,则 lines 流会返回 None,并且 send_commands 也会返回。此代码与你在普通同步程序中编写的代码非常相似,只不过它使用的是 async_std 版本的库特性。
异步 BufReader 的 lines 方法很有趣。它没有像标准库那样返回一个迭代器:Iterator::next 方法是一个普通的同步函数,因此调用 command_lines.next() 会阻塞线程,直到下一行代码就
绪。而这里的 lines 会返回一个 Result
trait Stream {
type Item;
// 现在,把`Pin<&mut Self>`读取为`&mut Self`
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
可以将 Stream 特型视为 Iterator 特型和 Future 特型的混合体。与迭代器一样,Stream 也有关联的 Item 类型,并使用 Option 来指示序列何时结束。同时,与 Future 一样,流必须被轮询:要获取下一个条目(或了解流是否结束),就必须调用 poll_next 直到它返回 Poll::Ready。流的 poll_next 实现应该总是快速返回,不会阻塞。如果流返回了 Poll::Pending,则必须在值得再次轮询时通过 Context 通知调用者。 poll_next 方法很难直接使用,不过通常也不需要直接使用该方法。与迭代器一样,流有很多实用方法,比如 filter 和 map。在 这些方法中,有一个 next 方法,它会返回流中下一个 OptionSelf::Item 的 Future。可以调用 next 并等待它返回的 Future,而不必显式轮询流。 把这些片段结合起来看,send_commands 会利用 while let 和 next 循环遍历输入行组成的流来消耗这个流:
while let Some(item) = stream.next().await {
……使用条目……
}
(Rust 可能会在未来版本中引入可用来消耗流的异步 for 循环,就像普通 for 循环能消耗 Iterator 值一样。) 在流结束后(也就是说,在流返回 Poll::Ready(None) 指出流已结束之后)轮询流,就像在迭代器返回 None 之后调用 next,或者在 Future 返回 Poll::Ready 之后轮询 Future:Stream 特型没有规定此时流应该怎么做,某些流可能行为诡异。与 Future 和迭代器一样,流也有一个 fuse 方法来确保此类调用的行为在必要时是可预测的。有关详细信息,请参阅在线文档。 使用流时,务必记住使用 async_std 预导入:
use async_std::prelude::*;
这是因为 Stream 特型的实用方法(如 next、map、filter 等)实际上并没有定义在自身上,而是单独特型 StreamExt 上的默认方法,该特型会自动为所有 Stream 实现:
pub trait StreamExt: Stream {
……把一些实用工具方法定义为默认方法……
}
impl<T: Stream> StreamExt for T { }
这是 11.2.2 节描述的扩展特型模式的示例。async_std::prelude 模块会将 StreamExt 方法引入作用域,因此使用预导入可以确保这些方法在你的代码中可见。
20.2.4 发送数据包
为了在网络套接字上传输数据包,我们的客户端和服务器会使用库 crate 的 utils 模块中的 send_as_json 函数:
use async_std::prelude::*;
use serde::Serialize;
use std::marker::Unpin;
pub async fn send_as_json<S, P>(outbound: &mut S, packet: &P) -> ChatResult<()>
where
S: async_std::io::Write + Unpin,
P: Serialize,
{
let mut json = serde_json::to_string(&packet)?;
json.push('\n');
outbound.write_all(json.as_bytes()).await?;
Ok(())
}
这个函数会将 packet 的 JSON 表示形式构建为 String,在末尾添加换行符,然后将其全部写入 outbound。 从这个函数的 where 子句可以看出 send_as_json 非常灵活。要发送的数据包类型 P 可以是任何实现了 serde::Serialize 的值。输出流 S 可以是任何实现了 async_std::io::Write(输出流的 std::io::Write 特型的异步版本)的值。这足以让我们在异步 TcpStream 上发送 FromClient 值和 FromServer 值。只要遵守 send_as_json 的泛型定义,就能确保它不会意外依赖于流类型或数据包类型的细节,因为 send_as_json 只能使用来自这些特型的方法。 使用 write_all 方法需要满足 S 上的 Unpin 约束。本章在后面会介绍 Pin 和 Unpin,但就目前而言,只要在必要时向类型变量中添加 Unpin 约束就足够了,如果忘记了,Rust 编译器会帮你指出这些问题。 send_as_json 没有将数据包直接序列化到 outbound 流,而是将其序列化为临时 String,然后写入 outbound 中。serde_json crate 确实提供了将值直接序列化为输出流的函数,但这些函数只支持同步流。要想写入异步流,就要对 serde_json 和 serde 这两个 crate 中与格式无关的核心代码进行根本性更改,因为围绕它们设计的特型都有一些同步方法。 与流一样,async_std 的 I/O 特型的许多方法实际上是在其扩展特型上定义的,因此在使用它们时请务必记住 use async_std::prelude::*。
20.2.5 接收数据包:更多异步流
为了接收数据包,我们的服务器和客户端将使用一个来自 utils 模块的函数从异步缓冲的 TCP 套接字
(async_std::io::BufReader
use serde::de::DeserializeOwned;
pub fn receive_as_json<S, P>(inbound: S) -> impl Stream<Item = ChatResult<P>>
where
S: async_std::io::BufRead + Unpin,
P: DeserializeOwned,
{
inbound.lines().map(|line_result| -> ChatResult<P> {
let line = line_result?;
let parsed = serde_json::from_str::<P>(&line)?;
Ok(parsed)
})
}
与 send_as_json 一样,这个函数的输入流类型和数据包类型是泛型的。
流类型 S 必须实现 async_std::io::BufRead,这是 std::io::BufRead 的异步模拟,表示缓冲输入字节流。
数据包类型 P 必须实现 DeserializeOwned,这是 serde 的
Deserialize 特型的更严格变体。为了提高效率,
Deserialize 可以生成 &str 值和 &[u8] 值,这些值会直接从反序列化的缓冲区中借用它们的内容,以免复制数据。然而,在上面的例子中,这样做可不太好:我们要将反序列化后的值返回给调用者,因此它们的生命周期必须超出被解析的缓冲区。实现了 DeserializeOwned 的类型始终独立于被反序列化的缓冲区。
调用 inbound.lines() 会为我们提供一个携带
std::io::Result 值的流,我们直接将其返回。该函数的返回类型如下所示: 这表示我们返回了某种会异步生成 ChatResult 值序列的类型,但我们的调用者无法准确判断是哪种类型。由于传给 map 的闭包无论如何都是匿名类型,因此这已经是 receive_as_json 可能返回的具体的类型了。
请注意,receive_as_json 本身并不是异步函数,它是会返回一个异步值(一个流)的普通函数。现在,比起“只在某些地方添加
async 与 .await”,你更深入地理解了 Rust 的异步支持机制,能够写出清晰、灵活和高效的定义,就像刚才这个充分发挥出语言特性的定义一样。
要想了解 receive_as_json 的用法,可以看看下面这个来自
src/bin/client.rs 的聊天客户端的 handle_replies 函数,该函数会从网络接收 FromServer 的值流并将它们打印出来供用户查看: 这个函数会接受一个从服务器接收数据的套接字,把它包装进
BufReader(请注意,这是 async_std 版本),然后将其传给
receive_as_json 以获取传入的 FromServer 值流。接下来它会用 while let 循环来处理传入的回复,检查错误结果并打印每个服务器的回复以供用户查看。 介绍完 send_commands 和 handle_replies,现在可以展示聊天客户端的 main 函数了,该函数来自 src/bin/client.rs: 从命令行获取服务器地址后,main 要调用一系列异步函数,因此它
会将函数的其余部分都包装在一个异步块中,并将该块返回的
Future 传给 async_std::task::block_on 来运行。
建立连接后,我们希望 send_commands 函数和 handle_replies 函数双线运行,这样就可以在键入的同时看到别人发来的消息。如果遇到了 EOF 指示器或者与服务器的连接断开了,那么程序就应该退出。
考虑到我们在本章其他地方所做的工作,你可能想要写出这样的代码: 但由于我们在等待两个 JoinHandle,这会让程序在两个任务都完成后才能退出。但我们希望只要任何一个完成就立即退出。Future 上的 race(赛跑)方法可以满足这一要求。调用
from_server.race(to_server) 会返回一个新的 Future,它会同时轮询 from_server 和 to_server,并在二者之一就绪时返回
Poll::Ready(v)。这两个 Future 必须具有相同的输出类型,其
终值是先完成的那个 Future 的值。未完成的 Future 会被丢弃。
race 方法以及许多其他的便捷工具都是在
async_std::prelude::FutureExt 特型上定义的, async_std::prelude 能让它对我们可见。
迄今为止,我们唯一没有展示过的客户端代码是 parse_command 函数。这是一目了然的文本处理代码,所以这里就不展示它的定义了。
有关详细信息,请参阅 Git 库中的完整代码。 以下是服务器主文件 src/bin/server/main.rs 的全部内容: 服务器的 main 函数和客户端的 main 函数类似:它会先进行一些设置,然后调用 block_on 来运行一个异步块以完成真正的工作。为了
处理来自客户端的传入连接,它创建了 TcpListener 套接字,其 incoming 方法会返回一个 std::io::Result 下面这些位于 src/bin/server/connection.rs 的 connection 模块中的 serve 函数是服务器的主要工作代码: 这几乎就是客户端的 handle_replies 函数的镜像:大部分代码是一个循环,用于处理传入的 FromClient 值的流,它是从带有 receive_as_json 的缓冲 TCP 流构建出来的。如果发生错误,就
会生成一个 FromServer::Error 数据包,将坏消息传回给客户端。
除了错误消息,客户端还希望接收来自他们已加入的聊天组的消息,因此需要与每个组共享和客户端的连接。虽然可以简单地为每个人提供一份 TcpStream 的克隆,但是如果其中两个源试图同时将数据包写入套接字,那么他们的输出就可能彼此交叉,并且客户端 终会收到乱码 JSON。我们需要对此连接安排安全的并发访问。
这是使用 Outbound 类型管理的,在
src/bin/server/connection.rs 中的定义如下所示: Outbound 值在创建时会获得 TcpStream 的所有权并将其包装在 Mutex 中以确保一次只有一个任务可以使用它。serve 函数会将每个 Outbound 包装在一个 Arc 引用计数指针中,以便客户端加入的所有组都可以指向同一个共享的 Outbound 实例。
调用 Outbound::send 时会首先锁定互斥锁,返回一个可解引用为内部 TcpStream 的守卫值。我们使用 send_as_json 来传输 packet, 后会调用 guard.flush() 来确保它不会在某处缓冲区进行不完整传输。(据我们所知,TcpStream 实际上并不会缓冲数据,但 Write 特型确实允许它的实现这样做,所以不应该冒这个险。)
表达式 &mut *guard 可以帮我们解决 Rust 不会通过隐式解引用来满足特型限界的问题。我们会显式解引用互斥锁守卫,得到受保护的
TcpStream,然后借用一个可变引用,生成 send_as_json 所需的
&mut TcpStream。
请注意,Outbound 会使用 async_std::sync::Mutex 类型,而不是标准库的 Mutex。原因有以下 3 点。首先,如果任务在持有互斥锁守卫时被挂起,那么标准库的 Mutex 可能会行为诡异。如果一直运行该任务的线程选择了另一个试图锁定同一 Mutex 的任务,那么麻烦就会随之而来:从 Mutex 的角度来看,已经拥有它的线程正试图再次锁定它。标准的 Mutex 不是为处理这种情况而设计的,因此会发生 panic 或死锁。(它永远不会以不恰当的方式授予锁。)Rust 团队正在进行的一项工作就是在编译期检测到这个问题,并当 std::sync::Mutex 守卫运行在 await 表达式中时发出警告。由于 Outbound::send 在等待 send_as_json 和 guard.flush 返回的 Future 时需要持有锁,因此它必须使用 async_std 的 Mutex。
其次,异步 Mutex 的 lock 方法会返回一个守卫的 Future,因此正在等待互斥锁的任务会将其线程让给别的任务使用,直到互斥锁就绪。(如果互斥锁已然可用,则此 lock 的 Future 会立即就绪,任务根本不会自行挂起。)另外,标准库 Mutex 的 lock 方法在等待获取锁期间会锁定整个线程。由于前面的代码在通过网络传输数据包时持有互斥锁,因此这种等待可能会持续相当长的时间。
后,标准库 Mutex 必须由锁定它的同一个线程解锁。为了强制执行此操作,标准库互斥锁的守卫类型没有实现 Send,它不能传输到其他线程。这意味着持有这种守卫的 Future 本身不会实现 Send,
并且不能传给 spawn 以在线程池中运行,它只能与 block_on 或 spawn_local 一起使用。而 async_std Mutex 的守卫实现了
Send,因此在已启动的任务中使用它没有问题。 但前面所讲的那些并不能导向“在异步代码中应该始终使用
async_std::sync::Mutex”这样简单的结论。通常在持有互斥锁时不需要等待任何东西,并且这种锁定不会持续太久。在这种情况
下,标准库的 Mutex 效率会更高。聊天服务器的 GroupTable 类
型就说明了这种情况。以下是 src/bin/server/group_table.rs 的全部内容: GroupTable 只是一个受互斥锁保护的哈希表,它会将聊天组名称映射到实际组,两者都使用引用计数指针进行管理。get 方法和
get_or_create 方法会锁定互斥锁,执行一些哈希表操作,可能还会做一些内存分配,然后返回。
在 GroupTable 中,我们会使用普通的旧式 std::sync::Mutex。此模块中根本没有异步代码,因此无须避免 await。事实上,如果想在这里使用 async_std::sync::Mutex,就要将 get 和 get_or_create 变成异步函数,这会引入 Future 创建、暂停和恢复的开销,但收益甚微:互斥锁只会在一些哈希操作和可能出现的少量内存分配上锁定。
如果聊天服务器发现自己拥有数百万用户,并且 GroupTable 的互斥锁确实成了瓶颈,那么就算把它变成异步形式也无法解决该问题。使用某种专门用于并发访问的集合类型来代替 HashMap 可能会好一些。例如,dashmap crate 就提供了这样一个类型。 在我们的服务器中,group::Group 类型代表一个聊天组。该类型只需要支持 connection::serve 调用的两个方法:join 用于添加新成员,post 用于发布消息。发布的每条消息都要分发给所有成员。
现在我们来解决前面提过的背压大挑战。有几项需求相互掣肘。
如果一个成员无法跟上发布到群组的消息(比如,其网络连接速度较慢),则群组中的其他成员不应受到影响。
即使某个成员掉线了,也应该有办法重新加入对话并以某种方式继续参与。
用于缓冲消息的内存不应无限制地增长。
因为这些挑战在实现多对多通信模式时很常见,所以 tokio crate 提供了一种广播通道类型,可以对这些挑战进行合理的权衡。tokio 广播通道是一个值队列(在这个例子中就是聊天消息),它允许任意数量的不同线程或任务发送值和接收值。之所以称为“广播”通道,是因为每个消费者都会获得这里发出的每个值的副本。(这个值的类型必须实现了 Clone。)
通常,广播通道会在队列中把一条消息保留到每个消费者都获得了它的副本为止。但是,如果队列的长度超过通道的 大容量(在创建通道时指定),那么 旧的消息将被丢弃。任何掉队的消费者在下次尝试获取下一条消息时都会收到错误消息,并且通道会让他们赶上仍然可用的 旧消息。
例如,图 20-4 展示了一个 大容量为 16 个值的广播通道。 图 20-4:tokio 广播通道
有 2 个发送方会将消息排入队列,4 个接收方会将消息从队列中取出
——或者更准确地说,是将消息从队列中复制出来。接收者 B 还有
14 条消息要接收,接收者 C 还有 7 条,接收者 D 已经完全赶上
了。接收者 A 掉队了,有 11 条消息在它看到之前就被丢弃了。它的下一次接收消息的尝试将失败,然后会返回一个错误以说明情况,并快进到队列的当前尾部。
聊天服务器会将每个聊天组都表示为承载 Arc Group 结构体中包含聊天组的名称,以及表示组广播通道发送端的 broadcast::Sender。
Group::new 函数会调用 broadcast::channel 创建一个 大容量为 1000 条消息的广播通道。channel 函数会返回发送者和接收者,但此时我们不需要接收者,因为组中还没有任何成员。
要向组中添加新成员,Group::join 方法会调用发送者的 subscribe 方法来为通道创建新的接收者。然后聊天组会在
handle_subscribe 函数中启动一个新的异步任务来监视消息的接收者并将它们写回客户端。
有了这些细节,Group::post 方法就很简单了:它只是将消息发送到广播通道。由于通道携带的值是 Arc 尽管细节略有不同,但此函数的形式我们很熟悉:它是一个循环,从广播通道接收消息并通过共享的 Outbound 值将消息传输回客户端。如果此循环跟不上广播通道,它就会收到一个 Lagged 错误,并会尽职尽责地报告给客户端。
如果将数据包发送回客户端时完全失败了,那么可能是因为连接已关
闭,handle_subscriber 退出其循环并返回,导致异步任务退出。这会丢弃广播通道的 Receiver,并取消订阅该通道。这样,当连接断开时,它的每个组成员身份都会在下次该组试图向它发送消息时被清除。
这个聊天组永远不会关闭,因为我们不会从群组表中移除一个组。但为完整性考虑,一旦遇到 Closed 错误,handle_subscriber 就会退出该任务。
请注意,我们正在为每个客户端的每个组成员创建一个新的异步任务。这之所以可行,是因为异步任务使用的内存要比线程少得多,而且在同一个进程中从一个异步任务切换到另一个异步任务效率非常高。
这就是聊天服务器的完整代码。它有点儿简陋,async_std crate、 tokio crate 和 futures crate 中有许多比本书所讲更有价值的特性,但从理论上说,这个扩展示例已经阐明了异步生态系统的一些特性是如何协同工作的:例子中有两种风格的异步任务、流、异步 I/O 特型、通道和互斥锁。 聊天服务器展示了我们如何使用 TcpListener、broadcast 通道等异步原语来编写代码,并使用 block_on、spawn 等执行器来驱动它们的执行。现在来看看这些操作是如何实现的。关键问题是,当一个 Future 返回 Poll::Pending 时,应该如何与执行器协调,以便在正确的时机再次轮询。
想想当我们从聊天客户端的 main 函数运行如下代码时会发生什么: 在 block_on 第一次轮询异步块的 Future 时,几乎可以肯定网络连接没有立即就绪,所以 block_on 进入了睡眠状态。那它应该在什么时候醒来呢?一旦网络连接就绪,TcpStream 就需要以某种方式告诉 block_on 应该再次尝试轮询异步块的 Future,因为它知道这一次 await 将完成,并且异步块的执行可以向前推进。
当像 block_on 这样的执行器轮询 Future 时,必须传入一个称为唤醒器(waker)的回调。如果 Future 还没有就绪,那么 Future 特型的规则就会要求它必须暂时返回 Poll::Pending,并且如果
Future 值得再次轮询,就会安排在那时调用唤醒器。
所以 Future 的手写实现通常看起来是这样的: 换句话说,如果 Future 的值就绪了,就返回它。否则,将
Context 中唤醒器的克隆体存储在某处,并返回
Poll::Pending。
当 Future 值得再次轮询时,它一定会通过调用其唤醒器的 wake 方法通知最后一个轮询它的执行器: 理论上,执行器和 Future 会轮流轮询和唤醒:执行器会轮询
Future 并进入休眠状态,然后 Future 会调用唤醒器,这样,执行器就会醒来并再次轮询 Future。
异步函数和异步块的 Future 不会处理唤醒器本身,它们只会将自己获得的上下文传给要等待的子 Future,并将保存和调用唤醒器的义务委托给这些子 Future。在我们的聊天客户端中,对异步块返回的 Future 的第一次轮询只会在等待 TcpStream::connect 返回的 Future 时传递上下文(Context)。随后的轮询会同样将自己的上下文传给异步块接下来要等待的任何 Future。如前面的示例所示,TcpStream::connect 返回的 Future 会被轮询。也就是说,这些返回的 Future 会将唤醒器转移给一个辅助线程,该线程会等待连接就绪,然后调用唤醒器。
Waker 实现了 Clone 和 Send,因此 Future 总是可以制作自己的唤醒器副本并根据需要将其发送到其他线程。Waker::wake 方法会消耗此唤醒器。还有一个 wake_by_ref 方法,该方法不会消耗唤醒器,但某些执行器可以更高效地实现消耗唤醒器的版本。(但这种差异充其量也只是一次 clone 而已。)
执行器过度轮询 Future 并无害处,只会影响效率。然而,Future 应该只在轮询会取得实际进展时才小心地调用唤醒器:虚假唤醒和轮询之间的循环调用可能会阻止执行器完全休眠,从而浪费电量并使处理器对其他任务的响应速度降低。
既然已经展示了执行器和原始 Future 是如何通信的,那么接下来我们就自己实现一个原始 Future,然后看看 block_on 执行器的实现。 本章在前面介绍过 spawn_blocking 函数,该函数会启动在另一个线程上运行的给定闭包,并返回携带闭包返回值的 Future。现在,我们拥有实现 spawn_blocking 所需的所有“零件”。为简单起见,我们的版本会为每个闭包创建一个新线程,而不是像 async_std 的版本那样使用线程池。
尽管 spawn_blocking 会返回 Future,但我们并不会将其写成 async fn。相反,它将作为普通的同步函数,返回一个
SpawnBlocking 结构体,我们会利用该结构体实现自己的
Future。
spawn_blocking 的签名如下所示: 由于需要将闭包发送到另一个线程并带回返回值,因此闭包 F 及其返回值 T 必须实现 Send。由于不知道线程会运行多长时间,因此它们也必须是 ‘static 的。这些限界与 std::thread::spawn 自身的强制限界是一样的。
SpawnBlocking Shared 结构体必须充当 Future 和运行闭包的线程之间的结合点,因此它由 Arc 拥有并受 Mutex 保护。(同步互斥锁在这里很好用。)轮询此 Future 会检查 value 是否存在,如果不存在则将唤醒器保存在 waker 中。运行闭包的线程会将其返回值保存在 value 中,然后调用 waker(如果存在的话)。
下面是 spawn_blocking 的完整定义: 创建 Shared 值后,就会启动一个线程来运行此闭包,将结果存储在 Shared 的 value 字段中,并调用唤醒器(如果有的话)。
可以为 SpawnBlocking 实现 Future,如下所示: 轮询 SpawnBlocking 来检查闭包的值是否就绪,如果已经就绪,
就接手这个值的所有权并返回它。否则,Future 仍然处于
Pending 状态,因此它在 Future 的 waker 字段中保存了此上下文中唤醒器的克隆体。
一旦 Future 返回了 Poll::Ready,就不应该再次对其进行轮询。诸如 await 和 block_on 之类消耗 Future 的常用方式都遵守这条规则。过度轮询 SpawnBlocking 型 Future 并不会发生什么可怕的事情,因此也不必花费精力来处理这种情况。这就是典型的手写型 Future。 除了能够实现原始 Future,我们还拥有构建简单执行器所需的全部 “零件”。在本节中,我们将编写自己的 block_on 版本。它会比 async_std 的版本简单很多,比如,它不支持 spawn_local、任务局部变量或嵌套调用(从异步代码调用 block_on)。但这已足够运行我们的聊天客户端和服务器了。
代码如下所示: 上述代码虽然很短,但做了很多事,我们慢慢讲。 crossbeam crate 的 Parker 类型是一个简单的阻塞原语:调用 parker.park() 阻塞线程,直到其他人在相应的 Unparker(可以通过调用 parker.unparker() 预先获得)上调用 .unpark()。如果要 unpark 一个尚未停泊(park)的线程,那么它的下一次 park 调用将立即返回,而不会阻塞。这里的
block_on 将使用 Parker 在 Future 未就绪时等待,而我们传给 Future 的唤醒器将解除停泊。 来自 waker_fn crate 的 waker_fn 函数会从给定的闭包创建一个 Waker。在这里,我们制作了一个 Waker,当调用它时,它会调用闭包 move || unparker.unpark()。还可以通过实现 std::task::Wake 特型来创建唤醒器,但这里用 waker_fn 更方便一些。 给定一个携带 F 类型 Future 的变量,pin! 宏[ 在 Rust 1.68 及以上版本中,标准库已经内置 pin! 宏。——译者注]会获取 Future 的所有权并声明一个同名的新变量,其类型为 Pin<&mut F> 并借入了此 Future。这就为我们提供了 poll 方法所需的 Pin<&mut Self>。异步函数和异步块返回的 Future 必须在轮询之前通过 Pin 换成引用,20.4 节会对此进行解释。 最后,轮询循环非常简单。以一个携带唤醒器的上下文为入参,我们
会轮询 Future 直到它返回 Poll::Ready。如果返回的是 Poll::Pending,我们会暂停此线程,并阻塞到调用了 waker 为止。放行后就再重试。
as_mut 调用能让我们在不放弃所有权的情况下对 future 进行轮询,20.4 节会对此进行详细解释。 5pin 原意为“图钉”,这里指把某个值固定在原位,防止被移动。——译者注
尽管异步函数和异步块对于编写清晰的异步代码至关重要,但处理它们的 Future 时要小心一点儿。Pin 类型有助于确保 Rust 安全地使用它们。
本节首先会展示为什么异步函数调用和异步块的 Future 不能像普通
Rust 值那样随意处理;然后会展示 Pin 如何用作指针的“许可印章”,我们可以依靠这些“盖章指针”来安全地管理此类 Future;最后会展示几种使用 Pin 值的方法。 考虑下面这个简单的异步函数: 这会打开到给定地址的 TCP 连接,并以 String 的形式返回服务器发送的任何内容。标有 ➊、➋ 和 ➌ 的点是恢复点,即异步函数代码中可以暂停执行的点。
假设你调用它,但没有等待,就像下面这样: 现在 response 是一个 Future,它准备在 fetch_string 的开头开始执行,并带有给定的参数。在内存中,Future 看起来如图
20-5 所示。 图 20-5:为调用 fetch_string 而构建的 Future
由于我们刚刚创建了这个 Future,因此它认为执行应该从函数体顶部的恢复点 ➊ 开始。在这种状态下,Future 唯一能给出的值就是函数参数。
现在假设你对 response 进行了几次轮询,并且它在函数体中到达了下面这个点:
socket.read_to_string(&mut buf).await➌?;
进一步假设 read_to_string 的结果尚未就绪,因此轮询会返回 Poll::Pending。此时,Future 看起来如图 20-6 所示。 图 20-6:同一个 Future,正在等待 read_to_string
Future 必须始终保存下一次轮询时恢复执行需要的所有信息。在这种情况下是如下内容。
恢复点 ➌,表示执行应该在 await 处恢复,那时正在轮询
read_to_string 返回的 Future。
在那个恢复点处于活动状态的变量:socket 和 buf。address 的值在 Future 中不会再出现,因为该函数已不再需要它。
read_to_string 的子 Future,await 表达式正在对其进行轮询。
请注意,对 read_to_string 的调用借用了对 socket 和 buf 的引用。在同步函数中,所有局部变量都存在于栈中,但在异步函数
中,在 await 中仍然存活的局部变量必须位于 Future 中,这样当再次轮询时它们才是可用的。借入对这样一个变量的引用,就是借入了 Future 中的一部分。
然而,Rust 要求值在已借出时就不能再移动了。假设要将下面这个
Future 移动到一个新位置: Rust 无法找出所有活动引用并相应地调整它们。引用不会指向新位置的 socket 和 buf,而是继续指向它们在当前处于未初始化状态的 response 中的旧位置。它们变成了悬空指针,如图 20-7 所示。 图 20-7:fetch_string 返回的 Future,在已借出时移动(Rust 会阻止这样做)
防止已借出的值被移动通常是借用检查器的责任。借用检查器会将变量视为所有权树的根。但与存储在栈中的变量不同,如果 Future 本身已移动,则存储在 Future 中的变量也会移动。这意味着 socket
和 buf 的借用不仅会影响 fetch_string 可以用自己的变量做什么,还会影响其调用者可以安全地用 response(也就是持有这些变量的 Future)做什么。异步函数的 Future 是借用检查器的盲点,如果 Rust 想要保持其内存安全承诺,就必须以某种方式解决这个问题。
Rust 对这个问题的解决方案基于这样一种洞见:Future 在首次创建时总是可以安全地移动,只有在轮询时才会变得不安全。在一开始,通过调用异步函数创建的 Future 仅包含一个恢复点和参数值。这些
仅仅存在于尚未开始执行的异步函数主体的作用域内。只有当轮询
Future 时才会借用其内容。
由此可见,每一个 Future 的生命周期中都有两个阶段。 第一阶段从刚创建 Future 时开始。因为函数体还没有开始执行,所以它的任何部分都不可能被借用。在这一点上,移动它和移动其他 Rust 值一样安全。
第二阶段在第一次轮询 Future 时开始。一旦函数的主体开始执行,它就可以借用对存储在 Future 中的变量的引用,然后等待,保留对 Future 持有的变量的借用。从第一次轮询开始,就必须假设 Future 不能被安全地移动了。
第一个生命阶段的灵活性让我们能够将 Future 传给 block_on 和 spawn 并调用适配器方法(如 race 和 fuse),所有这些都会按值获取 Future。事实上,即使最初创建 Future 的那次异步函数调用也必须将其返回给调用者,那同样是一次移动。
要进入 Future 的第二个生命阶段,就必须对 Future 进行轮询。 poll 方法要求将 Future 作为 Pin<&mut Self> 值传递。Pin 是指针类型(如 &mut Self)的包装器,它限制了指针的使用方式,以确保它们的引用目标(如 Self)永远不会再次移动。因此,必须首先生成一个指向 Future 的以 Pin 包装的指针,然后才能对其进行轮询。
这就是 Rust 确保 Future 安全的策略:Future 只有在轮询之前移动才不会有危险,在构建指向 Future 的以 Pin 包装的指针之前无法轮询 Future,一旦这么做了,Future 就不可再移动。
“一个无法移动的值”听起来有点儿不可思议,因为在 Rust 中移动无处不在。20.4.2 节会详细解释 Pin 是如何保护 Future 的。
尽管本节讨论的是异步函数,但这里的所有内容也适用于异步块。一个新创建的异步块的 Future 只会从它周围的代码中捕获要使用的变量,就像闭包一样。只有轮询 Future 时才会创建对其内容的引用,使其移动变得不安全。
请记住,这种移动的脆弱性仅限于异步函数和异步块的 Future,以及编译器为它们生成的特殊 Future 实现。如果你为自己的类型手动实现了 Future,就像我们在 20.3.1 节为 SpawnBlocking 类型所做的那样,那么这样的 Future 无论在轮询之前还是之后移动都是完全安全的。在任何手写的 poll 实现中,借用检查器会确保当
poll 返回时你已借出的任何对 self 部分的引用都已消失。正是因
为异步函数和异步块有能力在函数调用过程中暂停执行并仍持有借用,所以才必须小心处理它们的 Future。 Pin 类型是指向 Future 的指针的包装器,它限制了指针的用法,以确保 Future 一旦被轮询就不能移动。这些限制对于不介意被移动的 Future 是可以取消的,但对于需要安全地轮询异步函数和异步块的 Future 必不可少。
这里的指针指的是任何实现了 Deref 或 DerefMut 的类型。包裹在指针上的 Pin 称为固定指针。Pin<&mut T> 和 Pin 请注意,pointer 字段不是 pub 的。这意味着构造或使用 Pin 的唯一方法是借助该类型提供的经过精心设计的方法。
给定一个异步函数或异步块返回的 Future,只有以下几种方法可以获得指向它的固定指针。
pin! 宏来自 futures-lite crate,它会用新的 Pin<&mut T> 类型的变量遮蔽 T 类型的变量。新变量会指向原始值,而原始值已移至栈中的匿名临时位置。当新变量超出作用域时,原始值会被丢弃。我们用 pin! 在 block_on 实现中固定了想要轮询的 Future。
标准库的 Box::pin 构造函数能获取任意类型 T 值的所有权,将其移动到堆中,并返回 Pin 在这里,pin! 宏已将 future 重新声明为 Pin<&mut F>,因此可以将其传给 poll。但是可变引用不是 Copy 类型,因此 Pin<&mut F> 也不是 Copy 类型,这意味着直接调用 future.poll() 将取得 future 的所有权,进而导致循环的下一次迭代留下未初始化的变量。为了避免这种情况,我们会调用 future.as_mut() 为每次循环迭代重新借入新的 Pin<&mut F>。
无法获得对已固定 Future 的 &mut 引用,因为如果可以获得该引
用,那么你就能用 std::mem::replace 或 std::mem::swap 将其移动出来并在原位置放入另一个 Future。
之所以不必担心普通异步代码中的固定 Future,是因为获取
Future 的最终值的最常见方式(等待它或传给执行器)都要求拥有
Future 的所有权并会在内部管理固定指针。例如,我们的
block_on 实现会接手 Future 的所有权并使用 pin! 来生成轮询所需的 Pin<&mut F> 的宏。await 表达式也会接手 Future 的所有权,其内部实现类似于 pin! 宏。 然而,并不是所有的 Future 都需要这样小心翼翼地处理。对于普通类型(如前面提到的 SpawnBlocking 类型)的 Future 的任何手写实现,这些对构造和使用固定指针方面的限制都是不必要的。
这种耐用类型实现了 Unpin 标记特型: Rust 中的几乎所有类型都使用编译器中的特殊支持自动实现了
Unpin。异步函数和异步块返回的 Future 是这条规则的例外情况。
对于各种 Unpin 类型,Pin 没有任何限制。可以使用 Pin::new 从普通指针创建固定指针,然后使用 Pin::into_inner 取回该指针。Pin 本身会传递指针自己的 Deref 实现和 DerefMut 实现。
例如,String 实现了 Unpin,所以可以这样写: 即使在制作出 Pin<&mut String> 之后,仍然可以完全可变地访问字符串,并且一旦这个 Pin 被 into_inner 消耗,可变引用消失后就可以将其转移给新变量。因此,对 Unpin 类型(几乎所有类型)来说,Pin 只是指向该类型指针的一个无聊包装器而已。
这意味着当你为自己的 Unpin 类型实现 Future 时,你的 poll 实现可以将 self 视为 &mut Self,而不是 Pin<&mut Self>。
Pin 成了几乎可以忽略的东西。
令人惊讶的是,即使 F 没有实现 Unpin,Pin<&mut F> 和
Pin 异步代码比多线程代码更难写。你必须使用正确的 I/O 和同步原语,手动分解长时间运行的计算或将它们分拆到其他线程上,并管理多线程代码中不会遇到的其他细节,比如固定。那么异步代码到底有哪些优势呢?
下面是你常会听到的两种说法,但它们经不起仔细推敲。
“异步代码非常适合 I/O。”这不完全正确。如果应用程序正在花费时间等待 I/O,那么把它变成异步形式并不会让 I/O 运行得更快。如今普遍使用的异步 I/O 接口没有什么比同步接口更高效的地方。对于这两种方式,操作系统会完成同样的工作。(事实上,未就绪的异步 I/O 操作必须稍后重试,因此需要两次系统调用才能完成,而不是一次。)
“异步代码比多线程代码更容易编写。”在 JavaScript、Python 等语言中,这很可能是正确的。在这些语言中,程序员使用
async/await 作为并发的一种形式:有一个执行线程,并且中断只发生在 await 表达式中,因此通常不需要互斥锁来保持数据一致,只是不要在使用它的中途进行 await。当只有在你的明确许可下才可能发生任务切换时,代码会更容易理解。
但是这个论点不适用于 Rust,因为在 Rust 中线程用起来几乎不怎么麻烦。一旦程序编译完成,就不会出现数据竞争。非确定性行为仅限于同步特性,比如互斥锁、通道、原子等,它们都是为应对该行为而设计的。因此,异步代码并不能帮你更好地了解其他线程何时会影响你,这在所有安全的 Rust 代码中一目了然。
当然,在与线程结合使用时,Rust 的异步支持真的很出色。如果放弃这种用法实在太可惜了。
那么,异步代码的真正优势是什么呢? 异步任务可以使用更少的内存。在 Linux 上,一个线程的内存
使用量至少为 20 KiB,包括用户空间和内核空间的内存使用量。
6
Future 则小得多:我们的聊天服务器的 Future 只有几百字节大小,并且随着 Rust 编译器的改进还能变得更小。
异步任务的创建速度更快。在 Linux 上,创建一个线程大约需要 15 微秒。而启动一个异步任务大约需要 300 纳秒,仅为创建线程所花费时间的约 1/50。
异步任务之间的上下文切换比操作系统线程之间的上下文切换更快。在 Linux 上这两个操作所需时间分别为 0.2 微秒和 1.7 微秒。7然而,这些都是最佳情况下的数值:如果切换是由于 I/O 就绪导致的,则这两个操作的时间都会上升到 1.7 微秒。线程之间的切换和不同处理器核心上的任务之间的切换大不相同:跨核心的通信非常慢。
6这包括内核内存,并按为线程分配的物理分页(而非虚拟分页)中尚未分配的数量计算。
macOS 和 Windows 上的数量类似。
7Linux 上下文切换也曾处于 0.2 微秒范围内,直到其内核由于处理器安全漏洞而被迫使用较慢的技术。
这给了我们一个关于异步代码适合解决哪种问题的提示。例如,异步服务器可能想为每项任务使用更少的内存,以便处理更多的并发连接。(这可能就是异步代码常因“适合 I/O”而享有盛誉的原因。)或者,如果你的设计可以自然地组织成许多相互通信的独立任务,那么每项任务开销低、创建时间短,并且能快速切换上下文都会是重要的优势。这就是为什么聊天服务器是异步编程的经典示例。不过,多人游戏和网络路由器也可能是很好的应用场景。
在其他场景中,要做出是否使用异步编程的决定就不这么显而易见了。如果你的程序有一个线程池来执行繁重的计算或闲置以等待 I/O 完成,那么前面列出的优势可能对其性能影响不大。你必须优化自己的计算,找到更快的网络连接,或者做点儿能实际影响这些限制因素的其他事情。
在实践中,我们能找到的每一个关于实现大容量服务器的说明,都强调了通过测量、调整和不懈努力来识别和消除任务之间产生争用的根源的重要性。异步架构无法让你跳过这些工作中的任何一项。事实上,虽然很多现成的工具可以评估多线程程序的行为,但这些工具无法识别 Rust 异步任务,因此它们需要自己的工具。(正如一位智者曾经说过的:“现在,你有两个问题了。”)
即使现在不使用异步代码,但如果将来你能有幸比现在忙得多,那么至少了解这个选项的存在也绝对是件好事。impl Stream<Item = ChatResult<P>>
use async_chat::FromServer;
async fn handle_replies(from_server: net::TcpStream) -> ChatResult<()> {
let buffered = io::BufReader::new(from_server);
let mut reply_stream = utils::receive_as_json(buffered);
while let Some(reply) = reply_stream.next().await {
match reply? {
FromServer::Message {
group_name,
message,
} => {
println!("message posted to {}: {}", group_name, message);
}
FromServer::Error(message) => {
println!("error from server: {}", message);
}
}
}
Ok(())
}
20.2.6 客户端的 main 函数
use async_std::task;
fn main() -> ChatResult<()> {
let address = std::env::args().nth(1).expect("Usage: client ADDRESS:PORT");
task::block_on(async {
let socket = net::TcpStream::connect(address).await?;
socket.set_nodelay(true)?;
let to_server = send_commands(socket.clone());
let from_server = handle_replies(socket);
from_server.race(to_server).await?;
Ok(())
})
}
let to_server = task::spawn(send_commands(socket.clone()));
let from_server = task::spawn(handle_replies(socket));
to_server.await?;
from_server.await?;
20.2.7 服务器的 main 函数
use async_chat::utils::ChatResult;
use async_std::prelude::*;
use std::sync::Arc;
mod connection;
mod group;
mod group_table;
use connection::serve;
fn main() -> ChatResult<()> {
let address = std::env::args().nth(1).expect(
"Usage: server
ADDRESS",
);
let chat_group_table = Arc::new(group_table::GroupTable::new());
async_std::task::block_on(async {
// 下面这段代码曾在本章的章节介绍中展示过
use async_std::{net, task};
let listener = net::TcpListener::bind(address).await?;
let mut new_connections = listener.incoming();
while let Some(socket_result) = new_connections.next().await {
let socket = socket_result?;
let groups = chat_group_table.clone();
task::spawn(async {
log_error(serve(socket, groups).await);
});
}
Ok(())
})
}
fn log_error(result: ChatResult<()>) {
if let Err(error) = result {
eprintln!("Error: {}", error);
}
}
20.2.8 处理聊天连接:异步互斥锁
use async_chat::utils::{self, ChatResult};
use async_chat::{FromClient, FromServer};
use async_std::io::BufReader;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::sync::Arc;
use crate::group_table::GroupTable;
pub async fn serve(socket: TcpStream, groups: Arc<GroupTable>) -> ChatResult<()> {
let outbound = Arc::new(Outbound::new(socket.clone()));
let buffered = BufReader::new(socket);
let mut from_client = utils::receive_as_json(buffered);
while let Some(request_result) = from_client.next().await {
let request = request_result?;
let result = match request {
FromClient::Join { group_name } => {
let group = groups.get_or_create(group_name);
group.join(outbound.clone());
Ok(())
}
FromClient::Post {
group_name,
message,
} => match groups.get(&group_name) {
Some(group) => {
group.post(message);
Ok(())
}
None => Err(format!("Group '{}' does not exist", group_name)),
},
};
if let Err(message) = result {
let report = FromServer::Error(message);
outbound.send(report).await?;
}
}
Ok(())
}
use async_std::sync::Mutex;
pub struct Outbound(Mutex<TcpStream>);
impl Outbound {
pub fn new(to_client: TcpStream) -> Outbound {
Outbound(Mutex::new(to_client))
}
pub async fn send(&self, packet: FromServer) -> ChatResult<()> {
let mut guard = self.0.lock().await;
utils::send_as_json(&mut *guard, &packet).await?;
guard.flush().await?;
Ok(())
}
}
20.2.9 群组表:同步互斥锁
use crate::group::Group;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
pub struct GroupTable(Mutex<HashMap<Arc<String>, Arc<Group>>>);
impl GroupTable {
pub fn new() -> GroupTable {
GroupTable(Mutex::new(HashMap::new()))
}
pub fn get(&self, name: &String) -> Option<Arc<Group>> {
self.0.lock().unwrap().get(name).cloned()
}
pub fn get_or_create(&self, name: Arc<String>) -> Arc<Group> {
self.0
.lock()
.unwrap()
.entry(name.clone())
.or_insert_with(|| Arc::new(Group::new(name)))
.clone()
}
}
20.2.10 聊天组:tokio 的广播通道

use crate::connection::Outbound;
use async_std::task;
use std::sync::Arc;
use tokio::sync::broadcast;
pub struct Group {
name: Arc<String>,
sender: broadcast::Sender<Arc<String>>,
}
impl Group {
pub fn new(name: Arc<String>) -> Group {
let (sender, _receiver) = broadcast::channel(1000);
Group { name, sender }
}
pub fn join(&self, outbound: Arc<Outbound>) {
let receiver = self.sender.subscribe();
task::spawn(handle_subscriber(self.name.clone(), receiver, outbound));
}
pub fn post(&self, message: Arc<String>) {
// 这只会在没有订阅者时返回错误。连接的发送端可能会退出,并恰好赶在其
// 接收端回复之前丢弃订阅,这可能会 终导致接收端试图向空组回复消息
let _ignored = self.sender.send(message);
}
}
use async_chat::FromServer;
use tokio::sync::broadcast::error::RecvError;
async fn handle_subscriber(
group_name: Arc<String>,
mut receiver: broadcast::Receiver<Arc<String>>,
outbound: Arc<Outbound>,
) {
loop {
let packet = match receiver.recv().await {
Ok(message) => FromServer::Message {
group_name: group_name.clone(),
message: message.clone(),
},
Err(RecvError::Lagged(n)) => {
FromServer::Error(format!("Dropped {} messages from {}.", n, group_name))
}
Err(RecvError::Closed) => break,
};
if outbound.send(packet).await.is_err() {
break;
}
}
}
20.3 原始 Future 与执行器:Future 什么时候值得再次轮询
task::block_on(async {
let socket = net::TcpStream::connect(address).await?;
})
use std::task::Waker;
struct MyPrimitiveFuture {
...
waker: Option<Waker>,
}
impl Future for MyPrimitiveFuture { type Output = ...;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<...> { ... if ... future is ready ... {
return Poll::Ready(final_value);
}
// 保存此唤醒器以备后用
self.waker = Some(cx.waker().clone());
Poll::Pending
}
}
// 如果有一个唤醒器,就调用它,并清除`self.waker`
if let Some(waker) = self.waker.take() {
waker.wake();
}
20.3.1 调用唤醒器:spawn_blocking
pub fn spawn_blocking<T, F>(closure: F) -> SpawnBlocking<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static,
use std::sync::{Arc, Mutex};
use std::task::Waker;
pub struct SpawnBlocking<T>(Arc<Mutex<Shared<T>>>);
struct Shared<T> {
value: Option<T>,
waker: Option<Waker>,
}
pub fn spawn_blocking<T, F>(closure: F) -> SpawnBlocking<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
let inner = Arc::new(Mutex::new(Shared {
value: None,
waker: None,
}));
std::thread::spawn({
let inner = inner.clone();
move || {
let value = closure();
let maybe_waker = {
let mut guard = inner.lock().unwrap();
guard.value = Some(value);
guard.waker.take()
};
if let Some(waker) = maybe_waker {
waker.wake();
}
}
});
SpawnBlocking(inner)
}
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
impl<T: Send> Future for SpawnBlocking<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let mut guard = self.0.lock().unwrap();
if let Some(value) = guard.value.take() {
return Poll::Ready(value);
}
guard.waker = Some(cx.waker().clone());
Poll::Pending
}
}
20.3.2 实现 block_on
use waker_fn::waker_fn; // Cargo.toml: waker-fn = "1.1" use futures_lite::pin; // Cargo.toml: futures-lite = "1.11" use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8" use std::future::Future; use std::task::{Context, Poll};
fn block_on<F: Future>(future: F) -> F::Output {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);
pin!(future);
loop {
match future.as_mut().poll(&mut context) {
Poll::Ready(value) => return value,
Poll::Pending => parker.park(),
}
}
}
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
pin!(future);
loop {
match future.as_mut().poll(&mut context) {
Poll::Ready(value) => return value,
Poll::Pending => parker.park(),
}
}
5
20.4 固定(Pin)
20.4.1 Future 生命周期的两个阶段
use async_std::io::prelude::*; use async_std::{io, net};
async fn fetch_string(address: &str) -> io::Result<String> {
➊
let mut socket = net::TcpStream::connect(address).await➋?; let mut buf = String::new();
socket.read_to_string(&mut buf).await➌?;
Ok(buf)
}
let response = fetch_string("localhost:6502");


let new_variable = response;

20.4.2 固定指针
pub struct Pin<P> {
pointer: P,
}
pin!(future);
loop {
match future.as_mut().poll(&mut context) {
Poll::Ready(value) => return value,
Poll::Pending => parker.park(),
}
}
20.4.3 Unpin 特型
trait Unpin {}
let mut string = "Pinned?".to_string();
let mut pinned: Pin<&mut String> = Pin::new(&mut string);
pinned.push_str(" Not");
Pin::into_inner(pinned).push_str(" so much.");
let new_home = string;
assert_eq!(new_home, "Pinned? Not so much.");
20.5 什么时候要用异步代码