問題:Read-Modify-Write 競爭

Race condition 的經典場景是 Read-Modify-Write

請求 A:讀到庫存 = 1  →  判斷可以扣  →  寫入庫存 = 0
請求 B:讀到庫存 = 1  →  判斷可以扣  →  寫入庫存 = 0

兩個請求幾乎同時進來,都讀到庫存是 1,都認為可以扣,最後庫存扣了兩次卻只扣到 0,不是 -1。超賣了一個,用戶收到訂單確認,倉庫沒貨。

這不是應用層的 bug,是並發訪問共享狀態的本質問題。應用層加 if (stock > 0) 不夠——兩個請求的 if 都在讀到 1 之後執行,都過了。


解法一:悲觀鎖(SELECT FOR UPDATE)

悲觀的假設:這筆資料很可能被別人同時讀,所以讀的時候先鎖住,讓別人等。

// Sequelize
async decrementStock(productId: string, quantity: number) {
  return db.transaction(async (trx) => {
    // FOR UPDATE:鎖住這一行,其他 transaction 的 SELECT FOR UPDATE 會 block
    const product = await Product.findOne({
      where: { id: productId },
      lock: trx.LOCK.UPDATE,
      transaction: trx,
    });
 
    if (!product || product.stock < quantity) {
      throw new InsufficientStockError();
    }
 
    await product.update(
      { stock: product.stock - quantity },
      { transaction: trx }
    );
 
    return product;
  });
}
-- 等價 SQL
BEGIN;
SELECT * FROM products WHERE id = $1 FOR UPDATE;  -- 鎖住
UPDATE products SET stock = stock - $2 WHERE id = $1;
COMMIT;

特性

  • 第二個請求的 SELECT FOR UPDATE 會 block,等第一個 commit 後才繼續
  • 保證資料一致性,不會超賣
  • 高並發下效能影響大(排隊等鎖)
  • 長 transaction 容易造成 deadlock

適合:並發量不高但一致性要求高的場景(金融交易、合約變更)。


解法二:樂觀鎖(版本號)

樂觀的假設:大部分情況下不會有衝突,先讀,更新時再檢查資料有沒有被別人改過。

// 資料表加 version 欄位
ALTER TABLE products ADD COLUMN version INTEGER NOT NULL DEFAULT 0;
 
// 更新時帶上版本號作為條件
async decrementStock(productId: string, quantity: number, version: number) {
  const updated = await Product.update(
    {
      stock: db.literal(`stock - ${quantity}`),
      version: db.literal('version + 1'),
    },
    {
      where: {
        id: productId,
        version,           // 版本號必須對上
        stock: { [Op.gte]: quantity },  // 庫存必須夠
      },
    }
  );
 
  if (updated[0] === 0) {
    // 沒有更新到任何 row → 版本號衝突(被別人搶先更新了)或庫存不足
    throw new OptimisticLockError('Concurrent modification detected');
  }
}

應用層的 retry 邏輯

async purchaseWithRetry(productId: string, quantity: number, maxRetries = 3) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const product = await Product.findById(productId);
 
    try {
      await decrementStock(productId, quantity, product.version);
      return;  // 成功
    } catch (error) {
      if (error instanceof OptimisticLockError && attempt < maxRetries - 1) {
        // 被搶了,稍等再試
        await sleep(Math.random() * 100);  // jitter 避免同時重試
        continue;
      }
      throw error;
    }
  }
 
  throw new Error('Failed after max retries');
}

特性

  • 不鎖資料,高並發下效能好
  • 衝突時需要重試(可能重試多次)
  • 高衝突率場景下重試開銷高,反而不如悲觀鎖

適合:讀多寫少、衝突率低的場景(用戶資料更新、文件版本控制)。


解法三:原子操作(DB 層保證)

直接用 DB 的原子操作,不需要鎖:

// DB 層做原子扣減 + 條件檢查
const result = await db.query(`
  UPDATE products
  SET stock = stock - :quantity
  WHERE id = :id
    AND stock >= :quantity
  RETURNING stock
`, { replacements: { id: productId, quantity } });
 
if (result[0].length === 0) {
  throw new InsufficientStockError();
}
-- 等價 SQL(單一 statement,DB 保證原子性)
UPDATE products
SET stock = stock - 5
WHERE id = 'prod-123' AND stock >= 5;
-- 如果 stock < 5,這行不更新任何 row

特性

  • 最簡單,不需要 transaction 包裝
  • DB 在單一 statement 層級保證原子性
  • 無法做複雜的條件判斷(多個欄位、跨 table)

適合:簡單的計數器操作(庫存扣減、點讚數、瀏覽數)。


解法四:Redis 分散式鎖(跨服務場景)

多個 pod 或多個服務都要操作同一個資源時,DB 鎖不夠用,需要應用層的分散式鎖:

import { Redis } from 'ioredis';
 
async function withDistributedLock<T>(
  redis: Redis,
  lockKey: string,
  fn: () => Promise<T>,
  options = { ttl: 10000, retries: 3, retryDelay: 100 }
): Promise<T> {
  const lockValue = crypto.randomUUID();
 
  for (let i = 0; i < options.retries; i++) {
    // SET NX:只有 key 不存在時才設定(原子操作)
    const acquired = await redis.set(lockKey, lockValue, 'NX', 'PX', options.ttl);
 
    if (acquired) {
      try {
        return await fn();
      } finally {
        // 用 Lua script 確保只刪自己的鎖(原子操作)
        await redis.eval(
          `if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end`,
          1, lockKey, lockValue
        );
      }
    }
 
    if (i < options.retries - 1) {
      await sleep(options.retryDelay * (i + 1));
    }
  }
 
  throw new Error(`Failed to acquire lock: ${lockKey}`);
}
 
// 使用
await withDistributedLock(redis, `inventory:${productId}`, async () => {
  await decrementStock(productId, quantity);
});

注意事項

  • TTL 必須大於 fn() 的執行時間,否則鎖會在操作完成前過期
  • 刪鎖時必須用 Lua script 確保「只刪自己的鎖」的原子性
  • Redis 單節點方案:Redis 掛掉鎖消失;高可用需要 Redlock 演算法(多個 Redis 節點)

各解法的選型指引

並發量低、一致性要求高   → 悲觀鎖(SELECT FOR UPDATE)
並發量高、衝突率低       → 樂觀鎖(版本號 + retry)
簡單計數器操作           → 原子 UPDATE(最輕量)
跨服務 / 跨 pod          → Redis 分散式鎖

常見誤用

// ❌ 兩個 query,中間有 race window
const product = await Product.findById(id);
if (product.stock < quantity) throw new Error('...');
await product.update({ stock: product.stock - quantity });  // 這兩行不是原子的
 
// ✅ 用原子操作或鎖
await Product.update(
  { stock: db.literal(`stock - ${quantity}`) },
  { where: { id, stock: { [Op.gte]: quantity } } }
);

Deadlock 預防

悲觀鎖場景下,兩個 transaction 互相等對方持有的鎖:

T1 鎖住 A,等 B
T2 鎖住 B,等 A
→ Deadlock

預防方式

  1. 固定鎖定順序:所有 transaction 都按照相同的資源順序拿鎖(先鎖 A 再鎖 B)
  2. 縮短 transaction 時間:長 transaction 佔鎖時間久,deadlock 機率高
  3. 設定 Lock TimeoutSET LOCAL lock_timeout = '2s',等超時就放棄
// PostgreSQL 設定 lock timeout
await db.query("SET LOCAL lock_timeout = '2s'");
const product = await Product.findOne({ lock: 'UPDATE', transaction: trx });
// 如果 2 秒內拿不到鎖,拋 LockNotAvailable 錯誤

延伸閱讀