ストーリー
Kafkaアーキテクチャの全体像
graph TD
subgraph Cluster["Kafka Cluster"]
subgraph B0["Broker 0"]
B0L["Topic A Part 0<br/>(Leader)"]
B0R["Topic A Part 1<br/>(Replica)"]
end
subgraph B1["Broker 1"]
B1L["Topic A Part 1<br/>(Leader)"]
B1R["Topic A Part 0<br/>(Replica)"]
end
subgraph B2["Broker 2"]
B2L["Topic A Part 2<br/>(Leader)"]
B2R["Topic A Part 0<br/>(Replica)"]
end
ZK["ZooKeeper / KRaft<br/>(クラスター管理・メタデータ)"]
end
PRD["Producers<br/>注文サービス<br/>決済サービス<br/>ユーザーサービス"] --> Cluster
Cluster --> CG["Consumer Groups<br/>Group A: 在庫サービス<br/>Group B: 分析サービス"]
classDef broker fill:#fff3cd,stroke:#f0ad4e,color:#333
classDef zk fill:#e8f4fd,stroke:#2196f3,color:#333
classDef client fill:#d4edda,stroke:#28a745,color:#333
class B0,B1,B2 broker
class ZK zk
class PRD,CG client
Kafkaの基本概念
ブローカー(Broker)
Kafkaクラスターを構成するサーバーノードです。各ブローカーがトピックのパーティションを分担して管理します。
トピック(Topic)
メッセージのカテゴリです。「order-events」「payment-events」のように、イベントの種類ごとにトピックを作成します。
パーティション(Partition)
トピックを分割した単位です。パーティションにより並列処理が可能になり、スループットが向上します。
Topic: order-events(3パーティション)
Partition 0: [msg0] [msg3] [msg6] [msg9] ...
Partition 1: [msg1] [msg4] [msg7] [msg10] ...
Partition 2: [msg2] [msg5] [msg8] [msg11] ...
▲
|
offset(位置)
オフセット(Offset)
パーティション内でのメッセージの位置を示す連番です。コンシューマーは自分がどこまで読んだか(オフセット)を管理します。
コンシューマーグループ(Consumer Group)
複数のコンシューマーをグループ化し、パーティションを分担して処理します。同じグループ内の各コンシューマーは異なるパーティションを担当します。
Consumer Group "inventory-service":
Consumer A ← Partition 0
Consumer B ← Partition 1
Consumer C ← Partition 2
Consumer Group "analytics-service":
Consumer D ← Partition 0, 1
Consumer E ← Partition 2
Producerパターン
基本的なProducer
import { Kafka, Producer, Partitioners } from 'kafkajs';
// Kafkaクライアントの初期化
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
});
const producer: Producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
});
// Producer接続
async function initProducer(): Promise<void> {
await producer.connect();
}
// イベントの送信
async function publishOrderCreated(order: Order): Promise<void> {
await producer.send({
topic: 'order-events',
messages: [
{
// keyによるパーティション決定(同じ注文IDは同じパーティションへ)
key: order.id,
value: JSON.stringify({
eventType: 'OrderCreated',
orderId: order.id,
userId: order.userId,
items: order.items,
totalAmount: order.totalAmount,
createdAt: new Date().toISOString(),
}),
headers: {
'event-type': 'OrderCreated',
'correlation-id': order.correlationId,
'schema-version': '1',
},
},
],
});
}
信頼性の高いProducer設定
const reliableProducer = kafka.producer({
// すべてのレプリカが確認するまで待つ(最も安全)
// acks: -1 はすべてのISR(In-Sync Replicas)の確認を待つ
allowAutoTopicCreation: false,
transactionTimeout: 30000,
idempotent: true, // べき等なプロデューサー(重複防止)
});
// バッチ送信
async function publishBatch(events: OrderEvent[]): Promise<void> {
const messages = events.map(event => ({
key: event.orderId,
value: JSON.stringify(event),
headers: {
'event-type': event.eventType,
'timestamp': new Date().toISOString(),
},
}));
await reliableProducer.send({
topic: 'order-events',
messages,
acks: -1, // すべてのISRの確認を待つ
timeout: 30000,
});
}
Consumerパターン
基本的なConsumer
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
const consumer: Consumer = kafka.consumer({
groupId: 'inventory-service',
});
async function startConsuming(): Promise<void> {
await consumer.connect();
await consumer.subscribe({
topic: 'order-events',
fromBeginning: false, // 最新のメッセージから読み開始
});
await consumer.run({
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
const event = JSON.parse(message.value!.toString());
const eventType = message.headers?.['event-type']?.toString();
console.log(`Received [${eventType}] from ${topic}[${partition}]`);
switch (eventType) {
case 'OrderCreated':
await handleOrderCreated(event);
break;
case 'OrderCancelled':
await handleOrderCancelled(event);
break;
default:
console.warn(`Unknown event type: ${eventType}`);
}
},
});
}
async function handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
// 在庫の引き当て処理
for (const item of event.items) {
await inventoryService.reserveStock(item.productId, item.quantity);
}
}
オフセット管理
// 手動コミット(推奨:処理完了後にコミット)
const consumer = kafka.consumer({
groupId: 'payment-service',
// 自動コミットを無効化
});
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
try {
const event = JSON.parse(message.value!.toString());
await processPayment(event);
// 処理成功後にオフセットをコミット
await consumer.commitOffsets([{
topic,
partition,
offset: (Number(message.offset) + 1).toString(),
}]);
} catch (error) {
console.error('Processing failed:', error);
// コミットしない → 再度同じメッセージが配信される
}
},
});
Kafka Streams概要
Kafka Streamsは、Kafkaトピック上のデータをリアルタイムに変換・集約するためのライブラリです。
graph LR
Input["入力トピック\norder-events"] --> Filter["Filter"]
Filter --> Transform["Transform"]
Transform --> Aggregate["Aggregate"]
Aggregate --> Output["出力トピック\norder-summary"]
style Input fill:#dbeafe,stroke:#2563eb,color:#1e40af
style Filter fill:#fef3c7,stroke:#d97706,color:#92400e
style Transform fill:#fef3c7,stroke:#d97706,color:#92400e
style Aggregate fill:#fef3c7,stroke:#d97706,color:#92400e
style Output fill:#d1fae5,stroke:#059669,color:#065f46
例: 全注文イベントから、ユーザーごとの累計注文金額をリアルタイム集計
// Node.js環境でのストリーム処理イメージ
// (実際のKafka StreamsはJavaライブラリだが、概念を示す)
interface StreamProcessor<TInput, TOutput> {
process(input: TInput): TOutput | null;
}
// フィルター: 高額注文のみ通過
class HighValueOrderFilter implements StreamProcessor<OrderEvent, OrderEvent> {
process(input: OrderEvent): OrderEvent | null {
return input.totalAmount >= 10000 ? input : null;
}
}
// 変換: 注文イベントをサマリーに変換
class OrderSummaryTransformer
implements StreamProcessor<OrderEvent, OrderSummary> {
process(input: OrderEvent): OrderSummary {
return {
orderId: input.orderId,
userId: input.userId,
totalAmount: input.totalAmount,
itemCount: input.items.length,
processedAt: new Date(),
};
}
}
// 集約: ユーザーごとの累計注文金額
class UserOrderAggregator {
private aggregates: Map<string, number> = new Map();
aggregate(event: OrderEvent): UserOrderAggregate {
const current = this.aggregates.get(event.userId) ?? 0;
const newTotal = current + event.totalAmount;
this.aggregates.set(event.userId, newTotal);
return {
userId: event.userId,
totalAmount: newTotal,
lastOrderAt: new Date(),
};
}
}
Schema Registry
なぜスキーマ管理が必要か
マイクロサービス間でメッセージをやり取りする場合、送信者と受信者がメッセージの構造について合意する必要があります。Schema Registryはスキーマのバージョン管理と互換性チェックを行います。
Avro / Protobufの比較
| 特性 | JSON | Avro | Protobuf |
|---|---|---|---|
| 可読性 | 高い | 低い(バイナリ) | 低い(バイナリ) |
| サイズ | 大きい | 小さい | 小さい |
| スキーマ進化 | なし | 強力 | 強力 |
| 型安全性 | 低い | 高い | 高い |
| パフォーマンス | 遅い | 速い | 最速 |
| ツールチェーン | 豊富 | Kafka向け充実 | gRPC統合 |
TypeScriptでのスキーマ定義
// Avroスキーマの例
const orderCreatedSchema = {
type: 'record',
name: 'OrderCreated',
namespace: 'com.example.order.events',
fields: [
{ name: 'orderId', type: 'string' },
{ name: 'userId', type: 'string' },
{
name: 'items',
type: {
type: 'array',
items: {
type: 'record',
name: 'OrderItem',
fields: [
{ name: 'productId', type: 'string' },
{ name: 'quantity', type: 'int' },
{ name: 'price', type: 'double' },
],
},
},
},
{ name: 'totalAmount', type: 'double' },
{ name: 'createdAt', type: { type: 'long', logicalType: 'timestamp-millis' } },
// スキーマ進化: 新しいフィールドにはデフォルト値を設定
{ name: 'currency', type: 'string', default: 'JPY' },
],
};
// TypeScriptの型定義(スキーマから自動生成も可能)
interface OrderCreatedEvent {
orderId: string;
userId: string;
items: Array<{
productId: string;
quantity: number;
price: number;
}>;
totalAmount: number;
createdAt: number; // timestamp-millis
currency: string;
}
Kafka vs 他のメッセージブローカー
| 特性 | Kafka | RabbitMQ | Amazon SQS | Amazon SNS + SQS |
|---|---|---|---|---|
| モデル | ログベース | キューベース | キューベース | Pub/Sub + Queue |
| スループット | 非常に高い | 高い | 中程度 | 中程度 |
| メッセージ保持 | 設定期間保持 | 消費後削除 | 最大14日 | SNS即時、SQS最大14日 |
| 順序保証 | パーティション内 | キュー内 | FIFO SQSのみ | FIFOのみ |
| コンシューマーグループ | ネイティブ対応 | プラグイン | なし | SQSで実現 |
| リプレイ | 可能 | 不可 | 不可 | 不可 |
| 運用コスト | 高い(自前管理) | 中程度 | 低い(マネージド) | 低い(マネージド) |
| ユースケース | イベントストリーミング | タスクキュー | シンプルなキュー | イベント通知 |
選定基準
// メッセージブローカーの選定ガイド
const selectBroker = (requirements: BrokerRequirements): string => {
if (requirements.eventReplay && requirements.highThroughput) {
return 'Kafka'; // イベントソーシング、ストリーム処理
}
if (requirements.complexRouting && requirements.messageAcknowledge) {
return 'RabbitMQ'; // 複雑なルーティング、タスクキュー
}
if (requirements.managedService && requirements.simpleQueue) {
return 'Amazon SQS'; // シンプルなキュー、運用コスト最小
}
if (requirements.managedService && requirements.fanout) {
return 'Amazon SNS + SQS'; // ファンアウト + キュー
}
return 'Kafka'; // デフォルト(汎用性が高い)
};
Kafka設定のベストプラクティス
トピック設計
| 設定項目 | 推奨値 | 理由 |
|---|---|---|
| パーティション数 | コンシューマー数 x 2〜3 | 将来のスケールアウト余地 |
| レプリケーションファクター | 3 | 2台障害でもデータ保全 |
| 保持期間 | 7日(イベントログは30日以上) | 要件に応じて調整 |
| 最大メッセージサイズ | 1MB以下 | 大きなデータはS3等に格納 |
| クリーンアップポリシー | delete(ログ)/ compact(状態) | ユースケースに応じて |
Producer設定
// 本番環境向けProducer設定
const producerConfig = {
// 信頼性
acks: -1, // すべてのISRの確認
idempotent: true, // 重複送信防止
maxInFlightRequests: 5, // べき等Producer使用時は5以下
// パフォーマンス
batchSize: 16384, // バッチサイズ(bytes)
lingerMs: 5, // バッチ送信の待機時間
compressionType: 'snappy', // 圧縮(CPU vs 帯域のバランス)
// リトライ
retries: 2147483647, // 無限リトライ(delivery.timeout.msで制限)
deliveryTimeoutMs: 120000, // 最大120秒で配信
};
Consumer設定
// 本番環境向けConsumer設定
const consumerConfig = {
groupId: 'order-processing-service',
// オフセット管理
autoCommit: false, // 手動コミット推奨
autoOffsetReset: 'earliest', // グループ初回は最古から
// パフォーマンス
maxBytesPerPartition: 1048576, // パーティションあたり1MB
maxWaitTimeInMs: 500, // フェッチ最大待機時間
sessionTimeout: 30000, // セッションタイムアウト
heartbeatInterval: 3000, // ハートビート間隔
// リバランス
rebalanceTimeout: 60000, // リバランスタイムアウト
partitionAssigners: [
// Cooperative Sticky Assignerで停止時間を最小化
],
};
まとめ
| ポイント | 内容 |
|---|---|
| Kafkaアーキテクチャ | Broker、Topic、Partition、Consumer Groupの4要素 |
| Producer | メッセージキーでパーティション決定、acks=-1で信頼性確保 |
| Consumer | Consumer Groupで負荷分散、手動コミットで処理保証 |
| オフセット管理 | コンシューマーが「どこまで読んだか」を管理する仕組み |
| Kafka Streams | リアルタイムストリーム処理ライブラリ |
| Schema Registry | スキーマのバージョン管理と互換性チェック |
| 他ブローカーとの違い | Kafkaはログベース、メッセージ保持、リプレイが可能 |
チェックリスト
- Kafkaのアーキテクチャ(Broker、Topic、Partition、Consumer Group)を説明できる
- ProducerとConsumerの基本的な実装を理解した
- オフセット管理の仕組みと手動コミットの重要性を理解した
- Schema Registryの役割を説明できる
- Kafka、RabbitMQ、SQSの使い分けを判断できる
- 本番環境向けの設定ベストプラクティスを把握した
次のステップへ
次は「イベントスキーマ設計」を学びます。Kafkaに流すイベントの「中身」をどう設計するか、スキーマの進化戦略を含めて深掘りしましょう。
推定読了時間: 40分