リアルタイムスコアリングの実装
「エージェントの設計図はできた。次は心臓部であるスコアリングエンジンを実装する。」
田中VPoEがモニターにレイテンシグラフを表示する。
「NetShop社の決済処理は200ms以内に完了する必要がある。そのうちスコアリングに使えるのは50ms。この制約の中で最大限の精度を出す方法を考えよう。」
リアルタイムスコアリングの要件
レイテンシバジェット
決済処理全体: 200ms
├── 認証・認可: 30ms
├── 不正スコアリング: 50ms(ここが対象)
│ ├── 特徴量生成: 20ms
│ ├── モデル推論: 20ms
│ └── 後処理: 10ms
├── 決済処理: 80ms
└── レスポンス生成: 40ms
スループット要件
# NetShop社の取引ピーク時
peak_tps = 500 # 秒間500取引
daily_transactions = 600_000 # 日間60万件
model_latency_p99 = 50 # P99で50ms以内
特徴量エンジンの実装
リアルタイム特徴量とバッチ特徴量の分離
from dataclasses import dataclass
from typing import Optional
import redis
import time
@dataclass
class TransactionFeatures:
"""取引特徴量"""
# バッチ特徴量(事前計算済み、特徴量ストアから取得)
user_avg_amount_30d: float # 過去30日の平均取引額
user_txn_count_30d: int # 過去30日の取引回数
user_unique_merchants_30d: int # 過去30日のユニーク店舗数
user_max_amount_30d: float # 過去30日の最大取引額
device_risk_score: float # デバイスリスクスコア
# リアルタイム特徴量(取引時に計算)
amount: float # 取引金額
amount_zscore: float # 金額のZスコア(ユーザー基準)
time_since_last_txn: float # 前回取引からの経過秒数
txn_count_1h: int # 直近1時間の取引回数
txn_count_24h: int # 直近24時間の取引回数
is_new_merchant: bool # 初めての店舗か
hour_of_day: int # 取引時刻(時)
is_weekend: bool # 週末か
class FeatureEngine:
"""リアルタイム特徴量生成エンジン"""
def __init__(self, feature_store: redis.Redis):
self.store = feature_store
def extract(self, transaction: dict) -> TransactionFeatures:
user_id = transaction['user_id']
# バッチ特徴量をRedisから取得(< 5ms)
batch_features = self._get_batch_features(user_id)
# リアルタイム特徴量を計算(< 15ms)
realtime_features = self._compute_realtime_features(
transaction, batch_features
)
return TransactionFeatures(**batch_features, **realtime_features)
def _get_batch_features(self, user_id: str) -> dict:
"""特徴量ストア(Redis)から事前計算済み特徴量を取得"""
key = f"user_features:{user_id}"
cached = self.store.hgetall(key)
if not cached:
# 新規ユーザーはデフォルト値を返す
return {
'user_avg_amount_30d': 0.0,
'user_txn_count_30d': 0,
'user_unique_merchants_30d': 0,
'user_max_amount_30d': 0.0,
'device_risk_score': 0.5, # 不明はミドルリスク
}
return {
'user_avg_amount_30d': float(cached[b'avg_amount']),
'user_txn_count_30d': int(cached[b'txn_count']),
'user_unique_merchants_30d': int(cached[b'unique_merchants']),
'user_max_amount_30d': float(cached[b'max_amount']),
'device_risk_score': float(cached[b'device_risk']),
}
def _compute_realtime_features(self, txn: dict,
batch: dict) -> dict:
"""リアルタイムで計算する特徴量"""
from datetime import datetime
now = datetime.now()
amount = txn['amount']
avg = batch['user_avg_amount_30d']
std = self._get_user_std(txn['user_id'])
# 直近の取引カウント(Redisのソート済みセットで高速集計)
txn_count_1h = self._count_recent_txns(txn['user_id'], 3600)
txn_count_24h = self._count_recent_txns(txn['user_id'], 86400)
return {
'amount': amount,
'amount_zscore': (amount - avg) / std if std > 0 else 0.0,
'time_since_last_txn': self._time_since_last(txn['user_id']),
'txn_count_1h': txn_count_1h,
'txn_count_24h': txn_count_24h,
'is_new_merchant': self._is_new_merchant(
txn['user_id'], txn['merchant_id']
),
'hour_of_day': now.hour,
'is_weekend': now.weekday() >= 5,
}
def _count_recent_txns(self, user_id: str,
seconds: int) -> int:
"""Redisソート済みセットで直近N秒の取引数を高速カウント"""
key = f"txn_times:{user_id}"
now = time.time()
return self.store.zcount(key, now - seconds, now)
def _time_since_last(self, user_id: str) -> float:
key = f"txn_times:{user_id}"
last = self.store.zrevrange(key, 0, 0, withscores=True)
if last:
return time.time() - last[0][1]
return 86400.0 # 初回取引は24時間として扱う
def _is_new_merchant(self, user_id: str,
merchant_id: str) -> bool:
key = f"user_merchants:{user_id}"
return not self.store.sismember(key, merchant_id)
def _get_user_std(self, user_id: str) -> float:
key = f"user_features:{user_id}"
std = self.store.hget(key, 'amount_std')
return float(std) if std else 1.0
モデルサービングの実装
推論パイプライン
import numpy as np
import joblib
from typing import Dict
class ScoringEngine:
"""モデルスコアリングエンジン"""
def __init__(self, model_dir: str):
# モデルをメモリにロード(起動時に1回だけ)
self.xgboost_model = joblib.load(f'{model_dir}/xgboost.pkl')
self.isolation_forest = joblib.load(f'{model_dir}/iforest.pkl')
self.scaler = joblib.load(f'{model_dir}/scaler.pkl')
# アンサンブル重み
self.weights = {'xgboost': 0.6, 'isolation_forest': 0.4}
def predict(self, features: TransactionFeatures) -> Dict[str, float]:
"""アンサンブルスコアリング"""
# 特徴量をベクトル化
feature_vector = self._to_vector(features)
scaled = self.scaler.transform(feature_vector.reshape(1, -1))
# 各モデルで推論
xgb_score = float(
self.xgboost_model.predict_proba(scaled)[0, 1]
)
# Isolation Forestのスコアを0-1に正規化
if_raw = self.isolation_forest.decision_function(scaled)[0]
if_score = self._normalize_if_score(if_raw)
# 加重平均でアンサンブル
ensemble = (
self.weights['xgboost'] * xgb_score +
self.weights['isolation_forest'] * if_score
)
return {
'ensemble': float(np.clip(ensemble, 0, 1)),
'xgboost': xgb_score,
'isolation_forest': if_score,
}
def _to_vector(self, features: TransactionFeatures) -> np.ndarray:
"""特徴量オブジェクトをNumPy配列に変換"""
return np.array([
features.amount,
features.amount_zscore,
features.time_since_last_txn,
features.txn_count_1h,
features.txn_count_24h,
features.is_new_merchant,
features.hour_of_day,
features.is_weekend,
features.user_avg_amount_30d,
features.user_txn_count_30d,
features.user_unique_merchants_30d,
features.user_max_amount_30d,
features.device_risk_score,
])
def _normalize_if_score(self, raw_score: float) -> float:
"""Isolation Forestのスコアを0-1に変換"""
# decision_functionは正常が正、異常が負
# シグモイド変換で0-1に正規化
return 1 / (1 + np.exp(raw_score * 5))
パフォーマンス最適化
モデルの軽量化
# 1. XGBoostの推論高速化
import xgboost as xgb
# 不要な木を削除(精度とのトレードオフ)
model = xgb.XGBClassifier(
n_estimators=200, # 本番用に削減
max_depth=6, # 浅めの木
tree_method='hist', # ヒストグラムベース
)
# 2. ONNX形式への変換(推論高速化)
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
initial_type = [('features', FloatTensorType([None, 13]))]
onnx_model = convert_sklearn(model, initial_types=initial_type)
# ONNX Runtimeで推論
import onnxruntime as ort
session = ort.InferenceSession('model.onnx')
result = session.run(None, {'features': feature_array})
バッチ処理の活用
import asyncio
from collections import deque
class BatchScoringEngine:
"""マイクロバッチで推論スループットを向上"""
def __init__(self, model, batch_size=32, max_wait_ms=10):
self.model = model
self.batch_size = batch_size
self.max_wait_ms = max_wait_ms
self.queue = deque()
async def predict_single(self, features: np.ndarray) -> float:
"""単一リクエストをバッチにまとめて処理"""
future = asyncio.get_event_loop().create_future()
self.queue.append((features, future))
if len(self.queue) >= self.batch_size:
await self._flush_batch()
else:
# max_wait_ms後に強制フラッシュ
asyncio.get_event_loop().call_later(
self.max_wait_ms / 1000, self._flush_batch_sync
)
return await future
async def _flush_batch(self):
"""バッチ推論を実行"""
if not self.queue:
return
batch_items = []
while self.queue and len(batch_items) < self.batch_size:
batch_items.append(self.queue.popleft())
features_batch = np.array([item[0] for item in batch_items])
scores = self.model.predict_proba(features_batch)[:, 1]
for (_, future), score in zip(batch_items, scores):
future.set_result(float(score))
レイテンシの計測とモニタリング
import time
from dataclasses import dataclass
@dataclass
class LatencyMetrics:
feature_extraction_ms: float
model_inference_ms: float
post_processing_ms: float
total_ms: float
class InstrumentedScoringPipeline:
"""レイテンシ計測付きスコアリングパイプライン"""
def __init__(self, feature_engine, scoring_engine):
self.feature_engine = feature_engine
self.scoring_engine = scoring_engine
self.latency_history = []
def score(self, transaction: dict) -> tuple:
t0 = time.perf_counter()
# 特徴量生成
features = self.feature_engine.extract(transaction)
t1 = time.perf_counter()
# モデル推論
scores = self.scoring_engine.predict(features)
t2 = time.perf_counter()
# 後処理(リスクレベル判定等)
risk_level = self._classify_risk(scores['ensemble'])
t3 = time.perf_counter()
metrics = LatencyMetrics(
feature_extraction_ms=(t1 - t0) * 1000,
model_inference_ms=(t2 - t1) * 1000,
post_processing_ms=(t3 - t2) * 1000,
total_ms=(t3 - t0) * 1000,
)
self.latency_history.append(metrics)
# P99が50msを超えたらアラート
if metrics.total_ms > 50:
self._emit_latency_alert(metrics)
return scores, risk_level, metrics
まとめ
| 項目 | ポイント |
|---|---|
| レイテンシバジェット | スコアリング全体で50ms以内 |
| 特徴量設計 | バッチ(Redis事前計算)+ リアルタイム(取引時計算)の2層 |
| モデルサービング | メモリロード + ONNX変換で高速推論 |
| スループット | マイクロバッチで秒間500取引に対応 |
| モニタリング | レイテンシP99を常時監視 |
チェックリスト
- レイテンシバジェットの配分を説明できる
- バッチ特徴量とリアルタイム特徴量の使い分けを理解した
- Redisを使った特徴量ストアの仕組みを説明できる
- モデル推論の高速化手法を2つ以上挙げられる
次のステップへ
スコアリングの仕組みを理解したところで、次はリスク評価ロジックの詳細設計に入ろう。
推定読了時間: 30分