Batch-sending email: you have 10,000 messages to send, and each API call takes 100 ms. Sequential execution takes 1,000 seconds (about 16.7 minutes).
You don't want unbounded parallelism either: firing 10,000 requests at once will get you rate-limited by the email provider's API, or destabilize your own service as goroutines/threads explode.
The Worker Pool is the fix: create a fixed set of N workers, put every task in a queue, and have each worker take one task at a time, finishing it before taking the next. N tasks run in parallel, and total time drops to 10000 / N * 0.1 seconds.
Go: Channels Make the Worker Pool Natural
Go's channels are a natural fit for a worker pool: put tasks into a channel and have worker goroutines pull from it:
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func workerPool(jobs <-chan int, results chan<- int, workerCount int) {
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				// Simulate work
				time.Sleep(100 * time.Millisecond)
				results <- job * 2
			}
		}()
	}
	// Close the results channel once every worker has finished
	go func() {
		wg.Wait()
		close(results)
	}()
}

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// Start 5 workers
	workerPool(jobs, results, 5)

	// Submit 10 jobs
	for i := 0; i < 10; i++ {
		jobs <- i
	}
	close(jobs)

	// Collect results
	for result := range results {
		fmt.Println(result)
	}
}
```

This pattern feels natural in Go because three things line up: `for job := range jobs` keeps each worker pulling from the channel and ends the loop automatically when the channel is closed; `close(jobs)` tells every worker "no more work is coming", so workers drain the remaining jobs and exit on their own; and `sync.WaitGroup` waits for all workers to finish before closing the results channel. The channel design makes the synchronization between task dispatch and result collection fall out naturally, with no manual lock management.
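A common lighter-weight variant in Go (a sketch, not something the example above uses) bounds concurrency with a buffered channel acting as a counting semaphore, spawning one goroutine per job instead of a fixed worker set:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// process runs every job with at most `limit` goroutines working at once,
// using a buffered channel as a counting semaphore.
func process(jobs, limit int) []int {
	sem := make(chan struct{}, limit)
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results []int
	)
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func(job int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it
			time.Sleep(10 * time.Millisecond) // simulated work
			mu.Lock()
			results = append(results, job*2)
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(len(process(10, 5)))
}
```

The trade-off: simpler setup, but one goroutine per job (fine for thousands, wasteful for millions) and results need a mutex instead of a channel.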
Python: ThreadPoolExecutor
Python ships with `concurrent.futures.ThreadPoolExecutor`, which packages the worker pool pattern directly:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def process_job(job_id: int) -> int:
    time.sleep(0.1)  # Simulate I/O work
    return job_id * 2

def main():
    jobs = list(range(10))
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Submit all jobs
        futures = {executor.submit(process_job, job): job for job in jobs}
        # Collect results in completion order
        for future in as_completed(futures):
            result = future.result()
            print(result)

main()
```

In Python the choice depends on the kind of work: `ThreadPoolExecutor` uses OS threads and suits I/O-bound tasks (the GIL is released during I/O, so threads can genuinely wait in parallel); `ProcessPoolExecutor` uses processes, truly sidesteps the GIL, and suits CPU-bound tasks.
```python
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n: int) -> int:
    return sum(i * i for i in range(n))

if __name__ == "__main__":  # required on platforms that spawn child processes
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_heavy, [10**6] * 8))
```

Python vs Go in this pattern: Python's `ThreadPoolExecutor` encapsulates every detail for you (the task queue, worker management, result collection), so it's simpler to use, but you get less control over low-level behavior (you can't customize worker exit conditions, task priorities, and so on).
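One place where the framework's encapsulation still leaves you room is per-task error handling: `future.result()` re-raises the worker's exception, and a futures-to-job dict like the one in the `ThreadPoolExecutor` example lets you map a failure back to its job. A sketch; `flaky_job` is made up for illustration:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def flaky_job(job_id: int) -> int:
    # Hypothetical task: fails for odd ids to demonstrate error handling.
    if job_id % 2:
        raise ValueError(f"job {job_id} failed")
    return job_id * 2

def run_all(jobs):
    ok, failed = {}, {}
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(flaky_job, job): job for job in jobs}
        for future in as_completed(futures):
            job = futures[future]  # map the future back to its job
            try:
                ok[job] = future.result()  # re-raises the worker's exception
            except ValueError as exc:
                failed[job] = str(exc)
    return ok, failed

ok, failed = run_all(range(6))
print(sorted(ok))      # [0, 2, 4]
print(sorted(failed))  # [1, 3, 5]
```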
Node.js: Worker Threads + Queue
Node.js runs its event loop on a single thread, so CPU-bound tasks need `worker_threads`:
```javascript
// worker.js (runs inside a worker thread)
const { parentPort } = require('worker_threads');

parentPort.on('message', (job) => {
  // Simulate work
  const result = job * 2;
  parentPort.postMessage(result);
});
```

```javascript
// main.js
const { Worker } = require('worker_threads');
const path = require('path');

class WorkerPool {
  constructor(workerCount) {
    this.workers = [];
    this.queue = [];
    this.resolvers = new Map();
    for (let i = 0; i < workerCount; i++) {
      const worker = new Worker(path.join(__dirname, 'worker.js'));
      worker.on('message', (result) => {
        const resolve = this.resolvers.get(worker);
        this.resolvers.delete(worker);
        if (resolve) resolve(result);
        this.processNext(worker);
      });
      this.workers.push(worker);
    }
  }

  run(job) {
    return new Promise((resolve) => {
      const idleWorker = this.workers.find((w) => !this.resolvers.has(w));
      if (idleWorker) {
        this.resolvers.set(idleWorker, resolve);
        idleWorker.postMessage(job);
      } else {
        this.queue.push({ job, resolve });
      }
    });
  }

  processNext(worker) {
    if (this.queue.length > 0) {
      const { job, resolve } = this.queue.shift();
      this.resolvers.set(worker, resolve);
      worker.postMessage(job);
    }
  }

  // Terminate all workers so the process can exit
  destroy() {
    return Promise.all(this.workers.map((w) => w.terminate()));
  }
}

async function main() {
  const pool = new WorkerPool(5);
  const jobs = Array.from({ length: 10 }, (_, i) => i);
  const results = await Promise.all(jobs.map((job) => pool.run(job)));
  console.log(results);
  await pool.destroy();
}

main();
```

The Node.js version is considerably more complex than Go's and Python's, because worker threads can only communicate via `postMessage` (no shared memory by default, so messages are serialized), you have to track which workers are idle yourself, and there is no built-in worker pool abstraction.
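The same idle-tracking bookkeeping also works for plain async functions, without threads. A minimal in-process concurrency limiter (a sketch of the common `p-limit`-style idea, not a published API) keeps at most N promises in flight:

```javascript
// Minimal concurrency limiter: runs async tasks with at most `limit` in flight.
function pLimit(limit) {
  let active = 0;
  const queue = [];
  const next = () => {
    if (active >= limit || queue.length === 0) return;
    active++;
    const { fn, resolve, reject } = queue.shift();
    fn().then(resolve, reject).finally(() => {
      active--;
      next(); // hand the freed slot to the next queued task
    });
  };
  return (fn) =>
    new Promise((resolve, reject) => {
      queue.push({ fn, resolve, reject });
      next();
    });
}

async function main() {
  const limit = pLimit(5);
  const jobs = Array.from({ length: 10 }, (_, i) => i);
  // Promise.all preserves submission order in its result array
  const results = await Promise.all(
    jobs.map((job) => limit(async () => job * 2))
  );
  console.log(results); // [0, 2, 4, ..., 18]
}

main();
```

Unlike fixed-size batching, a limiter starts a new task the moment any slot frees up, so one slow task never stalls the whole batch.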
For I/O-bound work in Node.js (fetch calls, DB queries), you don't need `worker_threads` at all: plain `Promise.all` already gives you the worker-pool effect:
```javascript
// An I/O-bound "worker pool": no threads required
const fetchUser = async (id) => {
  return await db.query(`SELECT * FROM users WHERE id = ?`, [id]);
};

// 5 in flight at a time
const CONCURRENCY = 5;
const ids = [...Array(100).keys()];
const results = [];
for (let i = 0; i < ids.length; i += CONCURRENCY) {
  const batch = ids.slice(i, i + CONCURRENCY);
  const batchResults = await Promise.all(batch.map(fetchUser));
  results.push(...batchResults);
}
```

Core Comparison of the Three Implementations
| | Go | Python (I/O) | Node.js (I/O) |
|---|---|---|---|
| Worker form | Goroutine | OS thread | Promise |
| Task queue | Channel (built in) | Internal queue (framework-managed) | Manual or Promise.all |
| Result collection | Channel | Future / as_completed | Promise |
| Manual synchronization needed? | Little (channels handle it) | Little (the framework handles it) | Not for I/O; manual for CPU |
Go's channels are a particularly good fit for this pattern: dispatching tasks (sending into the jobs channel) and signaling that work is done (closing the jobs channel) map directly onto producer-consumer semantics, so the structure of the code matches the mental model.