ストーリー
メッセージブローカーとは
Producerが発行したメッセージをConsumerに届ける仲介者です。
Producer ──→ [Message Broker] ──→ Consumer
メリット:
├─ ProducerとConsumerの疎結合
├─ メッセージのバッファリング(Consumer停止中も保持)
├─ スケーラビリティ(Consumer数を柔軟に増減)
└─ 信頼性(メッセージの永続化)
Apache Kafka
分散ログストリーミングプラットフォームです。メッセージを「ログ」として永続化し、Consumerが読み進める位置(offset)を管理します。
// Kafka Producer
import { Kafka } from "kafkajs";
const kafka = new Kafka({ brokers: ["kafka:9092"] });
const producer = kafka.producer();
async function publishOrderEvent(order: Order): Promise<void> {
await producer.send({
topic: "order-events",
messages: [
{
key: order.id, // パーティションキー
value: JSON.stringify({
type: "order.created",
data: { orderId: order.id, userId: order.userId },
timestamp: new Date().toISOString(),
}),
},
],
});
}
// Kafka Consumer
const consumer = kafka.consumer({ groupId: "inventory-service" });
await consumer.subscribe({ topic: "order-events", fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value!.toString());
console.log(`Processing: ${event.type} from partition ${partition}`);
await processEvent(event);
},
});
Kafkaの特徴
Topic: order-events
├─ Partition 0: [msg1] [msg3] [msg5] ...
├─ Partition 1: [msg2] [msg4] [msg6] ...
└─ Partition 2: [msg7] [msg8] [msg9] ...
Consumer Group: inventory-service
├─ Consumer A → Partition 0
├─ Consumer B → Partition 1
└─ Consumer C → Partition 2
→ 各Partitionは1つのConsumerのみが処理(順序保証)
| 特徴 | 値 |
|---|---|
| スループット | 非常に高い(100万msg/sec以上) |
| メッセージ保持 | 設定期間保持(デフォルト7日) |
| 順序保証 | Partition内で保証 |
| 再読み込み | offset指定で過去メッセージを再読み可能 |
| Consumer Group | 複数グループが独立にメッセージを読める |
RabbitMQ
AMQP準拠のメッセージブローカーです。従来のメッセージキューの概念を実装しています。
// RabbitMQ Producer
import amqp from "amqplib";
async function publishOrderEvent(order: Order): Promise<void> {
const connection = await amqp.connect("amqp://rabbitmq:5672");
const channel = await connection.createChannel();
// Exchange経由でルーティング
await channel.assertExchange("order-events", "topic", { durable: true });
channel.publish(
"order-events",
"order.created", // routing key
Buffer.from(JSON.stringify({
orderId: order.id,
userId: order.userId,
})),
{ persistent: true } // メッセージの永続化
);
}
// RabbitMQ Consumer
async function consumeOrderEvents(): Promise<void> {
const connection = await amqp.connect("amqp://rabbitmq:5672");
const channel = await connection.createChannel();
// キューを作成しExchangにバインド
const queue = await channel.assertQueue("inventory-order-queue", { durable: true });
await channel.bindQueue(queue.queue, "order-events", "order.*");
// prefetchで同時処理数を制御
channel.prefetch(10);
channel.consume(queue.queue, async (msg) => {
if (!msg) return;
const event = JSON.parse(msg.content.toString());
await processEvent(event);
channel.ack(msg); // 処理完了を通知
});
}
RabbitMQのルーティング
Exchange Types:
├─ Direct: routing keyが完全一致
├─ Topic: routing keyのパターンマッチ(order.*)
├─ Fanout: 全キューにブロードキャスト
└─ Headers: ヘッダーの値でルーティング
Amazon SQS
AWSのフルマネージドメッセージキューサービスです。
// SQS Producer
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
const sqs = new SQSClient({ region: "ap-northeast-1" });
async function publishOrderEvent(order: Order): Promise<void> {
await sqs.send(new SendMessageCommand({
QueueUrl: "https://sqs.ap-northeast-1.amazonaws.com/123456789/order-events",
MessageBody: JSON.stringify({
type: "order.created",
data: { orderId: order.id, userId: order.userId },
}),
MessageGroupId: order.id, // FIFOキューの場合
MessageDeduplicationId: `${order.id}-created`, // 重複排除
}));
}
// SQS Consumer(Lambda統合)
export const handler = async (event: SQSEvent): Promise<void> => {
for (const record of event.Records) {
const orderEvent = JSON.parse(record.body);
await processEvent(orderEvent);
}
};
三者比較
| 観点 | Kafka | RabbitMQ | SQS |
|---|---|---|---|
| 設計思想 | 分散ログ | メッセージキュー | マネージドキュー |
| スループット | 非常に高い | 高い | 中〜高 |
| メッセージ保持 | 長期保持(再読み可) | 消費後に削除 | 最大14日 |
| 順序保証 | Partition内 | なし(優先度あり) | FIFOキューで保証 |
| ルーティング | Topic + Partition | Exchange + Routing Key | なし(単一キュー) |
| 運用 | 複雑(ZooKeeperなど) | 中程度 | フルマネージド |
| コスト | インフラ費用 | インフラ費用 | 従量課金 |
| ユースケース | ログ収集、ストリーム処理 | タスクキュー、RPC | サーバーレス統合 |
選択のガイドライン
const selectionGuide = {
kafka: {
when: [
"大量のイベントストリーム処理",
"イベントの再処理が必要",
"複数のConsumer Groupが同じイベントを処理",
"イベントソーシング",
],
avoid: "小規模システム、運用チームが小さい場合",
},
rabbitMQ: {
when: [
"複雑なルーティングが必要",
"タスクキュー(ワーカーパターン)",
"メッセージの優先度制御",
"RPC(リモートプロシージャコール)パターン",
],
avoid: "超高スループットが必要な場合",
},
sqs: {
when: [
"AWS環境でシンプルなキューが必要",
"サーバーレス(Lambda)との統合",
"運用負荷を最小化したい",
"SNSと組み合わせたFanout",
],
avoid: "複雑なルーティング、イベントの再処理",
},
};
まとめ
| ポイント | 内容 |
|---|---|
| Kafka | 分散ログ。高スループット、再読み可能 |
| RabbitMQ | メッセージキュー。柔軟なルーティング |
| SQS | マネージドキュー。運用負荷ゼロ |
| 選択基準 | スループット、保持期間、運用負荷、ルーティング |
チェックリスト
- Kafka、RabbitMQ、SQSの設計思想の違いを説明できる
- 各ブローカーのメッセージ保持の仕組みを理解した
- ユースケースに応じた選択ができる
- KafkaのPartitionとConsumer Groupの関係を理解した
次のステップへ
次はイベントの「形」を定義するイベントスキーマと、その管理に欠かせないスキーマレジストリを学びます。
推定読了時間: 30分