自動再学習パイプライン
「モニタリングでドリフトを検知できるようになった。だが、検知したら人間が手動で再学習する?」
田中VPoEが首を振る。
「それでは24時間365日の運用に耐えられない。再学習を自動化し、モデルの品質を保ち続ける仕組みが必要だ。これがMLOpsの中核だ。」
再学習パイプラインの全体像
データドリフト検知 / 定期スケジュール
↓
[トリガー判定] → 再学習が必要か?
↓ Yes
[学習データ準備] → 最新データの取得・前処理
↓
[モデル学習] → 新モデルの構築
↓
[性能検証] → 旧モデルとの比較
↓ 改善あり
[シャドーデプロイ] → 本番トラフィックで並行評価
↓ 問題なし
[本番デプロイ] → チャンピオン交代
↓
[モニタリング] → デプロイ後24時間の監視
再学習トリガーの設計
| トリガー種別 | 条件 | 優先度 |
|---|---|---|
| 定期スケジュール | 週次(毎週月曜AM2) | 通常 |
| 精度劣化 | MAPE > 12% が3日連続 | 高 |
| データドリフト | PSI > 0.25 の特徴量が2つ以上 | 高 |
| 緊急 | MAPE > 20% | 最高 |
| 手動 | オペレーターの判断 | 任意 |
class RetrainingTrigger:
"""再学習トリガーの管理"""
def __init__(self, config):
self.config = config
self.last_retrain = None
def evaluate(self, monitoring_state):
"""再学習が必要か判定"""
triggers = []
# 定期スケジュール
if self._is_scheduled():
triggers.append({
'type': 'scheduled',
'priority': 'normal',
})
# 精度劣化
recent_mape = monitoring_state.get('recent_mape', [])
if len(recent_mape) >= 3 and all(m > 12.0 for m in recent_mape[-3:]):
triggers.append({
'type': 'accuracy_degradation',
'priority': 'high',
'detail': f"3日連続MAPE超過: {recent_mape[-3:]}",
})
# 緊急
if recent_mape and recent_mape[-1] > 20.0:
triggers.append({
'type': 'emergency',
'priority': 'critical',
'detail': f"MAPE {recent_mape[-1]}% (閾値20%)",
})
# データドリフト
drift_count = monitoring_state.get('drift_feature_count', 0)
if drift_count >= 2:
triggers.append({
'type': 'data_drift',
'priority': 'high',
'detail': f"{drift_count}特徴量でドリフト検知",
})
return triggers
def _is_scheduled(self):
"""定期スケジュールに該当するか"""
from datetime import datetime, timedelta
now = datetime.now()
if self.last_retrain is None:
return True
return (now - self.last_retrain) > timedelta(days=7)
チャンピオン/チャレンジャー方式
class ChampionChallengerPipeline:
"""チャンピオン/チャレンジャー方式での再学習"""
def __init__(self, champion_model, validation_data):
self.champion = champion_model
self.X_val, self.y_val = validation_data
self.history = []
def train_challenger(self, X_train, y_train, params=None):
"""チャレンジャーモデルを学習"""
import lightgbm as lgb
default_params = {
'objective': 'regression',
'metric': 'mape',
'n_estimators': 1000,
'learning_rate': 0.05,
'early_stopping_rounds': 50,
}
if params:
default_params.update(params)
challenger = lgb.LGBMRegressor(**default_params)
challenger.fit(
X_train, y_train,
eval_set=[(self.X_val, self.y_val)],
)
return challenger
def compare(self, challenger, min_improvement=0.02):
"""チャンピオンとチャレンジャーを比較"""
from sklearn.metrics import mean_absolute_percentage_error
champ_pred = self.champion.predict(self.X_val)
chall_pred = challenger.predict(self.X_val)
champ_mape = mean_absolute_percentage_error(self.y_val, champ_pred)
chall_mape = mean_absolute_percentage_error(self.y_val, chall_pred)
improvement = (champ_mape - chall_mape) / champ_mape
result = {
'champion_mape': round(champ_mape * 100, 2),
'challenger_mape': round(chall_mape * 100, 2),
'improvement': round(improvement * 100, 2),
'promote': improvement >= min_improvement,
}
self.history.append(result)
return result
def promote_challenger(self, challenger):
"""チャレンジャーをチャンピオンに昇格"""
self.champion = challenger
return True
シャドーデプロイ
class ShadowDeployment:
"""シャドーデプロイで新モデルを検証"""
def __init__(self, champion, challenger, duration_hours=24):
self.champion = champion
self.challenger = challenger
self.duration = duration_hours
self.predictions = []
def predict(self, features):
"""両モデルで予測し、チャンピオンの結果を返す"""
champ_pred = self.champion.predict(features)
chall_pred = self.challenger.predict(features)
self.predictions.append({
'timestamp': datetime.now().isoformat(),
'champion_pred': champ_pred.tolist(),
'challenger_pred': chall_pred.tolist(),
})
# 本番にはチャンピオンの結果を返す
return champ_pred
def evaluate_shadow(self, actuals):
"""シャドー期間の結果を評価"""
champ_preds = np.concatenate(
[p['champion_pred'] for p in self.predictions]
)
chall_preds = np.concatenate(
[p['challenger_pred'] for p in self.predictions]
)
champ_mape = np.mean(np.abs(actuals - champ_preds) / (actuals + 1)) * 100
chall_mape = np.mean(np.abs(actuals - chall_preds) / (actuals + 1)) * 100
return {
'champion_mape': round(champ_mape, 2),
'challenger_mape': round(chall_mape, 2),
'safe_to_promote': chall_mape < champ_mape,
'n_predictions': len(self.predictions),
}
ロールバック戦略
class RollbackManager:
"""デプロイ後の自動ロールバック"""
def __init__(self, rollback_threshold_mape=15.0, monitoring_hours=24):
self.threshold = rollback_threshold_mape
self.monitoring_hours = monitoring_hours
self.model_versions = []
def register_deployment(self, model, version_id, metrics):
"""デプロイを登録"""
self.model_versions.append({
'version_id': version_id,
'model': model,
'deployed_at': datetime.now(),
'pre_deploy_metrics': metrics,
'post_deploy_metrics': [],
})
def check_rollback(self, current_mape):
"""ロールバックが必要か判定"""
current = self.model_versions[-1]
hours_since_deploy = (
datetime.now() - current['deployed_at']
).total_seconds() / 3600
if hours_since_deploy <= self.monitoring_hours:
current['post_deploy_metrics'].append(current_mape)
if current_mape > self.threshold:
return {
'action': 'rollback',
'reason': f"MAPE {current_mape}% > 閾値 {self.threshold}%",
'rollback_to': self.model_versions[-2]['version_id'],
}
return {'action': 'continue'}
まとめ
| 項目 | ポイント |
|---|---|
| トリガー | 定期 + 精度劣化 + ドリフト + 緊急の4種類 |
| チャンピオン/チャレンジャー | 新モデルが2%以上改善時のみ昇格 |
| シャドーデプロイ | 24時間の並行評価で安全にデプロイ |
| ロールバック | デプロイ後24時間の監視で自動切り戻し |
チェックリスト
- 再学習トリガーの種類と優先度を設計できる
- チャンピオン/チャレンジャー方式の流れを説明できる
- シャドーデプロイの利点と実装方法を理解した
- ロールバック戦略を設計できる
- 再学習パイプラインの全体像を描ける
次のステップへ
自動再学習パイプラインを設計した。次はビジネスKPI連動評価を学ぼう。
推定読了時間: 30分