LESSON 30分

ストーリー

田中VPoE
音声認識、音声合成、動画分析を個別に学んだ。ここからはそれらを組み合わせて、業務で使えるメディア処理パイプラインを構築しよう
あなた
個別の技術は分かりましたが、実際の業務ではどう組み合わせるんですか?
田中VPoE
例えば会議録画の処理を考えてみよう。音声を抽出して文字起こしし、話者分離を行い、フレームからスライド内容を抽出し、それらを統合して議事録を自動生成する。これがパイプラインだ
あなた
複数の処理を組み合わせて一気通貫で処理するんですね

メディア処理パイプラインの設計原則

パイプラインアーキテクチャ

入力(音声/動画)


[前処理] → フォーマット変換、品質チェック

    ├──→ [映像処理] → フレーム抽出、VLM分析

    ├──→ [音声処理] → STT、話者分離、感情分析

    └──→ [メタデータ] → ファイル情報、タイムスタンプ


    [統合処理] → LLMで結果を統合・構造化


    [後処理] → フォーマット変換、品質チェック、配信

設計のポイント

原則説明
並列化独立した処理は同時実行映像分析と音声分析を並行
冪等性同じ入力で同じ結果を保証リトライ時の安全性確保
チェックポイント中間結果を保存失敗時に途中から再開
スケーラビリティ負荷に応じたスケールキューベースの非同期処理
エラーハンドリング部分失敗の許容音声なし動画でも映像分析は実行

会議議事録の自動生成パイプライン

全体フロー

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import asyncio

class PipelineStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    PARTIAL = "partial"

@dataclass
class MeetingMinutes:
    meeting_id: str
    title: str
    date: datetime
    duration_seconds: float
    participants: list[str]
    transcript: list[dict]
    slide_contents: list[dict]
    summary: str
    topics: list[dict]
    action_items: list[dict]
    decisions: list[str]
    status: PipelineStatus = PipelineStatus.PENDING

class MeetingPipeline:
    """会議録画から議事録を自動生成するパイプライン"""

    async def process(self, video_path: str, meeting_id: str) -> MeetingMinutes:
        """メインの処理フロー"""
        minutes = MeetingMinutes(
            meeting_id=meeting_id,
            title="",
            date=datetime.now(),
            duration_seconds=0,
            participants=[],
            transcript=[],
            slide_contents=[],
            summary="",
            topics=[],
            action_items=[],
            decisions=[]
        )
        minutes.status = PipelineStatus.PROCESSING

        try:
            # Step 1: 前処理(メタデータ取得)
            metadata = self._get_metadata(video_path)
            minutes.duration_seconds = metadata["duration"]

            # Step 2: 映像と音声を並列処理
            audio_task = asyncio.create_task(
                self._process_audio(video_path)
            )
            video_task = asyncio.create_task(
                self._process_video(video_path)
            )

            audio_result, video_result = await asyncio.gather(
                audio_task, video_task,
                return_exceptions=True
            )

            # 部分失敗の処理
            if isinstance(audio_result, Exception):
                print(f"音声処理失敗: {audio_result}")
                minutes.status = PipelineStatus.PARTIAL
            else:
                minutes.transcript = audio_result["transcript"]
                minutes.participants = audio_result["speakers"]

            if isinstance(video_result, Exception):
                print(f"映像処理失敗: {video_result}")
                minutes.status = PipelineStatus.PARTIAL
            else:
                minutes.slide_contents = video_result["slides"]

            # Step 3: 統合分析
            integrated = await self._integrate_results(minutes)
            minutes.summary = integrated["summary"]
            minutes.title = integrated["title"]
            minutes.topics = integrated["topics"]
            minutes.action_items = integrated["action_items"]
            minutes.decisions = integrated["decisions"]

            if minutes.status != PipelineStatus.PARTIAL:
                minutes.status = PipelineStatus.COMPLETED

        except Exception as e:
            minutes.status = PipelineStatus.FAILED
            raise

        return minutes

    def _get_metadata(self, video_path: str) -> dict:
        """動画のメタデータを取得"""
        import cv2
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        cap.release()
        return {
            "duration": total_frames / fps,
            "fps": fps,
            "total_frames": int(total_frames)
        }

    async def _process_audio(self, video_path: str) -> dict:
        """音声処理: 抽出 → STT → 話者分離"""
        # 音声抽出
        audio_path = extract_audio_track(video_path)
        # STT + 話者分離
        transcript = transcribe_with_speakers(audio_path)
        speakers = list(set(seg["speaker"] for seg in transcript))
        return {"transcript": transcript, "speakers": speakers}

    async def _process_video(self, video_path: str) -> dict:
        """映像処理: フレーム抽出 → スライド検出 → OCR"""
        frames = extract_frames_interval(video_path, interval_sec=30)
        slides = analyze_slides(frames)
        return {"slides": slides}

    async def _integrate_results(self, minutes: MeetingMinutes) -> dict:
        """LLMで結果を統合"""
        # 統合プロンプトを構築
        prompt = build_integration_prompt(minutes)
        # LLMで構造化データを生成
        return call_llm_for_integration(prompt)

コールセンター音声分析パイプライン

リアルタイム処理 + バッチ処理の組み合わせ

@dataclass
class CallAnalysis:
    call_id: str
    customer_id: str
    operator_id: str
    duration: float
    transcript: list[dict]
    sentiment_score: float          # -1.0 〜 1.0
    category: str                   # 問い合わせカテゴリ
    extracted_info: dict            # 抽出された構造化情報
    summary: str
    escalation_needed: bool
    quality_score: float            # オペレーター品質スコア

class CallCenterPipeline:
    """コールセンター通話の分析パイプライン"""

    def process_realtime(self, audio_stream) -> dict:
        """リアルタイム処理(通話中)"""
        # ストリーミングSTTで文字起こし
        # 感情分析でクレーム検知
        # エスカレーション判定
        pass

    def process_batch(self, audio_path: str, call_id: str) -> CallAnalysis:
        """バッチ処理(通話終了後)"""
        # 1. 高精度STT
        transcript = transcribe_with_speakers(audio_path, num_speakers=2)

        # 2. 感情分析
        sentiment = analyze_sentiment(transcript)

        # 3. カテゴリ分類 + 情報抽出
        analysis = extract_call_info(transcript)

        # 4. 品質スコアリング
        quality = score_operator_quality(transcript)

        # 5. 要約生成
        summary = generate_call_summary(transcript, analysis)

        return CallAnalysis(
            call_id=call_id,
            transcript=transcript,
            sentiment_score=sentiment,
            category=analysis["category"],
            extracted_info=analysis["extracted"],
            summary=summary,
            escalation_needed=sentiment < -0.5,
            quality_score=quality,
            customer_id=analysis["extracted"].get("customer_id", ""),
            operator_id=analysis["extracted"].get("operator_id", ""),
            duration=0
        )

パイプラインの運用設計

キューベースの非同期処理

[ファイルアップロード]


   [メッセージキュー]  ← SQS / RabbitMQ


   [ワーカープロセス]  ← 複数インスタンスでスケール

        ├──→ [前処理ワーカー]
        ├──→ [音声処理ワーカー]
        ├──→ [映像処理ワーカー]
        └──→ [統合処理ワーカー]


         [結果DB] → [通知] → ユーザー

エラーハンドリング戦略

エラー種類対処リトライ
API一時障害指数バックオフでリトライ最大3回
ファイル破損ユーザーに再アップロード依頼なし
タイムアウトチャンク分割して再処理1回
認識精度不良別モデルにフォールバック1回
メモリ不足より小さなモデルに切り替え1回

コスト最適化

戦略効果実装方法
モデルの段階選択コスト30-50%削減簡単な処理にはsmallモデル
キャッシュ重複処理を回避ハッシュベースのキャッシュ
バッチ化API呼び出し回数削減複数ファイルをまとめて処理
オフピーク実行コスト/リソース最適化夜間バッチ

まとめ

パイプライン入力主要処理出力
会議議事録動画STT + 話者分離 + VLM + LLM統合構造化議事録
コールセンター通話音声リアルタイムSTT + 感情分析 + バッチ分析通話分析レポート
研修動画動画STT + 字幕生成 + 要約字幕ファイル + サマリー

チェックリスト

  • メディア処理パイプラインの設計原則(並列化、冪等性、チェックポイント)を理解した
  • 会議議事録の自動生成パイプラインを設計できる
  • リアルタイム処理とバッチ処理の使い分けを説明できる
  • キューベースの非同期処理アーキテクチャを設計できる
  • エラーハンドリングとコスト最適化の戦略を把握した

次のステップへ

Step 3の演習で、学んだ音声・動画処理技術を実際のシナリオに適用します。


推定読了時間: 30分