演習:MLflowで実験管理を構築しよう
田中VPoE「いよいよ実践だ。NetShop社の離脱予測モデルを題材に、MLflowで実験管理を構築してみよう。」
あなた「実際にMLflowを起動して、複数の実験を記録・比較するんですね。」
田中VPoE「そうだ。さらにモデルレジストリへの登録、DVCでのデータバージョニングまで一気通貫でやってみよう。」
ミッション概要
離脱予測モデルの開発を通じて、MLflow Tracking → Model Registry → DVC によるデータバージョニングの実践を行います。
環境準備
# 必要なライブラリのインストール
pip install mlflow scikit-learn xgboost lightgbm pandas dvc dvc-s3
# MLflow UIの起動
mlflow ui --port 5000 &
Mission 1: MLflow Trackingで実験を記録する
3つの異なるアルゴリズムで離脱予測モデルを学習し、すべての実験をMLflowに記録してください。
要件
- 以下の3つのモデルを学習する:
- RandomForest
- XGBoost
- LightGBM
- 各モデルでハイパーパラメータとメトリクスを記録する
- 混同行列をアーティファクトとして記録する
- 3モデルのF1スコアを比較し、最適モデルを特定する
データ準備
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
np.random.seed(42)
n = 5000
data = pd.DataFrame({
"tenure_months": np.random.randint(1, 72, n),
"monthly_charges": np.random.uniform(20, 100, n),
"total_charges": np.random.uniform(100, 7000, n),
"num_support_tickets": np.random.poisson(2, n),
"contract_type": np.random.choice([0, 1, 2], n, p=[0.5, 0.3, 0.2]),
"online_security": np.random.choice([0, 1], n),
"tech_support": np.random.choice([0, 1], n),
"payment_method": np.random.choice([0, 1, 2, 3], n),
})
churn_prob = 1 / (1 + np.exp(-(
-2 + 0.03 * (72 - data["tenure_months"])
+ 0.02 * data["monthly_charges"]
+ 0.15 * data["num_support_tickets"]
- 0.8 * data["contract_type"]
+ np.random.normal(0, 0.5, n)
)))
data["churn"] = (churn_prob > 0.5).astype(int)
X = data.drop("churn", axis=1)
y = data["churn"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
作業
3つのモデルをMLflowで実験記録してください。
解答例
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import xgboost as xgb
import lightgbm as lgb
import matplotlib.pyplot as plt
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("churn_prediction_comparison")
def log_experiment(model, model_name, params, X_train, y_train, X_test, y_test):
with mlflow.start_run(run_name=model_name):
mlflow.set_tag("model_type", model_name)
mlflow.set_tag("developer", "ds_team")
mlflow.log_params(params)
mlflow.log_param("train_samples", len(X_train))
mlflow.log_param("test_samples", len(X_test))
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
y_prob = model.predict_proba(X_test)[:, 1]
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"precision": precision_score(y_test, y_pred),
"recall": recall_score(y_test, y_pred),
"f1_score": f1_score(y_test, y_pred),
"auc_roc": roc_auc_score(y_test, y_prob),
}
mlflow.log_metrics(metrics)
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(cm)
disp.plot()
plt.title(f"{model_name} - Confusion Matrix")
plt.savefig(f"cm_{model_name}.png")
mlflow.log_artifact(f"cm_{model_name}.png")
plt.close()
mlflow.sklearn.log_model(model, "model")
return metrics
# 1. RandomForest
rf_params = {"n_estimators": 100, "max_depth": 10, "min_samples_split": 5}
rf_model = RandomForestClassifier(**rf_params, random_state=42)
rf_metrics = log_experiment(rf_model, "RandomForest", rf_params,
X_train_scaled, y_train, X_test_scaled, y_test)
# 2. XGBoost
xgb_params = {"n_estimators": 100, "max_depth": 6, "learning_rate": 0.1, "subsample": 0.8}
xgb_model = xgb.XGBClassifier(**xgb_params, random_state=42, eval_metric="logloss")
xgb_metrics = log_experiment(xgb_model, "XGBoost", xgb_params,
X_train_scaled, y_train, X_test_scaled, y_test)
# 3. LightGBM
lgb_params = {"n_estimators": 100, "max_depth": 6, "learning_rate": 0.1, "num_leaves": 31}
lgb_model = lgb.LGBMClassifier(**lgb_params, random_state=42, verbose=-1)
lgb_metrics = log_experiment(lgb_model, "LightGBM", lgb_params,
X_train_scaled, y_train, X_test_scaled, y_test)
# 比較
comparison = pd.DataFrame([rf_metrics, xgb_metrics, lgb_metrics],
index=["RandomForest", "XGBoost", "LightGBM"])
print(comparison.sort_values("f1_score", ascending=False))
Mission 2: モデルレジストリに登録する
Mission 1で最もF1スコアが高いモデルをモデルレジストリに登録し、Productionステージに昇格させてください。
要件
- ベストモデルをレジストリに登録する
- モデルの説明とタグを追加する
- Staging → Production の昇格を行う
- Production モデルをロードして推論を実行する
解答例
from mlflow import MlflowClient
client = MlflowClient()
# ベストRunの取得
best_run = mlflow.search_runs(
experiment_names=["churn_prediction_comparison"],
order_by=["metrics.f1_score DESC"],
max_results=1
).iloc[0]
print(f"Best model: {best_run['tags.model_type']}")
print(f"F1 Score: {best_run['metrics.f1_score']:.4f}")
# モデル登録
model_uri = f"runs:/{best_run['run_id']}/model"
result = mlflow.register_model(model_uri, "churn_predictor")
# モデルの説明追加
client.update_registered_model(
name="churn_predictor",
description="NetShop社 顧客離脱予測モデル。月次バッチスコアリング用。"
)
client.update_model_version(
name="churn_predictor",
version=result.version,
description=f"F1={best_run['metrics.f1_score']:.4f}, AUC={best_run['metrics.auc_roc']:.4f}"
)
# タグ追加
client.set_model_version_tag("churn_predictor", result.version, "approved_by", "tanaka_vpoe")
# Staging → Production
client.set_registered_model_alias("churn_predictor", "staging", result.version)
print(f"Version {result.version} promoted to Staging")
client.set_registered_model_alias("churn_predictor", "production", result.version)
print(f"Version {result.version} promoted to Production")
# Production モデルの利用
prod_model = mlflow.pyfunc.load_model("models:/churn_predictor@production")
sample = X_test_scaled[:5]
predictions = prod_model.predict(sample)
print(f"Predictions: {predictions}")
Mission 3: DVCでデータバージョニングを設定する
学習データをDVCで管理し、コードとデータのバージョンを紐づけてください。
要件
- DVCを初期化し、学習データをトラッキングする
- データを更新し、新バージョンとして記録する
- 前バージョンのデータを復元する
解答例
# 1. DVC初期化
cd ml_project
dvc init
# 2. データの保存
mkdir -p data
python -c "
import pandas as pd
import numpy as np
np.random.seed(42)
# ... (Mission 1のデータ生成コードを実行)
data.to_csv('data/churn_data.csv', index=False)
print(f'Data shape: {data.shape}')
"
# 3. DVCでトラッキング
dvc add data/churn_data.csv
git add data/churn_data.csv.dvc data/.gitignore
git commit -m "Add churn data v1"
git tag data-v1
# 4. データの更新(新しいデータを追加)
python -c "
import pandas as pd
import numpy as np
np.random.seed(123)
# 追加データ生成
# ...
existing = pd.read_csv('data/churn_data.csv')
new_data = pd.concat([existing, additional_data], ignore_index=True)
new_data.to_csv('data/churn_data.csv', index=False)
print(f'Updated data shape: {new_data.shape}')
"
dvc add data/churn_data.csv
git add data/churn_data.csv.dvc
git commit -m "Update churn data v2 - added 1000 samples"
git tag data-v2
# 5. 前バージョンの復元
git checkout data-v1 -- data/churn_data.csv.dvc
dvc checkout
python -c "import pandas as pd; print(pd.read_csv('data/churn_data.csv').shape)"
# → v1のデータが復元される
# 元に戻す
git checkout data-v2 -- data/churn_data.csv.dvc
dvc checkout
達成度チェック
- MLflowで3つ以上のモデルの実験を記録できた
- パラメータ・メトリクス・アーティファクトすべて記録されている
- モデルレジストリに登録し、Production昇格ができた
- DVCでデータのバージョン管理を設定できた
- 前バージョンのデータ復元ができた
推定所要時間:90分