LESSON 40分

ストーリー

佐藤CTO
“この数字はどこから来たのか?” — 経営会議で聞かれた時、30秒以内に回答できるか?
佐藤CTO
データリネージュはデータの血統書だ。どこで生まれ、どう加工され、どこに届くか。影響分析と監査証跡を合わせて学ぼう

データリネージュの種類

種類粒度用途
テーブルレベルテーブル間の依存関係影響分析(スキーマ変更時)raw.ordersstaging.stg_ordersmarts.fct_revenue
カラムレベルカラム間の変換関係計算ロジックの追跡orders.totalSUM(total)fct_revenue.revenue
レコードレベル行単位のトレーサビリティ規制監査、データ品質の根本原因分析record_id: 12345 → 変換ログ → 最終値

テーブルレベルリネージュの実装

// dbt + DataHub でのテーブルレベルリネージュ

// dbt は ref() / source() から自動的にリネージュを生成
// manifest.json に依存関係が記録される

interface DbtLineageNode {
  uniqueId: string;
  resourceType: 'source' | 'model' | 'test' | 'exposure';
  dependsOn: {
    nodes: string[];   // 上流ノード
    macros: string[];
  };
  columns: Record<string, ColumnInfo>;
}

// dbt manifest.json からリネージュを抽出する例
function extractLineage(manifest: DbtManifest): LineageEdge[] {
  const edges: LineageEdge[] = [];

  for (const [nodeId, node] of Object.entries(manifest.nodes)) {
    for (const upstreamId of node.depends_on.nodes) {
      edges.push({
        upstream: upstreamId,
        downstream: nodeId,
        transformationType: detectTransformType(node),
        sql: node.compiled_sql,
      });
    }
  }

  return edges;
}

// リネージュの可視化クエリ(DataHub GraphQL API)
const lineageQuery = `
  query getLineage($urn: String!, $direction: LineageDirection!) {
    searchAcrossLineage(
      input: {
        urn: $urn
        direction: $direction
        maxHops: 5
        count: 50
      }
    ) {
      searchResults {
        entity {
          urn
          type
          ... on Dataset {
            name
            platform {
              name
            }
            properties {
              description
            }
          }
        }
        degree   # 何ホップ離れているか
        paths {
          path {
            urn
            type
          }
        }
      }
    }
  }
`;

カラムレベルリネージュの実装

// SQLをパースしてカラムレベルリネージュを抽出

interface ColumnLineage {
  targetColumn: string;
  sourceColumns: SourceColumnRef[];
  transformation: string;
}

interface SourceColumnRef {
  table: string;
  column: string;
  transformationApplied: string;
}

// 例: fct_daily_revenue のカラムリネージュ
const revenueColumnLineage: ColumnLineage[] = [
  {
    targetColumn: 'fct_daily_revenue.revenue_date',
    sourceColumns: [
      {
        table: 'stg_orders',
        column: 'order_date',
        transformationApplied: 'DATE(order_date)',
      },
    ],
    transformation: 'DATE型変換',
  },
  {
    targetColumn: 'fct_daily_revenue.total_revenue',
    sourceColumns: [
      {
        table: 'stg_orders',
        column: 'total_amount',
        transformationApplied: 'SUM(total_amount) WHERE status = "completed"',
      },
    ],
    transformation: '条件付き集計(completed注文のみ)',
  },
  {
    targetColumn: 'fct_daily_revenue.total_fees',
    sourceColumns: [
      {
        table: 'stg_orders',
        column: 'total_amount',
        transformationApplied: 'SUM(total_amount * fee_rate)',
      },
      {
        table: 'dim_transaction_type',
        column: 'fee_rate',
        transformationApplied: 'JOIN on type_id',
      },
    ],
    transformation: '金額 x 手数料率の積の集計',
  },
];
-- dbt: カラムレベルリネージュの自動記録(meta設定)

-- models/marts/finance/fct_daily_revenue.sql
{{
  config(
    materialized='table',
    meta={
      'owner': 'finance-team',
      'contains_pii': false,
      'sla_hours': 3,
    }
  )
}}

-- column-level documentation(カラム説明とリネージュ)
-- schema.yml
-- models:
--   - name: fct_daily_revenue
--     columns:
--       - name: total_revenue
--         description: "日次の確定売上合計(completed注文のtotal_amountの合計)"
--         meta:
--           source_columns:
--             - stg_orders.total_amount
--           transformation: "SUM(total_amount) WHERE status = 'completed'"
--           business_term: "daily_revenue"

SELECT
    DATE(o.order_date) AS revenue_date,
    t.category AS revenue_category,
    COUNT(DISTINCT o.order_id) AS order_count,
    SUM(o.total_amount) AS total_revenue,
    SUM(o.total_amount * t.fee_rate) AS total_fees
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('dim_transaction_type') }} t ON o.type_id = t.type_id
WHERE o.status = 'completed'
GROUP BY 1, 2

影響分析(Impact Analysis)

// スキーマ変更時の影響分析

interface ImpactAnalysis {
  changeDescription: string;
  affectedAssets: AffectedAsset[];
  riskLevel: 'low' | 'medium' | 'high' | 'critical';
  mitigationSteps: string[];
}

interface AffectedAsset {
  urn: string;
  name: string;
  type: 'dataset' | 'dashboard' | 'pipeline' | 'ml_model';
  impactType: 'breaking' | 'non-breaking' | 'potential';
  owner: string;
}

// 影響分析の実行例
async function analyzeImpact(
  targetUrn: string,
  changeType: 'column_rename' | 'column_delete' | 'type_change' | 'table_delete',
): Promise<ImpactAnalysis> {
  // 1. 下流の全依存関係を取得
  const downstream = await getDownstreamLineage(targetUrn, { maxHops: 10 });

  // 2. 影響の種類を判定
  const affectedAssets = downstream.map(asset => ({
    ...asset,
    impactType: assessImpactType(changeType, asset),
  }));

  // 3. リスクレベルの判定
  const riskLevel = calculateRisk(affectedAssets);

  return {
    changeDescription: `${targetUrn} の ${changeType}`,
    affectedAssets,
    riskLevel,
    mitigationSteps: generateMitigationPlan(changeType, affectedAssets),
  };
}

// 影響分析の結果例
const exampleImpact: ImpactAnalysis = {
  changeDescription: 'stg_orders.total_amount のカラム名を order_total に変更',
  affectedAssets: [
    {
      urn: 'urn:li:dataset:fct_daily_revenue',
      name: 'fct_daily_revenue',
      type: 'dataset',
      impactType: 'breaking',
      owner: 'finance-team',
    },
    {
      urn: 'urn:li:dataset:agg_monthly_customer',
      name: 'agg_monthly_customer',
      type: 'dataset',
      impactType: 'breaking',
      owner: 'analytics-team',
    },
    {
      urn: 'urn:li:dashboard:executive_kpi',
      name: '経営KPIダッシュボード',
      type: 'dashboard',
      impactType: 'potential',
      owner: 'bi-team',
    },
    {
      urn: 'urn:li:mlModel:churn_prediction',
      name: '離脱予測モデル',
      type: 'ml_model',
      impactType: 'potential',
      owner: 'ml-team',
    },
  ],
  riskLevel: 'high',
  mitigationSteps: [
    '1. finance-team と analytics-team に事前通知(5営業日前)',
    '2. 旧カラム名を維持しつつ新カラム名をエイリアスで追加(非破壊変更)',
    '3. 下流モデルを新カラム名に移行(1-2スプリント)',
    '4. 旧カラム名をdeprecatedマークし、90日後に削除',
  ],
};

監査証跡の設計

// 監査証跡(Audit Trail)のスキーマ設計

interface AuditEvent {
  eventId: string;
  timestamp: string;
  actor: {
    userId: string;
    role: string;
    ipAddress: string;
    userAgent: string;
  };
  action: AuditAction;
  resource: {
    type: 'dataset' | 'column' | 'record' | 'pipeline' | 'policy';
    identifier: string;
    classification: string;
  };
  details: Record<string, unknown>;
  outcome: 'success' | 'failure' | 'denied';
  reason?: string;
}

type AuditAction =
  | 'data_access'        // データの読み取り
  | 'data_export'        // データのダウンロード/エクスポート
  | 'data_modify'        // データの変更
  | 'data_delete'        // データの削除
  | 'schema_change'      // スキーマの変更
  | 'permission_grant'   // 権限の付与
  | 'permission_revoke'  // 権限の取り消し
  | 'policy_change'      // ポリシーの変更
  | 'dsar_request'       // データ主体アクセス要求
  | 'anonymization';     // 匿名化処理
-- 監査テーブル(PostgreSQL)
CREATE TABLE audit_trail (
    event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    event_timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- アクター情報
    actor_user_id VARCHAR(100) NOT NULL,
    actor_role VARCHAR(50) NOT NULL,
    actor_ip_address INET,
    actor_user_agent TEXT,
    -- アクション情報
    action VARCHAR(50) NOT NULL,
    resource_type VARCHAR(50) NOT NULL,
    resource_identifier TEXT NOT NULL,
    resource_classification VARCHAR(20),
    -- 詳細
    details JSONB,
    query_text TEXT,             -- 実行されたSQL(機密データはマスキング済み)
    rows_affected INT,
    -- 結果
    outcome VARCHAR(20) NOT NULL,
    failure_reason TEXT,
    -- パーティションキー
    event_date DATE GENERATED ALWAYS AS (event_timestamp::DATE) STORED
)
PARTITION BY RANGE (event_date);

-- 月次パーティション
CREATE TABLE audit_trail_2024_01 PARTITION OF audit_trail
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

-- インデックス
CREATE INDEX idx_audit_actor ON audit_trail (actor_user_id, event_timestamp DESC);
CREATE INDEX idx_audit_resource ON audit_trail (resource_identifier, event_timestamp DESC);
CREATE INDEX idx_audit_action ON audit_trail (action, event_timestamp DESC);
CREATE INDEX idx_audit_classification ON audit_trail (resource_classification, event_timestamp DESC);

-- 監査クエリ例: 特定ユーザーのRestrictedデータへのアクセス履歴
SELECT
    event_timestamp,
    action,
    resource_identifier,
    details->>'columns_accessed' AS columns_accessed,
    rows_affected,
    outcome
FROM audit_trail
WHERE actor_user_id = 'user-123'
  AND resource_classification = 'restricted'
  AND event_timestamp >= NOW() - INTERVAL '90 days'
ORDER BY event_timestamp DESC;
監査ログの保持とアーカイブ戦略
Hot Storage(直近3ヶ月):
  - PostgreSQL パーティションテーブル
  - 高速クエリ対応
  - インデックス完備

Warm Storage(3ヶ月〜1年):
  - S3 + Parquet形式にエクスポート
  - Athena / Presto でアドホッククエリ可能
  - 月次バッチでアーカイブ

Cold Storage(1年〜7年):
  - S3 Glacier Deep Archive
  - 規制要件による保持(金融: 7年、医療: 10年)
  - 復元に12-48時間

削除(7年超):
  - 法定保持期間終了後に自動削除
  - 削除証跡を別テーブルに記録

まとめ

ポイント内容
リネージュの粒度テーブル / カラム / レコードの3レベル
影響分析スキーマ変更前に下流への影響を事前評価
監査証跡誰が・いつ・何に・何をしたかを完全記録
保持戦略Hot/Warm/Cold の3層で規制要件とコストを両立

チェックリスト

  • テーブル/カラムレベルのリネージュを設計できる
  • スキーマ変更時の影響分析を実行できる
  • 監査証跡のスキーマとクエリパターンを設計できる
  • 監査ログの保持・アーカイブ戦略を策定できる

次のステップへ

次は演習でデータガバナンスの設計を実践します。


推定読了時間: 40分