LESSON 40分

ストーリー

あなた
注文は保存されたのに、イベントが発行されなかった。決済サービスが注文を知らないまま…
高橋アーキテクト
“DBへの書き込み”と”イベントの発行”は、2つの異なるシステムへの書き込みだ。これを確実に両方成功させるのがOutboxパターンだ

二重書き込み問題

// 問題: DBとメッセージブローカーへの二重書き込み
async function createOrder(data: OrderData): Promise<Order> {
  // Step 1: DBに保存
  const order = await orderRepo.save(data);

  // Step 2: イベントを発行
  await eventBus.publish("order.created", { orderId: order.id });
  // ↑ ここでクラッシュしたら? → DBには保存済みだがイベント未発行
  //   → 決済サービスは注文の存在を知らない

  return order;
}

// 逆パターンも問題
async function createOrderReversed(data: OrderData): Promise<Order> {
  // Step 1: イベントを先に発行
  await eventBus.publish("order.created", { orderId: data.id });
  // ↑ ここでクラッシュしたら? → イベントは発行されたがDB未保存
  //   → 決済サービスは存在しない注文の決済を試みる

  // Step 2: DBに保存
  const order = await orderRepo.save(data);
  return order;
}

Outboxパターン

イベントをDBの「Outboxテーブル」に一緒に書き込み、別プロセスでメッセージブローカーに転送します。

┌────────────────────────────────────┐
│ Order Service                       │
│                                    │
│  1. ビジネスロジック実行              │
│     │                              │
│  2. 同一トランザクションで:           │
│     ├─ orders テーブルに INSERT      │
│     └─ outbox テーブルに INSERT      │
│                                    │
│  3. [Outbox Relay] (別プロセス)     │
│     └─ outbox から読み取り           │
│        → Message Broker に発行      │
│        → 発行済みレコードを削除       │
│                                    │
└────────────────────────────────────┘

テーブル設計

-- Outboxテーブル
CREATE TABLE outbox (
  id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_type VARCHAR(255) NOT NULL,  -- 'Order'
  aggregate_id   VARCHAR(255) NOT NULL,  -- 'ord-123'
  event_type     VARCHAR(255) NOT NULL,  -- 'order.created'
  payload        JSONB NOT NULL,          -- イベントデータ
  created_at     TIMESTAMP NOT NULL DEFAULT NOW(),
  published_at   TIMESTAMP NULL           -- 発行済みフラグ
);

CREATE INDEX idx_outbox_unpublished ON outbox (created_at)
  WHERE published_at IS NULL;

実装

// Outboxパターンの実装
class OrderService {
  async createOrder(data: OrderData): Promise<Order> {
    // 同一トランザクション内でDBとOutboxに書き込み
    return await this.db.transaction(async (tx) => {
      // 1. 注文を保存
      const order = await tx.insert("orders", {
        id: generateId(),
        ...data,
        status: "PENDING",
      });

      // 2. Outboxにイベントを書き込み
      await tx.insert("outbox", {
        aggregateType: "Order",
        aggregateId: order.id,
        eventType: "order.created",
        payload: JSON.stringify({
          orderId: order.id,
          userId: data.userId,
          items: data.items,
          totalAmount: data.totalAmount,
        }),
      });

      return order;
      // トランザクション成功 → 両方確実に保存される
      // トランザクション失敗 → 両方確実にロールバック
    });
  }
}

Outbox Relay(転送プロセス)

Outboxテーブルからメッセージブローカーへ転送する2つの方式があります。

方式1: ポーリング

// 定期的にOutboxテーブルをチェック
class OutboxPoller {
  private readonly pollInterval = 1000; // 1秒ごと

  async start(): Promise<void> {
    while (true) {
      await this.processOutbox();
      await sleep(this.pollInterval);
    }
  }

  private async processOutbox(): Promise<void> {
    const events = await this.db.query(
      `SELECT * FROM outbox
       WHERE published_at IS NULL
       ORDER BY created_at
       LIMIT 100
       FOR UPDATE SKIP LOCKED` // 他のPollerと競合しない
    );

    for (const event of events) {
      try {
        await this.messageBroker.publish(event.eventType, event.payload);
        await this.db.query(
          `UPDATE outbox SET published_at = NOW() WHERE id = $1`,
          [event.id]
        );
      } catch (error) {
        console.error(`Failed to publish event ${event.id}:`, error);
        // 次のポーリングでリトライ
      }
    }
  }
}

方式2: Change Data Capture(CDC)

CDC(Change Data Capture):
  DBの変更ログ(WAL/binlog)を監視して、
  変更をリアルタイムでメッセージブローカーに転送

  [PostgreSQL WAL] → [Debezium] → [Kafka]

  Debezium がoutboxテーブルの INSERT を検出
  → 自動的にKafkaにイベントを発行

方式比較

観点ポーリングCDC
レイテンシポーリング間隔に依存(秒単位)ほぼリアルタイム(ミリ秒)
DB負荷定期クエリの負荷WAL読み取り(低負荷)
実装複雑度低い高い(Debezium等の導入)
信頼性高い高い
運用シンプルDebeziumの運用が必要

DynamoDB Streamsを使ったOutbox

AWS DynamoDBを使う場合、DynamoDB StreamsがCDCの役割を果たします。

// DynamoDB Streams + Lambda
// DynamoDBへの書き込みがStreamに流れ、Lambdaが処理

// Lambda Handler
export const handler = async (event: DynamoDBStreamEvent): Promise<void> => {
  for (const record of event.Records) {
    if (record.eventName === "INSERT" && record.dynamodb?.NewImage) {
      const item = unmarshall(record.dynamodb.NewImage);

      if (item.entityType === "OUTBOX_EVENT") {
        // EventBridgeにイベントを発行
        await eventBridge.putEvents({
          Entries: [{
            Source: item.source,
            DetailType: item.eventType,
            Detail: JSON.stringify(item.payload),
          }],
        });
      }
    }
  }
};

Transactional Outboxの注意点

// 1. 順序保証: created_at順に発行する
// 2. 冪等性: Consumer側でも重複排除が必要
//    (Outbox Relayが同じイベントを2回送る可能性)
// 3. クリーンアップ: 古い発行済みレコードを定期削除

class OutboxCleaner {
  async cleanup(): Promise<void> {
    await this.db.query(
      `DELETE FROM outbox
       WHERE published_at IS NOT NULL
       AND published_at < NOW() - INTERVAL '7 days'`
    );
  }
}

まとめ

ポイント内容
二重書き込み問題DB保存とイベント発行の不整合
Outboxパターン同一トランザクションでOutboxテーブルに書き込み
転送方式ポーリング(シンプル)、CDC(リアルタイム)
AWS対応DynamoDB Streams + Lambda
注意点順序保証、Consumer側の冪等性

チェックリスト

  • 二重書き込み問題を説明できる
  • Outboxパターンの仕組みを説明できる
  • ポーリングとCDCの違いを理解した
  • DynamoDB StreamsによるOutboxを理解した

次のステップへ

次は演習で、Sagaパターンを実際に設計・実装してみましょう。補償トランザクションとOutboxパターンを組み合わせた実践的な設計に挑戦します。


推定読了時間: 40分