fix(workspace): tag self-originated A2A POSTs with X-Workspace-ID
Workspace runtime fired four classes of A2A request to the platform
without the X-Workspace-ID header that identifies the source
workspace: heartbeat self-messages, initial_prompt, idle-loop fires,
and peer-to-peer A2A from runtime tools. The platform's a2a_receive
logger keys source_id off that header — without it, every such row
was written with source_id=NULL, which the canvas's My Chat tab
filters as ?source=canvas (i.e. "user typed this") and rendered the
internal triggers as if the human user had sent them. The
"Delegation results are ready..." heartbeat trigger was visible to
end users in the chat history; delegate_task A2A calls between agents
were misclassified the same way.
Centralise the header construction in a new platform_auth helper
self_source_headers(workspace_id) that returns auth_headers() PLUS
{X-Workspace-ID: <id>}. Apply it to:
- heartbeat.py self-message (refactored from inline header dict)
- main.py initial_prompt POST
- main.py idle_prompt POST
- a2a_client.py send_a2a_message (peer A2A from runtime)
- builtin_tools/a2a_tools.py delegate_task (was missing ALL headers)
Tests:
- test_heartbeat.py asserts the X-Workspace-ID header is set on
the self-message POST.
- test_a2a_tools_module.py asserts the same on delegate_task POSTs;
FakeClient.post mocks updated to accept the headers kwarg.
Production effect lands the moment workspace containers are rebuilt
with this code; existing rows in activity_logs keep their NULL
source_id (legacy data). The canvas-side filter (#follow-up)
covers the historical-rows case until backfill.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
01c417828d
commit
65b531acf6
@ -10,7 +10,7 @@ import uuid
|
||||
|
||||
import httpx
|
||||
|
||||
from platform_auth import auth_headers
|
||||
from platform_auth import auth_headers, self_source_headers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -56,9 +56,15 @@ async def send_a2a_message(target_url: str, message: str) -> str:
|
||||
timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
|
||||
) as client:
|
||||
try:
|
||||
# self_source_headers() includes X-Workspace-ID so the
|
||||
# platform's a2a_receive logger records source_id =
|
||||
# WORKSPACE_ID. Otherwise peer-A2A messages — including
|
||||
# the case where target_url resolves to this workspace's
|
||||
# own /a2a — get logged with source_id=NULL and surface
|
||||
# in the recipient's My Chat tab as user-typed input.
|
||||
resp = await client.post(
|
||||
target_url,
|
||||
headers=auth_headers(),
|
||||
headers=self_source_headers(WORKSPACE_ID),
|
||||
json={
|
||||
"jsonrpc": "2.0",
|
||||
"id": str(uuid.uuid4()),
|
||||
|
||||
@ -42,10 +42,15 @@ async def delegate_task(workspace_id: str, task: str) -> str:
|
||||
except Exception as e:
|
||||
return f"Error discovering workspace: {e}"
|
||||
|
||||
# Send A2A message
|
||||
# Send A2A message. X-Workspace-ID identifies us as the source —
|
||||
# without it the platform's a2a_receive logger writes
|
||||
# source_id=NULL and the recipient's My Chat tab renders the
|
||||
# delegation as if a human user typed it. Same hazard fixed
|
||||
# in heartbeat.py / a2a_client.py / main.py initial+idle flows.
|
||||
try:
|
||||
a2a_resp = await client.post(
|
||||
target_url,
|
||||
headers={"X-Workspace-ID": WORKSPACE_ID},
|
||||
json={
|
||||
"jsonrpc": "2.0",
|
||||
"id": str(uuid.uuid4()),
|
||||
|
||||
@ -17,7 +17,7 @@ from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
from platform_auth import auth_headers, refresh_cache
|
||||
from platform_auth import auth_headers, refresh_cache, self_source_headers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -284,6 +284,9 @@ class HeartbeatLoop:
|
||||
else:
|
||||
self._last_self_message_time = now
|
||||
try:
|
||||
# self_source_headers() adds X-Workspace-ID so the
|
||||
# platform tags this row source=agent, not canvas
|
||||
# — see platform_auth.py for the full rationale.
|
||||
await client.post(
|
||||
f"{self.platform_url}/workspaces/{self.workspace_id}/a2a",
|
||||
json={
|
||||
@ -295,7 +298,7 @@ class HeartbeatLoop:
|
||||
},
|
||||
},
|
||||
},
|
||||
headers=auth_headers(),
|
||||
headers=self_source_headers(self.workspace_id),
|
||||
timeout=120.0,
|
||||
)
|
||||
logger.info("Heartbeat: self-message sent to process delegation results")
|
||||
|
||||
@ -33,7 +33,7 @@ from initial_prompt import (
|
||||
mark_initial_prompt_attempted,
|
||||
resolve_initial_prompt_marker,
|
||||
)
|
||||
from platform_auth import auth_headers
|
||||
from platform_auth import auth_headers, self_source_headers
|
||||
|
||||
|
||||
def get_machine_ip() -> str: # pragma: no cover
|
||||
@ -439,7 +439,15 @@ async def main(): # pragma: no cover
|
||||
# silently rejected once any workspace has a live token on
|
||||
# file. Without this, initial_prompt 401s in multi-tenant
|
||||
# mode exactly like /registry/register did in #215.
|
||||
headers = {"Content-Type": "application/json", **auth_headers()}
|
||||
# X-Workspace-ID via self_source_headers() so the platform
|
||||
# tags the row source=agent — without it the canvas's
|
||||
# My Chat tab renders the initial_prompt as if the user
|
||||
# had typed it. See platform_auth.py for the full
|
||||
# explanation.
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
**self_source_headers(workspace_id),
|
||||
}
|
||||
|
||||
# Retry with backoff — the platform proxy may not be able to
|
||||
# reach us yet (container networking takes a moment to settle).
|
||||
@ -531,7 +539,13 @@ async def main(): # pragma: no cover
|
||||
# actual outcome instead of a bare "post failed" line.
|
||||
# #220: include auth_headers() on every idle fire. Without
|
||||
# this, the idle loop 401s in multi-tenant mode.
|
||||
headers = {"Content-Type": "application/json", **auth_headers()}
|
||||
# self_source_headers() adds X-Workspace-ID so the
|
||||
# platform classifies the idle fire as source=agent
|
||||
# rather than user-typed canvas input.
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
**self_source_headers(workspace_id),
|
||||
}
|
||||
try:
|
||||
req = _urlreq.Request(
|
||||
f"{platform_url}/workspaces/{workspace_id}/a2a",
|
||||
|
||||
@ -98,6 +98,26 @@ def auth_headers() -> dict[str, str]:
|
||||
return {"Authorization": f"Bearer {tok}"}
|
||||
|
||||
|
||||
def self_source_headers(workspace_id: str) -> dict[str, str]:
|
||||
"""Return auth headers PLUS X-Workspace-ID identifying this workspace
|
||||
as the source of the request.
|
||||
|
||||
Use this for any POST the workspace's own runtime fires against the
|
||||
platform's A2A endpoints — heartbeat self-messages, initial_prompt,
|
||||
idle-loop fires, peer-to-peer A2A from runtime tools. Without the
|
||||
X-Workspace-ID header the platform's a2a_receive logger writes
|
||||
source_id=NULL, which the canvas's My Chat tab interprets as a
|
||||
user-typed message and renders the internal prompt to the user.
|
||||
See workspace-server/internal/handlers/a2a_proxy.go:184 for the
|
||||
server-side classification rule.
|
||||
|
||||
Centralised here so adding a new system header (e.g. a per-fire
|
||||
correlation ID) only touches one place — and so that any
|
||||
workspace→A2A POST that doesn't use this helper stands out in
|
||||
review as a probable bug."""
|
||||
return {**auth_headers(), "X-Workspace-ID": workspace_id}
|
||||
|
||||
|
||||
def clear_cache() -> None:
|
||||
"""Reset the in-memory cache. Used by tests that write fresh token
|
||||
files between cases."""
|
||||
|
||||
@ -114,11 +114,11 @@ class TestDelegateTask:
|
||||
async def __aexit__(self, *a): pass
|
||||
|
||||
async def get(self, url, headers=None):
|
||||
calls.append(("get", url))
|
||||
calls.append(("get", url, headers))
|
||||
return _FakeResponse(200, {"url": "http://target.test/a2a"})
|
||||
|
||||
async def post(self, url, json=None):
|
||||
calls.append(("post", url))
|
||||
async def post(self, url, json=None, headers=None):
|
||||
calls.append(("post", url, headers))
|
||||
return _FakeResponse(200, {
|
||||
"result": {
|
||||
"parts": [{"kind": "text", "text": "Task done!"}]
|
||||
@ -130,7 +130,17 @@ class TestDelegateTask:
|
||||
result = await mod.delegate_task("ws-target", "do something")
|
||||
assert result == "Task done!"
|
||||
assert any(c[0] == "get" for c in calls)
|
||||
assert any(c[0] == "post" for c in calls)
|
||||
post_calls = [c for c in calls if c[0] == "post"]
|
||||
assert post_calls, "delegate_task must POST to the target's /a2a endpoint"
|
||||
# Regression: peer A2A POSTs MUST include X-Workspace-ID so
|
||||
# the platform's a2a_receive logger writes source_id correctly
|
||||
# — without it the recipient's My Chat tab would render the
|
||||
# delegation as user-typed input. Same hazard fixed in
|
||||
# heartbeat.py / a2a_client.py / main.py initial+idle flows.
|
||||
post_headers = post_calls[0][2] or {}
|
||||
assert post_headers.get("X-Workspace-ID"), (
|
||||
f"delegate_task POST must include X-Workspace-ID; got headers={post_headers!r}"
|
||||
)
|
||||
|
||||
async def test_delegate_task_success_empty_parts(self, monkeypatch):
|
||||
"""Result with empty parts list falls back to str(result)."""
|
||||
@ -144,7 +154,7 @@ class TestDelegateTask:
|
||||
async def get(self, url, headers=None):
|
||||
return _FakeResponse(200, {"url": "http://target.test/a2a"})
|
||||
|
||||
async def post(self, url, json=None):
|
||||
async def post(self, url, json=None, headers=None):
|
||||
return _FakeResponse(200, {"result": {"parts": []}})
|
||||
|
||||
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
|
||||
@ -217,7 +227,7 @@ class TestDelegateTask:
|
||||
async def get(self, url, headers=None):
|
||||
return _FakeResponse(200, {"url": "http://target.test/a2a"})
|
||||
|
||||
async def post(self, url, json=None):
|
||||
async def post(self, url, json=None, headers=None):
|
||||
return _FakeResponse(200, {
|
||||
"error": {"code": -32603, "message": "Internal error"}
|
||||
})
|
||||
@ -240,7 +250,7 @@ class TestDelegateTask:
|
||||
async def get(self, url, headers=None):
|
||||
return _FakeResponse(200, {"url": "http://target.test/a2a"})
|
||||
|
||||
async def post(self, url, json=None):
|
||||
async def post(self, url, json=None, headers=None):
|
||||
return _FakeResponse(200, {"jsonrpc": "2.0", "id": "123"})
|
||||
|
||||
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
|
||||
@ -262,7 +272,7 @@ class TestDelegateTask:
|
||||
async def get(self, url, headers=None):
|
||||
return _FakeResponse(200, {"url": "http://target.test/a2a"})
|
||||
|
||||
async def post(self, url, json=None):
|
||||
async def post(self, url, json=None, headers=None):
|
||||
call_count["n"] += 1
|
||||
raise ConnectionError("target down")
|
||||
|
||||
|
||||
@ -269,6 +269,19 @@ async def test_check_delegations_sends_self_message(tmp_path):
|
||||
a2a_call = post_calls[0]
|
||||
assert "/a2a" in str(a2a_call)
|
||||
|
||||
# Regression: the self-message MUST include X-Workspace-ID set to
|
||||
# the workspace's own id, so the platform's a2a_receive logger
|
||||
# records source_id = workspace_id (not NULL). Without this header
|
||||
# the canvas's My Chat tab (which filters source_id IS NULL) would
|
||||
# render the internal "Delegation results are ready..." trigger
|
||||
# as a user-typed message. Bug observed 2026-04-25 on UX A/B Lab
|
||||
# Design Director chat.
|
||||
a2a_headers = a2a_call.kwargs.get("headers") or {}
|
||||
assert a2a_headers.get("X-Workspace-ID") == "ws-abc", (
|
||||
f"self-message must self-identify via X-Workspace-ID header, "
|
||||
f"got headers={a2a_headers!r}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_delegations_cooldown():
|
||||
|
||||
Loading…
Reference in New Issue
Block a user