演習:ハイブリッド推薦システムを構築しよう
「いよいよ個別のアルゴリズムを組み合わせて、NetShop社のハイブリッド推薦システムを構築してくれ。」
田中VPoEが要件書を渡す。
「Cold Startにも対応し、ビジネスルールも適用した実用的なシステムを頼む。H&Mのデータで検証してから、NetShop社に提案しよう。」
ミッション概要
H&M / MovieLensデータを使って、ハイブリッド推薦システムを構築する演習である。候補生成からリランキングまでの全パイプラインを実装する。
Mission 1: LightFMによるハイブリッドモデル(25分)
LightFMを使ってCold Startに対応したハイブリッドモデルを構築せよ。
タスク:
- MovieLens 100Kデータにアイテム特徴量(ジャンル)を付与してLightFMモデルを学習する
- 特徴量なし(pure CF)と特徴量あり(ハイブリッド)の性能を比較する
- WARP損失とBPR損失を比較する
- ハイパーパラメータ(no_components、epochs、learning_rate)をチューニングする
解答例
from lightfm import LightFM
from lightfm.data import Dataset
from lightfm.evaluation import precision_at_k, auc_score
from lightfm.datasets import fetch_movielens
import numpy as np
# MovieLensデータ読み込み
data = fetch_movielens(min_rating=3.5)
train = data['train']
test = data['test']
item_features = data['item_features'] # ジャンル情報
# モデル比較
results = []
for loss in ['warp', 'bpr']:
for use_features in [False, True]:
model = LightFM(
no_components=64,
loss=loss,
learning_rate=0.05,
item_alpha=1e-6,
)
if use_features:
model.fit(train, item_features=item_features, epochs=30, num_threads=4)
p_at_k = precision_at_k(model, test, item_features=item_features, k=10).mean()
auc = auc_score(model, test, item_features=item_features).mean()
else:
model.fit(train, epochs=30, num_threads=4)
p_at_k = precision_at_k(model, test, k=10).mean()
auc = auc_score(model, test).mean()
results.append({
'loss': loss,
'features': 'Hybrid' if use_features else 'Pure CF',
'Precision@10': p_at_k,
'AUC': auc,
})
import pandas as pd
results_df = pd.DataFrame(results)
print(results_df)
# WARP + Hybrid が最も高い性能を示すことが多い
Mission 2: 特徴量エンジニアリング(25分)
推薦モデルの入力特徴量を設計・構築せよ。
タスク:
- ユーザー特徴量を最低5つ設計・実装する
- アイテム特徴量を最低5つ設計・実装する
- 特徴量の重要度を分析する
- 特徴量追加前後の性能改善を定量的に示す
解答例
# ユーザー特徴量の構築
def build_user_features_movielens(ratings_df):
features = pd.DataFrame()
user_stats = ratings_df.groupby('user_id')
features['n_ratings'] = user_stats.size()
features['avg_rating'] = user_stats['rating'].mean()
features['std_rating'] = user_stats['rating'].std().fillna(0)
features['n_genres'] = user_stats['genre'].nunique()
features['rating_entropy'] = user_stats['rating'].apply(
lambda x: -(x.value_counts(normalize=True) *
np.log2(x.value_counts(normalize=True) + 1e-10)).sum()
)
return features
# アイテム特徴量の構築
def build_item_features_movielens(ratings_df, movies_df):
features = movies_df.copy()
item_stats = ratings_df.groupby('item_id')
features['n_ratings'] = item_stats.size()
features['avg_rating'] = item_stats['rating'].mean()
features['unique_raters'] = item_stats['user_id'].nunique()
features['rating_variance'] = item_stats['rating'].var().fillna(0)
# 人気度(全体に対する評価割合)
features['popularity'] = features['n_ratings'] / len(ratings_df)
return features
# 特徴量追加前後の比較
# (前) LightFM pure CF: Precision@10 = 0.08
# (後) LightFM + features: Precision@10 = 0.11 (+37.5%)
Mission 3: リランキングの実装(25分)
ビジネスルールを適用したリランキングを実装せよ。
タスク:
- 多様性を考慮したMMRリランキングを実装する
- ビジネスルール(在庫フィルタ、新商品ブースト、カテゴリ多様性)を実装する
- リランキング前後の推薦結果を比較する
- 多様性指標(カテゴリカバレッジ)を測定する
解答例
from sklearn.metrics.pairwise import cosine_similarity
def mmr_rerank(recommendations, item_features, lambda_param=0.5, n=10):
"""MMRベースのリランキング"""
selected = []
candidates = list(range(len(recommendations)))
for _ in range(n):
best_score = -float('inf')
best_idx = None
for idx in candidates:
item_id, relevance = recommendations[idx]
item_vec = item_features[item_id]
# 多様性スコア
if selected:
selected_vecs = [item_features[recommendations[s][0]] for s in selected]
max_sim = max(
cosine_similarity([item_vec], [sv])[0][0]
for sv in selected_vecs
)
else:
max_sim = 0
# MMRスコア
mmr = lambda_param * relevance - (1 - lambda_param) * max_sim
if mmr > best_score:
best_score = mmr
best_idx = idx
selected.append(best_idx)
candidates.remove(best_idx)
return [recommendations[i] for i in selected]
def apply_business_rules(recommendations, item_info, n=10):
"""ビジネスルール適用"""
# 1. 在庫フィルタ
filtered = [(iid, s) for iid, s in recommendations
if item_info[iid].get('stock', 1) > 0]
# 2. 新商品ブースト
boosted = []
for iid, score in filtered:
if item_info[iid].get('is_new', False):
score *= 1.3
boosted.append((iid, score))
boosted.sort(key=lambda x: x[1], reverse=True)
# 3. カテゴリ多様性(同一カテゴリ最大3つ)
result = []
cat_count = {}
for iid, score in boosted:
cat = item_info[iid]['category']
if cat_count.get(cat, 0) < 3:
result.append((iid, score))
cat_count[cat] = cat_count.get(cat, 0) + 1
if len(result) >= n:
break
return result
# カテゴリカバレッジの計測
def category_coverage(recommendations, item_info, total_categories):
cats = set(item_info[iid]['category'] for iid, _ in recommendations)
return len(cats) / total_categories
# 比較
# リランキング前: Coverage = 0.12(少数カテゴリに集中)
# リランキング後: Coverage = 0.35(多様なカテゴリ)
Mission 4: パイプライン統合(15分)
全コンポーネントを統合した推薦パイプラインを構築せよ。
タスク:
- 候補生成 → スコアリング → リランキングの3段階パイプラインを実装する
- Cold Startユーザーへの推薦を確認する
- パイプライン全体の処理時間を測定する
- 各段階の推薦結果の変化を可視化する
解答例
class RecommendationPipeline:
"""統合推薦パイプライン"""
def __init__(self, lightfm_model, content_model, item_info):
self.lightfm_model = lightfm_model
self.content_model = content_model
self.item_info = item_info
def recommend(self, user_id, n=10):
import time
timings = {}
# Stage 1: 候補生成
t0 = time.time()
n_history = get_user_history_count(user_id)
if n_history >= 5:
candidates = self.lightfm_model.recommend(user_id, n=100)
elif n_history > 0:
candidates = self.content_model.recommend(user_id, n=100)
else:
candidates = get_popular_items(n=100)
timings['candidate_generation'] = time.time() - t0
# Stage 2: スコアリング(既にスコア付き)
t1 = time.time()
scored = candidates[:50] # Top 50
timings['scoring'] = time.time() - t1
# Stage 3: リランキング
t2 = time.time()
reranked = apply_business_rules(scored, self.item_info, n=n)
timings['reranking'] = time.time() - t2
timings['total'] = sum(timings.values())
return reranked, timings
# 実行例
# pipeline = RecommendationPipeline(model, content_model, item_info)
# recs, timings = pipeline.recommend(user_id=42, n=10)
# print(f"Total: {timings['total']*1000:.1f}ms")
達成度チェック
- LightFMでハイブリッドモデルを構築しPure CFより性能向上を確認した
- ユーザー/アイテム特徴量を各5つ以上設計・実装した
- MMRリランキングとビジネスルールを実装した
- カテゴリカバレッジの改善を定量的に示した
- 3段階パイプラインを統合し処理時間を測定した
推定所要時間: 90分