LESSON 30分

ストーリー

田中VPoE
Human-in-the-Loopで中断・再開の仕組みを学んだが、あの仕組みが動くのは「ワークフローの状態が保存されている」からだ
あなた
中断中にサーバーが再起動しても大丈夫なんですか?
田中VPoE
チェックポインター(Checkpointer)が状態を永続化しているから大丈夫だ。メモリ上だけでなく、データベースに保存することで、サーバー再起動や障害からの復旧にも対応できる
あなた
会話履歴やツールの実行結果も保存されるんですね
田中VPoE
その通り。エージェントの「記憶」を管理することは、長期的に安定したエージェントを運用する上で不可欠だ

チェックポイントの仕組み

基本概念

チェックポイントは、ワークフローの各ノード実行後にStateのスナップショットを保存する仕組みです。

ワークフロー実行とチェックポイント:

[Node A] → Checkpoint 1 → [Node B] → Checkpoint 2 → [Node C]

                                    ここで中断しても
                                    Checkpoint 2から再開可能

Checkpointerの種類

Checkpointer保存先適するケース
MemorySaverメモリ内開発・テスト用
SqliteSaverSQLiteファイル小規模運用
PostgresSaverPostgreSQL本番運用
RedisSaverRedis高速アクセスが必要な場合

実装例

# 開発用: メモリ内チェックポイント
from langgraph.checkpoint.memory import MemorySaver
memory_checkpointer = MemorySaver()

# 本番用: PostgreSQLチェックポイント
from langgraph.checkpoint.postgres import PostgresSaver
import psycopg

DB_URI = "postgresql://user:pass@localhost:5432/agent_db"
with psycopg.Connection.connect(DB_URI) as conn:
    postgres_checkpointer = PostgresSaver(conn)
    postgres_checkpointer.setup()  # テーブル自動作成

# グラフのコンパイル時にCheckpointerを指定
app = workflow.compile(checkpointer=postgres_checkpointer)

スレッド管理

thread_idによる会話の分離

複数のユーザーやセッションの状態を個別に管理するために、thread_id を使用します。

# ユーザーAの会話
config_a = {"configurable": {"thread_id": "user-A-session-1"}}
result_a = app.invoke(
    {"messages": [HumanMessage(content="注文を確認したい")]},
    config_a
)

# ユーザーBの会話(独立した状態)
config_b = {"configurable": {"thread_id": "user-B-session-1"}}
result_b = app.invoke(
    {"messages": [HumanMessage(content="返品したい")]},
    config_b
)

# ユーザーAの会話を継続(前の状態が復元される)
result_a2 = app.invoke(
    {"messages": [HumanMessage(content="注文番号はORD-12345です")]},
    config_a
)

状態の確認と復元

# 現在の状態を確認
state = app.get_state(config_a)
print(state.values)  # 現在のState
print(state.next)    # 次に実行されるノード

# 状態履歴の取得
for snapshot in app.get_state_history(config_a):
    print(f"Step: {snapshot.metadata.get('step')}")
    print(f"Node: {snapshot.metadata.get('source')}")
    print(f"Messages: {len(snapshot.values.get('messages', []))}")
    print("---")

メモリ管理

短期メモリと長期メモリ

エージェントのメモリは用途に応じて使い分けます。

メモリ種別保持期間用途実装例
短期メモリセッション中会話コンテキスト、中間結果LangGraphのState
長期メモリセッション跨ぎユーザー情報、過去の対応履歴外部DB、ベクトルDB

会話履歴の管理

会話が長くなるとトークン数が増大します。適切な履歴管理が必要です。

from langchain_core.messages import trim_messages

class ManagedState(TypedDict):
    messages: Annotated[list, add_messages]
    summary: str | None  # 過去の会話のサマリー

def manage_memory(state: ManagedState) -> ManagedState:
    """メッセージ数が閾値を超えたらサマリー化"""
    messages = state["messages"]

    if len(messages) > 20:
        # 古いメッセージをサマリー化
        old_messages = messages[:-10]
        summary_prompt = f"以下の会話を簡潔に要約してください:\n{old_messages}"
        summary = llm.invoke(summary_prompt).content

        # 新しいメッセージのみ保持
        return {
            "messages": messages[-10:],
            "summary": summary
        }

    return state

# トークン数ベースのトリミング
trimmed = trim_messages(
    messages,
    max_tokens=4000,
    strategy="last",  # 最新のメッセージを優先
    token_counter=llm,
    include_system=True  # システムメッセージは常に保持
)

長期メモリの活用

class LongTermMemory:
    """ユーザーごとの長期メモリ"""

    def __init__(self, db_connection):
        self.db = db_connection

    async def get_user_context(self, customer_id: str) -> dict:
        """過去の対応履歴やユーザーの好みを取得"""
        return await self.db.query(
            "SELECT * FROM customer_interactions WHERE customer_id = ? ORDER BY created_at DESC LIMIT 10",
            [customer_id]
        )

    async def save_interaction(self, customer_id: str, summary: str, resolution: str):
        """対応結果を長期メモリに保存"""
        await self.db.execute(
            "INSERT INTO customer_interactions (customer_id, summary, resolution, created_at) VALUES (?, ?, ?, NOW())",
            [customer_id, summary, resolution]
        )

障害復旧

チェックポイントからの自動復旧

async def resilient_invoke(app, input_data, config, max_retries=3):
    """障害発生時にチェックポイントから自動復旧"""
    for attempt in range(max_retries):
        try:
            # 既存の状態があればそこから再開
            current_state = app.get_state(config)
            if current_state.next:
                # 中断されたワークフローを再開
                result = app.invoke(None, config)
            else:
                # 新規実行
                result = app.invoke(input_data, config)
            return result

        except Exception as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)

まとめ

ポイント内容
チェックポイント各ノード実行後にStateを永続化
thread_idユーザー/セッション単位で状態を分離管理
メモリ管理短期メモリ(セッション内)と長期メモリ(セッション跨ぎ)を使い分け
障害復旧チェックポイントからワークフローを自動再開

チェックリスト

  • チェックポイントの仕組みと各種Checkpointerの使い分けを理解した
  • thread_idによるマルチセッション管理を理解した
  • 短期/長期メモリの設計方法を把握した
  • チェックポイントを活用した障害復旧の実装を理解した

次のステップへ

次は「演習:LangGraphワークフローを構築しよう」です。学んだ知識を活用して、NetShop社のカスタマーサポートエージェントのワークフローを構築します。


推定読了時間: 30分