LESSON 30分

タスク依存関係

田中VPoE「DAGの設計パターンを学んだね。次は依存関係の管理をもう少し深掘りしよう。DAG内の依存関係だけでなく、DAG間の依存関係やセンサーによる外部イベント待ちも重要だ。」

あなた「上流のDAGが完了してから下流のDAGを実行するようなケースですね。」

田中VPoE「そうだ。データの到着を待って処理を開始するパターンもよく使う。」

タスク依存関係の種類

DAG内の依存関係

# ビットシフト演算子
task_a >> task_b >> task_c

# 並列実行 + 合流
task_a >> [task_b, task_c] >> task_d

# set_upstream / set_downstream
task_b.set_upstream(task_a)
task_c.set_downstream(task_d)

トリガールール

ルール説明ユースケース
all_success全上流タスクが成功(デフォルト)通常のパイプライン
all_failed全上流タスクが失敗エラーハンドリング
one_success1つ以上の上流タスクが成功OR条件の分岐
one_failed1つ以上の上流タスクが失敗アラート通知
none_failed失敗した上流タスクがない(スキップはOK)条件分岐後の合流
from airflow.utils.trigger_rule import TriggerRule

notify_on_failure = EmailOperator(
    task_id="notify_failure",
    to="ds-team@netshop.co.jp",
    subject="Pipeline Failed",
    trigger_rule=TriggerRule.ONE_FAILED,
)

センサー(外部イベント待ち)

センサーは特定の条件が満たされるまで待機するOperatorです。

from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# ファイルの到着を待つ
wait_for_file = FileSensor(
    task_id="wait_for_data_file",
    filepath="/data/incoming/orders_{{ ds }}.csv",
    poke_interval=300,  # 5分ごとにチェック
    timeout=3600,       # 1時間でタイムアウト
    mode="poke",
)

# S3オブジェクトの到着を待つ
wait_for_s3 = S3KeySensor(
    task_id="wait_for_s3_data",
    bucket_name="netshop-data",
    bucket_key="raw/orders/{{ ds }}/data.parquet",
    poke_interval=300,
    timeout=7200,
)

# 他のDAGの完了を待つ
wait_for_upstream = ExternalTaskSensor(
    task_id="wait_for_extraction",
    external_dag_id="data_extraction_dag",
    external_task_id="final_task",
    execution_delta=timedelta(hours=0),
    timeout=3600,
)

センサーモード

モード動作メモリ使用
pokeワーカースロットを占有して待機
rescheduleチェック間はスロットを解放
# rescheduleモードの推奨使用
wait_for_file = FileSensor(
    task_id="wait_for_data",
    filepath="/data/incoming/{{ ds }}.csv",
    mode="reschedule",  # スロットを解放
    poke_interval=600,
)

DAG間の依存関係

TriggerDagRunOperator

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_transform = TriggerDagRunOperator(
    task_id="trigger_transform_dag",
    trigger_dag_id="transform_pipeline",
    conf={"source": "orders", "date": "{{ ds }}"},
    wait_for_completion=True,
    poke_interval=60,
)

Dataset駆動のスケジューリング(Airflow 2.4+)

from airflow.datasets import Dataset

# プロデューサーDAG
orders_dataset = Dataset("s3://netshop-data/staging/orders/")

with DAG("produce_orders", ...):
    produce = PythonOperator(
        task_id="produce",
        python_callable=process_orders,
        outlets=[orders_dataset],  # データセットの更新を宣言
    )

# コンシューマーDAG(データセット更新でトリガー)
with DAG(
    "consume_orders",
    schedule=[orders_dataset],  # データセット更新をトリガーに
):
    consume = PythonOperator(
        task_id="consume",
        python_callable=build_mart,
    )

TaskGroup: タスクのグループ化

from airflow.utils.task_group import TaskGroup

with DAG("etl_pipeline", ...):
    with TaskGroup("extract") as extract_group:
        extract_orders = PythonOperator(task_id="orders", ...)
        extract_customers = PythonOperator(task_id="customers", ...)

    with TaskGroup("transform") as transform_group:
        clean = PythonOperator(task_id="clean", ...)
        enrich = PythonOperator(task_id="enrich", ...)
        clean >> enrich

    with TaskGroup("load") as load_group:
        load_warehouse = PythonOperator(task_id="warehouse", ...)

    extract_group >> transform_group >> load_group

まとめ

項目ポイント
トリガールールタスクの実行条件を柔軟に制御(all_success、one_failedなど)
センサー外部イベント(ファイル到着、他DAG完了)を待機
DAG間依存TriggerDagRunOperatorまたはDatasetで連携
TaskGroup関連タスクをグループ化してUIの可読性を向上

チェックリスト

  • トリガールールの種類と使い分けを説明できる
  • センサーの用途とモード(poke / reschedule)の違いを理解している
  • DAG間の依存関係を設計できる
  • TaskGroupでタスクを整理できる

次のステップへ

タスク依存関係の管理を学びました。次はエラーハンドリングとリトライ戦略について学びましょう。


推定読了時間:30分