演習:dbtでデータ変換パイプラインを構築しよう
田中VPoE「dbtの基礎からテスト、高度な機能まで学んだ。ここでNetShop社のデータ変換パイプラインを実際に構築してみよう。」
あなた「staging → intermediate → mart の3層構造で設計するんですね。テストとドキュメントも含めて実装します。」
田中VPoE「その通り。本番運用を意識して、データ品質テストも忘れずにな。」
ミッション概要
NetShop社のEC売上分析基盤に必要なdbtプロジェクトを構築します。rawデータからビジネスで利用可能なmartテーブルまでの変換パイプラインを設計・実装します。
前提条件
- Step 2の各レッスン(dbt概要、モデリング、テスト、高度な機能)を修了していること
- dbt Core がインストール済みであること
Mission 1: staging層の構築(30分)
rawデータからstaging層のモデルを作成してください。
ソースデータ
# models/staging/sources.yml
version: 2
sources:
- name: raw
schema: raw_data
tables:
- name: orders
columns:
- name: order_id
- name: customer_id
- name: order_date
- name: total_amount
- name: status
- name: customers
columns:
- name: customer_id
- name: name
- name: email
- name: region
- name: created_at
- name: products
columns:
- name: product_id
- name: product_name
- name: category
- name: price
タスク
3つのstagingモデルを作成し、スキーマファイルにテストを定義してください。
解答例
-- models/staging/stg_orders.sql
with source as (
select * from {{ source('raw', 'orders') }}
),
cleaned as (
select
order_id,
customer_id,
cast(order_date as date) as order_date,
cast(total_amount as numeric(12, 2)) as total_amount,
lower(trim(status)) as status
from source
where order_id is not null
)
select * from cleaned
-- models/staging/stg_customers.sql
with source as (
select * from {{ source('raw', 'customers') }}
),
cleaned as (
select
customer_id,
trim(name) as customer_name,
lower(trim(email)) as email,
trim(region) as region,
cast(created_at as timestamp) as created_at
from source
where customer_id is not null
)
select * from cleaned
-- models/staging/stg_products.sql
with source as (
select * from {{ source('raw', 'products') }}
),
cleaned as (
select
product_id,
trim(product_name) as product_name,
trim(category) as category,
cast(price as numeric(10, 2)) as price
from source
where product_id is not null
)
select * from cleaned
# models/staging/schema.yml
version: 2
models:
- name: stg_orders
columns:
- name: order_id
tests:
- not_null
- unique
- name: status
tests:
- accepted_values:
values: ['completed', 'pending', 'cancelled', 'shipped']
- name: stg_customers
columns:
- name: customer_id
tests:
- not_null
- unique
- name: email
tests:
- not_null
- name: stg_products
columns:
- name: product_id
tests:
- not_null
- unique
Mission 2: mart層の構築(30分)
staging層のモデルを組み合わせて、ビジネスで利用するmartモデルを作成してください。
要件
mart_customer_summary: 顧客ごとの注文サマリーmart_monthly_revenue: 月次売上レポート
解答例
-- models/mart/mart_customer_summary.sql
with orders as (
select * from {{ ref('stg_orders') }}
where status = 'completed'
),
customers as (
select * from {{ ref('stg_customers') }}
),
order_stats as (
select
customer_id,
count(*) as total_orders,
sum(total_amount) as lifetime_value,
avg(total_amount) as avg_order_amount,
min(order_date) as first_order_date,
max(order_date) as last_order_date,
datediff('day', max(order_date), current_date) as days_since_last_order
from orders
group by customer_id
)
select
c.customer_id,
c.customer_name,
c.region,
c.created_at,
coalesce(o.total_orders, 0) as total_orders,
coalesce(o.lifetime_value, 0) as lifetime_value,
coalesce(o.avg_order_amount, 0) as avg_order_amount,
o.first_order_date,
o.last_order_date,
coalesce(o.days_since_last_order, 999) as days_since_last_order,
case
when o.days_since_last_order <= 30 then 'Active'
when o.days_since_last_order <= 90 then 'At Risk'
else 'Churned'
end as customer_status
from customers c
left join order_stats o on c.customer_id = o.customer_id
-- models/mart/mart_monthly_revenue.sql
with orders as (
select * from {{ ref('stg_orders') }}
where status = 'completed'
),
monthly as (
select
date_trunc('month', order_date) as month,
count(distinct customer_id) as unique_customers,
count(*) as total_orders,
sum(total_amount) as revenue,
avg(total_amount) as avg_order_value
from orders
group by date_trunc('month', order_date)
)
select
month,
unique_customers,
total_orders,
revenue,
avg_order_value,
lag(revenue) over (order by month) as prev_month_revenue,
round((revenue - lag(revenue) over (order by month))
/ nullif(lag(revenue) over (order by month), 0) * 100, 1) as revenue_growth_pct
from monthly
order by month
Mission 3: テストとドキュメントの整備(30分)
mart層のテストとプロジェクトドキュメントを整備してください。
要件
- martモデルのスキーマテスト定義
- カスタムテスト(データ品質チェック)の実装
- モデルのdescription追加
解答例
# models/mart/schema.yml
version: 2
models:
- name: mart_customer_summary
description: "顧客ごとの注文サマリーテーブル。注文統計と顧客ステータスを含む。"
columns:
- name: customer_id
description: "顧客の一意識別子"
tests:
- not_null
- unique
- name: total_orders
description: "完了済み注文の合計数"
tests:
- not_null
- name: lifetime_value
description: "顧客の累計購入金額"
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
- name: customer_status
description: "顧客ステータス(Active/At Risk/Churned)"
tests:
- accepted_values:
values: ['Active', 'At Risk', 'Churned']
- name: mart_monthly_revenue
description: "月次売上レポート。前月比成長率を含む。"
columns:
- name: month
tests:
- not_null
- unique
- name: revenue
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
-- tests/assert_no_negative_lifetime_value.sql
select
customer_id,
lifetime_value
from {{ ref('mart_customer_summary') }}
where lifetime_value < 0
-- tests/assert_revenue_consistency.sql
select
month,
revenue,
total_orders
from {{ ref('mart_monthly_revenue') }}
where total_orders > 0 and revenue = 0
達成度チェック
- staging層の3モデルを作成し、クレンジングロジックを実装できた
- staging層にスキーマテストを定義できた
- mart層で顧客サマリーと月次売上レポートを作成できた
- カスタムテストでデータ品質チェックを実装できた
- モデルにdescriptionを追加しドキュメントを整備できた
推定所要時間:90分