diff --git a/workspace/inbox.py b/workspace/inbox.py index 94417243..6c7ea895 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -432,7 +432,17 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool: def message_from_activity(row: dict[str, Any]) -> InboxMessage: - """Convert one /activity row into an InboxMessage.""" + """Convert one /activity row into an InboxMessage. + + Mutates ``row['request_body']`` in-place to swap any + ``platform-pending:`` URIs to the locally-staged ``workspace:`` URIs + (see ``inbox_uploads.rewrite_request_body``) — by the time the + upstream chat message arrives via this path, the upload-receive row + that staged the bytes has already populated the URI cache (lower + activity_logs.id, processed earlier in the same poll batch). A + cache miss leaves the URI untouched; the agent surfaces an + unresolvable URI rather than the inbox silently dropping the part. + """ request_body = row.get("request_body") if isinstance(request_body, str): # The Go handler returns request_body as json.RawMessage; httpx @@ -443,6 +453,14 @@ def message_from_activity(row: dict[str, Any]) -> InboxMessage: except (TypeError, ValueError): request_body = None + # Rewrite platform-pending: URIs → workspace: URIs in-place. Imported + # at call time to keep the import graph clean for the in-container + # path that doesn't use this module (also avoids a circular: the + # uploads module is small enough that re-importing per call is + # cheap, and the Python import cache makes it free after the first). + from inbox_uploads import rewrite_request_body + rewrite_request_body(request_body) + return InboxMessage( activity_id=str(row.get("id", "")), text=_extract_text(request_body, row.get("summary")), @@ -532,11 +550,34 @@ def _poll_once( if cursor is None: rows = list(reversed(rows)) + # Imported lazily at use-site so a runtime that never sees an + # upload-receive row never imports the module. Cheap on the hot + # path because Python caches the import. 
+ from inbox_uploads import is_chat_upload_row, fetch_and_stage + new_count = 0 last_id: str | None = None for row in rows: if not isinstance(row, dict): continue + if is_chat_upload_row(row): + # Side-effect row from the platform's poll-mode chat-upload + # handler — fetch the bytes, stage to /workspace/.molecule/ + # chat-uploads, ack. NOT enqueued as an InboxMessage; the + # agent will see the chat message that REFERENCES this + # upload via a separate (later) activity row, with the + # pending: URI rewritten to a workspace: URI by + # message_from_activity. We DO advance the cursor past + # this row so a permanent network outage on /content + # doesn't stall the cursor and block real chat traffic. + fetch_and_stage( + row, + platform_url=platform_url, + workspace_id=workspace_id, + headers=headers, + ) + last_id = str(row.get("id", "")) or last_id + continue if _is_self_notify_row(row): # The workspace-server's `/notify` handler writes the agent's # own send_message_to_user POSTs to activity_logs with diff --git a/workspace/inbox_uploads.py b/workspace/inbox_uploads.py new file mode 100644 index 00000000..798f18de --- /dev/null +++ b/workspace/inbox_uploads.py @@ -0,0 +1,475 @@ +"""Poll-mode chat-upload fetcher + URI cache for the standalone path. + +Companion to ``inbox.py``. When the workspace's inbox poller sees an +``activity_logs`` row with ``method='chat_upload_receive'`` (written by +the platform's ``uploadPollMode`` handler — workspace-server +``internal/handlers/chat_files.go``), this module: + + 1. Pulls the bytes from + ``GET /workspaces/:id/pending-uploads/:file_id/content``. + 2. Writes them to ``/workspace/.molecule/chat-uploads/-`` + — same on-disk shape as the push-mode handler in + ``internal_chat_uploads.py``, so anything downstream that already + resolves ``workspace:/workspace/.molecule/chat-uploads/...`` URIs + works unchanged. + 3. 
POSTs ``/workspaces/:id/pending-uploads/:file_id/ack`` so Phase 3 + sweep can clean up the platform-side ``pending_uploads`` row. + 4. Records a ``platform-pending:/ → + workspace:/workspace/.molecule/chat-uploads/...`` mapping in a + process-local cache so the chat message that arrives later + (referencing the platform-pending URI) gets rewritten before the + agent sees it. + +URI rewrite ordering — the chat message containing the +``platform-pending:`` URI is logged by the platform AFTER the +``chat_upload_receive`` row, so the inbox poller sees the upload-receive +row first (lower activity_logs.id) and stages the bytes before the chat +message arrives in the same poll batch (or a later one). The URI cache +is therefore populated before the message_from_activity path needs it. +A miss (network race, restart with stale cursor) is handled by keeping +the original ``platform-pending:`` URI in the rewritten body — the agent +will see something it can't open, which is preferable to silently +dropping the URI. + +Auth — same Bearer token the inbox poller uses (``platform_auth.auth_headers``). +Both endpoints are on the wsAuth-gated route, so this module can never +read another tenant's bytes even if a token is misrouted. +""" +from __future__ import annotations + +import logging +import mimetypes +import os +import re +import secrets as pysecrets +import threading +from collections import OrderedDict +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + +# Same on-disk root as internal_chat_uploads.CHAT_UPLOAD_DIR — keeping +# these decoupled would let drift sneak in. Imported here rather than +# from internal_chat_uploads to avoid pulling in starlette as a +# transitive dep (this module runs in the standalone MCP path which +# doesn't ship the in-container HTTP server). +CHAT_UPLOAD_DIR = "/workspace/.molecule/chat-uploads" + +# Per-file safety net. 
The platform enforces 25 MB on the staging side, +# but a buggy or hostile platform response shouldn't be able to fill the +# workspace's disk — refuse to write more than this even if the response +# claims a larger Content-Length. +MAX_FILE_BYTES = 25 * 1024 * 1024 + +# Network deadline for the GET. Tuned for a 25 MB transfer over a +# reasonable consumer link (~5 Mbps gives ~40s for the full payload), +# plus headroom for TLS + platform auth. Aligned with inbox poller's +# 10s default for /activity calls — both are user-perceived latency. +DEFAULT_FETCH_TIMEOUT = 60.0 + +# Cap on the URI cache. A long-lived workspace handling thousands of +# uploads shouldn't grow without bound; an LRU cap of 1024 keeps the +# entries-needed-for-a-typical-conversation well within memory. +URI_CACHE_MAX_ENTRIES = 1024 + +# Same character class as internal_chat_uploads — kept duplicated rather +# than imported to avoid dragging starlette into the standalone path. +_UNSAFE_FILENAME_CHARS = re.compile(r"[^a-zA-Z0-9._\-]") + + +def sanitize_filename(name: str) -> str: + """Reduce a user-supplied filename to a safe form. + + Mirrors ``internal_chat_uploads.sanitize_filename`` and the Go + handler's ``SanitizeFilename`` — three-way parity is pinned by + ``workspace-server/internal/handlers/sanitize_filename_test.go`` and + ``workspace/tests/test_internal_chat_uploads.py`` so the URI shape + is identical regardless of which path handles the upload. 
+ """ + base = os.path.basename(name) + base = base.replace(" ", "_") + base = _UNSAFE_FILENAME_CHARS.sub("_", base) + if len(base) > 100: + ext = "" + dot = base.rfind(".") + if dot >= 0 and len(base) - dot <= 16: + ext = base[dot:] + base = base[: 100 - len(ext)] + ext + if base in ("", ".", ".."): + return "file" + return base + + +# --------------------------------------------------------------------------- +# URI cache — maps platform-pending URIs to local workspace: URIs +# --------------------------------------------------------------------------- + + +class _URICache: + """Thread-safe bounded LRU mapping of platform-pending → workspace URIs. + + Bounded so a workspace that runs for months and handles thousands of + uploads doesn't accumulate entries forever. ``OrderedDict.move_to_end`` + promotes recently-used entries; eviction takes the oldest. + + The cache is intentionally per-process — there is no persistence + across a workspace restart. A restart with a stale inbox cursor that + re-poll an upload-receive row will re-fetch (the bytes are already + on disk from the prior session — see ``stage_to_disk``'s O_EXCL + handling) and re-register; a chat message that referenced the + platform-pending URI BEFORE the restart and arrives AFTER would miss + the rewrite and surface the platform-pending URI to the agent. That + is preferable to a stale persisted mapping that points at a deleted + file. 
+ """ + + def __init__(self, max_entries: int = URI_CACHE_MAX_ENTRIES): + self._max = max_entries + self._lock = threading.Lock() + self._entries: "OrderedDict[str, str]" = OrderedDict() + + def get(self, pending_uri: str) -> str | None: + with self._lock: + local = self._entries.get(pending_uri) + if local is not None: + self._entries.move_to_end(pending_uri) + return local + + def set(self, pending_uri: str, local_uri: str) -> None: + with self._lock: + self._entries[pending_uri] = local_uri + self._entries.move_to_end(pending_uri) + while len(self._entries) > self._max: + self._entries.popitem(last=False) + + def __len__(self) -> int: + with self._lock: + return len(self._entries) + + def clear(self) -> None: + with self._lock: + self._entries.clear() + + +_cache = _URICache() + + +def get_cache() -> _URICache: + """Expose the module-singleton cache for tests and the rewrite path.""" + return _cache + + +def resolve_pending_uri(uri: str) -> str | None: + """Return the local ``workspace:`` URI for a ``platform-pending:`` URI, + or None if not yet staged. Convenience for callers that want to + fall back to an on-demand fetch — pass the result through to + ``executor_helpers.resolve_attachment_uri``. + """ + return _cache.get(uri) + + +# --------------------------------------------------------------------------- +# On-disk staging +# --------------------------------------------------------------------------- + + +def _open_safe(path: str) -> int: + """Open ``path`` for write with ``O_CREAT|O_EXCL|O_NOFOLLOW``. + + Same shape as ``internal_chat_uploads._open_safe`` — refuses to + follow a pre-existing symlink at the target and refuses to overwrite + an existing regular file. The 16-byte random prefix makes a name + collision astronomical, but defense-in-depth costs nothing. 
+ """ + flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL + if hasattr(os, "O_NOFOLLOW"): + flags |= os.O_NOFOLLOW + return os.open(path, flags, 0o600) + + +def stage_to_disk(content: bytes, filename: str) -> str: + """Write ``content`` under ``CHAT_UPLOAD_DIR`` and return the local URI. + + Returns ``workspace:/workspace/.molecule/chat-uploads/-``. + The 32-hex prefix makes the on-disk name unguessable to anything + that didn't see the response, so even if a stale agent has a guess + at the original filename it can't construct a URL to a sibling's + upload. + + Raises: + OSError: write failure (mkdir, open, or write). Caller is + expected to log + skip; the activity row stays unacked so a + future poll re-tries. + ValueError: ``content`` exceeds ``MAX_FILE_BYTES``. Pre-staging + guard belt-and-braces above the platform's same-side cap. + """ + if len(content) > MAX_FILE_BYTES: + raise ValueError( + f"content size {len(content)} exceeds workspace cap {MAX_FILE_BYTES}" + ) + + Path(CHAT_UPLOAD_DIR).mkdir(parents=True, exist_ok=True) + + sanitized = sanitize_filename(filename) + prefix = pysecrets.token_hex(16) + stored = f"{prefix}-{sanitized}" + target = os.path.join(CHAT_UPLOAD_DIR, stored) + + fd = _open_safe(target) + try: + with os.fdopen(fd, "wb") as f: + f.write(content) + except OSError: + # Best-effort cleanup — partial writes leave a stub file that + # would mask a future retry's success otherwise. + try: + os.unlink(target) + except OSError: + pass + raise + + return f"workspace:{CHAT_UPLOAD_DIR}/{stored}" + + +# --------------------------------------------------------------------------- +# Activity row → fetch/stage/ack flow +# --------------------------------------------------------------------------- + + +def _request_body_dict(row: dict[str, Any]) -> dict[str, Any] | None: + """Coerce ``row['request_body']`` into a dict. + + The /activity API returns request_body as JSON (already-deserialized + by httpx). 
Some legacy paths or mocked transports may emit a string; + handle defensively rather than raising. + """ + body = row.get("request_body") + if isinstance(body, dict): + return body + if isinstance(body, str): + import json + try: + decoded = json.loads(body) + except (TypeError, ValueError): + return None + return decoded if isinstance(decoded, dict) else None + return None + + +def is_chat_upload_row(row: dict[str, Any]) -> bool: + """True if ``row`` is the platform's chat-upload-receive activity. + + Used by the inbox poller to fork the row off the regular A2A + message handling path — this row is not a peer message; it's an + instruction to fetch + stage bytes. Match on ``method`` only; + ``activity_type`` is already filtered to ``a2a_receive`` upstream. + """ + return row.get("method") == "chat_upload_receive" + + +def fetch_and_stage( + row: dict[str, Any], + *, + platform_url: str, + workspace_id: str, + headers: dict[str, str], + timeout_secs: float = DEFAULT_FETCH_TIMEOUT, +) -> str | None: + """Fetch the row's bytes, stage them under chat-uploads, and ack. + + Returns the local ``workspace:`` URI on success, or ``None`` if any + step failed (logged with enough detail to triage). Failure leaves + the platform-side pending-upload row unacked for the platform's + sweep to clean up — the poller does NOT retry, because the inbox + cursor is still advanced past the activity row (the row is "handled" from the inbox's perspective even on + fetch failure; otherwise a permanent network outage would stall the + cursor and block real chat traffic). + + On success, the URI cache is updated so a subsequent chat message + referencing the same ``platform-pending:`` URI is rewritten before + the agent sees it. 
+ """ + body = _request_body_dict(row) + if body is None: + logger.warning( + "inbox_uploads: row %s missing request_body; cannot fetch", + row.get("id"), + ) + return None + + file_id = body.get("file_id") + if not isinstance(file_id, str) or not file_id: + logger.warning( + "inbox_uploads: row %s has no file_id in request_body", + row.get("id"), + ) + return None + + pending_uri = body.get("uri") + if not isinstance(pending_uri, str) or not pending_uri: + # Reconstruct what the platform would have written — defensive + # against a row whose uri field got truncated. Same shape as the + # Go handler's URI builder. + pending_uri = f"platform-pending:{workspace_id}/{file_id}" + + filename = body.get("name") or "file" + if not isinstance(filename, str): + filename = "file" + + # Lazy httpx import: the standalone MCP path uses httpx; an in- + # container caller that imports this module by accident shouldn't + # explode at import time. + try: + import httpx # noqa: WPS433 + except ImportError: + logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id) + return None + + content_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/content" + ack_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/ack" + + try: + with httpx.Client(timeout=timeout_secs) as client: + resp = client.get(content_url, headers=headers) + except Exception as exc: # noqa: BLE001 + logger.warning( + "inbox_uploads: GET %s failed: %s", content_url, exc + ) + return None + + if resp.status_code == 404: + # Row was swept or already acked by a previous poll race — nothing + # to fetch. Don't ack again; the platform's GC handles it. This is + # a soft-skip, not an error — log at INFO so triage isn't noisy. 
+ logger.info( + "inbox_uploads: pending upload %s already gone (404); skipping", + file_id, + ) + return None + if resp.status_code >= 400: + logger.warning( + "inbox_uploads: GET %s returned %d: %s", + content_url, + resp.status_code, + (resp.text or "")[:200], + ) + return None + + content = resp.content or b"" + if len(content) > MAX_FILE_BYTES: + logger.warning( + "inbox_uploads: refusing to stage %s — size %d exceeds cap %d", + file_id, + len(content), + MAX_FILE_BYTES, + ) + return None + + # Mimetype precedence: platform's Content-Type header → request_body + # mimeType field → extension guess. Same precedence as the in- + # container ingest handler. + mime_header = resp.headers.get("content-type", "").split(";")[0].strip() + mime = ( + mime_header + or (body.get("mimeType") if isinstance(body.get("mimeType"), str) else "") + or (mimetypes.guess_type(filename)[0] or "") + ) + + try: + local_uri = stage_to_disk(content, filename) + except (OSError, ValueError) as exc: + logger.error( + "inbox_uploads: failed to stage %s (%s) to disk: %s", + file_id, + filename, + exc, + ) + return None + + _cache.set(pending_uri, local_uri) + logger.info( + "inbox_uploads: staged file_id=%s name=%s size=%d mime=%s pending_uri=%s local_uri=%s", + file_id, + filename, + len(content), + mime, + pending_uri, + local_uri, + ) + + # Ack last so a write failure above leaves the row available for a + # retry on the next poll. A failed ack is logged but doesn't roll + # back the on-disk file — the platform's sweep will clean up + # eventually. 
+ try: + with httpx.Client(timeout=timeout_secs) as client: + ack_resp = client.post(ack_url, headers=headers) + if ack_resp.status_code >= 400: + logger.warning( + "inbox_uploads: ack %s returned %d: %s", + ack_url, + ack_resp.status_code, + (ack_resp.text or "")[:200], + ) + except Exception as exc: # noqa: BLE001 + logger.warning("inbox_uploads: POST %s failed: %s", ack_url, exc) + + return local_uri + + +# --------------------------------------------------------------------------- +# URI rewrite for incoming chat messages +# --------------------------------------------------------------------------- +# +# The chat message that references a staged upload arrives as a +# SEPARATE activity_log row, with parts of kind=file containing +# platform-pending: URIs in the file.uri field. Walk the structure +# in-place and rewrite to the local workspace: URI when the cache has it. +# Unknown URIs pass through unchanged — the agent gets to choose how +# to react (most runtimes log + ignore an unresolvable URI). + + +def _rewrite_part(part: Any) -> None: + """Mutate a single A2A Part dict to swap platform-pending: URIs.""" + if not isinstance(part, dict): + return + file_obj = part.get("file") + if not isinstance(file_obj, dict): + return + uri = file_obj.get("uri") + if not isinstance(uri, str) or not uri.startswith("platform-pending:"): + return + rewritten = _cache.get(uri) + if rewritten: + file_obj["uri"] = rewritten + + +def rewrite_request_body(body: Any) -> None: + """Mutate ``body`` in-place, replacing platform-pending: URIs with + the cached local equivalents. + + Walks the same shapes ``inbox._extract_text`` accepts: + + - ``body['parts']`` + - ``body['params']['parts']`` + - ``body['params']['message']['parts']`` + + No-op for shapes that don't match — the message simply passes + through to the agent as-is. 
+ """ + if not isinstance(body, dict): + return + candidates: list[Any] = [] + params = body.get("params") if isinstance(body.get("params"), dict) else None + if params: + message = params.get("message") if isinstance(params.get("message"), dict) else None + if message: + candidates.append(message.get("parts")) + candidates.append(params.get("parts")) + candidates.append(body.get("parts")) + + for parts in candidates: + if isinstance(parts, list): + for part in parts: + _rewrite_part(part) diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index 6731701a..162c32c2 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -701,3 +701,165 @@ def test_set_notification_callback_none_clears(state: inbox.InboxState): state.record(_msg("act-1")) assert received == [] + + +# --------------------------------------------------------------------------- +# Phase 2 — chat_upload_receive rows route to inbox_uploads.fetch_and_stage +# --------------------------------------------------------------------------- + + +def test_poll_once_skips_chat_upload_row_from_queue(state: inbox.InboxState, monkeypatch, tmp_path): + """A row with method='chat_upload_receive' must NOT enqueue as a + chat message — it's a side-effect telling the workspace to fetch + bytes. 
Pin the contract so a refactor that flattens the row loop + can't silently re-enqueue these as 'empty A2A message' rows.""" + import inbox_uploads + monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads")) + inbox_uploads.get_cache().clear() + + rows = [ + { + "id": "act-1", + "source_id": None, + "method": "chat_upload_receive", + "summary": "chat_upload_receive: foo.pdf", + "request_body": { + "file_id": "abc123", + "name": "foo.pdf", + "mimeType": "application/pdf", + "size": 4, + "uri": "platform-pending:ws-1/abc123", + }, + "created_at": "2026-05-04T10:00:00Z", + }, + ] + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + fetch_called = [] + + def fake_fetch(row, **kwargs): + fetch_called.append((row.get("id"), kwargs["workspace_id"])) + return "workspace:/local/foo.pdf" + + with p, patch.object(inbox_uploads, "fetch_and_stage", fake_fetch): + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + # Not enqueued + cursor advanced. + assert n == 0 + assert state.peek(10) == [] + assert state.load_cursor() == "act-1" + # fetch_and_stage was invoked with the row and workspace_id. + assert fetch_called == [("act-1", "ws-1")] + + +def test_poll_once_chat_upload_row_then_chat_message_rewrites_uri(state: inbox.InboxState, monkeypatch, tmp_path): + """The classic ordering: upload-receive row first (lower id), chat + message referencing platform-pending: URI second. The chat message + that lands in the inbox must have its URI rewritten to the local + workspace: URI before the agent sees it. + """ + import inbox_uploads + monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads")) + cache = inbox_uploads.get_cache() + cache.clear() + + # Pretend the fetch already populated the cache. (The real flow + # populates it inside fetch_and_stage; we patch that to keep the + # test focused on the rewrite contract.) 
+ cache.set("platform-pending:ws-1/abc123", "workspace:/workspace/.molecule/chat-uploads/xx-foo.pdf") + + rows = [ + { + "id": "act-1", + "source_id": None, + "method": "chat_upload_receive", + "summary": "chat_upload_receive: foo.pdf", + "request_body": { + "file_id": "abc123", + "name": "foo.pdf", + "mimeType": "application/pdf", + "size": 4, + "uri": "platform-pending:ws-1/abc123", + }, + "created_at": "2026-05-04T10:00:00Z", + }, + { + "id": "act-2", + "source_id": None, + "method": "message/send", + "summary": None, + "request_body": { + "params": { + "message": { + "parts": [ + {"kind": "text", "text": "look at this"}, + { + "kind": "file", + "file": { + "uri": "platform-pending:ws-1/abc123", + "name": "foo.pdf", + }, + }, + ] + } + } + }, + "created_at": "2026-05-04T10:00:01Z", + }, + ] + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + + def fake_fetch(row, **kwargs): + return "workspace:/workspace/.molecule/chat-uploads/xx-foo.pdf" + + with p, patch.object(inbox_uploads, "fetch_and_stage", fake_fetch): + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + # Only the chat message is enqueued. + assert n == 1 + queue = state.peek(10) + assert len(queue) == 1 + msg = queue[0] + assert msg.activity_id == "act-2" + # The URI in the row's request_body was mutated by message_from_activity + # → rewrite_request_body. Re-extracting reveals the rewritten value. + rewritten = rows[1]["request_body"]["params"]["message"]["parts"][1]["file"]["uri"] + assert rewritten == "workspace:/workspace/.molecule/chat-uploads/xx-foo.pdf" + + +def test_poll_once_chat_upload_row_advances_cursor_even_on_fetch_failure( + state: inbox.InboxState, monkeypatch, tmp_path +): + """A permanent network failure on /content must NOT stall the cursor + — otherwise one bad upload blocks all real chat traffic for the + workspace. 
fetch_and_stage returns None on failure, but the row is + still considered handled from the cursor's perspective.""" + import inbox_uploads + monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads")) + + rows = [ + { + "id": "act-broken", + "source_id": None, + "method": "chat_upload_receive", + "summary": "chat_upload_receive: doomed.pdf", + "request_body": { + "file_id": "doom", + "name": "doomed.pdf", + "uri": "platform-pending:ws-1/doom", + }, + "created_at": "2026-05-04T10:00:00Z", + }, + ] + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + + def fake_fetch(row, **kwargs): + return None # network failure + + with p, patch.object(inbox_uploads, "fetch_and_stage", fake_fetch): + inbox._poll_once(state, "http://platform", "ws-1", {}) + + assert state.peek(10) == [] + assert state.load_cursor() == "act-broken" diff --git a/workspace/tests/test_inbox_uploads.py b/workspace/tests/test_inbox_uploads.py new file mode 100644 index 00000000..515616e2 --- /dev/null +++ b/workspace/tests/test_inbox_uploads.py @@ -0,0 +1,697 @@ +"""Tests for workspace/inbox_uploads.py — poll-mode chat-upload fetcher. + +Covers the full activity-row → fetch → stage-on-disk → ack flow plus +the URI cache and the rewrite that swaps platform-pending: URIs to +local workspace: URIs in subsequent chat messages. 
+""" +from __future__ import annotations + +import os +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest + +import inbox_uploads + + +@pytest.fixture(autouse=True) +def _reset_cache_and_dir(tmp_path, monkeypatch): + """Each test starts with an empty URI cache and a temp upload dir + so on-disk artifacts from one test don't leak into the next.""" + inbox_uploads.get_cache().clear() + monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads")) + yield + inbox_uploads.get_cache().clear() + + +# --------------------------------------------------------------------------- +# sanitize_filename — parity with internal_chat_uploads + Go SanitizeFilename +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "raw,want", + [ + ("../../etc/passwd", "passwd"), + ("/etc/passwd", "passwd"), + ("hello world.pdf", "hello_world.pdf"), + ("weird;chars!?.txt", "weird_chars__.txt"), + ("中文.docx", "__.docx"), + ("file (1).pdf", "file__1_.pdf"), + ("report-2026.05.04_v2.pdf", "report-2026.05.04_v2.pdf"), + ("", "file"), + (".", "file"), + ("..", "file"), + ], +) +def test_sanitize_filename_parity_with_python_internal(raw, want): + assert inbox_uploads.sanitize_filename(raw) == want + + +def test_sanitize_filename_caps_at_100_preserves_short_extension(): + long = "a" * 200 + ".pdf" + got = inbox_uploads.sanitize_filename(long) + assert len(got) == 100 + assert got.endswith(".pdf") + + +def test_sanitize_filename_drops_long_extension(): + long = "c" * 90 + ".thisisaverylongextensionnotpreserved" + got = inbox_uploads.sanitize_filename(long) + assert len(got) == 100 + assert ".thisisaverylongextensionnotpreserved" not in got + + +# --------------------------------------------------------------------------- +# _URICache — LRU semantics +# --------------------------------------------------------------------------- + + +def test_uricache_set_get_roundtrip(): + c = 
inbox_uploads._URICache(max_entries=10) + c.set("platform-pending:ws/1", "workspace:/local/1") + assert c.get("platform-pending:ws/1") == "workspace:/local/1" + + +def test_uricache_get_missing_returns_none(): + c = inbox_uploads._URICache(max_entries=10) + assert c.get("platform-pending:ws/missing") is None + + +def test_uricache_evicts_oldest_at_capacity(): + c = inbox_uploads._URICache(max_entries=2) + c.set("a", "A") + c.set("b", "B") + c.set("c", "C") # evicts "a" + assert c.get("a") is None + assert c.get("b") == "B" + assert c.get("c") == "C" + assert len(c) == 2 + + +def test_uricache_get_promotes_recently_used(): + c = inbox_uploads._URICache(max_entries=2) + c.set("a", "A") + c.set("b", "B") + # Promote "a" by reading; next set should evict "b" instead of "a". + assert c.get("a") == "A" + c.set("c", "C") + assert c.get("a") == "A" + assert c.get("b") is None + assert c.get("c") == "C" + + +def test_uricache_overwrite_updates_value(): + c = inbox_uploads._URICache(max_entries=10) + c.set("k", "v1") + c.set("k", "v2") + assert c.get("k") == "v2" + assert len(c) == 1 + + +def test_uricache_clear(): + c = inbox_uploads._URICache(max_entries=10) + c.set("a", "A") + c.set("b", "B") + c.clear() + assert c.get("a") is None + assert len(c) == 0 + + +def test_resolve_pending_uri_uses_module_cache(): + inbox_uploads.get_cache().set("platform-pending:ws/x", "workspace:/local/x") + assert inbox_uploads.resolve_pending_uri("platform-pending:ws/x") == "workspace:/local/x" + assert inbox_uploads.resolve_pending_uri("platform-pending:ws/missing") is None + + +# --------------------------------------------------------------------------- +# stage_to_disk +# --------------------------------------------------------------------------- + + +def test_stage_to_disk_writes_file_and_returns_workspace_uri(tmp_path): + uri = inbox_uploads.stage_to_disk(b"hello", "report.pdf") + assert uri.startswith("workspace:") + path = uri[len("workspace:"):] + assert os.path.isfile(path) + with 
open(path, "rb") as f: + assert f.read() == b"hello" + assert path.endswith("-report.pdf") + # Prefix is 32 hex chars + "-" + name. + name = os.path.basename(path) + prefix, _, _ = name.partition("-") + assert len(prefix) == 32 + + +def test_stage_to_disk_sanitizes_filename(): + uri = inbox_uploads.stage_to_disk(b"x", "../../evil.txt") + name = os.path.basename(uri) + assert "/" not in name + assert name.endswith("-evil.txt") + + +def test_stage_to_disk_rejects_oversize(): + with pytest.raises(ValueError): + inbox_uploads.stage_to_disk(b"x" * (inbox_uploads.MAX_FILE_BYTES + 1), "big.bin") + + +def test_stage_to_disk_creates_directory_if_missing(): + # CHAT_UPLOAD_DIR is monkeypatched to a non-existent tmp path; the + # call must mkdir -p it on first write. + assert not os.path.exists(inbox_uploads.CHAT_UPLOAD_DIR) + inbox_uploads.stage_to_disk(b"x", "a.txt") + assert os.path.isdir(inbox_uploads.CHAT_UPLOAD_DIR) + + +def test_stage_to_disk_write_failure_cleans_partial_file(tmp_path, monkeypatch): + # open() succeeds but write() fails — the partial file must be + # removed so a retry can claim a fresh prefix without colliding. + real_fdopen = os.fdopen + written_paths: list[str] = [] + + def boom_fdopen(fd, mode): + # Wrap the real file with one whose write() raises. + f = real_fdopen(fd, mode) + # Track which path's fd we opened by inspecting the chat-upload dir. + for entry in os.listdir(inbox_uploads.CHAT_UPLOAD_DIR): + written_paths.append(os.path.join(inbox_uploads.CHAT_UPLOAD_DIR, entry)) + original_write = f.write + + def bad_write(b): + original_write(b"") # ensure file exists + raise OSError(28, "no space") + f.write = bad_write + return f + + monkeypatch.setattr(os, "fdopen", boom_fdopen) + with pytest.raises(OSError): + inbox_uploads.stage_to_disk(b"data", "x.txt") + # All staged files cleaned up. 
+ for p in written_paths: + assert not os.path.exists(p) + + +def test_stage_to_disk_write_failure_unlink_failure_swallowed(monkeypatch): + # open() succeeds, write() fails, unlink() ALSO fails — the unlink + # error is swallowed and the original write error propagates. + real_fdopen = os.fdopen + + def boom_fdopen(fd, mode): + f = real_fdopen(fd, mode) + + def bad_write(_): + raise OSError(28, "no space") + f.write = bad_write + return f + + def bad_unlink(_): + raise OSError(13, "permission denied") + + monkeypatch.setattr(os, "fdopen", boom_fdopen) + monkeypatch.setattr(os, "unlink", bad_unlink) + with pytest.raises(OSError) as ei: + inbox_uploads.stage_to_disk(b"data", "x.txt") + # Original write error, not the unlink error. + assert ei.value.errno == 28 + + +def test_stage_to_disk_propagates_oserror_and_cleans_partial(tmp_path, monkeypatch): + # Make the dir read-only AFTER mkdir succeeds, so open() fails. Skip + # this on platforms where the dir's permissions don't restrict the + # process owner (root in Docker, etc.). 
+ inbox_uploads.stage_to_disk(b"first", "a.txt") + if os.geteuid() == 0: + pytest.skip("root bypasses permission bits") + os.chmod(inbox_uploads.CHAT_UPLOAD_DIR, 0o500) + try: + with pytest.raises(OSError): + inbox_uploads.stage_to_disk(b"second", "b.txt") + finally: + os.chmod(inbox_uploads.CHAT_UPLOAD_DIR, 0o755) + + + # --------------------------------------------------------------------------- + # is_chat_upload_row + _request_body_dict + # --------------------------------------------------------------------------- + + + def test_is_chat_upload_row_true_on_method_match(): + assert inbox_uploads.is_chat_upload_row({"method": "chat_upload_receive"}) + + + def test_is_chat_upload_row_false_on_other_methods(): + assert not inbox_uploads.is_chat_upload_row({"method": "message/send"}) + assert not inbox_uploads.is_chat_upload_row({"method": None}) + assert not inbox_uploads.is_chat_upload_row({}) + + + # _request_body_dict: a dict passes through by identity, a str is JSON-decoded. + def test_request_body_dict_passthrough(): + body = {"file_id": "x"} + assert inbox_uploads._request_body_dict({"request_body": body}) is body + + + def test_request_body_dict_string_decoded(): + assert inbox_uploads._request_body_dict({"request_body": '{"a": 1}'}) == {"a": 1} + + + def test_request_body_dict_invalid_string_returns_none(): + assert inbox_uploads._request_body_dict({"request_body": "not json"}) is None + + + def test_request_body_dict_non_dict_after_decode_returns_none(): + assert inbox_uploads._request_body_dict({"request_body": "[1, 2]"}) is None + + + def test_request_body_dict_other_type_returns_none(): + assert inbox_uploads._request_body_dict({"request_body": 123}) is None + + + # --------------------------------------------------------------------------- + # fetch_and_stage — the full GET / write / ack flow + # --------------------------------------------------------------------------- + + + def _make_resp(status_code: int, content: bytes = b"", content_type: str = "", text: str = "") -> MagicMock: + resp = MagicMock() + resp.status_code = status_code + 
resp.content = content + headers: dict[str, str] = {} + if content_type: + headers["content-type"] = content_type + resp.headers = headers + resp.text = text + return resp + + + def _patch_httpx_for_fetch(get_resp: MagicMock, ack_resp: MagicMock | None = None): + """Patch httpx.Client so each new context-manager returns a client + whose .get() returns get_resp and .post() returns ack_resp. + """ + client = MagicMock() + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + client.get = MagicMock(return_value=get_resp) + client.post = MagicMock(return_value=ack_resp or _make_resp(200)) + return patch("httpx.Client", return_value=client), client + + + # Fixture factory: a minimal chat_upload_receive activity row. + def _row(file_id: str = "file-1", uri: str | None = None, name: str = "report.pdf", body_extra: dict | None = None) -> dict: + body: dict[str, Any] = { + "file_id": file_id, + "name": name, + "mimeType": "application/pdf", + "size": 9, + } + if uri is not None: + body["uri"] = uri + if body_extra: + body.update(body_extra) + return { + "id": "act-100", + "source_id": None, + "method": "chat_upload_receive", + "summary": "chat_upload_receive: report.pdf", + "request_body": body, + "created_at": "2026-05-04T10:00:00Z", + } + + + def test_fetch_and_stage_happy_path_writes_file_acks_and_caches(): + pending_uri = "platform-pending:ws-1/file-1" + row = _row(uri=pending_uri) + get_resp = _make_resp(200, content=b"PDF-bytes", content_type="application/pdf") + p, client = _patch_httpx_for_fetch(get_resp) + with p: + local_uri = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={"Authorization": "Bearer t"} + ) + assert local_uri is not None + assert local_uri.startswith("workspace:") + # On-disk file content matches. + path = local_uri[len("workspace:"):] + with open(path, "rb") as f: + assert f.read() == b"PDF-bytes" + # Cache populated. + assert inbox_uploads.get_cache().get(pending_uri) == local_uri + # Ack POSTed to the right URL. 
+ client.post.assert_called_once() + args, kwargs = client.post.call_args + assert "/pending-uploads/file-1/ack" in args[0] + assert kwargs["headers"]["Authorization"] == "Bearer t" + + + def test_fetch_and_stage_reconstructs_uri_when_missing_in_body(): + row = _row(uri=None) # request_body has no 'uri' + get_resp = _make_resp(200, content=b"x", content_type="text/plain") + p, _ = _patch_httpx_for_fetch(get_resp) + with p: + inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + # Cache key reconstructed from workspace_id + file_id. + assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-1") is not None + + + # Malformed rows: no request_body / no usable file_id → no network fetch at all. + def test_fetch_and_stage_returns_none_on_missing_request_body(): + row = {"id": "act-100", "method": "chat_upload_receive"} + # No httpx call should happen, but we patch defensively. + p, client = _patch_httpx_for_fetch(_make_resp(200)) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.get.assert_not_called() + + + def test_fetch_and_stage_returns_none_on_missing_file_id(): + row = {"id": "act-100", "method": "chat_upload_receive", "request_body": {"name": "x.pdf"}} + p, client = _patch_httpx_for_fetch(_make_resp(200)) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.get.assert_not_called() + + + def test_fetch_and_stage_handles_nonstring_file_id(): + row = {"id": "act-100", "method": "chat_upload_receive", "request_body": {"file_id": 123}} + p, client = _patch_httpx_for_fetch(_make_resp(200)) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.get.assert_not_called() + + + def test_fetch_and_stage_404_returns_none_no_ack(): + row = _row() + get_resp = _make_resp(404, text="gone") + ack_resp = 
_make_resp(200) + p, client = _patch_httpx_for_fetch(get_resp, ack_resp) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + # No ack — the row is already gone. + client.post.assert_not_called() + + + def test_fetch_and_stage_500_returns_none_no_ack(): + row = _row() + p, client = _patch_httpx_for_fetch(_make_resp(500, text="boom")) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.post.assert_not_called() + + + # Transport-level failure (no HTTP response at all) is also a soft None. + def test_fetch_and_stage_network_error_returns_none(): + row = _row() + client = MagicMock() + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + client.get = MagicMock(side_effect=RuntimeError("connection refused")) + with patch("httpx.Client", return_value=client): + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + + + def test_fetch_and_stage_oversize_response_refused(): + row = _row() + big = b"x" * (inbox_uploads.MAX_FILE_BYTES + 1) + p, client = _patch_httpx_for_fetch(_make_resp(200, content=big, content_type="application/octet-stream")) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.post.assert_not_called() + + + def test_fetch_and_stage_ack_failure_does_not_invalidate_local_uri(): + row = _row(uri="platform-pending:ws-1/file-1") + get_resp = _make_resp(200, content=b"data", content_type="text/plain") + ack_resp = _make_resp(500, text="ack failed") + p, _ = _patch_httpx_for_fetch(get_resp, ack_resp) + with p: + local_uri = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + # On-disk staging succeeded; ack failure is logged but doesn't + # roll back the cache. 
+ assert local_uri is not None + assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-1") == local_uri + + + def test_fetch_and_stage_ack_network_error_swallowed(): + row = _row(uri="platform-pending:ws-1/file-1") + client = MagicMock() + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + client.get = MagicMock(return_value=_make_resp(200, content=b"data", content_type="text/plain")) + client.post = MagicMock(side_effect=RuntimeError("ack network error")) + with patch("httpx.Client", return_value=client): + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is not None # GET succeeded → URI returned even if ack blew up + + + # MIME selection: the HTTP response's content-type header wins over the row body's mimeType. + def test_fetch_and_stage_uses_response_content_type_when_present(): + row = _row(name="thing.bin", body_extra={"mimeType": "application/x-bogus"}) + # Response says image/png; should win over body's mimeType. + get_resp = _make_resp(200, content=b"PNG", content_type="image/png; charset=binary") + p, _ = _patch_httpx_for_fetch(get_resp) + with p: + # We don't assert on returned mime (not part of the contract); + # the test just verifies the happy path runs without trying to + # parse the trailing parameter. + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is not None + + + def test_fetch_and_stage_nonstring_filename_falls_back_to_file(): + # body['name'] is a non-string (e.g. truncated to None or a number); + # filename must default to "file" so sanitize_filename has something + # to work with. 
+ row = _row(body_extra={"name": 12345}) + p, _ = _patch_httpx_for_fetch(_make_resp(200, content=b"x", content_type="text/plain")) + with p: + local_uri = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert local_uri is not None + assert local_uri.endswith("-file") + + + def test_fetch_and_stage_default_filename_when_missing(): + row = { + "id": "act", + "method": "chat_upload_receive", + "request_body": {"file_id": "file-1"}, + } + p, _ = _patch_httpx_for_fetch(_make_resp(200, content=b"data", content_type="text/plain")) + with p: + local_uri = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert local_uri is not None + assert local_uri.endswith("-file") # default filename + + + # Staging failures: no ack is sent when the bytes never made it to disk. + def test_fetch_and_stage_disk_write_failure_returns_none(monkeypatch): + row = _row() + p, client = _patch_httpx_for_fetch(_make_resp(200, content=b"x", content_type="text/plain")) + + def bad_stage(*args, **kwargs): + raise OSError(28, "no space left") + monkeypatch.setattr(inbox_uploads, "stage_to_disk", bad_stage) + + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.post.assert_not_called() + + + def test_fetch_and_stage_disk_value_error_returns_none(monkeypatch): + row = _row() + p, client = _patch_httpx_for_fetch(_make_resp(200, content=b"x", content_type="text/plain")) + + def bad_stage(*args, **kwargs): + raise ValueError("oversize after sanity check") + monkeypatch.setattr(inbox_uploads, "stage_to_disk", bad_stage) + + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is None + client.post.assert_not_called() + + + def test_fetch_and_stage_httpx_missing_returns_none(monkeypatch): + row = _row() + # Simulate httpx not installed by making the import fail. 
+ import sys + real_httpx = sys.modules.pop("httpx", None) + monkeypatch.setitem(sys.modules, "httpx", None) + try: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + finally: + if real_httpx is not None: + sys.modules["httpx"] = real_httpx + else: + sys.modules.pop("httpx", None) + # NOTE(review): monkeypatch.setitem teardown also undoes this key (it was + # absent when setitem ran, so teardown deletes it) — the manual restore above + # gets clobbered after the test; verify the two cleanups don't fight on CI. + assert result is None + + + def test_fetch_and_stage_falls_back_to_extension_mime(monkeypatch): + row = _row(name="snap.png", body_extra={"mimeType": ""}) # no mimeType in body + # Response also has no content-type so it falls through to mimetypes.guess_type. + get_resp = _make_resp(200, content=b"PNG", content_type="") + p, _ = _patch_httpx_for_fetch(get_resp) + with p: + result = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert result is not None + + + # --------------------------------------------------------------------------- + # rewrite_request_body — URI swap in chat-message bodies + # --------------------------------------------------------------------------- + + + def test_rewrite_request_body_swaps_pending_uri_in_message_parts(): + inbox_uploads.get_cache().set("platform-pending:ws/1", "workspace:/local/1") + body = { + "method": "message/send", + "params": { + "message": { + "parts": [ + {"kind": "text", "text": "see this"}, + {"kind": "file", "file": {"uri": "platform-pending:ws/1", "name": "a.pdf"}}, + ] + } + }, + } + inbox_uploads.rewrite_request_body(body) + assert body["params"]["message"]["parts"][1]["file"]["uri"] == "workspace:/local/1" + + + def test_rewrite_request_body_swaps_in_params_parts(): + inbox_uploads.get_cache().set("platform-pending:ws/2", "workspace:/local/2") + body = { + "params": { + "parts": [ + {"kind": "file", "file": {"uri": "platform-pending:ws/2"}}, + ] + } + } + inbox_uploads.rewrite_request_body(body) + assert body["params"]["parts"][0]["file"]["uri"] == "workspace:/local/2" + + + def 
test_rewrite_request_body_swaps_in_top_level_parts(): + inbox_uploads.get_cache().set("platform-pending:ws/3", "workspace:/local/3") + body = { + "parts": [{"kind": "file", "file": {"uri": "platform-pending:ws/3"}}] + } + inbox_uploads.rewrite_request_body(body) + assert body["parts"][0]["file"]["uri"] == "workspace:/local/3" + + + def test_rewrite_request_body_leaves_unmatched_uri_unchanged(): + # No cache entry → URI stays as-is. Agent surfaces the unresolvable + # URI rather than the inbox silently dropping the part. + body = { + "parts": [{"kind": "file", "file": {"uri": "platform-pending:ws/missing"}}] + } + inbox_uploads.rewrite_request_body(body) + assert body["parts"][0]["file"]["uri"] == "platform-pending:ws/missing" + + + def test_rewrite_request_body_leaves_non_pending_uri_unchanged(): + inbox_uploads.get_cache().set("platform-pending:ws/3", "workspace:/local/3") + body = { + "parts": [ + {"kind": "file", "file": {"uri": "workspace:/already-local.pdf"}}, + {"kind": "file", "file": {"uri": "https://example.com/x.pdf"}}, + ] + } + inbox_uploads.rewrite_request_body(body) + assert body["parts"][0]["file"]["uri"] == "workspace:/already-local.pdf" + assert body["parts"][1]["file"]["uri"] == "https://example.com/x.pdf" + + + # Defensive cases: arbitrary junk inside 'parts' must be ignored, never raise. + def test_rewrite_request_body_skips_non_dict_parts(): + body = {"parts": ["not a dict", 42, None]} + inbox_uploads.rewrite_request_body(body) # must not raise + assert body["parts"] == ["not a dict", 42, None] + + + def test_rewrite_request_body_skips_text_parts(): + body = { + "parts": [{"kind": "text", "text": "platform-pending:ws/should-not-rewrite"}] + } + inbox_uploads.rewrite_request_body(body) + # Text content not touched — only file.uri fields are URIs. 
+ assert body["parts"][0]["text"] == "platform-pending:ws/should-not-rewrite" + + + def test_rewrite_request_body_skips_part_without_file_dict(): + body = {"parts": [{"kind": "file"}]} # no file key + inbox_uploads.rewrite_request_body(body) + assert body["parts"] == [{"kind": "file"}] + + + def test_rewrite_request_body_skips_file_without_uri(): + body = {"parts": [{"kind": "file", "file": {"name": "x.pdf"}}]} + inbox_uploads.rewrite_request_body(body) + assert body["parts"][0]["file"] == {"name": "x.pdf"} + + + def test_rewrite_request_body_skips_nonstring_uri(): + body = {"parts": [{"kind": "file", "file": {"uri": None}}]} + inbox_uploads.rewrite_request_body(body) # must not raise + + + # Shape fuzzing: every level of the body may be the wrong type; all must be no-ops. + def test_rewrite_request_body_handles_non_dict_body(): + inbox_uploads.rewrite_request_body(None) # no-op + inbox_uploads.rewrite_request_body("string body") # no-op + inbox_uploads.rewrite_request_body([1, 2, 3]) # no-op + + + def test_rewrite_request_body_handles_non_dict_params(): + body = {"params": "not a dict", "parts": []} + inbox_uploads.rewrite_request_body(body) # must not raise + + + def test_rewrite_request_body_handles_non_dict_message(): + body = {"params": {"message": "not a dict"}} + inbox_uploads.rewrite_request_body(body) # must not raise + + + def test_rewrite_request_body_handles_non_list_parts(): + body = {"parts": "not a list"} + inbox_uploads.rewrite_request_body(body) # must not raise + + + def test_rewrite_request_body_handles_non_dict_file(): + body = {"parts": [{"kind": "file", "file": "not a dict"}]} + inbox_uploads.rewrite_request_body(body) # must not raise