Rust 学习笔记(32)-Channel

参考章节《Rust 程序设计语言》第16.2章 使用消息传递在线程间传送数据

学习线程的两大难点

  1. 如何在多个线程之间共享数据
  2. 如何保证线程安全

这一章我们就来看看在Rust中如何在多个线程之间共享数据

Rust中在线程之间共享数据采用的方式是Channel(信道),它实现了生产者消费者模型,它由两部分组成,一个发送者(transmitter)和一个接收者(receiver)

我们来看看下面这个简单的例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
use std::sync::mpsc; // mpsc 是多个生产者,单个消费者 (multiple producer, single consumer) 的缩写。
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel(); // 创建一个信道

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap(); // 发送消息
    });

    let received = rx.recv().unwrap(); // 接收消息
    println!("Got: {}", received);
}

我们可以通过调用 mpsc::channel 方法来创建一个信道 mpsc::channel 函数返回一个元组:第一个元素是发送端,而第二个元素是接收端
发送端有一个 send 方法用来获取需要放入信道的值。send 方法返回一个 Result<T, E> 类型,如果接收端已经被丢弃了,发送操作将会返回错误。
接收端有两个有用的方法:recvtry_recvrecv方法会阻塞主线程执行直到从信道中拿到一个值。当发送端关闭,recv 会返回一个错误表明不会再有新的值到来了。
try_recv方法不会阻塞,相反它立刻返回一个 Result<T, E> 来表示当前时刻下信道中的值,Ok 值包含可用的信息,而 Err 值代表此时没有任何消息

运行这个程序

1
Got: hi

信道与所有权转移

我们来看看如下代码,我们将尝试在新建线程中的信道中发送完 val 值 之后再次使用它。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val); // 再次打印val
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

这段代码不能通过编译,原因是send方法获取了参数的所有权,那么问题来了,为什么要获取所有权?
答:如果不获取所有权,那么很可能我们在使用这个值时,它已经被另一个线程修改或丢弃

引用书上的话说就是

一旦将值发送到另一个线程后,那个线程可能会在我们再次使用它之前就将其修改或者丢弃。

通过克隆发送者来创建多个生产者

在上面的代码中,我们提到了 mpscmultiple producer, single consumer(多生产者单消费者)的缩写。所以我们可以克隆信道的发送端,来向同一个接收端发送数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let tx2 = tx.clone(); // 克隆一个发送端

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap(); // 发送数据
            thread::sleep(Duration::from_secs(1)); // 暂停1秒
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx2.send(val).unwrap(); // 发送数据
            thread::sleep(Duration::from_secs(1)); // 暂停1秒
        }
    });

    // 将 rx 当作一个迭代器。
    for received in rx {
        println!("Got: {}", received);
    }
}

这代码真的很简单,没有什么难度或不能理解的

总结一下

  1. mpsc::channel 函数返回一个元组第一个元素是发送端,而第二个元素是接收端
  2. send 方法会获取参数的所有权
  3. recv 方法会阻塞主线程直到从信道中拿到一个值,而相反 try_recv 则不会阻塞,它立刻返回一个Result<T, E>,来表示此时信道中的值
  4. 可以通过克隆信道的发送端,来向同一个接收端发送数据
  5. 接收端可以当作一个迭代器来使用
updatedupdated2025-03-012025-03-01