feat(workspace): /internal/chat/uploads/ingest endpoint (RFC #2312, PR-B)

Stacked on PR-A (#2313). The platform-side rewrite that actually calls
this endpoint lands in PR-C; this PR adds the workspace-side consumer
+ hardening so PR-C is a small Go-only diff.

What this adds:

  * platform_inbound_auth.py — auth gate mirroring transcript_auth.py.
    Reads /configs/.platform_inbound_secret (delivered by the PR-A
    provisioner). Fail-closed when the file is missing/empty/unreadable.
    Constant-time compare via hmac.compare_digest.

  * internal_chat_uploads.py — POST /internal/chat/uploads/ingest.
    Multipart parse → sanitize each filename → write to
    /workspace/.molecule/chat-uploads/<random>-<name> with
    O_CREAT|O_EXCL|O_NOFOLLOW. Same response shape (uri/name/mimeType/
    size + workspace: URI scheme) as the legacy Go handler — canvas /
    agent code that resolves "workspace:..." paths keeps working.

  * Wired into workspace/main.py via starlette_app.add_route alongside
    the existing /transcript route.

  * python-multipart>=0.0.18 added to requirements.txt (Starlette's
    Request.form() needs it; ≥ 0.0.18 closes CVE-2024-53981).

Test coverage (36 tests, all green; full workspace suite 1266 passed):

  * test_platform_inbound_auth.py — 14 tests:
      happy path, fail-closed on missing file, empty file, whitespace-
      only file, missing/case-wrong/empty Bearer prefix, in-process
      cache, default CONFIGS_DIR fallback, end-to-end file → authorized.

  * test_internal_chat_uploads.py — 22 tests:
      sanitize_filename matrix (incl. ../traversal, CJK chars, length
      truncation), 401 on missing/wrong/no-secret-file bearer, single +
      batch upload happy paths, unique random prefix on duplicate names,
      mimetype guess fallback, 400 on missing files field, 413 on per-
      file + total-body oversize, symlink-at-target refusal (with
      sentinel-content unchanged assertion).

Why this is safe to ship before PR-C:

  * No platform-side caller yet → no behavior change visible to users.
  * Auth fails closed; nothing on the network can hit a write path
    until the platform forwards with the matching bearer.
  * Workspace's existing routes (/health, /transcript, /handle/*) are
    unchanged.

Refs #2312 (parent RFC), #2308 (chat upload 503 incident).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-04-29 14:16:14 -07:00
parent 1c9cea980d
commit d1de330152
6 changed files with 737 additions and 0 deletions

View File

@ -0,0 +1,241 @@
"""POST /internal/chat/uploads/ingest — workspace-side chat upload sink.
Replaces the Docker-exec / tar-copy path the platform-side workspace-server
used historically (see RFC #2312). The platform forwards the multipart
request to this handler with a Bearer header carrying the workspace's
inbound secret; this handler validates, writes each file under
``/workspace/.molecule/chat-uploads/<random>-<sanitized-name>``, and
returns the same ``ChatUploadedFile`` shape the platform Go handler
returned previously, so callers (canvas, molecli, A2A tools) see no
contract change.
Why no platform-side Docker-exec equivalent here:
The handler runs INSIDE the workspace container, which already has
direct filesystem access to /workspace. mkdir + open + write is
enough no archive ceremony, no remote-exec round-trip, no
docker socket dependency. Same code path on local Docker and SaaS
EC2; the bug behind #2308 (platform's findContainer is nil in
SaaS) cannot exist here by construction.
Path safety:
sanitize_filename strips everything outside [A-Za-z0-9._-], collapses
spaces, refuses ``""``/`"."`/`".."`, and caps length at 100 chars
(preserving extension if 16 chars). Files are written with
O_CREAT|O_EXCL|O_NOFOLLOW so a pre-existing symlink at the target
cannot redirect the write to /etc/* or any sensitive location, and
a colliding name fails fast (the random prefix already makes
collisions astronomical, but defense-in-depth costs nothing).
Limits (matches the Go contract from chat_files.go):
- 50 MB total request body
- 25 MB per file
- filename truncated to 100 chars
Response shape:
{"files": [
{"uri": "workspace:/workspace/.molecule/chat-uploads/<id>-<name>",
"name": "<sanitized name>",
"mimeType": "<content-type or guessed>",
"size": <bytes>}
]}
"""
from __future__ import annotations
import logging
import mimetypes
import os
import re
import secrets as pysecrets
from pathlib import Path
from starlette.requests import Request
from starlette.responses import JSONResponse
from platform_inbound_auth import get_inbound_secret, inbound_authorized
logger = logging.getLogger(__name__)
# In-container destination — must match the platform-side Go constant
# `chatUploadDir` so the URI scheme stays identical and existing canvas
# / agent code that resolves "workspace:/workspace/.molecule/chat-uploads/*"
# keeps working unchanged.
CHAT_UPLOAD_DIR = "/workspace/.molecule/chat-uploads"
# Total-request body cap. multipart/form-data with multiple parts can
# add ~100 bytes of framing per file; the cap is the bytes hitting the
# socket, including framing.
CHAT_UPLOAD_MAX_BYTES = 50 * 1024 * 1024 # 50 MB
# Per-file cap. Keeping per-file under total lets a user attach, say,
# a 5 MB PDF + 10 small screenshots in a single batch.
CHAT_UPLOAD_MAX_FILE_BYTES = 25 * 1024 * 1024 # 25 MB
# Conservative {alnum, dot, underscore, dash} character class — anything
# outside gets rewritten so embedded paths, control chars, newlines,
# quotes, and shell metachars never reach the filesystem.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^a-zA-Z0-9._\-]")
def sanitize_filename(name: str) -> str:
"""Reduce a user-supplied filename to a safe form.
Mirrors workspace-server/internal/handlers/chat_files.go::sanitizeFilename
so canvas-emitted URIs stay identical regardless of which path
handles the upload.
"""
base = os.path.basename(name)
base = base.replace(" ", "_")
base = _UNSAFE_FILENAME_CHARS.sub("_", base)
if len(base) > 100:
ext = ""
dot = base.rfind(".")
if dot >= 0 and len(base) - dot <= 16:
ext = base[dot:]
base = base[: 100 - len(ext)] + ext
if base in ("", ".", ".."):
return "file"
return base
def _open_safe(path: str) -> int:
"""Open `path` for write with O_CREAT|O_EXCL|O_NOFOLLOW.
Refuses to follow a pre-existing symlink at the target, and refuses
to overwrite an existing regular file. Both protections close the
same class of attack: a process inside the workspace container that
raced to create a symlink at the destination before the upload landed.
The random 16-byte prefix on the stored name makes the race
effectively impossible, but defense-in-depth costs nothing here.
"""
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
# O_NOFOLLOW is POSIX; refuses to open if the path is a symlink.
if hasattr(os, "O_NOFOLLOW"):
flags |= os.O_NOFOLLOW
return os.open(path, flags, 0o600)
async def ingest_handler(request: Request) -> JSONResponse:
"""POST /internal/chat/uploads/ingest — Starlette route handler.
Auth: Bearer <platform_inbound_secret>; fail-closed when the secret
file is missing or empty.
Body: multipart/form-data with one or more `files` parts.
Returns 200 with the list of stored URIs on success, or one of:
401 unauthorized bad / missing bearer
400 bad request malformed multipart, no files field, etc.
413 payload too large total body or per-file over cap
500 internal disk write failed
"""
if not inbound_authorized(get_inbound_secret(), request.headers.get("Authorization", "")):
return JSONResponse({"error": "unauthorized"}, status_code=401)
# Total-body guard. Starlette won't enforce this for us; we read
# Content-Length first and reject early to avoid streaming a 5 GB
# request through the multipart parser only to bail at the end.
cl_str = request.headers.get("Content-Length", "")
if cl_str:
try:
cl = int(cl_str)
except ValueError:
cl = -1
if cl > CHAT_UPLOAD_MAX_BYTES:
return JSONResponse(
{"error": f"request body exceeds total limit ({CHAT_UPLOAD_MAX_BYTES // (1024*1024)} MB)"},
status_code=413,
)
try:
form = await request.form(max_files=64, max_fields=32)
except Exception as exc: # multipart parse error
logger.warning("internal_chat_uploads: multipart parse failed: %s", exc)
return JSONResponse({"error": "failed to parse multipart form"}, status_code=400)
# Starlette's FormData allows multiple values per key — `files` may
# appear multiple times for batched uploads. getlist returns them
# in order.
parts = form.getlist("files")
if not parts:
return JSONResponse({"error": "expected at least one 'files' field"}, status_code=400)
# Filter out non-file entries defensively. Starlette's UploadFile
# has a .filename attribute; plain string fields don't.
uploads = [p for p in parts if hasattr(p, "filename") and hasattr(p, "read")]
if not uploads:
return JSONResponse({"error": "expected at least one 'files' field"}, status_code=400)
# mkdir -p is idempotent. Fired every call so a container restart
# that wipes /workspace/.molecule doesn't surprise us.
try:
Path(CHAT_UPLOAD_DIR).mkdir(parents=True, exist_ok=True)
except OSError as exc:
logger.error("internal_chat_uploads: mkdir %s failed: %s", CHAT_UPLOAD_DIR, exc)
return JSONResponse({"error": "failed to prepare uploads dir"}, status_code=500)
response_files: list[dict] = []
total_bytes = 0
for upload in uploads:
# Read into memory with a hard cap. Files larger than the cap
# surface as 413; we don't truncate silently.
data = await upload.read(CHAT_UPLOAD_MAX_FILE_BYTES + 1)
if len(data) > CHAT_UPLOAD_MAX_FILE_BYTES:
return JSONResponse(
{"error": f"{upload.filename} exceeds per-file limit ({CHAT_UPLOAD_MAX_FILE_BYTES // (1024*1024)} MB)"},
status_code=413,
)
total_bytes += len(data)
if total_bytes > CHAT_UPLOAD_MAX_BYTES:
return JSONResponse(
{"error": f"total request body exceeds limit ({CHAT_UPLOAD_MAX_BYTES // (1024*1024)} MB)"},
status_code=413,
)
sanitized = sanitize_filename(upload.filename or "file")
# 16-byte random prefix → 32-hex-char + sanitized name. Same
# shape as the Go handler's `hex.EncodeToString(rand 16) + "-" + name`.
prefix = pysecrets.token_hex(16)
stored = f"{prefix}-{sanitized}"
target = os.path.join(CHAT_UPLOAD_DIR, stored)
try:
fd = _open_safe(target)
except FileExistsError:
# 32 hex chars of entropy → 128 bits → re-collision is
# astronomical. If we hit it anyway, surface as 500 rather
# than overwriting; the next retry will pick a fresh prefix.
logger.error("internal_chat_uploads: collision at %s — refusing overwrite", target)
return JSONResponse({"error": "internal collision; retry"}, status_code=500)
except OSError as exc:
logger.error("internal_chat_uploads: open %s failed: %s", target, exc)
return JSONResponse({"error": "failed to write file"}, status_code=500)
try:
with os.fdopen(fd, "wb") as f:
f.write(data)
except OSError as exc:
logger.error("internal_chat_uploads: write %s failed: %s", target, exc)
# Best-effort cleanup of the partial file. unlink can fail
# if the file was never created (open succeeded but write
# failed before any bytes hit disk) or if the dir was
# concurrently torn down — neither case warrants surfacing.
try:
os.unlink(target)
except OSError as unlink_exc:
logger.debug("internal_chat_uploads: unlink %s after write fail: %s", target, unlink_exc)
return JSONResponse({"error": "failed to write file"}, status_code=500)
# Mime type: prefer the part's Content-Type header, fall back to
# extension-based guess. matches the Go handler's precedence.
mime_type = upload.headers.get("content-type") if hasattr(upload, "headers") else None
if not mime_type:
mime_type, _ = mimetypes.guess_type(sanitized)
response_files.append({
"uri": f"workspace:{CHAT_UPLOAD_DIR}/{stored}",
"name": sanitized,
"mimeType": mime_type or "",
"size": len(data),
})
return JSONResponse({"files": response_files}, status_code=200)

View File

@ -414,6 +414,17 @@ async def main(): # pragma: no cover
starlette_app.add_route("/transcript", _transcript_handler, methods=["GET"])
# /internal/* — platform→workspace forward calls (RFC #2312). Auth
# is the per-workspace platform_inbound_secret in
# /configs/.platform_inbound_secret, distinct from the outbound
# workspace_auth_token used by /transcript above.
from internal_chat_uploads import ingest_handler as _internal_chat_uploads_ingest
starlette_app.add_route(
"/internal/chat/uploads/ingest",
_internal_chat_uploads_ingest,
methods=["POST"],
)
built_app = make_trace_middleware(starlette_app)
server_config = uvicorn.Config(

View File

@ -0,0 +1,101 @@
"""Auth gate for the /internal/* Starlette routes.
The platform calls into the workspace's HTTP server using a per-workspace
shared secret minted at provision time and stored in
``/configs/.platform_inbound_secret`` (see migration 044 + RFC #2312).
The workspace validates by string-equality against the file content
the platform side stores the same plaintext in ``workspaces
.platform_inbound_secret`` and reads it back on every forward call.
Asymmetric to ``platform_auth.py``:
platform_auth.py platform_inbound_auth.py
workspace platform platform workspace
/configs/.auth_token /configs/.platform_inbound_secret
workspace presents bearer workspace validates bearer
Fail-closed semantics (mirrors transcript_auth.py): if the secret file is
missing, empty, or unreadable, every request is rejected. The platform
will surface this as a structural error rather than silently sending
unauthenticated requests through.
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
logger = logging.getLogger(__name__)
# In-process cache so we don't hit disk on every forward call. Same
# pattern as platform_auth._cached_token. The file is the durable copy;
# this var is the hot path.
_cached_secret: str | None = None
def _secret_file() -> Path:
"""Path to the on-disk inbound-secret file. Respects CONFIGS_DIR,
falls back to /configs for the default container layout."""
return Path(os.environ.get("CONFIGS_DIR", "/configs")) / ".platform_inbound_secret"
def get_inbound_secret() -> str | None:
"""Return the cached inbound secret, reading from disk on first call.
Returns None if the file is missing, empty, or unreadable. Callers
MUST treat None as an auth failure (fail-closed) never substitute
a default or skip-auth-on-missing semantics.
"""
global _cached_secret
if _cached_secret is not None:
return _cached_secret
path = _secret_file()
if not path.exists():
return None
try:
secret = path.read_text().strip()
except OSError as exc:
logger.warning("platform_inbound_auth: read %s failed: %s", path, exc)
return None
if not secret:
return None
_cached_secret = secret
return secret
def reset_cache() -> None:
"""Drop the in-process cache. Used by tests + the rare runtime-side
path that needs to re-read after the file is overwritten (e.g. a
rotation flow lands in the future)."""
global _cached_secret
_cached_secret = None
def inbound_authorized(expected_secret: str | None, auth_header: str) -> bool:
"""Return True iff a /internal/* request should be served.
Args:
expected_secret: the workspace's stored inbound secret, or None
if /configs/.platform_inbound_secret is absent / empty /
unreadable.
auth_header: raw Authorization request header value.
Behavior:
- None / empty expected fail closed. A missing secret file
is an auth failure, not a bypass.
- Non-empty expected strict string-equality against
"Bearer <secret>". Bearer prefix is case-sensitive (matches
the platform's wsauth.BearerTokenFromHeader contract).
Constant-time comparison is used to avoid leaking the secret one
byte at a time via timing analysis on a network-reachable endpoint.
"""
if not expected_secret:
return False
expected = f"Bearer {expected_secret}"
# hmac.compare_digest is the stdlib constant-time string compare.
# Length mismatch is documented to short-circuit safely (returns
# False without leaking length-difference timing).
import hmac
return hmac.compare_digest(auth_header, expected)

View File

@ -14,6 +14,11 @@ uvicorn>=0.46.0
starlette>=0.38.0
websockets>=16.0
# multipart/form-data parser — required for Starlette's Request.form() on
# /internal/chat/uploads/ingest. Pinned ≥ 0.0.18 because earlier versions
# had a CVE-2024-53981 (DoS via malformed boundary).
python-multipart>=0.0.18
# Config parsing
pyyaml>=6.0

View File

@ -0,0 +1,259 @@
"""Unit + functional tests for /internal/chat/uploads/ingest.
Exercises the route via Starlette's TestClient so multipart parsing,
auth, and disk-write paths all run together.
"""
from __future__ import annotations
import os
from pathlib import Path
import pytest
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.testclient import TestClient
import platform_inbound_auth
import internal_chat_uploads
from internal_chat_uploads import ingest_handler, sanitize_filename
@pytest.fixture
def configs_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
platform_inbound_auth.reset_cache()
yield tmp_path
platform_inbound_auth.reset_cache()
@pytest.fixture
def chat_uploads_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
"""Redirect CHAT_UPLOAD_DIR to a writable tmp path.
The default /workspace/.molecule/chat-uploads requires real container
filesystem; under pytest we point it at a tmpdir so the tests
don't need root + container.
"""
target = tmp_path / "chat-uploads"
monkeypatch.setattr(internal_chat_uploads, "CHAT_UPLOAD_DIR", str(target))
return target
@pytest.fixture
def client(configs_dir: Path, chat_uploads_dir: Path) -> TestClient:
(configs_dir / ".platform_inbound_secret").write_text("test-secret")
app = Starlette(routes=[
Route("/internal/chat/uploads/ingest", ingest_handler, methods=["POST"]),
])
return TestClient(app)
# ───────────── sanitize_filename ─────────────
@pytest.mark.parametrize("raw,expected", [
("foo.txt", "foo.txt"),
("hello world.txt", "hello_world.txt"),
("../../../etc/passwd", "passwd"), # basename strips path; sanitize keeps the rest clean
("sneaky/../sneaky.png", "sneaky.png"),
("file with spaces & symbols!.png", "file_with_spaces___symbols_.png"),
("", "file"), # empty → safe default
(".", "file"),
("..", "file"),
("名前.txt", "__.txt"), # Python operates on codepoints (2 CJK chars → 2 underscores); Go operated on bytes
])
def test_sanitize_filename(raw: str, expected: str):
assert sanitize_filename(raw) == expected
def test_sanitize_filename_truncates_long_names():
long = "a" * 200 + ".txt"
out = sanitize_filename(long)
assert len(out) <= 100
assert out.endswith(".txt"), "extension preserved"
def test_sanitize_filename_drops_long_extension():
"""Extensions longer than 16 chars don't qualify as extensions; the
truncation just chops the tail."""
long = "a" * 110 + ".verylongextensionofdoom"
out = sanitize_filename(long)
assert len(out) == 100
assert "." not in out[-16:], "no false-extension preserved"
# ───────────── auth ─────────────
def test_unauthorized_no_bearer(client: TestClient):
r = client.post("/internal/chat/uploads/ingest", files={"files": ("a.txt", b"x")})
assert r.status_code == 401
assert r.json() == {"error": "unauthorized"}
def test_unauthorized_wrong_bearer(client: TestClient):
r = client.post(
"/internal/chat/uploads/ingest",
files={"files": ("a.txt", b"x")},
headers={"Authorization": "Bearer wrong"},
)
assert r.status_code == 401
def test_unauthorized_when_secret_file_missing(tmp_path: Path, chat_uploads_dir: Path, monkeypatch: pytest.MonkeyPatch):
"""Fail-closed: no secret file on disk → every request 401, even
with an "Authorization: Bearer" header."""
monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
platform_inbound_auth.reset_cache()
app = Starlette(routes=[
Route("/internal/chat/uploads/ingest", ingest_handler, methods=["POST"]),
])
client = TestClient(app)
r = client.post(
"/internal/chat/uploads/ingest",
files={"files": ("a.txt", b"x")},
headers={"Authorization": "Bearer anything"},
)
assert r.status_code == 401
platform_inbound_auth.reset_cache()
# ───────────── happy paths ─────────────
def test_single_upload_writes_to_disk(client: TestClient, chat_uploads_dir: Path):
payload = b"hello world"
r = client.post(
"/internal/chat/uploads/ingest",
files={"files": ("greeting.txt", payload, "text/plain")},
headers={"Authorization": "Bearer test-secret"},
)
assert r.status_code == 200, r.text
body = r.json()
assert "files" in body and len(body["files"]) == 1
f = body["files"][0]
assert f["name"] == "greeting.txt"
assert f["mimeType"] == "text/plain"
assert f["size"] == len(payload)
# URI shape matches the Go handler's contract — canvas / agent code
# that already resolves "workspace:..." paths keeps working.
assert f["uri"].startswith("workspace:") and f["uri"].endswith("greeting.txt")
# On-disk content matches.
stored_path = f["uri"][len("workspace:"):]
# In the test, CHAT_UPLOAD_DIR was redirected to chat_uploads_dir,
# so stored_path's prefix is the redirected dir.
assert stored_path.startswith(str(chat_uploads_dir))
assert Path(stored_path).read_bytes() == payload
def test_multiple_uploads_in_one_batch(client: TestClient, chat_uploads_dir: Path):
files = [
("files", ("a.txt", b"AAA", "text/plain")),
("files", ("b.png", b"BBBBBB", "image/png")),
]
r = client.post(
"/internal/chat/uploads/ingest",
files=files,
headers={"Authorization": "Bearer test-secret"},
)
assert r.status_code == 200, r.text
items = r.json()["files"]
assert len(items) == 2
names = sorted(f["name"] for f in items)
assert names == ["a.txt", "b.png"]
sizes = sorted(f["size"] for f in items)
assert sizes == [3, 6]
def test_uploads_get_unique_random_prefix(client: TestClient, chat_uploads_dir: Path):
"""Two uploads with the same filename land at distinct paths."""
files = [
("files", ("dup.txt", b"first")),
("files", ("dup.txt", b"second")),
]
r = client.post(
"/internal/chat/uploads/ingest",
files=files,
headers={"Authorization": "Bearer test-secret"},
)
assert r.status_code == 200
items = r.json()["files"]
uri_a, uri_b = items[0]["uri"], items[1]["uri"]
assert uri_a != uri_b, "uniqueness via random prefix"
path_a = uri_a[len("workspace:"):]
path_b = uri_b[len("workspace:"):]
assert Path(path_a).read_bytes() == b"first"
assert Path(path_b).read_bytes() == b"second"
def test_mime_type_falls_back_to_extension_guess(client: TestClient):
"""When the part doesn't carry a Content-Type header, guess from the
extension. Matches the Go handler's precedence."""
r = client.post(
"/internal/chat/uploads/ingest",
files={"files": ("doc.pdf", b"%PDF-")},
headers={"Authorization": "Bearer test-secret"},
)
assert r.status_code == 200
f = r.json()["files"][0]
assert f["mimeType"].startswith("application/pdf"), f["mimeType"]
# ───────────── failure modes ─────────────
def test_no_files_field_returns_400(client: TestClient):
"""multipart with NO `files` part → 400, not 200 with empty list."""
r = client.post(
"/internal/chat/uploads/ingest",
data={"unrelated": "field"},
headers={"Authorization": "Bearer test-secret"},
)
assert r.status_code == 400
def test_per_file_oversize_returns_413(client: TestClient, monkeypatch: pytest.MonkeyPatch):
"""Per-file cap is enforced. Lower the cap for the test so we don't
have to construct a real 25 MB body."""
monkeypatch.setattr(internal_chat_uploads, "CHAT_UPLOAD_MAX_FILE_BYTES", 16)
big = b"x" * 32 # > 16
r = client.post(
"/internal/chat/uploads/ingest",
files={"files": ("big.bin", big)},
headers={"Authorization": "Bearer test-secret"},
)
assert r.status_code == 413
assert "exceeds per-file limit" in r.json()["error"]
def test_total_request_body_oversize_returns_413(client: TestClient, monkeypatch: pytest.MonkeyPatch):
"""Header-side total cap. Set the limit BELOW the actual body and
confirm we reject before parsing multipart."""
monkeypatch.setattr(internal_chat_uploads, "CHAT_UPLOAD_MAX_BYTES", 8)
r = client.post(
"/internal/chat/uploads/ingest",
files={"files": ("a.txt", b"this is much more than 8 bytes")},
headers={"Authorization": "Bearer test-secret"},
)
assert r.status_code == 413
def test_symlink_at_target_is_refused(client: TestClient, chat_uploads_dir: Path, monkeypatch: pytest.MonkeyPatch):
"""If a pre-existing symlink at the destination redirects writes to
a sensitive path, the upload MUST refuse rather than follow.
We force a deterministic prefix by patching pysecrets.token_hex so
we know exactly which path to plant the symlink at.
"""
chat_uploads_dir.mkdir(parents=True, exist_ok=True)
# Plant a symlink pointing at a "secret" location.
sentinel = chat_uploads_dir / "decoy-target"
sentinel.write_bytes(b"original")
monkeypatch.setattr(internal_chat_uploads.pysecrets, "token_hex", lambda n: "deadbeef" * (n // 4))
target_path = chat_uploads_dir / ("deadbeef" * 4 + "-evil.txt")
os.symlink(sentinel, target_path)
r = client.post(
"/internal/chat/uploads/ingest",
files={"files": ("evil.txt", b"PWNED")},
headers={"Authorization": "Bearer test-secret"},
)
assert r.status_code == 500, r.text
# Sentinel content unchanged — the symlink wasn't followed.
assert sentinel.read_bytes() == b"original"

View File

@ -0,0 +1,120 @@
"""Unit tests for platform_inbound_auth — the workspace-side auth gate
on /internal/* routes."""
from __future__ import annotations
import os
from pathlib import Path
import pytest
import platform_inbound_auth
from platform_inbound_auth import (
get_inbound_secret,
inbound_authorized,
reset_cache,
)
@pytest.fixture(autouse=True)
def _reset_cache_each_test():
"""get_inbound_secret caches the disk read on first call. Tests
that overwrite the file or change CONFIGS_DIR need a clean slate."""
reset_cache()
yield
reset_cache()
@pytest.fixture
def configs_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
return tmp_path
# ───────────── inbound_authorized — pure logic ─────────────
def test_authorized_happy_path():
assert inbound_authorized("the-secret", "Bearer the-secret") is True
def test_unauthorized_missing_expected():
"""A missing secret file (None) MUST fail closed — the #2308 lesson:
half-broken auth is worse than loud 503s."""
assert inbound_authorized(None, "Bearer the-secret") is False
def test_unauthorized_empty_expected():
assert inbound_authorized("", "Bearer the-secret") is False
def test_unauthorized_wrong_secret():
assert inbound_authorized("the-secret", "Bearer wrong-secret") is False
def test_unauthorized_missing_bearer_prefix():
"""Bearer prefix is case-sensitive — matches the platform's
wsauth.BearerTokenFromHeader contract."""
assert inbound_authorized("the-secret", "the-secret") is False
assert inbound_authorized("the-secret", "bearer the-secret") is False
def test_unauthorized_empty_header():
assert inbound_authorized("the-secret", "") is False
# ───────────── get_inbound_secret — disk read ─────────────
def test_get_secret_reads_from_file(configs_dir: Path):
(configs_dir / ".platform_inbound_secret").write_text("disk-secret")
assert get_inbound_secret() == "disk-secret"
def test_get_secret_strips_trailing_whitespace(configs_dir: Path):
"""Operator-edited secret files commonly have trailing newlines.
Strip on read so the constant-time compare doesn't reject."""
(configs_dir / ".platform_inbound_secret").write_text("disk-secret\n \n")
assert get_inbound_secret() == "disk-secret"
def test_get_secret_returns_none_when_missing(configs_dir: Path):
"""File not present → None. Callers MUST treat None as fail-closed
(mirrors transcript_auth.py:#328)."""
assert get_inbound_secret() is None
def test_get_secret_returns_none_when_empty(configs_dir: Path):
(configs_dir / ".platform_inbound_secret").write_text("")
assert get_inbound_secret() is None
def test_get_secret_returns_none_when_whitespace_only(configs_dir: Path):
(configs_dir / ".platform_inbound_secret").write_text(" \n ")
assert get_inbound_secret() is None
def test_get_secret_caches(configs_dir: Path):
"""Hot path: two reads should hit disk once. Verified by overwriting
the file after the first read and confirming the cached value persists."""
(configs_dir / ".platform_inbound_secret").write_text("first-value")
assert get_inbound_secret() == "first-value"
(configs_dir / ".platform_inbound_secret").write_text("second-value")
assert get_inbound_secret() == "first-value" # still cached
reset_cache()
assert get_inbound_secret() == "second-value"
def test_get_secret_default_dir_when_env_unset(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
"""Default falls back to /configs. We can't write to /configs in the
test sandbox; instead verify the path computation hits the default."""
monkeypatch.delenv("CONFIGS_DIR", raising=False)
assert platform_inbound_auth._secret_file() == Path("/configs/.platform_inbound_secret")
# ───────────── end-to-end: file → authorized ─────────────
def test_end_to_end_file_to_authorized(configs_dir: Path):
"""The two halves wire up: reading the file produces the value the
request must present."""
(configs_dir / ".platform_inbound_secret").write_text("e2e-secret")
secret = get_inbound_secret()
assert inbound_authorized(secret, "Bearer e2e-secret") is True
assert inbound_authorized(secret, "Bearer not-this") is False