参考章节《Rust 程序设计语言》第16.2章 使用消息传递在线程间传送数据
学习线程
的两大难点
- 如何在
多个线程之间共享数据
- 如何保证
线程安全
这一章我们就来看看在Rust中如何在多个线程之间共享数据
Rust
中在线程之间共享数据
采用的方式是Channel(信道)
,它实现了生产者
与消费者
模型,它由两部分组成,一个发送者(transmitter)
和一个接收者(receiver)
。
我们来看看下面这个简单的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
use std::sync::mpsc; // mpsc 是多个生产者,单个消费者 (multiple producer, single consumer) 的缩写。
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel(); // 创建一个信道
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap(); // 发送消息
});
let received = rx.recv().unwrap(); // 接收消息
println!("Got: {}", received);
}
|
我们可以通过调用 mpsc::channel
方法来创建一个信道 mpsc::channel
函数返回一个元组
:第一个元素是发送端
,而第二个元素是接收端
。
发送端
有一个 send
方法用来获取需要放入信道的值。send
方法返回一个 Result<T, E>
类型,如果接收端
已经被丢弃了,发送操作将会返回错误。
接收端
有两个有用的方法:recv
和 try_recv
。recv
方法会阻塞主线程
执行直到从信道中拿到一个值。当发送端
关闭,recv
会返回一个错误表明不会再有新的值到来了。
try_recv
方法不会阻塞
,相反它立刻返回一个 Result<T, E>
来表示当前时刻下信道
中的值,Ok
值包含可用的信息
,而 Err
值代表此时没有任何消息
。
运行这个程序
信道与所有权转移
我们来看看如下代码,我们将尝试在新建线程中的信道中发送完 val
值 之后再次使用它。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {}", val); // 再次打印val
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
|
这段代码不能通过编译,原因是send
方法获取了参数的所有权
,那么问题来了,为什么要获取所有权?
答:如果不获取所有权,那么很可能我们在使用这个值时,它已经被另一个线程修改或丢弃
引用书上的话说就是
一旦将值发送到另一个线程后,那个线程可能会在我们再次使用它之前就将其修改或者丢弃。
通过克隆发送者来创建多个生产者
在上面的代码中,我们提到了 mpsc
是 multiple producer, single consumer(多生产者单消费者)
的缩写。所以我们可以克隆信道的发送端
,来向同一个接收端
发送数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx2 = tx.clone(); // 克隆一个发送端
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap(); // 发送数据
thread::sleep(Duration::from_secs(1)); // 暂停1秒
}
});
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx2.send(val).unwrap(); // 发送数据
thread::sleep(Duration::from_secs(1)); // 暂停1秒
}
});
// 将 rx 当作一个迭代器。
for received in rx {
println!("Got: {}", received);
}
}
|
这代码真的很简单,没有什么难度或不能理解的
mpsc::channel
函数返回一个元组
第一个元素是发送端
,而第二个元素是接收端
send
方法会获取参数的所有权
recv
方法会阻塞主线程
直到从信道
中拿到一个值,而相反 try_recv 则不会阻塞
,它立刻返回一个Result<T, E>
,来表示此时信道
中的值
- 可以通过
克隆
信道的发送端
,来向同一个接收端
发送数据
接收端
可以当作一个迭代器
来使用