EXERCISE 90分

演習:dbtでデータ変換パイプラインを構築しよう

田中VPoE「dbtの基礎からテスト、高度な機能まで学んだ。ここでNetShop社のデータ変換パイプラインを実際に構築してみよう。」

あなた「staging → intermediate → mart の3層構造で設計するんですね。テストとドキュメントも含めて実装します。」

田中VPoE「その通り。本番運用を意識して、データ品質テストも忘れずにな。」

ミッション概要

NetShop社のEC売上分析基盤に必要なdbtプロジェクトを構築します。rawデータからビジネスで利用可能なmartテーブルまでの変換パイプラインを設計・実装します。

前提条件

  • Step 2の各レッスン(dbt概要、モデリング、テスト、高度な機能)を修了していること
  • dbt Core がインストール済みであること

Mission 1: staging層の構築(30分)

rawデータからstaging層のモデルを作成してください。

ソースデータ

# models/staging/sources.yml
version: 2
sources:
  - name: raw
    schema: raw_data
    tables:
      - name: orders
        columns:
          - name: order_id
          - name: customer_id
          - name: order_date
          - name: total_amount
          - name: status
      - name: customers
        columns:
          - name: customer_id
          - name: name
          - name: email
          - name: region
          - name: created_at
      - name: products
        columns:
          - name: product_id
          - name: product_name
          - name: category
          - name: price

タスク

3つのstagingモデルを作成し、スキーマファイルにテストを定義してください。

解答例
-- models/staging/stg_orders.sql
with source as (
    select * from {{ source('raw', 'orders') }}
),

cleaned as (
    select
        order_id,
        customer_id,
        cast(order_date as date) as order_date,
        cast(total_amount as numeric(12, 2)) as total_amount,
        lower(trim(status)) as status
    from source
    where order_id is not null
)

select * from cleaned
-- models/staging/stg_customers.sql
with source as (
    select * from {{ source('raw', 'customers') }}
),

cleaned as (
    select
        customer_id,
        trim(name) as customer_name,
        lower(trim(email)) as email,
        trim(region) as region,
        cast(created_at as timestamp) as created_at
    from source
    where customer_id is not null
)

select * from cleaned
-- models/staging/stg_products.sql
with source as (
    select * from {{ source('raw', 'products') }}
),

cleaned as (
    select
        product_id,
        trim(product_name) as product_name,
        trim(category) as category,
        cast(price as numeric(10, 2)) as price
    from source
    where product_id is not null
)

select * from cleaned
# models/staging/schema.yml
version: 2
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: status
        tests:
          - accepted_values:
              values: ['completed', 'pending', 'cancelled', 'shipped']
  - name: stg_customers
    columns:
      - name: customer_id
        tests:
          - not_null
          - unique
      - name: email
        tests:
          - not_null
  - name: stg_products
    columns:
      - name: product_id
        tests:
          - not_null
          - unique

Mission 2: mart層の構築(30分)

staging層のモデルを組み合わせて、ビジネスで利用するmartモデルを作成してください。

要件

  1. mart_customer_summary: 顧客ごとの注文サマリー
  2. mart_monthly_revenue: 月次売上レポート
解答例
-- models/mart/mart_customer_summary.sql
with orders as (
    select * from {{ ref('stg_orders') }}
    where status = 'completed'
),

customers as (
    select * from {{ ref('stg_customers') }}
),

order_stats as (
    select
        customer_id,
        count(*) as total_orders,
        sum(total_amount) as lifetime_value,
        avg(total_amount) as avg_order_amount,
        min(order_date) as first_order_date,
        max(order_date) as last_order_date,
        datediff('day', max(order_date), current_date) as days_since_last_order
    from orders
    group by customer_id
)

select
    c.customer_id,
    c.customer_name,
    c.region,
    c.created_at,
    coalesce(o.total_orders, 0) as total_orders,
    coalesce(o.lifetime_value, 0) as lifetime_value,
    coalesce(o.avg_order_amount, 0) as avg_order_amount,
    o.first_order_date,
    o.last_order_date,
    coalesce(o.days_since_last_order, 999) as days_since_last_order,
    case
        when o.days_since_last_order <= 30 then 'Active'
        when o.days_since_last_order <= 90 then 'At Risk'
        else 'Churned'
    end as customer_status
from customers c
left join order_stats o on c.customer_id = o.customer_id
-- models/mart/mart_monthly_revenue.sql
with orders as (
    select * from {{ ref('stg_orders') }}
    where status = 'completed'
),

monthly as (
    select
        date_trunc('month', order_date) as month,
        count(distinct customer_id) as unique_customers,
        count(*) as total_orders,
        sum(total_amount) as revenue,
        avg(total_amount) as avg_order_value
    from orders
    group by date_trunc('month', order_date)
)

select
    month,
    unique_customers,
    total_orders,
    revenue,
    avg_order_value,
    lag(revenue) over (order by month) as prev_month_revenue,
    round((revenue - lag(revenue) over (order by month))
        / nullif(lag(revenue) over (order by month), 0) * 100, 1) as revenue_growth_pct
from monthly
order by month

Mission 3: テストとドキュメントの整備(30分)

mart層のテストとプロジェクトドキュメントを整備してください。

要件

  1. martモデルのスキーマテスト定義
  2. カスタムテスト(データ品質チェック)の実装
  3. モデルのdescription追加
解答例
# models/mart/schema.yml
version: 2
models:
  - name: mart_customer_summary
    description: "顧客ごとの注文サマリーテーブル。注文統計と顧客ステータスを含む。"
    columns:
      - name: customer_id
        description: "顧客の一意識別子"
        tests:
          - not_null
          - unique
      - name: total_orders
        description: "完了済み注文の合計数"
        tests:
          - not_null
      - name: lifetime_value
        description: "顧客の累計購入金額"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
      - name: customer_status
        description: "顧客ステータス(Active/At Risk/Churned)"
        tests:
          - accepted_values:
              values: ['Active', 'At Risk', 'Churned']

  - name: mart_monthly_revenue
    description: "月次売上レポート。前月比成長率を含む。"
    columns:
      - name: month
        tests:
          - not_null
          - unique
      - name: revenue
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
-- tests/assert_no_negative_lifetime_value.sql
select
    customer_id,
    lifetime_value
from {{ ref('mart_customer_summary') }}
where lifetime_value < 0
-- tests/assert_revenue_consistency.sql
select
    month,
    revenue,
    total_orders
from {{ ref('mart_monthly_revenue') }}
where total_orders > 0 and revenue = 0

達成度チェック

  • staging層の3モデルを作成し、クレンジングロジックを実装できた
  • staging層にスキーマテストを定義できた
  • mart層で顧客サマリーと月次売上レポートを作成できた
  • カスタムテストでデータ品質チェックを実装できた
  • モデルにdescriptionを追加しドキュメントを整備できた

推定所要時間:90分