ストーリー
タスク分解
分解戦略
複雑なタスクをサブタスクに分解し、適切なエージェントに割り当てます。
| 分解戦略 | 説明 | 例 |
|---|---|---|
| 機能分解 | 機能単位でサブタスクに分解 | 注文検索、配送追跡、返金処理 |
| データ分解 | データソース単位で分解 | 注文DB、配送DB、顧客DB |
| フェーズ分解 | 処理フェーズで分解 | 情報収集→分析→意思決定→実行 |
| 並列分解 | 独立した処理を並列に分解 | 複数の情報源を同時検索 |
LangGraphでの実装
class OrchestratorState(TypedDict):
messages: Annotated[list, add_messages]
task_plan: list[dict] # 分解されたサブタスク一覧
subtask_results: dict # 各サブタスクの結果
aggregated_result: str | None # 統合結果
errors: list[dict] # エラー一覧
def decompose_task(state: OrchestratorState) -> OrchestratorState:
"""タスクをサブタスクに分解"""
response = llm.invoke([
SystemMessage(content="""ユーザーのリクエストを以下のサブタスクに分解してください。
各サブタスクにはagent(担当エージェント)とdescription(説明)を付けてください。
JSON配列形式で出力してください。
利用可能なエージェント: order_agent, shipping_agent, customer_agent, analytics_agent"""),
*state["messages"]
])
task_plan = json.loads(response.content)
return {"task_plan": task_plan}
# 例: 分解結果
# [
# {"id": "t1", "agent": "order_agent", "description": "注文ORD-12345の詳細を取得"},
# {"id": "t2", "agent": "shipping_agent", "description": "配送状況を追跡", "depends_on": ["t1"]},
# {"id": "t3", "agent": "customer_agent", "description": "顧客への回答を生成", "depends_on": ["t1", "t2"]}
# ]
結果集約
集約パターン
| パターン | 説明 | 使い所 |
|---|---|---|
| 統合(Merge) | 各結果を1つにまとめる | レポート生成 |
| 選択(Select) | 最良の結果を選ぶ | 複数候補からの最適解選定 |
| 投票(Vote) | 多数決で決定 | 品質評価 |
| 連鎖(Chain) | 前の結果を次の入力にする | 段階的な処理 |
実装例
def aggregate_results(state: OrchestratorState) -> OrchestratorState:
"""各エージェントの結果を統合"""
results = state.get("subtask_results", {})
errors = state.get("errors", [])
# 利用可能な結果を収集
available_results = {
k: v for k, v in results.items()
if v.get("status") == "completed"
}
# エラーがあった場合の対応
if errors:
error_summary = "\n".join([
f"- {e['agent']}: {e['message']}" for e in errors
])
else:
error_summary = "なし"
# LLMで結果を統合
aggregation_prompt = f"""
以下のサブタスクの結果を統合して、ユーザーへの回答を生成してください。
サブタスク結果:
{json.dumps(available_results, ensure_ascii=False, indent=2)}
エラー情報:
{error_summary}
エラーがある場合は、利用可能な情報のみで回答し、
不足している情報については正直に伝えてください。
"""
response = llm.invoke([
SystemMessage(content=aggregation_prompt),
*state["messages"]
])
return {
"aggregated_result": response.content,
"messages": [response]
}
エラー伝播
エラー伝播戦略
マルチエージェントシステムでは、1つのエージェントのエラーが全体に影響する可能性があります。
| 戦略 | 説明 | 適するケース |
|---|---|---|
| 即座に停止 | 1つのエラーで全体を停止 | 全サブタスクが必須の場合 |
| 部分的続行 | エラーのサブタスクをスキップして続行 | 一部が欠けても回答可能な場合 |
| リトライ + フォールバック | エラー時にリトライ、ダメなら代替手段 | 一時的エラーが想定される場合 |
| エスカレーション | 人間に判断を委譲 | 重大なエラーの場合 |
実装例
async def execute_subtask_with_error_handling(
subtask: dict,
state: OrchestratorState
) -> dict:
"""エラーハンドリング付きサブタスク実行"""
agent_name = subtask["agent"]
max_retries = 2
for attempt in range(max_retries + 1):
try:
result = await execute_agent(agent_name, subtask, state)
return {"status": "completed", "data": result}
except TimeoutError:
if attempt < max_retries:
await asyncio.sleep(2 ** attempt)
continue
return {
"status": "failed",
"error": f"{agent_name} がタイムアウトしました",
"fallback_available": True
}
except Exception as e:
return {
"status": "failed",
"error": str(e),
"fallback_available": False
}
def handle_subtask_errors(state: OrchestratorState) -> str:
"""エラー状況に応じた次のアクションを決定"""
errors = state.get("errors", [])
results = state.get("subtask_results", {})
if not errors:
return "aggregate" # エラーなし → 結果統合へ
critical_errors = [e for e in errors if not e.get("fallback_available")]
if critical_errors:
return "escalate" # クリティカルエラー → エスカレーション
# 一部エラーだが続行可能
completed = sum(1 for r in results.values() if r.get("status") == "completed")
total = len(state.get("task_plan", []))
if completed / total >= 0.5:
return "aggregate" # 50%以上成功 → 部分的な結果で統合
else:
return "escalate" # 50%未満 → エスカレーション
オーケストレーション全体設計
graph TD
start["タスク受付"] --> decompose["タスク分解"]
decompose --> execute["サブタスク実行<br/>(並列/順次)"]
execute --> check_errors{"エラー<br/>チェック"}
check_errors -->|エラーなし| aggregate["結果統合"]
check_errors -->|部分エラー| partial["部分統合"]
check_errors -->|重大エラー| escalate["エスカレーション"]
aggregate --> respond["回答生成"]
partial --> respond
escalate --> human["人間対応"]
respond --> end_node["完了"]
まとめ
| ポイント | 内容 |
|---|---|
| タスク分解 | 機能/データ/フェーズ/並列の4つの分解戦略 |
| 結果集約 | 統合、選択、投票、連鎖の4パターン |
| エラー伝播 | 即停止、部分続行、リトライ、エスカレーションの使い分け |
| 全体設計 | 分解→実行→エラーチェック→統合→回答のフロー |
チェックリスト
- 4つのタスク分解戦略を理解し、使い分けられる
- 結果集約パターンの選択基準を把握した
- エラー伝播戦略の設計方法を理解した
- オーケストレーション全体のフローを設計できる
次のステップへ
次は「演習:マルチエージェントシステムを設計しよう」です。NetShop社のカスタマーサポート向けマルチエージェントシステムの設計に取り組みます。
推定読了時間: 30分