演習:需要予測AIエージェントを実装しよう
「設計したコンポーネントを全部つなげよう。物流チームが明日から使えるレベルのエージェントにしてくれ。」
田中VPoEが要件リストを渡す。
「予測、発注、アラート。全機能を統合して、自然言語で操作できるAIエージェントを完成させるんだ。」
ミッション概要
これまでに設計した各Toolを統合し、LangGraphベースの需要予測AIエージェントを完成させる。
Mission 1: Tool群の統合(30分)
タスク:
- DataFetchTool, ForecastTool, AnomalyDetectionTool, OrderingTool, TrendAnalysisToolを統合する
- 各Toolのインターフェースを統一する(入力, 出力)
- エラーハンドリングを追加する(データ不足、不正なパラメータ)
- 各Toolの単体テストを実装する
解答例
from langchain_core.tools import tool
import traceback
class ToolRegistry:
"""Tool群を管理するレジストリ"""
def __init__(self):
self.tools = {
'forecast': forecast_tool,
'anomaly': anomaly_detection_tool,
'ordering': ordering_tool,
'trend': trend_analysis_tool,
}
def get_tool(self, name: str):
if name not in self.tools:
raise ValueError(f"Unknown tool: {name}. Available: {list(self.tools.keys())}")
return self.tools[name]
def execute(self, name: str, **kwargs) -> dict:
try:
tool = self.get_tool(name)
result = tool.invoke(kwargs)
return {'status': 'success', 'result': result}
except Exception as e:
return {
'status': 'error',
'error': str(e),
'traceback': traceback.format_exc()
}
# 単体テスト
def test_tools():
registry = ToolRegistry()
# ForecastToolのテスト
result = registry.execute('forecast', family='GROCERY I', horizon=7)
assert result['status'] == 'success'
assert len(result['result']['predictions']) == 7
# AnomalyToolのテスト
result = registry.execute('anomaly', family='GROCERY I')
assert result['status'] == 'success'
assert 'anomaly_count' in result['result']
# OrderingToolのテスト
result = registry.execute('ordering', family='GROCERY I', current_stock=1000)
assert result['status'] == 'success'
assert 'recommended_order_quantity' in result['result']
# TrendToolのテスト
result = registry.execute('trend', family='GROCERY I')
assert result['status'] == 'success'
assert 'growth_rate' in result['result']
# エラーケースのテスト
result = registry.execute('forecast', family='NONEXISTENT')
assert result['status'] == 'error' or result['result']['predictions'] == []
print("All tests passed!")
test_tools()
Mission 2: LangGraphワークフローの実装(30分)
タスク:
- StateGraphでエージェントのワークフローを定義する
- クエリ解析ノードを実装する
- ルーティングノードを実装する
- 応答生成ノードを実装する
- ワークフロー全体を結合し動作確認する
解答例
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
import json
class DemandAgentState(TypedDict):
messages: Annotated[list, operator.add]
query_type: str
parameters: dict
tool_result: dict
response: str
def parse_query_node(state: DemandAgentState) -> dict:
"""ユーザーのクエリを解析"""
user_message = state['messages'][-1]
# LLMでクエリを解析
prompt = f"""以下の質問を分析し、JSON形式で返してください:
質問: {user_message}
{{
"query_type": "forecast|anomaly|ordering|trend",
"family": "カテゴリ名 or null",
"store_nbr": "店舗番号 or null",
"horizon": "予測日数 or 7",
"current_stock": "現在在庫 or null"
}}"""
# LLM呼び出し(簡略化)
parsed = llm.invoke(prompt)
params = json.loads(parsed.content)
return {
'query_type': params.get('query_type', 'forecast'),
'parameters': params
}
def route_node(state: DemandAgentState) -> str:
"""クエリタイプに基づいてルーティング"""
return state['query_type']
def forecast_node(state: DemandAgentState) -> dict:
"""予測を実行"""
params = state['parameters']
result = forecast_tool.invoke({
'family': params.get('family', 'GROCERY I'),
'store_nbr': params.get('store_nbr'),
'horizon': params.get('horizon', 7),
})
return {'tool_result': result}
def generate_response_node(state: DemandAgentState) -> dict:
"""結果を自然言語に変換"""
result = state['tool_result']
query_type = state['query_type']
prompt = f"""以下の分析結果を、物流担当者に分かりやすい日本語で説明してください。
分析種別: {query_type}
結果: {json.dumps(result, ensure_ascii=False, indent=2)}
箇条書きで要点をまとめ、推奨アクションも提示してください。"""
response = llm.invoke(prompt)
return {'response': response.content}
# ワークフロー構築
workflow = StateGraph(DemandAgentState)
workflow.add_node("parse", parse_query_node)
workflow.add_node("forecast", forecast_node)
workflow.add_node("anomaly", anomaly_node)
workflow.add_node("ordering", ordering_node)
workflow.add_node("trend", trend_node)
workflow.add_node("respond", generate_response_node)
workflow.set_entry_point("parse")
workflow.add_conditional_edges("parse", route_node, {
"forecast": "forecast",
"anomaly": "anomaly",
"ordering": "ordering",
"trend": "trend",
})
for n in ["forecast", "anomaly", "ordering", "trend"]:
workflow.add_edge(n, "respond")
workflow.add_edge("respond", END)
agent = workflow.compile()
Mission 3: エージェントのテスト(30分)
タスク:
- 以下のシナリオでエージェントをテストする
- 各応答の品質を評価する
- エッジケースでの挙動を確認する
テストシナリオ:
| # | 質問 | 期待される動作 |
|---|---|---|
| 1 | 「来週のGROCERY Iの売上予測を教えて」 | 7日間の日次予測を返す |
| 2 | 「BEVERAGESに異常な動きはある?」 | 異常検知結果を返す |
| 3 | 「店舗3のPRODUCEの発注量を計算して。在庫は500個」 | 推奨発注量を返す |
| 4 | 「DAIRY のトレンドはどうなっている?」 | 成長率と季節パターンを返す |
| 5 | 「すべてのカテゴリの欠品リスクを確認して」 | 全カテゴリのリスク評価(エッジケース) |
解答例
test_queries = [
"来週のGROCERY Iの売上予測を教えて",
"BEVERAGESに異常な動きはある?",
"店舗3のPRODUCEの発注量を計算して。在庫は500個",
"DAIRYのトレンドはどうなっている?",
"すべてのカテゴリの欠品リスクを確認して",
]
for query in test_queries:
print(f"\n{'='*60}")
print(f"質問: {query}")
print(f"{'='*60}")
result = agent.invoke({
'messages': [query],
'query_type': '',
'parameters': {},
'tool_result': {},
'response': ''
})
print(f"\n応答:")
print(result['response'])
print(f"\nクエリタイプ: {result['query_type']}")
Mission 4: アラートシステムの統合(30分)
タスク:
- AlertEngineをエージェントに統合する
- 定期チェック機能を追加する(全カテゴリ巡回)
- 日次レポートの自動生成機能を実装する
- アラートの優先度に基づくソート・フィルタリングを実装する
解答例
class DemandForecastAgent:
"""統合された需要予測AIエージェント"""
def __init__(self):
self.tool_registry = ToolRegistry()
self.alert_engine = AlertEngine()
self.workflow = self._build_workflow()
def chat(self, query: str) -> str:
"""チャットインターフェース"""
result = self.workflow.invoke({
'messages': [query],
'query_type': '', 'parameters': {},
'tool_result': {}, 'response': ''
})
return result['response']
def daily_check(self, families: list[str] = None) -> str:
"""日次の全カテゴリチェック"""
if families is None:
families = ['GROCERY I', 'BEVERAGES', 'PRODUCE', 'CLEANING', 'DAIRY']
all_alerts = []
for family in families:
# 異常検知
anomaly_result = self.tool_registry.execute('anomaly', family=family)
if anomaly_result['status'] == 'success':
for anomaly in anomaly_result['result'].get('anomalies', []):
alert = self.alert_engine.check_demand_anomaly(
actual=anomaly['actual_sales'],
predicted=anomaly['expected_sales'],
family=family
)
if alert:
all_alerts.append(alert)
# 日次レポート生成
return generate_daily_report(self.alert_engine)
# 使用例
agent = DemandForecastAgent()
# チャット
print(agent.chat("GROCERY Iの来週の需要予測は?"))
# 日次チェック
print(agent.daily_check())
達成度チェック
- Tool群を統合し単体テストをパスした
- LangGraphワークフローを構築し動作確認した
- 5つのテストシナリオで正常に応答した
- AlertEngineを統合し日次チェック機能を実装した
- エッジケースでのエラーハンドリングを確認した
推定所要時間: 120分