EXERCISE 90分

演習:MLパイプラインを構築しよう

田中VPoE「学習パイプライン、CI/CD、デプロイ戦略を一通り学んだ。ここで実際にNetShop社の離脱予測モデル用のMLパイプラインを一気通貫で構築してみよう。」

あなた「学習からデプロイまでの全フローを自動化するんですね。品質ゲートも組み込みます。」

田中VPoE「そうだ。手動作業を極力排除して、信頼性の高いパイプラインを作ってほしい。」

ミッション概要

NetShop社の離脱予測モデルについて、データバリデーション→学習→評価→品質ゲート→デプロイの一気通貫パイプラインを構築します。

前提条件

  • Step 4の各レッスン(学習パイプライン、CI/CD for ML、デプロイメントパイプライン、ML CI/CD実装)を修了していること
  • Python 3.10+、MLflow、Docker がインストール済みであること

Mission 1: 学習パイプラインの構築(30分)

以下の要件を満たす学習パイプラインをPythonで実装してください。

要件

  1. データ読み込み → 前処理 → 学習 → 評価 の各ステップを関数として定義
  2. MLflowで実験パラメータとメトリクスを記録
  3. 設定ファイル(YAML)でハイパーパラメータを管理

テンプレート

# pipeline/train_pipeline.py
import yaml
import mlflow
import pandas as pd
from sklearn.model_selection import train_test_split

def load_config(config_path: str) -> dict:
    """設定ファイルを読み込む"""
    # TODO: 実装

def load_data(data_path: str) -> pd.DataFrame:
    """データを読み込みバリデーションする"""
    # TODO: 実装

def preprocess(df: pd.DataFrame) -> tuple:
    """前処理を行いtrain/testに分割する"""
    # TODO: 実装

def train(X_train, y_train, params: dict):
    """モデルを学習する"""
    # TODO: 実装

def evaluate(model, X_test, y_test) -> dict:
    """モデルを評価しメトリクスを返す"""
    # TODO: 実装

def run_pipeline(config_path: str):
    """パイプライン全体を実行する"""
    # TODO: 実装
# config/training_config.yml
data:
  path: "data/churn_dataset.parquet"
  test_size: 0.2
  random_state: 42

model:
  type: "gradient_boosting"
  params:
    n_estimators: 200
    max_depth: 5
    learning_rate: 0.1

quality_gates:
  auc_roc: 0.80
  f1_score: 0.70
解答例
# pipeline/train_pipeline.py
import yaml
import mlflow
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score, f1_score, classification_report

def load_config(config_path: str) -> dict:
    with open(config_path) as f:
        return yaml.safe_load(f)

def load_data(data_path: str) -> pd.DataFrame:
    df = pd.read_parquet(data_path)
    assert len(df) > 0, "データが空です"
    assert "churn" in df.columns, "ラベル列 'churn' が存在しません"
    print(f"Loaded {len(df)} records")
    return df

def preprocess(df: pd.DataFrame, config: dict) -> tuple:
    feature_cols = [c for c in df.columns if c not in ["churn", "customer_id"]]
    X = df[feature_cols]
    y = df["churn"]
    return train_test_split(
        X, y,
        test_size=config["data"]["test_size"],
        random_state=config["data"]["random_state"]
    )

def train(X_train, y_train, params: dict):
    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)
    return model

def evaluate(model, X_test, y_test) -> dict:
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    metrics = {
        "auc_roc": roc_auc_score(y_test, y_prob),
        "f1_score": f1_score(y_test, y_pred),
    }
    print(classification_report(y_test, y_pred))
    return metrics

def run_pipeline(config_path: str):
    config = load_config(config_path)

    with mlflow.start_run():
        # パラメータ記録
        mlflow.log_params(config["model"]["params"])
        mlflow.log_param("model_type", config["model"]["type"])

        # パイプライン実行
        df = load_data(config["data"]["path"])
        X_train, X_test, y_train, y_test = preprocess(df, config)
        model = train(X_train, y_train, config["model"]["params"])
        metrics = evaluate(model, X_test, y_test)

        # メトリクス記録
        mlflow.log_metrics(metrics)

        # 品質ゲート
        gates = config["quality_gates"]
        passed = all(
            metrics[k] >= v for k, v in gates.items() if k in metrics
        )

        if passed:
            mlflow.sklearn.log_model(model, "model")
            print("Quality gates PASSED - model logged")
        else:
            print("Quality gates FAILED - model NOT logged")
            for k, v in gates.items():
                if k in metrics and metrics[k] < v:
                    print(f"  FAIL: {k} = {metrics[k]:.4f} < {v}")

        return metrics, passed

if __name__ == "__main__":
    run_pipeline("config/training_config.yml")

Mission 2: GitHub Actions ワークフローの作成(30分)

学習パイプラインをGitHub Actionsで自動実行するワークフローを作成してください。

要件

  1. PRマージ時とスケジュール(週次)の2つのトリガー
  2. データバリデーション → 学習 → 品質ゲート → ステージングデプロイのジョブ構成
  3. 品質ゲート不合格時はデプロイをブロック
解答例
# .github/workflows/ml-pipeline.yml
name: ML Training Pipeline

on:
  push:
    branches: [main]
    paths:
      - "src/**"
      - "pipeline/**"
      - "config/**"
  schedule:
    - cron: "0 2 * * 1"
  workflow_dispatch:

jobs:
  validate-data:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      - run: pip install -r requirements.txt
      - name: Validate training data
        run: python pipeline/validate_data.py

  train-and-evaluate:
    needs: validate-data
    runs-on: ubuntu-latest
    outputs:
      quality_gate: ${{ steps.train.outputs.quality_gate }}
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      - run: pip install -r requirements.txt
      - name: Train and evaluate
        id: train
        run: |
          result=$(python pipeline/train_pipeline.py)
          echo "quality_gate=$result" >> $GITHUB_OUTPUT
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}

  deploy-staging:
    needs: train-and-evaluate
    if: needs.train-and-evaluate.outputs.quality_gate == 'passed'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to staging
        run: python pipeline/deploy.py --stage staging
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
      - name: Run integration tests
        run: pytest tests/integration/ -v

  notify:
    needs: [train-and-evaluate, deploy-staging]
    if: always()
    runs-on: ubuntu-latest
    steps:
      - name: Notify Slack
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "ML Pipeline: ${{ needs.deploy-staging.result }}"
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}

Mission 3: ロールバック機構の実装(30分)

デプロイ後に問題が検知された場合のロールバック機構を実装してください。

要件

  1. Model Registryのバージョン管理を活用したロールバック
  2. ロールバック条件の自動判定
  3. ロールバック実行のログ記録
解答例
# pipeline/rollback.py
import mlflow
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

ROLLBACK_CONDITIONS = {
    "auc_roc_min": 0.75,
    "error_rate_max": 0.05,
    "latency_p99_max_ms": 200,
}

def check_rollback_needed(current_metrics: dict) -> tuple[bool, list[str]]:
    """ロールバックが必要か判定する"""
    reasons = []

    if current_metrics.get("auc_roc", 1.0) < ROLLBACK_CONDITIONS["auc_roc_min"]:
        reasons.append(
            f"AUC-ROC {current_metrics['auc_roc']:.4f} < "
            f"{ROLLBACK_CONDITIONS['auc_roc_min']}"
        )

    if current_metrics.get("error_rate", 0) > ROLLBACK_CONDITIONS["error_rate_max"]:
        reasons.append(
            f"Error rate {current_metrics['error_rate']:.4f} > "
            f"{ROLLBACK_CONDITIONS['error_rate_max']}"
        )

    if current_metrics.get("latency_p99_ms", 0) > ROLLBACK_CONDITIONS["latency_p99_max_ms"]:
        reasons.append(
            f"Latency {current_metrics['latency_p99_ms']}ms > "
            f"{ROLLBACK_CONDITIONS['latency_p99_max_ms']}ms"
        )

    return len(reasons) > 0, reasons

def execute_rollback(model_name: str, reasons: list[str]):
    """ロールバックを実行する"""
    client = mlflow.tracking.MlflowClient()

    # 現在のProductionバージョン取得
    prod = client.get_latest_versions(model_name, stages=["Production"])
    archived = client.get_latest_versions(model_name, stages=["Archived"])

    if not prod or not archived:
        logger.error("Rollback failed: missing versions")
        return False

    current_v = prod[0].version
    rollback_v = archived[0].version

    # ステージ変更
    client.transition_model_version_stage(
        name=model_name, version=current_v, stage="Archived"
    )
    client.transition_model_version_stage(
        name=model_name, version=rollback_v, stage="Production"
    )

    # ログ記録
    logger.info(f"ROLLBACK: {model_name} v{current_v} -> v{rollback_v}")
    logger.info(f"Reasons: {'; '.join(reasons)}")
    logger.info(f"Timestamp: {datetime.now().isoformat()}")

    return True

if __name__ == "__main__":
    # モニタリングから取得したメトリクス(例)
    metrics = {"auc_roc": 0.72, "error_rate": 0.02, "latency_p99_ms": 150}
    needed, reasons = check_rollback_needed(metrics)
    if needed:
        execute_rollback("churn_model", reasons)

達成度チェック

  • 設定ファイル駆動の学習パイプラインを構築できた
  • MLflowでパラメータ・メトリクスを自動記録できた
  • 品質ゲートによるデプロイ判定を実装できた
  • GitHub Actionsワークフローを設計できた
  • ロールバック機構を実装できた

推定所要時間:90分