LESSON 30分

ストーリー

あなた
イベント駆動にしたいんですが、どのメッセージブローカーを使えばいいですか? Kafka? RabbitMQ? SQS?
高橋アーキテクト
それぞれ設計思想が違う。Kafkaは”ログ”、RabbitMQは”キュー”、SQSは”マネージドキュー”。用途に合わせて選ぶ必要がある

メッセージブローカーとは

Producerが発行したメッセージをConsumerに届ける仲介者です。

Producer ──→ [Message Broker] ──→ Consumer

メリット:
  ├─ ProducerとConsumerの疎結合
  ├─ メッセージのバッファリング(Consumer停止中も保持)
  ├─ スケーラビリティ(Consumer数を柔軟に増減)
  └─ 信頼性(メッセージの永続化)

Apache Kafka

分散ログストリーミングプラットフォームです。メッセージを「ログ」として永続化し、Consumerが読み進める位置(offset)を管理します。

// Kafka Producer
import { Kafka } from "kafkajs";

const kafka = new Kafka({ brokers: ["kafka:9092"] });
const producer = kafka.producer();

async function publishOrderEvent(order: Order): Promise<void> {
  await producer.send({
    topic: "order-events",
    messages: [
      {
        key: order.id,                    // パーティションキー
        value: JSON.stringify({
          type: "order.created",
          data: { orderId: order.id, userId: order.userId },
          timestamp: new Date().toISOString(),
        }),
      },
    ],
  });
}

// Kafka Consumer
const consumer = kafka.consumer({ groupId: "inventory-service" });

await consumer.subscribe({ topic: "order-events", fromBeginning: false });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const event = JSON.parse(message.value!.toString());
    console.log(`Processing: ${event.type} from partition ${partition}`);
    await processEvent(event);
  },
});

Kafkaの特徴

Topic: order-events
  ├─ Partition 0: [msg1] [msg3] [msg5] ...
  ├─ Partition 1: [msg2] [msg4] [msg6] ...
  └─ Partition 2: [msg7] [msg8] [msg9] ...

Consumer Group: inventory-service
  ├─ Consumer A → Partition 0
  ├─ Consumer B → Partition 1
  └─ Consumer C → Partition 2
  → 各Partitionは1つのConsumerのみが処理(順序保証)
特徴
スループット非常に高い(100万msg/sec以上)
メッセージ保持設定期間保持(デフォルト7日)
順序保証Partition内で保証
再読み込みoffset指定で過去メッセージを再読み可能
Consumer Group複数グループが独立にメッセージを読める

RabbitMQ

AMQP準拠のメッセージブローカーです。従来のメッセージキューの概念を実装しています。

// RabbitMQ Producer
import amqp from "amqplib";

async function publishOrderEvent(order: Order): Promise<void> {
  const connection = await amqp.connect("amqp://rabbitmq:5672");
  const channel = await connection.createChannel();

  // Exchange経由でルーティング
  await channel.assertExchange("order-events", "topic", { durable: true });

  channel.publish(
    "order-events",
    "order.created",                    // routing key
    Buffer.from(JSON.stringify({
      orderId: order.id,
      userId: order.userId,
    })),
    { persistent: true }               // メッセージの永続化
  );
}

// RabbitMQ Consumer
async function consumeOrderEvents(): Promise<void> {
  const connection = await amqp.connect("amqp://rabbitmq:5672");
  const channel = await connection.createChannel();

  // キューを作成しExchangにバインド
  const queue = await channel.assertQueue("inventory-order-queue", { durable: true });
  await channel.bindQueue(queue.queue, "order-events", "order.*");

  // prefetchで同時処理数を制御
  channel.prefetch(10);

  channel.consume(queue.queue, async (msg) => {
    if (!msg) return;
    const event = JSON.parse(msg.content.toString());
    await processEvent(event);
    channel.ack(msg);                   // 処理完了を通知
  });
}

RabbitMQのルーティング

Exchange Types:
  ├─ Direct:  routing keyが完全一致
  ├─ Topic:   routing keyのパターンマッチ(order.*)
  ├─ Fanout:  全キューにブロードキャスト
  └─ Headers: ヘッダーの値でルーティング

Amazon SQS

AWSのフルマネージドメッセージキューサービスです。

// SQS Producer
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";

const sqs = new SQSClient({ region: "ap-northeast-1" });

async function publishOrderEvent(order: Order): Promise<void> {
  await sqs.send(new SendMessageCommand({
    QueueUrl: "https://sqs.ap-northeast-1.amazonaws.com/123456789/order-events",
    MessageBody: JSON.stringify({
      type: "order.created",
      data: { orderId: order.id, userId: order.userId },
    }),
    MessageGroupId: order.id,          // FIFOキューの場合
    MessageDeduplicationId: `${order.id}-created`, // 重複排除
  }));
}

// SQS Consumer(Lambda統合)
export const handler = async (event: SQSEvent): Promise<void> => {
  for (const record of event.Records) {
    const orderEvent = JSON.parse(record.body);
    await processEvent(orderEvent);
  }
};

三者比較

観点KafkaRabbitMQSQS
設計思想分散ログメッセージキューマネージドキュー
スループット非常に高い高い中〜高
メッセージ保持長期保持(再読み可)消費後に削除最大14日
順序保証Partition内なし(優先度あり)FIFOキューで保証
ルーティングTopic + PartitionExchange + Routing Keyなし(単一キュー)
運用複雑(ZooKeeperなど)中程度フルマネージド
コストインフラ費用インフラ費用従量課金
ユースケースログ収集、ストリーム処理タスクキュー、RPCサーバーレス統合

選択のガイドライン

const selectionGuide = {
  kafka: {
    when: [
      "大量のイベントストリーム処理",
      "イベントの再処理が必要",
      "複数のConsumer Groupが同じイベントを処理",
      "イベントソーシング",
    ],
    avoid: "小規模システム、運用チームが小さい場合",
  },
  rabbitMQ: {
    when: [
      "複雑なルーティングが必要",
      "タスクキュー(ワーカーパターン)",
      "メッセージの優先度制御",
      "RPC(リモートプロシージャコール)パターン",
    ],
    avoid: "超高スループットが必要な場合",
  },
  sqs: {
    when: [
      "AWS環境でシンプルなキューが必要",
      "サーバーレス(Lambda)との統合",
      "運用負荷を最小化したい",
      "SNSと組み合わせたFanout",
    ],
    avoid: "複雑なルーティング、イベントの再処理",
  },
};

まとめ

ポイント内容
Kafka分散ログ。高スループット、再読み可能
RabbitMQメッセージキュー。柔軟なルーティング
SQSマネージドキュー。運用負荷ゼロ
選択基準スループット、保持期間、運用負荷、ルーティング

チェックリスト

  • Kafka、RabbitMQ、SQSの設計思想の違いを説明できる
  • 各ブローカーのメッセージ保持の仕組みを理解した
  • ユースケースに応じた選択ができる
  • KafkaのPartitionとConsumer Groupの関係を理解した

次のステップへ

次はイベントの「形」を定義するイベントスキーマと、その管理に欠かせないスキーマレジストリを学びます。


推定読了時間: 30分