LESSON 40分

ストーリー

佐藤CTO
DBへの書き込みとイベント発行を確実に両方成功させる。これが分散システムで最も難しい問題の一つだ
佐藤CTO
Outbox パターンはこの問題をエレガントに解決する

問題: Dual Write

// ❌ 危険: DB書き込みとイベント発行が非原子的
async createOrder(data: OrderInput): Promise<void> {
  await this.db.insert(orders).values(data);     // ① 成功
  await this.kafka.publish('order.created', data); // ② ここで失敗したら?
  // → DBには注文あるがイベントは発行されていない = データ不整合
}
シナリオDBKafka結果
両方成功正常
DB成功、Kafka失敗不整合(イベント欠落)
DB失敗-正常(何も起きない)

Outbox パターンの仕組み

graph TD
    subgraph OS["Order Service"]
        subgraph TX["トランザクション"]
            W1["1. orders テーブルに INSERT"]
            W2["2. outbox テーブルに INSERT"]
            W1 --> W2
        end
        RELAY["Outbox Relay(CDC)<br/>outbox → Kafka に転送"]
    end
    TX --> RELAY
    RELAY --> KAFKA["Kafka"]

    classDef tx fill:#d4edda,stroke:#28a745,color:#333
    classDef relay fill:#fff3cd,stroke:#f0ad4e,color:#333
    classDef kafka fill:#e8f4fd,stroke:#2196f3,color:#333
    class TX tx
    class RELAY relay
    class KAFKA kafka

実装

-- Outbox テーブル
CREATE TABLE outbox (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_type VARCHAR(255) NOT NULL,  -- 'Order'
  aggregate_id VARCHAR(255) NOT NULL,    -- 'ord-789'
  event_type VARCHAR(255) NOT NULL,      -- 'order.created'
  payload JSONB NOT NULL,
  created_at TIMESTAMP DEFAULT NOW(),
  published_at TIMESTAMP NULL           -- 発行済みフラグ
);
class OrderService {
  async createOrder(data: OrderInput): Promise<Order> {
    return await this.db.transaction(async (tx) => {
      // 1. ビジネスデータを保存
      const order = await tx.insert(orders).values(data).returning();

      // 2. 同一トランザクションで Outbox に書き込み
      await tx.insert(outbox).values({
        aggregateType: 'Order',
        aggregateId: order.id,
        eventType: 'order.created',
        payload: {
          orderId: order.id,
          customerId: data.customerId,
          items: data.items,
          totalAmount: data.totalAmount,
        },
      });

      return order;
    });
    // トランザクション成功 = ビジネスデータとイベントが原子的に保存
  }
}

CDC(Change Data Capture)による転送

Debezium + Kafka Connect

PostgreSQL WAL → Debezium Connector → Kafka Topic
  (変更ログ)      (CDC監視)           (イベント配信)
# Debezium Connector 設定
connector.class: io.debezium.connector.postgresql.PostgresConnector
database.hostname: order-db
database.dbname: orders
table.include.list: public.outbox
transforms: outbox
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.field.event.id: id
transforms.outbox.table.field.event.key: aggregate_id
transforms.outbox.table.field.event.type: event_type
transforms.outbox.table.field.event.payload: payload
transforms.outbox.route.topic.replacement: events.${routedByValue}

Polling Publisher(CDC不使用の場合)

// 定期的にOutboxテーブルをポーリングして発行
class OutboxPublisher {
  async publishPending(): Promise<void> {
    const pending = await this.db
      .select()
      .from(outbox)
      .where(isNull(outbox.publishedAt))
      .orderBy(outbox.createdAt)
      .limit(100);

    for (const event of pending) {
      await this.kafka.publish(event.eventType, event.payload);
      await this.db
        .update(outbox)
        .set({ publishedAt: new Date() })
        .where(eq(outbox.id, event.id));
    }
  }
}

// 5秒ごとにポーリング
setInterval(() => publisher.publishPending(), 5000);

Inbox パターン(Consumer側)

Consumer側でも冪等性を保証する Inbox パターン:

class InboxHandler {
  async handle(event: CloudEvent): Promise<void> {
    await this.db.transaction(async (tx) => {
      // 1. 処理済みチェック
      const exists = await tx.select().from(inbox)
        .where(eq(inbox.eventId, event.id));
      if (exists.length > 0) return; // 冪等

      // 2. ビジネスロジック実行
      await this.processEvent(tx, event);

      // 3. 処理済み記録
      await tx.insert(inbox).values({
        eventId: event.id,
        eventType: event.type,
        processedAt: new Date(),
      });
    });
  }
}

まとめ

ポイント内容
Dual WriteDB + MQ の非原子的書き込みは危険
Outbox同一トランザクションでイベントを保存
CDCDebezium で Outbox → Kafka を自動転送
InboxConsumer 側の冪等性を保証

チェックリスト

  • Dual Write の問題を説明できる
  • Outbox パターンの仕組みを理解した
  • CDC(Debezium)の役割を理解した
  • Inbox パターンの必要性を理解した

次のステップへ

次は演習で分散トランザクションパターンを実践します。


推定読了時間: 40分