LESSON 30分

データ可観測性

田中VPoE「データ品質テストは”既知の問題”を検出する仕組みだ。しかし、未知の問題はどうする?データ可観測性(Data Observability)は、予期しない異常も検出する仕組みを提供する。」

あなた「アプリケーションの可観測性(メトリクス、ログ、トレース)と同じ考え方をデータに適用するんですね。」

田中VPoE「その通り。データの健全性を常に把握できる状態を作ろう。」

データ可観測性とは

データ可観測性は、データパイプラインとデータの健全性を継続的に監視・把握するための仕組みです。

データ品質テスト vs データ可観測性

観点データ品質テストデータ可観測性
対象既知のルール既知 + 未知の異常
手法宣言的ルール定義統計的異常検知
タイミングパイプライン内で実行常時監視
ツールGreat ExpectationsMonte Carlo, Elementary

データ可観測性の5つの柱

説明
Freshnessデータが最新か最終更新が24時間以上前
Volumeデータ量が期待通りか行数が前日比50%以下
Schemaスキーマが変更されていないかカラム追加・削除・型変更
Distribution値の分布が正常か平均値が3σ以上変動
Lineageデータの来歴が追跡可能か上流のどのテーブルから来たか

異常検知の実装

import numpy as np
from datetime import datetime, timedelta

class DataObserver:
    """データ可観測性モニター"""

    def __init__(self, history_days: int = 30):
        self.history = {}
        self.history_days = history_days

    def observe_freshness(self, table: str, last_updated: datetime) -> dict:
        """鮮度チェック"""
        age_hours = (datetime.now() - last_updated).total_seconds() / 3600
        return {
            "metric": "freshness",
            "table": table,
            "age_hours": round(age_hours, 1),
            "status": "OK" if age_hours < 24 else "ALERT",
        }

    def observe_volume(self, table: str, current_count: int) -> dict:
        """ボリュームチェック"""
        key = f"{table}_volume"
        if key not in self.history:
            self.history[key] = []
        self.history[key].append(current_count)

        if len(self.history[key]) < 7:
            return {"metric": "volume", "table": table, "status": "INSUFFICIENT_DATA"}

        recent = self.history[key][-30:]
        mean = np.mean(recent[:-1])
        std = np.std(recent[:-1])

        if std == 0:
            z_score = 0
        else:
            z_score = (current_count - mean) / std

        return {
            "metric": "volume",
            "table": table,
            "current": current_count,
            "expected_mean": round(mean, 0),
            "z_score": round(z_score, 2),
            "status": "OK" if abs(z_score) < 3 else "ALERT",
        }

    def observe_distribution(self, table: str, column: str,
                            current_stats: dict, historical_stats: dict) -> dict:
        """分布チェック"""
        alerts = []
        for stat in ["mean", "std", "min", "max"]:
            if stat in current_stats and stat in historical_stats:
                curr = current_stats[stat]
                hist = historical_stats[stat]
                if hist != 0:
                    change_pct = abs(curr - hist) / abs(hist) * 100
                    if change_pct > 50:
                        alerts.append(f"{stat}: {change_pct:.0f}% change")

        return {
            "metric": "distribution",
            "table": table,
            "column": column,
            "alerts": alerts,
            "status": "OK" if not alerts else "ALERT",
        }

スキーマ変更検知

def detect_schema_changes(previous_schema: dict, current_schema: dict) -> list:
    """スキーマの変更を検知する"""
    changes = []

    prev_cols = set(previous_schema.keys())
    curr_cols = set(current_schema.keys())

    for col in curr_cols - prev_cols:
        changes.append({"type": "ADDED", "column": col, "dtype": current_schema[col]})

    for col in prev_cols - curr_cols:
        changes.append({"type": "REMOVED", "column": col, "dtype": previous_schema[col]})

    for col in prev_cols & curr_cols:
        if previous_schema[col] != current_schema[col]:
            changes.append({
                "type": "TYPE_CHANGED",
                "column": col,
                "from": previous_schema[col],
                "to": current_schema[col],
            })

    return changes

ダッシュボード設計

パネル表示内容更新頻度
データ鮮度マップ全テーブルの最終更新時刻と鮮度状態5分
ボリュームトレンドテーブルごとの行数推移(7日間)1時間
異常アラート一覧検知された異常の一覧と重要度リアルタイム
スキーマ変更履歴直近のスキーマ変更一覧日次
品質スコアトレンド品質スコアの推移日次

まとめ

項目ポイント
5つの柱Freshness、Volume、Schema、Distribution、Lineage
異常検知統計的手法(z-score等)で未知の異常も検出
スキーマ監視カラムの追加・削除・型変更を自動検知
ダッシュボード全テーブルの健全性を一覧で把握

チェックリスト

  • データ可観測性の5つの柱を説明できる
  • データ品質テストとデータ可観測性の違いを理解している
  • 統計的異常検知の基本的な実装方法を理解している
  • スキーマ変更検知の仕組みを説明できる

次のステップへ

データ可観測性を理解しました。次はデータリネージについて学びましょう。


推定読了時間:30分