16.2 使用消息传递在线程间传送数据

INFO

ch16-02-message-passing.mdopen in new window
commit 36383b4da21dbd0a0781473bc8ad7ef0ed1b6751

一个日益流行的确保安全并发的方式是 消息传递message passing),这里线程或 actor 通过发送包含数据的消息来相互沟通。这个思想来源于 Go 编程语言文档中open in new window 的口号:“不要通过共享内存来通讯;而是通过通讯来共享内存。”(“Do not communicate by sharing memory; instead, share memory by communicating.”)

为了实现消息传递并发,Rust 标准库提供了一个 信道channel)实现。信道是一个通用编程概念,表示数据从一个线程发送到另一个线程。

你可以将编程中的信道想象为一个水流的渠道,比如河流或小溪。如果你将诸如橡皮鸭或小船之类的东西放入其中,它们会顺流而下到达下游。

编程中的信息渠道(信道)有两部分组成,一个发送者(transmitter)和一个接收者(receiver)。发送者位于上游位置,在这里可以将橡皮鸭放入河中,接收者则位于下游,橡皮鸭最终会漂流至此。代码中的一部分调用发送者的方法以及希望发送的数据,另一部分则检查接收端收到的消息。当发送者或接收者任一被丢弃时可以认为信道被 关闭closed)了。

这里,我们将开发一个程序,它会在一个线程生成值向信道发送,而在另一个线程会接收值并打印出来。这里会通过信道在线程间发送简单值来演示这个功能。一旦你熟悉了这项技术,你就可以将信道用于任何相互通信的任何线程,例如一个聊天系统,或利用很多线程进行分布式计算并将部分计算结果发送给一个线程进行聚合。

首先,在示例 16-6 中,创建了一个信道但没有做任何事。注意这还不能编译,因为 Rust 不知道我们想要在信道中发送什么类型:

文件名:src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

示例 16-6: 创建一个信道,并将其两端赋值给 txrx

这里使用 mpsc::channel 函数创建一个新的信道;mpsc多个生产者,单个消费者multiple producer, single consumer)的缩写。简而言之,Rust 标准库实现信道的方式意味着一个信道可以有多个产生值的 发送sending)端,但只能有一个消费这些值的 接收receiving)端。想象一下多条小河小溪最终汇聚成大河:所有通过这些小河发出的东西最后都会来到下游的大河。目前我们以单个生产者开始,但是当示例可以工作后会增加多个生产者。

mpsc::channel 函数返回一个元组:第一个元素是发送端 -- 发送者,而第二个元素是接收端 -- 接收者。由于历史原因,txrx 通常作为 发送者transmitter)和 接收者receiver)的缩写,所以这就是我们将用来绑定这两端变量的名字。这里使用了一个 let 语句和模式来解构了此元组;第十八章会讨论 let 语句中的模式和解构。现在只需知道使用 let 语句是一个方便提取 mpsc::channel 返回的元组中一部分的手段。

让我们将发送端移动到一个新建线程中并发送一个字符串,这样新建线程就可以和主线程通讯了,如示例 16-7 所示。这类似于在河的上游扔下一只橡皮鸭或从一个线程向另一个线程发送聊天信息:

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

示例 16-7: 将 tx 移动到一个新建的线程中并发送 “hi”

这里再次使用 thread::spawn 来创建一个新线程并使用 movetx 移动到闭包中这样新建线程就拥有 tx 了。新建线程需要拥有信道的发送端以便能向信道发送消息。信道的发送端有一个 send 方法用来获取需要放入信道的值。send 方法返回一个 Result<T, E> 类型,所以如果接收端已经被丢弃了,将没有发送值的目标,所以发送操作会返回错误。在这个例子中,出错的时候调用 unwrap 产生 panic。不过对于一个真实程序,需要合理地处理它:回到第九章复习正确处理错误的策略。

在示例 16-8 中,我们在主线程中从信道的接收者获取值。这类似于在河的下游捞起橡皮鸭或接收聊天信息:

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

示例 16-8: 在主线程中接收并打印内容 “hi”

信道的接收者有两个有用的方法:recvtry_recv。这里,我们使用了 recv,它是 receive 的缩写。这个方法会阻塞主线程执行直到从信道中接收一个值。一旦发送了一个值,recv 会在一个 Result<T, E> 中返回它。当信道发送端关闭,recv 会返回一个错误表明不会再有新的值到来了。

try_recv 不会阻塞,相反它立刻返回一个 Result<T, E>Ok 值包含可用的信息,而 Err 值代表此时没有任何消息。如果线程在等待消息过程中还有其他工作时使用 try_recv 很有用:可以编写一个循环来频繁调用 try_recv,在有可用消息时进行处理,其余时候则处理一会其他工作直到再次检查。

出于简单的考虑,这个例子使用了 recv;主线程中除了等待消息之外没有任何其他工作,所以阻塞主线程是合适的。

如果运行示例 16-8 中的代码,我们将会看到主线程打印出这个值:

Got: hi

完美!

信道与所有权转移

所有权规则在消息传递中扮演了重要角色,其有助于我们编写安全的并发代码。防止并发编程中的错误是在 Rust 程序中考虑所有权的一大优势。现在让我们做一个试验来看看信道与所有权如何一同协作以避免产生问题:我们将尝试在新建线程中的信道中发送完 val之后 再使用它。尝试编译示例 16-9 中的代码并看看为何这是不允许的:

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

示例 16-9: 在我们已经发送到信道中后,尝试使用 val 引用

这里尝试在通过 tx.send 发送 val 到信道中之后将其打印出来。允许这么做是一个坏主意:一旦将值发送到另一个线程后,那个线程可能会在我们再次使用它之前就将其修改或者丢弃。其他线程对值可能的修改会由于不一致或不存在的数据而导致错误或意外的结果。然而,尝试编译示例 16-9 的代码时,Rust 会给出一个错误:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:26
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                          ^^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
help: consider cloning the value if the performance cost is acceptable
   |
9  |         tx.send(val.clone()).unwrap();
   |                    ++++++++

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

我们的并发错误会造成一个编译时错误。send 函数获取其参数的所有权并移动这个值归接收者所有。这可以防止在发送后再次意外地使用这个值;所有权系统检查一切是否合乎规则。

发送多个值并观察接收者的等待

示例 16-8 中的代码可以编译和运行,不过它并没有明确的告诉我们两个独立的线程通过信道相互通讯。示例 16-10 则有一些改进会证明示例 16-8 中的代码是并发执行的:新建线程现在会发送多个消息并在每个消息之间暂停一秒钟。

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

示例 16-10: 发送多个消息,并在每次发送后暂停一段时间

这一次,在新建线程中有一个字符串 vector 希望发送到主线程。我们遍历它们,单独的发送每一个字符串并通过一个 Duration 值调用 thread::sleep 函数来暂停一秒。

在主线程中,不再显式调用 recv 函数:而是将 rx 当作一个迭代器。对于每一个接收到的值,我们将其打印出来。当信道被关闭时,迭代器也将结束。

当运行示例 16-10 中的代码时,将看到如下输出,每一行都会暂停一秒:

Got: hi
Got: from
Got: the
Got: thread

因为主线程中的 for 循环里并没有任何暂停或等待的代码,所以可以说主线程是在等待从新建线程中接收值。

通过克隆发送者来创建多个生产者

之前我们提到了mpscmultiple producer, single consumer 的缩写。可以运用 mpsc 来扩展示例 16-10 中的代码来创建向同一接收者发送值的多个线程。这可以通过克隆发送者来做到,如示例 16-11 所示:

文件名:src/main.rs

    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--

示例 16-11: 从多个生产者发送多个消息

这一次,在创建新线程之前,我们对发送者调用了 clone 方法。这会给我们一个可以传递给第一个新建线程的发送端句柄。我们会将原始的信道发送端传递给第二个新建线程。这样就会有两个线程,每个线程将向信道的接收端发送不同的消息。

如果运行这些代码,你 可能 会看到这样的输出:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

虽然你可能会看到这些值以不同的顺序出现;这依赖于你的系统。这也就是并发既有趣又困难的原因。如果通过 thread::sleep 做实验,在不同的线程中提供不同的值,就会发现它们的运行更加不确定,且每次都会产生不同的输出。

现在我们见识过了信道如何工作,再看看另一种不同的并发方式吧。

Last Updated 2024-09-21 08:40:20