LESSON 40分

ストーリー

佐藤CTO
“Garbage in, garbage out” — データ品質が悪ければ、どんな高度な分析も無意味だ
佐藤CTO
データ品質は後から対処するものではなく、アーキテクチャに組み込むものだ。品質の6次元、データコントラクト、そして自動検証の仕組みを学ぼう

データ品質の6次元

次元定義測定例
完全性 (Completeness)必要なデータが欠損なく存在するかNULL率 < 0.1%
正確性 (Accuracy)データが現実の値と一致するか住所の郵便番号マッチ率
一貫性 (Consistency)複数システム間でデータが矛盾しないか注文DB vs 決済DBの金額一致率
適時性 (Timeliness)データが必要なタイミングで利用可能かパイプライン遅延 < 2時間
一意性 (Uniqueness)重複データが存在しないか重複レコード率 < 0.01%
有効性 (Validity)データが定義された形式・範囲に収まるかメール形式の適合率

Great Expectations によるデータ検証

# Great Expectations: データ品質の自動検証

import great_expectations as gx

# コンテキストの初期化
context = gx.get_context()

# データソースの設定
datasource = context.sources.add_pandas("orders_source")
data_asset = datasource.add_dataframe_asset("orders")

# Expectation Suite の定義
suite = context.add_expectation_suite("orders_quality_suite")

# 完全性チェック
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(
        column="order_id",
        meta={"dimension": "completeness", "severity": "critical"}
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(
        column="customer_id",
        mostly=0.999,  # 99.9%以上がNULLでないこと
    )
)

# 有効性チェック
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total_amount",
        min_value=0,
        max_value=10_000_000,
        meta={"dimension": "validity"}
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["pending", "confirmed", "shipped", "delivered", "cancelled"],
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToMatchRegex(
        column="email",
        regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
    )
)

# 一意性チェック
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(
        column="order_id",
        meta={"dimension": "uniqueness", "severity": "critical"}
    )
)

# 適時性チェック
suite.add_expectation(
    gx.expectations.ExpectColumnMaxToBeBetween(
        column="created_at",
        min_value="{{yesterday}}",  # 昨日以降のデータがあること
        max_value="{{today}}",
    )
)

# 統計的チェック(異常検知)
suite.add_expectation(
    gx.expectations.ExpectColumnMeanToBeBetween(
        column="total_amount",
        min_value=1000,
        max_value=50000,
        meta={"dimension": "accuracy", "note": "平均注文額が異常範囲にないこと"}
    )
)

# 行数チェック(データ量の急激な変化を検知)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000,
        max_value=1_000_000,
    )
)

データコントラクト

コントラクトの定義

# data-contract.yaml
# プロデューサーとコンシューマー間のデータ品質契約

apiVersion: datacontract/v1
kind: DataContract
metadata:
  name: order-events-contract
  version: "2.0.0"
  owner: order-team
  consumers:
    - analytics-team
    - marketing-team
    - finance-team

spec:
  # スキーマ定義(厳密な型)
  schema:
    type: object
    required: [order_id, customer_id, amount, status, event_time]
    properties:
      order_id:
        type: string
        format: uuid
        description: "注文ID (UUID v4)"
      customer_id:
        type: string
        format: uuid
      amount:
        type: number
        minimum: 0
        maximum: 10000000
        description: "注文金額(税込、円)"
      status:
        type: string
        enum: [created, confirmed, shipped, delivered, cancelled]
      event_time:
        type: string
        format: date-time
        description: "イベント発生時刻 (ISO 8601, UTC)"

  # SLO(Service Level Objectives)
  slo:
    freshness:
      maxDelaySeconds: 300  # 5分以内
    completeness:
      minPercentage: 99.9
    availability:
      uptimePercentage: 99.95
    volume:
      minDailyRecords: 1000
      maxDailyRecords: 1000000

  # 破壊的変更のルール
  compatibility:
    mode: BACKWARD  # 後方互換性を保証
    rules:
      - "フィールド削除は禁止"
      - "必須フィールドの追加は禁止"
      - "型の変更は禁止"
      - "オプショナルフィールドの追加は許可"

  # 違反時のアクション
  alerting:
    critical:
      - channel: pagerduty
        conditions: ["schema_violation", "freshness_breach > 30min"]
    warning:
      - channel: slack
        conditions: ["completeness < 99.9%", "volume_anomaly"]

コントラクトの自動検証

// CI/CDパイプラインでのデータコントラクト検証

interface ContractValidationResult {
  contractName: string;
  version: string;
  timestamp: Date;
  results: {
    schemaValid: boolean;
    sloMet: boolean;
    compatibilityValid: boolean;
    details: ValidationDetail[];
  };
}

class DataContractValidator {
  async validate(
    contract: DataContract,
    currentData: DataSample
  ): Promise<ContractValidationResult> {
    const details: ValidationDetail[] = [];

    // 1. スキーマ検証
    const schemaResult = this.validateSchema(contract.spec.schema, currentData);
    details.push(...schemaResult);

    // 2. SLO検証
    const sloResult = await this.validateSLO(contract.spec.slo, currentData);
    details.push(...sloResult);

    // 3. 後方互換性検証(前バージョンとの比較)
    const previousContract = await this.getPreviousVersion(contract);
    if (previousContract) {
      const compatResult = this.validateCompatibility(
        previousContract,
        contract,
        contract.spec.compatibility.mode
      );
      details.push(...compatResult);
    }

    return {
      contractName: contract.metadata.name,
      version: contract.metadata.version,
      timestamp: new Date(),
      results: {
        schemaValid: details.filter(d => d.category === 'schema').every(d => d.passed),
        sloMet: details.filter(d => d.category === 'slo').every(d => d.passed),
        compatibilityValid: details.filter(d => d.category === 'compatibility').every(d => d.passed),
        details,
      },
    };
  }

  private validateCompatibility(
    previous: DataContract,
    current: DataContract,
    mode: 'BACKWARD' | 'FORWARD' | 'FULL'
  ): ValidationDetail[] {
    const details: ValidationDetail[] = [];

    if (mode === 'BACKWARD' || mode === 'FULL') {
      // 既存フィールドの削除チェック
      for (const field of Object.keys(previous.spec.schema.properties)) {
        if (!(field in current.spec.schema.properties)) {
          details.push({
            category: 'compatibility',
            passed: false,
            message: `フィールド '${field}' が削除されています(後方互換性違反)`,
          });
        }
      }

      // 新規必須フィールドの追加チェック
      const newRequired = current.spec.schema.required.filter(
        f => !previous.spec.schema.required.includes(f)
      );
      if (newRequired.length > 0) {
        details.push({
          category: 'compatibility',
          passed: false,
          message: `新規必須フィールド ${newRequired.join(', ')} が追加されています`,
        });
      }
    }

    return details;
  }
}

パイプライン品質の監視

# データ品質ダッシュボード用メトリクス

from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class PipelineHealthMetrics:
    """パイプライン健全性メトリクス"""
    pipeline_name: str
    execution_date: datetime

    # パフォーマンス
    duration_seconds: float
    records_processed: int
    throughput_rps: float  # records per second

    # 品質スコア
    completeness_score: float  # 0-100
    validity_score: float
    uniqueness_score: float
    freshness_minutes: float

    # 異常検知
    row_count_deviation: float  # 前回比の偏差率
    null_rate_change: float     # NULL率の変化

    @property
    def overall_health(self) -> str:
        """総合健全性判定"""
        if self.completeness_score >= 99.9 and self.validity_score >= 99.5:
            return "HEALTHY"
        elif self.completeness_score >= 99.0 and self.validity_score >= 98.0:
            return "WARNING"
        else:
            return "CRITICAL"

# アラートルール
ALERT_RULES = {
    "freshness_breach": {
        "condition": lambda m: m.freshness_minutes > 120,
        "severity": "critical",
        "message": "データ鮮度が2時間を超過: {freshness_minutes:.0f}分",
    },
    "row_count_anomaly": {
        "condition": lambda m: abs(m.row_count_deviation) > 0.5,
        "severity": "warning",
        "message": "レコード数が前回比50%以上変動: {row_count_deviation:.1%}",
    },
    "completeness_drop": {
        "condition": lambda m: m.completeness_score < 99.0,
        "severity": "critical",
        "message": "完全性スコアが99%未満: {completeness_score:.2f}%",
    },
}
データ品質のシフトレフト

データ品質を下流で検知するのではなく、上流(ソース)で防ぐアプローチです。

レベル対策
アプリケーション入力バリデーションフォームバリデーション、API契約
データベース制約NOT NULL, CHECK, UNIQUE, FK
パイプライン変換時チェックdbt tests, Great Expectations
DWH品質ダッシュボードメトリクス監視、アノマリー検知

最も効果的なのはアプリケーション層での防止です。不正なデータがそもそも入らなければ、下流の品質問題は大幅に減少します。


まとめ

ポイント内容
品質6次元完全性/正確性/一貫性/適時性/一意性/有効性
Great Expectations宣言的にデータ品質ルールを定義・自動検証
データコントラクトプロデューサー/コンシューマー間の品質合意
監視メトリクス収集、アノマリー検知、アラートの自動化

チェックリスト

  • データ品質の6次元を説明できる
  • Great Expectationsでの品質チェック設定を理解した
  • データコントラクトの構成要素と後方互換性ルールを理解した
  • パイプライン品質の監視・アラート設計ができる

次のステップへ

次は演習でデータパイプラインの設計を実践します。


推定読了時間: 40分