ストーリー
チェックポイントの仕組み
基本概念
チェックポイントは、ワークフローの各ノード実行後にStateのスナップショットを保存する仕組みです。
ワークフロー実行とチェックポイント:
[Node A] → Checkpoint 1 → [Node B] → Checkpoint 2 → [Node C]
↑
ここで中断しても
Checkpoint 2から再開可能
Checkpointerの種類
| Checkpointer | 保存先 | 適するケース |
|---|---|---|
| MemorySaver | メモリ内 | 開発・テスト用 |
| SqliteSaver | SQLiteファイル | 小規模運用 |
| PostgresSaver | PostgreSQL | 本番運用 |
| RedisSaver | Redis | 高速アクセスが必要な場合 |
実装例
# 開発用: メモリ内チェックポイント
from langgraph.checkpoint.memory import MemorySaver
memory_checkpointer = MemorySaver()
# 本番用: PostgreSQLチェックポイント
from langgraph.checkpoint.postgres import PostgresSaver
import psycopg
DB_URI = "postgresql://user:pass@localhost:5432/agent_db"
with psycopg.Connection.connect(DB_URI) as conn:
postgres_checkpointer = PostgresSaver(conn)
postgres_checkpointer.setup() # テーブル自動作成
# グラフのコンパイル時にCheckpointerを指定
app = workflow.compile(checkpointer=postgres_checkpointer)
スレッド管理
thread_idによる会話の分離
複数のユーザーやセッションの状態を個別に管理するために、thread_id を使用します。
# ユーザーAの会話
config_a = {"configurable": {"thread_id": "user-A-session-1"}}
result_a = app.invoke(
{"messages": [HumanMessage(content="注文を確認したい")]},
config_a
)
# ユーザーBの会話(独立した状態)
config_b = {"configurable": {"thread_id": "user-B-session-1"}}
result_b = app.invoke(
{"messages": [HumanMessage(content="返品したい")]},
config_b
)
# ユーザーAの会話を継続(前の状態が復元される)
result_a2 = app.invoke(
{"messages": [HumanMessage(content="注文番号はORD-12345です")]},
config_a
)
状態の確認と復元
# 現在の状態を確認
state = app.get_state(config_a)
print(state.values) # 現在のState
print(state.next) # 次に実行されるノード
# 状態履歴の取得
for snapshot in app.get_state_history(config_a):
print(f"Step: {snapshot.metadata.get('step')}")
print(f"Node: {snapshot.metadata.get('source')}")
print(f"Messages: {len(snapshot.values.get('messages', []))}")
print("---")
メモリ管理
短期メモリと長期メモリ
エージェントのメモリは用途に応じて使い分けます。
| メモリ種別 | 保持期間 | 用途 | 実装例 |
|---|---|---|---|
| 短期メモリ | セッション中 | 会話コンテキスト、中間結果 | LangGraphのState |
| 長期メモリ | セッション跨ぎ | ユーザー情報、過去の対応履歴 | 外部DB、ベクトルDB |
会話履歴の管理
会話が長くなるとトークン数が増大します。適切な履歴管理が必要です。
from langchain_core.messages import trim_messages
class ManagedState(TypedDict):
messages: Annotated[list, add_messages]
summary: str | None # 過去の会話のサマリー
def manage_memory(state: ManagedState) -> ManagedState:
"""メッセージ数が閾値を超えたらサマリー化"""
messages = state["messages"]
if len(messages) > 20:
# 古いメッセージをサマリー化
old_messages = messages[:-10]
summary_prompt = f"以下の会話を簡潔に要約してください:\n{old_messages}"
summary = llm.invoke(summary_prompt).content
# 新しいメッセージのみ保持
return {
"messages": messages[-10:],
"summary": summary
}
return state
# トークン数ベースのトリミング
trimmed = trim_messages(
messages,
max_tokens=4000,
strategy="last", # 最新のメッセージを優先
token_counter=llm,
include_system=True # システムメッセージは常に保持
)
長期メモリの活用
class LongTermMemory:
"""ユーザーごとの長期メモリ"""
def __init__(self, db_connection):
self.db = db_connection
async def get_user_context(self, customer_id: str) -> dict:
"""過去の対応履歴やユーザーの好みを取得"""
return await self.db.query(
"SELECT * FROM customer_interactions WHERE customer_id = ? ORDER BY created_at DESC LIMIT 10",
[customer_id]
)
async def save_interaction(self, customer_id: str, summary: str, resolution: str):
"""対応結果を長期メモリに保存"""
await self.db.execute(
"INSERT INTO customer_interactions (customer_id, summary, resolution, created_at) VALUES (?, ?, ?, NOW())",
[customer_id, summary, resolution]
)
障害復旧
チェックポイントからの自動復旧
async def resilient_invoke(app, input_data, config, max_retries=3):
"""障害発生時にチェックポイントから自動復旧"""
for attempt in range(max_retries):
try:
# 既存の状態があればそこから再開
current_state = app.get_state(config)
if current_state.next:
# 中断されたワークフローを再開
result = app.invoke(None, config)
else:
# 新規実行
result = app.invoke(input_data, config)
return result
except Exception as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt)
まとめ
| ポイント | 内容 |
|---|---|
| チェックポイント | 各ノード実行後にStateを永続化 |
| thread_id | ユーザー/セッション単位で状態を分離管理 |
| メモリ管理 | 短期メモリ(セッション内)と長期メモリ(セッション跨ぎ)を使い分け |
| 障害復旧 | チェックポイントからワークフローを自動再開 |
チェックリスト
- チェックポイントの仕組みと各種Checkpointerの使い分けを理解した
- thread_idによるマルチセッション管理を理解した
- 短期/長期メモリの設計方法を把握した
- チェックポイントを活用した障害復旧の実装を理解した
次のステップへ
次は「演習:LangGraphワークフローを構築しよう」です。学んだ知識を活用して、NetShop社のカスタマーサポートエージェントのワークフローを構築します。
推定読了時間: 30分