LESSON 30分

LangGraphワークフロー構築

「ツールが揃ったら、次はそれらを一連のワークフローに組み上げる。日次で自動実行される予測パイプラインだ。」

田中VPoEがワークフロー図を描き始める。

「LangGraphを使えば、予測→異常検知→発注量計算→アラート生成という一連の流れを、状態管理付きのグラフとして構築できる。条件分岐やリトライも宣言的に書けるのが強みだ。」

LangGraphの基本概念

LangGraphは、LLMアプリケーションのワークフローをグラフ構造で定義するフレームワークである。

概念説明
Stateワークフロー全体で共有される状態オブジェクト
Node処理の単位。ツール呼び出しやLLM推論を実行
Edgeノード間の遷移。条件分岐も定義可能
Graphノードとエッジで構成されるワークフロー全体

ワークフローの全体設計

需要予測エージェントのワークフローは以下の4ステップで構成される。

日次予測 → 異常検知 → 発注量計算 → アラート生成
   ↓          ↓           ↓            ↓
forecast → detect → calculate → alert
   node     node      node       node

State定義

ワークフローの状態を型安全に定義する。

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
import operator

class DemandForecastState(TypedDict):
    # 入力パラメータ
    store_nbr: int
    family: str
    current_stock: float
    lead_time_days: int
    service_level: float

    # 予測結果
    predictions: list[dict]
    forecast_method: str
    forecast_metrics: dict

    # 異常検知結果
    anomalies: list[dict]
    alert_level: str

    # 発注計算結果
    recommended_quantity: float
    safety_stock: float
    reorder_point: float
    stockout_risk: float

    # アラート
    alerts: Annotated[list[str], operator.add]

    # 制御フラグ
    needs_reforecast: bool
    error_message: str | None

Node実装

予測ノード

def forecast_node(state: DemandForecastState) -> dict:
    """日次の需要予測を実行する"""
    try:
        result = forecast_demand.invoke({
            "store_nbr": state["store_nbr"],
            "family": state["family"],
            "horizon_days": 15,
            "method": "ensemble"
        })
        return {
            "predictions": result.predictions,
            "forecast_method": result.method,
            "forecast_metrics": result.metrics,
            "error_message": None
        }
    except Exception as e:
        return {
            "predictions": [],
            "error_message": f"予測エラー: {str(e)}",
            "alerts": [f"[ERROR] 予測実行に失敗: {str(e)}"]
        }

異常検知ノード

def detect_node(state: DemandForecastState) -> dict:
    """予測結果と直近の実績を比較し異常を検知する"""
    # 直近の実績データを取得
    actual_values = load_recent_actuals(
        state["store_nbr"], state["family"], days=7
    )

    if not actual_values:
        return {
            "anomalies": [],
            "alert_level": "NORMAL",
            "alerts": ["[INFO] 直近の実績データなし。異常検知をスキップ"]
        }

    result = detect_anomalies.invoke({
        "actual_values": actual_values,
        "predicted_values": state["predictions"][:len(actual_values)],
        "threshold_sigma": 2.0
    })

    alerts = []
    if result.alert_level == "WARNING":
        alerts.append(
            f"[WARNING] {result.anomaly_count}件の異常検出。{result.summary}"
        )
    elif result.alert_level == "CRITICAL":
        alerts.append(
            f"[CRITICAL] {result.anomaly_count}件の重大な異常。即座に確認が必要。{result.summary}"
        )

    return {
        "anomalies": result.anomalies,
        "alert_level": result.alert_level,
        "needs_reforecast": result.alert_level == "CRITICAL",
        "alerts": alerts
    }

発注量計算ノード

def calculate_node(state: DemandForecastState) -> dict:
    """最適な発注量を計算する"""
    result = calculate_order.invoke({
        "store_nbr": state["store_nbr"],
        "family": state["family"],
        "current_stock": state["current_stock"],
        "lead_time_days": state["lead_time_days"],
        "service_level": state["service_level"],
        "predictions": state["predictions"]
    })

    alerts = []
    if result.expected_stockout_risk > 0.1:
        alerts.append(
            f"[WARNING] 欠品リスク {result.expected_stockout_risk:.1%}。"
            f"推奨発注量: {result.recommended_quantity:.0f}"
        )
    if result.recommended_quantity > 0:
        alerts.append(
            f"[ORDER] 発注推奨: {result.recommended_quantity:.0f}個 "
            f"(安全在庫: {result.safety_stock:.0f}, 発注点: {result.reorder_point:.0f})"
        )

    return {
        "recommended_quantity": result.recommended_quantity,
        "safety_stock": result.safety_stock,
        "reorder_point": result.reorder_point,
        "stockout_risk": result.expected_stockout_risk,
        "alerts": alerts
    }

アラート生成ノード

def alert_node(state: DemandForecastState) -> dict:
    """アラートを集約しレポートを生成する"""
    report_lines = [
        f"=== 需要予測レポート ===",
        f"店舗: {state['store_nbr']}, カテゴリ: {state['family']}",
        f"予測手法: {state['forecast_method']}",
        f"異常レベル: {state['alert_level']}",
        f"推奨発注量: {state['recommended_quantity']:.0f}",
        f"欠品リスク: {state['stockout_risk']:.1%}",
        f"",
        f"--- アラート一覧 ---",
    ]
    for alert in state.get("alerts", []):
        report_lines.append(f"  {alert}")

    report = "\n".join(report_lines)

    # Slack通知やメール送信などの外部連携
    if state["alert_level"] in ("WARNING", "CRITICAL"):
        send_notification(report, channel="supply-chain-alerts")

    return {"alerts": [f"[REPORT] レポート生成完了。アラート数: {len(state.get('alerts', []))}"]}

条件分岐の定義

def should_reforecast(state: DemandForecastState) -> Literal["reforecast", "calculate"]:
    """異常検知結果に基づき再予測が必要か判定する"""
    if state.get("needs_reforecast", False):
        return "reforecast"
    return "calculate"

def has_error(state: DemandForecastState) -> Literal["error", "detect"]:
    """予測エラーの有無をチェックする"""
    if state.get("error_message"):
        return "error"
    return "detect"

グラフの構築

# グラフの定義
workflow = StateGraph(DemandForecastState)

# ノードの追加
workflow.add_node("forecast", forecast_node)
workflow.add_node("detect", detect_node)
workflow.add_node("calculate", calculate_node)
workflow.add_node("alert", alert_node)

# エッジの定義
workflow.set_entry_point("forecast")

# 予測 → エラーチェック → 異常検知 or 終了
workflow.add_conditional_edges(
    "forecast",
    has_error,
    {
        "error": "alert",      # エラー時はアラートへ直行
        "detect": "detect"     # 正常時は異常検知へ
    }
)

# 異常検知 → 再予測判定 → 再予測 or 発注計算
workflow.add_conditional_edges(
    "detect",
    should_reforecast,
    {
        "reforecast": "forecast",  # 重大異常時は再予測
        "calculate": "calculate"   # 通常時は発注計算へ
    }
)

# 発注計算 → アラート生成 → 終了
workflow.add_edge("calculate", "alert")
workflow.add_edge("alert", END)

# コンパイル
app = workflow.compile()

ワークフローの実行

# 実行
initial_state = {
    "store_nbr": 1,
    "family": "GROCERY I",
    "current_stock": 5000.0,
    "lead_time_days": 3,
    "service_level": 0.95,
    "alerts": [],
    "needs_reforecast": False,
    "error_message": None,
}

result = app.invoke(initial_state)

# 結果の確認
print(f"予測手法: {result['forecast_method']}")
print(f"異常レベル: {result['alert_level']}")
print(f"推奨発注量: {result['recommended_quantity']:.0f}")
print(f"欠品リスク: {result['stockout_risk']:.1%}")
print(f"\nアラート:")
for alert in result['alerts']:
    print(f"  {alert}")

ワークフローの可視化

# グラフの可視化(Mermaid形式)
print(app.get_graph().draw_mermaid())

# Jupyter環境では画像として表示
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))

まとめ

項目ポイント
Stateワークフロー全体の状態を型安全に定義
Node予測・検知・計算・アラートの4ノード構成
条件分岐異常検知結果で再予測ループを制御
エラー処理エラー時はアラートノードへ直行する設計

チェックリスト

  • LangGraphのState/Node/Edge/Graphの概念を理解した
  • 需要予測ワークフローの4ステップ構成を把握した
  • 条件分岐による再予測ループのロジックを理解した
  • ワークフローの構築・実行・可視化ができる

次のステップへ

ワークフローの基本構造ができたところで、次は異常検知の精度を高め、ビジネスに直結するアラート条件の設計に踏み込もう。

推定読了時間: 30分