fix(platform_auth): validate WORKSPACE_ID at import time (issue #14, CWE-20) (#29)

WORKSPACE_ID was read via os.environ.get("WORKSPACE_ID", "") in multiple
builtin_tools modules and used directly in platform API URLs and X-Workspace-ID
headers without validation. A crafted ID containing /, .., or # could cause
URL path injection.

Fix: validate_workspace_id() in platform_auth.py now validates the ID format
at module import time using a regex that permits only lowercase alphanumerics
and hyphens (matching UUIDs and org-generated IDs). The validated value is
exposed as a module-level WORKSPACE_ID constant. builtin_tools/approval.py
and builtin_tools/delegation.py now import from platform_auth instead of
reading os.environ directly.

Failing input raises ValueError with a clear message — workspace fails fast
at startup rather than silently accepting malformed IDs in requests.

Add 15 regression tests (45/45 passing total).

Co-authored-by: Molecule AI Infra-Runtime-BE <infra-runtime-be@agents.moleculesai.app>
Co-authored-by: Infra-Runtime-BE <infra-runtime-be@molecule.ai>
This commit is contained in:
molecule-ai[bot] 2026-04-21 00:04:54 +00:00 committed by GitHub
parent 953aa2847c
commit 30d96b4e4e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 197 additions and 3 deletions

View File

@ -33,7 +33,7 @@ RBAC denials emit an ``rbac / rbac.deny / denied`` event instead.
Environment variables
---------------------
PLATFORM_URL Platform base URL (default: http://platform:8080)
WORKSPACE_ID This workspace's ID (default: "")
WORKSPACE_ID This workspace's ID (validated at startup by platform_auth)
APPROVAL_TIMEOUT Max wait in seconds (default: 300)
APPROVAL_POLL_INTERVAL Polling interval in seconds (default: 5, polling path only)
APPROVAL_USE_WEBSOCKET "true" to force WS, "false"
@ -55,7 +55,7 @@ from builtin_tools.audit import check_permission, get_workspace_roles, log_event
logger = logging.getLogger(__name__)
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://platform:8080")
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")
from molecule_runtime.platform_auth import WORKSPACE_ID
APPROVAL_POLL_INTERVAL = float(os.environ.get("APPROVAL_POLL_INTERVAL", "5"))
APPROVAL_TIMEOUT = float(os.environ.get("APPROVAL_TIMEOUT", "300"))

View File

@ -30,7 +30,7 @@ from builtin_tools.telemetry import (
)
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://platform:8080")
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")
from molecule_runtime.platform_auth import WORKSPACE_ID
DELEGATION_RETRY_ATTEMPTS = int(os.environ.get("DELEGATION_RETRY_ATTEMPTS", "3"))
DELEGATION_RETRY_DELAY = float(os.environ.get("DELEGATION_RETRY_DELAY", "5.0"))
DELEGATION_TIMEOUT = float(os.environ.get("DELEGATION_TIMEOUT", "300.0"))

View File

@ -22,15 +22,68 @@ from __future__ import annotations
import logging
import os
import re
from pathlib import Path
logger = logging.getLogger(__name__)
# Valid workspace ID: lowercase alphanumeric + hyphens (UUIDs and org-generated IDs).
# Rejects /, \, .., #, ?, &, newlines — all chars that could break URL paths
# or HTTP header values. This is the single validation gate for WORKSPACE_ID.
_WORKSPACE_ID_RE = re.compile(r"^[a-z0-9][a-z0-9\-]{0,127}$")
# Cached result — validated once per process startup, not on every call.
_validated_workspace_id: str | None = None
def validate_workspace_id(workspace_id: str) -> str:
"""Validate *workspace_id* and return it.
Raises ValueError if the ID is empty, contains unsafe characters, or
does not match the expected format. This function is the single validation
gate call it once at startup and reuse the result.
Fixes issue #14 (CWE-20): prevents URL/header injection when WORKSPACE_ID
is used in platform API URLs and ``X-Workspace-ID`` headers.
"""
global _validated_workspace_id
if _validated_workspace_id is not None:
return _validated_workspace_id # pragma: no cover — cached fast path
if not workspace_id:
raise ValueError("WORKSPACE_ID is empty — set the WORKSPACE_ID env var")
# Strip and check again after strip
workspace_id = workspace_id.strip()
if not _WORKSPACE_ID_RE.match(workspace_id):
raise ValueError(
f"WORKSPACE_ID contains invalid characters: {workspace_id!r}. "
"Only lowercase letters, digits, and hyphens are allowed. "
"Ensure WORKSPACE_ID is a valid UUID or alphanumeric ID."
)
_validated_workspace_id = workspace_id
return workspace_id
# In-process cache so we don't hit disk on every heartbeat. The heartbeat
# loop fires on a short interval and reading a tiny file 10x per minute
# is wasteful. The file is the durable copy; this var is the hot path.
_cached_token: str | None = None
# Validated WORKSPACE_ID — read once at import time so every caller gets the
# same validated value without re-checking. Raises on bad input.
WORKSPACE_ID: str = validate_workspace_id(os.environ.get("WORKSPACE_ID", ""))
def get_workspace_id() -> str:
"""Return the validated workspace ID.
Cached result from module-level WORKSPACE_ID constant. Call this instead
of reading WORKSPACE_ID directly it guarantees the ID passed validation.
"""
return WORKSPACE_ID
def _token_file() -> Path:
"""Path to the on-disk token file. Respects CONFIGS_DIR, falls back

View File

@ -0,0 +1,141 @@
"""Regression tests for WORKSPACE_ID validation (issue #14, CWE-20)."""
import os
import re
import pytest
class TestValidateWorkspaceId:
"""validate_workspace_id() must reject injection characters."""
@pytest.fixture(autouse=True)
def _reset_cache(self, monkeypatch):
"""Clear module-level caches and env before each test."""
import molecule_runtime.platform_auth as pa_mod
pa_mod._validated_workspace_id = None
monkeypatch.delenv("WORKSPACE_ID", raising=False)
def test_rejects_empty_string(self, monkeypatch):
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", "")
with pytest.raises(ValueError, match="empty"):
pa_mod.validate_workspace_id(os.environ["WORKSPACE_ID"])
def test_rejects_whitespace_only(self, monkeypatch):
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", " ")
with pytest.raises(ValueError, match="invalid characters"):
pa_mod.validate_workspace_id(os.environ["WORKSPACE_ID"])
def test_rejects_slash(self, monkeypatch):
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", "ws/foo")
with pytest.raises(ValueError, match="invalid characters"):
pa_mod.validate_workspace_id(os.environ["WORKSPACE_ID"])
def test_rejects_double_dot(self, monkeypatch):
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", "ws..foo")
with pytest.raises(ValueError, match="invalid characters"):
pa_mod.validate_workspace_id(os.environ["WORKSPACE_ID"])
def test_rejects_hash_fragment(self, monkeypatch):
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", "ws#foo")
with pytest.raises(ValueError, match="invalid characters"):
pa_mod.validate_workspace_id(os.environ["WORKSPACE_ID"])
def test_rejects_question_mark(self, monkeypatch):
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", "ws?foo")
with pytest.raises(ValueError, match="invalid characters"):
pa_mod.validate_workspace_id(os.environ["WORKSPACE_ID"])
def test_rejects_backslash(self, monkeypatch):
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", "ws\\foo")
with pytest.raises(ValueError, match="invalid characters"):
pa_mod.validate_workspace_id(os.environ["WORKSPACE_ID"])
def test_rejects_newline(self, monkeypatch):
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", "ws\nfoo")
with pytest.raises(ValueError, match="invalid characters"):
pa_mod.validate_workspace_id(os.environ["WORKSPACE_ID"])
def test_accepts_valid_uuid(self, monkeypatch):
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", "53255246-1f34-432c-87e5-ae07f888f905")
assert pa_mod.validate_workspace_id("53255246-1f34-432c-87e5-ae07f888f905") == "53255246-1f34-432c-87e5-ae07f888f905"
def test_accepts_simple_alphanumeric(self, monkeypatch):
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", "test-workspace")
assert pa_mod.validate_workspace_id("test-workspace") == "test-workspace"
def test_rejects_uppercase(self, monkeypatch):
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", "Test-Workspace")
with pytest.raises(ValueError, match="invalid characters"):
pa_mod.validate_workspace_id(os.environ["WORKSPACE_ID"])
def test_rejects_starts_with_hyphen(self, monkeypatch):
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", "-test-workspace")
with pytest.raises(ValueError, match="invalid characters"):
pa_mod.validate_workspace_id(os.environ["WORKSPACE_ID"])
def test_cache_returns_same_result(self, monkeypatch):
"""Second call returns cached result without re-validation."""
import molecule_runtime.platform_auth as pa_mod
monkeypatch.setenv("WORKSPACE_ID", "my-workspace-123")
first = pa_mod.validate_workspace_id("my-workspace-123")
second = pa_mod.validate_workspace_id("my-workspace-123")
assert first == second == "my-workspace-123"
class TestWorkspaceIdConstantInApproval:
"""approval.py imports WORKSPACE_ID from platform_auth."""
def test_approval_imports_validated_id(self, monkeypatch):
"""approval.py uses platform_auth's validated WORKSPACE_ID constant."""
import molecule_runtime.builtin_tools.approval as approval_mod
from molecule_runtime.platform_auth import WORKSPACE_ID
# Verify it's the same object
assert approval_mod.WORKSPACE_ID is WORKSPACE_ID
# Verify the value contains no injection chars
assert "\n" not in approval_mod.WORKSPACE_ID
assert "/" not in approval_mod.WORKSPACE_ID
assert "\\" not in approval_mod.WORKSPACE_ID
assert ".." not in approval_mod.WORKSPACE_ID
class TestWorkspaceIdConstantInDelegation:
"""delegation.py imports WORKSPACE_ID from platform_auth."""
def test_delegation_imports_validated_id(self, monkeypatch):
"""delegation.py uses platform_auth's validated WORKSPACE_ID constant."""
import molecule_runtime.builtin_tools.delegation as delegation_mod
from molecule_runtime.platform_auth import WORKSPACE_ID
assert delegation_mod.WORKSPACE_ID is WORKSPACE_ID
assert "\n" not in delegation_mod.WORKSPACE_ID
assert "/" not in delegation_mod.WORKSPACE_ID
assert "\\" not in delegation_mod.WORKSPACE_ID
assert ".." not in delegation_mod.WORKSPACE_ID