molecule-core/workspace/platform_auth.py
Hongming Wang 65b531acf6 fix(workspace): tag self-originated A2A POSTs with X-Workspace-ID
Workspace runtime fired four classes of A2A request to the platform
without the X-Workspace-ID header that identifies the source
workspace: heartbeat self-messages, initial_prompt, idle-loop fires,
and peer-to-peer A2A from runtime tools. The platform's a2a_receive
logger keys source_id off that header — without it, every such row
was written with source_id=NULL, which the canvas's My Chat tab
filters as ?source=canvas (i.e. "user typed this") and rendered the
internal triggers as if the human user had sent them. The
"Delegation results are ready..." heartbeat trigger was visible to
end users in the chat history; delegate_task A2A calls between agents
were misclassified the same way.

Centralise the header construction in a new platform_auth helper
self_source_headers(workspace_id) that returns auth_headers() PLUS
{X-Workspace-ID: <id>}. Apply it to:

  - heartbeat.py self-message (refactored from inline header dict)
  - main.py initial_prompt POST
  - main.py idle_prompt POST
  - a2a_client.py send_a2a_message (peer A2A from runtime)
  - builtin_tools/a2a_tools.py delegate_task (was missing ALL headers)

Tests:
  - test_heartbeat.py asserts the X-Workspace-ID header is set on
    the self-message POST.
  - test_a2a_tools_module.py asserts the same on delegate_task POSTs;
    FakeClient.post mocks updated to accept the headers kwarg.

Production effect lands the moment workspace containers are rebuilt
with this code; existing rows in activity_logs keep their NULL
source_id (legacy data). The canvas-side filter (#follow-up)
covers the historical-rows case until backfill.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 19:54:43 -07:00

137 lines
5.0 KiB
Python

"""Workspace auth-token store (Phase 30.1).
Single source of truth for this workspace's authentication token. The
token is issued by the platform on the first successful
``POST /registry/register`` call and travels with every subsequent
heartbeat / update-card / (later) secrets-pull / A2A request.
The token is persisted to ``<configs>/.auth_token`` so it survives
restarts — we only expect to receive it once from the platform, since
``/registry/register`` no-ops token issuance for workspaces that already
have one on file.
Storage:
${CONFIGS_DIR}/.auth_token # 0600, one line, no trailing newline
Callers interact with three functions:
:func:`get_token` — returns the cached token or None
:func:`save_token` — persists a freshly-issued token
:func:`auth_headers`— builds the Authorization header dict for httpx
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
logger = logging.getLogger(__name__)
# In-process cache so we don't hit disk on every heartbeat. The heartbeat
# loop fires on a short interval and reading a tiny file 10x per minute
# is wasteful. The file is the durable copy; this var is the hot path.
_cached_token: str | None = None
def _token_file() -> Path:
"""Path to the on-disk token file. Respects CONFIGS_DIR, falls back
to /configs for the default container layout."""
return Path(os.environ.get("CONFIGS_DIR", "/configs")) / ".auth_token"
def get_token() -> str | None:
"""Return the cached token, reading it from disk on first call."""
global _cached_token
if _cached_token is not None:
return _cached_token
path = _token_file()
if not path.exists():
return None
try:
tok = path.read_text().strip()
except OSError as exc:
logger.warning("platform_auth: failed to read %s: %s", path, exc)
return None
if not tok:
return None
_cached_token = tok
return tok
def save_token(token: str) -> None:
"""Persist a newly-issued token. Creates the file with 0600 mode atomically.
Uses ``os.open(O_CREAT, 0o600)`` so the file is never world-readable,
even transiently. The previous ``write_text()`` + ``chmod()`` approach
had a TOCTOU window where a concurrent reader could access the token
between the two syscalls (M4 — flagged in security audit cycle 10).
Idempotent — if an identical token is already on disk we skip the
write so we don't churn the file's mtime or trigger spurious
filesystem watchers."""
global _cached_token
token = token.strip()
if not token:
raise ValueError("platform_auth: refusing to save empty token")
if get_token() == token:
return
path = _token_file()
path.parent.mkdir(parents=True, exist_ok=True)
# O_CREAT | O_WRONLY | O_TRUNC with mode=0o600 atomically creates (or
# truncates) the file with restricted permissions in a single syscall,
# eliminating the TOCTOU window.
fd = os.open(str(path), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
try:
os.write(fd, token.encode())
finally:
os.close(fd)
_cached_token = token
def auth_headers() -> dict[str, str]:
"""Return a header dict to merge into httpx calls. Empty if no token
is available yet — callers send the request as-is and the platform's
heartbeat handler grandfathers pre-token workspaces through until
their next /registry/register issues one."""
tok = get_token()
if not tok:
return {}
return {"Authorization": f"Bearer {tok}"}
def self_source_headers(workspace_id: str) -> dict[str, str]:
"""Return auth headers PLUS X-Workspace-ID identifying this workspace
as the source of the request.
Use this for any POST the workspace's own runtime fires against the
platform's A2A endpoints — heartbeat self-messages, initial_prompt,
idle-loop fires, peer-to-peer A2A from runtime tools. Without the
X-Workspace-ID header the platform's a2a_receive logger writes
source_id=NULL, which the canvas's My Chat tab interprets as a
user-typed message and renders the internal prompt to the user.
See workspace-server/internal/handlers/a2a_proxy.go:184 for the
server-side classification rule.
Centralised here so adding a new system header (e.g. a per-fire
correlation ID) only touches one place — and so that any
workspace→A2A POST that doesn't use this helper stands out in
review as a probable bug."""
return {**auth_headers(), "X-Workspace-ID": workspace_id}
def clear_cache() -> None:
"""Reset the in-memory cache. Used by tests that write fresh token
files between cases."""
global _cached_token
_cached_token = None
def refresh_cache() -> str | None:
"""Force re-read of the token from disk, discarding the in-process cache.
Use this when a 401 response suggests the cached token is stale —
e.g. after the platform rotates tokens during a restart (issue #1877).
Returns the (new) token value or None if not found/error."""
global _cached_token
_cached_token = None
return get_token()