批量發送 email——你有 10,000 封要發,每次 API call 需要 100ms。依序執行要 1000 秒(16 分鐘)。

你不想無限制並行——瞬間發 10,000 個 request,email provider API 會 rate limit 你,或者你的服務因為 goroutine/thread 暴增而不穩定。

Worker Pool 是解法:固定建立 N 個 worker,所有任務放進佇列,worker 一次取一個,做完再取下一個。N 個任務並行,總時間降到 10000 / N * 0.1 秒。


Go:Channel 讓 Worker Pool 最自然

Go 的 channel 天生適合 Worker Pool——把任務放進 channel,worker goroutine 從 channel 取:

package main
 
import (
    "fmt"
    "sync"
    "time"
)
 
func workerPool(jobs <-chan int, results chan<- int, workerCount int) {
    var wg sync.WaitGroup
 
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                // 模擬工作
                time.Sleep(100 * time.Millisecond)
                results <- job * 2
            }
        }()
    }
 
    // 所有 worker 結束後關閉 results channel
    go func() {
        wg.Wait()
        close(results)
    }()
}
 
func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
 
    // 啟動 5 個 worker
    workerPool(jobs, results, 5)
 
    // 送出 10 個任務
    for i := 0; i < 10; i++ {
        jobs <- i
    }
    close(jobs)
 
    // 收集結果
    for result := range results {
        fmt.Println(result)
    }
}

這個模式在 Go 很自然,因為三件事剛好對齊:for job := range jobs 讓 worker 持續從 channel 取工作,channel 關閉時自動結束迴圈;close(jobs) 通知所有 worker「沒有更多工作了」,worker 在取完剩餘工作後自然退出;sync.WaitGroup 等所有 worker 完成後才關 results channel。Channel 的設計讓「任務分發」和「結果收集」的同步問題自然解決,不需要手動管理鎖。


Python:ThreadPoolExecutor

Python 有內建的 concurrent.futures.ThreadPoolExecutor,直接封裝了 Worker Pool 模式:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
 
def process_job(job_id: int) -> int:
    time.sleep(0.1)  # 模擬 I/O 工作
    return job_id * 2
 
def main():
    jobs = list(range(10))
 
    with ThreadPoolExecutor(max_workers=5) as executor:
        # 提交所有任務
        futures = {executor.submit(process_job, job): job for job in jobs}
 
        # 以完成順序收結果
        for future in as_completed(futures):
            result = future.result()
            print(result)
 
main()

Python 的選擇取決於工作類型:ThreadPoolExecutor 用 OS thread,適合 I/O 密集任務(GIL 在 I/O 時釋放,thread 可以真正並行等待);ProcessPoolExecutor 用行程,真正繞過 GIL,適合 CPU 密集任務。

from concurrent.futures import ProcessPoolExecutor
 
def cpu_heavy(n: int) -> int:
    return sum(i * i for i in range(n))
 
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_heavy, [10**6] * 8))

Python vs Go 在這個模式的差異:Python 的 ThreadPoolExecutor 幫你封裝了所有細節(任務佇列、worker 管理、結果收集),用起來更簡單,但你對底層行為的控制比較少(你不能自定義 worker 退出條件、任務優先權等)。


Node.js:Worker Threads + Queue

Node.js 的 event loop 是單 thread,做 CPU 密集任務需要用 worker_threads

// worker.js(在 worker thread 裡執行)
const { parentPort } = require('worker_threads');
 
parentPort.on('message', (job) => {
    // 模擬工作
    const result = job * 2;
    parentPort.postMessage(result);
});
// main.js
const { Worker } = require('worker_threads');
const path = require('path');
 
class WorkerPool {
    constructor(workerCount) {
        this.workers = [];
        this.queue = [];
        this.resolvers = new Map();
 
        for (let i = 0; i < workerCount; i++) {
            const worker = new Worker(path.join(__dirname, 'worker.js'));
            worker.on('message', (result) => {
                const resolve = this.resolvers.get(worker);
                this.resolvers.delete(worker);
                if (resolve) resolve(result);
                this.processNext(worker);
            });
            this.workers.push(worker);
        }
    }
 
    run(job) {
        return new Promise((resolve) => {
            const idleWorker = this.workers.find(w => !this.resolvers.has(w));
            if (idleWorker) {
                this.resolvers.set(idleWorker, resolve);
                idleWorker.postMessage(job);
            } else {
                this.queue.push({ job, resolve });
            }
        });
    }
 
    processNext(worker) {
        if (this.queue.length > 0) {
            const { job, resolve } = this.queue.shift();
            this.resolvers.set(worker, resolve);
            worker.postMessage(job);
        }
    }
}
 
async function main() {
    const pool = new WorkerPool(5);
    const jobs = Array.from({ length: 10 }, (_, i) => i);
    const results = await Promise.all(jobs.map(job => pool.run(job)));
    console.log(results);
}
 
main();

Node.js 版本比 Go 和 Python 複雜得多,原因是 worker threads 之間只能透過 postMessage 傳訊息(不能共享記憶體,需要序列化),你需要自己維護「哪個 worker 是 idle」的狀態,也沒有內建的 Worker Pool 抽象。

對 Node.js 的 I/O 密集工作(fetch API、DB query),不需要 worker_threads——直接用 Promise.all 就是 Worker Pool 的效果:

// I/O 密集的 "Worker Pool"——不需要 threads
const fetchUser = async (id) => {
    return await db.query(`SELECT * FROM users WHERE id = ?`, [id]);
};
 
// 5 個同時進行
const CONCURRENCY = 5;
const ids = [...Array(100).keys()];
const results = [];
 
for (let i = 0; i < ids.length; i += CONCURRENCY) {
    const batch = ids.slice(i, i + CONCURRENCY);
    const batchResults = await Promise.all(batch.map(fetchUser));
    results.push(...batchResults);
}

三個實作的核心對照

GoPython (I/O)Node.js (I/O)
Worker 的形態GoroutineOS ThreadPromise
任務佇列Channel(內建)內部 queue(框架管)手動或 Promise.all
結果收集ChannelFuture/as_completedPromise
需要手動管理同步?少(channel 處理)少(框架處理)I/O 不需要,CPU 要手動

Go 的 channel 在這個模式特別適合:任務分發(往 jobs channel 送)和工作結束(close jobs channel)的語意直接對應到「Producer-Consumer」的概念,程式碼的結構和心智模型高度一致。

下一篇:跨語言 Anti-patterns