ストリーム処理
田中VPoE「Kafkaでメッセージの送受信ができるようになった。次はストリームデータの処理パターンを学ぼう。フィルタリング、集約、結合、そしてウィンドウ処理の具体的な実装だ。」
あなた「リアルタイムで注文データを集計して、異常検知やダッシュボード更新に使うイメージですね。」
田中VPoE「その通り。ストリーム処理の設計パターンを身につければ、リアルタイムアプリケーションの幅が大きく広がる。」
ストリーム処理のパターン
1. フィルタリング
def filter_high_value_orders(event: dict) -> bool:
"""高額注文のみを抽出する"""
return event.get("total_amount", 0) >= 10000
2. マッピング(変換)
def enrich_order(event: dict) -> dict:
"""注文イベントにカテゴリ情報を追加する"""
amount = event["total_amount"]
event["order_tier"] = (
"premium" if amount >= 50000
else "standard" if amount >= 10000
else "basic"
)
return event
3. ウィンドウ集約
from collections import defaultdict
from datetime import datetime, timedelta
class TumblingWindowAggregator:
"""タンブリングウィンドウによるリアルタイム集約"""
def __init__(self, window_size_seconds: int = 300):
self.window_size = timedelta(seconds=window_size_seconds)
self.windows = defaultdict(lambda: {"count": 0, "total": 0})
def _get_window_key(self, timestamp: datetime) -> str:
window_start = timestamp - timedelta(
seconds=timestamp.timestamp() % self.window_size.total_seconds()
)
return window_start.isoformat()
def add(self, event: dict):
ts = datetime.fromisoformat(event["timestamp"])
key = self._get_window_key(ts)
self.windows[key]["count"] += 1
self.windows[key]["total"] += event["total_amount"]
def get_results(self) -> dict:
return dict(self.windows)
4. ストリーム結合
class StreamJoiner:
"""2つのストリームを結合する"""
def __init__(self, join_window_seconds: int = 60):
self.left_buffer = {}
self.right_buffer = {}
self.join_window = timedelta(seconds=join_window_seconds)
def add_left(self, key: str, event: dict):
self.left_buffer[key] = event
return self._try_join(key)
def add_right(self, key: str, event: dict):
self.right_buffer[key] = event
return self._try_join(key)
def _try_join(self, key: str):
if key in self.left_buffer and key in self.right_buffer:
result = {**self.left_buffer[key], **self.right_buffer[key]}
del self.left_buffer[key]
del self.right_buffer[key]
return result
return None
状態管理
ストリーム処理では、ウィンドウ集約やカウンターなどの状態を管理する必要があります。
| 方式 | 説明 | 利点 | 欠点 |
|---|---|---|---|
| インメモリ | アプリケーション内のメモリに保持 | 高速 | 障害時にデータ喪失 |
| 外部ストア | Redis、DynamoDB等に保存 | 耐障害性 | レイテンシ増加 |
| チェックポイント | 定期的にスナップショットを保存 | バランスが良い | 復旧に時間がかかる場合あり |
import redis
class StatefulProcessor:
"""Redis-backed stateful stream processor"""
def __init__(self):
self.store = redis.Redis(host="localhost", port=6379)
def process_order(self, event: dict):
customer_id = event["customer_id"]
amount = event["total_amount"]
# 顧客ごとの累計を更新
self.store.incr(f"customer:{customer_id}:order_count")
self.store.incrbyfloat(f"customer:{customer_id}:total_amount", amount)
# リアルタイム異常検知
total = float(self.store.get(f"customer:{customer_id}:total_amount") or 0)
if total > 1000000:
self._alert_high_spending(customer_id, total)
エラーハンドリング
ストリーム処理のエラーハンドリングパターンは以下の3つが基本です。
| パターン | 動作 | ユースケース |
|---|---|---|
| Skip & Log | エラーを記録してスキップ | ログ集計、非クリティカル処理 |
| Dead Letter Queue | 失敗メッセージを別トピックへ | 後で再処理が必要なケース |
| Retry with Backoff | リトライ後に失敗したらDLQ | API呼び出し等の一時障害 |
def process_with_dlq(consumer, dlq_producer):
"""DLQ付きストリーム処理"""
msg = consumer.poll(timeout=1.0)
if msg is None or msg.error():
return
try:
event = json.loads(msg.value().decode())
process_event(event)
consumer.commit(msg)
except Exception as e:
# DLQに送信
dlq_producer.produce(
topic="orders-dlq",
key=msg.key(),
value=msg.value(),
headers={"error": str(e).encode()},
)
consumer.commit(msg)
まとめ
| 項目 | ポイント |
|---|---|
| 処理パターン | フィルタリング、マッピング、ウィンドウ集約、ストリーム結合 |
| 状態管理 | インメモリ、外部ストア、チェックポイントの3方式 |
| エラーハンドリング | Skip & Log、DLQ、Retry with Backoffの3パターン |
チェックリスト
- ストリーム処理の4つの基本パターンを実装できる
- ウィンドウ集約の仕組みと実装方法を理解している
- 状態管理の3つの方式とトレードオフを説明できる
- ストリーム処理のエラーハンドリングパターンを使い分けられる
次のステップへ
ストリーム処理パターンを学びました。次はLambda/Kappaアーキテクチャを学び、バッチとストリーミングの統合を理解しましょう。
推定読了時間:30分