LESSON 30分

ストーリー

あなた
注文イベントに userId フィールドを追加したら、消費側のサービスが全部落ちました…
高橋アーキテクト
イベントスキーマの変更は、APIのバージョニングと同じくらい慎重に扱う必要がある。スキーマレジストリを導入して、互換性を管理しよう

イベントスキーマの重要性

イベントはサービス間の契約です。スキーマが明確でないと、以下の問題が起きます。

graph LR
    subgraph "問題1: フィールド名の不一致"
        P1["Producer<br/>orderId: 123<br/>amount: 1000"] -->|"イベント"| C1["Consumer<br/>event.data.totalAmount を期待<br/>→ undefined → エラー"]
    end

    subgraph "問題2: 型の不一致"
        P2["Producer<br/>amountの型を<br/>number → stringに変更"] -->|"イベント"| C2["Consumer<br/>number前提で計算<br/>→ NaN → 不正データ蓄積"]
    end

    classDef error fill:#fdd,stroke:#c33,color:#900
    class C1,C2 error

イベントスキーマの設計

CloudEvents仕様

業界標準のイベントフォーマットです。

// CloudEvents仕様に基づくイベント構造
interface CloudEvent<T> {
  // 必須フィールド
  specversion: "1.0";           // CloudEvents仕様バージョン
  id: string;                   // イベントの一意なID
  source: string;               // 発行元(URIフォーマット)
  type: string;                 // イベントタイプ
  time: string;                 // 発生時刻(ISO 8601)

  // オプションフィールド
  datacontenttype?: string;     // "application/json"
  dataschema?: string;          // スキーマのURI
  subject?: string;             // イベントの対象

  // カスタム拡張
  correlationid?: string;       // リクエスト追跡ID
  causationid?: string;         // 原因イベントのID

  // ペイロード
  data: T;
}

// 使用例
const orderCreatedEvent: CloudEvent<OrderCreatedData> = {
  specversion: "1.0",
  id: "evt-abc123",
  source: "/order-service",
  type: "com.example.order.created",
  time: "2025-01-15T10:30:00Z",
  datacontenttype: "application/json",
  dataschema: "https://schema-registry.example.com/order-created/v2",
  correlationid: "req-xyz789",
  data: {
    orderId: "ord-001",
    userId: "usr-123",
    items: [{ productId: "prod-456", quantity: 2, price: 500 }],
    totalAmount: 1000,
  },
};

スキーマ定義(JSON Schema)

// JSON Schemaによるイベントスキーマ定義
const orderCreatedSchema = {
  $id: "https://schema-registry.example.com/order-created/v2",
  type: "object",
  required: ["orderId", "userId", "items", "totalAmount"],
  properties: {
    orderId: { type: "string", pattern: "^ord-" },
    userId: { type: "string", pattern: "^usr-" },
    items: {
      type: "array",
      minItems: 1,
      items: {
        type: "object",
        required: ["productId", "quantity", "price"],
        properties: {
          productId: { type: "string" },
          quantity: { type: "integer", minimum: 1 },
          price: { type: "number", minimum: 0 },
        },
      },
    },
    totalAmount: { type: "number", minimum: 0 },
  },
  additionalProperties: false,
};

スキーマの進化と互換性

互換性のタイプ

タイプ説明
前方互換古いConsumerが新しいスキーマを読めるフィールド追加(optional)
後方互換新しいConsumerが古いスキーマを読めるデフォルト値付きフィールド追加
完全互換前方+後方の両方最も安全
非互換どちらの互換性もない既存フィールドの型変更、削除
// 後方互換な変更(安全)
// v1
interface OrderCreatedV1 {
  orderId: string;
  userId: string;
  totalAmount: number;
}

// v2: フィールド追加(デフォルト値あり)
interface OrderCreatedV2 {
  orderId: string;
  userId: string;
  totalAmount: number;
  currency?: string;          // 新規追加(optional、デフォルト: "JPY")
  createdAt?: string;         // 新規追加(optional)
}

// 非互換な変更(危険)
interface OrderCreatedBroken {
  orderId: string;
  userId: string;
  // totalAmount: number;     // 削除 → 古いConsumerが壊れる
  amount: string;             // 型変更 + リネーム → 壊れる
}

安全なスキーマ進化のルール

const safeEvolutionRules = {
  safe: [
    "optionalフィールドの追加",
    "デフォルト値付きフィールドの追加",
    "フィールドの説明変更",
    "optionalフィールドの型の拡張(number → number | string)",
  ],
  unsafe: [
    "requiredフィールドの削除",
    "フィールドの型変更",
    "フィールド名の変更",
    "requiredフィールドの追加(デフォルト値なし)",
  ],
};

スキーマレジストリ

イベントスキーマを一元管理し、互換性を自動チェックするサービスです。

// スキーマレジストリの機能
interface SchemaRegistry {
  // スキーマの登録
  register(subject: string, schema: Schema): Promise<{ id: number; version: number }>;

  // スキーマの取得
  getById(id: number): Promise<Schema>;
  getBySubjectVersion(subject: string, version: number): Promise<Schema>;
  getLatest(subject: string): Promise<Schema>;

  // 互換性チェック
  checkCompatibility(subject: string, schema: Schema): Promise<CompatibilityResult>;

  // 互換性レベルの設定
  setCompatibility(subject: string, level: CompatibilityLevel): Promise<void>;
}

type CompatibilityLevel = "BACKWARD" | "FORWARD" | "FULL" | "NONE";

主要なスキーマレジストリ

ツール対応フォーマット特徴
Confluent Schema RegistryAvro, JSON Schema, ProtobufKafka標準
AWS Glue Schema RegistryAvro, JSON SchemaAWS統合
Apicurio RegistryOpenAPI, Avro, JSON Schema, ProtobufOSS、多機能

バリデーションの実装

// ProducerとConsumerの両方でスキーマバリデーション
import Ajv from "ajv";

const ajv = new Ajv();

class EventPublisher {
  constructor(
    private broker: MessageBroker,
    private registry: SchemaRegistry
  ) {}

  async publish<T>(eventType: string, data: T): Promise<void> {
    // スキーマレジストリから最新スキーマを取得
    const schema = await this.registry.getLatest(eventType);
    const validate = ajv.compile(schema);

    // バリデーション
    if (!validate(data)) {
      throw new Error(`Schema validation failed: ${JSON.stringify(validate.errors)}`);
    }

    // 発行
    await this.broker.publish(eventType, {
      data,
      schemaId: schema.id,   // どのスキーマで書いたかを記録
    });
  }
}

class EventConsumer {
  async handle(message: BrokerMessage): Promise<void> {
    // メッセージに含まれるスキーマIDでデシリアライズ
    const schema = await this.registry.getById(message.schemaId);
    const validate = ajv.compile(schema);

    if (!validate(message.data)) {
      // デッドレターキューに送る
      await this.deadLetterQueue.send(message);
      return;
    }

    await this.processEvent(message.data);
  }
}

まとめ

ポイント内容
イベントスキーマサービス間の契約。CloudEvents仕様が標準
互換性前方互換、後方互換、完全互換
安全な進化optionalフィールドの追加、デフォルト値
スキーマレジストリ一元管理と互換性の自動チェック

チェックリスト

  • CloudEvents仕様の構造を理解した
  • 前方互換と後方互換の違いを説明できる
  • 安全なスキーマ進化のルールを3つ以上挙げられる
  • スキーマレジストリの役割を説明できる

次のステップへ

次は冪等性と順序保証を学びます。「同じイベントが2回届いても正しく動く」設計の技術です。


推定読了時間: 30分