LESSON

演習:不正検知モデルのスコアを改善しよう

「ここまで学んだ手法を総動員して、最高の不正検知モデルを構築してくれ。」

田中VPoEが目標を示す。

「PR-AUC 0.80以上、かつビジネスコスト最小化。この2つを同時に達成するのが今回のミッションだ。」

ミッション概要

Credit Card Fraud Detectionデータセットを使い、複数のモデルを構築・比較し、最終的にビジネスコストを最小化する不正検知モデルを完成させる。


Mission 1: ベースラインの構築(20分)

3つの異なるアプローチでベースラインモデルを構築する。

タスク:

  1. Isolation Forestで異常検知モデルを構築し、PR-AUCを計測する
  2. LightGBM(is_unbalance=True)で教師あり学習モデルを構築し、PR-AUCを計測する
  3. SMOTE + XGBoostでモデルを構築し、PR-AUCを計測する
  4. 3つのモデルのPR-AUC、Recall(閾値0.5)、Precisionを比較表にまとめる
解答例
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.metrics import average_precision_score, recall_score, precision_score
import lightgbm as lgb
import xgboost as xgb
from imblearn.over_sampling import SMOTE

df = pd.read_csv('creditcard.csv')
X = df.drop('Class', axis=1)
y = df['Class']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

results = {}

# 1. Isolation Forest
iso = IsolationForest(n_estimators=200, contamination=0.002, random_state=42)
iso.fit(X_train_scaled)
scores_iso = -iso.decision_function(X_test_scaled)  # 高いほど異常
pr_auc_iso = average_precision_score(y_test, scores_iso)
results['Isolation Forest'] = {'PR-AUC': pr_auc_iso}

# 2. LightGBM
lgb_model = lgb.LGBMClassifier(
    is_unbalance=True, max_depth=6, learning_rate=0.05,
    n_estimators=300, random_state=42
)
lgb_model.fit(X_train_scaled, y_train,
              eval_set=[(X_test_scaled, y_test)],
              callbacks=[lgb.early_stopping(30), lgb.log_evaluation(0)])
y_prob_lgb = lgb_model.predict_proba(X_test_scaled)[:, 1]
results['LightGBM'] = {
    'PR-AUC': average_precision_score(y_test, y_prob_lgb),
    'Recall': recall_score(y_test, (y_prob_lgb >= 0.5).astype(int)),
    'Precision': precision_score(y_test, (y_prob_lgb >= 0.5).astype(int)),
}

# 3. SMOTE + XGBoost
smote = SMOTE(random_state=42)
X_smote, y_smote = smote.fit_resample(X_train_scaled, y_train)
xgb_model = xgb.XGBClassifier(
    max_depth=6, learning_rate=0.05, n_estimators=300,
    eval_metric='aucpr', random_state=42
)
xgb_model.fit(X_smote, y_smote,
              eval_set=[(X_test_scaled, y_test)], verbose=0)
y_prob_xgb = xgb_model.predict_proba(X_test_scaled)[:, 1]
results['SMOTE+XGBoost'] = {
    'PR-AUC': average_precision_score(y_test, y_prob_xgb),
    'Recall': recall_score(y_test, (y_prob_xgb >= 0.5).astype(int)),
    'Precision': precision_score(y_test, (y_prob_xgb >= 0.5).astype(int)),
}

print(pd.DataFrame(results).T)

Mission 2: モデルの改善(40分)

ベースラインを改善し、PR-AUC 0.80以上を目指す。

タスク:

  1. 特徴量エンジニアリング(Amountの対数変換、Timeの周期性、V特徴量間の交互作用等)を実施する
  2. LightGBMのハイパーパラメータをOptuna等で最適化する
  3. 複数モデルのアンサンブル(確率の重み付き平均)を構築する
  4. 5-Fold StratifiedKFoldで安定した性能を確認する
解答例
# 特徴量エンジニアリング
def create_features(df):
    df_feat = df.copy()

    # Amount の変換
    df_feat['Amount_log'] = np.log1p(df_feat['Amount'])
    df_feat['Amount_scaled'] = StandardScaler().fit_transform(
        df_feat[['Amount']]
    )

    # Time の周期性
    df_feat['Time_hour'] = (df_feat['Time'] / 3600) % 24
    df_feat['Time_sin'] = np.sin(2 * np.pi * df_feat['Time_hour'] / 24)
    df_feat['Time_cos'] = np.cos(2 * np.pi * df_feat['Time_hour'] / 24)

    # V特徴量の交互作用(上位重要特徴量)
    df_feat['V14_V12'] = df_feat['V14'] * df_feat['V12']
    df_feat['V14_V10'] = df_feat['V14'] * df_feat['V10']
    df_feat['V17_V14'] = df_feat['V17'] * df_feat['V14']

    # V特徴量の統計量
    v_cols = [f'V{i}' for i in range(1, 29)]
    df_feat['V_mean'] = df_feat[v_cols].mean(axis=1)
    df_feat['V_std'] = df_feat[v_cols].std(axis=1)

    return df_feat

# Optunaでハイパーパラメータ最適化
import optuna

def objective(trial):
    params = {
        'max_depth': trial.suggest_int('max_depth', 3, 8),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
        'n_estimators': trial.suggest_int('n_estimators', 100, 500),
        'num_leaves': trial.suggest_int('num_leaves', 15, 63),
        'min_child_samples': trial.suggest_int('min_child_samples', 10, 50),
        'subsample': trial.suggest_float('subsample', 0.6, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
        'reg_alpha': trial.suggest_float('reg_alpha', 1e-3, 10, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-3, 10, log=True),
    }

    model = lgb.LGBMClassifier(is_unbalance=True, random_state=42, **params)
    # 交差検証
    skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
    scores = []
    for train_idx, val_idx in skf.split(X_train_feat, y_train):
        X_t, X_v = X_train_feat[train_idx], X_train_feat[val_idx]
        y_t, y_v = y_train.iloc[train_idx], y_train.iloc[val_idx]
        model.fit(X_t, y_t, eval_set=[(X_v, y_v)],
                  callbacks=[lgb.early_stopping(20), lgb.log_evaluation(0)])
        y_prob = model.predict_proba(X_v)[:, 1]
        scores.append(average_precision_score(y_v, y_prob))
    return np.mean(scores)

study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)

# アンサンブル
y_ensemble = 0.4 * y_prob_lgb_best + 0.4 * y_prob_xgb_best + 0.2 * scores_iso_norm
pr_auc_ensemble = average_precision_score(y_test, y_ensemble)
print(f"Ensemble PR-AUC: {pr_auc_ensemble:.4f}")

Mission 3: 閾値最適化とコスト分析(30分)

最良モデルの閾値を最適化し、ビジネスコストを最小化する。

タスク:

  1. 最良モデルのコスト曲線を描画する(FN=50,000円、FP=500円)
  2. コスト最小化閾値を算出する
  3. 多段階閾値(BLOCK/REVIEW/MONITOR/APPROVE)を設計する
  4. 各段階の件数とコストを算出し、運用可能性を評価する
  5. 現行ルールベース(Recall 35%、FPR 2.5%)との比較表を作成する
解答例
# コスト最適化閾値の算出
from sklearn.metrics import confusion_matrix

thresholds = np.arange(0.01, 0.99, 0.005)
costs = []
for t in thresholds:
    y_pred = (y_prob_best >= t).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
    cost = fn * 50000 + fp * 500
    costs.append({'threshold': t, 'cost': cost, 'fn': fn, 'fp': fp,
                  'tp': tp, 'recall': tp/(tp+fn), 'precision': tp/(tp+fp) if (tp+fp)>0 else 0})

costs_df = pd.DataFrame(costs)
optimal = costs_df.loc[costs_df['cost'].idxmin()]
print(f"最適閾値: {optimal['threshold']:.3f}")
print(f"最小コスト: {optimal['cost']:,.0f}円")

# 多段階閾値の設計
multi_thresholds = {
    'BLOCK': 0.80,    # 高確信度 → 自動ブロック
    'REVIEW': 0.30,   # 中程度 → 人手確認
    'MONITOR': 0.10,  # 軽度 → ログ記録
}

for level, t in multi_thresholds.items():
    count = ((y_prob_best >= t) & (y_prob_best < multi_thresholds.get(
        {'MONITOR': 'REVIEW', 'REVIEW': 'BLOCK'}.get(level, None), 1.0
    ))).sum() if level != 'BLOCK' else (y_prob_best >= t).sum()
    print(f"{level}: 閾値>{t}, 件数={count}")

# 現行との比較
print("\n=== 現行 vs ML ===")
print(f"現行: Recall=35%, FPR=2.5%, 月間コスト≈5,300万円")
print(f"ML:   Recall={optimal['recall']:.0%}, "
      f"FP={optimal['fp']}件, 月間コスト≈{optimal['cost']:,.0f}円")

達成度チェック

  • 3つのベースラインモデルを構築・比較できた
  • 特徴量エンジニアリングで性能を改善できた
  • ハイパーパラメータ最適化を実施できた
  • アンサンブルモデルを構築できた
  • コスト最小化閾値を算出できた
  • 多段階閾値を設計し、運用可能性を評価できた
  • 現行システムとの比較で改善効果を定量化できた

推定所要時間: 90分