LESSON 40分

ストーリー

佐藤CTO
バッチ処理では数時間前のデータしか見られない。ビジネスは”今”起きていることを知りたい
佐藤CTO
ストリーム処理は、データが発生した瞬間に処理する。Kafka Streams、Flink、ウィンドウ処理、そしてexactly-onceセマンティクスの設計を理解しよう

ストリーム処理の基本概念

概念説明
イベント時間 (Event Time)イベントが実際に発生した時刻
処理時間 (Processing Time)システムがイベントを処理した時刻
ウォーターマーク (Watermark)「この時刻以前のイベントは全て到着済み」という目安
レイトデータウォーターマーク後に到着した遅延イベント
graph LR
    ET["●<br/>Event Time<br/>(ビジネス上の正確な時刻)"]
    DELAY["ネットワーク遅延"]
    PT["●<br/>Processing Time<br/>(システムが受け取った時刻)"]

    ET -.->|"遅延"| DELAY -.-> PT

    style ET fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    style PT fill:#fff3e0,stroke:#e65100,stroke-width:2px
    style DELAY fill:#fce4ec,stroke:#c62828

Apache Kafka によるストリーム基盤

Kafka の基本構成

graph LR
    P["Producer"]
    subgraph Topic["Topic: order-events"]
        P0["P0"]
        P1["P1"]
        P2["P2"]
    end
    CGA["Consumer Group A"]
    CGB["Consumer Group B"]

    P --> Topic
    Topic --> CGA
    Topic --> CGB

    style Topic fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
    style P fill:#e3f2fd,stroke:#1565c0
    style CGA fill:#fff3e0,stroke:#e65100
    style CGB fill:#fff3e0,stroke:#e65100

Partition = 並列処理の単位。Consumer Group内で分散消費。

Kafka Streams による処理

// Kafka Streams (TypeScript / kafkajs + ストリーム処理の概念)

import { Kafka, EachMessagePayload } from 'kafkajs';

interface OrderEvent {
  orderId: string;
  customerId: string;
  amount: number;
  category: string;
  eventTime: string;
}

interface CustomerMetrics {
  customerId: string;
  totalOrders: number;
  totalSpent: number;
  lastOrderTime: string;
  averageOrderValue: number;
}

// ストリーム処理: 顧客メトリクスのリアルタイム集計
class OrderStreamProcessor {
  private metrics: Map<string, CustomerMetrics> = new Map();
  private kafka: Kafka;

  constructor() {
    this.kafka = new Kafka({
      clientId: 'order-processor',
      brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
    });
  }

  async start(): Promise<void> {
    const consumer = this.kafka.consumer({ groupId: 'metrics-processor' });
    const producer = this.kafka.producer({
      idempotent: true,  // 冪等プロデューサー(exactly-once の第一歩)
    });

    await consumer.connect();
    await producer.connect();

    await consumer.subscribe({ topic: 'order-events', fromBeginning: false });

    await consumer.run({
      eachMessage: async ({ message }: EachMessagePayload) => {
        const event: OrderEvent = JSON.parse(message.value!.toString());

        // 集約処理
        const current = this.metrics.get(event.customerId) || {
          customerId: event.customerId,
          totalOrders: 0,
          totalSpent: 0,
          lastOrderTime: '',
          averageOrderValue: 0,
        };

        current.totalOrders += 1;
        current.totalSpent += event.amount;
        current.lastOrderTime = event.eventTime;
        current.averageOrderValue = current.totalSpent / current.totalOrders;

        this.metrics.set(event.customerId, current);

        // 結果を出力トピックに送信
        await producer.send({
          topic: 'customer-metrics',
          messages: [{
            key: event.customerId,
            value: JSON.stringify(current),
          }],
        });
      },
    });
  }
}

ウィンドウ処理

ウィンドウの種類

gantt
    title Tumbling Window(固定ウィンドウ)
    dateFormat X
    axisFormat %s
    section Windows
        window 1 : 0, 5
        window 2 : 5, 10
        window 3 : 10, 15
gantt
    title Sliding Window(スライディングウィンドウ、スライド間隔: 1分)
    dateFormat X
    axisFormat %s
    section Windows
        window 1 : 0, 5
        window 2 : 1, 6
        window 3 : 2, 7
gantt
    title Session Window(セッションウィンドウ)
    dateFormat X
    axisFormat %s
    section Sessions
        session 1 : 0, 3
        gap       : crit, 3, 5
        session 2 : 5, 9
        gap       : crit, 9, 11
        session 3 : 11, 13

イベント間隔がギャップを超えると新しいセッションが開始される。

# PyFlink によるウィンドウ集計

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble, Slide, Session

# 環境設定
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
t_env = StreamTableEnvironment.create(env)

# Kafka ソースの定義
t_env.execute_sql("""
    CREATE TABLE order_events (
        order_id STRING,
        customer_id STRING,
        amount DECIMAL(10, 2),
        category STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'order-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-analytics',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Tumbling Window: 5分ごとのカテゴリ別売上集計
t_env.execute_sql("""
    SELECT
        TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
        TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
        category,
        COUNT(*) AS order_count,
        SUM(amount) AS total_revenue,
        AVG(amount) AS avg_order_value
    FROM order_events
    GROUP BY
        TUMBLE(event_time, INTERVAL '5' MINUTE),
        category
""")

# Sliding Window: 1時間ウィンドウ、10分スライド(移動平均)
t_env.execute_sql("""
    SELECT
        HOP_START(event_time, INTERVAL '10' MINUTE, INTERVAL '1' HOUR) AS window_start,
        category,
        AVG(amount) AS moving_avg_revenue,
        COUNT(*) AS order_count_1h
    FROM order_events
    GROUP BY
        HOP(event_time, INTERVAL '10' MINUTE, INTERVAL '1' HOUR),
        category
""")

# Session Window: 30分ギャップでセッション分析
t_env.execute_sql("""
    SELECT
        SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,
        customer_id,
        COUNT(*) AS orders_in_session,
        SUM(amount) AS session_total,
        TIMESTAMPDIFF(MINUTE,
            SESSION_START(event_time, INTERVAL '30' MINUTE),
            SESSION_END(event_time, INTERVAL '30' MINUTE)
        ) AS session_duration_minutes
    FROM order_events
    GROUP BY
        SESSION(event_time, INTERVAL '30' MINUTE),
        customer_id
""")

Exactly-Once セマンティクス

デリバリーセマンティクスの比較

セマンティクス意味実現方法
At-most-once最大1回(欠損あり)Fire-and-forget
At-least-once最低1回(重複あり)ACK + リトライ
Exactly-once正確に1回冪等性 + トランザクション

Exactly-Once の実装パターン

// パターン1: 冪等プロデューサー + トランザクション
class ExactlyOnceProcessor {
  async processWithTransaction(
    consumer: Consumer,
    producer: Producer,
    processFunc: (event: OrderEvent) => CustomerMetrics
  ): Promise<void> {
    const transaction = await producer.transaction();

    try {
      const messages = await consumer.poll();

      for (const message of messages) {
        const event = JSON.parse(message.value);
        const result = processFunc(event);

        // 処理結果の送信とオフセットコミットをアトミックに実行
        await transaction.send({
          topic: 'customer-metrics',
          messages: [{ key: result.customerId, value: JSON.stringify(result) }],
        });
      }

      // オフセットもトランザクション内でコミット
      await transaction.sendOffsets({
        consumerGroupId: 'metrics-processor',
        topics: [{ topic: 'order-events', partitions: messages.map(m => ({
          partition: m.partition,
          offset: (parseInt(m.offset) + 1).toString(),
        })) }],
      });

      await transaction.commit();
    } catch (error) {
      await transaction.abort();
      throw error;
    }
  }
}

// パターン2: 冪等コンシューマー(外部ストアへの書き込み)
class IdempotentSinkProcessor {
  async processToDatabase(event: OrderEvent): Promise<void> {
    // UPSERT で冪等性を保証
    await db.query(`
      INSERT INTO customer_metrics (customer_id, total_orders, total_spent, updated_at)
      VALUES ($1, 1, $2, NOW())
      ON CONFLICT (customer_id)
      DO UPDATE SET
        total_orders = customer_metrics.total_orders + 1,
        total_spent = customer_metrics.total_spent + $2,
        updated_at = NOW()
      WHERE customer_metrics.last_processed_offset < $3
    `, [event.customerId, event.amount, event.offset]);
  }
}
Exactly-Once が本当に必要か?

Exactly-Once は実装コストが高いため、本当に必要な場面を見極めることが重要です。

ユースケース推奨セマンティクス理由
決済処理Exactly-once二重課金は致命的
ログ集計At-least-once + 重複排除多少の誤差は許容
メトリクス計算At-least-once近似値で十分
アラート通知At-least-once通知漏れより重複が安全
在庫引当Exactly-once在庫の整合性が重要

多くの場合、At-least-once + 冪等コンシューマーで十分です。


まとめ

ポイント内容
Event Time vs Processing Timeビジネスの正確性にはEvent Timeが必要
ウィンドウ処理Tumbling/Sliding/Sessionを用途で使い分け
ウォーターマーク遅延データの許容範囲を定義する
Exactly-Once冪等性 + トランザクションで実現、本当に必要か判断する

チェックリスト

  • Event TimeとProcessing Timeの違いを説明できる
  • 3種類のウィンドウ処理の特性を理解した
  • Exactly-Once セマンティクスの実装パターンを理解した
  • ストリーム処理が適切な場面を判断できる

次のステップへ

次はETL/ELTパイプライン設計を学び、データ変換と統合の実践的なアーキテクチャを理解します。


推定読了時間: 40分