ストーリー
データリネージュの種類
| 種類 | 粒度 | 用途 | 例 |
|---|---|---|---|
| テーブルレベル | テーブル間の依存関係 | 影響分析(スキーマ変更時) | raw.orders → staging.stg_orders → marts.fct_revenue |
| カラムレベル | カラム間の変換関係 | 計算ロジックの追跡 | orders.total → SUM(total) → fct_revenue.revenue |
| レコードレベル | 行単位のトレーサビリティ | 規制監査、データ品質の根本原因分析 | record_id: 12345 → 変換ログ → 最終値 |
テーブルレベルリネージュの実装
// dbt + DataHub でのテーブルレベルリネージュ
// dbt は ref() / source() から自動的にリネージュを生成
// manifest.json に依存関係が記録される
interface DbtLineageNode {
uniqueId: string;
resourceType: 'source' | 'model' | 'test' | 'exposure';
dependsOn: {
nodes: string[]; // 上流ノード
macros: string[];
};
columns: Record<string, ColumnInfo>;
}
// dbt manifest.json からリネージュを抽出する例
function extractLineage(manifest: DbtManifest): LineageEdge[] {
const edges: LineageEdge[] = [];
for (const [nodeId, node] of Object.entries(manifest.nodes)) {
for (const upstreamId of node.depends_on.nodes) {
edges.push({
upstream: upstreamId,
downstream: nodeId,
transformationType: detectTransformType(node),
sql: node.compiled_sql,
});
}
}
return edges;
}
// リネージュの可視化クエリ(DataHub GraphQL API)
const lineageQuery = `
query getLineage($urn: String!, $direction: LineageDirection!) {
searchAcrossLineage(
input: {
urn: $urn
direction: $direction
maxHops: 5
count: 50
}
) {
searchResults {
entity {
urn
type
... on Dataset {
name
platform {
name
}
properties {
description
}
}
}
degree # 何ホップ離れているか
paths {
path {
urn
type
}
}
}
}
}
`;
カラムレベルリネージュの実装
// SQLをパースしてカラムレベルリネージュを抽出
interface ColumnLineage {
targetColumn: string;
sourceColumns: SourceColumnRef[];
transformation: string;
}
interface SourceColumnRef {
table: string;
column: string;
transformationApplied: string;
}
// 例: fct_daily_revenue のカラムリネージュ
const revenueColumnLineage: ColumnLineage[] = [
{
targetColumn: 'fct_daily_revenue.revenue_date',
sourceColumns: [
{
table: 'stg_orders',
column: 'order_date',
transformationApplied: 'DATE(order_date)',
},
],
transformation: 'DATE型変換',
},
{
targetColumn: 'fct_daily_revenue.total_revenue',
sourceColumns: [
{
table: 'stg_orders',
column: 'total_amount',
transformationApplied: 'SUM(total_amount) WHERE status = "completed"',
},
],
transformation: '条件付き集計(completed注文のみ)',
},
{
targetColumn: 'fct_daily_revenue.total_fees',
sourceColumns: [
{
table: 'stg_orders',
column: 'total_amount',
transformationApplied: 'SUM(total_amount * fee_rate)',
},
{
table: 'dim_transaction_type',
column: 'fee_rate',
transformationApplied: 'JOIN on type_id',
},
],
transformation: '金額 x 手数料率の積の集計',
},
];
-- dbt: カラムレベルリネージュの自動記録(meta設定)
-- models/marts/finance/fct_daily_revenue.sql
{{
config(
materialized='table',
meta={
'owner': 'finance-team',
'contains_pii': false,
'sla_hours': 3,
}
)
}}
-- column-level documentation(カラム説明とリネージュ)
-- schema.yml
-- models:
-- - name: fct_daily_revenue
-- columns:
-- - name: total_revenue
-- description: "日次の確定売上合計(completed注文のtotal_amountの合計)"
-- meta:
-- source_columns:
-- - stg_orders.total_amount
-- transformation: "SUM(total_amount) WHERE status = 'completed'"
-- business_term: "daily_revenue"
SELECT
DATE(o.order_date) AS revenue_date,
t.category AS revenue_category,
COUNT(DISTINCT o.order_id) AS order_count,
SUM(o.total_amount) AS total_revenue,
SUM(o.total_amount * t.fee_rate) AS total_fees
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('dim_transaction_type') }} t ON o.type_id = t.type_id
WHERE o.status = 'completed'
GROUP BY 1, 2
影響分析(Impact Analysis)
// スキーマ変更時の影響分析
interface ImpactAnalysis {
changeDescription: string;
affectedAssets: AffectedAsset[];
riskLevel: 'low' | 'medium' | 'high' | 'critical';
mitigationSteps: string[];
}
interface AffectedAsset {
urn: string;
name: string;
type: 'dataset' | 'dashboard' | 'pipeline' | 'ml_model';
impactType: 'breaking' | 'non-breaking' | 'potential';
owner: string;
}
// 影響分析の実行例
async function analyzeImpact(
targetUrn: string,
changeType: 'column_rename' | 'column_delete' | 'type_change' | 'table_delete',
): Promise<ImpactAnalysis> {
// 1. 下流の全依存関係を取得
const downstream = await getDownstreamLineage(targetUrn, { maxHops: 10 });
// 2. 影響の種類を判定
const affectedAssets = downstream.map(asset => ({
...asset,
impactType: assessImpactType(changeType, asset),
}));
// 3. リスクレベルの判定
const riskLevel = calculateRisk(affectedAssets);
return {
changeDescription: `${targetUrn} の ${changeType}`,
affectedAssets,
riskLevel,
mitigationSteps: generateMitigationPlan(changeType, affectedAssets),
};
}
// 影響分析の結果例
const exampleImpact: ImpactAnalysis = {
changeDescription: 'stg_orders.total_amount のカラム名を order_total に変更',
affectedAssets: [
{
urn: 'urn:li:dataset:fct_daily_revenue',
name: 'fct_daily_revenue',
type: 'dataset',
impactType: 'breaking',
owner: 'finance-team',
},
{
urn: 'urn:li:dataset:agg_monthly_customer',
name: 'agg_monthly_customer',
type: 'dataset',
impactType: 'breaking',
owner: 'analytics-team',
},
{
urn: 'urn:li:dashboard:executive_kpi',
name: '経営KPIダッシュボード',
type: 'dashboard',
impactType: 'potential',
owner: 'bi-team',
},
{
urn: 'urn:li:mlModel:churn_prediction',
name: '離脱予測モデル',
type: 'ml_model',
impactType: 'potential',
owner: 'ml-team',
},
],
riskLevel: 'high',
mitigationSteps: [
'1. finance-team と analytics-team に事前通知(5営業日前)',
'2. 旧カラム名を維持しつつ新カラム名をエイリアスで追加(非破壊変更)',
'3. 下流モデルを新カラム名に移行(1-2スプリント)',
'4. 旧カラム名をdeprecatedマークし、90日後に削除',
],
};
監査証跡の設計
// 監査証跡(Audit Trail)のスキーマ設計
interface AuditEvent {
eventId: string;
timestamp: string;
actor: {
userId: string;
role: string;
ipAddress: string;
userAgent: string;
};
action: AuditAction;
resource: {
type: 'dataset' | 'column' | 'record' | 'pipeline' | 'policy';
identifier: string;
classification: string;
};
details: Record<string, unknown>;
outcome: 'success' | 'failure' | 'denied';
reason?: string;
}
type AuditAction =
| 'data_access' // データの読み取り
| 'data_export' // データのダウンロード/エクスポート
| 'data_modify' // データの変更
| 'data_delete' // データの削除
| 'schema_change' // スキーマの変更
| 'permission_grant' // 権限の付与
| 'permission_revoke' // 権限の取り消し
| 'policy_change' // ポリシーの変更
| 'dsar_request' // データ主体アクセス要求
| 'anonymization'; // 匿名化処理
-- 監査テーブル(PostgreSQL)
CREATE TABLE audit_trail (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- アクター情報
actor_user_id VARCHAR(100) NOT NULL,
actor_role VARCHAR(50) NOT NULL,
actor_ip_address INET,
actor_user_agent TEXT,
-- アクション情報
action VARCHAR(50) NOT NULL,
resource_type VARCHAR(50) NOT NULL,
resource_identifier TEXT NOT NULL,
resource_classification VARCHAR(20),
-- 詳細
details JSONB,
query_text TEXT, -- 実行されたSQL(機密データはマスキング済み)
rows_affected INT,
-- 結果
outcome VARCHAR(20) NOT NULL,
failure_reason TEXT,
-- パーティションキー
event_date DATE GENERATED ALWAYS AS (event_timestamp::DATE) STORED
)
PARTITION BY RANGE (event_date);
-- 月次パーティション
CREATE TABLE audit_trail_2024_01 PARTITION OF audit_trail
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
-- インデックス
CREATE INDEX idx_audit_actor ON audit_trail (actor_user_id, event_timestamp DESC);
CREATE INDEX idx_audit_resource ON audit_trail (resource_identifier, event_timestamp DESC);
CREATE INDEX idx_audit_action ON audit_trail (action, event_timestamp DESC);
CREATE INDEX idx_audit_classification ON audit_trail (resource_classification, event_timestamp DESC);
-- 監査クエリ例: 特定ユーザーのRestrictedデータへのアクセス履歴
SELECT
event_timestamp,
action,
resource_identifier,
details->>'columns_accessed' AS columns_accessed,
rows_affected,
outcome
FROM audit_trail
WHERE actor_user_id = 'user-123'
AND resource_classification = 'restricted'
AND event_timestamp >= NOW() - INTERVAL '90 days'
ORDER BY event_timestamp DESC;
監査ログの保持とアーカイブ戦略
Hot Storage(直近3ヶ月):
- PostgreSQL パーティションテーブル
- 高速クエリ対応
- インデックス完備
Warm Storage(3ヶ月〜1年):
- S3 + Parquet形式にエクスポート
- Athena / Presto でアドホッククエリ可能
- 月次バッチでアーカイブ
Cold Storage(1年〜7年):
- S3 Glacier Deep Archive
- 規制要件による保持(金融: 7年、医療: 10年)
- 復元に12-48時間
削除(7年超):
- 法定保持期間終了後に自動削除
- 削除証跡を別テーブルに記録
まとめ
| ポイント | 内容 |
|---|---|
| リネージュの粒度 | テーブル / カラム / レコードの3レベル |
| 影響分析 | スキーマ変更前に下流への影響を事前評価 |
| 監査証跡 | 誰が・いつ・何に・何をしたかを完全記録 |
| 保持戦略 | Hot/Warm/Cold の3層で規制要件とコストを両立 |
チェックリスト
- テーブル/カラムレベルのリネージュを設計できる
- スキーマ変更時の影響分析を実行できる
- 監査証跡のスキーマとクエリパターンを設計できる
- 監査ログの保持・アーカイブ戦略を策定できる
次のステップへ
次は演習でデータガバナンスの設計を実践します。
推定読了時間: 40分