LESSON

演習:MLflowで実験管理を構築しよう

田中VPoE「いよいよ実践だ。NetShop社の離脱予測モデルを題材に、MLflowで実験管理を構築してみよう。」

あなた「実際にMLflowを起動して、複数の実験を記録・比較するんですね。」

田中VPoE「そうだ。さらにモデルレジストリへの登録、DVCでのデータバージョニングまで一気通貫でやってみよう。」

ミッション概要

離脱予測モデルの開発を通じて、MLflow Tracking → Model Registry → DVC によるデータバージョニングの実践を行います。

環境準備

# 必要なライブラリのインストール
pip install mlflow scikit-learn xgboost lightgbm pandas dvc dvc-s3

# MLflow UIの起動
mlflow ui --port 5000 &

Mission 1: MLflow Trackingで実験を記録する

3つの異なるアルゴリズムで離脱予測モデルを学習し、すべての実験をMLflowに記録してください。

要件

  1. 以下の3つのモデルを学習する:
    • RandomForest
    • XGBoost
    • LightGBM
  2. 各モデルでハイパーパラメータとメトリクスを記録する
  3. 混同行列をアーティファクトとして記録する
  4. 3モデルのF1スコアを比較し、最適モデルを特定する

データ準備

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

np.random.seed(42)
n = 5000

data = pd.DataFrame({
    "tenure_months": np.random.randint(1, 72, n),
    "monthly_charges": np.random.uniform(20, 100, n),
    "total_charges": np.random.uniform(100, 7000, n),
    "num_support_tickets": np.random.poisson(2, n),
    "contract_type": np.random.choice([0, 1, 2], n, p=[0.5, 0.3, 0.2]),
    "online_security": np.random.choice([0, 1], n),
    "tech_support": np.random.choice([0, 1], n),
    "payment_method": np.random.choice([0, 1, 2, 3], n),
})

churn_prob = 1 / (1 + np.exp(-(
    -2 + 0.03 * (72 - data["tenure_months"])
    + 0.02 * data["monthly_charges"]
    + 0.15 * data["num_support_tickets"]
    - 0.8 * data["contract_type"]
    + np.random.normal(0, 0.5, n)
)))
data["churn"] = (churn_prob > 0.5).astype(int)

X = data.drop("churn", axis=1)
y = data["churn"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

作業

3つのモデルをMLflowで実験記録してください。

解答例
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import xgboost as xgb
import lightgbm as lgb
import matplotlib.pyplot as plt

mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("churn_prediction_comparison")

def log_experiment(model, model_name, params, X_train, y_train, X_test, y_test):
    with mlflow.start_run(run_name=model_name):
        mlflow.set_tag("model_type", model_name)
        mlflow.set_tag("developer", "ds_team")
        mlflow.log_params(params)
        mlflow.log_param("train_samples", len(X_train))
        mlflow.log_param("test_samples", len(X_test))

        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        y_prob = model.predict_proba(X_test)[:, 1]

        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred),
            "recall": recall_score(y_test, y_pred),
            "f1_score": f1_score(y_test, y_pred),
            "auc_roc": roc_auc_score(y_test, y_prob),
        }
        mlflow.log_metrics(metrics)

        cm = confusion_matrix(y_test, y_pred)
        disp = ConfusionMatrixDisplay(cm)
        disp.plot()
        plt.title(f"{model_name} - Confusion Matrix")
        plt.savefig(f"cm_{model_name}.png")
        mlflow.log_artifact(f"cm_{model_name}.png")
        plt.close()

        mlflow.sklearn.log_model(model, "model")
        return metrics

# 1. RandomForest
rf_params = {"n_estimators": 100, "max_depth": 10, "min_samples_split": 5}
rf_model = RandomForestClassifier(**rf_params, random_state=42)
rf_metrics = log_experiment(rf_model, "RandomForest", rf_params,
                           X_train_scaled, y_train, X_test_scaled, y_test)

# 2. XGBoost
xgb_params = {"n_estimators": 100, "max_depth": 6, "learning_rate": 0.1, "subsample": 0.8}
xgb_model = xgb.XGBClassifier(**xgb_params, random_state=42, eval_metric="logloss")
xgb_metrics = log_experiment(xgb_model, "XGBoost", xgb_params,
                            X_train_scaled, y_train, X_test_scaled, y_test)

# 3. LightGBM
lgb_params = {"n_estimators": 100, "max_depth": 6, "learning_rate": 0.1, "num_leaves": 31}
lgb_model = lgb.LGBMClassifier(**lgb_params, random_state=42, verbose=-1)
lgb_metrics = log_experiment(lgb_model, "LightGBM", lgb_params,
                            X_train_scaled, y_train, X_test_scaled, y_test)

# 比較
comparison = pd.DataFrame([rf_metrics, xgb_metrics, lgb_metrics],
                          index=["RandomForest", "XGBoost", "LightGBM"])
print(comparison.sort_values("f1_score", ascending=False))

Mission 2: モデルレジストリに登録する

Mission 1で最もF1スコアが高いモデルをモデルレジストリに登録し、Productionステージに昇格させてください。

要件

  1. ベストモデルをレジストリに登録する
  2. モデルの説明とタグを追加する
  3. Staging → Production の昇格を行う
  4. Production モデルをロードして推論を実行する
解答例
from mlflow import MlflowClient

client = MlflowClient()

# ベストRunの取得
best_run = mlflow.search_runs(
    experiment_names=["churn_prediction_comparison"],
    order_by=["metrics.f1_score DESC"],
    max_results=1
).iloc[0]

print(f"Best model: {best_run['tags.model_type']}")
print(f"F1 Score: {best_run['metrics.f1_score']:.4f}")

# モデル登録
model_uri = f"runs:/{best_run['run_id']}/model"
result = mlflow.register_model(model_uri, "churn_predictor")

# モデルの説明追加
client.update_registered_model(
    name="churn_predictor",
    description="NetShop社 顧客離脱予測モデル。月次バッチスコアリング用。"
)
client.update_model_version(
    name="churn_predictor",
    version=result.version,
    description=f"F1={best_run['metrics.f1_score']:.4f}, AUC={best_run['metrics.auc_roc']:.4f}"
)

# タグ追加
client.set_model_version_tag("churn_predictor", result.version, "approved_by", "tanaka_vpoe")

# Staging → Production
client.set_registered_model_alias("churn_predictor", "staging", result.version)
print(f"Version {result.version} promoted to Staging")

client.set_registered_model_alias("churn_predictor", "production", result.version)
print(f"Version {result.version} promoted to Production")

# Production モデルの利用
prod_model = mlflow.pyfunc.load_model("models:/churn_predictor@production")
sample = X_test_scaled[:5]
predictions = prod_model.predict(sample)
print(f"Predictions: {predictions}")

Mission 3: DVCでデータバージョニングを設定する

学習データをDVCで管理し、コードとデータのバージョンを紐づけてください。

要件

  1. DVCを初期化し、学習データをトラッキングする
  2. データを更新し、新バージョンとして記録する
  3. 前バージョンのデータを復元する
解答例
# 1. DVC初期化
cd ml_project
dvc init

# 2. データの保存
mkdir -p data
python -c "
import pandas as pd
import numpy as np
np.random.seed(42)
# ... (Mission 1のデータ生成コードを実行)
data.to_csv('data/churn_data.csv', index=False)
print(f'Data shape: {data.shape}')
"

# 3. DVCでトラッキング
dvc add data/churn_data.csv
git add data/churn_data.csv.dvc data/.gitignore
git commit -m "Add churn data v1"
git tag data-v1

# 4. データの更新(新しいデータを追加)
python -c "
import pandas as pd
import numpy as np
np.random.seed(123)
# 追加データ生成
# ...
existing = pd.read_csv('data/churn_data.csv')
new_data = pd.concat([existing, additional_data], ignore_index=True)
new_data.to_csv('data/churn_data.csv', index=False)
print(f'Updated data shape: {new_data.shape}')
"

dvc add data/churn_data.csv
git add data/churn_data.csv.dvc
git commit -m "Update churn data v2 - added 1000 samples"
git tag data-v2

# 5. 前バージョンの復元
git checkout data-v1 -- data/churn_data.csv.dvc
dvc checkout
python -c "import pandas as pd; print(pd.read_csv('data/churn_data.csv').shape)"
# → v1のデータが復元される

# 元に戻す
git checkout data-v2 -- data/churn_data.csv.dvc
dvc checkout

達成度チェック

  • MLflowで3つ以上のモデルの実験を記録できた
  • パラメータ・メトリクス・アーティファクトすべて記録されている
  • モデルレジストリに登録し、Production昇格ができた
  • DVCでデータのバージョン管理を設定できた
  • 前バージョンのデータ復元ができた

推定所要時間:90分