ストーリー
ミッション概要
| ミッション | テーマ | 目安時間 |
|---|---|---|
| Mission 1 | DWHアーキテクチャ設計 | 15分 |
| Mission 2 | データレイクハウス設計 | 15分 |
| Mission 3 | ディメンショナルモデル設計 | 15分 |
| Mission 4 | リアルタイム分析基盤設計 | 15分 |
前提シナリオ
あなたは総合フィンテック企業「FinFlow」のデータアーキテクトです。
主要データソース:
- トランザクションDB(PostgreSQL): 口座、送金、決済
- ユーザー行動ログ(Kafka): アプリ操作、ページ遷移、検索
- 外部マーケットデータ(API): 為替レート、株価、金利
- 不正検知システム(ストリーム): アラート、スコアリング結果
- CRMデータ(Salesforce): 顧客情報、問い合わせ履歴
分析要件:
- 経営ダッシュボード(日次KPI、月次決算レポート)
- リアルタイム不正検知ダッシュボード(遅延5秒以内)
- 顧客LTV分析・離脱予測(週次バッチ)
- 規制報告書の自動生成(月次・四半期)
Mission 1: DWHアーキテクチャ設計(15分)
要件
FinFlowの日次KPIレポートと月次決算レポートを支えるDWHアーキテクチャを設計してください。クラウドDWHの選定理由、データ取込パターン、コスト最適化戦略を含めること。
解答例
// DWH アーキテクチャ設計書
interface DWHArchitecture {
platform: string;
selectionReason: string[];
dataIngestion: IngestionConfig[];
costOptimization: CostStrategy[];
}
const finflowDWH: DWHArchitecture = {
platform: 'Snowflake',
selectionReason: [
'コンピュートとストレージの分離 → コスト効率',
'Time Travel(90日)→ 規制対応の監査要件',
'Zero-copy Clone → 本番相当のテスト環境を低コストで',
'セミ構造化データ(VARIANT型)→ JSON/APIデータの柔軟な取込',
],
dataIngestion: [
{
source: 'PostgreSQL(トランザクション)',
method: 'CDC via Debezium → Kafka → Snowpipe Streaming',
frequency: 'near real-time(5分以内)',
format: 'Avro → Snowflake内部テーブル',
},
{
source: 'Salesforce CRM',
method: 'Fivetran(マネージドELT)',
frequency: 'daily incremental(AM2時)',
format: 'Fivetran正規化テーブル',
},
{
source: '外部マーケットデータ',
method: 'Airflow → REST API → S3 → Snowpipe',
frequency: 'hourly',
format: 'JSON → VARIANT型テーブル',
},
],
costOptimization: [
{
strategy: 'Warehouse Sizing',
detail: 'ETL用: Medium(夜間のみ), BI用: Small(営業時間), アドホック: XS(auto-suspend 60s)',
},
{
strategy: 'Clustering Keys',
detail: 'fct_transactionsはtransaction_dateでクラスタリング(プルーニング効率化)',
},
{
strategy: 'Materialized Views',
detail: '日次KPIサマリーをMVで事前集計、BI直接参照',
},
{
strategy: 'Data Lifecycle',
detail: 'Hot: 直近3ヶ月(Standard), Warm: 3-12ヶ月, Cold: 1年以上(自動アーカイブ)',
},
],
};
-- Snowflake: マルチウェアハウス構成
CREATE WAREHOUSE wh_etl
WAREHOUSE_SIZE = 'MEDIUM'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE
COMMENT = 'ETLバッチ処理用(夜間)';
CREATE WAREHOUSE wh_bi
WAREHOUSE_SIZE = 'SMALL'
AUTO_SUSPEND = 120
AUTO_RESUME = TRUE
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 3
SCALING_POLICY = 'ECONOMY'
COMMENT = 'BIツール・ダッシュボード用';
CREATE WAREHOUSE wh_adhoc
WAREHOUSE_SIZE = 'XSMALL'
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE
COMMENT = 'アドホック分析用(自動停止60秒)';
-- コスト監視: Resource Monitor
CREATE RESOURCE MONITOR monthly_budget
WITH CREDIT_QUOTA = 500
TRIGGERS
ON 75 PERCENT DO NOTIFY
ON 90 PERCENT DO NOTIFY
ON 100 PERCENT DO SUSPEND;
ALTER WAREHOUSE wh_etl SET RESOURCE_MONITOR = monthly_budget;
設計判断:
- Snowflakeを選定: コンピュート分離によるコスト効率と、Time Travelによる金融規制対応
- ETL/BI/アドホックを分離し、ワークロード間の干渉を防止
- Resource Monitorで予期しないコスト増を自動検知
Mission 2: データレイクハウス設計(15分)
要件
構造化データ(トランザクション)と非構造化データ(行動ログ、API応答)を統合管理するレイクハウスを設計してください。テーブルフォーマット選定、メダリオンアーキテクチャのレイヤー定義、アクセス制御を含めること。
解答例
// メダリオンアーキテクチャ設計
interface MedallionLayer {
name: string;
purpose: string;
tableFormat: string;
retentionPolicy: string;
accessControl: string[];
}
const medallionArchitecture: MedallionLayer[] = [
{
name: 'Bronze(Raw)',
purpose: '全データソースの生データを忠実に保存。スキーマオンリード。',
tableFormat: 'Delta Lake(append-only、schema evolution対応)',
retentionPolicy: '7年間保持(金融規制要件)',
accessControl: ['data-engineering-team: read/write', 'audit-team: read'],
},
{
name: 'Silver(Cleansed)',
purpose: 'クレンジング済み、型統一、重複排除、NULL処理済み。',
tableFormat: 'Delta Lake(MERGE対応、Z-ORDER最適化)',
retentionPolicy: '3年間保持',
accessControl: [
'data-engineering-team: read/write',
'data-analyst-team: read',
],
},
{
name: 'Gold(Business)',
purpose: 'ビジネスロジック適用済み。ディメンション・ファクトテーブル。',
tableFormat: 'Delta Lake(最適化テーブル、キャッシュ有効)',
retentionPolicy: '5年間保持(規制報告用)',
accessControl: [
'data-analyst-team: read',
'bi-tools: read',
'compliance-team: read',
],
},
];
-- Delta Lake: Bronze → Silver の MERGE パターン
MERGE INTO silver.transactions AS target
USING (
SELECT
CAST(transaction_id AS STRING) AS transaction_id,
CAST(account_id AS STRING) AS account_id,
CAST(amount AS DECIMAL(18,2)) AS amount,
CAST(currency AS STRING) AS currency,
TO_TIMESTAMP(event_time) AS event_time,
CAST(transaction_type AS STRING) AS transaction_type,
CURRENT_TIMESTAMP() AS _etl_loaded_at,
input_file_name() AS _source_file
FROM bronze.transactions_raw
WHERE _ingested_at > '{{ last_processed_timestamp }}'
) AS source
ON target.transaction_id = source.transaction_id
WHEN MATCHED AND source.event_time > target.event_time THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *;
-- Z-ORDER: 頻出クエリパターンに最適化
OPTIMIZE silver.transactions
ZORDER BY (account_id, event_time);
-- Gold: 日次トランザクションサマリー
CREATE OR REPLACE TABLE gold.fct_daily_transactions AS
SELECT
DATE(event_time) AS transaction_date,
transaction_type,
currency,
COUNT(*) AS transaction_count,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount,
PERCENTILE_APPROX(amount, 0.5) AS median_amount,
COUNT(DISTINCT account_id) AS unique_accounts
FROM silver.transactions
GROUP BY 1, 2, 3;
設計判断:
- Delta Lakeを採用: ACID保証、Time Travel、MERGE対応が金融データに必須
- 7年保持(Bronze)は金融規制の記録保持要件に準拠
- Z-ORDERでaccount_id + event_timeの検索を最適化
Mission 3: ディメンショナルモデル設計(15分)
要件
FinFlowの「顧客トランザクション分析」のためのスタースキーマを設計してください。ファクトテーブル、ディメンションテーブル(SCD Type 2含む)、集計テーブルを定義すること。
解答例
-- ディメンション: 顧客(SCD Type 2)
CREATE TABLE gold.dim_customer (
customer_sk BIGINT GENERATED ALWAYS AS IDENTITY, -- サロゲートキー
customer_id VARCHAR(50) NOT NULL, -- ナチュラルキー
full_name VARCHAR(200),
email VARCHAR(200),
phone VARCHAR(50),
tier VARCHAR(20), -- Free / Premium / Business / Enterprise
kyc_status VARCHAR(20), -- pending / verified / rejected
risk_score DECIMAL(5,2),
region VARCHAR(50),
country VARCHAR(10),
registration_date DATE,
-- SCD Type 2 管理カラム
effective_from DATE NOT NULL,
effective_to DATE DEFAULT '9999-12-31',
is_current BOOLEAN DEFAULT TRUE,
_etl_loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- ディメンション: 日付
CREATE TABLE gold.dim_date (
date_key INT PRIMARY KEY, -- YYYYMMDD
full_date DATE NOT NULL,
day_of_week INT,
day_name VARCHAR(10),
month INT,
month_name VARCHAR(10),
quarter INT,
year INT,
is_weekend BOOLEAN,
is_holiday BOOLEAN,
fiscal_quarter INT,
fiscal_year INT
);
-- ディメンション: トランザクション種別
CREATE TABLE gold.dim_transaction_type (
type_sk INT GENERATED ALWAYS AS IDENTITY,
type_code VARCHAR(50) NOT NULL,
type_name VARCHAR(100),
category VARCHAR(50), -- payment / transfer / deposit / withdrawal
is_revenue_generating BOOLEAN,
fee_rate DECIMAL(5,4)
);
-- ファクト: トランザクション
CREATE TABLE gold.fct_transaction (
transaction_sk BIGINT GENERATED ALWAYS AS IDENTITY,
transaction_id VARCHAR(50) NOT NULL,
customer_sk BIGINT REFERENCES gold.dim_customer(customer_sk),
date_key INT REFERENCES gold.dim_date(date_key),
type_sk INT REFERENCES gold.dim_transaction_type(type_sk),
-- メジャー(計測値)
amount DECIMAL(18,2) NOT NULL,
fee_amount DECIMAL(18,2) DEFAULT 0,
net_amount DECIMAL(18,2),
exchange_rate DECIMAL(12,6),
amount_in_jpy DECIMAL(18,2),
-- デジェネレイトディメンション
currency_code VARCHAR(10),
channel VARCHAR(20), -- app / web / api
status VARCHAR(20),
-- メタデータ
event_timestamp TIMESTAMP NOT NULL,
_etl_loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
PARTITION BY RANGE (date_key);
-- 集計テーブル: 月次顧客サマリー
CREATE TABLE gold.agg_monthly_customer AS
SELECT
d.year,
d.month,
c.customer_id,
c.tier,
c.region,
COUNT(*) AS transaction_count,
SUM(f.amount_in_jpy) AS total_amount_jpy,
SUM(f.fee_amount) AS total_fees,
COUNT(DISTINCT f.date_key) AS active_days,
MIN(f.event_timestamp) AS first_transaction,
MAX(f.event_timestamp) AS last_transaction
FROM gold.fct_transaction f
JOIN gold.dim_customer c ON f.customer_sk = c.customer_sk AND c.is_current = TRUE
JOIN gold.dim_date d ON f.date_key = d.date_key
GROUP BY d.year, d.month, c.customer_id, c.tier, c.region;
設計判断:
- 顧客ディメンションにSCD Type 2を適用(tier変更や risk_score変更の履歴を追跡)
- ファクトテーブルはdate_keyでパーティション(時間軸の範囲クエリを高速化)
- amount_in_jpyで通貨統一済みの金額を保持(JOINなしで集計可能)
Mission 4: リアルタイム分析基盤設計(15分)
要件
不正検知のリアルタイムダッシュボード(遅延5秒以内)を支える分析基盤を設計してください。データフロー、OLAPエンジン選定、アラート連携を含めること。
解答例
// リアルタイム不正検知分析基盤
interface RealtimeAnalyticsPlatform {
ingestion: IngestionLayer;
processing: ProcessingLayer;
serving: ServingLayer;
alerting: AlertingLayer;
}
const fraudAnalyticsPlatform: RealtimeAnalyticsPlatform = {
ingestion: {
source: 'Kafka(fraud-events トピック)',
throughput: '10,000 events/sec',
format: 'Avro(Schema Registry管理)',
partitionKey: 'account_id',
retention: '7日間',
},
processing: {
engine: 'Apache Flink',
operations: [
'ウィンドウ集計: 1分/5分/1時間の不正スコア分布',
'パターンマッチ: CEP(Complex Event Processing)で連続不正パターン検出',
'エンリッチメント: 顧客マスタ(Redis)でリアルタイム補完',
],
},
serving: {
engine: 'ClickHouse',
reason: 'SQL互換、Kafka直接取込、秒以下の集計クエリ',
tables: ['fraud_events(リアルタイム)', 'fraud_hourly_agg(事前集計)'],
},
alerting: {
rules: [
{ condition: '5分間で不正スコア>0.9が10件以上', severity: 'critical', channel: 'PagerDuty' },
{ condition: '1時間の不正率が前日同時刻比200%以上', severity: 'warning', channel: 'Slack' },
],
},
};
-- ClickHouse: 不正検知イベントテーブル
CREATE TABLE fraud_events (
event_id UUID DEFAULT generateUUIDv4(),
account_id String,
transaction_id String,
fraud_score Float64,
fraud_reason LowCardinality(String),
transaction_amount Decimal64(2),
currency LowCardinality(String),
merchant_category LowCardinality(String),
device_fingerprint String,
ip_address String,
country LowCardinality(String),
is_flagged UInt8,
event_time DateTime64(3)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(event_time)
ORDER BY (account_id, event_time)
TTL event_time + INTERVAL 90 DAY;
-- Kafka からリアルタイム取り込み
CREATE TABLE fraud_events_kafka AS fraud_events
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka:9092',
kafka_topic_list = 'fraud-events',
kafka_group_name = 'clickhouse-fraud',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 8;
CREATE MATERIALIZED VIEW fraud_events_consumer TO fraud_events AS
SELECT * FROM fraud_events_kafka;
-- 事前集計: 5分ウィンドウ
CREATE MATERIALIZED VIEW fraud_5min_agg
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMMDD(window_start)
ORDER BY (window_start, country, merchant_category)
AS
SELECT
toStartOfFiveMinutes(event_time) AS window_start,
country,
merchant_category,
count() AS event_count,
countIf(is_flagged = 1) AS flagged_count,
avg(fraud_score) AS avg_fraud_score,
max(fraud_score) AS max_fraud_score,
sum(transaction_amount) AS total_amount
FROM fraud_events
GROUP BY window_start, country, merchant_category;
-- ダッシュボード用クエリ: 直近1時間の不正状況(<1秒応答)
SELECT
window_start,
sum(event_count) AS total_events,
sum(flagged_count) AS total_flagged,
round(sum(flagged_count) / sum(event_count) * 100, 2) AS flag_rate,
round(avg(avg_fraud_score), 3) AS avg_score
FROM fraud_5min_agg
WHERE window_start >= now() - INTERVAL 1 HOUR
GROUP BY window_start
ORDER BY window_start;
設計判断:
- ClickHouseを選定: SQL互換でBI連携が容易、Kafka直接取込で5秒以内の遅延を実現
- SummingMergeTreeで5分集計を自動化し、ダッシュボードの応答速度を向上
- 90日TTLでストレージコストを制御(長期保存はDWH側で管理)
まとめ
| ポイント | 内容 |
|---|---|
| DWHアーキテクチャ | ワークロード分離とコスト最適化が鍵 |
| レイクハウス | メダリオンアーキテクチャで段階的にデータ品質を向上 |
| ディメンショナルモデル | SCD Type 2で履歴追跡、集計テーブルで高速化 |
| リアルタイム分析 | Kafka→ClickHouseで低遅延、事前集計でダッシュボード応答向上 |
チェックリスト
- クラウドDWHの選定理由を説明し、コスト最適化を設計できた
- メダリオンアーキテクチャでレイクハウスを設計できた
- SCD Type 2を含むスタースキーマを設計できた
- リアルタイム分析基盤のエンドツーエンドを設計できた
次のステップへ
次はチェックポイントクイズで分析基盤の理解度を確認します。
推定読了時間: 60分