DAG設計
田中VPoE「Airflowの基礎を押さえた。次はDAGの設計パターンを学ぼう。現場では何十ものDAGが走ることになる。設計が悪いとメンテナンスの地獄が待っている。」
あなた「適切な粒度でDAGを分割することが重要なんですね。」
田中VPoE「そうだ。タスクの粒度、DAGの分割、命名規則、冪等性の確保を一つずつ見ていこう。」
DAG設計の原則
1. 冪等性(Idempotency)
同じDAGを同じパラメータで何度実行しても、結果が同じであることを保証します。
# 悪い例: INSERT(重複が発生する)
def load_data_bad():
engine.execute("INSERT INTO orders SELECT * FROM staging_orders")
# 良い例: MERGE / UPSERT(冪等)
def load_data_good(execution_date):
engine.execute(f"""
DELETE FROM orders WHERE order_date = '{execution_date}';
INSERT INTO orders
SELECT * FROM staging_orders WHERE order_date = '{execution_date}';
""")
2. アトミック性
各タスクは成功か失敗かの2状態のみとし、中途半端な状態を残さないようにします。
3. 適切な粒度
| 粒度 | メリット | デメリット |
|---|---|---|
| 粗い(1DAG = 1タスク) | シンプル | リトライが全体再実行、監視困難 |
| 適切(論理単位) | 部分リトライ可能、進捗が見やすい | 設計に考慮が必要 |
| 細かすぎ | 極めて詳細な監視 | オーバーヘッド大、DAGが複雑化 |
DAG設計パターン
パターン1: リニアパイプライン
extract >> transform >> validate >> load >> notify
最もシンプルなパターンで、ステップが順次実行されます。
パターン2: ファンアウト/ファンイン
extract_orders = PythonOperator(task_id="extract_orders", ...)
extract_customers = PythonOperator(task_id="extract_customers", ...)
extract_products = PythonOperator(task_id="extract_products", ...)
join_data = PythonOperator(task_id="join_data", ...)
load = PythonOperator(task_id="load", ...)
[extract_orders, extract_customers, extract_products] >> join_data >> load
複数のソースからデータを並列取得し、結合するパターンです。
パターン3: 条件分岐
from airflow.operators.python import BranchPythonOperator
def decide_path(**context):
row_count = context["ti"].xcom_pull(task_ids="count_rows")
if row_count > 0:
return "process_data"
return "skip_processing"
branch = BranchPythonOperator(
task_id="branch",
python_callable=decide_path,
)
branch >> [process_data, skip_processing]
パターン4: DAGのチェーン(TriggerDagRunOperator)
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_downstream = TriggerDagRunOperator(
task_id="trigger_mart_build",
trigger_dag_id="build_mart_tables",
wait_for_completion=True,
)
DAGの命名規則
| 要素 | ルール | 例 |
|---|---|---|
| DAG ID | {domain}_{frequency}_{purpose} | sales_daily_etl |
| タスク ID | {action}_{object} | extract_orders, validate_customers |
| タグ | ドメイン、頻度、チーム | ["sales", "daily", "ds-team"] |
XCom: タスク間のデータ受け渡し
def extract(**context):
row_count = 1500
context["ti"].xcom_push(key="row_count", value=row_count)
def transform(**context):
row_count = context["ti"].xcom_pull(task_ids="extract", key="row_count")
print(f"Processing {row_count} rows")
| 注意点 | 説明 |
|---|---|
| サイズ制限 | XComはメタデータDBに保存されるため、大きなデータは不可 |
| 用途 | メタデータ(行数、パス、ステータス)の受け渡しに使用 |
| 大量データ | S3やデータベースを介して受け渡す |
まとめ
| 項目 | ポイント |
|---|---|
| 冪等性 | 同じ入力で何度実行しても同じ結果を保証 |
| 粒度 | 論理単位でタスクを分割し、部分リトライを可能にする |
| 設計パターン | リニア、ファンアウト/イン、条件分岐、DAGチェーン |
| XCom | タスク間の小さなデータ受け渡し用(大量データはNG) |
チェックリスト
- 冪等性の重要性と実現方法を説明できる
- 4つのDAG設計パターンを使い分けられる
- DAGの命名規則を理解している
- XComの用途と制限を理解している
次のステップへ
DAG設計パターンを学びました。次はタスク間の依存関係管理について詳しく学びましょう。
推定読了時間:30分