ストーリー
問題: Dual Write
// ❌ 危険: DB書き込みとイベント発行が非原子的
async createOrder(data: OrderInput): Promise<void> {
await this.db.insert(orders).values(data); // ① 成功
await this.kafka.publish('order.created', data); // ② ここで失敗したら?
// → DBには注文あるがイベントは発行されていない = データ不整合
}
| シナリオ | DB | Kafka | 結果 |
|---|---|---|---|
| 両方成功 | ✅ | ✅ | 正常 |
| DB成功、Kafka失敗 | ✅ | ❌ | 不整合(イベント欠落) |
| DB失敗 | ❌ | - | 正常(何も起きない) |
Outbox パターンの仕組み
graph TD
subgraph OS["Order Service"]
subgraph TX["トランザクション"]
W1["1. orders テーブルに INSERT"]
W2["2. outbox テーブルに INSERT"]
W1 --> W2
end
RELAY["Outbox Relay(CDC)<br/>outbox → Kafka に転送"]
end
TX --> RELAY
RELAY --> KAFKA["Kafka"]
classDef tx fill:#d4edda,stroke:#28a745,color:#333
classDef relay fill:#fff3cd,stroke:#f0ad4e,color:#333
classDef kafka fill:#e8f4fd,stroke:#2196f3,color:#333
class TX tx
class RELAY relay
class KAFKA kafka
実装
-- Outbox テーブル
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL, -- 'Order'
aggregate_id VARCHAR(255) NOT NULL, -- 'ord-789'
event_type VARCHAR(255) NOT NULL, -- 'order.created'
payload JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
published_at TIMESTAMP NULL -- 発行済みフラグ
);
class OrderService {
async createOrder(data: OrderInput): Promise<Order> {
return await this.db.transaction(async (tx) => {
// 1. ビジネスデータを保存
const order = await tx.insert(orders).values(data).returning();
// 2. 同一トランザクションで Outbox に書き込み
await tx.insert(outbox).values({
aggregateType: 'Order',
aggregateId: order.id,
eventType: 'order.created',
payload: {
orderId: order.id,
customerId: data.customerId,
items: data.items,
totalAmount: data.totalAmount,
},
});
return order;
});
// トランザクション成功 = ビジネスデータとイベントが原子的に保存
}
}
CDC(Change Data Capture)による転送
Debezium + Kafka Connect
PostgreSQL WAL → Debezium Connector → Kafka Topic
(変更ログ) (CDC監視) (イベント配信)
# Debezium Connector 設定
connector.class: io.debezium.connector.postgresql.PostgresConnector
database.hostname: order-db
database.dbname: orders
table.include.list: public.outbox
transforms: outbox
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.field.event.id: id
transforms.outbox.table.field.event.key: aggregate_id
transforms.outbox.table.field.event.type: event_type
transforms.outbox.table.field.event.payload: payload
transforms.outbox.route.topic.replacement: events.${routedByValue}
Polling Publisher(CDC不使用の場合)
// 定期的にOutboxテーブルをポーリングして発行
class OutboxPublisher {
async publishPending(): Promise<void> {
const pending = await this.db
.select()
.from(outbox)
.where(isNull(outbox.publishedAt))
.orderBy(outbox.createdAt)
.limit(100);
for (const event of pending) {
await this.kafka.publish(event.eventType, event.payload);
await this.db
.update(outbox)
.set({ publishedAt: new Date() })
.where(eq(outbox.id, event.id));
}
}
}
// 5秒ごとにポーリング
setInterval(() => publisher.publishPending(), 5000);
Inbox パターン(Consumer側)
Consumer側でも冪等性を保証する Inbox パターン:
class InboxHandler {
async handle(event: CloudEvent): Promise<void> {
await this.db.transaction(async (tx) => {
// 1. 処理済みチェック
const exists = await tx.select().from(inbox)
.where(eq(inbox.eventId, event.id));
if (exists.length > 0) return; // 冪等
// 2. ビジネスロジック実行
await this.processEvent(tx, event);
// 3. 処理済み記録
await tx.insert(inbox).values({
eventId: event.id,
eventType: event.type,
processedAt: new Date(),
});
});
}
}
まとめ
| ポイント | 内容 |
|---|---|
| Dual Write | DB + MQ の非原子的書き込みは危険 |
| Outbox | 同一トランザクションでイベントを保存 |
| CDC | Debezium で Outbox → Kafka を自動転送 |
| Inbox | Consumer 側の冪等性を保証 |
チェックリスト
- Dual Write の問題を説明できる
- Outbox パターンの仕組みを理解した
- CDC(Debezium)の役割を理解した
- Inbox パターンの必要性を理解した
次のステップへ
次は演習で分散トランザクションパターンを実践します。
推定読了時間: 40分