EXERCISE 60分

ストーリー

佐藤CTO
設計だけでは分からないことがある。実際に構築してみよう
佐藤CTO
pgvectorを使ってベクトルDBを構築し、ハイブリッド検索まで実装してほしい。インデックスのパラメータチューニングも忘れずに

演習の概要

ミッションテーマ難易度
Mission 1エンベディングパイプラインの実装中級
Mission 2pgvectorスキーマとインデックス設計中級
Mission 3ハイブリッド検索の実装上級
Mission 4セマンティックキャッシュの実装上級

Mission 1: エンベディングパイプラインの実装

以下の要件を満たすエンベディングパイプラインを実装してください。

要件:
- Markdownドキュメントを読み込み、構造ベースでチャンキング
- 各チャンクにメタデータ(タイトル、セクション、タグ)を付与
- バッチ処理でエンベディングを生成
- エラーハンドリング(API障害時のリトライ)

実装してください:チャンキング → メタデータ付与 → バッチエンベディング生成のパイプライン

解答例
import { OpenAI } from 'openai';

interface ProcessedChunk {
  id: string;
  content: string;
  embedding: number[];
  metadata: {
    documentId: string;
    documentTitle: string;
    sectionTitle: string;
    headingPath: string[];
    tags: string[];
    chunkIndex: number;
    totalChunks: number;
  };
}

class EmbeddingPipeline {
  private openai: OpenAI;

  constructor(apiKey: string) {
    this.openai = new OpenAI({ apiKey });
  }

  async process(
    documentId: string,
    markdown: string,
    tags: string[] = [],
  ): Promise<ProcessedChunk[]> {
    // 1. チャンキング
    const chunks = this.chunkMarkdown(markdown);
    const documentTitle = this.extractTitle(markdown);

    // 2. メタデータ付与
    const enrichedChunks = chunks.map((chunk, index) => ({
      id: `${documentId}-chunk-${index}`,
      content: chunk.content,
      metadata: {
        documentId,
        documentTitle,
        sectionTitle: chunk.heading,
        headingPath: chunk.headingPath,
        tags,
        chunkIndex: index,
        totalChunks: chunks.length,
      },
    }));

    // 3. バッチエンベディング生成(リトライ付き)
    const texts = enrichedChunks.map(c =>
      `${c.metadata.documentTitle} > ${c.metadata.sectionTitle}\n\n${c.content}`
    );
    const embeddings = await this.batchEmbed(texts);

    return enrichedChunks.map((chunk, i) => ({
      ...chunk,
      embedding: embeddings[i],
    }));
  }

  private async batchEmbed(
    texts: string[],
    batchSize: number = 100,
    maxRetries: number = 3,
  ): Promise<number[][]> {
    const allEmbeddings: number[][] = [];

    for (let i = 0; i < texts.length; i += batchSize) {
      const batch = texts.slice(i, i + batchSize);
      let lastError: Error | null = null;

      for (let attempt = 0; attempt < maxRetries; attempt++) {
        try {
          const response = await this.openai.embeddings.create({
            model: 'text-embedding-3-small',
            input: batch,
            dimensions: 1536,
          });
          allEmbeddings.push(...response.data.map(d => d.embedding));
          lastError = null;
          break;
        } catch (error) {
          lastError = error as Error;
          const backoff = Math.pow(2, attempt) * 1000;
          console.warn(`Embedding retry ${attempt + 1}/${maxRetries}, waiting ${backoff}ms`);
          await new Promise(resolve => setTimeout(resolve, backoff));
        }
      }

      if (lastError) throw lastError;
    }

    return allEmbeddings;
  }

  private chunkMarkdown(markdown: string): Array<{
    content: string;
    heading: string;
    headingPath: string[];
  }> {
    const lines = markdown.split('\n');
    const chunks: Array<{ content: string; heading: string; headingPath: string[] }> = [];
    let current: string[] = [];
    let headingStack: { text: string; level: number }[] = [];

    for (const line of lines) {
      const match = line.match(/^(#{1,6})\s+(.+)/);
      if (match) {
        if (current.length > 0) {
          chunks.push({
            content: current.join('\n').trim(),
            heading: headingStack[headingStack.length - 1]?.text ?? '',
            headingPath: headingStack.map(h => h.text),
          });
          current = [];
        }
        const level = match[1].length;
        while (headingStack.length > 0 && headingStack[headingStack.length - 1].level >= level) {
          headingStack.pop();
        }
        headingStack.push({ text: match[2], level });
      }
      current.push(line);
    }

    if (current.length > 0) {
      chunks.push({
        content: current.join('\n').trim(),
        heading: headingStack[headingStack.length - 1]?.text ?? '',
        headingPath: headingStack.map(h => h.text),
      });
    }

    return chunks;
  }

  private extractTitle(markdown: string): string {
    const match = markdown.match(/^#\s+(.+)/m);
    return match?.[1] ?? 'Untitled';
  }
}

Mission 2: pgvectorスキーマとインデックス設計

以下の要件を満たすpgvectorのスキーマとインデックスを設計してください。

要件:
- 1536次元のエンベディングを格納
- メタデータでのフィルタリング(documentType, team, updatedAt)
- 50万ベクトルを想定
- HNSW インデックスの適切なパラメータ設計

SQLとTypeScriptの実装コードを書いてください。

解答例
-- 拡張の有効化
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm;

-- テーブル作成
CREATE TABLE document_chunks (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  document_id VARCHAR(255) NOT NULL,
  content TEXT NOT NULL,
  embedding vector(1536) NOT NULL,
  document_type VARCHAR(50) NOT NULL,  -- 'tech-doc', 'incident', 'adr'
  team VARCHAR(100),
  section_title VARCHAR(500),
  heading_path TEXT[],
  tags TEXT[] DEFAULT '{}',
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- HNSW インデックス(50万ベクトル向け)
CREATE INDEX CONCURRENTLY idx_chunks_hnsw
ON document_chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 256);

-- メタデータフィルタ用インデックス
CREATE INDEX idx_chunks_doc_type ON document_chunks (document_type);
CREATE INDEX idx_chunks_team ON document_chunks (team);
CREATE INDEX idx_chunks_updated ON document_chunks (updated_at);
CREATE INDEX idx_chunks_tags ON document_chunks USING gin (tags);

-- 全文検索インデックス
CREATE INDEX idx_chunks_fts ON document_chunks
USING gin (to_tsvector('simple', content));
class PgVectorRepository {
  constructor(private readonly pool: Pool) {}

  async searchWithFilters(
    queryEmbedding: number[],
    filters: {
      documentType?: string;
      team?: string;
      updatedAfter?: Date;
      tags?: string[];
    },
    topK: number = 5,
    efSearch: number = 128,
  ): Promise<RetrievedContext[]> {
    // ef_search の設定
    await this.pool.query(`SET hnsw.ef_search = ${efSearch}`);

    const conditions: string[] = [];
    const params: unknown[] = [`[${queryEmbedding.join(',')}]`, topK];
    let paramIdx = 3;

    if (filters.documentType) {
      conditions.push(`document_type = $${paramIdx++}`);
      params.push(filters.documentType);
    }
    if (filters.team) {
      conditions.push(`team = $${paramIdx++}`);
      params.push(filters.team);
    }
    if (filters.updatedAfter) {
      conditions.push(`updated_at >= $${paramIdx++}`);
      params.push(filters.updatedAfter);
    }
    if (filters.tags?.length) {
      conditions.push(`tags && $${paramIdx++}`);
      params.push(filters.tags);
    }

    const whereClause = conditions.length > 0
      ? `WHERE ${conditions.join(' AND ')}`
      : '';

    const result = await this.pool.query(
      `SELECT id, document_id, content, section_title, heading_path, tags,
              document_type, team, updated_at,
              1 - (embedding <=> $1::vector) AS score
       FROM document_chunks
       ${whereClause}
       ORDER BY embedding <=> $1::vector
       LIMIT $2`,
      params,
    );

    return result.rows.map(row => ({
      chunk: {
        id: row.id,
        documentId: row.document_id,
        content: row.content,
        metadata: {
          sectionTitle: row.section_title,
          headingPath: row.heading_path,
          tags: row.tags,
          documentType: row.document_type,
          team: row.team,
          updatedAt: row.updated_at,
        },
      },
      score: row.score,
    }));
  }
}

Mission 3: ハイブリッド検索の実装

pgvectorのベクトル検索とPostgreSQLの全文検索を組み合わせた、RRFベースのハイブリッド検索を実装してください。

解答例
class PgHybridSearchService {
  constructor(private readonly pool: Pool) {}

  async hybridSearch(
    query: string,
    queryEmbedding: number[],
    options: {
      topK?: number;
      vectorWeight?: number;
      filters?: Record<string, unknown>;
    } = {},
  ): Promise<RetrievedContext[]> {
    const { topK = 5, vectorWeight = 0.6 } = options;
    const candidateK = topK * 4;
    const keywordWeight = 1 - vectorWeight;

    const result = await this.pool.query(
      `WITH vector_results AS (
        SELECT id, content, document_id,
               section_title, heading_path, tags, document_type, team, updated_at,
               ROW_NUMBER() OVER (ORDER BY embedding <=> $1::vector) AS rank
        FROM document_chunks
        ORDER BY embedding <=> $1::vector
        LIMIT $3
      ),
      keyword_results AS (
        SELECT id, content, document_id,
               section_title, heading_path, tags, document_type, team, updated_at,
               ROW_NUMBER() OVER (
                 ORDER BY ts_rank(to_tsvector('simple', content),
                                  plainto_tsquery('simple', $2)) DESC
               ) AS rank
        FROM document_chunks
        WHERE to_tsvector('simple', content) @@ plainto_tsquery('simple', $2)
        LIMIT $3
      ),
      combined AS (
        SELECT
          COALESCE(v.id, k.id) AS id,
          COALESCE(v.content, k.content) AS content,
          COALESCE(v.document_id, k.document_id) AS document_id,
          COALESCE(v.section_title, k.section_title) AS section_title,
          COALESCE(v.heading_path, k.heading_path) AS heading_path,
          COALESCE(v.tags, k.tags) AS tags,
          COALESCE(v.document_type, k.document_type) AS document_type,
          COALESCE(v.team, k.team) AS team,
          COALESCE($4::float / (60.0 + v.rank), 0) +
          COALESCE($5::float / (60.0 + k.rank), 0) AS rrf_score
        FROM vector_results v
        FULL OUTER JOIN keyword_results k ON v.id = k.id
      )
      SELECT * FROM combined
      ORDER BY rrf_score DESC
      LIMIT $6`,
      [
        `[${queryEmbedding.join(',')}]`,
        query,
        candidateK,
        vectorWeight,
        keywordWeight,
        topK,
      ],
    );

    return result.rows.map(row => ({
      chunk: {
        id: row.id,
        documentId: row.document_id,
        content: row.content,
        metadata: {
          sectionTitle: row.section_title,
          headingPath: row.heading_path,
          tags: row.tags,
          documentType: row.document_type,
          team: row.team,
        },
      },
      score: row.rrf_score,
    }));
  }
}

Mission 4: セマンティックキャッシュの実装

pgvectorを使ったセマンティックキャッシュを実装し、RAGパイプラインに統合してください。

要件:
- クエリのコサイン類似度 > 0.95 でキャッシュヒット判定
- TTL: 1時間
- キャッシュヒット率のメトリクス記録
解答例
class PgSemanticCache {
  constructor(
    private readonly pool: Pool,
    private readonly embedder: EmbeddingService,
    private readonly threshold: number = 0.95,
    private readonly ttlMs: number = 3_600_000,
  ) {}

  async initialize(): Promise<void> {
    await this.pool.query(`
      CREATE TABLE IF NOT EXISTS semantic_cache (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        query_text TEXT NOT NULL,
        query_embedding vector(1536) NOT NULL,
        answer TEXT NOT NULL,
        contexts JSONB NOT NULL,
        created_at TIMESTAMPTZ DEFAULT NOW()
      )
    `);
    await this.pool.query(`
      CREATE INDEX IF NOT EXISTS idx_cache_hnsw
      ON semantic_cache USING hnsw (query_embedding vector_cosine_ops)
      WITH (m = 16, ef_construction = 100)
    `);
  }

  async get(query: string): Promise<{
    answer: string;
    contexts: string[];
  } | null> {
    const embedding = await this.embedder.embed(query);

    const result = await this.pool.query(
      `SELECT id, answer, contexts, created_at,
              1 - (query_embedding <=> $1::vector) AS similarity
       FROM semantic_cache
       WHERE 1 - (query_embedding <=> $1::vector) > $2
         AND created_at > NOW() - INTERVAL '${this.ttlMs / 1000} seconds'
       ORDER BY query_embedding <=> $1::vector
       LIMIT 1`,
      [`[${embedding.join(',')}]`, this.threshold],
    );

    if (result.rows.length === 0) return null;

    return {
      answer: result.rows[0].answer,
      contexts: result.rows[0].contexts,
    };
  }

  async set(query: string, answer: string, contexts: string[]): Promise<void> {
    const embedding = await this.embedder.embed(query);

    await this.pool.query(
      `INSERT INTO semantic_cache (query_text, query_embedding, answer, contexts)
       VALUES ($1, $2::vector, $3, $4::jsonb)`,
      [`${query}`, `[${embedding.join(',')}]`, answer, JSON.stringify(contexts)],
    );
  }

  async cleanup(): Promise<void> {
    await this.pool.query(
      `DELETE FROM semantic_cache
       WHERE created_at < NOW() - INTERVAL '${this.ttlMs / 1000} seconds'`,
    );
  }
}

まとめ

ポイント内容
エンベディングパイプラインチャンキング → メタデータ → バッチ生成のフロー
pgvectorスキーマ適切なインデックスとフィルタ用カラム設計
ハイブリッド検索RRFでベクトルとキーワード検索を統合
セマンティックキャッシュ類似クエリの結果再利用でコスト削減

チェックリスト

  • エンベディングパイプラインを実装できた
  • pgvectorのスキーマとインデックスを設計できた
  • ハイブリッド検索をRRFで実装できた
  • セマンティックキャッシュを実装できた

次のステップへ

ベクトルDB構築の演習を完了しました。次はStep 2のチェックポイントクイズに挑戦しましょう。


推定所要時間: 60分