参考章节《Rust 程序设计语言》第16.2章 使用消息传递在线程间传送数据
学习线程的两大难点
- 如何在
多个线程之间共享数据
- 如何保证
线程安全
这一章我们就来看看在Rust中如何在多个线程之间共享数据
Rust中在线程之间共享数据采用的方式是Channel(信道),它实现了生产者与消费者模型,它由两部分组成,一个发送者(transmitter)和一个接收者(receiver)。
我们来看看下面这个简单的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
use std::sync::mpsc; // mpsc 是多个生产者,单个消费者 (multiple producer, single consumer) 的缩写。
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel(); // 创建一个信道
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap(); // 发送消息
});
let received = rx.recv().unwrap(); // 接收消息
println!("Got: {}", received);
}
|
我们可以通过调用 mpsc::channel 方法来创建一个信道 mpsc::channel 函数返回一个元组:第一个元素是发送端,而第二个元素是接收端。
发送端有一个 send 方法用来获取需要放入信道的值。send 方法返回一个 Result<T, E> 类型,如果接收端已经被丢弃了,发送操作将会返回错误。
接收端有两个有用的方法:recv 和 try_recv。recv方法会阻塞主线程执行直到从信道中拿到一个值。当发送端关闭,recv 会返回一个错误表明不会再有新的值到来了。
try_recv方法不会阻塞,相反它立刻返回一个 Result<T, E> 来表示当前时刻下信道中的值,Ok 值包含可用的信息,而 Err 值代表此时没有任何消息。
运行这个程序
信道与所有权转移
我们来看看如下代码,我们将尝试在新建线程中的信道中发送完 val 值 之后再次使用它。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {}", val); // 再次打印val
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
|
这段代码不能通过编译,原因是send方法获取了参数的所有权,那么问题来了,为什么要获取所有权?
答:如果不获取所有权,那么很可能我们在使用这个值时,它已经被另一个线程修改或丢弃
引用书上的话说就是
一旦将值发送到另一个线程后,那个线程可能会在我们再次使用它之前就将其修改或者丢弃。
通过克隆发送者来创建多个生产者
在上面的代码中,我们提到了 mpsc 是 multiple producer, single consumer(多生产者单消费者)的缩写。所以我们可以克隆信道的发送端,来向同一个接收端发送数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx2 = tx.clone(); // 克隆一个发送端
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap(); // 发送数据
thread::sleep(Duration::from_secs(1)); // 暂停1秒
}
});
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx2.send(val).unwrap(); // 发送数据
thread::sleep(Duration::from_secs(1)); // 暂停1秒
}
});
// 将 rx 当作一个迭代器。
for received in rx {
println!("Got: {}", received);
}
}
|
这代码真的很简单,没有什么难度或不能理解的
mpsc::channel 函数返回一个元组第一个元素是发送端,而第二个元素是接收端
send 方法会获取参数的所有权
recv 方法会阻塞主线程直到从信道中拿到一个值,而相反 try_recv 则不会阻塞,它立刻返回一个Result<T, E>,来表示此时信道中的值
- 可以通过
克隆信道的发送端,来向同一个接收端发送数据
接收端可以当作一个迭代器来使用