LESSON 30分

ストーリー

あなた
CQRSの概念はわかりました。でも、具体的にWrite ModelとRead Modelをどう設計するんですか? 2つのモデルのデータはどうやって同期するんですか?
高橋アーキテクト
良い質問だ。Write Modelはビジネスルールの番人、Read Modelは表示の達人。それぞれ全く異なる設計思想で作るんだ

Write Model(コマンドモデル)

ビジネスルールの整合性を守ることに特化したモデルです。

// Write Model: 正規化された設計
// orders テーブル
interface OrderEntity {
  id: string;
  userId: string;
  status: "PENDING" | "CONFIRMED" | "SHIPPED" | "DELIVERED" | "CANCELLED";
  createdAt: Date;
  updatedAt: Date;
}

// order_items テーブル(正規化)
interface OrderItemEntity {
  id: string;
  orderId: string;      // FK → orders
  productId: string;    // FK → products
  quantity: number;
  unitPrice: number;
}

// Write Model のリポジトリ
class OrderWriteRepository {
  async save(order: CreateOrderCommand): Promise<Order> {
    return await this.db.transaction(async (tx) => {
      const orderRecord = await tx.insert("orders", {
        userId: order.userId,
        status: "PENDING",
      });

      for (const item of order.items) {
        await tx.insert("order_items", {
          orderId: orderRecord.id,
          productId: item.productId,
          quantity: item.quantity,
          unitPrice: item.unitPrice,
        });
      }

      return orderRecord;
    });
  }
}

Read Model(クエリモデル)

表示・検索のパフォーマンスに特化したモデルです。

// Read Model: 非正規化された設計
// 1回のクエリで必要な情報をすべて取得
interface OrderReadModel {
  orderId: string;
  userId: string;
  userName: string;          // users テーブルの情報を埋め込み
  userEmail: string;
  status: string;
  items: Array<{
    productId: string;
    productName: string;     // products テーブルの情報を埋め込み
    productImage: string;
    quantity: number;
    unitPrice: number;
    subtotal: number;        // 計算済みの値を保持
  }>;
  totalAmount: number;       // 合計金額(計算済み)
  itemCount: number;         // 商品数(計算済み)
  createdAt: string;
  lastUpdatedAt: string;
}

// Read Model のクエリ(JOINなし、高速)
class OrderReadRepository {
  // DynamoDB や Elasticsearch を使用
  async getByUserId(userId: string): Promise<OrderReadModel[]> {
    return await this.readStore.query({
      index: "userId-createdAt-index",
      keyCondition: { userId },
      scanIndexForward: false,  // 新しい順
      limit: 20,
    });
  }

  // 全文検索も可能(Elasticsearch)
  async search(query: string): Promise<OrderReadModel[]> {
    return await this.elasticsearch.search({
      index: "orders",
      body: {
        query: {
          multi_match: {
            query,
            fields: ["userName", "items.productName"],
          },
        },
      },
    });
  }
}

Read Modelの同期(プロジェクション)

Write Modelの変更をRead Modelに反映するプロセスを「プロジェクション」と呼びます。

// プロジェクション: イベントを購読してRead Modelを更新
class OrderProjection {
  constructor(
    private readStore: ReadStore,
    private userService: UserService,
    private productService: ProductService
  ) {}

  // order.created イベントを処理
  async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    // 関連データを取得(非正規化に必要)
    const user = await this.userService.getUser(event.data.userId);
    const products = await this.productService.getProductsBatch(
      event.data.items.map(i => i.productId)
    );

    // Read Modelを作成
    const readModel: OrderReadModel = {
      orderId: event.data.orderId,
      userId: event.data.userId,
      userName: user.name,
      userEmail: user.email,
      status: "PENDING",
      items: event.data.items.map(item => {
        const product = products.find(p => p.id === item.productId)!;
        return {
          productId: item.productId,
          productName: product.name,
          productImage: product.imageUrl,
          quantity: item.quantity,
          unitPrice: item.unitPrice,
          subtotal: item.quantity * item.unitPrice,
        };
      }),
      totalAmount: event.data.totalAmount,
      itemCount: event.data.items.length,
      createdAt: event.metadata.timestamp,
      lastUpdatedAt: event.metadata.timestamp,
    };

    await this.readStore.put(readModel);
  }

  // order.status_changed イベントを処理
  async handleOrderStatusChanged(event: OrderStatusChangedEvent): Promise<void> {
    await this.readStore.update(event.data.orderId, {
      status: event.data.newStatus,
      lastUpdatedAt: event.metadata.timestamp,
    });
  }
}

同期方式の選択

方式遅延一貫性実装複雑度
同期プロジェクションなし強い一貫性低い
非同期プロジェクション秒〜分結果整合性中程度
CDC + ストリーム処理ミリ秒ほぼリアルタイム高い
同期プロジェクション:
  Command → Write DB → 同じトランザクション内で → Read DB
  ✓ 一貫性が高い  ✗ Write性能に影響

非同期プロジェクション:
  Command → Write DB → Event → [Consumer] → Read DB
  ✓ Write性能に影響なし  ✗ 遅延あり(結果整合性)

CDC + ストリーム処理:
  Command → Write DB → [Debezium] → [Kafka] → [Consumer] → Read DB
  ✓ 低遅延  ✗ インフラ複雑

Read Modelのリビルド

Read Modelが破損した場合や、スキーマを変更した場合にゼロから再構築できます。

// Read Modelのリビルド
class ReadModelRebuilder {
  async rebuild(eventStore: EventStore, projection: Projection): Promise<void> {
    // 1. 既存のRead Modelをクリア
    await this.readStore.clear();

    // 2. イベントストアから全イベントを再生
    const events = eventStore.getAllEvents();

    for await (const event of events) {
      await projection.handle(event);
    }

    console.log("Read Model rebuild completed");
  }
}

// 利点: Read Modelは「使い捨て」
// → 壊れても、イベントから再構築できる
// → スキーマ変更も、新スキーマで再構築するだけ

技術スタックの例

パターン1: PostgreSQL + Elasticsearch
  Write: PostgreSQL(ACID、正規化)
  Read: Elasticsearch(全文検索、高速クエリ)
  同期: イベント経由で非同期プロジェクション

パターン2: PostgreSQL + Redis
  Write: PostgreSQL(ACID、正規化)
  Read: Redis(キャッシュ、超高速読み取り)
  同期: イベント経由でキャッシュ更新

パターン3: DynamoDB + DynamoDB
  Write: DynamoDB(シングルテーブル設計)
  Read: DynamoDB(GSI + 非正規化ビュー)
  同期: DynamoDB Streams

パターン4: PostgreSQL + DynamoDB
  Write: PostgreSQL(複雑なトランザクション)
  Read: DynamoDB(高スケーラビリティ)
  同期: CDC(Debezium) → Kafka → Lambda

まとめ

ポイント内容
Write Model正規化、ビジネスルール中心、ACID
Read Model非正規化、表示・検索最適化、JOINなし
プロジェクションイベントを購読してRead Modelを更新
リビルドRead Modelはイベントから再構築可能

チェックリスト

  • Write ModelとRead Modelの設計思想の違いを説明できる
  • プロジェクションの仕組みを理解した
  • 同期/非同期プロジェクションの選択基準を説明できる
  • Read Modelのリビルドの仕組みを理解した

次のステップへ

次は「結果整合性」を深掘りします。Read ModelがWrite Modelと一時的にずれる状況をどう受け入れ、ユーザー体験を損なわないかを学びましょう。


推定読了時間: 30分