LESSON 30分

ストーリー

田中VPoE
統合パイプラインの基盤ができた。次はワークフロー全体のオーケストレーションだ。実際の業務では、モダリティの処理順序に依存関係があったり、条件分岐が必要になる
あなた
例えば、動画を処理するときは音声と映像を別々に処理してから統合する、みたいな依存関係ですか?
田中VPoE
その通り。さらに、中間結果に応じて処理を分岐させたり、エラー時にリトライする仕組みも必要だ。オーケストレーションパターンを学ぼう
あなた
複雑なフローを管理する方法を知りたいです

オーケストレーションパターン

パターンの分類

パターン概要適用場面
シーケンシャル処理を順番に実行前の結果が次の入力になる場合
パラレル独立した処理を同時実行処理間に依存がない場合
ファンアウト/ファンイン分散→集約1入力→複数処理→結果統合
条件分岐中間結果で処理を分岐入力内容に応じた処理選択
ループ条件を満たすまで繰り返し精度が基準に達するまで再処理

ワークフロー定義

from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable

class StepType(Enum):
    PROCESS = "process"
    BRANCH = "branch"
    PARALLEL = "parallel"
    LOOP = "loop"

@dataclass
class WorkflowStep:
    """ワークフローの1ステップ"""
    name: str
    step_type: StepType
    handler: Callable
    next_steps: list[str] = field(default_factory=list)
    condition: Callable | None = None  # 分岐条件
    max_retries: int = 3
    timeout_seconds: int = 300

@dataclass
class Workflow:
    """マルチモーダルワークフロー定義"""
    name: str
    steps: dict[str, WorkflowStep] = field(default_factory=dict)
    entry_point: str = ""

    def add_step(self, step: WorkflowStep):
        self.steps[step.name] = step
        if not self.entry_point:
            self.entry_point = step.name

DAGベースのオーケストレーション

有向非巡回グラフ(DAG)による依存管理

[動画入力]

    ├──→ [映像フレーム抽出] ──→ [画像分析] ──┐
    │                                       │
    └──→ [音声分離] ──→ [STT] ──→ [NLP分析] ┤

                                    [統合分析] ──→ [出力]
import asyncio
from collections import defaultdict

class DAGOrchestrator:
    """DAGベースのワークフローオーケストレーター"""

    def __init__(self):
        self.tasks: dict[str, Callable] = {}
        self.dependencies: dict[str, list[str]] = defaultdict(list)
        self.results: dict[str, Any] = {}

    def add_task(self, name: str, handler: Callable, depends_on: list[str] = None):
        """タスクを追加"""
        self.tasks[name] = handler
        if depends_on:
            self.dependencies[name] = depends_on

    async def execute(self, initial_input: Any) -> dict[str, Any]:
        """DAGを実行"""
        completed = set()
        self.results = {"_input": initial_input}

        while len(completed) < len(self.tasks):
            # 実行可能なタスクを取得(依存が全て完了しているもの)
            ready = [
                name for name, deps in self.dependencies.items()
                if name not in completed
                and all(d in completed for d in deps)
            ]
            # 依存なしのタスクも追加
            for name in self.tasks:
                if name not in completed and name not in self.dependencies:
                    ready.append(name)

            if not ready:
                break

            # 実行可能なタスクを並行実行
            async_tasks = []
            for name in ready:
                dep_results = {
                    d: self.results[d] for d in self.dependencies.get(name, [])
                }
                async_tasks.append(
                    self._run_task(name, dep_results or initial_input)
                )

            results = await asyncio.gather(*async_tasks, return_exceptions=True)
            for name, result in zip(ready, results):
                if isinstance(result, Exception):
                    print(f"タスク {name} 失敗: {result}")
                    self.results[name] = None
                else:
                    self.results[name] = result
                completed.add(name)

        return self.results

    async def _run_task(self, name: str, inputs: Any) -> Any:
        """個別タスクを実行(リトライ付き)"""
        handler = self.tasks[name]
        for attempt in range(3):
            try:
                return await handler(inputs)
            except Exception as e:
                if attempt == 2:
                    raise
                await asyncio.sleep(2 ** attempt)

条件分岐とルーティング

動的ルーティングの実装

class ConditionalRouter:
    """中間結果に基づく条件分岐ルーター"""

    def __init__(self):
        self.routes: list[dict] = []

    def add_route(
        self,
        condition: Callable[[dict], bool],
        target: str,
        priority: int = 0
    ):
        """条件付きルートを追加"""
        self.routes.append({
            "condition": condition,
            "target": target,
            "priority": priority
        })
        self.routes.sort(key=lambda r: r["priority"], reverse=True)

    def resolve(self, context: dict) -> str:
        """コンテキストに基づいて次のステップを決定"""
        for route in self.routes:
            if route["condition"](context):
                return route["target"]
        return "default_handler"

# 使用例
router = ConditionalRouter()

# 画像に文書が含まれる場合 → Document AIパイプラインへ
router.add_route(
    condition=lambda ctx: ctx.get("contains_text", False),
    target="document_pipeline",
    priority=10
)

# 画像に人物が含まれる場合 → 顔認識パイプラインへ
router.add_route(
    condition=lambda ctx: ctx.get("contains_faces", False),
    target="face_recognition_pipeline",
    priority=5
)

# デフォルト → 汎用画像分析
router.add_route(
    condition=lambda ctx: True,
    target="general_image_analysis",
    priority=0
)

状態管理とチェックポイント

ワークフロー状態の永続化

長時間実行されるワークフローでは、途中の状態を保存しておく必要があります。

import json
from datetime import datetime

class WorkflowState:
    """ワークフローの状態管理"""

    def __init__(self, workflow_id: str):
        self.workflow_id = workflow_id
        self.status = "pending"
        self.current_step = ""
        self.completed_steps: list[str] = []
        self.step_results: dict[str, Any] = {}
        self.errors: list[dict] = []
        self.created_at = datetime.now().isoformat()
        self.updated_at = datetime.now().isoformat()

    def update_step(self, step_name: str, result: Any, status: str = "completed"):
        """ステップの完了を記録"""
        self.step_results[step_name] = result
        self.completed_steps.append(step_name)
        self.current_step = step_name
        self.status = status
        self.updated_at = datetime.now().isoformat()

    def record_error(self, step_name: str, error: str):
        """エラーを記録"""
        self.errors.append({
            "step": step_name,
            "error": error,
            "timestamp": datetime.now().isoformat()
        })

    def to_checkpoint(self) -> str:
        """状態をJSON形式でシリアライズ"""
        return json.dumps({
            "workflow_id": self.workflow_id,
            "status": self.status,
            "current_step": self.current_step,
            "completed_steps": self.completed_steps,
            "errors": self.errors,
            "updated_at": self.updated_at
        }, ensure_ascii=False)

    @classmethod
    def from_checkpoint(cls, checkpoint_json: str) -> "WorkflowState":
        """チェックポイントから状態を復元"""
        data = json.loads(checkpoint_json)
        state = cls(data["workflow_id"])
        state.status = data["status"]
        state.current_step = data["current_step"]
        state.completed_steps = data["completed_steps"]
        state.errors = data["errors"]
        return state

再実行戦略

障害ケース再実行戦略
一時的なAPIエラー指数バックオフでリトライ
タイムアウトチェックポイントから再開
データ破損該当ステップのみ再実行
全体障害最後のチェックポイントからワークフロー全体を再開

実践的なオーケストレーション例

カスタマーサポートの問い合わせ統合処理

[問い合わせ受信]


[入力分類] → テキストのみ → [LLM分析] → [回答生成]
    │                                        │
    ├→ 画像添付あり → [画像分析] ────────────┤
    │                                        │
    ├→ 音声添付あり → [STT] → [感情分析] ───┤
    │                                        │
    └→ 文書添付あり → [Document AI] ────────┤

                                      [統合コンテキスト]

                                      [優先度判定]

                                ┌────────────┼────────────┐
                                ▼            ▼            ▼
                            [自動回答]   [担当者割当]   [エスカレーション]

まとめ

パターンメリット注意点
DAGオーケストレーション依存関係の明確化、並行実行循環依存の防止が必要
条件分岐柔軟な処理フロー分岐条件の網羅性を確認
状態管理障害回復、監査ストレージコストの考慮
チェックポイント長時間処理の安全性シリアライズのオーバーヘッド

チェックリスト

  • オーケストレーションの5つのパターンを理解した
  • DAGベースの依存管理と並行実行を設計できる
  • 条件分岐による動的ルーティングの仕組みを把握した
  • ワークフロー状態の永続化とチェックポイントの必要性を理解した
  • エラー時の再実行戦略を設計できる

次のステップへ

次は品質管理を学び、マルチモーダルパイプライン全体の品質を担保する方法を習得します。


推定読了時間: 30分