タスク依存関係
田中VPoE「DAGの設計パターンを学んだね。次は依存関係の管理をもう少し深掘りしよう。DAG内の依存関係だけでなく、DAG間の依存関係やセンサーによる外部イベント待ちも重要だ。」
あなた「上流のDAGが完了してから下流のDAGを実行するようなケースですね。」
田中VPoE「そうだ。データの到着を待って処理を開始するパターンもよく使う。」
タスク依存関係の種類
DAG内の依存関係
# ビットシフト演算子
task_a >> task_b >> task_c
# 並列実行 + 合流
task_a >> [task_b, task_c] >> task_d
# set_upstream / set_downstream
task_b.set_upstream(task_a)
task_c.set_downstream(task_d)
トリガールール
| ルール | 説明 | ユースケース |
|---|---|---|
all_success | 全上流タスクが成功(デフォルト) | 通常のパイプライン |
all_failed | 全上流タスクが失敗 | エラーハンドリング |
one_success | 1つ以上の上流タスクが成功 | OR条件の分岐 |
one_failed | 1つ以上の上流タスクが失敗 | アラート通知 |
none_failed | 失敗した上流タスクがない(スキップはOK) | 条件分岐後の合流 |
from airflow.utils.trigger_rule import TriggerRule
notify_on_failure = EmailOperator(
task_id="notify_failure",
to="ds-team@netshop.co.jp",
subject="Pipeline Failed",
trigger_rule=TriggerRule.ONE_FAILED,
)
センサー(外部イベント待ち)
センサーは特定の条件が満たされるまで待機するOperatorです。
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# ファイルの到着を待つ
wait_for_file = FileSensor(
task_id="wait_for_data_file",
filepath="/data/incoming/orders_{{ ds }}.csv",
poke_interval=300, # 5分ごとにチェック
timeout=3600, # 1時間でタイムアウト
mode="poke",
)
# S3オブジェクトの到着を待つ
wait_for_s3 = S3KeySensor(
task_id="wait_for_s3_data",
bucket_name="netshop-data",
bucket_key="raw/orders/{{ ds }}/data.parquet",
poke_interval=300,
timeout=7200,
)
# 他のDAGの完了を待つ
wait_for_upstream = ExternalTaskSensor(
task_id="wait_for_extraction",
external_dag_id="data_extraction_dag",
external_task_id="final_task",
execution_delta=timedelta(hours=0),
timeout=3600,
)
センサーモード
| モード | 動作 | メモリ使用 |
|---|---|---|
poke | ワーカースロットを占有して待機 | 高 |
reschedule | チェック間はスロットを解放 | 低 |
# rescheduleモードの推奨使用
wait_for_file = FileSensor(
task_id="wait_for_data",
filepath="/data/incoming/{{ ds }}.csv",
mode="reschedule", # スロットを解放
poke_interval=600,
)
DAG間の依存関係
TriggerDagRunOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_transform = TriggerDagRunOperator(
task_id="trigger_transform_dag",
trigger_dag_id="transform_pipeline",
conf={"source": "orders", "date": "{{ ds }}"},
wait_for_completion=True,
poke_interval=60,
)
Dataset駆動のスケジューリング(Airflow 2.4+)
from airflow.datasets import Dataset
# プロデューサーDAG
orders_dataset = Dataset("s3://netshop-data/staging/orders/")
with DAG("produce_orders", ...):
produce = PythonOperator(
task_id="produce",
python_callable=process_orders,
outlets=[orders_dataset], # データセットの更新を宣言
)
# コンシューマーDAG(データセット更新でトリガー)
with DAG(
"consume_orders",
schedule=[orders_dataset], # データセット更新をトリガーに
):
consume = PythonOperator(
task_id="consume",
python_callable=build_mart,
)
TaskGroup: タスクのグループ化
from airflow.utils.task_group import TaskGroup
with DAG("etl_pipeline", ...):
with TaskGroup("extract") as extract_group:
extract_orders = PythonOperator(task_id="orders", ...)
extract_customers = PythonOperator(task_id="customers", ...)
with TaskGroup("transform") as transform_group:
clean = PythonOperator(task_id="clean", ...)
enrich = PythonOperator(task_id="enrich", ...)
clean >> enrich
with TaskGroup("load") as load_group:
load_warehouse = PythonOperator(task_id="warehouse", ...)
extract_group >> transform_group >> load_group
まとめ
| 項目 | ポイント |
|---|---|
| トリガールール | タスクの実行条件を柔軟に制御(all_success、one_failedなど) |
| センサー | 外部イベント(ファイル到着、他DAG完了)を待機 |
| DAG間依存 | TriggerDagRunOperatorまたはDatasetで連携 |
| TaskGroup | 関連タスクをグループ化してUIの可読性を向上 |
チェックリスト
- トリガールールの種類と使い分けを説明できる
- センサーの用途とモード(poke / reschedule)の違いを理解している
- DAG間の依存関係を設計できる
- TaskGroupでタスクを整理できる
次のステップへ
タスク依存関係の管理を学びました。次はエラーハンドリングとリトライ戦略について学びましょう。
推定読了時間:30分