LangGraphワークフロー構築
「ツールが揃ったら、次はそれらを一連のワークフローに組み上げる。日次で自動実行される予測パイプラインだ。」
田中VPoEがワークフロー図を描き始める。
「LangGraphを使えば、予測→異常検知→発注量計算→アラート生成という一連の流れを、状態管理付きのグラフとして構築できる。条件分岐やリトライも宣言的に書けるのが強みだ。」
LangGraphの基本概念
LangGraphは、LLMアプリケーションのワークフローをグラフ構造で定義するフレームワークである。
| 概念 | 説明 |
|---|---|
| State | ワークフロー全体で共有される状態オブジェクト |
| Node | 処理の単位。ツール呼び出しやLLM推論を実行 |
| Edge | ノード間の遷移。条件分岐も定義可能 |
| Graph | ノードとエッジで構成されるワークフロー全体 |
ワークフローの全体設計
需要予測エージェントのワークフローは以下の4ステップで構成される。
日次予測 → 異常検知 → 発注量計算 → アラート生成
↓ ↓ ↓ ↓
forecast → detect → calculate → alert
node node node node
State定義
ワークフローの状態を型安全に定義する。
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
import operator
class DemandForecastState(TypedDict):
# 入力パラメータ
store_nbr: int
family: str
current_stock: float
lead_time_days: int
service_level: float
# 予測結果
predictions: list[dict]
forecast_method: str
forecast_metrics: dict
# 異常検知結果
anomalies: list[dict]
alert_level: str
# 発注計算結果
recommended_quantity: float
safety_stock: float
reorder_point: float
stockout_risk: float
# アラート
alerts: Annotated[list[str], operator.add]
# 制御フラグ
needs_reforecast: bool
error_message: str | None
Node実装
予測ノード
def forecast_node(state: DemandForecastState) -> dict:
"""日次の需要予測を実行する"""
try:
result = forecast_demand.invoke({
"store_nbr": state["store_nbr"],
"family": state["family"],
"horizon_days": 15,
"method": "ensemble"
})
return {
"predictions": result.predictions,
"forecast_method": result.method,
"forecast_metrics": result.metrics,
"error_message": None
}
except Exception as e:
return {
"predictions": [],
"error_message": f"予測エラー: {str(e)}",
"alerts": [f"[ERROR] 予測実行に失敗: {str(e)}"]
}
異常検知ノード
def detect_node(state: DemandForecastState) -> dict:
"""予測結果と直近の実績を比較し異常を検知する"""
# 直近の実績データを取得
actual_values = load_recent_actuals(
state["store_nbr"], state["family"], days=7
)
if not actual_values:
return {
"anomalies": [],
"alert_level": "NORMAL",
"alerts": ["[INFO] 直近の実績データなし。異常検知をスキップ"]
}
result = detect_anomalies.invoke({
"actual_values": actual_values,
"predicted_values": state["predictions"][:len(actual_values)],
"threshold_sigma": 2.0
})
alerts = []
if result.alert_level == "WARNING":
alerts.append(
f"[WARNING] {result.anomaly_count}件の異常検出。{result.summary}"
)
elif result.alert_level == "CRITICAL":
alerts.append(
f"[CRITICAL] {result.anomaly_count}件の重大な異常。即座に確認が必要。{result.summary}"
)
return {
"anomalies": result.anomalies,
"alert_level": result.alert_level,
"needs_reforecast": result.alert_level == "CRITICAL",
"alerts": alerts
}
発注量計算ノード
def calculate_node(state: DemandForecastState) -> dict:
"""最適な発注量を計算する"""
result = calculate_order.invoke({
"store_nbr": state["store_nbr"],
"family": state["family"],
"current_stock": state["current_stock"],
"lead_time_days": state["lead_time_days"],
"service_level": state["service_level"],
"predictions": state["predictions"]
})
alerts = []
if result.expected_stockout_risk > 0.1:
alerts.append(
f"[WARNING] 欠品リスク {result.expected_stockout_risk:.1%}。"
f"推奨発注量: {result.recommended_quantity:.0f}"
)
if result.recommended_quantity > 0:
alerts.append(
f"[ORDER] 発注推奨: {result.recommended_quantity:.0f}個 "
f"(安全在庫: {result.safety_stock:.0f}, 発注点: {result.reorder_point:.0f})"
)
return {
"recommended_quantity": result.recommended_quantity,
"safety_stock": result.safety_stock,
"reorder_point": result.reorder_point,
"stockout_risk": result.expected_stockout_risk,
"alerts": alerts
}
アラート生成ノード
def alert_node(state: DemandForecastState) -> dict:
"""アラートを集約しレポートを生成する"""
report_lines = [
f"=== 需要予測レポート ===",
f"店舗: {state['store_nbr']}, カテゴリ: {state['family']}",
f"予測手法: {state['forecast_method']}",
f"異常レベル: {state['alert_level']}",
f"推奨発注量: {state['recommended_quantity']:.0f}",
f"欠品リスク: {state['stockout_risk']:.1%}",
f"",
f"--- アラート一覧 ---",
]
for alert in state.get("alerts", []):
report_lines.append(f" {alert}")
report = "\n".join(report_lines)
# Slack通知やメール送信などの外部連携
if state["alert_level"] in ("WARNING", "CRITICAL"):
send_notification(report, channel="supply-chain-alerts")
return {"alerts": [f"[REPORT] レポート生成完了。アラート数: {len(state.get('alerts', []))}"]}
条件分岐の定義
def should_reforecast(state: DemandForecastState) -> Literal["reforecast", "calculate"]:
"""異常検知結果に基づき再予測が必要か判定する"""
if state.get("needs_reforecast", False):
return "reforecast"
return "calculate"
def has_error(state: DemandForecastState) -> Literal["error", "detect"]:
"""予測エラーの有無をチェックする"""
if state.get("error_message"):
return "error"
return "detect"
グラフの構築
# グラフの定義
workflow = StateGraph(DemandForecastState)
# ノードの追加
workflow.add_node("forecast", forecast_node)
workflow.add_node("detect", detect_node)
workflow.add_node("calculate", calculate_node)
workflow.add_node("alert", alert_node)
# エッジの定義
workflow.set_entry_point("forecast")
# 予測 → エラーチェック → 異常検知 or 終了
workflow.add_conditional_edges(
"forecast",
has_error,
{
"error": "alert", # エラー時はアラートへ直行
"detect": "detect" # 正常時は異常検知へ
}
)
# 異常検知 → 再予測判定 → 再予測 or 発注計算
workflow.add_conditional_edges(
"detect",
should_reforecast,
{
"reforecast": "forecast", # 重大異常時は再予測
"calculate": "calculate" # 通常時は発注計算へ
}
)
# 発注計算 → アラート生成 → 終了
workflow.add_edge("calculate", "alert")
workflow.add_edge("alert", END)
# コンパイル
app = workflow.compile()
ワークフローの実行
# 実行
initial_state = {
"store_nbr": 1,
"family": "GROCERY I",
"current_stock": 5000.0,
"lead_time_days": 3,
"service_level": 0.95,
"alerts": [],
"needs_reforecast": False,
"error_message": None,
}
result = app.invoke(initial_state)
# 結果の確認
print(f"予測手法: {result['forecast_method']}")
print(f"異常レベル: {result['alert_level']}")
print(f"推奨発注量: {result['recommended_quantity']:.0f}")
print(f"欠品リスク: {result['stockout_risk']:.1%}")
print(f"\nアラート:")
for alert in result['alerts']:
print(f" {alert}")
ワークフローの可視化
# グラフの可視化(Mermaid形式)
print(app.get_graph().draw_mermaid())
# Jupyter環境では画像として表示
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))
まとめ
| 項目 | ポイント |
|---|---|
| State | ワークフロー全体の状態を型安全に定義 |
| Node | 予測・検知・計算・アラートの4ノード構成 |
| 条件分岐 | 異常検知結果で再予測ループを制御 |
| エラー処理 | エラー時はアラートノードへ直行する設計 |
チェックリスト
- LangGraphのState/Node/Edge/Graphの概念を理解した
- 需要予測ワークフローの4ステップ構成を把握した
- 条件分岐による再予測ループのロジックを理解した
- ワークフローの構築・実行・可視化ができる
次のステップへ
ワークフローの基本構造ができたところで、次は異常検知の精度を高め、ビジネスに直結するアラート条件の設計に踏み込もう。
推定読了時間: 30分