演習:データ品質管理基盤を構築しよう
田中VPoE「データ品質、可観測性、リネージを学んだ。NetShop社のデータパイプラインに統合的な品質管理基盤を組み込もう。」
あなた「品質テスト、異常検知、リネージ追跡を統合したダッシュボードまで作るんですね。」
田中VPoE「そうだ。データの信頼性を組織全体で可視化できる基盤を作ろう。」
ミッション概要
NetShop社のデータパイプラインに品質テスト、異常検知、リネージ管理を統合したデータ品質管理基盤を構築します。
前提条件
- Step 5の各レッスン(データ品質、データ可観測性、データリネージ)を修了していること
Mission 1: データ品質テストスイートの設計(20分)
NetShop社の主要テーブルに対する品質テストスイートを設計してください。
要件
- 3つのテーブル(orders、customers、products)の品質ルール定義
- 各テーブルについて、6つの品質次元(完全性・正確性・一貫性・適時性・一意性・妥当性)のうち該当するもののテスト(ordersテーブルは6次元すべてを網羅すること)
解答例
# Declarative data-quality rule definitions, keyed by table name.
# Each table maps quality dimensions (completeness, accuracy, consistency,
# timeliness, uniqueness, validity) to a list of rule specs; every spec
# carries a "rule" type plus its parameters (column, bounds, pattern, ...).
# NOTE(review): only "orders" covers all six dimensions; customers/products
# define the subset that applies to them.
QUALITY_RULES = {
    "orders": {
        "completeness": [
            # Mandatory columns must never be NULL.
            {"column": "order_id", "rule": "not_null"},
            {"column": "customer_id", "rule": "not_null"},
            {"column": "total_amount", "rule": "not_null"},
        ],
        "accuracy": [
            # Plausible value ranges for amounts and dates.
            {"column": "total_amount", "rule": "between", "min": 0, "max": 10000000},
            {"column": "order_date", "rule": "between",
             "min": "2020-01-01", "max": "2027-12-31"},
        ],
        "consistency": [
            # Every order must reference an existing customer.
            {"rule": "referential_integrity",
             "column": "customer_id", "ref_table": "customers", "ref_column": "customer_id"},
        ],
        "timeliness": [
            # Table must have been refreshed within the last 24 hours.
            {"rule": "freshness", "max_age_hours": 24},
        ],
        "uniqueness": [
            {"column": "order_id", "rule": "unique"},
        ],
        "validity": [
            # Status must be one of the known lifecycle states.
            {"column": "status", "rule": "in_set",
             "values": ["created", "paid", "shipped", "delivered", "cancelled"]},
        ],
    },
    "customers": {
        "completeness": [
            {"column": "customer_id", "rule": "not_null"},
            {"column": "email", "rule": "not_null"},
        ],
        "accuracy": [
            # Minimal email shape check: one "@" with a dotted domain.
            {"column": "email", "rule": "regex", "pattern": r"^[^@]+@[^@]+\.[^@]+$"},
        ],
        "uniqueness": [
            {"column": "customer_id", "rule": "unique"},
            {"column": "email", "rule": "unique"},
        ],
        "validity": [
            # Region must be one of the nine Japanese regions.
            {"column": "region", "rule": "in_set",
             "values": ["関東", "関西", "中部", "北海道", "九州", "東北", "中国", "四国", "沖縄"]},
        ],
    },
    "products": {
        "completeness": [
            {"column": "product_id", "rule": "not_null"},
            {"column": "product_name", "rule": "not_null"},
            {"column": "price", "rule": "not_null"},
        ],
        "accuracy": [
            {"column": "price", "rule": "between", "min": 0, "max": 10000000},
        ],
        "uniqueness": [
            {"column": "product_id", "rule": "unique"},
        ],
    },
}
Mission 2: 異常検知モニターの実装(20分)
テーブルのボリュームと分布の異常を検知するモニターを実装してください。
要件
- 行数の異常検知(z-scoreベース)
- 数値カラムの分布変化検知
- 検知結果のサマリーレポート
解答例
import numpy as np
from datetime import datetime
class AnomalyMonitor:
    """Detect volume and distribution anomalies in table-level metrics.

    Histories are kept in memory on the monitor instance, so a baseline
    accumulates across successive check calls within one process; nothing
    is persisted across restarts.
    """

    def __init__(self):
        # "<table>_vol" -> list of recent row counts (bounded to 30 entries)
        self.volume_history = {}
        # "<table>_<column>_dist" -> stats dict from the previous run
        self.stat_history = {}

    def check_volume(self, table: str, count: int) -> dict:
        """Record *count* and z-score it against the recent baseline.

        Returns a result dict whose "status" is LEARNING (fewer than 7
        observations so far), OK, WARNING (|z| > 2) or ALERT (|z| > 3).
        """
        history = self.volume_history.setdefault(f"{table}_vol", [])
        history.append(count)
        # Bound memory: the baseline only ever uses the last 30
        # observations, so older entries can be dropped. (The original
        # list grew without bound over long-running processes.)
        del history[:-30]
        if len(history) < 7:
            return {"table": table, "check": "volume", "status": "LEARNING"}
        baseline = history[:-1]  # exclude the value under test
        mean = float(np.mean(baseline))
        std = float(np.std(baseline))
        if std > 0:
            z = (count - mean) / std
        elif count == mean:
            z = 0.0
        else:
            # Flat baseline but a deviating current value: the original
            # reported z=0 (status OK) here, silently passing real jumps.
            z = float("inf")
        abs_z = abs(z)
        return {
            "table": table,
            "check": "volume",
            "current": count,
            "mean": round(mean),
            "z_score": round(z, 2),
            "status": "ALERT" if abs_z > 3 else "WARNING" if abs_z > 2 else "OK",
        }

    def check_distribution(self, table: str, column: str, values: np.ndarray) -> dict:
        """Compare *values*' summary stats against the previous run's.

        A stat moving by more than 30% (relative) counts as significant;
        any significant change yields status ALERT. The first run for a
        (table, column) pair returns FIRST_RUN; an empty array returns
        NO_DATA without touching the stored baseline.
        """
        key = f"{table}_{column}_dist"
        if values.size == 0:
            # np.mean/np.median on an empty array warn and produce NaN.
            return {
                "table": table,
                "column": column,
                "check": "distribution",
                "stats": {},
                "changes_pct": {},
                "significant_changes": [],
                "status": "NO_DATA",
            }
        current_stats = {
            "mean": float(np.mean(values)),
            "std": float(np.std(values)),
            "median": float(np.median(values)),
            # NaN counting only makes sense for float dtypes.
            "null_pct": float(np.isnan(values).mean()) if np.issubdtype(values.dtype, np.floating) else 0,
        }
        prev = self.stat_history.get(key)
        changes = {}
        if prev is not None:
            for stat in ("mean", "std", "median"):
                prev_v = prev[stat]
                cur_v = current_stats[stat]
                if prev_v != 0:
                    pct = abs(cur_v - prev_v) / abs(prev_v) * 100
                elif cur_v != 0:
                    # Zero baseline: the original skipped the stat entirely
                    # and could never flag a move off zero; treat it as an
                    # unbounded relative change instead.
                    pct = float("inf")
                else:
                    pct = 0.0
                changes[stat] = round(pct, 1)
            significant = [k for k, v in changes.items() if v > 30]
            status = "ALERT" if significant else "OK"
        else:
            significant = []
            status = "FIRST_RUN"
        self.stat_history[key] = current_stats
        return {
            "table": table,
            "column": column,
            "check": "distribution",
            "stats": current_stats,
            "changes_pct": changes,
            "significant_changes": significant,
            "status": status,
        }

    def generate_report(self, results: list) -> str:
        """Render a plain-text summary of check results, listing alerts."""
        lines = [f"=== Anomaly Report ({datetime.now().isoformat()}) ==="]
        alerts = [r for r in results if r["status"] == "ALERT"]
        warnings = [r for r in results if r["status"] == "WARNING"]
        lines.append(f"Total checks: {len(results)}")
        lines.append(f"Alerts: {len(alerts)}, Warnings: {len(warnings)}")
        if alerts:
            lines.append("\n--- ALERTS ---")
            for a in alerts:
                # Volume checks carry no "column"; avoid a dangling dot.
                target = a["table"] + (f".{a['column']}" if a.get("column") else "")
                detail = a.get("z_score", a.get("significant_changes", ""))
                lines.append(f"  {target} [{a['check']}]: {detail}")
        return "\n".join(lines)
Mission 3: リネージマップの作成(20分)
NetShop社のデータパイプライン全体のリネージマップを作成してください。
要件
- ソースからmartまでの全テーブルの依存関係図
- 各テーブルの変更が影響するダウンストリームテーブルの一覧
- 影響分析レポートの生成
解答例
class LineageManager:
    """Table-level lineage graph: each target maps to its source tables."""

    def __init__(self):
        # target table -> list of the source tables it is built from
        self.graph = {}

    def add_dependency(self, target: str, sources: list[str]):
        """Register that *target* is derived from *sources*.

        A second call for the same target replaces the earlier sources
        (matching the original behavior).
        """
        self.graph[target] = sources

    def get_upstream(self, table: str, depth: int = -1) -> set:
        """Return ancestors of *table* up to *depth* hops (-1 = unlimited)."""
        visited = set()
        self._traverse_up(table, visited, depth, 0)
        visited.discard(table)  # the table itself is not its own ancestor
        return visited

    def _traverse_up(self, table: str, visited: set, max_depth: int, current: int):
        # Depth-first walk over sources; `visited` doubles as a cycle guard.
        if table in visited or (max_depth >= 0 and current > max_depth):
            return
        visited.add(table)
        for source in self.graph.get(table, []):
            self._traverse_up(source, visited, max_depth, current + 1)

    def get_downstream(self, table: str) -> set:
        """Return every table that transitively depends on *table*.

        Iterative traversal with a visited set: shared paths are expanded
        once and cyclic graphs terminate. (The original recursion
        re-expanded shared subtrees and recursed forever on a cycle.)
        """
        downstream = set()
        frontier = [table]
        while frontier:
            current = frontier.pop()
            for target, sources in self.graph.items():
                if current in sources and target not in downstream:
                    downstream.add(target)
                    frontier.append(target)
        return downstream

    def impact_analysis(self, table: str) -> str:
        """Render a plain-text report of all tables affected by *table*."""
        downstream = self.get_downstream(table)
        lines = [f"Impact Analysis: {table}"]
        lines.append(f"Affected tables: {len(downstream)}")
        for t in sorted(downstream):
            lines.append(f"  - {t}")
        return "\n".join(lines)
# NetShop lineage definition: each derived table and the sources it is
# built from, declared data-first so the pipeline shape is visible at a glance.
_NETSHOP_DEPENDENCIES = {
    "stg_orders": ["raw.orders"],
    "stg_customers": ["raw.customers"],
    "stg_products": ["raw.products"],
    "int_order_details": ["stg_orders", "stg_products"],
    "mart_customer_summary": ["stg_customers", "int_order_details"],
    "mart_monthly_revenue": ["int_order_details"],
    "mart_product_performance": ["int_order_details", "stg_products"],
}

lineage = LineageManager()
for _target, _sources in _NETSHOP_DEPENDENCIES.items():
    lineage.add_dependency(_target, _sources)

# Impact analysis: a change to raw.orders ripples through stg_orders,
# then int_order_details, and from there into every mart table.
print(lineage.impact_analysis("raw.orders"))
達成度チェック
- 6つの品質次元を網羅したテストスイートを設計できた
- ボリュームと分布の異常検知モニターを実装できた
- 異常検知サマリーレポートを生成できた
- リネージマップを作成し、影響分析を実行できた
推定所要時間:60分