LESSON 30分

ストーリー

佐藤CTO
従来のCRUDでは”現在の状態”しか保存しない。だがビジネスは”何が起きたか”を知りたがっている
佐藤CTO
イベントソーシングは全ての変更をイベントとして記録する。銀行の台帳と同じ考え方だ。CQRSと組み合わせれば、書き込みと読み取りをそれぞれ最適化できる

イベントソーシングの基本概念

CRUD vs イベントソーシング

観点CRUDイベントソーシング
データ保存現在の状態のみ全イベントの履歴
更新UPDATE文で上書き新しいイベントを追加
監査証跡別途実装が必要自然に実現
デバッグ「なぜこの状態か」が不明再生して原因特定可能
ストレージ効率的イベント蓄積でサイズ増大

イベントストアの設計

// イベントの基本インターフェース
interface DomainEvent {
  eventId: string;
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  version: number;       // 楽観的ロック用
  payload: Record<string, unknown>;
  metadata: {
    userId: string;
    timestamp: Date;
    correlationId: string;
  };
}

// 注文に関するドメインイベント
type OrderEvent =
  | { eventType: 'OrderCreated'; payload: { customerId: string; items: OrderItem[] } }
  | { eventType: 'OrderItemAdded'; payload: { productId: string; quantity: number; price: number } }
  | { eventType: 'OrderConfirmed'; payload: { confirmedAt: Date } }
  | { eventType: 'PaymentReceived'; payload: { paymentId: string; amount: number } }
  | { eventType: 'OrderShipped'; payload: { trackingNumber: string; carrier: string } }
  | { eventType: 'OrderCancelled'; payload: { reason: string; cancelledBy: string } };
-- イベントストアのテーブル設計
CREATE TABLE event_store (
  event_id       UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_id   UUID NOT NULL,
  aggregate_type VARCHAR(100) NOT NULL,
  event_type     VARCHAR(100) NOT NULL,
  version        INT NOT NULL,
  payload        JSONB NOT NULL,
  metadata       JSONB NOT NULL DEFAULT '{}',
  created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),

  -- 同一集約内でのバージョンの一意性(楽観的ロック)
  UNIQUE (aggregate_id, version)
);

-- インデックス
CREATE INDEX idx_event_store_aggregate
  ON event_store (aggregate_id, version ASC);
CREATE INDEX idx_event_store_type
  ON event_store (event_type, created_at);
CREATE INDEX idx_event_store_correlation
  ON event_store ((metadata->>'correlationId'));

イベントソーシングの実装

Aggregate のリハイドレーション

class OrderAggregate {
  private id: string;
  private status: OrderStatus = OrderStatus.DRAFT;
  private items: Map<string, OrderItem> = new Map();
  private totalAmount: number = 0;
  private version: number = 0;

  // イベントから状態を復元(リハイドレーション)
  static fromEvents(events: DomainEvent[]): OrderAggregate {
    const order = new OrderAggregate();
    for (const event of events) {
      order.apply(event, false); // isNew = false
    }
    return order;
  }

  // コマンド: ビジネスルールを検証してイベントを生成
  confirm(): DomainEvent {
    if (this.status !== OrderStatus.DRAFT) {
      throw new Error('確定済みの注文は再確定できません');
    }
    if (this.items.size === 0) {
      throw new Error('空の注文は確定できません');
    }

    const event: DomainEvent = {
      eventId: crypto.randomUUID(),
      aggregateId: this.id,
      aggregateType: 'Order',
      eventType: 'OrderConfirmed',
      version: this.version + 1,
      payload: { confirmedAt: new Date() },
      metadata: { userId: '', timestamp: new Date(), correlationId: '' },
    };

    this.apply(event, true);
    return event;
  }

  // イベントを適用して状態を変更
  private apply(event: DomainEvent, isNew: boolean): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.id = event.aggregateId;
        this.status = OrderStatus.DRAFT;
        break;
      case 'OrderItemAdded':
        const item = event.payload as { productId: string; quantity: number; price: number };
        this.items.set(item.productId, item);
        this.totalAmount += item.quantity * item.price;
        break;
      case 'OrderConfirmed':
        this.status = OrderStatus.CONFIRMED;
        break;
      case 'OrderCancelled':
        this.status = OrderStatus.CANCELLED;
        break;
    }
    this.version = event.version;
  }
}

スナップショットによる最適化

// イベント数が増えるとリハイドレーションが遅くなる
// → N件ごとにスナップショットを保存

interface Snapshot {
  aggregateId: string;
  aggregateType: string;
  version: number;
  state: Record<string, unknown>;
  createdAt: Date;
}

class EventStore {
  private readonly SNAPSHOT_INTERVAL = 100;

  async loadAggregate(aggregateId: string): Promise<OrderAggregate> {
    // 1. 最新のスナップショットを取得
    const snapshot = await this.getLatestSnapshot(aggregateId);

    // 2. スナップショット以降のイベントのみ取得
    const fromVersion = snapshot ? snapshot.version + 1 : 0;
    const events = await this.getEvents(aggregateId, fromVersion);

    // 3. スナップショット + 差分イベントで復元
    const aggregate = snapshot
      ? OrderAggregate.fromSnapshot(snapshot, events)
      : OrderAggregate.fromEvents(events);

    return aggregate;
  }

  async saveEvents(
    aggregateId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    // 楽観的ロック: 期待するバージョンと一致しなければ競合エラー
    await this.appendEvents(aggregateId, events, expectedVersion);

    // スナップショット条件チェック
    const latestVersion = events[events.length - 1].version;
    if (latestVersion % this.SNAPSHOT_INTERVAL === 0) {
      const aggregate = await this.loadAggregate(aggregateId);
      await this.saveSnapshot(aggregate);
    }
  }
}

CQRSパターン

コマンドとクエリの分離

graph LR
    CMD["Command<br/>(書き込み)"] --> WM["Write Model<br/>(Domain)"] --> ES["Event Store<br/>(真実の源泉)"]
    ES -- "Event<br/>Projection" --> RDB["Read DB<br/>(非正規化)"]
    RDB --> RM["Read Model<br/>(Optimized)"] --> QRY["Query<br/>(読み取り)"]

    classDef write fill:#fee2e2,stroke:#ef4444
    classDef read fill:#dbeafe,stroke:#3b82f6
    classDef store fill:#fef3c7,stroke:#f59e0b
    class CMD,WM write
    class QRY,RM read
    class ES,RDB store

Projection(読み取りモデル構築)

// イベントを購読して読み取り用ビューを構築
class OrderSummaryProjection {
  constructor(private readonly readDb: ReadDatabase) {}

  async handle(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await this.readDb.insert('order_summaries', {
          orderId: event.aggregateId,
          customerId: event.payload.customerId,
          status: 'draft',
          itemCount: 0,
          totalAmount: 0,
          createdAt: event.metadata.timestamp,
        });
        break;

      case 'OrderItemAdded':
        await this.readDb.increment('order_summaries', event.aggregateId, {
          itemCount: 1,
          totalAmount: event.payload.price * event.payload.quantity,
        });
        break;

      case 'OrderConfirmed':
        await this.readDb.update('order_summaries', event.aggregateId, {
          status: 'confirmed',
          confirmedAt: event.payload.confirmedAt,
        });
        break;

      case 'OrderCancelled':
        await this.readDb.update('order_summaries', event.aggregateId, {
          status: 'cancelled',
          cancelledAt: event.metadata.timestamp,
          cancellationReason: event.payload.reason,
        });
        break;
    }
  }
}

// 複数の読み取りモデルを並列構築
class CustomerOrderHistoryProjection {
  async handle(event: DomainEvent): Promise<void> {
    if (event.eventType === 'OrderConfirmed') {
      // 顧客ごとの注文履歴ビュー(別テーブル/別DB)
      await this.readDb.appendToList(
        `customer_orders:${event.payload.customerId}`,
        {
          orderId: event.aggregateId,
          confirmedAt: event.payload.confirmedAt,
          totalAmount: event.payload.totalAmount,
        }
      );
    }
  }
}

イベントソーシング + CQRSの注意点

結果整合性(Eventual Consistency)への対応

CQRSでは書き込みと読み取りの間に遅延が発生します。

// 問題: 注文直後にリダイレクトすると、まだProjectionに反映されていない
// → Read-your-writes consistency を実装

class OrderController {
  async createOrder(req: Request, res: Response): Promise<void> {
    const orderId = await this.commandBus.execute(new CreateOrderCommand(req.body));

    // 方法1: Projectionの反映を待つ(最大3秒)
    await this.waitForProjection(orderId, 3000);

    // 方法2: コマンドの結果を直接返す(Projectionを経由しない)
    res.json({
      orderId,
      status: 'created',
      message: '注文が作成されました',
    });
  }

  // 方法3: バージョンベースのポーリング
  async getOrder(orderId: string, afterVersion?: number): Promise<Order> {
    const order = await this.readModel.getOrder(orderId);
    if (afterVersion && order.version < afterVersion) {
      // まだ反映されていない → リトライまたはWrite Modelから直接取得
      return this.writeModel.getOrder(orderId);
    }
    return order;
  }
}
イベントスキーマの進化

イベントは不変であるため、スキーマ変更は慎重に行う必要があります。

// アップキャスティング: 古いイベントを新しい形式に変換
class EventUpcaster {
  upcast(event: DomainEvent): DomainEvent {
    switch (event.eventType) {
      case 'OrderCreated':
        // v1 → v2: currency フィールドを追加
        if (!event.payload.currency) {
          return {
            ...event,
            payload: {
              ...event.payload,
              currency: 'JPY',  // デフォルト値
            },
          };
        }
        return event;

      default:
        return event;
    }
  }
}

まとめ

ポイント内容
イベントソーシング状態ではなくイベントの履歴を保存する
リハイドレーションイベントを再生して現在の状態を復元
スナップショットN件ごとに状態を保存して復元を高速化
CQRSコマンドとクエリを分離し、それぞれ最適化
Projectionイベントから読み取り用ビューを非同期構築

チェックリスト

  • CRUD とイベントソーシングの違いを説明できる
  • イベントストアの設計(スキーマ、楽観的ロック)を理解した
  • CQRSの書き込みモデルと読み取りモデルの分離を理解した
  • 結果整合性への対処方法を理解した

次のステップへ

次はデータメッシュの考え方を学び、ドメイン駆動のデータ所有権と自己充足型データプラットフォームを理解します。


推定読了時間: 30分