LESSON

リスク評価ロジックの設計

「モデルスコアだけでリスク判断をするのは危険だ。コンテキストを加味した総合判断が必要になる。」

田中VPoEが過去の不正事例ファイルを開く。

「この事例を見てくれ。モデルスコアは0.4で中リスクだったが、実際には組織的な不正グループの一端だった。スコアだけでは見えないリスクがある。」

スコアだけでは不十分な理由

モデルのスコアは過去のパターンに基づく確率的な判断である。しかし、実際の不正リスクはコンテキストによって大きく変動する。

コンテキストが判断を変える例

ケース1: スコア0.7 + 常連顧客 + いつもの購入パターン
  → 実質リスク: LOW(一時的な金額増加)

ケース2: スコア0.4 + 新規アカウント + 高額商品 + 深夜
  → 実質リスク: HIGH(新規アカウント詐欺の典型パターン)

ケース3: スコア0.3 + 過去30分で同一IPから5件の取引
  → 実質リスク: CRITICAL(ボット攻撃の可能性)

多次元リスク評価フレームワーク

リスクの4つの評価軸

from dataclasses import dataclass

@dataclass
class RiskContext:
    """多次元リスク評価の結果"""
    # 各軸のスコア(0.0 - 1.0)
    model_score: float          # 軸1: モデルスコア
    behavioral_score: float     # 軸2: 行動パターンスコア
    velocity_score: float       # 軸3: 取引速度スコア
    contextual_score: float     # 軸4: コンテキストスコア

    # 統合スコアとリスクレベル
    risk_score: float
    risk_level: str
    explanation: str
    top_risk_factor: str


class RiskAssessor:
    """多次元リスク評価エンジン"""

    def __init__(self):
        # 各軸の重み(合計1.0)
        self.weights = {
            'model': 0.40,
            'behavioral': 0.25,
            'velocity': 0.20,
            'contextual': 0.15,
        }
        self.thresholds = RiskThresholds()

    def evaluate(self, scores: dict, transaction: dict,
                 user_history: dict) -> RiskContext:
        # 軸1: モデルスコア
        model_score = scores['ensemble']

        # 軸2: 行動パターン分析
        behavioral_score = self._evaluate_behavior(
            transaction, user_history
        )

        # 軸3: 取引速度分析
        velocity_score = self._evaluate_velocity(
            transaction, user_history
        )

        # 軸4: コンテキスト分析
        contextual_score = self._evaluate_context(transaction)

        # 統合スコア
        risk_score = (
            self.weights['model'] * model_score +
            self.weights['behavioral'] * behavioral_score +
            self.weights['velocity'] * velocity_score +
            self.weights['contextual'] * contextual_score
        )

        # リスクレベル判定
        risk_level = self.thresholds.classify(risk_score)

        # 最大寄与要因の特定
        factors = {
            'モデルスコア': model_score * self.weights['model'],
            '行動パターン': behavioral_score * self.weights['behavioral'],
            '取引速度': velocity_score * self.weights['velocity'],
            'コンテキスト': contextual_score * self.weights['contextual'],
        }
        top_factor = max(factors, key=factors.get)

        return RiskContext(
            model_score=model_score,
            behavioral_score=behavioral_score,
            velocity_score=velocity_score,
            contextual_score=contextual_score,
            risk_score=risk_score,
            risk_level=risk_level.value,
            explanation=self._generate_explanation(factors),
            top_risk_factor=top_factor,
        )

軸2: 行動パターン分析

def _evaluate_behavior(self, txn: dict,
                        history: dict) -> float:
    """ユーザーの過去の行動からの逸脱度を評価"""
    risk_signals = []

    # 1. 金額の逸脱度
    if history.get('avg_amount', 0) > 0:
        amount_ratio = txn['amount'] / history['avg_amount']
        if amount_ratio > 5.0:
            risk_signals.append(0.9)   # 平均の5倍以上
        elif amount_ratio > 3.0:
            risk_signals.append(0.6)   # 平均の3倍以上
        elif amount_ratio > 2.0:
            risk_signals.append(0.3)   # 平均の2倍以上
        else:
            risk_signals.append(0.1)
    else:
        risk_signals.append(0.5)       # 履歴なし

    # 2. 新しい店舗での取引
    if txn.get('is_new_merchant', False):
        risk_signals.append(0.4)
    else:
        risk_signals.append(0.0)

    # 3. 普段と異なるカテゴリ
    usual_categories = set(history.get('frequent_categories', []))
    if txn.get('category') not in usual_categories:
        risk_signals.append(0.5)
    else:
        risk_signals.append(0.0)

    # 4. アカウントの年齢
    account_age_days = history.get('account_age_days', 0)
    if account_age_days < 7:
        risk_signals.append(0.8)       # 1週間未満
    elif account_age_days < 30:
        risk_signals.append(0.4)       # 1ヶ月未満
    else:
        risk_signals.append(0.1)

    return float(np.mean(risk_signals))

軸3: 取引速度分析

def _evaluate_velocity(self, txn: dict,
                        history: dict) -> float:
    """短時間での取引集中度を評価"""
    risk_signals = []

    # 1. 直近1時間の取引回数
    txn_1h = history.get('txn_count_1h', 0)
    if txn_1h >= 10:
        risk_signals.append(1.0)       # 異常な速度
    elif txn_1h >= 5:
        risk_signals.append(0.7)
    elif txn_1h >= 3:
        risk_signals.append(0.3)
    else:
        risk_signals.append(0.0)

    # 2. 前回取引からの経過時間
    time_since_last = history.get('time_since_last_txn', 86400)
    if time_since_last < 60:           # 1分以内
        risk_signals.append(0.9)
    elif time_since_last < 300:        # 5分以内
        risk_signals.append(0.5)
    elif time_since_last < 3600:       # 1時間以内
        risk_signals.append(0.2)
    else:
        risk_signals.append(0.0)

    # 3. 直近24時間の累計金額
    total_24h = history.get('total_amount_24h', 0) + txn['amount']
    daily_limit = history.get('daily_avg_amount', 10000) * 3
    if total_24h > daily_limit:
        risk_signals.append(0.8)
    else:
        ratio = total_24h / daily_limit if daily_limit > 0 else 0.5
        risk_signals.append(min(ratio, 1.0))

    return float(np.mean(risk_signals))

軸4: コンテキスト分析

def _evaluate_context(self, txn: dict) -> float:
    """取引のコンテキスト情報からリスクを評価"""
    risk_signals = []

    # 1. 時間帯リスク
    hour = txn.get('hour_of_day', 12)
    if 2 <= hour <= 5:                 # 深夜帯
        risk_signals.append(0.6)
    elif 23 <= hour or hour <= 1:      # 夜間
        risk_signals.append(0.3)
    else:
        risk_signals.append(0.0)

    # 2. デバイスリスク
    device_risk = txn.get('device_risk_score', 0.5)
    risk_signals.append(device_risk)

    # 3. 地理的リスク(IPと住所の不一致)
    if txn.get('ip_country') != txn.get('billing_country'):
        risk_signals.append(0.7)
    else:
        risk_signals.append(0.0)

    # 4. 商品カテゴリリスク
    high_risk_categories = [
        'electronics', 'gift_cards', 'cryptocurrency', 'luxury'
    ]
    if txn.get('category') in high_risk_categories:
        risk_signals.append(0.4)
    else:
        risk_signals.append(0.0)

    return float(np.mean(risk_signals))

ルールエンジンとの統合

MLスコアに加え、確定的なルールでオーバーライドする仕組みも必要である。

class RuleEngine:
    """確定的ルールによるオーバーライド"""

    def __init__(self):
        self.rules = [
            self._check_blacklist,
            self._check_amount_limit,
            self._check_velocity_limit,
            self._check_country_block,
        ]

    def apply(self, txn: dict, risk_context: RiskContext) -> RiskContext:
        """ルールを順次適用し、必要ならリスクレベルをオーバーライド"""
        for rule in self.rules:
            override = rule(txn)
            if override:
                risk_context.risk_level = override['level']
                risk_context.explanation += f" [ルール適用: {override['reason']}]"
                break  # 最も厳しいルールで停止
        return risk_context

    def _check_blacklist(self, txn: dict) -> dict | None:
        """ブラックリストチェック"""
        # 実際にはDBやRedisからブラックリストを参照
        blacklisted_ips = self._get_blacklist('ip')
        if txn.get('ip_address') in blacklisted_ips:
            return {
                'level': 'critical',
                'reason': 'ブラックリスト登録IPからの取引'
            }
        return None

    def _check_amount_limit(self, txn: dict) -> dict | None:
        """取引金額上限チェック"""
        if txn['amount'] > 500_000:    # 50万円超
            return {
                'level': 'high',
                'reason': f"高額取引: {txn['amount']:,.0f}円"
            }
        return None

    def _check_velocity_limit(self, txn: dict) -> dict | None:
        """速度制限チェック"""
        if txn.get('txn_count_1h', 0) > 20:
            return {
                'level': 'critical',
                'reason': '1時間に20件以上の取引を検出'
            }
        return None

    def _check_country_block(self, txn: dict) -> dict | None:
        """国別ブロック"""
        blocked_countries = ['XX', 'YY']  # 取引禁止国
        if txn.get('ip_country') in blocked_countries:
            return {
                'level': 'critical',
                'reason': f"取引禁止国: {txn['ip_country']}"
            }
        return None

リスク評価の可視化

def visualize_risk_breakdown(context: RiskContext):
    """リスク評価の内訳をダッシュボード用に整形"""
    breakdown = {
        'total_score': round(context.risk_score, 3),
        'risk_level': context.risk_level,
        'components': [
            {
                'name': 'モデルスコア',
                'score': round(context.model_score, 3),
                'weight': 0.40,
                'contribution': round(context.model_score * 0.40, 3),
            },
            {
                'name': '行動パターン',
                'score': round(context.behavioral_score, 3),
                'weight': 0.25,
                'contribution': round(context.behavioral_score * 0.25, 3),
            },
            {
                'name': '取引速度',
                'score': round(context.velocity_score, 3),
                'weight': 0.20,
                'contribution': round(context.velocity_score * 0.20, 3),
            },
            {
                'name': 'コンテキスト',
                'score': round(context.contextual_score, 3),
                'weight': 0.15,
                'contribution': round(context.contextual_score * 0.15, 3),
            },
        ],
        'top_factor': context.top_risk_factor,
        'explanation': context.explanation,
    }
    return breakdown

まとめ

項目ポイント
多次元評価モデルスコア、行動、速度、コンテキストの4軸
行動分析金額逸脱、新店舗、新カテゴリ、アカウント年齢
速度分析取引頻度、前回からの間隔、累計金額
コンテキスト時間帯、デバイス、地理、商品カテゴリ
ルールエンジンブラックリスト等の確定的ルールでMLをオーバーライド

チェックリスト

  • モデルスコアだけでは不十分な理由を説明できる
  • 4つの評価軸を列挙し、それぞれの役割を説明できる
  • ルールエンジンとMLの組み合わせ方を理解した
  • リスク評価結果の可視化の意義を説明できる

次のステップへ

リスク評価の仕組みを理解したところで、次はアラートワークフローの設計に入ろう。

推定読了時間: 30分