LESSON 30分

ストーリー

田中VPoE
データガバナンスの方針、品質管理、プライバシー対策 — ここまでの3つのレッスンで「何を守るか」を学んだ。次は「どう動かすか」だ
あなた
データパイプラインですね。RAGやファインチューニングにデータを流す仕組み
田中VPoE
その通り。だが、エンタープライズのデータパイプラインは単純にデータを流すだけでは済まない。品質チェック、PII検出、バージョニング、リネージュ追跡 — これらを全て組み込んだ堅牢なパイプラインが必要だ
あなた
品質もプライバシーも自動化の中に組み込むわけですね
田中VPoE
そうだ。手動チェックに頼る運用はスケールしない。パイプラインに品質ゲートとプライバシーフィルタを組み込み、問題があれば自動で止まる。そんな設計を学ぼう

RAG用データパイプライン設計

全体アーキテクチャ

┌──────────────────────────────────────────────────────────┐
│                RAG データパイプライン                       │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  [データソース]     [取り込み]       [前処理]              │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐           │
│  │ ドキュメント│    │ Ingestion│    │ Cleaning │           │
│  │ DB        │───▶│ Service  │───▶│ Pipeline │           │
│  │ API       │    │          │    │          │           │
│  │ ファイル   │    │          │    │          │           │
│  └──────────┘    └──────────┘    └────┬─────┘           │
│                                       │                  │
│                                       ▼                  │
│  [品質ゲート]      [変換]         [PII検出]              │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐           │
│  │ Quality  │◀───│ Chunking │◀───│ PII      │           │
│  │ Gate     │    │ +Enrichm.│    │ Detection│           │
│  │          │    │          │    │ +Masking │           │
│  └────┬─────┘    └──────────┘    └──────────┘           │
│       │                                                  │
│       ▼                                                  │
│  [埋め込み]       [ベクトルDB]     [メタデータ]           │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐           │
│  │ Embedding│───▶│ Vector   │    │ Metadata │           │
│  │ Model    │    │ Store    │    │ Store    │           │
│  │          │    │          │    │(Lineage) │           │
│  └──────────┘    └──────────┘    └──────────┘           │
│                                                          │
└──────────────────────────────────────────────────────────┘

パイプラインの各ステージ

ステージ入力処理内容出力失敗時の動作
取り込み各種データソースフォーマット正規化、重複検出正規化ドキュメントリトライ → DLQ
PII検出正規化ドキュメントPII検出、マスキング/除外クリーンドキュメントパイプライン停止
前処理クリーンドキュメントテキスト抽出、正規化処理済みテキストスキップ+ログ
チャンキング処理済みテキストセマンティック/固定長分割チャンク群スキップ+ログ
品質ゲートチャンク群品質スコア算出、閾値チェック検証済みチャンク品質不足を通知
埋め込み検証済みチャンクベクトル化埋め込みベクトルリトライ → DLQ
格納埋め込みベクトルベクトルDB/メタデータDB書き込みインデックス完了ロールバック

チャンキング戦略

from enum import Enum
from dataclasses import dataclass

class ChunkingStrategy(Enum):
    FIXED_SIZE = "fixed_size"          # 固定トークン数で分割
    SEMANTIC = "semantic"              # 意味のまとまりで分割
    RECURSIVE = "recursive"            # 段階的に分割
    DOCUMENT_AWARE = "document_aware"  # 文書構造を考慮

@dataclass
class ChunkingConfig:
    strategy: ChunkingStrategy
    chunk_size: int = 512        # トークン数
    chunk_overlap: int = 50      # オーバーラップ
    min_chunk_size: int = 100    # 最小チャンクサイズ

# 推奨設定
CHUNKING_CONFIGS = {
    "faq": ChunkingConfig(
        strategy=ChunkingStrategy.DOCUMENT_AWARE,
        chunk_size=256,
        chunk_overlap=0,     # FAQ は質問-回答を1チャンクに
    ),
    "technical_docs": ChunkingConfig(
        strategy=ChunkingStrategy.RECURSIVE,
        chunk_size=512,
        chunk_overlap=50,
    ),
    "contracts": ChunkingConfig(
        strategy=ChunkingStrategy.SEMANTIC,
        chunk_size=1024,
        chunk_overlap=100,   # 法的文書は文脈が重要
    ),
    "chat_logs": ChunkingConfig(
        strategy=ChunkingStrategy.FIXED_SIZE,
        chunk_size=256,
        chunk_overlap=30,
    ),
}

ベクトルDB/埋め込みモデル選定

ベクトルDBの比較

ベクトルDBデプロイスケーラビリティ特徴適用シーン
Pineconeマネージドフルマネージド、メタデータフィルタリング運用負荷を最小化したい
Weaviateセルフホスト/マネージドマルチモーダル、ハイブリッド検索テキスト+画像の検索
Qdrantセルフホスト/マネージド中-高Rust製高速、ペイロードフィルタ高性能が必要
pgvectorPostgreSQL拡張既存PostgreSQLに追加小-中規模、既存PG活用
Chromaセルフホスト低-中軽量、開発向けPoC、プロトタイピング
Amazon OpenSearchマネージド (AWS)k-NN検索、AWS統合AWS基盤の組織

選定の判断基準

vector_db_selection_criteria:
  functional:
    - name: "検索精度"
      weight: 0.25
      metrics: ["Recall@10", "MRR"]

    - name: "メタデータフィルタリング"
      weight: 0.20
      consideration: "データ分類レベルによるアクセス制御"

    - name: "ハイブリッド検索"
      weight: 0.15
      consideration: "ベクトル検索 + キーワード検索の組み合わせ"

  non_functional:
    - name: "レイテンシ"
      weight: 0.15
      target: "p99 < 100ms"

    - name: "スケーラビリティ"
      weight: 0.10
      target: "1億ベクトル以上に対応"

    - name: "運用性"
      weight: 0.10
      consideration: "バックアップ、監視、アップグレード"

    - name: "コスト"
      weight: 0.05
      consideration: "ベクトル数・クエリ数に応じた料金体系"

埋め込みモデルの比較

モデル次元数最大トークン特徴用途
text-embedding-3-small (OpenAI)15368191低コスト、高品質汎用
text-embedding-3-large (OpenAI)30728191最高精度高精度が必要
Cohere embed-v31024512多言語対応多言語RAG
E5-large-v2 (OSS)1024512セルフホスト可能データを外部に出せない場合
multilingual-e5-large (OSS)1024512多言語、セルフホスト可日本語+セルフホスト

ファインチューニング用データ準備フロー

データ準備のワークフロー

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ 1. データ収集 │───▶│ 2. フィルタ  │───▶│ 3. 品質検証  │
│  ・タスク定義 │    │  ・PII除去   │    │  ・ラベル検証 │
│  ・データソース│    │  ・重複除去   │    │  ・バイアス   │
│   特定       │    │  ・品質基準   │    │   チェック    │
└─────────────┘    └─────────────┘    └──────┬──────┘

┌─────────────┐    ┌─────────────┐    ┌──────▼──────┐
│ 6. 評価     │◀───│ 5. 学習     │◀───│ 4. フォーマ  │
│  ・精度検証  │    │  ・パラメータ │    │   ット変換   │
│  ・バイアス  │    │   チューニング│    │  ・JSONL     │
│   再検証    │    │  ・検証分割   │    │  ・train/val │
└─────────────┘    └─────────────┘    └─────────────┘

ファインチューニングデータのフォーマット

{"messages": [{"role": "system", "content": "あなたは社内ヘルプデスクのアシスタントです。"}, {"role": "user", "content": "有給休暇の申請方法を教えてください"}, {"role": "assistant", "content": "有給休暇の申請手順は以下の通りです:\n1. 社内ポータルの「勤怠管理」にログイン\n2. 「休暇申請」メニューを選択\n3. 休暇種別で「年次有給休暇」を選択\n4. 取得希望日を入力\n5. 上長の承認を待つ\n\n申請は取得希望日の3営業日前までにお願いします。"}]}
{"messages": [{"role": "system", "content": "あなたは社内ヘルプデスクのアシスタントです。"}, {"role": "user", "content": "VPN接続ができません"}, {"role": "assistant", "content": "VPN接続のトラブルシューティングを行います:\n\n1. VPNクライアントのバージョンを確認(最新: v4.2.1)\n2. ネットワーク接続を確認\n3. 認証情報(ID/パスワード)の有効期限を確認\n4. ファイアウォール設定を確認\n\n上記で解決しない場合は、IT部門(内線: 2345)までご連絡ください。"}]}

データ品質チェックリスト

チェック項目基準ツール
データ件数最低500件、推奨1000件以上手動カウント
ラベル品質Inter-annotator agreement >= 0.8Cohen’s Kappa
PII除去PII検出率 0%自動検出ツール
多様性カテゴリの偏りが10
分布分析
重複率5%以下類似度検出
フォーマット全レコードが正しいJSON形式スキーマ検証
トークン長最大トークン長を超えないことトークナイザー

データバージョニング

DVC (Data Version Control) の活用

# DVCの初期化
dvc init

# データファイルの追跡
dvc add data/training/customer_support_v1.jsonl

# メタデータのコミット
git add data/training/customer_support_v1.jsonl.dvc .gitignore
git commit -m "Add training data v1: 1200 customer support examples"

# リモートストレージの設定
dvc remote add -d s3store s3://ml-data-versioning/training

# データのプッシュ
dvc push

# 特定バージョンに切り替え
git checkout v1.0.0
dvc checkout

データバージョニング戦略

項目方針
バージョン命名SemVer準拠: v{major}.{minor}.{patch}
Major更新スキーマ変更、大規模データ入替
Minor更新データの追加、ラベルの修正
Patch更新軽微なクレンジング、typo修正
保持期間直近5バージョン + 各モデルの学習時バージョン
メタデータ件数、品質スコア、変更概要を記録
# データバージョンのメタデータ例
data_version:
  id: "customer_support_v2.1.0"
  created_at: "2026-02-15T10:00:00+09:00"
  created_by: "data-team"

  statistics:
    total_records: 1500
    added: 300
    removed: 0
    modified: 45

  quality:
    overall_score: 0.89
    completeness: 0.97
    bias_check: "passed"
    pii_check: "passed"

  lineage:
    source_systems: ["zendesk", "slack", "internal_wiki"]
    transformations:
      - "PII masking (v2.0)"
      - "Deduplication (cosine similarity > 0.95)"
      - "Quality filter (score >= 0.7)"

  associated_models:
    - model_id: "helpdesk-gpt-v3"
      trained_at: "2026-02-16"
      performance: { accuracy: 0.92, f1: 0.89 }

パイプラインの監視とアラート

監視すべきメトリクス

カテゴリメトリクス閾値例アラート
パフォーマンスパイプライン実行時間> 想定の2倍Warning
パフォーマンススループット (docs/min)< 100 docs/minWarning
品質品質ゲート通過率< 90%Critical
品質PII検出件数> 0 (Level 3以上)Critical
信頼性パイプライン成功率< 99%Warning
信頼性DLQメッセージ数> 100/日Warning
データドリフトKS統計量> 0.1Warning
コスト埋め込みAPI呼び出し数> 予算の80%Warning

アラートの設定例

monitoring:
  pipeline_alerts:
    - name: "pipeline_failure"
      condition: "pipeline.status == 'FAILED'"
      severity: "critical"
      channels: ["slack:#data-alerts", "pagerduty"]
      runbook: "https://wiki/runbooks/pipeline-failure"

    - name: "pii_detected"
      condition: "pii_detection.count > 0 AND data.classification >= 'confidential'"
      severity: "critical"
      channels: ["slack:#security-alerts", "email:data-owner"]
      action: "pipeline.halt()"
      runbook: "https://wiki/runbooks/pii-incident"

    - name: "quality_degradation"
      condition: "quality_gate.pass_rate < 0.90 FOR 3 consecutive runs"
      severity: "warning"
      channels: ["slack:#data-quality"]
      action: "notify_data_steward()"

    - name: "drift_detected"
      condition: "data_drift.ks_statistic > 0.15"
      severity: "warning"
      channels: ["slack:#ml-ops"]
      action: "trigger_model_evaluation()"

    - name: "embedding_cost_alert"
      condition: "monthly_embedding_cost > budget * 0.80"
      severity: "warning"
      channels: ["slack:#data-platform", "email:engineering-manager"]

データリネージュの管理

リネージュとは

データリネージュは、データがどこから来て、どのように変換され、どこで使われているかを追跡する仕組みです。

データリネージュの例:

[Salesforce CRM]                    [Slack Export]
     │                                   │
     ▼                                   ▼
[ETL: 顧客抽出]                    [ETL: チャット抽出]
     │                                   │
     ▼                                   ▼
[PII Masking v2.0]                 [PII Masking v2.0]
     │                                   │
     ▼                                   ▼
[Data Lake: customer/masked/]      [Data Lake: chat/masked/]
     │                                   │
     └──────────┬────────────────────────┘


        [Merge + Dedup]


        [Quality Gate]
          (score >= 0.80)


        [Chunking: semantic]


        [Embedding: text-embedding-3-small]


        [Pinecone: helpdesk-index]


        [RAG Application: 社内ヘルプデスクAI]

リネージュの活用シーン

シーンリネージュの活用
品質問題の根本原因分析AIの回答が不正確 → どのチャンクが原因か → 元データのどの部分か
コンプライアンス監査規制当局への報告: このAIが使うデータの出所と変換履歴
忘れられる権利の行使特定ユーザーのデータがどのベクトルに含まれるかを特定
モデル再学習の判断ソースデータの変更がモデル精度にどう影響するか
インシデント対応PII漏洩時の影響範囲の特定

リネージュ管理ツール

ツール特徴統合
Apache AtlasHadoopエコシステム統合Hive, Spark, Kafka
DataHub汎用、REST API多数のコネクタ
OpenLineage標準仕様、相互運用性Airflow, Spark, dbt
AWS Glue DataBrewAWS統合、ビジュアルS3, Redshift, RDS
Marquez (WeWork OSS)軽量、OpenLineage準拠Airflow

まとめ

ポイント内容
RAGパイプライン設計取り込み→PII検出→前処理→チャンキング→品質ゲート→埋め込み→格納
ベクトルDB選定精度、スケーラビリティ、運用性、コストで総合判断
ファインチューニングデータ収集→フィルタ→品質検証→フォーマット変換→学習→評価
データバージョニングDVCによるSemVer管理とメタデータの記録
監視とアラート品質・PII・ドリフト・コストの自動監視
リネージュ管理ソースからAI利用までの全変換履歴の追跡

チェックリスト

  • RAG用データパイプラインの各ステージを説明できる
  • ベクトルDBと埋め込みモデルの選定基準を理解した
  • ファインチューニング用データの準備フローを把握した
  • DVCによるデータバージョニングの仕組みを理解した
  • パイプラインの監視項目とアラート設計を把握した
  • データリネージュの重要性と活用シーンを説明できる

次のステップへ

次は演習です。ここまで学んだデータガバナンスの知識を統合し、実際のシナリオに基づいてデータガバナンスポリシーを策定しましょう。


推定読了時間: 30分