ストーリー
イベントスキーマの重要性
イベントはサービス間の契約です。スキーマが明確でないと、以下の問題が起きます。
graph LR
subgraph "問題1: フィールド名の不一致"
P1["Producer<br/>orderId: 123<br/>amount: 1000"] -->|"イベント"| C1["Consumer<br/>event.data.totalAmount を期待<br/>→ undefined → エラー"]
end
subgraph "問題2: 型の不一致"
P2["Producer<br/>amountの型を<br/>number → stringに変更"] -->|"イベント"| C2["Consumer<br/>number前提で計算<br/>→ NaN → 不正データ蓄積"]
end
classDef error fill:#fdd,stroke:#c33,color:#900
class C1,C2 error
イベントスキーマの設計
CloudEvents仕様
業界標準のイベントフォーマットです。
// CloudEvents仕様に基づくイベント構造
interface CloudEvent<T> {
// 必須フィールド
specversion: "1.0"; // CloudEvents仕様バージョン
id: string; // イベントの一意なID
source: string; // 発行元(URIフォーマット)
type: string; // イベントタイプ
time: string; // 発生時刻(ISO 8601)
// オプションフィールド
datacontenttype?: string; // "application/json"
dataschema?: string; // スキーマのURI
subject?: string; // イベントの対象
// カスタム拡張
correlationid?: string; // リクエスト追跡ID
causationid?: string; // 原因イベントのID
// ペイロード
data: T;
}
// 使用例
const orderCreatedEvent: CloudEvent<OrderCreatedData> = {
specversion: "1.0",
id: "evt-abc123",
source: "/order-service",
type: "com.example.order.created",
time: "2025-01-15T10:30:00Z",
datacontenttype: "application/json",
dataschema: "https://schema-registry.example.com/order-created/v2",
correlationid: "req-xyz789",
data: {
orderId: "ord-001",
userId: "usr-123",
items: [{ productId: "prod-456", quantity: 2, price: 500 }],
totalAmount: 1000,
},
};
スキーマ定義(JSON Schema)
// JSON Schemaによるイベントスキーマ定義
const orderCreatedSchema = {
$id: "https://schema-registry.example.com/order-created/v2",
type: "object",
required: ["orderId", "userId", "items", "totalAmount"],
properties: {
orderId: { type: "string", pattern: "^ord-" },
userId: { type: "string", pattern: "^usr-" },
items: {
type: "array",
minItems: 1,
items: {
type: "object",
required: ["productId", "quantity", "price"],
properties: {
productId: { type: "string" },
quantity: { type: "integer", minimum: 1 },
price: { type: "number", minimum: 0 },
},
},
},
totalAmount: { type: "number", minimum: 0 },
},
additionalProperties: false,
};
スキーマの進化と互換性
互換性のタイプ
| タイプ | 説明 | 例 |
|---|---|---|
| 前方互換 | 古いConsumerが新しいスキーマを読める | フィールド追加(optional) |
| 後方互換 | 新しいConsumerが古いスキーマを読める | デフォルト値付きフィールド追加 |
| 完全互換 | 前方+後方の両方 | 最も安全 |
| 非互換 | どちらの互換性もない | 既存フィールドの型変更、削除 |
// 後方互換な変更(安全)
// v1
interface OrderCreatedV1 {
orderId: string;
userId: string;
totalAmount: number;
}
// v2: フィールド追加(デフォルト値あり)
interface OrderCreatedV2 {
orderId: string;
userId: string;
totalAmount: number;
currency?: string; // 新規追加(optional、デフォルト: "JPY")
createdAt?: string; // 新規追加(optional)
}
// 非互換な変更(危険)
interface OrderCreatedBroken {
orderId: string;
userId: string;
// totalAmount: number; // 削除 → 古いConsumerが壊れる
amount: string; // 型変更 + リネーム → 壊れる
}
安全なスキーマ進化のルール
const safeEvolutionRules = {
safe: [
"optionalフィールドの追加",
"デフォルト値付きフィールドの追加",
"フィールドの説明変更",
"optionalフィールドの型の拡張(number → number | string)",
],
unsafe: [
"requiredフィールドの削除",
"フィールドの型変更",
"フィールド名の変更",
"requiredフィールドの追加(デフォルト値なし)",
],
};
スキーマレジストリ
イベントスキーマを一元管理し、互換性を自動チェックするサービスです。
// スキーマレジストリの機能
interface SchemaRegistry {
// スキーマの登録
register(subject: string, schema: Schema): Promise<{ id: number; version: number }>;
// スキーマの取得
getById(id: number): Promise<Schema>;
getBySubjectVersion(subject: string, version: number): Promise<Schema>;
getLatest(subject: string): Promise<Schema>;
// 互換性チェック
checkCompatibility(subject: string, schema: Schema): Promise<CompatibilityResult>;
// 互換性レベルの設定
setCompatibility(subject: string, level: CompatibilityLevel): Promise<void>;
}
type CompatibilityLevel = "BACKWARD" | "FORWARD" | "FULL" | "NONE";
主要なスキーマレジストリ
| ツール | 対応フォーマット | 特徴 |
|---|---|---|
| Confluent Schema Registry | Avro, JSON Schema, Protobuf | Kafka標準 |
| AWS Glue Schema Registry | Avro, JSON Schema | AWS統合 |
| Apicurio Registry | OpenAPI, Avro, JSON Schema, Protobuf | OSS、多機能 |
バリデーションの実装
// ProducerとConsumerの両方でスキーマバリデーション
import Ajv from "ajv";
const ajv = new Ajv();
class EventPublisher {
constructor(
private broker: MessageBroker,
private registry: SchemaRegistry
) {}
async publish<T>(eventType: string, data: T): Promise<void> {
// スキーマレジストリから最新スキーマを取得
const schema = await this.registry.getLatest(eventType);
const validate = ajv.compile(schema);
// バリデーション
if (!validate(data)) {
throw new Error(`Schema validation failed: ${JSON.stringify(validate.errors)}`);
}
// 発行
await this.broker.publish(eventType, {
data,
schemaId: schema.id, // どのスキーマで書いたかを記録
});
}
}
class EventConsumer {
async handle(message: BrokerMessage): Promise<void> {
// メッセージに含まれるスキーマIDでデシリアライズ
const schema = await this.registry.getById(message.schemaId);
const validate = ajv.compile(schema);
if (!validate(message.data)) {
// デッドレターキューに送る
await this.deadLetterQueue.send(message);
return;
}
await this.processEvent(message.data);
}
}
まとめ
| ポイント | 内容 |
|---|---|
| イベントスキーマ | サービス間の契約。CloudEvents仕様が標準 |
| 互換性 | 前方互換、後方互換、完全互換 |
| 安全な進化 | optionalフィールドの追加、デフォルト値 |
| スキーマレジストリ | 一元管理と互換性の自動チェック |
チェックリスト
- CloudEvents仕様の構造を理解した
- 前方互換と後方互換の違いを説明できる
- 安全なスキーマ進化のルールを3つ以上挙げられる
- スキーマレジストリの役割を説明できる
次のステップへ
次は冪等性と順序保証を学びます。「同じイベントが2回届いても正しく動く」設計の技術です。
推定読了時間: 30分