LESSON

演習:需要予測AIエージェントを実装しよう

「設計したコンポーネントを全部つなげよう。物流チームが明日から使えるレベルのエージェントにしてくれ。」

田中VPoEが要件リストを渡す。

「予測、発注、アラート。全機能を統合して、自然言語で操作できるAIエージェントを完成させるんだ。」

ミッション概要

これまでに設計した各Toolを統合し、LangGraphベースの需要予測AIエージェントを完成させる。


Mission 1: Tool群の統合(30分)

タスク:

  1. DataFetchTool, ForecastTool, AnomalyDetectionTool, OrderingTool, TrendAnalysisToolを統合する
  2. 各Toolのインターフェースを統一する(入力
    , 出力
  3. エラーハンドリングを追加する(データ不足、不正なパラメータ)
  4. 各Toolの単体テストを実装する
解答例
from langchain_core.tools import tool
import traceback

class ToolRegistry:
    """Tool群を管理するレジストリ"""

    def __init__(self):
        self.tools = {
            'forecast': forecast_tool,
            'anomaly': anomaly_detection_tool,
            'ordering': ordering_tool,
            'trend': trend_analysis_tool,
        }

    def get_tool(self, name: str):
        if name not in self.tools:
            raise ValueError(f"Unknown tool: {name}. Available: {list(self.tools.keys())}")
        return self.tools[name]

    def execute(self, name: str, **kwargs) -> dict:
        try:
            tool = self.get_tool(name)
            result = tool.invoke(kwargs)
            return {'status': 'success', 'result': result}
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e),
                'traceback': traceback.format_exc()
            }

# 単体テスト
def test_tools():
    registry = ToolRegistry()

    # ForecastToolのテスト
    result = registry.execute('forecast', family='GROCERY I', horizon=7)
    assert result['status'] == 'success'
    assert len(result['result']['predictions']) == 7

    # AnomalyToolのテスト
    result = registry.execute('anomaly', family='GROCERY I')
    assert result['status'] == 'success'
    assert 'anomaly_count' in result['result']

    # OrderingToolのテスト
    result = registry.execute('ordering', family='GROCERY I', current_stock=1000)
    assert result['status'] == 'success'
    assert 'recommended_order_quantity' in result['result']

    # TrendToolのテスト
    result = registry.execute('trend', family='GROCERY I')
    assert result['status'] == 'success'
    assert 'growth_rate' in result['result']

    # エラーケースのテスト
    result = registry.execute('forecast', family='NONEXISTENT')
    assert result['status'] == 'error' or result['result']['predictions'] == []

    print("All tests passed!")

test_tools()

Mission 2: LangGraphワークフローの実装(30分)

タスク:

  1. StateGraphでエージェントのワークフローを定義する
  2. クエリ解析ノードを実装する
  3. ルーティングノードを実装する
  4. 応答生成ノードを実装する
  5. ワークフロー全体を結合し動作確認する
解答例
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
import json

class DemandAgentState(TypedDict):
    messages: Annotated[list, operator.add]
    query_type: str
    parameters: dict
    tool_result: dict
    response: str

def parse_query_node(state: DemandAgentState) -> dict:
    """ユーザーのクエリを解析"""
    user_message = state['messages'][-1]

    # LLMでクエリを解析
    prompt = f"""以下の質問を分析し、JSON形式で返してください:
    質問: {user_message}

    {{
      "query_type": "forecast|anomaly|ordering|trend",
      "family": "カテゴリ名 or null",
      "store_nbr": "店舗番号 or null",
      "horizon": "予測日数 or 7",
      "current_stock": "現在在庫 or null"
    }}"""

    # LLM呼び出し(簡略化)
    parsed = llm.invoke(prompt)
    params = json.loads(parsed.content)

    return {
        'query_type': params.get('query_type', 'forecast'),
        'parameters': params
    }

def route_node(state: DemandAgentState) -> str:
    """クエリタイプに基づいてルーティング"""
    return state['query_type']

def forecast_node(state: DemandAgentState) -> dict:
    """予測を実行"""
    params = state['parameters']
    result = forecast_tool.invoke({
        'family': params.get('family', 'GROCERY I'),
        'store_nbr': params.get('store_nbr'),
        'horizon': params.get('horizon', 7),
    })
    return {'tool_result': result}

def generate_response_node(state: DemandAgentState) -> dict:
    """結果を自然言語に変換"""
    result = state['tool_result']
    query_type = state['query_type']

    prompt = f"""以下の分析結果を、物流担当者に分かりやすい日本語で説明してください。
    分析種別: {query_type}
    結果: {json.dumps(result, ensure_ascii=False, indent=2)}

    箇条書きで要点をまとめ、推奨アクションも提示してください。"""

    response = llm.invoke(prompt)
    return {'response': response.content}

# ワークフロー構築
workflow = StateGraph(DemandAgentState)
workflow.add_node("parse", parse_query_node)
workflow.add_node("forecast", forecast_node)
workflow.add_node("anomaly", anomaly_node)
workflow.add_node("ordering", ordering_node)
workflow.add_node("trend", trend_node)
workflow.add_node("respond", generate_response_node)

workflow.set_entry_point("parse")
workflow.add_conditional_edges("parse", route_node, {
    "forecast": "forecast",
    "anomaly": "anomaly",
    "ordering": "ordering",
    "trend": "trend",
})
for n in ["forecast", "anomaly", "ordering", "trend"]:
    workflow.add_edge(n, "respond")
workflow.add_edge("respond", END)

agent = workflow.compile()

Mission 3: エージェントのテスト(30分)

タスク:

  1. 以下のシナリオでエージェントをテストする
  2. 各応答の品質を評価する
  3. エッジケースでの挙動を確認する

テストシナリオ:

#質問期待される動作
1「来週のGROCERY Iの売上予測を教えて」7日間の日次予測を返す
2「BEVERAGESに異常な動きはある?」異常検知結果を返す
3「店舗3のPRODUCEの発注量を計算して。在庫は500個」推奨発注量を返す
4「DAIRY のトレンドはどうなっている?」成長率と季節パターンを返す
5「すべてのカテゴリの欠品リスクを確認して」全カテゴリのリスク評価(エッジケース)
解答例
test_queries = [
    "来週のGROCERY Iの売上予測を教えて",
    "BEVERAGESに異常な動きはある?",
    "店舗3のPRODUCEの発注量を計算して。在庫は500個",
    "DAIRYのトレンドはどうなっている?",
    "すべてのカテゴリの欠品リスクを確認して",
]

for query in test_queries:
    print(f"\n{'='*60}")
    print(f"質問: {query}")
    print(f"{'='*60}")

    result = agent.invoke({
        'messages': [query],
        'query_type': '',
        'parameters': {},
        'tool_result': {},
        'response': ''
    })

    print(f"\n応答:")
    print(result['response'])
    print(f"\nクエリタイプ: {result['query_type']}")

Mission 4: アラートシステムの統合(30分)

タスク:

  1. AlertEngineをエージェントに統合する
  2. 定期チェック機能を追加する(全カテゴリ巡回)
  3. 日次レポートの自動生成機能を実装する
  4. アラートの優先度に基づくソート・フィルタリングを実装する
解答例
class DemandForecastAgent:
    """統合された需要予測AIエージェント"""

    def __init__(self):
        self.tool_registry = ToolRegistry()
        self.alert_engine = AlertEngine()
        self.workflow = self._build_workflow()

    def chat(self, query: str) -> str:
        """チャットインターフェース"""
        result = self.workflow.invoke({
            'messages': [query],
            'query_type': '', 'parameters': {},
            'tool_result': {}, 'response': ''
        })
        return result['response']

    def daily_check(self, families: list[str] = None) -> str:
        """日次の全カテゴリチェック"""
        if families is None:
            families = ['GROCERY I', 'BEVERAGES', 'PRODUCE', 'CLEANING', 'DAIRY']

        all_alerts = []
        for family in families:
            # 異常検知
            anomaly_result = self.tool_registry.execute('anomaly', family=family)
            if anomaly_result['status'] == 'success':
                for anomaly in anomaly_result['result'].get('anomalies', []):
                    alert = self.alert_engine.check_demand_anomaly(
                        actual=anomaly['actual_sales'],
                        predicted=anomaly['expected_sales'],
                        family=family
                    )
                    if alert:
                        all_alerts.append(alert)

        # 日次レポート生成
        return generate_daily_report(self.alert_engine)

# 使用例
agent = DemandForecastAgent()

# チャット
print(agent.chat("GROCERY Iの来週の需要予測は?"))

# 日次チェック
print(agent.daily_check())

達成度チェック

  • Tool群を統合し単体テストをパスした
  • LangGraphワークフローを構築し動作確認した
  • 5つのテストシナリオで正常に応答した
  • AlertEngineを統合し日次チェック機能を実装した
  • エッジケースでのエラーハンドリングを確認した

推定所要時間: 120分