フィードバックループ
「一度作ったモデルをそのまま使い続けるのは危険だ。不正の手口は日々進化する。」
田中VPoEが強調する。
「調査チームの判断結果をモデルに反映し、継続的に進化させる仕組みが必要だ。これがフィードバックループだ。」
フィードバックループの全体像
取引発生 → スコアリング → アラート → 調査 → 判定結果
↑ ↓
└───── モデル再学習 ← ラベル確定 ←────────┘
サイクル:
短期(日次): 閾値の微調整、ルールの更新
中期(週次): モデルの増分学習
長期(月次): モデルの全面再学習
調査結果の収集
ラベリングパイプライン
class LabelingPipeline:
"""調査結果をラベルとして収集するパイプライン"""
def __init__(self):
self.pending_labels = []
self.confirmed_labels = []
def submit_investigation_result(self, alert_id, analyst_id, decision):
"""調査結果を登録"""
valid_decisions = [
'confirmed_fraud', # 不正確定
'false_positive', # 偽陽性(正常だった)
'suspicious', # 疑わしいが確定できない
'needs_more_info', # 追加情報が必要
]
if decision not in valid_decisions:
raise ValueError(f"Invalid decision: {decision}")
label_entry = {
'alert_id': alert_id,
'analyst_id': analyst_id,
'decision': decision,
'timestamp': datetime.now().isoformat(),
'review_duration': calculate_review_time(alert_id),
}
if decision in ('confirmed_fraud', 'false_positive'):
self.confirmed_labels.append(label_entry)
else:
self.pending_labels.append(label_entry)
return label_entry
def get_training_data(self, since=None):
"""再学習用のラベル付きデータを取得"""
labels = self.confirmed_labels
if since:
labels = [l for l in labels
if l['timestamp'] > since]
return [
{
'alert_id': l['alert_id'],
'label': 1 if l['decision'] == 'confirmed_fraud' else 0,
'confidence': 1.0, # 調査員の確認済み
}
for l in labels
]
チャージバックからのラベル取得
class ChargebackLabelCollector:
"""チャージバック情報からラベルを自動生成"""
def process_chargeback(self, chargeback):
"""チャージバック情報を処理してラベルを生成"""
transaction_id = chargeback['transaction_id']
reason_code = chargeback['reason_code']
# チャージバック理由コードからラベルを推定
fraud_reason_codes = ['10.4', '10.5', '13.1', '13.2', '13.3']
if reason_code in fraud_reason_codes:
label = 1 # 不正
confidence = 0.9 # チャージバック理由に基づく推定
else:
label = 0 # フレンドリー詐欺の可能性
confidence = 0.6
return {
'transaction_id': transaction_id,
'label': label,
'confidence': confidence,
'source': 'chargeback',
'reason_code': reason_code,
'delay_days': (datetime.now() - chargeback['transaction_date']).days,
}
モデルの再学習戦略
増分学習(Online Learning)
class IncrementalModelUpdater:
"""モデルの増分学習"""
def __init__(self, base_model, update_interval_hours=24):
self.model = base_model
self.update_interval = update_interval_hours
self.new_samples = []
self.last_update = datetime.now()
def add_labeled_sample(self, features, label, confidence=1.0):
"""新しいラベル付きサンプルを追加"""
self.new_samples.append({
'features': features,
'label': label,
'confidence': confidence,
'timestamp': datetime.now(),
})
def should_update(self):
"""再学習が必要か判定"""
hours_since_update = (
datetime.now() - self.last_update
).total_seconds() / 3600
return (
hours_since_update >= self.update_interval
and len(self.new_samples) >= 50 # 最低50件の新サンプル
)
def update_model(self):
"""増分学習を実行"""
if not self.should_update():
return None
# 新しいサンプルで追加学習
X_new = np.array([s['features'] for s in self.new_samples])
y_new = np.array([s['label'] for s in self.new_samples])
weights = np.array([s['confidence'] for s in self.new_samples])
# LightGBMの場合: 既存モデルを初期値として再学習
updated_model = lgb.LGBMClassifier(
is_unbalance=True,
n_estimators=50, # 追加するツリー数
init_model=self.model,
)
updated_model.fit(
X_new, y_new,
sample_weight=weights,
)
# 検証
if self.validate_model(updated_model):
self.model = updated_model
self.new_samples = []
self.last_update = datetime.now()
return 'updated'
else:
return 'rejected' # 性能が下がった場合は拒否
def validate_model(self, new_model, min_pr_auc=0.70):
"""新モデルの性能を検証"""
X_val, y_val = get_validation_set()
y_prob = new_model.predict_proba(X_val)[:, 1]
pr_auc = average_precision_score(y_val, y_prob)
return pr_auc >= min_pr_auc
モデルバージョン管理
class ModelVersionManager:
"""モデルのバージョン管理とロールバック"""
def __init__(self):
self.versions = []
self.active_version = None
def register_model(self, model, metrics, metadata=None):
"""新バージョンのモデルを登録"""
version = {
'version_id': f"v{len(self.versions) + 1}",
'model': model,
'metrics': metrics,
'metadata': metadata or {},
'created_at': datetime.now().isoformat(),
'status': 'staged',
}
self.versions.append(version)
return version['version_id']
def promote(self, version_id):
"""モデルを本番環境にプロモート"""
for v in self.versions:
if v['version_id'] == version_id:
v['status'] = 'active'
if self.active_version:
self.active_version['status'] = 'archived'
self.active_version = v
return True
return False
def rollback(self):
"""前のバージョンにロールバック"""
archived = [v for v in self.versions if v['status'] == 'archived']
if archived:
previous = archived[-1]
self.promote(previous['version_id'])
return previous['version_id']
return None
性能劣化の検知
class ModelPerformanceMonitor:
"""モデル性能のリアルタイム監視"""
def __init__(self, baseline_metrics):
self.baseline = baseline_metrics
self.daily_metrics = []
def record_daily_metrics(self, metrics):
"""日次メトリクスを記録"""
self.daily_metrics.append({
'date': datetime.now().date().isoformat(),
**metrics,
})
def detect_degradation(self, window_days=7):
"""性能劣化を検知"""
if len(self.daily_metrics) < window_days:
return None
recent = self.daily_metrics[-window_days:]
avg_pr_auc = np.mean([m['pr_auc'] for m in recent])
avg_recall = np.mean([m['recall'] for m in recent])
alerts = []
if avg_pr_auc < self.baseline['pr_auc'] * 0.9:
alerts.append({
'metric': 'PR-AUC',
'current': avg_pr_auc,
'baseline': self.baseline['pr_auc'],
'degradation': f"{(1 - avg_pr_auc/self.baseline['pr_auc'])*100:.1f}%",
})
if avg_recall < self.baseline['recall'] * 0.85:
alerts.append({
'metric': 'Recall',
'current': avg_recall,
'baseline': self.baseline['recall'],
'degradation': f"{(1 - avg_recall/self.baseline['recall'])*100:.1f}%",
})
return alerts if alerts else None
まとめ
| 項目 | ポイント |
|---|---|
| ラベル収集 | 調査結果 + チャージバック情報の2系統 |
| 増分学習 | 日次で新サンプルを追加学習、性能検証後にデプロイ |
| バージョン管理 | 各バージョンの性能追跡、ロールバック機能 |
| 劣化検知 | 日次メトリクス監視、ベースライン比10%低下でアラート |
チェックリスト
- フィードバックループの全体像を説明できる
- 調査結果とチャージバックからのラベル収集方法を理解した
- 増分学習と全面再学習の使い分けを判断できる
- モデルの性能劣化検知と対応方法を設計できる
次のステップへ
フィードバックループを設計したところで、次は敵対的攻撃への対策を学ぼう。
推定読了時間: 30分