ストーリー
条件分岐(Conditional Edge)
基本構造
条件分岐は、ルーティング関数がStateを受け取り、次に実行するノード名を返す仕組みです。
from langgraph.graph import StateGraph, END
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
intent: str # "order_inquiry" | "complaint" | "general"
# ルーティング関数
def route_by_intent(state: AgentState) -> str:
intent = state.get("intent", "general")
if intent == "order_inquiry":
return "handle_order"
elif intent == "complaint":
return "handle_complaint"
else:
return "handle_general"
# グラフに条件分岐を追加
workflow = StateGraph(AgentState)
workflow.add_node("classify_intent", classify_intent)
workflow.add_node("handle_order", handle_order)
workflow.add_node("handle_complaint", handle_complaint)
workflow.add_node("handle_general", handle_general)
workflow.add_node("generate_response", generate_response)
workflow.set_entry_point("classify_intent")
# 条件分岐エッジ
workflow.add_conditional_edges(
"classify_intent", # 分岐元ノード
route_by_intent, # ルーティング関数
{ # ルーティング先のマッピング
"handle_order": "handle_order",
"handle_complaint": "handle_complaint",
"handle_general": "handle_general"
}
)
# 各ハンドラーから最終レスポンスへ
workflow.add_edge("handle_order", "generate_response")
workflow.add_edge("handle_complaint", "generate_response")
workflow.add_edge("handle_general", "generate_response")
workflow.add_edge("generate_response", END)
graph TD
start["__start__"] --> classify_intent
classify_intent -->|order_inquiry| handle_order
classify_intent -->|complaint| handle_complaint
classify_intent -->|general| handle_general
handle_order --> generate_response
handle_complaint --> generate_response
handle_general --> generate_response
generate_response --> end_node["__end__"]
ループ(ReActパターンの実装)
LangGraphでReActループを構築
LLMがツール呼び出しを返す限りループし、テキスト回答を返したらループ終了するフローです。
from langchain_core.messages import ToolMessage
class ReActState(TypedDict):
messages: Annotated[list, add_messages]
def call_llm(state: ReActState) -> ReActState:
"""LLMを呼び出してツール使用か最終回答かを判断"""
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
def execute_tools(state: ReActState) -> ReActState:
"""LLMが選んだツールを実行"""
last_message = state["messages"][-1]
results = []
for tool_call in last_message.tool_calls:
result = tool_executor.invoke(tool_call)
results.append(
ToolMessage(content=str(result), tool_call_id=tool_call["id"])
)
return {"messages": results}
def should_continue(state: ReActState) -> str:
"""ツール呼び出しがあればループ、なければ終了"""
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "execute_tools"
return END
# グラフ構築
workflow = StateGraph(ReActState)
workflow.add_node("call_llm", call_llm)
workflow.add_node("execute_tools", execute_tools)
workflow.set_entry_point("call_llm")
workflow.add_conditional_edges("call_llm", should_continue)
workflow.add_edge("execute_tools", "call_llm") # ツール実行後はLLMに戻る
app = workflow.compile()
graph TD
start["__start__"] --> call_llm
call_llm -->|tool_calls| execute_tools
call_llm -->|end_turn| end_node["__end__"]
execute_tools --> call_llm
サブグラフ
複雑なワークフローの分割
サブグラフを使って、ワークフローの一部を独立したグラフとして定義・再利用できます。
# サブグラフ: 注文処理ワークフロー
def build_order_subgraph() -> StateGraph:
subgraph = StateGraph(AgentState)
subgraph.add_node("fetch_order", fetch_order)
subgraph.add_node("check_status", check_status)
subgraph.add_node("format_order_info", format_order_info)
subgraph.set_entry_point("fetch_order")
subgraph.add_edge("fetch_order", "check_status")
subgraph.add_edge("check_status", "format_order_info")
subgraph.add_edge("format_order_info", END)
return subgraph.compile()
# メイングラフにサブグラフを組み込む
main_workflow = StateGraph(AgentState)
main_workflow.add_node("classify", classify_intent)
main_workflow.add_node("order_workflow", build_order_subgraph())
main_workflow.add_node("respond", generate_response)
main_workflow.set_entry_point("classify")
main_workflow.add_edge("classify", "order_workflow")
main_workflow.add_edge("order_workflow", "respond")
main_workflow.add_edge("respond", END)
パラレル実行
複数ノードの同時実行
依存関係のない処理を並列で実行できます。
from langgraph.graph import Send
class ParallelState(TypedDict):
messages: Annotated[list, add_messages]
order_data: dict | None
shipping_data: dict | None
customer_data: dict | None
def fan_out(state: ParallelState) -> list[Send]:
"""複数のノードに同時にデータを送る"""
return [
Send("fetch_order", state),
Send("fetch_shipping", state),
Send("fetch_customer", state)
]
workflow = StateGraph(ParallelState)
workflow.add_node("fetch_order", fetch_order)
workflow.add_node("fetch_shipping", fetch_shipping)
workflow.add_node("fetch_customer", fetch_customer)
workflow.add_node("aggregate", aggregate_results)
workflow.set_entry_point("fan_out")
workflow.add_conditional_edges("__start__", fan_out)
workflow.add_edge("fetch_order", "aggregate")
workflow.add_edge("fetch_shipping", "aggregate")
workflow.add_edge("fetch_customer", "aggregate")
workflow.add_edge("aggregate", END)
まとめ
| ポイント | 内容 |
|---|---|
| 条件分岐 | ルーティング関数でStateに応じた動的なフロー制御 |
| ループ | ReActパターンの「LLM→ツール→LLM」ループを実装 |
| サブグラフ | 複雑なフローを独立したグラフに分割・再利用 |
| パラレル実行 | 依存関係のないノードを同時実行 |
チェックリスト
- Conditional Edgeによる条件分岐の実装方法を理解した
- LangGraphでReActパターンのループを構築できる
- サブグラフによるワークフロー分割を理解した
- パラレル実行の仕組みを把握した
次のステップへ
次は「Human-in-the-Loop」を学びます。エージェントの自律的な行動に人間の承認ステップを組み込む方法を理解しましょう。
推定読了時間: 30分