LESSON 30分

エラーハンドリング

田中VPoE「DAGの設計と依存関係管理を学んだ。本番運用で最も重要なのがエラーハンドリングだ。深夜バッチが失敗したときにどう対処するかを設計しておく必要がある。」

あなた「リトライの設定だけでなく、失敗時の通知やフォールバック処理も考慮する必要があるんですね。」

田中VPoE「その通り。失敗は必ず起こる前提で設計しよう。」

リトライ戦略

基本的なリトライ設定

default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
}
パラメータ説明推奨値
retriesリトライ回数2〜5回
retry_delayリトライ間隔5〜10分
retry_exponential_backoff指数バックオフTrue
max_retry_delay最大リトライ間隔30〜60分

タスクごとのリトライ設定

# API呼び出しは多めにリトライ
extract_api = PythonOperator(
    task_id="extract_from_api",
    python_callable=call_api,
    retries=5,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,
)

# データベース投入はリトライ少なめ
load_db = PythonOperator(
    task_id="load_to_db",
    python_callable=load_data,
    retries=2,
    retry_delay=timedelta(minutes=10),
)

コールバック関数

def on_failure_callback(context):
    """タスク失敗時のコールバック"""
    task_id = context["task_instance"].task_id
    dag_id = context["dag"].dag_id
    execution_date = context["execution_date"]
    exception = context.get("exception", "Unknown error")

    message = f"""
    DAG失敗通知:
    DAG: {dag_id}
    Task: {task_id}
    Date: {execution_date}
    Error: {exception}
    """
    send_slack_notification(message, channel="#data-alerts")

def on_success_callback(context):
    """DAG成功時のコールバック"""
    dag_id = context["dag"].dag_id
    send_slack_notification(
        f"DAG {dag_id} completed successfully",
        channel="#data-info"
    )

with DAG(
    "etl_pipeline",
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
    ...
):
    pass

SLA(Service Level Agreement)

with DAG(
    "critical_daily_etl",
    sla_miss_callback=sla_miss_alert,
    ...
):
    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_data,
        sla=timedelta(hours=1),  # 1時間以内に完了すべき
    )

def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """SLA違反時のアラート"""
    message = f"SLA violation: {dag.dag_id}, tasks: {task_list}"
    send_slack_notification(message, channel="#data-critical")

エラー時のフォールバックパターン

パターン1: 代替処理

from airflow.operators.python import BranchPythonOperator

def check_data_source(**context):
    try:
        check_api_available()
        return "extract_from_api"
    except Exception:
        return "extract_from_cache"

branch = BranchPythonOperator(
    task_id="check_source",
    python_callable=check_data_source,
)

extract_api = PythonOperator(task_id="extract_from_api", ...)
extract_cache = PythonOperator(task_id="extract_from_cache", ...)

branch >> [extract_api, extract_cache]

パターン2: 部分的なスキップ

from airflow.exceptions import AirflowSkipException

def process_with_skip(**context):
    data = load_data()
    if len(data) == 0:
        raise AirflowSkipException("No data to process")
    transform(data)

パターン3: デッドレターキュー

def process_with_dlq(**context):
    """処理失敗レコードをDLQに保存する"""
    records = load_records()
    success, failed = [], []

    for record in records:
        try:
            result = transform(record)
            success.append(result)
        except Exception as e:
            record["error"] = str(e)
            failed.append(record)

    save_to_warehouse(success)
    if failed:
        save_to_dlq(failed)
        context["ti"].xcom_push(key="failed_count", value=len(failed))

アラートの設計

レベル条件通知先
INFODAG正常完了#data-info
WARNINGSLA違反、リトライ発生#data-alerts
CRITICALDAG最終失敗#data-critical + PagerDuty

まとめ

項目ポイント
リトライ指数バックオフでリトライ、タスクごとに適切な回数を設定
コールバック失敗/成功時に通知を自動送信
SLA処理時間の期待値を設定し、違反時にアラート
フォールバック代替処理、スキップ、デッドレターキューの3パターン

チェックリスト

  • リトライ戦略(指数バックオフ含む)を設定できる
  • コールバック関数で失敗時の通知を実装できる
  • SLAの設定方法を理解している
  • エラー時のフォールバックパターンを使い分けられる

次のステップへ

エラーハンドリングの設計を学びました。次は演習で、Airflowのワークフローを実際に構築してみましょう。


推定読了時間:30分