ストーリー
オーケストレーションパターン
パターンの分類
| パターン | 概要 | 適用場面 |
|---|---|---|
| シーケンシャル | 処理を順番に実行 | 前の結果が次の入力になる場合 |
| パラレル | 独立した処理を同時実行 | 処理間に依存がない場合 |
| ファンアウト/ファンイン | 分散→集約 | 1入力→複数処理→結果統合 |
| 条件分岐 | 中間結果で処理を分岐 | 入力内容に応じた処理選択 |
| ループ | 条件を満たすまで繰り返し | 精度が基準に達するまで再処理 |
ワークフロー定義
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable
class StepType(Enum):
PROCESS = "process"
BRANCH = "branch"
PARALLEL = "parallel"
LOOP = "loop"
@dataclass
class WorkflowStep:
"""ワークフローの1ステップ"""
name: str
step_type: StepType
handler: Callable
next_steps: list[str] = field(default_factory=list)
condition: Callable | None = None # 分岐条件
max_retries: int = 3
timeout_seconds: int = 300
@dataclass
class Workflow:
"""マルチモーダルワークフロー定義"""
name: str
steps: dict[str, WorkflowStep] = field(default_factory=dict)
entry_point: str = ""
def add_step(self, step: WorkflowStep):
self.steps[step.name] = step
if not self.entry_point:
self.entry_point = step.name
DAGベースのオーケストレーション
有向非巡回グラフ(DAG)による依存管理
[動画入力]
│
├──→ [映像フレーム抽出] ──→ [画像分析] ──┐
│ │
└──→ [音声分離] ──→ [STT] ──→ [NLP分析] ┤
│
[統合分析] ──→ [出力]
import asyncio
from collections import defaultdict
class DAGOrchestrator:
"""DAGベースのワークフローオーケストレーター"""
def __init__(self):
self.tasks: dict[str, Callable] = {}
self.dependencies: dict[str, list[str]] = defaultdict(list)
self.results: dict[str, Any] = {}
def add_task(self, name: str, handler: Callable, depends_on: list[str] = None):
"""タスクを追加"""
self.tasks[name] = handler
if depends_on:
self.dependencies[name] = depends_on
async def execute(self, initial_input: Any) -> dict[str, Any]:
"""DAGを実行"""
completed = set()
self.results = {"_input": initial_input}
while len(completed) < len(self.tasks):
# 実行可能なタスクを取得(依存が全て完了しているもの)
ready = [
name for name, deps in self.dependencies.items()
if name not in completed
and all(d in completed for d in deps)
]
# 依存なしのタスクも追加
for name in self.tasks:
if name not in completed and name not in self.dependencies:
ready.append(name)
if not ready:
break
# 実行可能なタスクを並行実行
async_tasks = []
for name in ready:
dep_results = {
d: self.results[d] for d in self.dependencies.get(name, [])
}
async_tasks.append(
self._run_task(name, dep_results or initial_input)
)
results = await asyncio.gather(*async_tasks, return_exceptions=True)
for name, result in zip(ready, results):
if isinstance(result, Exception):
print(f"タスク {name} 失敗: {result}")
self.results[name] = None
else:
self.results[name] = result
completed.add(name)
return self.results
async def _run_task(self, name: str, inputs: Any) -> Any:
"""個別タスクを実行(リトライ付き)"""
handler = self.tasks[name]
for attempt in range(3):
try:
return await handler(inputs)
except Exception as e:
if attempt == 2:
raise
await asyncio.sleep(2 ** attempt)
条件分岐とルーティング
動的ルーティングの実装
class ConditionalRouter:
"""中間結果に基づく条件分岐ルーター"""
def __init__(self):
self.routes: list[dict] = []
def add_route(
self,
condition: Callable[[dict], bool],
target: str,
priority: int = 0
):
"""条件付きルートを追加"""
self.routes.append({
"condition": condition,
"target": target,
"priority": priority
})
self.routes.sort(key=lambda r: r["priority"], reverse=True)
def resolve(self, context: dict) -> str:
"""コンテキストに基づいて次のステップを決定"""
for route in self.routes:
if route["condition"](context):
return route["target"]
return "default_handler"
# 使用例
router = ConditionalRouter()
# 画像に文書が含まれる場合 → Document AIパイプラインへ
router.add_route(
condition=lambda ctx: ctx.get("contains_text", False),
target="document_pipeline",
priority=10
)
# 画像に人物が含まれる場合 → 顔認識パイプラインへ
router.add_route(
condition=lambda ctx: ctx.get("contains_faces", False),
target="face_recognition_pipeline",
priority=5
)
# デフォルト → 汎用画像分析
router.add_route(
condition=lambda ctx: True,
target="general_image_analysis",
priority=0
)
状態管理とチェックポイント
ワークフロー状態の永続化
長時間実行されるワークフローでは、途中の状態を保存しておく必要があります。
import json
from datetime import datetime
class WorkflowState:
"""ワークフローの状態管理"""
def __init__(self, workflow_id: str):
self.workflow_id = workflow_id
self.status = "pending"
self.current_step = ""
self.completed_steps: list[str] = []
self.step_results: dict[str, Any] = {}
self.errors: list[dict] = []
self.created_at = datetime.now().isoformat()
self.updated_at = datetime.now().isoformat()
def update_step(self, step_name: str, result: Any, status: str = "completed"):
"""ステップの完了を記録"""
self.step_results[step_name] = result
self.completed_steps.append(step_name)
self.current_step = step_name
self.status = status
self.updated_at = datetime.now().isoformat()
def record_error(self, step_name: str, error: str):
"""エラーを記録"""
self.errors.append({
"step": step_name,
"error": error,
"timestamp": datetime.now().isoformat()
})
def to_checkpoint(self) -> str:
"""状態をJSON形式でシリアライズ"""
return json.dumps({
"workflow_id": self.workflow_id,
"status": self.status,
"current_step": self.current_step,
"completed_steps": self.completed_steps,
"errors": self.errors,
"updated_at": self.updated_at
}, ensure_ascii=False)
@classmethod
def from_checkpoint(cls, checkpoint_json: str) -> "WorkflowState":
"""チェックポイントから状態を復元"""
data = json.loads(checkpoint_json)
state = cls(data["workflow_id"])
state.status = data["status"]
state.current_step = data["current_step"]
state.completed_steps = data["completed_steps"]
state.errors = data["errors"]
return state
再実行戦略
| 障害ケース | 再実行戦略 |
|---|---|
| 一時的なAPIエラー | 指数バックオフでリトライ |
| タイムアウト | チェックポイントから再開 |
| データ破損 | 該当ステップのみ再実行 |
| 全体障害 | 最後のチェックポイントからワークフロー全体を再開 |
実践的なオーケストレーション例
カスタマーサポートの問い合わせ統合処理
[問い合わせ受信]
│
▼
[入力分類] → テキストのみ → [LLM分析] → [回答生成]
│ │
├→ 画像添付あり → [画像分析] ────────────┤
│ │
├→ 音声添付あり → [STT] → [感情分析] ───┤
│ │
└→ 文書添付あり → [Document AI] ────────┤
│
[統合コンテキスト]
│
[優先度判定]
│
┌────────────┼────────────┐
▼ ▼ ▼
[自動回答] [担当者割当] [エスカレーション]
まとめ
| パターン | メリット | 注意点 |
|---|---|---|
| DAGオーケストレーション | 依存関係の明確化、並行実行 | 循環依存の防止が必要 |
| 条件分岐 | 柔軟な処理フロー | 分岐条件の網羅性を確認 |
| 状態管理 | 障害回復、監査 | ストレージコストの考慮 |
| チェックポイント | 長時間処理の安全性 | シリアライズのオーバーヘッド |
チェックリスト
- オーケストレーションの5つのパターンを理解した
- DAGベースの依存管理と並行実行を設計できる
- 条件分岐による動的ルーティングの仕組みを把握した
- ワークフロー状態の永続化とチェックポイントの必要性を理解した
- エラー時の再実行戦略を設計できる
次のステップへ
次は品質管理を学び、マルチモーダルパイプライン全体の品質を担保する方法を習得します。
推定読了時間: 30分