EXERCISE 60分

ストーリー

佐藤CTO
DWH、データレイク、OLAP、リアルタイム分析。それぞれの武器は揃った
佐藤CTO
次はそれらを組み合わせて、実際のビジネス要件に応える分析基盤を設計してみよう。技術選定の”なぜ”を説明できることが大切だ

ミッション概要

ミッションテーマ目安時間
Mission 1DWHアーキテクチャ設計15分
Mission 2データレイクハウス設計15分
Mission 3ディメンショナルモデル設計15分
Mission 4リアルタイム分析基盤設計15分

前提シナリオ

あなたは総合フィンテック企業「FinFlow」のデータアーキテクトです。

主要データソース:

  1. トランザクションDB(PostgreSQL): 口座、送金、決済
  2. ユーザー行動ログ(Kafka): アプリ操作、ページ遷移、検索
  3. 外部マーケットデータ(API): 為替レート、株価、金利
  4. 不正検知システム(ストリーム): アラート、スコアリング結果
  5. CRMデータ(Salesforce): 顧客情報、問い合わせ履歴

分析要件:

  • 経営ダッシュボード(日次KPI、月次決算レポート)
  • リアルタイム不正検知ダッシュボード(遅延5秒以内)
  • 顧客LTV分析・離脱予測(週次バッチ)
  • 規制報告書の自動生成(月次・四半期)

Mission 1: DWHアーキテクチャ設計(15分)

要件

FinFlowの日次KPIレポートと月次決算レポートを支えるDWHアーキテクチャを設計してください。クラウドDWHの選定理由、データ取込パターン、コスト最適化戦略を含めること。

解答例
// DWH アーキテクチャ設計書

interface DWHArchitecture {
  platform: string;
  selectionReason: string[];
  dataIngestion: IngestionConfig[];
  costOptimization: CostStrategy[];
}

const finflowDWH: DWHArchitecture = {
  platform: 'Snowflake',
  selectionReason: [
    'コンピュートとストレージの分離 → コスト効率',
    'Time Travel(90日)→ 規制対応の監査要件',
    'Zero-copy Clone → 本番相当のテスト環境を低コストで',
    'セミ構造化データ(VARIANT型)→ JSON/APIデータの柔軟な取込',
  ],

  dataIngestion: [
    {
      source: 'PostgreSQL(トランザクション)',
      method: 'CDC via Debezium → Kafka → Snowpipe Streaming',
      frequency: 'near real-time(5分以内)',
      format: 'Avro → Snowflake内部テーブル',
    },
    {
      source: 'Salesforce CRM',
      method: 'Fivetran(マネージドELT)',
      frequency: 'daily incremental(AM2時)',
      format: 'Fivetran正規化テーブル',
    },
    {
      source: '外部マーケットデータ',
      method: 'Airflow → REST API → S3 → Snowpipe',
      frequency: 'hourly',
      format: 'JSON → VARIANT型テーブル',
    },
  ],

  costOptimization: [
    {
      strategy: 'Warehouse Sizing',
      detail: 'ETL用: Medium(夜間のみ), BI用: Small(営業時間), アドホック: XS(auto-suspend 60s)',
    },
    {
      strategy: 'Clustering Keys',
      detail: 'fct_transactionsはtransaction_dateでクラスタリング(プルーニング効率化)',
    },
    {
      strategy: 'Materialized Views',
      detail: '日次KPIサマリーをMVで事前集計、BI直接参照',
    },
    {
      strategy: 'Data Lifecycle',
      detail: 'Hot: 直近3ヶ月(Standard), Warm: 3-12ヶ月, Cold: 1年以上(自動アーカイブ)',
    },
  ],
};
-- Snowflake: マルチウェアハウス構成
CREATE WAREHOUSE wh_etl
  WAREHOUSE_SIZE = 'MEDIUM'
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED = TRUE
  COMMENT = 'ETLバッチ処理用(夜間)';

CREATE WAREHOUSE wh_bi
  WAREHOUSE_SIZE = 'SMALL'
  AUTO_SUSPEND = 120
  AUTO_RESUME = TRUE
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 3
  SCALING_POLICY = 'ECONOMY'
  COMMENT = 'BIツール・ダッシュボード用';

CREATE WAREHOUSE wh_adhoc
  WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE
  COMMENT = 'アドホック分析用(自動停止60秒)';

-- コスト監視: Resource Monitor
CREATE RESOURCE MONITOR monthly_budget
  WITH CREDIT_QUOTA = 500
  TRIGGERS
    ON 75 PERCENT DO NOTIFY
    ON 90 PERCENT DO NOTIFY
    ON 100 PERCENT DO SUSPEND;

ALTER WAREHOUSE wh_etl SET RESOURCE_MONITOR = monthly_budget;

設計判断:

  • Snowflakeを選定: コンピュート分離によるコスト効率と、Time Travelによる金融規制対応
  • ETL/BI/アドホックを分離し、ワークロード間の干渉を防止
  • Resource Monitorで予期しないコスト増を自動検知

Mission 2: データレイクハウス設計(15分)

要件

構造化データ(トランザクション)と非構造化データ(行動ログ、API応答)を統合管理するレイクハウスを設計してください。テーブルフォーマット選定、メダリオンアーキテクチャのレイヤー定義、アクセス制御を含めること。

解答例
// メダリオンアーキテクチャ設計

interface MedallionLayer {
  name: string;
  purpose: string;
  tableFormat: string;
  retentionPolicy: string;
  accessControl: string[];
}

const medallionArchitecture: MedallionLayer[] = [
  {
    name: 'Bronze(Raw)',
    purpose: '全データソースの生データを忠実に保存。スキーマオンリード。',
    tableFormat: 'Delta Lake(append-only、schema evolution対応)',
    retentionPolicy: '7年間保持(金融規制要件)',
    accessControl: ['data-engineering-team: read/write', 'audit-team: read'],
  },
  {
    name: 'Silver(Cleansed)',
    purpose: 'クレンジング済み、型統一、重複排除、NULL処理済み。',
    tableFormat: 'Delta Lake(MERGE対応、Z-ORDER最適化)',
    retentionPolicy: '3年間保持',
    accessControl: [
      'data-engineering-team: read/write',
      'data-analyst-team: read',
    ],
  },
  {
    name: 'Gold(Business)',
    purpose: 'ビジネスロジック適用済み。ディメンション・ファクトテーブル。',
    tableFormat: 'Delta Lake(最適化テーブル、キャッシュ有効)',
    retentionPolicy: '5年間保持(規制報告用)',
    accessControl: [
      'data-analyst-team: read',
      'bi-tools: read',
      'compliance-team: read',
    ],
  },
];
-- Delta Lake: Bronze → Silver の MERGE パターン
MERGE INTO silver.transactions AS target
USING (
    SELECT
        CAST(transaction_id AS STRING) AS transaction_id,
        CAST(account_id AS STRING) AS account_id,
        CAST(amount AS DECIMAL(18,2)) AS amount,
        CAST(currency AS STRING) AS currency,
        TO_TIMESTAMP(event_time) AS event_time,
        CAST(transaction_type AS STRING) AS transaction_type,
        CURRENT_TIMESTAMP() AS _etl_loaded_at,
        input_file_name() AS _source_file
    FROM bronze.transactions_raw
    WHERE _ingested_at > '{{ last_processed_timestamp }}'
) AS source
ON target.transaction_id = source.transaction_id
WHEN MATCHED AND source.event_time > target.event_time THEN
    UPDATE SET *
WHEN NOT MATCHED THEN
    INSERT *;

-- Z-ORDER: 頻出クエリパターンに最適化
OPTIMIZE silver.transactions
  ZORDER BY (account_id, event_time);

-- Gold: 日次トランザクションサマリー
CREATE OR REPLACE TABLE gold.fct_daily_transactions AS
SELECT
    DATE(event_time) AS transaction_date,
    transaction_type,
    currency,
    COUNT(*) AS transaction_count,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount,
    PERCENTILE_APPROX(amount, 0.5) AS median_amount,
    COUNT(DISTINCT account_id) AS unique_accounts
FROM silver.transactions
GROUP BY 1, 2, 3;

設計判断:

  • Delta Lakeを採用: ACID保証、Time Travel、MERGE対応が金融データに必須
  • 7年保持(Bronze)は金融規制の記録保持要件に準拠
  • Z-ORDERでaccount_id + event_timeの検索を最適化

Mission 3: ディメンショナルモデル設計(15分)

要件

FinFlowの「顧客トランザクション分析」のためのスタースキーマを設計してください。ファクトテーブル、ディメンションテーブル(SCD Type 2含む)、集計テーブルを定義すること。

解答例
-- ディメンション: 顧客(SCD Type 2)
CREATE TABLE gold.dim_customer (
    customer_sk BIGINT GENERATED ALWAYS AS IDENTITY,  -- サロゲートキー
    customer_id VARCHAR(50) NOT NULL,                   -- ナチュラルキー
    full_name VARCHAR(200),
    email VARCHAR(200),
    phone VARCHAR(50),
    tier VARCHAR(20),            -- Free / Premium / Business / Enterprise
    kyc_status VARCHAR(20),      -- pending / verified / rejected
    risk_score DECIMAL(5,2),
    region VARCHAR(50),
    country VARCHAR(10),
    registration_date DATE,
    -- SCD Type 2 管理カラム
    effective_from DATE NOT NULL,
    effective_to DATE DEFAULT '9999-12-31',
    is_current BOOLEAN DEFAULT TRUE,
    _etl_loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- ディメンション: 日付
CREATE TABLE gold.dim_date (
    date_key INT PRIMARY KEY,     -- YYYYMMDD
    full_date DATE NOT NULL,
    day_of_week INT,
    day_name VARCHAR(10),
    month INT,
    month_name VARCHAR(10),
    quarter INT,
    year INT,
    is_weekend BOOLEAN,
    is_holiday BOOLEAN,
    fiscal_quarter INT,
    fiscal_year INT
);

-- ディメンション: トランザクション種別
CREATE TABLE gold.dim_transaction_type (
    type_sk INT GENERATED ALWAYS AS IDENTITY,
    type_code VARCHAR(50) NOT NULL,
    type_name VARCHAR(100),
    category VARCHAR(50),         -- payment / transfer / deposit / withdrawal
    is_revenue_generating BOOLEAN,
    fee_rate DECIMAL(5,4)
);

-- ファクト: トランザクション
CREATE TABLE gold.fct_transaction (
    transaction_sk BIGINT GENERATED ALWAYS AS IDENTITY,
    transaction_id VARCHAR(50) NOT NULL,
    customer_sk BIGINT REFERENCES gold.dim_customer(customer_sk),
    date_key INT REFERENCES gold.dim_date(date_key),
    type_sk INT REFERENCES gold.dim_transaction_type(type_sk),
    -- メジャー(計測値)
    amount DECIMAL(18,2) NOT NULL,
    fee_amount DECIMAL(18,2) DEFAULT 0,
    net_amount DECIMAL(18,2),
    exchange_rate DECIMAL(12,6),
    amount_in_jpy DECIMAL(18,2),
    -- デジェネレイトディメンション
    currency_code VARCHAR(10),
    channel VARCHAR(20),          -- app / web / api
    status VARCHAR(20),
    -- メタデータ
    event_timestamp TIMESTAMP NOT NULL,
    _etl_loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
PARTITION BY RANGE (date_key);

-- 集計テーブル: 月次顧客サマリー
CREATE TABLE gold.agg_monthly_customer AS
SELECT
    d.year,
    d.month,
    c.customer_id,
    c.tier,
    c.region,
    COUNT(*) AS transaction_count,
    SUM(f.amount_in_jpy) AS total_amount_jpy,
    SUM(f.fee_amount) AS total_fees,
    COUNT(DISTINCT f.date_key) AS active_days,
    MIN(f.event_timestamp) AS first_transaction,
    MAX(f.event_timestamp) AS last_transaction
FROM gold.fct_transaction f
JOIN gold.dim_customer c ON f.customer_sk = c.customer_sk AND c.is_current = TRUE
JOIN gold.dim_date d ON f.date_key = d.date_key
GROUP BY d.year, d.month, c.customer_id, c.tier, c.region;

設計判断:

  • 顧客ディメンションにSCD Type 2を適用(tier変更や risk_score変更の履歴を追跡)
  • ファクトテーブルはdate_keyでパーティション(時間軸の範囲クエリを高速化)
  • amount_in_jpyで通貨統一済みの金額を保持(JOINなしで集計可能)

Mission 4: リアルタイム分析基盤設計(15分)

要件

不正検知のリアルタイムダッシュボード(遅延5秒以内)を支える分析基盤を設計してください。データフロー、OLAPエンジン選定、アラート連携を含めること。

解答例
// リアルタイム不正検知分析基盤

interface RealtimeAnalyticsPlatform {
  ingestion: IngestionLayer;
  processing: ProcessingLayer;
  serving: ServingLayer;
  alerting: AlertingLayer;
}

const fraudAnalyticsPlatform: RealtimeAnalyticsPlatform = {
  ingestion: {
    source: 'Kafka(fraud-events トピック)',
    throughput: '10,000 events/sec',
    format: 'Avro(Schema Registry管理)',
    partitionKey: 'account_id',
    retention: '7日間',
  },

  processing: {
    engine: 'Apache Flink',
    operations: [
      'ウィンドウ集計: 1分/5分/1時間の不正スコア分布',
      'パターンマッチ: CEP(Complex Event Processing)で連続不正パターン検出',
      'エンリッチメント: 顧客マスタ(Redis)でリアルタイム補完',
    ],
  },

  serving: {
    engine: 'ClickHouse',
    reason: 'SQL互換、Kafka直接取込、秒以下の集計クエリ',
    tables: ['fraud_events(リアルタイム)', 'fraud_hourly_agg(事前集計)'],
  },

  alerting: {
    rules: [
      { condition: '5分間で不正スコア>0.9が10件以上', severity: 'critical', channel: 'PagerDuty' },
      { condition: '1時間の不正率が前日同時刻比200%以上', severity: 'warning', channel: 'Slack' },
    ],
  },
};
-- ClickHouse: 不正検知イベントテーブル
CREATE TABLE fraud_events (
    event_id UUID DEFAULT generateUUIDv4(),
    account_id String,
    transaction_id String,
    fraud_score Float64,
    fraud_reason LowCardinality(String),
    transaction_amount Decimal64(2),
    currency LowCardinality(String),
    merchant_category LowCardinality(String),
    device_fingerprint String,
    ip_address String,
    country LowCardinality(String),
    is_flagged UInt8,
    event_time DateTime64(3)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(event_time)
ORDER BY (account_id, event_time)
TTL event_time + INTERVAL 90 DAY;

-- Kafka からリアルタイム取り込み
CREATE TABLE fraud_events_kafka AS fraud_events
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'fraud-events',
    kafka_group_name = 'clickhouse-fraud',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 8;

CREATE MATERIALIZED VIEW fraud_events_consumer TO fraud_events AS
SELECT * FROM fraud_events_kafka;

-- 事前集計: 5分ウィンドウ
CREATE MATERIALIZED VIEW fraud_5min_agg
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMMDD(window_start)
ORDER BY (window_start, country, merchant_category)
AS
SELECT
    toStartOfFiveMinutes(event_time) AS window_start,
    country,
    merchant_category,
    count() AS event_count,
    countIf(is_flagged = 1) AS flagged_count,
    avg(fraud_score) AS avg_fraud_score,
    max(fraud_score) AS max_fraud_score,
    sum(transaction_amount) AS total_amount
FROM fraud_events
GROUP BY window_start, country, merchant_category;

-- ダッシュボード用クエリ: 直近1時間の不正状況(<1秒応答)
SELECT
    window_start,
    sum(event_count) AS total_events,
    sum(flagged_count) AS total_flagged,
    round(sum(flagged_count) / sum(event_count) * 100, 2) AS flag_rate,
    round(avg(avg_fraud_score), 3) AS avg_score
FROM fraud_5min_agg
WHERE window_start >= now() - INTERVAL 1 HOUR
GROUP BY window_start
ORDER BY window_start;

設計判断:

  • ClickHouseを選定: SQL互換でBI連携が容易、Kafka直接取込で5秒以内の遅延を実現
  • SummingMergeTreeで5分集計を自動化し、ダッシュボードの応答速度を向上
  • 90日TTLでストレージコストを制御(長期保存はDWH側で管理)

まとめ

ポイント内容
DWHアーキテクチャワークロード分離とコスト最適化が鍵
レイクハウスメダリオンアーキテクチャで段階的にデータ品質を向上
ディメンショナルモデルSCD Type 2で履歴追跡、集計テーブルで高速化
リアルタイム分析Kafka→ClickHouseで低遅延、事前集計でダッシュボード応答向上

チェックリスト

  • クラウドDWHの選定理由を説明し、コスト最適化を設計できた
  • メダリオンアーキテクチャでレイクハウスを設計できた
  • SCD Type 2を含むスタースキーマを設計できた
  • リアルタイム分析基盤のエンドツーエンドを設計できた

次のステップへ

次はチェックポイントクイズで分析基盤の理解度を確認します。


推定読了時間: 60分