From 65b531acf6faefa20b0038d58f52dda04dfb1a9a Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Fri, 24 Apr 2026 19:52:08 -0700 Subject: [PATCH] fix(workspace): tag self-originated A2A POSTs with X-Workspace-ID MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Workspace runtime fired four classes of A2A request to the platform without the X-Workspace-ID header that identifies the source workspace: heartbeat self-messages, initial_prompt, idle-loop fires, and peer-to-peer A2A from runtime tools. The platform's a2a_receive logger keys source_id off that header — without it, every such row was written with source_id=NULL, which the canvas's My Chat tab filters as ?source=canvas (i.e. "user typed this") and rendered the internal triggers as if the human user had sent them. The "Delegation results are ready..." heartbeat trigger was visible to end users in the chat history; delegate_task A2A calls between agents were misclassified the same way. Centralise the header construction in a new platform_auth helper self_source_headers(workspace_id) that returns auth_headers() PLUS {X-Workspace-ID: }. Apply it to: - heartbeat.py self-message (refactored from inline header dict) - main.py initial_prompt POST - main.py idle_prompt POST - a2a_client.py send_a2a_message (peer A2A from runtime) - builtin_tools/a2a_tools.py delegate_task (was missing ALL headers) Tests: - test_heartbeat.py asserts the X-Workspace-ID header is set on the self-message POST. - test_a2a_tools_module.py asserts the same on delegate_task POSTs; FakeClient.post mocks updated to accept the headers kwarg. Production effect lands the moment workspace containers are rebuilt with this code; existing rows in activity_logs keep their NULL source_id (legacy data). The canvas-side filter (#follow-up) covers the historical-rows case until backfill. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/a2a_client.py | 10 +++++++-- workspace/builtin_tools/a2a_tools.py | 7 ++++++- workspace/heartbeat.py | 7 +++++-- workspace/main.py | 20 +++++++++++++++--- workspace/platform_auth.py | 20 ++++++++++++++++++ workspace/tests/test_a2a_tools_module.py | 26 ++++++++++++++++-------- workspace/tests/test_heartbeat.py | 13 ++++++++++++ 7 files changed, 87 insertions(+), 16 deletions(-) diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 1a3e4f76..514dc438 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -10,7 +10,7 @@ import uuid import httpx -from platform_auth import auth_headers +from platform_auth import auth_headers, self_source_headers logger = logging.getLogger(__name__) @@ -56,9 +56,15 @@ async def send_a2a_message(target_url: str, message: str) -> str: timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0) ) as client: try: + # self_source_headers() includes X-Workspace-ID so the + # platform's a2a_receive logger records source_id = + # WORKSPACE_ID. Otherwise peer-A2A messages — including + # the case where target_url resolves to this workspace's + # own /a2a — get logged with source_id=NULL and surface + # in the recipient's My Chat tab as user-typed input. resp = await client.post( target_url, - headers=auth_headers(), + headers=self_source_headers(WORKSPACE_ID), json={ "jsonrpc": "2.0", "id": str(uuid.uuid4()), diff --git a/workspace/builtin_tools/a2a_tools.py b/workspace/builtin_tools/a2a_tools.py index 8c7be15f..df4f9d78 100644 --- a/workspace/builtin_tools/a2a_tools.py +++ b/workspace/builtin_tools/a2a_tools.py @@ -42,10 +42,15 @@ async def delegate_task(workspace_id: str, task: str) -> str: except Exception as e: return f"Error discovering workspace: {e}" - # Send A2A message + # Send A2A message. X-Workspace-ID identifies us as the source — + # without it the platform's a2a_receive logger writes + # source_id=NULL and the recipient's My Chat tab renders the + # delegation as if a human user typed it. Same hazard fixed + # in heartbeat.py / a2a_client.py / main.py initial+idle flows. try: a2a_resp = await client.post( target_url, + headers={"X-Workspace-ID": WORKSPACE_ID}, json={ "jsonrpc": "2.0", "id": str(uuid.uuid4()), diff --git a/workspace/heartbeat.py b/workspace/heartbeat.py index 1eb5b4fd..7e8b1c69 100644 --- a/workspace/heartbeat.py +++ b/workspace/heartbeat.py @@ -17,7 +17,7 @@ from pathlib import Path import httpx -from platform_auth import auth_headers, refresh_cache +from platform_auth import auth_headers, refresh_cache, self_source_headers logger = logging.getLogger(__name__) @@ -284,6 +284,9 @@ class HeartbeatLoop: else: self._last_self_message_time = now try: + # self_source_headers() adds X-Workspace-ID so the + # platform tags this row source=agent, not canvas + # — see platform_auth.py for the full rationale. await client.post( f"{self.platform_url}/workspaces/{self.workspace_id}/a2a", json={ @@ -295,7 +298,7 @@ class HeartbeatLoop: }, }, }, - headers=auth_headers(), + headers=self_source_headers(self.workspace_id), timeout=120.0, ) logger.info("Heartbeat: self-message sent to process delegation results") diff --git a/workspace/main.py b/workspace/main.py index 5b6b4409..ab0b06f5 100644 --- a/workspace/main.py +++ b/workspace/main.py @@ -33,7 +33,7 @@ from initial_prompt import ( mark_initial_prompt_attempted, resolve_initial_prompt_marker, ) -from platform_auth import auth_headers +from platform_auth import auth_headers, self_source_headers def get_machine_ip() -> str: # pragma: no cover @@ -439,7 +439,15 @@ async def main(): # pragma: no cover # silently rejected once any workspace has a live token on # file. Without this, initial_prompt 401s in multi-tenant # mode exactly like /registry/register did in #215. - headers = {"Content-Type": "application/json", **auth_headers()} + # X-Workspace-ID via self_source_headers() so the platform + # tags the row source=agent — without it the canvas's + # My Chat tab renders the initial_prompt as if the user + # had typed it. See platform_auth.py for the full + # explanation. + headers = { + "Content-Type": "application/json", + **self_source_headers(workspace_id), + } # Retry with backoff — the platform proxy may not be able to # reach us yet (container networking takes a moment to settle). @@ -531,7 +539,13 @@ async def main(): # pragma: no cover # actual outcome instead of a bare "post failed" line. # #220: include auth_headers() on every idle fire. Without # this, the idle loop 401s in multi-tenant mode. - headers = {"Content-Type": "application/json", **auth_headers()} + # self_source_headers() adds X-Workspace-ID so the + # platform classifies the idle fire as source=agent + # rather than user-typed canvas input. + headers = { + "Content-Type": "application/json", + **self_source_headers(workspace_id), + } try: req = _urlreq.Request( f"{platform_url}/workspaces/{workspace_id}/a2a", diff --git a/workspace/platform_auth.py b/workspace/platform_auth.py index 39a17075..f767381d 100644 --- a/workspace/platform_auth.py +++ b/workspace/platform_auth.py @@ -98,6 +98,26 @@ def auth_headers() -> dict[str, str]: return {"Authorization": f"Bearer {tok}"} +def self_source_headers(workspace_id: str) -> dict[str, str]: + """Return auth headers PLUS X-Workspace-ID identifying this workspace + as the source of the request. + + Use this for any POST the workspace's own runtime fires against the + platform's A2A endpoints — heartbeat self-messages, initial_prompt, + idle-loop fires, peer-to-peer A2A from runtime tools. Without the + X-Workspace-ID header the platform's a2a_receive logger writes + source_id=NULL, which the canvas's My Chat tab interprets as a + user-typed message and renders the internal prompt to the user. + See workspace-server/internal/handlers/a2a_proxy.go:184 for the + server-side classification rule. + + Centralised here so adding a new system header (e.g. a per-fire + correlation ID) only touches one place — and so that any + workspace→A2A POST that doesn't use this helper stands out in + review as a probable bug.""" + return {**auth_headers(), "X-Workspace-ID": workspace_id} + + def clear_cache() -> None: """Reset the in-memory cache. Used by tests that write fresh token files between cases.""" diff --git a/workspace/tests/test_a2a_tools_module.py b/workspace/tests/test_a2a_tools_module.py index 3f70b631..1a058326 100644 --- a/workspace/tests/test_a2a_tools_module.py +++ b/workspace/tests/test_a2a_tools_module.py @@ -114,11 +114,11 @@ class TestDelegateTask: async def __aexit__(self, *a): pass async def get(self, url, headers=None): - calls.append(("get", url)) + calls.append(("get", url, headers)) return _FakeResponse(200, {"url": "http://target.test/a2a"}) - async def post(self, url, json=None): - calls.append(("post", url)) + async def post(self, url, json=None, headers=None): + calls.append(("post", url, headers)) return _FakeResponse(200, { "result": { "parts": [{"kind": "text", "text": "Task done!"}] @@ -130,7 +130,17 @@ class TestDelegateTask: result = await mod.delegate_task("ws-target", "do something") assert result == "Task done!" assert any(c[0] == "get" for c in calls) - assert any(c[0] == "post" for c in calls) + post_calls = [c for c in calls if c[0] == "post"] + assert post_calls, "delegate_task must POST to the target's /a2a endpoint" + # Regression: peer A2A POSTs MUST include X-Workspace-ID so + # the platform's a2a_receive logger writes source_id correctly + # — without it the recipient's My Chat tab would render the + # delegation as user-typed input. Same hazard fixed in + # heartbeat.py / a2a_client.py / main.py initial+idle flows. + post_headers = post_calls[0][2] or {} + assert post_headers.get("X-Workspace-ID"), ( + f"delegate_task POST must include X-Workspace-ID; got headers={post_headers!r}" + ) async def test_delegate_task_success_empty_parts(self, monkeypatch): """Result with empty parts list falls back to str(result).""" @@ -144,7 +154,7 @@ class TestDelegateTask: async def get(self, url, headers=None): return _FakeResponse(200, {"url": "http://target.test/a2a"}) - async def post(self, url, json=None): + async def post(self, url, json=None, headers=None): return _FakeResponse(200, {"result": {"parts": []}}) monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient) @@ -217,7 +227,7 @@ class TestDelegateTask: async def get(self, url, headers=None): return _FakeResponse(200, {"url": "http://target.test/a2a"}) - async def post(self, url, json=None): + async def post(self, url, json=None, headers=None): return _FakeResponse(200, { "error": {"code": -32603, "message": "Internal error"} }) @@ -240,7 +250,7 @@ class TestDelegateTask: async def get(self, url, headers=None): return _FakeResponse(200, {"url": "http://target.test/a2a"}) - async def post(self, url, json=None): + async def post(self, url, json=None, headers=None): return _FakeResponse(200, {"jsonrpc": "2.0", "id": "123"}) monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient) @@ -262,7 +272,7 @@ class TestDelegateTask: async def get(self, url, headers=None): return _FakeResponse(200, {"url": "http://target.test/a2a"}) - async def post(self, url, json=None): + async def post(self, url, json=None, headers=None): call_count["n"] += 1 raise ConnectionError("target down") diff --git a/workspace/tests/test_heartbeat.py b/workspace/tests/test_heartbeat.py index 09e971e8..b4b51b3c 100644 --- a/workspace/tests/test_heartbeat.py +++ b/workspace/tests/test_heartbeat.py @@ -269,6 +269,19 @@ async def test_check_delegations_sends_self_message(tmp_path): a2a_call = post_calls[0] assert "/a2a" in str(a2a_call) + # Regression: the self-message MUST include X-Workspace-ID set to + # the workspace's own id, so the platform's a2a_receive logger + # records source_id = workspace_id (not NULL). Without this header + # the canvas's My Chat tab (which filters source_id IS NULL) would + # render the internal "Delegation results are ready..." trigger + # as a user-typed message. Bug observed 2026-04-25 on UX A/B Lab + # Design Director chat. + a2a_headers = a2a_call.kwargs.get("headers") or {} + assert a2a_headers.get("X-Workspace-ID") == "ws-abc", ( + f"self-message must self-identify via X-Workspace-ID header, " + f"got headers={a2a_headers!r}" + ) + @pytest.mark.asyncio async def test_check_delegations_cooldown():