LESSON 30分

ストーリー

佐藤CTOがホワイトボードにサービス間の通信を描き始めました。

佐藤CTO
Step 2でサービスの境界を引いた。次は”どうやってサービス同士を会話させるか”だ
あなた
REST APIで全部やればいいんじゃないですか?
佐藤CTO
それだと全サービスが常にオンラインでなければならない。決済サービスが落ちたら注文もできなくなる。メッセージングを使えば、サービス同士を疎結合にできる。ここがマイクロサービスの設計で最も重要なポイントだ

メッセージングとは

メッセージングとは、サービス間の通信を**メッセージ(データの塊)**を介して行う仕組みです。送信者と受信者が直接接続する必要がなく、中間のメッセージブローカーを経由してやり取りします。

graph LR
    subgraph 同期通信(REST API)
        OS1["注文サービス"] -->|"HTTP"| PS1["決済サービス"]
    end

    subgraph 非同期メッセージング
        OS2["注文サービス"] -->|"msg"| MB["メッセージブローカー"] -->|"msg"| PS2["決済サービス"]
    end

Point-to-Point パターン

概要

Point-to-Pointは、1つの送信者から1つの受信者にメッセージを送るパターンです。メッセージキューを使い、1つのメッセージは1つのコンシューマーだけが処理します。

graph LR
    P["Producer"] --> Q["Queue"]
    Q --> CA["Consumer A<br/>(msg1, msg3)"]
    Q --> CB["Consumer B<br/>(msg2, msg4)"]

ユースケース

ユースケース説明
タスクの分散処理ワーカー間で処理を均等に分散
コマンドの実行特定のサービスへの命令送信
リクエスト/レスポンス応答が必要な非同期通信

TypeScript実装例

// Point-to-Point: 注文処理のキュー
interface Message<T> {
  id: string;
  type: string;
  payload: T;
  timestamp: Date;
  headers: Record<string, string>;
}

interface MessageQueue<T> {
  send(message: Message<T>): Promise<void>;
  receive(): Promise<Message<T> | null>;
  acknowledge(messageId: string): Promise<void>;
}

// メッセージの送信(Producer)
class OrderCommandSender {
  constructor(private queue: MessageQueue<ProcessOrderCommand>) {}

  async sendProcessOrder(orderId: string, userId: string): Promise<void> {
    const message: Message<ProcessOrderCommand> = {
      id: crypto.randomUUID(),
      type: 'ProcessOrder',
      payload: { orderId, userId, requestedAt: new Date() },
      timestamp: new Date(),
      headers: { 'correlation-id': crypto.randomUUID() },
    };
    await this.queue.send(message);
  }
}

// メッセージの受信(Consumer)
class OrderCommandHandler {
  constructor(
    private queue: MessageQueue<ProcessOrderCommand>,
    private orderService: OrderService,
  ) {}

  async startProcessing(): Promise<void> {
    while (true) {
      const message = await this.queue.receive();
      if (!message) {
        await this.sleep(1000);
        continue;
      }

      try {
        await this.orderService.processOrder(message.payload);
        await this.queue.acknowledge(message.id);
      } catch (error) {
        console.error(`Failed to process message ${message.id}:`, error);
        // リトライまたはDead Letter Queueへ
      }
    }
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

Publish-Subscribe パターン

概要

Publish-Subscribeは、1つの送信者から複数の受信者にメッセージを配信するパターンです。パブリッシャーは誰が受信するかを知らず、サブスクライバーは関心のあるトピックを購読します。

graph LR
    P["Publisher"] --> T["Topic"]
    T --> SA["Subscriber A<br/>(在庫更新)"]
    T --> SB["Subscriber B<br/>(通知送信)"]
    T --> SC["Subscriber C<br/>(分析記録)"]

ユースケース

ユースケース説明
イベント通知「注文が作成された」を関心のあるサービスに配信
データ同期複数サービスのデータを同期
監査ログすべてのイベントを監査サービスが受信

TypeScript実装例

// Publish-Subscribe: イベントバス
interface EventSubscriber<T> {
  handle(event: Message<T>): Promise<void>;
}

interface EventBus {
  publish<T>(topic: string, event: Message<T>): Promise<void>;
  subscribe<T>(topic: string, subscriber: EventSubscriber<T>): void;
  unsubscribe(topic: string, subscriber: EventSubscriber<unknown>): void;
}

// イベントのパブリッシュ(注文サービス)
class OrderService {
  constructor(private eventBus: EventBus) {}

  async createOrder(userId: string, items: OrderItem[]): Promise<Order> {
    const order = Order.create(userId, items);
    // ... 注文をDBに保存

    // イベントを発行(誰が受信するかは知らない)
    await this.eventBus.publish('order.created', {
      id: crypto.randomUUID(),
      type: 'OrderCreated',
      payload: {
        orderId: order.id,
        userId: order.userId,
        items: order.items,
        totalAmount: order.totalAmount,
      },
      timestamp: new Date(),
      headers: {},
    });

    return order;
  }
}

// サブスクライバー1: 在庫サービス
class InventorySubscriber implements EventSubscriber<OrderCreatedEvent> {
  constructor(private inventoryService: InventoryService) {}

  async handle(event: Message<OrderCreatedEvent>): Promise<void> {
    for (const item of event.payload.items) {
      await this.inventoryService.reserveStock(item.productId, item.quantity);
    }
  }
}

// サブスクライバー2: 通知サービス
class NotificationSubscriber implements EventSubscriber<OrderCreatedEvent> {
  constructor(private notificationService: NotificationService) {}

  async handle(event: Message<OrderCreatedEvent>): Promise<void> {
    await this.notificationService.sendOrderConfirmation(
      event.payload.userId,
      event.payload.orderId,
    );
  }
}

// サブスクライバー3: 分析サービス
class AnalyticsSubscriber implements EventSubscriber<OrderCreatedEvent> {
  constructor(private analyticsService: AnalyticsService) {}

  async handle(event: Message<OrderCreatedEvent>): Promise<void> {
    await this.analyticsService.recordOrderEvent({
      orderId: event.payload.orderId,
      totalAmount: event.payload.totalAmount,
      timestamp: event.timestamp,
    });
  }
}

メッセージチャネルとルーティング

メッセージチャネルの種類

チャネル説明用途
Point-to-Point Channel1対1のキューコマンド実行
Publish-Subscribe Channel1対多のトピックイベント通知
Dead Letter Channel処理失敗メッセージの退避先エラーハンドリング
Invalid Message Channel形式不正メッセージの退避先デバッグ

メッセージルーティング

// Content-Based Router: メッセージの内容に基づいてルーティング
class OrderEventRouter {
  private routes: Map<string, MessageQueue<unknown>> = new Map();

  registerRoute(eventType: string, queue: MessageQueue<unknown>): void {
    this.routes.set(eventType, queue);
  }

  async route(message: Message<unknown>): Promise<void> {
    const queue = this.routes.get(message.type);
    if (!queue) {
      console.warn(`No route for message type: ${message.type}`);
      await this.deadLetterQueue.send(message);
      return;
    }
    await queue.send(message);
  }
}

// Message Filter: 条件に合うメッセージだけを通過
class HighValueOrderFilter {
  constructor(
    private threshold: number,
    private downstream: EventSubscriber<OrderCreatedEvent>,
  ) {}

  async handle(event: Message<OrderCreatedEvent>): Promise<void> {
    if (event.payload.totalAmount >= this.threshold) {
      await this.downstream.handle(event);
    }
    // threshold未満のメッセージは無視
  }
}

// Message Transformer: メッセージの形式を変換
class OrderToShipmentTransformer {
  transform(orderEvent: Message<OrderCreatedEvent>): Message<CreateShipmentCommand> {
    return {
      id: crypto.randomUUID(),
      type: 'CreateShipment',
      payload: {
        orderId: orderEvent.payload.orderId,
        items: orderEvent.payload.items.map(item => ({
          productId: item.productId,
          quantity: item.quantity,
        })),
        shippingAddress: orderEvent.payload.shippingAddress,
      },
      timestamp: new Date(),
      headers: {
        'correlation-id': orderEvent.headers['correlation-id'] ?? orderEvent.id,
      },
    };
  }
}

Dead Letter Queue(DLQ)

DLQとは

Dead Letter Queue(デッドレターキュー)は、正常に処理できなかったメッセージを退避させるための特別なキューです。リトライ上限を超えた場合やメッセージ形式が不正な場合に使用します。

graph TD
    P["Producer"] --> MQ["Main Queue"]
    MQ --> C["Consumer<br/>(処理失敗)"]
    MQ -->|"リトライ上限超過"| DLQ["Dead Letter Queue"]
    DLQ --> OPS["運用チームが<br/>調査・再処理"]

TypeScript実装例

interface RetryPolicy {
  maxRetries: number;
  backoffMs: number;
  backoffMultiplier: number;
}

class ResilientMessageProcessor<T> {
  constructor(
    private mainQueue: MessageQueue<T>,
    private deadLetterQueue: MessageQueue<T>,
    private handler: (payload: T) => Promise<void>,
    private retryPolicy: RetryPolicy = {
      maxRetries: 3,
      backoffMs: 1000,
      backoffMultiplier: 2,
    },
  ) {}

  async processMessage(message: Message<T>): Promise<void> {
    let retryCount = 0;
    let backoff = this.retryPolicy.backoffMs;

    while (retryCount <= this.retryPolicy.maxRetries) {
      try {
        await this.handler(message.payload);
        await this.mainQueue.acknowledge(message.id);
        return; // 成功
      } catch (error) {
        retryCount++;
        if (retryCount > this.retryPolicy.maxRetries) {
          // リトライ上限超過 → DLQへ
          console.error(
            `Message ${message.id} exceeded max retries. Moving to DLQ.`,
          );
          await this.deadLetterQueue.send({
            ...message,
            headers: {
              ...message.headers,
              'x-retry-count': String(retryCount),
              'x-original-error': String(error),
              'x-moved-to-dlq-at': new Date().toISOString(),
            },
          });
          await this.mainQueue.acknowledge(message.id);
          return;
        }
        // バックオフ待機
        await new Promise(resolve => setTimeout(resolve, backoff));
        backoff *= this.retryPolicy.backoffMultiplier;
      }
    }
  }
}

べき等性(Idempotency)

なぜべき等性が重要か

メッセージングでは、ネットワーク障害やリトライにより同じメッセージが複数回配信される可能性があります。べき等性を確保することで、同じ処理を何度実行しても結果が変わらないことを保証します。

// べき等なメッセージ処理
class IdempotentOrderProcessor {
  constructor(
    private processedMessageIds: Set<string>, // 実運用ではRedisやDBを使用
    private orderService: OrderService,
  ) {}

  async processOrder(message: Message<ProcessOrderCommand>): Promise<void> {
    // すでに処理済みならスキップ
    if (this.processedMessageIds.has(message.id)) {
      console.log(`Message ${message.id} already processed. Skipping.`);
      return;
    }

    try {
      await this.orderService.processOrder(message.payload);
      // 処理済みとして記録
      this.processedMessageIds.add(message.id);
    } catch (error) {
      // 処理失敗の場合は記録しない(リトライ可能にする)
      throw error;
    }
  }
}

// DBベースのべき等性チェック(本番推奨)
class DatabaseIdempotencyChecker {
  constructor(private prisma: PrismaClient) {}

  async isProcessed(messageId: string): Promise<boolean> {
    const record = await this.prisma.processedMessage.findUnique({
      where: { messageId },
    });
    return record !== null;
  }

  async markAsProcessed(messageId: string): Promise<void> {
    await this.prisma.processedMessage.create({
      data: {
        messageId,
        processedAt: new Date(),
      },
    });
  }
}

配信保証レベル

比較表

配信保証説明特徴ユースケース
At-most-once最大1回配信メッセージ損失の可能性、重複なしログ、メトリクス
At-least-once最低1回配信重複の可能性、損失なし一般的なイベント処理
Exactly-once正確に1回配信損失も重複もなし、実装コスト高金融取引、決済

実装上の注意

// At-least-once + べき等性 = 実質的なExactly-once
// これが現実的な解法として最も広く使われている

class AtLeastOnceProcessor<T> {
  constructor(
    private idempotencyChecker: DatabaseIdempotencyChecker,
    private handler: (payload: T) => Promise<void>,
  ) {}

  async process(message: Message<T>): Promise<void> {
    // 1. べき等性チェック
    if (await this.idempotencyChecker.isProcessed(message.id)) {
      return; // すでに処理済み
    }

    // 2. 処理実行
    await this.handler(message.payload);

    // 3. 処理済みマーク
    await this.idempotencyChecker.markAsProcessed(message.id);
  }
}

「“Exactly-onceは不可能”とよく言われるが、“At-least-once + べき等性”で実質的に同じ効果を得られる。これを覚えておけ」 — 佐藤CTO


メッセージの順序保証

順序が重要なケース

同じエンティティに対するイベントは順序が重要です。例えば「注文作成 → 注文承認 → 注文出荷」は順序通り処理される必要があります。

// パーティションキーによる順序保証
interface PartitionedMessage<T> extends Message<T> {
  partitionKey: string; // 同じキーのメッセージは同じパーティションへ
}

// 注文IDをパーティションキーにすることで、
// 同じ注文に関するイベントは順序通り処理される
const orderCreatedEvent: PartitionedMessage<OrderCreatedEvent> = {
  id: crypto.randomUUID(),
  type: 'OrderCreated',
  payload: { orderId: 'order-123', /* ... */ },
  timestamp: new Date(),
  headers: {},
  partitionKey: 'order-123', // 注文IDがパーティションキー
};

Enterprise Integration Patterns 概要

Enterprise Integration Patterns(EIP)は、メッセージングシステムの設計パターンを体系化したものです。主要なパターンを紹介します。

カテゴリパターン説明
メッセージングMessage Channelメッセージの送受信経路
メッセージングMessage Router条件に基づくルーティング
メッセージングMessage Translatorメッセージ形式の変換
メッセージングMessage Endpointプロデューサ/コンシューマの接続点
構成Pipes and Filters処理をパイプラインで連結
構成Scatter-Gather複数サービスに並列リクエストし結果を集約
構成Aggregator複数メッセージを1つにまとめる
エラー処理Dead Letter Channel処理失敗メッセージの退避先
エラー処理Retry失敗時の再試行
// Pipes and Filters パターンの例
type MessageProcessor<T> = (message: Message<T>) => Promise<Message<T>>;

class MessagePipeline<T> {
  private filters: MessageProcessor<T>[] = [];

  addFilter(filter: MessageProcessor<T>): this {
    this.filters.push(filter);
    return this;
  }

  async process(message: Message<T>): Promise<Message<T>> {
    let current = message;
    for (const filter of this.filters) {
      current = await filter(current);
    }
    return current;
  }
}

// パイプラインの構築
const pipeline = new MessagePipeline<OrderEvent>()
  .addFilter(validateMessage)       // バリデーション
  .addFilter(enrichWithUserData)    // ユーザー情報の付与
  .addFilter(transformToInternal)   // 内部形式への変換
  .addFilter(logMessage);           // ログ記録

まとめ

ポイント内容
Point-to-Point1対1通信、タスク分散に適する
Publish-Subscribe1対多通信、イベント通知に適する
Dead Letter Queue処理失敗メッセージの退避と調査
べき等性同じメッセージを何度処理しても結果が同じ
At-least-once + べき等性Exactly-onceの実用的な代替手段
順序保証パーティションキーで同一エンティティの順序を維持
EIPメッセージングの設計パターン体系

チェックリスト

  • Point-to-PointとPublish-Subscribeの違いを説明できる
  • Dead Letter Queueの役割を理解した
  • べき等性の必要性と実装方法を理解した
  • 配信保証レベル(at-most-once, at-least-once, exactly-once)を説明できる
  • メッセージの順序保証の仕組みを理解した
  • Enterprise Integration Patternsの主要パターンを把握した

次のステップへ

次は「Apache Kafkaの基礎」を学びます。メッセージングパターンの理論を、実際のメッセージブローカーであるKafkaでどう実現するのかを見ていきましょう。


推定読了時間: 30分