Airflow基礎
田中VPoE「dbtでデータ変換パイプラインを構築できたね。次はこれをオーケストレーションする仕組みを学ぼう。Apache Airflowは業界標準のワークフローオーケストレーターだ。」
あなた「cronジョブの代わりになるんですよね。依存関係の管理やエラー時のリトライも自動でやってくれるんですか?」
田中VPoE「その通り。深夜バッチが失敗しても誰も気づかない、という現状をAirflowで解決しよう。」
Apache Airflowとは
Apache Airflowは、ワークフローをプログラムで定義・スケジュール・監視するためのオープンソースプラットフォームです。
Airflowの特徴
| 特徴 | 説明 |
|---|---|
| DAGベース | ワークフローを有向非巡回グラフ(DAG)として定義 |
| Python native | DAGの定義はPythonコードで記述 |
| 豊富なOperator | Bash、Python、SQL、HTTP、クラウドサービス等 |
| Web UI | DAGの実行状況をリアルタイムで監視可能 |
| スケジューリング | cron式によるスケジュール実行 |
| リトライ | 失敗時の自動リトライ機能 |
Airflowのアーキテクチャ
[Web Server] ← ブラウザ
↕
[Scheduler] → DAGファイル読み込み → タスクキュー
↕
[Executor] → [Worker 1] [Worker 2] [Worker 3]
↕
[Metadata DB] (PostgreSQL)
最初のDAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
default_args = {
"owner": "ds-team",
"depends_on_past": False,
"email_on_failure": True,
"email": ["ds-team@netshop.co.jp"],
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="netshop_daily_etl",
default_args=default_args,
description="NetShop日次ETLパイプライン",
schedule_interval="0 2 * * *", # 毎日2:00
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["etl", "daily"],
) as dag:
extract = BashOperator(
task_id="extract_data",
bash_command="python /opt/etl/extract.py",
)
def transform_data():
import pandas as pd
df = pd.read_parquet("/data/raw/orders.parquet")
df_cleaned = df.dropna(subset=["order_id"])
df_cleaned.to_parquet("/data/staging/orders.parquet")
transform = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
)
load = BashOperator(
task_id="load_to_warehouse",
bash_command="python /opt/etl/load.py",
)
# 依存関係の定義
extract >> transform >> load
主要なOperator
| Operator | 用途 | 例 |
|---|---|---|
| BashOperator | シェルコマンド実行 | スクリプト実行、ファイル操作 |
| PythonOperator | Python関数実行 | データ処理、API呼び出し |
| SQLExecuteQueryOperator | SQL実行 | テーブル作成、データ投入 |
| S3ToWarehouseOperator | S3からDWH | データロード |
| EmailOperator | メール送信 | 通知、レポート送付 |
| DbtOperator | dbt実行 | dbt run、dbt test |
スケジューリング
# cron式の例
schedule_interval="0 2 * * *" # 毎日2:00
schedule_interval="0 */6 * * *" # 6時間ごと
schedule_interval="0 9 * * 1" # 毎週月曜9:00
schedule_interval="0 0 1 * *" # 毎月1日0:00
| パラメータ | 説明 |
|---|---|
start_date | DAGの開始日 |
schedule_interval | 実行間隔(cron式またはtimedelta) |
catchup | 過去の未実行分をバックフィルするか |
max_active_runs | 同時実行可能なDAGの最大数 |
まとめ
| 項目 | ポイント |
|---|---|
| Airflowの役割 | ワークフローのオーケストレーション(定義・スケジュール・監視) |
| DAG定義 | PythonコードでDAGを定義、Operatorでタスクを記述 |
| スケジューリング | cron式で柔軟なスケジュール設定が可能 |
| アーキテクチャ | Web Server + Scheduler + Executor + Metadata DB |
チェックリスト
- Airflowの役割とアーキテクチャを説明できる
- 基本的なDAGをPythonで定義できる
- 主要なOperatorの種類と用途を理解している
- cron式によるスケジューリングを設定できる
次のステップへ
Airflowの基礎を理解しました。次はDAGの設計パターンとベストプラクティスを学びましょう。
推定読了時間:30分