ストーリー
田
田中VPoE
ドキュメントローディング、チャンキング、メタデータ抽出 — 個別のコンポーネントは理解した。次はこれらを統合した前処理パイプラインを設計する
田
田中VPoE
まさにそうだ。Extract(ドキュメントの取得・テキスト抽出)、Transform(チャンキング・メタデータ付加・Embedding化)、Load(ベクトルDBへの格納)。このパイプラインの設計が、RAGシステムの運用品質を左右する
あなた
バッチ処理とリアルタイム処理、どちらで構築すればいいですか?
あ
田
田中VPoE
両方必要だ。初回の全量インデックスはバッチ処理、ドキュメントの追加・更新はニアリアルタイムで処理する。NetShop社のケースで具体的に設計しよう
パイプラインの全体設計
アーキテクチャ概要
前処理パイプラインの全体像:
[Extract]
├── Source Connector
│ ├── Confluence API → ドキュメント取得
│ ├── Notion API → ドキュメント取得
│ └── S3 → ファイル取得
├── Format Detector
│ └── 形式判定(PDF/Markdown/HTML/docx)
└── Text Extractor
└── 形式別のテキスト抽出
[Transform]
├── Cleaner
│ └── ノイズ除去、正規化
├── Chunker
│ └── チャンキング戦略の適用
├── Metadata Enricher
│ ├── 基本メタデータ付加
│ ├── 構造メタデータ抽出
│ └── LLMによるエンリッチメント
└── Embedder
└── Embeddingベクトル生成
[Load]
├── Vector DB Writer
│ └── ベクトルDB(Qdrant)へのUpsert
└── Audit Logger
└── 処理ログの記録
バッチ処理とストリーミング処理
処理パターンの使い分け
| パターン | 用途 | トリガー | 処理量 |
|---|
| フルインデックス(バッチ) | 初回構築、全量再構築 | 手動/スケジュール | 全ドキュメント |
| 差分更新(ニアリアルタイム) | ドキュメントの追加・更新 | Webhook/変更検知 | 変更分のみ |
| 定期同期(バッチ) | 定期的な整合性確認 | スケジュール(日次/週次) | 変更分 + 整合性チェック |
差分更新の仕組み
差分更新パイプライン:
[Change Detection]
├── Webhook受信(Confluence/Notionの更新通知)
├── ポーリング(定期的にAPIで変更をチェック)
└── ファイル監視(S3イベント通知)
↓
[Change Type判定]
├── 新規追加 → Extract → Transform → Load(Insert)
├── 更新 → Extract → Transform → Load(Update)
└── 削除 → Load(Delete)
↓
[整合性チェック]
└── ベクトルDBの状態と元ドキュメントの一致を確認
エラーハンドリングと冪等性
エラーハンドリング戦略
| エラー種類 | 対処 | リトライ |
|---|
| ソース接続エラー | APIレート制限、ネットワーク障害 | 指数バックオフ(最大5回) |
| テキスト抽出エラー | 破損ファイル、未対応形式 | スキップ + アラート |
| Embedding APIエラー | レート制限、タイムアウト | 指数バックオフ + キュー再投入 |
| ベクトルDB書き込みエラー | DB接続障害、容量不足 | リトライ + アラート |
| LLMメタデータ生成エラー | API障害、不正な出力 | デフォルト値で代替 |
冪等性の確保
冪等性の設計:
ドキュメントID + コンテンツハッシュ → 一意なチャンクID
処理フロー:
1. ドキュメントのコンテンツハッシュを計算
2. 既存のハッシュと比較
3. 変更がなければスキップ(冪等性)
4. 変更があれば、旧チャンクを削除 → 新チャンクを挿入
※ 同じドキュメントを何度処理しても結果が同じになる
パイプラインの監視
監視すべきメトリクス
| メトリクス | 閾値 | アラート条件 |
|---|
| 処理ドキュメント数/時間 | ベースラインの±20% | 急増・急減 |
| エラー率 | 5%以下 | 5%超過 |
| Embedding API レイテンシ | P99 < 2秒 | 閾値超過 |
| ベクトルDB格納レイテンシ | P99 < 1秒 | 閾値超過 |
| パイプライン全体の処理時間 | SLA内 | SLA超過 |
| 未処理ドキュメント数 | 0に収束 | キュー滞留 |
ログ設計
処理ログの構造:
{
"timestamp": "2025-12-01T10:30:00Z",
"pipeline_run_id": "run-20251201-001",
"document_id": "doc-12345",
"stage": "chunking", // extract | transform | load
"status": "success", // success | error | skipped
"chunks_created": 8,
"processing_time_ms": 1250,
"error_message": null,
"metadata": {
"source": "confluence",
"doc_type": "技術文書",
"content_hash": "abc123..."
}
}
NetShop社のパイプライン設計
AWS上の構成
NetShop社の前処理パイプライン:
[トリガー]
├── EventBridge(日次スケジュール)
├── S3 Event Notification(ファイルアップロード)
└── API Gateway(手動トリガー)
↓
[SQS キュー]
↓
[Lambda / ECS Task]
├── ドキュメント取得(Confluence/Notion/S3)
├── テキスト抽出 + クリーニング
├── チャンキング
├── メタデータ抽出 + エンリッチメント
├── Embedding生成(OpenAI API)
└── Qdrant Upsert
↓
[CloudWatch]
├── メトリクス監視
├── ログ集約
└── アラート通知(Slack)
| コンポーネント | 選定 | 理由 |
|---|
| オーケストレーション | AWS Step Functions | ステップ間の制御、リトライ、エラーハンドリング |
| キュー | Amazon SQS | 非同期処理、バックプレッシャー対応 |
| 処理ワーカー | AWS Lambda / ECS Fargate | 処理量に応じたスケーリング |
| 監視 | CloudWatch + Slack通知 | 既存監視基盤を活用 |
まとめ
| ポイント | 内容 |
|---|
| パイプライン構成 | Extract→Transform→Load の3段階 |
| 処理パターン | フルインデックス(バッチ)+ 差分更新(ニアリアルタイム)の併用 |
| エラーハンドリング | リトライ、スキップ、アラートの組み合わせ |
| 冪等性 | コンテンツハッシュによる変更検知と重複排除 |
| 監視 | 処理量、エラー率、レイテンシのメトリクス監視 |
チェックリスト
次のステップへ
次は「演習:ドキュメント処理パイプラインを構築しよう」です。ここまで学んだ知識を統合して、実際のパイプラインを設計してみましょう。
推定読了時間: 30分