EXERCISE 60分

ストーリー

佐藤CTO
データの流れを設計する力を実践で試そう
佐藤CTO
バッチ、ストリーム、ETL/ELT、品質管理。これらを統合してエンドツーエンドのパイプラインを設計できるかが、データアーキテクトの力量だ

ミッション概要

ミッションテーマ目安時間
Mission 1バッチパイプライン設計15分
Mission 2ストリーム処理設計15分
Mission 3dbtモデル設計15分
Mission 4データ品質戦略15分

前提シナリオ

ECサイト「ShopMaster」のデータパイプラインを設計します。

データソース:

  • 運用DB(PostgreSQL): 注文、顧客、商品
  • アクセスログ(S3): 行動ログ(ページビュー、クリック、検索)
  • 外部API: 広告データ(Google Ads, Meta Ads)
  • Kafka: リアルタイムイベント(カート追加、購入、レビュー)

分析要件:

  • 日次売上レポート(翌朝9時までに更新)
  • リアルタイムダッシュボード(5分以内の遅延)
  • 顧客セグメンテーション分析(週次)
  • 広告ROI分析(日次)

Mission 1: バッチパイプライン設計(15分)

要件

日次売上レポートと広告ROI分析のバッチパイプラインを Airflow DAG として設計してください。データソース、依存関係、スケジュール、エラー処理を含めること。

解答例
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'execution_timeout': timedelta(hours=3),
    'on_failure_callback': alert_slack_and_pagerduty,
}

with DAG(
    dag_id='daily_sales_and_ads_pipeline',
    default_args=default_args,
    schedule='0 2 * * *',  # 毎日AM2時
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
) as dag:

    # ===== データ取得フェーズ =====
    # 並列実行: 3つのデータソースを同時に取得
    extract_orders = PythonOperator(
        task_id='extract_orders',
        python_callable=airbyte_sync,
        op_kwargs={'connection': 'orders-db', 'date': '{{ ds }}'},
    )

    wait_for_access_logs = S3KeySensor(
        task_id='wait_for_access_logs',
        bucket_name='raw-logs',
        bucket_key='access_logs/dt={{ ds }}/_SUCCESS',
        timeout=7200,
    )

    extract_ad_data = PythonOperator(
        task_id='extract_ad_data',
        python_callable=fetch_ad_api_data,
        op_kwargs={'platforms': ['google_ads', 'meta_ads'], 'date': '{{ ds }}'},
    )

    # ===== 品質チェックフェーズ =====
    quality_gate = PythonOperator(
        task_id='quality_gate',
        python_callable=run_great_expectations,
        op_kwargs={'suite': 'raw_data_quality', 'date': '{{ ds }}'},
    )

    # ===== 変換フェーズ =====
    dbt_staging = PythonOperator(
        task_id='dbt_staging',
        python_callable=run_dbt,
        op_kwargs={'select': 'staging', 'vars': {'date': '{{ ds }}'}},
    )

    dbt_marts = PythonOperator(
        task_id='dbt_marts',
        python_callable=run_dbt,
        op_kwargs={'select': 'marts.finance marts.marketing', 'vars': {'date': '{{ ds }}'}},
    )

    dbt_tests = PythonOperator(
        task_id='dbt_tests',
        python_callable=run_dbt_test,
        op_kwargs={'select': 'marts'},
    )

    # ===== 配信フェーズ =====
    refresh_dashboard = PythonOperator(
        task_id='refresh_dashboard',
        python_callable=refresh_metabase_cache,
    )

    notify_completion = PythonOperator(
        task_id='notify_completion',
        python_callable=send_slack_notification,
        op_kwargs={'message': '{{ ds }} の日次レポートが完了しました'},
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    # ===== 依存関係 =====
    [extract_orders, wait_for_access_logs, extract_ad_data] >> quality_gate
    quality_gate >> dbt_staging >> dbt_marts >> dbt_tests
    dbt_tests >> refresh_dashboard >> notify_completion

設計判断:

  • 3つのデータソースは独立しているため並列取得
  • 品質ゲートを変換前に設置し、不正データの混入を防止
  • AM2時開始 → AM9時までに完了(7時間の余裕)
  • リトライ3回、10分間隔で一時的な障害に対応

Mission 2: ストリーム処理設計(15分)

要件

リアルタイムダッシュボード用のストリーム処理を設計してください。カート追加から購入までのファネル分析を5分ウィンドウで集計すること。

解答例
// Kafka Streams によるリアルタイムファネル分析

interface UserEvent {
  userId: string;
  sessionId: string;
  eventType: 'page_view' | 'search' | 'product_view' | 'cart_add' | 'checkout_start' | 'purchase';
  productId?: string;
  amount?: number;
  eventTime: string;
}

interface FunnelMetrics {
  windowStart: string;
  windowEnd: string;
  pageViews: number;
  searches: number;
  productViews: number;
  cartAdds: number;
  checkoutStarts: number;
  purchases: number;
  conversionRate: number;       // cart_add → purchase
  cartAbandonmentRate: number;  // cart_add したが purchase しない
  totalRevenue: number;
}

class FunnelStreamProcessor {
  private windowSize = 5 * 60 * 1000; // 5分

  async process(event: UserEvent): Promise<void> {
    const windowKey = this.getWindowKey(event.eventTime);

    // 1. ウィンドウ内メトリクスの更新(Redis Sorted Set使用)
    await this.redis.hincrby(
      `funnel:${windowKey}`,
      event.eventType,
      1
    );

    // 2. ユニークユーザー追跡(HyperLogLog)
    await this.redis.pfadd(
      `funnel:${windowKey}:unique:${event.eventType}`,
      event.userId
    );

    // 3. 購入の場合は売上を加算
    if (event.eventType === 'purchase' && event.amount) {
      await this.redis.hincrbyfloat(
        `funnel:${windowKey}`,
        'total_revenue',
        event.amount
      );
    }

    // 4. セッション単位のファネル追跡
    await this.redis.sadd(
      `session:${event.sessionId}:events`,
      event.eventType
    );
    await this.redis.expire(`session:${event.sessionId}:events`, 3600);
  }

  // 5分ごとにウィンドウを集計して出力
  async emitWindowMetrics(windowKey: string): Promise<FunnelMetrics> {
    const raw = await this.redis.hgetall(`funnel:${windowKey}`);
    const cartAdds = parseInt(raw.cart_add || '0');
    const purchases = parseInt(raw.purchase || '0');

    return {
      windowStart: this.windowKeyToTime(windowKey),
      windowEnd: this.windowKeyToTime(windowKey, this.windowSize),
      pageViews: parseInt(raw.page_view || '0'),
      searches: parseInt(raw.search || '0'),
      productViews: parseInt(raw.product_view || '0'),
      cartAdds,
      checkoutStarts: parseInt(raw.checkout_start || '0'),
      purchases,
      conversionRate: cartAdds > 0 ? (purchases / cartAdds) * 100 : 0,
      cartAbandonmentRate: cartAdds > 0 ? ((cartAdds - purchases) / cartAdds) * 100 : 0,
      totalRevenue: parseFloat(raw.total_revenue || '0'),
    };
  }
}

設計判断:

  • Redisのインメモリ処理で低レイテンシを実現
  • HyperLogLogでユニークユーザー数を近似計算(メモリ効率)
  • セッション追跡はTTL付きSetで自動クリーンアップ

Mission 3: dbtモデル設計(15分)

要件

顧客セグメンテーション分析のためのdbtモデル(staging → intermediate → marts)を設計してください。RFM分析(Recency, Frequency, Monetary)を実装すること。

解答例
-- models/staging/stg_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}

SELECT
    id AS order_id,
    customer_id,
    CAST(order_date AS DATE) AS order_date,
    CAST(total_amount AS DECIMAL(10,2)) AS total_amount,
    status,
    updated_at
FROM {{ source('raw', 'orders') }}
WHERE status = 'completed'
{% if is_incremental() %}
AND updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

-- models/intermediate/int_customer_rfm.sql
{{ config(materialized='table') }}

WITH customer_orders AS (
    SELECT
        customer_id,
        MAX(order_date) AS last_order_date,
        COUNT(DISTINCT order_id) AS order_count,
        SUM(total_amount) AS total_spent,
        AVG(total_amount) AS avg_order_value,
        MIN(order_date) AS first_order_date
    FROM {{ ref('stg_orders') }}
    GROUP BY customer_id
),

rfm_scores AS (
    SELECT
        customer_id,
        last_order_date,
        order_count,
        total_spent,
        avg_order_value,
        first_order_date,
        DATE_DIFF(CURRENT_DATE(), last_order_date, DAY) AS recency_days,
        NTILE(5) OVER (ORDER BY DATE_DIFF(CURRENT_DATE(), last_order_date, DAY) ASC) AS r_score,
        NTILE(5) OVER (ORDER BY order_count ASC) AS f_score,
        NTILE(5) OVER (ORDER BY total_spent ASC) AS m_score
    FROM customer_orders
)

SELECT
    *,
    r_score * 100 + f_score * 10 + m_score AS rfm_score,
    CASE
        WHEN r_score >= 4 AND f_score >= 4 AND m_score >= 4 THEN 'Champion'
        WHEN r_score >= 4 AND f_score >= 3 THEN 'Loyal Customer'
        WHEN r_score >= 4 AND f_score <= 2 THEN 'New Customer'
        WHEN r_score <= 2 AND f_score >= 3 AND m_score >= 3 THEN 'At Risk'
        WHEN r_score <= 2 AND f_score >= 4 THEN 'Cannot Lose'
        WHEN r_score <= 2 AND f_score <= 2 THEN 'Lost'
        ELSE 'Others'
    END AS customer_segment
FROM rfm_scores

-- models/marts/marketing/dim_customers.sql
{{ config(
    materialized='table',
    cluster_by=['customer_segment', 'region']
) }}

SELECT
    c.customer_id,
    c.name,
    c.email,
    c.region,
    c.registration_date,
    r.customer_segment,
    r.rfm_score,
    r.r_score AS recency_score,
    r.f_score AS frequency_score,
    r.m_score AS monetary_score,
    r.recency_days,
    r.order_count AS lifetime_orders,
    r.total_spent AS lifetime_value,
    r.avg_order_value,
    r.first_order_date,
    r.last_order_date,
    DATE_DIFF(r.last_order_date, r.first_order_date, DAY) AS customer_lifetime_days
FROM {{ ref('stg_customers') }} c
LEFT JOIN {{ ref('int_customer_rfm') }} r ON c.customer_id = r.customer_id

設計判断:

  • Staging: インクリメンタルで差分処理、completedのみフィルタ
  • Intermediate: RFMスコア計算(NTILE関数で5分位に分割)
  • Marts: 顧客マスタとRFMを結合した分析用ディメンションテーブル

Mission 4: データ品質戦略(15分)

要件

ShopMaster全体のデータ品質戦略を策定してください。各パイプラインの品質チェックポイント、データコントラクト、アラート設計を含めること。

解答例
パイプラインチェックポイント品質ルールアラート
注文データ取込Extract直後NULL率<0.1%, 重複率<0.01%Slack (warning)
品質ゲートTransform前行数±30%以内, スキーマ一致PagerDuty (critical)
dbt stagingTransform後ユニーク制約, FK整合性Slack (critical)
dbt martsMarts完成時金額合計一致, セグメント分布Slack (warning)
ダッシュボード配信前鮮度<3時間, 数値範囲メール (info)
# データコントラクト: 注文イベント
apiVersion: datacontract/v1
kind: DataContract
metadata:
  name: shopmaster-orders
  version: "1.0.0"
  owner: order-team
  consumers: [analytics-team, marketing-team]

spec:
  slo:
    freshness: {maxDelaySeconds: 7200}  # 2時間
    completeness: {minPercentage: 99.9}
    availability: {uptimePercentage: 99.9}

  qualityChecks:
    - name: "注文金額の妥当性"
      type: range
      column: total_amount
      min: 1
      max: 5000000
      severity: critical

    - name: "注文日の妥当性"
      type: freshness
      column: order_date
      maxAge: "48h"
      severity: warning

    - name: "DBとDWHの整合性"
      type: cross_system_reconciliation
      source: "SELECT COUNT(*), SUM(amount) FROM orders WHERE date = '{{ ds }}'"
      target: "SELECT COUNT(*), SUM(total_amount) FROM fct_revenue WHERE order_date = '{{ ds }}'"
      tolerance: 0.001  # 0.1%以内
      severity: critical

  alerting:
    escalation:
      - level: 1
        channel: slack
        wait: 0
      - level: 2
        channel: pagerduty
        wait: 30  # 30分未解決で
      - level: 3
        channel: email_to_cto
        wait: 120  # 2時間未解決で

まとめ

ポイント内容
バッチパイプライン並列取得→品質ゲート→変換→配信の流れ
ストリーム処理ウィンドウ集計でリアルタイムメトリクスを算出
dbtモデルstaging→intermediate→martsの階層設計
品質戦略各段階でのチェックポイントとエスカレーション

チェックリスト

  • Airflow DAGでバッチパイプラインを設計できた
  • ストリーム処理でリアルタイム集計を設計できた
  • dbtの3層モデル設計でRFM分析を実装できた
  • エンドツーエンドのデータ品質戦略を策定できた

次のステップへ

次はチェックポイントクイズでデータパイプライン設計の理解度を確認します。


推定読了時間: 60分