Rust 学习笔记(33)-Mutex

参考章节《Rust 程序设计语言》第16.2章 共享状态并发
参考章节《Rust语言圣经(Rust Course)》第3.6.4章 线程同步:锁、Condvar 和信号量

除了上一章的Channel(信道)以外,Rust也可以通过操作共享内存的方式在多个线程之间共享数据

两种方式有什么区别

信道(消息传递方式)类似于单所有权,一旦将一个值传送到信道中,将无法再使用这个值。
共享内存类似于多所有权,多个线程可以同时访问相同的内存位置。

由于共享内存拥有多所有权,因此出于线程安全考虑,我们需要一个机制限制同一时刻只能有一个线程能访问这块内存
这个机制就是有很多种,我们先看看一个在多线程场景中最常用的互斥锁,也有叫做互斥体互斥器,它们都是指同一个东西

我猜读到这里你肯定有一个疑问如果我们对它做了限制,那和单所有权有什么区别? 那我为什么还要用共享内存的方式?

这个问题先别着急,我们在本章的最后再来讨论,现在,让我们先看看互斥锁的用法

互斥器一次只允许一个线程访问数据

什么是互斥器?

在任意时刻,其只允许一个线程访问某些数据
为了访问互斥器中的数据,线程首先需要通过获取互斥器的锁(lock)来表明其希望访问数据。
是一个作为互斥器一部分的数据结构,它记录谁有数据的排他访问权。因此,我们描述互斥器为通过系统保护(guarding)其数据。

互斥器以难以使用著称,因为你不得不记住

  1. 在使用数据之前尝试获取锁。(这个没什么好说的)
  2. 处理完被互斥器所保护的数据之后,必须解锁数据,这样其他线程才能够获取锁。(难是难在这一点上,很多时候你可能会忘了释放锁)

好在Rust的所有权系统会帮助我们释放锁,你需要做的只是管理好锁的作用域
虽然所有权系统会减轻我们使用锁时的负担,但Rust 仍然无法帮我们避免所有的逻辑错误,如使用不当仍然可能造成死锁

互斥器的使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0); // 互斥器保存一个共享数据
    let mut handles = vec![]; // 存放线程句柄

    // 创建10个线程,每个线程对互斥器中的共享数据+1
    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap(); // 你可能发现了,明明这里返回的是锁,怎么就变成我们的num数值了
            // 其实这里返回了一个智能指针MutexGuard<T>,它实现了Deref特征和Drop特征
            // 正因为智能指针的使用,使得我们无需任何操作就能获取其中的数据。

            *num += 1;
        });

        // 把线程返回值变量(句柄)加入Vector中保存起来
        handles.push(handle);
    }

    // 遍历Vector,执行线程,并等待其结束
    for handle in handles {
        handle.join().unwrap();
    }

    // 打印结果
    println!("Result: {}", *counter.lock().unwrap());
}

在这里我们创建了一个存放 i32 类型的 counter(互斥器),接下来遍历 range 创建了 10 个线程,并对所有线程使用了相同的闭包
它们每一个都将调用 lock 方法来获取 Mutex<T> 上的 ,接着将互斥器中的值 +1当线程执行结束时 num离开闭包作用域释放锁,这样另一个线程就能获取它了

可惜这段代码并不能编译,尝试运行这个程序,你会看到类似如下的输出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: use of moved value: `counter`
  --> src/main.rs:9:36
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
9  |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^ value moved into closure here, in previous iteration of loop
10 |             let mut num = counter.lock().unwrap();
   |                           ------- use occurs due to use in closure

For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` due to previous error

原因很简单,counter 发生了 移动,但它不能被 移动多个线程中,因此,我们自然想到了用 Rc<T> 来解决,我们来看看下面的修改版

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

然而,这段代码还是不能编译

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
   --> src/main.rs:11:22
    |
11  |           let handle = thread::spawn(move || {
    |  ______________________^^^^^^^^^^^^^_-
    | |                      |
    | |                      `Rc<Mutex<i32>>` cannot be sent between threads safely
12  | |             let mut num = counter.lock().unwrap();
13  | |
14  | |             *num += 1;
15  | |         });
    | |_________- within this `[closure@src/main.rs:11:36: 15:10]`
    |
    = help: within `[closure@src/main.rs:11:36: 15:10]`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
    = note: required because it appears within the type `[closure@src/main.rs:11:36: 15:10]`
note: required by a bound in `spawn`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` due to previous error

原因是 Rc<T> 不能用于多线程环境,它不是线程安全的,因此我们需要的是一个完全类似 Rc<T>,又以一种线程安全的方式改变引用计数的类型。

原子引用计数 Arc<T>

所幸 Arc<T> 正是这么一个类似 Rc<T> 并可以安全的用于并发环境的类型(说白了,它是线程安全的)。

我们再次修改代码来看看

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

这一次,我们的代码就可以编译通过,并正常运行了

1
Result: 10

Mutex/Arc 与RefCell/Rc 的相似性

聪明的你可能已经发现了 counter 虽然是不可变的,不过仍然 可以获取其内部值的可变引用,这意味着 Mutex<T> 提供了内部可变性,就像 RefCell 系列类型那样。

死锁问题

本章已经写的够多了,虽然我很不愿意再继续写了,但涉及到,就一定绕不开一个死锁问题,这个问题还很常见,因此必须要了解下

什么是死锁?

在多线程场景中,两个线程各持有一把锁,但又都在等待对方的锁,此时就会造成死锁
再说的直白一点就是,我锁住了一个资源,现在我需要获取另一个线程持有的锁后才会释放我的锁,此时另一个线程也锁住了一个资源,而恰好它也在等待我的锁

麻烦的是,在Rust中要想写一个死锁的例子还不是那么容易

1
static COUNTER1: Mutex<i32> = Mutex::new(0);

由于在Rust中不允许以这样的方式创建静态全局变量,因此我们需要借助一个第三方库lazy_static,它允许我们在运行时进行初始化,我们只需要在 Cargo.toml 中加入

1
2
[dependencies]
lazy_static = "1.4.0"

好了,来看看我们下面的例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use std::{sync::Mutex, thread};

use lazy_static::lazy_static;
lazy_static! {
    static ref COUNTER1: Mutex<i32> = Mutex::new(0);
    static ref COUNTER2: Mutex<i32> = Mutex::new(0);
}

fn main() {
    let handle1 = thread::spawn(move || {
        let mut num1 = COUNTER1.lock().unwrap();
        *num1 += 1;

        println!("第一个线程获取了锁1");

        println!("第一个线程开始等待了锁2");
        let mut num2 = COUNTER2.lock().unwrap();
        *num2 += 1;

        println!("第一个线程获取了锁2");
    });

    let handle2 = thread::spawn(move || {
        let mut num2 = COUNTER2.lock().unwrap();
        *num2 += 1;

        println!("第二个线程获取了锁2");

        println!("第二个线程开始等待了锁1");
        let mut num1 = COUNTER1.lock().unwrap();
        *num1 += 1;

        println!("第二个线程获取了锁1");
    });

    // 主线程等待这两个线程执行完在结束
    let _ = handle1.join();
    let _ = handle2.join();

    println!("死锁没有发生");
}

需要知道的是,这段代码并不会100%发生死锁,因为我们并不知道线程什么时候执行,有可能线程1完全执行完毕后线程2才开始执行,这种情况下不会发生死锁

运行这段程序,如果你看到了类似如下的输出,则证明发生了死锁

1
2
3
4
5
6
7
8
    Blocking waiting for file lock on build directory
   Compiling rust_test v0.1.0 (/home/w/data/code/rust_test)
    Finished dev [unoptimized + debuginfo] target(s) in 0.30s
     Running `/home/w/data/code/rust_test/target/debug/rust_test`
第一个线程获取了锁1
第二个线程获取了锁2
第二个线程开始等待了锁1
第一个线程开始等待了锁2

try_lock

try_lock尝试去获取一次锁,如果无法获取则会返回一个错误,因此不会发生阻塞

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
use std::{sync::Mutex, thread};

use lazy_static::lazy_static;
lazy_static! {
    static ref COUNTER1: Mutex<i32> = Mutex::new(0);
    static ref COUNTER2: Mutex<i32> = Mutex::new(0);
}

fn main() {
    let handle1 = thread::spawn(move || {
        let mut num1 = COUNTER1.lock().unwrap();
        *num1 += 1;

        println!("第一个线程获取了锁1");

        println!("第一个线程开始等待了锁2");
        let num2 = COUNTER2.try_lock(); // lock 换成 try_lock
        println!("第一个线程获取锁2的结果是: {:?}", num2);
    });

    let handle2 = thread::spawn(move || {
        let mut num2 = COUNTER2.lock().unwrap();
        *num2 += 1;

        println!("第二个线程获取了锁2");

        println!("第二个线程开始等待了锁1");
        let num1 = COUNTER1.try_lock(); // lock 换成 try_lock
        println!("第二个线程获取锁1的结果是: {:?}", num1);
    });

    let _ = handle1.join();
    let _ = handle2.join();

    println!("死锁没有发生");
}

与之前的结果不同,这次这段代码无论如何将不会再发生死锁

1
2
3
4
5
6
7
第一个线程获取了锁1
第一个线程开始等待了锁2
第一个线程获取锁2的结果是: Err("WouldBlock")
第二个线程获取了锁2
第二个线程开始等待了锁1
第二个线程获取锁1的结果是: Ok(1)
死锁没有发生

如上所示,当try_lock失败时,会报出一个错误:Err("WouldBlock"),接着线程中的剩余代码会继续执行,不会被阻塞

总结一下

  1. Mutex<T> 是一个互斥锁,它在任意时刻,其只允许一个线程访问某些数据
  2. Mutex<T>lock 方法返回一个MutexGuard<T> 智能指针
  3. Mutex<T>RefCell<T> 一样都具有内部可变性(允许我们对一个不可变的值进行可变借用)
  4. Arc<T>Rc<T> 一样,但它是线程安全的
updatedupdated2025-03-012025-03-01