ストーリー
田
田中VPoE
データガバナンスの方針、品質管理、プライバシー対策 — ここまでの3つのレッスンで「何を守るか」を学んだ。次は「どう動かすか」だ
あなた
データパイプラインですね。RAGやファインチューニングにデータを流す仕組み
あ
田
田中VPoE
その通り。だが、エンタープライズのデータパイプラインは単純にデータを流すだけでは済まない。品質チェック、PII検出、バージョニング、リネージュ追跡 — これらを全て組み込んだ堅牢なパイプラインが必要だ
あなた
品質もプライバシーも自動化の中に組み込むわけですね
あ
田
田中VPoE
そうだ。手動チェックに頼る運用はスケールしない。パイプラインに品質ゲートとプライバシーフィルタを組み込み、問題があれば自動で止まる。そんな設計を学ぼう
RAG用データパイプライン設計
全体アーキテクチャ
┌──────────────────────────────────────────────────────────┐
│ RAG データパイプライン │
├──────────────────────────────────────────────────────────┤
│ │
│ [データソース] [取り込み] [前処理] │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ ドキュメント│ │ Ingestion│ │ Cleaning │ │
│ │ DB │───▶│ Service │───▶│ Pipeline │ │
│ │ API │ │ │ │ │ │
│ │ ファイル │ │ │ │ │ │
│ └──────────┘ └──────────┘ └────┬─────┘ │
│ │ │
│ ▼ │
│ [品質ゲート] [変換] [PII検出] │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Quality │◀───│ Chunking │◀───│ PII │ │
│ │ Gate │ │ +Enrichm.│ │ Detection│ │
│ │ │ │ │ │ +Masking │ │
│ └────┬─────┘ └──────────┘ └──────────┘ │
│ │ │
│ ▼ │
│ [埋め込み] [ベクトルDB] [メタデータ] │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Embedding│───▶│ Vector │ │ Metadata │ │
│ │ Model │ │ Store │ │ Store │ │
│ │ │ │ │ │(Lineage) │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└──────────────────────────────────────────────────────────┘
パイプラインの各ステージ
| ステージ | 入力 | 処理内容 | 出力 | 失敗時の動作 |
|---|
| 取り込み | 各種データソース | フォーマット正規化、重複検出 | 正規化ドキュメント | リトライ → DLQ |
| PII検出 | 正規化ドキュメント | PII検出、マスキング/除外 | クリーンドキュメント | パイプライン停止 |
| 前処理 | クリーンドキュメント | テキスト抽出、正規化 | 処理済みテキスト | スキップ+ログ |
| チャンキング | 処理済みテキスト | セマンティック/固定長分割 | チャンク群 | スキップ+ログ |
| 品質ゲート | チャンク群 | 品質スコア算出、閾値チェック | 検証済みチャンク | 品質不足を通知 |
| 埋め込み | 検証済みチャンク | ベクトル化 | 埋め込みベクトル | リトライ → DLQ |
| 格納 | 埋め込みベクトル | ベクトルDB/メタデータDB書き込み | インデックス完了 | ロールバック |
チャンキング戦略
from enum import Enum
from dataclasses import dataclass
class ChunkingStrategy(Enum):
FIXED_SIZE = "fixed_size" # 固定トークン数で分割
SEMANTIC = "semantic" # 意味のまとまりで分割
RECURSIVE = "recursive" # 段階的に分割
DOCUMENT_AWARE = "document_aware" # 文書構造を考慮
@dataclass
class ChunkingConfig:
strategy: ChunkingStrategy
chunk_size: int = 512 # トークン数
chunk_overlap: int = 50 # オーバーラップ
min_chunk_size: int = 100 # 最小チャンクサイズ
# 推奨設定
CHUNKING_CONFIGS = {
"faq": ChunkingConfig(
strategy=ChunkingStrategy.DOCUMENT_AWARE,
chunk_size=256,
chunk_overlap=0, # FAQ は質問-回答を1チャンクに
),
"technical_docs": ChunkingConfig(
strategy=ChunkingStrategy.RECURSIVE,
chunk_size=512,
chunk_overlap=50,
),
"contracts": ChunkingConfig(
strategy=ChunkingStrategy.SEMANTIC,
chunk_size=1024,
chunk_overlap=100, # 法的文書は文脈が重要
),
"chat_logs": ChunkingConfig(
strategy=ChunkingStrategy.FIXED_SIZE,
chunk_size=256,
chunk_overlap=30,
),
}
ベクトルDB/埋め込みモデル選定
ベクトルDBの比較
| ベクトルDB | デプロイ | スケーラビリティ | 特徴 | 適用シーン |
|---|
| Pinecone | マネージド | 高 | フルマネージド、メタデータフィルタリング | 運用負荷を最小化したい |
| Weaviate | セルフホスト/マネージド | 高 | マルチモーダル、ハイブリッド検索 | テキスト+画像の検索 |
| Qdrant | セルフホスト/マネージド | 中-高 | Rust製高速、ペイロードフィルタ | 高性能が必要 |
| pgvector | PostgreSQL拡張 | 中 | 既存PostgreSQLに追加 | 小-中規模、既存PG活用 |
| Chroma | セルフホスト | 低-中 | 軽量、開発向け | PoC、プロトタイピング |
| Amazon OpenSearch | マネージド (AWS) | 高 | k-NN検索、AWS統合 | AWS基盤の組織 |
選定の判断基準
vector_db_selection_criteria:
functional:
- name: "検索精度"
weight: 0.25
metrics: ["Recall@10", "MRR"]
- name: "メタデータフィルタリング"
weight: 0.20
consideration: "データ分類レベルによるアクセス制御"
- name: "ハイブリッド検索"
weight: 0.15
consideration: "ベクトル検索 + キーワード検索の組み合わせ"
non_functional:
- name: "レイテンシ"
weight: 0.15
target: "p99 < 100ms"
- name: "スケーラビリティ"
weight: 0.10
target: "1億ベクトル以上に対応"
- name: "運用性"
weight: 0.10
consideration: "バックアップ、監視、アップグレード"
- name: "コスト"
weight: 0.05
consideration: "ベクトル数・クエリ数に応じた料金体系"
埋め込みモデルの比較
| モデル | 次元数 | 最大トークン | 特徴 | 用途 |
|---|
| text-embedding-3-small (OpenAI) | 1536 | 8191 | 低コスト、高品質 | 汎用 |
| text-embedding-3-large (OpenAI) | 3072 | 8191 | 最高精度 | 高精度が必要 |
| Cohere embed-v3 | 1024 | 512 | 多言語対応 | 多言語RAG |
| E5-large-v2 (OSS) | 1024 | 512 | セルフホスト可能 | データを外部に出せない場合 |
| multilingual-e5-large (OSS) | 1024 | 512 | 多言語、セルフホスト可 | 日本語+セルフホスト |
ファインチューニング用データ準備フロー
データ準備のワークフロー
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 1. データ収集 │───▶│ 2. フィルタ │───▶│ 3. 品質検証 │
│ ・タスク定義 │ │ ・PII除去 │ │ ・ラベル検証 │
│ ・データソース│ │ ・重複除去 │ │ ・バイアス │
│ 特定 │ │ ・品質基準 │ │ チェック │
└─────────────┘ └─────────────┘ └──────┬──────┘
│
┌─────────────┐ ┌─────────────┐ ┌──────▼──────┐
│ 6. 評価 │◀───│ 5. 学習 │◀───│ 4. フォーマ │
│ ・精度検証 │ │ ・パラメータ │ │ ット変換 │
│ ・バイアス │ │ チューニング│ │ ・JSONL │
│ 再検証 │ │ ・検証分割 │ │ ・train/val │
└─────────────┘ └─────────────┘ └─────────────┘
ファインチューニングデータのフォーマット
{"messages": [{"role": "system", "content": "あなたは社内ヘルプデスクのアシスタントです。"}, {"role": "user", "content": "有給休暇の申請方法を教えてください"}, {"role": "assistant", "content": "有給休暇の申請手順は以下の通りです:\n1. 社内ポータルの「勤怠管理」にログイン\n2. 「休暇申請」メニューを選択\n3. 休暇種別で「年次有給休暇」を選択\n4. 取得希望日を入力\n5. 上長の承認を待つ\n\n申請は取得希望日の3営業日前までにお願いします。"}]}
{"messages": [{"role": "system", "content": "あなたは社内ヘルプデスクのアシスタントです。"}, {"role": "user", "content": "VPN接続ができません"}, {"role": "assistant", "content": "VPN接続のトラブルシューティングを行います:\n\n1. VPNクライアントのバージョンを確認(最新: v4.2.1)\n2. ネットワーク接続を確認\n3. 認証情報(ID/パスワード)の有効期限を確認\n4. ファイアウォール設定を確認\n\n上記で解決しない場合は、IT部門(内線: 2345)までご連絡ください。"}]}
データ品質チェックリスト
| チェック項目 | 基準 | ツール |
|---|
| データ件数 | 最低500件、推奨1000件以上 | 手動カウント |
| ラベル品質 | Inter-annotator agreement >= 0.8 | Cohen’s Kappa |
| PII除去 | PII検出率 0% | 自動検出ツール |
| 多様性 | カテゴリの偏りが10 | 分布分析 |
| 重複率 | 5%以下 | 類似度検出 |
| フォーマット | 全レコードが正しいJSON形式 | スキーマ検証 |
| トークン長 | 最大トークン長を超えないこと | トークナイザー |
データバージョニング
DVC (Data Version Control) の活用
# DVCの初期化
dvc init
# データファイルの追跡
dvc add data/training/customer_support_v1.jsonl
# メタデータのコミット
git add data/training/customer_support_v1.jsonl.dvc .gitignore
git commit -m "Add training data v1: 1200 customer support examples"
# リモートストレージの設定
dvc remote add -d s3store s3://ml-data-versioning/training
# データのプッシュ
dvc push
# 特定バージョンに切り替え
git checkout v1.0.0
dvc checkout
データバージョニング戦略
| 項目 | 方針 |
|---|
| バージョン命名 | SemVer準拠: v{major}.{minor}.{patch} |
| Major更新 | スキーマ変更、大規模データ入替 |
| Minor更新 | データの追加、ラベルの修正 |
| Patch更新 | 軽微なクレンジング、typo修正 |
| 保持期間 | 直近5バージョン + 各モデルの学習時バージョン |
| メタデータ | 件数、品質スコア、変更概要を記録 |
# データバージョンのメタデータ例
data_version:
id: "customer_support_v2.1.0"
created_at: "2026-02-15T10:00:00+09:00"
created_by: "data-team"
statistics:
total_records: 1500
added: 300
removed: 0
modified: 45
quality:
overall_score: 0.89
completeness: 0.97
bias_check: "passed"
pii_check: "passed"
lineage:
source_systems: ["zendesk", "slack", "internal_wiki"]
transformations:
- "PII masking (v2.0)"
- "Deduplication (cosine similarity > 0.95)"
- "Quality filter (score >= 0.7)"
associated_models:
- model_id: "helpdesk-gpt-v3"
trained_at: "2026-02-16"
performance: { accuracy: 0.92, f1: 0.89 }
パイプラインの監視とアラート
監視すべきメトリクス
| カテゴリ | メトリクス | 閾値例 | アラート |
|---|
| パフォーマンス | パイプライン実行時間 | > 想定の2倍 | Warning |
| パフォーマンス | スループット (docs/min) | < 100 docs/min | Warning |
| 品質 | 品質ゲート通過率 | < 90% | Critical |
| 品質 | PII検出件数 | > 0 (Level 3以上) | Critical |
| 信頼性 | パイプライン成功率 | < 99% | Warning |
| 信頼性 | DLQメッセージ数 | > 100/日 | Warning |
| データドリフト | KS統計量 | > 0.1 | Warning |
| コスト | 埋め込みAPI呼び出し数 | > 予算の80% | Warning |
アラートの設定例
monitoring:
pipeline_alerts:
- name: "pipeline_failure"
condition: "pipeline.status == 'FAILED'"
severity: "critical"
channels: ["slack:#data-alerts", "pagerduty"]
runbook: "https://wiki/runbooks/pipeline-failure"
- name: "pii_detected"
condition: "pii_detection.count > 0 AND data.classification >= 'confidential'"
severity: "critical"
channels: ["slack:#security-alerts", "email:data-owner"]
action: "pipeline.halt()"
runbook: "https://wiki/runbooks/pii-incident"
- name: "quality_degradation"
condition: "quality_gate.pass_rate < 0.90 FOR 3 consecutive runs"
severity: "warning"
channels: ["slack:#data-quality"]
action: "notify_data_steward()"
- name: "drift_detected"
condition: "data_drift.ks_statistic > 0.15"
severity: "warning"
channels: ["slack:#ml-ops"]
action: "trigger_model_evaluation()"
- name: "embedding_cost_alert"
condition: "monthly_embedding_cost > budget * 0.80"
severity: "warning"
channels: ["slack:#data-platform", "email:engineering-manager"]
データリネージュの管理
リネージュとは
データリネージュは、データがどこから来て、どのように変換され、どこで使われているかを追跡する仕組みです。
データリネージュの例:
[Salesforce CRM] [Slack Export]
│ │
▼ ▼
[ETL: 顧客抽出] [ETL: チャット抽出]
│ │
▼ ▼
[PII Masking v2.0] [PII Masking v2.0]
│ │
▼ ▼
[Data Lake: customer/masked/] [Data Lake: chat/masked/]
│ │
└──────────┬────────────────────────┘
│
▼
[Merge + Dedup]
│
▼
[Quality Gate]
(score >= 0.80)
│
▼
[Chunking: semantic]
│
▼
[Embedding: text-embedding-3-small]
│
▼
[Pinecone: helpdesk-index]
│
▼
[RAG Application: 社内ヘルプデスクAI]
リネージュの活用シーン
| シーン | リネージュの活用 |
|---|
| 品質問題の根本原因分析 | AIの回答が不正確 → どのチャンクが原因か → 元データのどの部分か |
| コンプライアンス監査 | 規制当局への報告: このAIが使うデータの出所と変換履歴 |
| 忘れられる権利の行使 | 特定ユーザーのデータがどのベクトルに含まれるかを特定 |
| モデル再学習の判断 | ソースデータの変更がモデル精度にどう影響するか |
| インシデント対応 | PII漏洩時の影響範囲の特定 |
リネージュ管理ツール
| ツール | 特徴 | 統合 |
|---|
| Apache Atlas | Hadoopエコシステム統合 | Hive, Spark, Kafka |
| DataHub | 汎用、REST API | 多数のコネクタ |
| OpenLineage | 標準仕様、相互運用性 | Airflow, Spark, dbt |
| AWS Glue DataBrew | AWS統合、ビジュアル | S3, Redshift, RDS |
| Marquez (WeWork OSS) | 軽量、OpenLineage準拠 | Airflow |
まとめ
| ポイント | 内容 |
|---|
| RAGパイプライン設計 | 取り込み→PII検出→前処理→チャンキング→品質ゲート→埋め込み→格納 |
| ベクトルDB選定 | 精度、スケーラビリティ、運用性、コストで総合判断 |
| ファインチューニングデータ | 収集→フィルタ→品質検証→フォーマット変換→学習→評価 |
| データバージョニング | DVCによるSemVer管理とメタデータの記録 |
| 監視とアラート | 品質・PII・ドリフト・コストの自動監視 |
| リネージュ管理 | ソースからAI利用までの全変換履歴の追跡 |
チェックリスト
次のステップへ
次は演習です。ここまで学んだデータガバナンスの知識を統合し、実際のシナリオに基づいてデータガバナンスポリシーを策定しましょう。
推定読了時間: 30分