2021-08-15

生产者消费者编程模式

Views: 11711 | Add Comments

相信很多人都知道"生产者消费者"编程模式, 也使用过这种模式, 但是, 可能只是本能不自觉地使用过, 未必对这种模式有清晰和深刻的理解. 特别是级联生产者消费者模式, 更是强大无比. 很多人可能没有意识到, Golang 语言的核心思想正是生产者消费者模式, 也即 go routine + channel.

假设有一个功能, 处理某个任务需要进行3个步骤, 那么代码可以这样写:

func worker(t *Task) {
    step1(t)
    step2(t)
    step3(t)
}

for t := recv_task(); t != nil {
    worker(t)
}

这样的代码非常直观, 也是我所推崇的程序设计核心原则之一. 不过, 这段代码毕竟是串行化的, 中间某个步骤会阻塞后面的步骤.

对于串行化阻塞问题的最直接解决方案就是多线程并发. 例如, 每收到一个任务, 就启动一个线程去处理:

for t := recv_task(); t != nil {
    go worker() // 启动线程/协程
}

这种并发模式有一定的缺陷, 一是并发数量不受控制, 二是如果做了并发数量控制, 这样的控制也不高效. 例如限制最多只有 N 个 worker 线程在同时运行, 如果刚好这 N 个线程都被阻塞在某一个步骤时, 那么整个系统就都没有在工作, 是被闲置的.

但是, 如果我们改成流水线并发, 也即分段并发, 针对上面的代码逻辑启动 3 组工作线程池, 每一组是 N/3 个线程, 代码结构就变成:

q1, q2, q3 channel // 3 个任务队列, 分别给 3 组工作线程提供任务

// 启动所有工作线程组
for i := 0; i < N/3; i ++ {
    go worker1()
    go worker2()
    go worker3()
}

func worker1() {
    for t := q1.recv_task(); t != nil {
        step1(t)
        q2.push_task(t) // 将任务传给下一组工作线程
    }
}

func worker2() {
    for t := q2.recv_task(); t != nil {
        step2(t)
        q3.push_task(t) // 将任务传给下一组工作线程
    }
}

func worker3() {
    for t := q3.recv_task(); t != nil {
        step3(t)
        // 处理完毕
    }
}

这种编程模式, 不仅高效, 而且模块解耦. 即使 worker2 的所有线程都阻塞了, worker1 和 worker3 依然在工作, 计算机的算力就不会被闲置.

这样的系统如果遇到性能瓶颈, 要进行优化的话, 思路也很直观: 就是统计各种工作线程的工作速度, 然后根据阿姆达尔定律去优化最慢的那个模块. 职责划分非常清晰, 系统结构简洁.

补充: CPU 指令流水线

CPU 指令流水线技术是一种将指令分解为多步, 并让不同指令的各步操作重叠(并发), 从而实现几条指令并行处理, 以加速程序运行过程的技术.

上面的编程模式和 CPU 指令流水线技术是一样的.

Related posts:

  1. 程序设计核心原则: 直观
  2. 接口与实现分离
  3. 小心递归次数限制
  4. 使用 Channel 进行可靠传输
  5. 消除JavaScript闭包的一般方法
Posted by ideawu at 2021-08-15 09:42:47

Leave a Comment