LESSON 30分

ストリーム処理

田中VPoE「Kafkaでメッセージの送受信ができるようになった。次はストリームデータの処理パターンを学ぼう。フィルタリング、集約、結合、そしてウィンドウ処理の具体的な実装だ。」

あなた「リアルタイムで注文データを集計して、異常検知やダッシュボード更新に使うイメージですね。」

田中VPoE「その通り。ストリーム処理の設計パターンを身につければ、リアルタイムアプリケーションの幅が大きく広がる。」

ストリーム処理のパターン

1. フィルタリング

def filter_high_value_orders(event: dict) -> bool:
    """高額注文のみを抽出する"""
    return event.get("total_amount", 0) >= 10000

2. マッピング(変換)

def enrich_order(event: dict) -> dict:
    """注文イベントにカテゴリ情報を追加する"""
    amount = event["total_amount"]
    event["order_tier"] = (
        "premium" if amount >= 50000
        else "standard" if amount >= 10000
        else "basic"
    )
    return event

3. ウィンドウ集約

from collections import defaultdict
from datetime import datetime, timedelta

class TumblingWindowAggregator:
    """タンブリングウィンドウによるリアルタイム集約"""

    def __init__(self, window_size_seconds: int = 300):
        self.window_size = timedelta(seconds=window_size_seconds)
        self.windows = defaultdict(lambda: {"count": 0, "total": 0})

    def _get_window_key(self, timestamp: datetime) -> str:
        window_start = timestamp - timedelta(
            seconds=timestamp.timestamp() % self.window_size.total_seconds()
        )
        return window_start.isoformat()

    def add(self, event: dict):
        ts = datetime.fromisoformat(event["timestamp"])
        key = self._get_window_key(ts)
        self.windows[key]["count"] += 1
        self.windows[key]["total"] += event["total_amount"]

    def get_results(self) -> dict:
        return dict(self.windows)

4. ストリーム結合

class StreamJoiner:
    """2つのストリームを結合する"""

    def __init__(self, join_window_seconds: int = 60):
        self.left_buffer = {}
        self.right_buffer = {}
        self.join_window = timedelta(seconds=join_window_seconds)

    def add_left(self, key: str, event: dict):
        self.left_buffer[key] = event
        return self._try_join(key)

    def add_right(self, key: str, event: dict):
        self.right_buffer[key] = event
        return self._try_join(key)

    def _try_join(self, key: str):
        if key in self.left_buffer and key in self.right_buffer:
            result = {**self.left_buffer[key], **self.right_buffer[key]}
            del self.left_buffer[key]
            del self.right_buffer[key]
            return result
        return None

状態管理

ストリーム処理では、ウィンドウ集約やカウンターなどの状態を管理する必要があります。

方式説明利点欠点
インメモリアプリケーション内のメモリに保持高速障害時にデータ喪失
外部ストアRedis、DynamoDB等に保存耐障害性レイテンシ増加
チェックポイント定期的にスナップショットを保存バランスが良い復旧に時間がかかる場合あり
import redis

class StatefulProcessor:
    """Redis-backed stateful stream processor"""

    def __init__(self):
        self.store = redis.Redis(host="localhost", port=6379)

    def process_order(self, event: dict):
        customer_id = event["customer_id"]
        amount = event["total_amount"]

        # 顧客ごとの累計を更新
        self.store.incr(f"customer:{customer_id}:order_count")
        self.store.incrbyfloat(f"customer:{customer_id}:total_amount", amount)

        # リアルタイム異常検知
        total = float(self.store.get(f"customer:{customer_id}:total_amount") or 0)
        if total > 1000000:
            self._alert_high_spending(customer_id, total)

エラーハンドリング

ストリーム処理のエラーハンドリングパターンは以下の3つが基本です。

パターン動作ユースケース
Skip & Logエラーを記録してスキップログ集計、非クリティカル処理
Dead Letter Queue失敗メッセージを別トピックへ後で再処理が必要なケース
Retry with Backoffリトライ後に失敗したらDLQAPI呼び出し等の一時障害
def process_with_dlq(consumer, dlq_producer):
    """DLQ付きストリーム処理"""
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        return

    try:
        event = json.loads(msg.value().decode())
        process_event(event)
        consumer.commit(msg)
    except Exception as e:
        # DLQに送信
        dlq_producer.produce(
            topic="orders-dlq",
            key=msg.key(),
            value=msg.value(),
            headers={"error": str(e).encode()},
        )
        consumer.commit(msg)

まとめ

項目ポイント
処理パターンフィルタリング、マッピング、ウィンドウ集約、ストリーム結合
状態管理インメモリ、外部ストア、チェックポイントの3方式
エラーハンドリングSkip & Log、DLQ、Retry with Backoffの3パターン

チェックリスト

  • ストリーム処理の4つの基本パターンを実装できる
  • ウィンドウ集約の仕組みと実装方法を理解している
  • 状態管理の3つの方式とトレードオフを説明できる
  • ストリーム処理のエラーハンドリングパターンを使い分けられる

次のステップへ

ストリーム処理パターンを学びました。次はLambda/Kappaアーキテクチャを学び、バッチとストリーミングの統合を理解しましょう。


推定読了時間:30分