Rust 学习 15
archive time: 2022-09-25
继续学习并发内容
除了使用 move 闭包外,我们还可以使用消息机制来传递数据
消息机制
有种安全的保证安全并发的机制,消息传递,线程之间可以通过彼此间发送消息来进行通信
这里的线程可以被称为 Actor1
在 Rust 里,可以使用标准库的 Channel
来实现消息通信
Channel
Channel 包含发送端和接收端,发送端发送数据,接收端检查和接收数据
如果发送端或接收端任意一端被 “丢弃” 了,那么 channel 就关闭了
Rust 中使用 mpsc::channel
来创建 channel
mpsc: multiple producer, single consumer
即多个发送端,一个接收端
发送端使用 send()
发送消息,接收端使用 recv()
接收消息
use std::{sync::mpsc, thread};
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hello world");
tx.send(val).unwrap();
});
let res = rx.recv().unwrap();
println!("received: {}", res);
}
输出为
received: hello world
在这个简单例子里,我们使用 channel 实现了子线程和主线程之间的通信
我们 send 数据的时候,实际上是完成了一次所有权转移, 所以在 send 之后,无法使用 send 了的变量
tx.send(val).unwrap();
println!("val is {}", val); // error!
这就是个典型错误
我们还可以通过这个例子来看到接收端等待发送的例子
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let handle_t = thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_nanos(700));
}
});
let handle_r = thread::spawn(move || {
for rec in rx {
println!("got: {}", rec);
}
});
for i in 1..=5 {
println!("main: {}", i);
thread::sleep(Duration::from_nanos(900));
}
handle_t.join().unwrap();
handle_r.join().unwrap();
}
对应输出
main: 1
got: hi
main: 2
main: 3
got: from
main: 4
got: the
main: 5
got: thread
我们可以有多个发送者,使用 mpsc::Sender::clone()
来实现生成其他发送者
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx1, rx) = mpsc::channel();
let tx2 = mpsc::Sender::clone(&tx1);
let handle_t1 = thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(format!("1: {}", val)).unwrap();
thread::sleep(Duration::from_nanos(700));
}
});
let handle_t2 = thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx2.send(format!("2: {}", val)).unwrap();
thread::sleep(Duration::from_nanos(700));
}
});
let handle_r = thread::spawn(move || {
for rec in rx {
println!("got: {}", rec);
}
});
for i in 1..=5 {
println!("main: {}", i);
thread::sleep(Duration::from_nanos(2000));
}
handle_t1.join().unwrap();
handle_t2.join().unwrap();
handle_r.join().unwrap();
}
对应输出如下
main: 1
got: 1: hi
got: 2: hi
got: 1: from
main: 2
got: 2: from
got: 1: the
main: 3
got: 2: the
got: 1: thread
main: 4
got: 2: thread
main: 5
共享状态
Channel 里,我们可以使用发送者和接受者来传递消息,实现并发, 但是 channel 需要转移所有权,发送的数据无法再使用
所以有了另一种方式实现并发,那就是共享内存,或者说,共享状态
我们可以使用 Mutex
来完成这个操作
Mutex: mutual exclusion
Mutex 在使用数据前需要先获取锁 (lock),使用完数据后还需要解锁数据
不过 Mutex 实现了 Drop
trait,可以自动解锁,基本无需担心
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let cnt = Arc::new(Mutex::new(0u8));
let mut handles = Vec::new();
for _ in 0..10 {
let cnt = Arc::clone(&cnt);
let handle = thread::spawn(move || {
let mut num = cnt.lock().unwrap();
println!("thread {}!", *num + 1u8);
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("{:?}", cnt);
}
要使得 Mutex 在多个线程间传递,需要配合 Arc 使用,输出为
thread 1!
thread 2!
thread 3!
thread 4!
thread 5!
thread 6!
thread 7!
thread 8!
thread 9!
thread 10!
Mutex { data: 10, poisoned: false, .. }
好,今天就这样吧
Wikipedia.Actor_model [DB/OL]. https://en.wikipedia.org/wiki/Actor_model, 2022-09-24/2022-09-25