LESSON

演習:Feastで特徴量ストアを構築しよう

田中VPoE「特徴量ストアの概念、Feast、パイプライン、ガバナンスを学んだ。実際にNetShop社の離脱予測モデル用の特徴量ストアを構築してみよう。」

あなた「Feature Definitionの作成から、データの投入、Point-in-Time Joinでの学習データセット作成まで一気通貫ですね。」

田中VPoE「加えて、特徴量の品質チェックまでやってほしい。ガバナンスまで含めて初めて実用レベルだ。」

ミッション概要

NetShop社の離脱予測モデル向けに、Feastベースの特徴量ストアを構築します。

環境準備

pip install feast pandas pyarrow great-expectations

feast init netshop_features
cd netshop_features

Mission 1: 特徴量データの準備とFeature Definition

顧客プロファイルと注文統計の特徴量データを作成し、Feastで定義してください。

要件

  1. 2つのFeature View(customer_profile、customer_order_stats)を定義する
  2. 各Feature Viewに4つ以上の特徴量を含める
  3. メタデータ(description、tags)を適切に設定する

データ準備

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

np.random.seed(42)
n_customers = 1000
base_date = datetime(2025, 12, 1)

# 顧客プロファイル特徴量
customer_profile_data = pd.DataFrame({
    "customer_id": range(1, n_customers + 1),
    "event_timestamp": [base_date] * n_customers,
    "age": np.random.normal(38, 12, n_customers).clip(18, 75).astype(int),
    "gender": np.random.choice(["M", "F", "Other"], n_customers, p=[0.48, 0.48, 0.04]),
    "region": np.random.choice(["関東", "関西", "中部", "北海道", "九州"], n_customers),
    "membership": np.random.choice(["Free", "Silver", "Gold"], n_customers, p=[0.6, 0.3, 0.1]),
    "tenure_days": np.random.randint(30, 1800, n_customers),
})

# 注文統計特徴量(複数時点)
order_stats_records = []
for month_offset in range(6):
    ts = base_date - timedelta(days=30 * month_offset)
    for cid in range(1, n_customers + 1):
        order_stats_records.append({
            "customer_id": cid,
            "event_timestamp": ts,
            "order_count_30d": max(0, int(np.random.poisson(3) - month_offset * 0.3)),
            "total_amount_30d": round(np.random.lognormal(8.5, 0.8), 0),
            "avg_order_amount": round(np.random.lognormal(7.5, 0.6), 0),
            "days_since_last_order": np.random.randint(1, 60),
            "favorite_category": np.random.choice(["家電", "書籍", "食品", "ファッション"]),
        })

order_stats_data = pd.DataFrame(order_stats_records)

# Parquet保存
customer_profile_data.to_parquet("data/customer_profile.parquet")
order_stats_data.to_parquet("data/order_stats.parquet")

作業

Feature Definitionを作成してください。

解答例
# definitions.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

customer = Entity(
    name="customer_id",
    description="NetShop顧客の一意識別子",
)

customer_profile_source = FileSource(
    path="data/customer_profile.parquet",
    timestamp_field="event_timestamp",
)

order_stats_source = FileSource(
    path="data/order_stats.parquet",
    timestamp_field="event_timestamp",
)

customer_profile = FeatureView(
    name="customer_profile",
    entities=[customer],
    ttl=timedelta(days=365),
    schema=[
        Field(name="age", dtype=Int64, description="顧客年齢"),
        Field(name="gender", dtype=String, description="性別"),
        Field(name="region", dtype=String, description="地域"),
        Field(name="membership", dtype=String, description="会員ランク(Free/Silver/Gold)"),
        Field(name="tenure_days", dtype=Int64, description="登録からの日数"),
    ],
    source=customer_profile_source,
    online=True,
    tags={"team": "ds", "domain": "customer", "owner": "sato"},
)

customer_order_stats = FeatureView(
    name="customer_order_stats",
    entities=[customer],
    ttl=timedelta(days=30),
    schema=[
        Field(name="order_count_30d", dtype=Int64, description="30日間注文数"),
        Field(name="total_amount_30d", dtype=Float32, description="30日間合計金額"),
        Field(name="avg_order_amount", dtype=Float32, description="平均注文金額"),
        Field(name="days_since_last_order", dtype=Int64, description="最終注文からの日数"),
        Field(name="favorite_category", dtype=String, description="最頻購入カテゴリ"),
    ],
    source=order_stats_source,
    online=True,
    tags={"team": "ds", "domain": "order", "owner": "sato"},
)
feast apply
feast feature-views list

Mission 2: Point-in-Time Joinで学習データセットを作成する

Entity DataFrameを用意し、Point-in-Time Joinで正確な学習データセットを作成してください。

要件

  1. 500件の学習用Entity DataFrameを作成する
  2. 異なる時点の特徴量をPoint-in-Time Joinで取得する
  3. 取得した特徴量で離脱予測モデルを学習する
解答例
from feast import FeatureStore
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

store = FeatureStore(repo_path=".")

# Entity DataFrame(異なる時点を含む)
entity_df = pd.DataFrame({
    "customer_id": np.random.choice(range(1, 1001), 500),
    "event_timestamp": pd.to_datetime(
        np.random.choice(pd.date_range("2025-07-01", "2025-12-01", freq="D"), 500)
    ),
})

# ラベル付与(シミュレーション)
entity_df["churn"] = np.random.choice([0, 1], 500, p=[0.7, 0.3])

# Point-in-Time Join
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_profile:age",
        "customer_profile:tenure_days",
        "customer_order_stats:order_count_30d",
        "customer_order_stats:total_amount_30d",
        "customer_order_stats:avg_order_amount",
        "customer_order_stats:days_since_last_order",
    ],
).to_df()

print(f"Training dataset shape: {training_df.shape}")
print(training_df.head())

# 数値特徴量のみで学習
feature_cols = ["age", "tenure_days", "order_count_30d",
                "total_amount_30d", "avg_order_amount", "days_since_last_order"]
training_df = training_df.dropna(subset=feature_cols)

X = training_df[feature_cols]
y = training_df["churn"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = GradientBoostingClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

print(classification_report(y_test, model.predict(X_test)))

Mission 3: 特徴量の品質チェックを実装する

学習データセットの特徴量に対して品質バリデーションを実施してください。

要件

  1. 各特徴量のNull率チェック
  2. 値の範囲チェック
  3. 分布の統計チェック
  4. 品質レポートの生成
解答例
def validate_features(df, feature_cols):
    """特徴量品質バリデーション"""
    report = []

    for col in feature_cols:
        null_rate = df[col].isnull().mean()
        stats = {
            "feature": col,
            "null_rate": f"{null_rate:.2%}",
            "null_check": "PASS" if null_rate < 0.05 else "FAIL",
            "min": df[col].min() if df[col].dtype != "object" else "N/A",
            "max": df[col].max() if df[col].dtype != "object" else "N/A",
            "mean": round(df[col].mean(), 2) if df[col].dtype != "object" else "N/A",
            "std": round(df[col].std(), 2) if df[col].dtype != "object" else "N/A",
        }

        # 範囲チェック
        range_rules = {
            "age": (18, 80),
            "tenure_days": (0, 3650),
            "order_count_30d": (0, 500),
            "total_amount_30d": (0, 10000000),
            "avg_order_amount": (0, 1000000),
            "days_since_last_order": (0, 365),
        }

        if col in range_rules:
            min_val, max_val = range_rules[col]
            out_of_range = ((df[col] < min_val) | (df[col] > max_val)).sum()
            stats["range_check"] = "PASS" if out_of_range == 0 else f"FAIL ({out_of_range} violations)"
        else:
            stats["range_check"] = "N/A"

        report.append(stats)

    report_df = pd.DataFrame(report)
    print("=== Feature Quality Report ===")
    print(report_df.to_markdown(index=False))

    failed = report_df[
        (report_df["null_check"] == "FAIL") |
        (report_df["range_check"].str.startswith("FAIL", na=False))
    ]
    if len(failed) > 0:
        print(f"\nWARNING: {len(failed)} features have quality issues!")
    else:
        print("\nAll quality checks PASSED!")

    return report_df

quality_report = validate_features(training_df, feature_cols)

達成度チェック

  • Feastで2つ以上のFeature Viewを定義・登録できた
  • Point-in-Time Joinで異なる時点の特徴量を正確に取得できた
  • 取得した特徴量でモデル学習を実行できた
  • 特徴量の品質バリデーションを実装・実行できた
  • メタデータ(description、tags)が適切に設定されている

推定所要時間:90分