LESSON

予測精度モニタリング

「モデルをデプロイして終わり、ではない。むしろここからが本番だ。」

田中VPoEがモニタリングダッシュボードを開く。

「需要予測モデルは時間とともに劣化する。季節が変わり、トレンドが変わり、消費者の行動が変わる。精度の劣化を早期に検知し、対応する仕組みが必要だ。」

なぜモニタリングが必要か

デプロイ直後: MAPE 8% → 高精度で予測
  ↓ 1ヶ月後: MAPE 10% → やや劣化
  ↓ 3ヶ月後: MAPE 15% → 明らかな劣化
  ↓ 6ヶ月後: MAPE 25% → 使い物にならない

原因:
- データドリフト: 入力データの分布が変化
- コンセプトドリフト: 入力と出力の関係が変化
- 季節変動: 学習データにない季節パターン
- 外部要因の変化: 競合出店、価格改定、災害

モニタリング指標の設計

レベル指標閾値チェック頻度
モデル精度MAPE< 12%日次
モデル精度RMSLE< 0.55日次
データ品質欠損率< 5%リアルタイム
データドリフトPSI< 0.1週次
予測分布予測値の標準偏差前週比 ±30%以内日次

データドリフト検知

PSI(Population Stability Index)

import numpy as np

def calculate_psi(expected, actual, bins=10):
    """PSI(分布安定性指標)を計算"""
    # ヒストグラムで分布を離散化
    breakpoints = np.linspace(
        min(expected.min(), actual.min()),
        max(expected.max(), actual.max()),
        bins + 1
    )

    expected_counts = np.histogram(expected, bins=breakpoints)[0]
    actual_counts = np.histogram(actual, bins=breakpoints)[0]

    # 0除算を避けるために微小値を加算
    expected_pct = (expected_counts + 1) / (len(expected) + bins)
    actual_pct = (actual_counts + 1) / (len(actual) + bins)

    psi = np.sum(
        (actual_pct - expected_pct) * np.log(actual_pct / expected_pct)
    )

    return psi

# 判定基準
# PSI < 0.1: 安定(対応不要)
# 0.1 <= PSI < 0.25: 軽度のドリフト(監視強化)
# PSI >= 0.25: 重大なドリフト(再学習が必要)

特徴量ごとのドリフト検知

class FeatureDriftMonitor:
    """特徴量ごとのドリフトを監視"""

    def __init__(self, reference_data):
        self.reference = reference_data
        self.feature_stats = self._compute_stats(reference_data)

    def _compute_stats(self, data):
        """基準統計量を計算"""
        stats = {}
        for col in data.columns:
            stats[col] = {
                'mean': data[col].mean(),
                'std': data[col].std(),
                'q25': data[col].quantile(0.25),
                'q75': data[col].quantile(0.75),
            }
        return stats

    def check_drift(self, current_data):
        """ドリフトを検知"""
        alerts = []
        for col in current_data.columns:
            if col not in self.feature_stats:
                continue

            ref = self.feature_stats[col]
            curr_mean = current_data[col].mean()

            # 平均値の変動をチェック(2σ超でアラート)
            z_score = abs(curr_mean - ref['mean']) / (ref['std'] + 1e-8)
            if z_score > 2.0:
                alerts.append({
                    'feature': col,
                    'type': 'mean_shift',
                    'z_score': round(z_score, 2),
                    'reference_mean': round(ref['mean'], 4),
                    'current_mean': round(curr_mean, 4),
                })

            # PSI計算
            psi = calculate_psi(
                self.reference[col].values,
                current_data[col].values
            )
            if psi >= 0.1:
                alerts.append({
                    'feature': col,
                    'type': 'distribution_shift',
                    'psi': round(psi, 4),
                })

        return alerts

精度劣化アラートシステム

class PredictionAccuracyMonitor:
    """予測精度のリアルタイム監視"""

    def __init__(self, target_mape=12.0, alert_threshold=15.0):
        self.target_mape = target_mape
        self.alert_threshold = alert_threshold
        self.daily_records = []

    def record_daily(self, date, actual, predicted):
        """日次の実績と予測を記録"""
        mape = np.mean(np.abs(actual - predicted) / (actual + 1)) * 100
        rmse = np.sqrt(np.mean((actual - predicted) ** 2))

        record = {
            'date': date,
            'mape': round(mape, 2),
            'rmse': round(rmse, 2),
            'n_samples': len(actual),
            'over_prediction_rate': np.mean(predicted > actual) * 100,
            'under_prediction_rate': np.mean(predicted < actual) * 100,
        }
        self.daily_records.append(record)
        return self._check_alerts(record)

    def _check_alerts(self, record):
        """アラート条件をチェック"""
        alerts = []

        # 即時アラート: 閾値超過
        if record['mape'] > self.alert_threshold:
            alerts.append({
                'level': 'CRITICAL',
                'message': f"MAPE {record['mape']}% が閾値 {self.alert_threshold}% を超過",
            })

        # 傾向アラート: 3日連続で目標超過
        if len(self.daily_records) >= 3:
            recent_3 = self.daily_records[-3:]
            if all(r['mape'] > self.target_mape for r in recent_3):
                alerts.append({
                    'level': 'WARNING',
                    'message': f"3日連続でMAPEが目標 {self.target_mape}% を超過",
                })

        return alerts

モニタリングダッシュボードの設計

def generate_monitoring_report(daily_records, drift_alerts):
    """日次モニタリングレポートを生成"""
    report = {
        'date': daily_records[-1]['date'],
        'accuracy': {
            'today_mape': daily_records[-1]['mape'],
            'week_avg_mape': np.mean([r['mape'] for r in daily_records[-7:]]),
            'trend': 'improving' if daily_records[-1]['mape'] < daily_records[-7]['mape'] else 'degrading',
        },
        'data_quality': {
            'drift_alerts': len(drift_alerts),
            'critical_features': [a['feature'] for a in drift_alerts if a.get('psi', 0) >= 0.25],
        },
        'recommendation': get_recommendation(daily_records, drift_alerts),
    }
    return report

def get_recommendation(records, drift_alerts):
    """対応アクションを推奨"""
    recent_mape = np.mean([r['mape'] for r in records[-7:]])

    if recent_mape > 20:
        return "緊急再学習が必要。ベースラインモデルへの切り替えも検討"
    elif recent_mape > 15:
        return "増分学習の実施を推奨。ドリフト特徴量の確認"
    elif len(drift_alerts) > 3:
        return "複数特徴量でドリフト検知。特徴量エンジニアリングの見直し"
    else:
        return "正常稼働中。定期モニタリングを継続"

まとめ

項目ポイント
ドリフト検知PSIで分布の安定性を定量化(0.1未満が安定)
精度監視MAPE/RMSLEを日次で記録し閾値超過でアラート
アラート設計即時(閾値超過)と傾向(連続劣化)の2種類
ダッシュボード精度 + データ品質 + 推奨アクションの3層構成

チェックリスト

  • データドリフトとコンセプトドリフトの違いを説明できる
  • PSIの計算方法と判定基準を理解した
  • 特徴量ごとのドリフト検知を実装できる
  • 精度劣化アラートの設計ができる
  • モニタリングダッシュボードの構成要素を説明できる

次のステップへ

モニタリングの仕組みを理解した。次は自動再学習パイプラインを設計しよう。

推定読了時間: 30分