ストーリー
通信パターンの概要
| パターン | 方式 | 特徴 | 適するケース |
|---|---|---|---|
| メッセージパッシング | エージェント間で直接メッセージを送受信 | シンプル、直接的 | 1対1のタスク委任 |
| 共有状態 | 共通のStateを読み書き | 全エージェントが同じ情報を参照 | 協調作業、データ共有 |
| イベントドリブン | イベントを発行/購読 | 疎結合、非同期 | リアクティブな処理連携 |
パターン1: メッセージパッシング
基本構造
エージェント同士が直接メッセージを送り合うパターンです。
[エージェントA] ──メッセージ──→ [エージェントB]
処理
[エージェントA] ←──レスポンス── [エージェントB]
LangGraphでの実装(Handoff)
LangGraphのSwarmパターンでは、エージェントが処理を別のエージェントに「引き渡す(Handoff)」ことでメッセージパッシングを実現します。
from langchain_core.tools import tool
# ハンドオフツール: 他のエージェントに処理を委任
@tool
def transfer_to_shipping_agent():
"""配送に関する問い合わせの場合、配送管理エージェントに処理を引き渡します"""
return "配送管理エージェントに転送します"
@tool
def transfer_to_order_agent():
"""注文に関する問い合わせの場合、注文管理エージェントに処理を引き渡します"""
return "注文管理エージェントに転送します"
# カスタマーサポートエージェント
support_agent = create_react_agent(
llm,
tools=[transfer_to_shipping_agent, transfer_to_order_agent, search_faq],
prompt="あなたはカスタマーサポートエージェントです。配送の問題は配送エージェントに、注文の問題は注文エージェントに転送してください。"
)
# 配送管理エージェント
shipping_agent = create_react_agent(
llm,
tools=[track_shipment, arrange_redelivery, transfer_to_order_agent],
prompt="あなたは配送管理エージェントです。配送の追跡と再配達手配を担当します。"
)
利点と課題
| 利点 | 課題 |
|---|---|
| 直感的で理解しやすい | エージェント間の依存関係が密結合になりやすい |
| 即座にレスポンスが得られる | エージェント数が増えると通信経路が複雑化 |
| デバッグが容易 | 1つのエージェントの障害が連鎖する |
パターン2: 共有状態
基本構造
全エージェントが共通のStateを読み書きするパターンです。LangGraphの基本パターンはこれに該当します。
┌─────────────────────────────────┐
│ 共有State │
│ messages: [...] │
│ order_data: {...} │
│ shipping_data: {...} │
│ customer_data: {...} │
└─────────┬───────────────────────┘
┌────┼────┐
↕ ↕ ↕
注文 配送 顧客
Agent Agent Agent
LangGraphでの実装
class SharedState(TypedDict):
messages: Annotated[list, add_messages]
order_data: dict | None
shipping_data: dict | None
customer_data: dict | None
current_agent: str
def order_agent(state: SharedState) -> SharedState:
"""注文エージェント: order_dataを更新"""
order_data = search_orders(state["messages"])
return {"order_data": order_data, "current_agent": "order"}
def shipping_agent(state: SharedState) -> SharedState:
"""配送エージェント: shipping_dataを更新、order_dataを参照"""
order = state.get("order_data", {})
tracking = track_shipment(order.get("tracking_number"))
return {"shipping_data": tracking, "current_agent": "shipping"}
def customer_agent(state: SharedState) -> SharedState:
"""顧客エージェント: 全データを参照して回答生成"""
context = {
"order": state.get("order_data"),
"shipping": state.get("shipping_data")
}
response = generate_customer_response(context, state["messages"])
return {"messages": [response], "current_agent": "customer"}
利点と課題
| 利点 | 課題 |
|---|---|
| 全エージェントが同じ情報を参照可能 | State肥大化のリスク |
| データの一貫性を維持しやすい | 同時書き込みの競合 |
| LangGraphとの親和性が高い | エージェント間の暗黙的な依存 |
パターン3: イベントドリブン
基本構造
エージェントがイベントを発行(Publish)し、関心のあるエージェントがイベントを購読(Subscribe)するパターンです。
[注文Agent] ──publish──→ [Event Bus] ──subscribe──→ [配送Agent]
│
└──subscribe──→ [通知Agent]
│
└──subscribe──→ [分析Agent]
実装パターン
// イベントバスの実装
class AgentEventBus {
private handlers: Map<string, Array<(event: AgentEvent) => Promise<void>>> = new Map();
subscribe(eventType: string, handler: (event: AgentEvent) => Promise<void>): void {
const handlers = this.handlers.get(eventType) ?? [];
handlers.push(handler);
this.handlers.set(eventType, handlers);
}
async publish(event: AgentEvent): Promise<void> {
const handlers = this.handlers.get(event.type) ?? [];
await Promise.all(handlers.map(h => h(event)));
}
}
interface AgentEvent {
type: string;
source: string; // 発行元エージェント
data: Record<string, unknown>;
timestamp: Date;
}
// 使用例
const eventBus = new AgentEventBus();
// 配送エージェントが「注文キャンセル」イベントを購読
eventBus.subscribe("order.cancelled", async (event) => {
await cancelShipment(event.data.orderId as string);
});
// 通知エージェントが「注文キャンセル」イベントを購読
eventBus.subscribe("order.cancelled", async (event) => {
await sendCancellationEmail(event.data.customerId as string);
});
// 注文エージェントがキャンセルイベントを発行
await eventBus.publish({
type: "order.cancelled",
source: "order-agent",
data: { orderId: "ORD-12345", customerId: "CUST-001", reason: "customer_request" },
timestamp: new Date()
});
利点と課題
| 利点 | 課題 |
|---|---|
| エージェント間の疎結合 | イベントの順序保証が難しい |
| 新しいエージェントの追加が容易 | デバッグが複雑 |
| 非同期処理に適している | イベントの消失リスク |
| スケーラビリティが高い | 最終的な一貫性の管理 |
パターンの組み合わせ
実際のシステムでは、複数のパターンを組み合わせて使用します。
[Supervisor] ──メッセージパッシング──→ [各Agent]
│
共有State
│
──→ [Event Bus] ──→ [監視Agent]
[通知Agent]
[分析Agent]
まとめ
| ポイント | 内容 |
|---|---|
| メッセージパッシング | 直接的な1対1通信。シンプルだが密結合 |
| 共有状態 | 全エージェントが共通Stateを参照。一貫性が高い |
| イベントドリブン | Pub/Sub型の疎結合通信。スケーラブル |
| 実際の設計 | 複数パターンを組み合わせて使用 |
チェックリスト
- 3つの通信パターンの特徴と使い分けを理解した
- メッセージパッシング(Handoff)の実装方法を把握した
- 共有状態パターンのLangGraphでの実現方法を理解した
- イベントドリブンパターンの利点と課題を把握した
次のステップへ
次は「オーケストレーション設計」を学びます。タスク分解、結果集約、エラー伝播のパターンを理解しましょう。
推定読了時間: 30分