ストーリー
ストリーム処理の基本概念
| 概念 | 説明 |
|---|---|
| イベント時間 (Event Time) | イベントが実際に発生した時刻 |
| 処理時間 (Processing Time) | システムがイベントを処理した時刻 |
| ウォーターマーク (Watermark) | 「この時刻以前のイベントは全て到着済み」という目安 |
| レイトデータ | ウォーターマーク後に到着した遅延イベント |
graph LR
ET["●<br/>Event Time<br/>(ビジネス上の正確な時刻)"]
DELAY["ネットワーク遅延"]
PT["●<br/>Processing Time<br/>(システムが受け取った時刻)"]
ET -.->|"遅延"| DELAY -.-> PT
style ET fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style PT fill:#fff3e0,stroke:#e65100,stroke-width:2px
style DELAY fill:#fce4ec,stroke:#c62828
Apache Kafka によるストリーム基盤
Kafka の基本構成
graph LR
P["Producer"]
subgraph Topic["Topic: order-events"]
P0["P0"]
P1["P1"]
P2["P2"]
end
CGA["Consumer Group A"]
CGB["Consumer Group B"]
P --> Topic
Topic --> CGA
Topic --> CGB
style Topic fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
style P fill:#e3f2fd,stroke:#1565c0
style CGA fill:#fff3e0,stroke:#e65100
style CGB fill:#fff3e0,stroke:#e65100
Partition = 並列処理の単位。Consumer Group内で分散消費。
Kafka Streams による処理
// Kafka Streams (TypeScript / kafkajs + ストリーム処理の概念)
import { Kafka, EachMessagePayload } from 'kafkajs';
interface OrderEvent {
orderId: string;
customerId: string;
amount: number;
category: string;
eventTime: string;
}
interface CustomerMetrics {
customerId: string;
totalOrders: number;
totalSpent: number;
lastOrderTime: string;
averageOrderValue: number;
}
// ストリーム処理: 顧客メトリクスのリアルタイム集計
class OrderStreamProcessor {
private metrics: Map<string, CustomerMetrics> = new Map();
private kafka: Kafka;
constructor() {
this.kafka = new Kafka({
clientId: 'order-processor',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
});
}
async start(): Promise<void> {
const consumer = this.kafka.consumer({ groupId: 'metrics-processor' });
const producer = this.kafka.producer({
idempotent: true, // 冪等プロデューサー(exactly-once の第一歩)
});
await consumer.connect();
await producer.connect();
await consumer.subscribe({ topic: 'order-events', fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }: EachMessagePayload) => {
const event: OrderEvent = JSON.parse(message.value!.toString());
// 集約処理
const current = this.metrics.get(event.customerId) || {
customerId: event.customerId,
totalOrders: 0,
totalSpent: 0,
lastOrderTime: '',
averageOrderValue: 0,
};
current.totalOrders += 1;
current.totalSpent += event.amount;
current.lastOrderTime = event.eventTime;
current.averageOrderValue = current.totalSpent / current.totalOrders;
this.metrics.set(event.customerId, current);
// 結果を出力トピックに送信
await producer.send({
topic: 'customer-metrics',
messages: [{
key: event.customerId,
value: JSON.stringify(current),
}],
});
},
});
}
}
ウィンドウ処理
ウィンドウの種類
gantt
title Tumbling Window(固定ウィンドウ)
dateFormat X
axisFormat %s
section Windows
window 1 : 0, 5
window 2 : 5, 10
window 3 : 10, 15
gantt
title Sliding Window(スライディングウィンドウ、スライド間隔: 1分)
dateFormat X
axisFormat %s
section Windows
window 1 : 0, 5
window 2 : 1, 6
window 3 : 2, 7
gantt
title Session Window(セッションウィンドウ)
dateFormat X
axisFormat %s
section Sessions
session 1 : 0, 3
gap : crit, 3, 5
session 2 : 5, 9
gap : crit, 9, 11
session 3 : 11, 13
イベント間隔がギャップを超えると新しいセッションが開始される。
Apache Flink によるウィンドウ処理
# PyFlink によるウィンドウ集計
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble, Slide, Session
# 環境設定
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
t_env = StreamTableEnvironment.create(env)
# Kafka ソースの定義
t_env.execute_sql("""
CREATE TABLE order_events (
order_id STRING,
customer_id STRING,
amount DECIMAL(10, 2),
category STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'order-events',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-analytics',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
""")
# Tumbling Window: 5分ごとのカテゴリ別売上集計
t_env.execute_sql("""
SELECT
TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
category,
COUNT(*) AS order_count,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value
FROM order_events
GROUP BY
TUMBLE(event_time, INTERVAL '5' MINUTE),
category
""")
# Sliding Window: 1時間ウィンドウ、10分スライド(移動平均)
t_env.execute_sql("""
SELECT
HOP_START(event_time, INTERVAL '10' MINUTE, INTERVAL '1' HOUR) AS window_start,
category,
AVG(amount) AS moving_avg_revenue,
COUNT(*) AS order_count_1h
FROM order_events
GROUP BY
HOP(event_time, INTERVAL '10' MINUTE, INTERVAL '1' HOUR),
category
""")
# Session Window: 30分ギャップでセッション分析
t_env.execute_sql("""
SELECT
SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,
customer_id,
COUNT(*) AS orders_in_session,
SUM(amount) AS session_total,
TIMESTAMPDIFF(MINUTE,
SESSION_START(event_time, INTERVAL '30' MINUTE),
SESSION_END(event_time, INTERVAL '30' MINUTE)
) AS session_duration_minutes
FROM order_events
GROUP BY
SESSION(event_time, INTERVAL '30' MINUTE),
customer_id
""")
Exactly-Once セマンティクス
デリバリーセマンティクスの比較
| セマンティクス | 意味 | 実現方法 |
|---|---|---|
| At-most-once | 最大1回(欠損あり) | Fire-and-forget |
| At-least-once | 最低1回(重複あり) | ACK + リトライ |
| Exactly-once | 正確に1回 | 冪等性 + トランザクション |
Exactly-Once の実装パターン
// パターン1: 冪等プロデューサー + トランザクション
class ExactlyOnceProcessor {
async processWithTransaction(
consumer: Consumer,
producer: Producer,
processFunc: (event: OrderEvent) => CustomerMetrics
): Promise<void> {
const transaction = await producer.transaction();
try {
const messages = await consumer.poll();
for (const message of messages) {
const event = JSON.parse(message.value);
const result = processFunc(event);
// 処理結果の送信とオフセットコミットをアトミックに実行
await transaction.send({
topic: 'customer-metrics',
messages: [{ key: result.customerId, value: JSON.stringify(result) }],
});
}
// オフセットもトランザクション内でコミット
await transaction.sendOffsets({
consumerGroupId: 'metrics-processor',
topics: [{ topic: 'order-events', partitions: messages.map(m => ({
partition: m.partition,
offset: (parseInt(m.offset) + 1).toString(),
})) }],
});
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
}
}
// パターン2: 冪等コンシューマー(外部ストアへの書き込み)
class IdempotentSinkProcessor {
async processToDatabase(event: OrderEvent): Promise<void> {
// UPSERT で冪等性を保証
await db.query(`
INSERT INTO customer_metrics (customer_id, total_orders, total_spent, updated_at)
VALUES ($1, 1, $2, NOW())
ON CONFLICT (customer_id)
DO UPDATE SET
total_orders = customer_metrics.total_orders + 1,
total_spent = customer_metrics.total_spent + $2,
updated_at = NOW()
WHERE customer_metrics.last_processed_offset < $3
`, [event.customerId, event.amount, event.offset]);
}
}
Exactly-Once が本当に必要か?
Exactly-Once は実装コストが高いため、本当に必要な場面を見極めることが重要です。
| ユースケース | 推奨セマンティクス | 理由 |
|---|---|---|
| 決済処理 | Exactly-once | 二重課金は致命的 |
| ログ集計 | At-least-once + 重複排除 | 多少の誤差は許容 |
| メトリクス計算 | At-least-once | 近似値で十分 |
| アラート通知 | At-least-once | 通知漏れより重複が安全 |
| 在庫引当 | Exactly-once | 在庫の整合性が重要 |
多くの場合、At-least-once + 冪等コンシューマーで十分です。
まとめ
| ポイント | 内容 |
|---|---|
| Event Time vs Processing Time | ビジネスの正確性にはEvent Timeが必要 |
| ウィンドウ処理 | Tumbling/Sliding/Sessionを用途で使い分け |
| ウォーターマーク | 遅延データの許容範囲を定義する |
| Exactly-Once | 冪等性 + トランザクションで実現、本当に必要か判断する |
チェックリスト
- Event TimeとProcessing Timeの違いを説明できる
- 3種類のウィンドウ処理の特性を理解した
- Exactly-Once セマンティクスの実装パターンを理解した
- ストリーム処理が適切な場面を判断できる
次のステップへ
次はETL/ELTパイプライン設計を学び、データ変換と統合の実践的なアーキテクチャを理解します。
推定読了時間: 40分