データ可観測性
田中VPoE「データ品質テストは”既知の問題”を検出する仕組みだ。しかし、未知の問題はどうする?データ可観測性(Data Observability)は、予期しない異常も検出する仕組みを提供する。」
あなた「アプリケーションの可観測性(メトリクス、ログ、トレース)と同じ考え方をデータに適用するんですね。」
田中VPoE「その通り。データの健全性を常に把握できる状態を作ろう。」
データ可観測性とは
データ可観測性は、データパイプラインとデータの健全性を継続的に監視・把握するための仕組みです。
データ品質テスト vs データ可観測性
| 観点 | データ品質テスト | データ可観測性 |
|---|---|---|
| 対象 | 既知のルール | 既知 + 未知の異常 |
| 手法 | 宣言的ルール定義 | 統計的異常検知 |
| タイミング | パイプライン内で実行 | 常時監視 |
| ツール | Great Expectations | Monte Carlo, Elementary |
データ可観測性の5つの柱
| 柱 | 説明 | 例 |
|---|---|---|
| Freshness | データが最新か | 最終更新が24時間以上前 |
| Volume | データ量が期待通りか | 行数が前日比50%以下 |
| Schema | スキーマが変更されていないか | カラム追加・削除・型変更 |
| Distribution | 値の分布が正常か | 平均値が3σ以上変動 |
| Lineage | データの来歴が追跡可能か | 上流のどのテーブルから来たか |
異常検知の実装
import numpy as np
from datetime import datetime, timedelta
class DataObserver:
"""データ可観測性モニター"""
def __init__(self, history_days: int = 30):
self.history = {}
self.history_days = history_days
def observe_freshness(self, table: str, last_updated: datetime) -> dict:
"""鮮度チェック"""
age_hours = (datetime.now() - last_updated).total_seconds() / 3600
return {
"metric": "freshness",
"table": table,
"age_hours": round(age_hours, 1),
"status": "OK" if age_hours < 24 else "ALERT",
}
def observe_volume(self, table: str, current_count: int) -> dict:
"""ボリュームチェック"""
key = f"{table}_volume"
if key not in self.history:
self.history[key] = []
self.history[key].append(current_count)
if len(self.history[key]) < 7:
return {"metric": "volume", "table": table, "status": "INSUFFICIENT_DATA"}
recent = self.history[key][-30:]
mean = np.mean(recent[:-1])
std = np.std(recent[:-1])
if std == 0:
z_score = 0
else:
z_score = (current_count - mean) / std
return {
"metric": "volume",
"table": table,
"current": current_count,
"expected_mean": round(mean, 0),
"z_score": round(z_score, 2),
"status": "OK" if abs(z_score) < 3 else "ALERT",
}
def observe_distribution(self, table: str, column: str,
current_stats: dict, historical_stats: dict) -> dict:
"""分布チェック"""
alerts = []
for stat in ["mean", "std", "min", "max"]:
if stat in current_stats and stat in historical_stats:
curr = current_stats[stat]
hist = historical_stats[stat]
if hist != 0:
change_pct = abs(curr - hist) / abs(hist) * 100
if change_pct > 50:
alerts.append(f"{stat}: {change_pct:.0f}% change")
return {
"metric": "distribution",
"table": table,
"column": column,
"alerts": alerts,
"status": "OK" if not alerts else "ALERT",
}
スキーマ変更検知
def detect_schema_changes(previous_schema: dict, current_schema: dict) -> list:
"""スキーマの変更を検知する"""
changes = []
prev_cols = set(previous_schema.keys())
curr_cols = set(current_schema.keys())
for col in curr_cols - prev_cols:
changes.append({"type": "ADDED", "column": col, "dtype": current_schema[col]})
for col in prev_cols - curr_cols:
changes.append({"type": "REMOVED", "column": col, "dtype": previous_schema[col]})
for col in prev_cols & curr_cols:
if previous_schema[col] != current_schema[col]:
changes.append({
"type": "TYPE_CHANGED",
"column": col,
"from": previous_schema[col],
"to": current_schema[col],
})
return changes
ダッシュボード設計
| パネル | 表示内容 | 更新頻度 |
|---|---|---|
| データ鮮度マップ | 全テーブルの最終更新時刻と鮮度状態 | 5分 |
| ボリュームトレンド | テーブルごとの行数推移(7日間) | 1時間 |
| 異常アラート一覧 | 検知された異常の一覧と重要度 | リアルタイム |
| スキーマ変更履歴 | 直近のスキーマ変更一覧 | 日次 |
| 品質スコアトレンド | 品質スコアの推移 | 日次 |
まとめ
| 項目 | ポイント |
|---|---|
| 5つの柱 | Freshness、Volume、Schema、Distribution、Lineage |
| 異常検知 | 統計的手法(z-score等)で未知の異常も検出 |
| スキーマ監視 | カラムの追加・削除・型変更を自動検知 |
| ダッシュボード | 全テーブルの健全性を一覧で把握 |
チェックリスト
- データ可観測性の5つの柱を説明できる
- データ品質テストとデータ可観測性の違いを理解している
- 統計的異常検知の基本的な実装方法を理解している
- スキーマ変更検知の仕組みを説明できる
次のステップへ
データ可観測性を理解しました。次はデータリネージについて学びましょう。
推定読了時間:30分