予測精度モニタリング
「モデルをデプロイして終わり、ではない。むしろここからが本番だ。」
田中VPoEがモニタリングダッシュボードを開く。
「需要予測モデルは時間とともに劣化する。季節が変わり、トレンドが変わり、消費者の行動が変わる。精度の劣化を早期に検知し、対応する仕組みが必要だ。」
なぜモニタリングが必要か
デプロイ直後: MAPE 8% → 高精度で予測
↓ 1ヶ月後: MAPE 10% → やや劣化
↓ 3ヶ月後: MAPE 15% → 明らかな劣化
↓ 6ヶ月後: MAPE 25% → 使い物にならない
原因:
- データドリフト: 入力データの分布が変化
- コンセプトドリフト: 入力と出力の関係が変化
- 季節変動: 学習データにない季節パターン
- 外部要因の変化: 競合出店、価格改定、災害
モニタリング指標の設計
| レベル | 指標 | 閾値 | チェック頻度 |
|---|---|---|---|
| モデル精度 | MAPE | < 12% | 日次 |
| モデル精度 | RMSLE | < 0.55 | 日次 |
| データ品質 | 欠損率 | < 5% | リアルタイム |
| データドリフト | PSI | < 0.1 | 週次 |
| 予測分布 | 予測値の標準偏差 | 前週比 ±30%以内 | 日次 |
データドリフト検知
PSI(Population Stability Index)
import numpy as np
def calculate_psi(expected, actual, bins=10):
"""PSI(分布安定性指標)を計算"""
# ヒストグラムで分布を離散化
breakpoints = np.linspace(
min(expected.min(), actual.min()),
max(expected.max(), actual.max()),
bins + 1
)
expected_counts = np.histogram(expected, bins=breakpoints)[0]
actual_counts = np.histogram(actual, bins=breakpoints)[0]
# 0除算を避けるために微小値を加算
expected_pct = (expected_counts + 1) / (len(expected) + bins)
actual_pct = (actual_counts + 1) / (len(actual) + bins)
psi = np.sum(
(actual_pct - expected_pct) * np.log(actual_pct / expected_pct)
)
return psi
# 判定基準
# PSI < 0.1: 安定(対応不要)
# 0.1 <= PSI < 0.25: 軽度のドリフト(監視強化)
# PSI >= 0.25: 重大なドリフト(再学習が必要)
特徴量ごとのドリフト検知
class FeatureDriftMonitor:
"""特徴量ごとのドリフトを監視"""
def __init__(self, reference_data):
self.reference = reference_data
self.feature_stats = self._compute_stats(reference_data)
def _compute_stats(self, data):
"""基準統計量を計算"""
stats = {}
for col in data.columns:
stats[col] = {
'mean': data[col].mean(),
'std': data[col].std(),
'q25': data[col].quantile(0.25),
'q75': data[col].quantile(0.75),
}
return stats
def check_drift(self, current_data):
"""ドリフトを検知"""
alerts = []
for col in current_data.columns:
if col not in self.feature_stats:
continue
ref = self.feature_stats[col]
curr_mean = current_data[col].mean()
# 平均値の変動をチェック(2σ超でアラート)
z_score = abs(curr_mean - ref['mean']) / (ref['std'] + 1e-8)
if z_score > 2.0:
alerts.append({
'feature': col,
'type': 'mean_shift',
'z_score': round(z_score, 2),
'reference_mean': round(ref['mean'], 4),
'current_mean': round(curr_mean, 4),
})
# PSI計算
psi = calculate_psi(
self.reference[col].values,
current_data[col].values
)
if psi >= 0.1:
alerts.append({
'feature': col,
'type': 'distribution_shift',
'psi': round(psi, 4),
})
return alerts
精度劣化アラートシステム
class PredictionAccuracyMonitor:
"""予測精度のリアルタイム監視"""
def __init__(self, target_mape=12.0, alert_threshold=15.0):
self.target_mape = target_mape
self.alert_threshold = alert_threshold
self.daily_records = []
def record_daily(self, date, actual, predicted):
"""日次の実績と予測を記録"""
mape = np.mean(np.abs(actual - predicted) / (actual + 1)) * 100
rmse = np.sqrt(np.mean((actual - predicted) ** 2))
record = {
'date': date,
'mape': round(mape, 2),
'rmse': round(rmse, 2),
'n_samples': len(actual),
'over_prediction_rate': np.mean(predicted > actual) * 100,
'under_prediction_rate': np.mean(predicted < actual) * 100,
}
self.daily_records.append(record)
return self._check_alerts(record)
def _check_alerts(self, record):
"""アラート条件をチェック"""
alerts = []
# 即時アラート: 閾値超過
if record['mape'] > self.alert_threshold:
alerts.append({
'level': 'CRITICAL',
'message': f"MAPE {record['mape']}% が閾値 {self.alert_threshold}% を超過",
})
# 傾向アラート: 3日連続で目標超過
if len(self.daily_records) >= 3:
recent_3 = self.daily_records[-3:]
if all(r['mape'] > self.target_mape for r in recent_3):
alerts.append({
'level': 'WARNING',
'message': f"3日連続でMAPEが目標 {self.target_mape}% を超過",
})
return alerts
モニタリングダッシュボードの設計
def generate_monitoring_report(daily_records, drift_alerts):
"""日次モニタリングレポートを生成"""
report = {
'date': daily_records[-1]['date'],
'accuracy': {
'today_mape': daily_records[-1]['mape'],
'week_avg_mape': np.mean([r['mape'] for r in daily_records[-7:]]),
'trend': 'improving' if daily_records[-1]['mape'] < daily_records[-7]['mape'] else 'degrading',
},
'data_quality': {
'drift_alerts': len(drift_alerts),
'critical_features': [a['feature'] for a in drift_alerts if a.get('psi', 0) >= 0.25],
},
'recommendation': get_recommendation(daily_records, drift_alerts),
}
return report
def get_recommendation(records, drift_alerts):
"""対応アクションを推奨"""
recent_mape = np.mean([r['mape'] for r in records[-7:]])
if recent_mape > 20:
return "緊急再学習が必要。ベースラインモデルへの切り替えも検討"
elif recent_mape > 15:
return "増分学習の実施を推奨。ドリフト特徴量の確認"
elif len(drift_alerts) > 3:
return "複数特徴量でドリフト検知。特徴量エンジニアリングの見直し"
else:
return "正常稼働中。定期モニタリングを継続"
まとめ
| 項目 | ポイント |
|---|---|
| ドリフト検知 | PSIで分布の安定性を定量化(0.1未満が安定) |
| 精度監視 | MAPE/RMSLEを日次で記録し閾値超過でアラート |
| アラート設計 | 即時(閾値超過)と傾向(連続劣化)の2種類 |
| ダッシュボード | 精度 + データ品質 + 推奨アクションの3層構成 |
チェックリスト
- データドリフトとコンセプトドリフトの違いを説明できる
- PSIの計算方法と判定基準を理解した
- 特徴量ごとのドリフト検知を実装できる
- 精度劣化アラートの設計ができる
- モニタリングダッシュボードの構成要素を説明できる
次のステップへ
モニタリングの仕組みを理解した。次は自動再学習パイプラインを設計しよう。
推定読了時間: 30分