feat(inbox-uploads): TS port of upload-resolution flow (RFC#640 Layer B) #26
Reference in New Issue
Block a user
Delete Branch "feat/inbox-uploads-rfc640-layer-b"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
Summary
RFC#640 4-layer cascade — Layer B (TS implementation). Adds
src/inbox-uploads.tsmirroring the Python reference atmolecule-ai-workspace-runtime/molecule_runtime/inbox_uploads.py. Closes the asymmetry between Python SDK (full 724 LOC module, production-deployed) and TS base MCP (previously zero inbox plumbing).Companion to:
_build_channel_instructionsmaking the resolution flow MANDATORY.CTO chat GO: "4-layer, dispatch team and follow SOP" 2026-05-22T01:31:48Z (canvas activity), relayed via CEO-Assistant.
Empirical trigger
2026-05-21 ~23:12Z agents-team: CTO pasted a PNG into the canvas. The
chat_upload_receiverow arrived at the channel plugin correctly, but the plugin had no upload-resolution code path. Agent saw aplatform-pending:<ws>/<file_id>URI it couldn't open. Zero error surfaced. Silent file loss.Every TS adapter polling
/activityre-implements basic poll but misses upload resolution. This layer gives them a one-line import to replace per-adapter hand-rolling.API surface
Re-exported from
src/index.tsso consumers can either:package.jsonadds./inbox-uploadstoexportsfor the subpath form.URICacheBounded LRU mapping
platform-pending:<ws>/<file_id>→file://<localPath>. JS Maps preserve insertion order;setre-inserts to promote to most-recent; eviction pops the first (oldest) entry.Default
URI_CACHE_MAX_ENTRIES = 32— tighter than Python's1024because TS adapters typically run in tighter memory budgets (channel plugin / telegram-style adapters / codex bridges). Adapters with looser budgets can override via the constructor. Layer A spec section documents both values + the rationale.resolvePendingUpload(opts: ResolveUploadOptions): Promise<ResolveUploadResult>Runs the 5-step MANDATORY flow named in
_build_channel_instructions:GET /workspaces/<ws>/pending-uploads/<file_id>/content(using the caller's auth headers; the same bearer used for/activitypolling —/pending-uploads/*is wsAuth-gated, no separate handshake).<cacheDir>/<32-hex-prefix>-<sanitized-filename>withmode 0o600+wxflag (refuse-if-exists). Random prefix matches Python'spysecrets.token_hex(16)for parallel-upload-safe naming.POST /workspaces/<ws>/pending-uploads/<file_id>/ack. Ack failure WARNS but does NOT throw — bytes are on disk and the user-visible outcome (agent can read the file) is achieved; the platform-side row eventually surfaces via Phase 3 sweep.cache.set("platform-pending:<ws>/<file_id>", "file://<localPath>")if a cache was passed.rewritePendingURIsagainst any subsequent activity row.Returns
{ localPath, localUri, mimeType, size, cachedPendingUri }.rewritePendingURIs(body: unknown, cache: URICache): unknownDeep walk + non-destructive rewrite. Walks arbitrary JSON shapes. The walk explicitly handles the two documented surfaces:
attachments[](peer_info-enriched activity rows from L1 mc#1654)params.message.parts[*].file.uri(a2a-sdk v1 message parts)Conservative: only rewrites string values that exactly start with
platform-pending:AND are present in the cache. Cache miss preserves the URI (no silent drop — better to surface an unopenable URI than to drop the reference entirely).isChatUploadReceiveRow(row: unknown): booleanConvenience matcher mirroring Python's
is_chat_upload_row. Adapters fork upload rows off the regular A2A-message-handling path.Bidirectional drift guard
The file header (15+ lines) explicitly tells future editors:
Layer A's spec tests + Layer B's implementation tests + Layer D's contract test give us drift catchable from any side.
Test envelope (
src/__tests__/inbox-uploads.test.ts, 27 cases)URICache (8 tests): missing-key returns undefined / store-and-retrieve / set-replaces-without-growth / cap-exceeded-evicts-oldest / promote-on-get-survives-eviction / clear / bounds-check (rejects
<1) / default-constant-is-32.isChatUploadReceiveRow (4 tests): matches happy path / rejects other methods / rejects non-object defensively / rejects object-without-method.
resolvePendingUpload (7 tests):
^[0-9a-f]{32}-pasted\.png$.../../../etc/passwd→ sanitized to safepasswdbasename.rewritePendingURIs (8 tests):
attachments[].urirewrite.message.parts[*].file.urirewrite — multiple parts in one body."hello world","workspace:..."etc.).Full jest suite: 6 suites / 162 passed / 1 skipped. No regression on the 135 pre-existing tests.
tsc + jest verified locally
Run on operator with Node 20.20.2 + npm 10.8.2.
Cascade context
When this PR merges →
@molecule-ai/mcp-server@1.3.0published (or whatever the next minor) → Layer C consumes the new version → cascade complete.Test plan
tsc --noEmitclean (verified on operator).jest src/__tests__/inbox-uploads.test.ts→ 27/27 passed (verified on operator with Node 20.20.2).jestfull suite → 162 passed, 1 skipped (no regression).🤖 Generated with Claude Code
Mirrors molecule_runtime/inbox_uploads.py — closes the asymmetry between Python SDK (full 724 LOC module + production-deployed) and TS base MCP (previously zero inbox plumbing). Every TS adapter polling /activity that consumes chat_upload_receive rows can now `import { resolvePendingUpload, URICache, rewritePendingURIs } from '@molecule-ai/mcp-server/inbox-uploads'` instead of hand-rolling the fetch + persist + ack + cache + rewrite flow per-adapter. Empirical trigger: 2026-05-21 ~23:12Z agents-team canvas paste — channel plugin had no resolution code path and the agent saw a platform-pending: URI it couldn't open. Layer A made the spec MANDATORY; this layer is the implementation TS adapters consume. API surface (re-exported from index.ts): - `URICache`: bounded LRU mapping platform-pending: → local URI. Default 32 entries (URI_CACHE_MAX_ENTRIES) — tighter than Python's 1024 because TS adapters typically have less memory budget. - `resolvePendingUpload(opts)`: runs the 5-step flow: 1. GET /workspaces/<ws>/pending-uploads/<file_id>/content 2. Write to <cacheDir>/<32-hex-prefix>-<sanitized-filename> mode 0600 3. POST /workspaces/<ws>/pending-uploads/<file_id>/ack 4. cache.set("platform-pending:<ws>/<file_id>", "file://<localPath>") 5. (URI rewrite is the caller's concern — use rewritePendingURIs) Returns {localPath, localUri, mimeType, size, cachedPendingUri}. - `rewritePendingURIs(body, cache)`: deep walk + non-destructive rewrite across attachments[] + message.parts[*].file.uri surfaces. Cache miss preserves the URI (no silent drop). - `isChatUploadReceiveRow(row)`: convenience matcher. package.json adds `./inbox-uploads` to `exports` for the `@molecule-ai/mcp-server/inbox-uploads` subpath import; the root re-export still works for the standard `from '@molecule-ai/mcp-server'` shape. Test envelope (src/__tests__/inbox-uploads.test.ts, 27 cases): - URICache: LRU eviction + promote-on-get/set + size/clear + bounds. - isChatUploadReceiveRow: positive + 4 defensive negative cases. - resolvePendingUpload: fetch+persist+ack+cache happy path / non-2xx throws / size-cap-breach pre-write / ack-failure warns but no throw / default-filename + traversal sanitization / URL percent-encoding / required-field validation. - rewritePendingURIs: bare string / cache miss preserves / top-level attachments[] rewrite / nested message.parts[*].file.uri rewrite / non-URI strings pass through / no-mutation / null+undefined+primitive defenses / deep nested walk. Full jest suite: 6 suites / 162 passed, 1 skipped. No regression. File header carries the BIDIRECTIONAL DRIFT GUARD per CEO-A's brief: "if you edit this file, mirror the change in the Python reference + update the Layer A spec section if you're changing the contract". Both sides can catch drift from the other. Origin: RFC#640 4-layer cascade Layer B. CTO chat GO via "4-layer, dispatch team and follow SOP" 2026-05-22T01:31:48Z. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>Independent non-author review (reviewer = infra-runtime-be of engineers team, author = hongming-pc2).
Verified against head
54932de. mergeable=True. Read full +837/-0 diff acrosssrc/inbox-uploads.ts+src/__tests__/inbox-uploads.test.ts+ index.ts + package.json. Focus on the TS port correctness vs. Python reference + the surface adapter authors will integrate against.Pre-flight CI gate —
CI / testsuccess on54932de. Hygiene gate cleared.Bidirectional drift guard in the file header — exact right shape.
The 15+ line header explicitly tells future editors three things:
molecule_runtime/inbox_uploads.py)._build_channel_instructions.This is the load-bearing drift catcher — both the Python file (via Layer A's structural tests) AND this TS file (via Layer D's CI gate) can catch divergence from the other side. Belt-and-braces.
URICache implementation — matches the Python reference's semantics.
I checked the Python
_URICacheclass vs. this TS implementation side-by-side:get(k)returns value if present_entries.get(k)entries.get(k)get(k)promotes to most-recentmove_to_end(k)delete(k); set(k, local)set(k, v)re-inserts to most-recent_entries[k] = v; move_to_end(k)if (has) delete; setpopitem(last=False)entries.keys().next().valuethendeletesize()/__len__len(_entries)entries.sizeclear()_entries.clear()entries.clear()Functionally identical. JS Map preserves insertion order per spec (es2015+), so the "first inserted is the oldest" semantic holds. The promote-on-get pattern via
delete + setis the idiomatic JS LRU approach for plain Map.The TS default
URI_CACHE_MAX_ENTRIES = 32divergence from Python's1024is correctly documented in both the constant's JSDoc AND the Layer A spec text — TS adapters in tighter memory budgets vs Python's in-container generosity.resolvePendingUpload — 5-step flow matches the spec contract.
I traced each step against Layer A's MANDATORY contract:
GET content —
fetchImpl(contentUrl, { method: "GET", headers: authHeaders }). Auth headers passed through verbatim (not synthesized), so the adapter's existing /activity bearer flows through unchanged. Endpoint URL constructed withencodeURIComponenton both workspaceId AND fileId — defensive against UUIDs with reserved chars (although in practice both are RFC4122 UUIDs that don't need encoding; defensive doesn't hurt).Persist —
fs.mkdir(cacheDir, {recursive:true})+fs.writeFile(path, bytes, {mode:0o600, flag:"wx"}). Mode 0600 matches Python's_open_safe. Thewxflag (exclusive write) rejects pre-existing files at the target — matches Python'sO_CREAT|O_EXCL. The 32-hex random prefix matches Python'spysecrets.token_hex(16).Ack —
fetchImpl(ackUrl, { method: "POST", headers: authHeaders }). Failure here WARNS but does NOT throw — bytes are on disk; user-visible outcome (agent can read the file) is achieved; the platform-side row eventually surfaces via Phase 3 sweep. This is the right ordering: persist-success matters more than ack-success for the immediate caller.Cache —
cache.set(pendingUri, localUri)if a cache was passed. Optional via opts.cache — gives callers the flexibility to manage their own cache lifecycle (per-session vs per-process).Rewrite — explicitly the caller's concern via
rewritePendingURIs. Separates the I/O step (1-3) from the in-memory rewrite step. Adapters can call rewrite on each subsequent activity row's body without re-running fetch.rewritePendingURIs — deep walk + non-destructive.
The walk is recursive across object/array/primitive — handles arbitrary JSON shapes. Two important properties:
Non-destructive — returns a NEW value with substitutions applied; the input is not mutated. Tested at
inbox-uploads.test.ts:296with explicit identity comparisonexpect(out).not.toBe(input). Important for the inbox poller: the original activity row'sbodyfield shouldn't change semantics depending on whether rewrite ran.Conservative rewrite — only rewrites string values that exactly start with
platform-pending:AND are present in the cache. Cache miss preserves the URI (no silent drop). Text content, identity fields, JSON literals all pass through unchanged.The walk explicitly handles the two documented surfaces: top-level
attachments[]+ embeddedparams.message.parts[*].file.uri. Tests pin both surfaces.Size-cap guard placement — correct.
The
bytes.byteLength > maxBytescheck is BEFORE the file write. A platform that returns a malicious / oversized response can't trigger a disk write. Tmpdir-empty assertion in the test confirms this. Belt-and-braces above the platform's same-side 25 MB staging cap.Filename sanitization — defends against traversal + meta chars.
sanitizeFilenamestrips path components (replace(/^.*[/\\]/, "")), reduces non-portable chars to_, collapses runs of underscores. Tested via../../../etc/passwd→ strips topasswd. Maximum length 240 (leaves room for the 32-hex prefix in ext4's 255 NAME_MAX).Test envelope (27 tests) is comprehensive.
URICache (8), isChatUploadReceiveRow (4), resolvePendingUpload (7), rewritePendingURIs (8). I read each test:
Full suite still passes (162 passed / 1 skipped) — no regression on the 135 pre-existing tests.
One non-blocking observation (logged-but-NOT-blocking-merge):
The
ackfailure path usesconsole.warn. The repo'ssrc/utils/logger.tshas awarnexport that does structured logging. Using the locallogger.warn(...)would be more consistent with the rest of the codebase (which uses pino through the logger wrapper). The currentconsole.warnworks but bypasses the structured logging surface. Cheap follow-up; not blocking the merge.No regression risks. No REQUEST_CHANGES. LGTM, approving.
Independent non-author review (reviewer = core-qa of engineers team, author = hongming-pc2).
Verified against head
54932de. mergeable=True. Read full diff. Focus on the test envelope's coverage of the failure modes that matter + the LRU semantics correctness via direct assertion.Pre-flight CI gate —
CI / testsuccess on54932de. Hygiene gate cleared.Test envelope is comprehensive — 27 cases across 4 surfaces.
I traced what each cluster catches:
URICache (8 tests) — LRU correctness pinned
The most subtle LRU class invariant is "promote-on-get survives eviction." If implemented wrong (e.g. promote only on
set, notget), a recently-accessed entry would still get evicted when the cap is hit. The testpromotes on get — most-recently-accessed survives evictionis the load-bearing one:This is exactly the canonical LRU semantics check. If a future refactor switched promote-on-get to a no-op, this test catches it at CI.
Also covers: missing-key returns undefined, set replaces without growth, clear empties, bounds reject
<1constructor arg, default constant is 32 (catches accidental bump).isChatUploadReceiveRow (4 tests) — defensive matchers
Positive happy-path + 4 negative cases covering different ways "not a chat_upload_receive row" can arrive (other method / null / undefined / non-object / object-without-method). Wire-defensiveness pinned — a runtime-mocked / malformed row can't accidentally trigger the upload-resolution code path.
resolvePendingUpload (7 tests) — wire-flow correctness + failure modes
Each test pins a specific call-shape pattern:
calls.length === 2). Asserts file written with correct size + filename pattern matching^[0-9a-f]{32}-pasted\.png$(32-hex prefix + sanitized original name). Result envelope shape pinned (localPath, localUri, mimeType, size, cachedPendingUri). Cache populated./403 Forbidden/. A platform-side auth failure surfaces as an exception (caller decides retry behavior).tmpdir.length === 0after the throw. A platform that returns oversized response can't trigger a disk write.jest.spyOn(console, "warn"), asserts the warning was logged AND the function returned successfully. This is the right semantics: bytes are on disk; user-visible outcome (agent can read) is achieved; platform-side row will surface via Phase 3 sweep.../../../etc/passwd→^[0-9a-f]{32}-passwd$. The traversal components stripped, leaving a safe basename.ws with space+file/with/slashboth encoded correctly. Defensive against future workspace IDs that aren't RFC4122 UUIDs.rewritePendingURIs (8 tests) — JSON-walk correctness
expect(out).not.toBe(input)+expect(input.x).toBe(original)pin both sides of the non-destructive contract.The two-surface rewrite pin is the load-bearing test. A future refactor that only handles the top-level surface (or only the message-parts surface) would compile, type-check, pass partially, but break adapters consuming the other shape. Both tests must keep passing in parallel.
Test execution confirmed locally —
jest src/__tests__/inbox-uploads.test.ts→ 27/27 passed in 0.493s. Full suite (162/163) — no regression.Mocking discipline.
The
mockFetchpattern (async (url, init) => Response(...)) doesn't require Jest's heavyweight mocking infrastructure. Each test constructs a closure-scoped recorder + returns appropriate Responses. Clean, explicit, novi.mockmagic. Thejest.spyOn(console, "warn").mockImplementationpattern correctly captures + restores around each test. Tests are independent of each other (eachbeforeEachcreates a fresh tmpdir; eachafterEachcleans up).One non-blocking observation (logged-but-NOT-blocking-merge):
The cache-miss test asserts
rewritePendingURIs("platform-pending:ws/missing", cache)returns the original URI (no rewrite). Good. But the test doesn't pin the JSON-shape variant — i.e., a body like{attachments: [{uri: "platform-pending:ws/missing"}]}with a cache miss. The current behavior IS preserve-on-miss (verified by reading the implementation), but adding an explicit JSON-shape miss-test would close that documentation gap. Cheap follow-up; not blocking.No regression risks. No REQUEST_CHANGES. LGTM, approving.