参考章节《Rust 程序设计语言》第16.2章 共享状态并发
参考章节《Rust语言圣经(Rust Course)》第3.6.4章 线程同步:锁、Condvar 和信号量
除了
上一章的Channel(信道)
以外,Rust也可以通过操作共享内存
的方式在多个线程之间共享数据
两种方式有什么区别
信道(消息传递方式)
类似于单所有权
,一旦将一个值传送到信道中,将无法再使用这个值。
共享内存
类似于多所有权
,多个线程可以同时访问相同的内存位置。
由于共享内存
拥有多所有权
,因此出于线程安全考虑
,我们需要一个机制
来限制同一时刻只能有一个线程能访问这块内存
这个机制
就是锁
,锁
有很多种,我们先看看一个在多线程场景中最常用的互斥锁
,也有叫做互斥体
、互斥器
,它们都是指同一个东西
我猜
读到这里你肯定有一个疑问如果我们对它做了限制,那和单所有权有什么区别? 那我为什么还要用共享内存的方式?
这个问题先别着急,我们在本章的最后再来讨论,现在,让我们先看看互斥锁
的用法
互斥器一次只允许一个线程访问数据
什么是互斥器?
在任意时刻,其只允许一个线程访问某些数据
。
为了访问互斥器中的数据,线程首先需要通过获取互斥器的锁(lock)
来表明其希望访问数据。
锁
是一个作为互斥器一部分的数据结构,它记录谁有数据的排他访问权。因此,我们描述互斥器为通过锁
系统保护(guarding)其数据。
互斥器以难以使用著称,因为你不得不记住
- 在使用数据之前尝试获取锁。
(这个没什么好说的)
- 处理完被互斥器所保护的数据之后,必须解锁数据,这样其他线程才能够获取锁。
(难是难在这一点上,很多时候你可能会忘了释放锁)
好在Rust的所有权系统会帮助我们释放锁,你需要做的只是管理好锁的作用域
虽然所有权系统会减轻我们使用锁时的负担,但Rust 仍然无法帮我们避免所有的逻辑错误,如使用不当仍然可能造成死锁
互斥器的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Mutex::new(0); // 互斥器保存一个共享数据
let mut handles = vec![]; // 存放线程句柄
// 创建10个线程,每个线程对互斥器中的共享数据+1
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap(); // 你可能发现了,明明这里返回的是锁,怎么就变成我们的num数值了
// 其实这里返回了一个智能指针MutexGuard<T>,它实现了Deref特征和Drop特征
// 正因为智能指针的使用,使得我们无需任何操作就能获取其中的数据。
*num += 1;
});
// 把线程返回值变量(句柄)加入Vector中保存起来
handles.push(handle);
}
// 遍历Vector,执行线程,并等待其结束
for handle in handles {
handle.join().unwrap();
}
// 打印结果
println!("Result: {}", *counter.lock().unwrap());
}
|
在这里我们创建了一个存放 i32
类型的 counter(互斥器)
,接下来遍历 range
创建了 10
个线程,并对所有线程使用了相同的闭包
。
它们每一个都将调用 lock
方法来获取 Mutex<T>
上的 锁
,接着将互斥器中的值 +1
。当线程执行结束时
num
会 离开闭包作用域
并 释放锁
,这样另一个线程就能获取它了
可惜这段代码并不能编译,尝试运行这个程序,你会看到类似如下的输出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
$ cargo run
Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: use of moved value: `counter`
--> src/main.rs:9:36
|
5 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
9 | let handle = thread::spawn(move || {
| ^^^^^^^ value moved into closure here, in previous iteration of loop
10 | let mut num = counter.lock().unwrap();
| ------- use occurs due to use in closure
For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` due to previous error
|
原因很简单,counter
发生了 移动
,但它不能被 移动
到 多个线程中
,因此,我们自然想到了用 Rc<T>
来解决,我们来看看下面的修改版
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
|
然而,这段代码还是不能编译
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
$ cargo run
Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
--> src/main.rs:11:22
|
11 | let handle = thread::spawn(move || {
| ______________________^^^^^^^^^^^^^_-
| | |
| | `Rc<Mutex<i32>>` cannot be sent between threads safely
12 | | let mut num = counter.lock().unwrap();
13 | |
14 | | *num += 1;
15 | | });
| |_________- within this `[closure@src/main.rs:11:36: 15:10]`
|
= help: within `[closure@src/main.rs:11:36: 15:10]`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
= note: required because it appears within the type `[closure@src/main.rs:11:36: 15:10]`
note: required by a bound in `spawn`
For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` due to previous error
|
原因是 Rc<T>
不能用于多线程
环境,它不是线程安全的
,因此我们需要的是一个完全类似 Rc<T>
,又以一种线程安全
的方式改变引用计数的类型。
原子引用计数 Arc<T>
所幸 Arc<T>
正是这么一个类似 Rc<T>
并可以安全的
用于并发环境的类型(说白了,它是线程安全
的)。
我们再次修改代码来看看
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
|
这一次,我们的代码就可以编译通过,并正常运行了
Mutex/Arc 与RefCell/Rc 的相似性
聪明的你可能已经发现了 counter
虽然是不可变的,不过仍然 可以获取其内部值的可变引用
,这意味着 Mutex<T>
提供了内部可变性
,就像 RefCell
系列类型那样。
死锁问题
本章已经写的够多了,虽然我很不愿意再继续写了,但涉及到锁
,就一定绕不开一个死锁
问题,这个问题还很常见,因此必须要了解下
什么是死锁?
在多线程场景中,两个线程各持有一把锁,但又都在等待对方的锁,此时就会造成死锁
再说的直白一点就是,我锁住了一个资源,现在我需要获取另一个线程持有的锁后才会释放我的锁,此时另一个线程也锁住了一个资源,而恰好它也在等待我的锁
麻烦的是,在Rust中要想写一个死锁的例子还不是那么容易
1
|
static COUNTER1: Mutex<i32> = Mutex::new(0);
|
由于在Rust中不允许以这样的方式创建静态全局变量
,因此我们需要借助一个第三方库lazy_static
,它允许我们在运行时进行初始化
,我们只需要在 Cargo.toml
中加入
1
2
|
[dependencies]
lazy_static = "1.4.0"
|
好了,来看看我们下面的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
use std::{sync::Mutex, thread};
use lazy_static::lazy_static;
lazy_static! {
static ref COUNTER1: Mutex<i32> = Mutex::new(0);
static ref COUNTER2: Mutex<i32> = Mutex::new(0);
}
fn main() {
let handle1 = thread::spawn(move || {
let mut num1 = COUNTER1.lock().unwrap();
*num1 += 1;
println!("第一个线程获取了锁1");
println!("第一个线程开始等待了锁2");
let mut num2 = COUNTER2.lock().unwrap();
*num2 += 1;
println!("第一个线程获取了锁2");
});
let handle2 = thread::spawn(move || {
let mut num2 = COUNTER2.lock().unwrap();
*num2 += 1;
println!("第二个线程获取了锁2");
println!("第二个线程开始等待了锁1");
let mut num1 = COUNTER1.lock().unwrap();
*num1 += 1;
println!("第二个线程获取了锁1");
});
// 主线程等待这两个线程执行完在结束
let _ = handle1.join();
let _ = handle2.join();
println!("死锁没有发生");
}
|
需要知道的是,这段代码并不会100%发生死锁
,因为我们并不知道线程什么时候执行
,有可能线程1
完全执行完毕后线程2
才开始执行,这种情况下不会发生死锁
运行这段程序,如果你看到了类似如下的输出,则证明发生了死锁
1
2
3
4
5
6
7
8
|
Blocking waiting for file lock on build directory
Compiling rust_test v0.1.0 (/home/w/data/code/rust_test)
Finished dev [unoptimized + debuginfo] target(s) in 0.30s
Running `/home/w/data/code/rust_test/target/debug/rust_test`
第一个线程获取了锁1
第二个线程获取了锁2
第二个线程开始等待了锁1
第一个线程开始等待了锁2
|
try_lock
try_lock
会尝试
去获取一次锁,如果无法获取则会返回一个错误,因此不会发生阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
use std::{sync::Mutex, thread};
use lazy_static::lazy_static;
lazy_static! {
static ref COUNTER1: Mutex<i32> = Mutex::new(0);
static ref COUNTER2: Mutex<i32> = Mutex::new(0);
}
fn main() {
let handle1 = thread::spawn(move || {
let mut num1 = COUNTER1.lock().unwrap();
*num1 += 1;
println!("第一个线程获取了锁1");
println!("第一个线程开始等待了锁2");
let num2 = COUNTER2.try_lock(); // lock 换成 try_lock
println!("第一个线程获取锁2的结果是: {:?}", num2);
});
let handle2 = thread::spawn(move || {
let mut num2 = COUNTER2.lock().unwrap();
*num2 += 1;
println!("第二个线程获取了锁2");
println!("第二个线程开始等待了锁1");
let num1 = COUNTER1.try_lock(); // lock 换成 try_lock
println!("第二个线程获取锁1的结果是: {:?}", num1);
});
let _ = handle1.join();
let _ = handle2.join();
println!("死锁没有发生");
}
|
与之前的结果不同,这次这段代码无论如何将不会再发生死锁
1
2
3
4
5
6
7
|
第一个线程获取了锁1
第一个线程开始等待了锁2
第一个线程获取锁2的结果是: Err("WouldBlock")
第二个线程获取了锁2
第二个线程开始等待了锁1
第二个线程获取锁1的结果是: Ok(1)
死锁没有发生
|
如上所示,当try_lock
失败时,会报出一个错误:Err("WouldBlock")
,接着线程中的剩余代码会继续执行,不会被阻塞
。
Mutex<T>
是一个互斥锁
,它在任意时刻,其只允许一个线程访问某些数据
Mutex<T>
的 lock
方法返回一个MutexGuard<T> 智能指针
Mutex<T>
和 RefCell<T>
一样都具有内部可变性(允许我们对一个不可变的值进行可变借用)
Arc<T>
和 Rc<T>
一样,但它是线程安全的