
Data Movement / ETL:資料不會自己搬家
在實際的業務系統中,資料經常需要從一個地方搬到另一個地方、從一種格式轉成另一種格式。例如:每天把訂單資料從 PostgreSQL 匯出到 Google Sheets 給業務看、把第三方 API 的資料同步到本地資料庫、把多個來源的資料合併後產出報表。這就是 ETL(Extract, Transform, Load)要做的事。對小型團隊來說,不需要 Airflow 或 Spark 這種重型工具,用 cron + script + 良好的設計模式就能解決大部分需求。
架構概覽
flowchart LR Source[(資料來源\nDB / API / CSV)] -->|Extract\n提取| ETL[ETL Script\n排程執行] ETL -->|Transform\n清洗 & 轉換| Valid{資料驗證} Valid -->|通過| Load[Load\n載入目標] Valid -->|失敗| Alert[告警通知\n中止載入] Load --> Dest[(目標\nDB / 報表 / 檔案)] Cron[Cron 排程器] -->|定時觸發| ETL
架構概覽
flowchart LR Source1[(PostgreSQL\n訂單資料)] -->|Extract| ETL[ETL Script\nPython / Node.js] Source2[Third-party API\n匯率/庫存] -->|Extract| ETL Source3[CSV Upload\n人工上傳] -->|Extract| ETL ETL -->|Transform| Clean[清洗 / 轉換 / 聚合] Clean -->|Load| Target1[(Data Warehouse\nPostgreSQL)] Clean -->|Load| Target2[Google Sheets\n業務報表] Clean -->|Load| Target3[MinIO\n歸檔 CSV] Cron[Cron / Scheduler] -->|trigger| ETL ETL -->|log| Monitor[日誌 + 告警\n成功/失敗通知]
ETL script 由 cron 定時觸發,從各種資料來源 Extract 資料,經過 Transform(清洗、轉換、聚合)後,Load 到目標(資料庫、報表、檔案)。每次執行都記錄日誌並在失敗時告警。
核心概念
-
冪等性(Idempotency):ETL job 可能因為各種原因失敗後重跑(網路逾時、API 限流、目標資料庫暫時不可用)。如果重跑會產生重複資料,那就是災難。冪等性是指同一個 job 跑一次和跑十次的結果相同。實作方式:用
UPSERT(INSERT ON CONFLICT UPDATE)而不是 INSERT、每次跑先清除目標範圍再重新寫入、或用唯一鍵做 deduplication。設計 ETL 時,假設每個 job 都可能被重跑。 -
增量 vs 全量:全量同步每次都處理所有資料,簡單但資料量大時很慢。增量同步只處理「上次之後」的變更(用
updated_at > last_sync_time或 CDC),效率高但邏輯複雜。建議小資料量(<10 萬筆)直接用全量,大資料量(>100 萬筆)用增量。混合策略:平日增量同步、每週做一次全量同步修正可能的漂移。 -
錯誤處理與重試:ETL job 和外部系統互動,失敗是常態而非例外。要設計三層防護:(1)單次操作的重試(API call timeout → wait 5s → retry,最多 3 次)。(2)job 層級的失敗記錄(哪些 record 成功、哪些失敗,下次只重跑失敗的)。(3)告警通知(job 失敗時通知負責人,不要等業務發現報表沒更新才知道)。
-
資料驗證:Extract 後不要直接 Load,要先驗證資料品質。常見的驗證:record count 是否在合理範圍(突然掉到 0 可能是來源出問題)、必要欄位是否為空、數值是否在合理區間、日期格式是否正確。驗證失敗時應該中止 Load 並告警,不要把髒資料寫進去。
使用情境
-
每日訂單報表:每天凌晨 4 點從 PostgreSQL 的 orders 表格 Extract 昨天的訂單資料,Transform 成業務需要的格式(加總金額、計算退款率),Load 到 Google Sheets。業務團隊每天早上打開 Google Sheets 就能看到最新數據。
-
第三方庫存同步:每小時從供應商的 API 拉取庫存資料,和本地資料庫的庫存做比對和更新。因為供應商 API 有 rate limit(每分鐘 60 次),所以 Extract 階段要做分頁和節流。用 UPSERT 確保重跑不會重複。
-
多系統資料合併:CRM 系統、訂單系統、客服系統各有獨立的客戶資料。每天把三個系統的客戶資料合併到一個 Data Warehouse 表,用 email 或 phone 做比對和關聯。業務分析師在 Data Warehouse 上做跨系統的分析。
實作範例 / 設定範例
基本 ETL 腳本結構
#!/usr/bin/env python3
"""daily_order_report.py - 每日訂單報表 ETL"""
import os
import sys
import logging
from datetime import datetime, timedelta
import psycopg2
import csv
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler('/var/log/etl/daily_order_report.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def extract(conn, target_date):
"""從 PostgreSQL 提取指定日期的訂單"""
query = """
SELECT order_id, customer_id, total_amount, status, created_at
FROM orders
WHERE created_at::date = %s
ORDER BY created_at
"""
with conn.cursor() as cur:
cur.execute(query, (target_date,))
rows = cur.fetchall()
logger.info(f"Extracted {len(rows)} orders for {target_date}")
return rows
def transform(rows):
"""清洗和轉換資料"""
transformed = []
for row in rows:
order_id, customer_id, amount, status, created_at = row
# 驗證:金額不能為負
if amount < 0:
logger.warning(f"Negative amount for order {order_id}: {amount}")
continue
transformed.append({
'order_id': order_id,
'customer_id': customer_id,
'amount': float(amount),
'status': status,
'date': created_at.strftime('%Y-%m-%d'),
})
logger.info(f"Transformed {len(transformed)} records (filtered {len(rows) - len(transformed)})")
return transformed
def load(records, output_path):
"""輸出為 CSV(也可以改成寫入 Google Sheets 或其他目標)"""
if not records:
logger.warning("No records to load")
return
with open(output_path, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=records[0].keys())
writer.writeheader()
writer.writerows(records)
logger.info(f"Loaded {len(records)} records to {output_path}")
def validate(records, min_count=1):
"""驗證資料品質"""
if len(records) < min_count:
raise ValueError(f"Record count too low: {len(records)} < {min_count}")
total = sum(r['amount'] for r in records)
if total <= 0:
raise ValueError(f"Total amount suspicious: {total}")
logger.info(f"Validation passed: {len(records)} records, total={total}")
def main():
target_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
logger.info(f"=== Starting ETL for {target_date} ===")
try:
conn = psycopg2.connect(os.environ['DATABASE_URL'])
rows = extract(conn, target_date)
records = transform(rows)
validate(records)
load(records, f'/data/reports/orders_{target_date}.csv')
logger.info(f"=== ETL completed successfully ===")
except Exception as e:
logger.error(f"ETL failed: {e}")
# 這裡可以加告警通知(Slack webhook / email)
sys.exit(1)
finally:
conn.close()
if __name__ == '__main__':
main()Cron 排程設定
# crontab - ETL jobs
# 每日訂單報表(凌晨 4 點)
0 4 * * * cd /opt/etl && python3 daily_order_report.py >> /var/log/etl/cron.log 2>&1
# 庫存同步(每小時)
0 * * * * cd /opt/etl && python3 sync_inventory.py >> /var/log/etl/cron.log 2>&1
# 週報彙整(每週一早上 6 點)
0 6 * * 1 cd /opt/etl && python3 weekly_summary.py >> /var/log/etl/cron.log 2>&1UPSERT 範例(冪等寫入)
-- PostgreSQL UPSERT:相同 order_id 就更新,不會重複
INSERT INTO order_summary (order_id, customer_id, amount, status, report_date)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (order_id)
DO UPDATE SET
amount = EXCLUDED.amount,
status = EXCLUDED.status,
updated_at = NOW();常見問題與風險
-
Cron job 靜默失敗:cron job 失敗了但沒有通知機制,直到業務發現報表好幾天沒更新才發現。避免方式:每個 ETL job 在結束時發送成功/失敗通知。用 healthcheck 服務(如 Healthchecks.io)監控 cron job 的定期執行——如果預期每天跑但超過 25 小時沒跑,就告警。
-
資料重複:ETL job 被重跑(cron 排程重疊、手動重跑),沒有做冪等性設計,目標資料庫出現重複記錄。避免方式:所有寫入操作用 UPSERT,或在 Load 前先刪除目標日期範圍的資料再重新寫入。
-
來源資料格式變更:第三方 API 改了回傳格式、CSV 的欄位順序變了,ETL job 直接崩潰。避免方式:Extract 後做 schema validation,確認預期的欄位都存在且型別正確。在 Transform 階段用 try-except 處理個別 record 的異常,不要讓一筆壞資料拖垮整個 job。
-
時區問題:ETL job 在 UTC 時區的機器上跑,但業務資料用 Asia/Taipei 時區,「昨天」的定義不一致。避免方式:在 ETL 程式碼裡明確指定時區,不依賴系統時區。日期範圍用 UTC timestamp 而不是 date string。
優點
- Cron + script 簡單直接,不需要額外的排程框架
- 冪等性設計讓重跑安全,不怕失敗後重試
- 資料驗證在 Load 前攔截品質問題
缺點 / 限制
- Cron 排程沒有 dependency 管理(job A 完成後才跑 job B),需要自己設計
- 沒有 UI 看 job 的執行歷史和狀態(需要從日誌裡翻)
- 超大規模資料(>1TB)需要 Airflow / Spark 等專用工具