エラーハンドリング
田中VPoE「DAGの設計と依存関係管理を学んだ。本番運用で最も重要なのがエラーハンドリングだ。深夜バッチが失敗したときにどう対処するかを設計しておく必要がある。」
あなた「リトライの設定だけでなく、失敗時の通知やフォールバック処理も考慮する必要があるんですね。」
田中VPoE「その通り。失敗は必ず起こる前提で設計しよう。」
リトライ戦略
基本的なリトライ設定
default_args = {
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(minutes=30),
}
| パラメータ | 説明 | 推奨値 |
|---|---|---|
retries | リトライ回数 | 2〜5回 |
retry_delay | リトライ間隔 | 5〜10分 |
retry_exponential_backoff | 指数バックオフ | True |
max_retry_delay | 最大リトライ間隔 | 30〜60分 |
タスクごとのリトライ設定
# API呼び出しは多めにリトライ
extract_api = PythonOperator(
task_id="extract_from_api",
python_callable=call_api,
retries=5,
retry_delay=timedelta(minutes=2),
retry_exponential_backoff=True,
)
# データベース投入はリトライ少なめ
load_db = PythonOperator(
task_id="load_to_db",
python_callable=load_data,
retries=2,
retry_delay=timedelta(minutes=10),
)
コールバック関数
def on_failure_callback(context):
"""タスク失敗時のコールバック"""
task_id = context["task_instance"].task_id
dag_id = context["dag"].dag_id
execution_date = context["execution_date"]
exception = context.get("exception", "Unknown error")
message = f"""
DAG失敗通知:
DAG: {dag_id}
Task: {task_id}
Date: {execution_date}
Error: {exception}
"""
send_slack_notification(message, channel="#data-alerts")
def on_success_callback(context):
"""DAG成功時のコールバック"""
dag_id = context["dag"].dag_id
send_slack_notification(
f"DAG {dag_id} completed successfully",
channel="#data-info"
)
with DAG(
"etl_pipeline",
on_failure_callback=on_failure_callback,
on_success_callback=on_success_callback,
...
):
pass
SLA(Service Level Agreement)
with DAG(
"critical_daily_etl",
sla_miss_callback=sla_miss_alert,
...
):
extract = PythonOperator(
task_id="extract",
python_callable=extract_data,
sla=timedelta(hours=1), # 1時間以内に完了すべき
)
def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
"""SLA違反時のアラート"""
message = f"SLA violation: {dag.dag_id}, tasks: {task_list}"
send_slack_notification(message, channel="#data-critical")
エラー時のフォールバックパターン
パターン1: 代替処理
from airflow.operators.python import BranchPythonOperator
def check_data_source(**context):
try:
check_api_available()
return "extract_from_api"
except Exception:
return "extract_from_cache"
branch = BranchPythonOperator(
task_id="check_source",
python_callable=check_data_source,
)
extract_api = PythonOperator(task_id="extract_from_api", ...)
extract_cache = PythonOperator(task_id="extract_from_cache", ...)
branch >> [extract_api, extract_cache]
パターン2: 部分的なスキップ
from airflow.exceptions import AirflowSkipException
def process_with_skip(**context):
data = load_data()
if len(data) == 0:
raise AirflowSkipException("No data to process")
transform(data)
パターン3: デッドレターキュー
def process_with_dlq(**context):
"""処理失敗レコードをDLQに保存する"""
records = load_records()
success, failed = [], []
for record in records:
try:
result = transform(record)
success.append(result)
except Exception as e:
record["error"] = str(e)
failed.append(record)
save_to_warehouse(success)
if failed:
save_to_dlq(failed)
context["ti"].xcom_push(key="failed_count", value=len(failed))
アラートの設計
| レベル | 条件 | 通知先 |
|---|---|---|
| INFO | DAG正常完了 | #data-info |
| WARNING | SLA違反、リトライ発生 | #data-alerts |
| CRITICAL | DAG最終失敗 | #data-critical + PagerDuty |
まとめ
| 項目 | ポイント |
|---|---|
| リトライ | 指数バックオフでリトライ、タスクごとに適切な回数を設定 |
| コールバック | 失敗/成功時に通知を自動送信 |
| SLA | 処理時間の期待値を設定し、違反時にアラート |
| フォールバック | 代替処理、スキップ、デッドレターキューの3パターン |
チェックリスト
- リトライ戦略(指数バックオフ含む)を設定できる
- コールバック関数で失敗時の通知を実装できる
- SLAの設定方法を理解している
- エラー時のフォールバックパターンを使い分けられる
次のステップへ
エラーハンドリングの設計を学びました。次は演習で、Airflowのワークフローを実際に構築してみましょう。
推定読了時間:30分