LESSON 25分

ストーリー

あなた
お客さんから、注文が二重になったとクレームが…
高橋アーキテクト
ネットワーク障害でリトライが発生し、同じ注文イベントが2回処理されたんだ。分散システムでは”少なくとも1回”の配信は保証できるが、“正確に1回”は非常に難しい。だから冪等性が重要なんだ

メッセージ配信の保証レベル

レベル意味実現の難しさ
At-most-once最大1回。ロスの可能性あり簡単(Fire and Forget)
At-least-once少なくとも1回。重複の可能性あり中程度(ACK + リトライ)
Exactly-once正確に1回。ロスも重複もなし非常に難しい
graph LR
    subgraph "At-most-once"
        P1["Producer"] --> B1["Broker"] --> C1["Consumer"]
    end

    subgraph "At-least-once"
        P2["Producer"] --> B2["Broker"] --> C2["Consumer"]
        C2 -->|"ACK"| B2
        B2 -.->|"ACK失敗→リトライ→重複"| C2
    end

    subgraph "Exactly-once"
        P3["Producer"] --> B3["Broker"] --> C3["Consumer<br/>(冪等な処理)"]
        C3 -->|"ACK"| B3
    end

冪等性(Idempotency)とは

同じ操作を何回実行しても結果が変わらない性質です。

// 冪等な操作の例
// SET: user.email = "new@example.com" → 何回実行しても同じ結果
// DELETE: 注文ID=123を削除 → 2回目以降は何も起きない

// 冪等でない操作の例
// INCREMENT: user.balance += 1000 → 実行回数分だけ増える
// INSERT: 新しい注文レコードを作成 → 実行回数分だけレコードが増える

冪等性の実装パターン

1. 冪等キー(Idempotency Key)

// クライアントが一意なキーを付与
// 同じキーのリクエストは2回目以降を無視
class IdempotentProcessor {
  constructor(private store: KeyValueStore) {}

  async process(idempotencyKey: string, handler: () => Promise<Result>): Promise<Result> {
    // 既に処理済みか確認
    const existing = await this.store.get(`idempotent:${idempotencyKey}`);
    if (existing) {
      console.log(`Duplicate detected: ${idempotencyKey}`);
      return JSON.parse(existing);  // 前回の結果を返す
    }

    // 処理を実行
    const result = await handler();

    // 結果を保存(TTL付き)
    await this.store.set(
      `idempotent:${idempotencyKey}`,
      JSON.stringify(result),
      { ttl: 86400 }  // 24時間保持
    );

    return result;
  }
}

// 使用例
app.post("/api/orders", async (req, res) => {
  const idempotencyKey = req.headers["idempotency-key"];
  if (!idempotencyKey) {
    return res.status(400).json({ error: "Idempotency-Key header required" });
  }

  const result = await idempotentProcessor.process(idempotencyKey, async () => {
    return await orderService.createOrder(req.body);
  });

  res.json(result);
});

2. イベントIDによるデデュプリケーション

// イベントのIDで重複を検知
class EventHandler {
  constructor(private processedEventStore: Set<string>) {}

  async handleEvent(event: DomainEvent): Promise<void> {
    // イベントIDで重複チェック
    if (await this.isProcessed(event.metadata.eventId)) {
      console.log(`Event already processed: ${event.metadata.eventId}`);
      return; // スキップ
    }

    // イベントを処理
    await this.processEvent(event);

    // 処理済みとして記録
    await this.markAsProcessed(event.metadata.eventId);
  }

  private async isProcessed(eventId: string): Promise<boolean> {
    return this.processedEventStore.has(eventId);
  }

  private async markAsProcessed(eventId: string): Promise<void> {
    this.processedEventStore.add(eventId);
  }
}

3. 条件付き更新(Optimistic Concurrency)

// バージョン番号で冪等性を保証
async function updateOrderStatus(
  orderId: string,
  newStatus: string,
  expectedVersion: number
): Promise<void> {
  const result = await db.query(
    `UPDATE orders
     SET status = $1, version = version + 1
     WHERE id = $2 AND version = $3`,
    [newStatus, orderId, expectedVersion]
  );

  if (result.rowCount === 0) {
    // バージョンが一致しない → 既に別の更新が適用済み
    throw new OptimisticLockError("Order was already updated");
  }
}

順序保証

なぜ順序が重要か

graph LR
    subgraph "正しい順序"
        E1["1. order.created"] --> E2["2. order.paid"] --> E3["3. order.cancelled"]
    end

    subgraph "順序が入れ替わった場合"
        F3["3. cancelled"] --> F2["2. paid"] --> F1["1. created"]
        F1 -.->|"最終状態が created に<br/>(本来は cancelled)"| BUG["バグ"]
    end

    classDef error fill:#fdd,stroke:#c33,color:#900
    class BUG error

Kafkaでの順序保証

// Kafkaは同一Partition内の順序を保証
// → 同じエンティティのイベントを同じPartitionに送る

await producer.send({
  topic: "order-events",
  messages: [{
    key: orderId,  // 同じorderIdのイベントは同じPartitionへ
    value: JSON.stringify(event),
  }],
});

// 注意: 異なるPartition間の順序は保証されない
// → 異なるorderIdのイベント同士は順不同

シーケンス番号による順序制御

// Consumer側でシーケンス番号を確認
class OrderedEventProcessor {
  private lastSequence: Map<string, number> = new Map();

  async process(event: SequencedEvent): Promise<void> {
    const entityId = event.entityId;
    const expectedSeq = (this.lastSequence.get(entityId) || 0) + 1;

    if (event.sequenceNumber < expectedSeq) {
      // 既に処理済み(重複)→ スキップ
      return;
    }

    if (event.sequenceNumber > expectedSeq) {
      // 順序が飛んでいる → バッファリングして待機
      await this.buffer(event);
      return;
    }

    // 期待通りの順序 → 処理実行
    await this.handleEvent(event);
    this.lastSequence.set(entityId, event.sequenceNumber);

    // バッファから次のイベントを取り出して処理
    await this.processBuffered(entityId);
  }
}

まとめ

ポイント内容
配信保証At-most-once, At-least-once, Exactly-once
冪等性同じ操作を何回実行しても結果が同じ
実装パターン冪等キー、デデュプリケーション、条件付き更新
順序保証Kafkaのpartition key、シーケンス番号

チェックリスト

  • 3つの配信保証レベルを説明できる
  • 冪等性の意味と必要性を理解した
  • 冪等キーによる重複排除を実装できる
  • Kafkaでの順序保証の仕組みを理解した

次のステップへ

次は演習で、イベント駆動システムを実際に設計してみましょう。ここまでの知識を統合して、実践的なアーキテクチャを描きます。


推定読了時間: 25分