問題背景

Producer(生產者)和 Consumer(消費者)的速度通常不一樣:

  • Web server 接受 request(快)→ 發送 email(慢)
  • 感測器資料採集(高頻)→ 資料庫寫入(有 I/O 延遲)
  • 爬蟲抓取 URL(快)→ HTML 解析(CPU 密集,慢)

直接讓 Producer 等 Consumer 完成再繼續——Producer 的速度被拖累到 Consumer 的速度。

Producer-Consumer 模式在兩者之間插入一個 Buffer(有界佇列),讓:

  • Producer 把任務放進 queue 就返回,不等 Consumer
  • Consumer 從 queue 取任務,以自己的速度處理

Bounded Buffer 和 Backpressure

有界佇列(Bounded Queue):queue 有最大容量。當 queue 滿了,Producer 不能再放——這叫做 backpressure(背壓)。

Backpressure 是健康的系統設計:它讓上游知道下游的處理能力,而不是讓 queue 無限成長到記憶體耗盡。

處理 queue 滿的策略:

  • Blocking:Producer 等待直到 queue 有空間(同步 backpressure)
  • Drop:丟棄新的任務(適合可以接受資料丟失的場景)
  • Drop oldest:丟掉最舊的任務(滑動窗口)
  • Error:向 Producer 返回錯誤讓它自己決定

Java:BlockingQueue

import java.util.concurrent.*;
 
class ImageProcessor {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
 
    // Producer
    public void submit(String imageUrl) throws InterruptedException {
        queue.put(imageUrl);  // blocking 直到有空間
    }
 
    // Consumer(在獨立 thread 跑)
    public void startProcessing() {
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    String url = queue.take();  // blocking 直到有任務
                    processImage(url);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        consumer.start();
    }
}

Go:Channel

Go 的 channel 就是 bounded buffer:

func main() {
    jobs := make(chan string, 100)  // buffered channel,容量 100
 
    // Producers
    go func() {
        for _, url := range urls {
            jobs <- url  // 送入 channel(滿了就 block)
        }
        close(jobs)
    }()
 
    // Multiple Consumers
    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range jobs {  // channel 關閉後自動退出
                processImage(url)
            }
        }()
    }
 
    wg.Wait()
}

現代分散式版本

在微服務架構裡,Producer-Consumer 的 queue 從 in-process 的 BlockingQueue 擴展成 message broker:

  • RabbitMQ:Push 模式,broker 推給 Consumer,有 ACK 機制
  • Kafka:Pull 模式,Consumer 自己 poll,有 consumer group 和 offset 管理
  • Redis Streams / Bull:輕量版,適合同一服務內的異步任務

原理和本地的 Producer-Consumer 相同,只是 queue 在網路上,帶來了分散式系統的新問題(at-least-once / exactly-once delivery、Consumer crash 的 recovery)。


適用場景

  • API → 異步處理(email、通知、圖片處理)
  • ETL pipeline(Extract → Transform → Load 的各階段)
  • 事件驅動架構(event production → event processing)
  • Rate limiting(把高峰流量平滑分散到後端)