LESSON 40分

「EventBridge、SQS、SNS、Step Functions。これらを正しく組み合わせることが、サーバーレスアーキテクチャの腕の見せ所だ。どれを使うか迷ったら、まず要件を整理しよう」と佐藤CTOがアドバイスした。

推定読了時間: 40分


メッセージングサービスの使い分け

サービス用途特徴
EventBridgeイベントバスルールベースルーティング、スキーマレジストリ
SQSメッセージキューバッファリング、順序保証(FIFO)、DLQ
SNSパブサブFan-out、フィルタリング
Kinesisストリーミング大量データの順序保証、リプレイ
graph TD
    A{"1対1の通信?"} -->|Yes| B["SQS"]
    A -->|No| C{"1対多の通信?"}
    C -->|Yes| D["SNS or EventBridge"]
    C -->|No| E{"複雑なルーティング?"}
    E -->|Yes| F["EventBridge"]
    E -->|No| G{"大量ストリーム?"}
    G -->|Yes| H["Kinesis"]

    classDef decision fill:#f39c12,stroke:#e67e22,color:#fff
    classDef service fill:#3498db,stroke:#2980b9,color:#fff
    class A,C,E,G decision
    class B,D,F,H service

EventBridge 統合

イベントスキーマ定義

// domain/events/OrderEvents.ts
interface OrderEvent {
  source: "order-service";
  "detail-type": "OrderCreated" | "OrderConfirmed" | "OrderShipped" | "OrderCancelled";
  detail: {
    orderId: string;
    customerId: string;
    totalAmount: number;
    items: Array<{
      productId: string;
      quantity: number;
      price: number;
    }>;
    timestamp: string;
  };
}

EventBridge ルールとターゲット

# CDK で EventBridge を構築
Resources:
  OrderEventBus:
    Type: AWS::Events::EventBus
    Properties:
      Name: order-events

  # 注文作成 → 在庫確認
  OrderCreatedToInventory:
    Type: AWS::Events::Rule
    Properties:
      EventBusName: !Ref OrderEventBus
      EventPattern:
        source:
          - "order-service"
        detail-type:
          - "OrderCreated"
      Targets:
        - Arn: !GetAtt InventoryCheckFunction.Arn
          Id: inventory-check
          # Input Transformer
          InputTransformer:
            InputPathsMap:
              orderId: "$.detail.orderId"
              items: "$.detail.items"
            InputTemplate: |
              {
                "orderId": <orderId>,
                "items": <items>,
                "action": "CHECK_INVENTORY"
              }

  # 高額注文 → 承認フロー
  HighValueOrderRule:
    Type: AWS::Events::Rule
    Properties:
      EventBusName: !Ref OrderEventBus
      EventPattern:
        source:
          - "order-service"
        detail-type:
          - "OrderCreated"
        detail:
          totalAmount:
            - numeric:
                - ">="
                - 100000
      Targets:
        - Arn: !Ref ApprovalStateMachine
          Id: approval-flow
          RoleArn: !GetAtt EventBridgeRole.Arn

イベント発行

import { EventBridgeClient, PutEventsCommand } from "@aws-sdk/client-eventbridge";

const eventBridge = new EventBridgeClient({ region: "ap-northeast-1" });

export const publishOrderEvent = async (
  eventType: string,
  order: Order,
) => {
  await eventBridge.send(new PutEventsCommand({
    Entries: [{
      Source: "order-service",
      DetailType: eventType,
      Detail: JSON.stringify({
        orderId: order.id,
        customerId: order.customerId,
        totalAmount: order.totalAmount,
        items: order.items,
        timestamp: new Date().toISOString(),
      }),
      EventBusName: process.env.EVENT_BUS_NAME,
    }],
  }));
};

SQS + Lambda 統合

FIFO キューでの順序保証

Resources:
  OrderProcessingQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: order-processing.fifo
      FifoQueue: true
      ContentBasedDeduplication: false
      DeduplicationScope: messageGroup
      FifoThroughputLimit: perMessageGroupId
      VisibilityTimeout: 300
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt OrderDLQ.Arn
        maxReceiveCount: 3

  OrderDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: order-processing-dlq.fifo
      FifoQueue: true
      MessageRetentionPeriod: 1209600

  OrderProcessor:
    Type: AWS::Serverless::Function
    Properties:
      Handler: dist/orderProcessor.handler
      Runtime: nodejs20.x
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt OrderProcessingQueue.Arn
            BatchSize: 10
            MaximumBatchingWindowInSeconds: 5
            FunctionResponseTypes:
              - ReportBatchItemFailures

バッチ処理と部分失敗

import { SQSBatchResponse, SQSEvent, SQSRecord } from "aws-lambda";

export const handler = async (event: SQSEvent): Promise<SQSBatchResponse> => {
  const batchItemFailures: Array<{ itemIdentifier: string }> = [];

  for (const record of event.Records) {
    try {
      await processMessage(record);
    } catch (error) {
      console.error(`Failed to process message: ${record.messageId}`, error);
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }

  // 失敗したメッセージのみ再処理される
  return { batchItemFailures };
};

async function processMessage(record: SQSRecord): Promise<void> {
  const order = JSON.parse(record.body);
  await validateOrder(order);
  await saveOrder(order);
  await notifyOrderCreated(order);
}

Step Functions

注文処理ワークフロー

# Step Functions 定義 (ASL - Amazon States Language)
Resources:
  OrderProcessingStateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      DefinitionUri: statemachine/order-processing.asl.json
      Policies:
        - LambdaInvokePolicy:
            FunctionName: !Ref InventoryCheckFunction
        - LambdaInvokePolicy:
            FunctionName: !Ref PaymentFunction
        - LambdaInvokePolicy:
            FunctionName: !Ref ShippingFunction
{
  "Comment": "Order Processing Workflow",
  "StartAt": "ValidateOrder",
  "States": {
    "ValidateOrder": {
      "Type": "Task",
      "Resource": "${ValidateOrderFunctionArn}",
      "Next": "CheckInventory",
      "Catch": [{
        "ErrorEquals": ["ValidationError"],
        "Next": "OrderRejected"
      }]
    },
    "CheckInventory": {
      "Type": "Task",
      "Resource": "${InventoryCheckFunctionArn}",
      "Next": "ProcessPayment",
      "Catch": [{
        "ErrorEquals": ["InsufficientStock"],
        "Next": "NotifyOutOfStock"
      }],
      "Retry": [{
        "ErrorEquals": ["ServiceUnavailable"],
        "IntervalSeconds": 2,
        "MaxAttempts": 3,
        "BackoffRate": 2
      }]
    },
    "ProcessPayment": {
      "Type": "Task",
      "Resource": "${PaymentFunctionArn}",
      "Next": "ParallelNotifications",
      "Catch": [{
        "ErrorEquals": ["PaymentFailed"],
        "Next": "ReleaseInventory"
      }]
    },
    "ParallelNotifications": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "SendConfirmation",
          "States": {
            "SendConfirmation": {
              "Type": "Task",
              "Resource": "${NotificationFunctionArn}",
              "End": true
            }
          }
        },
        {
          "StartAt": "UpdateAnalytics",
          "States": {
            "UpdateAnalytics": {
              "Type": "Task",
              "Resource": "${AnalyticsFunctionArn}",
              "End": true
            }
          }
        }
      ],
      "Next": "ArrangeShipping"
    },
    "ArrangeShipping": {
      "Type": "Task",
      "Resource": "${ShippingFunctionArn}",
      "Next": "OrderComplete"
    },
    "OrderComplete": {
      "Type": "Succeed"
    },
    "ReleaseInventory": {
      "Type": "Task",
      "Resource": "${ReleaseInventoryFunctionArn}",
      "Next": "OrderFailed"
    },
    "NotifyOutOfStock": {
      "Type": "Task",
      "Resource": "${NotificationFunctionArn}",
      "Next": "OrderFailed"
    },
    "OrderRejected": {
      "Type": "Fail",
      "Error": "OrderRejected",
      "Cause": "Order validation failed"
    },
    "OrderFailed": {
      "Type": "Fail",
      "Error": "OrderFailed",
      "Cause": "Order processing failed"
    }
  }
}

Saga パターン(補償トランザクション)

正常フロー:
  在庫確保 → 決済 → 出荷指示 → 完了

補償フロー(決済失敗時):
  在庫確保 → 決済(失敗) → 在庫解放(補償) → 失敗通知
CDK での Step Functions 定義
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks";

const reserveInventory = new tasks.LambdaInvoke(this, "ReserveInventory", {
  lambdaFunction: inventoryFn,
  resultPath: "$.inventoryResult",
});

const processPayment = new tasks.LambdaInvoke(this, "ProcessPayment", {
  lambdaFunction: paymentFn,
  resultPath: "$.paymentResult",
});

const releaseInventory = new tasks.LambdaInvoke(this, "ReleaseInventory", {
  lambdaFunction: releaseInventoryFn,
});

const orderFailed = new sfn.Fail(this, "OrderFailed", {
  error: "OrderProcessingFailed",
});

// Saga: 決済失敗時に在庫を解放
const paymentWithCompensation = processPayment
  .addCatch(releaseInventory.next(orderFailed), {
    errors: ["PaymentFailed"],
    resultPath: "$.error",
  });

const definition = reserveInventory
  .next(paymentWithCompensation)
  .next(new tasks.LambdaInvoke(this, "ArrangeShipping", {
    lambdaFunction: shippingFn,
  }));

new sfn.StateMachine(this, "OrderProcessing", {
  definitionBody: sfn.DefinitionBody.fromChainable(definition),
  timeout: Duration.minutes(5),
});

Event Sourcing on Serverless

graph TD
    A["API Gateway"] --> B["Lambda<br/>(Command)"]
    B --> C["DynamoDB<br/>(Event Store)"]
    C -->|"DynamoDB Streams"| D["Lambda<br/>(Projector)"]
    D --> E["DynamoDB<br/>(Read Model)"]

    classDef gateway fill:#8e44ad,stroke:#6c3483,color:#fff
    classDef compute fill:#e67e22,stroke:#d35400,color:#fff
    classDef storage fill:#3498db,stroke:#2980b9,color:#fff
    class A gateway
    class B,D compute
    class C,E storage

まとめ

統合パターンサービス構成用途
イベントルーティングEventBridge + Lambda複雑なイベント駆動
キューイングSQS + Lambdaバッファリング、順序保証
ワークフローStep Functions複数ステップの処理
SagaStep Functions + 補償Lambda分散トランザクション
Event SourcingDynamoDB Streams + Lambda完全な監査証跡

チェックリスト

  • EventBridge のルールとパターンマッチングを設定できる
  • SQS + Lambda のバッチ処理と部分失敗処理を実装できる
  • Step Functions でワークフローを定義できる
  • Saga パターンで補償トランザクションを実装できる
  • 各メッセージングサービスの使い分けができる

次のステップへ

次のレッスンでは、サーバーレスの限界と「使うべきでない場面」を学びます。