LESSON 30分

ドリフト検出

田中VPoE「モニタリングの全体像を掴んだところで、ML固有の最重要課題であるドリフト検出について深掘りしよう。」

あなた「データドリフトとコンセプトドリフトの違いは理解していますが、実際にどう検出するんですか?統計的な手法が必要ですよね。」

田中VPoE「その通り。統計的検定やダイバージェンス指標を使って定量的にドリフトを測定する方法を学ぼう。」

ドリフトの種類と検出

データドリフト vs コンセプトドリフト

種類定義検出方法対応策
データドリフトP(X)の変化入力特徴量の分布比較特徴量パイプラインの確認、再学習
コンセプトドリフトP(Y|X)の変化予測精度の低下検知モデルの再学習が必須
予測ドリフトP(Y_pred)の変化予測値の分布変化原因調査(データ or コンセプト)

統計的ドリフト検出手法

1. PSI(Population Stability Index)

数値特徴量の分布変化を測定する最も一般的な指標です。

import numpy as np

def calculate_psi(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """PSI(Population Stability Index)を計算する"""
    # ビニング
    breakpoints = np.percentile(reference, np.linspace(0, 100, bins + 1))
    breakpoints[0] = -np.inf
    breakpoints[-1] = np.inf

    ref_counts = np.histogram(reference, bins=breakpoints)[0]
    cur_counts = np.histogram(current, bins=breakpoints)[0]

    # 0除算防止
    ref_pct = (ref_counts + 1) / (len(reference) + bins)
    cur_pct = (cur_counts + 1) / (len(current) + bins)

    psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
    return psi
PSI値判定対応
< 0.1安定モニタリング継続
0.1 - 0.25中程度の変化調査開始
> 0.25大きな変化再学習を検討

2. KS検定(Kolmogorov-Smirnov検定)

from scipy import stats

def ks_drift_test(reference: np.ndarray, current: np.ndarray, alpha: float = 0.05) -> dict:
    """KS検定によるドリフト検出"""
    statistic, p_value = stats.ks_2samp(reference, current)
    return {
        "statistic": statistic,
        "p_value": p_value,
        "drift_detected": p_value < alpha,
    }

3. カテゴリ特徴量のドリフト検出

from scipy.stats import chi2_contingency

def chi2_drift_test(reference: pd.Series, current: pd.Series, alpha: float = 0.05) -> dict:
    """カイ二乗検定によるカテゴリ変数のドリフト検出"""
    # 頻度テーブル作成
    categories = set(reference.unique()) | set(current.unique())
    ref_counts = reference.value_counts().reindex(categories, fill_value=0)
    cur_counts = current.value_counts().reindex(categories, fill_value=0)

    contingency = pd.DataFrame({
        "reference": ref_counts,
        "current": cur_counts,
    })

    chi2, p_value, dof, _ = chi2_contingency(contingency.T)
    return {
        "chi2": chi2,
        "p_value": p_value,
        "drift_detected": p_value < alpha,
    }

多次元ドリフト検出

個別の特徴量だけでなく、特徴量間の相関変化も検出する必要があります。

def multivariate_drift_report(
    reference: pd.DataFrame,
    current: pd.DataFrame,
    numerical_cols: list[str],
    categorical_cols: list[str]
) -> pd.DataFrame:
    """全特徴量のドリフトレポートを生成する"""
    results = []

    for col in numerical_cols:
        psi = calculate_psi(reference[col].values, current[col].values)
        ks = ks_drift_test(reference[col].values, current[col].values)
        results.append({
            "feature": col,
            "type": "numerical",
            "psi": round(psi, 4),
            "ks_statistic": round(ks["statistic"], 4),
            "ks_p_value": round(ks["p_value"], 4),
            "drift": "YES" if psi > 0.25 or ks["drift_detected"] else "NO",
        })

    for col in categorical_cols:
        chi2 = chi2_drift_test(reference[col], current[col])
        results.append({
            "feature": col,
            "type": "categorical",
            "psi": None,
            "ks_statistic": None,
            "ks_p_value": None,
            "chi2_p_value": round(chi2["p_value"], 4),
            "drift": "YES" if chi2["drift_detected"] else "NO",
        })

    return pd.DataFrame(results)

ウィンドウベースの継続的モニタリング

本番環境では、時間窓を設定して継続的にドリフトを検出します。

from datetime import datetime, timedelta

class DriftMonitor:
    """継続的ドリフトモニタリング"""

    def __init__(self, reference_data: pd.DataFrame, window_size_days: int = 7):
        self.reference = reference_data
        self.window_size = timedelta(days=window_size_days)
        self.drift_history = []

    def check(self, current_data: pd.DataFrame, timestamp: datetime) -> dict:
        """ドリフトチェックを実行する"""
        numerical_cols = current_data.select_dtypes(include=[np.number]).columns
        drift_scores = {}

        for col in numerical_cols:
            psi = calculate_psi(
                self.reference[col].values,
                current_data[col].values
            )
            drift_scores[col] = psi

        max_drift = max(drift_scores.values()) if drift_scores else 0
        result = {
            "timestamp": timestamp,
            "scores": drift_scores,
            "max_drift": max_drift,
            "status": "CRITICAL" if max_drift > 0.25
                      else "WARNING" if max_drift > 0.1
                      else "OK",
        }

        self.drift_history.append(result)
        return result

まとめ

項目ポイント
ドリフトの種類データドリフト(P(X))、コンセプトドリフト(P(Y|X))、予測ドリフト
数値特徴量PSI、KS検定で分布変化を定量評価
カテゴリ特徴量カイ二乗検定で分布変化を検出
継続的監視ウィンドウベースで時系列的にドリフトを追跡

チェックリスト

  • データドリフトとコンセプトドリフトの違いを説明できる
  • PSIの計算方法と閾値を理解している
  • KS検定とカイ二乗検定の使い分けを説明できる
  • ウィンドウベースの継続的モニタリングの概念を理解している

次のステップへ

ドリフト検出の手法を理解しました。次は、ドリフトを検知した後の再学習戦略について学びましょう。


推定読了時間:30分