LESSON (40 min)

Story

Sato (CTO)
We've secured prompt quality. Next up is integration with the LLM APIs.
Sato (CTO)
Last week, OpenAI's API went down for 30 minutes. Our service was completely down for the duration. That's a design problem.
You
So we need retries and fallbacks.
Sato (CTO)
Not just those. Exponential backoff, circuit breakers, rate-limit management. Production LLM integration is never just "calling the API".

Designing the LLM API Client

The Abstraction Layer

An abstraction layer that lets you treat multiple LLM providers uniformly is essential.

// Abstract interface over LLM providers
interface LLMProvider {
  readonly name: string;
  readonly modelId: string;

  complete(
    messages: ChatMessage[],
    options?: CompletionOptions,
  ): Promise<CompletionResult>;

  stream(
    messages: ChatMessage[],
    options?: CompletionOptions,
  ): AsyncIterable<StreamChunk>;

  countTokens(text: string): number;
}

interface CompletionOptions {
  temperature?: number;
  maxTokens?: number;
  responseFormat?: { type: 'text' | 'json_object' };
  stopSequences?: string[];
  timeout?: number;
}

interface CompletionResult {
  content: string;
  usage: {
    inputTokens: number;
    outputTokens: number;
    totalTokens: number;
  };
  model: string;
  finishReason: 'stop' | 'length' | 'content_filter';
  latencyMs: number;
}
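The interfaces above reference ChatMessage and StreamChunk without defining them. A minimal sketch of what they might look like (the exact shapes are an assumption, not part of the original):

```typescript
// Hypothetical minimal definitions for the types LLMProvider depends on.
type ChatRole = 'system' | 'user' | 'assistant';

interface ChatMessage {
  role: ChatRole;    // who authored the message
  content: string;   // plain-text body
}

interface StreamChunk {
  type: 'content';   // the only chunk kind the providers below emit
  content: string;   // incremental text delta
}

const example: ChatMessage = { role: 'user', content: 'Hello' };
console.log(example.role); // user
```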

OpenAI / Anthropic Provider Implementations

class OpenAIProvider implements LLMProvider {
  readonly name = 'openai';

  constructor(
    readonly modelId: string,
    private readonly client: OpenAI,
  ) {}

  async complete(
    messages: ChatMessage[],
    options?: CompletionOptions,
  ): Promise<CompletionResult> {
    const startTime = Date.now();

    const response = await this.client.chat.completions.create({
      model: this.modelId,
      messages: messages.map(m => ({
        role: m.role,
        content: m.content,
      })),
      temperature: options?.temperature ?? 0,
      max_tokens: options?.maxTokens ?? 1000,
      response_format: options?.responseFormat,
      stop: options?.stopSequences,
    });

    return {
      content: response.choices[0].message.content ?? '',
      usage: {
        inputTokens: response.usage?.prompt_tokens ?? 0,
        outputTokens: response.usage?.completion_tokens ?? 0,
        totalTokens: response.usage?.total_tokens ?? 0,
      },
      model: response.model,
      finishReason: response.choices[0].finish_reason as CompletionResult['finishReason'],
      latencyMs: Date.now() - startTime,
    };
  }

  async *stream(
    messages: ChatMessage[],
    options?: CompletionOptions,
  ): AsyncIterable<StreamChunk> {
    const response = await this.client.chat.completions.create({
      model: this.modelId,
      messages: messages.map(m => ({ role: m.role, content: m.content })),
      temperature: options?.temperature ?? 0,
      max_tokens: options?.maxTokens ?? 1000,
      stream: true,
    });

    for await (const chunk of response) {
      const delta = chunk.choices[0]?.delta?.content;
      if (delta) {
        yield { type: 'content', content: delta };
      }
    }
  }

  countTokens(text: string): number {
    // A real implementation would use tiktoken or similar;
    // rough heuristic here: ~3 characters per token
    return Math.ceil(text.length / 3);
  }
}

class AnthropicProvider implements LLMProvider {
  readonly name = 'anthropic';

  constructor(
    readonly modelId: string,
    private readonly client: Anthropic,
  ) {}

  async complete(
    messages: ChatMessage[],
    options?: CompletionOptions,
  ): Promise<CompletionResult> {
    const startTime = Date.now();
    const systemMessage = messages.find(m => m.role === 'system');
    const userMessages = messages.filter(m => m.role !== 'system');

    const response = await this.client.messages.create({
      model: this.modelId,
      system: systemMessage?.content,
      messages: userMessages.map(m => ({
        role: m.role as 'user' | 'assistant',
        content: m.content,
      })),
      max_tokens: options?.maxTokens ?? 1000,
      temperature: options?.temperature ?? 0,
    });

    const textBlock = response.content.find(b => b.type === 'text');

    return {
      content: textBlock?.text ?? '',
      usage: {
        inputTokens: response.usage.input_tokens,
        outputTokens: response.usage.output_tokens,
        totalTokens: response.usage.input_tokens + response.usage.output_tokens,
      },
      model: response.model,
      finishReason: response.stop_reason === 'end_turn' ? 'stop' : 'length',
      latencyMs: Date.now() - startTime,
    };
  }

  async *stream(
    messages: ChatMessage[],
    options?: CompletionOptions,
  ): AsyncIterable<StreamChunk> {
    // Anthropic streaming implementation
    const systemMessage = messages.find(m => m.role === 'system');
    const userMessages = messages.filter(m => m.role !== 'system');

    const stream = this.client.messages.stream({
      model: this.modelId,
      system: systemMessage?.content,
      messages: userMessages.map(m => ({
        role: m.role as 'user' | 'assistant',
        content: m.content,
      })),
      max_tokens: options?.maxTokens ?? 1000,
    });

    for await (const event of stream) {
      if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
        yield { type: 'content', content: event.delta.text };
      }
    }
  }

  countTokens(text: string): number {
    // Rough heuristic (~3.5 characters per token); use a real tokenizer in production
    return Math.ceil(text.length / 3.5);
  }
}
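To see what the abstraction buys, here is a hypothetical test double: caller code written once against a provider interface (simplified inline here) runs unchanged whether the backend is OpenAI, Anthropic, or a stub. EchoProvider and summarize are illustrative names, not from the original.

```typescript
// Simplified message and provider shapes for this sketch only.
interface Msg { role: 'system' | 'user' | 'assistant'; content: string }

interface MinimalProvider {
  readonly name: string;
  complete(messages: Msg[]): Promise<{ content: string }>;
}

// A test double: echoes the last user message back, as a real model would respond.
class EchoProvider implements MinimalProvider {
  readonly name = 'echo';
  async complete(messages: Msg[]) {
    const last = messages.filter(m => m.role === 'user').at(-1);
    return { content: `echo: ${last?.content ?? ''}` };
  }
}

// Business logic depends only on the interface, never on a concrete SDK.
async function summarize(provider: MinimalProvider, text: string) {
  return provider.complete([
    { role: 'system', content: 'Summarize the input.' },
    { role: 'user', content: text },
  ]);
}

summarize(new EchoProvider(), 'hello').then(r => console.log(r.content));
// echo: hello
```

Swapping in OpenAIProvider or AnthropicProvider requires no change to summarize, which is the point of the abstraction layer.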

Retry with Exponential Backoff

Retry Strategy

interface RetryConfig {
  maxRetries: number;
  baseDelayMs: number;
  maxDelayMs: number;
  retryableErrors: string[];
  jitter: boolean;
}

const DEFAULT_RETRY_CONFIG: RetryConfig = {
  maxRetries: 3,
  baseDelayMs: 1000,
  maxDelayMs: 30000,
  retryableErrors: [
    'rate_limit_exceeded',
    'server_error',
    'timeout',
    'overloaded',
    'ECONNRESET',
    'ETIMEDOUT',
  ],
  jitter: true,
};

class RetryableLLMClient {
  constructor(
    private readonly provider: LLMProvider,
    private readonly config: RetryConfig = DEFAULT_RETRY_CONFIG,
    private readonly logger: Logger,
  ) {}

  async complete(
    messages: ChatMessage[],
    options?: CompletionOptions,
  ): Promise<CompletionResult> {
    let lastError: Error | null = null;

    for (let attempt = 0; attempt <= this.config.maxRetries; attempt++) {
      try {
        const result = await this.withTimeout(
          this.provider.complete(messages, options),
          options?.timeout ?? 30000,
        );
        return result;
      } catch (error) {
        lastError = error as Error;

        if (!this.isRetryable(error as Error)) {
          throw error;
        }

        if (attempt < this.config.maxRetries) {
          const delay = this.calculateDelay(attempt);
          this.logger.warn(
            `LLM API retry ${attempt + 1}/${this.config.maxRetries}`,
            {
              provider: this.provider.name,
              model: this.provider.modelId,
              error: (error as Error).message,
              delayMs: delay,
            },
          );
          await this.sleep(delay);
        }
      }
    }

    throw new Error(
      `LLM API failed after ${this.config.maxRetries + 1} attempts: ${lastError?.message}`,
    );
  }

  private calculateDelay(attempt: number): number {
    // Exponential backoff: baseDelay * 2^attempt
    let delay = this.config.baseDelayMs * Math.pow(2, attempt);
    delay = Math.min(delay, this.config.maxDelayMs);

    // Jitter: randomize to 0.5x-1.5x
    if (this.config.jitter) {
      delay = delay * (0.5 + Math.random());
    }

    return Math.floor(delay);
  }

  private isRetryable(error: Error): boolean {
    const errorMessage = error.message.toLowerCase();
    return this.config.retryableErrors.some(
      pattern => errorMessage.includes(pattern.toLowerCase()),
    );
  }

  private withTimeout<T>(
    promise: Promise<T>,
    timeoutMs: number,
  ): Promise<T> {
    return new Promise((resolve, reject) => {
      const timer = setTimeout(
        () => reject(new Error(`timeout: LLM API did not respond within ${timeoutMs}ms`)),
        timeoutMs,
      );
      promise
        .then(result => { clearTimeout(timer); resolve(result); })
        .catch(error => { clearTimeout(timer); reject(error); });
    });
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
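The delay schedule is easier to inspect in isolation. This standalone function mirrors calculateDelay above (the name backoffDelay is ours, and the defaults follow DEFAULT_RETRY_CONFIG):

```typescript
// Exponential backoff with optional jitter, as in RetryableLLMClient.calculateDelay.
function backoffDelay(
  attempt: number,
  baseDelayMs = 1000,
  maxDelayMs = 30000,
  jitter = true,
): number {
  // Double the delay on each attempt, capped at maxDelayMs.
  let delay = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
  // Randomize to 0.5x-1.5x so simultaneous clients don't retry in lockstep.
  if (jitter) delay *= 0.5 + Math.random();
  return Math.floor(delay);
}

// Without jitter the series is deterministic: 1000, 2000, 4000, 8000, 16000,
// then capped at 30000 for every later attempt.
console.log([0, 1, 2, 3, 4, 5].map(a => backoffDelay(a, 1000, 30000, false)));
```

The jitter range (0.5x to 1.5x) keeps the expected delay equal to the un-jittered value while spreading retries out in time, which is what prevents retry storms after a shared outage.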

Circuit Breaker

State Transitions

graph LR
    CLOSED["CLOSED (normal operation)"]
    OPEN["OPEN (requests blocked)"]
    HALFOPEN["HALF-OPEN (probing recovery)"]
    CLOSED -->|"failures reach threshold"| OPEN
    OPEN -->|"recovery timeout elapses"| HALFOPEN
    HALFOPEN -->|"success"| CLOSED
    HALFOPEN -->|"failure"| OPEN
    CLOSED -->|"success"| CLOSED
    NOTE["HALF-OPEN: let a limited number of requests through to verify recovery"]
    style CLOSED fill:#d1fae5,stroke:#059669,color:#065f46
    style OPEN fill:#fee2e2,stroke:#dc2626,color:#991b1b
    style HALFOPEN fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e
    style NOTE fill:#f3f4f6,stroke:#9ca3af,color:#374151

Implementation

type CircuitState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

interface CircuitBreakerConfig {
  failureThreshold: number;     // failures before transitioning to OPEN
  recoveryTimeout: number;      // wait before OPEN → HALF_OPEN (ms)
  halfOpenMaxRequests: number;  // requests allowed while HALF_OPEN
}

class LLMCircuitBreaker {
  private state: CircuitState = 'CLOSED';
  private failureCount = 0;
  private lastFailureTime = 0;
  private halfOpenRequests = 0;

  constructor(
    private readonly config: CircuitBreakerConfig = {
      failureThreshold: 5,
      recoveryTimeout: 60000,
      halfOpenMaxRequests: 3,
    },
    private readonly logger: Logger,
  ) {}

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (!this.canExecute()) {
      throw new Error(
        `Circuit breaker is OPEN for ${this.config.recoveryTimeout / 1000}s. ` +
        `Failures: ${this.failureCount}`,
      );
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private canExecute(): boolean {
    switch (this.state) {
      case 'CLOSED':
        return true;

      case 'OPEN': {
        const elapsed = Date.now() - this.lastFailureTime;
        if (elapsed >= this.config.recoveryTimeout) {
          this.state = 'HALF_OPEN';
          this.halfOpenRequests = 0;
          this.logger.info('Circuit breaker: OPEN → HALF_OPEN');
          return true;
        }
        return false;
      }

      case 'HALF_OPEN':
        return this.halfOpenRequests < this.config.halfOpenMaxRequests;
    }
  }

  private onSuccess(): void {
    if (this.state === 'HALF_OPEN') {
      this.halfOpenRequests++;
      if (this.halfOpenRequests >= this.config.halfOpenMaxRequests) {
        this.state = 'CLOSED';
        this.failureCount = 0;
        this.logger.info('Circuit breaker: HALF_OPEN → CLOSED');
      }
    } else {
      this.failureCount = 0;
    }
  }

  private onFailure(): void {
    this.failureCount++;
    this.lastFailureTime = Date.now();

    if (this.state === 'HALF_OPEN') {
      this.state = 'OPEN';
      this.logger.warn('Circuit breaker: HALF_OPEN → OPEN');
    } else if (this.failureCount >= this.config.failureThreshold) {
      this.state = 'OPEN';
      this.logger.warn(
        `Circuit breaker: CLOSED → OPEN (failures: ${this.failureCount})`,
      );
    }
  }

  getState(): { state: CircuitState; failureCount: number } {
    return { state: this.state, failureCount: this.failureCount };
  }
}
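The transitions can be exercised deterministically with a trimmed-down copy of the state machine (logger and half-open request counting omitted; MiniBreaker is an illustrative name, with time passed in explicitly so the demo needs no real clock):

```typescript
type State = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

class MiniBreaker {
  state: State = 'CLOSED';
  failures = 0;
  lastFailure = 0;
  constructor(private threshold = 5, private recoveryMs = 1000) {}

  canExecute(now: number): boolean {
    // After the recovery timeout, an OPEN breaker lets a probe through.
    if (this.state === 'OPEN' && now - this.lastFailure >= this.recoveryMs) {
      this.state = 'HALF_OPEN';
    }
    return this.state !== 'OPEN';
  }

  onSuccess(): void {
    this.state = 'CLOSED';
    this.failures = 0;
  }

  onFailure(now: number): void {
    this.failures++;
    this.lastFailure = now;
    // A failed probe, or too many failures, trips the breaker.
    if (this.state === 'HALF_OPEN' || this.failures >= this.threshold) {
      this.state = 'OPEN';
    }
  }
}

const b = new MiniBreaker();
for (let i = 0; i < 5; i++) b.onFailure(0);
console.log(b.state);            // 'OPEN'  (threshold of 5 reached)
console.log(b.canExecute(500));  // false   (recovery timeout not yet elapsed)
console.log(b.canExecute(1000)); // true    (now HALF_OPEN, probe allowed)
b.onSuccess();
console.log(b.state);            // 'CLOSED' (probe succeeded)
```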

Managing Rate Limits

The Token Bucket Algorithm

class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private readonly capacity: number,      // maximum bucket capacity
    private readonly refillRate: number,    // tokens refilled per second
  ) {
    this.tokens = capacity;
    this.lastRefill = Date.now();
  }

  async acquire(count: number = 1): Promise<void> {
    this.refill();

    if (this.tokens >= count) {
      this.tokens -= count;
      return;
    }

    // Not enough tokens: wait until enough have been refilled
    const deficit = count - this.tokens;
    const waitMs = (deficit / this.refillRate) * 1000;
    await new Promise(resolve => setTimeout(resolve, waitMs));
    this.refill();
    this.tokens -= count;
  }

  private refill(): void {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(
      this.capacity,
      this.tokens + elapsed * this.refillRate,
    );
    this.lastRefill = now;
  }
}
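A quick sanity check of the refill arithmetic in TokenBucket.refill, using assumed example numbers (capacity 500, refill rate 500/60 tokens per second, i.e. 500 per minute):

```typescript
// Same formula as TokenBucket.refill: tokens grow linearly with elapsed
// time, clamped at capacity.
const capacity = 500;
const refillRate = 500 / 60; // ~8.33 tokens per second

const tokensAfter = (tokens: number, elapsedSec: number): number =>
  Math.min(capacity, tokens + elapsedSec * refillRate);

console.log(tokensAfter(0, 60));   // a drained bucket is full again after 60 s
console.log(tokensAfter(450, 60)); // refill never exceeds capacity
console.log(tokensAfter(0, 6));    // partial refill after 6 s (~50 tokens)
```

This is also why acquire() above can compute its wait time directly: a deficit of d tokens takes d / refillRate seconds to replenish.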

// Per-provider rate limit management
class RateLimitManager {
  private buckets: Map<string, {
    rpm: TokenBucket;   // Requests Per Minute
    tpm: TokenBucket;   // Tokens Per Minute
  }> = new Map();

  constructor() {
    // Example limits for OpenAI GPT-4o-mini
    this.buckets.set('openai:gpt-4o-mini', {
      rpm: new TokenBucket(500, 500 / 60),      // 500 RPM
      tpm: new TokenBucket(200000, 200000 / 60), // 200K TPM
    });

    // Example limits for Anthropic Claude 3 Haiku
    this.buckets.set('anthropic:claude-3-haiku', {
      rpm: new TokenBucket(1000, 1000 / 60),
      tpm: new TokenBucket(100000, 100000 / 60),
    });
  }

  async acquireRequest(providerId: string): Promise<void> {
    const bucket = this.buckets.get(providerId);
    if (bucket) {
      await bucket.rpm.acquire(1);
    }
  }

  async acquireTokens(providerId: string, tokenCount: number): Promise<void> {
    const bucket = this.buckets.get(providerId);
    if (bucket) {
      await bucket.tpm.acquire(tokenCount);
    }
  }
}

The Integrated LLM Client

Combining All the Patterns

class ResilientLLMClient {
  constructor(
    private readonly provider: LLMProvider,
    private readonly retryClient: RetryableLLMClient,
    private readonly circuitBreaker: LLMCircuitBreaker,
    private readonly rateLimiter: RateLimitManager,
    private readonly logger: Logger,
  ) {}

  async complete(
    messages: ChatMessage[],
    options?: CompletionOptions,
  ): Promise<CompletionResult> {
    const providerId = `${this.provider.name}:${this.provider.modelId}`;

    // 1. Rate-limit check
    const estimatedTokens = messages
      .reduce((sum, m) => sum + this.provider.countTokens(m.content), 0);
    await this.rateLimiter.acquireRequest(providerId);
    await this.rateLimiter.acquireTokens(providerId, estimatedTokens);

    // 2. Circuit breaker + retry
    return this.circuitBreaker.execute(
      () => this.retryClient.complete(messages, options),
    );
  }
}

Summary

Point             Description
Abstraction       Unify providers behind a provider-agnostic interface
Retry             Exponential backoff plus jitter prevents retry storms
Circuit breaker   Isolate failures with the three states CLOSED → OPEN → HALF_OPEN
Rate limiting     Token buckets manage RPM/TPM, avoiding 429 errors before they happen

Checklist

  • I understand the LLM provider abstraction interface
  • I understand how exponential backoff and jitter work
  • I understand the circuit breaker's three state transitions
  • I understand implementation patterns for rate-limit management

Next Steps

You have learned the core patterns for LLM API integration. The next section digs deeper into LLM-specific error handling and fallback strategies.

Production LLM integration is not just "calling the API". Resilience comes from the trio of retries, circuit breakers, and rate limiting.


Estimated reading time: 40 minutes