LESSON 30分

DAG設計

田中VPoE「Airflowの基礎を押さえた。次はDAGの設計パターンを学ぼう。現場では何十ものDAGが走ることになる。設計が悪いとメンテナンスの地獄が待っている。」

あなた「適切な粒度でDAGを分割することが重要なんですね。」

田中VPoE「そうだ。タスクの粒度、DAGの分割、命名規則、冪等性の確保を一つずつ見ていこう。」

DAG設計の原則

1. 冪等性(Idempotency)

同じDAGを同じパラメータで何度実行しても、結果が同じであることを保証します。

# 悪い例: INSERT(重複が発生する)
def load_data_bad():
    engine.execute("INSERT INTO orders SELECT * FROM staging_orders")

# 良い例: MERGE / UPSERT(冪等)
def load_data_good(execution_date):
    engine.execute(f"""
        DELETE FROM orders WHERE order_date = '{execution_date}';
        INSERT INTO orders
        SELECT * FROM staging_orders WHERE order_date = '{execution_date}';
    """)

2. アトミック性

各タスクは成功か失敗かの2状態のみとし、中途半端な状態を残さないようにします。

3. 適切な粒度

粒度メリットデメリット
粗い(1DAG = 1タスク)シンプルリトライが全体再実行、監視困難
適切(論理単位)部分リトライ可能、進捗が見やすい設計に考慮が必要
細かすぎ極めて詳細な監視オーバーヘッド大、DAGが複雑化

DAG設計パターン

パターン1: リニアパイプライン

extract >> transform >> validate >> load >> notify

最もシンプルなパターンで、ステップが順次実行されます。

パターン2: ファンアウト/ファンイン

extract_orders = PythonOperator(task_id="extract_orders", ...)
extract_customers = PythonOperator(task_id="extract_customers", ...)
extract_products = PythonOperator(task_id="extract_products", ...)

join_data = PythonOperator(task_id="join_data", ...)
load = PythonOperator(task_id="load", ...)

[extract_orders, extract_customers, extract_products] >> join_data >> load

複数のソースからデータを並列取得し、結合するパターンです。

パターン3: 条件分岐

from airflow.operators.python import BranchPythonOperator

def decide_path(**context):
    row_count = context["ti"].xcom_pull(task_ids="count_rows")
    if row_count > 0:
        return "process_data"
    return "skip_processing"

branch = BranchPythonOperator(
    task_id="branch",
    python_callable=decide_path,
)

branch >> [process_data, skip_processing]

パターン4: DAGのチェーン(TriggerDagRunOperator)

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_downstream = TriggerDagRunOperator(
    task_id="trigger_mart_build",
    trigger_dag_id="build_mart_tables",
    wait_for_completion=True,
)

DAGの命名規則

要素ルール
DAG ID{domain}_{frequency}_{purpose}sales_daily_etl
タスク ID{action}_{object}extract_orders, validate_customers
タグドメイン、頻度、チーム["sales", "daily", "ds-team"]

XCom: タスク間のデータ受け渡し

def extract(**context):
    row_count = 1500
    context["ti"].xcom_push(key="row_count", value=row_count)

def transform(**context):
    row_count = context["ti"].xcom_pull(task_ids="extract", key="row_count")
    print(f"Processing {row_count} rows")
注意点説明
サイズ制限XComはメタデータDBに保存されるため、大きなデータは不可
用途メタデータ(行数、パス、ステータス)の受け渡しに使用
大量データS3やデータベースを介して受け渡す

まとめ

項目ポイント
冪等性同じ入力で何度実行しても同じ結果を保証
粒度論理単位でタスクを分割し、部分リトライを可能にする
設計パターンリニア、ファンアウト/イン、条件分岐、DAGチェーン
XComタスク間の小さなデータ受け渡し用(大量データはNG)

チェックリスト

  • 冪等性の重要性と実現方法を説明できる
  • 4つのDAG設計パターンを使い分けられる
  • DAGの命名規則を理解している
  • XComの用途と制限を理解している

次のステップへ

DAG設計パターンを学びました。次はタスク間の依存関係管理について詳しく学びましょう。


推定読了時間:30分