演習:不正監視AIエージェントを実装しよう
「いよいよ全てを組み合わせて、動く不正監視エージェントを作る時だ。」
田中VPoEが期待を込めて言う。
「取引を入力したら、スコアリングして、リスク判定して、必要に応じて調査レポートまで出す。そこまで一気通貫で動くものを作ってくれ。」
ミッション概要
LangGraphを使い、不正取引監視AIエージェントを設計・実装する。スコアリングからアラート発行、調査支援までを統合した実用的なエージェントを構築する。
Mission 1: Tool群の実装(30分)
エージェントが使用するTool群を実装する。
タスク:
- Credit Card Fraud Detectionの学習済みモデルをロードするToolを実装する
- 取引データの特徴量を計算するToolを実装する
- モデル推論でスコアリングするToolを実装する
- 各Toolの単体テストを作成し、動作確認する
解答例
import json
import numpy as np
import pandas as pd
import joblib
from langchain_core.tools import tool
# モデルのロード
lgb_model = joblib.load('models/lgb_fraud_model.pkl')
scaler = joblib.load('models/scaler.pkl')
# シミュレーション用データ
test_df = pd.read_csv('creditcard.csv').sample(100, random_state=42)
@tool
def get_transaction_data(transaction_index: int) -> str:
"""テストデータから指定インデックスの取引データを取得する。
Args:
transaction_index: 取引のインデックス(0〜99)
"""
if transaction_index < 0 or transaction_index >= len(test_df):
return json.dumps({'error': 'Invalid index'})
row = test_df.iloc[transaction_index]
txn = {
'index': transaction_index,
'amount': float(row['Amount']),
'time': float(row['Time']),
'class': int(row['Class']),
'features': {f'V{i}': float(row[f'V{i}']) for i in range(1, 29)},
}
return json.dumps(txn, ensure_ascii=False)
@tool
def compute_features(transaction_json: str) -> str:
"""取引データから特徴量ベクトルを計算する。
Args:
transaction_json: JSON形式の取引データ
"""
txn = json.loads(transaction_json)
feature_dict = txn['features'].copy()
feature_dict['Amount'] = np.log1p(txn['amount'])
feature_dict['Time_sin'] = float(np.sin(2 * np.pi * (txn['time'] / 3600 % 24) / 24))
feature_dict['Time_cos'] = float(np.cos(2 * np.pi * (txn['time'] / 3600 % 24) / 24))
return json.dumps({
'transaction_index': txn['index'],
'features': feature_dict,
'feature_count': len(feature_dict),
}, ensure_ascii=False)
@tool
def predict_fraud_score(features_json: str) -> str:
"""特徴量からモデルで不正スコアを予測する。
Args:
features_json: JSON形式の特徴量データ
"""
data = json.loads(features_json)
features = data['features']
# モデル入力形式に変換
feature_names = [f'V{i}' for i in range(1, 29)] + ['Amount', 'Time_sin', 'Time_cos']
X = np.array([[features.get(f, 0) for f in feature_names]])
X_scaled = scaler.transform(X)
# 予測
prob = float(lgb_model.predict_proba(X_scaled)[:, 1][0])
# リスクレベル判定
if prob >= 0.80:
risk = 'CRITICAL'
action = 'BLOCK'
elif prob >= 0.30:
risk = 'HIGH'
action = 'HOLD_FOR_REVIEW'
elif prob >= 0.10:
risk = 'MEDIUM'
action = 'APPROVE_WITH_MONITORING'
else:
risk = 'LOW'
action = 'APPROVE'
return json.dumps({
'transaction_index': data['transaction_index'],
'fraud_score': round(prob, 6),
'risk_level': risk,
'recommended_action': action,
}, ensure_ascii=False)
# 単体テスト
def test_tools():
txn = get_transaction_data.invoke({'transaction_index': 0})
features = compute_features.invoke({'transaction_json': txn})
result = predict_fraud_score.invoke({'features_json': features})
print(json.loads(result))
assert 'fraud_score' in json.loads(result)
print("All tool tests passed!")
test_tools()
Mission 2: LangGraphワークフロー構築(40分)
LangGraphでエージェントのワークフローを構築する。
タスク:
- StateGraphで状態を定義する
- スコアリング → リスク判定 → アクション実行のフローを構築する
- HIGH/CRITICAL の場合は調査レポート生成ノードに遷移する条件分岐を追加する
- テスト取引(正常5件、不正5件)でワークフロー全体の動作を確認する
解答例
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal, Annotated
import operator
class MonitorState(TypedDict):
transaction_index: int
transaction_data: str
features_data: str
score_result: str
risk_level: str
action_log: Annotated[list, operator.add]
def fetch_node(state: MonitorState) -> MonitorState:
txn = get_transaction_data.invoke(
{'transaction_index': state['transaction_index']}
)
return {**state, 'transaction_data': txn}
def feature_node(state: MonitorState) -> MonitorState:
features = compute_features.invoke(
{'transaction_json': state['transaction_data']}
)
return {**state, 'features_data': features}
def score_node(state: MonitorState) -> MonitorState:
result = predict_fraud_score.invoke(
{'features_json': state['features_data']}
)
result_dict = json.loads(result)
return {
**state,
'score_result': result,
'risk_level': result_dict['risk_level'],
}
def action_node(state: MonitorState) -> MonitorState:
result = json.loads(state['score_result'])
log_entry = (
f"[{result['risk_level']}] Index={result['transaction_index']} "
f"Score={result['fraud_score']:.4f} Action={result['recommended_action']}"
)
return {**state, 'action_log': [log_entry]}
def investigate_node(state: MonitorState) -> MonitorState:
result = json.loads(state['score_result'])
report = (
f"=== 調査レポート ===\n"
f"取引Index: {result['transaction_index']}\n"
f"不正スコア: {result['fraud_score']:.4f}\n"
f"リスクレベル: {result['risk_level']}\n"
f"推奨: 詳細調査を実施してください\n"
)
return {**state, 'action_log': [report]}
def route_by_risk(state: MonitorState) -> Literal['investigate', 'end']:
if state['risk_level'] in ('CRITICAL', 'HIGH'):
return 'investigate'
return 'end'
# ワークフロー構築
workflow = StateGraph(MonitorState)
workflow.add_node('fetch', fetch_node)
workflow.add_node('features', feature_node)
workflow.add_node('score', score_node)
workflow.add_node('action', action_node)
workflow.add_node('investigate', investigate_node)
workflow.set_entry_point('fetch')
workflow.add_edge('fetch', 'features')
workflow.add_edge('features', 'score')
workflow.add_edge('score', 'action')
workflow.add_conditional_edges('action', route_by_risk,
{'investigate': 'investigate', 'end': END})
workflow.add_edge('investigate', END)
app = workflow.compile()
# テスト実行
for idx in [0, 10, 20, 30, 40]:
result = app.invoke({
'transaction_index': idx,
'action_log': [],
})
for log in result['action_log']:
print(log)
Mission 3: 対話型エージェントの実装(30分)
自然言語で対話できる不正監視エージェントを構築する。
タスク:
- LLMを使ったReActエージェントとして実装する
- 「取引Xを調査して」「ダッシュボードを見せて」等の自然言語入力に対応する
- 複数取引の一括スキャン機能を実装する
- 対話例を3つ以上作成して動作確認する
解答例
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
@tool
def batch_scan(start_index: int, count: int) -> str:
"""複数の取引を一括スキャンし、リスクサマリーを返す。
Args:
start_index: 開始インデックス
count: スキャンする取引数
"""
results = []
for i in range(start_index, min(start_index + count, len(test_df))):
state = app.invoke({
'transaction_index': i,
'action_log': [],
})
score_data = json.loads(state['score_result'])
results.append(score_data)
# サマリー
summary = {
'scanned': len(results),
'critical': len([r for r in results if r['risk_level'] == 'CRITICAL']),
'high': len([r for r in results if r['risk_level'] == 'HIGH']),
'medium': len([r for r in results if r['risk_level'] == 'MEDIUM']),
'low': len([r for r in results if r['risk_level'] == 'LOW']),
'top_risks': sorted(results, key=lambda x: x['fraud_score'], reverse=True)[:5],
}
return json.dumps(summary, ensure_ascii=False, indent=2)
all_tools = [
get_transaction_data,
compute_features,
predict_fraud_score,
batch_scan,
]
SYSTEM = """あなたはNetShop社の不正取引監視AIエージェントです。
Toolを活用して取引の調査、スコアリング、レポート生成を行ってください。
結果は日本語で分かりやすく説明してください。"""
llm = ChatOpenAI(model="gpt-4o", temperature=0)
react_agent = create_react_agent(llm, all_tools, prompt=SYSTEM)
# 対話例1: 個別取引の調査
response = react_agent.invoke(
{'messages': [{'role': 'user', 'content': '取引インデックス5を調査してください'}]}
)
# 対話例2: 一括スキャン
response = react_agent.invoke(
{'messages': [{'role': 'user', 'content': '最初の20件の取引をスキャンして、危険なものを教えてください'}]}
)
# 対話例3: 分析依頼
response = react_agent.invoke(
{'messages': [{'role': 'user', 'content': '不正スコアが0.5以上の取引を全てリストアップし、共通パターンを分析してください'}]}
)
Mission 4: 統合テスト(20分)
全体の動作を検証し、性能を評価する。
タスク:
- テストデータ100件でエージェントを実行する
- 正解ラベルとの比較で、検知率と偽陽性率を計算する
- 処理時間を計測し、リアルタイム性の課題を特定する
- 改善すべき点を3つ以上挙げて、今後の計画を述べる
達成度チェック
- データ取得・特徴量計算・推論の3つのToolを実装できた
- LangGraphでスコアリングワークフローを構築できた
- リスクレベルに応じた条件分岐を実装できた
- 対話型エージェントとして自然言語入力に対応できた
- 一括スキャン機能を実装できた
- テストデータで検知性能を評価できた
推定所要時間: 120分