LESSON

リアルタイムスコアリングの実装

「エージェントの設計図はできた。次は心臓部であるスコアリングエンジンを実装する。」

田中VPoEがモニターにレイテンシグラフを表示する。

「NetShop社の決済処理は200ms以内に完了する必要がある。そのうちスコアリングに使えるのは50ms。この制約の中で最大限の精度を出す方法を考えよう。」

リアルタイムスコアリングの要件

レイテンシバジェット

決済処理全体: 200ms
├── 認証・認可: 30ms
├── 不正スコアリング: 50ms(ここが対象)
│   ├── 特徴量生成: 20ms
│   ├── モデル推論: 20ms
│   └── 後処理: 10ms
├── 決済処理: 80ms
└── レスポンス生成: 40ms

スループット要件

# NetShop社の取引ピーク時
peak_tps = 500          # 秒間500取引
daily_transactions = 600_000  # 日間60万件
model_latency_p99 = 50  # P99で50ms以内

特徴量エンジンの実装

リアルタイム特徴量とバッチ特徴量の分離

from dataclasses import dataclass
from typing import Optional
import redis
import time

@dataclass
class TransactionFeatures:
    """取引特徴量"""
    # バッチ特徴量(事前計算済み、特徴量ストアから取得)
    user_avg_amount_30d: float       # 過去30日の平均取引額
    user_txn_count_30d: int          # 過去30日の取引回数
    user_unique_merchants_30d: int   # 過去30日のユニーク店舗数
    user_max_amount_30d: float       # 過去30日の最大取引額
    device_risk_score: float         # デバイスリスクスコア

    # リアルタイム特徴量(取引時に計算)
    amount: float                    # 取引金額
    amount_zscore: float             # 金額のZスコア(ユーザー基準)
    time_since_last_txn: float       # 前回取引からの経過秒数
    txn_count_1h: int                # 直近1時間の取引回数
    txn_count_24h: int               # 直近24時間の取引回数
    is_new_merchant: bool            # 初めての店舗か
    hour_of_day: int                 # 取引時刻(時)
    is_weekend: bool                 # 週末か


class FeatureEngine:
    """リアルタイム特徴量生成エンジン"""

    def __init__(self, feature_store: redis.Redis):
        self.store = feature_store

    def extract(self, transaction: dict) -> TransactionFeatures:
        user_id = transaction['user_id']

        # バッチ特徴量をRedisから取得(< 5ms)
        batch_features = self._get_batch_features(user_id)

        # リアルタイム特徴量を計算(< 15ms)
        realtime_features = self._compute_realtime_features(
            transaction, batch_features
        )

        return TransactionFeatures(**batch_features, **realtime_features)

    def _get_batch_features(self, user_id: str) -> dict:
        """特徴量ストア(Redis)から事前計算済み特徴量を取得"""
        key = f"user_features:{user_id}"
        cached = self.store.hgetall(key)

        if not cached:
            # 新規ユーザーはデフォルト値を返す
            return {
                'user_avg_amount_30d': 0.0,
                'user_txn_count_30d': 0,
                'user_unique_merchants_30d': 0,
                'user_max_amount_30d': 0.0,
                'device_risk_score': 0.5,  # 不明はミドルリスク
            }

        return {
            'user_avg_amount_30d': float(cached[b'avg_amount']),
            'user_txn_count_30d': int(cached[b'txn_count']),
            'user_unique_merchants_30d': int(cached[b'unique_merchants']),
            'user_max_amount_30d': float(cached[b'max_amount']),
            'device_risk_score': float(cached[b'device_risk']),
        }

    def _compute_realtime_features(self, txn: dict,
                                    batch: dict) -> dict:
        """リアルタイムで計算する特徴量"""
        from datetime import datetime

        now = datetime.now()
        amount = txn['amount']
        avg = batch['user_avg_amount_30d']
        std = self._get_user_std(txn['user_id'])

        # 直近の取引カウント(Redisのソート済みセットで高速集計)
        txn_count_1h = self._count_recent_txns(txn['user_id'], 3600)
        txn_count_24h = self._count_recent_txns(txn['user_id'], 86400)

        return {
            'amount': amount,
            'amount_zscore': (amount - avg) / std if std > 0 else 0.0,
            'time_since_last_txn': self._time_since_last(txn['user_id']),
            'txn_count_1h': txn_count_1h,
            'txn_count_24h': txn_count_24h,
            'is_new_merchant': self._is_new_merchant(
                txn['user_id'], txn['merchant_id']
            ),
            'hour_of_day': now.hour,
            'is_weekend': now.weekday() >= 5,
        }

    def _count_recent_txns(self, user_id: str,
                            seconds: int) -> int:
        """Redisソート済みセットで直近N秒の取引数を高速カウント"""
        key = f"txn_times:{user_id}"
        now = time.time()
        return self.store.zcount(key, now - seconds, now)

    def _time_since_last(self, user_id: str) -> float:
        key = f"txn_times:{user_id}"
        last = self.store.zrevrange(key, 0, 0, withscores=True)
        if last:
            return time.time() - last[0][1]
        return 86400.0  # 初回取引は24時間として扱う

    def _is_new_merchant(self, user_id: str,
                          merchant_id: str) -> bool:
        key = f"user_merchants:{user_id}"
        return not self.store.sismember(key, merchant_id)

    def _get_user_std(self, user_id: str) -> float:
        key = f"user_features:{user_id}"
        std = self.store.hget(key, 'amount_std')
        return float(std) if std else 1.0

モデルサービングの実装

推論パイプライン

import numpy as np
import joblib
from typing import Dict

class ScoringEngine:
    """モデルスコアリングエンジン"""

    def __init__(self, model_dir: str):
        # モデルをメモリにロード(起動時に1回だけ)
        self.xgboost_model = joblib.load(f'{model_dir}/xgboost.pkl')
        self.isolation_forest = joblib.load(f'{model_dir}/iforest.pkl')
        self.scaler = joblib.load(f'{model_dir}/scaler.pkl')

        # アンサンブル重み
        self.weights = {'xgboost': 0.6, 'isolation_forest': 0.4}

    def predict(self, features: TransactionFeatures) -> Dict[str, float]:
        """アンサンブルスコアリング"""
        # 特徴量をベクトル化
        feature_vector = self._to_vector(features)
        scaled = self.scaler.transform(feature_vector.reshape(1, -1))

        # 各モデルで推論
        xgb_score = float(
            self.xgboost_model.predict_proba(scaled)[0, 1]
        )

        # Isolation Forestのスコアを0-1に正規化
        if_raw = self.isolation_forest.decision_function(scaled)[0]
        if_score = self._normalize_if_score(if_raw)

        # 加重平均でアンサンブル
        ensemble = (
            self.weights['xgboost'] * xgb_score +
            self.weights['isolation_forest'] * if_score
        )

        return {
            'ensemble': float(np.clip(ensemble, 0, 1)),
            'xgboost': xgb_score,
            'isolation_forest': if_score,
        }

    def _to_vector(self, features: TransactionFeatures) -> np.ndarray:
        """特徴量オブジェクトをNumPy配列に変換"""
        return np.array([
            features.amount,
            features.amount_zscore,
            features.time_since_last_txn,
            features.txn_count_1h,
            features.txn_count_24h,
            features.is_new_merchant,
            features.hour_of_day,
            features.is_weekend,
            features.user_avg_amount_30d,
            features.user_txn_count_30d,
            features.user_unique_merchants_30d,
            features.user_max_amount_30d,
            features.device_risk_score,
        ])

    def _normalize_if_score(self, raw_score: float) -> float:
        """Isolation Forestのスコアを0-1に変換"""
        # decision_functionは正常が正、異常が負
        # シグモイド変換で0-1に正規化
        return 1 / (1 + np.exp(raw_score * 5))

パフォーマンス最適化

モデルの軽量化

# 1. XGBoostの推論高速化
import xgboost as xgb

# 不要な木を削除(精度とのトレードオフ)
model = xgb.XGBClassifier(
    n_estimators=200,       # 本番用に削減
    max_depth=6,            # 浅めの木
    tree_method='hist',     # ヒストグラムベース
)

# 2. ONNX形式への変換(推論高速化)
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType

initial_type = [('features', FloatTensorType([None, 13]))]
onnx_model = convert_sklearn(model, initial_types=initial_type)

# ONNX Runtimeで推論
import onnxruntime as ort
session = ort.InferenceSession('model.onnx')
result = session.run(None, {'features': feature_array})

バッチ処理の活用

import asyncio
from collections import deque

class BatchScoringEngine:
    """マイクロバッチで推論スループットを向上"""

    def __init__(self, model, batch_size=32, max_wait_ms=10):
        self.model = model
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.queue = deque()

    async def predict_single(self, features: np.ndarray) -> float:
        """単一リクエストをバッチにまとめて処理"""
        future = asyncio.get_event_loop().create_future()
        self.queue.append((features, future))

        if len(self.queue) >= self.batch_size:
            await self._flush_batch()
        else:
            # max_wait_ms後に強制フラッシュ
            asyncio.get_event_loop().call_later(
                self.max_wait_ms / 1000, self._flush_batch_sync
            )

        return await future

    async def _flush_batch(self):
        """バッチ推論を実行"""
        if not self.queue:
            return

        batch_items = []
        while self.queue and len(batch_items) < self.batch_size:
            batch_items.append(self.queue.popleft())

        features_batch = np.array([item[0] for item in batch_items])
        scores = self.model.predict_proba(features_batch)[:, 1]

        for (_, future), score in zip(batch_items, scores):
            future.set_result(float(score))

レイテンシの計測とモニタリング

import time
from dataclasses import dataclass

@dataclass
class LatencyMetrics:
    feature_extraction_ms: float
    model_inference_ms: float
    post_processing_ms: float
    total_ms: float

class InstrumentedScoringPipeline:
    """レイテンシ計測付きスコアリングパイプライン"""

    def __init__(self, feature_engine, scoring_engine):
        self.feature_engine = feature_engine
        self.scoring_engine = scoring_engine
        self.latency_history = []

    def score(self, transaction: dict) -> tuple:
        t0 = time.perf_counter()

        # 特徴量生成
        features = self.feature_engine.extract(transaction)
        t1 = time.perf_counter()

        # モデル推論
        scores = self.scoring_engine.predict(features)
        t2 = time.perf_counter()

        # 後処理(リスクレベル判定等)
        risk_level = self._classify_risk(scores['ensemble'])
        t3 = time.perf_counter()

        metrics = LatencyMetrics(
            feature_extraction_ms=(t1 - t0) * 1000,
            model_inference_ms=(t2 - t1) * 1000,
            post_processing_ms=(t3 - t2) * 1000,
            total_ms=(t3 - t0) * 1000,
        )
        self.latency_history.append(metrics)

        # P99が50msを超えたらアラート
        if metrics.total_ms > 50:
            self._emit_latency_alert(metrics)

        return scores, risk_level, metrics

まとめ

項目ポイント
レイテンシバジェットスコアリング全体で50ms以内
特徴量設計バッチ(Redis事前計算)+ リアルタイム(取引時計算)の2層
モデルサービングメモリロード + ONNX変換で高速推論
スループットマイクロバッチで秒間500取引に対応
モニタリングレイテンシP99を常時監視

チェックリスト

  • レイテンシバジェットの配分を説明できる
  • バッチ特徴量とリアルタイム特徴量の使い分けを理解した
  • Redisを使った特徴量ストアの仕組みを説明できる
  • モデル推論の高速化手法を2つ以上挙げられる

次のステップへ

スコアリングの仕組みを理解したところで、次はリスク評価ロジックの詳細設計に入ろう。

推定読了時間: 30分