ストーリー
イベントソーシングの基本概念
CRUD vs イベントソーシング
| 観点 | CRUD | イベントソーシング |
|---|---|---|
| データ保存 | 現在の状態のみ | 全イベントの履歴 |
| 更新 | UPDATE文で上書き | 新しいイベントを追加 |
| 監査証跡 | 別途実装が必要 | 自然に実現 |
| デバッグ | 「なぜこの状態か」が不明 | 再生して原因特定可能 |
| ストレージ | 効率的 | イベント蓄積でサイズ増大 |
イベントストアの設計
// イベントの基本インターフェース
interface DomainEvent {
eventId: string;
aggregateId: string;
aggregateType: string;
eventType: string;
version: number; // 楽観的ロック用
payload: Record<string, unknown>;
metadata: {
userId: string;
timestamp: Date;
correlationId: string;
};
}
// 注文に関するドメインイベント
type OrderEvent =
| { eventType: 'OrderCreated'; payload: { customerId: string; items: OrderItem[] } }
| { eventType: 'OrderItemAdded'; payload: { productId: string; quantity: number; price: number } }
| { eventType: 'OrderConfirmed'; payload: { confirmedAt: Date } }
| { eventType: 'PaymentReceived'; payload: { paymentId: string; amount: number } }
| { eventType: 'OrderShipped'; payload: { trackingNumber: string; carrier: string } }
| { eventType: 'OrderCancelled'; payload: { reason: string; cancelledBy: string } };
-- イベントストアのテーブル設計
CREATE TABLE event_store (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
version INT NOT NULL,
payload JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- 同一集約内でのバージョンの一意性(楽観的ロック)
UNIQUE (aggregate_id, version)
);
-- インデックス
CREATE INDEX idx_event_store_aggregate
ON event_store (aggregate_id, version ASC);
CREATE INDEX idx_event_store_type
ON event_store (event_type, created_at);
CREATE INDEX idx_event_store_correlation
ON event_store ((metadata->>'correlationId'));
イベントソーシングの実装
Aggregate のリハイドレーション
class OrderAggregate {
private id: string;
private status: OrderStatus = OrderStatus.DRAFT;
private items: Map<string, OrderItem> = new Map();
private totalAmount: number = 0;
private version: number = 0;
// イベントから状態を復元(リハイドレーション)
static fromEvents(events: DomainEvent[]): OrderAggregate {
const order = new OrderAggregate();
for (const event of events) {
order.apply(event, false); // isNew = false
}
return order;
}
// コマンド: ビジネスルールを検証してイベントを生成
confirm(): DomainEvent {
if (this.status !== OrderStatus.DRAFT) {
throw new Error('確定済みの注文は再確定できません');
}
if (this.items.size === 0) {
throw new Error('空の注文は確定できません');
}
const event: DomainEvent = {
eventId: crypto.randomUUID(),
aggregateId: this.id,
aggregateType: 'Order',
eventType: 'OrderConfirmed',
version: this.version + 1,
payload: { confirmedAt: new Date() },
metadata: { userId: '', timestamp: new Date(), correlationId: '' },
};
this.apply(event, true);
return event;
}
// イベントを適用して状態を変更
private apply(event: DomainEvent, isNew: boolean): void {
switch (event.eventType) {
case 'OrderCreated':
this.id = event.aggregateId;
this.status = OrderStatus.DRAFT;
break;
case 'OrderItemAdded':
const item = event.payload as { productId: string; quantity: number; price: number };
this.items.set(item.productId, item);
this.totalAmount += item.quantity * item.price;
break;
case 'OrderConfirmed':
this.status = OrderStatus.CONFIRMED;
break;
case 'OrderCancelled':
this.status = OrderStatus.CANCELLED;
break;
}
this.version = event.version;
}
}
スナップショットによる最適化
// イベント数が増えるとリハイドレーションが遅くなる
// → N件ごとにスナップショットを保存
interface Snapshot {
aggregateId: string;
aggregateType: string;
version: number;
state: Record<string, unknown>;
createdAt: Date;
}
class EventStore {
private readonly SNAPSHOT_INTERVAL = 100;
async loadAggregate(aggregateId: string): Promise<OrderAggregate> {
// 1. 最新のスナップショットを取得
const snapshot = await this.getLatestSnapshot(aggregateId);
// 2. スナップショット以降のイベントのみ取得
const fromVersion = snapshot ? snapshot.version + 1 : 0;
const events = await this.getEvents(aggregateId, fromVersion);
// 3. スナップショット + 差分イベントで復元
const aggregate = snapshot
? OrderAggregate.fromSnapshot(snapshot, events)
: OrderAggregate.fromEvents(events);
return aggregate;
}
async saveEvents(
aggregateId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void> {
// 楽観的ロック: 期待するバージョンと一致しなければ競合エラー
await this.appendEvents(aggregateId, events, expectedVersion);
// スナップショット条件チェック
const latestVersion = events[events.length - 1].version;
if (latestVersion % this.SNAPSHOT_INTERVAL === 0) {
const aggregate = await this.loadAggregate(aggregateId);
await this.saveSnapshot(aggregate);
}
}
}
CQRSパターン
コマンドとクエリの分離
graph LR
CMD["Command<br/>(書き込み)"] --> WM["Write Model<br/>(Domain)"] --> ES["Event Store<br/>(真実の源泉)"]
ES -- "Event<br/>Projection" --> RDB["Read DB<br/>(非正規化)"]
RDB --> RM["Read Model<br/>(Optimized)"] --> QRY["Query<br/>(読み取り)"]
classDef write fill:#fee2e2,stroke:#ef4444
classDef read fill:#dbeafe,stroke:#3b82f6
classDef store fill:#fef3c7,stroke:#f59e0b
class CMD,WM write
class QRY,RM read
class ES,RDB store
Projection(読み取りモデル構築)
// イベントを購読して読み取り用ビューを構築
class OrderSummaryProjection {
constructor(private readonly readDb: ReadDatabase) {}
async handle(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'OrderCreated':
await this.readDb.insert('order_summaries', {
orderId: event.aggregateId,
customerId: event.payload.customerId,
status: 'draft',
itemCount: 0,
totalAmount: 0,
createdAt: event.metadata.timestamp,
});
break;
case 'OrderItemAdded':
await this.readDb.increment('order_summaries', event.aggregateId, {
itemCount: 1,
totalAmount: event.payload.price * event.payload.quantity,
});
break;
case 'OrderConfirmed':
await this.readDb.update('order_summaries', event.aggregateId, {
status: 'confirmed',
confirmedAt: event.payload.confirmedAt,
});
break;
case 'OrderCancelled':
await this.readDb.update('order_summaries', event.aggregateId, {
status: 'cancelled',
cancelledAt: event.metadata.timestamp,
cancellationReason: event.payload.reason,
});
break;
}
}
}
// 複数の読み取りモデルを並列構築
class CustomerOrderHistoryProjection {
async handle(event: DomainEvent): Promise<void> {
if (event.eventType === 'OrderConfirmed') {
// 顧客ごとの注文履歴ビュー(別テーブル/別DB)
await this.readDb.appendToList(
`customer_orders:${event.payload.customerId}`,
{
orderId: event.aggregateId,
confirmedAt: event.payload.confirmedAt,
totalAmount: event.payload.totalAmount,
}
);
}
}
}
イベントソーシング + CQRSの注意点
結果整合性(Eventual Consistency)への対応
CQRSでは書き込みと読み取りの間に遅延が発生します。
// 問題: 注文直後にリダイレクトすると、まだProjectionに反映されていない
// → Read-your-writes consistency を実装
class OrderController {
async createOrder(req: Request, res: Response): Promise<void> {
const orderId = await this.commandBus.execute(new CreateOrderCommand(req.body));
// 方法1: Projectionの反映を待つ(最大3秒)
await this.waitForProjection(orderId, 3000);
// 方法2: コマンドの結果を直接返す(Projectionを経由しない)
res.json({
orderId,
status: 'created',
message: '注文が作成されました',
});
}
// 方法3: バージョンベースのポーリング
async getOrder(orderId: string, afterVersion?: number): Promise<Order> {
const order = await this.readModel.getOrder(orderId);
if (afterVersion && order.version < afterVersion) {
// まだ反映されていない → リトライまたはWrite Modelから直接取得
return this.writeModel.getOrder(orderId);
}
return order;
}
}
イベントスキーマの進化
イベントは不変であるため、スキーマ変更は慎重に行う必要があります。
// アップキャスティング: 古いイベントを新しい形式に変換
class EventUpcaster {
upcast(event: DomainEvent): DomainEvent {
switch (event.eventType) {
case 'OrderCreated':
// v1 → v2: currency フィールドを追加
if (!event.payload.currency) {
return {
...event,
payload: {
...event.payload,
currency: 'JPY', // デフォルト値
},
};
}
return event;
default:
return event;
}
}
}
まとめ
| ポイント | 内容 |
|---|---|
| イベントソーシング | 状態ではなくイベントの履歴を保存する |
| リハイドレーション | イベントを再生して現在の状態を復元 |
| スナップショット | N件ごとに状態を保存して復元を高速化 |
| CQRS | コマンドとクエリを分離し、それぞれ最適化 |
| Projection | イベントから読み取り用ビューを非同期構築 |
チェックリスト
- CRUD とイベントソーシングの違いを説明できる
- イベントストアの設計(スキーマ、楽観的ロック)を理解した
- CQRSの書き込みモデルと読み取りモデルの分離を理解した
- 結果整合性への対処方法を理解した
次のステップへ
次はデータメッシュの考え方を学び、ドメイン駆動のデータ所有権と自己充足型データプラットフォームを理解します。
推定読了時間: 30分