ストーリー
二重書き込み問題
// 問題: DBとメッセージブローカーへの二重書き込み
async function createOrder(data: OrderData): Promise<Order> {
// Step 1: DBに保存
const order = await orderRepo.save(data);
// Step 2: イベントを発行
await eventBus.publish("order.created", { orderId: order.id });
// ↑ ここでクラッシュしたら? → DBには保存済みだがイベント未発行
// → 決済サービスは注文の存在を知らない
return order;
}
// 逆パターンも問題
async function createOrderReversed(data: OrderData): Promise<Order> {
// Step 1: イベントを先に発行
await eventBus.publish("order.created", { orderId: data.id });
// ↑ ここでクラッシュしたら? → イベントは発行されたがDB未保存
// → 決済サービスは存在しない注文の決済を試みる
// Step 2: DBに保存
const order = await orderRepo.save(data);
return order;
}
Outboxパターン
イベントをDBの「Outboxテーブル」に一緒に書き込み、別プロセスでメッセージブローカーに転送します。
┌────────────────────────────────────┐
│ Order Service │
│ │
│ 1. ビジネスロジック実行 │
│ │ │
│ 2. 同一トランザクションで: │
│ ├─ orders テーブルに INSERT │
│ └─ outbox テーブルに INSERT │
│ │
│ 3. [Outbox Relay] (別プロセス) │
│ └─ outbox から読み取り │
│ → Message Broker に発行 │
│ → 発行済みレコードを削除 │
│ │
└────────────────────────────────────┘
テーブル設計
-- Outboxテーブル
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL, -- 'Order'
aggregate_id VARCHAR(255) NOT NULL, -- 'ord-123'
event_type VARCHAR(255) NOT NULL, -- 'order.created'
payload JSONB NOT NULL, -- イベントデータ
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
published_at TIMESTAMP NULL -- 発行済みフラグ
);
CREATE INDEX idx_outbox_unpublished ON outbox (created_at)
WHERE published_at IS NULL;
実装
// Outboxパターンの実装
class OrderService {
async createOrder(data: OrderData): Promise<Order> {
// 同一トランザクション内でDBとOutboxに書き込み
return await this.db.transaction(async (tx) => {
// 1. 注文を保存
const order = await tx.insert("orders", {
id: generateId(),
...data,
status: "PENDING",
});
// 2. Outboxにイベントを書き込み
await tx.insert("outbox", {
aggregateType: "Order",
aggregateId: order.id,
eventType: "order.created",
payload: JSON.stringify({
orderId: order.id,
userId: data.userId,
items: data.items,
totalAmount: data.totalAmount,
}),
});
return order;
// トランザクション成功 → 両方確実に保存される
// トランザクション失敗 → 両方確実にロールバック
});
}
}
Outbox Relay(転送プロセス)
Outboxテーブルからメッセージブローカーへ転送する2つの方式があります。
方式1: ポーリング
// 定期的にOutboxテーブルをチェック
class OutboxPoller {
private readonly pollInterval = 1000; // 1秒ごと
async start(): Promise<void> {
while (true) {
await this.processOutbox();
await sleep(this.pollInterval);
}
}
private async processOutbox(): Promise<void> {
const events = await this.db.query(
`SELECT * FROM outbox
WHERE published_at IS NULL
ORDER BY created_at
LIMIT 100
FOR UPDATE SKIP LOCKED` // 他のPollerと競合しない
);
for (const event of events) {
try {
await this.messageBroker.publish(event.eventType, event.payload);
await this.db.query(
`UPDATE outbox SET published_at = NOW() WHERE id = $1`,
[event.id]
);
} catch (error) {
console.error(`Failed to publish event ${event.id}:`, error);
// 次のポーリングでリトライ
}
}
}
}
方式2: Change Data Capture(CDC)
CDC(Change Data Capture):
DBの変更ログ(WAL/binlog)を監視して、
変更をリアルタイムでメッセージブローカーに転送
[PostgreSQL WAL] → [Debezium] → [Kafka]
Debezium がoutboxテーブルの INSERT を検出
→ 自動的にKafkaにイベントを発行
方式比較
| 観点 | ポーリング | CDC |
|---|---|---|
| レイテンシ | ポーリング間隔に依存(秒単位) | ほぼリアルタイム(ミリ秒) |
| DB負荷 | 定期クエリの負荷 | WAL読み取り(低負荷) |
| 実装複雑度 | 低い | 高い(Debezium等の導入) |
| 信頼性 | 高い | 高い |
| 運用 | シンプル | Debeziumの運用が必要 |
DynamoDB Streamsを使ったOutbox
AWS DynamoDBを使う場合、DynamoDB StreamsがCDCの役割を果たします。
// DynamoDB Streams + Lambda
// DynamoDBへの書き込みがStreamに流れ、Lambdaが処理
// Lambda Handler
export const handler = async (event: DynamoDBStreamEvent): Promise<void> => {
for (const record of event.Records) {
if (record.eventName === "INSERT" && record.dynamodb?.NewImage) {
const item = unmarshall(record.dynamodb.NewImage);
if (item.entityType === "OUTBOX_EVENT") {
// EventBridgeにイベントを発行
await eventBridge.putEvents({
Entries: [{
Source: item.source,
DetailType: item.eventType,
Detail: JSON.stringify(item.payload),
}],
});
}
}
}
};
Transactional Outboxの注意点
// 1. 順序保証: created_at順に発行する
// 2. 冪等性: Consumer側でも重複排除が必要
// (Outbox Relayが同じイベントを2回送る可能性)
// 3. クリーンアップ: 古い発行済みレコードを定期削除
class OutboxCleaner {
async cleanup(): Promise<void> {
await this.db.query(
`DELETE FROM outbox
WHERE published_at IS NOT NULL
AND published_at < NOW() - INTERVAL '7 days'`
);
}
}
まとめ
| ポイント | 内容 |
|---|---|
| 二重書き込み問題 | DB保存とイベント発行の不整合 |
| Outboxパターン | 同一トランザクションでOutboxテーブルに書き込み |
| 転送方式 | ポーリング(シンプル)、CDC(リアルタイム) |
| AWS対応 | DynamoDB Streams + Lambda |
| 注意点 | 順序保証、Consumer側の冪等性 |
チェックリスト
- 二重書き込み問題を説明できる
- Outboxパターンの仕組みを説明できる
- ポーリングとCDCの違いを理解した
- DynamoDB StreamsによるOutboxを理解した
次のステップへ
次は演習で、Sagaパターンを実際に設計・実装してみましょう。補償トランザクションとOutboxパターンを組み合わせた実践的な設計に挑戦します。
推定読了時間: 40分