LangGraphワークフロー構築
「Toolは揃った。次はLangGraphでワークフローを組み上げる段階だ。」
田中VPoEがLangGraphのドキュメントを開く。
「LangGraphはLLMアプリケーションのワークフローをグラフとして定義できるフレームワークだ。ノードとエッジで処理の流れを表現し、状態管理を自動化してくれる。」
LangGraphの基本概念
LangGraphの構成要素:
- State: ワークフロー全体で共有するデータ
- Node: 処理を実行する関数
- Edge: ノード間の接続(遷移)
- Conditional Edge: 条件に応じた分岐
- Graph: ノードとエッジの集合
State定義
from typing import TypedDict, Optional, Annotated
from operator import add
from langgraph.graph import StateGraph, END
class ChurnAnalysisState(TypedDict):
# ユーザー入力
messages: Annotated[list, add]
query: str
query_type: Optional[str]
# データ
customer_id: Optional[str]
raw_data: Optional[dict]
features: Optional[list]
feature_names: Optional[list]
# 予測結果
churn_probability: Optional[float]
risk_level: Optional[str]
# 分析結果
shap_explanations: Optional[list]
retention_actions: Optional[list]
visualization_path: Optional[str]
# 応答
response: Optional[str]
error: Optional[str]
ノードの実装
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# ノード1: 意図分類
def classify_intent(state: ChurnAnalysisState) -> dict:
"""ユーザーの意図を分類する"""
query = state["query"]
prompt = f"""以下のクエリの意図を分類してください。
クエリ: {query}
分類:
- individual: 特定の顧客の離反リスクを知りたい
- segment: セグメント全体の分析をしたい
- batch: 複数顧客の一括予測をしたい
- general: 一般的な質問
意図のみを1単語で回答してください。"""
response = llm.invoke(prompt)
intent = response.content.strip().lower()
# 顧客IDの抽出
import re
id_match = re.search(r'\d{4}-[A-Z]{5}', query)
customer_id = id_match.group() if id_match else None
return {
"query_type": intent,
"customer_id": customer_id,
}
# ノード2: データ取得
def fetch_data(state: ChurnAnalysisState) -> dict:
"""顧客データを取得する"""
customer_id = state.get("customer_id")
if not customer_id:
return {"error": "顧客IDが指定されていません"}
result = get_customer_data.invoke(customer_id)
if "error" in result:
return {"error": result["error"]}
return {"raw_data": result}
# ノード3: 前処理
def preprocess(state: ChurnAnalysisState) -> dict:
"""データを前処理する"""
result = preprocess_customer.invoke(state["raw_data"])
if not result.get("success"):
return {"error": result.get("error", "前処理に失敗しました")}
return {
"features": result["features"],
"feature_names": result["feature_names"],
}
# ノード4: 予測
def predict(state: ChurnAnalysisState) -> dict:
"""離反確率を予測する"""
result = predict_churn.invoke(state["features"])
if not result.get("success"):
return {"error": result.get("error", "予測に失敗しました")}
return {
"churn_probability": result["churn_probability"],
"risk_level": result["risk_level"],
}
# ノード5: SHAP分析
def explain(state: ChurnAnalysisState) -> dict:
"""SHAP分析で要因を説明する"""
result = explain_churn_prediction.invoke({
"features": state["features"],
"feature_names": state["feature_names"],
})
if not result.get("success"):
return {"error": result.get("error", "SHAP分析に失敗しました")}
return {"shap_explanations": result["explanations"]}
# ノード6: 施策提案
def recommend(state: ChurnAnalysisState) -> dict:
"""リテンション施策を提案する"""
result = suggest_retention_actions.invoke({
"shap_explanations": state["shap_explanations"],
"customer_data": state["raw_data"],
})
return {"retention_actions": result["actions"]}
# ノード7: 応答生成
def respond(state: ChurnAnalysisState) -> dict:
"""自然言語で回答を生成する"""
context = {
"customer_id": state.get("customer_id"),
"probability": state.get("churn_probability"),
"risk_level": state.get("risk_level"),
"explanations": state.get("shap_explanations"),
"actions": state.get("retention_actions"),
}
prompt = f"""以下の分析結果を基に、ビジネスユーザー向けに
わかりやすい日本語でレポートを作成してください。
分析結果: {context}
以下の構成で回答してください:
1. リスク判定(離反確率とリスクレベル)
2. 主な離反要因(上位3つ)
3. 推奨アクション
"""
response = llm.invoke(prompt)
return {"response": response.content}
# ノード8: エラーハンドリング
def handle_error(state: ChurnAnalysisState) -> dict:
"""エラーを処理する"""
error = state.get("error", "不明なエラーが発生しました")
return {"response": f"エラーが発生しました: {error}"}
グラフの構築
from langgraph.graph import StateGraph, END
# グラフの作成
workflow = StateGraph(ChurnAnalysisState)
# ノードの追加
workflow.add_node("classify_intent", classify_intent)
workflow.add_node("fetch_data", fetch_data)
workflow.add_node("preprocess", preprocess)
workflow.add_node("predict", predict)
workflow.add_node("explain", explain)
workflow.add_node("recommend", recommend)
workflow.add_node("respond", respond)
workflow.add_node("handle_error", handle_error)
# エッジの追加
workflow.set_entry_point("classify_intent")
# 条件分岐: 意図による分岐
def route_intent(state):
if state.get("error"):
return "handle_error"
if state["query_type"] == "individual":
return "fetch_data"
return "respond" # 簡易版: individual以外は直接応答
workflow.add_conditional_edges(
"classify_intent",
route_intent,
{"fetch_data": "fetch_data", "respond": "respond", "handle_error": "handle_error"}
)
# シーケンシャルエッジ
workflow.add_edge("fetch_data", "preprocess")
workflow.add_edge("preprocess", "predict")
# 条件分岐: リスクレベルによる分岐
def route_risk(state):
if state.get("error"):
return "handle_error"
if state.get("risk_level") in ["HIGH", "MEDIUM"]:
return "explain"
return "respond"
workflow.add_conditional_edges(
"predict",
route_risk,
{"explain": "explain", "respond": "respond", "handle_error": "handle_error"}
)
workflow.add_edge("explain", "recommend")
workflow.add_edge("recommend", "respond")
workflow.add_edge("respond", END)
workflow.add_edge("handle_error", END)
# コンパイル
app = workflow.compile()
実行テスト
# テスト実行
result = app.invoke({
"messages": [],
"query": "顧客ID 7590-VHVEG の離反リスクを教えてください",
})
print(result["response"])
まとめ
| 項目 | ポイント |
|---|---|
| State | TypedDictで型安全に管理 |
| ノード数 | 8つ(分類/取得/前処理/予測/説明/提案/応答/エラー) |
| 条件分岐 | 意図分類とリスクレベルの2箇所 |
| エラー処理 | 各ノードでerrorをStateに設定 → handle_errorに遷移 |
| 実行 | workflow.compile() → app.invoke() |
チェックリスト
- LangGraphのState/Node/Edgeを説明できる
- StateGraphでワークフローを構築できる
- 条件分岐(add_conditional_edges)を実装できる
- エラーハンドリングを組み込める
- ワークフローをコンパイルして実行できる
次のステップへ
LangGraphワークフローが動作した。次はSHAP分析の統合を深め、予測結果の解釈をより充実させよう。
推定読了時間: 30分