ストーリー
統合パイプラインの設計思想
なぜ統合が必要か
| 課題 | 個別パイプライン | 統合パイプライン |
|---|---|---|
| 入力の多様性 | モダリティごとに別システム | 単一エントリーポイントで全対応 |
| データの相関分析 | モダリティ間の関連が見えない | クロスモーダル分析が可能 |
| 運用コスト | システムごとに運用が必要 | 共通基盤で効率化 |
| 出力の一貫性 | フォーマットがバラバラ | 統一された構造化出力 |
統合アーキテクチャの全体像
入力層(マルチモダリティ)
├── 画像(JPG/PNG/WebP)
├── 音声(MP3/WAV/M4A)
├── 動画(MP4/MOV)
├── 文書(PDF/DOCX/XLSX)
└── テキスト(自然言語)
│
▼
[モダリティルーター] ← 入力タイプを判定・分岐
│
├─→ [画像処理パイプライン] ─┐
├─→ [音声処理パイプライン] ─┤
├─→ [動画処理パイプライン] ─┤→ [統合分析エンジン] → 構造化出力
├─→ [文書処理パイプライン] ─┤
└─→ [テキスト処理パイプライン]┘
モダリティルーターの設計
ルーターの役割
入力データのタイプを判定し、適切な処理パイプラインに振り分けるコンポーネントです。
from enum import Enum
from pathlib import Path
import mimetypes
class Modality(Enum):
IMAGE = "image"
AUDIO = "audio"
VIDEO = "video"
DOCUMENT = "document"
TEXT = "text"
class ModalityRouter:
"""入力データのモダリティを判定し、適切なパイプラインにルーティング"""
MIME_MAPPING = {
"image/jpeg": Modality.IMAGE,
"image/png": Modality.IMAGE,
"image/webp": Modality.IMAGE,
"audio/mpeg": Modality.AUDIO,
"audio/wav": Modality.AUDIO,
"audio/x-m4a": Modality.AUDIO,
"video/mp4": Modality.VIDEO,
"video/quicktime": Modality.VIDEO,
"application/pdf": Modality.DOCUMENT,
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": Modality.DOCUMENT,
"text/plain": Modality.TEXT,
}
def detect_modality(self, file_path: str) -> Modality:
"""ファイルパスからモダリティを判定"""
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type and mime_type in self.MIME_MAPPING:
return self.MIME_MAPPING[mime_type]
raise ValueError(f"Unsupported file type: {mime_type}")
def route(self, file_path: str) -> dict:
"""モダリティに応じたパイプライン設定を返す"""
modality = self.detect_modality(file_path)
return {
"modality": modality,
"pipeline": self._get_pipeline_config(modality),
"file_path": file_path
}
def _get_pipeline_config(self, modality: Modality) -> dict:
configs = {
Modality.IMAGE: {
"preprocessor": "image_preprocessor",
"model": "vision_model",
"postprocessor": "image_postprocessor"
},
Modality.AUDIO: {
"preprocessor": "audio_preprocessor",
"model": "whisper_model",
"postprocessor": "audio_postprocessor"
},
Modality.VIDEO: {
"preprocessor": "video_preprocessor",
"model": "video_analysis_model",
"postprocessor": "video_postprocessor"
},
Modality.DOCUMENT: {
"preprocessor": "document_preprocessor",
"model": "document_ai_model",
"postprocessor": "document_postprocessor"
},
Modality.TEXT: {
"preprocessor": "text_preprocessor",
"model": "llm",
"postprocessor": "text_postprocessor"
},
}
return configs[modality]
統合分析エンジンの設計
共通出力スキーマ
各モダリティのパイプラインから出力される結果を統一フォーマットに変換します。
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class ModalityResult:
"""各モダリティパイプラインの出力を統一するスキーマ"""
modality: str
source_file: str
extracted_text: str
metadata: dict = field(default_factory=dict)
confidence: float = 0.0
entities: list[dict] = field(default_factory=list)
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
@dataclass
class IntegratedResult:
"""統合分析の最終出力"""
request_id: str
modality_results: list[ModalityResult]
summary: str
combined_entities: list[dict]
action_items: list[str]
classification: str
confidence: float
クロスモーダル分析
from openai import OpenAI
client = OpenAI()
def cross_modal_analysis(results: list[ModalityResult]) -> IntegratedResult:
"""複数モダリティの結果を統合分析"""
# 各モダリティの結果をコンテキストとして構成
context_parts = []
for r in results:
context_parts.append(
f"[{r.modality}] (信頼度: {r.confidence:.2f})\n"
f"テキスト: {r.extracted_text}\n"
f"エンティティ: {r.entities}\n"
)
combined_context = "\n---\n".join(context_parts)
# LLMで統合分析
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": (
"複数のモダリティから抽出された情報を統合分析し、"
"以下のJSON形式で出力してください:\n"
'{"summary": "...", "entities": [...], '
'"action_items": [...], "classification": "..."}'
)
},
{
"role": "user",
"content": f"以下の情報を統合分析してください:\n\n{combined_context}"
}
],
response_format={"type": "json_object"}
)
analysis = response.choices[0].message.content
# JSONパース後にIntegratedResultに変換
import json
data = json.loads(analysis)
return IntegratedResult(
request_id="req_" + datetime.now().strftime("%Y%m%d%H%M%S"),
modality_results=results,
summary=data["summary"],
combined_entities=data["entities"],
action_items=data["action_items"],
classification=data["classification"],
confidence=min(r.confidence for r in results)
)
マルチモーダル入力の並行処理
並行実行パターン
複数モダリティの処理を並行実行することで、全体のレイテンシを削減します。
import asyncio
from typing import Callable
class MultiModalPipeline:
"""マルチモーダル入力を並行処理するパイプライン"""
def __init__(self):
self.router = ModalityRouter()
self.processors: dict[Modality, Callable] = {}
def register_processor(self, modality: Modality, processor: Callable):
"""モダリティごとのプロセッサを登録"""
self.processors[modality] = processor
async def process(self, file_paths: list[str]) -> IntegratedResult:
"""複数ファイルを並行処理し、結果を統合"""
tasks = []
for path in file_paths:
route_info = self.router.route(path)
modality = route_info["modality"]
processor = self.processors.get(modality)
if processor:
tasks.append(processor(path))
# 全モダリティを並行実行
results = await asyncio.gather(*tasks, return_exceptions=True)
# エラーハンドリング
valid_results = []
for r in results:
if isinstance(r, Exception):
print(f"処理エラー: {r}")
else:
valid_results.append(r)
# 統合分析
return cross_modal_analysis(valid_results)
レイテンシ比較
| 処理方式 | 画像(2s) + 音声(5s) + 文書(3s) | 合計レイテンシ |
|---|---|---|
| 逐次処理 | 画像→音声→文書 | 10秒 |
| 並行処理 | 全て同時実行 | 5秒(最長の音声に律速) |
| 並行+キャッシュ | キャッシュヒット時はスキップ | 2〜5秒 |
エラーハンドリングとフォールバック
グレースフルデグラデーション
一部のモダリティ処理が失敗しても、残りの結果で最善の回答を生成する設計です。
| シナリオ | 対応 |
|---|---|
| 画像処理が失敗 | テキスト+音声の結果のみで統合分析 |
| 音声認識の精度が低い | 低信頼度フラグを付与し、人的確認キューへ |
| APIレートリミット | リトライ+指数バックオフ、代替モデルへフォールバック |
| 未対応ファイル形式 | エラーメッセージ返却+対応形式の案内 |
class FallbackStrategy:
"""フォールバック戦略の管理"""
def __init__(self, min_confidence: float = 0.7):
self.min_confidence = min_confidence
def evaluate(self, results: list[ModalityResult]) -> dict:
"""結果の品質を評価し、フォールバック判断"""
low_confidence = [
r for r in results if r.confidence < self.min_confidence
]
if len(results) == 0:
return {"action": "escalate", "reason": "全モダリティ処理失敗"}
elif len(low_confidence) > len(results) / 2:
return {"action": "human_review", "reason": "低信頼度結果が多い"}
elif low_confidence:
return {
"action": "partial_auto",
"reason": f"{len(low_confidence)}件が低信頼度",
"low_confidence_modalities": [r.modality for r in low_confidence]
}
else:
return {"action": "auto_complete", "reason": "全結果が高信頼度"}
まとめ
| コンポーネント | 役割 | ポイント |
|---|---|---|
| モダリティルーター | 入力タイプの判定と振り分け | MIMEタイプベースの自動判定 |
| 共通出力スキーマ | 統一フォーマットへの変換 | 全モダリティで同じ構造 |
| 統合分析エンジン | クロスモーダル分析 | LLMによる情報統合 |
| 並行処理 | レイテンシ削減 | asyncio.gatherで同時実行 |
| フォールバック | 障害耐性 | グレースフルデグラデーション |
チェックリスト
- マルチモーダル統合パイプラインの全体アーキテクチャを理解した
- モダリティルーターの設計と実装方法を把握した
- 共通出力スキーマによる統一フォーマットの必要性を理解した
- 並行処理によるレイテンシ最適化の方法を学んだ
- フォールバック戦略とグレースフルデグラデーションを理解した
次のステップへ
次はオーケストレーション設計を学び、複雑なマルチモーダルワークフローの管理方法を習得します。
推定読了時間: 30分