ストーリー
佐
佐藤CTO
DWHは構造化データの分析に最適だが、画像、ログ、IoTデータなど非構造化データも扱う必要がある
佐
佐藤CTO
データレイクは全てのデータを保存する。ただし”データスワンプ”にならないための設計が重要だ。そしてレイクハウスは、レイクとDWHの良いところを統合する新しいパラダイムだ
データレイクの基本設計
データレイク vs DWH
| 観点 | データレイク | DWH |
|---|
| データ形式 | 構造化/半構造化/非構造化 | 構造化のみ |
| スキーマ | Schema-on-Read | Schema-on-Write |
| ストレージ | オブジェクトストレージ(安価) | 専用ストレージ(高価) |
| ユーザー | データサイエンティスト | ビジネスアナリスト |
| 処理エンジン | Spark, Presto, Athena | DWHネイティブエンジン |
S3ベースのデータレイク設計
graph TD
Root["s3://data-lake/"] --> Raw["raw/
Bronze: 生データ"]
Root --> Processed["processed/
Silver: クレンジング済み"]
Root --> Curated["curated/
Gold: 分析・ML用に最適化"]
Root --> Meta["_metadata/"]
Raw --> RawOrders["orders/"]
Raw --> Logs["access_logs/"]
Raw --> Images["images/"]
RawOrders --> Postgres["source=postgres/
dt=2024-06-15/"]
RawOrders --> Shopify["source=shopify/
dt=2024-06-15/"]
Postgres --> ParquetFile["part-00000.parquet
_SUCCESS"]
Logs --> LogDate["dt=2024-06-15/hour=14/"]
LogDate --> Events["events.json.gz"]
Images --> Products["products/2024/06/"]
Processed --> ProcOrders["orders/
dt=2024-06-15/"]
Processed --> Sessions["user_sessions/
dt=2024-06-15/"]
Curated --> C360["customer_360/"]
Curated --> Recommend["product_recommendations/"]
Curated --> Revenue["revenue_daily/"]
Meta --> Schemas["schemas/"]
Meta --> QReports["quality_reports/"]
Meta --> Lineage["lineage/"]
style Root fill:#dbeafe,stroke:#2563eb,stroke-width:2px,color:#1e40af
style Raw fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e
style Processed fill:#dbeafe,stroke:#2563eb,stroke-width:2px,color:#1e40af
style Curated fill:#d1fae5,stroke:#059669,color:#065f46
style Meta fill:#f3f4f6,stroke:#9ca3af,color:#374151
Medallionアーキテクチャ
graph LR
Bronze["Bronze(Raw)
生データそのまま
JSON, CSV等
Schema-on-Read
保持: 永久
目的: 再処理可能"]
Silver["Silver(Cleaned)
クレンジング
型変換、重複排除
標準化
保持: 3年
目的: 信頼できるデータ"]
Gold["Gold(Business)
ビジネス集計
非正規化
ML特徴量
保持: 用途による
目的: 消費最適化"]
Bronze --> Silver --> Gold
style Bronze fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e
style Silver fill:#dbeafe,stroke:#2563eb,stroke-width:2px,color:#1e40af
style Gold fill:#d1fae5,stroke:#059669,color:#065f46
データレイクハウス
レイクハウスの解決する課題
| データレイクの課題 | レイクハウスの解決策 |
|---|
| ACIDトランザクション不可 | テーブルフォーマットでACID保証 |
| スキーマ強制なし | Schema Evolution対応 |
| 小ファイル問題 | 自動コンパクション |
| タイムトラベルなし | バージョニング |
| インデックスなし | Z-Order, Data Skipping |
Delta Lake
# Delta Lake: ACIDトランザクション対応のレイクハウス
from delta import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# === Delta テーブルへの書き込み ===
# 初回書き込み
orders_df.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("order_date") \
.save("s3://lakehouse/orders")
# 増分書き込み(MERGE / UPSERT)
delta_table = DeltaTable.forPath(spark, "s3://lakehouse/orders")
delta_table.alias("target").merge(
new_orders_df.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdate(set={
"status": "source.status",
"total_amount": "source.total_amount",
"updated_at": "source.updated_at",
}).whenNotMatchedInsertAll().execute()
# === タイムトラベル ===
# 特定バージョンのデータを読む
orders_v5 = spark.read.format("delta") \
.option("versionAsOf", 5) \
.load("s3://lakehouse/orders")
# 特定時刻のデータを読む
orders_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2024-06-14") \
.load("s3://lakehouse/orders")
# 変更履歴の確認
delta_table.history().select(
"version", "timestamp", "operation",
"operationParameters", "operationMetrics"
).show()
# === 最適化 ===
# 小ファイルのコンパクション
delta_table.optimize().executeCompaction()
# Z-Order インデックス(特定カラムでのフィルタ高速化)
delta_table.optimize() \
.where("order_date >= '2024-06-01'") \
.executeZOrderBy("customer_id", "category")
Apache Iceberg
-- Apache Iceberg: オープンテーブルフォーマット
-- テーブル作成
CREATE TABLE lakehouse.orders (
order_id STRING,
customer_id STRING,
order_date DATE,
category STRING,
amount DECIMAL(10,2),
status STRING,
updated_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(order_date)) -- Hidden Partitioning
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.metadata.compression-codec' = 'gzip'
);
-- スキーマ進化(カラム追加)
ALTER TABLE lakehouse.orders ADD COLUMN shipping_method STRING;
-- パーティション進化(ダウンタイムなし)
ALTER TABLE lakehouse.orders
ADD PARTITION FIELD truncate(category, 2);
-- タイムトラベルクエリ
SELECT * FROM lakehouse.orders
FOR SYSTEM_TIME AS OF TIMESTAMP '2024-06-14 00:00:00';
-- スナップショット間の差分取得(Incremental Read)
SELECT * FROM lakehouse.orders
FOR SYSTEM_VERSION BETWEEN 10 AND 20;
-- テーブルメンテナンス
CALL lakehouse.system.rewrite_data_files(
table => 'lakehouse.orders',
strategy => 'sort',
sort_order => 'order_date ASC, customer_id ASC'
);
-- 古いスナップショットの削除
CALL lakehouse.system.expire_snapshots(
table => 'lakehouse.orders',
older_than => TIMESTAMP '2024-05-01 00:00:00',
retain_last => 10
);
テーブルフォーマット比較
| 機能 | Delta Lake | Apache Iceberg | Apache Hudi |
|---|
| ACIDトランザクション | Yes | Yes | Yes |
| タイムトラベル | Yes | Yes | Yes |
| Schema Evolution | Yes | Yes(進化が柔軟) | Yes |
| Partition Evolution | 限定的 | Yes(Hidden Partitioning) | 限定的 |
| 主要エコシステム | Databricks | マルチエンジン | AWS中心 |
| エンジン互換性 | Spark中心 | Spark/Trino/Flink等 | Spark/Flink |
テーブルフォーマット選定の指針
- Delta Lake: Databricks環境で統一する場合。エコシステムが成熟しており、学習コストが低い
- Apache Iceberg: マルチエンジン環境(Spark + Trino + Flink等)。パーティション進化が必要な場合。ベンダーニュートラルを重視する場合
- Apache Hudi: CDC(Change Data Capture)中心のユースケース。UPSERTが頻繁な場合。AWSのEMR/Glueとの統合
現在のトレンドでは、Apache Iceberg が最も勢いがあり、Snowflake、BigQuery、Databricks全てがIceberg対応を進めています。
まとめ
| ポイント | 内容 |
|---|
| データレイク | 全形式のデータを安価に保存、Medallionアーキテクチャで整理 |
| レイクハウス | レイクにDWHの機能(ACID、スキーマ、インデックス)を追加 |
| Delta Lake | Databricks中心、成熟したエコシステム |
| Iceberg | マルチエンジン対応、パーティション進化が強力 |
チェックリスト
次のステップへ
次はOLAP設計とディメンショナルモデリングを学び、分析に最適化されたデータ構造を理解します。
推定読了時間: 40分