リスク評価ロジックの設計
「モデルスコアだけでリスク判断をするのは危険だ。コンテキストを加味した総合判断が必要になる。」
田中VPoEが過去の不正事例ファイルを開く。
「この事例を見てくれ。モデルスコアは0.4で中リスクだったが、実際には組織的な不正グループの一端だった。スコアだけでは見えないリスクがある。」
スコアだけでは不十分な理由
モデルのスコアは過去のパターンに基づく確率的な判断である。しかし、実際の不正リスクはコンテキストによって大きく変動する。
コンテキストが判断を変える例
ケース1: スコア0.7 + 常連顧客 + いつもの購入パターン
→ 実質リスク: LOW(一時的な金額増加)
ケース2: スコア0.4 + 新規アカウント + 高額商品 + 深夜
→ 実質リスク: HIGH(新規アカウント詐欺の典型パターン)
ケース3: スコア0.3 + 過去30分で同一IPから5件の取引
→ 実質リスク: CRITICAL(ボット攻撃の可能性)
多次元リスク評価フレームワーク
リスクの4つの評価軸
from dataclasses import dataclass
@dataclass
class RiskContext:
"""多次元リスク評価の結果"""
# 各軸のスコア(0.0 - 1.0)
model_score: float # 軸1: モデルスコア
behavioral_score: float # 軸2: 行動パターンスコア
velocity_score: float # 軸3: 取引速度スコア
contextual_score: float # 軸4: コンテキストスコア
# 統合スコアとリスクレベル
risk_score: float
risk_level: str
explanation: str
top_risk_factor: str
class RiskAssessor:
"""多次元リスク評価エンジン"""
def __init__(self):
# 各軸の重み(合計1.0)
self.weights = {
'model': 0.40,
'behavioral': 0.25,
'velocity': 0.20,
'contextual': 0.15,
}
self.thresholds = RiskThresholds()
def evaluate(self, scores: dict, transaction: dict,
user_history: dict) -> RiskContext:
# 軸1: モデルスコア
model_score = scores['ensemble']
# 軸2: 行動パターン分析
behavioral_score = self._evaluate_behavior(
transaction, user_history
)
# 軸3: 取引速度分析
velocity_score = self._evaluate_velocity(
transaction, user_history
)
# 軸4: コンテキスト分析
contextual_score = self._evaluate_context(transaction)
# 統合スコア
risk_score = (
self.weights['model'] * model_score +
self.weights['behavioral'] * behavioral_score +
self.weights['velocity'] * velocity_score +
self.weights['contextual'] * contextual_score
)
# リスクレベル判定
risk_level = self.thresholds.classify(risk_score)
# 最大寄与要因の特定
factors = {
'モデルスコア': model_score * self.weights['model'],
'行動パターン': behavioral_score * self.weights['behavioral'],
'取引速度': velocity_score * self.weights['velocity'],
'コンテキスト': contextual_score * self.weights['contextual'],
}
top_factor = max(factors, key=factors.get)
return RiskContext(
model_score=model_score,
behavioral_score=behavioral_score,
velocity_score=velocity_score,
contextual_score=contextual_score,
risk_score=risk_score,
risk_level=risk_level.value,
explanation=self._generate_explanation(factors),
top_risk_factor=top_factor,
)
軸2: 行動パターン分析
def _evaluate_behavior(self, txn: dict,
history: dict) -> float:
"""ユーザーの過去の行動からの逸脱度を評価"""
risk_signals = []
# 1. 金額の逸脱度
if history.get('avg_amount', 0) > 0:
amount_ratio = txn['amount'] / history['avg_amount']
if amount_ratio > 5.0:
risk_signals.append(0.9) # 平均の5倍以上
elif amount_ratio > 3.0:
risk_signals.append(0.6) # 平均の3倍以上
elif amount_ratio > 2.0:
risk_signals.append(0.3) # 平均の2倍以上
else:
risk_signals.append(0.1)
else:
risk_signals.append(0.5) # 履歴なし
# 2. 新しい店舗での取引
if txn.get('is_new_merchant', False):
risk_signals.append(0.4)
else:
risk_signals.append(0.0)
# 3. 普段と異なるカテゴリ
usual_categories = set(history.get('frequent_categories', []))
if txn.get('category') not in usual_categories:
risk_signals.append(0.5)
else:
risk_signals.append(0.0)
# 4. アカウントの年齢
account_age_days = history.get('account_age_days', 0)
if account_age_days < 7:
risk_signals.append(0.8) # 1週間未満
elif account_age_days < 30:
risk_signals.append(0.4) # 1ヶ月未満
else:
risk_signals.append(0.1)
return float(np.mean(risk_signals))
軸3: 取引速度分析
def _evaluate_velocity(self, txn: dict,
history: dict) -> float:
"""短時間での取引集中度を評価"""
risk_signals = []
# 1. 直近1時間の取引回数
txn_1h = history.get('txn_count_1h', 0)
if txn_1h >= 10:
risk_signals.append(1.0) # 異常な速度
elif txn_1h >= 5:
risk_signals.append(0.7)
elif txn_1h >= 3:
risk_signals.append(0.3)
else:
risk_signals.append(0.0)
# 2. 前回取引からの経過時間
time_since_last = history.get('time_since_last_txn', 86400)
if time_since_last < 60: # 1分以内
risk_signals.append(0.9)
elif time_since_last < 300: # 5分以内
risk_signals.append(0.5)
elif time_since_last < 3600: # 1時間以内
risk_signals.append(0.2)
else:
risk_signals.append(0.0)
# 3. 直近24時間の累計金額
total_24h = history.get('total_amount_24h', 0) + txn['amount']
daily_limit = history.get('daily_avg_amount', 10000) * 3
if total_24h > daily_limit:
risk_signals.append(0.8)
else:
ratio = total_24h / daily_limit if daily_limit > 0 else 0.5
risk_signals.append(min(ratio, 1.0))
return float(np.mean(risk_signals))
軸4: コンテキスト分析
def _evaluate_context(self, txn: dict) -> float:
"""取引のコンテキスト情報からリスクを評価"""
risk_signals = []
# 1. 時間帯リスク
hour = txn.get('hour_of_day', 12)
if 2 <= hour <= 5: # 深夜帯
risk_signals.append(0.6)
elif 23 <= hour or hour <= 1: # 夜間
risk_signals.append(0.3)
else:
risk_signals.append(0.0)
# 2. デバイスリスク
device_risk = txn.get('device_risk_score', 0.5)
risk_signals.append(device_risk)
# 3. 地理的リスク(IPと住所の不一致)
if txn.get('ip_country') != txn.get('billing_country'):
risk_signals.append(0.7)
else:
risk_signals.append(0.0)
# 4. 商品カテゴリリスク
high_risk_categories = [
'electronics', 'gift_cards', 'cryptocurrency', 'luxury'
]
if txn.get('category') in high_risk_categories:
risk_signals.append(0.4)
else:
risk_signals.append(0.0)
return float(np.mean(risk_signals))
ルールエンジンとの統合
MLスコアに加え、確定的なルールでオーバーライドする仕組みも必要である。
class RuleEngine:
"""確定的ルールによるオーバーライド"""
def __init__(self):
self.rules = [
self._check_blacklist,
self._check_amount_limit,
self._check_velocity_limit,
self._check_country_block,
]
def apply(self, txn: dict, risk_context: RiskContext) -> RiskContext:
"""ルールを順次適用し、必要ならリスクレベルをオーバーライド"""
for rule in self.rules:
override = rule(txn)
if override:
risk_context.risk_level = override['level']
risk_context.explanation += f" [ルール適用: {override['reason']}]"
break # 最も厳しいルールで停止
return risk_context
def _check_blacklist(self, txn: dict) -> dict | None:
"""ブラックリストチェック"""
# 実際にはDBやRedisからブラックリストを参照
blacklisted_ips = self._get_blacklist('ip')
if txn.get('ip_address') in blacklisted_ips:
return {
'level': 'critical',
'reason': 'ブラックリスト登録IPからの取引'
}
return None
def _check_amount_limit(self, txn: dict) -> dict | None:
"""取引金額上限チェック"""
if txn['amount'] > 500_000: # 50万円超
return {
'level': 'high',
'reason': f"高額取引: {txn['amount']:,.0f}円"
}
return None
def _check_velocity_limit(self, txn: dict) -> dict | None:
"""速度制限チェック"""
if txn.get('txn_count_1h', 0) > 20:
return {
'level': 'critical',
'reason': '1時間に20件以上の取引を検出'
}
return None
def _check_country_block(self, txn: dict) -> dict | None:
"""国別ブロック"""
blocked_countries = ['XX', 'YY'] # 取引禁止国
if txn.get('ip_country') in blocked_countries:
return {
'level': 'critical',
'reason': f"取引禁止国: {txn['ip_country']}"
}
return None
リスク評価の可視化
def visualize_risk_breakdown(context: RiskContext):
"""リスク評価の内訳をダッシュボード用に整形"""
breakdown = {
'total_score': round(context.risk_score, 3),
'risk_level': context.risk_level,
'components': [
{
'name': 'モデルスコア',
'score': round(context.model_score, 3),
'weight': 0.40,
'contribution': round(context.model_score * 0.40, 3),
},
{
'name': '行動パターン',
'score': round(context.behavioral_score, 3),
'weight': 0.25,
'contribution': round(context.behavioral_score * 0.25, 3),
},
{
'name': '取引速度',
'score': round(context.velocity_score, 3),
'weight': 0.20,
'contribution': round(context.velocity_score * 0.20, 3),
},
{
'name': 'コンテキスト',
'score': round(context.contextual_score, 3),
'weight': 0.15,
'contribution': round(context.contextual_score * 0.15, 3),
},
],
'top_factor': context.top_risk_factor,
'explanation': context.explanation,
}
return breakdown
まとめ
| 項目 | ポイント |
|---|---|
| 多次元評価 | モデルスコア、行動、速度、コンテキストの4軸 |
| 行動分析 | 金額逸脱、新店舗、新カテゴリ、アカウント年齢 |
| 速度分析 | 取引頻度、前回からの間隔、累計金額 |
| コンテキスト | 時間帯、デバイス、地理、商品カテゴリ |
| ルールエンジン | ブラックリスト等の確定的ルールでMLをオーバーライド |
チェックリスト
- モデルスコアだけでは不十分な理由を説明できる
- 4つの評価軸を列挙し、それぞれの役割を説明できる
- ルールエンジンとMLの組み合わせ方を理解した
- リスク評価結果の可視化の意義を説明できる
次のステップへ
リスク評価の仕組みを理解したところで、次はアラートワークフローの設計に入ろう。
推定読了時間: 30分