LESSON 30分

ストーリー

田中VPoE
Step 1〜4で画像、音声、動画、文書の各モダリティを個別に学んできた。いよいよこれらを統合するフェーズだ
あなた
個別に動かすだけでなく、複数のモダリティを組み合わせたシステムを作るんですね
田中VPoE
その通り。NetShop社では、カスタマーサポートに「画像付き問い合わせ」「音声通話」「PDF添付」が混在している。これらを統一的に処理する統合パイプラインが必要だ
あなた
どのモダリティが来ても対応できる基盤を設計するということですね

統合パイプラインの設計思想

なぜ統合が必要か

課題個別パイプライン統合パイプライン
入力の多様性モダリティごとに別システム単一エントリーポイントで全対応
データの相関分析モダリティ間の関連が見えないクロスモーダル分析が可能
運用コストシステムごとに運用が必要共通基盤で効率化
出力の一貫性フォーマットがバラバラ統一された構造化出力

統合アーキテクチャの全体像

入力層(マルチモダリティ)
├── 画像(JPG/PNG/WebP)
├── 音声(MP3/WAV/M4A)
├── 動画(MP4/MOV)
├── 文書(PDF/DOCX/XLSX)
└── テキスト(自然言語)


[モダリティルーター] ← 入力タイプを判定・分岐

    ├─→ [画像処理パイプライン] ─┐
    ├─→ [音声処理パイプライン] ─┤
    ├─→ [動画処理パイプライン] ─┤→ [統合分析エンジン] → 構造化出力
    ├─→ [文書処理パイプライン] ─┤
    └─→ [テキスト処理パイプライン]┘

モダリティルーターの設計

ルーターの役割

入力データのタイプを判定し、適切な処理パイプラインに振り分けるコンポーネントです。

from enum import Enum
from pathlib import Path
import mimetypes

class Modality(Enum):
    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
    DOCUMENT = "document"
    TEXT = "text"

class ModalityRouter:
    """入力データのモダリティを判定し、適切なパイプラインにルーティング"""

    MIME_MAPPING = {
        "image/jpeg": Modality.IMAGE,
        "image/png": Modality.IMAGE,
        "image/webp": Modality.IMAGE,
        "audio/mpeg": Modality.AUDIO,
        "audio/wav": Modality.AUDIO,
        "audio/x-m4a": Modality.AUDIO,
        "video/mp4": Modality.VIDEO,
        "video/quicktime": Modality.VIDEO,
        "application/pdf": Modality.DOCUMENT,
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document": Modality.DOCUMENT,
        "text/plain": Modality.TEXT,
    }

    def detect_modality(self, file_path: str) -> Modality:
        """ファイルパスからモダリティを判定"""
        mime_type, _ = mimetypes.guess_type(file_path)
        if mime_type and mime_type in self.MIME_MAPPING:
            return self.MIME_MAPPING[mime_type]
        raise ValueError(f"Unsupported file type: {mime_type}")

    def route(self, file_path: str) -> dict:
        """モダリティに応じたパイプライン設定を返す"""
        modality = self.detect_modality(file_path)
        return {
            "modality": modality,
            "pipeline": self._get_pipeline_config(modality),
            "file_path": file_path
        }

    def _get_pipeline_config(self, modality: Modality) -> dict:
        configs = {
            Modality.IMAGE: {
                "preprocessor": "image_preprocessor",
                "model": "vision_model",
                "postprocessor": "image_postprocessor"
            },
            Modality.AUDIO: {
                "preprocessor": "audio_preprocessor",
                "model": "whisper_model",
                "postprocessor": "audio_postprocessor"
            },
            Modality.VIDEO: {
                "preprocessor": "video_preprocessor",
                "model": "video_analysis_model",
                "postprocessor": "video_postprocessor"
            },
            Modality.DOCUMENT: {
                "preprocessor": "document_preprocessor",
                "model": "document_ai_model",
                "postprocessor": "document_postprocessor"
            },
            Modality.TEXT: {
                "preprocessor": "text_preprocessor",
                "model": "llm",
                "postprocessor": "text_postprocessor"
            },
        }
        return configs[modality]

統合分析エンジンの設計

共通出力スキーマ

各モダリティのパイプラインから出力される結果を統一フォーマットに変換します。

from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class ModalityResult:
    """各モダリティパイプラインの出力を統一するスキーマ"""
    modality: str
    source_file: str
    extracted_text: str
    metadata: dict = field(default_factory=dict)
    confidence: float = 0.0
    entities: list[dict] = field(default_factory=list)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

@dataclass
class IntegratedResult:
    """統合分析の最終出力"""
    request_id: str
    modality_results: list[ModalityResult]
    summary: str
    combined_entities: list[dict]
    action_items: list[str]
    classification: str
    confidence: float

クロスモーダル分析

from openai import OpenAI

client = OpenAI()

def cross_modal_analysis(results: list[ModalityResult]) -> IntegratedResult:
    """複数モダリティの結果を統合分析"""

    # 各モダリティの結果をコンテキストとして構成
    context_parts = []
    for r in results:
        context_parts.append(
            f"[{r.modality}] (信頼度: {r.confidence:.2f})\n"
            f"テキスト: {r.extracted_text}\n"
            f"エンティティ: {r.entities}\n"
        )
    combined_context = "\n---\n".join(context_parts)

    # LLMで統合分析
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "複数のモダリティから抽出された情報を統合分析し、"
                    "以下のJSON形式で出力してください:\n"
                    '{"summary": "...", "entities": [...], '
                    '"action_items": [...], "classification": "..."}'
                )
            },
            {
                "role": "user",
                "content": f"以下の情報を統合分析してください:\n\n{combined_context}"
            }
        ],
        response_format={"type": "json_object"}
    )

    analysis = response.choices[0].message.content
    # JSONパース後にIntegratedResultに変換
    import json
    data = json.loads(analysis)

    return IntegratedResult(
        request_id="req_" + datetime.now().strftime("%Y%m%d%H%M%S"),
        modality_results=results,
        summary=data["summary"],
        combined_entities=data["entities"],
        action_items=data["action_items"],
        classification=data["classification"],
        confidence=min(r.confidence for r in results)
    )

マルチモーダル入力の並行処理

並行実行パターン

複数モダリティの処理を並行実行することで、全体のレイテンシを削減します。

import asyncio
from typing import Callable

class MultiModalPipeline:
    """マルチモーダル入力を並行処理するパイプライン"""

    def __init__(self):
        self.router = ModalityRouter()
        self.processors: dict[Modality, Callable] = {}

    def register_processor(self, modality: Modality, processor: Callable):
        """モダリティごとのプロセッサを登録"""
        self.processors[modality] = processor

    async def process(self, file_paths: list[str]) -> IntegratedResult:
        """複数ファイルを並行処理し、結果を統合"""
        tasks = []
        for path in file_paths:
            route_info = self.router.route(path)
            modality = route_info["modality"]
            processor = self.processors.get(modality)
            if processor:
                tasks.append(processor(path))

        # 全モダリティを並行実行
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # エラーハンドリング
        valid_results = []
        for r in results:
            if isinstance(r, Exception):
                print(f"処理エラー: {r}")
            else:
                valid_results.append(r)

        # 統合分析
        return cross_modal_analysis(valid_results)

レイテンシ比較

処理方式画像(2s) + 音声(5s) + 文書(3s)合計レイテンシ
逐次処理画像→音声→文書10秒
並行処理全て同時実行5秒(最長の音声に律速)
並行+キャッシュキャッシュヒット時はスキップ2〜5秒

エラーハンドリングとフォールバック

グレースフルデグラデーション

一部のモダリティ処理が失敗しても、残りの結果で最善の回答を生成する設計です。

シナリオ対応
画像処理が失敗テキスト+音声の結果のみで統合分析
音声認識の精度が低い低信頼度フラグを付与し、人的確認キューへ
APIレートリミットリトライ+指数バックオフ、代替モデルへフォールバック
未対応ファイル形式エラーメッセージ返却+対応形式の案内
class FallbackStrategy:
    """フォールバック戦略の管理"""

    def __init__(self, min_confidence: float = 0.7):
        self.min_confidence = min_confidence

    def evaluate(self, results: list[ModalityResult]) -> dict:
        """結果の品質を評価し、フォールバック判断"""
        low_confidence = [
            r for r in results if r.confidence < self.min_confidence
        ]

        if len(results) == 0:
            return {"action": "escalate", "reason": "全モダリティ処理失敗"}
        elif len(low_confidence) > len(results) / 2:
            return {"action": "human_review", "reason": "低信頼度結果が多い"}
        elif low_confidence:
            return {
                "action": "partial_auto",
                "reason": f"{len(low_confidence)}件が低信頼度",
                "low_confidence_modalities": [r.modality for r in low_confidence]
            }
        else:
            return {"action": "auto_complete", "reason": "全結果が高信頼度"}

まとめ

コンポーネント役割ポイント
モダリティルーター入力タイプの判定と振り分けMIMEタイプベースの自動判定
共通出力スキーマ統一フォーマットへの変換全モダリティで同じ構造
統合分析エンジンクロスモーダル分析LLMによる情報統合
並行処理レイテンシ削減asyncio.gatherで同時実行
フォールバック障害耐性グレースフルデグラデーション

チェックリスト

  • マルチモーダル統合パイプラインの全体アーキテクチャを理解した
  • モダリティルーターの設計と実装方法を把握した
  • 共通出力スキーマによる統一フォーマットの必要性を理解した
  • 並行処理によるレイテンシ最適化の方法を学んだ
  • フォールバック戦略とグレースフルデグラデーションを理解した

次のステップへ

次はオーケストレーション設計を学び、複雑なマルチモーダルワークフローの管理方法を習得します。


推定読了時間: 30分