ストーリー
メッセージ配信の保証レベル
| レベル | 意味 | 実現の難しさ |
|---|---|---|
| At-most-once | 最大1回。ロスの可能性あり | 簡単(Fire and Forget) |
| At-least-once | 少なくとも1回。重複の可能性あり | 中程度(ACK + リトライ) |
| Exactly-once | 正確に1回。ロスも重複もなし | 非常に難しい |
graph LR
subgraph "At-most-once"
P1["Producer"] --> B1["Broker"] --> C1["Consumer"]
end
subgraph "At-least-once"
P2["Producer"] --> B2["Broker"] --> C2["Consumer"]
C2 -->|"ACK"| B2
B2 -.->|"ACK失敗→リトライ→重複"| C2
end
subgraph "Exactly-once"
P3["Producer"] --> B3["Broker"] --> C3["Consumer<br/>(冪等な処理)"]
C3 -->|"ACK"| B3
end
冪等性(Idempotency)とは
同じ操作を何回実行しても結果が変わらない性質です。
// 冪等な操作の例
// SET: user.email = "new@example.com" → 何回実行しても同じ結果
// DELETE: 注文ID=123を削除 → 2回目以降は何も起きない
// 冪等でない操作の例
// INCREMENT: user.balance += 1000 → 実行回数分だけ増える
// INSERT: 新しい注文レコードを作成 → 実行回数分だけレコードが増える
冪等性の実装パターン
1. 冪等キー(Idempotency Key)
// クライアントが一意なキーを付与
// 同じキーのリクエストは2回目以降を無視
class IdempotentProcessor {
constructor(private store: KeyValueStore) {}
async process(idempotencyKey: string, handler: () => Promise<Result>): Promise<Result> {
// 既に処理済みか確認
const existing = await this.store.get(`idempotent:${idempotencyKey}`);
if (existing) {
console.log(`Duplicate detected: ${idempotencyKey}`);
return JSON.parse(existing); // 前回の結果を返す
}
// 処理を実行
const result = await handler();
// 結果を保存(TTL付き)
await this.store.set(
`idempotent:${idempotencyKey}`,
JSON.stringify(result),
{ ttl: 86400 } // 24時間保持
);
return result;
}
}
// 使用例
app.post("/api/orders", async (req, res) => {
const idempotencyKey = req.headers["idempotency-key"];
if (!idempotencyKey) {
return res.status(400).json({ error: "Idempotency-Key header required" });
}
const result = await idempotentProcessor.process(idempotencyKey, async () => {
return await orderService.createOrder(req.body);
});
res.json(result);
});
2. イベントIDによるデデュプリケーション
// イベントのIDで重複を検知
class EventHandler {
constructor(private processedEventStore: Set<string>) {}
async handleEvent(event: DomainEvent): Promise<void> {
// イベントIDで重複チェック
if (await this.isProcessed(event.metadata.eventId)) {
console.log(`Event already processed: ${event.metadata.eventId}`);
return; // スキップ
}
// イベントを処理
await this.processEvent(event);
// 処理済みとして記録
await this.markAsProcessed(event.metadata.eventId);
}
private async isProcessed(eventId: string): Promise<boolean> {
return this.processedEventStore.has(eventId);
}
private async markAsProcessed(eventId: string): Promise<void> {
this.processedEventStore.add(eventId);
}
}
3. 条件付き更新(Optimistic Concurrency)
// バージョン番号で冪等性を保証
async function updateOrderStatus(
orderId: string,
newStatus: string,
expectedVersion: number
): Promise<void> {
const result = await db.query(
`UPDATE orders
SET status = $1, version = version + 1
WHERE id = $2 AND version = $3`,
[newStatus, orderId, expectedVersion]
);
if (result.rowCount === 0) {
// バージョンが一致しない → 既に別の更新が適用済み
throw new OptimisticLockError("Order was already updated");
}
}
順序保証
なぜ順序が重要か
graph LR
subgraph "正しい順序"
E1["1. order.created"] --> E2["2. order.paid"] --> E3["3. order.cancelled"]
end
subgraph "順序が入れ替わった場合"
F3["3. cancelled"] --> F2["2. paid"] --> F1["1. created"]
F1 -.->|"最終状態が created に<br/>(本来は cancelled)"| BUG["バグ"]
end
classDef error fill:#fdd,stroke:#c33,color:#900
class BUG error
Kafkaでの順序保証
// Kafkaは同一Partition内の順序を保証
// → 同じエンティティのイベントを同じPartitionに送る
await producer.send({
topic: "order-events",
messages: [{
key: orderId, // 同じorderIdのイベントは同じPartitionへ
value: JSON.stringify(event),
}],
});
// 注意: 異なるPartition間の順序は保証されない
// → 異なるorderIdのイベント同士は順不同
シーケンス番号による順序制御
// Consumer側でシーケンス番号を確認
class OrderedEventProcessor {
private lastSequence: Map<string, number> = new Map();
async process(event: SequencedEvent): Promise<void> {
const entityId = event.entityId;
const expectedSeq = (this.lastSequence.get(entityId) || 0) + 1;
if (event.sequenceNumber < expectedSeq) {
// 既に処理済み(重複)→ スキップ
return;
}
if (event.sequenceNumber > expectedSeq) {
// 順序が飛んでいる → バッファリングして待機
await this.buffer(event);
return;
}
// 期待通りの順序 → 処理実行
await this.handleEvent(event);
this.lastSequence.set(entityId, event.sequenceNumber);
// バッファから次のイベントを取り出して処理
await this.processBuffered(entityId);
}
}
まとめ
| ポイント | 内容 |
|---|---|
| 配信保証 | At-most-once, At-least-once, Exactly-once |
| 冪等性 | 同じ操作を何回実行しても結果が同じ |
| 実装パターン | 冪等キー、デデュプリケーション、条件付き更新 |
| 順序保証 | Kafkaのpartition key、シーケンス番号 |
チェックリスト
- 3つの配信保証レベルを説明できる
- 冪等性の意味と必要性を理解した
- 冪等キーによる重複排除を実装できる
- Kafkaでの順序保証の仕組みを理解した
次のステップへ
次は演習で、イベント駆動システムを実際に設計してみましょう。ここまでの知識を統合して、実践的なアーキテクチャを描きます。
推定読了時間: 25分