LESSON

演習:ハイブリッド推薦システムを構築しよう

「いよいよ個別のアルゴリズムを組み合わせて、NetShop社のハイブリッド推薦システムを構築してくれ。」

田中VPoEが要件書を渡す。

「Cold Startにも対応し、ビジネスルールも適用した実用的なシステムを頼む。H&Mのデータで検証してから、NetShop社に提案しよう。」

ミッション概要

H&M / MovieLensデータを使って、ハイブリッド推薦システムを構築する演習である。候補生成からリランキングまでの全パイプラインを実装する。


Mission 1: LightFMによるハイブリッドモデル(25分)

LightFMを使ってCold Startに対応したハイブリッドモデルを構築せよ。

タスク:

  1. MovieLens 100Kデータにアイテム特徴量(ジャンル)を付与してLightFMモデルを学習する
  2. 特徴量なし(pure CF)と特徴量あり(ハイブリッド)の性能を比較する
  3. WARP損失とBPR損失を比較する
  4. ハイパーパラメータ(no_components、epochs、learning_rate)をチューニングする
解答例
from lightfm import LightFM
from lightfm.data import Dataset
from lightfm.evaluation import precision_at_k, auc_score
from lightfm.datasets import fetch_movielens
import numpy as np

# MovieLensデータ読み込み
data = fetch_movielens(min_rating=3.5)
train = data['train']
test = data['test']
item_features = data['item_features']  # ジャンル情報

# モデル比較
results = []

for loss in ['warp', 'bpr']:
    for use_features in [False, True]:
        model = LightFM(
            no_components=64,
            loss=loss,
            learning_rate=0.05,
            item_alpha=1e-6,
        )

        if use_features:
            model.fit(train, item_features=item_features, epochs=30, num_threads=4)
            p_at_k = precision_at_k(model, test, item_features=item_features, k=10).mean()
            auc = auc_score(model, test, item_features=item_features).mean()
        else:
            model.fit(train, epochs=30, num_threads=4)
            p_at_k = precision_at_k(model, test, k=10).mean()
            auc = auc_score(model, test).mean()

        results.append({
            'loss': loss,
            'features': 'Hybrid' if use_features else 'Pure CF',
            'Precision@10': p_at_k,
            'AUC': auc,
        })

import pandas as pd
results_df = pd.DataFrame(results)
print(results_df)
# WARP + Hybrid が最も高い性能を示すことが多い

Mission 2: 特徴量エンジニアリング(25分)

推薦モデルの入力特徴量を設計・構築せよ。

タスク:

  1. ユーザー特徴量を最低5つ設計・実装する
  2. アイテム特徴量を最低5つ設計・実装する
  3. 特徴量の重要度を分析する
  4. 特徴量追加前後の性能改善を定量的に示す
解答例
# ユーザー特徴量の構築
def build_user_features_movielens(ratings_df):
    features = pd.DataFrame()
    user_stats = ratings_df.groupby('user_id')

    features['n_ratings'] = user_stats.size()
    features['avg_rating'] = user_stats['rating'].mean()
    features['std_rating'] = user_stats['rating'].std().fillna(0)
    features['n_genres'] = user_stats['genre'].nunique()
    features['rating_entropy'] = user_stats['rating'].apply(
        lambda x: -(x.value_counts(normalize=True) *
                     np.log2(x.value_counts(normalize=True) + 1e-10)).sum()
    )
    return features

# アイテム特徴量の構築
def build_item_features_movielens(ratings_df, movies_df):
    features = movies_df.copy()
    item_stats = ratings_df.groupby('item_id')

    features['n_ratings'] = item_stats.size()
    features['avg_rating'] = item_stats['rating'].mean()
    features['unique_raters'] = item_stats['user_id'].nunique()
    features['rating_variance'] = item_stats['rating'].var().fillna(0)
    # 人気度(全体に対する評価割合)
    features['popularity'] = features['n_ratings'] / len(ratings_df)
    return features

# 特徴量追加前後の比較
# (前) LightFM pure CF: Precision@10 = 0.08
# (後) LightFM + features: Precision@10 = 0.11 (+37.5%)

Mission 3: リランキングの実装(25分)

ビジネスルールを適用したリランキングを実装せよ。

タスク:

  1. 多様性を考慮したMMRリランキングを実装する
  2. ビジネスルール(在庫フィルタ、新商品ブースト、カテゴリ多様性)を実装する
  3. リランキング前後の推薦結果を比較する
  4. 多様性指標(カテゴリカバレッジ)を測定する
解答例
from sklearn.metrics.pairwise import cosine_similarity

def mmr_rerank(recommendations, item_features, lambda_param=0.5, n=10):
    """MMRベースのリランキング"""
    selected = []
    candidates = list(range(len(recommendations)))

    for _ in range(n):
        best_score = -float('inf')
        best_idx = None

        for idx in candidates:
            item_id, relevance = recommendations[idx]
            item_vec = item_features[item_id]

            # 多様性スコア
            if selected:
                selected_vecs = [item_features[recommendations[s][0]] for s in selected]
                max_sim = max(
                    cosine_similarity([item_vec], [sv])[0][0]
                    for sv in selected_vecs
                )
            else:
                max_sim = 0

            # MMRスコア
            mmr = lambda_param * relevance - (1 - lambda_param) * max_sim

            if mmr > best_score:
                best_score = mmr
                best_idx = idx

        selected.append(best_idx)
        candidates.remove(best_idx)

    return [recommendations[i] for i in selected]

def apply_business_rules(recommendations, item_info, n=10):
    """ビジネスルール適用"""
    # 1. 在庫フィルタ
    filtered = [(iid, s) for iid, s in recommendations
                if item_info[iid].get('stock', 1) > 0]

    # 2. 新商品ブースト
    boosted = []
    for iid, score in filtered:
        if item_info[iid].get('is_new', False):
            score *= 1.3
        boosted.append((iid, score))
    boosted.sort(key=lambda x: x[1], reverse=True)

    # 3. カテゴリ多様性(同一カテゴリ最大3つ)
    result = []
    cat_count = {}
    for iid, score in boosted:
        cat = item_info[iid]['category']
        if cat_count.get(cat, 0) < 3:
            result.append((iid, score))
            cat_count[cat] = cat_count.get(cat, 0) + 1
        if len(result) >= n:
            break

    return result

# カテゴリカバレッジの計測
def category_coverage(recommendations, item_info, total_categories):
    cats = set(item_info[iid]['category'] for iid, _ in recommendations)
    return len(cats) / total_categories

# 比較
# リランキング前: Coverage = 0.12(少数カテゴリに集中)
# リランキング後: Coverage = 0.35(多様なカテゴリ)

Mission 4: パイプライン統合(15分)

全コンポーネントを統合した推薦パイプラインを構築せよ。

タスク:

  1. 候補生成 → スコアリング → リランキングの3段階パイプラインを実装する
  2. Cold Startユーザーへの推薦を確認する
  3. パイプライン全体の処理時間を測定する
  4. 各段階の推薦結果の変化を可視化する
解答例
class RecommendationPipeline:
    """統合推薦パイプライン"""

    def __init__(self, lightfm_model, content_model, item_info):
        self.lightfm_model = lightfm_model
        self.content_model = content_model
        self.item_info = item_info

    def recommend(self, user_id, n=10):
        import time
        timings = {}

        # Stage 1: 候補生成
        t0 = time.time()
        n_history = get_user_history_count(user_id)
        if n_history >= 5:
            candidates = self.lightfm_model.recommend(user_id, n=100)
        elif n_history > 0:
            candidates = self.content_model.recommend(user_id, n=100)
        else:
            candidates = get_popular_items(n=100)
        timings['candidate_generation'] = time.time() - t0

        # Stage 2: スコアリング(既にスコア付き)
        t1 = time.time()
        scored = candidates[:50]  # Top 50
        timings['scoring'] = time.time() - t1

        # Stage 3: リランキング
        t2 = time.time()
        reranked = apply_business_rules(scored, self.item_info, n=n)
        timings['reranking'] = time.time() - t2

        timings['total'] = sum(timings.values())
        return reranked, timings

# 実行例
# pipeline = RecommendationPipeline(model, content_model, item_info)
# recs, timings = pipeline.recommend(user_id=42, n=10)
# print(f"Total: {timings['total']*1000:.1f}ms")

達成度チェック

  • LightFMでハイブリッドモデルを構築しPure CFより性能向上を確認した
  • ユーザー/アイテム特徴量を各5つ以上設計・実装した
  • MMRリランキングとビジネスルールを実装した
  • カテゴリカバレッジの改善を定量的に示した
  • 3段階パイプラインを統合し処理時間を測定した

推定所要時間: 90分