ストーリー
佐藤CTOがホワイトボードにサービス間の通信を描き始めました。
メッセージングとは
メッセージングとは、サービス間の通信を**メッセージ(データの塊)**を介して行う仕組みです。送信者と受信者が直接接続する必要がなく、中間のメッセージブローカーを経由してやり取りします。
graph LR
subgraph 同期通信(REST API)
OS1["注文サービス"] -->|"HTTP"| PS1["決済サービス"]
end
subgraph 非同期メッセージング
OS2["注文サービス"] -->|"msg"| MB["メッセージブローカー"] -->|"msg"| PS2["決済サービス"]
end
Point-to-Point パターン
概要
Point-to-Pointは、1つの送信者から1つの受信者にメッセージを送るパターンです。メッセージキューを使い、1つのメッセージは1つのコンシューマーだけが処理します。
graph LR
P["Producer"] --> Q["Queue"]
Q --> CA["Consumer A<br/>(msg1, msg3)"]
Q --> CB["Consumer B<br/>(msg2, msg4)"]
ユースケース
| ユースケース | 説明 |
|---|---|
| タスクの分散処理 | ワーカー間で処理を均等に分散 |
| コマンドの実行 | 特定のサービスへの命令送信 |
| リクエスト/レスポンス | 応答が必要な非同期通信 |
TypeScript実装例
// Point-to-Point: 注文処理のキュー
interface Message<T> {
id: string;
type: string;
payload: T;
timestamp: Date;
headers: Record<string, string>;
}
interface MessageQueue<T> {
send(message: Message<T>): Promise<void>;
receive(): Promise<Message<T> | null>;
acknowledge(messageId: string): Promise<void>;
}
// メッセージの送信(Producer)
class OrderCommandSender {
constructor(private queue: MessageQueue<ProcessOrderCommand>) {}
async sendProcessOrder(orderId: string, userId: string): Promise<void> {
const message: Message<ProcessOrderCommand> = {
id: crypto.randomUUID(),
type: 'ProcessOrder',
payload: { orderId, userId, requestedAt: new Date() },
timestamp: new Date(),
headers: { 'correlation-id': crypto.randomUUID() },
};
await this.queue.send(message);
}
}
// メッセージの受信(Consumer)
class OrderCommandHandler {
constructor(
private queue: MessageQueue<ProcessOrderCommand>,
private orderService: OrderService,
) {}
async startProcessing(): Promise<void> {
while (true) {
const message = await this.queue.receive();
if (!message) {
await this.sleep(1000);
continue;
}
try {
await this.orderService.processOrder(message.payload);
await this.queue.acknowledge(message.id);
} catch (error) {
console.error(`Failed to process message ${message.id}:`, error);
// リトライまたはDead Letter Queueへ
}
}
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
Publish-Subscribe パターン
概要
Publish-Subscribeは、1つの送信者から複数の受信者にメッセージを配信するパターンです。パブリッシャーは誰が受信するかを知らず、サブスクライバーは関心のあるトピックを購読します。
graph LR
P["Publisher"] --> T["Topic"]
T --> SA["Subscriber A<br/>(在庫更新)"]
T --> SB["Subscriber B<br/>(通知送信)"]
T --> SC["Subscriber C<br/>(分析記録)"]
ユースケース
| ユースケース | 説明 |
|---|---|
| イベント通知 | 「注文が作成された」を関心のあるサービスに配信 |
| データ同期 | 複数サービスのデータを同期 |
| 監査ログ | すべてのイベントを監査サービスが受信 |
TypeScript実装例
// Publish-Subscribe: イベントバス
interface EventSubscriber<T> {
handle(event: Message<T>): Promise<void>;
}
interface EventBus {
publish<T>(topic: string, event: Message<T>): Promise<void>;
subscribe<T>(topic: string, subscriber: EventSubscriber<T>): void;
unsubscribe(topic: string, subscriber: EventSubscriber<unknown>): void;
}
// イベントのパブリッシュ(注文サービス)
class OrderService {
constructor(private eventBus: EventBus) {}
async createOrder(userId: string, items: OrderItem[]): Promise<Order> {
const order = Order.create(userId, items);
// ... 注文をDBに保存
// イベントを発行(誰が受信するかは知らない)
await this.eventBus.publish('order.created', {
id: crypto.randomUUID(),
type: 'OrderCreated',
payload: {
orderId: order.id,
userId: order.userId,
items: order.items,
totalAmount: order.totalAmount,
},
timestamp: new Date(),
headers: {},
});
return order;
}
}
// サブスクライバー1: 在庫サービス
class InventorySubscriber implements EventSubscriber<OrderCreatedEvent> {
constructor(private inventoryService: InventoryService) {}
async handle(event: Message<OrderCreatedEvent>): Promise<void> {
for (const item of event.payload.items) {
await this.inventoryService.reserveStock(item.productId, item.quantity);
}
}
}
// サブスクライバー2: 通知サービス
class NotificationSubscriber implements EventSubscriber<OrderCreatedEvent> {
constructor(private notificationService: NotificationService) {}
async handle(event: Message<OrderCreatedEvent>): Promise<void> {
await this.notificationService.sendOrderConfirmation(
event.payload.userId,
event.payload.orderId,
);
}
}
// サブスクライバー3: 分析サービス
class AnalyticsSubscriber implements EventSubscriber<OrderCreatedEvent> {
constructor(private analyticsService: AnalyticsService) {}
async handle(event: Message<OrderCreatedEvent>): Promise<void> {
await this.analyticsService.recordOrderEvent({
orderId: event.payload.orderId,
totalAmount: event.payload.totalAmount,
timestamp: event.timestamp,
});
}
}
メッセージチャネルとルーティング
メッセージチャネルの種類
| チャネル | 説明 | 用途 |
|---|---|---|
| Point-to-Point Channel | 1対1のキュー | コマンド実行 |
| Publish-Subscribe Channel | 1対多のトピック | イベント通知 |
| Dead Letter Channel | 処理失敗メッセージの退避先 | エラーハンドリング |
| Invalid Message Channel | 形式不正メッセージの退避先 | デバッグ |
メッセージルーティング
// Content-Based Router: メッセージの内容に基づいてルーティング
class OrderEventRouter {
private routes: Map<string, MessageQueue<unknown>> = new Map();
registerRoute(eventType: string, queue: MessageQueue<unknown>): void {
this.routes.set(eventType, queue);
}
async route(message: Message<unknown>): Promise<void> {
const queue = this.routes.get(message.type);
if (!queue) {
console.warn(`No route for message type: ${message.type}`);
await this.deadLetterQueue.send(message);
return;
}
await queue.send(message);
}
}
// Message Filter: 条件に合うメッセージだけを通過
class HighValueOrderFilter {
constructor(
private threshold: number,
private downstream: EventSubscriber<OrderCreatedEvent>,
) {}
async handle(event: Message<OrderCreatedEvent>): Promise<void> {
if (event.payload.totalAmount >= this.threshold) {
await this.downstream.handle(event);
}
// threshold未満のメッセージは無視
}
}
// Message Transformer: メッセージの形式を変換
class OrderToShipmentTransformer {
transform(orderEvent: Message<OrderCreatedEvent>): Message<CreateShipmentCommand> {
return {
id: crypto.randomUUID(),
type: 'CreateShipment',
payload: {
orderId: orderEvent.payload.orderId,
items: orderEvent.payload.items.map(item => ({
productId: item.productId,
quantity: item.quantity,
})),
shippingAddress: orderEvent.payload.shippingAddress,
},
timestamp: new Date(),
headers: {
'correlation-id': orderEvent.headers['correlation-id'] ?? orderEvent.id,
},
};
}
}
Dead Letter Queue(DLQ)
DLQとは
Dead Letter Queue(デッドレターキュー)は、正常に処理できなかったメッセージを退避させるための特別なキューです。リトライ上限を超えた場合やメッセージ形式が不正な場合に使用します。
graph TD
P["Producer"] --> MQ["Main Queue"]
MQ --> C["Consumer<br/>(処理失敗)"]
MQ -->|"リトライ上限超過"| DLQ["Dead Letter Queue"]
DLQ --> OPS["運用チームが<br/>調査・再処理"]
TypeScript実装例
interface RetryPolicy {
maxRetries: number;
backoffMs: number;
backoffMultiplier: number;
}
class ResilientMessageProcessor<T> {
constructor(
private mainQueue: MessageQueue<T>,
private deadLetterQueue: MessageQueue<T>,
private handler: (payload: T) => Promise<void>,
private retryPolicy: RetryPolicy = {
maxRetries: 3,
backoffMs: 1000,
backoffMultiplier: 2,
},
) {}
async processMessage(message: Message<T>): Promise<void> {
let retryCount = 0;
let backoff = this.retryPolicy.backoffMs;
while (retryCount <= this.retryPolicy.maxRetries) {
try {
await this.handler(message.payload);
await this.mainQueue.acknowledge(message.id);
return; // 成功
} catch (error) {
retryCount++;
if (retryCount > this.retryPolicy.maxRetries) {
// リトライ上限超過 → DLQへ
console.error(
`Message ${message.id} exceeded max retries. Moving to DLQ.`,
);
await this.deadLetterQueue.send({
...message,
headers: {
...message.headers,
'x-retry-count': String(retryCount),
'x-original-error': String(error),
'x-moved-to-dlq-at': new Date().toISOString(),
},
});
await this.mainQueue.acknowledge(message.id);
return;
}
// バックオフ待機
await new Promise(resolve => setTimeout(resolve, backoff));
backoff *= this.retryPolicy.backoffMultiplier;
}
}
}
}
べき等性(Idempotency)
なぜべき等性が重要か
メッセージングでは、ネットワーク障害やリトライにより同じメッセージが複数回配信される可能性があります。べき等性を確保することで、同じ処理を何度実行しても結果が変わらないことを保証します。
// べき等なメッセージ処理
class IdempotentOrderProcessor {
constructor(
private processedMessageIds: Set<string>, // 実運用ではRedisやDBを使用
private orderService: OrderService,
) {}
async processOrder(message: Message<ProcessOrderCommand>): Promise<void> {
// すでに処理済みならスキップ
if (this.processedMessageIds.has(message.id)) {
console.log(`Message ${message.id} already processed. Skipping.`);
return;
}
try {
await this.orderService.processOrder(message.payload);
// 処理済みとして記録
this.processedMessageIds.add(message.id);
} catch (error) {
// 処理失敗の場合は記録しない(リトライ可能にする)
throw error;
}
}
}
// DBベースのべき等性チェック(本番推奨)
class DatabaseIdempotencyChecker {
constructor(private prisma: PrismaClient) {}
async isProcessed(messageId: string): Promise<boolean> {
const record = await this.prisma.processedMessage.findUnique({
where: { messageId },
});
return record !== null;
}
async markAsProcessed(messageId: string): Promise<void> {
await this.prisma.processedMessage.create({
data: {
messageId,
processedAt: new Date(),
},
});
}
}
配信保証レベル
比較表
| 配信保証 | 説明 | 特徴 | ユースケース |
|---|---|---|---|
| At-most-once | 最大1回配信 | メッセージ損失の可能性、重複なし | ログ、メトリクス |
| At-least-once | 最低1回配信 | 重複の可能性、損失なし | 一般的なイベント処理 |
| Exactly-once | 正確に1回配信 | 損失も重複もなし、実装コスト高 | 金融取引、決済 |
実装上の注意
// At-least-once + べき等性 = 実質的なExactly-once
// これが現実的な解法として最も広く使われている
class AtLeastOnceProcessor<T> {
constructor(
private idempotencyChecker: DatabaseIdempotencyChecker,
private handler: (payload: T) => Promise<void>,
) {}
async process(message: Message<T>): Promise<void> {
// 1. べき等性チェック
if (await this.idempotencyChecker.isProcessed(message.id)) {
return; // すでに処理済み
}
// 2. 処理実行
await this.handler(message.payload);
// 3. 処理済みマーク
await this.idempotencyChecker.markAsProcessed(message.id);
}
}
「“Exactly-onceは不可能”とよく言われるが、“At-least-once + べき等性”で実質的に同じ効果を得られる。これを覚えておけ」 — 佐藤CTO
メッセージの順序保証
順序が重要なケース
同じエンティティに対するイベントは順序が重要です。例えば「注文作成 → 注文承認 → 注文出荷」は順序通り処理される必要があります。
// パーティションキーによる順序保証
interface PartitionedMessage<T> extends Message<T> {
partitionKey: string; // 同じキーのメッセージは同じパーティションへ
}
// 注文IDをパーティションキーにすることで、
// 同じ注文に関するイベントは順序通り処理される
const orderCreatedEvent: PartitionedMessage<OrderCreatedEvent> = {
id: crypto.randomUUID(),
type: 'OrderCreated',
payload: { orderId: 'order-123', /* ... */ },
timestamp: new Date(),
headers: {},
partitionKey: 'order-123', // 注文IDがパーティションキー
};
Enterprise Integration Patterns 概要
Enterprise Integration Patterns(EIP)は、メッセージングシステムの設計パターンを体系化したものです。主要なパターンを紹介します。
| カテゴリ | パターン | 説明 |
|---|---|---|
| メッセージング | Message Channel | メッセージの送受信経路 |
| メッセージング | Message Router | 条件に基づくルーティング |
| メッセージング | Message Translator | メッセージ形式の変換 |
| メッセージング | Message Endpoint | プロデューサ/コンシューマの接続点 |
| 構成 | Pipes and Filters | 処理をパイプラインで連結 |
| 構成 | Scatter-Gather | 複数サービスに並列リクエストし結果を集約 |
| 構成 | Aggregator | 複数メッセージを1つにまとめる |
| エラー処理 | Dead Letter Channel | 処理失敗メッセージの退避先 |
| エラー処理 | Retry | 失敗時の再試行 |
// Pipes and Filters パターンの例
type MessageProcessor<T> = (message: Message<T>) => Promise<Message<T>>;
class MessagePipeline<T> {
private filters: MessageProcessor<T>[] = [];
addFilter(filter: MessageProcessor<T>): this {
this.filters.push(filter);
return this;
}
async process(message: Message<T>): Promise<Message<T>> {
let current = message;
for (const filter of this.filters) {
current = await filter(current);
}
return current;
}
}
// パイプラインの構築
const pipeline = new MessagePipeline<OrderEvent>()
.addFilter(validateMessage) // バリデーション
.addFilter(enrichWithUserData) // ユーザー情報の付与
.addFilter(transformToInternal) // 内部形式への変換
.addFilter(logMessage); // ログ記録
まとめ
| ポイント | 内容 |
|---|---|
| Point-to-Point | 1対1通信、タスク分散に適する |
| Publish-Subscribe | 1対多通信、イベント通知に適する |
| Dead Letter Queue | 処理失敗メッセージの退避と調査 |
| べき等性 | 同じメッセージを何度処理しても結果が同じ |
| At-least-once + べき等性 | Exactly-onceの実用的な代替手段 |
| 順序保証 | パーティションキーで同一エンティティの順序を維持 |
| EIP | メッセージングの設計パターン体系 |
チェックリスト
- Point-to-PointとPublish-Subscribeの違いを説明できる
- Dead Letter Queueの役割を理解した
- べき等性の必要性と実装方法を理解した
- 配信保証レベル(at-most-once, at-least-once, exactly-once)を説明できる
- メッセージの順序保証の仕組みを理解した
- Enterprise Integration Patternsの主要パターンを把握した
次のステップへ
次は「Apache Kafkaの基礎」を学びます。メッセージングパターンの理論を、実際のメッセージブローカーであるKafkaでどう実現するのかを見ていきましょう。
推定読了時間: 30分