
ETL 不是大公司才需要的東西。只要你有「從 A 把資料搬到 B」的需求——每天匯出報表、同步資料到分析 DB、定期清理過期資料——你就在做 ETL。問題是,你是用「可維護」的方式做,還是用「寫一個 cron script 然後祈禱它不要壞」的方式做?
先講結論
ETL 的三條生存法則:冪等性(跑兩次不會產生雙份資料)、可回放(能重新跑某一天的資料)、明確的失敗策略(失敗時停下來告警,不要靜悄悄地寫入一半)。
冪等性:跑兩次不會爆
你的 ETL cron 在半夜跑,跑到一半 timeout 了。你重新跑一次。如果沒有冪等設計,同一批資料就會被寫入兩次。帳目多了一倍,客戶打來問為什麼扣了兩次款。
做法很單純:用唯一鍵 upsert。
INSERT INTO sales (order_id, amount, date)
VALUES (:order_id, :amount, :date)
ON CONFLICT (order_id)
DO UPDATE SET amount = EXCLUDED.amount;每筆資料有唯一 key,重複寫入就是更新,不會產生多餘的 row。
可回放:能重跑某一天
ETL 跑完之後發現資料有誤。你需要修正邏輯後重跑 3 月 5 號的資料。如果你的 ETL 不支援指定日期重跑,你就得手動清資料再跑一次——這個「手動清」的過程本身就充滿風險。
#!/bin/bash
set -euo pipefail
DATE=${1:-$(date +%Y-%m-%d)}
python extract.py --date $DATE
python transform.py --date $DATE
python load.py --date $DATE所有輸入保存、log 中記錄批次範圍(batch_id: 2024-09-15)、支援指定日期參數。這樣出問題時你有「重跑」的能力,而不是只能往前跑。
Checkpoint:失敗了從哪裡接
如果你的 ETL 處理 10 萬筆資料,跑到第 8 萬筆掛了。沒有 checkpoint 的話,你只能從頭跑。有 checkpoint,你可以從第 8 萬筆接下去。
{
"batch_id": "2024-09-15",
"last_offset": 80000,
"status": "running"
}驗證:load 之前先檢查
不要直接把 extract 出來的資料塞進目標 DB。先做 schema validation,確認資料格式正確。
from pydantic import BaseModel
class Order(BaseModel):
order_id: str
amount: float
date: strPydantic 驗證失敗的資料不要 load,記錄下來發告警。你不會想在 prod DB 裡發現一堆 amount = NaN 的 row。
失敗策略:停下來比繼續更重要
- Extract 失敗:不進 Transform
- Transform 失敗:保留原始資料
- Load 失敗:不要寫入部分資料
全部成功才 commit,用 transaction 包起來。寫入一半的資料比沒有資料更可怕——因為你不知道哪些是對的、哪些是半成品。
失敗一定要告警。我見過 ETL 靜悄悄地失敗了兩週,每天的報表都是空的,但因為沒有告警,營運部門兩週後才發現數字不對。
排程的現實
# /etc/cron.d/etl
0 2 * * * /opt/etl/run.sh >> /var/log/etl.log 2>&1凌晨 2 點跑,避開尖峰時段。但你有想過:如果凌晨 2 點的 job 跑了 3 小時,跟凌晨 5 點的另一個 job 撞了怎麼辦?用 flock 或 checkpoint 確保同一個 ETL 不會同時跑兩個 instance。
ETL 就像搬家。你可以把東西全部丟進車裡然後祈禱到了之後找得到,也可以先貼標籤、做清單、確認每箱都到了。第一種方式快但你一定會少東西。