ストーリー
データ品質の6次元
| 次元 | 定義 | 測定例 |
|---|---|---|
| 完全性 (Completeness) | 必要なデータが欠損なく存在するか | NULL率 < 0.1% |
| 正確性 (Accuracy) | データが現実の値と一致するか | 住所の郵便番号マッチ率 |
| 一貫性 (Consistency) | 複数システム間でデータが矛盾しないか | 注文DB vs 決済DBの金額一致率 |
| 適時性 (Timeliness) | データが必要なタイミングで利用可能か | パイプライン遅延 < 2時間 |
| 一意性 (Uniqueness) | 重複データが存在しないか | 重複レコード率 < 0.01% |
| 有効性 (Validity) | データが定義された形式・範囲に収まるか | メール形式の適合率 |
Great Expectations によるデータ検証
# Great Expectations: データ品質の自動検証
import great_expectations as gx
# コンテキストの初期化
context = gx.get_context()
# データソースの設定
datasource = context.sources.add_pandas("orders_source")
data_asset = datasource.add_dataframe_asset("orders")
# Expectation Suite の定義
suite = context.add_expectation_suite("orders_quality_suite")
# 完全性チェック
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(
column="order_id",
meta={"dimension": "completeness", "severity": "critical"}
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(
column="customer_id",
mostly=0.999, # 99.9%以上がNULLでないこと
)
)
# 有効性チェック
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="total_amount",
min_value=0,
max_value=10_000_000,
meta={"dimension": "validity"}
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeInSet(
column="status",
value_set=["pending", "confirmed", "shipped", "delivered", "cancelled"],
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToMatchRegex(
column="email",
regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
)
)
# 一意性チェック
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(
column="order_id",
meta={"dimension": "uniqueness", "severity": "critical"}
)
)
# 適時性チェック
suite.add_expectation(
gx.expectations.ExpectColumnMaxToBeBetween(
column="created_at",
min_value="{{yesterday}}", # 昨日以降のデータがあること
max_value="{{today}}",
)
)
# 統計的チェック(異常検知)
suite.add_expectation(
gx.expectations.ExpectColumnMeanToBeBetween(
column="total_amount",
min_value=1000,
max_value=50000,
meta={"dimension": "accuracy", "note": "平均注文額が異常範囲にないこと"}
)
)
# 行数チェック(データ量の急激な変化を検知)
suite.add_expectation(
gx.expectations.ExpectTableRowCountToBeBetween(
min_value=1000,
max_value=1_000_000,
)
)
データコントラクト
コントラクトの定義
# data-contract.yaml
# プロデューサーとコンシューマー間のデータ品質契約
apiVersion: datacontract/v1
kind: DataContract
metadata:
name: order-events-contract
version: "2.0.0"
owner: order-team
consumers:
- analytics-team
- marketing-team
- finance-team
spec:
# スキーマ定義(厳密な型)
schema:
type: object
required: [order_id, customer_id, amount, status, event_time]
properties:
order_id:
type: string
format: uuid
description: "注文ID (UUID v4)"
customer_id:
type: string
format: uuid
amount:
type: number
minimum: 0
maximum: 10000000
description: "注文金額(税込、円)"
status:
type: string
enum: [created, confirmed, shipped, delivered, cancelled]
event_time:
type: string
format: date-time
description: "イベント発生時刻 (ISO 8601, UTC)"
# SLO(Service Level Objectives)
slo:
freshness:
maxDelaySeconds: 300 # 5分以内
completeness:
minPercentage: 99.9
availability:
uptimePercentage: 99.95
volume:
minDailyRecords: 1000
maxDailyRecords: 1000000
# 破壊的変更のルール
compatibility:
mode: BACKWARD # 後方互換性を保証
rules:
- "フィールド削除は禁止"
- "必須フィールドの追加は禁止"
- "型の変更は禁止"
- "オプショナルフィールドの追加は許可"
# 違反時のアクション
alerting:
critical:
- channel: pagerduty
conditions: ["schema_violation", "freshness_breach > 30min"]
warning:
- channel: slack
conditions: ["completeness < 99.9%", "volume_anomaly"]
コントラクトの自動検証
// CI/CDパイプラインでのデータコントラクト検証
interface ContractValidationResult {
contractName: string;
version: string;
timestamp: Date;
results: {
schemaValid: boolean;
sloMet: boolean;
compatibilityValid: boolean;
details: ValidationDetail[];
};
}
class DataContractValidator {
async validate(
contract: DataContract,
currentData: DataSample
): Promise<ContractValidationResult> {
const details: ValidationDetail[] = [];
// 1. スキーマ検証
const schemaResult = this.validateSchema(contract.spec.schema, currentData);
details.push(...schemaResult);
// 2. SLO検証
const sloResult = await this.validateSLO(contract.spec.slo, currentData);
details.push(...sloResult);
// 3. 後方互換性検証(前バージョンとの比較)
const previousContract = await this.getPreviousVersion(contract);
if (previousContract) {
const compatResult = this.validateCompatibility(
previousContract,
contract,
contract.spec.compatibility.mode
);
details.push(...compatResult);
}
return {
contractName: contract.metadata.name,
version: contract.metadata.version,
timestamp: new Date(),
results: {
schemaValid: details.filter(d => d.category === 'schema').every(d => d.passed),
sloMet: details.filter(d => d.category === 'slo').every(d => d.passed),
compatibilityValid: details.filter(d => d.category === 'compatibility').every(d => d.passed),
details,
},
};
}
private validateCompatibility(
previous: DataContract,
current: DataContract,
mode: 'BACKWARD' | 'FORWARD' | 'FULL'
): ValidationDetail[] {
const details: ValidationDetail[] = [];
if (mode === 'BACKWARD' || mode === 'FULL') {
// 既存フィールドの削除チェック
for (const field of Object.keys(previous.spec.schema.properties)) {
if (!(field in current.spec.schema.properties)) {
details.push({
category: 'compatibility',
passed: false,
message: `フィールド '${field}' が削除されています(後方互換性違反)`,
});
}
}
// 新規必須フィールドの追加チェック
const newRequired = current.spec.schema.required.filter(
f => !previous.spec.schema.required.includes(f)
);
if (newRequired.length > 0) {
details.push({
category: 'compatibility',
passed: false,
message: `新規必須フィールド ${newRequired.join(', ')} が追加されています`,
});
}
}
return details;
}
}
パイプライン品質の監視
# データ品質ダッシュボード用メトリクス
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class PipelineHealthMetrics:
"""パイプライン健全性メトリクス"""
pipeline_name: str
execution_date: datetime
# パフォーマンス
duration_seconds: float
records_processed: int
throughput_rps: float # records per second
# 品質スコア
completeness_score: float # 0-100
validity_score: float
uniqueness_score: float
freshness_minutes: float
# 異常検知
row_count_deviation: float # 前回比の偏差率
null_rate_change: float # NULL率の変化
@property
def overall_health(self) -> str:
"""総合健全性判定"""
if self.completeness_score >= 99.9 and self.validity_score >= 99.5:
return "HEALTHY"
elif self.completeness_score >= 99.0 and self.validity_score >= 98.0:
return "WARNING"
else:
return "CRITICAL"
# アラートルール
ALERT_RULES = {
"freshness_breach": {
"condition": lambda m: m.freshness_minutes > 120,
"severity": "critical",
"message": "データ鮮度が2時間を超過: {freshness_minutes:.0f}分",
},
"row_count_anomaly": {
"condition": lambda m: abs(m.row_count_deviation) > 0.5,
"severity": "warning",
"message": "レコード数が前回比50%以上変動: {row_count_deviation:.1%}",
},
"completeness_drop": {
"condition": lambda m: m.completeness_score < 99.0,
"severity": "critical",
"message": "完全性スコアが99%未満: {completeness_score:.2f}%",
},
}
データ品質のシフトレフト
データ品質を下流で検知するのではなく、上流(ソース)で防ぐアプローチです。
| レベル | 対策 | 例 |
|---|---|---|
| アプリケーション | 入力バリデーション | フォームバリデーション、API契約 |
| データベース | 制約 | NOT NULL, CHECK, UNIQUE, FK |
| パイプライン | 変換時チェック | dbt tests, Great Expectations |
| DWH | 品質ダッシュボード | メトリクス監視、アノマリー検知 |
最も効果的なのはアプリケーション層での防止です。不正なデータがそもそも入らなければ、下流の品質問題は大幅に減少します。
まとめ
| ポイント | 内容 |
|---|---|
| 品質6次元 | 完全性/正確性/一貫性/適時性/一意性/有効性 |
| Great Expectations | 宣言的にデータ品質ルールを定義・自動検証 |
| データコントラクト | プロデューサー/コンシューマー間の品質合意 |
| 監視 | メトリクス収集、アノマリー検知、アラートの自動化 |
チェックリスト
- データ品質の6次元を説明できる
- Great Expectationsでの品質チェック設定を理解した
- データコントラクトの構成要素と後方互換性ルールを理解した
- パイプライン品質の監視・アラート設計ができる
次のステップへ
次は演習でデータパイプラインの設計を実践します。
推定読了時間: 40分