ストーリー
ETL vs ELT
| 観点 | ETL | ELT |
|---|---|---|
| 処理順序 | Extract → Transform → Load | Extract → Load → Transform |
| 変換場所 | ETLエンジン上 | DWH/データレイク上 |
| スケーラビリティ | ETLサーバに依存 | DWHの計算能力を活用 |
| 適用場面 | オンプレミス、データクレンジングが複雑 | クラウドDWH、大量データ |
| 代表的ツール | Informatica, Talend | dbt, Airbyte + BigQuery |
graph LR
subgraph ETL
S1["Source"] --> E1["Extract"] --> T1["Transform<br/>(ETLサーバで変換)"] --> L1["Load"] --> D1["DWH"]
end
subgraph ELT
S2["Source"] --> E2["Extract"] --> L2["Load<br/>(生データをまずロード)"] --> D2["DWH"] --> T2["Transform<br/>(DWH内で変換 / SQLベース)"]
end
style T1 fill:#fff3e0,stroke:#e65100,stroke-width:2px
style T2 fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style D1 fill:#e8f5e9,stroke:#2e7d32
style D2 fill:#e8f5e9,stroke:#2e7d32
モダンデータスタックの構成
graph TD
subgraph Stack["モダンデータスタック"]
direction TB
A["Airbyte<br/>(Extract)"] --> S["Snowflake<br/>(Load)"] --> D["dbt<br/>(Transform)"]
D --> O["Dagster / Airflow<br/>(Orchestration)"]
O --> M["Metabase<br/>(BI)"]
O --> G["Grafana<br/>(Dashboard)"]
O --> L["Looker<br/>(Analytics)"]
end
style A fill:#e3f2fd,stroke:#1565c0
style S fill:#e8f5e9,stroke:#2e7d32
style D fill:#fff3e0,stroke:#e65100
style O fill:#f3e5f5,stroke:#6a1b9a
style M fill:#fce4ec,stroke:#c62828
style G fill:#fce4ec,stroke:#c62828
style L fill:#fce4ec,stroke:#c62828
Airbyte によるデータ抽出・ロード
# Airbyte Connection 設定例
# Source: PostgreSQL (運用DB)
source:
name: production-orders-db
connector: source-postgres
config:
host: orders-db.internal
port: 5432
database: orders_production
username: ${AIRBYTE_DB_USER}
password: ${AIRBYTE_DB_PASSWORD}
replication_method:
method: CDC # Change Data Capture
plugin: pgoutput
publication: airbyte_publication
initial_waiting_seconds: 300
# Destination: BigQuery
destination:
name: analytics-bigquery
connector: destination-bigquery
config:
project_id: analytics-project
dataset_id: raw_data
credentials_json: ${GCP_CREDENTIALS}
loading_method:
method: GCS Staging
gcs_bucket_name: airbyte-staging
gcs_bucket_path: staging/
# Sync設定
connection:
name: orders-to-bigquery
schedule:
type: cron
cron_expression: "0 */2 * * *" # 2時間ごと
sync_mode: incremental # 差分同期
destination_sync_mode: append_dedup
cursor_field: updated_at
primary_key: [id]
# ストリーム選択
streams:
- name: orders
sync_mode: incremental
- name: order_items
sync_mode: incremental
- name: customers
sync_mode: incremental
- name: products
sync_mode: full_refresh # マスタデータは全件同期
dbt によるデータ変換
dbt プロジェクト構成
graph TD
Root["dbt_project/"] --> Config["dbt_project.yml"]
Root --> Models["models/"]
Root --> Tests["tests/ -- カスタムテスト"]
Root --> Macros["macros/ -- 再利用可能なSQL"]
Root --> Seeds["seeds/ -- 静的データ"]
Models --> Staging["staging/
生データのクレンジング"]
Models --> Intermediate["intermediate/
中間変換"]
Models --> Marts["marts/
ビジネスロジック適用済み"]
Staging --> StgOrders["stg_orders.sql"]
Staging --> StgCustomers["stg_customers.sql"]
Staging --> StagingYml["_staging.yml
テスト・ドキュメント"]
Intermediate --> IntOrderItems["int_order_items_enriched.sql"]
Marts --> Finance["finance/"]
Marts --> Marketing["marketing/"]
Finance --> FctRevenue["fct_revenue.sql"]
Marketing --> DimCustomers["dim_customers.sql"]
style Root fill:#dbeafe,stroke:#2563eb,stroke-width:2px,color:#1e40af
style Models fill:#d1fae5,stroke:#059669,color:#065f46
style Staging fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e
style Intermediate fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e
style Marts fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e
dbt モデルの実装
-- models/staging/stg_orders.sql
-- Staging: 生データのクレンジングと型変換
{{ config(
materialized='incremental',
unique_key='order_id',
on_schema_change='append_new_columns'
) }}
WITH source AS (
SELECT * FROM {{ source('raw_data', 'orders') }}
{% if is_incremental() %}
WHERE _airbyte_extracted_at > (
SELECT MAX(_airbyte_extracted_at) FROM {{ this }}
)
{% endif %}
),
cleaned AS (
SELECT
id AS order_id,
customer_id,
CAST(order_date AS TIMESTAMP) AS order_date,
CAST(total_amount AS DECIMAL(10, 2)) AS total_amount,
UPPER(TRIM(status)) AS status,
CASE
WHEN status IN ('completed', 'delivered') THEN TRUE
ELSE FALSE
END AS is_completed,
created_at,
updated_at
FROM source
WHERE id IS NOT NULL
AND total_amount >= 0
)
SELECT * FROM cleaned
-- models/marts/finance/fct_revenue.sql
-- Mart: ビジネスメトリクス
{{ config(
materialized='table',
partition_by={
"field": "order_date",
"data_type": "date",
"granularity": "month"
},
cluster_by=['category', 'region']
) }}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
WHERE is_completed = TRUE
),
order_items AS (
SELECT * FROM {{ ref('int_order_items_enriched') }}
),
customers AS (
SELECT * FROM {{ ref('dim_customers') }}
)
SELECT
o.order_id,
o.order_date,
c.customer_segment,
c.region,
oi.category,
oi.product_name,
oi.quantity,
oi.unit_price,
oi.quantity * oi.unit_price AS line_total,
o.total_amount AS order_total,
-- ビジネスメトリクス
CASE
WHEN c.first_order_date = o.order_date THEN 'new'
ELSE 'returning'
END AS customer_type,
DATE_DIFF(o.order_date, c.first_order_date, DAY) AS days_since_first_order
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN customers c ON o.customer_id = c.customer_id
# models/staging/_staging.yml
# テストとドキュメントの定義
version: 2
models:
- name: stg_orders
description: "クレンジング済み注文データ"
columns:
- name: order_id
description: "注文ID"
tests:
- unique
- not_null
- name: customer_id
description: "顧客ID"
tests:
- not_null
- relationships:
to: ref('stg_customers')
field: customer_id
- name: total_amount
description: "注文合計金額"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 10000000
- name: status
description: "注文ステータス"
tests:
- accepted_values:
values: ['PENDING', 'CONFIRMED', 'SHIPPED', 'DELIVERED', 'COMPLETED', 'CANCELLED']
オーケストレーション
Dagster による統合パイプライン
# Dagster: Asset-based orchestration
from dagster import (
asset, AssetIn, DailyPartitionsDefinition,
FreshnessPolicy, AutoMaterializePolicy,
)
daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")
@asset(
partitions_def=daily_partitions,
freshness_policy=FreshnessPolicy(maximum_lag_minutes=180),
description="Airbyte経由で取得した生注文データ",
)
def raw_orders(context) -> None:
"""Airbyte sync をトリガー"""
partition_date = context.partition_key
trigger_airbyte_sync(
connection_id="orders-to-bigquery",
state={"cursor": partition_date},
)
@asset(
ins={"raw_orders": AssetIn()},
partitions_def=daily_partitions,
description="dbt staging モデル: クレンジング済み注文",
)
def stg_orders(context, raw_orders) -> None:
"""dbt run --select stg_orders"""
partition_date = context.partition_key
run_dbt_model("stg_orders", vars={"date": partition_date})
@asset(
ins={"stg_orders": AssetIn()},
partitions_def=daily_partitions,
auto_materialize_policy=AutoMaterializePolicy.eager(),
description="売上分析マート",
)
def fct_revenue(context, stg_orders) -> None:
"""dbt run --select fct_revenue"""
partition_date = context.partition_key
run_dbt_model("fct_revenue", vars={"date": partition_date})
# データ品質チェック
run_dbt_test("fct_revenue")
# 完了通知
context.log.info(f"Revenue mart updated for {partition_date}")
Airflow vs Dagster の比較
| 観点 | Airflow | Dagster |
|---|---|---|
| 抽象化単位 | Task(タスク) | Asset(データ資産) |
| パラダイム | タスク中心(何をするか) | データ中心(何を作るか) |
| テスト | DAGのユニットテストが困難 | アセットのユニットテスト可能 |
| ローカル開発 | Docker Compose必要 | dagster devで即起動 |
| UI | タスク実行履歴中心 | データリネージュ可視化 |
| 成熟度 | 高い(大規模実績多数) | 中程度(急成長中) |
大規模な既存Airflow環境がある場合はAirflowを継続。新規構築ならDagsterの方がモダンな設計思想に基づいています。
まとめ
| ポイント | 内容 |
|---|---|
| ETL vs ELT | クラウドDWH環境ではELT(dbt)が主流 |
| Airbyte | CDCによる差分同期で運用DBの負荷を最小化 |
| dbt | SQLベースの変換、テスト、ドキュメントの統合 |
| オーケストレーション | Dagster/Airflowで依存関係と実行スケジュールを管理 |
チェックリスト
- ETLとELTの違いと適用場面を説明できる
- モダンデータスタックの構成要素を理解した
- dbtのモデル設計(staging/intermediate/marts)を理解した
- オーケストレーションツールの選定基準を把握した
次のステップへ
次はデータ品質と整合性の管理方法を学び、パイプライン全体の信頼性を確保する手法を理解します。
推定読了時間: 40分