LESSON 30分

Airflow基礎

田中VPoE「dbtでデータ変換パイプラインを構築できたね。次はこれをオーケストレーションする仕組みを学ぼう。Apache Airflowは業界標準のワークフローオーケストレーターだ。」

あなた「cronジョブの代わりになるんですよね。依存関係の管理やエラー時のリトライも自動でやってくれるんですか?」

田中VPoE「その通り。深夜バッチが失敗しても誰も気づかない、という現状をAirflowで解決しよう。」

Apache Airflowとは

Apache Airflowは、ワークフローをプログラムで定義・スケジュール・監視するためのオープンソースプラットフォームです。

Airflowの特徴

特徴説明
DAGベースワークフローを有向非巡回グラフ(DAG)として定義
Python nativeDAGの定義はPythonコードで記述
豊富なOperatorBash、Python、SQL、HTTP、クラウドサービス等
Web UIDAGの実行状況をリアルタイムで監視可能
スケジューリングcron式によるスケジュール実行
リトライ失敗時の自動リトライ機能

Airflowのアーキテクチャ

[Web Server] ← ブラウザ

[Scheduler] → DAGファイル読み込み → タスクキュー

[Executor] → [Worker 1] [Worker 2] [Worker 3]

[Metadata DB] (PostgreSQL)

最初のDAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "ds-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["ds-team@netshop.co.jp"],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="netshop_daily_etl",
    default_args=default_args,
    description="NetShop日次ETLパイプライン",
    schedule_interval="0 2 * * *",  # 毎日2:00
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl", "daily"],
) as dag:

    extract = BashOperator(
        task_id="extract_data",
        bash_command="python /opt/etl/extract.py",
    )

    def transform_data():
        import pandas as pd
        df = pd.read_parquet("/data/raw/orders.parquet")
        df_cleaned = df.dropna(subset=["order_id"])
        df_cleaned.to_parquet("/data/staging/orders.parquet")

    transform = PythonOperator(
        task_id="transform_data",
        python_callable=transform_data,
    )

    load = BashOperator(
        task_id="load_to_warehouse",
        bash_command="python /opt/etl/load.py",
    )

    # 依存関係の定義
    extract >> transform >> load

主要なOperator

Operator用途
BashOperatorシェルコマンド実行スクリプト実行、ファイル操作
PythonOperatorPython関数実行データ処理、API呼び出し
SQLExecuteQueryOperatorSQL実行テーブル作成、データ投入
S3ToWarehouseOperatorS3からDWHデータロード
EmailOperatorメール送信通知、レポート送付
DbtOperatordbt実行dbt run、dbt test

スケジューリング

# cron式の例
schedule_interval="0 2 * * *"     # 毎日2:00
schedule_interval="0 */6 * * *"   # 6時間ごと
schedule_interval="0 9 * * 1"     # 毎週月曜9:00
schedule_interval="0 0 1 * *"     # 毎月1日0:00
パラメータ説明
start_dateDAGの開始日
schedule_interval実行間隔(cron式またはtimedelta)
catchup過去の未実行分をバックフィルするか
max_active_runs同時実行可能なDAGの最大数

まとめ

項目ポイント
Airflowの役割ワークフローのオーケストレーション(定義・スケジュール・監視)
DAG定義PythonコードでDAGを定義、Operatorでタスクを記述
スケジューリングcron式で柔軟なスケジュール設定が可能
アーキテクチャWeb Server + Scheduler + Executor + Metadata DB

チェックリスト

  • Airflowの役割とアーキテクチャを説明できる
  • 基本的なDAGをPythonで定義できる
  • 主要なOperatorの種類と用途を理解している
  • cron式によるスケジューリングを設定できる

次のステップへ

Airflowの基礎を理解しました。次はDAGの設計パターンとベストプラクティスを学びましょう。


推定読了時間:30分