参考章节《Tokio官方文档》Spawning
参考章节《Rust语言圣经(Rust Course)》第4.2章 Tokio 使用指南
这一节我们通过写一个简单的redis服务端来继续深入学习Tokio
移动之前的项目
我们在项目根目录创建一个examples
目录,并把之前的代码移动到examples
目录中
1
2
|
$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs
|
编辑 Cargo.toml
1
2
3
|
[[example]]
name = "hello-redis"
path = "examples/hello-redis.rs"
|
接受套接字
我们的 Redis 服务器需要做的第一件事是接受入站 TCP 套接字。可以通过 tokio::net::TcpListener
TcpListener
绑定到端口6379
,然后在循环中接受套接字。每个套接字都被处理然后关闭。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
// Bind the listener to the address
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
// The second item contains the IP and port of the new connection.
let (socket, _) = listener.accept().await.unwrap();
process(socket).await;
}
}
async fn process(socket: TcpStream) {
// The `Connection` lets us read/write redis **frames** instead of
// byte streams. The `Connection` type is defined by mini-redis.
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);
// Respond with an error
let response = Frame::Error("unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}
|
现在运行这段代码
在单独的终端窗口中,运行 hello-redis
示例
1
|
$ cargo run --example hello-redis
|
输出应该是
在服务器终端中,输出为
1
|
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
|
创建并发任务
上面的服务器,如果你仔细看,它其实一次只能接受和处理一条 TCP
连接,只有等当前的处理完并结束后,才能开始接收下一条连接。
原因在于 loop
循环中的 await 会导致当前任务进入阻塞等待
,在这里也就是 loop
循环会被阻塞
。而我们希望我们的 Redis 服务器能够处理许多并发请求。
接受循环变为
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// A new task is spawned for each inbound socket. The socket is
// moved to the new task and processed there.
// 为每一条连接都生成一个新的任务
// socket 的所有权将被移动到新的任务中,并在那里进行处理
tokio::spawn(async move {
process(socket).await;
});
}
}
|
spawn
函数的参数是一个 async 语句块
,该语句块可以返回一个值,然后调用者可以通过 JoinHandle
获取该值,例如
1
2
3
4
5
6
7
8
9
10
|
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// Do some async work
"return value"
});
// Do some other work
let out = handle.await.unwrap(); // `.await` 会返回一个 `Result`,若 `spawn` 创建的任务正常运行结束,则返回一个 `Ok(T)` 的值,否则会返回一个错误 `Err`
println!("GOT {}", out);
}
|
任务是一个由 Tokio 调度程序管理的执行单元
。生成任务将首先提交给 Tokio 调度程序,然后确保任务在有工作要做时执行。
任务是一个异步的绿色线程
,它们通过 tokio::spawn
进行创建,该函数会返回一个 JoinHandle
类型的句柄,调用者可以使用该句柄跟创建的任务进行交互。
需要注意的是,最终执行任务的线程未必是创建任务的线程,任务完全有可能运行在另一个不同的线程上,而且任务在生成后,它还可能会在线程间被移动。
'static 约束
当使用 Tokio 创建一个任务时,其类型
的生命周期必须为 'static
。这意味着生成的任务不得包含对任务外部拥有的数据的任何引用。
这里注意是类型
而不是具体的值
,一个类型
,内部如果包含了外部数据的引用,那它就不是'static
的
例如,以下将无法编译
1
2
3
4
5
6
7
8
9
10
|
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Here's a vec: {:?}", v);
});
}
|
上面代码中,由于引用了外部环境中的变量 v
,导致以下报错:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
|
原因在于,变量并不是通过 move
的方式转移进 async
语句块的,变量 v
的所有权依然属于 main
函数
而 println!
会借用 v
,但这并不满足上面我们所说的 'static
约束,我们只需要为 async 语句块
加上 move
关键字即可解决这个问题
但是 move
有一个问题,一个数据只能被一个任务使用,如果想要多个任务使用一个数据,则可以使用 Arc
,它可以轻松解决该问题,还是线程安全的。
Send约束
tokio::spawn
产生的任务必须实现 Send
,当这些任务在 .await
执行过程中发生阻塞时
,Tokio 调度器会将任务在线程间移动
。
而如果一个任务要实现 Send
特征,那它在 .await
调用的过程中所持有的全部数据都必须实现 Send
特征。
当 .await
调用发生阻塞时,任务会让出当前线程所有权给调度器,然后当任务准备好后,调度器会从上一次暂停的位置继续执行该任务。
该流程能正确的工作,任务必须将 .await
之后使用的所有状态保存起来,这样才能在中断后恢复现场并继续执行。
若这些状态实现了 Send
特征(可以在线程间安全地移动),那任务自然也就可以在线程间安全地移动。
例如,以下代码可以工作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// 语句块的使用强制了 rc 会在 .await 被调用前就被释放,
// 因此 rc 并不会影响 .await 的安全性
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// rc 的作用范围已经失效,因此当任务让出所有权给当前线程时,它无需作为状态被保存起来
yield_now().await;
});
}
|
而下面的代码则不行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// rc 在 .await 后还被继续使用,因此它必须被作为任务的状态保存起来
yield_now().await;
// 事实上,注释掉下面一行代码,依然会报错
// 原因是:是否保存,不取决于 rc 是否被使用,而是取决于 .await 后,rc 是否仍然有效
println!("{}", rc);
// rc 作用域在这里结束
});
}
|
需要特别注意的是,上面代码注释中写道的
1
|
// 是否保存,不取决于 rc 是否被使用,而是取决于 .await 后,rc 是否仍然有效,如果有效,那么它就需要被保存
|
完善redis服务器
现在我们要处理客户端发送过来的命令,并存储数据到一个 HashMap
中
如果这段代码你看不懂的话,我认为也无关紧要,我们主要是学习 Tokio 的使用,而不是学习 mini-redis
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};
async fn process(socket: TcpStream) {
use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;
// 用 HashMap 来存储数据
let mut db = HashMap::new();
// mini-redis 提供的便利函数,使用返回的 connection 可以用于从 socket 中读取数据并解析为数据帧
let mut connection = Connection::new(socket);
// 使用 read_frame 方法从连接获取一个数据帧:一条redis命令 + 相应的数据
while let Some(frame) = connection.read_frame().await.unwrap() {
// Command::from_frame 读取一个数据帧,并返回一个Result<Command>
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// 值被存储为 `Vec<u8>` 的形式
db.insert(cmd.key().to_string(), cmd.value().to_vec());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
if let Some(value) = db.get(cmd.key()) {
// `Frame::Bulk` 期待数据的类型是 `Bytes`, 该类型会在后面章节讲解,
// 此时,你只要知道 `&Vec<u8>` 可以使用 `into()` 方法转换成 `Bytes` 类型
Frame::Bulk(value.clone().into())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// 将请求响应返回给客户端
connection.write_frame(&response).await.unwrap();
}
}
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
process(socket).await;
});
}
}
|
运行我们的服务端
运行之前写的客户端
1
|
cargo run --example hello-redis
|
你应该能看到
1
|
got value from the server; result=Some(b"world")
|
总结一下
这一节最重要的是学习怎么用 Tokio 创建任务,以及任务的特性