ストーリー
ミッション概要
| ミッション | テーマ | 目安時間 |
|---|---|---|
| Mission 1 | バッチパイプライン設計 | 15分 |
| Mission 2 | ストリーム処理設計 | 15分 |
| Mission 3 | dbtモデル設計 | 15分 |
| Mission 4 | データ品質戦略 | 15分 |
前提シナリオ
ECサイト「ShopMaster」のデータパイプラインを設計します。
データソース:
- 運用DB(PostgreSQL): 注文、顧客、商品
- アクセスログ(S3): 行動ログ(ページビュー、クリック、検索)
- 外部API: 広告データ(Google Ads, Meta Ads)
- Kafka: リアルタイムイベント(カート追加、購入、レビュー)
分析要件:
- 日次売上レポート(翌朝9時までに更新)
- リアルタイムダッシュボード(5分以内の遅延)
- 顧客セグメンテーション分析(週次)
- 広告ROI分析(日次)
Mission 1: バッチパイプライン設計(15分)
要件
日次売上レポートと広告ROI分析のバッチパイプラインを Airflow DAG として設計してください。データソース、依存関係、スケジュール、エラー処理を含めること。
解答例
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=10),
'execution_timeout': timedelta(hours=3),
'on_failure_callback': alert_slack_and_pagerduty,
}
with DAG(
dag_id='daily_sales_and_ads_pipeline',
default_args=default_args,
schedule='0 2 * * *', # 毎日AM2時
start_date=datetime(2024, 1, 1),
catchup=False,
max_active_runs=1,
) as dag:
# ===== データ取得フェーズ =====
# 並列実行: 3つのデータソースを同時に取得
extract_orders = PythonOperator(
task_id='extract_orders',
python_callable=airbyte_sync,
op_kwargs={'connection': 'orders-db', 'date': '{{ ds }}'},
)
wait_for_access_logs = S3KeySensor(
task_id='wait_for_access_logs',
bucket_name='raw-logs',
bucket_key='access_logs/dt={{ ds }}/_SUCCESS',
timeout=7200,
)
extract_ad_data = PythonOperator(
task_id='extract_ad_data',
python_callable=fetch_ad_api_data,
op_kwargs={'platforms': ['google_ads', 'meta_ads'], 'date': '{{ ds }}'},
)
# ===== 品質チェックフェーズ =====
quality_gate = PythonOperator(
task_id='quality_gate',
python_callable=run_great_expectations,
op_kwargs={'suite': 'raw_data_quality', 'date': '{{ ds }}'},
)
# ===== 変換フェーズ =====
dbt_staging = PythonOperator(
task_id='dbt_staging',
python_callable=run_dbt,
op_kwargs={'select': 'staging', 'vars': {'date': '{{ ds }}'}},
)
dbt_marts = PythonOperator(
task_id='dbt_marts',
python_callable=run_dbt,
op_kwargs={'select': 'marts.finance marts.marketing', 'vars': {'date': '{{ ds }}'}},
)
dbt_tests = PythonOperator(
task_id='dbt_tests',
python_callable=run_dbt_test,
op_kwargs={'select': 'marts'},
)
# ===== 配信フェーズ =====
refresh_dashboard = PythonOperator(
task_id='refresh_dashboard',
python_callable=refresh_metabase_cache,
)
notify_completion = PythonOperator(
task_id='notify_completion',
python_callable=send_slack_notification,
op_kwargs={'message': '{{ ds }} の日次レポートが完了しました'},
trigger_rule=TriggerRule.ALL_SUCCESS,
)
# ===== 依存関係 =====
[extract_orders, wait_for_access_logs, extract_ad_data] >> quality_gate
quality_gate >> dbt_staging >> dbt_marts >> dbt_tests
dbt_tests >> refresh_dashboard >> notify_completion
設計判断:
- 3つのデータソースは独立しているため並列取得
- 品質ゲートを変換前に設置し、不正データの混入を防止
- AM2時開始 → AM9時までに完了(7時間の余裕)
- リトライ3回、10分間隔で一時的な障害に対応
Mission 2: ストリーム処理設計(15分)
要件
リアルタイムダッシュボード用のストリーム処理を設計してください。カート追加から購入までのファネル分析を5分ウィンドウで集計すること。
解答例
// Kafka Streams によるリアルタイムファネル分析
interface UserEvent {
userId: string;
sessionId: string;
eventType: 'page_view' | 'search' | 'product_view' | 'cart_add' | 'checkout_start' | 'purchase';
productId?: string;
amount?: number;
eventTime: string;
}
interface FunnelMetrics {
windowStart: string;
windowEnd: string;
pageViews: number;
searches: number;
productViews: number;
cartAdds: number;
checkoutStarts: number;
purchases: number;
conversionRate: number; // cart_add → purchase
cartAbandonmentRate: number; // cart_add したが purchase しない
totalRevenue: number;
}
class FunnelStreamProcessor {
private windowSize = 5 * 60 * 1000; // 5分
async process(event: UserEvent): Promise<void> {
const windowKey = this.getWindowKey(event.eventTime);
// 1. ウィンドウ内メトリクスの更新(Redis Sorted Set使用)
await this.redis.hincrby(
`funnel:${windowKey}`,
event.eventType,
1
);
// 2. ユニークユーザー追跡(HyperLogLog)
await this.redis.pfadd(
`funnel:${windowKey}:unique:${event.eventType}`,
event.userId
);
// 3. 購入の場合は売上を加算
if (event.eventType === 'purchase' && event.amount) {
await this.redis.hincrbyfloat(
`funnel:${windowKey}`,
'total_revenue',
event.amount
);
}
// 4. セッション単位のファネル追跡
await this.redis.sadd(
`session:${event.sessionId}:events`,
event.eventType
);
await this.redis.expire(`session:${event.sessionId}:events`, 3600);
}
// 5分ごとにウィンドウを集計して出力
async emitWindowMetrics(windowKey: string): Promise<FunnelMetrics> {
const raw = await this.redis.hgetall(`funnel:${windowKey}`);
const cartAdds = parseInt(raw.cart_add || '0');
const purchases = parseInt(raw.purchase || '0');
return {
windowStart: this.windowKeyToTime(windowKey),
windowEnd: this.windowKeyToTime(windowKey, this.windowSize),
pageViews: parseInt(raw.page_view || '0'),
searches: parseInt(raw.search || '0'),
productViews: parseInt(raw.product_view || '0'),
cartAdds,
checkoutStarts: parseInt(raw.checkout_start || '0'),
purchases,
conversionRate: cartAdds > 0 ? (purchases / cartAdds) * 100 : 0,
cartAbandonmentRate: cartAdds > 0 ? ((cartAdds - purchases) / cartAdds) * 100 : 0,
totalRevenue: parseFloat(raw.total_revenue || '0'),
};
}
}
設計判断:
- Redisのインメモリ処理で低レイテンシを実現
- HyperLogLogでユニークユーザー数を近似計算(メモリ効率)
- セッション追跡はTTL付きSetで自動クリーンアップ
Mission 3: dbtモデル設計(15分)
要件
顧客セグメンテーション分析のためのdbtモデル(staging → intermediate → marts)を設計してください。RFM分析(Recency, Frequency, Monetary)を実装すること。
解答例
-- models/staging/stg_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}
SELECT
id AS order_id,
customer_id,
CAST(order_date AS DATE) AS order_date,
CAST(total_amount AS DECIMAL(10,2)) AS total_amount,
status,
updated_at
FROM {{ source('raw', 'orders') }}
WHERE status = 'completed'
{% if is_incremental() %}
AND updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
-- models/intermediate/int_customer_rfm.sql
{{ config(materialized='table') }}
WITH customer_orders AS (
SELECT
customer_id,
MAX(order_date) AS last_order_date,
COUNT(DISTINCT order_id) AS order_count,
SUM(total_amount) AS total_spent,
AVG(total_amount) AS avg_order_value,
MIN(order_date) AS first_order_date
FROM {{ ref('stg_orders') }}
GROUP BY customer_id
),
rfm_scores AS (
SELECT
customer_id,
last_order_date,
order_count,
total_spent,
avg_order_value,
first_order_date,
DATE_DIFF(CURRENT_DATE(), last_order_date, DAY) AS recency_days,
NTILE(5) OVER (ORDER BY DATE_DIFF(CURRENT_DATE(), last_order_date, DAY) ASC) AS r_score,
NTILE(5) OVER (ORDER BY order_count ASC) AS f_score,
NTILE(5) OVER (ORDER BY total_spent ASC) AS m_score
FROM customer_orders
)
SELECT
*,
r_score * 100 + f_score * 10 + m_score AS rfm_score,
CASE
WHEN r_score >= 4 AND f_score >= 4 AND m_score >= 4 THEN 'Champion'
WHEN r_score >= 4 AND f_score >= 3 THEN 'Loyal Customer'
WHEN r_score >= 4 AND f_score <= 2 THEN 'New Customer'
WHEN r_score <= 2 AND f_score >= 3 AND m_score >= 3 THEN 'At Risk'
WHEN r_score <= 2 AND f_score >= 4 THEN 'Cannot Lose'
WHEN r_score <= 2 AND f_score <= 2 THEN 'Lost'
ELSE 'Others'
END AS customer_segment
FROM rfm_scores
-- models/marts/marketing/dim_customers.sql
{{ config(
materialized='table',
cluster_by=['customer_segment', 'region']
) }}
SELECT
c.customer_id,
c.name,
c.email,
c.region,
c.registration_date,
r.customer_segment,
r.rfm_score,
r.r_score AS recency_score,
r.f_score AS frequency_score,
r.m_score AS monetary_score,
r.recency_days,
r.order_count AS lifetime_orders,
r.total_spent AS lifetime_value,
r.avg_order_value,
r.first_order_date,
r.last_order_date,
DATE_DIFF(r.last_order_date, r.first_order_date, DAY) AS customer_lifetime_days
FROM {{ ref('stg_customers') }} c
LEFT JOIN {{ ref('int_customer_rfm') }} r ON c.customer_id = r.customer_id
設計判断:
- Staging: インクリメンタルで差分処理、completedのみフィルタ
- Intermediate: RFMスコア計算(NTILE関数で5分位に分割)
- Marts: 顧客マスタとRFMを結合した分析用ディメンションテーブル
Mission 4: データ品質戦略(15分)
要件
ShopMaster全体のデータ品質戦略を策定してください。各パイプラインの品質チェックポイント、データコントラクト、アラート設計を含めること。
解答例
| パイプライン | チェックポイント | 品質ルール | アラート |
|---|---|---|---|
| 注文データ取込 | Extract直後 | NULL率<0.1%, 重複率<0.01% | Slack (warning) |
| 品質ゲート | Transform前 | 行数±30%以内, スキーマ一致 | PagerDuty (critical) |
| dbt staging | Transform後 | ユニーク制約, FK整合性 | Slack (critical) |
| dbt marts | Marts完成時 | 金額合計一致, セグメント分布 | Slack (warning) |
| ダッシュボード | 配信前 | 鮮度<3時間, 数値範囲 | メール (info) |
# データコントラクト: 注文イベント
apiVersion: datacontract/v1
kind: DataContract
metadata:
name: shopmaster-orders
version: "1.0.0"
owner: order-team
consumers: [analytics-team, marketing-team]
spec:
slo:
freshness: {maxDelaySeconds: 7200} # 2時間
completeness: {minPercentage: 99.9}
availability: {uptimePercentage: 99.9}
qualityChecks:
- name: "注文金額の妥当性"
type: range
column: total_amount
min: 1
max: 5000000
severity: critical
- name: "注文日の妥当性"
type: freshness
column: order_date
maxAge: "48h"
severity: warning
- name: "DBとDWHの整合性"
type: cross_system_reconciliation
source: "SELECT COUNT(*), SUM(amount) FROM orders WHERE date = '{{ ds }}'"
target: "SELECT COUNT(*), SUM(total_amount) FROM fct_revenue WHERE order_date = '{{ ds }}'"
tolerance: 0.001 # 0.1%以内
severity: critical
alerting:
escalation:
- level: 1
channel: slack
wait: 0
- level: 2
channel: pagerduty
wait: 30 # 30分未解決で
- level: 3
channel: email_to_cto
wait: 120 # 2時間未解決で
まとめ
| ポイント | 内容 |
|---|---|
| バッチパイプライン | 並列取得→品質ゲート→変換→配信の流れ |
| ストリーム処理 | ウィンドウ集計でリアルタイムメトリクスを算出 |
| dbtモデル | staging→intermediate→martsの階層設計 |
| 品質戦略 | 各段階でのチェックポイントとエスカレーション |
チェックリスト
- Airflow DAGでバッチパイプラインを設計できた
- ストリーム処理でリアルタイム集計を設計できた
- dbtの3層モデル設計でRFM分析を実装できた
- エンドツーエンドのデータ品質戦略を策定できた
次のステップへ
次はチェックポイントクイズでデータパイプライン設計の理解度を確認します。
推定読了時間: 60分