LESSON 30分

ストーリー

佐藤CTO
次はサービス間の通信パターンだ
佐藤CTO
“注文が確定したら在庫を減らして、決済を処理して、メールを送る”。この一連の処理をどう設計する?
あなた
普通にAPIを呼び出せばいいのでは?
佐藤CTO
それも1つの方法だ。だが”注文サービスが決済サービスの障害で止まる”のは許容できるか?“メール送信が遅くて注文確定に5秒かかる”のは?リクエスト応答とイベント駆動、それぞれの特徴を理解して使い分けることが重要だ

リクエスト応答パターン

概要

リクエスト応答(Request-Response)は、呼び出し元がサービスにリクエストを送り、レスポンスが返るまで待つ同期通信パターンです。

sequenceDiagram
    participant Order as 注文サービス
    participant Payment as 決済サービス

    Note over Order: 呼び出し元は<br/>レスポンスを待つ
    Order->>Payment: リクエスト
    Note over Payment: 処理完了まで<br/>呼び出し元をブロック
    Payment-->>Order: レスポンス

リクエスト応答の実装

// 注文サービス:同期呼び出し
class OrderService {
  constructor(
    private paymentClient: PaymentClient,
    private inventoryClient: InventoryClient,
    private notificationClient: NotificationClient
  ) {}

  async createOrder(command: CreateOrderCommand): Promise<Order> {
    // 1. 在庫確認(同期)
    const reserved = await this.inventoryClient.reserve(command.items);
    if (!reserved.success) {
      throw new Error('在庫が不足しています');
    }

    // 2. 決済処理(同期)
    const payment = await this.paymentClient.charge({
      amount: command.totalAmount,
      method: command.paymentMethod,
    });
    if (!payment.success) {
      // 在庫予約を取り消す必要がある
      await this.inventoryClient.cancelReservation(reserved.reservationId);
      throw new Error('決済に失敗しました');
    }

    // 3. 注文を保存
    const order = Order.create(command, payment.transactionId);
    await this.orderRepo.save(order);

    // 4. 通知を送信(同期 - メール送信完了まで待つ)
    await this.notificationClient.sendOrderConfirmation(order);

    return order;
  }
}

リクエスト応答の課題

graph LR
    A["注文サービス"] -->|"100ms"| B["在庫サービス"]
    B -->|"200ms"| C["決済サービス"]
    C -->|"500ms"| D["通知サービス"]
    D --> E["合計: 800ms"]

    classDef svcStyle fill:#dbeafe,stroke:#2563eb,stroke-width:2px,color:#1e40af
    classDef totalStyle fill:#fee2e2,stroke:#dc2626,color:#991b1b

    class A,B,C,D svcStyle
    class E totalStyle

問題:

  • 1つのサービスが遅いと全体が遅くなる
  • 1つのサービスが落ちると全体が失敗する
  • 補償トランザクション(在庫予約の取り消し等)が複雑

イベント駆動パターン

概要

イベント駆動アーキテクチャ(Event-Driven Architecture)は、イベントの発行と購読によってサービス間を疎結合にする非同期通信パターンです。

graph LR
    Order["注文サービス"] -->|"OrderCreated"| EB["Event Bus"]
    EB --> Inv["在庫サービス"]
    EB --> Pay["決済サービス"]
    EB --> Notify["通知サービス"]

    classDef orderStyle fill:#4a90d9,stroke:#2c5f8a,color:#fff
    classDef busStyle fill:#e8a838,stroke:#b07c1e,color:#fff
    classDef svcStyle fill:#5cb85c,stroke:#3d8b3d,color:#fff

    class Order orderStyle
    class EB busStyle
    class Inv,Pay,Notify svcStyle

※ 注文サービスは他サービスの存在を知らない

4つのイベント駆動パターン

1. Event Notification(イベント通知)

最小限の情報を含むイベントを発行し、詳細が必要な消費者は発行元に問い合わせます。

// イベント: 最小限の情報のみ
interface OrderCreatedEvent {
  type: 'OrderCreated';
  orderId: string;
  occurredAt: Date;
}

// 消費者: 必要な詳細は注文サービスに問い合わせる
class InventoryEventHandler {
  constructor(
    private orderClient: OrderClient,
    private inventoryRepo: InventoryRepository
  ) {}

  async handle(event: OrderCreatedEvent): Promise<void> {
    // 詳細を取得するために注文サービスを呼び出す
    const order = await this.orderClient.getOrder(event.orderId);
    for (const item of order.items) {
      await this.inventoryRepo.reduceStock(item.productId, item.quantity);
    }
  }
}

2. Event-Carried State Transfer(イベント搬送状態転送)

イベントに処理に必要な全データを含め、消費者が発行元に問い合わせる必要をなくします。

// イベント: 必要な情報を全て含む
interface OrderCreatedEvent {
  type: 'OrderCreated';
  orderId: string;
  customerId: string;
  items: Array<{
    productId: string;
    productName: string;
    quantity: number;
    unitPrice: number;
  }>;
  totalAmount: number;
  shippingAddress: {
    postalCode: string;
    prefecture: string;
    city: string;
    line: string;
  };
  occurredAt: Date;
}

// 消費者: イベントのデータだけで処理が完結
class InventoryEventHandler {
  constructor(private inventoryRepo: InventoryRepository) {}

  async handle(event: OrderCreatedEvent): Promise<void> {
    // 注文サービスへの問い合わせが不要
    for (const item of event.items) {
      await this.inventoryRepo.reduceStock(item.productId, item.quantity);
    }
  }
}

3. Event Sourcing(イベントソーシング)

状態を直接保存せず、状態変化のイベント列を保存し、現在の状態はイベントから再構築します(詳細はstep3_4で解説)。

// イベントの列として状態変化を記録
const orderEvents = [
  { type: 'OrderCreated', orderId: 'O-001', items: [...], at: '2024-01-01T10:00' },
  { type: 'PaymentReceived', orderId: 'O-001', amount: 5000, at: '2024-01-01T10:01' },
  { type: 'OrderShipped', orderId: 'O-001', trackingId: 'T-999', at: '2024-01-02T09:00' },
];

// 現在の状態 = イベントを順に適用して再構築
// { status: 'SHIPPED', amount: 5000, trackingId: 'T-999' }

4. CQRS(Command Query Responsibility Segregation)

コマンド(書き込み)とクエリ(読み取り)を分離し、それぞれに最適化されたモデルを使用します(詳細はstep3_4で解説)。


メッセージブローカーの比較

主要なメッセージブローカー

特性Apache KafkaRabbitMQAWS SQS/SNS
方式分散ログメッセージキューマネージドキュー/トピック
順序保証パーティション内で保証キュー内で保証FIFOキューで保証
メッセージ保持期間指定で保持消費後に削除最大14日保持
スループット非常に高い高い高い
再処理容易(オフセット巻き戻し)困難DLQで部分的に可能
運用コスト高い(自前運用)中程度低い(マネージド)
ユースケースイベントストリーム、ログ集約タスクキュー、RPCAWS上のイベント駆動

AWS SQS/SNS の構成例

graph TD
    SNS["SNS Topic<br/>(OrderEvents)"]
    SNS --> Q1["SQS Queue<br/>(Inventory)"]
    SNS --> Q2["SQS Queue<br/>(Payment)"]
    SNS --> Q3["SQS Queue<br/>(Email)"]
    Q1 --> S1["在庫サービス"]
    Q2 --> S2["決済サービス"]
    Q3 --> S3["通知サービス"]

    classDef snsStyle fill:#d9534f,stroke:#b52b27,color:#fff
    classDef sqsStyle fill:#e8a838,stroke:#b07c1e,color:#fff
    classDef svcStyle fill:#4a90d9,stroke:#2c5f8a,color:#fff

    class SNS snsStyle
    class Q1,Q2,Q3 sqsStyle
    class S1,S2,S3 svcStyle

Choreography vs Orchestration

Choreography(振り付け)

各サービスがイベントに反応して自律的に動作します。中央のコーディネーターは存在しません。

graph LR
    OC["OrderCreated"] --> IS["在庫サービス"]
    IS --> IR["InventoryReserved"]
    IR --> PS["決済サービス"]
    PS --> PC["PaymentCompleted"]
    PC --> NS["通知サービス"]
    NS --> NSent["NotificationSent"]

    classDef eventStyle fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e
    classDef svcStyle fill:#dbeafe,stroke:#2563eb,stroke-width:2px,color:#1e40af

    class OC,IR,PC,NSent eventStyle
    class IS,PS,NS svcStyle
// Choreography: 各サービスが自律的にイベントに反応
class InventoryService {
  @OnEvent('OrderCreated')
  async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    const reserved = await this.reserve(event.items);
    // 結果をイベントとして発行
    await this.eventBus.publish({
      type: reserved ? 'InventoryReserved' : 'InventoryReservationFailed',
      orderId: event.orderId,
      reservationId: reserved?.id,
    });
  }
}

class PaymentService {
  @OnEvent('InventoryReserved')
  async handleInventoryReserved(event: InventoryReservedEvent): Promise<void> {
    const result = await this.processPayment(event.orderId);
    await this.eventBus.publish({
      type: result.success ? 'PaymentCompleted' : 'PaymentFailed',
      orderId: event.orderId,
      transactionId: result.transactionId,
    });
  }
}

Orchestration(オーケストレーション)

中央のオーケストレーター(Saga)が処理フローを制御します。

graph LR
    subgraph Saga["Order Saga(Orchestrator)"]
        S1["1. reserve inventory"]
        S2["2. process payment"]
        S3["3. send notification"]
        COMP["失敗時: compensate"]
    end

    S1 --> Inv["在庫サービス"]
    S2 --> Pay["決済サービス"]
    S3 --> Notify["通知サービス"]
    COMP -.->|"補償処理"| Inv & Pay & Notify

    classDef sagaStyle fill:#4a90d9,stroke:#2c5f8a,color:#fff
    classDef svcStyle fill:#5cb85c,stroke:#3d8b3d,color:#fff
    classDef compStyle fill:#d9534f,stroke:#b52b27,color:#fff

    class S1,S2,S3 sagaStyle
    class Inv,Pay,Notify svcStyle
    class COMP compStyle
// Orchestration: Sagaが処理フローを管理
class CreateOrderSaga {
  constructor(
    private inventoryService: InventoryPort,
    private paymentService: PaymentPort,
    private notificationService: NotificationPort,
    private orderRepo: OrderRepository
  ) {}

  async execute(command: CreateOrderCommand): Promise<Order> {
    const order = Order.create(command);
    const compensations: Array<() => Promise<void>> = [];

    try {
      // Step 1: 在庫予約
      const reservation = await this.inventoryService.reserve(command.items);
      compensations.push(() =>
        this.inventoryService.cancelReservation(reservation.id)
      );

      // Step 2: 決済処理
      const payment = await this.paymentService.charge({
        orderId: order.id.value,
        amount: command.totalAmount,
      });
      compensations.push(() =>
        this.paymentService.refund(payment.transactionId)
      );

      // Step 3: 注文確定
      order.confirm(payment.transactionId);
      await this.orderRepo.save(order);

      // Step 4: 通知(失敗しても注文は取り消さない)
      await this.notificationService
        .sendConfirmation(order)
        .catch(err => console.error('通知失敗:', err));

      return order;
    } catch (error) {
      // 補償トランザクション: 逆順で実行
      for (const compensate of compensations.reverse()) {
        await compensate().catch(err =>
          console.error('補償トランザクション失敗:', err)
        );
      }
      throw error;
    }
  }
}

Choreography vs Orchestration の比較

観点ChoreographyOrchestration
結合度低い(各サービスが自律的)中程度(オーケストレーターに依存)
フロー可視性低い(分散して把握しにくい)高い(1箇所でフロー全体を把握)
追加・変更の容易さ容易(新サービスがイベントを購読するだけ)サガの修正が必要
エラーハンドリング複雑(各サービスが自律的に対処)明確(サガが補償トランザクションを管理)
デバッグ困難(イベントの流れを追跡する必要)容易(サガのログを確認)
適用場面単純なフロー、サービス追加が頻繁複雑なフロー、厳密な順序が必要

結果整合性(Eventual Consistency)

結果整合性とは

イベント駆動アーキテクチャでは、データの更新が即座に全サービスに反映されるわけではありません。一定時間後に全てのサービスで整合性が取れることを保証するのが結果整合性です。

時刻T1: 注文サービス  [注文確定]
時刻T1: 在庫サービス  [まだ反映されていない] ← 不整合
時刻T2: 在庫サービス  [在庫を減算]           ← 整合性回復

※ T1〜T2の間は一時的に不整合な状態

結果整合性への対処パターン

// パターン1: 楽観的UI(クライアント側で即座に反映)
// フロントエンドで注文完了を表示し、バックグラウンドで処理

// パターン2: ポーリング + リトライ
class OrderStatusChecker {
  async waitForConfirmation(orderId: string, maxRetries = 10): Promise<OrderStatus> {
    for (let i = 0; i < maxRetries; i++) {
      const status = await this.orderClient.getStatus(orderId);
      if (status !== 'PENDING') return status;
      await this.delay(1000 * Math.pow(2, i)); // Exponential backoff
    }
    throw new Error('注文確認がタイムアウトしました');
  }

  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// パターン3: Outboxパターン(確実なイベント発行)
// トランザクション内でイベントをOutboxテーブルに保存し、
// 別プロセスがOutboxからイベントブローカーに発行する
class OrderService {
  async createOrder(command: CreateOrderCommand): Promise<Order> {
    return await this.db.transaction(async (tx) => {
      // 注文を保存
      const order = Order.create(command);
      await tx.orders.insert(order.toPersistence());

      // 同じトランザクション内でイベントをOutboxに保存
      await tx.outbox.insert({
        id: crypto.randomUUID(),
        aggregateType: 'Order',
        aggregateId: order.id.value,
        eventType: 'OrderCreated',
        payload: JSON.stringify({
          orderId: order.id.value,
          items: command.items,
          totalAmount: command.totalAmount,
        }),
        createdAt: new Date(),
        published: false,
      });

      return order;
    });
  }
}

TypeScriptによるシンプルなEvent Bus実装

// 学習用のインメモリEvent Bus実装
type EventHandler<T = unknown> = (event: T) => Promise<void>;

class InMemoryEventBus {
  private handlers: Map<string, EventHandler[]> = new Map();

  // イベントハンドラーを登録
  subscribe<T>(eventType: string, handler: EventHandler<T>): void {
    const existing = this.handlers.get(eventType) ?? [];
    existing.push(handler as EventHandler);
    this.handlers.set(eventType, existing);
  }

  // イベントを発行(全ハンドラーに非同期で通知)
  async publish<T extends { type: string }>(event: T): Promise<void> {
    const handlers = this.handlers.get(event.type) ?? [];
    console.log(`[EventBus] Publishing ${event.type} to ${handlers.length} handlers`);

    // 全ハンドラーを並列実行
    const results = await Promise.allSettled(
      handlers.map(handler => handler(event))
    );

    // エラーをログ出力(発行者にはエラーを伝播しない)
    results.forEach((result, index) => {
      if (result.status === 'rejected') {
        console.error(`[EventBus] Handler ${index} failed:`, result.reason);
      }
    });
  }
}

// 使用例
const eventBus = new InMemoryEventBus();

// 在庫サービスがOrderCreatedを購読
eventBus.subscribe<OrderCreatedEvent>('OrderCreated', async (event) => {
  console.log(`在庫を減算: 注文 ${event.orderId}`);
  // await inventoryService.reduceStock(event.items);
});

// 通知サービスがOrderCreatedを購読
eventBus.subscribe<OrderCreatedEvent>('OrderCreated', async (event) => {
  console.log(`確認メールを送信: 注文 ${event.orderId}`);
  // await notificationService.sendEmail(event.customerId);
});

// 注文サービスがイベントを発行
await eventBus.publish({
  type: 'OrderCreated',
  orderId: 'O-001',
  customerId: 'C-100',
  items: [{ productId: 'P-1', quantity: 2 }],
  totalAmount: 5000,
  occurredAt: new Date(),
});

リクエスト応答 vs イベント駆動:判断基準

判断基準リクエスト応答イベント駆動
即時のレスポンスが必要適切不適切
処理の順序が厳密適切Orchestrationなら可能
サービス間の疎結合低い高い
障害の伝播伝播する隔離される
デバッグの容易さ容易困難
スケーラビリティ制限あり高い
データの即時整合性保証結果整合性

実践的なガイドライン

リクエスト応答を選ぶ場面:
  - ユーザーが即時の結果を期待するAPI(残高照会、認証)
  - 処理結果によって次のステップが変わる場合
  - 強い整合性が求められる場合

イベント駆動を選ぶ場面:
  - 複数のサービスに通知する必要がある場合
  - 処理が非同期で問題ない場合(メール送信、ログ記録)
  - サービス間の結合を最小化したい場合
  - 将来的に新しい消費者を追加する可能性がある場合

ハイブリッドアプローチ(推奨):
  - コアの処理はリクエスト応答で同期的に実行
  - 副次的な処理はイベント駆動で非同期的に実行
  - 例: 注文の在庫確認・決済は同期、メール通知・分析は非同期

まとめ

ポイント内容
リクエスト応答シンプルだが結合度が高く、障害が伝播しやすい
イベント駆動疎結合だが、結果整合性・デバッグの複雑さが課題
Event Notification最小限の情報を通知、詳細は問い合わせ
Event-Carried State Transfer全データを含めて問い合わせを不要に
Choreography分散制御、各サービスが自律的に動作
Orchestration中央制御、フロー全体の可視性が高い
結果整合性イベント駆動では即時整合性を諦める代わりに疎結合を得る
ハイブリッドコア処理は同期、副次処理は非同期が実用的

チェックリスト

  • リクエスト応答パターンの利点と欠点を説明できる
  • イベント駆動パターンの4つの種類を区別できる
  • Choreography と Orchestration の違いを説明できる
  • 結果整合性の概念を理解した
  • Outboxパターンの目的を理解した
  • 場面に応じた通信パターンの選択基準を理解した

次のステップへ

次は「サーバーレスとコンテナ」を学びます。アプリケーションの実行基盤として、コンテナとサーバーレスをどう選択するかの判断基準を身につけましょう。


推定読了時間: 30分