LangGraphCacheAdapter
Persistent checkpoint storage for LangGraph state graphs. LangGraphCacheAdapter implements BaseCheckpointSaver, allowing you to persist and restore graph state across runs using any OmniCache-AI backend. Compatible with both LangGraph 0.x and 1.x.
Overview
LangGraph uses a checkpointer to save and restore the state of compiled state graphs between invocations. By implementing BaseCheckpointSaver, LangGraphCacheAdapter stores checkpoint data in OmniCache-AI's backend (in-memory, disk, or Redis), enabling persistent multi-turn conversations, state recovery, and time-travel debugging.
The adapter auto-detects the LangGraph version at import time and adapts its behavior accordingly:
- LangGraph 0.x -- Uses
get(),put(), andlist(). - LangGraph 1.x -- Uses
get_tuple(),put(),put_writes(),list(), andget_next_version().
When to use:
- You are building LangGraph state graphs and need checkpoint persistence.
- You want to share checkpoint storage across multiple graph instances.
- You want to use the same OmniCache-AI backend for both LLM caching and graph checkpointing.
Installation
pip install 'omnicache-ai[langgraph]'
This installs langgraph >= 0.1 and langchain-core >= 0.2 alongside omnicache-ai.
Usage
- LangGraph 0.x
from langgraph.graph import StateGraph, END
from omnicache_ai import CacheManager, InMemoryBackend, CacheKeyBuilder
from omnicache_ai.adapters.langgraph_adapter import LangGraphCacheAdapter
# Set up OmniCache-AI
manager = CacheManager(
backend=InMemoryBackend(),
key_builder=CacheKeyBuilder(namespace="myapp"),
)
saver = LangGraphCacheAdapter(manager)
# Define a simple graph
def agent_node(state):
return {"messages": state["messages"] + ["Hello!"]}
graph = StateGraph(dict)
graph.add_node("agent", agent_node)
graph.set_entry_point("agent")
graph.add_edge("agent", END)
app = graph.compile(checkpointer=saver)
# Invoke with a thread ID for persistent state
config = {"configurable": {"thread_id": "user-123"}}
result = app.invoke({"messages": ["Hi"]}, config=config)
# State is checkpointed -- restore on next invocation
result = app.invoke({"messages": ["What did I say?"]}, config=config)
- LangGraph 1.x
from langgraph.graph import StateGraph, END
from omnicache_ai import CacheManager, InMemoryBackend, CacheKeyBuilder
from omnicache_ai.adapters.langgraph_adapter import LangGraphCacheAdapter
# Same API -- the adapter detects the version automatically
manager = CacheManager(
backend=InMemoryBackend(),
key_builder=CacheKeyBuilder(namespace="myapp"),
)
saver = LangGraphCacheAdapter(manager)
graph = StateGraph(dict)
graph.add_node("agent", agent_node)
graph.set_entry_point("agent")
graph.add_edge("agent", END)
app = graph.compile(checkpointer=saver)
config = {"configurable": {"thread_id": "user-456"}}
result = app.invoke({"messages": ["Hi"]}, config=config)
Multi-Thread Conversations
Each thread_id maintains its own checkpoint history:
# Thread 1
config_1 = {"configurable": {"thread_id": "alice"}}
app.invoke({"messages": ["I like Python"]}, config=config_1)
# Thread 2 (independent state)
config_2 = {"configurable": {"thread_id": "bob"}}
app.invoke({"messages": ["I like Rust"]}, config=config_2)
# Thread 1 remembers its own state
app.invoke({"messages": ["What do I like?"]}, config=config_1)
Listing Checkpoints
Iterate over saved checkpoints for a thread, most recent first:
config = {"configurable": {"thread_id": "alice"}}
for checkpoint_tuple in saver.list(config, limit=5):
print(checkpoint_tuple.checkpoint)
print(checkpoint_tuple.metadata)
Redis-Backed Persistence
Use a Redis backend for cross-process or cross-server checkpoint sharing:
from omnicache_ai.backends.redis_backend import RedisBackend
manager = CacheManager(
backend=RedisBackend(url="redis://localhost:6379/0"),
key_builder=CacheKeyBuilder(namespace="prod"),
)
saver = LangGraphCacheAdapter(manager)
Async Usage
The adapter provides async shims that delegate to the synchronous implementations:
# In an async context
tuple_result = await saver.aget_tuple(config)
new_config = await saver.aput(config, checkpoint, metadata)
async for item in saver.alist(config, limit=10):
print(item)
API Reference
LangGraphCacheAdapter
Subclasses langgraph.checkpoint.base.BaseCheckpointSaver.
Constructor:
| Parameter | Type | Default | Description |
|---|---|---|---|
cache_manager | CacheManager | (required) | The OmniCache-AI cache manager instance |
LangGraph 0.x Methods:
| Method | Signature | Description |
|---|---|---|
get | (config: dict) -> Any | None | Return the raw checkpoint dict for the given config. Returns None if no checkpoint exists. |
put | (config, checkpoint, metadata, new_versions=None) -> dict | Store a checkpoint and return the updated config with checkpoint_id filled in. |
list | (config, *, filter=None, before=None, limit=None) -> Iterator[CheckpointTuple] | Yield CheckpointTuple objects for the thread, most recent first. |
LangGraph 1.x Methods:
| Method | Signature | Description |
|---|---|---|
get_tuple | (config: dict) -> CheckpointTuple | None | Return a CheckpointTuple for the given config. If no checkpoint_id is specified, returns the latest checkpoint for the thread. |
put | (config, checkpoint, metadata, new_versions=None) -> dict | Store a checkpoint. Accepts the new_versions parameter (ChannelVersions) required by LangGraph 1.x. |
put_writes | (config, writes: list[tuple[str, Any]], task_id: str) -> None | Store intermediate writes for a checkpoint. |
get_next_version | (current: Any, channel: Any) -> int | str | Return the next channel version. Returns 1 for None, increments integers, converts strings to int and increments. |
Async Shims:
| Method | Signature | Description |
|---|---|---|
aget_tuple | (config) -> CheckpointTuple | None | Async wrapper around get_tuple(). |
aput | (config, checkpoint, metadata, new_versions=None) -> dict | Async wrapper around put(). |
aput_writes | (config, writes, task_id) -> None | Async wrapper around put_writes(). |
alist | (config, *, filter=None, before=None, limit=None) -> AsyncIterator | Async generator that yields from list(). |
Config Structure:
The config dict must contain a configurable key with the following optional fields:
| Field | Type | Description |
|---|---|---|
thread_id | str | Thread identifier (default: "default") |
checkpoint_ns | str | Checkpoint namespace (default: "") |
checkpoint_id | str | Specific checkpoint ID (default: "" -- uses latest) |
When checkpoint_id is not specified in the config, get_tuple() automatically loads the most recent checkpoint for the given thread_id. The put() method generates a checkpoint_id from time.monotonic_ns() if one is not provided.
The async methods (aget_tuple, aput, alist, aput_writes) currently delegate to the synchronous implementations. For true async I/O with non-blocking backends, you would need to implement a fully async adapter.
How It Works
Checkpoints are stored as pickled dicts containing checkpoint, metadata, parent_config, pending_writes, and new_versions. A separate thread index (a pickled list of checkpoint IDs) is maintained per thread_id to enable list() and latest-checkpoint resolution.