LESSON

LangGraphワークフロー構築

「Toolは揃った。次はLangGraphでワークフローを組み上げる段階だ。」

田中VPoEがLangGraphのドキュメントを開く。

「LangGraphはLLMアプリケーションのワークフローをグラフとして定義できるフレームワークだ。ノードとエッジで処理の流れを表現し、状態管理を自動化してくれる。」

LangGraphの基本概念

LangGraphの構成要素:
- State:     ワークフロー全体で共有するデータ
- Node:      処理を実行する関数
- Edge:      ノード間の接続(遷移)
- Conditional Edge: 条件に応じた分岐
- Graph:     ノードとエッジの集合

State定義

from typing import TypedDict, Optional, Annotated
from operator import add
from langgraph.graph import StateGraph, END

class ChurnAnalysisState(TypedDict):
    # ユーザー入力
    messages: Annotated[list, add]
    query: str
    query_type: Optional[str]

    # データ
    customer_id: Optional[str]
    raw_data: Optional[dict]
    features: Optional[list]
    feature_names: Optional[list]

    # 予測結果
    churn_probability: Optional[float]
    risk_level: Optional[str]

    # 分析結果
    shap_explanations: Optional[list]
    retention_actions: Optional[list]
    visualization_path: Optional[str]

    # 応答
    response: Optional[str]
    error: Optional[str]

ノードの実装

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# ノード1: 意図分類
def classify_intent(state: ChurnAnalysisState) -> dict:
    """ユーザーの意図を分類する"""
    query = state["query"]

    prompt = f"""以下のクエリの意図を分類してください。
    クエリ: {query}

    分類:
    - individual: 特定の顧客の離反リスクを知りたい
    - segment: セグメント全体の分析をしたい
    - batch: 複数顧客の一括予測をしたい
    - general: 一般的な質問

    意図のみを1単語で回答してください。"""

    response = llm.invoke(prompt)
    intent = response.content.strip().lower()

    # 顧客IDの抽出
    import re
    id_match = re.search(r'\d{4}-[A-Z]{5}', query)
    customer_id = id_match.group() if id_match else None

    return {
        "query_type": intent,
        "customer_id": customer_id,
    }

# ノード2: データ取得
def fetch_data(state: ChurnAnalysisState) -> dict:
    """顧客データを取得する"""
    customer_id = state.get("customer_id")
    if not customer_id:
        return {"error": "顧客IDが指定されていません"}

    result = get_customer_data.invoke(customer_id)
    if "error" in result:
        return {"error": result["error"]}
    return {"raw_data": result}

# ノード3: 前処理
def preprocess(state: ChurnAnalysisState) -> dict:
    """データを前処理する"""
    result = preprocess_customer.invoke(state["raw_data"])
    if not result.get("success"):
        return {"error": result.get("error", "前処理に失敗しました")}
    return {
        "features": result["features"],
        "feature_names": result["feature_names"],
    }

# ノード4: 予測
def predict(state: ChurnAnalysisState) -> dict:
    """離反確率を予測する"""
    result = predict_churn.invoke(state["features"])
    if not result.get("success"):
        return {"error": result.get("error", "予測に失敗しました")}
    return {
        "churn_probability": result["churn_probability"],
        "risk_level": result["risk_level"],
    }

# ノード5: SHAP分析
def explain(state: ChurnAnalysisState) -> dict:
    """SHAP分析で要因を説明する"""
    result = explain_churn_prediction.invoke({
        "features": state["features"],
        "feature_names": state["feature_names"],
    })
    if not result.get("success"):
        return {"error": result.get("error", "SHAP分析に失敗しました")}
    return {"shap_explanations": result["explanations"]}

# ノード6: 施策提案
def recommend(state: ChurnAnalysisState) -> dict:
    """リテンション施策を提案する"""
    result = suggest_retention_actions.invoke({
        "shap_explanations": state["shap_explanations"],
        "customer_data": state["raw_data"],
    })
    return {"retention_actions": result["actions"]}

# ノード7: 応答生成
def respond(state: ChurnAnalysisState) -> dict:
    """自然言語で回答を生成する"""
    context = {
        "customer_id": state.get("customer_id"),
        "probability": state.get("churn_probability"),
        "risk_level": state.get("risk_level"),
        "explanations": state.get("shap_explanations"),
        "actions": state.get("retention_actions"),
    }

    prompt = f"""以下の分析結果を基に、ビジネスユーザー向けに
わかりやすい日本語でレポートを作成してください。

分析結果: {context}

以下の構成で回答してください:
1. リスク判定(離反確率とリスクレベル)
2. 主な離反要因(上位3つ)
3. 推奨アクション
"""
    response = llm.invoke(prompt)
    return {"response": response.content}

# ノード8: エラーハンドリング
def handle_error(state: ChurnAnalysisState) -> dict:
    """エラーを処理する"""
    error = state.get("error", "不明なエラーが発生しました")
    return {"response": f"エラーが発生しました: {error}"}

グラフの構築

from langgraph.graph import StateGraph, END

# グラフの作成
workflow = StateGraph(ChurnAnalysisState)

# ノードの追加
workflow.add_node("classify_intent", classify_intent)
workflow.add_node("fetch_data", fetch_data)
workflow.add_node("preprocess", preprocess)
workflow.add_node("predict", predict)
workflow.add_node("explain", explain)
workflow.add_node("recommend", recommend)
workflow.add_node("respond", respond)
workflow.add_node("handle_error", handle_error)

# エッジの追加
workflow.set_entry_point("classify_intent")

# 条件分岐: 意図による分岐
def route_intent(state):
    if state.get("error"):
        return "handle_error"
    if state["query_type"] == "individual":
        return "fetch_data"
    return "respond"  # 簡易版: individual以外は直接応答

workflow.add_conditional_edges(
    "classify_intent",
    route_intent,
    {"fetch_data": "fetch_data", "respond": "respond", "handle_error": "handle_error"}
)

# シーケンシャルエッジ
workflow.add_edge("fetch_data", "preprocess")
workflow.add_edge("preprocess", "predict")

# 条件分岐: リスクレベルによる分岐
def route_risk(state):
    if state.get("error"):
        return "handle_error"
    if state.get("risk_level") in ["HIGH", "MEDIUM"]:
        return "explain"
    return "respond"

workflow.add_conditional_edges(
    "predict",
    route_risk,
    {"explain": "explain", "respond": "respond", "handle_error": "handle_error"}
)

workflow.add_edge("explain", "recommend")
workflow.add_edge("recommend", "respond")
workflow.add_edge("respond", END)
workflow.add_edge("handle_error", END)

# コンパイル
app = workflow.compile()

実行テスト

# テスト実行
result = app.invoke({
    "messages": [],
    "query": "顧客ID 7590-VHVEG の離反リスクを教えてください",
})

print(result["response"])

まとめ

項目ポイント
StateTypedDictで型安全に管理
ノード数8つ(分類/取得/前処理/予測/説明/提案/応答/エラー)
条件分岐意図分類とリスクレベルの2箇所
エラー処理各ノードでerrorをStateに設定 → handle_errorに遷移
実行workflow.compile() → app.invoke()

チェックリスト

  • LangGraphのState/Node/Edgeを説明できる
  • StateGraphでワークフローを構築できる
  • 条件分岐(add_conditional_edges)を実装できる
  • エラーハンドリングを組み込める
  • ワークフローをコンパイルして実行できる

次のステップへ

LangGraphワークフローが動作した。次はSHAP分析の統合を深め、予測結果の解釈をより充実させよう。

推定読了時間: 30分