
Event-driven & Queue:不是所有事都要即時回應
使用者下訂單後,系統要做的事情很多:扣庫存、處理付款、寄確認信、通知倉庫出貨、記錄分析事件。如果這些都在一個 HTTP request 裡同步完成,使用者要等 5 秒以上才能看到「訂單成功」的頁面。更糟的是,如果寄信的 SMTP server 暫時掛了,整個訂單就會失敗。正確的做法是:只有核心操作(扣庫存、處理付款)同步完成,其餘的丟到訊息佇列讓 Worker 非同步處理。
架構概覽
flowchart LR Producer[Producer\nAPI Service] -->|發布事件| Queue[Message Queue\nRedis / RabbitMQ] Queue -->|消費| C1[Consumer A\n寄送通知] Queue -->|消費| C2[Consumer B\n資料分析] C1 -->|失敗重試| Queue C1 -->|超過重試上限| DLQ[Dead Letter Queue] DLQ -->|人工處理| Admin[管理員]
架構概覽
flowchart LR API[API Service] -->|publish event| Queue[Message Queue\nRedis / RabbitMQ / Kafka] Queue -->|consume| Worker1[Email Worker\n寄送通知信] Queue -->|consume| Worker2[Analytics Worker\n記錄分析事件] Queue -->|consume| Worker3[Warehouse Worker\n通知倉庫出貨] Queue -->|consume| Worker4[Webhook Worker\n通知第三方] Worker1 -->|retry on failure| Queue DLQ[Dead Letter Queue\n處理失敗的訊息] --> Admin[Admin\n人工處理]
API 完成核心操作後,發布事件到 Queue。各個 Worker 從 Queue 消費事件並處理。處理失敗的訊息進入 Dead Letter Queue(DLQ),由人工檢視。
核心概念
-
同步 vs 非同步的取捨:判斷一個操作應該同步還是非同步,問兩個問題:(1)使用者需要立即看到這個操作的結果嗎?(扣庫存 → 是,寄信 → 否)(2)這個操作失敗了,應該讓使用者看到錯誤嗎?(付款失敗 → 是,分析事件記錄失敗 → 否)。答案都是「是」的,同步處理;有一個「否」的,就適合非同步。
-
Queue 選型:常見的三種選擇各有適用場景。Redis(BullMQ/Celery):最簡單,適合小型應用的背景任務(寄信、圖片處理)。優點是大部分應用已經有 Redis,不需要額外部署。缺點是 Redis 不保證 message durability(除非設定 AOF),大量積壓時記憶體壓力大。RabbitMQ:功能完整的 Message Broker,支援複雜的路由(exchange + routing key)、確認機制(ack)、優先級佇列。適合需要可靠訊息傳遞的業務場景。Kafka:高吞吐量的 Event Streaming 平台,訊息持久化到磁碟,支援 replay。適合需要事件溯源、多個 Consumer Group 獨立消費的場景。小團隊建議從 Redis Queue 開始,有明確需求再升級。
-
重試與 Dead Letter Queue(DLQ):Worker 處理訊息失敗(API timeout、暫時性錯誤)時,不應該直接丟棄。設計重試策略:第 1 次失敗後等 5 秒重試、第 2 次等 30 秒、第 3 次等 5 分鐘(exponential backoff)。超過最大重試次數後,把訊息移到 DLQ。DLQ 裡的訊息需要人工檢視和處理——可能是程式 bug、可能是外部服務永久性故障。
-
冪等消費:和 ETL 一樣,Queue 的 Consumer 也必須設計成冪等的。原因是訊息可能被重複投遞(網路抖動、Consumer 處理到一半 crash 導致沒有 ack)。如果 Consumer 不是冪等的,重複投遞會導致重複寄信、重複扣款。實作方式:每個訊息帶一個唯一 ID(event_id),Consumer 處理前先檢查這個 ID 是否已經處理過。
使用情境
-
訂單後處理:API 完成訂單建立(同步)後,發布
order.created事件。Email Worker 寄確認信、Analytics Worker 記錄轉換事件、Warehouse Worker 通知出貨。即使 SMTP server 暫時掛了,Email Worker 會重試,不影響訂單本身。 -
圖片處理:使用者上傳圖片後,API 立即回傳 200,然後發布
image.uploaded事件。Image Worker 從 MinIO 下載原圖、產生各種尺寸的縮圖、上傳回 MinIO、更新資料庫記錄。使用者不需要等圖片處理完才能繼續操作。 -
流量削峰:促銷活動期間,API 收到每秒 1000 個訂單。如果每個訂單都同步處理(查庫存、扣款、寫入資料庫),資料庫撐不住。改成把訂單先丟到 Queue,Worker 按固定速率消費(每秒 100 個),平穩地處理積壓的訂單。使用者看到「訂單處理中」,幾分鐘後收到確認通知。
實作範例 / 設定範例
Redis Queue(使用 BullMQ + Node.js)
// producer.ts - 發布任務到 Queue
import { Queue } from 'bullmq';
const emailQueue = new Queue('email', {
connection: { host: 'localhost', port: 6379 }
});
// 訂單建立後,發布寄信任務
async function onOrderCreated(order: Order) {
await emailQueue.add('order-confirmation', {
eventId: `order-confirm-${order.id}`, // 冪等 ID
to: order.customerEmail,
orderId: order.id,
totalAmount: order.totalAmount,
}, {
attempts: 3, // 最多重試 3 次
backoff: { type: 'exponential', delay: 5000 }, // 5s, 25s, 125s
removeOnComplete: 100, // 保留最近 100 個完成的 job
removeOnFail: 500, // 保留最近 500 個失敗的 job
});
}// worker.ts - 消費 Queue 的任務
import { Worker } from 'bullmq';
const worker = new Worker('email', async (job) => {
const { eventId, to, orderId, totalAmount } = job.data;
// 冪等檢查:已處理過就跳過
const processed = await redis.get(`processed:${eventId}`);
if (processed) {
console.log(`Already processed: ${eventId}`);
return;
}
// 寄送 email
await sendEmail({
to,
subject: `訂單確認 #${orderId}`,
body: `您的訂單金額為 $${totalAmount},感謝您的購買。`,
});
// 標記已處理
await redis.set(`processed:${eventId}`, '1', 'EX', 86400 * 7); // 7 天過期
console.log(`Email sent for order ${orderId}`);
}, {
connection: { host: 'localhost', port: 6379 },
concurrency: 5, // 同時處理 5 個任務
});
worker.on('failed', (job, err) => {
console.error(`Job ${job?.id} failed: ${err.message}`);
// 超過最大重試次數後,進入 DLQ 或發告警
});docker-compose 部署 Worker
# docker-compose.yml
services:
api:
image: registry.example.com/myapp/api:v1.2.0
environment:
- REDIS_URL=redis://redis:6379
ports:
- "8000:8000"
email-worker:
image: registry.example.com/myapp/worker:v1.2.0
command: node worker.js
environment:
- REDIS_URL=redis://redis:6379
- SMTP_HOST=${SMTP_HOST}
- SMTP_USER=${SMTP_USER}
- SMTP_PASSWORD=${SMTP_PASSWORD}
deploy:
replicas: 2 # 跑兩個 Worker 實例提高處理速度
restart: unless-stopped
redis:
image: redis:7-alpine
restart: unless-stopped
volumes:
- redis-data:/data
command: redis-server --appendonly yes # 開啟 AOF 持久化
volumes:
redis-data:監控 Queue 狀態
# 使用 BullMQ 的 CLI 工具
npx bullmq-cli --host localhost --port 6379
# 或在 Redis 裡直接查看
redis-cli LLEN bull:email:wait # 等待中的任務數
redis-cli LLEN bull:email:active # 正在處理的任務數
redis-cli SCARD bull:email:failed # 失敗的任務數常見問題與風險
-
Queue 積壓:Producer 的速率遠大於 Consumer 的處理速率,Queue 裡的訊息越積越多。Redis Queue 的訊息積壓會耗盡記憶體。避免方式:監控 Queue 的 pending count,設定告警閾值。增加 Worker 實例數(水平擴展)。如果是暫時的流量高峰,確保 Queue 有足夠的 buffer 能力。
-
訊息遺失:Redis 在沒有開啟 AOF 或 RDB 的情況下,重啟會遺失所有資料(包括 Queue 裡的訊息)。避免方式:Redis 開啟
appendonly yes。對可靠性要求高的場景用 RabbitMQ(自帶持久化)或 Kafka。 -
Consumer 重複處理:Consumer 處理完訊息但還沒 ack 就 crash,Queue 會重新投遞同一條訊息。如果 Consumer 不是冪等的,會重複處理(例如重複寄信)。避免方式:每個訊息帶唯一 event_id,Consumer 用 Redis SET 記錄已處理的 ID,處理前先檢查。
-
DLQ 無人處理:失敗的訊息進入 DLQ,但沒有人定期檢視。DLQ 裡可能有需要人工介入的重要任務(例如退款失敗)。避免方式:設定 DLQ 的監控告警(DLQ 有新訊息就通知)。定期回顧 DLQ,修復根本原因後重新投遞。
優點
- 服務解耦:Producer 和 Consumer 不需要互相知道對方的存在
- 流量削峰:Queue 作為緩衝,平穩處理流量尖峰
- 失敗隔離:某個 Worker 掛了不影響其他 Worker 和 API
缺點 / 限制
- 引入最終一致性,不適合需要強一致的場景
- Debug 更困難(訊息在 Queue 裡流動,不像同步呼叫可以追蹤堆疊)
- Queue 本身成為依賴,需要監控和維護