LESSON 40分

ストーリー

佐藤CTO
データは散在している。システムA、システムB、外部SaaS…それらを統合して意味のある分析基盤を作るのがパイプラインの仕事だ
佐藤CTO
ETLかELTか。ツール選定としてはAirbyte、dbt、Airflow、Dagster。それぞれの強みを理解して、目的に応じたパイプラインを設計しよう

ETL vs ELT

観点ETLELT
処理順序Extract → Transform → LoadExtract → Load → Transform
変換場所ETLエンジン上DWH/データレイク上
スケーラビリティETLサーバに依存DWHの計算能力を活用
適用場面オンプレミス、データクレンジングが複雑クラウドDWH、大量データ
代表的ツールInformatica, Talenddbt, Airbyte + BigQuery
graph LR
    subgraph ETL
        S1["Source"] --> E1["Extract"] --> T1["Transform<br/>(ETLサーバで変換)"] --> L1["Load"] --> D1["DWH"]
    end

    subgraph ELT
        S2["Source"] --> E2["Extract"] --> L2["Load<br/>(生データをまずロード)"] --> D2["DWH"] --> T2["Transform<br/>(DWH内で変換 / SQLベース)"]
    end

    style T1 fill:#fff3e0,stroke:#e65100,stroke-width:2px
    style T2 fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    style D1 fill:#e8f5e9,stroke:#2e7d32
    style D2 fill:#e8f5e9,stroke:#2e7d32

モダンデータスタックの構成

graph TD
    subgraph Stack["モダンデータスタック"]
        direction TB
        A["Airbyte<br/>(Extract)"] --> S["Snowflake<br/>(Load)"] --> D["dbt<br/>(Transform)"]
        D --> O["Dagster / Airflow<br/>(Orchestration)"]
        O --> M["Metabase<br/>(BI)"]
        O --> G["Grafana<br/>(Dashboard)"]
        O --> L["Looker<br/>(Analytics)"]
    end

    style A fill:#e3f2fd,stroke:#1565c0
    style S fill:#e8f5e9,stroke:#2e7d32
    style D fill:#fff3e0,stroke:#e65100
    style O fill:#f3e5f5,stroke:#6a1b9a
    style M fill:#fce4ec,stroke:#c62828
    style G fill:#fce4ec,stroke:#c62828
    style L fill:#fce4ec,stroke:#c62828

Airbyte によるデータ抽出・ロード

# Airbyte Connection 設定例

# Source: PostgreSQL (運用DB)
source:
  name: production-orders-db
  connector: source-postgres
  config:
    host: orders-db.internal
    port: 5432
    database: orders_production
    username: ${AIRBYTE_DB_USER}
    password: ${AIRBYTE_DB_PASSWORD}
    replication_method:
      method: CDC  # Change Data Capture
      plugin: pgoutput
      publication: airbyte_publication
      initial_waiting_seconds: 300

# Destination: BigQuery
destination:
  name: analytics-bigquery
  connector: destination-bigquery
  config:
    project_id: analytics-project
    dataset_id: raw_data
    credentials_json: ${GCP_CREDENTIALS}
    loading_method:
      method: GCS Staging
      gcs_bucket_name: airbyte-staging
      gcs_bucket_path: staging/

# Sync設定
connection:
  name: orders-to-bigquery
  schedule:
    type: cron
    cron_expression: "0 */2 * * *"  # 2時間ごと
  sync_mode: incremental  # 差分同期
  destination_sync_mode: append_dedup
  cursor_field: updated_at
  primary_key: [id]

  # ストリーム選択
  streams:
    - name: orders
      sync_mode: incremental
    - name: order_items
      sync_mode: incremental
    - name: customers
      sync_mode: incremental
    - name: products
      sync_mode: full_refresh  # マスタデータは全件同期

dbt によるデータ変換

dbt プロジェクト構成

graph TD
    Root["dbt_project/"] --> Config["dbt_project.yml"]
    Root --> Models["models/"]
    Root --> Tests["tests/ -- カスタムテスト"]
    Root --> Macros["macros/ -- 再利用可能なSQL"]
    Root --> Seeds["seeds/ -- 静的データ"]

    Models --> Staging["staging/
生データのクレンジング"] Models --> Intermediate["intermediate/
中間変換"] Models --> Marts["marts/
ビジネスロジック適用済み"] Staging --> StgOrders["stg_orders.sql"] Staging --> StgCustomers["stg_customers.sql"] Staging --> StagingYml["_staging.yml
テスト・ドキュメント"] Intermediate --> IntOrderItems["int_order_items_enriched.sql"] Marts --> Finance["finance/"] Marts --> Marketing["marketing/"] Finance --> FctRevenue["fct_revenue.sql"] Marketing --> DimCustomers["dim_customers.sql"] style Root fill:#dbeafe,stroke:#2563eb,stroke-width:2px,color:#1e40af style Models fill:#d1fae5,stroke:#059669,color:#065f46 style Staging fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e style Intermediate fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e style Marts fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e

dbt モデルの実装

-- models/staging/stg_orders.sql
-- Staging: 生データのクレンジングと型変換

{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='append_new_columns'
) }}

WITH source AS (
    SELECT * FROM {{ source('raw_data', 'orders') }}
    {% if is_incremental() %}
    WHERE _airbyte_extracted_at > (
        SELECT MAX(_airbyte_extracted_at) FROM {{ this }}
    )
    {% endif %}
),

cleaned AS (
    SELECT
        id AS order_id,
        customer_id,
        CAST(order_date AS TIMESTAMP) AS order_date,
        CAST(total_amount AS DECIMAL(10, 2)) AS total_amount,
        UPPER(TRIM(status)) AS status,
        CASE
            WHEN status IN ('completed', 'delivered') THEN TRUE
            ELSE FALSE
        END AS is_completed,
        created_at,
        updated_at
    FROM source
    WHERE id IS NOT NULL
      AND total_amount >= 0
)

SELECT * FROM cleaned
-- models/marts/finance/fct_revenue.sql
-- Mart: ビジネスメトリクス

{{ config(
    materialized='table',
    partition_by={
        "field": "order_date",
        "data_type": "date",
        "granularity": "month"
    },
    cluster_by=['category', 'region']
) }}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
    WHERE is_completed = TRUE
),

order_items AS (
    SELECT * FROM {{ ref('int_order_items_enriched') }}
),

customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
)

SELECT
    o.order_id,
    o.order_date,
    c.customer_segment,
    c.region,
    oi.category,
    oi.product_name,
    oi.quantity,
    oi.unit_price,
    oi.quantity * oi.unit_price AS line_total,
    o.total_amount AS order_total,
    -- ビジネスメトリクス
    CASE
        WHEN c.first_order_date = o.order_date THEN 'new'
        ELSE 'returning'
    END AS customer_type,
    DATE_DIFF(o.order_date, c.first_order_date, DAY) AS days_since_first_order
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN customers c ON o.customer_id = c.customer_id
# models/staging/_staging.yml
# テストとドキュメントの定義

version: 2

models:
  - name: stg_orders
    description: "クレンジング済み注文データ"
    columns:
      - name: order_id
        description: "注文ID"
        tests:
          - unique
          - not_null
      - name: customer_id
        description: "顧客ID"
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: total_amount
        description: "注文合計金額"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 10000000
      - name: status
        description: "注文ステータス"
        tests:
          - accepted_values:
              values: ['PENDING', 'CONFIRMED', 'SHIPPED', 'DELIVERED', 'COMPLETED', 'CANCELLED']

オーケストレーション

Dagster による統合パイプライン

# Dagster: Asset-based orchestration

from dagster import (
    asset, AssetIn, DailyPartitionsDefinition,
    FreshnessPolicy, AutoMaterializePolicy,
)

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(
    partitions_def=daily_partitions,
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=180),
    description="Airbyte経由で取得した生注文データ",
)
def raw_orders(context) -> None:
    """Airbyte sync をトリガー"""
    partition_date = context.partition_key
    trigger_airbyte_sync(
        connection_id="orders-to-bigquery",
        state={"cursor": partition_date},
    )

@asset(
    ins={"raw_orders": AssetIn()},
    partitions_def=daily_partitions,
    description="dbt staging モデル: クレンジング済み注文",
)
def stg_orders(context, raw_orders) -> None:
    """dbt run --select stg_orders"""
    partition_date = context.partition_key
    run_dbt_model("stg_orders", vars={"date": partition_date})

@asset(
    ins={"stg_orders": AssetIn()},
    partitions_def=daily_partitions,
    auto_materialize_policy=AutoMaterializePolicy.eager(),
    description="売上分析マート",
)
def fct_revenue(context, stg_orders) -> None:
    """dbt run --select fct_revenue"""
    partition_date = context.partition_key
    run_dbt_model("fct_revenue", vars={"date": partition_date})

    # データ品質チェック
    run_dbt_test("fct_revenue")

    # 完了通知
    context.log.info(f"Revenue mart updated for {partition_date}")
Airflow vs Dagster の比較
観点AirflowDagster
抽象化単位Task(タスク)Asset(データ資産)
パラダイムタスク中心(何をするか)データ中心(何を作るか)
テストDAGのユニットテストが困難アセットのユニットテスト可能
ローカル開発Docker Compose必要dagster devで即起動
UIタスク実行履歴中心データリネージュ可視化
成熟度高い(大規模実績多数)中程度(急成長中)

大規模な既存Airflow環境がある場合はAirflowを継続。新規構築ならDagsterの方がモダンな設計思想に基づいています。


まとめ

ポイント内容
ETL vs ELTクラウドDWH環境ではELT(dbt)が主流
AirbyteCDCによる差分同期で運用DBの負荷を最小化
dbtSQLベースの変換、テスト、ドキュメントの統合
オーケストレーションDagster/Airflowで依存関係と実行スケジュールを管理

チェックリスト

  • ETLとELTの違いと適用場面を説明できる
  • モダンデータスタックの構成要素を理解した
  • dbtのモデル設計(staging/intermediate/marts)を理解した
  • オーケストレーションツールの選定基準を把握した

次のステップへ

次はデータ品質と整合性の管理方法を学び、パイプライン全体の信頼性を確保する手法を理解します。


推定読了時間: 40分