異常検知とアラート設計
「予測が外れたとき、いかに早く気づいて対処するかがビジネスの生命線だ。異常検知はエージェントの『目』にあたる部分だよ。」
田中VPoEが過去の在庫事故のレポートを開く。
「去年のゴールデンウィーク、需要予測が大きく外れて主力商品が3日間欠品した。損失額は800万円。もし異常検知が24時間早く作動していれば、緊急発注で半分は防げていた。」
異常検知の3つのアプローチ
需要予測における異常検知は、複数の手法を組み合わせて使う。
| アプローチ | 手法 | 検出対象 |
|---|---|---|
| 統計的手法 | Z-score、IQR、Grubbs検定 | 予測値からの大きな乖離 |
| 時系列手法 | 変化点検出、STL残差 | トレンドの急変、季節性の崩れ |
| 機械学習手法 | Isolation Forest、DBSCAN | 複合的な異常パターン |
統計的異常検知の実装
Z-scoreによる検知
import numpy as np
from scipy import stats
def detect_zscore_anomalies(
actual: np.ndarray,
predicted: np.ndarray,
threshold: float = 2.0
) -> dict:
"""Z-scoreベースの異常検知"""
residuals = actual - predicted
z_scores = stats.zscore(residuals)
anomaly_mask = np.abs(z_scores) > threshold
anomaly_indices = np.where(anomaly_mask)[0]
return {
"anomaly_count": int(anomaly_mask.sum()),
"anomaly_indices": anomaly_indices.tolist(),
"z_scores": z_scores.tolist(),
"mean_residual": float(np.mean(residuals)),
"std_residual": float(np.std(residuals))
}
移動ウィンドウによる適応的検知
固定の閾値では季節的な変動に対応できない場合がある。移動ウィンドウを使い、直近のパターンに適応する検知を行う。
def detect_adaptive_anomalies(
actual: pd.Series,
predicted: pd.Series,
window: int = 28,
threshold_sigma: float = 2.5
) -> pd.DataFrame:
"""移動ウィンドウによる適応的異常検知"""
residuals = actual - predicted
# 移動平均・移動標準偏差
rolling_mean = residuals.rolling(window=window, min_periods=7).mean()
rolling_std = residuals.rolling(window=window, min_periods=7).std()
# 適応的Z-score
adaptive_z = (residuals - rolling_mean) / rolling_std.clip(lower=1e-6)
result = pd.DataFrame({
'date': actual.index,
'actual': actual.values,
'predicted': predicted.values,
'residual': residuals.values,
'adaptive_z': adaptive_z.values,
'is_anomaly': np.abs(adaptive_z.values) > threshold_sigma
})
return result
変化点検出
需要のトレンドが急激に変化したポイントを検出する。
import ruptures
def detect_changepoints(
series: pd.Series,
model: str = "rbf",
penalty: float = 10.0
) -> list[dict]:
"""時系列の変化点を検出する"""
signal = series.values.reshape(-1, 1)
algo = ruptures.Pelt(model=model, min_size=7).fit(signal)
change_points = algo.predict(pen=penalty)
results = []
for cp in change_points[:-1]: # 最後の要素は系列長
date = series.index[cp]
before_mean = series.iloc[max(0, cp-7):cp].mean()
after_mean = series.iloc[cp:min(len(series), cp+7)].mean()
change_pct = (after_mean - before_mean) / before_mean * 100
results.append({
"date": date.strftime('%Y-%m-%d'),
"index": cp,
"before_mean": round(before_mean, 2),
"after_mean": round(after_mean, 2),
"change_percent": round(change_pct, 1)
})
return results
Isolation Forestによる複合異常検知
単一の指標だけでなく、複数の特徴量を組み合わせた異常検知を行う。
from sklearn.ensemble import IsolationForest
def detect_multivariate_anomalies(
df: pd.DataFrame,
feature_cols: list[str],
contamination: float = 0.05
) -> pd.DataFrame:
"""多変量Isolation Forestによる異常検知"""
features = df[feature_cols].dropna()
iso_forest = IsolationForest(
contamination=contamination,
random_state=42,
n_estimators=200
)
labels = iso_forest.fit_predict(features)
scores = iso_forest.decision_function(features)
result = features.copy()
result['is_anomaly'] = labels == -1
result['anomaly_score'] = scores
return result
# 使用例: 売上・石油価格・曜日パターンの複合異常
anomaly_features = ['sales', 'dcoilwtico', 'dayofweek', 'rolling_mean_7', 'lag_7']
anomalies = detect_multivariate_anomalies(grocery_features, anomaly_features)
print(f"異常検出数: {anomalies['is_anomaly'].sum()}")
アラート設計
アラートレベルの定義
from enum import Enum
class AlertLevel(Enum):
INFO = "INFO" # 情報のみ。対応不要
WARNING = "WARNING" # 注意が必要。24時間以内に確認
CRITICAL = "CRITICAL" # 重大。即座に対応が必要
EMERGENCY = "EMERGENCY" # 緊急。エスカレーション必須
ALERT_RULES = {
# 予測精度の劣化
"mape_degradation": {
"WARNING": lambda mape: mape > 15,
"CRITICAL": lambda mape: mape > 30,
},
# 欠品リスク
"stockout_risk": {
"WARNING": lambda risk: risk > 0.10,
"CRITICAL": lambda risk: risk > 0.30,
"EMERGENCY": lambda risk: risk > 0.50,
},
# 異常検知数
"anomaly_count": {
"WARNING": lambda count, total: count / total > 0.1,
"CRITICAL": lambda count, total: count / total > 0.3,
},
# 変化点検出
"changepoint": {
"WARNING": lambda pct: abs(pct) > 20,
"CRITICAL": lambda pct: abs(pct) > 50,
},
}
アラート統合ノード
def evaluate_alerts(state: DemandForecastState) -> list[dict]:
"""全指標を評価しアラートを統合する"""
alerts = []
# 予測精度チェック
mape = state["forecast_metrics"].get("mape", 0)
if mape > 30:
alerts.append({
"level": "CRITICAL",
"type": "mape_degradation",
"message": f"予測精度が大幅に劣化。MAPE={mape:.1f}%",
"action": "モデルの再学習を推奨"
})
elif mape > 15:
alerts.append({
"level": "WARNING",
"type": "mape_degradation",
"message": f"予測精度が低下傾向。MAPE={mape:.1f}%",
"action": "特徴量の見直しを検討"
})
# 欠品リスクチェック
risk = state.get("stockout_risk", 0)
if risk > 0.50:
alerts.append({
"level": "EMERGENCY",
"type": "stockout_risk",
"message": f"欠品リスクが極めて高い。リスク={risk:.1%}",
"action": "緊急発注を即時実行"
})
elif risk > 0.30:
alerts.append({
"level": "CRITICAL",
"type": "stockout_risk",
"message": f"欠品リスクが高い。リスク={risk:.1%}",
"action": "発注量の増加を推奨"
})
# 異常パターンチェック
anomaly_count = len(state.get("anomalies", []))
if anomaly_count >= 3:
alerts.append({
"level": "CRITICAL",
"type": "anomaly_pattern",
"message": f"直近7日間で{anomaly_count}件の異常パターンを検出",
"action": "外部要因の確認と予測モデルの再評価"
})
return alerts
通知チャネルのルーティング
NOTIFICATION_ROUTING = {
"INFO": ["dashboard"],
"WARNING": ["dashboard", "slack_channel"],
"CRITICAL": ["dashboard", "slack_channel", "email"],
"EMERGENCY": ["dashboard", "slack_channel", "email", "phone_call"],
}
def route_notification(alert: dict) -> None:
"""アラートレベルに応じて通知先を決定する"""
channels = NOTIFICATION_ROUTING.get(alert["level"], ["dashboard"])
for channel in channels:
send_to_channel(channel, alert)
実運用での注意点
| 注意点 | 対策 |
|---|---|
| アラート疲れ | 閾値を段階的に調整し、本当に重要なものだけ通知 |
| 誤検知 | 過去のアラート履歴を分析し、精度を改善 |
| 季節変動 | 適応的閾値を使い、季節ごとの正常範囲を動的に設定 |
| 連鎖アラート | 同一原因のアラートはグルーピングして1件にまとめる |
| 夜間・休日 | EMERGENCY以外は翌営業日対応とし、対応者の負担を軽減 |
まとめ
| 項目 | ポイント |
|---|---|
| 統計的検知 | Z-score、適応的ウィンドウで基本的な乖離を検出 |
| 変化点検出 | rupturesで需要トレンドの急変を検出 |
| 複合検知 | Isolation Forestで多変量の異常パターンを捕捉 |
| アラート設計 | レベル別の定義、ルーティング、運用上の注意点 |
チェックリスト
- Z-scoreと適応的ウィンドウの違いを理解した
- 変化点検出のrupturesライブラリの使い方を把握した
- Isolation Forestによる多変量異常検知を理解した
- アラートレベル設計と通知ルーティングを理解した
次のステップへ
異常検知とアラートの設計が完了した。次の演習では、ここまで学んだ全てのツールとワークフローを統合し、実際に動作する需要予測AIエージェントを構築しよう。
推定読了時間: 30分