LESSON

演習:不正監視AIエージェントを実装しよう

「いよいよ全てを組み合わせて、動く不正監視エージェントを作る時だ。」

田中VPoEが期待を込めて言う。

「取引を入力したら、スコアリングして、リスク判定して、必要に応じて調査レポートまで出す。そこまで一気通貫で動くものを作ってくれ。」

ミッション概要

LangGraphを使い、不正取引監視AIエージェントを設計・実装する。スコアリングからアラート発行、調査支援までを統合した実用的なエージェントを構築する。


Mission 1: Tool群の実装(30分)

エージェントが使用するTool群を実装する。

タスク:

  1. Credit Card Fraud Detectionの学習済みモデルをロードするToolを実装する
  2. 取引データの特徴量を計算するToolを実装する
  3. モデル推論でスコアリングするToolを実装する
  4. 各Toolの単体テストを作成し、動作確認する
解答例
import json
import numpy as np
import pandas as pd
import joblib
from langchain_core.tools import tool

# モデルのロード
lgb_model = joblib.load('models/lgb_fraud_model.pkl')
scaler = joblib.load('models/scaler.pkl')

# シミュレーション用データ
test_df = pd.read_csv('creditcard.csv').sample(100, random_state=42)

@tool
def get_transaction_data(transaction_index: int) -> str:
    """テストデータから指定インデックスの取引データを取得する。

    Args:
        transaction_index: 取引のインデックス(0〜99)
    """
    if transaction_index < 0 or transaction_index >= len(test_df):
        return json.dumps({'error': 'Invalid index'})

    row = test_df.iloc[transaction_index]
    txn = {
        'index': transaction_index,
        'amount': float(row['Amount']),
        'time': float(row['Time']),
        'class': int(row['Class']),
        'features': {f'V{i}': float(row[f'V{i}']) for i in range(1, 29)},
    }
    return json.dumps(txn, ensure_ascii=False)

@tool
def compute_features(transaction_json: str) -> str:
    """取引データから特徴量ベクトルを計算する。

    Args:
        transaction_json: JSON形式の取引データ
    """
    txn = json.loads(transaction_json)

    feature_dict = txn['features'].copy()
    feature_dict['Amount'] = np.log1p(txn['amount'])
    feature_dict['Time_sin'] = float(np.sin(2 * np.pi * (txn['time'] / 3600 % 24) / 24))
    feature_dict['Time_cos'] = float(np.cos(2 * np.pi * (txn['time'] / 3600 % 24) / 24))

    return json.dumps({
        'transaction_index': txn['index'],
        'features': feature_dict,
        'feature_count': len(feature_dict),
    }, ensure_ascii=False)

@tool
def predict_fraud_score(features_json: str) -> str:
    """特徴量からモデルで不正スコアを予測する。

    Args:
        features_json: JSON形式の特徴量データ
    """
    data = json.loads(features_json)
    features = data['features']

    # モデル入力形式に変換
    feature_names = [f'V{i}' for i in range(1, 29)] + ['Amount', 'Time_sin', 'Time_cos']
    X = np.array([[features.get(f, 0) for f in feature_names]])
    X_scaled = scaler.transform(X)

    # 予測
    prob = float(lgb_model.predict_proba(X_scaled)[:, 1][0])

    # リスクレベル判定
    if prob >= 0.80:
        risk = 'CRITICAL'
        action = 'BLOCK'
    elif prob >= 0.30:
        risk = 'HIGH'
        action = 'HOLD_FOR_REVIEW'
    elif prob >= 0.10:
        risk = 'MEDIUM'
        action = 'APPROVE_WITH_MONITORING'
    else:
        risk = 'LOW'
        action = 'APPROVE'

    return json.dumps({
        'transaction_index': data['transaction_index'],
        'fraud_score': round(prob, 6),
        'risk_level': risk,
        'recommended_action': action,
    }, ensure_ascii=False)

# 単体テスト
def test_tools():
    txn = get_transaction_data.invoke({'transaction_index': 0})
    features = compute_features.invoke({'transaction_json': txn})
    result = predict_fraud_score.invoke({'features_json': features})
    print(json.loads(result))
    assert 'fraud_score' in json.loads(result)
    print("All tool tests passed!")

test_tools()

Mission 2: LangGraphワークフロー構築(40分)

LangGraphでエージェントのワークフローを構築する。

タスク:

  1. StateGraphで状態を定義する
  2. スコアリング → リスク判定 → アクション実行のフローを構築する
  3. HIGH/CRITICAL の場合は調査レポート生成ノードに遷移する条件分岐を追加する
  4. テスト取引(正常5件、不正5件)でワークフロー全体の動作を確認する
解答例
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal, Annotated
import operator

class MonitorState(TypedDict):
    transaction_index: int
    transaction_data: str
    features_data: str
    score_result: str
    risk_level: str
    action_log: Annotated[list, operator.add]

def fetch_node(state: MonitorState) -> MonitorState:
    txn = get_transaction_data.invoke(
        {'transaction_index': state['transaction_index']}
    )
    return {**state, 'transaction_data': txn}

def feature_node(state: MonitorState) -> MonitorState:
    features = compute_features.invoke(
        {'transaction_json': state['transaction_data']}
    )
    return {**state, 'features_data': features}

def score_node(state: MonitorState) -> MonitorState:
    result = predict_fraud_score.invoke(
        {'features_json': state['features_data']}
    )
    result_dict = json.loads(result)
    return {
        **state,
        'score_result': result,
        'risk_level': result_dict['risk_level'],
    }

def action_node(state: MonitorState) -> MonitorState:
    result = json.loads(state['score_result'])
    log_entry = (
        f"[{result['risk_level']}] Index={result['transaction_index']} "
        f"Score={result['fraud_score']:.4f} Action={result['recommended_action']}"
    )
    return {**state, 'action_log': [log_entry]}

def investigate_node(state: MonitorState) -> MonitorState:
    result = json.loads(state['score_result'])
    report = (
        f"=== 調査レポート ===\n"
        f"取引Index: {result['transaction_index']}\n"
        f"不正スコア: {result['fraud_score']:.4f}\n"
        f"リスクレベル: {result['risk_level']}\n"
        f"推奨: 詳細調査を実施してください\n"
    )
    return {**state, 'action_log': [report]}

def route_by_risk(state: MonitorState) -> Literal['investigate', 'end']:
    if state['risk_level'] in ('CRITICAL', 'HIGH'):
        return 'investigate'
    return 'end'

# ワークフロー構築
workflow = StateGraph(MonitorState)
workflow.add_node('fetch', fetch_node)
workflow.add_node('features', feature_node)
workflow.add_node('score', score_node)
workflow.add_node('action', action_node)
workflow.add_node('investigate', investigate_node)

workflow.set_entry_point('fetch')
workflow.add_edge('fetch', 'features')
workflow.add_edge('features', 'score')
workflow.add_edge('score', 'action')
workflow.add_conditional_edges('action', route_by_risk,
                                {'investigate': 'investigate', 'end': END})
workflow.add_edge('investigate', END)

app = workflow.compile()

# テスト実行
for idx in [0, 10, 20, 30, 40]:
    result = app.invoke({
        'transaction_index': idx,
        'action_log': [],
    })
    for log in result['action_log']:
        print(log)

Mission 3: 対話型エージェントの実装(30分)

自然言語で対話できる不正監視エージェントを構築する。

タスク:

  1. LLMを使ったReActエージェントとして実装する
  2. 「取引Xを調査して」「ダッシュボードを見せて」等の自然言語入力に対応する
  3. 複数取引の一括スキャン機能を実装する
  4. 対話例を3つ以上作成して動作確認する
解答例
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI

@tool
def batch_scan(start_index: int, count: int) -> str:
    """複数の取引を一括スキャンし、リスクサマリーを返す。

    Args:
        start_index: 開始インデックス
        count: スキャンする取引数
    """
    results = []
    for i in range(start_index, min(start_index + count, len(test_df))):
        state = app.invoke({
            'transaction_index': i,
            'action_log': [],
        })
        score_data = json.loads(state['score_result'])
        results.append(score_data)

    # サマリー
    summary = {
        'scanned': len(results),
        'critical': len([r for r in results if r['risk_level'] == 'CRITICAL']),
        'high': len([r for r in results if r['risk_level'] == 'HIGH']),
        'medium': len([r for r in results if r['risk_level'] == 'MEDIUM']),
        'low': len([r for r in results if r['risk_level'] == 'LOW']),
        'top_risks': sorted(results, key=lambda x: x['fraud_score'], reverse=True)[:5],
    }
    return json.dumps(summary, ensure_ascii=False, indent=2)

all_tools = [
    get_transaction_data,
    compute_features,
    predict_fraud_score,
    batch_scan,
]

SYSTEM = """あなたはNetShop社の不正取引監視AIエージェントです。
Toolを活用して取引の調査、スコアリング、レポート生成を行ってください。
結果は日本語で分かりやすく説明してください。"""

llm = ChatOpenAI(model="gpt-4o", temperature=0)
react_agent = create_react_agent(llm, all_tools, prompt=SYSTEM)

# 対話例1: 個別取引の調査
response = react_agent.invoke(
    {'messages': [{'role': 'user', 'content': '取引インデックス5を調査してください'}]}
)

# 対話例2: 一括スキャン
response = react_agent.invoke(
    {'messages': [{'role': 'user', 'content': '最初の20件の取引をスキャンして、危険なものを教えてください'}]}
)

# 対話例3: 分析依頼
response = react_agent.invoke(
    {'messages': [{'role': 'user', 'content': '不正スコアが0.5以上の取引を全てリストアップし、共通パターンを分析してください'}]}
)

Mission 4: 統合テスト(20分)

全体の動作を検証し、性能を評価する。

タスク:

  1. テストデータ100件でエージェントを実行する
  2. 正解ラベルとの比較で、検知率と偽陽性率を計算する
  3. 処理時間を計測し、リアルタイム性の課題を特定する
  4. 改善すべき点を3つ以上挙げて、今後の計画を述べる

達成度チェック

  • データ取得・特徴量計算・推論の3つのToolを実装できた
  • LangGraphでスコアリングワークフローを構築できた
  • リスクレベルに応じた条件分岐を実装できた
  • 対話型エージェントとして自然言語入力に対応できた
  • 一括スキャン機能を実装できた
  • テストデータで検知性能を評価できた

推定所要時間: 120分