Merge pull request #2474 from Molecule-AI/feat/chat-history-mcp-tool

feat(a2a-mcp): add chat_history tool for prior turns with a peer
This commit is contained in:
Hongming Wang 2026-05-02 01:27:38 +00:00 committed by GitHub
commit 82beb98fff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 299 additions and 0 deletions

View File

@ -26,6 +26,7 @@ from typing import Callable
import inbox
from a2a_tools import (
tool_chat_history,
tool_check_task_status,
tool_commit_memory,
tool_delegate_task,
@ -135,6 +136,12 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
return await tool_inbox_pop(
arguments.get("activity_id", ""),
)
elif name == "chat_history":
return await tool_chat_history(
arguments.get("peer_id", ""),
arguments.get("limit", 20),
arguments.get("before_ts", ""),
)
return f"Unknown tool: {name}"

View File

@ -554,6 +554,85 @@ _INBOX_NOT_ENABLED_MSG = (
)
async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "") -> str:
"""Fetch the prior conversation with one peer.
Hits ``/workspaces/<self>/activity?peer_id=<peer>&limit=<N>``
against the workspace-server, which returns activity rows where
this workspace is either the sender (``source_id=peer``) or the
recipient (``target_id=peer``) of an A2A turn both sides of the
conversation in chronological order.
Args:
peer_id: The other workspace's UUID. Same value the agent
sees as ``peer_id`` on a peer_agent push or ``workspace_id``
on a delegate_task call.
limit: Maximum rows to return; capped server-side at 500. The
default of 20 covers \"most recent context for this peer\"
without flooding the agent's context window.
before_ts: Optional RFC3339 timestamp; only rows strictly
older are returned. Used to page backward through long
histories pass the oldest ``ts`` from the previous
response. Empty (default) returns the most recent ``limit``
rows.
Returns a JSON-encoded list of activity rows (or an error string
starting with ``Error:`` so the agent can branch). Each row carries
``activity_type``, ``source_id``, ``target_id``, ``method``,
``summary``, ``request_body``, ``response_body``, ``status``,
``created_at`` same shape ``inbox_peek`` and the canvas chat
loader already see.
"""
if not peer_id or not isinstance(peer_id, str):
return "Error: peer_id is required"
if not isinstance(limit, int) or limit <= 0:
limit = 20
if limit > 500:
limit = 500
params: dict[str, str] = {
"peer_id": peer_id,
"limit": str(limit),
}
# Forward verbatim — the server route validates as RFC3339 at the
# trust boundary and translates into a `created_at < $X` clause.
if before_ts:
params["before_ts"] = before_ts
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/activity",
params=params,
headers=_auth_headers_for_heartbeat(),
)
except Exception as exc: # noqa: BLE001
return f"Error: chat_history request failed: {exc}"
if resp.status_code == 400:
# Trust-boundary rejection (malformed peer_id, etc.) — surface
# the server's reason verbatim so the agent can correct itself.
try:
err = resp.json().get("error", "bad request")
except Exception: # noqa: BLE001
err = "bad request"
return f"Error: {err}"
if resp.status_code >= 400:
return f"Error: chat_history returned HTTP {resp.status_code}"
try:
rows = resp.json()
except Exception: # noqa: BLE001
return "Error: chat_history response was not JSON"
if not isinstance(rows, list):
return "Error: chat_history response was not a list"
# Server returns DESC (most recent first); reverse to chronological
# so the agent reads the conversation top-down like a chat log.
rows.reverse()
return json.dumps(rows)
async def tool_inbox_peek(limit: int = 10) -> str:
"""Return up to ``limit`` pending inbound messages without removing them."""
import inbox # local import — avoids a circular dep at module load

View File

@ -342,6 +342,14 @@ _CLI_A2A_COMMAND_KEYWORDS: dict[str, str | None] = {
"wait_for_message": None,
"inbox_peek": None,
"inbox_pop": None,
# `chat_history` is reachable from the CLI runtime in principle
# (it's just an HTTP GET) but the standard CLI doesn't expose a
# subcommand for it today — the in-container CLI runtimes drive
# via a2a_cli's delegate / status / peers verbs, and chat-history
# browsing is a wheel-side standalone-runtime use case. Mapped
# to None here for adapter consistency; flip to a keyword if the
# a2a_cli grows a `history` subcommand in the future.
"chat_history": None,
}

View File

@ -51,6 +51,7 @@ from dataclasses import dataclass
from typing import Any, Literal
from a2a_tools import (
tool_chat_history,
tool_check_task_status,
tool_commit_memory,
tool_delegate_task,
@ -363,6 +364,54 @@ _INBOX_PEEK = ToolSpec(
section=A2A_SECTION,
)
_CHAT_HISTORY = ToolSpec(
name="chat_history",
short="Fetch the prior conversation with one peer (both sides, chronological).",
when_to_use=(
"Call this when a peer_agent push lands and you need context "
"from prior turns with that workspace — e.g. \"what task did "
"this peer assign me last hour?\" or \"what did I tell them?\". "
"Both sides of the conversation appear in chronological order, "
"so the agent reads the log top-down. Cheaper than re-deriving "
"context from memory because the platform already audits every "
"A2A turn into activity_logs. Pair with `agent_card_url` from "
"the channel envelope when you also need the peer's "
"capabilities."
),
input_schema={
"type": "object",
"properties": {
"peer_id": {
"type": "string",
"description": (
"The peer workspace's UUID — same value you got "
"as `peer_id` on the inbound push, or as "
"`workspace_id` from `list_peers`."
),
},
"limit": {
"type": "integer",
"description": (
"Max rows to return (default 20, capped at 500). "
"Default 20 covers \"most recent context\" without "
"flooding the conversation window."
),
},
"before_ts": {
"type": "string",
"description": (
"Optional RFC3339 timestamp; passes through to the "
"server for paging backward through long histories. "
"Use the oldest `created_at` from a previous response."
),
},
},
"required": ["peer_id"],
},
impl=tool_chat_history,
section=A2A_SECTION,
)
_INBOX_POP = ToolSpec(
name="inbox_pop",
short="Remove a handled message from the inbox queue by activity_id.",
@ -469,6 +518,7 @@ TOOLS: list[ToolSpec] = [
_WAIT_FOR_MESSAGE,
_INBOX_PEEK,
_INBOX_POP,
_CHAT_HISTORY,
# HMA
_COMMIT_MEMORY,
_RECALL_MEMORY,

View File

@ -9,6 +9,7 @@
- **wait_for_message**: Block until the next inbound message (canvas user OR peer agent) arrives, or until ``timeout_secs`` elapses.
- **inbox_peek**: List pending inbound messages without removing them.
- **inbox_pop**: Remove a handled message from the inbox queue by activity_id.
- **chat_history**: Fetch the prior conversation with one peer (both sides, chronological).
### delegate_task
Use for QUICK questions and small sub-tasks where you can afford to wait inline. Returns the peer's response text directly. For longer-running work (research, multi-minute jobs) use delegate_task_async + check_task_status instead so you don't hold this workspace busy waiting.
@ -37,4 +38,7 @@ Standalone-runtime ONLY. Use to inspect what's queued before deciding which to h
### inbox_pop
Standalone-runtime ONLY. Call after you've replied to a message returned from wait_for_message or inbox_peek to drop it from the queue. Idempotent — popping a missing id reports removed=false without erroring.
### chat_history
Call this when a peer_agent push lands and you need context from prior turns with that workspace — e.g. "what task did this peer assign me last hour?" or "what did I tell them?". Both sides of the conversation appear in chronological order, so the agent reads the log top-down. Cheaper than re-deriving context from memory because the platform already audits every A2A turn into activity_logs. Pair with `agent_card_url` from the channel envelope when you also need the peer's capabilities.
Always use list_peers first to discover available workspace IDs. Access control is enforced — you can only reach siblings and parent/children. If a delegation returns a DELEGATION FAILED message, do NOT forward the raw error to the user. Instead: (1) try a different peer, (2) handle the task yourself, or (3) tell the user which peer is unavailable and provide your own best answer.

View File

@ -966,3 +966,154 @@ class TestToolRecallMemory:
mc.get.assert_not_called()
assert "Error" in result
assert "memory.read" in result
# ---------------------------------------------------------------------------
# tool_chat_history — wraps /workspaces/:id/activity?peer_id=X
# ---------------------------------------------------------------------------
#
# The tool fetches both sides of an A2A conversation with one peer for
# resume-context UX. Hits the new peer_id filter on the activity API
# (workspace-server PR #2472), reverses the DESC-ordered server response
# into chronological order, and returns the rows as JSON. Tests pin
# every distinct execution path so a regression in the server response
# shape, the validation, the sort direction, or the error envelope is
# caught at unit-test time instead of on a live workspace.
_PEER = "11111111-2222-3333-4444-555555555555"
class TestChatHistory:
async def test_rejects_empty_peer_id(self):
"""Empty peer_id: short-circuit before any HTTP call. Defense
in depth server also 400s on missing peer_id, but a clean
error message at the wheel side is friendlier to the agent."""
import a2a_tools
mc = _make_http_mock()
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
result = await a2a_tools.tool_chat_history(peer_id="")
mc.get.assert_not_called()
assert result.startswith("Error:")
async def test_calls_activity_route_with_peer_id_filter(self):
"""peer_id is forwarded as a query param exactly. Limit
defaults to 20, before_ts is omitted when empty."""
import a2a_tools
mc = _make_http_mock(get_resp=_resp(200, []))
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
await a2a_tools.tool_chat_history(peer_id=_PEER)
url, kwargs = mc.get.call_args.args[0], mc.get.call_args.kwargs
assert url.endswith("/activity")
params = kwargs["params"]
assert params["peer_id"] == _PEER
assert params["limit"] == "20"
assert "before_ts" not in params
async def test_caps_limit_at_500(self):
"""Server caps at 500; mirror the cap client-side so an
agent passing limit=999999 doesn't waste a round-trip on the
server's 400-or-truncate decision."""
import a2a_tools
mc = _make_http_mock(get_resp=_resp(200, []))
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
await a2a_tools.tool_chat_history(peer_id=_PEER, limit=10000)
params = mc.get.call_args.kwargs["params"]
assert params["limit"] == "500"
async def test_negative_or_zero_limit_falls_to_default(self):
"""Defensive: limit=0 or negative reverts to 20 instead of
echoing a useless query that the server would reject."""
import a2a_tools
mc = _make_http_mock(get_resp=_resp(200, []))
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
await a2a_tools.tool_chat_history(peer_id=_PEER, limit=0)
assert mc.get.call_args.kwargs["params"]["limit"] == "20"
async def test_passes_before_ts_when_set(self):
import a2a_tools
mc = _make_http_mock(get_resp=_resp(200, []))
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
await a2a_tools.tool_chat_history(
peer_id=_PEER, before_ts="2026-05-01T00:00:00Z",
)
assert mc.get.call_args.kwargs["params"]["before_ts"] == "2026-05-01T00:00:00Z"
async def test_reverses_desc_response_to_chronological(self):
"""Server returns DESC (newest first); the wheel reverses to
chronological so the agent reads the chat top-down same
order a human would scrolling through canvas history."""
import a2a_tools
rows = [
{"id": "act-3", "created_at": "2026-05-01T00:03:00Z"},
{"id": "act-2", "created_at": "2026-05-01T00:02:00Z"},
{"id": "act-1", "created_at": "2026-05-01T00:01:00Z"},
]
mc = _make_http_mock(get_resp=_resp(200, rows))
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
result = await a2a_tools.tool_chat_history(peer_id=_PEER)
out = json.loads(result)
assert [r["id"] for r in out] == ["act-1", "act-2", "act-3"]
async def test_400_returns_server_error_verbatim(self):
"""Server-side trust-boundary rejection (e.g. malformed
peer_id): surface the server's error message verbatim so the
agent can correct itself instead of guessing why."""
import a2a_tools
mc = _make_http_mock(get_resp=_resp(400, {"error": "peer_id must be a UUID"}))
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
result = await a2a_tools.tool_chat_history(peer_id="bad")
assert "peer_id must be a UUID" in result
async def test_500_returns_generic_error(self):
"""Server 5xx: don't echo the body (might leak internals);
return a clean error string the agent can branch on."""
import a2a_tools
mc = _make_http_mock(get_resp=_resp(500, {"error": "internal"}))
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
result = await a2a_tools.tool_chat_history(peer_id=_PEER)
assert result.startswith("Error:")
assert "500" in result
async def test_network_failure_returns_error_envelope(self):
"""httpx raises (network down, DNS fail, etc.): tool must
not crash the MCP server return an error string so the
agent can retry or fall back."""
import a2a_tools
mc = _make_http_mock(get_exc=httpx.ConnectError("network down"))
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
result = await a2a_tools.tool_chat_history(peer_id=_PEER)
assert result.startswith("Error:")
assert "network down" in result
async def test_non_list_response_returns_error(self):
"""Server somehow returns a dict instead of a list (proxy
returns an HTML error page that JSON-parses, or a future
wire-shape change): defend against the type mismatch so the
json.loads on the agent side doesn't blow up."""
import a2a_tools
mc = _make_http_mock(get_resp=_resp(200, {"unexpected": "shape"}))
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
result = await a2a_tools.tool_chat_history(peer_id=_PEER)
assert result.startswith("Error:")