dbtモデルの設計と実装
田中VPoE「dbtの概念は理解できたね。次は実際にモデルを設計して書いてみよう。NetShop社のデータを使ってStagingからMartまで作っていくよ。」
あなた「SQLが書ければ大丈夫ですか?」
田中VPoE「基本はSQLだけど、dbt特有のベストプラクティスがある。レイヤーの責務分離や命名規則を守ることで、メンテナンスしやすいプロジェクトになるんだ。」
モデルのレイヤー設計
推奨ディレクトリ構成
models/
├── staging/ # ソースデータの標準化
│ ├── _sources.yml
│ ├── _stg_models.yml
│ ├── stg_orders.sql
│ ├── stg_customers.sql
│ └── stg_products.sql
├── intermediate/ # 複雑なロジックの分解
│ ├── int_orders_enriched.sql
│ └── int_customer_metrics.sql
└── marts/ # ビジネスユースケース別
├── marketing/
│ ├── mart_customer_ltv.sql
│ └── mart_campaign_roi.sql
└── finance/
├── mart_daily_revenue.sql
└── mart_monthly_summary.sql
各レイヤーの責務
| レイヤー | 責務 | 命名規則 | マテリアライゼーション |
|---|---|---|---|
| Staging | 型変換、リネーム、フィルタ | stg_{source}_{table} | view |
| Intermediate | 複雑なJOIN、集計の中間処理 | int_{概念}_{動詞} | ephemeral / view |
| Mart | ビジネスロジック、最終集計 | mart_{ドメイン}_{概念} | table / incremental |
Stagingモデルの実装
Stagingモデルは「ソースデータの窓口」です。ここで一度標準化しておくことで、下流のモデルがソースの変更に影響されにくくなります。
Stagingモデルのルール
- 1ソーステーブルにつき1モデル
- カラムのリネーム:ビジネスで使う名前に変換
- 型のキャスト:明示的に型を指定
- 基本的なフィルタ:削除済みレコードの除外など
- JOINは行わない
-- models/staging/stg_orders.sql
WITH source AS (
SELECT * FROM {{ source('raw', 'orders') }}
),
renamed AS (
SELECT
id AS order_id,
user_id AS customer_id,
CAST(total_amount AS NUMERIC) AS order_amount,
CAST(tax_amount AS NUMERIC) AS tax_amount,
CAST(discount_amount AS NUMERIC) AS discount_amount,
status AS order_status,
CAST(created_at AS TIMESTAMP) AS ordered_at,
CAST(updated_at AS TIMESTAMP) AS updated_at
FROM source
WHERE is_deleted = FALSE
)
SELECT * FROM renamed
-- models/staging/stg_customers.sql
WITH source AS (
SELECT * FROM {{ source('raw', 'customers') }}
),
renamed AS (
SELECT
id AS customer_id,
CONCAT(last_name, ' ', first_name) AS customer_name,
email,
CAST(registered_at AS TIMESTAMP) AS registered_at,
CASE
WHEN tier = 'G' THEN 'Gold'
WHEN tier = 'S' THEN 'Silver'
WHEN tier = 'B' THEN 'Bronze'
ELSE 'Unknown'
END AS customer_tier,
prefecture AS region
FROM source
WHERE is_deleted = FALSE
)
SELECT * FROM renamed
Intermediateモデルの実装
複雑な変換を分解し、可読性を高めます。
-- models/intermediate/int_orders_enriched.sql
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
SELECT * FROM {{ ref('stg_customers') }}
),
products AS (
SELECT * FROM {{ ref('stg_order_items') }}
),
enriched AS (
SELECT
o.order_id,
o.customer_id,
c.customer_name,
c.customer_tier,
c.region,
o.order_amount,
o.tax_amount,
o.discount_amount,
o.order_amount - o.discount_amount AS net_amount,
o.order_status,
o.ordered_at,
DATE_DIFF(o.ordered_at, c.registered_at, DAY) AS days_since_registration
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
)
SELECT * FROM enriched
Martモデルの実装
ビジネスユースケースに特化した集計テーブルです。
-- models/marts/marketing/mart_customer_ltv.sql
{{
config(
materialized='table',
partition_by={
"field": "last_order_date",
"data_type": "date",
"granularity": "month"
}
)
}}
WITH customer_orders AS (
SELECT
customer_id,
customer_name,
customer_tier,
region,
COUNT(*) AS total_orders,
SUM(net_amount) AS total_revenue,
AVG(net_amount) AS avg_order_value,
MIN(ordered_at) AS first_order_date,
MAX(ordered_at) AS last_order_date,
DATE_DIFF(MAX(ordered_at), MIN(ordered_at), DAY) AS customer_lifespan_days
FROM {{ ref('int_orders_enriched') }}
WHERE order_status = 'completed'
GROUP BY 1, 2, 3, 4
),
ltv_calculated AS (
SELECT
*,
CASE
WHEN total_orders >= 10 AND total_revenue >= 100000 THEN 'VIP'
WHEN total_orders >= 5 AND total_revenue >= 50000 THEN 'High'
WHEN total_orders >= 2 THEN 'Medium'
ELSE 'Low'
END AS ltv_segment,
SAFE_DIVIDE(total_revenue, GREATEST(customer_lifespan_days, 1)) * 365
AS annualized_revenue
FROM customer_orders
)
SELECT * FROM ltv_calculated
Incrementalモデル
大量データの場合、毎回全件再作成するのは非効率です。Incrementalモデルは差分のみを処理します。
-- models/marts/finance/mart_daily_revenue.sql
{{
config(
materialized='incremental',
unique_key='revenue_date',
on_schema_change='append_new_columns'
)
}}
WITH daily AS (
SELECT
DATE(ordered_at) AS revenue_date,
COUNT(DISTINCT customer_id) AS unique_customers,
COUNT(*) AS order_count,
SUM(net_amount) AS gross_revenue,
SUM(discount_amount) AS total_discounts,
SUM(net_amount) - SUM(discount_amount) AS net_revenue
FROM {{ ref('int_orders_enriched') }}
WHERE order_status = 'completed'
{% if is_incremental() %}
AND ordered_at > (SELECT MAX(revenue_date) FROM {{ this }})
{% endif %}
GROUP BY 1
)
SELECT * FROM daily
is_incremental() の動作
| 実行モード | is_incremental() | 動作 |
|---|---|---|
| 初回実行 | false | 全件処理(CREATE TABLE) |
| 2回目以降 | true | 差分のみ処理(MERGE/INSERT) |
--full-refresh | false | 全件再作成 |
まとめ
| 項目 | ポイント |
|---|---|
| レイヤー設計 | Staging → Intermediate → Mart |
| Staging | ソースの標準化、JOINなし、1 |
| Intermediate | 複雑なロジックの分解、可読性向上 |
| Mart | ビジネスユースケース別の最終出力 |
| Incremental | 差分処理で大量データに対応 |
チェックリスト
- 3層レイヤー(Staging/Intermediate/Mart)の責務を説明できる
- Stagingモデルのルールに従ってSQLを書ける
- ref()を使ったモデル間参照ができる
- Incrementalモデルの仕組みを理解している
次のステップへ
dbtモデルの設計と実装方法を学びました。次は、データの品質を担保するdbtのテスト機能について学びましょう。
推定読了時間:30分