LESSON 30分

ストーリー

田中VPoE
ドキュメントローディング、チャンキング、メタデータ抽出 — 個別のコンポーネントは理解した。次はこれらを統合した前処理パイプラインを設計する
あなた
ETLパイプラインのような感じですか?
田中VPoE
まさにそうだ。Extract(ドキュメントの取得・テキスト抽出)、Transform(チャンキング・メタデータ付加・Embedding化)、Load(ベクトルDBへの格納)。このパイプラインの設計が、RAGシステムの運用品質を左右する
あなた
バッチ処理とリアルタイム処理、どちらで構築すればいいですか?
田中VPoE
両方必要だ。初回の全量インデックスはバッチ処理、ドキュメントの追加・更新はニアリアルタイムで処理する。NetShop社のケースで具体的に設計しよう

パイプラインの全体設計

アーキテクチャ概要

前処理パイプラインの全体像:

[Extract]
├── Source Connector
│   ├── Confluence API → ドキュメント取得
│   ├── Notion API → ドキュメント取得
│   └── S3 → ファイル取得
├── Format Detector
│   └── 形式判定(PDF/Markdown/HTML/docx)
└── Text Extractor
    └── 形式別のテキスト抽出

[Transform]
├── Cleaner
│   └── ノイズ除去、正規化
├── Chunker
│   └── チャンキング戦略の適用
├── Metadata Enricher
│   ├── 基本メタデータ付加
│   ├── 構造メタデータ抽出
│   └── LLMによるエンリッチメント
└── Embedder
    └── Embeddingベクトル生成

[Load]
├── Vector DB Writer
│   └── ベクトルDB(Qdrant)へのUpsert
└── Audit Logger
    └── 処理ログの記録

バッチ処理とストリーミング処理

処理パターンの使い分け

パターン用途トリガー処理量
フルインデックス(バッチ)初回構築、全量再構築手動/スケジュール全ドキュメント
差分更新(ニアリアルタイム)ドキュメントの追加・更新Webhook/変更検知変更分のみ
定期同期(バッチ)定期的な整合性確認スケジュール(日次/週次)変更分 + 整合性チェック

差分更新の仕組み

差分更新パイプライン:

[Change Detection]
├── Webhook受信(Confluence/Notionの更新通知)
├── ポーリング(定期的にAPIで変更をチェック)
└── ファイル監視(S3イベント通知)

[Change Type判定]
├── 新規追加 → Extract → Transform → Load(Insert)
├── 更新 → Extract → Transform → Load(Update)
└── 削除 → Load(Delete)

[整合性チェック]
└── ベクトルDBの状態と元ドキュメントの一致を確認

エラーハンドリングと冪等性

エラーハンドリング戦略

エラー種類対処リトライ
ソース接続エラーAPIレート制限、ネットワーク障害指数バックオフ(最大5回)
テキスト抽出エラー破損ファイル、未対応形式スキップ + アラート
Embedding APIエラーレート制限、タイムアウト指数バックオフ + キュー再投入
ベクトルDB書き込みエラーDB接続障害、容量不足リトライ + アラート
LLMメタデータ生成エラーAPI障害、不正な出力デフォルト値で代替

冪等性の確保

冪等性の設計:

ドキュメントID + コンテンツハッシュ → 一意なチャンクID

処理フロー:
1. ドキュメントのコンテンツハッシュを計算
2. 既存のハッシュと比較
3. 変更がなければスキップ(冪等性)
4. 変更があれば、旧チャンクを削除 → 新チャンクを挿入

※ 同じドキュメントを何度処理しても結果が同じになる

パイプラインの監視

監視すべきメトリクス

メトリクス閾値アラート条件
処理ドキュメント数/時間ベースラインの±20%急増・急減
エラー率5%以下5%超過
Embedding API レイテンシP99 < 2秒閾値超過
ベクトルDB格納レイテンシP99 < 1秒閾値超過
パイプライン全体の処理時間SLA内SLA超過
未処理ドキュメント数0に収束キュー滞留

ログ設計

処理ログの構造:

{
  "timestamp": "2025-12-01T10:30:00Z",
  "pipeline_run_id": "run-20251201-001",
  "document_id": "doc-12345",
  "stage": "chunking",           // extract | transform | load
  "status": "success",           // success | error | skipped
  "chunks_created": 8,
  "processing_time_ms": 1250,
  "error_message": null,
  "metadata": {
    "source": "confluence",
    "doc_type": "技術文書",
    "content_hash": "abc123..."
  }
}

NetShop社のパイプライン設計

AWS上の構成

NetShop社の前処理パイプライン:

[トリガー]
├── EventBridge(日次スケジュール)
├── S3 Event Notification(ファイルアップロード)
└── API Gateway(手動トリガー)

[SQS キュー]

[Lambda / ECS Task]
├── ドキュメント取得(Confluence/Notion/S3)
├── テキスト抽出 + クリーニング
├── チャンキング
├── メタデータ抽出 + エンリッチメント
├── Embedding生成(OpenAI API)
└── Qdrant Upsert

[CloudWatch]
├── メトリクス監視
├── ログ集約
└── アラート通知(Slack)
コンポーネント選定理由
オーケストレーションAWS Step Functionsステップ間の制御、リトライ、エラーハンドリング
キューAmazon SQS非同期処理、バックプレッシャー対応
処理ワーカーAWS Lambda / ECS Fargate処理量に応じたスケーリング
監視CloudWatch + Slack通知既存監視基盤を活用

まとめ

ポイント内容
パイプライン構成Extract→Transform→Load の3段階
処理パターンフルインデックス(バッチ)+ 差分更新(ニアリアルタイム)の併用
エラーハンドリングリトライ、スキップ、アラートの組み合わせ
冪等性コンテンツハッシュによる変更検知と重複排除
監視処理量、エラー率、レイテンシのメトリクス監視

チェックリスト

  • ETLパイプラインの全体構成を理解した
  • バッチ処理と差分更新の使い分けを理解した
  • エラーハンドリングと冪等性の設計原則を理解した
  • パイプラインの監視メトリクスを理解した

次のステップへ

次は「演習:ドキュメント処理パイプラインを構築しよう」です。ここまで学んだ知識を統合して、実際のパイプラインを設計してみましょう。


推定読了時間: 30分