LESSON 30分

ストーリー

田中VPoE
A2Aプロトコルはエージェント間通信の標準規格だが、実際にエージェント同士がどうやって情報をやり取りするか、具体的なパターンを学ぼう
あなた
通信の方法にもパターンがあるんですか?
田中VPoE
3つの主要パターンがある。メッセージパッシング、共有状態、イベントドリブンだ。それぞれ特性が異なるので、用途に応じて使い分ける
あなた
分散システムの通信パターンに似ていますね
田中VPoE
まさにマイクロサービスの通信パターンの考え方がそのまま適用できる。エージェントシステムも分散システムの一種だからな

通信パターンの概要

パターン方式特徴適するケース
メッセージパッシングエージェント間で直接メッセージを送受信シンプル、直接的1対1のタスク委任
共有状態共通のStateを読み書き全エージェントが同じ情報を参照協調作業、データ共有
イベントドリブンイベントを発行/購読疎結合、非同期リアクティブな処理連携

パターン1: メッセージパッシング

基本構造

エージェント同士が直接メッセージを送り合うパターンです。

[エージェントA] ──メッセージ──→ [エージェントB]
                              処理
[エージェントA] ←──レスポンス── [エージェントB]

LangGraphでの実装(Handoff)

LangGraphのSwarmパターンでは、エージェントが処理を別のエージェントに「引き渡す(Handoff)」ことでメッセージパッシングを実現します。

from langchain_core.tools import tool

# ハンドオフツール: 他のエージェントに処理を委任
@tool
def transfer_to_shipping_agent():
    """配送に関する問い合わせの場合、配送管理エージェントに処理を引き渡します"""
    return "配送管理エージェントに転送します"

@tool
def transfer_to_order_agent():
    """注文に関する問い合わせの場合、注文管理エージェントに処理を引き渡します"""
    return "注文管理エージェントに転送します"

# カスタマーサポートエージェント
support_agent = create_react_agent(
    llm,
    tools=[transfer_to_shipping_agent, transfer_to_order_agent, search_faq],
    prompt="あなたはカスタマーサポートエージェントです。配送の問題は配送エージェントに、注文の問題は注文エージェントに転送してください。"
)

# 配送管理エージェント
shipping_agent = create_react_agent(
    llm,
    tools=[track_shipment, arrange_redelivery, transfer_to_order_agent],
    prompt="あなたは配送管理エージェントです。配送の追跡と再配達手配を担当します。"
)

利点と課題

利点課題
直感的で理解しやすいエージェント間の依存関係が密結合になりやすい
即座にレスポンスが得られるエージェント数が増えると通信経路が複雑化
デバッグが容易1つのエージェントの障害が連鎖する

パターン2: 共有状態

基本構造

全エージェントが共通のStateを読み書きするパターンです。LangGraphの基本パターンはこれに該当します。

┌─────────────────────────────────┐
│         共有State                │
│  messages: [...]                │
│  order_data: {...}              │
│  shipping_data: {...}           │
│  customer_data: {...}           │
└─────────┬───────────────────────┘
     ┌────┼────┐
     ↕    ↕    ↕
  注文   配送   顧客
  Agent  Agent  Agent

LangGraphでの実装

class SharedState(TypedDict):
    messages: Annotated[list, add_messages]
    order_data: dict | None
    shipping_data: dict | None
    customer_data: dict | None
    current_agent: str

def order_agent(state: SharedState) -> SharedState:
    """注文エージェント: order_dataを更新"""
    order_data = search_orders(state["messages"])
    return {"order_data": order_data, "current_agent": "order"}

def shipping_agent(state: SharedState) -> SharedState:
    """配送エージェント: shipping_dataを更新、order_dataを参照"""
    order = state.get("order_data", {})
    tracking = track_shipment(order.get("tracking_number"))
    return {"shipping_data": tracking, "current_agent": "shipping"}

def customer_agent(state: SharedState) -> SharedState:
    """顧客エージェント: 全データを参照して回答生成"""
    context = {
        "order": state.get("order_data"),
        "shipping": state.get("shipping_data")
    }
    response = generate_customer_response(context, state["messages"])
    return {"messages": [response], "current_agent": "customer"}

利点と課題

利点課題
全エージェントが同じ情報を参照可能State肥大化のリスク
データの一貫性を維持しやすい同時書き込みの競合
LangGraphとの親和性が高いエージェント間の暗黙的な依存

パターン3: イベントドリブン

基本構造

エージェントがイベントを発行(Publish)し、関心のあるエージェントがイベントを購読(Subscribe)するパターンです。

[注文Agent] ──publish──→ [Event Bus] ──subscribe──→ [配送Agent]

                              └──subscribe──→ [通知Agent]

                              └──subscribe──→ [分析Agent]

実装パターン

// イベントバスの実装
class AgentEventBus {
  private handlers: Map<string, Array<(event: AgentEvent) => Promise<void>>> = new Map();

  subscribe(eventType: string, handler: (event: AgentEvent) => Promise<void>): void {
    const handlers = this.handlers.get(eventType) ?? [];
    handlers.push(handler);
    this.handlers.set(eventType, handlers);
  }

  async publish(event: AgentEvent): Promise<void> {
    const handlers = this.handlers.get(event.type) ?? [];
    await Promise.all(handlers.map(h => h(event)));
  }
}

interface AgentEvent {
  type: string;
  source: string;  // 発行元エージェント
  data: Record<string, unknown>;
  timestamp: Date;
}

// 使用例
const eventBus = new AgentEventBus();

// 配送エージェントが「注文キャンセル」イベントを購読
eventBus.subscribe("order.cancelled", async (event) => {
  await cancelShipment(event.data.orderId as string);
});

// 通知エージェントが「注文キャンセル」イベントを購読
eventBus.subscribe("order.cancelled", async (event) => {
  await sendCancellationEmail(event.data.customerId as string);
});

// 注文エージェントがキャンセルイベントを発行
await eventBus.publish({
  type: "order.cancelled",
  source: "order-agent",
  data: { orderId: "ORD-12345", customerId: "CUST-001", reason: "customer_request" },
  timestamp: new Date()
});

利点と課題

利点課題
エージェント間の疎結合イベントの順序保証が難しい
新しいエージェントの追加が容易デバッグが複雑
非同期処理に適しているイベントの消失リスク
スケーラビリティが高い最終的な一貫性の管理

パターンの組み合わせ

実際のシステムでは、複数のパターンを組み合わせて使用します。

[Supervisor] ──メッセージパッシング──→ [各Agent]

                                  共有State

                            ──→ [Event Bus] ──→ [監視Agent]
                                                [通知Agent]
                                                [分析Agent]

まとめ

ポイント内容
メッセージパッシング直接的な1対1通信。シンプルだが密結合
共有状態全エージェントが共通Stateを参照。一貫性が高い
イベントドリブンPub/Sub型の疎結合通信。スケーラブル
実際の設計複数パターンを組み合わせて使用

チェックリスト

  • 3つの通信パターンの特徴と使い分けを理解した
  • メッセージパッシング(Handoff)の実装方法を把握した
  • 共有状態パターンのLangGraphでの実現方法を理解した
  • イベントドリブンパターンの利点と課題を把握した

次のステップへ

次は「オーケストレーション設計」を学びます。タスク分解、結果集約、エラー伝播のパターンを理解しましょう。


推定読了時間: 30分