From f81813f7080ced1b525cd67b38193f8e16552ab6 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 04:38:41 -0700 Subject: [PATCH] =?UTF-8?q?feat(rfc):=20poll-mode=20chat=20upload=20?= =?UTF-8?q?=E2=80=94=20phase=202=20workspace=20inbox=20extension?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Workspace-side fetcher for the platform-staged chat uploads written by phase 1. Stack atop feat/poll-mode-chat-upload-phase1. Wire shape — the platform writes one activity_logs row per uploaded file with `activity_type=a2a_receive`, `method=chat_upload_receive`, and a `request_body={file_id, name, mimeType, size, uri}` carrying the synthetic `platform-pending:/` URI. Workspace-side flow (new module workspace/inbox_uploads.py): 1. Fetch via GET /workspaces/:id/pending-uploads/:file_id/content 2. Stage to /workspace/.molecule/chat-uploads/<32-hex>- (same on-disk shape as internal_chat_uploads.py — agent-side URI resolvers see no contract change) 3. POST /workspaces/:id/pending-uploads/:file_id/ack 4. Cache `platform-pending: → workspace:` so the eventual chat message that REFERENCES the upload (separate, later activity row) gets URI-rewritten before the agent sees it. Inbox poller extension (workspace/inbox.py): - is_chat_upload_row(row) discriminator on `method` - upload-receive rows trigger fetch_and_stage and are NOT enqueued as InboxMessages (they're side-effect rows, not chat messages) - cursor advances past them regardless of fetch outcome — a permanent /content failure must not stall the cursor and block real chat traffic - message_from_activity calls rewrite_request_body to swap platform-pending: URIs to local workspace: URIs in subsequent chat messages' file parts. Cache miss leaves the URI untouched so the agent surfaces an unresolvable URI rather than the inbox silently dropping the part. Filename sanitization mirrors workspace-server/internal/handlers /chat_files.go::SanitizeFilename and workspace/internal_chat_uploads .py::sanitize_filename — pinned by the existing parity test suites. Coverage: 100% on inbox_uploads.py; the inbox.py extension is fully covered by three new tests in test_inbox.py (skip-from-queue, cursor-advance-past-broken-fetch, URI-rewrite ordering). --- workspace/inbox.py | 43 +- workspace/inbox_uploads.py | 475 ++++++++++++++++++ workspace/tests/test_inbox.py | 162 ++++++ workspace/tests/test_inbox_uploads.py | 697 ++++++++++++++++++++++++++ 4 files changed, 1376 insertions(+), 1 deletion(-) create mode 100644 workspace/inbox_uploads.py create mode 100644 workspace/tests/test_inbox_uploads.py diff --git a/workspace/inbox.py b/workspace/inbox.py index 94417243..6c7ea895 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -432,7 +432,17 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool: def message_from_activity(row: dict[str, Any]) -> InboxMessage: - """Convert one /activity row into an InboxMessage.""" + """Convert one /activity row into an InboxMessage. + + Mutates ``row['request_body']`` in-place to swap any + ``platform-pending:`` URIs to the locally-staged ``workspace:`` URIs + (see ``inbox_uploads.rewrite_request_body``) — by the time the + upstream chat message arrives via this path, the upload-receive row + that staged the bytes has already populated the URI cache (lower + activity_logs.id, processed earlier in the same poll batch). A + cache miss leaves the URI untouched; the agent surfaces an + unresolvable URI rather than the inbox silently dropping the part. + """ request_body = row.get("request_body") if isinstance(request_body, str): # The Go handler returns request_body as json.RawMessage; httpx @@ -443,6 +453,14 @@ def message_from_activity(row: dict[str, Any]) -> InboxMessage: except (TypeError, ValueError): request_body = None + # Rewrite platform-pending: URIs → workspace: URIs in-place. Imported + # at call time to keep the import graph clean for the in-container + # path that doesn't use this module (also avoids a circular: the + # uploads module is small enough that re-importing per call is + # cheap, and the Python import cache makes it free after the first). + from inbox_uploads import rewrite_request_body + rewrite_request_body(request_body) + return InboxMessage( activity_id=str(row.get("id", "")), text=_extract_text(request_body, row.get("summary")), @@ -532,11 +550,34 @@ def _poll_once( if cursor is None: rows = list(reversed(rows)) + # Imported lazily at use-site so a runtime that never sees an + # upload-receive row never imports the module. Cheap on the hot + # path because Python caches the import. + from inbox_uploads import is_chat_upload_row, fetch_and_stage + new_count = 0 last_id: str | None = None for row in rows: if not isinstance(row, dict): continue + if is_chat_upload_row(row): + # Side-effect row from the platform's poll-mode chat-upload + # handler — fetch the bytes, stage to /workspace/.molecule/ + # chat-uploads, ack. NOT enqueued as an InboxMessage; the + # agent will see the chat message that REFERENCES this + # upload via a separate (later) activity row, with the + # pending: URI rewritten to a workspace: URI by + # message_from_activity. We DO advance the cursor past + # this row so a permanent network outage on /content + # doesn't stall the cursor and block real chat traffic. + fetch_and_stage( + row, + platform_url=platform_url, + workspace_id=workspace_id, + headers=headers, + ) + last_id = str(row.get("id", "")) or last_id + continue if _is_self_notify_row(row): # The workspace-server's `/notify` handler writes the agent's # own send_message_to_user POSTs to activity_logs with diff --git a/workspace/inbox_uploads.py b/workspace/inbox_uploads.py new file mode 100644 index 00000000..798f18de --- /dev/null +++ b/workspace/inbox_uploads.py @@ -0,0 +1,475 @@ +"""Poll-mode chat-upload fetcher + URI cache for the standalone path. + +Companion to ``inbox.py``. When the workspace's inbox poller sees an +``activity_logs`` row with ``method='chat_upload_receive'`` (written by +the platform's ``uploadPollMode`` handler — workspace-server +``internal/handlers/chat_files.go``), this module: + + 1. Pulls the bytes from + ``GET /workspaces/:id/pending-uploads/:file_id/content``. + 2. Writes them to ``/workspace/.molecule/chat-uploads/-`` + — same on-disk shape as the push-mode handler in + ``internal_chat_uploads.py``, so anything downstream that already + resolves ``workspace:/workspace/.molecule/chat-uploads/...`` URIs + works unchanged. + 3. POSTs ``/workspaces/:id/pending-uploads/:file_id/ack`` so Phase 3 + sweep can clean up the platform-side ``pending_uploads`` row. + 4. Records a ``platform-pending:/ → + workspace:/workspace/.molecule/chat-uploads/...`` mapping in a + process-local cache so the chat message that arrives later + (referencing the platform-pending URI) gets rewritten before the + agent sees it. + +URI rewrite ordering — the chat message containing the +``platform-pending:`` URI is logged by the platform AFTER the +``chat_upload_receive`` row, so the inbox poller sees the upload-receive +row first (lower activity_logs.id) and stages the bytes before the chat +message arrives in the same poll batch (or a later one). The URI cache +is therefore populated before the message_from_activity path needs it. +A miss (network race, restart with stale cursor) is handled by keeping +the original ``platform-pending:`` URI in the rewritten body — the agent +will see something it can't open, which is preferable to silently +dropping the URI. + +Auth — same Bearer token the inbox poller uses (``platform_auth.auth_headers``). +Both endpoints are on the wsAuth-gated route, so this module can never +read another tenant's bytes even if a token is misrouted. +""" +from __future__ import annotations + +import logging +import mimetypes +import os +import re +import secrets as pysecrets +import threading +from collections import OrderedDict +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + +# Same on-disk root as internal_chat_uploads.CHAT_UPLOAD_DIR — keeping +# these decoupled would let drift sneak in. Imported here rather than +# from internal_chat_uploads to avoid pulling in starlette as a +# transitive dep (this module runs in the standalone MCP path which +# doesn't ship the in-container HTTP server). +CHAT_UPLOAD_DIR = "/workspace/.molecule/chat-uploads" + +# Per-file safety net. The platform enforces 25 MB on the staging side, +# but a buggy or hostile platform response shouldn't be able to fill the +# workspace's disk — refuse to write more than this even if the response +# claims a larger Content-Length. +MAX_FILE_BYTES = 25 * 1024 * 1024 + +# Network deadline for the GET. Tuned for a 25 MB transfer over a +# reasonable consumer link (~5 Mbps gives ~40s for the full payload), +# plus headroom for TLS + platform auth. Aligned with inbox poller's +# 10s default for /activity calls — both are user-perceived latency. +DEFAULT_FETCH_TIMEOUT = 60.0 + +# Cap on the URI cache. A long-lived workspace handling thousands of +# uploads shouldn't grow without bound; an LRU cap of 1024 keeps the +# entries-needed-for-a-typical-conversation well within memory. +URI_CACHE_MAX_ENTRIES = 1024 + +# Same character class as internal_chat_uploads — kept duplicated rather +# than imported to avoid dragging starlette into the standalone path. +_UNSAFE_FILENAME_CHARS = re.compile(r"[^a-zA-Z0-9._\-]") + + +def sanitize_filename(name: str) -> str: + """Reduce a user-supplied filename to a safe form. + + Mirrors ``internal_chat_uploads.sanitize_filename`` and the Go + handler's ``SanitizeFilename`` — three-way parity is pinned by + ``workspace-server/internal/handlers/sanitize_filename_test.go`` and + ``workspace/tests/test_internal_chat_uploads.py`` so the URI shape + is identical regardless of which path handles the upload. + """ + base = os.path.basename(name) + base = base.replace(" ", "_") + base = _UNSAFE_FILENAME_CHARS.sub("_", base) + if len(base) > 100: + ext = "" + dot = base.rfind(".") + if dot >= 0 and len(base) - dot <= 16: + ext = base[dot:] + base = base[: 100 - len(ext)] + ext + if base in ("", ".", ".."): + return "file" + return base + + +# --------------------------------------------------------------------------- +# URI cache — maps platform-pending URIs to local workspace: URIs +# --------------------------------------------------------------------------- + + +class _URICache: + """Thread-safe bounded LRU mapping of platform-pending → workspace URIs. + + Bounded so a workspace that runs for months and handles thousands of + uploads doesn't accumulate entries forever. ``OrderedDict.move_to_end`` + promotes recently-used entries; eviction takes the oldest. + + The cache is intentionally per-process — there is no persistence + across a workspace restart. A restart with a stale inbox cursor that + re-poll an upload-receive row will re-fetch (the bytes are already + on disk from the prior session — see ``stage_to_disk``'s O_EXCL + handling) and re-register; a chat message that referenced the + platform-pending URI BEFORE the restart and arrives AFTER would miss + the rewrite and surface the platform-pending URI to the agent. That + is preferable to a stale persisted mapping that points at a deleted + file. + """ + + def __init__(self, max_entries: int = URI_CACHE_MAX_ENTRIES): + self._max = max_entries + self._lock = threading.Lock() + self._entries: "OrderedDict[str, str]" = OrderedDict() + + def get(self, pending_uri: str) -> str | None: + with self._lock: + local = self._entries.get(pending_uri) + if local is not None: + self._entries.move_to_end(pending_uri) + return local + + def set(self, pending_uri: str, local_uri: str) -> None: + with self._lock: + self._entries[pending_uri] = local_uri + self._entries.move_to_end(pending_uri) + while len(self._entries) > self._max: + self._entries.popitem(last=False) + + def __len__(self) -> int: + with self._lock: + return len(self._entries) + + def clear(self) -> None: + with self._lock: + self._entries.clear() + + +_cache = _URICache() + + +def get_cache() -> _URICache: + """Expose the module-singleton cache for tests and the rewrite path.""" + return _cache + + +def resolve_pending_uri(uri: str) -> str | None: + """Return the local ``workspace:`` URI for a ``platform-pending:`` URI, + or None if not yet staged. Convenience for callers that want to + fall back to an on-demand fetch — pass the result through to + ``executor_helpers.resolve_attachment_uri``. + """ + return _cache.get(uri) + + +# --------------------------------------------------------------------------- +# On-disk staging +# --------------------------------------------------------------------------- + + +def _open_safe(path: str) -> int: + """Open ``path`` for write with ``O_CREAT|O_EXCL|O_NOFOLLOW``. + + Same shape as ``internal_chat_uploads._open_safe`` — refuses to + follow a pre-existing symlink at the target and refuses to overwrite + an existing regular file. The 16-byte random prefix makes a name + collision astronomical, but defense-in-depth costs nothing. + """ + flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL + if hasattr(os, "O_NOFOLLOW"): + flags |= os.O_NOFOLLOW + return os.open(path, flags, 0o600) + + +def stage_to_disk(content: bytes, filename: str) -> str: + """Write ``content`` under ``CHAT_UPLOAD_DIR`` and return the local URI. + + Returns ``workspace:/workspace/.molecule/chat-uploads/-``. + The 32-hex prefix makes the on-disk name unguessable to anything + that didn't see the response, so even if a stale agent has a guess + at the original filename it can't construct a URL to a sibling's + upload. + + Raises: + OSError: write failure (mkdir, open, or write). Caller is + expected to log + skip; the activity row stays unacked so a + future poll re-tries. + ValueError: ``content`` exceeds ``MAX_FILE_BYTES``. Pre-staging + guard belt-and-braces above the platform's same-side cap. + """ + if len(content) > MAX_FILE_BYTES: + raise ValueError( + f"content size {len(content)} exceeds workspace cap {MAX_FILE_BYTES}" + ) + + Path(CHAT_UPLOAD_DIR).mkdir(parents=True, exist_ok=True) + + sanitized = sanitize_filename(filename) + prefix = pysecrets.token_hex(16) + stored = f"{prefix}-{sanitized}" + target = os.path.join(CHAT_UPLOAD_DIR, stored) + + fd = _open_safe(target) + try: + with os.fdopen(fd, "wb") as f: + f.write(content) + except OSError: + # Best-effort cleanup — partial writes leave a stub file that + # would mask a future retry's success otherwise. + try: + os.unlink(target) + except OSError: + pass + raise + + return f"workspace:{CHAT_UPLOAD_DIR}/{stored}" + + +# --------------------------------------------------------------------------- +# Activity row → fetch/stage/ack flow +# --------------------------------------------------------------------------- + + +def _request_body_dict(row: dict[str, Any]) -> dict[str, Any] | None: + """Coerce ``row['request_body']`` into a dict. + + The /activity API returns request_body as JSON (already-deserialized + by httpx). Some legacy paths or mocked transports may emit a string; + handle defensively rather than raising. + """ + body = row.get("request_body") + if isinstance(body, dict): + return body + if isinstance(body, str): + import json + try: + decoded = json.loads(body) + except (TypeError, ValueError): + return None + return decoded if isinstance(decoded, dict) else None + return None + + +def is_chat_upload_row(row: dict[str, Any]) -> bool: + """True if ``row`` is the platform's chat-upload-receive activity. + + Used by the inbox poller to fork the row off the regular A2A + message handling path — this row is not a peer message; it's an + instruction to fetch + stage bytes. Match on ``method`` only; + ``activity_type`` is already filtered to ``a2a_receive`` upstream. + """ + return row.get("method") == "chat_upload_receive" + + +def fetch_and_stage( + row: dict[str, Any], + *, + platform_url: str, + workspace_id: str, + headers: dict[str, str], + timeout_secs: float = DEFAULT_FETCH_TIMEOUT, +) -> str | None: + """Fetch the row's bytes, stage them under chat-uploads, and ack. + + Returns the local ``workspace:`` URI on success, or ``None`` if any + step failed (logged with enough detail to triage). Failure leaves + the platform-side row unacked, so a subsequent poll retries — the + activity row stays in the cursor's window because we DO advance the + cursor (the row is "handled" from the inbox's perspective even on + fetch failure; otherwise a permanent network outage would stall the + cursor and block real chat traffic). + + On success, the URI cache is updated so a subsequent chat message + referencing the same ``platform-pending:`` URI is rewritten before + the agent sees it. + """ + body = _request_body_dict(row) + if body is None: + logger.warning( + "inbox_uploads: row %s missing request_body; cannot fetch", + row.get("id"), + ) + return None + + file_id = body.get("file_id") + if not isinstance(file_id, str) or not file_id: + logger.warning( + "inbox_uploads: row %s has no file_id in request_body", + row.get("id"), + ) + return None + + pending_uri = body.get("uri") + if not isinstance(pending_uri, str) or not pending_uri: + # Reconstruct what the platform would have written — defensive + # against a row whose uri field got truncated. Same shape as the + # Go handler's URI builder. + pending_uri = f"platform-pending:{workspace_id}/{file_id}" + + filename = body.get("name") or "file" + if not isinstance(filename, str): + filename = "file" + + # Lazy httpx import: the standalone MCP path uses httpx; an in- + # container caller that imports this module by accident shouldn't + # explode at import time. + try: + import httpx # noqa: WPS433 + except ImportError: + logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id) + return None + + content_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/content" + ack_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/ack" + + try: + with httpx.Client(timeout=timeout_secs) as client: + resp = client.get(content_url, headers=headers) + except Exception as exc: # noqa: BLE001 + logger.warning( + "inbox_uploads: GET %s failed: %s", content_url, exc + ) + return None + + if resp.status_code == 404: + # Row was swept or already acked by a previous poll race — nothing + # to fetch. Don't ack again; the platform's GC handles it. This is + # a soft-skip, not an error — log at INFO so triage isn't noisy. + logger.info( + "inbox_uploads: pending upload %s already gone (404); skipping", + file_id, + ) + return None + if resp.status_code >= 400: + logger.warning( + "inbox_uploads: GET %s returned %d: %s", + content_url, + resp.status_code, + (resp.text or "")[:200], + ) + return None + + content = resp.content or b"" + if len(content) > MAX_FILE_BYTES: + logger.warning( + "inbox_uploads: refusing to stage %s — size %d exceeds cap %d", + file_id, + len(content), + MAX_FILE_BYTES, + ) + return None + + # Mimetype precedence: platform's Content-Type header → request_body + # mimeType field → extension guess. Same precedence as the in- + # container ingest handler. + mime_header = resp.headers.get("content-type", "").split(";")[0].strip() + mime = ( + mime_header + or (body.get("mimeType") if isinstance(body.get("mimeType"), str) else "") + or (mimetypes.guess_type(filename)[0] or "") + ) + + try: + local_uri = stage_to_disk(content, filename) + except (OSError, ValueError) as exc: + logger.error( + "inbox_uploads: failed to stage %s (%s) to disk: %s", + file_id, + filename, + exc, + ) + return None + + _cache.set(pending_uri, local_uri) + logger.info( + "inbox_uploads: staged file_id=%s name=%s size=%d mime=%s pending_uri=%s local_uri=%s", + file_id, + filename, + len(content), + mime, + pending_uri, + local_uri, + ) + + # Ack last so a write failure above leaves the row available for a + # retry on the next poll. A failed ack is logged but doesn't roll + # back the on-disk file — the platform's sweep will clean up + # eventually. + try: + with httpx.Client(timeout=timeout_secs) as client: + ack_resp = client.post(ack_url, headers=headers) + if ack_resp.status_code >= 400: + logger.warning( + "inbox_uploads: ack %s returned %d: %s", + ack_url, + ack_resp.status_code, + (ack_resp.text or "")[:200], + ) + except Exception as exc: # noqa: BLE001 + logger.warning("inbox_uploads: POST %s failed: %s", ack_url, exc) + + return local_uri + + +# --------------------------------------------------------------------------- +# URI rewrite for incoming chat messages +# --------------------------------------------------------------------------- +# +# The chat message that references a staged upload arrives as a +# SEPARATE activity_log row, with parts of kind=file containing +# platform-pending: URIs in the file.uri field. Walk the structure +# in-place and rewrite to the local workspace: URI when the cache has it. +# Unknown URIs pass through unchanged — the agent gets to choose how +# to react (most runtimes log + ignore an unresolvable URI). + + +def _rewrite_part(part: Any) -> None: + """Mutate a single A2A Part dict to swap platform-pending: URIs.""" + if not isinstance(part, dict): + return + file_obj = part.get("file") + if not isinstance(file_obj, dict): + return + uri = file_obj.get("uri") + if not isinstance(uri, str) or not uri.startswith("platform-pending:"): + return + rewritten = _cache.get(uri) + if rewritten: + file_obj["uri"] = rewritten + + +def rewrite_request_body(body: Any) -> None: + """Mutate ``body`` in-place, replacing platform-pending: URIs with + the cached local equivalents. + + Walks the same shapes ``inbox._extract_text`` accepts: + + - ``body['parts']`` + - ``body['params']['parts']`` + - ``body['params']['message']['parts']`` + + No-op for shapes that don't match — the message simply passes + through to the agent as-is. + """ + if not isinstance(body, dict): + return + candidates: list[Any] = [] + params = body.get("params") if isinstance(body.get("params"), dict) else None + if params: + message = params.get("message") if isinstance(params.get("message"), dict) else None + if message: + candidates.append(message.get("parts")) + candidates.append(params.get("parts")) + candidates.append(body.get("parts")) + + for parts in candidates: + if isinstance(parts, list): + for part in parts: + _rewrite_part(part) diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index 6731701a..162c32c2 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -701,3 +701,165 @@ def test_set_notification_callback_none_clears(state: inbox.InboxState): state.record(_msg("act-1")) assert received == [] + + +# --------------------------------------------------------------------------- +# Phase 2 — chat_upload_receive rows route to inbox_uploads.fetch_and_stage +# --------------------------------------------------------------------------- + + +def test_poll_once_skips_chat_upload_row_from_queue(state: inbox.InboxState, monkeypatch, tmp_path): + """A row with method='chat_upload_receive' must NOT enqueue as a + chat message — it's a side-effect telling the workspace to fetch + bytes. Pin the contract so a refactor that flattens the row loop + can't silently re-enqueue these as 'empty A2A message' rows.""" + import inbox_uploads + monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads")) + inbox_uploads.get_cache().clear() + + rows = [ + { + "id": "act-1", + "source_id": None, + "method": "chat_upload_receive", + "summary": "chat_upload_receive: foo.pdf", + "request_body": { + "file_id": "abc123", + "name": "foo.pdf", + "mimeType": "application/pdf", + "size": 4, + "uri": "platform-pending:ws-1/abc123", + }, + "created_at": "2026-05-04T10:00:00Z", + }, + ] + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + fetch_called = [] + + def fake_fetch(row, **kwargs): + fetch_called.append((row.get("id"), kwargs["workspace_id"])) + return "workspace:/local/foo.pdf" + + with p, patch.object(inbox_uploads, "fetch_and_stage", fake_fetch): + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + # Not enqueued + cursor advanced. + assert n == 0 + assert state.peek(10) == [] + assert state.load_cursor() == "act-1" + # fetch_and_stage was invoked with the row and workspace_id. + assert fetch_called == [("act-1", "ws-1")] + + +def test_poll_once_chat_upload_row_then_chat_message_rewrites_uri(state: inbox.InboxState, monkeypatch, tmp_path): + """The classic ordering: upload-receive row first (lower id), chat + message referencing platform-pending: URI second. The chat message + that lands in the inbox must have its URI rewritten to the local + workspace: URI before the agent sees it. + """ + import inbox_uploads + monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads")) + cache = inbox_uploads.get_cache() + cache.clear() + + # Pretend the fetch already populated the cache. (The real flow + # populates it inside fetch_and_stage; we patch that to keep the + # test focused on the rewrite contract.) + cache.set("platform-pending:ws-1/abc123", "workspace:/workspace/.molecule/chat-uploads/xx-foo.pdf") + + rows = [ + { + "id": "act-1", + "source_id": None, + "method": "chat_upload_receive", + "summary": "chat_upload_receive: foo.pdf", + "request_body": { + "file_id": "abc123", + "name": "foo.pdf", + "mimeType": "application/pdf", + "size": 4, + "uri": "platform-pending:ws-1/abc123", + }, + "created_at": "2026-05-04T10:00:00Z", + }, + { + "id": "act-2", + "source_id": None, + "method": "message/send", + "summary": None, + "request_body": { + "params": { + "message": { + "parts": [ + {"kind": "text", "text": "look at this"}, + { + "kind": "file", + "file": { + "uri": "platform-pending:ws-1/abc123", + "name": "foo.pdf", + }, + }, + ] + } + } + }, + "created_at": "2026-05-04T10:00:01Z", + }, + ] + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + + def fake_fetch(row, **kwargs): + return "workspace:/workspace/.molecule/chat-uploads/xx-foo.pdf" + + with p, patch.object(inbox_uploads, "fetch_and_stage", fake_fetch): + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + # Only the chat message is enqueued. + assert n == 1 + queue = state.peek(10) + assert len(queue) == 1 + msg = queue[0] + assert msg.activity_id == "act-2" + # The URI in the row's request_body was mutated by message_from_activity + # → rewrite_request_body. Re-extracting reveals the rewritten value. + rewritten = rows[1]["request_body"]["params"]["message"]["parts"][1]["file"]["uri"] + assert rewritten == "workspace:/workspace/.molecule/chat-uploads/xx-foo.pdf" + + +def test_poll_once_chat_upload_row_advances_cursor_even_on_fetch_failure( + state: inbox.InboxState, monkeypatch, tmp_path +): + """A permanent network failure on /content must NOT stall the cursor + — otherwise one bad upload blocks all real chat traffic for the + workspace. fetch_and_stage returns None on failure, but the row is + still considered handled from the cursor's perspective.""" + import inbox_uploads + monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads")) + + rows = [ + { + "id": "act-broken", + "source_id": None, + "method": "chat_upload_receive", + "summary": "chat_upload_receive: doomed.pdf", + "request_body": { + "file_id": "doom", + "name": "doomed.pdf", + "uri": "platform-pending:ws-1/doom", + }, + "created_at": "2026-05-04T10:00:00Z", + }, + ] + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + + def fake_fetch(row, **kwargs): + return None # network failure + + with p, patch.object(inbox_uploads, "fetch_and_stage", fake_fetch): + inbox._poll_once(state, "http://platform", "ws-1", {}) + + assert state.peek(10) == [] + assert state.load_cursor() == "act-broken" diff --git a/workspace/tests/test_inbox_uploads.py b/workspace/tests/test_inbox_uploads.py new file mode 100644 index 00000000..515616e2 --- /dev/null +++ b/workspace/tests/test_inbox_uploads.py @@ -0,0 +1,697 @@ +"""Tests for workspace/inbox_uploads.py — poll-mode chat-upload fetcher. + +Covers the full activity-row → fetch → stage-on-disk → ack flow plus +the URI cache and the rewrite that swaps platform-pending: URIs to +local workspace: URIs in subsequent chat messages. +""" +from __future__ import annotations + +import os +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest + +import inbox_uploads + + +@pytest.fixture(autouse=True) +def _reset_cache_and_dir(tmp_path, monkeypatch): + """Each test starts with an empty URI cache and a temp upload dir + so on-disk artifacts from one test don't leak into the next.""" + inbox_uploads.get_cache().clear() + monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads")) + yield + inbox_uploads.get_cache().clear() + + +# --------------------------------------------------------------------------- +# sanitize_filename — parity with internal_chat_uploads + Go SanitizeFilename +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "raw,want", + [ + ("../../etc/passwd", "passwd"), + ("/etc/passwd", "passwd"), + ("hello world.pdf", "hello_world.pdf"), + ("weird;chars!?.txt", "weird_chars__.txt"), + ("中文.docx", "__.docx"), + ("file (1).pdf", "file__1_.pdf"), + ("report-2026.05.04_v2.pdf", "report-2026.05.04_v2.pdf"), + ("", "file"), + (".", "file"), + ("..", "file"), + ], +) +def test_sanitize_filename_parity_with_python_internal(raw, want): + assert inbox_uploads.sanitize_filename(raw) == want + + +def test_sanitize_filename_caps_at_100_preserves_short_extension(): + long = "a" * 200 + ".pdf" + got = inbox_uploads.sanitize_filename(long) + assert len(got) == 100 + assert got.endswith(".pdf") + + +def test_sanitize_filename_drops_long_extension(): + long = "c" * 90 + ".thisisaverylongextensionnotpreserved" + got = inbox_uploads.sanitize_filename(long) + assert len(got) == 100 + assert ".thisisaverylongextensionnotpreserved" not in got + + +# --------------------------------------------------------------------------- +# _URICache — LRU semantics +# --------------------------------------------------------------------------- + + +def test_uricache_set_get_roundtrip(): + c = inbox_uploads._URICache(max_entries=10) + c.set("platform-pending:ws/1", "workspace:/local/1") + assert c.get("platform-pending:ws/1") == "workspace:/local/1" + + +def test_uricache_get_missing_returns_none(): + c = inbox_uploads._URICache(max_entries=10) + assert c.get("platform-pending:ws/missing") is None + + +def test_uricache_evicts_oldest_at_capacity(): + c = inbox_uploads._URICache(max_entries=2) + c.set("a", "A") + c.set("b", "B") + c.set("c", "C") # evicts "a" + assert c.get("a") is None + assert c.get("b") == "B" + assert c.get("c") == "C" + assert len(c) == 2 + + +def test_uricache_get_promotes_recently_used(): + c = inbox_uploads._URICache(max_entries=2) + c.set("a", "A") + c.set("b", "B") + # Promote "a" by reading; next set should evict "b" instead of "a". + assert c.get("a") == "A" + c.set("c", "C") + assert c.get("a") == "A" + assert c.get("b") is None + assert c.get("c") == "C" + + +def test_uricache_overwrite_updates_value(): + c = inbox_uploads._URICache(max_entries=10) + c.set("k", "v1") + c.set("k", "v2") + assert c.get("k") == "v2" + assert len(c) == 1 + + +def test_uricache_clear(): + c = inbox_uploads._URICache(max_entries=10) + c.set("a", "A") + c.set("b", "B") + c.clear() + assert c.get("a") is None + assert len(c) == 0 + + +def test_resolve_pending_uri_uses_module_cache(): + inbox_uploads.get_cache().set("platform-pending:ws/x", "workspace:/local/x") + assert inbox_uploads.resolve_pending_uri("platform-pending:ws/x") == "workspace:/local/x" + assert inbox_uploads.resolve_pending_uri("platform-pending:ws/missing") is None + + +# --------------------------------------------------------------------------- +# stage_to_disk +# --------------------------------------------------------------------------- + + +def test_stage_to_disk_writes_file_and_returns_workspace_uri(tmp_path): + uri = inbox_uploads.stage_to_disk(b"hello", "report.pdf") + assert uri.startswith("workspace:") + path = uri[len("workspace:"):] + assert os.path.isfile(path) + with open(path, "rb") as f: + assert f.read() == b"hello" + assert path.endswith("-report.pdf") + # Prefix is 32 hex chars + "-" + name. + name = os.path.basename(path) + prefix, _, _ = name.partition("-") + assert len(prefix) == 32 + + +def test_stage_to_disk_sanitizes_filename(): + uri = inbox_uploads.stage_to_disk(b"x", "../../evil.txt") + name = os.path.basename(uri) + assert "/" not in name + assert name.endswith("-evil.txt") + + +def test_stage_to_disk_rejects_oversize(): + with pytest.raises(ValueError): + inbox_uploads.stage_to_disk(b"x" * (inbox_uploads.MAX_FILE_BYTES + 1), "big.bin") + + +def test_stage_to_disk_creates_directory_if_missing(): + # CHAT_UPLOAD_DIR is monkeypatched to a non-existent tmp path; the + # call must mkdir -p it on first write. + assert not os.path.exists(inbox_uploads.CHAT_UPLOAD_DIR) + inbox_uploads.stage_to_disk(b"x", "a.txt") + assert os.path.isdir(inbox_uploads.CHAT_UPLOAD_DIR) + + +def test_stage_to_disk_write_failure_cleans_partial_file(tmp_path, monkeypatch): + # open() succeeds but write() fails — the partial file must be + # removed so a retry can claim a fresh prefix without colliding. + real_fdopen = os.fdopen + written_paths: list[str] = [] + + def boom_fdopen(fd, mode): + # Wrap the real file with one whose write() raises. + f = real_fdopen(fd, mode) + # Track which path's fd we opened by inspecting the chat-upload dir. + for entry in os.listdir(inbox_uploads.CHAT_UPLOAD_DIR): + written_paths.append(os.path.join(inbox_uploads.CHAT_UPLOAD_DIR, entry)) + original_write = f.write + + def bad_write(b): + original_write(b"") # ensure file exists + raise OSError(28, "no space") + f.write = bad_write + return f + + monkeypatch.setattr(os, "fdopen", boom_fdopen) + with pytest.raises(OSError): + inbox_uploads.stage_to_disk(b"data", "x.txt") + # All staged files cleaned up. + for p in written_paths: + assert not os.path.exists(p) + + +def test_stage_to_disk_write_failure_unlink_failure_swallowed(monkeypatch): + # open() succeeds, write() fails, unlink() ALSO fails — the unlink + # error is swallowed and the original write error propagates. + real_fdopen = os.fdopen + + def boom_fdopen(fd, mode): + f = real_fdopen(fd, mode) + + def bad_write(_): + raise OSError(28, "no space") + f.write = bad_write + return f + + def bad_unlink(_): + raise OSError(13, "permission denied") + + monkeypatch.setattr(os, "fdopen", boom_fdopen) + monkeypatch.setattr(os, "unlink", bad_unlink) + with pytest.raises(OSError) as ei: + inbox_uploads.stage_to_disk(b"data", "x.txt") + # Original write error, not the unlink error. + assert ei.value.errno == 28 + + +def test_stage_to_disk_propagates_oserror_and_cleans_partial(tmp_path, monkeypatch): + # Make the dir read-only AFTER mkdir succeeds, so open() fails. Skip + # this on platforms where the dir's permissions don't restrict the + # process owner (root in Docker, etc.). + inbox_uploads.stage_to_disk(b"first", "a.txt") + if os.geteuid() == 0: + pytest.skip("root bypasses permission bits") + os.chmod(inbox_uploads.CHAT_UPLOAD_DIR, 0o500) + try: + with pytest.raises(OSError): + inbox_uploads.stage_to_disk(b"second", "b.txt") + finally: + os.chmod(inbox_uploads.CHAT_UPLOAD_DIR, 0o755) + + +# --------------------------------------------------------------------------- +# is_chat_upload_row + _request_body_dict +# --------------------------------------------------------------------------- + + +def test_is_chat_upload_row_true_on_method_match(): + assert inbox_uploads.is_chat_upload_row({"method": "chat_upload_receive"}) + + +def test_is_chat_upload_row_false_on_other_methods(): + assert not inbox_uploads.is_chat_upload_row({"method": "message/send"}) + assert not inbox_uploads.is_chat_upload_row({"method": None}) + assert not inbox_uploads.is_chat_upload_row({}) + + +def test_request_body_dict_passthrough(): + body = {"file_id": "x"} + assert inbox_uploads._request_body_dict({"request_body": body}) is body + + +def test_request_body_dict_string_decoded(): + assert inbox_uploads._request_body_dict({"request_body": '{"a": 1}'}) == {"a": 1} + + +def test_request_body_dict_invalid_string_returns_none(): + assert inbox_uploads._request_body_dict({"request_body": "not json"}) is None + + +def test_request_body_dict_non_dict_after_decode_returns_none(): + assert inbox_uploads._request_body_dict({"request_body": "[1, 2]"}) is None + + +def test_request_body_dict_other_type_returns_none(): + assert inbox_uploads._request_body_dict({"request_body": 123}) is None + + +# --------------------------------------------------------------------------- +# fetch_and_stage — the full GET / write / ack flow +# --------------------------------------------------------------------------- + + +def _make_resp(status_code: int, content: bytes = b"", content_type: str = "", text: str = "") -> MagicMock: + resp = MagicMock() + resp.status_code = status_code + resp.content = content + headers: dict[str, str] = {} + if content_type: + headers["content-type"] = content_type + resp.headers = headers + resp.text = text + return resp + + +def _patch_httpx_for_fetch(get_resp: MagicMock, ack_resp: MagicMock | None = None): + """Patch httpx.Client so each new context-manager returns a client + whose .get() returns get_resp and .post() returns ack_resp. + """ + client = MagicMock() + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + client.get = MagicMock(return_value=get_resp) + client.post = MagicMock(return_value=ack_resp or _make_resp(200)) + return patch("httpx.Client", return_value=client), client + + +def _row(file_id: str = "file-1", uri: str | None = None, name: str = "report.pdf", body_extra: dict | None = None) -> dict: + body: dict[str, Any] = { + "file_id": file_id, + "name": name, + "mimeType": "application/pdf", + "size": 9, + } + if uri is not None: + body["uri"] = uri + if body_extra: + body.update(body_extra) + return { + "id": "act-100", + "source_id": None, + "method": "chat_upload_receive", + "summary": "chat_upload_receive: report.pdf", + "request_body": body, + "created_at": "2026-05-04T10:00:00Z", + } + + +def test_fetch_and_stage_happy_path_writes_file_acks_and_caches(): + pending_uri = "platform-pending:ws-1/file-1" + row = _row(uri=pending_uri) + get_resp = _make_resp(200, content=b"PDF-bytes", content_type="application/pdf") + p, client = _patch_httpx_for_fetch(get_resp) + with p: + local_uri = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={"Authorization": "Bearer t"} + ) + assert local_uri is not None + assert local_uri.startswith("workspace:") + # On-disk file content matches. + path = local_uri[len("workspace:"):] + with open(path, "rb") as f: + assert f.read() == b"PDF-bytes" + # Cache populated. + assert inbox_uploads.get_cache().get(pending_uri) == local_uri + # Ack POSTed to the right URL. + client.post.assert_called_once() + args, kwargs = client.post.call_args + assert "/pending-uploads/file-1/ack" in args[0] + assert kwargs["headers"]["Authorization"] == "Bearer t" + + +def test_fetch_and_stage_reconstructs_uri_when_missing_in_body(): + row = _row(uri=None) # request_body has no 'uri' + get_resp = _make_resp(200, content=b"x", content_type="text/plain") + p, _ = _patch_httpx_for_fetch(get_resp) + with p: + inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + # Cache key reconstructed from workspace_id + file_id. + assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-1") is not None + + +def test_fetch_and_stage_returns_none_on_missing_request_body(): + row = {"id": "act-100", "method": "chat_upload_receive"} + # No httpx call should happen, but we patch defensively. + p, client = _patch_httpx_for_fetch(_make_resp(200)) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.get.assert_not_called() + + +def test_fetch_and_stage_returns_none_on_missing_file_id(): + row = {"id": "act-100", "method": "chat_upload_receive", "request_body": {"name": "x.pdf"}} + p, client = _patch_httpx_for_fetch(_make_resp(200)) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.get.assert_not_called() + + +def test_fetch_and_stage_handles_nonstring_file_id(): + row = {"id": "act-100", "method": "chat_upload_receive", "request_body": {"file_id": 123}} + p, client = _patch_httpx_for_fetch(_make_resp(200)) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.get.assert_not_called() + + +def test_fetch_and_stage_404_returns_none_no_ack(): + row = _row() + get_resp = _make_resp(404, text="gone") + ack_resp = _make_resp(200) + p, client = _patch_httpx_for_fetch(get_resp, ack_resp) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + # No ack — the row is already gone. + client.post.assert_not_called() + + +def test_fetch_and_stage_500_returns_none_no_ack(): + row = _row() + p, client = _patch_httpx_for_fetch(_make_resp(500, text="boom")) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.post.assert_not_called() + + +def test_fetch_and_stage_network_error_returns_none(): + row = _row() + client = MagicMock() + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + client.get = MagicMock(side_effect=RuntimeError("connection refused")) + with patch("httpx.Client", return_value=client): + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + + +def test_fetch_and_stage_oversize_response_refused(): + row = _row() + big = b"x" * (inbox_uploads.MAX_FILE_BYTES + 1) + p, client = _patch_httpx_for_fetch(_make_resp(200, content=big, content_type="application/octet-stream")) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.post.assert_not_called() + + +def test_fetch_and_stage_ack_failure_does_not_invalidate_local_uri(): + row = _row(uri="platform-pending:ws-1/file-1") + get_resp = _make_resp(200, content=b"data", content_type="text/plain") + ack_resp = _make_resp(500, text="ack failed") + p, _ = _patch_httpx_for_fetch(get_resp, ack_resp) + with p: + local_uri = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + # On-disk staging succeeded; ack failure is logged but doesn't + # roll back the cache. + assert local_uri is not None + assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-1") == local_uri + + +def test_fetch_and_stage_ack_network_error_swallowed(): + row = _row(uri="platform-pending:ws-1/file-1") + client = MagicMock() + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + client.get = MagicMock(return_value=_make_resp(200, content=b"data", content_type="text/plain")) + client.post = MagicMock(side_effect=RuntimeError("ack network error")) + with patch("httpx.Client", return_value=client): + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is not None # GET succeeded → URI returned even if ack blew up + + +def test_fetch_and_stage_uses_response_content_type_when_present(): + row = _row(name="thing.bin", body_extra={"mimeType": "application/x-bogus"}) + # Response says image/png; should win over body's mimeType. + get_resp = _make_resp(200, content=b"PNG", content_type="image/png; charset=binary") + p, _ = _patch_httpx_for_fetch(get_resp) + with p: + # We don't assert on returned mime (not part of the contract); + # the test just verifies the happy path runs without trying to + # parse the trailing parameter. + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is not None + + +def test_fetch_and_stage_nonstring_filename_falls_back_to_file(): + # body['name'] is a non-string (e.g. truncated to None or a number); + # filename must default to "file" so sanitize_filename has something + # to work with. + row = _row(body_extra={"name": 12345}) + p, _ = _patch_httpx_for_fetch(_make_resp(200, content=b"x", content_type="text/plain")) + with p: + local_uri = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert local_uri is not None + assert local_uri.endswith("-file") + + +def test_fetch_and_stage_default_filename_when_missing(): + row = { + "id": "act", + "method": "chat_upload_receive", + "request_body": {"file_id": "file-1"}, + } + p, _ = _patch_httpx_for_fetch(_make_resp(200, content=b"data", content_type="text/plain")) + with p: + local_uri = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert local_uri is not None + assert local_uri.endswith("-file") # default filename + + +def test_fetch_and_stage_disk_write_failure_returns_none(monkeypatch): + row = _row() + p, client = _patch_httpx_for_fetch(_make_resp(200, content=b"x", content_type="text/plain")) + + def bad_stage(*args, **kwargs): + raise OSError(28, "no space left") + monkeypatch.setattr(inbox_uploads, "stage_to_disk", bad_stage) + + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.post.assert_not_called() + + +def test_fetch_and_stage_disk_value_error_returns_none(monkeypatch): + row = _row() + p, client = _patch_httpx_for_fetch(_make_resp(200, content=b"x", content_type="text/plain")) + + def bad_stage(*args, **kwargs): + raise ValueError("oversize after sanity check") + monkeypatch.setattr(inbox_uploads, "stage_to_disk", bad_stage) + + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.post.assert_not_called() + + +def test_fetch_and_stage_httpx_missing_returns_none(monkeypatch): + row = _row() + # Simulate httpx not installed by making the import fail. + import sys + real_httpx = sys.modules.pop("httpx", None) + monkeypatch.setitem(sys.modules, "httpx", None) + try: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + finally: + if real_httpx is not None: + sys.modules["httpx"] = real_httpx + else: + sys.modules.pop("httpx", None) + assert result is None + + +def test_fetch_and_stage_falls_back_to_extension_mime(monkeypatch): + row = _row(name="snap.png", body_extra={"mimeType": ""}) # no mimeType in body + # Response also has no content-type so it falls through to mimetypes.guess_type. + get_resp = _make_resp(200, content=b"PNG", content_type="") + p, _ = _patch_httpx_for_fetch(get_resp) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is not None + + +# --------------------------------------------------------------------------- +# rewrite_request_body — URI swap in chat-message bodies +# --------------------------------------------------------------------------- + + +def test_rewrite_request_body_swaps_pending_uri_in_message_parts(): + inbox_uploads.get_cache().set("platform-pending:ws/1", "workspace:/local/1") + body = { + "method": "message/send", + "params": { + "message": { + "parts": [ + {"kind": "text", "text": "see this"}, + {"kind": "file", "file": {"uri": "platform-pending:ws/1", "name": "a.pdf"}}, + ] + } + }, + } + inbox_uploads.rewrite_request_body(body) + assert body["params"]["message"]["parts"][1]["file"]["uri"] == "workspace:/local/1" + + +def test_rewrite_request_body_swaps_in_params_parts(): + inbox_uploads.get_cache().set("platform-pending:ws/2", "workspace:/local/2") + body = { + "params": { + "parts": [ + {"kind": "file", "file": {"uri": "platform-pending:ws/2"}}, + ] + } + } + inbox_uploads.rewrite_request_body(body) + assert body["params"]["parts"][0]["file"]["uri"] == "workspace:/local/2" + + +def test_rewrite_request_body_swaps_in_top_level_parts(): + inbox_uploads.get_cache().set("platform-pending:ws/3", "workspace:/local/3") + body = { + "parts": [{"kind": "file", "file": {"uri": "platform-pending:ws/3"}}] + } + inbox_uploads.rewrite_request_body(body) + assert body["parts"][0]["file"]["uri"] == "workspace:/local/3" + + +def test_rewrite_request_body_leaves_unmatched_uri_unchanged(): + # No cache entry → URI stays as-is. Agent surfaces the unresolvable + # URI rather than the inbox silently dropping the part. + body = { + "parts": [{"kind": "file", "file": {"uri": "platform-pending:ws/missing"}}] + } + inbox_uploads.rewrite_request_body(body) + assert body["parts"][0]["file"]["uri"] == "platform-pending:ws/missing" + + +def test_rewrite_request_body_leaves_non_pending_uri_unchanged(): + inbox_uploads.get_cache().set("platform-pending:ws/3", "workspace:/local/3") + body = { + "parts": [ + {"kind": "file", "file": {"uri": "workspace:/already-local.pdf"}}, + {"kind": "file", "file": {"uri": "https://example.com/x.pdf"}}, + ] + } + inbox_uploads.rewrite_request_body(body) + assert body["parts"][0]["file"]["uri"] == "workspace:/already-local.pdf" + assert body["parts"][1]["file"]["uri"] == "https://example.com/x.pdf" + + +def test_rewrite_request_body_skips_non_dict_parts(): + body = {"parts": ["not a dict", 42, None]} + inbox_uploads.rewrite_request_body(body) # must not raise + assert body["parts"] == ["not a dict", 42, None] + + +def test_rewrite_request_body_skips_text_parts(): + body = { + "parts": [{"kind": "text", "text": "platform-pending:ws/should-not-rewrite"}] + } + inbox_uploads.rewrite_request_body(body) + # Text content not touched — only file.uri fields are URIs. + assert body["parts"][0]["text"] == "platform-pending:ws/should-not-rewrite" + + +def test_rewrite_request_body_skips_part_without_file_dict(): + body = {"parts": [{"kind": "file"}]} # no file key + inbox_uploads.rewrite_request_body(body) + assert body["parts"] == [{"kind": "file"}] + + +def test_rewrite_request_body_skips_file_without_uri(): + body = {"parts": [{"kind": "file", "file": {"name": "x.pdf"}}]} + inbox_uploads.rewrite_request_body(body) + assert body["parts"][0]["file"] == {"name": "x.pdf"} + + +def test_rewrite_request_body_skips_nonstring_uri(): + body = {"parts": [{"kind": "file", "file": {"uri": None}}]} + inbox_uploads.rewrite_request_body(body) # must not raise + + +def test_rewrite_request_body_handles_non_dict_body(): + inbox_uploads.rewrite_request_body(None) # no-op + inbox_uploads.rewrite_request_body("string body") # no-op + inbox_uploads.rewrite_request_body([1, 2, 3]) # no-op + + +def test_rewrite_request_body_handles_non_dict_params(): + body = {"params": "not a dict", "parts": []} + inbox_uploads.rewrite_request_body(body) # must not raise + + +def test_rewrite_request_body_handles_non_dict_message(): + body = {"params": {"message": "not a dict"}} + inbox_uploads.rewrite_request_body(body) # must not raise + + +def test_rewrite_request_body_handles_non_list_parts(): + body = {"parts": "not a list"} + inbox_uploads.rewrite_request_body(body) # must not raise + + +def test_rewrite_request_body_handles_non_dict_file(): + body = {"parts": [{"kind": "file", "file": "not a dict"}]} + inbox_uploads.rewrite_request_body(body) # must not raise