演習:ストリーミングパイプラインを設計しよう
田中VPoE「ストリーミングの基礎、Kafka、処理パターン、アーキテクチャを学んだ。NetShop社のリアルタイムデータ基盤を設計してみよう。」
あなた「注文イベントのリアルタイム集計と、異常検知の仕組みを構築するんですね。」
田中VPoE「そうだ。加えて、バッチ処理との統合も含めたアーキテクチャ設計をしてほしい。」
ミッション概要
NetShop社のリアルタイム注文処理パイプラインを設計・実装します。Kafkaによるイベント収集、ストリーム処理、リアルタイム集計、異常検知を含みます。
前提条件
- Step 4の各レッスン(ストリーミング基礎、Kafka入門、ストリーム処理、Lambda/Kappaアーキテクチャ)を修了していること
- PythonとKafkaの基本概念を理解していること
Mission 1: イベントスキーマとProducerの設計(30分)
NetShop社の注文イベントのスキーマを設計し、Producerを実装してください。
要件
- 注文イベント、在庫イベント、顧客行動イベントの3つのスキーマを定義
- スキーマバリデーション付きのProducerクラスを実装
- パーティションキーの設計
解答例
from dataclasses import dataclass, asdict
from datetime import datetime
import json
@dataclass
class OrderEvent:
order_id: str
customer_id: int
items: list
total_amount: float
status: str # created, paid, shipped, delivered, cancelled
region: str
timestamp: str
@dataclass
class InventoryEvent:
product_id: str
warehouse_id: str
quantity_change: int
current_stock: int
event_type: str # sold, restocked, returned
timestamp: str
@dataclass
class CustomerBehaviorEvent:
customer_id: int
event_type: str # page_view, add_to_cart, purchase, search
product_id: str | None
session_id: str
timestamp: str
class EventProducer:
TOPIC_MAP = {
OrderEvent: "orders",
InventoryEvent: "inventory",
CustomerBehaviorEvent: "customer-behavior",
}
PARTITION_KEY_MAP = {
OrderEvent: lambda e: str(e.customer_id),
InventoryEvent: lambda e: e.product_id,
CustomerBehaviorEvent: lambda e: str(e.customer_id),
}
def __init__(self, bootstrap_servers: str):
from confluent_kafka import Producer
self.producer = Producer({
"bootstrap.servers": bootstrap_servers,
"acks": "all",
})
def send(self, event):
event_type = type(event)
topic = self.TOPIC_MAP[event_type]
key = self.PARTITION_KEY_MAP[event_type](event)
self.producer.produce(
topic=topic,
key=key.encode(),
value=json.dumps(asdict(event)).encode(),
)
self.producer.flush()
Mission 2: リアルタイム集約エンジンの実装(30分)
注文イベントをリアルタイムで集約するエンジンを実装してください。
要件
- 5分間のタンブリングウィンドウで注文数・売上を集計
- 地域別の売上ランキング
- 異常検知(直近ウィンドウの注文数が過去平均の3倍以上でアラート)
解答例
from collections import defaultdict
from datetime import datetime, timedelta
import statistics
class RealTimeAggregator:
def __init__(self, window_minutes: int = 5):
self.window_size = timedelta(minutes=window_minutes)
self.windows = defaultdict(lambda: {
"order_count": 0,
"total_revenue": 0,
"by_region": defaultdict(float),
"customers": set(),
})
self.historical_counts = []
def _window_key(self, ts: datetime) -> str:
seconds = ts.timestamp()
window_start = seconds - (seconds % self.window_size.total_seconds())
return datetime.fromtimestamp(window_start).isoformat()
def process(self, event: dict) -> dict | None:
ts = datetime.fromisoformat(event["timestamp"])
key = self._window_key(ts)
w = self.windows[key]
w["order_count"] += 1
w["total_revenue"] += event["total_amount"]
w["by_region"][event["region"]] += event["total_amount"]
w["customers"].add(event["customer_id"])
# 異常検知
alert = self._check_anomaly(key, w["order_count"])
if alert:
return alert
return None
def _check_anomaly(self, window_key: str, current_count: int) -> dict | None:
if len(self.historical_counts) < 5:
return None
avg = statistics.mean(self.historical_counts[-20:])
if current_count > avg * 3:
return {
"type": "ORDER_SPIKE",
"window": window_key,
"current": current_count,
"average": round(avg, 1),
"ratio": round(current_count / avg, 1),
}
return None
def close_window(self, window_key: str) -> dict:
w = self.windows[window_key]
self.historical_counts.append(w["order_count"])
region_ranking = sorted(
w["by_region"].items(), key=lambda x: x[1], reverse=True
)
result = {
"window": window_key,
"order_count": w["order_count"],
"total_revenue": w["total_revenue"],
"unique_customers": len(w["customers"]),
"avg_order_value": (
w["total_revenue"] / w["order_count"]
if w["order_count"] > 0 else 0
),
"top_regions": region_ranking[:5],
}
del self.windows[window_key]
return result
Mission 3: アーキテクチャ設計書の作成(30分)
NetShop社のリアルタイムデータ基盤のアーキテクチャ設計書を作成してください。
要件
- バッチ処理とストリーミング処理の統合アーキテクチャ(Lambda or Kappa)の選択と理由
- トピック設計(トピック名、パーティション数、保持期間)
- Consumer Group設計と障害時の対応
解答例
アーキテクチャ選択: Lambda(段階的にKappaへ移行)
理由:
- 既存のバッチパイプライン(Airflow + dbt)を活かす
- リアルタイム機能を段階的に追加
- 正確な日次レポートはバッチで担保
トピック設計:
| トピック | パーティション数 | 保持期間 | キー |
|---|---|---|---|
| orders | 12 | 30日 | customer_id |
| inventory | 6 | 7日 | product_id |
| customer-behavior | 24 | 3日 | customer_id |
| orders-dlq | 3 | 90日 | order_id |
| alerts | 3 | 7日 | alert_type |
Consumer Group設計:
| Group | トピック | インスタンス数 | 処理内容 |
|---|---|---|---|
| order-aggregator | orders | 4 | リアルタイム売上集計 |
| fraud-detector | orders | 3 | 不正注文検知 |
| inventory-updater | inventory | 2 | 在庫数更新 |
| behavior-tracker | customer-behavior | 6 | 行動分析 |
| batch-sink | orders, inventory | 2 | S3へのバッチ用データ蓄積 |
障害時の対応:
- Consumer障害: Consumer Groupのリバランスで自動復旧
- Broker障害: レプリケーションによるLeader切り替え
- 処理エラー: DLQに退避 + アラート発報
達成度チェック
- 3種類のイベントスキーマとバリデーション付きProducerを設計できた
- ウィンドウ集約と異常検知を含むリアルタイム処理エンジンを実装できた
- バッチとストリーミングの統合アーキテクチャを設計できた
- トピック設計とConsumer Group設計を適切に行えた
- 障害時の対応策を定義できた
推定所要時間:90分