ストーリー
演習の概要
| ミッション | テーマ | 難易度 |
|---|---|---|
| Mission 1 | エンベディングパイプラインの実装 | 中級 |
| Mission 2 | pgvectorスキーマとインデックス設計 | 中級 |
| Mission 3 | ハイブリッド検索の実装 | 上級 |
| Mission 4 | セマンティックキャッシュの実装 | 上級 |
Mission 1: エンベディングパイプラインの実装
以下の要件を満たすエンベディングパイプラインを実装してください。
要件:
- Markdownドキュメントを読み込み、構造ベースでチャンキング
- 各チャンクにメタデータ(タイトル、セクション、タグ)を付与
- バッチ処理でエンベディングを生成
- エラーハンドリング(API障害時のリトライ)
実装してください:チャンキング → メタデータ付与 → バッチエンベディング生成のパイプライン
解答例
import { OpenAI } from 'openai';
interface ProcessedChunk {
id: string;
content: string;
embedding: number[];
metadata: {
documentId: string;
documentTitle: string;
sectionTitle: string;
headingPath: string[];
tags: string[];
chunkIndex: number;
totalChunks: number;
};
}
class EmbeddingPipeline {
private openai: OpenAI;
constructor(apiKey: string) {
this.openai = new OpenAI({ apiKey });
}
async process(
documentId: string,
markdown: string,
tags: string[] = [],
): Promise<ProcessedChunk[]> {
// 1. チャンキング
const chunks = this.chunkMarkdown(markdown);
const documentTitle = this.extractTitle(markdown);
// 2. メタデータ付与
const enrichedChunks = chunks.map((chunk, index) => ({
id: `${documentId}-chunk-${index}`,
content: chunk.content,
metadata: {
documentId,
documentTitle,
sectionTitle: chunk.heading,
headingPath: chunk.headingPath,
tags,
chunkIndex: index,
totalChunks: chunks.length,
},
}));
// 3. バッチエンベディング生成(リトライ付き)
const texts = enrichedChunks.map(c =>
`${c.metadata.documentTitle} > ${c.metadata.sectionTitle}\n\n${c.content}`
);
const embeddings = await this.batchEmbed(texts);
return enrichedChunks.map((chunk, i) => ({
...chunk,
embedding: embeddings[i],
}));
}
private async batchEmbed(
texts: string[],
batchSize: number = 100,
maxRetries: number = 3,
): Promise<number[][]> {
const allEmbeddings: number[][] = [];
for (let i = 0; i < texts.length; i += batchSize) {
const batch = texts.slice(i, i + batchSize);
let lastError: Error | null = null;
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
const response = await this.openai.embeddings.create({
model: 'text-embedding-3-small',
input: batch,
dimensions: 1536,
});
allEmbeddings.push(...response.data.map(d => d.embedding));
lastError = null;
break;
} catch (error) {
lastError = error as Error;
const backoff = Math.pow(2, attempt) * 1000;
console.warn(`Embedding retry ${attempt + 1}/${maxRetries}, waiting ${backoff}ms`);
await new Promise(resolve => setTimeout(resolve, backoff));
}
}
if (lastError) throw lastError;
}
return allEmbeddings;
}
private chunkMarkdown(markdown: string): Array<{
content: string;
heading: string;
headingPath: string[];
}> {
const lines = markdown.split('\n');
const chunks: Array<{ content: string; heading: string; headingPath: string[] }> = [];
let current: string[] = [];
let headingStack: { text: string; level: number }[] = [];
for (const line of lines) {
const match = line.match(/^(#{1,6})\s+(.+)/);
if (match) {
if (current.length > 0) {
chunks.push({
content: current.join('\n').trim(),
heading: headingStack[headingStack.length - 1]?.text ?? '',
headingPath: headingStack.map(h => h.text),
});
current = [];
}
const level = match[1].length;
while (headingStack.length > 0 && headingStack[headingStack.length - 1].level >= level) {
headingStack.pop();
}
headingStack.push({ text: match[2], level });
}
current.push(line);
}
if (current.length > 0) {
chunks.push({
content: current.join('\n').trim(),
heading: headingStack[headingStack.length - 1]?.text ?? '',
headingPath: headingStack.map(h => h.text),
});
}
return chunks;
}
private extractTitle(markdown: string): string {
const match = markdown.match(/^#\s+(.+)/m);
return match?.[1] ?? 'Untitled';
}
}
Mission 2: pgvectorスキーマとインデックス設計
以下の要件を満たすpgvectorのスキーマとインデックスを設計してください。
要件:
- 1536次元のエンベディングを格納
- メタデータでのフィルタリング(documentType, team, updatedAt)
- 50万ベクトルを想定
- HNSW インデックスの適切なパラメータ設計
SQLとTypeScriptの実装コードを書いてください。
解答例
-- 拡張の有効化
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm;
-- テーブル作成
CREATE TABLE document_chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
embedding vector(1536) NOT NULL,
document_type VARCHAR(50) NOT NULL, -- 'tech-doc', 'incident', 'adr'
team VARCHAR(100),
section_title VARCHAR(500),
heading_path TEXT[],
tags TEXT[] DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- HNSW インデックス(50万ベクトル向け)
CREATE INDEX CONCURRENTLY idx_chunks_hnsw
ON document_chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 256);
-- メタデータフィルタ用インデックス
CREATE INDEX idx_chunks_doc_type ON document_chunks (document_type);
CREATE INDEX idx_chunks_team ON document_chunks (team);
CREATE INDEX idx_chunks_updated ON document_chunks (updated_at);
CREATE INDEX idx_chunks_tags ON document_chunks USING gin (tags);
-- 全文検索インデックス
CREATE INDEX idx_chunks_fts ON document_chunks
USING gin (to_tsvector('simple', content));
class PgVectorRepository {
constructor(private readonly pool: Pool) {}
async searchWithFilters(
queryEmbedding: number[],
filters: {
documentType?: string;
team?: string;
updatedAfter?: Date;
tags?: string[];
},
topK: number = 5,
efSearch: number = 128,
): Promise<RetrievedContext[]> {
// ef_search の設定
await this.pool.query(`SET hnsw.ef_search = ${efSearch}`);
const conditions: string[] = [];
const params: unknown[] = [`[${queryEmbedding.join(',')}]`, topK];
let paramIdx = 3;
if (filters.documentType) {
conditions.push(`document_type = $${paramIdx++}`);
params.push(filters.documentType);
}
if (filters.team) {
conditions.push(`team = $${paramIdx++}`);
params.push(filters.team);
}
if (filters.updatedAfter) {
conditions.push(`updated_at >= $${paramIdx++}`);
params.push(filters.updatedAfter);
}
if (filters.tags?.length) {
conditions.push(`tags && $${paramIdx++}`);
params.push(filters.tags);
}
const whereClause = conditions.length > 0
? `WHERE ${conditions.join(' AND ')}`
: '';
const result = await this.pool.query(
`SELECT id, document_id, content, section_title, heading_path, tags,
document_type, team, updated_at,
1 - (embedding <=> $1::vector) AS score
FROM document_chunks
${whereClause}
ORDER BY embedding <=> $1::vector
LIMIT $2`,
params,
);
return result.rows.map(row => ({
chunk: {
id: row.id,
documentId: row.document_id,
content: row.content,
metadata: {
sectionTitle: row.section_title,
headingPath: row.heading_path,
tags: row.tags,
documentType: row.document_type,
team: row.team,
updatedAt: row.updated_at,
},
},
score: row.score,
}));
}
}
Mission 3: ハイブリッド検索の実装
pgvectorのベクトル検索とPostgreSQLの全文検索を組み合わせた、RRFベースのハイブリッド検索を実装してください。
解答例
class PgHybridSearchService {
constructor(private readonly pool: Pool) {}
async hybridSearch(
query: string,
queryEmbedding: number[],
options: {
topK?: number;
vectorWeight?: number;
filters?: Record<string, unknown>;
} = {},
): Promise<RetrievedContext[]> {
const { topK = 5, vectorWeight = 0.6 } = options;
const candidateK = topK * 4;
const keywordWeight = 1 - vectorWeight;
const result = await this.pool.query(
`WITH vector_results AS (
SELECT id, content, document_id,
section_title, heading_path, tags, document_type, team, updated_at,
ROW_NUMBER() OVER (ORDER BY embedding <=> $1::vector) AS rank
FROM document_chunks
ORDER BY embedding <=> $1::vector
LIMIT $3
),
keyword_results AS (
SELECT id, content, document_id,
section_title, heading_path, tags, document_type, team, updated_at,
ROW_NUMBER() OVER (
ORDER BY ts_rank(to_tsvector('simple', content),
plainto_tsquery('simple', $2)) DESC
) AS rank
FROM document_chunks
WHERE to_tsvector('simple', content) @@ plainto_tsquery('simple', $2)
LIMIT $3
),
combined AS (
SELECT
COALESCE(v.id, k.id) AS id,
COALESCE(v.content, k.content) AS content,
COALESCE(v.document_id, k.document_id) AS document_id,
COALESCE(v.section_title, k.section_title) AS section_title,
COALESCE(v.heading_path, k.heading_path) AS heading_path,
COALESCE(v.tags, k.tags) AS tags,
COALESCE(v.document_type, k.document_type) AS document_type,
COALESCE(v.team, k.team) AS team,
COALESCE($4::float / (60.0 + v.rank), 0) +
COALESCE($5::float / (60.0 + k.rank), 0) AS rrf_score
FROM vector_results v
FULL OUTER JOIN keyword_results k ON v.id = k.id
)
SELECT * FROM combined
ORDER BY rrf_score DESC
LIMIT $6`,
[
`[${queryEmbedding.join(',')}]`,
query,
candidateK,
vectorWeight,
keywordWeight,
topK,
],
);
return result.rows.map(row => ({
chunk: {
id: row.id,
documentId: row.document_id,
content: row.content,
metadata: {
sectionTitle: row.section_title,
headingPath: row.heading_path,
tags: row.tags,
documentType: row.document_type,
team: row.team,
},
},
score: row.rrf_score,
}));
}
}
Mission 4: セマンティックキャッシュの実装
pgvectorを使ったセマンティックキャッシュを実装し、RAGパイプラインに統合してください。
要件:
- クエリのコサイン類似度 > 0.95 でキャッシュヒット判定
- TTL: 1時間
- キャッシュヒット率のメトリクス記録
解答例
class PgSemanticCache {
constructor(
private readonly pool: Pool,
private readonly embedder: EmbeddingService,
private readonly threshold: number = 0.95,
private readonly ttlMs: number = 3_600_000,
) {}
async initialize(): Promise<void> {
await this.pool.query(`
CREATE TABLE IF NOT EXISTS semantic_cache (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
query_text TEXT NOT NULL,
query_embedding vector(1536) NOT NULL,
answer TEXT NOT NULL,
contexts JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
)
`);
await this.pool.query(`
CREATE INDEX IF NOT EXISTS idx_cache_hnsw
ON semantic_cache USING hnsw (query_embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 100)
`);
}
async get(query: string): Promise<{
answer: string;
contexts: string[];
} | null> {
const embedding = await this.embedder.embed(query);
const result = await this.pool.query(
`SELECT id, answer, contexts, created_at,
1 - (query_embedding <=> $1::vector) AS similarity
FROM semantic_cache
WHERE 1 - (query_embedding <=> $1::vector) > $2
AND created_at > NOW() - INTERVAL '${this.ttlMs / 1000} seconds'
ORDER BY query_embedding <=> $1::vector
LIMIT 1`,
[`[${embedding.join(',')}]`, this.threshold],
);
if (result.rows.length === 0) return null;
return {
answer: result.rows[0].answer,
contexts: result.rows[0].contexts,
};
}
async set(query: string, answer: string, contexts: string[]): Promise<void> {
const embedding = await this.embedder.embed(query);
await this.pool.query(
`INSERT INTO semantic_cache (query_text, query_embedding, answer, contexts)
VALUES ($1, $2::vector, $3, $4::jsonb)`,
[`${query}`, `[${embedding.join(',')}]`, answer, JSON.stringify(contexts)],
);
}
async cleanup(): Promise<void> {
await this.pool.query(
`DELETE FROM semantic_cache
WHERE created_at < NOW() - INTERVAL '${this.ttlMs / 1000} seconds'`,
);
}
}
まとめ
| ポイント | 内容 |
|---|---|
| エンベディングパイプライン | チャンキング → メタデータ → バッチ生成のフロー |
| pgvectorスキーマ | 適切なインデックスとフィルタ用カラム設計 |
| ハイブリッド検索 | RRFでベクトルとキーワード検索を統合 |
| セマンティックキャッシュ | 類似クエリの結果再利用でコスト削減 |
チェックリスト
- エンベディングパイプラインを実装できた
- pgvectorのスキーマとインデックスを設計できた
- ハイブリッド検索をRRFで実装できた
- セマンティックキャッシュを実装できた
次のステップへ
ベクトルDB構築の演習を完了しました。次はStep 2のチェックポイントクイズに挑戦しましょう。
推定所要時間: 60分