diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index fc7e4862..67e448d5 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -26,6 +26,7 @@ from typing import Callable import inbox from a2a_tools import ( + tool_chat_history, tool_check_task_status, tool_commit_memory, tool_delegate_task, @@ -135,6 +136,12 @@ async def handle_tool_call(name: str, arguments: dict) -> str: return await tool_inbox_pop( arguments.get("activity_id", ""), ) + elif name == "chat_history": + return await tool_chat_history( + arguments.get("peer_id", ""), + arguments.get("limit", 20), + arguments.get("before_ts", ""), + ) return f"Unknown tool: {name}" diff --git a/workspace/a2a_tools.py b/workspace/a2a_tools.py index a72b203c..cf855b61 100644 --- a/workspace/a2a_tools.py +++ b/workspace/a2a_tools.py @@ -554,6 +554,85 @@ _INBOX_NOT_ENABLED_MSG = ( ) +async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "") -> str: + """Fetch the prior conversation with one peer. + + Hits ``/workspaces//activity?peer_id=&limit=`` + against the workspace-server, which returns activity rows where + this workspace is either the sender (``source_id=peer``) or the + recipient (``target_id=peer``) of an A2A turn — both sides of the + conversation in chronological order. + + Args: + peer_id: The other workspace's UUID. Same value the agent + sees as ``peer_id`` on a peer_agent push or ``workspace_id`` + on a delegate_task call. + limit: Maximum rows to return; capped server-side at 500. The + default of 20 covers \"most recent context for this peer\" + without flooding the agent's context window. + before_ts: Optional RFC3339 timestamp; only rows strictly + older are returned. Used to page backward through long + histories — pass the oldest ``ts`` from the previous + response. Empty (default) returns the most recent ``limit`` + rows. + + Returns a JSON-encoded list of activity rows (or an error string + starting with ``Error:`` so the agent can branch). Each row carries + ``activity_type``, ``source_id``, ``target_id``, ``method``, + ``summary``, ``request_body``, ``response_body``, ``status``, + ``created_at`` — same shape ``inbox_peek`` and the canvas chat + loader already see. + """ + if not peer_id or not isinstance(peer_id, str): + return "Error: peer_id is required" + if not isinstance(limit, int) or limit <= 0: + limit = 20 + if limit > 500: + limit = 500 + + params: dict[str, str] = { + "peer_id": peer_id, + "limit": str(limit), + } + # Forward verbatim — the server route validates as RFC3339 at the + # trust boundary and translates into a `created_at < $X` clause. + if before_ts: + params["before_ts"] = before_ts + + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.get( + f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/activity", + params=params, + headers=_auth_headers_for_heartbeat(), + ) + except Exception as exc: # noqa: BLE001 + return f"Error: chat_history request failed: {exc}" + + if resp.status_code == 400: + # Trust-boundary rejection (malformed peer_id, etc.) — surface + # the server's reason verbatim so the agent can correct itself. + try: + err = resp.json().get("error", "bad request") + except Exception: # noqa: BLE001 + err = "bad request" + return f"Error: {err}" + if resp.status_code >= 400: + return f"Error: chat_history returned HTTP {resp.status_code}" + + try: + rows = resp.json() + except Exception: # noqa: BLE001 + return "Error: chat_history response was not JSON" + if not isinstance(rows, list): + return "Error: chat_history response was not a list" + + # Server returns DESC (most recent first); reverse to chronological + # so the agent reads the conversation top-down like a chat log. + rows.reverse() + return json.dumps(rows) + + async def tool_inbox_peek(limit: int = 10) -> str: """Return up to ``limit`` pending inbound messages without removing them.""" import inbox # local import — avoids a circular dep at module load diff --git a/workspace/executor_helpers.py b/workspace/executor_helpers.py index f3fa177c..e6d335e2 100644 --- a/workspace/executor_helpers.py +++ b/workspace/executor_helpers.py @@ -342,6 +342,14 @@ _CLI_A2A_COMMAND_KEYWORDS: dict[str, str | None] = { "wait_for_message": None, "inbox_peek": None, "inbox_pop": None, + # `chat_history` is reachable from the CLI runtime in principle + # (it's just an HTTP GET) but the standard CLI doesn't expose a + # subcommand for it today — the in-container CLI runtimes drive + # via a2a_cli's delegate / status / peers verbs, and chat-history + # browsing is a wheel-side standalone-runtime use case. Mapped + # to None here for adapter consistency; flip to a keyword if the + # a2a_cli grows a `history` subcommand in the future. + "chat_history": None, } diff --git a/workspace/platform_tools/registry.py b/workspace/platform_tools/registry.py index 8091bc8f..1c1de25b 100644 --- a/workspace/platform_tools/registry.py +++ b/workspace/platform_tools/registry.py @@ -51,6 +51,7 @@ from dataclasses import dataclass from typing import Any, Literal from a2a_tools import ( + tool_chat_history, tool_check_task_status, tool_commit_memory, tool_delegate_task, @@ -363,6 +364,54 @@ _INBOX_PEEK = ToolSpec( section=A2A_SECTION, ) +_CHAT_HISTORY = ToolSpec( + name="chat_history", + short="Fetch the prior conversation with one peer (both sides, chronological).", + when_to_use=( + "Call this when a peer_agent push lands and you need context " + "from prior turns with that workspace — e.g. \"what task did " + "this peer assign me last hour?\" or \"what did I tell them?\". " + "Both sides of the conversation appear in chronological order, " + "so the agent reads the log top-down. Cheaper than re-deriving " + "context from memory because the platform already audits every " + "A2A turn into activity_logs. Pair with `agent_card_url` from " + "the channel envelope when you also need the peer's " + "capabilities." + ), + input_schema={ + "type": "object", + "properties": { + "peer_id": { + "type": "string", + "description": ( + "The peer workspace's UUID — same value you got " + "as `peer_id` on the inbound push, or as " + "`workspace_id` from `list_peers`." + ), + }, + "limit": { + "type": "integer", + "description": ( + "Max rows to return (default 20, capped at 500). " + "Default 20 covers \"most recent context\" without " + "flooding the conversation window." + ), + }, + "before_ts": { + "type": "string", + "description": ( + "Optional RFC3339 timestamp; passes through to the " + "server for paging backward through long histories. " + "Use the oldest `created_at` from a previous response." + ), + }, + }, + "required": ["peer_id"], + }, + impl=tool_chat_history, + section=A2A_SECTION, +) + _INBOX_POP = ToolSpec( name="inbox_pop", short="Remove a handled message from the inbox queue by activity_id.", @@ -469,6 +518,7 @@ TOOLS: list[ToolSpec] = [ _WAIT_FOR_MESSAGE, _INBOX_PEEK, _INBOX_POP, + _CHAT_HISTORY, # HMA _COMMIT_MEMORY, _RECALL_MEMORY, diff --git a/workspace/tests/snapshots/a2a_instructions_mcp.txt b/workspace/tests/snapshots/a2a_instructions_mcp.txt index 35863cf4..8eacdb1c 100644 --- a/workspace/tests/snapshots/a2a_instructions_mcp.txt +++ b/workspace/tests/snapshots/a2a_instructions_mcp.txt @@ -9,6 +9,7 @@ - **wait_for_message**: Block until the next inbound message (canvas user OR peer agent) arrives, or until ``timeout_secs`` elapses. - **inbox_peek**: List pending inbound messages without removing them. - **inbox_pop**: Remove a handled message from the inbox queue by activity_id. +- **chat_history**: Fetch the prior conversation with one peer (both sides, chronological). ### delegate_task Use for QUICK questions and small sub-tasks where you can afford to wait inline. Returns the peer's response text directly. For longer-running work (research, multi-minute jobs) use delegate_task_async + check_task_status instead so you don't hold this workspace busy waiting. @@ -37,4 +38,7 @@ Standalone-runtime ONLY. Use to inspect what's queued before deciding which to h ### inbox_pop Standalone-runtime ONLY. Call after you've replied to a message returned from wait_for_message or inbox_peek to drop it from the queue. Idempotent — popping a missing id reports removed=false without erroring. +### chat_history +Call this when a peer_agent push lands and you need context from prior turns with that workspace — e.g. "what task did this peer assign me last hour?" or "what did I tell them?". Both sides of the conversation appear in chronological order, so the agent reads the log top-down. Cheaper than re-deriving context from memory because the platform already audits every A2A turn into activity_logs. Pair with `agent_card_url` from the channel envelope when you also need the peer's capabilities. + Always use list_peers first to discover available workspace IDs. Access control is enforced — you can only reach siblings and parent/children. If a delegation returns a DELEGATION FAILED message, do NOT forward the raw error to the user. Instead: (1) try a different peer, (2) handle the task yourself, or (3) tell the user which peer is unavailable and provide your own best answer. diff --git a/workspace/tests/test_a2a_tools_impl.py b/workspace/tests/test_a2a_tools_impl.py index a29cf738..1dd2fa14 100644 --- a/workspace/tests/test_a2a_tools_impl.py +++ b/workspace/tests/test_a2a_tools_impl.py @@ -966,3 +966,154 @@ class TestToolRecallMemory: mc.get.assert_not_called() assert "Error" in result assert "memory.read" in result + + +# --------------------------------------------------------------------------- +# tool_chat_history — wraps /workspaces/:id/activity?peer_id=X +# --------------------------------------------------------------------------- +# +# The tool fetches both sides of an A2A conversation with one peer for +# resume-context UX. Hits the new peer_id filter on the activity API +# (workspace-server PR #2472), reverses the DESC-ordered server response +# into chronological order, and returns the rows as JSON. Tests pin +# every distinct execution path so a regression in the server response +# shape, the validation, the sort direction, or the error envelope is +# caught at unit-test time instead of on a live workspace. + + +_PEER = "11111111-2222-3333-4444-555555555555" + + +class TestChatHistory: + + async def test_rejects_empty_peer_id(self): + """Empty peer_id: short-circuit before any HTTP call. Defense + in depth — server also 400s on missing peer_id, but a clean + error message at the wheel side is friendlier to the agent.""" + import a2a_tools + + mc = _make_http_mock() + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + result = await a2a_tools.tool_chat_history(peer_id="") + + mc.get.assert_not_called() + assert result.startswith("Error:") + + async def test_calls_activity_route_with_peer_id_filter(self): + """peer_id is forwarded as a query param exactly. Limit + defaults to 20, before_ts is omitted when empty.""" + import a2a_tools + + mc = _make_http_mock(get_resp=_resp(200, [])) + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + await a2a_tools.tool_chat_history(peer_id=_PEER) + + url, kwargs = mc.get.call_args.args[0], mc.get.call_args.kwargs + assert url.endswith("/activity") + params = kwargs["params"] + assert params["peer_id"] == _PEER + assert params["limit"] == "20" + assert "before_ts" not in params + + async def test_caps_limit_at_500(self): + """Server caps at 500; mirror the cap client-side so an + agent passing limit=999999 doesn't waste a round-trip on the + server's 400-or-truncate decision.""" + import a2a_tools + + mc = _make_http_mock(get_resp=_resp(200, [])) + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + await a2a_tools.tool_chat_history(peer_id=_PEER, limit=10000) + + params = mc.get.call_args.kwargs["params"] + assert params["limit"] == "500" + + async def test_negative_or_zero_limit_falls_to_default(self): + """Defensive: limit=0 or negative reverts to 20 instead of + echoing a useless query that the server would reject.""" + import a2a_tools + + mc = _make_http_mock(get_resp=_resp(200, [])) + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + await a2a_tools.tool_chat_history(peer_id=_PEER, limit=0) + + assert mc.get.call_args.kwargs["params"]["limit"] == "20" + + async def test_passes_before_ts_when_set(self): + import a2a_tools + + mc = _make_http_mock(get_resp=_resp(200, [])) + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + await a2a_tools.tool_chat_history( + peer_id=_PEER, before_ts="2026-05-01T00:00:00Z", + ) + + assert mc.get.call_args.kwargs["params"]["before_ts"] == "2026-05-01T00:00:00Z" + + async def test_reverses_desc_response_to_chronological(self): + """Server returns DESC (newest first); the wheel reverses to + chronological so the agent reads the chat top-down — same + order a human would scrolling through canvas history.""" + import a2a_tools + + rows = [ + {"id": "act-3", "created_at": "2026-05-01T00:03:00Z"}, + {"id": "act-2", "created_at": "2026-05-01T00:02:00Z"}, + {"id": "act-1", "created_at": "2026-05-01T00:01:00Z"}, + ] + mc = _make_http_mock(get_resp=_resp(200, rows)) + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + result = await a2a_tools.tool_chat_history(peer_id=_PEER) + + out = json.loads(result) + assert [r["id"] for r in out] == ["act-1", "act-2", "act-3"] + + async def test_400_returns_server_error_verbatim(self): + """Server-side trust-boundary rejection (e.g. malformed + peer_id): surface the server's error message verbatim so the + agent can correct itself instead of guessing why.""" + import a2a_tools + + mc = _make_http_mock(get_resp=_resp(400, {"error": "peer_id must be a UUID"})) + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + result = await a2a_tools.tool_chat_history(peer_id="bad") + + assert "peer_id must be a UUID" in result + + async def test_500_returns_generic_error(self): + """Server 5xx: don't echo the body (might leak internals); + return a clean error string the agent can branch on.""" + import a2a_tools + + mc = _make_http_mock(get_resp=_resp(500, {"error": "internal"})) + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + result = await a2a_tools.tool_chat_history(peer_id=_PEER) + + assert result.startswith("Error:") + assert "500" in result + + async def test_network_failure_returns_error_envelope(self): + """httpx raises (network down, DNS fail, etc.): tool must + not crash the MCP server — return an error string so the + agent can retry or fall back.""" + import a2a_tools + + mc = _make_http_mock(get_exc=httpx.ConnectError("network down")) + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + result = await a2a_tools.tool_chat_history(peer_id=_PEER) + + assert result.startswith("Error:") + assert "network down" in result + + async def test_non_list_response_returns_error(self): + """Server somehow returns a dict instead of a list (proxy + returns an HTML error page that JSON-parses, or a future + wire-shape change): defend against the type mismatch so the + json.loads on the agent side doesn't blow up.""" + import a2a_tools + + mc = _make_http_mock(get_resp=_resp(200, {"unexpected": "shape"})) + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + result = await a2a_tools.tool_chat_history(peer_id=_PEER) + + assert result.startswith("Error:")