LESSON

自動再学習パイプライン

「モニタリングでドリフトを検知できるようになった。だが、検知したら人間が手動で再学習する?」

田中VPoEが首を振る。

「それでは24時間365日の運用に耐えられない。再学習を自動化し、モデルの品質を保ち続ける仕組みが必要だ。これがMLOpsの中核だ。」

再学習パイプラインの全体像

データドリフト検知 / 定期スケジュール

[トリガー判定] → 再学習が必要か?
        ↓ Yes
[学習データ準備] → 最新データの取得・前処理

[モデル学習] → 新モデルの構築

[性能検証] → 旧モデルとの比較
        ↓ 改善あり
[シャドーデプロイ] → 本番トラフィックで並行評価
        ↓ 問題なし
[本番デプロイ] → チャンピオン交代

[モニタリング] → デプロイ後24時間の監視

再学習トリガーの設計

トリガー種別条件優先度
定期スケジュール週次(毎週月曜AM2
通常
精度劣化MAPE > 12% が3日連続
データドリフトPSI > 0.25 の特徴量が2つ以上
緊急MAPE > 20%最高
手動オペレーターの判断任意
class RetrainingTrigger:
    """再学習トリガーの管理"""

    def __init__(self, config):
        self.config = config
        self.last_retrain = None

    def evaluate(self, monitoring_state):
        """再学習が必要か判定"""
        triggers = []

        # 定期スケジュール
        if self._is_scheduled():
            triggers.append({
                'type': 'scheduled',
                'priority': 'normal',
            })

        # 精度劣化
        recent_mape = monitoring_state.get('recent_mape', [])
        if len(recent_mape) >= 3 and all(m > 12.0 for m in recent_mape[-3:]):
            triggers.append({
                'type': 'accuracy_degradation',
                'priority': 'high',
                'detail': f"3日連続MAPE超過: {recent_mape[-3:]}",
            })

        # 緊急
        if recent_mape and recent_mape[-1] > 20.0:
            triggers.append({
                'type': 'emergency',
                'priority': 'critical',
                'detail': f"MAPE {recent_mape[-1]}% (閾値20%)",
            })

        # データドリフト
        drift_count = monitoring_state.get('drift_feature_count', 0)
        if drift_count >= 2:
            triggers.append({
                'type': 'data_drift',
                'priority': 'high',
                'detail': f"{drift_count}特徴量でドリフト検知",
            })

        return triggers

    def _is_scheduled(self):
        """定期スケジュールに該当するか"""
        from datetime import datetime, timedelta
        now = datetime.now()
        if self.last_retrain is None:
            return True
        return (now - self.last_retrain) > timedelta(days=7)

チャンピオン/チャレンジャー方式

class ChampionChallengerPipeline:
    """チャンピオン/チャレンジャー方式での再学習"""

    def __init__(self, champion_model, validation_data):
        self.champion = champion_model
        self.X_val, self.y_val = validation_data
        self.history = []

    def train_challenger(self, X_train, y_train, params=None):
        """チャレンジャーモデルを学習"""
        import lightgbm as lgb

        default_params = {
            'objective': 'regression',
            'metric': 'mape',
            'n_estimators': 1000,
            'learning_rate': 0.05,
            'early_stopping_rounds': 50,
        }
        if params:
            default_params.update(params)

        challenger = lgb.LGBMRegressor(**default_params)
        challenger.fit(
            X_train, y_train,
            eval_set=[(self.X_val, self.y_val)],
        )
        return challenger

    def compare(self, challenger, min_improvement=0.02):
        """チャンピオンとチャレンジャーを比較"""
        from sklearn.metrics import mean_absolute_percentage_error

        champ_pred = self.champion.predict(self.X_val)
        chall_pred = challenger.predict(self.X_val)

        champ_mape = mean_absolute_percentage_error(self.y_val, champ_pred)
        chall_mape = mean_absolute_percentage_error(self.y_val, chall_pred)

        improvement = (champ_mape - chall_mape) / champ_mape

        result = {
            'champion_mape': round(champ_mape * 100, 2),
            'challenger_mape': round(chall_mape * 100, 2),
            'improvement': round(improvement * 100, 2),
            'promote': improvement >= min_improvement,
        }

        self.history.append(result)
        return result

    def promote_challenger(self, challenger):
        """チャレンジャーをチャンピオンに昇格"""
        self.champion = challenger
        return True

シャドーデプロイ

class ShadowDeployment:
    """シャドーデプロイで新モデルを検証"""

    def __init__(self, champion, challenger, duration_hours=24):
        self.champion = champion
        self.challenger = challenger
        self.duration = duration_hours
        self.predictions = []

    def predict(self, features):
        """両モデルで予測し、チャンピオンの結果を返す"""
        champ_pred = self.champion.predict(features)
        chall_pred = self.challenger.predict(features)

        self.predictions.append({
            'timestamp': datetime.now().isoformat(),
            'champion_pred': champ_pred.tolist(),
            'challenger_pred': chall_pred.tolist(),
        })

        # 本番にはチャンピオンの結果を返す
        return champ_pred

    def evaluate_shadow(self, actuals):
        """シャドー期間の結果を評価"""
        champ_preds = np.concatenate(
            [p['champion_pred'] for p in self.predictions]
        )
        chall_preds = np.concatenate(
            [p['challenger_pred'] for p in self.predictions]
        )

        champ_mape = np.mean(np.abs(actuals - champ_preds) / (actuals + 1)) * 100
        chall_mape = np.mean(np.abs(actuals - chall_preds) / (actuals + 1)) * 100

        return {
            'champion_mape': round(champ_mape, 2),
            'challenger_mape': round(chall_mape, 2),
            'safe_to_promote': chall_mape < champ_mape,
            'n_predictions': len(self.predictions),
        }

ロールバック戦略

class RollbackManager:
    """デプロイ後の自動ロールバック"""

    def __init__(self, rollback_threshold_mape=15.0, monitoring_hours=24):
        self.threshold = rollback_threshold_mape
        self.monitoring_hours = monitoring_hours
        self.model_versions = []

    def register_deployment(self, model, version_id, metrics):
        """デプロイを登録"""
        self.model_versions.append({
            'version_id': version_id,
            'model': model,
            'deployed_at': datetime.now(),
            'pre_deploy_metrics': metrics,
            'post_deploy_metrics': [],
        })

    def check_rollback(self, current_mape):
        """ロールバックが必要か判定"""
        current = self.model_versions[-1]
        hours_since_deploy = (
            datetime.now() - current['deployed_at']
        ).total_seconds() / 3600

        if hours_since_deploy <= self.monitoring_hours:
            current['post_deploy_metrics'].append(current_mape)

            if current_mape > self.threshold:
                return {
                    'action': 'rollback',
                    'reason': f"MAPE {current_mape}% > 閾値 {self.threshold}%",
                    'rollback_to': self.model_versions[-2]['version_id'],
                }

        return {'action': 'continue'}

まとめ

項目ポイント
トリガー定期 + 精度劣化 + ドリフト + 緊急の4種類
チャンピオン/チャレンジャー新モデルが2%以上改善時のみ昇格
シャドーデプロイ24時間の並行評価で安全にデプロイ
ロールバックデプロイ後24時間の監視で自動切り戻し

チェックリスト

  • 再学習トリガーの種類と優先度を設計できる
  • チャンピオン/チャレンジャー方式の流れを説明できる
  • シャドーデプロイの利点と実装方法を理解した
  • ロールバック戦略を設計できる
  • 再学習パイプラインの全体像を描ける

次のステップへ

自動再学習パイプラインを設計した。次はビジネスKPI連動評価を学ぼう。

推定読了時間: 30分