Rust 学习笔记(45)-异步运行时Tokio

参考章节《Tokio官方文档Spawning
参考章节《Rust语言圣经(Rust Course)》第4.2章 Tokio 使用指南

这一节我们通过写一个简单的redis服务端来继续深入学习Tokio

移动之前的项目

我们在项目根目录创建一个examples目录,并把之前的代码移动到examples目录中

1
2
$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs

编辑 Cargo.toml

1
2
3
[[example]]
name = "hello-redis"
path = "examples/hello-redis.rs"

接受套接字

我们的 Redis 服务器需要做的第一件事是接受入站 TCP 套接字。可以通过 tokio::net::TcpListener

TcpListener绑定到端口6379,然后在循环中接受套接字。每个套接字都被处理然后关闭。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
    // Bind the listener to the address
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        // The second item contains the IP and port of the new connection.
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
    }
}

async fn process(socket: TcpStream) {
    // The `Connection` lets us read/write redis **frames** instead of
    // byte streams. The `Connection` type is defined by mini-redis.
    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);

        // Respond with an error
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}

现在运行这段代码

1
cargo run

在单独的终端窗口中,运行 hello-redis 示例

1
$ cargo run --example hello-redis

输出应该是

1
Error: "unimplemented"

在服务器终端中,输出为

1
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])

创建并发任务

上面的服务器,如果你仔细看,它其实一次只能接受和处理一条 TCP 连接,只有等当前的处理完并结束后,才能开始接收下一条连接。
原因在于 loop 循环中的 await 会导致当前任务进入阻塞等待,在这里也就是 loop 循环会被阻塞。而我们希望我们的 Redis 服务器能够处理许多并发请求。

接受循环变为

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // A new task is spawned for each inbound socket. The socket is
        // moved to the new task and processed there.
        // 为每一条连接都生成一个新的任务
        // socket 的所有权将被移动到新的任务中,并在那里进行处理
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

spawn 函数的参数是一个 async 语句块,该语句块可以返回一个值,然后调用者可以通过 JoinHandle 获取该值,例如

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // Do some async work
        "return value"
    });
    // Do some other work
    let out = handle.await.unwrap(); // `.await` 会返回一个 `Result`,若 `spawn` 创建的任务正常运行结束,则返回一个 `Ok(T)` 的值,否则会返回一个错误 `Err`
    println!("GOT {}", out);
}

任务是一个由 Tokio 调度程序管理的执行单元。生成任务将首先提交给 Tokio 调度程序,然后确保任务在有工作要做时执行。
任务是一个异步的绿色线程,它们通过 tokio::spawn 进行创建,该函数会返回一个 JoinHandle 类型的句柄,调用者可以使用该句柄跟创建的任务进行交互。
需要注意的是,最终执行任务的线程未必是创建任务的线程,任务完全有可能运行在另一个不同的线程上,而且任务在生成后,它还可能会在线程间被移动。

'static 约束

当使用 Tokio 创建一个任务时,其类型的生命周期必须为 'static。这意味着生成的任务不得包含对任务外部拥有的数据的任何引用。

这里注意是类型而不是具体的,一个类型,内部如果包含了外部数据的引用,那它就不是'static

例如,以下将无法编译

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
use tokio::task;

#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];

    task::spawn(async {
        println!("Here's a vec: {:?}", v);
    });
}

上面代码中,由于引用了外部环境中的变量 v ,导致以下报错:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
error[E0373]: async block may outlive the current function, but
              it borrows `v`, which is owned by the current function
 --> src/main.rs:7:23
  |
7 |       task::spawn(async {
  |  _______________________^
8 | |         println!("Here's a vec: {:?}", v);
  | |                                        - `v` is borrowed here
9 | |     });
  | |_____^ may outlive borrowed value `v`
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:7:17
  |
7 |       task::spawn(async {
  |  _________________^
8 | |         println!("Here's a vector: {:?}", v);
9 | |     });
  | |_____^
help: to force the async block to take ownership of `v` (and any other
      referenced variables), use the `move` keyword
  |
7 |     task::spawn(async move {
8 |         println!("Here's a vec: {:?}", v);
9 |     });
  |

原因在于,变量并不是通过 move 的方式转移进 async 语句块的,变量 v 的所有权依然属于 main 函数
println! 会借用 v,但这并不满足上面我们所说的 'static 约束,我们只需要为 async 语句块 加上 move 关键字即可解决这个问题
但是 move 有一个问题,一个数据只能被一个任务使用,如果想要多个任务使用一个数据,则可以使用 Arc,它可以轻松解决该问题,还是线程安全的。

Send约束

tokio::spawn 产生的任务必须实现 Send,当这些任务在 .await 执行过程中发生阻塞时Tokio 调度器会将任务在线程间移动
而如果一个任务要实现 Send 特征,那它在 .await 调用的过程中所持有的全部数据都必须实现 Send 特征。

.await 调用发生阻塞时,任务会让出当前线程所有权给调度器,然后当任务准备好后,调度器会从上一次暂停的位置继续执行该任务。
该流程能正确的工作,任务必须将 .await 之后使用的所有状态保存起来,这样才能在中断后恢复现场并继续执行。
若这些状态实现了 Send 特征(可以在线程间安全地移动),那任务自然也就可以在线程间安全地移动。

例如,以下代码可以工作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // 语句块的使用强制了 rc 会在 .await 被调用前就被释放,
        // 因此 rc 并不会影响 .await 的安全性
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // rc 的作用范围已经失效,因此当任务让出所有权给当前线程时,它无需作为状态被保存起来
        yield_now().await;
    });
}

而下面的代码则不行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");

        // rc 在 .await 后还被继续使用,因此它必须被作为任务的状态保存起来
        yield_now().await;

        // 事实上,注释掉下面一行代码,依然会报错
        // 原因是:是否保存,不取决于 rc 是否被使用,而是取决于 .await 后,rc 是否仍然有效
        println!("{}", rc);

        // rc 作用域在这里结束
    });
}

需要特别注意的是,上面代码注释中写道的

1
// 是否保存,不取决于 rc 是否被使用,而是取决于 .await 后,rc 是否仍然有效,如果有效,那么它就需要被保存

完善redis服务器

现在我们要处理客户端发送过来的命令,并存储数据到一个 HashMap

如果这段代码你看不懂的话,我认为也无关紧要,我们主要是学习 Tokio 的使用,而不是学习 mini-redis

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};

async fn process(socket: TcpStream) {
    use mini_redis::Command::{self, Get, Set};
    use std::collections::HashMap;

    // 用 HashMap 来存储数据
    let mut db = HashMap::new();

    // mini-redis 提供的便利函数,使用返回的 connection 可以用于从 socket 中读取数据并解析为数据帧
    let mut connection = Connection::new(socket);

    // 使用 read_frame 方法从连接获取一个数据帧:一条redis命令 + 相应的数据
    while let Some(frame) = connection.read_frame().await.unwrap() {
        // Command::from_frame 读取一个数据帧,并返回一个Result<Command>
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                // 值被存储为 `Vec<u8>` 的形式
                db.insert(cmd.key().to_string(), cmd.value().to_vec());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                if let Some(value) = db.get(cmd.key()) {
                    // `Frame::Bulk` 期待数据的类型是 `Bytes`, 该类型会在后面章节讲解,
                    // 此时,你只要知道 `&Vec<u8>` 可以使用 `into()` 方法转换成 `Bytes` 类型
                    Frame::Bulk(value.clone().into())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // 将请求响应返回给客户端
        connection.write_frame(&response).await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

运行我们的服务端

1
cargo run

运行之前写的客户端

1
cargo run --example hello-redis

你应该能看到

1
got value from the server; result=Some(b"world")

总结一下

这一节最重要的是学习怎么用 Tokio 创建任务,以及任务的特性

updatedupdated2025-03-012025-03-01