Rust 学习笔记(35)-Atomic原子操作

参考章节《Rust语言圣经(Rust Course)》第3.6.5章 线程同步:Atomic 原子类型与内存顺序

在探讨原子操作之前,我们先看看下面的代码

创建N_THREADS(10)个线程,然后每个线程循环对一个共享变量R1,每个线程循环N_TIMES(10000000)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use std::{sync::Mutex, thread, time::Instant, ops::Sub};

use lazy_static::lazy_static;
lazy_static! {
    static ref R: Mutex<u64> = Mutex::new(0);
}

const N_TIMES: u64 = 10000000;
const N_THREADS: usize = 10;

fn main() {
    let s = Instant::now(); // 用于记录开始时间

    let mut threads = Vec::with_capacity(N_THREADS); // 创建一个容量为 N_THREADS(10) 长度的 Vector

    // 循环创建线程,并添加到threads中
    for _ in 0..N_THREADS {
        let handle = thread::spawn(move || {
            let mut r = R.lock().unwrap();
            // 对共享变量R + 1 N_TIMES次(10000000)
            for _ in 0..N_TIMES {
                *r += 1;
            }
        });
        threads.push(handle);
    }

    // 等待所有线程结束
    for thread in threads {
        thread.join().unwrap();
    }

    let r = R.lock().unwrap();

    // 断言R的结果是否是 N_TIMES * N_THREADS
    assert_eq!(N_TIMES * N_THREADS as u64, *r);

    // 打印从开始到结束消耗的时间
    println!("{:?}", Instant::now().sub(s));
}

运行这个程序

1
2.273785168s

可以看到,总耗时2秒多(根据机器不同,结果也不同),我们为了保证线程安全,在每个线程中都申请了锁,这是一笔很大的开销
那么有没有一种既能实现和上面一样的功能,性能又比它好的东西呢?
答:当然有,它就是Atomic

Rust1.34 版本后,就正式支持原子类型。原子指的是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作
多核CPU 下,当某个CPU核心开始运行原子操作时,会先暂停其它CPU内核对内存的操作,以保证原子操作不会被其它 CPU 内核所干扰

现在我们来看看用Atomic来完成上面的功能

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
use std::ops::Sub;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::Instant;

const N_TIMES: u64 = 10000000;
const N_THREADS: usize = 10;

static R: AtomicU64 = AtomicU64::new(0); // u64类型的原子类型

fn main() {
    let s = Instant::now();
    let mut threads = Vec::with_capacity(N_THREADS);

    for _ in 0..N_THREADS {
        let handle = thread::spawn(move || {
            for _ in 0..N_TIMES {
                R.fetch_add(1, Ordering::Relaxed); // +1 操作 
            }
        });
        threads.push(handle);
    }

    for thread in threads {
        thread.join().unwrap();
    }

    assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed));

    println!("{:?}", Instant::now().sub(s));
}

我们可以看到其他地方都一样,我们只是把共享变量(锁)换成Atomic原子类型

运行这个程序

1
2.143941391s

怎么好像也没快多少?这应该是和我的计算机有关系,总之Atomic要比Mutex快一些

内存顺序

你可能注意到

1
R.fetch_add(1, Ordering::Relaxed);

可以看到这里有一个奇怪的枚举成员 Ordering::Relaxed, 它实际上它用于控制原子操作使用的内存顺序
关于内存顺序可以参考Rust语言圣经(Rust Course)内存顺序这一小节

主要就是说,编译器的优化或CPU缓存机制等等原因可能会造成内存顺序的改变,从而导致结果不是你预期的结果
因此,当你在多线程环境中使用原子操作时,如果结果不是你预期的结果,你可能需要注意是不是内存顺序导致的

Ordering 枚举有 5 个成员

Relaxed

1
这是最宽松的规则,它对编译器和 CPU 不做任何限制,可以乱序

Release

1
释放,设定内存屏障(Memory barrier),保证它之前的操作永远在它之前,但是它后面的操作可能被重排到它前面

Acquire

1
获取, 设定内存屏障,保证在它之后的访问永远在它之后,但是它之前的操作却有可能被重排到它后面,往往和`Release`在不同线程中联合使用

AcqRel

1
它是 `Acquire` 和 `Release` 的结合,同时拥有它们俩提供的保证。

SeqCst

1
2
3
顺序一致性, SeqCst 就像是 AcqRel 的加强版,它不管原子操作是属于读取还是写入的操作
只要某个线程有用到 SeqCst 的原子操作,线程中该 SeqCst 操作前的数据操作绝对不会被重新排在该 SeqCst 操作之后
且该 SeqCst 操作后的数据操作也绝对不会被重新排在 SeqCst 操作前。

我们来看一个内存屏障的例子

以下代码摘自Rust语言圣经(Rust Course)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};

static mut DATA: u64 = 0;
static READY: AtomicBool = AtomicBool::new(false);

fn reset() {
    unsafe {
        DATA = 0;
    }
    READY.store(false, Ordering::Relaxed);
}

fn producer() -> JoinHandle<()> {
    thread::spawn(move || {
        unsafe {
            DATA = 100; // A
        }
        READY.store(true, Ordering::Release); // B: 内存屏障 ↑

        /*
        // 难道DATA会重排到屏障之后?
        unsafe {
            DATA = 100; // A
        }
        */
    })
}

fn consumer() -> JoinHandle<()> {
    thread::spawn(move || {
        while !READY.load(Ordering::Acquire) {} // C: 内存屏障 ↓

        assert_eq!(100, unsafe { DATA }); // D
    })
}

fn main() {
    loop {
        reset();

        let t_producer = producer();
        let t_consumer = consumer();

        t_producer.join().unwrap();
        t_consumer.join().unwrap();
    }
}

书上这个例子,说实话我没看懂,难道DATA会重排到屏障之后?
我想应该不会因为它不满足指令重排的条件,所以,我猜它这个例子是不是说会发生CPU 缓存导致内存顺序的改变的问题

线程producerconsumer如果不设置内存屏障,那么DATA的值可能由于CPU 缓存导致内存顺序的改变
举个例子,假如reset中将DATA = 0,此时,producer线程中将DATA = 100,但由于CPU 缓存的原因,DATA = 100还没有被同步到其它CPU 缓存中,
此时consumer线程中开始读取DATA,结果读到了值0,这也就造成了断言失败,这里就是保证在设置屏障后,一定让 DATA = 100

总结一下,内存屏障可以

  1. 避免其之前的指令重排到其之后
  2. 避免其之后的指令重排到其之前
  3. 在内存屏障中被写入的数据,都可以被其它线程读取到,不会有 CPU 缓存的问题

内存顺序的选择

  1. 大部分情况下,不知道怎么选择时,优先使用SeqCst,虽然会稍微减慢速度,但是慢一点也比出现错误好
  2. 多线程只计数fetch_add而不使用该值触发其他逻辑分支的简单使用场景,可以使用Relaxed
    参考 Which std::sync::atomic::Ordering to use?

Atomic能代替锁吗?

下面引用Rust语言圣经(Rust Course)中的回答

答:不能

  1. 对于复杂的场景下,锁的使用简单粗暴,不容易有坑
  2. std::sync::atomic 包中仅提供了数值类型的原子操作:AtomicBool, AtomicIsize, AtomicUsize, AtomicI8, AtomicU16等,而锁可以应用于各种类型
  3. 在有些情况下,必须使用锁来配合,例如上一章节中使用Mutex配合Condvar

使用原子操作,还需要注意内存顺序的问题,因此,综上原因如果不是特别特殊的场合,不建议使用原子操作,参考 Atomic 的应用场景

updatedupdated2025-03-012025-03-01