LESSON 40分

ストーリー

「この注文、なぜキャンセルされたんだ? ログを見てもステータスが『cancelled』になったことしかわからない」

高橋アーキテクトがデータベースの状態を見て首を振る。

「現在の状態だけを保存する設計だと、『なぜその状態になったか』がわからない。イベントソーシングなら、すべての出来事を記録する。銀行の通帳と同じだ。残高だけでなく、すべての入出金を記録する」


従来の設計 vs イベントソーシング

従来: 現在の状態を保存(State Sourcing)

-- 注文テーブル: 現在の状態のみ
UPDATE orders SET status = 'cancelled', updated_at = NOW() WHERE id = 1;
-- 以前の状態は上書きされて消える

イベントソーシング: 出来事を保存

-- イベントストア: すべての出来事を記録
INSERT INTO order_events (order_id, event_type, payload, occurred_at) VALUES
  (1, 'OrderCreated',   '{"userId": 100, "items": [...]}', '2024-03-15 10:00:00'),
  (1, 'PaymentReceived','{"amount": 5000, "method": "credit"}', '2024-03-15 10:05:00'),
  (1, 'ItemShipped',    '{"trackingId": "JP123456"}', '2024-03-16 09:00:00'),
  (1, 'RefundRequested','{"reason": "商品に傷がある"}', '2024-03-18 14:00:00'),
  (1, 'OrderCancelled', '{"refundAmount": 5000}', '2024-03-18 15:00:00');

-- 現在の状態は、イベントを順番に再生して導出する

イベントソーシングの仕組み

イベントストアの設計

CREATE TABLE events (
  id BIGSERIAL PRIMARY KEY,
  aggregate_type VARCHAR(100) NOT NULL,  -- 'Order', 'Account'
  aggregate_id VARCHAR(100) NOT NULL,     -- エンティティID
  event_type VARCHAR(100) NOT NULL,       -- 'OrderCreated', 'ItemAdded'
  payload JSONB NOT NULL,                 -- イベントデータ
  metadata JSONB,                         -- ユーザーID、リクエストID等
  version INT NOT NULL,                   -- 楽観的ロック用
  occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

  UNIQUE (aggregate_type, aggregate_id, version)
);

CREATE INDEX idx_events_aggregate ON events(aggregate_type, aggregate_id, version);

TypeScriptでの実装

// イベント型定義
type OrderEvent =
  | { type: 'OrderCreated'; data: { userId: number; items: OrderItem[] } }
  | { type: 'PaymentReceived'; data: { amount: number; method: string } }
  | { type: 'ItemShipped'; data: { trackingId: string } }
  | { type: 'OrderCancelled'; data: { reason: string; refundAmount: number } };

// 集約(Aggregate): イベントから状態を再構築
class OrderAggregate {
  id: string;
  status: string = 'unknown';
  items: OrderItem[] = [];
  totalPaid: number = 0;

  // イベントを適用して状態を更新
  apply(event: OrderEvent): void {
    switch (event.type) {
      case 'OrderCreated':
        this.status = 'pending';
        this.items = event.data.items;
        break;
      case 'PaymentReceived':
        this.status = 'paid';
        this.totalPaid += event.data.amount;
        break;
      case 'ItemShipped':
        this.status = 'shipped';
        break;
      case 'OrderCancelled':
        this.status = 'cancelled';
        break;
    }
  }

  // イベント列から状態を再構築
  static fromEvents(id: string, events: OrderEvent[]): OrderAggregate {
    const aggregate = new OrderAggregate();
    aggregate.id = id;
    for (const event of events) {
      aggregate.apply(event);
    }
    return aggregate;
  }
}

// イベントストア
class EventStore {
  async appendEvent(
    aggregateType: string,
    aggregateId: string,
    event: OrderEvent,
    expectedVersion: number
  ): Promise<void> {
    await db.query(`
      INSERT INTO events (aggregate_type, aggregate_id, event_type, payload, version)
      VALUES ($1, $2, $3, $4, $5)
    `, [aggregateType, aggregateId, event.type, JSON.stringify(event.data), expectedVersion + 1]);
  }

  async getEvents(aggregateType: string, aggregateId: string): Promise<OrderEvent[]> {
    const rows = await db.query(`
      SELECT event_type, payload FROM events
      WHERE aggregate_type = $1 AND aggregate_id = $2
      ORDER BY version ASC
    `, [aggregateType, aggregateId]);

    return rows.map(row => ({
      type: row.event_type,
      data: row.payload,
    }));
  }
}

スナップショット

イベント数が増えると、毎回全イベントを再生するのは非効率。定期的にスナップショットを保存する。

class SnapshotStore {
  async saveSnapshot(aggregate: OrderAggregate, version: number): Promise<void> {
    await db.query(`
      INSERT INTO snapshots (aggregate_type, aggregate_id, state, version)
      VALUES ('Order', $1, $2, $3)
      ON CONFLICT (aggregate_type, aggregate_id)
      DO UPDATE SET state = $2, version = $3
    `, [aggregate.id, JSON.stringify(aggregate), version]);
  }

  async loadAggregate(aggregateId: string): Promise<OrderAggregate> {
    // スナップショットから復元
    const snapshot = await this.getLatestSnapshot('Order', aggregateId);

    // スナップショット以降のイベントのみ再生
    const events = await this.eventStore.getEventsSince(
      'Order',
      aggregateId,
      snapshot?.version ?? 0
    );

    const aggregate = snapshot
      ? OrderAggregate.fromSnapshot(snapshot)
      : new OrderAggregate();

    for (const event of events) {
      aggregate.apply(event);
    }

    return aggregate;
  }
}

CQRS(Command Query Responsibility Segregation)

コマンド(書き込み)とクエリ(読み取り)で異なるモデルを使う。

Command(書き込み):                   Query(読み取り):
  ↓                                      ↓
[Write Model]                         [Read Model]
  ↓                                      ↑
[Event Store]  ──── 投影 ────→    [Read Database]
                  (Projection)

Projectionの実装

// イベントからRead Modelを構築(Projection)
class OrderProjection {
  async handleEvent(event: StoredEvent): Promise<void> {
    switch (event.event_type) {
      case 'OrderCreated':
        await this.readDb.query(`
          INSERT INTO order_summaries (id, user_id, status, item_count, created_at)
          VALUES ($1, $2, 'pending', $3, $4)
        `, [event.aggregate_id, event.payload.userId,
            event.payload.items.length, event.occurred_at]);
        break;

      case 'PaymentReceived':
        await this.readDb.query(`
          UPDATE order_summaries SET status = 'paid', paid_amount = $1
          WHERE id = $2
        `, [event.payload.amount, event.aggregate_id]);
        break;

      case 'OrderCancelled':
        await this.readDb.query(`
          UPDATE order_summaries SET status = 'cancelled'
          WHERE id = $1
        `, [event.aggregate_id]);
        break;
    }
  }
}

// Read側: 通常のSQLクエリで高速に読み取り
async function getOrderSummaries(userId: number) {
  return await readDb.query(`
    SELECT * FROM order_summaries
    WHERE user_id = $1
    ORDER BY created_at DESC
  `, [userId]);
}

イベントソーシングのメリット・デメリット

メリットデメリット
完全な監査証跡実装の複雑さ
時間遡行(any point in time)学習コスト
イベントからの多様なView構築結果整合性(Read Modelの遅延)
バグ修正後にデータ再構築可能イベントスキーマの進化が困難

適用判断

適用すべき場面避けるべき場面
完全な監査証跡が必要シンプルなCRUD
複雑なドメインロジック小規模なアプリ
多様なRead Modelが必要チームがパターンを理解していない
時間遡行が必要強い整合性が常に必要

まとめ

ポイント内容
イベントソーシング状態ではなくイベント(出来事)を記録
状態の導出イベントを再生して現在の状態を計算
スナップショットパフォーマンスのために定期的に状態を保存
CQRS書き込みと読み取りで別モデル
ProjectionイベントからRead Modelを構築

理解度チェックリスト

  • イベントソーシングと従来のCRUDの違いを説明できる
  • イベントストアの基本的な設計ができる
  • CQRSのCommand側とQuery側の役割を理解している
  • イベントソーシングの適用判断ができる

次のステップ

次は演習:整合性を保証する設計をしよう。ここまで学んだトランザクション、ロック、Sagaパターン、イベントソーシングを実際の設計に適用する。


推定読了時間: 40分