LESSON 40分

ストーリー

佐藤CTO
メッセージングパターンの理論を学んだところで、次は実際のツールだ
佐藤CTO
Apache Kafkaは”分散ストリーミングプラットフォーム”だ。単なるメッセージキューではない。LinkedInが社内で開発し、今ではNetflix、Uber、Airbnbなど大規模システムの基盤になっている
あなた
なぜKafkaが選ばれるんですか?
佐藤CTO
高スループット、耐障害性、そしてイベントの永続化。メッセージを消費しても消えない。これが従来のメッセージキューとの決定的な違いだ

Kafkaアーキテクチャの全体像

graph TD
    subgraph Cluster["Kafka Cluster"]
        subgraph B0["Broker 0"]
            B0L["Topic A Part 0<br/>(Leader)"]
            B0R["Topic A Part 1<br/>(Replica)"]
        end
        subgraph B1["Broker 1"]
            B1L["Topic A Part 1<br/>(Leader)"]
            B1R["Topic A Part 0<br/>(Replica)"]
        end
        subgraph B2["Broker 2"]
            B2L["Topic A Part 2<br/>(Leader)"]
            B2R["Topic A Part 0<br/>(Replica)"]
        end
        ZK["ZooKeeper / KRaft<br/>(クラスター管理・メタデータ)"]
    end

    PRD["Producers<br/>注文サービス<br/>決済サービス<br/>ユーザーサービス"] --> Cluster
    Cluster --> CG["Consumer Groups<br/>Group A: 在庫サービス<br/>Group B: 分析サービス"]

    classDef broker fill:#fff3cd,stroke:#f0ad4e,color:#333
    classDef zk fill:#e8f4fd,stroke:#2196f3,color:#333
    classDef client fill:#d4edda,stroke:#28a745,color:#333
    class B0,B1,B2 broker
    class ZK zk
    class PRD,CG client

Kafkaの基本概念

ブローカー(Broker)

Kafkaクラスターを構成するサーバーノードです。各ブローカーがトピックのパーティションを分担して管理します。

トピック(Topic)

メッセージのカテゴリです。「order-events」「payment-events」のように、イベントの種類ごとにトピックを作成します。

パーティション(Partition)

トピックを分割した単位です。パーティションにより並列処理が可能になり、スループットが向上します。

Topic: order-events(3パーティション)

Partition 0: [msg0] [msg3] [msg6] [msg9]  ...
Partition 1: [msg1] [msg4] [msg7] [msg10] ...
Partition 2: [msg2] [msg5] [msg8] [msg11] ...


                                    |
                              offset(位置)

オフセット(Offset)

パーティション内でのメッセージの位置を示す連番です。コンシューマーは自分がどこまで読んだか(オフセット)を管理します。

コンシューマーグループ(Consumer Group)

複数のコンシューマーをグループ化し、パーティションを分担して処理します。同じグループ内の各コンシューマーは異なるパーティションを担当します。

Consumer Group "inventory-service":
  Consumer A ← Partition 0
  Consumer B ← Partition 1
  Consumer C ← Partition 2

Consumer Group "analytics-service":
  Consumer D ← Partition 0, 1
  Consumer E ← Partition 2

Producerパターン

基本的なProducer

import { Kafka, Producer, Partitioners } from 'kafkajs';

// Kafkaクライアントの初期化
const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
});

const producer: Producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
});

// Producer接続
async function initProducer(): Promise<void> {
  await producer.connect();
}

// イベントの送信
async function publishOrderCreated(order: Order): Promise<void> {
  await producer.send({
    topic: 'order-events',
    messages: [
      {
        // keyによるパーティション決定(同じ注文IDは同じパーティションへ)
        key: order.id,
        value: JSON.stringify({
          eventType: 'OrderCreated',
          orderId: order.id,
          userId: order.userId,
          items: order.items,
          totalAmount: order.totalAmount,
          createdAt: new Date().toISOString(),
        }),
        headers: {
          'event-type': 'OrderCreated',
          'correlation-id': order.correlationId,
          'schema-version': '1',
        },
      },
    ],
  });
}

信頼性の高いProducer設定

const reliableProducer = kafka.producer({
  // すべてのレプリカが確認するまで待つ(最も安全)
  // acks: -1 はすべてのISR(In-Sync Replicas)の確認を待つ
  allowAutoTopicCreation: false,
  transactionTimeout: 30000,
  idempotent: true, // べき等なプロデューサー(重複防止)
});

// バッチ送信
async function publishBatch(events: OrderEvent[]): Promise<void> {
  const messages = events.map(event => ({
    key: event.orderId,
    value: JSON.stringify(event),
    headers: {
      'event-type': event.eventType,
      'timestamp': new Date().toISOString(),
    },
  }));

  await reliableProducer.send({
    topic: 'order-events',
    messages,
    acks: -1,     // すべてのISRの確認を待つ
    timeout: 30000,
  });
}

Consumerパターン

基本的なConsumer

import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';

const consumer: Consumer = kafka.consumer({
  groupId: 'inventory-service',
});

async function startConsuming(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({
    topic: 'order-events',
    fromBeginning: false, // 最新のメッセージから読み開始
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
      const event = JSON.parse(message.value!.toString());
      const eventType = message.headers?.['event-type']?.toString();

      console.log(`Received [${eventType}] from ${topic}[${partition}]`);

      switch (eventType) {
        case 'OrderCreated':
          await handleOrderCreated(event);
          break;
        case 'OrderCancelled':
          await handleOrderCancelled(event);
          break;
        default:
          console.warn(`Unknown event type: ${eventType}`);
      }
    },
  });
}

async function handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
  // 在庫の引き当て処理
  for (const item of event.items) {
    await inventoryService.reserveStock(item.productId, item.quantity);
  }
}

オフセット管理

// 手動コミット(推奨:処理完了後にコミット)
const consumer = kafka.consumer({
  groupId: 'payment-service',
  // 自動コミットを無効化
});

await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    try {
      const event = JSON.parse(message.value!.toString());
      await processPayment(event);

      // 処理成功後にオフセットをコミット
      await consumer.commitOffsets([{
        topic,
        partition,
        offset: (Number(message.offset) + 1).toString(),
      }]);
    } catch (error) {
      console.error('Processing failed:', error);
      // コミットしない → 再度同じメッセージが配信される
    }
  },
});

Kafka Streams概要

Kafka Streamsは、Kafkaトピック上のデータをリアルタイムに変換・集約するためのライブラリです。

graph LR
    Input["入力トピック\norder-events"] --> Filter["Filter"]
    Filter --> Transform["Transform"]
    Transform --> Aggregate["Aggregate"]
    Aggregate --> Output["出力トピック\norder-summary"]

    style Input fill:#dbeafe,stroke:#2563eb,color:#1e40af
    style Filter fill:#fef3c7,stroke:#d97706,color:#92400e
    style Transform fill:#fef3c7,stroke:#d97706,color:#92400e
    style Aggregate fill:#fef3c7,stroke:#d97706,color:#92400e
    style Output fill:#d1fae5,stroke:#059669,color:#065f46

例: 全注文イベントから、ユーザーごとの累計注文金額をリアルタイム集計

// Node.js環境でのストリーム処理イメージ
// (実際のKafka StreamsはJavaライブラリだが、概念を示す)

interface StreamProcessor<TInput, TOutput> {
  process(input: TInput): TOutput | null;
}

// フィルター: 高額注文のみ通過
class HighValueOrderFilter implements StreamProcessor<OrderEvent, OrderEvent> {
  process(input: OrderEvent): OrderEvent | null {
    return input.totalAmount >= 10000 ? input : null;
  }
}

// 変換: 注文イベントをサマリーに変換
class OrderSummaryTransformer
  implements StreamProcessor<OrderEvent, OrderSummary> {
  process(input: OrderEvent): OrderSummary {
    return {
      orderId: input.orderId,
      userId: input.userId,
      totalAmount: input.totalAmount,
      itemCount: input.items.length,
      processedAt: new Date(),
    };
  }
}

// 集約: ユーザーごとの累計注文金額
class UserOrderAggregator {
  private aggregates: Map<string, number> = new Map();

  aggregate(event: OrderEvent): UserOrderAggregate {
    const current = this.aggregates.get(event.userId) ?? 0;
    const newTotal = current + event.totalAmount;
    this.aggregates.set(event.userId, newTotal);
    return {
      userId: event.userId,
      totalAmount: newTotal,
      lastOrderAt: new Date(),
    };
  }
}

Schema Registry

なぜスキーマ管理が必要か

マイクロサービス間でメッセージをやり取りする場合、送信者と受信者がメッセージの構造について合意する必要があります。Schema Registryはスキーマのバージョン管理と互換性チェックを行います。

Avro / Protobufの比較

特性JSONAvroProtobuf
可読性高い低い(バイナリ)低い(バイナリ)
サイズ大きい小さい小さい
スキーマ進化なし強力強力
型安全性低い高い高い
パフォーマンス遅い速い最速
ツールチェーン豊富Kafka向け充実gRPC統合

TypeScriptでのスキーマ定義

// Avroスキーマの例
const orderCreatedSchema = {
  type: 'record',
  name: 'OrderCreated',
  namespace: 'com.example.order.events',
  fields: [
    { name: 'orderId', type: 'string' },
    { name: 'userId', type: 'string' },
    {
      name: 'items',
      type: {
        type: 'array',
        items: {
          type: 'record',
          name: 'OrderItem',
          fields: [
            { name: 'productId', type: 'string' },
            { name: 'quantity', type: 'int' },
            { name: 'price', type: 'double' },
          ],
        },
      },
    },
    { name: 'totalAmount', type: 'double' },
    { name: 'createdAt', type: { type: 'long', logicalType: 'timestamp-millis' } },
    // スキーマ進化: 新しいフィールドにはデフォルト値を設定
    { name: 'currency', type: 'string', default: 'JPY' },
  ],
};

// TypeScriptの型定義(スキーマから自動生成も可能)
interface OrderCreatedEvent {
  orderId: string;
  userId: string;
  items: Array<{
    productId: string;
    quantity: number;
    price: number;
  }>;
  totalAmount: number;
  createdAt: number; // timestamp-millis
  currency: string;
}

Kafka vs 他のメッセージブローカー

特性KafkaRabbitMQAmazon SQSAmazon SNS + SQS
モデルログベースキューベースキューベースPub/Sub + Queue
スループット非常に高い高い中程度中程度
メッセージ保持設定期間保持消費後削除最大14日SNS即時、SQS最大14日
順序保証パーティション内キュー内FIFO SQSのみFIFOのみ
コンシューマーグループネイティブ対応プラグインなしSQSで実現
リプレイ可能不可不可不可
運用コスト高い(自前管理)中程度低い(マネージド)低い(マネージド)
ユースケースイベントストリーミングタスクキューシンプルなキューイベント通知

選定基準

// メッセージブローカーの選定ガイド
const selectBroker = (requirements: BrokerRequirements): string => {
  if (requirements.eventReplay && requirements.highThroughput) {
    return 'Kafka'; // イベントソーシング、ストリーム処理
  }
  if (requirements.complexRouting && requirements.messageAcknowledge) {
    return 'RabbitMQ'; // 複雑なルーティング、タスクキュー
  }
  if (requirements.managedService && requirements.simpleQueue) {
    return 'Amazon SQS'; // シンプルなキュー、運用コスト最小
  }
  if (requirements.managedService && requirements.fanout) {
    return 'Amazon SNS + SQS'; // ファンアウト + キュー
  }
  return 'Kafka'; // デフォルト(汎用性が高い)
};

Kafka設定のベストプラクティス

トピック設計

設定項目推奨値理由
パーティション数コンシューマー数 x 2〜3将来のスケールアウト余地
レプリケーションファクター32台障害でもデータ保全
保持期間7日(イベントログは30日以上)要件に応じて調整
最大メッセージサイズ1MB以下大きなデータはS3等に格納
クリーンアップポリシーdelete(ログ)/ compact(状態)ユースケースに応じて

Producer設定

// 本番環境向けProducer設定
const producerConfig = {
  // 信頼性
  acks: -1,                    // すべてのISRの確認
  idempotent: true,           // 重複送信防止
  maxInFlightRequests: 5,     // べき等Producer使用時は5以下

  // パフォーマンス
  batchSize: 16384,           // バッチサイズ(bytes)
  lingerMs: 5,                // バッチ送信の待機時間
  compressionType: 'snappy',  // 圧縮(CPU vs 帯域のバランス)

  // リトライ
  retries: 2147483647,        // 無限リトライ(delivery.timeout.msで制限)
  deliveryTimeoutMs: 120000,  // 最大120秒で配信
};

Consumer設定

// 本番環境向けConsumer設定
const consumerConfig = {
  groupId: 'order-processing-service',

  // オフセット管理
  autoCommit: false,                // 手動コミット推奨
  autoOffsetReset: 'earliest',      // グループ初回は最古から

  // パフォーマンス
  maxBytesPerPartition: 1048576,    // パーティションあたり1MB
  maxWaitTimeInMs: 500,             // フェッチ最大待機時間
  sessionTimeout: 30000,            // セッションタイムアウト
  heartbeatInterval: 3000,          // ハートビート間隔

  // リバランス
  rebalanceTimeout: 60000,          // リバランスタイムアウト
  partitionAssigners: [
    // Cooperative Sticky Assignerで停止時間を最小化
  ],
};

まとめ

ポイント内容
KafkaアーキテクチャBroker、Topic、Partition、Consumer Groupの4要素
Producerメッセージキーでパーティション決定、acks=-1で信頼性確保
ConsumerConsumer Groupで負荷分散、手動コミットで処理保証
オフセット管理コンシューマーが「どこまで読んだか」を管理する仕組み
Kafka Streamsリアルタイムストリーム処理ライブラリ
Schema Registryスキーマのバージョン管理と互換性チェック
他ブローカーとの違いKafkaはログベース、メッセージ保持、リプレイが可能

チェックリスト

  • Kafkaのアーキテクチャ(Broker、Topic、Partition、Consumer Group)を説明できる
  • ProducerとConsumerの基本的な実装を理解した
  • オフセット管理の仕組みと手動コミットの重要性を理解した
  • Schema Registryの役割を説明できる
  • Kafka、RabbitMQ、SQSの使い分けを判断できる
  • 本番環境向けの設定ベストプラクティスを把握した

次のステップへ

次は「イベントスキーマ設計」を学びます。Kafkaに流すイベントの「中身」をどう設計するか、スキーマの進化戦略を含めて深掘りしましょう。


推定読了時間: 40分