演習:MLパイプラインを構築しよう
田中VPoE「学習パイプライン、CI/CD、デプロイ戦略を一通り学んだ。ここで実際にNetShop社の離脱予測モデル用のMLパイプラインを一気通貫で構築してみよう。」
あなた「学習からデプロイまでの全フローを自動化するんですね。品質ゲートも組み込みます。」
田中VPoE「そうだ。手動作業を極力排除して、信頼性の高いパイプラインを作ってほしい。」
ミッション概要
NetShop社の離脱予測モデルについて、データバリデーション→学習→評価→品質ゲート→デプロイの一気通貫パイプラインを構築します。
前提条件
- Step 4の各レッスン(学習パイプライン、CI/CD for ML、デプロイメントパイプライン、ML CI/CD実装)を修了していること
- Python 3.10+、MLflow、Docker がインストール済みであること
Mission 1: 学習パイプラインの構築(30分)
以下の要件を満たす学習パイプラインをPythonで実装してください。
要件
- データ読み込み → 前処理 → 学習 → 評価 の各ステップを関数として定義
- MLflowで実験パラメータとメトリクスを記録
- 設定ファイル(YAML)でハイパーパラメータを管理
テンプレート
# pipeline/train_pipeline.py
import yaml
import mlflow
import pandas as pd
from sklearn.model_selection import train_test_split
def load_config(config_path: str) -> dict:
"""設定ファイルを読み込む"""
# TODO: 実装
def load_data(data_path: str) -> pd.DataFrame:
"""データを読み込みバリデーションする"""
# TODO: 実装
def preprocess(df: pd.DataFrame) -> tuple:
"""前処理を行いtrain/testに分割する"""
# TODO: 実装
def train(X_train, y_train, params: dict):
"""モデルを学習する"""
# TODO: 実装
def evaluate(model, X_test, y_test) -> dict:
"""モデルを評価しメトリクスを返す"""
# TODO: 実装
def run_pipeline(config_path: str):
"""パイプライン全体を実行する"""
# TODO: 実装
# config/training_config.yml
data:
path: "data/churn_dataset.parquet"
test_size: 0.2
random_state: 42
model:
type: "gradient_boosting"
params:
n_estimators: 200
max_depth: 5
learning_rate: 0.1
quality_gates:
auc_roc: 0.80
f1_score: 0.70
解答例
# pipeline/train_pipeline.py
import yaml
import mlflow
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score, f1_score, classification_report
def load_config(config_path: str) -> dict:
with open(config_path) as f:
return yaml.safe_load(f)
def load_data(data_path: str) -> pd.DataFrame:
df = pd.read_parquet(data_path)
assert len(df) > 0, "データが空です"
assert "churn" in df.columns, "ラベル列 'churn' が存在しません"
print(f"Loaded {len(df)} records")
return df
def preprocess(df: pd.DataFrame, config: dict) -> tuple:
feature_cols = [c for c in df.columns if c not in ["churn", "customer_id"]]
X = df[feature_cols]
y = df["churn"]
return train_test_split(
X, y,
test_size=config["data"]["test_size"],
random_state=config["data"]["random_state"]
)
def train(X_train, y_train, params: dict):
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
return model
def evaluate(model, X_test, y_test) -> dict:
y_pred = model.predict(X_test)
y_prob = model.predict_proba(X_test)[:, 1]
metrics = {
"auc_roc": roc_auc_score(y_test, y_prob),
"f1_score": f1_score(y_test, y_pred),
}
print(classification_report(y_test, y_pred))
return metrics
def run_pipeline(config_path: str):
config = load_config(config_path)
with mlflow.start_run():
# パラメータ記録
mlflow.log_params(config["model"]["params"])
mlflow.log_param("model_type", config["model"]["type"])
# パイプライン実行
df = load_data(config["data"]["path"])
X_train, X_test, y_train, y_test = preprocess(df, config)
model = train(X_train, y_train, config["model"]["params"])
metrics = evaluate(model, X_test, y_test)
# メトリクス記録
mlflow.log_metrics(metrics)
# 品質ゲート
gates = config["quality_gates"]
passed = all(
metrics[k] >= v for k, v in gates.items() if k in metrics
)
if passed:
mlflow.sklearn.log_model(model, "model")
print("Quality gates PASSED - model logged")
else:
print("Quality gates FAILED - model NOT logged")
for k, v in gates.items():
if k in metrics and metrics[k] < v:
print(f" FAIL: {k} = {metrics[k]:.4f} < {v}")
return metrics, passed
if __name__ == "__main__":
run_pipeline("config/training_config.yml")
Mission 2: GitHub Actions ワークフローの作成(30分)
学習パイプラインをGitHub Actionsで自動実行するワークフローを作成してください。
要件
- PRマージ時とスケジュール(週次)の2つのトリガー
- データバリデーション → 学習 → 品質ゲート → ステージングデプロイのジョブ構成
- 品質ゲート不合格時はデプロイをブロック
解答例
# .github/workflows/ml-pipeline.yml
name: ML Training Pipeline
on:
push:
branches: [main]
paths:
- "src/**"
- "pipeline/**"
- "config/**"
schedule:
- cron: "0 2 * * 1"
workflow_dispatch:
jobs:
validate-data:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.10"
- run: pip install -r requirements.txt
- name: Validate training data
run: python pipeline/validate_data.py
train-and-evaluate:
needs: validate-data
runs-on: ubuntu-latest
outputs:
quality_gate: ${{ steps.train.outputs.quality_gate }}
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.10"
- run: pip install -r requirements.txt
- name: Train and evaluate
id: train
run: |
result=$(python pipeline/train_pipeline.py)
echo "quality_gate=$result" >> $GITHUB_OUTPUT
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
deploy-staging:
needs: train-and-evaluate
if: needs.train-and-evaluate.outputs.quality_gate == 'passed'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Deploy to staging
run: python pipeline/deploy.py --stage staging
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
- name: Run integration tests
run: pytest tests/integration/ -v
notify:
needs: [train-and-evaluate, deploy-staging]
if: always()
runs-on: ubuntu-latest
steps:
- name: Notify Slack
uses: slackapi/slack-github-action@v1
with:
payload: |
{
"text": "ML Pipeline: ${{ needs.deploy-staging.result }}"
}
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
Mission 3: ロールバック機構の実装(30分)
デプロイ後に問題が検知された場合のロールバック機構を実装してください。
要件
- Model Registryのバージョン管理を活用したロールバック
- ロールバック条件の自動判定
- ロールバック実行のログ記録
解答例
# pipeline/rollback.py
import mlflow
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
ROLLBACK_CONDITIONS = {
"auc_roc_min": 0.75,
"error_rate_max": 0.05,
"latency_p99_max_ms": 200,
}
def check_rollback_needed(current_metrics: dict) -> tuple[bool, list[str]]:
"""ロールバックが必要か判定する"""
reasons = []
if current_metrics.get("auc_roc", 1.0) < ROLLBACK_CONDITIONS["auc_roc_min"]:
reasons.append(
f"AUC-ROC {current_metrics['auc_roc']:.4f} < "
f"{ROLLBACK_CONDITIONS['auc_roc_min']}"
)
if current_metrics.get("error_rate", 0) > ROLLBACK_CONDITIONS["error_rate_max"]:
reasons.append(
f"Error rate {current_metrics['error_rate']:.4f} > "
f"{ROLLBACK_CONDITIONS['error_rate_max']}"
)
if current_metrics.get("latency_p99_ms", 0) > ROLLBACK_CONDITIONS["latency_p99_max_ms"]:
reasons.append(
f"Latency {current_metrics['latency_p99_ms']}ms > "
f"{ROLLBACK_CONDITIONS['latency_p99_max_ms']}ms"
)
return len(reasons) > 0, reasons
def execute_rollback(model_name: str, reasons: list[str]):
"""ロールバックを実行する"""
client = mlflow.tracking.MlflowClient()
# 現在のProductionバージョン取得
prod = client.get_latest_versions(model_name, stages=["Production"])
archived = client.get_latest_versions(model_name, stages=["Archived"])
if not prod or not archived:
logger.error("Rollback failed: missing versions")
return False
current_v = prod[0].version
rollback_v = archived[0].version
# ステージ変更
client.transition_model_version_stage(
name=model_name, version=current_v, stage="Archived"
)
client.transition_model_version_stage(
name=model_name, version=rollback_v, stage="Production"
)
# ログ記録
logger.info(f"ROLLBACK: {model_name} v{current_v} -> v{rollback_v}")
logger.info(f"Reasons: {'; '.join(reasons)}")
logger.info(f"Timestamp: {datetime.now().isoformat()}")
return True
if __name__ == "__main__":
# モニタリングから取得したメトリクス(例)
metrics = {"auc_roc": 0.72, "error_rate": 0.02, "latency_p99_ms": 150}
needed, reasons = check_rollback_needed(metrics)
if needed:
execute_rollback("churn_model", reasons)
達成度チェック
- 設定ファイル駆動の学習パイプラインを構築できた
- MLflowでパラメータ・メトリクスを自動記録できた
- 品質ゲートによるデプロイ判定を実装できた
- GitHub Actionsワークフローを設計できた
- ロールバック機構を実装できた
推定所要時間:90分