A2A (Agent-to-Agent)
Cache inter-agent communication payloads with the framework-agnostic A2A adapter. It wraps any callable that takes a payload and returns a response.
pip install a2a-sdk omnicache-ai
7.1 Basic process() call
Cache the result of a handler function so repeated calls with the same payload skip the expensive downstream work.
from omnicache_ai import CacheManager, InMemoryBackend, CacheKeyBuilder
from omnicache_ai.adapters.a2a_adapter import A2ACacheAdapter
manager = CacheManager(
backend=InMemoryBackend(),
key_builder=CacheKeyBuilder(namespace="a2a"),
)
adapter = A2ACacheAdapter(manager, agent_id="planner")
def planner_handler(task: dict) -> dict:
# Expensive downstream call
return {"plan": f"Steps to solve: {task['objective']}"}
payload = {"objective": "Build a recommendation system"}
result1 = adapter.process(planner_handler, payload) # real call
result2 = adapter.process(planner_handler, payload) # cache hit
assert result1 == result2
print(result1)
7.2 @wrap decorator
Use the @adapter.wrap decorator for a cleaner syntax.
from omnicache_ai import CacheManager, InMemoryBackend, CacheKeyBuilder
from omnicache_ai.adapters.a2a_adapter import A2ACacheAdapter
manager = CacheManager(backend=InMemoryBackend(), key_builder=CacheKeyBuilder())
adapter = A2ACacheAdapter(manager, agent_id="summarizer")
@adapter.wrap
def summarize(payload: dict) -> dict:
text = payload["text"]
# ... call LLM ...
return {"summary": text[:100] + "..."}
payload = {"text": "Long document about vector databases..."}
print(summarize(payload)) # first call
print(summarize(payload)) # cache hit
7.3 Async aprocess()
Use aprocess() for async handler functions.
import asyncio
from omnicache_ai import CacheManager, InMemoryBackend, CacheKeyBuilder
from omnicache_ai.adapters.a2a_adapter import A2ACacheAdapter
manager = CacheManager(backend=InMemoryBackend(), key_builder=CacheKeyBuilder())
adapter = A2ACacheAdapter(manager, agent_id="executor")
async def async_executor(payload: dict) -> dict:
# Simulate async downstream agent call
await asyncio.sleep(0.1)
return {"result": f"executed: {payload['task']}"}
async def main():
task = {"task": "analyze sentiment of customer reviews"}
r1 = await adapter.aprocess(async_executor, task)
r2 = await adapter.aprocess(async_executor, task) # instant
assert r1 == r2
print(r1)
asyncio.run(main())
7.4 A2A with a2a-sdk 0.3.x TaskHandler
Integrate with the a2a-sdk AgentExecutor interface for standards-compliant agent services.
# pip install a2a-sdk>=0.3
import asyncio
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import Task, TaskState
from omnicache_ai import CacheManager, DiskBackend, CacheKeyBuilder
from omnicache_ai.adapters.a2a_adapter import A2ACacheAdapter
manager = CacheManager(
backend=DiskBackend(directory="/var/cache/a2a"),
key_builder=CacheKeyBuilder(namespace="a2a"),
)
adapter = A2ACacheAdapter(manager, agent_id="my-agent")
class CachedAgentExecutor(AgentExecutor):
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
task = context.current_task
payload = {"input": task.history[-1].parts[0].root.text if task.history else ""}
async def _run(p: dict) -> dict:
# Replace with actual model call
return {"output": f"Answer to: {p['input']}"}
result = await adapter.aprocess(_run, payload)
await event_queue.enqueue_event(
Task(id=task.id, contextId=task.contextId, state=TaskState.completed)
)
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
pass
7.5 Multi-agent pipeline with A2A caching
Chain multiple A2A-cached agents into a pipeline. On the second run every step is served from cache.
import asyncio
from omnicache_ai import CacheManager, InMemoryBackend, CacheKeyBuilder, TTLPolicy
from omnicache_ai.adapters.a2a_adapter import A2ACacheAdapter
manager = CacheManager(
backend=InMemoryBackend(),
key_builder=CacheKeyBuilder(namespace="a2a"),
ttl_policy=TTLPolicy(default_ttl=1800),
)
planner = A2ACacheAdapter(manager, agent_id="planner")
executor = A2ACacheAdapter(manager, agent_id="executor")
reviewer = A2ACacheAdapter(manager, agent_id="reviewer")
async def plan(payload): return {"plan": f"plan for {payload['goal']}"}
async def execute(payload): return {"result": f"done: {payload['plan']}"}
async def review(payload): return {"review": f"looks good: {payload['result']}"}
async def pipeline(goal: str) -> dict:
step1 = await planner.aprocess(plan, {"goal": goal})
step2 = await executor.aprocess(execute, step1)
step3 = await reviewer.aprocess(review, step2)
return step3
asyncio.run(pipeline("Build a chatbot"))
asyncio.run(pipeline("Build a chatbot")) # all three steps served from cache