diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 1a3e4f76..514dc438 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -10,7 +10,7 @@ import uuid import httpx -from platform_auth import auth_headers +from platform_auth import auth_headers, self_source_headers logger = logging.getLogger(__name__) @@ -56,9 +56,15 @@ async def send_a2a_message(target_url: str, message: str) -> str: timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0) ) as client: try: + # self_source_headers() includes X-Workspace-ID so the + # platform's a2a_receive logger records source_id = + # WORKSPACE_ID. Otherwise peer-A2A messages — including + # the case where target_url resolves to this workspace's + # own /a2a — get logged with source_id=NULL and surface + # in the recipient's My Chat tab as user-typed input. resp = await client.post( target_url, - headers=auth_headers(), + headers=self_source_headers(WORKSPACE_ID), json={ "jsonrpc": "2.0", "id": str(uuid.uuid4()), diff --git a/workspace/builtin_tools/a2a_tools.py b/workspace/builtin_tools/a2a_tools.py index 8c7be15f..df4f9d78 100644 --- a/workspace/builtin_tools/a2a_tools.py +++ b/workspace/builtin_tools/a2a_tools.py @@ -42,10 +42,15 @@ async def delegate_task(workspace_id: str, task: str) -> str: except Exception as e: return f"Error discovering workspace: {e}" - # Send A2A message + # Send A2A message. X-Workspace-ID identifies us as the source — + # without it the platform's a2a_receive logger writes + # source_id=NULL and the recipient's My Chat tab renders the + # delegation as if a human user typed it. Same hazard fixed + # in heartbeat.py / a2a_client.py / main.py initial+idle flows. try: a2a_resp = await client.post( target_url, + headers={"X-Workspace-ID": WORKSPACE_ID}, json={ "jsonrpc": "2.0", "id": str(uuid.uuid4()), diff --git a/workspace/heartbeat.py b/workspace/heartbeat.py index 1eb5b4fd..7e8b1c69 100644 --- a/workspace/heartbeat.py +++ b/workspace/heartbeat.py @@ -17,7 +17,7 @@ from pathlib import Path import httpx -from platform_auth import auth_headers, refresh_cache +from platform_auth import auth_headers, refresh_cache, self_source_headers logger = logging.getLogger(__name__) @@ -284,6 +284,9 @@ class HeartbeatLoop: else: self._last_self_message_time = now try: + # self_source_headers() adds X-Workspace-ID so the + # platform tags this row source=agent, not canvas + # — see platform_auth.py for the full rationale. await client.post( f"{self.platform_url}/workspaces/{self.workspace_id}/a2a", json={ @@ -295,7 +298,7 @@ class HeartbeatLoop: }, }, }, - headers=auth_headers(), + headers=self_source_headers(self.workspace_id), timeout=120.0, ) logger.info("Heartbeat: self-message sent to process delegation results") diff --git a/workspace/main.py b/workspace/main.py index 5b6b4409..ab0b06f5 100644 --- a/workspace/main.py +++ b/workspace/main.py @@ -33,7 +33,7 @@ from initial_prompt import ( mark_initial_prompt_attempted, resolve_initial_prompt_marker, ) -from platform_auth import auth_headers +from platform_auth import auth_headers, self_source_headers def get_machine_ip() -> str: # pragma: no cover @@ -439,7 +439,15 @@ async def main(): # pragma: no cover # silently rejected once any workspace has a live token on # file. Without this, initial_prompt 401s in multi-tenant # mode exactly like /registry/register did in #215. - headers = {"Content-Type": "application/json", **auth_headers()} + # X-Workspace-ID via self_source_headers() so the platform + # tags the row source=agent — without it the canvas's + # My Chat tab renders the initial_prompt as if the user + # had typed it. See platform_auth.py for the full + # explanation. + headers = { + "Content-Type": "application/json", + **self_source_headers(workspace_id), + } # Retry with backoff — the platform proxy may not be able to # reach us yet (container networking takes a moment to settle). @@ -531,7 +539,13 @@ async def main(): # pragma: no cover # actual outcome instead of a bare "post failed" line. # #220: include auth_headers() on every idle fire. Without # this, the idle loop 401s in multi-tenant mode. - headers = {"Content-Type": "application/json", **auth_headers()} + # self_source_headers() adds X-Workspace-ID so the + # platform classifies the idle fire as source=agent + # rather than user-typed canvas input. + headers = { + "Content-Type": "application/json", + **self_source_headers(workspace_id), + } try: req = _urlreq.Request( f"{platform_url}/workspaces/{workspace_id}/a2a", diff --git a/workspace/platform_auth.py b/workspace/platform_auth.py index 39a17075..f767381d 100644 --- a/workspace/platform_auth.py +++ b/workspace/platform_auth.py @@ -98,6 +98,26 @@ def auth_headers() -> dict[str, str]: return {"Authorization": f"Bearer {tok}"} +def self_source_headers(workspace_id: str) -> dict[str, str]: + """Return auth headers PLUS X-Workspace-ID identifying this workspace + as the source of the request. + + Use this for any POST the workspace's own runtime fires against the + platform's A2A endpoints — heartbeat self-messages, initial_prompt, + idle-loop fires, peer-to-peer A2A from runtime tools. Without the + X-Workspace-ID header the platform's a2a_receive logger writes + source_id=NULL, which the canvas's My Chat tab interprets as a + user-typed message and renders the internal prompt to the user. + See workspace-server/internal/handlers/a2a_proxy.go:184 for the + server-side classification rule. + + Centralised here so adding a new system header (e.g. a per-fire + correlation ID) only touches one place — and so that any + workspace→A2A POST that doesn't use this helper stands out in + review as a probable bug.""" + return {**auth_headers(), "X-Workspace-ID": workspace_id} + + def clear_cache() -> None: """Reset the in-memory cache. Used by tests that write fresh token files between cases.""" diff --git a/workspace/tests/test_a2a_tools_module.py b/workspace/tests/test_a2a_tools_module.py index 3f70b631..1a058326 100644 --- a/workspace/tests/test_a2a_tools_module.py +++ b/workspace/tests/test_a2a_tools_module.py @@ -114,11 +114,11 @@ class TestDelegateTask: async def __aexit__(self, *a): pass async def get(self, url, headers=None): - calls.append(("get", url)) + calls.append(("get", url, headers)) return _FakeResponse(200, {"url": "http://target.test/a2a"}) - async def post(self, url, json=None): - calls.append(("post", url)) + async def post(self, url, json=None, headers=None): + calls.append(("post", url, headers)) return _FakeResponse(200, { "result": { "parts": [{"kind": "text", "text": "Task done!"}] @@ -130,7 +130,17 @@ class TestDelegateTask: result = await mod.delegate_task("ws-target", "do something") assert result == "Task done!" assert any(c[0] == "get" for c in calls) - assert any(c[0] == "post" for c in calls) + post_calls = [c for c in calls if c[0] == "post"] + assert post_calls, "delegate_task must POST to the target's /a2a endpoint" + # Regression: peer A2A POSTs MUST include X-Workspace-ID so + # the platform's a2a_receive logger writes source_id correctly + # — without it the recipient's My Chat tab would render the + # delegation as user-typed input. Same hazard fixed in + # heartbeat.py / a2a_client.py / main.py initial+idle flows. + post_headers = post_calls[0][2] or {} + assert post_headers.get("X-Workspace-ID"), ( + f"delegate_task POST must include X-Workspace-ID; got headers={post_headers!r}" + ) async def test_delegate_task_success_empty_parts(self, monkeypatch): """Result with empty parts list falls back to str(result).""" @@ -144,7 +154,7 @@ class TestDelegateTask: async def get(self, url, headers=None): return _FakeResponse(200, {"url": "http://target.test/a2a"}) - async def post(self, url, json=None): + async def post(self, url, json=None, headers=None): return _FakeResponse(200, {"result": {"parts": []}}) monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient) @@ -217,7 +227,7 @@ class TestDelegateTask: async def get(self, url, headers=None): return _FakeResponse(200, {"url": "http://target.test/a2a"}) - async def post(self, url, json=None): + async def post(self, url, json=None, headers=None): return _FakeResponse(200, { "error": {"code": -32603, "message": "Internal error"} }) @@ -240,7 +250,7 @@ class TestDelegateTask: async def get(self, url, headers=None): return _FakeResponse(200, {"url": "http://target.test/a2a"}) - async def post(self, url, json=None): + async def post(self, url, json=None, headers=None): return _FakeResponse(200, {"jsonrpc": "2.0", "id": "123"}) monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient) @@ -262,7 +272,7 @@ class TestDelegateTask: async def get(self, url, headers=None): return _FakeResponse(200, {"url": "http://target.test/a2a"}) - async def post(self, url, json=None): + async def post(self, url, json=None, headers=None): call_count["n"] += 1 raise ConnectionError("target down") diff --git a/workspace/tests/test_heartbeat.py b/workspace/tests/test_heartbeat.py index 09e971e8..b4b51b3c 100644 --- a/workspace/tests/test_heartbeat.py +++ b/workspace/tests/test_heartbeat.py @@ -269,6 +269,19 @@ async def test_check_delegations_sends_self_message(tmp_path): a2a_call = post_calls[0] assert "/a2a" in str(a2a_call) + # Regression: the self-message MUST include X-Workspace-ID set to + # the workspace's own id, so the platform's a2a_receive logger + # records source_id = workspace_id (not NULL). Without this header + # the canvas's My Chat tab (which filters source_id IS NULL) would + # render the internal "Delegation results are ready..." trigger + # as a user-typed message. Bug observed 2026-04-25 on UX A/B Lab + # Design Director chat. + a2a_headers = a2a_call.kwargs.get("headers") or {} + assert a2a_headers.get("X-Workspace-ID") == "ws-abc", ( + f"self-message must self-identify via X-Workspace-ID header, " + f"got headers={a2a_headers!r}" + ) + @pytest.mark.asyncio async def test_check_delegations_cooldown():