Merge pull request #2881 from Molecule-AI/feat/poll-mode-chat-upload-phase2

feat(rfc): poll-mode chat upload — phase 2 workspace inbox extension
This commit is contained in:
Hongming Wang 2026-05-05 11:44:36 +00:00 committed by GitHub
commit d80bffe3e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 1377 additions and 1 deletions

View File

@ -69,6 +69,7 @@ TOP_LEVEL_MODULES = {
"executor_helpers",
"heartbeat",
"inbox",
"inbox_uploads",
"initial_prompt",
"internal_chat_uploads",
"internal_file_read",

View File

@ -432,7 +432,17 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool:
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
"""Convert one /activity row into an InboxMessage."""
"""Convert one /activity row into an InboxMessage.
Mutates ``row['request_body']`` in-place to swap any
``platform-pending:`` URIs to the locally-staged ``workspace:`` URIs
(see ``inbox_uploads.rewrite_request_body``). By the time the
upstream chat message arrives via this path, the upload-receive row
that staged the bytes has already populated the URI cache (lower
activity_logs.id, processed earlier in the same poll batch). A
cache miss leaves the URI untouched; the agent surfaces an
unresolvable URI rather than the inbox silently dropping the part.
"""
request_body = row.get("request_body")
if isinstance(request_body, str):
# The Go handler returns request_body as json.RawMessage; httpx
@ -443,6 +453,14 @@ def message_from_activity(row: dict[str, Any]) -> InboxMessage:
except (TypeError, ValueError):
request_body = None
# Rewrite platform-pending: URIs → workspace: URIs in-place. Imported
# at call time to keep the import graph clean for the in-container
# path that doesn't use this module (also avoids a circular: the
# uploads module is small enough that re-importing per call is
# cheap, and the Python import cache makes it free after the first).
from inbox_uploads import rewrite_request_body
rewrite_request_body(request_body)
return InboxMessage(
activity_id=str(row.get("id", "")),
text=_extract_text(request_body, row.get("summary")),
@ -532,11 +550,34 @@ def _poll_once(
if cursor is None:
rows = list(reversed(rows))
# Imported lazily at use-site so a runtime that never sees an
# upload-receive row never imports the module. Cheap on the hot
# path because Python caches the import.
from inbox_uploads import is_chat_upload_row, fetch_and_stage
new_count = 0
last_id: str | None = None
for row in rows:
if not isinstance(row, dict):
continue
if is_chat_upload_row(row):
# Side-effect row from the platform's poll-mode chat-upload
# handler — fetch the bytes, stage to /workspace/.molecule/
# chat-uploads, ack. NOT enqueued as an InboxMessage; the
# agent will see the chat message that REFERENCES this
# upload via a separate (later) activity row, with the
# pending: URI rewritten to a workspace: URI by
# message_from_activity. We DO advance the cursor past
# this row so a permanent network outage on /content
# doesn't stall the cursor and block real chat traffic.
fetch_and_stage(
row,
platform_url=platform_url,
workspace_id=workspace_id,
headers=headers,
)
last_id = str(row.get("id", "")) or last_id
continue
if _is_self_notify_row(row):
# The workspace-server's `/notify` handler writes the agent's
# own send_message_to_user POSTs to activity_logs with

475
workspace/inbox_uploads.py Normal file
View File

@ -0,0 +1,475 @@
"""Poll-mode chat-upload fetcher + URI cache for the standalone path.
Companion to ``inbox.py``. When the workspace's inbox poller sees an
``activity_logs`` row with ``method='chat_upload_receive'`` (written by
the platform's ``uploadPollMode`` handler — workspace-server
``internal/handlers/chat_files.go``), this module:
1. Pulls the bytes from
``GET /workspaces/:id/pending-uploads/:file_id/content``.
2. Writes them to ``/workspace/.molecule/chat-uploads/<prefix>-<name>``
— the same on-disk shape as the push-mode handler in
``internal_chat_uploads.py``, so anything downstream that already
resolves ``workspace:/workspace/.molecule/chat-uploads/...`` URIs
works unchanged.
3. POSTs ``/workspaces/:id/pending-uploads/:file_id/ack`` so Phase 3
sweep can clean up the platform-side ``pending_uploads`` row.
4. Records a ``platform-pending:<wsid>/<file_id>`` →
``workspace:/workspace/.molecule/chat-uploads/...`` mapping in a
process-local cache so the chat message that arrives later
(referencing the platform-pending URI) gets rewritten before the
agent sees it.
URI rewrite ordering — the chat message containing the
``platform-pending:`` URI is logged by the platform AFTER the
``chat_upload_receive`` row, so the inbox poller sees the upload-receive
row first (lower activity_logs.id) and stages the bytes before the chat
message arrives in the same poll batch (or a later one). The URI cache
is therefore populated before the message_from_activity path needs it.
A miss (network race, restart with stale cursor) is handled by keeping
the original ``platform-pending:`` URI in the rewritten body — the agent
will see something it can't open, which is preferable to silently
dropping the URI.
Auth — same Bearer token the inbox poller uses (``platform_auth.auth_headers``).
Both endpoints are on the wsAuth-gated route, so this module can never
read another tenant's bytes even if a token is misrouted.
"""
from __future__ import annotations
import logging
import mimetypes
import os
import re
import secrets as pysecrets
import threading
from collections import OrderedDict
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)

# Same on-disk root as internal_chat_uploads.CHAT_UPLOAD_DIR. The value is
# duplicated here rather than imported from internal_chat_uploads to avoid
# pulling in starlette as a transitive dep — this module runs in the
# standalone MCP path, which doesn't ship the in-container HTTP server.
# NOTE(review): duplication means the two constants can drift; a shared
# dependency-free module would pin them together — TODO consider.
CHAT_UPLOAD_DIR: str = "/workspace/.molecule/chat-uploads"

# Per-file safety net. The platform enforces 25 MB on the staging side,
# but a buggy or hostile platform response shouldn't be able to fill the
# workspace's disk — refuse to write more than this even if the response
# claims a larger Content-Length.
MAX_FILE_BYTES: int = 25 * 1024 * 1024

# Network deadline for the content GET (and the ack POST). Deliberately
# larger than the inbox poller's /activity timeout: this request may move
# a full 25 MB payload (~40s on a ~5 Mbps consumer link), plus headroom
# for TLS + platform auth.
DEFAULT_FETCH_TIMEOUT: float = 60.0

# Cap on the URI cache. A long-lived workspace handling thousands of
# uploads shouldn't grow without bound; an LRU cap of 1024 keeps the
# entries needed for a typical conversation well within memory.
URI_CACHE_MAX_ENTRIES: int = 1024

# Same character class as internal_chat_uploads — kept duplicated rather
# than imported to avoid dragging starlette into the standalone path.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^a-zA-Z0-9._\-]")
def sanitize_filename(name: str) -> str:
    """Collapse a caller-supplied filename into a filesystem-safe token.

    Behavior-parity sibling of ``internal_chat_uploads.sanitize_filename``
    and the Go handler's ``SanitizeFilename``; the shared test suites pin
    all three so the on-disk URI shape is identical regardless of which
    path handled the upload.
    """
    # Strip any directory component, then map every character outside the
    # safe class (letters, digits, dot, underscore, hyphen) to "_".
    # Spaces fall inside the unsafe class, so they become "_" too.
    safe = re.sub(r"[^a-zA-Z0-9._\-]", "_", os.path.basename(name))
    # Cap at 100 chars, preserving a short (<= 16 chars incl. the dot)
    # extension so the file type survives truncation.
    if len(safe) > 100:
        dot = safe.rfind(".")
        suffix = safe[dot:] if 0 <= dot and len(safe) - dot <= 16 else ""
        safe = safe[: 100 - len(suffix)] + suffix
    # Empty / reserved names collapse to a generic placeholder.
    return "file" if safe in ("", ".", "..") else safe
# ---------------------------------------------------------------------------
# URI cache — maps platform-pending URIs to local workspace: URIs
# ---------------------------------------------------------------------------
class _URICache:
    """Thread-safe bounded LRU mapping platform-pending → workspace URIs.

    Bounded so a workspace that runs for months and handles thousands of
    uploads cannot grow the map forever: a read promotes the entry to the
    fresh end of the underlying ``OrderedDict``, and an insert evicts
    from the stale end once ``max_entries`` is exceeded.

    The cache is deliberately per-process — nothing survives a workspace
    restart. After a restart with a stale inbox cursor, a re-polled
    upload-receive row re-fetches and re-registers (the bytes from the
    prior session are still on disk; see ``stage_to_disk``'s O_EXCL
    handling). A chat message that referenced the platform-pending URI
    before the restart and arrives after it misses the rewrite and
    surfaces the raw URI to the agent — which beats a stale persisted
    mapping pointing at a deleted file.
    """

    def __init__(self, max_entries: int = URI_CACHE_MAX_ENTRIES):
        self._max = max_entries
        self._lock = threading.Lock()
        self._lru: "OrderedDict[str, str]" = OrderedDict()

    def get(self, pending_uri: str) -> str | None:
        """Return the mapped local URI (promoting the entry) or None."""
        with self._lock:
            try:
                local_uri = self._lru[pending_uri]
            except KeyError:
                return None
            self._lru.move_to_end(pending_uri)
            return local_uri

    def set(self, pending_uri: str, local_uri: str) -> None:
        """Insert or overwrite a mapping, evicting the oldest past capacity."""
        with self._lock:
            self._lru[pending_uri] = local_uri
            self._lru.move_to_end(pending_uri)
            while len(self._lru) > self._max:
                self._lru.popitem(last=False)

    def __len__(self) -> int:
        with self._lock:
            return len(self._lru)

    def clear(self) -> None:
        """Drop every cached mapping."""
        with self._lock:
            self._lru.clear()
# Module-level singleton: one URI cache per process, written by
# fetch_and_stage and read by the rewrite path / resolve_pending_uri.
_cache: _URICache = _URICache()


def get_cache() -> _URICache:
    """Expose the module-singleton cache for tests and the rewrite path."""
    return _cache


def resolve_pending_uri(uri: str) -> str | None:
    """Return the local ``workspace:`` URI for a ``platform-pending:`` URI,
    or None if not yet staged. Convenience for callers that want to
    fall back to an on-demand fetch — pass the result through to
    ``executor_helpers.resolve_attachment_uri``.
    """
    return _cache.get(uri)
# ---------------------------------------------------------------------------
# On-disk staging
# ---------------------------------------------------------------------------
def _open_safe(path: str) -> int:
    """Open ``path`` write-only with ``O_CREAT|O_EXCL`` (plus ``O_NOFOLLOW``
    where the platform provides it), mode 0600; return the raw fd.

    Mirrors ``internal_chat_uploads._open_safe``: a pre-existing symlink
    or regular file at the target makes the open fail rather than being
    followed or overwritten. The 16-byte random prefix already makes a
    name collision astronomically unlikely — this is defense-in-depth on
    top of that.
    """
    flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
    # O_NOFOLLOW is optional in POSIX; fall back to 0 where it's absent.
    flags |= getattr(os, "O_NOFOLLOW", 0)
    return os.open(path, flags, 0o600)
def stage_to_disk(content: bytes, filename: str) -> str:
    """Persist ``content`` under ``CHAT_UPLOAD_DIR``; return its local URI.

    The returned URI has the shape
    ``workspace:/workspace/.molecule/chat-uploads/<prefix>-<sanitized>``
    where ``<prefix>`` is 32 random hex chars — unguessable to anything
    that didn't see this return value, even with a correct guess at the
    original filename.

    Raises:
        ValueError: ``content`` exceeds ``MAX_FILE_BYTES`` (local
            belt-and-braces over the platform-side cap).
        OSError: mkdir/open/write failed. The caller is expected to log
            and skip; the unacked platform row allows a later retry.
    """
    if len(content) > MAX_FILE_BYTES:
        raise ValueError(
            f"content size {len(content)} exceeds workspace cap {MAX_FILE_BYTES}"
        )
    Path(CHAT_UPLOAD_DIR).mkdir(parents=True, exist_ok=True)
    stored_name = f"{pysecrets.token_hex(16)}-{sanitize_filename(filename)}"
    target = os.path.join(CHAT_UPLOAD_DIR, stored_name)
    fd = _open_safe(target)
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(content)
    except OSError:
        # A partially-written stub would mask a later retry's success, so
        # remove it best-effort and let the original error propagate.
        try:
            os.unlink(target)
        except OSError:
            pass
        raise
    return f"workspace:{CHAT_UPLOAD_DIR}/{stored_name}"
# ---------------------------------------------------------------------------
# Activity row → fetch/stage/ack flow
# ---------------------------------------------------------------------------
def _request_body_dict(row: dict[str, Any]) -> dict[str, Any] | None:
    """Return ``row['request_body']`` coerced to a dict, else ``None``.

    The /activity API normally hands us an already-deserialized dict,
    but legacy paths or mocked transports may deliver a JSON string —
    decode that defensively instead of raising.
    """
    body = row.get("request_body")
    if isinstance(body, dict):
        return body
    if not isinstance(body, str):
        return None
    import json
    try:
        decoded = json.loads(body)
    except (TypeError, ValueError):
        return None
    # A string that decodes to a non-dict (list, number, ...) is still
    # unusable as a request body.
    return decoded if isinstance(decoded, dict) else None
def is_chat_upload_row(row: dict[str, Any]) -> bool:
    """Whether ``row`` is the platform's chat-upload-receive activity row.

    The inbox poller uses this to divert the row off the regular A2A
    message path: such a row is not a peer message but an instruction to
    fetch and stage bytes. Only ``method`` is inspected —
    ``activity_type`` is already filtered to ``a2a_receive`` upstream.
    """
    method = row.get("method")
    return method == "chat_upload_receive"
def fetch_and_stage(
    row: dict[str, Any],
    *,
    platform_url: str,
    workspace_id: str,
    headers: dict[str, str],
    timeout_secs: float = DEFAULT_FETCH_TIMEOUT,
) -> str | None:
    """Fetch the row's bytes, stage them under chat-uploads, and ack.

    Args:
        row: one /activity row whose ``request_body`` carries ``file_id``
            (and usually ``uri`` and ``name``) for the pending upload.
        platform_url: base URL of the platform API.
        workspace_id: workspace whose pending-upload endpoints are hit.
        headers: auth headers (same Bearer token the inbox poller uses).
        timeout_secs: per-request network deadline for the GET and ack.

    Returns the local ``workspace:`` URI on success, or ``None`` if any
    step failed (logged with enough detail to triage). On failure the
    platform-side ``pending_uploads`` row stays unacked; note the caller
    still advances the inbox cursor past this activity row — the row is
    "handled" from the inbox's perspective even on fetch failure, since
    otherwise a permanent network outage would stall the cursor and
    block real chat traffic.

    On success, the URI cache is updated so a subsequent chat message
    referencing the same ``platform-pending:`` URI is rewritten before
    the agent sees it.
    """
    body = _request_body_dict(row)
    if body is None:
        logger.warning(
            "inbox_uploads: row %s missing request_body; cannot fetch",
            row.get("id"),
        )
        return None
    file_id = body.get("file_id")
    if not isinstance(file_id, str) or not file_id:
        logger.warning(
            "inbox_uploads: row %s has no file_id in request_body",
            row.get("id"),
        )
        return None
    pending_uri = body.get("uri")
    if not isinstance(pending_uri, str) or not pending_uri:
        # Reconstruct what the platform would have written — defensive
        # against a row whose uri field got truncated. Same shape as the
        # Go handler's URI builder.
        pending_uri = f"platform-pending:{workspace_id}/{file_id}"
    # Any non-string (or missing) name falls back to a generic label.
    filename = body.get("name") or "file"
    if not isinstance(filename, str):
        filename = "file"
    # Lazy httpx import: the standalone MCP path uses httpx; an in-
    # container caller that imports this module by accident shouldn't
    # explode at import time.
    try:
        import httpx  # noqa: WPS433
    except ImportError:
        logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id)
        return None
    content_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/content"
    ack_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/ack"
    try:
        with httpx.Client(timeout=timeout_secs) as client:
            resp = client.get(content_url, headers=headers)
    except Exception as exc:  # noqa: BLE001
        logger.warning(
            "inbox_uploads: GET %s failed: %s", content_url, exc
        )
        return None
    if resp.status_code == 404:
        # Row was swept or already acked by a previous poll race — nothing
        # to fetch. Don't ack again; the platform's GC handles it. This is
        # a soft-skip, not an error — log at INFO so triage isn't noisy.
        logger.info(
            "inbox_uploads: pending upload %s already gone (404); skipping",
            file_id,
        )
        return None
    if resp.status_code >= 400:
        logger.warning(
            "inbox_uploads: GET %s returned %d: %s",
            content_url,
            resp.status_code,
            (resp.text or "")[:200],
        )
        return None
    content = resp.content or b""
    # Post-transfer size check: guards against a response that lied about
    # (or omitted) Content-Length. stage_to_disk re-checks as well.
    if len(content) > MAX_FILE_BYTES:
        logger.warning(
            "inbox_uploads: refusing to stage %s — size %d exceeds cap %d",
            file_id,
            len(content),
            MAX_FILE_BYTES,
        )
        return None
    # Mimetype precedence: platform's Content-Type header → request_body
    # mimeType field → extension guess. Same precedence as the in-
    # container ingest handler. NOTE: ``mime`` is only used in the log
    # line below — the staged bytes are written verbatim either way.
    mime_header = resp.headers.get("content-type", "").split(";")[0].strip()
    mime = (
        mime_header
        or (body.get("mimeType") if isinstance(body.get("mimeType"), str) else "")
        or (mimetypes.guess_type(filename)[0] or "")
    )
    try:
        local_uri = stage_to_disk(content, filename)
    except (OSError, ValueError) as exc:
        logger.error(
            "inbox_uploads: failed to stage %s (%s) to disk: %s",
            file_id,
            filename,
            exc,
        )
        return None
    _cache.set(pending_uri, local_uri)
    logger.info(
        "inbox_uploads: staged file_id=%s name=%s size=%d mime=%s pending_uri=%s local_uri=%s",
        file_id,
        filename,
        len(content),
        mime,
        pending_uri,
        local_uri,
    )
    # Ack last so a write failure above leaves the row available for a
    # retry on the next poll. A failed ack is logged but doesn't roll
    # back the on-disk file — the platform's sweep will clean up
    # eventually.
    try:
        with httpx.Client(timeout=timeout_secs) as client:
            ack_resp = client.post(ack_url, headers=headers)
            if ack_resp.status_code >= 400:
                logger.warning(
                    "inbox_uploads: ack %s returned %d: %s",
                    ack_url,
                    ack_resp.status_code,
                    (ack_resp.text or "")[:200],
                )
    except Exception as exc:  # noqa: BLE001
        logger.warning("inbox_uploads: POST %s failed: %s", ack_url, exc)
    return local_uri
# ---------------------------------------------------------------------------
# URI rewrite for incoming chat messages
# ---------------------------------------------------------------------------
#
# The chat message that references a staged upload arrives as a
# SEPARATE activity_log row, with parts of kind=file containing
# platform-pending: URIs in the file.uri field. Walk the structure
# in-place and rewrite to the local workspace: URI when the cache has it.
# Unknown URIs pass through unchanged — the agent gets to choose how
# to react (most runtimes log + ignore an unresolvable URI).
def _rewrite_part(part: Any) -> None:
    """In-place swap of one A2A Part's ``platform-pending:`` file URI.

    No-op unless ``part`` is a dict carrying a dict-valued ``file`` whose
    ``uri`` is a string with the ``platform-pending:`` prefix AND the URI
    cache holds a local mapping for it. Unknown URIs pass through so the
    agent decides how to handle an unresolvable reference.
    """
    file_obj = part.get("file") if isinstance(part, dict) else None
    if not isinstance(file_obj, dict):
        return
    pending = file_obj.get("uri")
    if isinstance(pending, str) and pending.startswith("platform-pending:"):
        local = _cache.get(pending)
        if local:
            file_obj["uri"] = local
def rewrite_request_body(body: Any) -> None:
    """Replace cached ``platform-pending:`` URIs in ``body``, in place.

    Visits the same part containers ``inbox._extract_text`` understands:

    - ``body['parts']``
    - ``body['params']['parts']``
    - ``body['params']['message']['parts']``

    Shapes that don't match are left untouched — the message simply
    flows to the agent as-is.
    """
    if not isinstance(body, dict):
        return
    part_lists: list[Any] = []
    params = body.get("params")
    if isinstance(params, dict):
        message = params.get("message")
        if isinstance(message, dict):
            part_lists.append(message.get("parts"))
        part_lists.append(params.get("parts"))
    part_lists.append(body.get("parts"))
    for parts in part_lists:
        if not isinstance(parts, list):
            continue
        for part in parts:
            _rewrite_part(part)

View File

@ -701,3 +701,165 @@ def test_set_notification_callback_none_clears(state: inbox.InboxState):
state.record(_msg("act-1"))
assert received == []
# ---------------------------------------------------------------------------
# Phase 2 — chat_upload_receive rows route to inbox_uploads.fetch_and_stage
# ---------------------------------------------------------------------------
def test_poll_once_skips_chat_upload_row_from_queue(state: inbox.InboxState, monkeypatch, tmp_path):
    """A row with method='chat_upload_receive' must NOT enqueue as a
    chat message — it's a side-effect telling the workspace to fetch
    bytes. Pin the contract so a refactor that flattens the row loop
    can't silently re-enqueue these as 'empty A2A message' rows."""
    import inbox_uploads
    # Keep staging away from the real /workspace tree and start with an
    # empty URI cache (it's a module-level singleton).
    monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads"))
    inbox_uploads.get_cache().clear()
    rows = [
        {
            "id": "act-1",
            "source_id": None,
            "method": "chat_upload_receive",
            "summary": "chat_upload_receive: foo.pdf",
            "request_body": {
                "file_id": "abc123",
                "name": "foo.pdf",
                "mimeType": "application/pdf",
                "size": 4,
                "uri": "platform-pending:ws-1/abc123",
            },
            "created_at": "2026-05-04T10:00:00Z",
        },
    ]
    resp = _make_response(200, rows)
    p, _ = _patch_httpx(resp)
    fetch_called = []
    # Stub the fetch so the test exercises only the routing decision,
    # not the network/staging flow.
    def fake_fetch(row, **kwargs):
        fetch_called.append((row.get("id"), kwargs["workspace_id"]))
        return "workspace:/local/foo.pdf"
    with p, patch.object(inbox_uploads, "fetch_and_stage", fake_fetch):
        n = inbox._poll_once(state, "http://platform", "ws-1", {})
    # Not enqueued + cursor advanced.
    assert n == 0
    assert state.peek(10) == []
    assert state.load_cursor() == "act-1"
    # fetch_and_stage was invoked with the row and workspace_id.
    assert fetch_called == [("act-1", "ws-1")]
def test_poll_once_chat_upload_row_then_chat_message_rewrites_uri(state: inbox.InboxState, monkeypatch, tmp_path):
    """The classic ordering: upload-receive row first (lower id), chat
    message referencing the platform-pending: URI second. The chat
    message that lands in the inbox must have its URI rewritten to the
    local workspace: URI before the agent sees it.
    """
    import inbox_uploads
    monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads"))
    cache = inbox_uploads.get_cache()
    cache.clear()
    # Pretend the fetch already populated the cache. (The real flow
    # populates it inside fetch_and_stage; we patch that to keep the
    # test focused on the rewrite contract.)
    cache.set("platform-pending:ws-1/abc123", "workspace:/workspace/.molecule/chat-uploads/xx-foo.pdf")
    rows = [
        {
            "id": "act-1",
            "source_id": None,
            "method": "chat_upload_receive",
            "summary": "chat_upload_receive: foo.pdf",
            "request_body": {
                "file_id": "abc123",
                "name": "foo.pdf",
                "mimeType": "application/pdf",
                "size": 4,
                "uri": "platform-pending:ws-1/abc123",
            },
            "created_at": "2026-05-04T10:00:00Z",
        },
        {
            "id": "act-2",
            "source_id": None,
            "method": "message/send",
            "summary": None,
            "request_body": {
                "params": {
                    "message": {
                        "parts": [
                            {"kind": "text", "text": "look at this"},
                            {
                                "kind": "file",
                                "file": {
                                    "uri": "platform-pending:ws-1/abc123",
                                    "name": "foo.pdf",
                                },
                            },
                        ]
                    }
                }
            },
            "created_at": "2026-05-04T10:00:01Z",
        },
    ]
    resp = _make_response(200, rows)
    p, _ = _patch_httpx(resp)
    # Stubbed fetch: the cache was seeded above, so the return value is
    # only there to satisfy the call signature.
    def fake_fetch(row, **kwargs):
        return "workspace:/workspace/.molecule/chat-uploads/xx-foo.pdf"
    with p, patch.object(inbox_uploads, "fetch_and_stage", fake_fetch):
        n = inbox._poll_once(state, "http://platform", "ws-1", {})
    # Only the chat message is enqueued.
    assert n == 1
    queue = state.peek(10)
    assert len(queue) == 1
    msg = queue[0]
    assert msg.activity_id == "act-2"
    # The URI in the row's request_body was mutated by message_from_activity
    # → rewrite_request_body. Re-extracting reveals the rewritten value.
    rewritten = rows[1]["request_body"]["params"]["message"]["parts"][1]["file"]["uri"]
    assert rewritten == "workspace:/workspace/.molecule/chat-uploads/xx-foo.pdf"
def test_poll_once_chat_upload_row_advances_cursor_even_on_fetch_failure(
    state: inbox.InboxState, monkeypatch, tmp_path
):
    """One permanently-unfetchable upload must not wedge the inbox: the
    cursor still moves past the chat_upload_receive row even though
    fetch_and_stage reports failure (returns None), so later real chat
    traffic keeps flowing."""
    import inbox_uploads
    monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads"))
    doomed_row = {
        "id": "act-broken",
        "source_id": None,
        "method": "chat_upload_receive",
        "summary": "chat_upload_receive: doomed.pdf",
        "request_body": {
            "file_id": "doom",
            "name": "doomed.pdf",
            "uri": "platform-pending:ws-1/doom",
        },
        "created_at": "2026-05-04T10:00:00Z",
    }
    httpx_patch, _ = _patch_httpx(_make_response(200, [doomed_row]))
    # fetch_and_stage signalling total failure (e.g. network down).
    def failing_fetch(row, **kwargs):
        return None
    with httpx_patch, patch.object(inbox_uploads, "fetch_and_stage", failing_fetch):
        inbox._poll_once(state, "http://platform", "ws-1", {})
    # Nothing enqueued, but the cursor advanced past the broken row.
    assert state.peek(10) == []
    assert state.load_cursor() == "act-broken"

View File

@ -0,0 +1,697 @@
"""Tests for workspace/inbox_uploads.py — poll-mode chat-upload fetcher.
Covers the full activity-row fetch → stage-on-disk → ack flow plus
the URI cache and the rewrite that swaps platform-pending: URIs to
local workspace: URIs in subsequent chat messages.
"""
from __future__ import annotations
import os
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
import inbox_uploads
@pytest.fixture(autouse=True)
def _reset_cache_and_dir(tmp_path, monkeypatch):
    """Each test starts with an empty URI cache and a temp upload dir
    so on-disk artifacts from one test don't leak into the next."""
    # The URI cache is a module-level singleton — clear it on both sides
    # of the test so ordering between tests can never matter.
    inbox_uploads.get_cache().clear()
    # Redirect staging away from the real /workspace tree.
    monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads"))
    yield
    inbox_uploads.get_cache().clear()
# ---------------------------------------------------------------------------
# sanitize_filename — parity with internal_chat_uploads + Go SanitizeFilename
# ---------------------------------------------------------------------------
# Parity table: these exact (input, output) pairs are also pinned by the
# Go-side and in-container Python sanitizers' tests — do not edit the
# expected values here without updating all three suites together.
@pytest.mark.parametrize(
    "raw,want",
    [
        ("../../etc/passwd", "passwd"),
        ("/etc/passwd", "passwd"),
        ("hello world.pdf", "hello_world.pdf"),
        ("weird;chars!?.txt", "weird_chars__.txt"),
        ("中文.docx", "__.docx"),
        ("file (1).pdf", "file__1_.pdf"),
        ("report-2026.05.04_v2.pdf", "report-2026.05.04_v2.pdf"),
        ("", "file"),
        (".", "file"),
        ("..", "file"),
    ],
)
def test_sanitize_filename_parity_with_python_internal(raw, want):
    """Table-driven sanitization parity with internal_chat_uploads."""
    assert inbox_uploads.sanitize_filename(raw) == want
def test_sanitize_filename_caps_at_100_preserves_short_extension():
    """An over-long name is truncated to exactly 100 chars while a short
    extension survives at the end."""
    oversized = "a" * 200 + ".pdf"
    result = inbox_uploads.sanitize_filename(oversized)
    assert result.endswith(".pdf")
    assert len(result) == 100
def test_sanitize_filename_drops_long_extension():
    """An extension longer than the 16-char keep-limit is truncated away
    rather than preserved past the 100-char cap."""
    oversized = "c" * 90 + ".thisisaverylongextensionnotpreserved"
    result = inbox_uploads.sanitize_filename(oversized)
    assert ".thisisaverylongextensionnotpreserved" not in result
    assert len(result) == 100
# ---------------------------------------------------------------------------
# _URICache — LRU semantics
# ---------------------------------------------------------------------------
def test_uricache_set_get_roundtrip():
    """A stored mapping comes back verbatim."""
    c = inbox_uploads._URICache(max_entries=10)
    c.set("platform-pending:ws/1", "workspace:/local/1")
    assert c.get("platform-pending:ws/1") == "workspace:/local/1"


def test_uricache_get_missing_returns_none():
    """A miss returns None rather than raising."""
    c = inbox_uploads._URICache(max_entries=10)
    assert c.get("platform-pending:ws/missing") is None


def test_uricache_evicts_oldest_at_capacity():
    """Inserting past capacity drops the least-recently-used entry."""
    c = inbox_uploads._URICache(max_entries=2)
    c.set("a", "A")
    c.set("b", "B")
    c.set("c", "C")  # evicts "a"
    assert c.get("a") is None
    assert c.get("b") == "B"
    assert c.get("c") == "C"
    assert len(c) == 2


def test_uricache_get_promotes_recently_used():
    """Reads count as use for LRU ordering, not just writes."""
    c = inbox_uploads._URICache(max_entries=2)
    c.set("a", "A")
    c.set("b", "B")
    # Promote "a" by reading; next set should evict "b" instead of "a".
    assert c.get("a") == "A"
    c.set("c", "C")
    assert c.get("a") == "A"
    assert c.get("b") is None
    assert c.get("c") == "C"


def test_uricache_overwrite_updates_value():
    """Re-setting an existing key replaces the value without growing."""
    c = inbox_uploads._URICache(max_entries=10)
    c.set("k", "v1")
    c.set("k", "v2")
    assert c.get("k") == "v2"
    assert len(c) == 1


def test_uricache_clear():
    """clear() empties the cache entirely."""
    c = inbox_uploads._URICache(max_entries=10)
    c.set("a", "A")
    c.set("b", "B")
    c.clear()
    assert c.get("a") is None
    assert len(c) == 0


def test_resolve_pending_uri_uses_module_cache():
    """resolve_pending_uri reads the same singleton get_cache() exposes."""
    inbox_uploads.get_cache().set("platform-pending:ws/x", "workspace:/local/x")
    assert inbox_uploads.resolve_pending_uri("platform-pending:ws/x") == "workspace:/local/x"
    assert inbox_uploads.resolve_pending_uri("platform-pending:ws/missing") is None
# ---------------------------------------------------------------------------
# stage_to_disk
# ---------------------------------------------------------------------------
def test_stage_to_disk_writes_file_and_returns_workspace_uri(tmp_path):
    """Happy path: bytes land on disk and the returned URI points at them."""
    uri = inbox_uploads.stage_to_disk(b"hello", "report.pdf")
    assert uri.startswith("workspace:")
    path = uri[len("workspace:"):]
    assert os.path.isfile(path)
    with open(path, "rb") as f:
        assert f.read() == b"hello"
    assert path.endswith("-report.pdf")
    # Prefix is 32 hex chars + "-" + name.
    name = os.path.basename(path)
    prefix, _, _ = name.partition("-")
    assert len(prefix) == 32


def test_stage_to_disk_sanitizes_filename():
    """Path-traversal components are stripped before the name hits disk."""
    uri = inbox_uploads.stage_to_disk(b"x", "../../evil.txt")
    name = os.path.basename(uri)
    assert "/" not in name
    assert name.endswith("-evil.txt")


def test_stage_to_disk_rejects_oversize():
    """Content above MAX_FILE_BYTES is refused before any disk write."""
    with pytest.raises(ValueError):
        inbox_uploads.stage_to_disk(b"x" * (inbox_uploads.MAX_FILE_BYTES + 1), "big.bin")


def test_stage_to_disk_creates_directory_if_missing():
    # CHAT_UPLOAD_DIR is monkeypatched to a non-existent tmp path; the
    # call must mkdir -p it on first write.
    assert not os.path.exists(inbox_uploads.CHAT_UPLOAD_DIR)
    inbox_uploads.stage_to_disk(b"x", "a.txt")
    assert os.path.isdir(inbox_uploads.CHAT_UPLOAD_DIR)


class _ExplodingWriter:
    """Context-manager stand-in for os.fdopen's file whose write() raises
    ENOSPC.

    BUGFIX: the previous tests did ``f.write = bad_write`` on the real
    BufferedWriter, but io objects are C types with read-only attributes
    — that assignment raises AttributeError before write() is ever
    called, so the tests failed for the wrong reason. Wrapping the fd in
    our own object sidesteps the read-only attribute entirely.
    """

    def __init__(self, fd: int):
        self._fd = fd

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Close the real fd so the test doesn't leak descriptors.
        os.close(self._fd)
        return False

    def write(self, data):
        raise OSError(28, "no space")


def test_stage_to_disk_write_failure_cleans_partial_file(tmp_path, monkeypatch):
    # open() succeeds but write() fails — the partial file must be
    # removed so a retry can claim a fresh prefix without colliding.
    written_paths: list[str] = []

    def boom_fdopen(fd, mode):
        # By this point os.open has already created the target file;
        # record everything in the upload dir so cleanup can be asserted.
        for entry in os.listdir(inbox_uploads.CHAT_UPLOAD_DIR):
            written_paths.append(os.path.join(inbox_uploads.CHAT_UPLOAD_DIR, entry))
        return _ExplodingWriter(fd)

    monkeypatch.setattr(os, "fdopen", boom_fdopen)
    with pytest.raises(OSError):
        inbox_uploads.stage_to_disk(b"data", "x.txt")
    # Sanity: the open really happened, then all staged files cleaned up.
    assert written_paths
    for p in written_paths:
        assert not os.path.exists(p)


def test_stage_to_disk_write_failure_unlink_failure_swallowed(monkeypatch):
    # open() succeeds, write() fails, unlink() ALSO fails — the unlink
    # error is swallowed and the original write error propagates.
    def boom_fdopen(fd, mode):
        return _ExplodingWriter(fd)

    def bad_unlink(_):
        raise OSError(13, "permission denied")

    monkeypatch.setattr(os, "fdopen", boom_fdopen)
    monkeypatch.setattr(os, "unlink", bad_unlink)
    with pytest.raises(OSError) as ei:
        inbox_uploads.stage_to_disk(b"data", "x.txt")
    # Original write error, not the unlink error.
    assert ei.value.errno == 28


def test_stage_to_disk_propagates_oserror_and_cleans_partial(tmp_path, monkeypatch):
    # Make the dir read-only AFTER mkdir succeeds, so open() fails (no
    # partial file is ever created in this variant). Skip on platforms
    # where the dir's permissions don't restrict the process owner
    # (root in Docker, etc.).
    inbox_uploads.stage_to_disk(b"first", "a.txt")
    if os.geteuid() == 0:
        pytest.skip("root bypasses permission bits")
    os.chmod(inbox_uploads.CHAT_UPLOAD_DIR, 0o500)
    try:
        with pytest.raises(OSError):
            inbox_uploads.stage_to_disk(b"second", "b.txt")
    finally:
        os.chmod(inbox_uploads.CHAT_UPLOAD_DIR, 0o755)
# ---------------------------------------------------------------------------
# is_chat_upload_row + _request_body_dict
# ---------------------------------------------------------------------------
def test_is_chat_upload_row_true_on_method_match():
assert inbox_uploads.is_chat_upload_row({"method": "chat_upload_receive"})
def test_is_chat_upload_row_false_on_other_methods():
assert not inbox_uploads.is_chat_upload_row({"method": "message/send"})
assert not inbox_uploads.is_chat_upload_row({"method": None})
assert not inbox_uploads.is_chat_upload_row({})
def test_request_body_dict_passthrough():
body = {"file_id": "x"}
assert inbox_uploads._request_body_dict({"request_body": body}) is body
def test_request_body_dict_string_decoded():
assert inbox_uploads._request_body_dict({"request_body": '{"a": 1}'}) == {"a": 1}
def test_request_body_dict_invalid_string_returns_none():
    """Undecodable JSON text yields None rather than raising."""
    row = {"request_body": "not json"}
    assert inbox_uploads._request_body_dict(row) is None
def test_request_body_dict_non_dict_after_decode_returns_none():
    """Valid JSON that is not an object (a list here) yields None."""
    row = {"request_body": "[1, 2]"}
    assert inbox_uploads._request_body_dict(row) is None
def test_request_body_dict_other_type_returns_none():
    """Non-str, non-dict request_body values (an int here) yield None."""
    row = {"request_body": 123}
    assert inbox_uploads._request_body_dict(row) is None
# ---------------------------------------------------------------------------
# fetch_and_stage — the full GET / write / ack flow
# ---------------------------------------------------------------------------
def _make_resp(status_code: int, content: bytes = b"", content_type: str = "", text: str = "") -> MagicMock:
resp = MagicMock()
resp.status_code = status_code
resp.content = content
headers: dict[str, str] = {}
if content_type:
headers["content-type"] = content_type
resp.headers = headers
resp.text = text
return resp
def _patch_httpx_for_fetch(get_resp: MagicMock, ack_resp: MagicMock | None = None):
"""Patch httpx.Client so each new context-manager returns a client
whose .get() returns get_resp and .post() returns ack_resp.
"""
client = MagicMock()
client.__enter__ = MagicMock(return_value=client)
client.__exit__ = MagicMock(return_value=False)
client.get = MagicMock(return_value=get_resp)
client.post = MagicMock(return_value=ack_resp or _make_resp(200))
return patch("httpx.Client", return_value=client), client
def _row(file_id: str = "file-1", uri: str | None = None, name: str = "report.pdf", body_extra: dict | None = None) -> dict:
body: dict[str, Any] = {
"file_id": file_id,
"name": name,
"mimeType": "application/pdf",
"size": 9,
}
if uri is not None:
body["uri"] = uri
if body_extra:
body.update(body_extra)
return {
"id": "act-100",
"source_id": None,
"method": "chat_upload_receive",
"summary": "chat_upload_receive: report.pdf",
"request_body": body,
"created_at": "2026-05-04T10:00:00Z",
}
def test_fetch_and_stage_happy_path_writes_file_acks_and_caches():
    """Full success: bytes land on disk, the cache maps pending→local,
    and the ack is POSTed with the caller's headers."""
    pending_uri = "platform-pending:ws-1/file-1"
    resp = _make_resp(200, content=b"PDF-bytes", content_type="application/pdf")
    patcher, mock_client = _patch_httpx_for_fetch(resp)
    with patcher:
        local_uri = inbox_uploads.fetch_and_stage(
            _row(uri=pending_uri),
            platform_url="http://plat",
            workspace_id="ws-1",
            headers={"Authorization": "Bearer t"},
        )
    assert local_uri is not None
    assert local_uri.startswith("workspace:")
    # The staged file holds exactly the fetched bytes.
    staged_path = local_uri[len("workspace:"):]
    with open(staged_path, "rb") as f:
        assert f.read() == b"PDF-bytes"
    # Cache now resolves the pending URI to the local one.
    assert inbox_uploads.get_cache().get(pending_uri) == local_uri
    # Exactly one ack, aimed at the right endpoint, auth forwarded.
    mock_client.post.assert_called_once()
    args, kwargs = mock_client.post.call_args
    assert "/pending-uploads/file-1/ack" in args[0]
    assert kwargs["headers"]["Authorization"] == "Bearer t"
def test_fetch_and_stage_reconstructs_uri_when_missing_in_body():
    """When request_body lacks 'uri', the cache key is rebuilt from
    workspace_id + file_id."""
    resp = _make_resp(200, content=b"x", content_type="text/plain")
    patcher, _ = _patch_httpx_for_fetch(resp)
    with patcher:
        inbox_uploads.fetch_and_stage(
            _row(uri=None), platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-1") is not None
def test_fetch_and_stage_returns_none_on_missing_request_body():
    """A row without request_body is rejected before any HTTP traffic."""
    bare_row = {"id": "act-100", "method": "chat_upload_receive"}
    # No httpx call should happen, but we patch defensively.
    patcher, mock_client = _patch_httpx_for_fetch(_make_resp(200))
    with patcher:
        outcome = inbox_uploads.fetch_and_stage(
            bare_row, platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert outcome is None
    mock_client.get.assert_not_called()
def test_fetch_and_stage_returns_none_on_missing_file_id():
    """request_body present but no file_id: bail out without a GET."""
    partial_row = {"id": "act-100", "method": "chat_upload_receive", "request_body": {"name": "x.pdf"}}
    patcher, mock_client = _patch_httpx_for_fetch(_make_resp(200))
    with patcher:
        outcome = inbox_uploads.fetch_and_stage(
            partial_row, platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert outcome is None
    mock_client.get.assert_not_called()
def test_fetch_and_stage_handles_nonstring_file_id():
    """A non-string file_id (123) is treated as missing: no GET, None."""
    odd_row = {"id": "act-100", "method": "chat_upload_receive", "request_body": {"file_id": 123}}
    patcher, mock_client = _patch_httpx_for_fetch(_make_resp(200))
    with patcher:
        outcome = inbox_uploads.fetch_and_stage(
            odd_row, platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert outcome is None
    mock_client.get.assert_not_called()
def test_fetch_and_stage_404_returns_none_no_ack():
    """A 404 on the GET means the upload row is already gone upstream:
    return None and do NOT ack."""
    patcher, mock_client = _patch_httpx_for_fetch(
        _make_resp(404, text="gone"), _make_resp(200)
    )
    with patcher:
        outcome = inbox_uploads.fetch_and_stage(
            _row(), platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert outcome is None
    # No ack — the row is already gone.
    mock_client.post.assert_not_called()
def test_fetch_and_stage_500_returns_none_no_ack():
    """A 5xx on the GET: give up for this poll cycle, don't ack."""
    patcher, mock_client = _patch_httpx_for_fetch(_make_resp(500, text="boom"))
    with patcher:
        outcome = inbox_uploads.fetch_and_stage(
            _row(), platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert outcome is None
    mock_client.post.assert_not_called()
def test_fetch_and_stage_network_error_returns_none():
    """A transport-level exception raised by .get() is swallowed → None."""
    mock_client = MagicMock()
    mock_client.__enter__ = MagicMock(return_value=mock_client)
    mock_client.__exit__ = MagicMock(return_value=False)
    mock_client.get = MagicMock(side_effect=RuntimeError("connection refused"))
    with patch("httpx.Client", return_value=mock_client):
        outcome = inbox_uploads.fetch_and_stage(
            _row(), platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert outcome is None
def test_fetch_and_stage_oversize_response_refused():
    """A body one byte over MAX_FILE_BYTES is refused and never acked."""
    oversized = b"x" * (inbox_uploads.MAX_FILE_BYTES + 1)
    resp = _make_resp(200, content=oversized, content_type="application/octet-stream")
    patcher, mock_client = _patch_httpx_for_fetch(resp)
    with patcher:
        outcome = inbox_uploads.fetch_and_stage(
            _row(), platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert outcome is None
    mock_client.post.assert_not_called()
def test_fetch_and_stage_ack_failure_does_not_invalidate_local_uri():
    """Staging succeeded but the ack got a 500: the local URI and the
    cache entry survive — ack failure is logged, not rolled back."""
    patcher, _ = _patch_httpx_for_fetch(
        _make_resp(200, content=b"data", content_type="text/plain"),
        _make_resp(500, text="ack failed"),
    )
    with patcher:
        local_uri = inbox_uploads.fetch_and_stage(
            _row(uri="platform-pending:ws-1/file-1"),
            platform_url="http://plat",
            workspace_id="ws-1",
            headers={},
        )
    assert local_uri is not None
    assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-1") == local_uri
def test_fetch_and_stage_ack_network_error_swallowed():
    """If the ack POST itself raises, the already-staged URI is still
    returned — the GET succeeded and the bytes are on disk."""
    mock_client = MagicMock()
    mock_client.__enter__ = MagicMock(return_value=mock_client)
    mock_client.__exit__ = MagicMock(return_value=False)
    mock_client.get = MagicMock(return_value=_make_resp(200, content=b"data", content_type="text/plain"))
    mock_client.post = MagicMock(side_effect=RuntimeError("ack network error"))
    with patch("httpx.Client", return_value=mock_client):
        outcome = inbox_uploads.fetch_and_stage(
            _row(uri="platform-pending:ws-1/file-1"),
            platform_url="http://plat",
            workspace_id="ws-1",
            headers={},
        )
    assert outcome is not None  # GET succeeded → URI returned even if ack blew up
def test_fetch_and_stage_uses_response_content_type_when_present():
    """The response's content-type (with a trailing parameter) takes
    precedence over the body's mimeType.  We only assert the happy path
    completes — the chosen mime isn't part of the return contract."""
    row = _row(name="thing.bin", body_extra={"mimeType": "application/x-bogus"})
    resp = _make_resp(200, content=b"PNG", content_type="image/png; charset=binary")
    patcher, _ = _patch_httpx_for_fetch(resp)
    with patcher:
        outcome = inbox_uploads.fetch_and_stage(
            row, platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert outcome is not None
def test_fetch_and_stage_nonstring_filename_falls_back_to_file():
    """A non-string body['name'] (12345 here) must not crash filename
    handling; the default "file" is used so sanitize_filename has
    something to work with."""
    patcher, _ = _patch_httpx_for_fetch(_make_resp(200, content=b"x", content_type="text/plain"))
    with patcher:
        local_uri = inbox_uploads.fetch_and_stage(
            _row(body_extra={"name": 12345}),
            platform_url="http://plat",
            workspace_id="ws-1",
            headers={},
        )
    assert local_uri is not None
    assert local_uri.endswith("-file")
def test_fetch_and_stage_default_filename_when_missing():
    """No 'name' key at all in request_body: the staged path ends in
    the default filename."""
    minimal_row = {
        "id": "act",
        "method": "chat_upload_receive",
        "request_body": {"file_id": "file-1"},
    }
    patcher, _ = _patch_httpx_for_fetch(_make_resp(200, content=b"data", content_type="text/plain"))
    with patcher:
        local_uri = inbox_uploads.fetch_and_stage(
            minimal_row, platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert local_uri is not None
    assert local_uri.endswith("-file")  # default filename
def test_fetch_and_stage_disk_write_failure_returns_none(monkeypatch):
    """OSError from stage_to_disk (disk full) → None, and no ack."""
    def failing_stage(*_args, **_kwargs):
        raise OSError(28, "no space left")
    monkeypatch.setattr(inbox_uploads, "stage_to_disk", failing_stage)
    patcher, mock_client = _patch_httpx_for_fetch(
        _make_resp(200, content=b"x", content_type="text/plain")
    )
    with patcher:
        outcome = inbox_uploads.fetch_and_stage(
            _row(), platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert outcome is None
    mock_client.post.assert_not_called()
def test_fetch_and_stage_disk_value_error_returns_none(monkeypatch):
    """ValueError from stage_to_disk (its own size sanity check) is
    handled like an OS failure: None, and no ack."""
    def failing_stage(*_args, **_kwargs):
        raise ValueError("oversize after sanity check")
    monkeypatch.setattr(inbox_uploads, "stage_to_disk", failing_stage)
    patcher, mock_client = _patch_httpx_for_fetch(
        _make_resp(200, content=b"x", content_type="text/plain")
    )
    with patcher:
        outcome = inbox_uploads.fetch_and_stage(
            _row(), platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert outcome is None
    mock_client.post.assert_not_called()
def test_fetch_and_stage_httpx_missing_returns_none(monkeypatch):
    """Simulate httpx not being installed: fetch_and_stage's import
    fails and the function degrades to returning None.

    Fixes: the old version popped 'httpx' from sys.modules BEFORE
    calling monkeypatch.setitem, so monkeypatch recorded the key as
    absent and its teardown deleted the module that the manual
    ``finally`` block had just restored — evicting httpx from
    sys.modules for every later test.  Let monkeypatch do all the
    bookkeeping: it records the pre-test state (present or absent) and
    restores exactly that on teardown.
    """
    row = _row()
    import sys
    # A None entry in sys.modules makes `import httpx` raise ImportError.
    monkeypatch.setitem(sys.modules, "httpx", None)
    result = inbox_uploads.fetch_and_stage(
        row, platform_url="http://plat", workspace_id="ws-1", headers={}
    )
    assert result is None
def test_fetch_and_stage_falls_back_to_extension_mime():
    """No mimeType in the body and no content-type on the response: the
    staging still succeeds (the implementation falls through to an
    extension-based guess — presumably mimetypes.guess_type; not
    asserted here since mime isn't part of the return contract).

    Fixes: the ``monkeypatch`` fixture parameter was unused.
    """
    row = _row(name="snap.png", body_extra={"mimeType": ""})  # no mimeType in body
    get_resp = _make_resp(200, content=b"PNG", content_type="")
    p, _ = _patch_httpx_for_fetch(get_resp)
    with p:
        result = inbox_uploads.fetch_and_stage(
            row, platform_url="http://plat", workspace_id="ws-1", headers={}
        )
    assert result is not None
# ---------------------------------------------------------------------------
# rewrite_request_body — URI swap in chat-message bodies
# ---------------------------------------------------------------------------
def test_rewrite_request_body_swaps_pending_uri_in_message_parts():
    """params.message.parts: a file part's pending URI is swapped to
    the cached workspace URI; the sibling text part is untouched."""
    inbox_uploads.get_cache().set("platform-pending:ws/1", "workspace:/local/1")
    text_part = {"kind": "text", "text": "see this"}
    file_part = {"kind": "file", "file": {"uri": "platform-pending:ws/1", "name": "a.pdf"}}
    body = {
        "method": "message/send",
        "params": {"message": {"parts": [text_part, file_part]}},
    }
    inbox_uploads.rewrite_request_body(body)
    assert file_part["file"]["uri"] == "workspace:/local/1"
def test_rewrite_request_body_swaps_in_params_parts():
    """params.parts (no message wrapper) is also scanned for file URIs."""
    inbox_uploads.get_cache().set("platform-pending:ws/2", "workspace:/local/2")
    part = {"kind": "file", "file": {"uri": "platform-pending:ws/2"}}
    body = {"params": {"parts": [part]}}
    inbox_uploads.rewrite_request_body(body)
    assert part["file"]["uri"] == "workspace:/local/2"
def test_rewrite_request_body_swaps_in_top_level_parts():
    """A bare top-level 'parts' list gets the same URI swap."""
    inbox_uploads.get_cache().set("platform-pending:ws/3", "workspace:/local/3")
    part = {"kind": "file", "file": {"uri": "platform-pending:ws/3"}}
    body = {"parts": [part]}
    inbox_uploads.rewrite_request_body(body)
    assert part["file"]["uri"] == "workspace:/local/3"
def test_rewrite_request_body_leaves_unmatched_uri_unchanged():
    """No cache entry → URI stays as-is.  The agent surfaces the
    unresolvable URI rather than the inbox silently dropping the part."""
    part = {"kind": "file", "file": {"uri": "platform-pending:ws/missing"}}
    body = {"parts": [part]}
    inbox_uploads.rewrite_request_body(body)
    assert part["file"]["uri"] == "platform-pending:ws/missing"
def test_rewrite_request_body_leaves_non_pending_uri_unchanged():
    """Only platform-pending: URIs are rewrite candidates; workspace:
    and https: URIs pass through even with a populated cache."""
    inbox_uploads.get_cache().set("platform-pending:ws/3", "workspace:/local/3")
    local_part = {"kind": "file", "file": {"uri": "workspace:/already-local.pdf"}}
    remote_part = {"kind": "file", "file": {"uri": "https://example.com/x.pdf"}}
    body = {"parts": [local_part, remote_part]}
    inbox_uploads.rewrite_request_body(body)
    assert local_part["file"]["uri"] == "workspace:/already-local.pdf"
    assert remote_part["file"]["uri"] == "https://example.com/x.pdf"
def test_rewrite_request_body_skips_non_dict_parts():
    """Non-dict entries in 'parts' are ignored, not a crash."""
    body = {"parts": ["not a dict", 42, None]}
    inbox_uploads.rewrite_request_body(body)  # must not raise
    assert body["parts"] == ["not a dict", 42, None]
def test_rewrite_request_body_skips_text_parts():
    """Text content is never rewritten — only file.uri fields are URIs."""
    text_part = {"kind": "text", "text": "platform-pending:ws/should-not-rewrite"}
    body = {"parts": [text_part]}
    inbox_uploads.rewrite_request_body(body)
    assert text_part["text"] == "platform-pending:ws/should-not-rewrite"
def test_rewrite_request_body_skips_part_without_file_dict():
    """A file-kind part missing its 'file' dict is left untouched."""
    body = {"parts": [{"kind": "file"}]}  # no file key
    inbox_uploads.rewrite_request_body(body)
    assert body["parts"] == [{"kind": "file"}]
def test_rewrite_request_body_skips_file_without_uri():
    """A file dict without a 'uri' key is left exactly as given."""
    file_dict = {"name": "x.pdf"}
    inbox_uploads.rewrite_request_body({"parts": [{"kind": "file", "file": file_dict}]})
    assert file_dict == {"name": "x.pdf"}
def test_rewrite_request_body_skips_nonstring_uri():
    """A non-string uri value (None) must not raise during the scan."""
    body = {"parts": [{"kind": "file", "file": {"uri": None}}]}
    inbox_uploads.rewrite_request_body(body)  # must not raise
def test_rewrite_request_body_handles_non_dict_body():
    """Any non-dict body is a silent no-op."""
    for body in (None, "string body", [1, 2, 3]):
        inbox_uploads.rewrite_request_body(body)  # no-op
def test_rewrite_request_body_handles_non_dict_params():
    """A non-dict 'params' value must not raise."""
    malformed = {"params": "not a dict", "parts": []}
    inbox_uploads.rewrite_request_body(malformed)  # must not raise
def test_rewrite_request_body_handles_non_dict_message():
    """A non-dict params.message must not raise."""
    malformed = {"params": {"message": "not a dict"}}
    inbox_uploads.rewrite_request_body(malformed)  # must not raise
def test_rewrite_request_body_handles_non_list_parts():
    """A non-list 'parts' value must not raise."""
    malformed = {"parts": "not a list"}
    inbox_uploads.rewrite_request_body(malformed)  # must not raise
def test_rewrite_request_body_handles_non_dict_file():
    """A non-dict 'file' value inside a part must not raise."""
    malformed = {"parts": [{"kind": "file", "file": "not a dict"}]}
    inbox_uploads.rewrite_request_body(malformed)  # must not raise