ストーリー
「この注文、なぜキャンセルされたんだ? ログを見てもステータスが『cancelled』になったことしかわからない」
高橋アーキテクトがデータベースの状態を見て首を振る。
「現在の状態だけを保存する設計だと、『なぜその状態になったか』がわからない。イベントソーシングなら、すべての出来事を記録する。銀行の通帳と同じだ。残高だけでなく、すべての入出金を記録する」
従来の設計 vs イベントソーシング
従来: 現在の状態を保存(State Sourcing)
-- 注文テーブル: 現在の状態のみ
UPDATE orders SET status = 'cancelled', updated_at = NOW() WHERE id = 1;
-- 以前の状態は上書きされて消える
イベントソーシング: 出来事を保存
-- イベントストア: すべての出来事を記録
INSERT INTO order_events (order_id, event_type, payload, occurred_at) VALUES
(1, 'OrderCreated', '{"userId": 100, "items": [...]}', '2024-03-15 10:00:00'),
(1, 'PaymentReceived','{"amount": 5000, "method": "credit"}', '2024-03-15 10:05:00'),
(1, 'ItemShipped', '{"trackingId": "JP123456"}', '2024-03-16 09:00:00'),
(1, 'RefundRequested','{"reason": "商品に傷がある"}', '2024-03-18 14:00:00'),
(1, 'OrderCancelled', '{"refundAmount": 5000}', '2024-03-18 15:00:00');
-- 現在の状態は、イベントを順番に再生して導出する
イベントソーシングの仕組み
イベントストアの設計
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
aggregate_type VARCHAR(100) NOT NULL, -- 'Order', 'Account'
aggregate_id VARCHAR(100) NOT NULL, -- エンティティID
event_type VARCHAR(100) NOT NULL, -- 'OrderCreated', 'ItemAdded'
payload JSONB NOT NULL, -- イベントデータ
metadata JSONB, -- ユーザーID、リクエストID等
version INT NOT NULL, -- 楽観的ロック用
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (aggregate_type, aggregate_id, version)
);
CREATE INDEX idx_events_aggregate ON events(aggregate_type, aggregate_id, version);
TypeScriptでの実装
// イベント型定義
type OrderEvent =
| { type: 'OrderCreated'; data: { userId: number; items: OrderItem[] } }
| { type: 'PaymentReceived'; data: { amount: number; method: string } }
| { type: 'ItemShipped'; data: { trackingId: string } }
| { type: 'OrderCancelled'; data: { reason: string; refundAmount: number } };
// 集約(Aggregate): イベントから状態を再構築
class OrderAggregate {
id: string;
status: string = 'unknown';
items: OrderItem[] = [];
totalPaid: number = 0;
// イベントを適用して状態を更新
apply(event: OrderEvent): void {
switch (event.type) {
case 'OrderCreated':
this.status = 'pending';
this.items = event.data.items;
break;
case 'PaymentReceived':
this.status = 'paid';
this.totalPaid += event.data.amount;
break;
case 'ItemShipped':
this.status = 'shipped';
break;
case 'OrderCancelled':
this.status = 'cancelled';
break;
}
}
// イベント列から状態を再構築
static fromEvents(id: string, events: OrderEvent[]): OrderAggregate {
const aggregate = new OrderAggregate();
aggregate.id = id;
for (const event of events) {
aggregate.apply(event);
}
return aggregate;
}
}
// イベントストア
class EventStore {
async appendEvent(
aggregateType: string,
aggregateId: string,
event: OrderEvent,
expectedVersion: number
): Promise<void> {
await db.query(`
INSERT INTO events (aggregate_type, aggregate_id, event_type, payload, version)
VALUES ($1, $2, $3, $4, $5)
`, [aggregateType, aggregateId, event.type, JSON.stringify(event.data), expectedVersion + 1]);
}
async getEvents(aggregateType: string, aggregateId: string): Promise<OrderEvent[]> {
const rows = await db.query(`
SELECT event_type, payload FROM events
WHERE aggregate_type = $1 AND aggregate_id = $2
ORDER BY version ASC
`, [aggregateType, aggregateId]);
return rows.map(row => ({
type: row.event_type,
data: row.payload,
}));
}
}
スナップショット
イベント数が増えると、毎回全イベントを再生するのは非効率。定期的にスナップショットを保存する。
class SnapshotStore {
async saveSnapshot(aggregate: OrderAggregate, version: number): Promise<void> {
await db.query(`
INSERT INTO snapshots (aggregate_type, aggregate_id, state, version)
VALUES ('Order', $1, $2, $3)
ON CONFLICT (aggregate_type, aggregate_id)
DO UPDATE SET state = $2, version = $3
`, [aggregate.id, JSON.stringify(aggregate), version]);
}
async loadAggregate(aggregateId: string): Promise<OrderAggregate> {
// スナップショットから復元
const snapshot = await this.getLatestSnapshot('Order', aggregateId);
// スナップショット以降のイベントのみ再生
const events = await this.eventStore.getEventsSince(
'Order',
aggregateId,
snapshot?.version ?? 0
);
const aggregate = snapshot
? OrderAggregate.fromSnapshot(snapshot)
: new OrderAggregate();
for (const event of events) {
aggregate.apply(event);
}
return aggregate;
}
}
CQRS(Command Query Responsibility Segregation)
コマンド(書き込み)とクエリ(読み取り)で異なるモデルを使う。
Command(書き込み): Query(読み取り):
↓ ↓
[Write Model] [Read Model]
↓ ↑
[Event Store] ──── 投影 ────→ [Read Database]
(Projection)
Projectionの実装
// イベントからRead Modelを構築(Projection)
class OrderProjection {
async handleEvent(event: StoredEvent): Promise<void> {
switch (event.event_type) {
case 'OrderCreated':
await this.readDb.query(`
INSERT INTO order_summaries (id, user_id, status, item_count, created_at)
VALUES ($1, $2, 'pending', $3, $4)
`, [event.aggregate_id, event.payload.userId,
event.payload.items.length, event.occurred_at]);
break;
case 'PaymentReceived':
await this.readDb.query(`
UPDATE order_summaries SET status = 'paid', paid_amount = $1
WHERE id = $2
`, [event.payload.amount, event.aggregate_id]);
break;
case 'OrderCancelled':
await this.readDb.query(`
UPDATE order_summaries SET status = 'cancelled'
WHERE id = $1
`, [event.aggregate_id]);
break;
}
}
}
// Read側: 通常のSQLクエリで高速に読み取り
async function getOrderSummaries(userId: number) {
return await readDb.query(`
SELECT * FROM order_summaries
WHERE user_id = $1
ORDER BY created_at DESC
`, [userId]);
}
イベントソーシングのメリット・デメリット
| メリット | デメリット |
|---|---|
| 完全な監査証跡 | 実装の複雑さ |
| 時間遡行(any point in time) | 学習コスト |
| イベントからの多様なView構築 | 結果整合性(Read Modelの遅延) |
| バグ修正後にデータ再構築可能 | イベントスキーマの進化が困難 |
適用判断
| 適用すべき場面 | 避けるべき場面 |
|---|---|
| 完全な監査証跡が必要 | シンプルなCRUD |
| 複雑なドメインロジック | 小規模なアプリ |
| 多様なRead Modelが必要 | チームがパターンを理解していない |
| 時間遡行が必要 | 強い整合性が常に必要 |
まとめ
| ポイント | 内容 |
|---|---|
| イベントソーシング | 状態ではなくイベント(出来事)を記録 |
| 状態の導出 | イベントを再生して現在の状態を計算 |
| スナップショット | パフォーマンスのために定期的に状態を保存 |
| CQRS | 書き込みと読み取りで別モデル |
| Projection | イベントからRead Modelを構築 |
理解度チェックリスト
- イベントソーシングと従来のCRUDの違いを説明できる
- イベントストアの基本的な設計ができる
- CQRSのCommand側とQuery側の役割を理解している
- イベントソーシングの適用判断ができる
次のステップ
次は演習:整合性を保証する設計をしよう。ここまで学んだトランザクション、ロック、Sagaパターン、イベントソーシングを実際の設計に適用する。
推定読了時間: 40分