EXERCISE 90分

演習:Airflowでワークフローを構築しよう

田中VPoE「Airflowの基礎、DAG設計、依存関係、エラーハンドリングを学んだ。ここでNetShop社の日次ETLパイプラインをAirflow DAGとして設計・実装しよう。」

あなた「データ抽出からdbt実行、品質チェック、通知まで一気通貫のパイプラインですね。」

田中VPoE「そうだ。エラーハンドリングとSLAも含めて、本番運用レベルの品質で作ってほしい。」

ミッション概要

NetShop社の日次データパイプラインをAirflow DAGとして構築します。複数ソースからのデータ抽出、dbtによる変換、品質チェック、通知を含む本番運用レベルのワークフローを設計します。

前提条件

  • Step 3の各レッスン(Airflow基礎、DAG設計、タスク依存関係、エラーハンドリング)を修了していること
  • Airflowの基本概念を理解していること

Mission 1: DAGの基本設計(30分)

以下の要件を満たすDAGを設計してください。

要件

  1. 毎日2
  2. 3つのデータソース(orders、customers、products)から並列にデータ抽出
  3. データ抽出後にdbt run → dbt test を実行
  4. 完了時にSlack通知
解答例
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

default_args = {
    "owner": "ds-team",
    "depends_on_past": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
    "email_on_failure": True,
    "email": ["ds-team@netshop.co.jp"],
}

def extract_data(source: str, **context):
    execution_date = context["ds"]
    print(f"Extracting {source} data for {execution_date}")
    # 実際にはAPI/DB/S3からデータを取得
    return {"source": source, "rows": 1000}

def notify_slack(status: str, **context):
    dag_id = context["dag"].dag_id
    print(f"Slack notification: {dag_id} - {status}")

with DAG(
    dag_id="netshop_daily_etl",
    default_args=default_args,
    description="NetShop日次ETLパイプライン",
    schedule_interval="0 2 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl", "daily", "ds-team"],
    max_active_runs=1,
) as dag:

    with TaskGroup("extract") as extract_group:
        extract_orders = PythonOperator(
            task_id="orders",
            python_callable=extract_data,
            op_kwargs={"source": "orders"},
        )
        extract_customers = PythonOperator(
            task_id="customers",
            python_callable=extract_data,
            op_kwargs={"source": "customers"},
        )
        extract_products = PythonOperator(
            task_id="products",
            python_callable=extract_data,
            op_kwargs={"source": "products"},
        )

    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="cd /opt/dbt/netshop && dbt run --profiles-dir .",
        sla=timedelta(hours=1),
    )

    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command="cd /opt/dbt/netshop && dbt test --profiles-dir .",
    )

    notify_success = PythonOperator(
        task_id="notify_success",
        python_callable=notify_slack,
        op_kwargs={"status": "SUCCESS"},
    )

    extract_group >> dbt_run >> dbt_test >> notify_success

Mission 2: エラーハンドリングの実装(30分)

Mission 1のDAGにエラーハンドリングを追加してください。

要件

  1. タスク失敗時のSlack通知コールバック
  2. データ抽出失敗時のフォールバック(キャッシュからの読み込み)
  3. SLA設定(全体2時間以内)
解答例
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

def on_failure_callback(context):
    task_id = context["task_instance"].task_id
    dag_id = context["dag"].dag_id
    error = context.get("exception", "Unknown")
    print(f"ALERT: {dag_id}/{task_id} failed: {error}")

def on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(f"SLA MISS: {dag.dag_id}, tasks: {task_list}")

def extract_with_fallback(source: str, **context):
    """フォールバック付きのデータ抽出"""
    try:
        data = extract_from_primary(source)
        context["ti"].xcom_push(key="source_type", value="primary")
        return data
    except Exception as e:
        print(f"Primary extraction failed: {e}, falling back to cache")
        data = extract_from_cache(source)
        context["ti"].xcom_push(key="source_type", value="cache")
        return data

with DAG(
    dag_id="netshop_daily_etl_v2",
    default_args={
        **default_args,
        "on_failure_callback": on_failure_callback,
    },
    sla_miss_callback=on_sla_miss,
    schedule_interval="0 2 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl", "daily", "ds-team"],
    max_active_runs=1,
) as dag:

    with TaskGroup("extract") as extract_group:
        for source in ["orders", "customers", "products"]:
            PythonOperator(
                task_id=source,
                python_callable=extract_with_fallback,
                op_kwargs={"source": source},
                sla=timedelta(minutes=30),
            )

    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="cd /opt/dbt/netshop && dbt run --profiles-dir .",
        sla=timedelta(hours=1),
    )

    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command="cd /opt/dbt/netshop && dbt test --profiles-dir .",
    )

    notify_success = PythonOperator(
        task_id="notify_success",
        python_callable=notify_slack,
        op_kwargs={"status": "SUCCESS"},
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    notify_failure = PythonOperator(
        task_id="notify_failure",
        python_callable=notify_slack,
        op_kwargs={"status": "FAILURE"},
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    extract_group >> dbt_run >> dbt_test
    dbt_test >> [notify_success, notify_failure]

Mission 3: データ品質チェックの統合(30分)

dbt testに加えて、カスタムのデータ品質チェックタスクを追加してください。

要件

  1. 行数チェック(前日比で急激な変動がないか)
  2. NULL率チェック(重要カラムのNULL率が閾値以下か)
  3. 品質チェック結果のレポート出力
解答例
def data_quality_check(**context):
    """カスタムデータ品質チェック"""
    checks = []

    # 行数チェック
    current_count = get_row_count("mart_customer_summary")
    previous_count = get_previous_row_count("mart_customer_summary")
    ratio = current_count / max(previous_count, 1)

    checks.append({
        "check": "row_count_ratio",
        "table": "mart_customer_summary",
        "value": round(ratio, 2),
        "threshold": "0.8 - 1.5",
        "status": "PASS" if 0.8 <= ratio <= 1.5 else "FAIL",
    })

    # NULL率チェック
    critical_columns = {
        "mart_customer_summary": ["customer_id", "total_orders"],
        "mart_monthly_revenue": ["month", "revenue"],
    }

    for table, columns in critical_columns.items():
        for col in columns:
            null_rate = get_null_rate(table, col)
            checks.append({
                "check": "null_rate",
                "table": f"{table}.{col}",
                "value": f"{null_rate:.2%}",
                "threshold": "< 1%",
                "status": "PASS" if null_rate < 0.01 else "FAIL",
            })

    # レポート出力
    import pandas as pd
    report_df = pd.DataFrame(checks)
    report_df.to_csv(f"/data/reports/quality_{context['ds']}.csv", index=False)

    failed = report_df[report_df["status"] == "FAIL"]
    if len(failed) > 0:
        context["ti"].xcom_push(key="quality_issues", value=len(failed))
        raise ValueError(f"Data quality check failed: {len(failed)} issues")

    return "All checks passed"

quality_check = PythonOperator(
    task_id="data_quality_check",
    python_callable=data_quality_check,
    retries=0,  # 品質チェックはリトライしない
)

dbt_test >> quality_check >> notify_success

達成度チェック

  • TaskGroupを使った並列抽出を含むDAGを設計できた
  • dbt run → dbt test の実行フローを構築できた
  • エラー時のコールバックとフォールバック処理を実装できた
  • SLAを設定し、違反時のアラートを実装できた
  • カスタムデータ品質チェックをパイプラインに統合できた

推定所要時間:90分