LESSON 30分

異常検知とアラート設計

「予測が外れたとき、いかに早く気づいて対処するかがビジネスの生命線だ。異常検知はエージェントの『目』にあたる部分だよ。」

田中VPoEが過去の在庫事故のレポートを開く。

「去年のゴールデンウィーク、需要予測が大きく外れて主力商品が3日間欠品した。損失額は800万円。もし異常検知が24時間早く作動していれば、緊急発注で半分は防げていた。」

異常検知の3つのアプローチ

需要予測における異常検知は、複数の手法を組み合わせて使う。

アプローチ手法検出対象
統計的手法Z-score、IQR、Grubbs検定予測値からの大きな乖離
時系列手法変化点検出、STL残差トレンドの急変、季節性の崩れ
機械学習手法Isolation Forest、DBSCAN複合的な異常パターン

統計的異常検知の実装

Z-scoreによる検知

import numpy as np
from scipy import stats

def detect_zscore_anomalies(
    actual: np.ndarray,
    predicted: np.ndarray,
    threshold: float = 2.0
) -> dict:
    """Z-scoreベースの異常検知"""
    residuals = actual - predicted
    z_scores = stats.zscore(residuals)

    anomaly_mask = np.abs(z_scores) > threshold
    anomaly_indices = np.where(anomaly_mask)[0]

    return {
        "anomaly_count": int(anomaly_mask.sum()),
        "anomaly_indices": anomaly_indices.tolist(),
        "z_scores": z_scores.tolist(),
        "mean_residual": float(np.mean(residuals)),
        "std_residual": float(np.std(residuals))
    }

移動ウィンドウによる適応的検知

固定の閾値では季節的な変動に対応できない場合がある。移動ウィンドウを使い、直近のパターンに適応する検知を行う。

def detect_adaptive_anomalies(
    actual: pd.Series,
    predicted: pd.Series,
    window: int = 28,
    threshold_sigma: float = 2.5
) -> pd.DataFrame:
    """移動ウィンドウによる適応的異常検知"""
    residuals = actual - predicted

    # 移動平均・移動標準偏差
    rolling_mean = residuals.rolling(window=window, min_periods=7).mean()
    rolling_std = residuals.rolling(window=window, min_periods=7).std()

    # 適応的Z-score
    adaptive_z = (residuals - rolling_mean) / rolling_std.clip(lower=1e-6)

    result = pd.DataFrame({
        'date': actual.index,
        'actual': actual.values,
        'predicted': predicted.values,
        'residual': residuals.values,
        'adaptive_z': adaptive_z.values,
        'is_anomaly': np.abs(adaptive_z.values) > threshold_sigma
    })

    return result

変化点検出

需要のトレンドが急激に変化したポイントを検出する。

import ruptures

def detect_changepoints(
    series: pd.Series,
    model: str = "rbf",
    penalty: float = 10.0
) -> list[dict]:
    """時系列の変化点を検出する"""
    signal = series.values.reshape(-1, 1)

    algo = ruptures.Pelt(model=model, min_size=7).fit(signal)
    change_points = algo.predict(pen=penalty)

    results = []
    for cp in change_points[:-1]:  # 最後の要素は系列長
        date = series.index[cp]
        before_mean = series.iloc[max(0, cp-7):cp].mean()
        after_mean = series.iloc[cp:min(len(series), cp+7)].mean()
        change_pct = (after_mean - before_mean) / before_mean * 100

        results.append({
            "date": date.strftime('%Y-%m-%d'),
            "index": cp,
            "before_mean": round(before_mean, 2),
            "after_mean": round(after_mean, 2),
            "change_percent": round(change_pct, 1)
        })

    return results

Isolation Forestによる複合異常検知

単一の指標だけでなく、複数の特徴量を組み合わせた異常検知を行う。

from sklearn.ensemble import IsolationForest

def detect_multivariate_anomalies(
    df: pd.DataFrame,
    feature_cols: list[str],
    contamination: float = 0.05
) -> pd.DataFrame:
    """多変量Isolation Forestによる異常検知"""
    features = df[feature_cols].dropna()

    iso_forest = IsolationForest(
        contamination=contamination,
        random_state=42,
        n_estimators=200
    )
    labels = iso_forest.fit_predict(features)
    scores = iso_forest.decision_function(features)

    result = features.copy()
    result['is_anomaly'] = labels == -1
    result['anomaly_score'] = scores

    return result

# 使用例: 売上・石油価格・曜日パターンの複合異常
anomaly_features = ['sales', 'dcoilwtico', 'dayofweek', 'rolling_mean_7', 'lag_7']
anomalies = detect_multivariate_anomalies(grocery_features, anomaly_features)
print(f"異常検出数: {anomalies['is_anomaly'].sum()}")

アラート設計

アラートレベルの定義

from enum import Enum

class AlertLevel(Enum):
    INFO = "INFO"           # 情報のみ。対応不要
    WARNING = "WARNING"     # 注意が必要。24時間以内に確認
    CRITICAL = "CRITICAL"   # 重大。即座に対応が必要
    EMERGENCY = "EMERGENCY" # 緊急。エスカレーション必須

ALERT_RULES = {
    # 予測精度の劣化
    "mape_degradation": {
        "WARNING": lambda mape: mape > 15,
        "CRITICAL": lambda mape: mape > 30,
    },
    # 欠品リスク
    "stockout_risk": {
        "WARNING": lambda risk: risk > 0.10,
        "CRITICAL": lambda risk: risk > 0.30,
        "EMERGENCY": lambda risk: risk > 0.50,
    },
    # 異常検知数
    "anomaly_count": {
        "WARNING": lambda count, total: count / total > 0.1,
        "CRITICAL": lambda count, total: count / total > 0.3,
    },
    # 変化点検出
    "changepoint": {
        "WARNING": lambda pct: abs(pct) > 20,
        "CRITICAL": lambda pct: abs(pct) > 50,
    },
}

アラート統合ノード

def evaluate_alerts(state: DemandForecastState) -> list[dict]:
    """全指標を評価しアラートを統合する"""
    alerts = []

    # 予測精度チェック
    mape = state["forecast_metrics"].get("mape", 0)
    if mape > 30:
        alerts.append({
            "level": "CRITICAL",
            "type": "mape_degradation",
            "message": f"予測精度が大幅に劣化。MAPE={mape:.1f}%",
            "action": "モデルの再学習を推奨"
        })
    elif mape > 15:
        alerts.append({
            "level": "WARNING",
            "type": "mape_degradation",
            "message": f"予測精度が低下傾向。MAPE={mape:.1f}%",
            "action": "特徴量の見直しを検討"
        })

    # 欠品リスクチェック
    risk = state.get("stockout_risk", 0)
    if risk > 0.50:
        alerts.append({
            "level": "EMERGENCY",
            "type": "stockout_risk",
            "message": f"欠品リスクが極めて高い。リスク={risk:.1%}",
            "action": "緊急発注を即時実行"
        })
    elif risk > 0.30:
        alerts.append({
            "level": "CRITICAL",
            "type": "stockout_risk",
            "message": f"欠品リスクが高い。リスク={risk:.1%}",
            "action": "発注量の増加を推奨"
        })

    # 異常パターンチェック
    anomaly_count = len(state.get("anomalies", []))
    if anomaly_count >= 3:
        alerts.append({
            "level": "CRITICAL",
            "type": "anomaly_pattern",
            "message": f"直近7日間で{anomaly_count}件の異常パターンを検出",
            "action": "外部要因の確認と予測モデルの再評価"
        })

    return alerts

通知チャネルのルーティング

NOTIFICATION_ROUTING = {
    "INFO": ["dashboard"],
    "WARNING": ["dashboard", "slack_channel"],
    "CRITICAL": ["dashboard", "slack_channel", "email"],
    "EMERGENCY": ["dashboard", "slack_channel", "email", "phone_call"],
}

def route_notification(alert: dict) -> None:
    """アラートレベルに応じて通知先を決定する"""
    channels = NOTIFICATION_ROUTING.get(alert["level"], ["dashboard"])
    for channel in channels:
        send_to_channel(channel, alert)

実運用での注意点

注意点対策
アラート疲れ閾値を段階的に調整し、本当に重要なものだけ通知
誤検知過去のアラート履歴を分析し、精度を改善
季節変動適応的閾値を使い、季節ごとの正常範囲を動的に設定
連鎖アラート同一原因のアラートはグルーピングして1件にまとめる
夜間・休日EMERGENCY以外は翌営業日対応とし、対応者の負担を軽減

まとめ

項目ポイント
統計的検知Z-score、適応的ウィンドウで基本的な乖離を検出
変化点検出rupturesで需要トレンドの急変を検出
複合検知Isolation Forestで多変量の異常パターンを捕捉
アラート設計レベル別の定義、ルーティング、運用上の注意点

チェックリスト

  • Z-scoreと適応的ウィンドウの違いを理解した
  • 変化点検出のrupturesライブラリの使い方を把握した
  • Isolation Forestによる多変量異常検知を理解した
  • アラートレベル設計と通知ルーティングを理解した

次のステップへ

異常検知とアラートの設計が完了した。次の演習では、ここまで学んだ全てのツールとワークフローを統合し、実際に動作する需要予測AIエージェントを構築しよう。

推定読了時間: 30分