SICP 笔记 - 0x06

archive time: 2025-03-19

流这个概念还是我看 Java 才了解到的,不过没想到这么早就有研究了

Lecture 6A

在上一节介绍了各种副作用的使用,特别是赋值

各种副作用的引入使得对象状态可能改变,替换模型失效,还有了时序问题, 不过执行顺序以及内存共享导致的竞争问题正是各种多线程所面临的难点, 这也是现实世界的复杂度

两个简单的例子

这里有两个例子,第一个例子,我有如下二叉树定义:

enum BiTree {
    Tree(i32, Box<BiTree>, Box<BiTree>),
    Leaf(i32),
    Nothing,
}

现在我想要计算整个树中的奇数的平方的和,一种可能的解法是:

fn odd_square_sum_bitree(tree: &BiTree) -> i32 {
    fn odd_square(n: i32) -> i32 {
        if n % 2 == 0 { 0 } else { n * n }
    }

    fn iter(tree: &BiTree, sum: i32) -> i32 {
        match tree {
            BiTree::Tree(value, left, right) => {
                let left_sum = iter(&left, sum + odd_square(*value));
                iter(&right, left_sum)
            }
            BiTree::Leaf(value) => sum + odd_square(*value),
            BiTree::Nothing => sum,
        }
    }

    iter(tree, 0)
}

好,我们再来看另外一个例子,来获取前 k 个斐波那契数中奇数组成的列表,这个的一种可能解法如下:

fn first_kth_fib_odd(k: u8) -> Vec<u128> {
    fn fib_c(n: u8, a: u128, b: u128) -> u128 {
        match n {
            0 => a,
            1 => b,
            _ => fib_c(n - 1, b, a + b),
        }
    }

    fn iter(idx: u8, n: u8, lst: Vec<u128>) -> Vec<u128> {
        if idx < n {
            let f = fib_c(idx, 0, 1);
            if f % 2 == 0 {
                iter(idx + 1, n, lst)
            } else {
                let mut ex_lst = lst.clone();
                ex_lst.push(f);
                iter(idx + 1, n, ex_lst)
            }
        } else {
            lst
        }
    }
    iter(0, k, Vec::new())
}

那么我们为什么要看这两个例子呢?

用自然语言,我们可以分别描述其解法:

  1. 遍历整棵树,找出所有奇数元素,将找出的元素平方,最后将所有结果求和
  2. 遍历 ,计算对应的斐波那契数,找出其中奇数元素,将所有结果收集成一个列表

可以发现,这两个解法有共同性, 即先遍历某个容器内的所有元素,然后应用上一些函数,或者通过某些条件筛选出特定的值, 最后把处理后的结果累积成一个结果

基于这种思想,我们可以引入 流(Stream) 这个概念

流是一种特殊的数据结构,有如下的接口定义:

(cons-stream x y) ;; => stream
(head s) ;; => x
(tail s) ;; => y
the-empty-stream ;; => stream

单纯从这个结果看,流就是列表,或者有序对, 那么为什么还需要单独提出这么一个抽象呢?

因为我们需要一个 统一接口(Conventional Interface) 来规范代码编写, 对于像上述两个例子的问题,我们都可以用流的方式来编写, 这样方便我们复用结构和排错

使用流的方式重写上述问题:

enum BiTree {
    Tree(i32, Box<BiTree>, Box<BiTree>),
    Leaf(i32),
    Nothing,
}

impl BiTree {
    pub fn to_vec(&self) -> Vec<&i32> {
        fn iter<'a>(tree: &'a BiTree, acc: Vec<&'a i32>) -> Vec<&'a i32> {
            let mut acc = acc.clone();
            match tree {
                BiTree::Tree(value, left, right) => {
                    acc.push(value);
                    acc = iter(&left, acc);
                    iter(&right, acc)
                }
                BiTree::Leaf(value) => {
                    acc.push(value);
                    acc
                }
                BiTree::Nothing => acc,
            }
        }
        iter(self, Vec::new())
    }
}

// q01
fn odd_square_sum_bitree(tree: &BiTree) -> i32 {
    tree.to_vec()
        .iter()
        .filter(|&&&e| e % 2 != 0)
        .map(|&e| e * e)
        .sum::<i32>()
}

// q02
fn first_kth_fib_odd(k: u8) -> Vec<u128> {
    fn fib_c(n: u8, a: u128, b: u128) -> u128 {
        match n {
            0 => a,
            1 => b,
            _ => fib_c(n - 1, b, a + b),
        }
    }

    (0..k)
        .map(|e| fib_c(e, 0, 1))
        .filter(|e| e % 2 != 0)
        .collect::<Vec<u128>>()
}

有了流的概念,对于类似的问题,我们只需要将对应结构变成流,然后就可以用类似的方法解决

统一接口的概念非常重要,数据导向编程也是基于这种思想, 有了统一接口,我们就可以有比较标准的解决流程,并且可以自动扩展,比如 化简器

流的实例

有了流的定义,要完成一些工作就非常简单,这里有一些例子

flatten

flatten 的定义非常简单,就是对于由流组成的流,可以将其拼接成一个流, 即 [[a]] -> [a]

fn flatten<A: Clone>(it: &Vec<Vec<A>>) -> Vec<A> {
    it.iter().fold(Vec::new(), |mut acc, e| {
        acc.extend(e.clone());
        acc
    })
}

fn main() {
    let vs = vec![vec![1, 2, 3], vec![4, 5, 6]];
    let v = flatten(&vs);
    println!("{:?}", v);
    // => [1, 2, 3, 4, 5, 6]
}

Iterator 中已经实现

flatmap

flatmap 就是在 flatten 的基础上再应用一个函数的过程:

fn flatmap<A: Clone, B>(it: &Vec<Vec<A>>, f: impl Fn(A) -> B) -> Vec<B> {
    flatten(it).iter().map(|e| f(e.clone())).collect::<Vec<B>>()
}

Iterator 中叫做 flat_map

找到 N 以内的质数拆分

给定一个正整数 ,找到 ,使得 ,其中 是所有质数的集合

fn divide_prime(n: u64) -> Vec<(u64, u64, u64)> {
    // filter
    fn is_prime(p: &u64) -> bool {
        for i in 2..=p.isqrt() {
            if p % i == 0 {
                return false;
            }
        }
        true
    }
    // construct stream
    (1..=n)
        .flat_map(|i| (1..i).map(move |j| (i, j, i + j))) // accumulate
        .filter(|(_, _, sum)| is_prime(sum)) // filter
        .collect() // accumulate
}

fn main() {
    let ps = divide_prime(6);
    println!("{:?}", ps);
    // => [(2, 1, 3), (3, 2, 5), (4, 1, 5), (4, 3, 7), (5, 2, 7), (6, 1, 7), (6, 5, 11)]
}

八皇后问题

这个是一个经典问题,通常的解法是使用回溯法,但是如果结合 wishful thinking 和流,我们可以有更好的方法求解所有可能方案:

fn queen(n: u8) -> Vec<Vec<u8>> {
    fn is_safe(row: u8, column: u8, size: u8, queens: &Vec<u8>) -> bool {
        queens.iter().enumerate().all(|(idx, p)| {
            let diff = row - idx as u8;
            *p != column
                && (column < diff || column - diff != *p)
                && (column + diff > size || column + diff != *p)
        })
    }

    fn iter(k: u8, size: u8) -> Vec<Vec<u8>> {
        if k == 0 {
            (0..size).map(|i| vec![i]).collect::<Vec<Vec<u8>>>()
        } else {
            iter(k - 1, size)
                .iter()
                .flat_map(|queens| {
                    (0..size).filter(|c| is_safe(k, *c, size, queens)).map(|c| {
                        let mut queens = queens.clone();
                        queens.push(c);
                        queens
                    })
                })
                .collect::<Vec<Vec<u8>>>()
        }
    }

    iter(n - 1, n)
}

fn main() {
    let q8 = queen(8);
    println!("queen(8) has {} sols", q8.len());
    // => queen(8) has 92 sols
}

在这里,iter 就是一个迭代填入棋盘的过程,我们先假设第 k 行前的所有行都已经放好了皇后, 那么对于目前的棋盘 queens,我们可以尝试 size 摆放方式,每一种方式通过 filter 检验, 最后就得到了当前行所有的摆放方式,使用 flat_mapcollect 打包返回给上一层

还可以使用 rayon 库并行来加快速度

流不是列表

到目前为止,我们看到了流可以非常 “优雅” 地解决一些和枚举有关的问题, 但是如果从效率上考虑,似乎流又显得比较低效, 因为我们需要先将要处理的数据收集起来,然后统一处理,过滤,然后再统一处理,最后再累积

但真的是这样吗?

事实上,流并不会一次性把所有的数据都处理完,而是倾向于像流水线一样, 前一个元素处理完到下一个流程处理的时候,当前流程又会收到新的元素,与之前的元素近乎同时处理, 这也就意味着流天然就适合并行处理

流的真实结构在现在的语言中一般被叫做生成器,即包含当前元素以及生成下一个元素的方法:

Rust 的 流 实现

这是一个简单的流的实现,当然,使用 Iterator 也是可以做到的, 但是在后面筛法实现上就不太方便了,所以还是使用自定义结构

struct Stream<A>(
    std::sync::Arc<std::sync::RwLock<Option<A>>>,
    std::sync::Arc<dyn Fn() -> Stream<A>>,
);

impl<A: Clone + 'static> Stream<A> {
    pub fn cons_stream(x: A, y: std::sync::Arc<dyn Fn() -> Stream<A>>) -> Stream<A> {
        Stream(
            std::sync::Arc::new(std::sync::RwLock::new(Some(x))),
            std::sync::Arc::new(move || y()),
        )
    }

    pub fn head(&self) -> Option<A> {
        self.0.read().expect("read stream failed").clone()
    }

    pub fn tail(&self) -> Stream<A> {
        (self.1)()
    }

    pub fn is_empty(&self) -> bool {
        self.head().is_none()
    }

    pub fn kth(&self, k: usize) -> Option<A> {
        fn iter<A: Clone + 'static>(s: &Stream<A>, k: usize, idx: usize) -> Option<A> {
            if s.is_empty() || k == idx {
                s.head()
            } else {
                iter(&s.tail(), k, idx + 1)
            }
        }
        iter(self, k, 0)
    }

    pub fn map<B: Clone + 'static>(&self, proc: std::sync::Arc<dyn Fn(A) -> B>) -> Stream<B> {
        if self.is_empty() {
            Stream(
                std::sync::Arc::new(std::sync::RwLock::new(None)),
                std::sync::Arc::new(|| unimplemented!()),
            )
        } else {
            let tail = self.tail();
            Stream::cons_stream(
                proc(self.head().unwrap()),
                std::sync::Arc::new(move || tail.map(proc.clone())),
            )
        }
    }

    pub fn filter(&self, pred: std::sync::Arc<dyn Fn(A) -> bool>) -> Stream<A> {
        if self.is_empty() {
            Stream(
                std::sync::Arc::new(std::sync::RwLock::new(None)),
                std::sync::Arc::new(|| unimplemented!()),
            )
        } else if pred(self.head().unwrap()) {
            let tail = self.tail();
            Stream::cons_stream(
                self.head().unwrap(),
                std::sync::Arc::new(move || tail.filter(pred.clone())),
            )
        } else {
            self.tail().filter(pred)
        }
    }

    pub fn accumulate(&self, combiner: std::sync::Arc<dyn Fn(A, A) -> A>, init_val: A) -> A {
        if self.is_empty() {
            init_val
        } else {
            combiner(
                self.head().unwrap(),
                self.tail().accumulate(combiner.clone(), init_val),
            )
        }
    }
}

仅为演示,没有进行优化

我们的流程在使用时会不断的调用 tail(),尝试获取新的元素,而非一次性就生成所有元素, 这种延后计算的方法让我们有能力任意编排计算顺序,而非完全依照代码顺序, 这是一种程序逻辑顺序与实际运行顺序的解耦

当然,每一次都重新计算是比较繁琐的,所以可以使用一些方法来记忆计算结果, 以便更快的的到下一个元素,在 LISP 中对应 memo-proc

(define (memo-proc proc)
  (let ((already-run? nil)
        (result nil))
    (lambda ()
      (if (not already-run?)
          (sequence
            (set! result (proc))
            (set! already-run? (not nil))
            result)
          result))))

即在第一次运行的时候记录下运行结果,在下一次运行的时候就直接返回结果

Lecture 6B

流,或者说惰性求值1,让我们能够处理非常大的数据,甚至是无限长的数据

自然数

还记得 皮亚诺数 的定义, 其实皮亚诺数就是一个自然数流,使用其定义就可以生成所有自然数

fn integers_from(n: u128) -> Stream<u128> {
    Stream::cons_stream(n, std::sync::Arc::new(move || integers_from(n + 1)))
}

素数

我们甚至可以表示所有素数,即通过特定的 筛法2 排除自然数中不符合要求的部分:

fn sieve(s: Stream<u128>) -> Stream<u128> {
    let head = s.head().unwrap();
    Stream::cons_stream(
        s.head().unwrap(),
        std::sync::Arc::new(move || {
            sieve(s.tail().filter(std::sync::Arc::new(move |x| x % head != 0)))
        }),
    )
}

fn main() {
    let primes = sieve(integers_from(2));
    println!("the 21-th prime is {:?}", primes.kth(20));
    // => the 21-th prime is Some(73)
}
迭代器实现
struct Primes {
    iter: Box<dyn Iterator<Item = u128>>,
}

impl Iterator for Primes {
    type Item = u128;

    fn next(&mut self) -> Option<Self::Item> {
        let head = self.iter.next()?;
        let iter = std::mem::replace(&mut self.iter, Box::new(std::iter::empty()));
        self.iter = Box::new(iter.filter(move |&x| x % head != 0));
        Some(head)
    }
}

fn sieve<I>(iter: I) -> Primes
where
    I: Iterator<Item = u128> + 'static,
{
    Primes {
        iter: Box::new(iter),
    }
}

fn main() {
    let primes = sieve(2..);
    println!("the 21-th prime is {:?}", primes.nth(20));
    // => the 21-th prime is Some(73)
}

流的局限性

虽然有惰性求值,但是在一些情况下,流会无法正常工作,例如:

(define (integral s initial-value dt)
  (define int
    (cons-stream
      initial-value
      (add-streams (scale-stream dt s)
        int)))
  int)

(define y (integral dy 1 0.001))
(define dy (map-stream square Y))

上式是对于 这个微分方程的模拟, 但是由于定义域,y 的定义需要 dy 的定义,在这种情况下,这个定义就不生效了

要解决这个问题,我们需要修改 integral 定义:

(define (integral delayed-s initial-value dt)
  (define int
    (cons-stream
      initial-value
      (let ((s (force delayed-s)))
        (add-streams (scale-stream dt s)
          int))))
  int)

这样延迟了 dy 的定义,就可以解决无法定义的问题

不过这个例子非常简单,如果系统再复杂一点,情况会变成什么样子呢? 很明显,什么时候使用 delay 是非常重要的,但是这会大大增加程序维护的复杂度

那么有什么解决方案呢?一个比较直接的方法就是改用惰性求值的语言, 这种求值策略被称为 常序求值(Normal Order Evaluation)3, 编程语言通常是使用 应用序求值(Applicative Order Evaluation)

但是有得必有失,常序求值会导致原本正常的过程由于 delay 而延迟, 导致无法完成真正的计算使得内存无法释放,这在一些性能敏感的场合非常致命

纯函数语言

当然,还有一种方式可以避免这些混乱,那就是放弃赋值,也就是所谓的 纯函数语言4, 这就属于编码范式和风格的取舍问题了, 某些情况下有赋值是方便的,某些情况下没有赋值会少很多问题


这一节主要介绍 delay 的概念,以及相关的问题, 下一节将会开始具体讲解求值器的细节


1

惰性求值.Wikipedia [DB/OL].(2025-01-28)[2025-03-19]. https://en.wikipedia.org/wiki/Lazy_evaluation

2

筛法.Wikipedia [DB/OL].(2025-01-31)[2025-03-19]. https://en.wikipedia.org/wiki/Sieve_of_Eratosthenes

3

非严格求值.Wikipedia [DB/OL].(2024-12-17)[2025-03-19]. https://en.wikipedia.org/wiki/Evaluation_strategy#Non-strict_evaluation

4

纯函数语言.Wikipedia [DB/OL].(2024-07-16)[2025-03-19]. https://en.wikipedia.org/wiki/Purely_functional_programming