学習パイプライン設計
田中VPoE「実験管理と特徴量ストアが整った。次はいよいよ学習パイプラインの自動化だ。今まで手動でやっていたデータ取得→前処理→学習→評価→登録のフローを、DAG(有向非巡回グラフ)として定義する。」
あなた「DAGですか。AirflowやPrefectで使われる概念ですよね。」
田中VPoE「そうだ。各ステップの依存関係を明確にして、変更があった部分だけ再実行できるようにする。これがMLパイプラインの設計の要だ。」
DAGとは
DAG(Directed Acyclic Graph:有向非巡回グラフ)は、タスク間の依存関係を表現するグラフ構造です。
データ取得 → データバリデーション → 前処理 → 学習 → 評価 → 登録
↓
特徴量ストア更新
DAGの利点
| 利点 | 説明 |
|---|---|
| 依存関係の明確化 | どのタスクがどのタスクに依存するか一目瞭然 |
| 並列実行 | 依存のないタスクは並列実行可能 |
| 差分実行 | 変更があったステップだけ再実行 |
| エラーハンドリング | 失敗したステップからリトライ |
| 可視化 | パイプライン全体をグラフで確認 |
パイプラインのステップ設計
基本ステップ
| ステップ | 入力 | 出力 | 失敗時の対応 |
|---|---|---|---|
| 1. データ取得 | データソース設定 | 生データ | リトライ(3回) |
| 2. データバリデーション | 生データ | バリデーション結果 | パイプライン停止、アラート |
| 3. データ前処理 | 生データ | 前処理済みデータ | パイプライン停止 |
| 4. データ分割 | 前処理済みデータ | train/val/test | パイプライン停止 |
| 5. モデル学習 | 学習データ + パラメータ | モデル | リトライ |
| 6. モデル評価 | モデル + テストデータ | メトリクス | パイプライン停止 |
| 7. モデル登録 | 評価済みモデル | レジストリエントリ | スキップ(閾値未達) |
ステップ分割の原則
1. 単一責任の原則
各ステップは1つの責務のみを持ちます。
# Bad: 1つのステップに詰め込みすぎ
@task
def train_and_evaluate_and_register(data):
model = train(data)
metrics = evaluate(model)
register(model)
# Good: 責務を分離
@task
def train_model(train_data, params):
return model
@task
def evaluate_model(model, test_data):
return metrics
@task
def register_model(model, metrics):
return model_version
2. 冪等性の保証
同じ入力で何度実行しても同じ結果になるようにします。
@task
def preprocess(raw_data, config):
# 冪等: 同じraw_dataとconfigなら常に同じ結果
processed = apply_transformations(raw_data, config)
return processed
3. キャッシュの活用
変更がないステップはキャッシュから結果を返します。
from prefect import task
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def preprocess(raw_data):
# raw_dataが同じなら24時間キャッシュを利用
return processed_data
Prefectでのパイプライン実装
完全なパイプライン定義
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import mlflow
import pandas as pd
from feast import FeatureStore
@task(retries=3, retry_delay_seconds=60)
def fetch_data(config: dict) -> pd.DataFrame:
"""データソースからデータを取得"""
logger = get_run_logger()
store = FeatureStore(repo_path=config["feast_repo"])
entity_df = pd.read_sql(config["entity_query"], config["db_uri"])
logger.info(f"Fetched {len(entity_df)} entities")
training_df = store.get_historical_features(
entity_df=entity_df,
features=config["features"],
).to_df()
logger.info(f"Training data shape: {training_df.shape}")
return training_df
@task
def validate_data(df: pd.DataFrame, rules: dict) -> pd.DataFrame:
"""データ品質チェック"""
logger = get_run_logger()
issues = []
# Nullチェック
null_rates = df.isnull().mean()
high_null = null_rates[null_rates > rules.get("max_null_rate", 0.05)]
if len(high_null) > 0:
issues.append(f"High null rate: {high_null.to_dict()}")
# 行数チェック
if len(df) < rules.get("min_rows", 100):
issues.append(f"Too few rows: {len(df)}")
if issues:
error_msg = "Data validation failed: " + "; ".join(issues)
logger.error(error_msg)
raise ValueError(error_msg)
logger.info("Data validation passed")
return df
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=12))
def preprocess(df: pd.DataFrame, config: dict) -> dict:
"""データ前処理と分割"""
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
feature_cols = config["feature_columns"]
target_col = config["target_column"]
X = df[feature_cols].fillna(0)
y = df[target_col]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
return {
"X_train": X_train_scaled, "X_test": X_test_scaled,
"y_train": y_train, "y_test": y_test,
"scaler": scaler, "feature_cols": feature_cols,
}
@task
def train(data: dict, params: dict):
"""モデル学習"""
from sklearn.ensemble import GradientBoostingClassifier
model = GradientBoostingClassifier(**params, random_state=42)
model.fit(data["X_train"], data["y_train"])
return model
@task
def evaluate(model, data: dict) -> dict:
"""モデル評価"""
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
y_pred = model.predict(data["X_test"])
y_prob = model.predict_proba(data["X_test"])[:, 1]
return {
"accuracy": accuracy_score(data["y_test"], y_pred),
"f1_score": f1_score(data["y_test"], y_pred),
"auc_roc": roc_auc_score(data["y_test"], y_prob),
}
@task
def register(model, metrics: dict, threshold: float, model_name: str):
"""モデル登録(閾値チェック付き)"""
logger = get_run_logger()
if metrics["f1_score"] < threshold:
logger.warning(
f"F1 {metrics['f1_score']:.4f} < threshold {threshold}. Skipping registration."
)
return None
with mlflow.start_run():
mlflow.log_metrics(metrics)
result = mlflow.sklearn.log_model(
model, "model",
registered_model_name=model_name
)
logger.info(f"Model registered: {model_name}")
return result
@flow(name="churn-prediction-training")
def training_pipeline(config: dict):
"""離脱予測モデル学習パイプライン"""
raw_data = fetch_data(config)
validated_data = validate_data(raw_data, config["validation_rules"])
processed = preprocess(validated_data, config)
model = train(processed, config["model_params"])
metrics = evaluate(model, processed)
register(model, metrics, config["threshold"], config["model_name"])
パイプラインの実行
config = {
"feast_repo": "feature_repo",
"entity_query": "SELECT customer_id, event_timestamp, churn FROM training_labels",
"db_uri": "postgresql://...",
"features": [
"customer_profile:age",
"customer_profile:tenure_days",
"customer_order_stats:order_count_30d",
"customer_order_stats:avg_order_amount",
],
"feature_columns": ["age", "tenure_days", "order_count_30d", "avg_order_amount"],
"target_column": "churn",
"validation_rules": {"max_null_rate": 0.05, "min_rows": 1000},
"model_params": {"n_estimators": 100, "max_depth": 5, "learning_rate": 0.1},
"threshold": 0.75,
"model_name": "churn_predictor",
}
training_pipeline(config)
パラメータ管理
パイプラインのパラメータは設定ファイルで外部管理します。
# config/training_config.yaml
model:
name: churn_predictor
type: GradientBoosting
params:
n_estimators: 100
max_depth: 5
learning_rate: 0.1
data:
features:
- customer_profile:age
- customer_profile:tenure_days
- customer_order_stats:order_count_30d
- customer_order_stats:avg_order_amount
validation:
max_null_rate: 0.05
min_rows: 1000
deployment:
threshold: 0.75
auto_promote: false
まとめ
| 項目 | ポイント |
|---|---|
| DAG | タスクの依存関係を有向非巡回グラフで表現 |
| ステップ設計 | 単一責任、冪等性、キャッシュ活用 |
| エラーハンドリング | リトライ、バリデーション停止、閾値スキップ |
| パラメータ管理 | 設定ファイルで外部化 |
チェックリスト
- DAGの概念と利点を説明できる
- パイプラインのステップ分割の原則を理解している
- Prefectで学習パイプラインを定義できる
- パラメータの外部管理ができる
次のステップへ
学習パイプラインの設計を学びました。次は、CI/CD for MLを構築しましょう。
推定読了時間:30分