LESSON

学習パイプライン設計

田中VPoE「実験管理と特徴量ストアが整った。次はいよいよ学習パイプラインの自動化だ。今まで手動でやっていたデータ取得→前処理→学習→評価→登録のフローを、DAG(有向非巡回グラフ)として定義する。」

あなた「DAGですか。AirflowやPrefectで使われる概念ですよね。」

田中VPoE「そうだ。各ステップの依存関係を明確にして、変更があった部分だけ再実行できるようにする。これがMLパイプラインの設計の要だ。」

DAGとは

DAG(Directed Acyclic Graph:有向非巡回グラフ)は、タスク間の依存関係を表現するグラフ構造です。

データ取得 → データバリデーション → 前処理 → 学習 → 評価 → 登録

                              特徴量ストア更新

DAGの利点

利点説明
依存関係の明確化どのタスクがどのタスクに依存するか一目瞭然
並列実行依存のないタスクは並列実行可能
差分実行変更があったステップだけ再実行
エラーハンドリング失敗したステップからリトライ
可視化パイプライン全体をグラフで確認

パイプラインのステップ設計

基本ステップ

ステップ入力出力失敗時の対応
1. データ取得データソース設定生データリトライ(3回)
2. データバリデーション生データバリデーション結果パイプライン停止、アラート
3. データ前処理生データ前処理済みデータパイプライン停止
4. データ分割前処理済みデータtrain/val/testパイプライン停止
5. モデル学習学習データ + パラメータモデルリトライ
6. モデル評価モデル + テストデータメトリクスパイプライン停止
7. モデル登録評価済みモデルレジストリエントリスキップ(閾値未達)

ステップ分割の原則

1. 単一責任の原則

各ステップは1つの責務のみを持ちます。

# Bad: 1つのステップに詰め込みすぎ
@task
def train_and_evaluate_and_register(data):
    model = train(data)
    metrics = evaluate(model)
    register(model)

# Good: 責務を分離
@task
def train_model(train_data, params):
    return model

@task
def evaluate_model(model, test_data):
    return metrics

@task
def register_model(model, metrics):
    return model_version

2. 冪等性の保証

同じ入力で何度実行しても同じ結果になるようにします。

@task
def preprocess(raw_data, config):
    # 冪等: 同じraw_dataとconfigなら常に同じ結果
    processed = apply_transformations(raw_data, config)
    return processed

3. キャッシュの活用

変更がないステップはキャッシュから結果を返します。

from prefect import task

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def preprocess(raw_data):
    # raw_dataが同じなら24時間キャッシュを利用
    return processed_data

Prefectでのパイプライン実装

完全なパイプライン定義

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import mlflow
import pandas as pd
from feast import FeatureStore

@task(retries=3, retry_delay_seconds=60)
def fetch_data(config: dict) -> pd.DataFrame:
    """データソースからデータを取得"""
    logger = get_run_logger()
    store = FeatureStore(repo_path=config["feast_repo"])

    entity_df = pd.read_sql(config["entity_query"], config["db_uri"])
    logger.info(f"Fetched {len(entity_df)} entities")

    training_df = store.get_historical_features(
        entity_df=entity_df,
        features=config["features"],
    ).to_df()

    logger.info(f"Training data shape: {training_df.shape}")
    return training_df

@task
def validate_data(df: pd.DataFrame, rules: dict) -> pd.DataFrame:
    """データ品質チェック"""
    logger = get_run_logger()
    issues = []

    # Nullチェック
    null_rates = df.isnull().mean()
    high_null = null_rates[null_rates > rules.get("max_null_rate", 0.05)]
    if len(high_null) > 0:
        issues.append(f"High null rate: {high_null.to_dict()}")

    # 行数チェック
    if len(df) < rules.get("min_rows", 100):
        issues.append(f"Too few rows: {len(df)}")

    if issues:
        error_msg = "Data validation failed: " + "; ".join(issues)
        logger.error(error_msg)
        raise ValueError(error_msg)

    logger.info("Data validation passed")
    return df

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=12))
def preprocess(df: pd.DataFrame, config: dict) -> dict:
    """データ前処理と分割"""
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler

    feature_cols = config["feature_columns"]
    target_col = config["target_column"]

    X = df[feature_cols].fillna(0)
    y = df[target_col]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    return {
        "X_train": X_train_scaled, "X_test": X_test_scaled,
        "y_train": y_train, "y_test": y_test,
        "scaler": scaler, "feature_cols": feature_cols,
    }

@task
def train(data: dict, params: dict):
    """モデル学習"""
    from sklearn.ensemble import GradientBoostingClassifier
    model = GradientBoostingClassifier(**params, random_state=42)
    model.fit(data["X_train"], data["y_train"])
    return model

@task
def evaluate(model, data: dict) -> dict:
    """モデル評価"""
    from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
    y_pred = model.predict(data["X_test"])
    y_prob = model.predict_proba(data["X_test"])[:, 1]

    return {
        "accuracy": accuracy_score(data["y_test"], y_pred),
        "f1_score": f1_score(data["y_test"], y_pred),
        "auc_roc": roc_auc_score(data["y_test"], y_prob),
    }

@task
def register(model, metrics: dict, threshold: float, model_name: str):
    """モデル登録(閾値チェック付き)"""
    logger = get_run_logger()

    if metrics["f1_score"] < threshold:
        logger.warning(
            f"F1 {metrics['f1_score']:.4f} < threshold {threshold}. Skipping registration."
        )
        return None

    with mlflow.start_run():
        mlflow.log_metrics(metrics)
        result = mlflow.sklearn.log_model(
            model, "model",
            registered_model_name=model_name
        )
    logger.info(f"Model registered: {model_name}")
    return result

@flow(name="churn-prediction-training")
def training_pipeline(config: dict):
    """離脱予測モデル学習パイプライン"""
    raw_data = fetch_data(config)
    validated_data = validate_data(raw_data, config["validation_rules"])
    processed = preprocess(validated_data, config)
    model = train(processed, config["model_params"])
    metrics = evaluate(model, processed)
    register(model, metrics, config["threshold"], config["model_name"])

パイプラインの実行

config = {
    "feast_repo": "feature_repo",
    "entity_query": "SELECT customer_id, event_timestamp, churn FROM training_labels",
    "db_uri": "postgresql://...",
    "features": [
        "customer_profile:age",
        "customer_profile:tenure_days",
        "customer_order_stats:order_count_30d",
        "customer_order_stats:avg_order_amount",
    ],
    "feature_columns": ["age", "tenure_days", "order_count_30d", "avg_order_amount"],
    "target_column": "churn",
    "validation_rules": {"max_null_rate": 0.05, "min_rows": 1000},
    "model_params": {"n_estimators": 100, "max_depth": 5, "learning_rate": 0.1},
    "threshold": 0.75,
    "model_name": "churn_predictor",
}

training_pipeline(config)

パラメータ管理

パイプラインのパラメータは設定ファイルで外部管理します。

# config/training_config.yaml
model:
  name: churn_predictor
  type: GradientBoosting
  params:
    n_estimators: 100
    max_depth: 5
    learning_rate: 0.1

data:
  features:
    - customer_profile:age
    - customer_profile:tenure_days
    - customer_order_stats:order_count_30d
    - customer_order_stats:avg_order_amount
  validation:
    max_null_rate: 0.05
    min_rows: 1000

deployment:
  threshold: 0.75
  auto_promote: false

まとめ

項目ポイント
DAGタスクの依存関係を有向非巡回グラフで表現
ステップ設計単一責任、冪等性、キャッシュ活用
エラーハンドリングリトライ、バリデーション停止、閾値スキップ
パラメータ管理設定ファイルで外部化

チェックリスト

  • DAGの概念と利点を説明できる
  • パイプラインのステップ分割の原則を理解している
  • Prefectで学習パイプラインを定義できる
  • パラメータの外部管理ができる

次のステップへ

学習パイプラインの設計を学びました。次は、CI/CD for MLを構築しましょう。


推定読了時間:30分