cover

Event-driven & Queue:不是所有事都要即時回應

使用者下訂單後,系統要做的事情很多:扣庫存、處理付款、寄確認信、通知倉庫出貨、記錄分析事件。如果這些都在一個 HTTP request 裡同步完成,使用者要等 5 秒以上才能看到「訂單成功」的頁面。更糟的是,如果寄信的 SMTP server 暫時掛了,整個訂單就會失敗。正確的做法是:只有核心操作(扣庫存、處理付款)同步完成,其餘的丟到訊息佇列讓 Worker 非同步處理。

架構概覽

flowchart LR
  Producer[Producer\nAPI Service] -->|發布事件| Queue[Message Queue\nRedis / RabbitMQ]
  Queue -->|消費| C1[Consumer A\n寄送通知]
  Queue -->|消費| C2[Consumer B\n資料分析]
  C1 -->|失敗重試| Queue
  C1 -->|超過重試上限| DLQ[Dead Letter Queue]
  DLQ -->|人工處理| Admin[管理員]

架構概覽

flowchart LR
  API[API Service] -->|publish event| Queue[Message Queue\nRedis / RabbitMQ / Kafka]

  Queue -->|consume| Worker1[Email Worker\n寄送通知信]
  Queue -->|consume| Worker2[Analytics Worker\n記錄分析事件]
  Queue -->|consume| Worker3[Warehouse Worker\n通知倉庫出貨]
  Queue -->|consume| Worker4[Webhook Worker\n通知第三方]

  Worker1 -->|retry on failure| Queue
  DLQ[Dead Letter Queue\n處理失敗的訊息] --> Admin[Admin\n人工處理]

API 完成核心操作後,發布事件到 Queue。各個 Worker 從 Queue 消費事件並處理。處理失敗的訊息進入 Dead Letter Queue(DLQ),由人工檢視。

核心概念

  1. 同步 vs 非同步的取捨:判斷一個操作應該同步還是非同步,問兩個問題:(1)使用者需要立即看到這個操作的結果嗎?(扣庫存 → 是,寄信 → 否)(2)這個操作失敗了,應該讓使用者看到錯誤嗎?(付款失敗 → 是,分析事件記錄失敗 → 否)。答案都是「是」的,同步處理;有一個「否」的,就適合非同步。

  2. Queue 選型:常見的三種選擇各有適用場景。Redis(BullMQ/Celery):最簡單,適合小型應用的背景任務(寄信、圖片處理)。優點是大部分應用已經有 Redis,不需要額外部署。缺點是 Redis 不保證 message durability(除非設定 AOF),大量積壓時記憶體壓力大。RabbitMQ:功能完整的 Message Broker,支援複雜的路由(exchange + routing key)、確認機制(ack)、優先級佇列。適合需要可靠訊息傳遞的業務場景。Kafka:高吞吐量的 Event Streaming 平台,訊息持久化到磁碟,支援 replay。適合需要事件溯源、多個 Consumer Group 獨立消費的場景。小團隊建議從 Redis Queue 開始,有明確需求再升級。

  3. 重試與 Dead Letter Queue(DLQ):Worker 處理訊息失敗(API timeout、暫時性錯誤)時,不應該直接丟棄。設計重試策略:第 1 次失敗後等 5 秒重試、第 2 次等 30 秒、第 3 次等 5 分鐘(exponential backoff)。超過最大重試次數後,把訊息移到 DLQ。DLQ 裡的訊息需要人工檢視和處理——可能是程式 bug、可能是外部服務永久性故障。

  4. 冪等消費:和 ETL 一樣,Queue 的 Consumer 也必須設計成冪等的。原因是訊息可能被重複投遞(網路抖動、Consumer 處理到一半 crash 導致沒有 ack)。如果 Consumer 不是冪等的,重複投遞會導致重複寄信、重複扣款。實作方式:每個訊息帶一個唯一 ID(event_id),Consumer 處理前先檢查這個 ID 是否已經處理過。

使用情境

  • 訂單後處理:API 完成訂單建立(同步)後,發布 order.created 事件。Email Worker 寄確認信、Analytics Worker 記錄轉換事件、Warehouse Worker 通知出貨。即使 SMTP server 暫時掛了,Email Worker 會重試,不影響訂單本身。

  • 圖片處理:使用者上傳圖片後,API 立即回傳 200,然後發布 image.uploaded 事件。Image Worker 從 MinIO 下載原圖、產生各種尺寸的縮圖、上傳回 MinIO、更新資料庫記錄。使用者不需要等圖片處理完才能繼續操作。

  • 流量削峰:促銷活動期間,API 收到每秒 1000 個訂單。如果每個訂單都同步處理(查庫存、扣款、寫入資料庫),資料庫撐不住。改成把訂單先丟到 Queue,Worker 按固定速率消費(每秒 100 個),平穩地處理積壓的訂單。使用者看到「訂單處理中」,幾分鐘後收到確認通知。

實作範例 / 設定範例

Redis Queue(使用 BullMQ + Node.js)

// producer.ts - 發布任務到 Queue
import { Queue } from 'bullmq';
 
const emailQueue = new Queue('email', {
  connection: { host: 'localhost', port: 6379 }
});
 
// 訂單建立後,發布寄信任務
async function onOrderCreated(order: Order) {
  await emailQueue.add('order-confirmation', {
    eventId: `order-confirm-${order.id}`,  // 冪等 ID
    to: order.customerEmail,
    orderId: order.id,
    totalAmount: order.totalAmount,
  }, {
    attempts: 3,                    // 最多重試 3 次
    backoff: { type: 'exponential', delay: 5000 },  // 5s, 25s, 125s
    removeOnComplete: 100,          // 保留最近 100 個完成的 job
    removeOnFail: 500,              // 保留最近 500 個失敗的 job
  });
}
// worker.ts - 消費 Queue 的任務
import { Worker } from 'bullmq';
 
const worker = new Worker('email', async (job) => {
  const { eventId, to, orderId, totalAmount } = job.data;
 
  // 冪等檢查:已處理過就跳過
  const processed = await redis.get(`processed:${eventId}`);
  if (processed) {
    console.log(`Already processed: ${eventId}`);
    return;
  }
 
  // 寄送 email
  await sendEmail({
    to,
    subject: `訂單確認 #${orderId}`,
    body: `您的訂單金額為 $${totalAmount},感謝您的購買。`,
  });
 
  // 標記已處理
  await redis.set(`processed:${eventId}`, '1', 'EX', 86400 * 7);  // 7 天過期
 
  console.log(`Email sent for order ${orderId}`);
}, {
  connection: { host: 'localhost', port: 6379 },
  concurrency: 5,  // 同時處理 5 個任務
});
 
worker.on('failed', (job, err) => {
  console.error(`Job ${job?.id} failed: ${err.message}`);
  // 超過最大重試次數後,進入 DLQ 或發告警
});

docker-compose 部署 Worker

# docker-compose.yml
services:
  api:
    image: registry.example.com/myapp/api:v1.2.0
    environment:
      - REDIS_URL=redis://redis:6379
    ports:
      - "8000:8000"
 
  email-worker:
    image: registry.example.com/myapp/worker:v1.2.0
    command: node worker.js
    environment:
      - REDIS_URL=redis://redis:6379
      - SMTP_HOST=${SMTP_HOST}
      - SMTP_USER=${SMTP_USER}
      - SMTP_PASSWORD=${SMTP_PASSWORD}
    deploy:
      replicas: 2  # 跑兩個 Worker 實例提高處理速度
    restart: unless-stopped
 
  redis:
    image: redis:7-alpine
    restart: unless-stopped
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes  # 開啟 AOF 持久化
 
volumes:
  redis-data:

監控 Queue 狀態

# 使用 BullMQ 的 CLI 工具
npx bullmq-cli --host localhost --port 6379
 
# 或在 Redis 裡直接查看
redis-cli LLEN bull:email:wait      # 等待中的任務數
redis-cli LLEN bull:email:active    # 正在處理的任務數
redis-cli SCARD bull:email:failed   # 失敗的任務數

常見問題與風險

  • Queue 積壓:Producer 的速率遠大於 Consumer 的處理速率,Queue 裡的訊息越積越多。Redis Queue 的訊息積壓會耗盡記憶體。避免方式:監控 Queue 的 pending count,設定告警閾值。增加 Worker 實例數(水平擴展)。如果是暫時的流量高峰,確保 Queue 有足夠的 buffer 能力。

  • 訊息遺失:Redis 在沒有開啟 AOF 或 RDB 的情況下,重啟會遺失所有資料(包括 Queue 裡的訊息)。避免方式:Redis 開啟 appendonly yes。對可靠性要求高的場景用 RabbitMQ(自帶持久化)或 Kafka。

  • Consumer 重複處理:Consumer 處理完訊息但還沒 ack 就 crash,Queue 會重新投遞同一條訊息。如果 Consumer 不是冪等的,會重複處理(例如重複寄信)。避免方式:每個訊息帶唯一 event_id,Consumer 用 Redis SET 記錄已處理的 ID,處理前先檢查。

  • DLQ 無人處理:失敗的訊息進入 DLQ,但沒有人定期檢視。DLQ 裡可能有需要人工介入的重要任務(例如退款失敗)。避免方式:設定 DLQ 的監控告警(DLQ 有新訊息就通知)。定期回顧 DLQ,修復根本原因後重新投遞。

優點

  • 服務解耦:Producer 和 Consumer 不需要互相知道對方的存在
  • 流量削峰:Queue 作為緩衝,平穩處理流量尖峰
  • 失敗隔離:某個 Worker 掛了不影響其他 Worker 和 API

缺點 / 限制

  • 引入最終一致性,不適合需要強一致的場景
  • Debug 更困難(訊息在 Queue 裡流動,不像同步呼叫可以追蹤堆疊)
  • Queue 本身成為依賴,需要監控和維護

延伸閱讀