ストーリー
CQRS(Command Query Responsibility Segregation)
CQRSとは
CQRSは、データの書き込み(Command)と読み取り(Query)を別々のモデルに分離するパターンです。
従来のCRUDアプローチ:
┌──────────┐ ┌──────────┐ ┌────────┐
│ Controller│───→│ Service │───→│ 同一 │
│ (CRUD) │ │ (同一) │ │ DB/ │
│ │←───│ │←───│ モデル │
└──────────┘ └──────────┘ └────────┘
CQRSアプローチ:
┌───────────┐ ┌──────────────┐ ┌──────────┐
│ Command │───→│ Command Model│───→│ Write DB │
│ Controller │ │ (ビジネスロジック) │ │ (正規化) │
└───────────┘ └──────────────┘ └────┬─────┘
│ 同期/非同期
┌───────────┐ ┌──────────────┐ ┌────▼─────┐
│ Query │───→│ Query Model │───→│ Read DB │
│ Controller │ │ (単純な読み取り) │ │ (非正規化) │
└───────────┘ └──────────────┘ └──────────┘
なぜCQRSが必要か
| 課題 | 従来のCRUD | CQRS |
|---|---|---|
| 読み取りの複雑なJOIN | 毎回JOINを実行 | 事前にJOIN済みのビューを用意 |
| 書き込みのバリデーション | 読み取りモデルに引きずられる | 書き込みに最適化されたモデル |
| スケーリング | 読み書き同時にスケール | 読み書きを独立にスケール |
| パフォーマンス | 読み書きが互いに影響 | 読み書きを独立に最適化 |
TypeScriptによるCQRS実装
// === Command側(書き込み) ===
// コマンド: 意図を表現
interface PlaceOrderCommand {
customerId: string;
items: Array<{ productId: string; quantity: number; unitPrice: number }>;
shippingAddress: string;
}
// コマンドハンドラー: ビジネスロジックを実行
class PlaceOrderCommandHandler {
constructor(
private orderRepo: OrderWriteRepository,
private inventoryService: InventoryPort
) {}
async execute(command: PlaceOrderCommand): Promise<string> {
// ビジネスルールの検証
for (const item of command.items) {
const available = await this.inventoryService.checkStock(
item.productId, item.quantity
);
if (!available) {
throw new Error(`商品 ${item.productId} の在庫が不足しています`);
}
}
// エンティティを生成
const order = Order.place({
customerId: command.customerId,
items: command.items.map(i => OrderItem.create(i)),
shippingAddress: Address.of(command.shippingAddress),
});
// 書き込みモデルに保存(正規化されたテーブル)
await this.orderRepo.save(order);
return order.id.value;
}
}
// 書き込みリポジトリ: 正規化されたデータ構造
interface OrderWriteRepository {
save(order: Order): Promise<void>;
findById(id: OrderId): Promise<Order | null>;
}
// === Query側(読み取り) ===
// クエリ: 何を知りたいかを表現
interface GetOrderListQuery {
customerId: string;
page: number;
limit: number;
status?: string;
}
// クエリ結果: 表示に最適化されたDTO
interface OrderListItemDto {
orderId: string;
orderDate: string;
status: string;
statusLabel: string; // "配送中", "完了" etc.
totalAmount: number;
itemCount: number;
firstItemName: string; // 一覧表示用
thumbnailUrl: string; // 一覧表示用
}
// クエリハンドラー: JOINなしで高速に読み取り
class GetOrderListQueryHandler {
constructor(private readDb: OrderReadRepository) {}
async execute(query: GetOrderListQuery): Promise<{
orders: OrderListItemDto[];
total: number;
}> {
// 読み取りモデルから直接取得(JOINなし)
return this.readDb.findOrderList(
query.customerId,
query.page,
query.limit,
query.status
);
}
}
// 読み取りリポジトリ: 非正規化されたビュー
interface OrderReadRepository {
findOrderList(
customerId: string,
page: number,
limit: number,
status?: string
): Promise<{ orders: OrderListItemDto[]; total: number }>;
}
Read Modelの同期
// イベントを使ってRead Modelを更新する
class OrderProjection {
constructor(private readDb: OrderReadDatabase) {}
// 注文作成時
async onOrderPlaced(event: OrderPlacedEvent): Promise<void> {
await this.readDb.upsert('order_list_view', {
orderId: event.orderId,
customerId: event.customerId,
orderDate: event.occurredAt,
status: 'PLACED',
statusLabel: '注文確定',
totalAmount: event.totalAmount,
itemCount: event.items.length,
firstItemName: event.items[0].productName,
thumbnailUrl: event.items[0].thumbnailUrl,
});
}
// 配送開始時
async onOrderShipped(event: OrderShippedEvent): Promise<void> {
await this.readDb.update('order_list_view', event.orderId, {
status: 'SHIPPED',
statusLabel: '配送中',
trackingNumber: event.trackingNumber,
});
}
// キャンセル時
async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
await this.readDb.update('order_list_view', event.orderId, {
status: 'CANCELLED',
statusLabel: 'キャンセル済み',
cancelReason: event.reason,
});
}
}
イベントソーシング(Event Sourcing)
イベントソーシングとは
イベントソーシングは、エンティティの現在の状態ではなく、状態変化の履歴(イベント)を永続化するパターンです。現在の状態はイベントを順に適用して再構築します。
従来のステート保存:
┌─────────────────────────┐
│ orders テーブル │
│ id: O-001 │
│ status: SHIPPED │ ← 現在の状態のみ
│ amount: 5000 │
│ updated_at: 2024-01-03 │
└─────────────────────────┘
イベントソーシング:
┌─────────────────────────────────────────────┐
│ order_events テーブル │
│ │
│ 1. OrderPlaced { orderId: O-001, ... } │
│ 2. PaymentReceived { amount: 5000, ... } │
│ 3. OrderShipped { trackingId: T-99, ... } │
│ │
│ 現在の状態 = イベント1 + 2 + 3 を順に適用 │
└─────────────────────────────────────────────┘
イベントストアの設計
// ドメインイベントの基底型
interface DomainEvent {
eventId: string;
aggregateId: string;
aggregateType: string;
eventType: string;
payload: Record<string, unknown>;
version: number; // 楽観的ロック用
occurredAt: Date;
metadata: {
userId?: string;
correlationId?: string;
causationId?: string;
};
}
// 注文に関するドメインイベント
interface OrderPlacedEvent extends DomainEvent {
eventType: 'OrderPlaced';
payload: {
customerId: string;
items: Array<{ productId: string; quantity: number; unitPrice: number }>;
shippingAddress: string;
};
}
interface PaymentReceivedEvent extends DomainEvent {
eventType: 'PaymentReceived';
payload: {
transactionId: string;
amount: number;
method: string;
};
}
interface OrderShippedEvent extends DomainEvent {
eventType: 'OrderShipped';
payload: {
trackingId: string;
carrier: string;
estimatedDelivery: string;
};
}
interface OrderCancelledEvent extends DomainEvent {
eventType: 'OrderCancelled';
payload: {
reason: string;
cancelledBy: string;
};
}
// イベントストアのインターフェース
interface EventStore {
// イベントを追記(Append-Only)
append(events: DomainEvent[]): Promise<void>;
// 特定のアグリゲートのイベント履歴を取得
getEvents(aggregateId: string): Promise<DomainEvent[]>;
// 特定バージョン以降のイベントを取得
getEventsSince(aggregateId: string, sinceVersion: number): Promise<DomainEvent[]>;
}
アグリゲートの状態再構築
// イベントソーシング対応のOrderアグリゲート
class Order {
private _id: string;
private _status: OrderStatus;
private _customerId: string;
private _items: OrderItem[] = [];
private _totalAmount: number = 0;
private _trackingId: string | null = null;
private _version: number = 0;
// 未コミットのイベント(保存待ち)
private _uncommittedEvents: DomainEvent[] = [];
// === ファクトリメソッド ===
static place(command: PlaceOrderCommand): Order {
const order = new Order();
// イベントを適用(状態変更 + イベント記録)
order.apply({
eventId: crypto.randomUUID(),
aggregateId: crypto.randomUUID(),
aggregateType: 'Order',
eventType: 'OrderPlaced',
payload: {
customerId: command.customerId,
items: command.items,
shippingAddress: command.shippingAddress,
},
version: 1,
occurredAt: new Date(),
metadata: {},
});
return order;
}
// === コマンドメソッド ===
receivePayment(transactionId: string, amount: number, method: string): void {
if (this._status !== 'PLACED') {
throw new Error('支払い受付は注文確定状態でのみ可能です');
}
this.apply({
eventId: crypto.randomUUID(),
aggregateId: this._id,
aggregateType: 'Order',
eventType: 'PaymentReceived',
payload: { transactionId, amount, method },
version: this._version + 1,
occurredAt: new Date(),
metadata: {},
});
}
ship(trackingId: string, carrier: string): void {
if (this._status !== 'PAID') {
throw new Error('配送は支払い完了状態でのみ可能です');
}
this.apply({
eventId: crypto.randomUUID(),
aggregateId: this._id,
aggregateType: 'Order',
eventType: 'OrderShipped',
payload: {
trackingId,
carrier,
estimatedDelivery: this.calculateEstimatedDelivery(),
},
version: this._version + 1,
occurredAt: new Date(),
metadata: {},
});
}
cancel(reason: string, cancelledBy: string): void {
if (this._status === 'SHIPPED' || this._status === 'DELIVERED') {
throw new Error('配送済み・配達完了の注文はキャンセルできません');
}
this.apply({
eventId: crypto.randomUUID(),
aggregateId: this._id,
aggregateType: 'Order',
eventType: 'OrderCancelled',
payload: { reason, cancelledBy },
version: this._version + 1,
occurredAt: new Date(),
metadata: {},
});
}
// === イベントの適用 ===
private apply(event: DomainEvent): void {
this.applyEvent(event);
this._uncommittedEvents.push(event);
}
// イベントから状態を復元するメソッド
private applyEvent(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderPlaced':
this._id = event.aggregateId;
this._status = 'PLACED';
this._customerId = event.payload.customerId as string;
this._items = (event.payload.items as any[]).map(i => ({
productId: i.productId,
quantity: i.quantity,
unitPrice: i.unitPrice,
}));
this._totalAmount = this._items.reduce(
(sum, i) => sum + i.quantity * i.unitPrice, 0
);
break;
case 'PaymentReceived':
this._status = 'PAID';
break;
case 'OrderShipped':
this._status = 'SHIPPED';
this._trackingId = event.payload.trackingId as string;
break;
case 'OrderCancelled':
this._status = 'CANCELLED';
break;
}
this._version = event.version;
}
// === イベント履歴からの復元 ===
static fromEvents(events: DomainEvent[]): Order {
const order = new Order();
for (const event of events) {
order.applyEvent(event);
}
return order;
}
// 未コミットイベントの取得とクリア
getUncommittedEvents(): DomainEvent[] {
return [...this._uncommittedEvents];
}
clearUncommittedEvents(): void {
this._uncommittedEvents = [];
}
private calculateEstimatedDelivery(): string {
const date = new Date();
date.setDate(date.getDate() + 3);
return date.toISOString();
}
// ゲッター
get id(): string { return this._id; }
get status(): OrderStatus { return this._status; }
get version(): number { return this._version; }
}
type OrderStatus = 'PLACED' | 'PAID' | 'SHIPPED' | 'DELIVERED' | 'CANCELLED';
イベントソーシング対応のリポジトリ
class EventSourcedOrderRepository {
constructor(private eventStore: EventStore) {}
async save(order: Order): Promise<void> {
const events = order.getUncommittedEvents();
if (events.length === 0) return;
await this.eventStore.append(events);
order.clearUncommittedEvents();
}
async findById(orderId: string): Promise<Order | null> {
const events = await this.eventStore.getEvents(orderId);
if (events.length === 0) return null;
return Order.fromEvents(events);
}
}
Projections(プロジェクション)と Read Model
プロジェクションとは
プロジェクションは、イベントストリームを消費して特定の目的に最適化された読み取りモデルを構築するプロセスです。
graph LR
ES["Event Store<br/>OrderPlaced<br/>PaymentReceived<br/>OrderShipped<br/>OrderCancelled<br/>..."]
ES --> P1["Projection:<br/>注文一覧"]
ES --> P2["Projection:<br/>売上集計"]
ES --> P3["Projection:<br/>顧客分析"]
ES --> P4["Projection:<br/>在庫追跡"]
P1 --> V1["order_list_view"]
P2 --> V2["daily_sales_view"]
P3 --> V3["customer_analytics_view"]
P4 --> V4["inventory_tracking_view"]
classDef esStyle fill:#4a90d9,stroke:#2c5f8a,color:#fff
classDef projStyle fill:#e8a838,stroke:#b07c1e,color:#fff
classDef viewStyle fill:#5cb85c,stroke:#3d8b3d,color:#fff
class ES esStyle
class P1,P2,P3,P4 projStyle
class V1,V2,V3,V4 viewStyle
※ 1つのイベントストリームから複数の読み取りモデルを生成可能 ※ 新しい要件が生まれたら、新しいプロジェクションを追加するだけ
// 売上集計プロジェクション
class DailySalesProjection {
constructor(private readDb: ReadDatabase) {}
async onOrderPlaced(event: OrderPlacedEvent): Promise<void> {
const date = event.occurredAt.toISOString().split('T')[0];
await this.readDb.upsert('daily_sales', {
date,
orderCount: { $increment: 1 },
totalAmount: { $increment: event.payload.totalAmount },
});
}
async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
const originalOrder = await this.readDb.get('order_list_view', event.aggregateId);
if (!originalOrder) return;
const date = originalOrder.orderDate.split('T')[0];
await this.readDb.upsert('daily_sales', {
date,
orderCount: { $increment: -1 },
totalAmount: { $increment: -originalOrder.totalAmount },
cancelCount: { $increment: 1 },
});
}
// イベントを最初から再処理してRead Modelを再構築
async rebuild(): Promise<void> {
await this.readDb.truncate('daily_sales');
const allEvents = await this.eventStore.getAllEvents();
for (const event of allEvents) {
if (event.eventType === 'OrderPlaced') await this.onOrderPlaced(event);
if (event.eventType === 'OrderCancelled') await this.onOrderCancelled(event);
}
}
}
CQRSとイベントソーシングの組み合わせ
全体アーキテクチャ
Command側
┌───────────┐ ┌──────────────┐ ┌──────────────┐
│ POST/PUT │───→│ Command │───→│ Event Store │
│ API │ │ Handler │ │ (Append-Only)│
└───────────┘ └──────────────┘ └──────┬───────┘
│
イベント発行
│
Query側 ▼
┌───────────┐ ┌──────────────┐ ┌──────────────┐
│ GET API │───→│ Query │ │ Projections │
│ │ │ Handler │ │ (非同期更新) │
└───────────┘ └──────┬───────┘ └──────┬───────┘
│ │
▼ ▼
┌──────────────────────────────┐
│ Read Database │
│ (非正規化されたビュー) │
└──────────────────────────────┘
いつ使い分けるか
| パターン | 単独使用 | 組み合わせ |
|---|---|---|
| CQRS のみ | 読み書きの負荷が非対称なシステム | - |
| ES のみ | 監査証跡が重要なシステム | - |
| CQRS + ES | - | 複雑なドメイン + 高い読み取り性能 + 完全な監査証跡 |
CQRSのみを使うケース:
- 読み取りが書き込みより圧倒的に多い(SNSのタイムライン等)
- 読み取りに最適化されたビューが必要
- イベント履歴までは不要
ESのみを使うケース:
- 完全な監査証跡が法的に必要(金融、医療)
- 状態の変化履歴を分析したい
- 読み書きの分離は不要
CQRS + ESを使うケース:
- 上記の両方が求められる場合
- 複雑なドメインモデルと高い読み取り性能の両立
落とし穴と複雑さの警告
よくある落とし穴
| 落とし穴 | 説明 | 対策 |
|---|---|---|
| 過剰適用 | 単純なCRUDにCQRS/ESを適用してしまう | 複雑さが正当化される場面でのみ使用 |
| イベントスキーマの変更 | イベント構造を変更すると過去の再生に影響 | イベントのバージョニング戦略を事前に決定 |
| 結果整合性の誤解 | 書き込み直後に読み取りが反映されていない | UIでの楽観的更新、ポーリング |
| イベント爆発 | 細かすぎるイベントが大量に発生 | 適切な粒度の設計、スナップショット |
| テストの複雑さ | イベント駆動のテストが難しい | Given-When-Thenパターンのテスト |
イベントのバージョニング
// バージョン1のイベント
interface OrderPlacedV1 {
eventType: 'OrderPlaced';
version: 1;
payload: {
customerId: string;
items: Array<{ productId: string; quantity: number }>;
};
}
// バージョン2: 配送先住所を追加
interface OrderPlacedV2 {
eventType: 'OrderPlaced';
version: 2;
payload: {
customerId: string;
items: Array<{ productId: string; quantity: number; unitPrice: number }>;
shippingAddress: string; // 新しいフィールド
};
}
// アップキャスター: V1をV2に変換
function upcastOrderPlaced(event: DomainEvent): DomainEvent {
if (event.eventType === 'OrderPlaced' && event.version === 1) {
return {
...event,
version: 2,
payload: {
...event.payload,
items: (event.payload.items as any[]).map(i => ({
...i,
unitPrice: 0, // デフォルト値
})),
shippingAddress: 'unknown', // デフォルト値
},
};
}
return event;
}
スナップショットによるパフォーマンス最適化
// イベント数が多い場合、毎回全イベントを再生するのは非効率
// スナップショットで途中の状態を保存する
interface Snapshot {
aggregateId: string;
version: number;
state: Record<string, unknown>;
createdAt: Date;
}
class SnapshotRepository {
async save(snapshot: Snapshot): Promise<void> { /* ... */ }
async getLatest(aggregateId: string): Promise<Snapshot | null> { /* ... */ }
}
class EventSourcedOrderRepositoryWithSnapshot {
constructor(
private eventStore: EventStore,
private snapshotRepo: SnapshotRepository,
private snapshotInterval: number = 100 // 100イベントごとにスナップショット
) {}
async findById(orderId: string): Promise<Order | null> {
// 1. 最新のスナップショットを取得
const snapshot = await this.snapshotRepo.getLatest(orderId);
let events: DomainEvent[];
if (snapshot) {
// 2a. スナップショット以降のイベントだけ取得
events = await this.eventStore.getEventsSince(orderId, snapshot.version);
const order = Order.fromSnapshot(snapshot.state);
for (const event of events) {
order.applyEvent(event);
}
return order;
} else {
// 2b. 全イベントから復元
events = await this.eventStore.getEvents(orderId);
if (events.length === 0) return null;
return Order.fromEvents(events);
}
}
async save(order: Order): Promise<void> {
const events = order.getUncommittedEvents();
await this.eventStore.append(events);
// スナップショットの作成判定
if (order.version % this.snapshotInterval === 0) {
await this.snapshotRepo.save({
aggregateId: order.id,
version: order.version,
state: order.toSnapshot(),
createdAt: new Date(),
});
}
order.clearUncommittedEvents();
}
}
CQRSを使わないべきケース
CQRSを避けるべき場面:
- 単純なCRUDアプリケーション
- チームにCQRS/ESの経験がない(学習コストが高い)
- 読み書きの負荷パターンが類似している
- 強い整合性が全ての操作で必要
まずは単純なアプローチから始め、
必要性が明確になった部分にのみCQRSを適用するのが推奨。
まとめ
| ポイント | 内容 |
|---|---|
| CQRS | 読み取りと書き込みを分離し、それぞれに最適化 |
| イベントソーシング | 状態変化をイベントとして永続化、完全な履歴を保持 |
| プロジェクション | イベントストリームから目的別の読み取りモデルを構築 |
| 組み合わせ | CQRS + ESで高い読み取り性能と完全な監査証跡を両立 |
| 結果整合性 | Read Modelの更新は非同期、一時的な不整合が発生 |
| 複雑さ | 過剰適用は避け、複雑さが正当化される場面でのみ使用 |
| スナップショット | イベント数が多い場合のパフォーマンス最適化手段 |
チェックリスト
- CQRSの目的(読み書きの分離と最適化)を説明できる
- イベントソーシングの概念(状態ではなくイベントを保存)を理解した
- プロジェクションの役割を説明できる
- CQRSとESをそれぞれ単独で使う場面を区別できる
- イベントのバージョニング問題を理解した
- 過剰適用のリスクを認識している
次のステップへ
次は「演習:アーキテクチャ選択肢を評価しよう」に取り組みます。Step 3で学んだ4つのアーキテクチャスタイルの知識を活用し、具体的なシナリオに対して最適なアーキテクチャを選定しましょう。
推定読了時間: 30分