LESSON 30分

ストーリー

佐藤CTO
アーキテクチャスタイルの最後のテーマだ
佐藤CTO
ECサイトで注文一覧を表示するとき、どんなデータが必要だ?
あなた
注文情報に加えて、商品名、配送状況、支払い状況…複数テーブルのJOINが必要ですね
佐藤CTO
そうだ。読み取りは複雑なクエリが必要で、書き込みはビジネスルールの検証が必要だ。この”読み”と”書き”を分離するのがCQRSだ。さらにイベントソーシングと組み合わせると、完全な監査証跡と強力なデータ分析基盤が手に入る。ただし複雑さも増す。いつ使うべきかの判断が重要だ

CQRS(Command Query Responsibility Segregation)

CQRSとは

CQRSは、データの書き込み(Command)と読み取り(Query)を別々のモデルに分離するパターンです。

従来のCRUDアプローチ:
┌──────────┐    ┌──────────┐    ┌────────┐
│ Controller│───→│  Service  │───→│  同一   │
│ (CRUD)   │    │  (同一)   │    │  DB/   │
│          │←───│          │←───│ モデル  │
└──────────┘    └──────────┘    └────────┘

CQRSアプローチ:
┌───────────┐    ┌──────────────┐    ┌──────────┐
│ Command    │───→│ Command Model│───→│ Write DB │
│ Controller │    │ (ビジネスロジック) │    │ (正規化)  │
└───────────┘    └──────────────┘    └────┬─────┘
                                          │ 同期/非同期
┌───────────┐    ┌──────────────┐    ┌────▼─────┐
│ Query      │───→│ Query Model  │───→│ Read DB  │
│ Controller │    │ (単純な読み取り) │    │ (非正規化) │
└───────────┘    └──────────────┘    └──────────┘

なぜCQRSが必要か

課題従来のCRUDCQRS
読み取りの複雑なJOIN毎回JOINを実行事前にJOIN済みのビューを用意
書き込みのバリデーション読み取りモデルに引きずられる書き込みに最適化されたモデル
スケーリング読み書き同時にスケール読み書きを独立にスケール
パフォーマンス読み書きが互いに影響読み書きを独立に最適化

TypeScriptによるCQRS実装

// === Command側(書き込み) ===

// コマンド: 意図を表現
interface PlaceOrderCommand {
  customerId: string;
  items: Array<{ productId: string; quantity: number; unitPrice: number }>;
  shippingAddress: string;
}

// コマンドハンドラー: ビジネスロジックを実行
class PlaceOrderCommandHandler {
  constructor(
    private orderRepo: OrderWriteRepository,
    private inventoryService: InventoryPort
  ) {}

  async execute(command: PlaceOrderCommand): Promise<string> {
    // ビジネスルールの検証
    for (const item of command.items) {
      const available = await this.inventoryService.checkStock(
        item.productId, item.quantity
      );
      if (!available) {
        throw new Error(`商品 ${item.productId} の在庫が不足しています`);
      }
    }

    // エンティティを生成
    const order = Order.place({
      customerId: command.customerId,
      items: command.items.map(i => OrderItem.create(i)),
      shippingAddress: Address.of(command.shippingAddress),
    });

    // 書き込みモデルに保存(正規化されたテーブル)
    await this.orderRepo.save(order);

    return order.id.value;
  }
}

// 書き込みリポジトリ: 正規化されたデータ構造
interface OrderWriteRepository {
  save(order: Order): Promise<void>;
  findById(id: OrderId): Promise<Order | null>;
}

// === Query側(読み取り) ===

// クエリ: 何を知りたいかを表現
interface GetOrderListQuery {
  customerId: string;
  page: number;
  limit: number;
  status?: string;
}

// クエリ結果: 表示に最適化されたDTO
interface OrderListItemDto {
  orderId: string;
  orderDate: string;
  status: string;
  statusLabel: string;  // "配送中", "完了" etc.
  totalAmount: number;
  itemCount: number;
  firstItemName: string;  // 一覧表示用
  thumbnailUrl: string;   // 一覧表示用
}

// クエリハンドラー: JOINなしで高速に読み取り
class GetOrderListQueryHandler {
  constructor(private readDb: OrderReadRepository) {}

  async execute(query: GetOrderListQuery): Promise<{
    orders: OrderListItemDto[];
    total: number;
  }> {
    // 読み取りモデルから直接取得(JOINなし)
    return this.readDb.findOrderList(
      query.customerId,
      query.page,
      query.limit,
      query.status
    );
  }
}

// 読み取りリポジトリ: 非正規化されたビュー
interface OrderReadRepository {
  findOrderList(
    customerId: string,
    page: number,
    limit: number,
    status?: string
  ): Promise<{ orders: OrderListItemDto[]; total: number }>;
}

Read Modelの同期

// イベントを使ってRead Modelを更新する
class OrderProjection {
  constructor(private readDb: OrderReadDatabase) {}

  // 注文作成時
  async onOrderPlaced(event: OrderPlacedEvent): Promise<void> {
    await this.readDb.upsert('order_list_view', {
      orderId: event.orderId,
      customerId: event.customerId,
      orderDate: event.occurredAt,
      status: 'PLACED',
      statusLabel: '注文確定',
      totalAmount: event.totalAmount,
      itemCount: event.items.length,
      firstItemName: event.items[0].productName,
      thumbnailUrl: event.items[0].thumbnailUrl,
    });
  }

  // 配送開始時
  async onOrderShipped(event: OrderShippedEvent): Promise<void> {
    await this.readDb.update('order_list_view', event.orderId, {
      status: 'SHIPPED',
      statusLabel: '配送中',
      trackingNumber: event.trackingNumber,
    });
  }

  // キャンセル時
  async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
    await this.readDb.update('order_list_view', event.orderId, {
      status: 'CANCELLED',
      statusLabel: 'キャンセル済み',
      cancelReason: event.reason,
    });
  }
}

イベントソーシング(Event Sourcing)

イベントソーシングとは

イベントソーシングは、エンティティの現在の状態ではなく、状態変化の履歴(イベント)を永続化するパターンです。現在の状態はイベントを順に適用して再構築します。

従来のステート保存:
┌─────────────────────────┐
│ orders テーブル           │
│ id: O-001               │
│ status: SHIPPED         │ ← 現在の状態のみ
│ amount: 5000            │
│ updated_at: 2024-01-03  │
└─────────────────────────┘

イベントソーシング:
┌─────────────────────────────────────────────┐
│ order_events テーブル                        │
│                                              │
│ 1. OrderPlaced    { orderId: O-001, ... }   │
│ 2. PaymentReceived { amount: 5000, ... }    │
│ 3. OrderShipped   { trackingId: T-99, ... } │
│                                              │
│ 現在の状態 = イベント1 + 2 + 3 を順に適用    │
└─────────────────────────────────────────────┘

イベントストアの設計

// ドメインイベントの基底型
interface DomainEvent {
  eventId: string;
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  payload: Record<string, unknown>;
  version: number;      // 楽観的ロック用
  occurredAt: Date;
  metadata: {
    userId?: string;
    correlationId?: string;
    causationId?: string;
  };
}

// 注文に関するドメインイベント
interface OrderPlacedEvent extends DomainEvent {
  eventType: 'OrderPlaced';
  payload: {
    customerId: string;
    items: Array<{ productId: string; quantity: number; unitPrice: number }>;
    shippingAddress: string;
  };
}

interface PaymentReceivedEvent extends DomainEvent {
  eventType: 'PaymentReceived';
  payload: {
    transactionId: string;
    amount: number;
    method: string;
  };
}

interface OrderShippedEvent extends DomainEvent {
  eventType: 'OrderShipped';
  payload: {
    trackingId: string;
    carrier: string;
    estimatedDelivery: string;
  };
}

interface OrderCancelledEvent extends DomainEvent {
  eventType: 'OrderCancelled';
  payload: {
    reason: string;
    cancelledBy: string;
  };
}

// イベントストアのインターフェース
interface EventStore {
  // イベントを追記(Append-Only)
  append(events: DomainEvent[]): Promise<void>;
  // 特定のアグリゲートのイベント履歴を取得
  getEvents(aggregateId: string): Promise<DomainEvent[]>;
  // 特定バージョン以降のイベントを取得
  getEventsSince(aggregateId: string, sinceVersion: number): Promise<DomainEvent[]>;
}

アグリゲートの状態再構築

// イベントソーシング対応のOrderアグリゲート
class Order {
  private _id: string;
  private _status: OrderStatus;
  private _customerId: string;
  private _items: OrderItem[] = [];
  private _totalAmount: number = 0;
  private _trackingId: string | null = null;
  private _version: number = 0;

  // 未コミットのイベント(保存待ち)
  private _uncommittedEvents: DomainEvent[] = [];

  // === ファクトリメソッド ===
  static place(command: PlaceOrderCommand): Order {
    const order = new Order();
    // イベントを適用(状態変更 + イベント記録)
    order.apply({
      eventId: crypto.randomUUID(),
      aggregateId: crypto.randomUUID(),
      aggregateType: 'Order',
      eventType: 'OrderPlaced',
      payload: {
        customerId: command.customerId,
        items: command.items,
        shippingAddress: command.shippingAddress,
      },
      version: 1,
      occurredAt: new Date(),
      metadata: {},
    });
    return order;
  }

  // === コマンドメソッド ===
  receivePayment(transactionId: string, amount: number, method: string): void {
    if (this._status !== 'PLACED') {
      throw new Error('支払い受付は注文確定状態でのみ可能です');
    }
    this.apply({
      eventId: crypto.randomUUID(),
      aggregateId: this._id,
      aggregateType: 'Order',
      eventType: 'PaymentReceived',
      payload: { transactionId, amount, method },
      version: this._version + 1,
      occurredAt: new Date(),
      metadata: {},
    });
  }

  ship(trackingId: string, carrier: string): void {
    if (this._status !== 'PAID') {
      throw new Error('配送は支払い完了状態でのみ可能です');
    }
    this.apply({
      eventId: crypto.randomUUID(),
      aggregateId: this._id,
      aggregateType: 'Order',
      eventType: 'OrderShipped',
      payload: {
        trackingId,
        carrier,
        estimatedDelivery: this.calculateEstimatedDelivery(),
      },
      version: this._version + 1,
      occurredAt: new Date(),
      metadata: {},
    });
  }

  cancel(reason: string, cancelledBy: string): void {
    if (this._status === 'SHIPPED' || this._status === 'DELIVERED') {
      throw new Error('配送済み・配達完了の注文はキャンセルできません');
    }
    this.apply({
      eventId: crypto.randomUUID(),
      aggregateId: this._id,
      aggregateType: 'Order',
      eventType: 'OrderCancelled',
      payload: { reason, cancelledBy },
      version: this._version + 1,
      occurredAt: new Date(),
      metadata: {},
    });
  }

  // === イベントの適用 ===
  private apply(event: DomainEvent): void {
    this.applyEvent(event);
    this._uncommittedEvents.push(event);
  }

  // イベントから状態を復元するメソッド
  private applyEvent(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced':
        this._id = event.aggregateId;
        this._status = 'PLACED';
        this._customerId = event.payload.customerId as string;
        this._items = (event.payload.items as any[]).map(i => ({
          productId: i.productId,
          quantity: i.quantity,
          unitPrice: i.unitPrice,
        }));
        this._totalAmount = this._items.reduce(
          (sum, i) => sum + i.quantity * i.unitPrice, 0
        );
        break;
      case 'PaymentReceived':
        this._status = 'PAID';
        break;
      case 'OrderShipped':
        this._status = 'SHIPPED';
        this._trackingId = event.payload.trackingId as string;
        break;
      case 'OrderCancelled':
        this._status = 'CANCELLED';
        break;
    }
    this._version = event.version;
  }

  // === イベント履歴からの復元 ===
  static fromEvents(events: DomainEvent[]): Order {
    const order = new Order();
    for (const event of events) {
      order.applyEvent(event);
    }
    return order;
  }

  // 未コミットイベントの取得とクリア
  getUncommittedEvents(): DomainEvent[] {
    return [...this._uncommittedEvents];
  }

  clearUncommittedEvents(): void {
    this._uncommittedEvents = [];
  }

  private calculateEstimatedDelivery(): string {
    const date = new Date();
    date.setDate(date.getDate() + 3);
    return date.toISOString();
  }

  // ゲッター
  get id(): string { return this._id; }
  get status(): OrderStatus { return this._status; }
  get version(): number { return this._version; }
}

type OrderStatus = 'PLACED' | 'PAID' | 'SHIPPED' | 'DELIVERED' | 'CANCELLED';

イベントソーシング対応のリポジトリ

class EventSourcedOrderRepository {
  constructor(private eventStore: EventStore) {}

  async save(order: Order): Promise<void> {
    const events = order.getUncommittedEvents();
    if (events.length === 0) return;

    await this.eventStore.append(events);
    order.clearUncommittedEvents();
  }

  async findById(orderId: string): Promise<Order | null> {
    const events = await this.eventStore.getEvents(orderId);
    if (events.length === 0) return null;
    return Order.fromEvents(events);
  }
}

Projections(プロジェクション)と Read Model

プロジェクションとは

プロジェクションは、イベントストリームを消費して特定の目的に最適化された読み取りモデルを構築するプロセスです。

graph LR
    ES["Event Store<br/>OrderPlaced<br/>PaymentReceived<br/>OrderShipped<br/>OrderCancelled<br/>..."]
    ES --> P1["Projection:<br/>注文一覧"]
    ES --> P2["Projection:<br/>売上集計"]
    ES --> P3["Projection:<br/>顧客分析"]
    ES --> P4["Projection:<br/>在庫追跡"]
    P1 --> V1["order_list_view"]
    P2 --> V2["daily_sales_view"]
    P3 --> V3["customer_analytics_view"]
    P4 --> V4["inventory_tracking_view"]

    classDef esStyle fill:#4a90d9,stroke:#2c5f8a,color:#fff
    classDef projStyle fill:#e8a838,stroke:#b07c1e,color:#fff
    classDef viewStyle fill:#5cb85c,stroke:#3d8b3d,color:#fff

    class ES esStyle
    class P1,P2,P3,P4 projStyle
    class V1,V2,V3,V4 viewStyle

※ 1つのイベントストリームから複数の読み取りモデルを生成可能 ※ 新しい要件が生まれたら、新しいプロジェクションを追加するだけ

// 売上集計プロジェクション
class DailySalesProjection {
  constructor(private readDb: ReadDatabase) {}

  async onOrderPlaced(event: OrderPlacedEvent): Promise<void> {
    const date = event.occurredAt.toISOString().split('T')[0];
    await this.readDb.upsert('daily_sales', {
      date,
      orderCount: { $increment: 1 },
      totalAmount: { $increment: event.payload.totalAmount },
    });
  }

  async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
    const originalOrder = await this.readDb.get('order_list_view', event.aggregateId);
    if (!originalOrder) return;

    const date = originalOrder.orderDate.split('T')[0];
    await this.readDb.upsert('daily_sales', {
      date,
      orderCount: { $increment: -1 },
      totalAmount: { $increment: -originalOrder.totalAmount },
      cancelCount: { $increment: 1 },
    });
  }

  // イベントを最初から再処理してRead Modelを再構築
  async rebuild(): Promise<void> {
    await this.readDb.truncate('daily_sales');
    const allEvents = await this.eventStore.getAllEvents();
    for (const event of allEvents) {
      if (event.eventType === 'OrderPlaced') await this.onOrderPlaced(event);
      if (event.eventType === 'OrderCancelled') await this.onOrderCancelled(event);
    }
  }
}

CQRSとイベントソーシングの組み合わせ

全体アーキテクチャ

                        Command側
┌───────────┐    ┌──────────────┐    ┌──────────────┐
│  POST/PUT  │───→│ Command      │───→│ Event Store  │
│  API       │    │ Handler      │    │ (Append-Only)│
└───────────┘    └──────────────┘    └──────┬───────┘

                                     イベント発行

                        Query側             ▼
┌───────────┐    ┌──────────────┐    ┌──────────────┐
│  GET API   │───→│ Query        │    │ Projections  │
│            │    │ Handler      │    │ (非同期更新)  │
└───────────┘    └──────┬───────┘    └──────┬───────┘
                        │                    │
                        ▼                    ▼
                  ┌──────────────────────────────┐
                  │        Read Database          │
                  │   (非正規化されたビュー)       │
                  └──────────────────────────────┘

いつ使い分けるか

パターン単独使用組み合わせ
CQRS のみ読み書きの負荷が非対称なシステム-
ES のみ監査証跡が重要なシステム-
CQRS + ES-複雑なドメイン + 高い読み取り性能 + 完全な監査証跡
CQRSのみを使うケース:
  - 読み取りが書き込みより圧倒的に多い(SNSのタイムライン等)
  - 読み取りに最適化されたビューが必要
  - イベント履歴までは不要

ESのみを使うケース:
  - 完全な監査証跡が法的に必要(金融、医療)
  - 状態の変化履歴を分析したい
  - 読み書きの分離は不要

CQRS + ESを使うケース:
  - 上記の両方が求められる場合
  - 複雑なドメインモデルと高い読み取り性能の両立

落とし穴と複雑さの警告

よくある落とし穴

落とし穴説明対策
過剰適用単純なCRUDにCQRS/ESを適用してしまう複雑さが正当化される場面でのみ使用
イベントスキーマの変更イベント構造を変更すると過去の再生に影響イベントのバージョニング戦略を事前に決定
結果整合性の誤解書き込み直後に読み取りが反映されていないUIでの楽観的更新、ポーリング
イベント爆発細かすぎるイベントが大量に発生適切な粒度の設計、スナップショット
テストの複雑さイベント駆動のテストが難しいGiven-When-Thenパターンのテスト

イベントのバージョニング

// バージョン1のイベント
interface OrderPlacedV1 {
  eventType: 'OrderPlaced';
  version: 1;
  payload: {
    customerId: string;
    items: Array<{ productId: string; quantity: number }>;
  };
}

// バージョン2: 配送先住所を追加
interface OrderPlacedV2 {
  eventType: 'OrderPlaced';
  version: 2;
  payload: {
    customerId: string;
    items: Array<{ productId: string; quantity: number; unitPrice: number }>;
    shippingAddress: string;  // 新しいフィールド
  };
}

// アップキャスター: V1をV2に変換
function upcastOrderPlaced(event: DomainEvent): DomainEvent {
  if (event.eventType === 'OrderPlaced' && event.version === 1) {
    return {
      ...event,
      version: 2,
      payload: {
        ...event.payload,
        items: (event.payload.items as any[]).map(i => ({
          ...i,
          unitPrice: 0,  // デフォルト値
        })),
        shippingAddress: 'unknown',  // デフォルト値
      },
    };
  }
  return event;
}

スナップショットによるパフォーマンス最適化

// イベント数が多い場合、毎回全イベントを再生するのは非効率
// スナップショットで途中の状態を保存する
interface Snapshot {
  aggregateId: string;
  version: number;
  state: Record<string, unknown>;
  createdAt: Date;
}

class SnapshotRepository {
  async save(snapshot: Snapshot): Promise<void> { /* ... */ }
  async getLatest(aggregateId: string): Promise<Snapshot | null> { /* ... */ }
}

class EventSourcedOrderRepositoryWithSnapshot {
  constructor(
    private eventStore: EventStore,
    private snapshotRepo: SnapshotRepository,
    private snapshotInterval: number = 100  // 100イベントごとにスナップショット
  ) {}

  async findById(orderId: string): Promise<Order | null> {
    // 1. 最新のスナップショットを取得
    const snapshot = await this.snapshotRepo.getLatest(orderId);

    let events: DomainEvent[];
    if (snapshot) {
      // 2a. スナップショット以降のイベントだけ取得
      events = await this.eventStore.getEventsSince(orderId, snapshot.version);
      const order = Order.fromSnapshot(snapshot.state);
      for (const event of events) {
        order.applyEvent(event);
      }
      return order;
    } else {
      // 2b. 全イベントから復元
      events = await this.eventStore.getEvents(orderId);
      if (events.length === 0) return null;
      return Order.fromEvents(events);
    }
  }

  async save(order: Order): Promise<void> {
    const events = order.getUncommittedEvents();
    await this.eventStore.append(events);

    // スナップショットの作成判定
    if (order.version % this.snapshotInterval === 0) {
      await this.snapshotRepo.save({
        aggregateId: order.id,
        version: order.version,
        state: order.toSnapshot(),
        createdAt: new Date(),
      });
    }

    order.clearUncommittedEvents();
  }
}

CQRSを使わないべきケース

CQRSを避けるべき場面:
  - 単純なCRUDアプリケーション
  - チームにCQRS/ESの経験がない(学習コストが高い)
  - 読み書きの負荷パターンが類似している
  - 強い整合性が全ての操作で必要

まずは単純なアプローチから始め、
必要性が明確になった部分にのみCQRSを適用するのが推奨。

まとめ

ポイント内容
CQRS読み取りと書き込みを分離し、それぞれに最適化
イベントソーシング状態変化をイベントとして永続化、完全な履歴を保持
プロジェクションイベントストリームから目的別の読み取りモデルを構築
組み合わせCQRS + ESで高い読み取り性能と完全な監査証跡を両立
結果整合性Read Modelの更新は非同期、一時的な不整合が発生
複雑さ過剰適用は避け、複雑さが正当化される場面でのみ使用
スナップショットイベント数が多い場合のパフォーマンス最適化手段

チェックリスト

  • CQRSの目的(読み書きの分離と最適化)を説明できる
  • イベントソーシングの概念(状態ではなくイベントを保存)を理解した
  • プロジェクションの役割を説明できる
  • CQRSとESをそれぞれ単独で使う場面を区別できる
  • イベントのバージョニング問題を理解した
  • 過剰適用のリスクを認識している

次のステップへ

次は「演習:アーキテクチャ選択肢を評価しよう」に取り組みます。Step 3で学んだ4つのアーキテクチャスタイルの知識を活用し、具体的なシナリオに対して最適なアーキテクチャを選定しましょう。


推定読了時間: 30分