EXERCISE 90分

演習:ストリーミングパイプラインを設計しよう

田中VPoE「ストリーミングの基礎、Kafka、処理パターン、アーキテクチャを学んだ。NetShop社のリアルタイムデータ基盤を設計してみよう。」

あなた「注文イベントのリアルタイム集計と、異常検知の仕組みを構築するんですね。」

田中VPoE「そうだ。加えて、バッチ処理との統合も含めたアーキテクチャ設計をしてほしい。」

ミッション概要

NetShop社のリアルタイム注文処理パイプラインを設計・実装します。Kafkaによるイベント収集、ストリーム処理、リアルタイム集計、異常検知を含みます。

前提条件

  • Step 4の各レッスン(ストリーミング基礎、Kafka入門、ストリーム処理、Lambda/Kappaアーキテクチャ)を修了していること
  • PythonとKafkaの基本概念を理解していること

Mission 1: イベントスキーマとProducerの設計(30分)

NetShop社の注文イベントのスキーマを設計し、Producerを実装してください。

要件

  1. 注文イベント、在庫イベント、顧客行動イベントの3つのスキーマを定義
  2. スキーマバリデーション付きのProducerクラスを実装
  3. パーティションキーの設計
解答例
from dataclasses import dataclass, asdict
from datetime import datetime
import json

@dataclass
class OrderEvent:
    order_id: str
    customer_id: int
    items: list
    total_amount: float
    status: str  # created, paid, shipped, delivered, cancelled
    region: str
    timestamp: str

@dataclass
class InventoryEvent:
    product_id: str
    warehouse_id: str
    quantity_change: int
    current_stock: int
    event_type: str  # sold, restocked, returned
    timestamp: str

@dataclass
class CustomerBehaviorEvent:
    customer_id: int
    event_type: str  # page_view, add_to_cart, purchase, search
    product_id: str | None
    session_id: str
    timestamp: str

class EventProducer:
    TOPIC_MAP = {
        OrderEvent: "orders",
        InventoryEvent: "inventory",
        CustomerBehaviorEvent: "customer-behavior",
    }

    PARTITION_KEY_MAP = {
        OrderEvent: lambda e: str(e.customer_id),
        InventoryEvent: lambda e: e.product_id,
        CustomerBehaviorEvent: lambda e: str(e.customer_id),
    }

    def __init__(self, bootstrap_servers: str):
        from confluent_kafka import Producer
        self.producer = Producer({
            "bootstrap.servers": bootstrap_servers,
            "acks": "all",
        })

    def send(self, event):
        event_type = type(event)
        topic = self.TOPIC_MAP[event_type]
        key = self.PARTITION_KEY_MAP[event_type](event)

        self.producer.produce(
            topic=topic,
            key=key.encode(),
            value=json.dumps(asdict(event)).encode(),
        )
        self.producer.flush()

Mission 2: リアルタイム集約エンジンの実装(30分)

注文イベントをリアルタイムで集約するエンジンを実装してください。

要件

  1. 5分間のタンブリングウィンドウで注文数・売上を集計
  2. 地域別の売上ランキング
  3. 異常検知(直近ウィンドウの注文数が過去平均の3倍以上でアラート)
解答例
from collections import defaultdict
from datetime import datetime, timedelta
import statistics

class RealTimeAggregator:
    def __init__(self, window_minutes: int = 5):
        self.window_size = timedelta(minutes=window_minutes)
        self.windows = defaultdict(lambda: {
            "order_count": 0,
            "total_revenue": 0,
            "by_region": defaultdict(float),
            "customers": set(),
        })
        self.historical_counts = []

    def _window_key(self, ts: datetime) -> str:
        seconds = ts.timestamp()
        window_start = seconds - (seconds % self.window_size.total_seconds())
        return datetime.fromtimestamp(window_start).isoformat()

    def process(self, event: dict) -> dict | None:
        ts = datetime.fromisoformat(event["timestamp"])
        key = self._window_key(ts)
        w = self.windows[key]

        w["order_count"] += 1
        w["total_revenue"] += event["total_amount"]
        w["by_region"][event["region"]] += event["total_amount"]
        w["customers"].add(event["customer_id"])

        # 異常検知
        alert = self._check_anomaly(key, w["order_count"])
        if alert:
            return alert
        return None

    def _check_anomaly(self, window_key: str, current_count: int) -> dict | None:
        if len(self.historical_counts) < 5:
            return None

        avg = statistics.mean(self.historical_counts[-20:])
        if current_count > avg * 3:
            return {
                "type": "ORDER_SPIKE",
                "window": window_key,
                "current": current_count,
                "average": round(avg, 1),
                "ratio": round(current_count / avg, 1),
            }
        return None

    def close_window(self, window_key: str) -> dict:
        w = self.windows[window_key]
        self.historical_counts.append(w["order_count"])

        region_ranking = sorted(
            w["by_region"].items(), key=lambda x: x[1], reverse=True
        )

        result = {
            "window": window_key,
            "order_count": w["order_count"],
            "total_revenue": w["total_revenue"],
            "unique_customers": len(w["customers"]),
            "avg_order_value": (
                w["total_revenue"] / w["order_count"]
                if w["order_count"] > 0 else 0
            ),
            "top_regions": region_ranking[:5],
        }

        del self.windows[window_key]
        return result

Mission 3: アーキテクチャ設計書の作成(30分)

NetShop社のリアルタイムデータ基盤のアーキテクチャ設計書を作成してください。

要件

  1. バッチ処理とストリーミング処理の統合アーキテクチャ(Lambda or Kappa)の選択と理由
  2. トピック設計(トピック名、パーティション数、保持期間)
  3. Consumer Group設計と障害時の対応
解答例

アーキテクチャ選択: Lambda(段階的にKappaへ移行)

理由:

  • 既存のバッチパイプライン(Airflow + dbt)を活かす
  • リアルタイム機能を段階的に追加
  • 正確な日次レポートはバッチで担保

トピック設計:

トピックパーティション数保持期間キー
orders1230日customer_id
inventory67日product_id
customer-behavior243日customer_id
orders-dlq390日order_id
alerts37日alert_type

Consumer Group設計:

Groupトピックインスタンス数処理内容
order-aggregatororders4リアルタイム売上集計
fraud-detectororders3不正注文検知
inventory-updaterinventory2在庫数更新
behavior-trackercustomer-behavior6行動分析
batch-sinkorders, inventory2S3へのバッチ用データ蓄積

障害時の対応:

  • Consumer障害: Consumer Groupのリバランスで自動復旧
  • Broker障害: レプリケーションによるLeader切り替え
  • 処理エラー: DLQに退避 + アラート発報

達成度チェック

  • 3種類のイベントスキーマとバリデーション付きProducerを設計できた
  • ウィンドウ集約と異常検知を含むリアルタイム処理エンジンを実装できた
  • バッチとストリーミングの統合アーキテクチャを設計できた
  • トピック設計とConsumer Group設計を適切に行えた
  • 障害時の対応策を定義できた

推定所要時間:90分