LESSON 40分

ストーリー

佐藤CTO
DWHは構造化データの分析に最適だが、画像、ログ、IoTデータなど非構造化データも扱う必要がある
佐藤CTO
データレイクは全てのデータを保存する。ただし”データスワンプ”にならないための設計が重要だ。そしてレイクハウスは、レイクとDWHの良いところを統合する新しいパラダイムだ

データレイクの基本設計

データレイク vs DWH

観点データレイクDWH
データ形式構造化/半構造化/非構造化構造化のみ
スキーマSchema-on-ReadSchema-on-Write
ストレージオブジェクトストレージ(安価)専用ストレージ(高価)
ユーザーデータサイエンティストビジネスアナリスト
処理エンジンSpark, Presto, AthenaDWHネイティブエンジン

S3ベースのデータレイク設計

graph TD
    Root["s3://data-lake/"] --> Raw["raw/
Bronze: 生データ"] Root --> Processed["processed/
Silver: クレンジング済み"] Root --> Curated["curated/
Gold: 分析・ML用に最適化"] Root --> Meta["_metadata/"] Raw --> RawOrders["orders/"] Raw --> Logs["access_logs/"] Raw --> Images["images/"] RawOrders --> Postgres["source=postgres/
dt=2024-06-15/"] RawOrders --> Shopify["source=shopify/
dt=2024-06-15/"] Postgres --> ParquetFile["part-00000.parquet
_SUCCESS"] Logs --> LogDate["dt=2024-06-15/hour=14/"] LogDate --> Events["events.json.gz"] Images --> Products["products/2024/06/"] Processed --> ProcOrders["orders/
dt=2024-06-15/"] Processed --> Sessions["user_sessions/
dt=2024-06-15/"] Curated --> C360["customer_360/"] Curated --> Recommend["product_recommendations/"] Curated --> Revenue["revenue_daily/"] Meta --> Schemas["schemas/"] Meta --> QReports["quality_reports/"] Meta --> Lineage["lineage/"] style Root fill:#dbeafe,stroke:#2563eb,stroke-width:2px,color:#1e40af style Raw fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e style Processed fill:#dbeafe,stroke:#2563eb,stroke-width:2px,color:#1e40af style Curated fill:#d1fae5,stroke:#059669,color:#065f46 style Meta fill:#f3f4f6,stroke:#9ca3af,color:#374151

Medallionアーキテクチャ

graph LR
    Bronze["Bronze(Raw)
生データそのまま
JSON, CSV等
Schema-on-Read
保持: 永久
目的: 再処理可能"] Silver["Silver(Cleaned)
クレンジング
型変換、重複排除
標準化
保持: 3年
目的: 信頼できるデータ"] Gold["Gold(Business)
ビジネス集計
非正規化
ML特徴量
保持: 用途による
目的: 消費最適化"] Bronze --> Silver --> Gold style Bronze fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e style Silver fill:#dbeafe,stroke:#2563eb,stroke-width:2px,color:#1e40af style Gold fill:#d1fae5,stroke:#059669,color:#065f46

データレイクハウス

レイクハウスの解決する課題

データレイクの課題レイクハウスの解決策
ACIDトランザクション不可テーブルフォーマットでACID保証
スキーマ強制なしSchema Evolution対応
小ファイル問題自動コンパクション
タイムトラベルなしバージョニング
インデックスなしZ-Order, Data Skipping

Delta Lake

# Delta Lake: ACIDトランザクション対応のレイクハウス

from delta import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# === Delta テーブルへの書き込み ===

# 初回書き込み
orders_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("order_date") \
    .save("s3://lakehouse/orders")

# 増分書き込み(MERGE / UPSERT)
delta_table = DeltaTable.forPath(spark, "s3://lakehouse/orders")

delta_table.alias("target").merge(
    new_orders_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(set={
    "status": "source.status",
    "total_amount": "source.total_amount",
    "updated_at": "source.updated_at",
}).whenNotMatchedInsertAll().execute()

# === タイムトラベル ===

# 特定バージョンのデータを読む
orders_v5 = spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load("s3://lakehouse/orders")

# 特定時刻のデータを読む
orders_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-06-14") \
    .load("s3://lakehouse/orders")

# 変更履歴の確認
delta_table.history().select(
    "version", "timestamp", "operation",
    "operationParameters", "operationMetrics"
).show()

# === 最適化 ===

# 小ファイルのコンパクション
delta_table.optimize().executeCompaction()

# Z-Order インデックス(特定カラムでのフィルタ高速化)
delta_table.optimize() \
    .where("order_date >= '2024-06-01'") \
    .executeZOrderBy("customer_id", "category")

Apache Iceberg

-- Apache Iceberg: オープンテーブルフォーマット

-- テーブル作成
CREATE TABLE lakehouse.orders (
    order_id    STRING,
    customer_id STRING,
    order_date  DATE,
    category    STRING,
    amount      DECIMAL(10,2),
    status      STRING,
    updated_at  TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(order_date))  -- Hidden Partitioning
TBLPROPERTIES (
    'write.format.default' = 'parquet',
    'write.metadata.compression-codec' = 'gzip'
);

-- スキーマ進化(カラム追加)
ALTER TABLE lakehouse.orders ADD COLUMN shipping_method STRING;

-- パーティション進化(ダウンタイムなし)
ALTER TABLE lakehouse.orders
  ADD PARTITION FIELD truncate(category, 2);

-- タイムトラベルクエリ
SELECT * FROM lakehouse.orders
  FOR SYSTEM_TIME AS OF TIMESTAMP '2024-06-14 00:00:00';

-- スナップショット間の差分取得(Incremental Read)
SELECT * FROM lakehouse.orders
  FOR SYSTEM_VERSION BETWEEN 10 AND 20;

-- テーブルメンテナンス
CALL lakehouse.system.rewrite_data_files(
  table => 'lakehouse.orders',
  strategy => 'sort',
  sort_order => 'order_date ASC, customer_id ASC'
);

-- 古いスナップショットの削除
CALL lakehouse.system.expire_snapshots(
  table => 'lakehouse.orders',
  older_than => TIMESTAMP '2024-05-01 00:00:00',
  retain_last => 10
);

テーブルフォーマット比較

機能Delta LakeApache IcebergApache Hudi
ACIDトランザクションYesYesYes
タイムトラベルYesYesYes
Schema EvolutionYesYes(進化が柔軟)Yes
Partition Evolution限定的Yes(Hidden Partitioning)限定的
主要エコシステムDatabricksマルチエンジンAWS中心
エンジン互換性Spark中心Spark/Trino/Flink等Spark/Flink
テーブルフォーマット選定の指針
  • Delta Lake: Databricks環境で統一する場合。エコシステムが成熟しており、学習コストが低い
  • Apache Iceberg: マルチエンジン環境(Spark + Trino + Flink等)。パーティション進化が必要な場合。ベンダーニュートラルを重視する場合
  • Apache Hudi: CDC(Change Data Capture)中心のユースケース。UPSERTが頻繁な場合。AWSのEMR/Glueとの統合

現在のトレンドでは、Apache Iceberg が最も勢いがあり、Snowflake、BigQuery、Databricks全てがIceberg対応を進めています。


まとめ

ポイント内容
データレイク全形式のデータを安価に保存、Medallionアーキテクチャで整理
レイクハウスレイクにDWHの機能(ACID、スキーマ、インデックス)を追加
Delta LakeDatabricks中心、成熟したエコシステム
Icebergマルチエンジン対応、パーティション進化が強力

チェックリスト

  • データレイクとDWHの使い分けを説明できる
  • Medallionアーキテクチャ(Bronze/Silver/Gold)を理解した
  • Delta Lake/Icebergの主要機能(ACID、タイムトラベル)を理解した
  • テーブルフォーマットの選定基準を把握した

次のステップへ

次はOLAP設計とディメンショナルモデリングを学び、分析に最適化されたデータ構造を理解します。


推定読了時間: 40分