EXERCISE 60分

演習:モデルモニタリング基盤を構築しよう

田中VPoE「モニタリングの理論を学んだ。実際にNetShop社の離脱予測モデルのモニタリング基盤を構築しよう。ドリフト検出からアラート発報まで一気通貫で作ってほしい。」

あなた「参照データと本番データを比較して、閾値を超えたらアラートを出す仕組みですね。」

田中VPoE「そうだ。実際の運用を想定して、レポート生成まで含めて作ろう。」

ミッション概要

離脱予測モデルの本番運用を想定したモニタリング基盤を構築します。参照データと本番データの比較によるドリフト検出、アラートルールの実装、レポート生成を行います。

前提条件

  • Step 5のレッスン(モニタリング基礎、ドリフト検出、再学習戦略)を修了していること
  • Python 3.10+、pandas、scipy がインストール済みであること

Mission 1: ドリフト検出エンジンの実装(20分)

PSIとKS検定を使った汎用的なドリフト検出エンジンを実装してください。

要件

  1. 数値特徴量のPSIとKS検定を計算
  2. カテゴリ特徴量のカイ二乗検定を計算
  3. 全特徴量のドリフトレポートをDataFrameで出力
解答例
import numpy as np
import pandas as pd
from scipy import stats
from scipy.stats import chi2_contingency

class DriftDetector:
    def __init__(self, reference_data: pd.DataFrame):
        self.reference = reference_data

    def _psi(self, ref: np.ndarray, cur: np.ndarray, bins: int = 10) -> float:
        breakpoints = np.percentile(ref, np.linspace(0, 100, bins + 1))
        breakpoints[0], breakpoints[-1] = -np.inf, np.inf
        ref_pct = (np.histogram(ref, breakpoints)[0] + 1) / (len(ref) + bins)
        cur_pct = (np.histogram(cur, breakpoints)[0] + 1) / (len(cur) + bins)
        return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))

    def _ks_test(self, ref: np.ndarray, cur: np.ndarray) -> dict:
        stat, p = stats.ks_2samp(ref, cur)
        return {"statistic": stat, "p_value": p}

    def _chi2_test(self, ref: pd.Series, cur: pd.Series) -> dict:
        cats = set(ref.unique()) | set(cur.unique())
        table = pd.DataFrame({
            "ref": ref.value_counts().reindex(cats, fill_value=0),
            "cur": cur.value_counts().reindex(cats, fill_value=0),
        })
        chi2, p, _, _ = chi2_contingency(table.T)
        return {"chi2": chi2, "p_value": p}

    def detect(self, current_data: pd.DataFrame) -> pd.DataFrame:
        results = []
        num_cols = self.reference.select_dtypes(include=[np.number]).columns
        cat_cols = self.reference.select_dtypes(include=["object", "category"]).columns

        for col in num_cols:
            if col not in current_data.columns:
                continue
            psi = self._psi(self.reference[col].dropna().values,
                           current_data[col].dropna().values)
            ks = self._ks_test(self.reference[col].dropna().values,
                              current_data[col].dropna().values)
            results.append({
                "feature": col, "type": "numerical",
                "psi": round(psi, 4),
                "ks_p_value": round(ks["p_value"], 4),
                "drift": "YES" if psi > 0.25 else "NO",
            })

        for col in cat_cols:
            if col not in current_data.columns:
                continue
            chi2 = self._chi2_test(self.reference[col], current_data[col])
            results.append({
                "feature": col, "type": "categorical",
                "psi": None, "ks_p_value": None,
                "chi2_p_value": round(chi2["p_value"], 4),
                "drift": "YES" if chi2["p_value"] < 0.05 else "NO",
            })

        return pd.DataFrame(results)

Mission 2: アラートシステムの実装(20分)

ドリフト検出結果に基づいてアラートを発報するシステムを実装してください。

要件

  1. Warning / Critical の2段階アラート
  2. 連続N回の閾値超過でアラート発報(ノイズ抑制)
  3. アラート履歴の記録
解答例
from datetime import datetime
from collections import defaultdict

class AlertManager:
    RULES = {
        "drift_psi": {"warning": 0.1, "critical": 0.25, "consecutive": 3},
        "accuracy": {"warning": 0.82, "critical": 0.78, "consecutive": 2},
    }

    def __init__(self):
        self.consecutive_counts = defaultdict(int)
        self.alert_history = []

    def check(self, metric_name: str, value: float, timestamp: datetime = None):
        timestamp = timestamp or datetime.now()
        rule = self.RULES.get(metric_name)
        if not rule:
            return None

        # ドリフト系: 値が大きいほど悪い
        if metric_name == "drift_psi":
            if value >= rule["critical"]:
                self.consecutive_counts[metric_name] += 1
                severity = "CRITICAL"
            elif value >= rule["warning"]:
                self.consecutive_counts[metric_name] += 1
                severity = "WARNING"
            else:
                self.consecutive_counts[metric_name] = 0
                return None
        # 精度系: 値が小さいほど悪い
        else:
            if value <= rule["critical"]:
                self.consecutive_counts[metric_name] += 1
                severity = "CRITICAL"
            elif value <= rule["warning"]:
                self.consecutive_counts[metric_name] += 1
                severity = "WARNING"
            else:
                self.consecutive_counts[metric_name] = 0
                return None

        if self.consecutive_counts[metric_name] >= rule["consecutive"]:
            alert = {
                "timestamp": timestamp,
                "metric": metric_name,
                "value": value,
                "severity": severity,
                "consecutive": self.consecutive_counts[metric_name],
            }
            self.alert_history.append(alert)
            return alert

        return None

Mission 3: モニタリングダッシュボード用レポート生成(20分)

ドリフト検出結果とアラート情報を統合したレポートを生成してください。

要件

  1. 特徴量ごとのドリフトスコアサマリー
  2. アラート履歴の一覧
  3. 再学習推奨判定
解答例
def generate_report(drift_results: pd.DataFrame, alert_manager: AlertManager) -> str:
    report_lines = []
    report_lines.append("=" * 60)
    report_lines.append("MODEL MONITORING REPORT")
    report_lines.append(f"Generated: {datetime.now().isoformat()}")
    report_lines.append("=" * 60)

    # ドリフトサマリー
    report_lines.append("\n## Drift Summary")
    drifted = drift_results[drift_results["drift"] == "YES"]
    total = len(drift_results)
    report_lines.append(f"Features checked: {total}")
    report_lines.append(f"Drift detected: {len(drifted)}/{total}")

    if len(drifted) > 0:
        report_lines.append("\nDrifted features:")
        for _, row in drifted.iterrows():
            report_lines.append(f"  - {row['feature']} (PSI: {row.get('psi', 'N/A')})")

    # アラートサマリー
    report_lines.append("\n## Alert History")
    for alert in alert_manager.alert_history[-10:]:
        report_lines.append(
            f"  [{alert['severity']}] {alert['metric']} = {alert['value']:.4f} "
            f"({alert['consecutive']} consecutive)"
        )

    # 再学習推奨
    report_lines.append("\n## Recommendation")
    critical_alerts = [a for a in alert_manager.alert_history if a["severity"] == "CRITICAL"]
    drift_ratio = len(drifted) / total if total > 0 else 0

    if critical_alerts or drift_ratio > 0.3:
        report_lines.append("RECOMMENDATION: Immediate retraining required")
    elif drift_ratio > 0.1:
        report_lines.append("RECOMMENDATION: Schedule retraining within this week")
    else:
        report_lines.append("RECOMMENDATION: No action needed, continue monitoring")

    return "\n".join(report_lines)

達成度チェック

  • PSI・KS検定・カイ二乗検定を使ったドリフト検出エンジンを実装できた
  • 段階的アラートシステムをノイズ抑制付きで実装できた
  • ドリフト結果とアラートを統合したモニタリングレポートを生成できた
  • 再学習推奨の自動判定ロジックを実装できた

推定所要時間:60分