Rust 学习 15

archive time: 2022-09-25

继续学习并发内容

除了使用 move 闭包外,我们还可以使用消息机制来传递数据

消息机制

有种安全的保证安全并发的机制,消息传递,线程之间可以通过彼此间发送消息来进行通信

这里的线程可以被称为 Actor1

在 Rust 里,可以使用标准库的 Channel 来实现消息通信

Channel

Channel 包含发送端和接收端,发送端发送数据,接收端检查和接收数据

如果发送端或接收端任意一端被 “丢弃” 了,那么 channel 就关闭了

Rust 中使用 mpsc::channel 来创建 channel

mpsc: multiple producer, single consumer

即多个发送端,一个接收端

发送端使用 send() 发送消息,接收端使用 recv() 接收消息

use std::{sync::mpsc, thread};

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hello world");
        tx.send(val).unwrap();
    });

    let res = rx.recv().unwrap();
    println!("received: {}", res);
}

输出为

received: hello world

在这个简单例子里,我们使用 channel 实现了子线程和主线程之间的通信

我们 send 数据的时候,实际上是完成了一次所有权转移, 所以在 send 之后,无法使用 send 了的变量

tx.send(val).unwrap();
println!("val is {}", val); // error!

这就是个典型错误

我们还可以通过这个例子来看到接收端等待发送的例子

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let handle_t = thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_nanos(700));
        }
    });

    let handle_r = thread::spawn(move || {
        for rec in rx {
            println!("got: {}", rec);
        }
    });

    for i in 1..=5 {
        println!("main: {}", i);
        thread::sleep(Duration::from_nanos(900));
    }
    handle_t.join().unwrap();
    handle_r.join().unwrap();
}

对应输出

main: 1
got: hi
main: 2
main: 3
got: from
main: 4
got: the
main: 5
got: thread

我们可以有多个发送者,使用 mpsc::Sender::clone() 来实现生成其他发送者

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx1, rx) = mpsc::channel();

    let tx2 = mpsc::Sender::clone(&tx1);

    let handle_t1 = thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(format!("1: {}", val)).unwrap();
            thread::sleep(Duration::from_nanos(700));
        }
    });

    let handle_t2 = thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx2.send(format!("2: {}", val)).unwrap();
            thread::sleep(Duration::from_nanos(700));
        }
    });

    let handle_r = thread::spawn(move || {
        for rec in rx {
            println!("got: {}", rec);
        }
    });

    for i in 1..=5 {
        println!("main: {}", i);
        thread::sleep(Duration::from_nanos(2000));
    }
    handle_t1.join().unwrap();
    handle_t2.join().unwrap();
    handle_r.join().unwrap();
}

对应输出如下

main: 1
got: 1: hi
got: 2: hi
got: 1: from
main: 2
got: 2: from
got: 1: the
main: 3
got: 2: the
got: 1: thread
main: 4
got: 2: thread
main: 5

共享状态

Channel 里,我们可以使用发送者和接受者来传递消息,实现并发, 但是 channel 需要转移所有权,发送的数据无法再使用

所以有了另一种方式实现并发,那就是共享内存,或者说,共享状态

我们可以使用 Mutex 来完成这个操作

Mutex: mutual exclusion

Mutex 在使用数据前需要先获取锁 (lock),使用完数据后还需要解锁数据

不过 Mutex 实现了 Drop trait,可以自动解锁,基本无需担心

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let cnt = Arc::new(Mutex::new(0u8));
    let mut handles = Vec::new();

    for _ in 0..10 {
        let cnt = Arc::clone(&cnt);
        let handle = thread::spawn(move || {
            let mut num = cnt.lock().unwrap();
            println!("thread {}!", *num + 1u8);
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("{:?}", cnt);
}

要使得 Mutex 在多个线程间传递,需要配合 Arc 使用,输出为

thread 1!
thread 2!
thread 3!
thread 4!
thread 5!
thread 6!
thread 7!
thread 8!
thread 9!
thread 10!
Mutex { data: 10, poisoned: false, .. }

好,今天就这样吧

1

Wikipedia.Actor_model [DB/OL]. https://en.wikipedia.org/wiki/Actor_model, 2022-09-24/2022-09-25