演習:Airflowでワークフローを構築しよう
田中VPoE「Airflowの基礎、DAG設計、依存関係、エラーハンドリングを学んだ。ここでNetShop社の日次ETLパイプラインをAirflow DAGとして設計・実装しよう。」
あなた「データ抽出からdbt実行、品質チェック、通知まで一気通貫のパイプラインですね。」
田中VPoE「そうだ。エラーハンドリングとSLAも含めて、本番運用レベルの品質で作ってほしい。」
ミッション概要
NetShop社の日次データパイプラインをAirflow DAGとして構築します。複数ソースからのデータ抽出、dbtによる変換、品質チェック、通知を含む本番運用レベルのワークフローを設計します。
前提条件
- Step 3の各レッスン(Airflow基礎、DAG設計、タスク依存関係、エラーハンドリング)を修了していること
- Airflowの基本概念を理解していること
Mission 1: DAGの基本設計(30分)
以下の要件を満たすDAGを設計してください。
要件
- 毎日2
- 3つのデータソース(orders、customers、products)から並列にデータ抽出
- データ抽出後にdbt run → dbt test を実行
- 完了時にSlack通知
解答例
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
default_args = {
"owner": "ds-team",
"depends_on_past": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(minutes=30),
"email_on_failure": True,
"email": ["ds-team@netshop.co.jp"],
}
def extract_data(source: str, **context):
execution_date = context["ds"]
print(f"Extracting {source} data for {execution_date}")
# 実際にはAPI/DB/S3からデータを取得
return {"source": source, "rows": 1000}
def notify_slack(status: str, **context):
dag_id = context["dag"].dag_id
print(f"Slack notification: {dag_id} - {status}")
with DAG(
dag_id="netshop_daily_etl",
default_args=default_args,
description="NetShop日次ETLパイプライン",
schedule_interval="0 2 * * *",
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["etl", "daily", "ds-team"],
max_active_runs=1,
) as dag:
with TaskGroup("extract") as extract_group:
extract_orders = PythonOperator(
task_id="orders",
python_callable=extract_data,
op_kwargs={"source": "orders"},
)
extract_customers = PythonOperator(
task_id="customers",
python_callable=extract_data,
op_kwargs={"source": "customers"},
)
extract_products = PythonOperator(
task_id="products",
python_callable=extract_data,
op_kwargs={"source": "products"},
)
dbt_run = BashOperator(
task_id="dbt_run",
bash_command="cd /opt/dbt/netshop && dbt run --profiles-dir .",
sla=timedelta(hours=1),
)
dbt_test = BashOperator(
task_id="dbt_test",
bash_command="cd /opt/dbt/netshop && dbt test --profiles-dir .",
)
notify_success = PythonOperator(
task_id="notify_success",
python_callable=notify_slack,
op_kwargs={"status": "SUCCESS"},
)
extract_group >> dbt_run >> dbt_test >> notify_success
Mission 2: エラーハンドリングの実装(30分)
Mission 1のDAGにエラーハンドリングを追加してください。
要件
- タスク失敗時のSlack通知コールバック
- データ抽出失敗時のフォールバック(キャッシュからの読み込み)
- SLA設定(全体2時間以内)
解答例
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
def on_failure_callback(context):
task_id = context["task_instance"].task_id
dag_id = context["dag"].dag_id
error = context.get("exception", "Unknown")
print(f"ALERT: {dag_id}/{task_id} failed: {error}")
def on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
print(f"SLA MISS: {dag.dag_id}, tasks: {task_list}")
def extract_with_fallback(source: str, **context):
"""フォールバック付きのデータ抽出"""
try:
data = extract_from_primary(source)
context["ti"].xcom_push(key="source_type", value="primary")
return data
except Exception as e:
print(f"Primary extraction failed: {e}, falling back to cache")
data = extract_from_cache(source)
context["ti"].xcom_push(key="source_type", value="cache")
return data
with DAG(
dag_id="netshop_daily_etl_v2",
default_args={
**default_args,
"on_failure_callback": on_failure_callback,
},
sla_miss_callback=on_sla_miss,
schedule_interval="0 2 * * *",
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["etl", "daily", "ds-team"],
max_active_runs=1,
) as dag:
with TaskGroup("extract") as extract_group:
for source in ["orders", "customers", "products"]:
PythonOperator(
task_id=source,
python_callable=extract_with_fallback,
op_kwargs={"source": source},
sla=timedelta(minutes=30),
)
dbt_run = BashOperator(
task_id="dbt_run",
bash_command="cd /opt/dbt/netshop && dbt run --profiles-dir .",
sla=timedelta(hours=1),
)
dbt_test = BashOperator(
task_id="dbt_test",
bash_command="cd /opt/dbt/netshop && dbt test --profiles-dir .",
)
notify_success = PythonOperator(
task_id="notify_success",
python_callable=notify_slack,
op_kwargs={"status": "SUCCESS"},
trigger_rule=TriggerRule.ALL_SUCCESS,
)
notify_failure = PythonOperator(
task_id="notify_failure",
python_callable=notify_slack,
op_kwargs={"status": "FAILURE"},
trigger_rule=TriggerRule.ONE_FAILED,
)
extract_group >> dbt_run >> dbt_test
dbt_test >> [notify_success, notify_failure]
Mission 3: データ品質チェックの統合(30分)
dbt testに加えて、カスタムのデータ品質チェックタスクを追加してください。
要件
- 行数チェック(前日比で急激な変動がないか)
- NULL率チェック(重要カラムのNULL率が閾値以下か)
- 品質チェック結果のレポート出力
解答例
def data_quality_check(**context):
"""カスタムデータ品質チェック"""
checks = []
# 行数チェック
current_count = get_row_count("mart_customer_summary")
previous_count = get_previous_row_count("mart_customer_summary")
ratio = current_count / max(previous_count, 1)
checks.append({
"check": "row_count_ratio",
"table": "mart_customer_summary",
"value": round(ratio, 2),
"threshold": "0.8 - 1.5",
"status": "PASS" if 0.8 <= ratio <= 1.5 else "FAIL",
})
# NULL率チェック
critical_columns = {
"mart_customer_summary": ["customer_id", "total_orders"],
"mart_monthly_revenue": ["month", "revenue"],
}
for table, columns in critical_columns.items():
for col in columns:
null_rate = get_null_rate(table, col)
checks.append({
"check": "null_rate",
"table": f"{table}.{col}",
"value": f"{null_rate:.2%}",
"threshold": "< 1%",
"status": "PASS" if null_rate < 0.01 else "FAIL",
})
# レポート出力
import pandas as pd
report_df = pd.DataFrame(checks)
report_df.to_csv(f"/data/reports/quality_{context['ds']}.csv", index=False)
failed = report_df[report_df["status"] == "FAIL"]
if len(failed) > 0:
context["ti"].xcom_push(key="quality_issues", value=len(failed))
raise ValueError(f"Data quality check failed: {len(failed)} issues")
return "All checks passed"
quality_check = PythonOperator(
task_id="data_quality_check",
python_callable=data_quality_check,
retries=0, # 品質チェックはリトライしない
)
dbt_test >> quality_check >> notify_success
達成度チェック
- TaskGroupを使った並列抽出を含むDAGを設計できた
- dbt run → dbt test の実行フローを構築できた
- エラー時のコールバックとフォールバック処理を実装できた
- SLAを設定し、違反時のアラートを実装できた
- カスタムデータ品質チェックをパイプラインに統合できた
推定所要時間:90分