EXERCISE 60分

演習:データ品質管理基盤を構築しよう

田中VPoE「データ品質、可観測性、リネージを学んだ。NetShop社のデータパイプラインに統合的な品質管理基盤を組み込もう。」

あなた「品質テスト、異常検知、リネージ追跡を統合したダッシュボードまで作るんですね。」

田中VPoE「そうだ。データの信頼性を組織全体で可視化できる基盤を作ろう。」

ミッション概要

NetShop社のデータパイプラインに品質テスト、異常検知、リネージ管理を統合したデータ品質管理基盤を構築します。

前提条件

  • Step 5の各レッスン(データ品質、データ可観測性、データリネージ)を修了していること

Mission 1: データ品質テストスイートの設計(20分)

NetShop社の主要テーブルに対する品質テストスイートを設計してください。

要件

  1. 3つのテーブル(orders、customers、products)の品質ルール定義
  2. 各テーブルに6つの次元(完全性・正確性・一貫性・適時性・一意性・妥当性)のテスト
解答例
QUALITY_RULES = {
    "orders": {
        "completeness": [
            {"column": "order_id", "rule": "not_null"},
            {"column": "customer_id", "rule": "not_null"},
            {"column": "total_amount", "rule": "not_null"},
        ],
        "accuracy": [
            {"column": "total_amount", "rule": "between", "min": 0, "max": 10000000},
            {"column": "order_date", "rule": "between",
             "min": "2020-01-01", "max": "2027-12-31"},
        ],
        "consistency": [
            {"rule": "referential_integrity",
             "column": "customer_id", "ref_table": "customers", "ref_column": "customer_id"},
        ],
        "timeliness": [
            {"rule": "freshness", "max_age_hours": 24},
        ],
        "uniqueness": [
            {"column": "order_id", "rule": "unique"},
        ],
        "validity": [
            {"column": "status", "rule": "in_set",
             "values": ["created", "paid", "shipped", "delivered", "cancelled"]},
        ],
    },
    "customers": {
        "completeness": [
            {"column": "customer_id", "rule": "not_null"},
            {"column": "email", "rule": "not_null"},
        ],
        "accuracy": [
            {"column": "email", "rule": "regex", "pattern": r"^[^@]+@[^@]+\.[^@]+$"},
        ],
        "uniqueness": [
            {"column": "customer_id", "rule": "unique"},
            {"column": "email", "rule": "unique"},
        ],
        "validity": [
            {"column": "region", "rule": "in_set",
             "values": ["関東", "関西", "中部", "北海道", "九州", "東北", "中国", "四国", "沖縄"]},
        ],
    },
    "products": {
        "completeness": [
            {"column": "product_id", "rule": "not_null"},
            {"column": "product_name", "rule": "not_null"},
            {"column": "price", "rule": "not_null"},
        ],
        "accuracy": [
            {"column": "price", "rule": "between", "min": 0, "max": 10000000},
        ],
        "uniqueness": [
            {"column": "product_id", "rule": "unique"},
        ],
    },
}

Mission 2: 異常検知モニターの実装(20分)

テーブルのボリュームと分布の異常を検知するモニターを実装してください。

要件

  1. 行数の異常検知(z-scoreベース)
  2. 数値カラムの分布変化検知
  3. 検知結果のサマリーレポート
解答例
import numpy as np
from datetime import datetime

class AnomalyMonitor:
    def __init__(self):
        self.volume_history = {}
        self.stat_history = {}

    def check_volume(self, table: str, count: int) -> dict:
        key = f"{table}_vol"
        if key not in self.volume_history:
            self.volume_history[key] = []
        self.volume_history[key].append(count)

        history = self.volume_history[key]
        if len(history) < 7:
            return {"table": table, "check": "volume", "status": "LEARNING"}

        mean = np.mean(history[-30:-1])
        std = np.std(history[-30:-1])
        z = (count - mean) / std if std > 0 else 0

        return {
            "table": table,
            "check": "volume",
            "current": count,
            "mean": round(mean),
            "z_score": round(z, 2),
            "status": "ALERT" if abs(z) > 3 else "WARNING" if abs(z) > 2 else "OK",
        }

    def check_distribution(self, table: str, column: str, values: np.ndarray) -> dict:
        key = f"{table}_{column}_dist"
        current_stats = {
            "mean": float(np.mean(values)),
            "std": float(np.std(values)),
            "median": float(np.median(values)),
            "null_pct": float(np.isnan(values).mean()) if np.issubdtype(values.dtype, np.floating) else 0,
        }

        if key in self.stat_history:
            prev = self.stat_history[key]
            changes = {}
            for stat in ["mean", "std", "median"]:
                if prev[stat] != 0:
                    pct = abs(current_stats[stat] - prev[stat]) / abs(prev[stat]) * 100
                    changes[stat] = round(pct, 1)

            significant = [k for k, v in changes.items() if v > 30]
            status = "ALERT" if significant else "OK"
        else:
            changes = {}
            significant = []
            status = "FIRST_RUN"

        self.stat_history[key] = current_stats

        return {
            "table": table,
            "column": column,
            "check": "distribution",
            "stats": current_stats,
            "changes_pct": changes,
            "significant_changes": significant,
            "status": status,
        }

    def generate_report(self, results: list) -> str:
        lines = [f"=== Anomaly Report ({datetime.now().isoformat()}) ==="]
        alerts = [r for r in results if r["status"] == "ALERT"]
        warnings = [r for r in results if r["status"] == "WARNING"]

        lines.append(f"Total checks: {len(results)}")
        lines.append(f"Alerts: {len(alerts)}, Warnings: {len(warnings)}")

        if alerts:
            lines.append("\n--- ALERTS ---")
            for a in alerts:
                lines.append(f"  {a['table']}.{a.get('column', '')} [{a['check']}]: {a.get('z_score', a.get('significant_changes', ''))}")

        return "\n".join(lines)

Mission 3: リネージマップの作成(20分)

NetShop社のデータパイプライン全体のリネージマップを作成してください。

要件

  1. ソースからmartまでの全テーブルの依存関係図
  2. 各テーブルの変更が影響するダウンストリームテーブルの一覧
  3. 影響分析レポートの生成
解答例
class LineageManager:
    def __init__(self):
        self.graph = {}

    def add_dependency(self, target: str, sources: list[str]):
        self.graph[target] = sources

    def get_upstream(self, table: str, depth: int = -1) -> set:
        visited = set()
        self._traverse_up(table, visited, depth, 0)
        visited.discard(table)
        return visited

    def _traverse_up(self, table: str, visited: set, max_depth: int, current: int):
        if table in visited or (max_depth >= 0 and current > max_depth):
            return
        visited.add(table)
        for source in self.graph.get(table, []):
            self._traverse_up(source, visited, max_depth, current + 1)

    def get_downstream(self, table: str) -> set:
        downstream = set()
        for target, sources in self.graph.items():
            if table in sources:
                downstream.add(target)
                downstream |= self.get_downstream(target)
        return downstream

    def impact_analysis(self, table: str) -> str:
        downstream = self.get_downstream(table)
        lines = [f"Impact Analysis: {table}"]
        lines.append(f"Affected tables: {len(downstream)}")
        for t in sorted(downstream):
            lines.append(f"  - {t}")
        return "\n".join(lines)

# NetShop社のリネージ定義
lineage = LineageManager()
lineage.add_dependency("stg_orders", ["raw.orders"])
lineage.add_dependency("stg_customers", ["raw.customers"])
lineage.add_dependency("stg_products", ["raw.products"])
lineage.add_dependency("int_order_details", ["stg_orders", "stg_products"])
lineage.add_dependency("mart_customer_summary", ["stg_customers", "int_order_details"])
lineage.add_dependency("mart_monthly_revenue", ["int_order_details"])
lineage.add_dependency("mart_product_performance", ["int_order_details", "stg_products"])

# 影響分析
print(lineage.impact_analysis("raw.orders"))
# raw.ordersの変更 → stg_orders → int_order_details → mart全テーブルに影響

達成度チェック

  • 6つの品質次元を網羅したテストスイートを設計できた
  • ボリュームと分布の異常検知モニターを実装できた
  • 異常検知サマリーレポートを生成できた
  • リネージマップを作成し、影響分析を実行できた

推定所要時間:60分