fix(builtin_tools/validation): extend WORKSPACE_ID validation to top-level modules

Fixes remaining unguarded WORKSPACE_ID URL usages identified after the initial
builtin_tools/ fix:

- a2a_client.py: get_peers() and get_workspace_info() now use
  get_validated_workspace_id() before URL construction. The raw module-level
  constant is still used in the discover_peer() header (low risk, not URL path).
- a2a_cli.py: peers() and info() CLI commands now validate WORKSPACE_ID before
  calling the platform API. Commands exit with error code 1 + descriptive
  message if WORKSPACE_ID is empty or malformed.

Follow-up candidates (lower priority, not URL injection risk):
- coordinator.py: WORKSPACE_ID in registry peer URL
- consolidation.py: WORKSPACE_ID in memory URLs (long-running consolidation job)
- molecule_ai_status.py: WORKSPACE_ID in activity log URL
This commit is contained in:
Molecule AI · infra-sre 2026-04-20 23:42:23 +00:00
parent d52082839f
commit 42bdf530b5
2 changed files with 28 additions and 7 deletions

View File

@ -21,7 +21,9 @@ import uuid
import httpx
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")
from builtin_tools.validation import WorkspaceIdValidationError, get_validated_workspace_id
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "") # used for discover() headers only; URL uses validated version
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://platform:8080")
@ -182,8 +184,13 @@ async def check_status(target_id: str, task_id: str):
async def peers():
"""List available peers."""
try:
ws_id = get_validated_workspace_id(caller="a2a_cli.peers")
except WorkspaceIdValidationError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(f"{PLATFORM_URL}/registry/{WORKSPACE_ID}/peers")
resp = await client.get(f"{PLATFORM_URL}/registry/{ws_id}/peers")
if resp.status_code != 200:
print("Error: could not fetch peers", file=sys.stderr)
sys.exit(1)
@ -195,8 +202,13 @@ async def peers():
async def info():
"""Get this workspace's info."""
try:
ws_id = get_validated_workspace_id(caller="a2a_cli.info")
except WorkspaceIdValidationError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}")
resp = await client.get(f"{PLATFORM_URL}/workspaces/{ws_id}")
if resp.status_code == 200:
d = resp.json()
print(f"ID: {d['id']}")

View File

@ -10,7 +10,8 @@ import uuid
import httpx
from molecule_runtime.platform_auth import auth_headers
from .builtin_tools.validation import WorkspaceIdValidationError, get_validated_workspace_id
from .platform_auth import auth_headers
logger = logging.getLogger(__name__)
@ -83,11 +84,15 @@ async def send_a2a_message(target_url: str, message: str) -> str:
async def get_peers() -> list[dict]:
"""Get this workspace's peers from the platform registry."""
try:
ws_id = get_validated_workspace_id(caller="a2a_client.get_peers")
except WorkspaceIdValidationError:
return []
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(
f"{PLATFORM_URL}/registry/{WORKSPACE_ID}/peers",
headers={"X-Workspace-ID": WORKSPACE_ID, **auth_headers()},
f"{PLATFORM_URL}/registry/{ws_id}/peers",
headers={"X-Workspace-ID": ws_id, **auth_headers()},
)
if resp.status_code == 200:
return resp.json()
@ -98,10 +103,14 @@ async def get_peers() -> list[dict]:
async def get_workspace_info() -> dict:
"""Get this workspace's info from the platform."""
try:
ws_id = get_validated_workspace_id(caller="a2a_client.get_workspace_info")
except WorkspaceIdValidationError:
return {"error": "WORKSPACE_ID validation failed"}
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}",
f"{PLATFORM_URL}/workspaces/{ws_id}",
headers=auth_headers(),
)
if resp.status_code == 200: