LESSON

フィードバックループ

「一度作ったモデルをそのまま使い続けるのは危険だ。不正の手口は日々進化する。」

田中VPoEが強調する。

「調査チームの判断結果をモデルに反映し、継続的に進化させる仕組みが必要だ。これがフィードバックループだ。」

フィードバックループの全体像

取引発生 → スコアリング → アラート → 調査 → 判定結果
     ↑                                         ↓
     └───── モデル再学習 ← ラベル確定 ←────────┘

サイクル:
  短期(日次): 閾値の微調整、ルールの更新
  中期(週次): モデルの増分学習
  長期(月次): モデルの全面再学習

調査結果の収集

ラベリングパイプライン

class LabelingPipeline:
    """調査結果をラベルとして収集するパイプライン"""

    def __init__(self):
        self.pending_labels = []
        self.confirmed_labels = []

    def submit_investigation_result(self, alert_id, analyst_id, decision):
        """調査結果を登録"""
        valid_decisions = [
            'confirmed_fraud',     # 不正確定
            'false_positive',      # 偽陽性(正常だった)
            'suspicious',          # 疑わしいが確定できない
            'needs_more_info',     # 追加情報が必要
        ]

        if decision not in valid_decisions:
            raise ValueError(f"Invalid decision: {decision}")

        label_entry = {
            'alert_id': alert_id,
            'analyst_id': analyst_id,
            'decision': decision,
            'timestamp': datetime.now().isoformat(),
            'review_duration': calculate_review_time(alert_id),
        }

        if decision in ('confirmed_fraud', 'false_positive'):
            self.confirmed_labels.append(label_entry)
        else:
            self.pending_labels.append(label_entry)

        return label_entry

    def get_training_data(self, since=None):
        """再学習用のラベル付きデータを取得"""
        labels = self.confirmed_labels
        if since:
            labels = [l for l in labels
                      if l['timestamp'] > since]

        return [
            {
                'alert_id': l['alert_id'],
                'label': 1 if l['decision'] == 'confirmed_fraud' else 0,
                'confidence': 1.0,  # 調査員の確認済み
            }
            for l in labels
        ]

チャージバックからのラベル取得

class ChargebackLabelCollector:
    """チャージバック情報からラベルを自動生成"""

    def process_chargeback(self, chargeback):
        """チャージバック情報を処理してラベルを生成"""
        transaction_id = chargeback['transaction_id']
        reason_code = chargeback['reason_code']

        # チャージバック理由コードからラベルを推定
        fraud_reason_codes = ['10.4', '10.5', '13.1', '13.2', '13.3']

        if reason_code in fraud_reason_codes:
            label = 1  # 不正
            confidence = 0.9  # チャージバック理由に基づく推定
        else:
            label = 0  # フレンドリー詐欺の可能性
            confidence = 0.6

        return {
            'transaction_id': transaction_id,
            'label': label,
            'confidence': confidence,
            'source': 'chargeback',
            'reason_code': reason_code,
            'delay_days': (datetime.now() - chargeback['transaction_date']).days,
        }

モデルの再学習戦略

増分学習(Online Learning)

class IncrementalModelUpdater:
    """モデルの増分学習"""

    def __init__(self, base_model, update_interval_hours=24):
        self.model = base_model
        self.update_interval = update_interval_hours
        self.new_samples = []
        self.last_update = datetime.now()

    def add_labeled_sample(self, features, label, confidence=1.0):
        """新しいラベル付きサンプルを追加"""
        self.new_samples.append({
            'features': features,
            'label': label,
            'confidence': confidence,
            'timestamp': datetime.now(),
        })

    def should_update(self):
        """再学習が必要か判定"""
        hours_since_update = (
            datetime.now() - self.last_update
        ).total_seconds() / 3600

        return (
            hours_since_update >= self.update_interval
            and len(self.new_samples) >= 50  # 最低50件の新サンプル
        )

    def update_model(self):
        """増分学習を実行"""
        if not self.should_update():
            return None

        # 新しいサンプルで追加学習
        X_new = np.array([s['features'] for s in self.new_samples])
        y_new = np.array([s['label'] for s in self.new_samples])
        weights = np.array([s['confidence'] for s in self.new_samples])

        # LightGBMの場合: 既存モデルを初期値として再学習
        updated_model = lgb.LGBMClassifier(
            is_unbalance=True,
            n_estimators=50,  # 追加するツリー数
            init_model=self.model,
        )
        updated_model.fit(
            X_new, y_new,
            sample_weight=weights,
        )

        # 検証
        if self.validate_model(updated_model):
            self.model = updated_model
            self.new_samples = []
            self.last_update = datetime.now()
            return 'updated'
        else:
            return 'rejected'  # 性能が下がった場合は拒否

    def validate_model(self, new_model, min_pr_auc=0.70):
        """新モデルの性能を検証"""
        X_val, y_val = get_validation_set()
        y_prob = new_model.predict_proba(X_val)[:, 1]
        pr_auc = average_precision_score(y_val, y_prob)

        return pr_auc >= min_pr_auc

モデルバージョン管理

class ModelVersionManager:
    """モデルのバージョン管理とロールバック"""

    def __init__(self):
        self.versions = []
        self.active_version = None

    def register_model(self, model, metrics, metadata=None):
        """新バージョンのモデルを登録"""
        version = {
            'version_id': f"v{len(self.versions) + 1}",
            'model': model,
            'metrics': metrics,
            'metadata': metadata or {},
            'created_at': datetime.now().isoformat(),
            'status': 'staged',
        }
        self.versions.append(version)
        return version['version_id']

    def promote(self, version_id):
        """モデルを本番環境にプロモート"""
        for v in self.versions:
            if v['version_id'] == version_id:
                v['status'] = 'active'
                if self.active_version:
                    self.active_version['status'] = 'archived'
                self.active_version = v
                return True
        return False

    def rollback(self):
        """前のバージョンにロールバック"""
        archived = [v for v in self.versions if v['status'] == 'archived']
        if archived:
            previous = archived[-1]
            self.promote(previous['version_id'])
            return previous['version_id']
        return None

性能劣化の検知

class ModelPerformanceMonitor:
    """モデル性能のリアルタイム監視"""

    def __init__(self, baseline_metrics):
        self.baseline = baseline_metrics
        self.daily_metrics = []

    def record_daily_metrics(self, metrics):
        """日次メトリクスを記録"""
        self.daily_metrics.append({
            'date': datetime.now().date().isoformat(),
            **metrics,
        })

    def detect_degradation(self, window_days=7):
        """性能劣化を検知"""
        if len(self.daily_metrics) < window_days:
            return None

        recent = self.daily_metrics[-window_days:]
        avg_pr_auc = np.mean([m['pr_auc'] for m in recent])
        avg_recall = np.mean([m['recall'] for m in recent])

        alerts = []
        if avg_pr_auc < self.baseline['pr_auc'] * 0.9:
            alerts.append({
                'metric': 'PR-AUC',
                'current': avg_pr_auc,
                'baseline': self.baseline['pr_auc'],
                'degradation': f"{(1 - avg_pr_auc/self.baseline['pr_auc'])*100:.1f}%",
            })

        if avg_recall < self.baseline['recall'] * 0.85:
            alerts.append({
                'metric': 'Recall',
                'current': avg_recall,
                'baseline': self.baseline['recall'],
                'degradation': f"{(1 - avg_recall/self.baseline['recall'])*100:.1f}%",
            })

        return alerts if alerts else None

まとめ

項目ポイント
ラベル収集調査結果 + チャージバック情報の2系統
増分学習日次で新サンプルを追加学習、性能検証後にデプロイ
バージョン管理各バージョンの性能追跡、ロールバック機能
劣化検知日次メトリクス監視、ベースライン比10%低下でアラート

チェックリスト

  • フィードバックループの全体像を説明できる
  • 調査結果とチャージバックからのラベル収集方法を理解した
  • 増分学習と全面再学習の使い分けを判断できる
  • モデルの性能劣化検知と対応方法を設計できる

次のステップへ

フィードバックループを設計したところで、次は敵対的攻撃への対策を学ぼう。

推定読了時間: 30分