molecule-core/workspace/inbox_uploads.py
Hongming Wang 30fb507165 feat(poll-upload): phase 5b — concurrent BatchFetcher + httpx client reuse
Resolves the two remaining findings from the Phase 1-4 retrospective
review (the Python-side counterparts to phase 5a):

1. Important — inbox_uploads.fetch_and_stage blocked the inbox poll
   loop synchronously per row. A user dragging 4 files into chat at
   once would stall the poller for 4× per-fetch latency before the
   chat message reached the agent. Add BatchFetcher: a thread-pool
   wrapper (default 4 workers) that submits fetches concurrently and
   exposes wait_all() as the barrier the inbox loop calls before
   processing the chat-message row that references the uploads.

   The drain barrier is the correctness invariant: rewrite_request_body
   must observe a populated URI cache when it walks the chat-message
   row's parts. _poll_once now drains the BatchFetcher inline before
   the first non-upload row, AND at end-of-batch (case: batch contains
   only upload rows; the corresponding chat message arrives in a later
   poll, but the future-poll-races-current-fetch race is closed).

2. Nit — fetch_and_stage created two httpx.Client instances per row
   (one for GET /content, one for POST /ack). Refactor so a single
   client serves both calls. When called from BatchFetcher, the
   batch-shared client serves every row's GET + ack — so the second
   fetch reuses the TCP+TLS handshake from the first.

Comprehensive tests:

- 13 new inbox_uploads tests:
  - fetch_and_stage with supplied client: zero httpx.Client
    constructions, GET+POST through the same client, caller's client
    not closed (lifecycle owned by caller).
  - fetch_and_stage without supplied client: exactly one
    httpx.Client constructed (was 2 pre-fix), closed on the way out.
  - BatchFetcher: 3 rows × 120ms = parallel completion < 250ms
    (vs. ~360ms serial), URI cache hot when wait_all returns,
    per-row failure isolation, single-client reuse across all
    submits, idempotent close, submit-after-close raises,
    owned-vs-supplied client lifecycle, no-op wait_all on empty
    batch, graceful httpx-missing degradation.

- 3 new inbox tests:
  - poll_once drains uploads before processing the chat-message row
    (in-place mutation of row['request_body'] proves the URI was
    rewritten BEFORE message_from_activity returned).
  - poll_once with only upload rows still drains at end-of-batch.
  - poll_once with no upload rows never constructs a BatchFetcher
    (zero overhead on the no-upload happy path).

133 total inbox + inbox_uploads tests pass; 0 regressions.

Closes the chat-upload poll-mode-perf gap end-to-end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 11:26:55 -07:00

695 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Poll-mode chat-upload fetcher + URI cache for the standalone path.
Companion to ``inbox.py``. When the workspace's inbox poller sees an
``activity_logs`` row with ``method='chat_upload_receive'`` (written by
the platform's ``uploadPollMode`` handler — workspace-server
``internal/handlers/chat_files.go``), this module:
1. Pulls the bytes from
``GET /workspaces/:id/pending-uploads/:file_id/content``.
2. Writes them to ``/workspace/.molecule/chat-uploads/<prefix>-<name>``
— same on-disk shape as the push-mode handler in
``internal_chat_uploads.py``, so anything downstream that already
resolves ``workspace:/workspace/.molecule/chat-uploads/...`` URIs
works unchanged.
3. POSTs ``/workspaces/:id/pending-uploads/:file_id/ack`` so Phase 3
sweep can clean up the platform-side ``pending_uploads`` row.
4. Records a ``platform-pending:<wsid>/<file_id> →
workspace:/workspace/.molecule/chat-uploads/...`` mapping in a
process-local cache so the chat message that arrives later
(referencing the platform-pending URI) gets rewritten before the
agent sees it.
URI rewrite ordering — the chat message containing the
``platform-pending:`` URI is logged by the platform AFTER the
``chat_upload_receive`` row, so the inbox poller sees the upload-receive
row first (lower activity_logs.id) and stages the bytes before the chat
message arrives in the same poll batch (or a later one). The URI cache
is therefore populated before the message_from_activity path needs it.
A miss (network race, restart with stale cursor) is handled by keeping
the original ``platform-pending:`` URI in the rewritten body — the agent
will see something it can't open, which is preferable to silently
dropping the URI.
Auth — same Bearer token the inbox poller uses (``platform_auth.auth_headers``).
Both endpoints are on the wsAuth-gated route, so this module can never
read another tenant's bytes even if a token is misrouted.
"""
from __future__ import annotations
import concurrent.futures
import logging
import mimetypes
import os
import re
import secrets as pysecrets
import threading
from collections import OrderedDict
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# Same on-disk root as internal_chat_uploads.CHAT_UPLOAD_DIR — keeping
# these decoupled would let drift sneak in. Imported here rather than
# from internal_chat_uploads to avoid pulling in starlette as a
# transitive dep (this module runs in the standalone MCP path which
# doesn't ship the in-container HTTP server).
CHAT_UPLOAD_DIR = "/workspace/.molecule/chat-uploads"
# Per-file safety net. The platform enforces 25 MB on the staging side,
# but a buggy or hostile platform response shouldn't be able to fill the
# workspace's disk — refuse to write more than this even if the response
# claims a larger Content-Length.
MAX_FILE_BYTES = 25 * 1024 * 1024
# Network deadline for the GET. Tuned for a 25 MB transfer over a
# reasonable consumer link (~5 Mbps gives ~40s for the full payload),
# plus headroom for TLS + platform auth. Aligned with inbox poller's
# 10s default for /activity calls — both are user-perceived latency.
DEFAULT_FETCH_TIMEOUT = 60.0
# Concurrency cap for ``BatchFetcher``. Four workers is enough headroom
# for the realistic "user dragged 3-4 files into chat at once" case
# while bounding the platform's per-workspace fan-out. The cap matters
# because the platform's /content endpoint reads bytea from Postgres in
# a single round-trip per request — N workers = N concurrent DB reads
# of up to 25 MB each, so a higher cap could pressure platform memory
# without much UX win (network bandwidth is the bottleneck once the
# bytes are buffered).
DEFAULT_BATCH_FETCH_WORKERS = 4
# Upper bound on how long ``BatchFetcher.wait_all`` blocks the inbox
# poll loop before giving up on still-in-flight fetches. Aligned with
# DEFAULT_FETCH_TIMEOUT so a single hung fetch can't stall the loop
# longer than its own deadline. A timeout fires only if a worker thread
# is stuck past the underlying httpx timeout — pathological case;
# normal completion is bounded by per-fetch timeout × ceil(N/W).
DEFAULT_BATCH_WAIT_TIMEOUT = DEFAULT_FETCH_TIMEOUT + 5.0
# Cap on the URI cache. A long-lived workspace handling thousands of
# uploads shouldn't grow without bound; an LRU cap of 1024 keeps the
# entries-needed-for-a-typical-conversation well within memory.
URI_CACHE_MAX_ENTRIES = 1024
# Same character class as internal_chat_uploads — kept duplicated rather
# than imported to avoid dragging starlette into the standalone path.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^a-zA-Z0-9._\-]")
def sanitize_filename(name: str) -> str:
"""Reduce a user-supplied filename to a safe form.
Mirrors ``internal_chat_uploads.sanitize_filename`` and the Go
handler's ``SanitizeFilename`` — three-way parity is pinned by
``workspace-server/internal/handlers/sanitize_filename_test.go`` and
``workspace/tests/test_internal_chat_uploads.py`` so the URI shape
is identical regardless of which path handles the upload.
"""
base = os.path.basename(name)
base = base.replace(" ", "_")
base = _UNSAFE_FILENAME_CHARS.sub("_", base)
if len(base) > 100:
ext = ""
dot = base.rfind(".")
if dot >= 0 and len(base) - dot <= 16:
ext = base[dot:]
base = base[: 100 - len(ext)] + ext
if base in ("", ".", ".."):
return "file"
return base
# ---------------------------------------------------------------------------
# URI cache — maps platform-pending URIs to local workspace: URIs
# ---------------------------------------------------------------------------
class _URICache:
"""Thread-safe bounded LRU mapping of platform-pending → workspace URIs.
Bounded so a workspace that runs for months and handles thousands of
uploads doesn't accumulate entries forever. ``OrderedDict.move_to_end``
promotes recently-used entries; eviction takes the oldest.
The cache is intentionally per-process — there is no persistence
across a workspace restart. A restart with a stale inbox cursor that
re-poll an upload-receive row will re-fetch (the bytes are already
on disk from the prior session — see ``stage_to_disk``'s O_EXCL
handling) and re-register; a chat message that referenced the
platform-pending URI BEFORE the restart and arrives AFTER would miss
the rewrite and surface the platform-pending URI to the agent. That
is preferable to a stale persisted mapping that points at a deleted
file.
"""
def __init__(self, max_entries: int = URI_CACHE_MAX_ENTRIES):
self._max = max_entries
self._lock = threading.Lock()
self._entries: "OrderedDict[str, str]" = OrderedDict()
def get(self, pending_uri: str) -> str | None:
with self._lock:
local = self._entries.get(pending_uri)
if local is not None:
self._entries.move_to_end(pending_uri)
return local
def set(self, pending_uri: str, local_uri: str) -> None:
with self._lock:
self._entries[pending_uri] = local_uri
self._entries.move_to_end(pending_uri)
while len(self._entries) > self._max:
self._entries.popitem(last=False)
def __len__(self) -> int:
with self._lock:
return len(self._entries)
def clear(self) -> None:
with self._lock:
self._entries.clear()
_cache = _URICache()
def get_cache() -> _URICache:
"""Expose the module-singleton cache for tests and the rewrite path."""
return _cache
def resolve_pending_uri(uri: str) -> str | None:
"""Return the local ``workspace:`` URI for a ``platform-pending:`` URI,
or None if not yet staged. Convenience for callers that want to
fall back to an on-demand fetch — pass the result through to
``executor_helpers.resolve_attachment_uri``.
"""
return _cache.get(uri)
# ---------------------------------------------------------------------------
# On-disk staging
# ---------------------------------------------------------------------------
def _open_safe(path: str) -> int:
"""Open ``path`` for write with ``O_CREAT|O_EXCL|O_NOFOLLOW``.
Same shape as ``internal_chat_uploads._open_safe`` — refuses to
follow a pre-existing symlink at the target and refuses to overwrite
an existing regular file. The 16-byte random prefix makes a name
collision astronomical, but defense-in-depth costs nothing.
"""
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
if hasattr(os, "O_NOFOLLOW"):
flags |= os.O_NOFOLLOW
return os.open(path, flags, 0o600)
def stage_to_disk(content: bytes, filename: str) -> str:
"""Write ``content`` under ``CHAT_UPLOAD_DIR`` and return the local URI.
Returns ``workspace:/workspace/.molecule/chat-uploads/<prefix>-<sanitized>``.
The 32-hex prefix makes the on-disk name unguessable to anything
that didn't see the response, so even if a stale agent has a guess
at the original filename it can't construct a URL to a sibling's
upload.
Raises:
OSError: write failure (mkdir, open, or write). Caller is
expected to log + skip; the activity row stays unacked so a
future poll re-tries.
ValueError: ``content`` exceeds ``MAX_FILE_BYTES``. Pre-staging
guard belt-and-braces above the platform's same-side cap.
"""
if len(content) > MAX_FILE_BYTES:
raise ValueError(
f"content size {len(content)} exceeds workspace cap {MAX_FILE_BYTES}"
)
Path(CHAT_UPLOAD_DIR).mkdir(parents=True, exist_ok=True)
sanitized = sanitize_filename(filename)
prefix = pysecrets.token_hex(16)
stored = f"{prefix}-{sanitized}"
target = os.path.join(CHAT_UPLOAD_DIR, stored)
fd = _open_safe(target)
try:
with os.fdopen(fd, "wb") as f:
f.write(content)
except OSError:
# Best-effort cleanup — partial writes leave a stub file that
# would mask a future retry's success otherwise.
try:
os.unlink(target)
except OSError:
pass
raise
return f"workspace:{CHAT_UPLOAD_DIR}/{stored}"
# ---------------------------------------------------------------------------
# Activity row → fetch/stage/ack flow
# ---------------------------------------------------------------------------
def _request_body_dict(row: dict[str, Any]) -> dict[str, Any] | None:
"""Coerce ``row['request_body']`` into a dict.
The /activity API returns request_body as JSON (already-deserialized
by httpx). Some legacy paths or mocked transports may emit a string;
handle defensively rather than raising.
"""
body = row.get("request_body")
if isinstance(body, dict):
return body
if isinstance(body, str):
import json
try:
decoded = json.loads(body)
except (TypeError, ValueError):
return None
return decoded if isinstance(decoded, dict) else None
return None
def is_chat_upload_row(row: dict[str, Any]) -> bool:
"""True if ``row`` is the platform's chat-upload-receive activity.
Used by the inbox poller to fork the row off the regular A2A
message handling path — this row is not a peer message; it's an
instruction to fetch + stage bytes. Match on ``method`` only;
``activity_type`` is already filtered to ``a2a_receive`` upstream.
"""
return row.get("method") == "chat_upload_receive"
def fetch_and_stage(
row: dict[str, Any],
*,
platform_url: str,
workspace_id: str,
headers: dict[str, str],
timeout_secs: float = DEFAULT_FETCH_TIMEOUT,
client: Any = None,
) -> str | None:
"""Fetch the row's bytes, stage them under chat-uploads, and ack.
Returns the local ``workspace:`` URI on success, or ``None`` if any
step failed (logged with enough detail to triage). Failure leaves
the platform-side row unacked, so a subsequent poll retries — the
activity row stays in the cursor's window because we DO advance the
cursor (the row is "handled" from the inbox's perspective even on
fetch failure; otherwise a permanent network outage would stall the
cursor and block real chat traffic).
On success, the URI cache is updated so a subsequent chat message
referencing the same ``platform-pending:`` URI is rewritten before
the agent sees it.
Pass ``client`` to reuse a shared ``httpx.Client`` for both GET and
POST ack (saves one TLS handshake per row vs. constructing one
per-call). ``BatchFetcher`` does this across an entire poll batch so
N concurrent fetches share one connection pool.
"""
body = _request_body_dict(row)
if body is None:
logger.warning(
"inbox_uploads: row %s missing request_body; cannot fetch",
row.get("id"),
)
return None
file_id = body.get("file_id")
if not isinstance(file_id, str) or not file_id:
logger.warning(
"inbox_uploads: row %s has no file_id in request_body",
row.get("id"),
)
return None
pending_uri = body.get("uri")
if not isinstance(pending_uri, str) or not pending_uri:
# Reconstruct what the platform would have written — defensive
# against a row whose uri field got truncated. Same shape as the
# Go handler's URI builder.
pending_uri = f"platform-pending:{workspace_id}/{file_id}"
filename = body.get("name") or "file"
if not isinstance(filename, str):
filename = "file"
# Caller-supplied client: reuse for both GET + POST ack. Otherwise
# build a one-shot client and close it on the way out. Lazy httpx
# import keeps the standalone MCP path's optional dep optional.
own_client = client is None
if own_client:
try:
import httpx # noqa: WPS433
except ImportError:
logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id)
return None
client = httpx.Client(timeout=timeout_secs)
try:
return _fetch_and_stage_with_client(
client,
platform_url=platform_url,
workspace_id=workspace_id,
headers=headers,
file_id=file_id,
pending_uri=pending_uri,
filename=filename,
body=body,
)
finally:
if own_client:
try:
client.close()
except Exception: # noqa: BLE001 — close should never crash the caller
pass
def _fetch_and_stage_with_client(
client: Any,
*,
platform_url: str,
workspace_id: str,
headers: dict[str, str],
file_id: str,
pending_uri: str,
filename: str,
body: dict[str, Any],
) -> str | None:
"""Inner body of fetch_and_stage. Always uses the supplied client for
both GET and POST so the connection pool is shared across the call.
"""
content_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/content"
ack_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/ack"
try:
resp = client.get(content_url, headers=headers)
except Exception as exc: # noqa: BLE001
logger.warning("inbox_uploads: GET %s failed: %s", content_url, exc)
return None
if resp.status_code == 404:
# Row was swept or already acked by a previous poll race — nothing
# to fetch. Don't ack again; the platform's GC handles it. This is
# a soft-skip, not an error — log at INFO so triage isn't noisy.
logger.info(
"inbox_uploads: pending upload %s already gone (404); skipping",
file_id,
)
return None
if resp.status_code >= 400:
logger.warning(
"inbox_uploads: GET %s returned %d: %s",
content_url,
resp.status_code,
(resp.text or "")[:200],
)
return None
content = resp.content or b""
if len(content) > MAX_FILE_BYTES:
logger.warning(
"inbox_uploads: refusing to stage %s — size %d exceeds cap %d",
file_id,
len(content),
MAX_FILE_BYTES,
)
return None
# Mimetype precedence: platform's Content-Type header → request_body
# mimeType field → extension guess. Same precedence as the in-
# container ingest handler.
mime_header = resp.headers.get("content-type", "").split(";")[0].strip()
mime = (
mime_header
or (body.get("mimeType") if isinstance(body.get("mimeType"), str) else "")
or (mimetypes.guess_type(filename)[0] or "")
)
try:
local_uri = stage_to_disk(content, filename)
except (OSError, ValueError) as exc:
logger.error(
"inbox_uploads: failed to stage %s (%s) to disk: %s",
file_id,
filename,
exc,
)
return None
_cache.set(pending_uri, local_uri)
logger.info(
"inbox_uploads: staged file_id=%s name=%s size=%d mime=%s pending_uri=%s local_uri=%s",
file_id,
filename,
len(content),
mime,
pending_uri,
local_uri,
)
# Ack last so a write failure above leaves the row available for a
# retry on the next poll. A failed ack is logged but doesn't roll
# back the on-disk file — the platform's sweep will clean up
# eventually.
try:
ack_resp = client.post(ack_url, headers=headers)
if ack_resp.status_code >= 400:
logger.warning(
"inbox_uploads: ack %s returned %d: %s",
ack_url,
ack_resp.status_code,
(ack_resp.text or "")[:200],
)
except Exception as exc: # noqa: BLE001
logger.warning("inbox_uploads: POST %s failed: %s", ack_url, exc)
return local_uri
# ---------------------------------------------------------------------------
# BatchFetcher — concurrent fetch across a single poll batch
# ---------------------------------------------------------------------------
class BatchFetcher:
"""Fetch + stage + ack a batch of upload-receive rows concurrently.
Why this exists: the inbox poll loop used to call ``fetch_and_stage``
serially per row. With N upload rows in a batch (a user dragging
multiple files into chat at once), the loop blocked for
``N × per_fetch_latency`` before processing the chat message that
referenced them — a 4-file upload at 5s each = 20s of stall
before the agent saw the user's prompt. ``BatchFetcher`` runs the
fetches on a small thread pool (default 4 workers) so the stall is
bounded by ``ceil(N/W) × per_fetch_latency`` instead.
Connection reuse: one ``httpx.Client`` is shared across every fetch
in the batch. httpx clients carry a connection pool, so a second
fetch to the same platform host reuses the TCP+TLS handshake from
the first — measurable win when fetches happen back-to-back.
Correctness invariant the caller MUST preserve: the inbox loop is
expected to call ``wait_all()`` before processing the chat-message
activity row that REFERENCES one of these uploads. Without the
barrier, the URI cache is empty when ``rewrite_request_body`` runs
and the agent sees the un-rewritten ``platform-pending:`` URI. The
caller-side test ``test_poll_once_waits_for_uploads_before_messages``
pins this end-to-end.
Use as a context manager so the executor + client are torn down
even if the caller raises mid-batch.
"""
def __init__(
self,
*,
platform_url: str,
workspace_id: str,
headers: dict[str, str],
timeout_secs: float = DEFAULT_FETCH_TIMEOUT,
max_workers: int = DEFAULT_BATCH_FETCH_WORKERS,
client: Any = None,
):
self._platform_url = platform_url
self._workspace_id = workspace_id
self._headers = dict(headers) # copy so caller mutations don't leak in
self._timeout_secs = timeout_secs
# Caller can inject a client (tests do this); production callers
# let us build one. Track ownership so we only close ours.
self._own_client = client is None
if self._own_client:
try:
import httpx # noqa: WPS433
except ImportError:
# Match fetch_and_stage's behavior: log + degrade rather
# than raising at construction time. submit() will then
# return None for every row.
logger.error("inbox_uploads: httpx not installed; BatchFetcher inert")
self._client: Any = None
else:
self._client = httpx.Client(timeout=timeout_secs)
else:
self._client = client
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix="upload-fetch",
)
self._futures: list[concurrent.futures.Future[Any]] = []
self._closed = False
def submit(self, row: dict[str, Any]) -> concurrent.futures.Future[Any] | None:
"""Submit ``row`` for fetch + stage + ack. Non-blocking — the
worker thread runs ``fetch_and_stage`` with the shared client.
Returns the Future so a caller that wants per-row outcome can
await it; ``None`` if the BatchFetcher is in a degraded state
(httpx missing).
"""
if self._closed:
raise RuntimeError("BatchFetcher: submit after close")
if self._client is None:
return None
fut = self._executor.submit(
fetch_and_stage,
row,
platform_url=self._platform_url,
workspace_id=self._workspace_id,
headers=self._headers,
timeout_secs=self._timeout_secs,
client=self._client,
)
self._futures.append(fut)
return fut
def wait_all(self, timeout: float | None = DEFAULT_BATCH_WAIT_TIMEOUT) -> None:
"""Block until every submitted future completes (or times out).
Per-future exceptions are logged + swallowed — ``fetch_and_stage``
already converts every error path to ``return None``, so a real
exception propagating up to here is unexpected and we don't want
one bad fetch to abort the whole batch.
Timeouts are also logged + swallowed; the caller will move on
and the un-acked rows will be retried by the next poll.
"""
if not self._futures:
return
try:
done, not_done = concurrent.futures.wait(
self._futures,
timeout=timeout,
return_when=concurrent.futures.ALL_COMPLETED,
)
except Exception as exc: # noqa: BLE001 — concurrent.futures shouldn't raise here
logger.warning("inbox_uploads: BatchFetcher.wait_all crashed: %s", exc)
return
for fut in done:
exc = fut.exception()
if exc is not None:
logger.warning(
"inbox_uploads: BatchFetcher worker raised: %s", exc
)
if not_done:
logger.warning(
"inbox_uploads: BatchFetcher.wait_all left %d in-flight after %ss timeout",
len(not_done),
timeout,
)
def close(self) -> None:
"""Tear down the executor + (if owned) the httpx client.
Idempotent. After close, ``submit`` raises and the BatchFetcher
cannot be reused — construct a fresh one for the next poll.
"""
if self._closed:
return
self._closed = True
# Drain remaining futures so worker threads aren't killed mid-
# request. wait=True is the safe default; for an inbox poller a
# 60s tail at shutdown is acceptable since uploads in flight are
# the only thing close() is called between.
try:
self._executor.shutdown(wait=True)
except Exception as exc: # noqa: BLE001
logger.warning("inbox_uploads: executor shutdown error: %s", exc)
if self._own_client and self._client is not None:
try:
self._client.close()
except Exception as exc: # noqa: BLE001
logger.warning("inbox_uploads: client close error: %s", exc)
def __enter__(self) -> "BatchFetcher":
return self
def __exit__(self, exc_type, exc, tb) -> None:
self.close()
# ---------------------------------------------------------------------------
# URI rewrite for incoming chat messages
# ---------------------------------------------------------------------------
#
# The chat message that references a staged upload arrives as a
# SEPARATE activity_log row, with parts of kind=file containing
# platform-pending: URIs in the file.uri field. Walk the structure
# in-place and rewrite to the local workspace: URI when the cache has it.
# Unknown URIs pass through unchanged — the agent gets to choose how
# to react (most runtimes log + ignore an unresolvable URI).
def _rewrite_part(part: Any) -> None:
"""Mutate a single A2A Part dict to swap platform-pending: URIs."""
if not isinstance(part, dict):
return
file_obj = part.get("file")
if not isinstance(file_obj, dict):
return
uri = file_obj.get("uri")
if not isinstance(uri, str) or not uri.startswith("platform-pending:"):
return
rewritten = _cache.get(uri)
if rewritten:
file_obj["uri"] = rewritten
def rewrite_request_body(body: Any) -> None:
"""Mutate ``body`` in-place, replacing platform-pending: URIs with
the cached local equivalents.
Walks the same shapes ``inbox._extract_text`` accepts:
- ``body['parts']``
- ``body['params']['parts']``
- ``body['params']['message']['parts']``
No-op for shapes that don't match — the message simply passes
through to the agent as-is.
"""
if not isinstance(body, dict):
return
candidates: list[Any] = []
params = body.get("params") if isinstance(body.get("params"), dict) else None
if params:
message = params.get("message") if isinstance(params.get("message"), dict) else None
if message:
candidates.append(message.get("parts"))
candidates.append(params.get("parts"))
candidates.append(body.get("parts"))
for parts in candidates:
if isinstance(parts, list):
for part in parts:
_rewrite_part(part)