データ品質
田中VPoE「データパイプラインの構築ができた。しかし、パイプラインが動いていても中身のデータが正しいとは限らない。データ品質の管理を学ぼう。」
あなた「GIGO(Garbage In, Garbage Out)ですね。品質の悪いデータからは正しい意思決定はできません。」
田中VPoE「その通り。データ品質の定義、測定方法、自動チェックの仕組みを構築しよう。」
データ品質の6つの次元
| 次元 | 定義 | 測定例 |
|---|---|---|
| 完全性(Completeness) | 必要なデータが欠損なく存在する | NULL率、レコード数 |
| 正確性(Accuracy) | データが現実を正しく反映している | 範囲チェック、参照整合性 |
| 一貫性(Consistency) | システム間でデータが矛盾しない | クロスシステム比較 |
| 適時性(Timeliness) | データが必要なタイミングで利用可能 | 鮮度チェック、SLA |
| 一意性(Uniqueness) | 不要な重複がない | 重複率チェック |
| 妥当性(Validity) | データが定義された形式に適合 | スキーマバリデーション |
Great Expectations によるデータ品質チェック
import great_expectations as gx
# コンテキスト初期化
context = gx.get_context()
# データソース追加
datasource = context.sources.add_pandas(name="netshop")
asset = datasource.add_dataframe_asset(name="orders")
# Expectation Suite定義
suite = context.add_expectation_suite("orders_quality_suite")
# 完全性チェック
suite.add_expectation(
gx.expectations.ExpectTableRowCountToBeBetween(min_value=100, max_value=1000000)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
# 正確性チェック
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="total_amount", min_value=0, max_value=10000000
)
)
# 一意性チェック
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
# 妥当性チェック
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeInSet(
column="status",
value_set=["created", "paid", "shipped", "delivered", "cancelled"]
)
)
データ品質スコアカード
def calculate_quality_score(validation_results: dict) -> dict:
"""データ品質スコアを計算する"""
dimensions = {
"completeness": [],
"accuracy": [],
"uniqueness": [],
"validity": [],
}
for result in validation_results["results"]:
expectation_type = result["expectation_config"]["expectation_type"]
success = result["success"]
score = 100 if success else 0
if "not_be_null" in expectation_type or "row_count" in expectation_type:
dimensions["completeness"].append(score)
elif "between" in expectation_type:
dimensions["accuracy"].append(score)
elif "unique" in expectation_type:
dimensions["uniqueness"].append(score)
elif "in_set" in expectation_type:
dimensions["validity"].append(score)
scorecard = {}
for dim, scores in dimensions.items():
scorecard[dim] = round(sum(scores) / len(scores), 1) if scores else 100
scorecard["overall"] = round(sum(scorecard.values()) / len(scorecard), 1)
return scorecard
| スコア | 判定 | 対応 |
|---|---|---|
| 95-100 | 優秀 | モニタリング継続 |
| 80-94 | 良好 | 改善計画を立案 |
| 60-79 | 要改善 | 即座に原因調査 |
| 60未満 | 危険 | パイプライン停止・修正 |
データ品質テストの自動化
パイプラインの各段階でデータ品質テストを自動実行します。
[Extract] → 品質テスト → [Transform] → 品質テスト → [Load] → 品質テスト
↓ ↓ ↓
ソースデータ 変換後データ 最終データ
の妥当性確認 の整合性確認 の完全性確認
まとめ
| 項目 | ポイント |
|---|---|
| 6つの次元 | 完全性、正確性、一貫性、適時性、一意性、妥当性 |
| ツール | Great Expectationsで宣言的にルールを定義 |
| スコアカード | 次元ごとのスコアと総合スコアで可視化 |
| 自動化 | パイプラインの各段階でテストを自動実行 |
チェックリスト
- データ品質の6つの次元を説明できる
- Great Expectationsで基本的なバリデーションルールを定義できる
- データ品質スコアカードの概念を理解している
- パイプラインへのデータ品質テスト統合方法を説明できる
次のステップへ
データ品質の基礎を学びました。次はデータ可観測性について学びましょう。
推定読了時間:30分