LESSON 30分

ストーリー

田中VPoE
マルチエージェントのパターンと通信方法を学んだ。次は、複数のエージェントを「どう協調させるか」というオーケストレーション設計だ
あなた
オーケストラの指揮者のように、全体を調整するということですね
田中VPoE
まさにそうだ。タスクをどう分解するか、各エージェントの結果をどう統合するか、エラーが発生したらどう伝播・対処するか。この3つの設計が、マルチエージェントシステムの成否を分ける
あなた
分散システムの設計に近い考え方が必要ですね

タスク分解

分解戦略

複雑なタスクをサブタスクに分解し、適切なエージェントに割り当てます。

分解戦略説明
機能分解機能単位でサブタスクに分解注文検索、配送追跡、返金処理
データ分解データソース単位で分解注文DB、配送DB、顧客DB
フェーズ分解処理フェーズで分解情報収集→分析→意思決定→実行
並列分解独立した処理を並列に分解複数の情報源を同時検索

LangGraphでの実装

class OrchestratorState(TypedDict):
    messages: Annotated[list, add_messages]
    task_plan: list[dict]          # 分解されたサブタスク一覧
    subtask_results: dict          # 各サブタスクの結果
    aggregated_result: str | None  # 統合結果
    errors: list[dict]             # エラー一覧

def decompose_task(state: OrchestratorState) -> OrchestratorState:
    """タスクをサブタスクに分解"""
    response = llm.invoke([
        SystemMessage(content="""ユーザーのリクエストを以下のサブタスクに分解してください。
        各サブタスクにはagent(担当エージェント)とdescription(説明)を付けてください。
        JSON配列形式で出力してください。
        利用可能なエージェント: order_agent, shipping_agent, customer_agent, analytics_agent"""),
        *state["messages"]
    ])

    task_plan = json.loads(response.content)
    return {"task_plan": task_plan}

# 例: 分解結果
# [
#   {"id": "t1", "agent": "order_agent", "description": "注文ORD-12345の詳細を取得"},
#   {"id": "t2", "agent": "shipping_agent", "description": "配送状況を追跡", "depends_on": ["t1"]},
#   {"id": "t3", "agent": "customer_agent", "description": "顧客への回答を生成", "depends_on": ["t1", "t2"]}
# ]

結果集約

集約パターン

パターン説明使い所
統合(Merge)各結果を1つにまとめるレポート生成
選択(Select)最良の結果を選ぶ複数候補からの最適解選定
投票(Vote)多数決で決定品質評価
連鎖(Chain)前の結果を次の入力にする段階的な処理

実装例

def aggregate_results(state: OrchestratorState) -> OrchestratorState:
    """各エージェントの結果を統合"""
    results = state.get("subtask_results", {})
    errors = state.get("errors", [])

    # 利用可能な結果を収集
    available_results = {
        k: v for k, v in results.items()
        if v.get("status") == "completed"
    }

    # エラーがあった場合の対応
    if errors:
        error_summary = "\n".join([
            f"- {e['agent']}: {e['message']}" for e in errors
        ])
    else:
        error_summary = "なし"

    # LLMで結果を統合
    aggregation_prompt = f"""
    以下のサブタスクの結果を統合して、ユーザーへの回答を生成してください。

    サブタスク結果:
    {json.dumps(available_results, ensure_ascii=False, indent=2)}

    エラー情報:
    {error_summary}

    エラーがある場合は、利用可能な情報のみで回答し、
    不足している情報については正直に伝えてください。
    """

    response = llm.invoke([
        SystemMessage(content=aggregation_prompt),
        *state["messages"]
    ])

    return {
        "aggregated_result": response.content,
        "messages": [response]
    }

エラー伝播

エラー伝播戦略

マルチエージェントシステムでは、1つのエージェントのエラーが全体に影響する可能性があります。

戦略説明適するケース
即座に停止1つのエラーで全体を停止全サブタスクが必須の場合
部分的続行エラーのサブタスクをスキップして続行一部が欠けても回答可能な場合
リトライ + フォールバックエラー時にリトライ、ダメなら代替手段一時的エラーが想定される場合
エスカレーション人間に判断を委譲重大なエラーの場合

実装例

async def execute_subtask_with_error_handling(
    subtask: dict,
    state: OrchestratorState
) -> dict:
    """エラーハンドリング付きサブタスク実行"""
    agent_name = subtask["agent"]
    max_retries = 2

    for attempt in range(max_retries + 1):
        try:
            result = await execute_agent(agent_name, subtask, state)
            return {"status": "completed", "data": result}
        except TimeoutError:
            if attempt < max_retries:
                await asyncio.sleep(2 ** attempt)
                continue
            return {
                "status": "failed",
                "error": f"{agent_name} がタイムアウトしました",
                "fallback_available": True
            }
        except Exception as e:
            return {
                "status": "failed",
                "error": str(e),
                "fallback_available": False
            }

def handle_subtask_errors(state: OrchestratorState) -> str:
    """エラー状況に応じた次のアクションを決定"""
    errors = state.get("errors", [])
    results = state.get("subtask_results", {})

    if not errors:
        return "aggregate"  # エラーなし → 結果統合へ

    critical_errors = [e for e in errors if not e.get("fallback_available")]
    if critical_errors:
        return "escalate"  # クリティカルエラー → エスカレーション

    # 一部エラーだが続行可能
    completed = sum(1 for r in results.values() if r.get("status") == "completed")
    total = len(state.get("task_plan", []))

    if completed / total >= 0.5:
        return "aggregate"  # 50%以上成功 → 部分的な結果で統合
    else:
        return "escalate"   # 50%未満 → エスカレーション

オーケストレーション全体設計

graph TD
    start["タスク受付"] --> decompose["タスク分解"]
    decompose --> execute["サブタスク実行<br/>(並列/順次)"]
    execute --> check_errors{"エラー<br/>チェック"}
    check_errors -->|エラーなし| aggregate["結果統合"]
    check_errors -->|部分エラー| partial["部分統合"]
    check_errors -->|重大エラー| escalate["エスカレーション"]
    aggregate --> respond["回答生成"]
    partial --> respond
    escalate --> human["人間対応"]
    respond --> end_node["完了"]

まとめ

ポイント内容
タスク分解機能/データ/フェーズ/並列の4つの分解戦略
結果集約統合、選択、投票、連鎖の4パターン
エラー伝播即停止、部分続行、リトライ、エスカレーションの使い分け
全体設計分解→実行→エラーチェック→統合→回答のフロー

チェックリスト

  • 4つのタスク分解戦略を理解し、使い分けられる
  • 結果集約パターンの選択基準を把握した
  • エラー伝播戦略の設計方法を理解した
  • オーケストレーション全体のフローを設計できる

次のステップへ

次は「演習:マルチエージェントシステムを設計しよう」です。NetShop社のカスタマーサポート向けマルチエージェントシステムの設計に取り組みます。


推定読了時間: 30分