ストーリー
田
田中VPoE
音声認識、音声合成、動画分析を個別に学んだ。ここからはそれらを組み合わせて、業務で使えるメディア処理パイプラインを構築しよう
あなた
個別の技術は分かりましたが、実際の業務ではどう組み合わせるんですか?
あ
田
田中VPoE
例えば会議録画の処理を考えてみよう。音声を抽出して文字起こしし、話者分離を行い、フレームからスライド内容を抽出し、それらを統合して議事録を自動生成する。これがパイプラインだ
あなた
複数の処理を組み合わせて一気通貫で処理するんですね
あ
メディア処理パイプラインの設計原則
パイプラインアーキテクチャ
入力(音声/動画)
│
▼
[前処理] → フォーマット変換、品質チェック
│
├──→ [映像処理] → フレーム抽出、VLM分析
│
├──→ [音声処理] → STT、話者分離、感情分析
│
└──→ [メタデータ] → ファイル情報、タイムスタンプ
│
▼
[統合処理] → LLMで結果を統合・構造化
│
▼
[後処理] → フォーマット変換、品質チェック、配信
設計のポイント
| 原則 | 説明 | 例 |
|---|
| 並列化 | 独立した処理は同時実行 | 映像分析と音声分析を並行 |
| 冪等性 | 同じ入力で同じ結果を保証 | リトライ時の安全性確保 |
| チェックポイント | 中間結果を保存 | 失敗時に途中から再開 |
| スケーラビリティ | 負荷に応じたスケール | キューベースの非同期処理 |
| エラーハンドリング | 部分失敗の許容 | 音声なし動画でも映像分析は実行 |
会議議事録の自動生成パイプライン
全体フロー
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import asyncio
class PipelineStatus(Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
PARTIAL = "partial"
@dataclass
class MeetingMinutes:
meeting_id: str
title: str
date: datetime
duration_seconds: float
participants: list[str]
transcript: list[dict]
slide_contents: list[dict]
summary: str
topics: list[dict]
action_items: list[dict]
decisions: list[str]
status: PipelineStatus = PipelineStatus.PENDING
class MeetingPipeline:
"""会議録画から議事録を自動生成するパイプライン"""
async def process(self, video_path: str, meeting_id: str) -> MeetingMinutes:
"""メインの処理フロー"""
minutes = MeetingMinutes(
meeting_id=meeting_id,
title="",
date=datetime.now(),
duration_seconds=0,
participants=[],
transcript=[],
slide_contents=[],
summary="",
topics=[],
action_items=[],
decisions=[]
)
minutes.status = PipelineStatus.PROCESSING
try:
# Step 1: 前処理(メタデータ取得)
metadata = self._get_metadata(video_path)
minutes.duration_seconds = metadata["duration"]
# Step 2: 映像と音声を並列処理
audio_task = asyncio.create_task(
self._process_audio(video_path)
)
video_task = asyncio.create_task(
self._process_video(video_path)
)
audio_result, video_result = await asyncio.gather(
audio_task, video_task,
return_exceptions=True
)
# 部分失敗の処理
if isinstance(audio_result, Exception):
print(f"音声処理失敗: {audio_result}")
minutes.status = PipelineStatus.PARTIAL
else:
minutes.transcript = audio_result["transcript"]
minutes.participants = audio_result["speakers"]
if isinstance(video_result, Exception):
print(f"映像処理失敗: {video_result}")
minutes.status = PipelineStatus.PARTIAL
else:
minutes.slide_contents = video_result["slides"]
# Step 3: 統合分析
integrated = await self._integrate_results(minutes)
minutes.summary = integrated["summary"]
minutes.title = integrated["title"]
minutes.topics = integrated["topics"]
minutes.action_items = integrated["action_items"]
minutes.decisions = integrated["decisions"]
if minutes.status != PipelineStatus.PARTIAL:
minutes.status = PipelineStatus.COMPLETED
except Exception as e:
minutes.status = PipelineStatus.FAILED
raise
return minutes
def _get_metadata(self, video_path: str) -> dict:
"""動画のメタデータを取得"""
import cv2
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
cap.release()
return {
"duration": total_frames / fps,
"fps": fps,
"total_frames": int(total_frames)
}
async def _process_audio(self, video_path: str) -> dict:
"""音声処理: 抽出 → STT → 話者分離"""
# 音声抽出
audio_path = extract_audio_track(video_path)
# STT + 話者分離
transcript = transcribe_with_speakers(audio_path)
speakers = list(set(seg["speaker"] for seg in transcript))
return {"transcript": transcript, "speakers": speakers}
async def _process_video(self, video_path: str) -> dict:
"""映像処理: フレーム抽出 → スライド検出 → OCR"""
frames = extract_frames_interval(video_path, interval_sec=30)
slides = analyze_slides(frames)
return {"slides": slides}
async def _integrate_results(self, minutes: MeetingMinutes) -> dict:
"""LLMで結果を統合"""
# 統合プロンプトを構築
prompt = build_integration_prompt(minutes)
# LLMで構造化データを生成
return call_llm_for_integration(prompt)
コールセンター音声分析パイプライン
リアルタイム処理 + バッチ処理の組み合わせ
@dataclass
class CallAnalysis:
call_id: str
customer_id: str
operator_id: str
duration: float
transcript: list[dict]
sentiment_score: float # -1.0 〜 1.0
category: str # 問い合わせカテゴリ
extracted_info: dict # 抽出された構造化情報
summary: str
escalation_needed: bool
quality_score: float # オペレーター品質スコア
class CallCenterPipeline:
"""コールセンター通話の分析パイプライン"""
def process_realtime(self, audio_stream) -> dict:
"""リアルタイム処理(通話中)"""
# ストリーミングSTTで文字起こし
# 感情分析でクレーム検知
# エスカレーション判定
pass
def process_batch(self, audio_path: str, call_id: str) -> CallAnalysis:
"""バッチ処理(通話終了後)"""
# 1. 高精度STT
transcript = transcribe_with_speakers(audio_path, num_speakers=2)
# 2. 感情分析
sentiment = analyze_sentiment(transcript)
# 3. カテゴリ分類 + 情報抽出
analysis = extract_call_info(transcript)
# 4. 品質スコアリング
quality = score_operator_quality(transcript)
# 5. 要約生成
summary = generate_call_summary(transcript, analysis)
return CallAnalysis(
call_id=call_id,
transcript=transcript,
sentiment_score=sentiment,
category=analysis["category"],
extracted_info=analysis["extracted"],
summary=summary,
escalation_needed=sentiment < -0.5,
quality_score=quality,
customer_id=analysis["extracted"].get("customer_id", ""),
operator_id=analysis["extracted"].get("operator_id", ""),
duration=0
)
パイプラインの運用設計
キューベースの非同期処理
[ファイルアップロード]
│
▼
[メッセージキュー] ← SQS / RabbitMQ
│
▼
[ワーカープロセス] ← 複数インスタンスでスケール
│
├──→ [前処理ワーカー]
├──→ [音声処理ワーカー]
├──→ [映像処理ワーカー]
└──→ [統合処理ワーカー]
│
▼
[結果DB] → [通知] → ユーザー
エラーハンドリング戦略
| エラー種類 | 対処 | リトライ |
|---|
| API一時障害 | 指数バックオフでリトライ | 最大3回 |
| ファイル破損 | ユーザーに再アップロード依頼 | なし |
| タイムアウト | チャンク分割して再処理 | 1回 |
| 認識精度不良 | 別モデルにフォールバック | 1回 |
| メモリ不足 | より小さなモデルに切り替え | 1回 |
コスト最適化
| 戦略 | 効果 | 実装方法 |
|---|
| モデルの段階選択 | コスト30-50%削減 | 簡単な処理にはsmallモデル |
| キャッシュ | 重複処理を回避 | ハッシュベースのキャッシュ |
| バッチ化 | API呼び出し回数削減 | 複数ファイルをまとめて処理 |
| オフピーク実行 | コスト/リソース最適化 | 夜間バッチ |
まとめ
| パイプライン | 入力 | 主要処理 | 出力 |
|---|
| 会議議事録 | 動画 | STT + 話者分離 + VLM + LLM統合 | 構造化議事録 |
| コールセンター | 通話音声 | リアルタイムSTT + 感情分析 + バッチ分析 | 通話分析レポート |
| 研修動画 | 動画 | STT + 字幕生成 + 要約 | 字幕ファイル + サマリー |
チェックリスト
次のステップへ
Step 3の演習で、学んだ音声・動画処理技術を実際のシナリオに適用します。
推定読了時間: 30分