fix(builtin_tools/validation): cover remaining WORKSPACE_ID URL usages

Extend get_validated_workspace_id() to all remaining unguarded URL positions:

- consolidation.py: _consolidate() — validates before GET/POST/DELETE to
  /workspaces/{id}/memories endpoints. Graceful skip on failure (log + return).
- coordinator.py: get_children() — validates before /registry/{id}/peers.
  Graceful skip (empty list) on failure.
- molecule_ai_status.py: set_status() — validates before /registry/heartbeat
  and /workspaces/{id}/activity. Exits with descriptive error on failure.

With these three, every runtime use of WORKSPACE_ID in a URL path is now
validated. Remaining WORKSPACE_ID uses are:
- JSON body fields (not injection-risky): heartbeat, memory POST bodies
- Header values (X-Workspace-ID): lower risk, non-URL-injection
This commit is contained in:
Molecule AI · infra-sre 2026-04-20 23:53:15 +00:00
parent 42bdf530b5
commit be9c9997c0
3 changed files with 36 additions and 12 deletions

View File

@ -14,7 +14,8 @@ import os
import httpx
from molecule_runtime.platform_auth import auth_headers
from .builtin_tools.validation import WorkspaceIdValidationError, get_validated_workspace_id
from .platform_auth import auth_headers
logger = logging.getLogger(__name__)
@ -50,10 +51,17 @@ class ConsolidationLoop:
async def _consolidate(self):
"""Check if consolidation is needed and run it."""
# --- Workspace ID validation (CWE-20 / CWE-88) ------------------------
try:
ws_id = get_validated_workspace_id(caller="consolidation._consolidate")
except WorkspaceIdValidationError as e:
logger.warning("Consolidation skipped: %s", e)
return
async with httpx.AsyncClient(timeout=10.0) as client:
# Fetch local memories
resp = await client.get(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/memories",
f"{PLATFORM_URL}/workspaces/{ws_id}/memories",
params={"scope": "LOCAL"},
headers=auth_headers(),
)
@ -96,7 +104,7 @@ class ConsolidationLoop:
if summary:
# Store consolidated summary as a TEAM memory — only delete originals if POST succeeds
resp = await client.post(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/memories",
f"{PLATFORM_URL}/workspaces/{ws_id}/memories",
json={"content": f"[Consolidated] {summary}", "scope": "TEAM"},
headers=auth_headers(),
)
@ -104,7 +112,7 @@ class ConsolidationLoop:
# Safe to delete originals — consolidated version is saved
for m in memories:
await client.delete(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/memories/{m['id']}",
f"{PLATFORM_URL}/workspaces/{ws_id}/memories/{m['id']}",
headers=auth_headers(),
)
logger.info("Consolidated %d memories into team knowledge", len(memories))
@ -121,7 +129,7 @@ class ConsolidationLoop:
if not (self.agent and summary):
combined = " | ".join(contents[:20])
await client.post(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/memories",
f"{PLATFORM_URL}/workspaces/{ws_id}/memories",
json={"content": f"[Consolidated] {combined}", "scope": "TEAM"},
headers=auth_headers(),
)

View File

@ -17,6 +17,7 @@ import os
import httpx
from langchain_core.tools import tool
from builtin_tools.validation import WorkspaceIdValidationError, get_validated_workspace_id
from molecule_runtime.adapters.shared_runtime import build_peer_section
from policies.routing import build_team_routing_payload
@ -51,16 +52,21 @@ async def get_parent_context() -> list[dict]:
async def get_children() -> list[dict]:
"""Fetch this workspace's children from the platform."""
try:
ws_id = get_validated_workspace_id(caller="coordinator.get_children")
except WorkspaceIdValidationError:
logger.warning("get_children skipped: invalid WORKSPACE_ID")
return []
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{PLATFORM_URL}/registry/{WORKSPACE_ID}/peers",
headers={"X-Workspace-ID": WORKSPACE_ID},
f"{PLATFORM_URL}/registry/{ws_id}/peers",
headers={"X-Workspace-ID": ws_id},
)
if resp.status_code == 200:
peers = resp.json()
# Filter to only children (parent_id == our ID)
return [p for p in peers if p.get("parent_id") == WORKSPACE_ID]
return [p for p in peers if p.get("parent_id") == ws_id]
except Exception as e:
logger.warning("Failed to fetch children: %s", e)
return []

View File

@ -22,6 +22,8 @@ import sys
import httpx
from builtin_tools.validation import WorkspaceIdValidationError, get_validated_workspace_id
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://platform:8080")
@ -30,14 +32,22 @@ def set_status(task: str):
"""Push current_task to platform via heartbeat."""
try:
try:
from platform_auth import auth_headers as _auth
from builtin_tools.platform_auth import auth_headers as _auth
_headers = _auth()
except Exception:
_headers = {}
# --- Workspace ID validation (CWE-20 / CWE-88) -----------------------
try:
ws_id = get_validated_workspace_id(caller="molecule_ai_status.set_status")
except WorkspaceIdValidationError as e:
sys.stderr.write(f"molecule_ai_status: {e}\n")
return
httpx.post(
f"{PLATFORM_URL}/registry/heartbeat",
json={
"workspace_id": WORKSPACE_ID,
"workspace_id": ws_id,
"current_task": task,
"active_tasks": 1 if task else 0,
"error_rate": 0,
@ -50,10 +60,10 @@ def set_status(task: str):
if task:
# Also log as activity for traceability
httpx.post(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/activity",
f"{PLATFORM_URL}/workspaces/{ws_id}/activity",
json={
"activity_type": "task_update",
"source_id": WORKSPACE_ID,
"source_id": ws_id,
"summary": task,
"status": "ok",
},