From 54932de63c0d9bd0318349e7b522dbaf9c97cd3a Mon Sep 17 00:00:00 2001 From: hongming-pc2 Date: Thu, 21 May 2026 18:58:49 -0700 Subject: [PATCH] feat(inbox-uploads): TS port of upload-resolution flow (RFC#640 Layer B) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors molecule_runtime/inbox_uploads.py — closes the asymmetry between Python SDK (full 724 LOC module + production-deployed) and TS base MCP (previously zero inbox plumbing). Every TS adapter polling /activity that consumes chat_upload_receive rows can now `import { resolvePendingUpload, URICache, rewritePendingURIs } from '@molecule-ai/mcp-server/inbox-uploads'` instead of hand-rolling the fetch + persist + ack + cache + rewrite flow per-adapter. Empirical trigger: 2026-05-21 ~23:12Z agents-team canvas paste — channel plugin had no resolution code path and the agent saw a platform-pending: URI it couldn't open. Layer A made the spec MANDATORY; this layer is the implementation TS adapters consume. API surface (re-exported from index.ts): - `URICache`: bounded LRU mapping platform-pending: → local URI. Default 32 entries (URI_CACHE_MAX_ENTRIES) — tighter than Python's 1024 because TS adapters typically have less memory budget. - `resolvePendingUpload(opts)`: runs the 5-step flow: 1. GET /workspaces//pending-uploads//content 2. Write to /<32-hex-prefix>- mode 0600 3. POST /workspaces//pending-uploads//ack 4. cache.set("platform-pending:/", "file://") 5. (URI rewrite is the caller's concern — use rewritePendingURIs) Returns {localPath, localUri, mimeType, size, cachedPendingUri}. - `rewritePendingURIs(body, cache)`: deep walk + non-destructive rewrite across attachments[] + message.parts[*].file.uri surfaces. Cache miss preserves the URI (no silent drop). - `isChatUploadReceiveRow(row)`: convenience matcher. package.json adds `./inbox-uploads` to `exports` for the `@molecule-ai/mcp-server/inbox-uploads` subpath import; the root re-export still works for the standard `from '@molecule-ai/mcp-server'` shape. Test envelope (src/__tests__/inbox-uploads.test.ts, 27 cases): - URICache: LRU eviction + promote-on-get/set + size/clear + bounds. - isChatUploadReceiveRow: positive + 4 defensive negative cases. - resolvePendingUpload: fetch+persist+ack+cache happy path / non-2xx throws / size-cap-breach pre-write / ack-failure warns but no throw / default-filename + traversal sanitization / URL percent-encoding / required-field validation. - rewritePendingURIs: bare string / cache miss preserves / top-level attachments[] rewrite / nested message.parts[*].file.uri rewrite / non-URI strings pass through / no-mutation / null+undefined+primitive defenses / deep nested walk. Full jest suite: 6 suites / 162 passed, 1 skipped. No regression. File header carries the BIDIRECTIONAL DRIFT GUARD per CEO-A's brief: "if you edit this file, mirror the change in the Python reference + update the Layer A spec section if you're changing the contract". Both sides can catch drift from the other. Origin: RFC#640 4-layer cascade Layer B. CTO chat GO via "4-layer, dispatch team and follow SOP" 2026-05-22T01:31:48Z. Co-Authored-By: Claude Opus 4.7 (1M context) --- package.json | 1 + src/__tests__/inbox-uploads.test.ts | 435 ++++++++++++++++++++++++++++ src/inbox-uploads.ts | 385 ++++++++++++++++++++++++ src/index.ts | 16 + 4 files changed, 837 insertions(+) create mode 100644 src/__tests__/inbox-uploads.test.ts create mode 100644 src/inbox-uploads.ts diff --git a/package.json b/package.json index 0f69e6f..468a29c 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "exports": { ".": "./dist/index.js", "./external-workspace-tools": "./dist/external_workspace_tools.js", + "./inbox-uploads": "./dist/inbox-uploads.js", "./targets": "./dist/targets.js" }, "types": "./dist/index.d.ts", diff --git a/src/__tests__/inbox-uploads.test.ts b/src/__tests__/inbox-uploads.test.ts new file mode 100644 index 0000000..4922010 --- /dev/null +++ b/src/__tests__/inbox-uploads.test.ts @@ -0,0 +1,435 @@ +/** + * Tests for src/inbox-uploads.ts (Layer B of RFC#640 4-layer cascade). + * + * Three surfaces under test: + * 1. URICache — LRU eviction, promote-on-get/set, size, clear, bounds. + * 2. resolvePendingUpload — fetch + persist + ack + cache flow, with + * mock fetch + mock fs (real fs via tmpdir). + * 3. rewritePendingURIs — deep walk across attachments[] + + * message.parts[*].file.uri surfaces; cache miss preserves URI. + * + * Mirrors the Python reference's test envelope shape — the contract + * is bidirectional (Python tests pin the spec text; TS tests pin the + * implementation correctness). + */ + +import * as fs from "node:fs"; +import * as path from "node:path"; +import * as os from "node:os"; + +import { + URICache, + URI_CACHE_MAX_ENTRIES, + resolvePendingUpload, + rewritePendingURIs, + isChatUploadReceiveRow, +} from "../inbox-uploads.js"; + +// --------------------------------------------------------------------------- +// URICache +// --------------------------------------------------------------------------- + +describe("URICache", () => { + it("returns undefined for missing key", () => { + const c = new URICache(); + expect(c.get("platform-pending:ws/x")).toBeUndefined(); + }); + + it("returns the stored URI", () => { + const c = new URICache(); + c.set("platform-pending:ws/x", "file:///tmp/x"); + expect(c.get("platform-pending:ws/x")).toBe("file:///tmp/x"); + }); + + it("set replaces existing entry without growing size", () => { + const c = new URICache(); + c.set("k", "v1"); + c.set("k", "v2"); + expect(c.size()).toBe(1); + expect(c.get("k")).toBe("v2"); + }); + + it("evicts oldest when cap exceeded", () => { + const c = new URICache(3); + c.set("a", "1"); + c.set("b", "2"); + c.set("c", "3"); + c.set("d", "4"); // evicts "a" + expect(c.size()).toBe(3); + expect(c.get("a")).toBeUndefined(); + expect(c.get("b")).toBe("2"); + expect(c.get("c")).toBe("3"); + expect(c.get("d")).toBe("4"); + }); + + it("promotes on get — most-recently-accessed survives eviction", () => { + const c = new URICache(3); + c.set("a", "1"); + c.set("b", "2"); + c.set("c", "3"); + // Touch "a" so it becomes most-recent. + expect(c.get("a")).toBe("1"); + // Set "d" — eviction should now drop "b" (which is the new oldest). + c.set("d", "4"); + expect(c.get("a")).toBe("1"); + expect(c.get("b")).toBeUndefined(); + expect(c.get("c")).toBe("3"); + expect(c.get("d")).toBe("4"); + }); + + it("clear empties the cache", () => { + const c = new URICache(); + c.set("a", "1"); + c.set("b", "2"); + expect(c.size()).toBe(2); + c.clear(); + expect(c.size()).toBe(0); + expect(c.get("a")).toBeUndefined(); + }); + + it("rejects maxEntries < 1", () => { + expect(() => new URICache(0)).toThrow(); + expect(() => new URICache(-1)).toThrow(); + }); + + it("default URI_CACHE_MAX_ENTRIES is 32 (TS-adapter budget)", () => { + // Python reference uses 1024 because the in-container runtime has + // the workspace's full memory; TS adapters in tighter budgets use 32. + expect(URI_CACHE_MAX_ENTRIES).toBe(32); + }); +}); + +// --------------------------------------------------------------------------- +// isChatUploadReceiveRow +// --------------------------------------------------------------------------- + +describe("isChatUploadReceiveRow", () => { + it("matches chat_upload_receive method", () => { + expect(isChatUploadReceiveRow({ method: "chat_upload_receive" })).toBe(true); + }); + it("rejects other methods", () => { + expect(isChatUploadReceiveRow({ method: "message/send" })).toBe(false); + expect(isChatUploadReceiveRow({ method: "notify" })).toBe(false); + }); + it("rejects non-object input defensively", () => { + expect(isChatUploadReceiveRow(null)).toBe(false); + expect(isChatUploadReceiveRow(undefined)).toBe(false); + expect(isChatUploadReceiveRow("chat_upload_receive")).toBe(false); + expect(isChatUploadReceiveRow(42)).toBe(false); + }); + it("rejects object without method field", () => { + expect(isChatUploadReceiveRow({ activity_id: "x" })).toBe(false); + }); +}); + +// --------------------------------------------------------------------------- +// resolvePendingUpload +// --------------------------------------------------------------------------- + +describe("resolvePendingUpload", () => { + let tmpDir: string; + beforeEach(() => { + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "mcp-inbox-test-")); + }); + afterEach(() => { + try { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } catch { + // best-effort cleanup + } + }); + + it("fetches content + writes file + acks + caches", async () => { + const bytes = new Uint8Array([1, 2, 3, 4, 5]); + const calls: Array<{ url: string; method: string }> = []; + const mockFetch: typeof fetch = async (url, init) => { + const m = (init?.method ?? "GET") as string; + const u = (url as string).toString(); + calls.push({ url: u, method: m }); + if (u.endsWith("/content")) { + return new Response(bytes, { + status: 200, + headers: { "content-type": "image/png" }, + }); + } + if (u.endsWith("/ack")) { + return new Response("", { status: 200 }); + } + return new Response("not found", { status: 404 }); + }; + + const cache = new URICache(); + const result = await resolvePendingUpload({ + workspaceId: "ws-1", + fileId: "file-abc", + authHeaders: { Authorization: "Bearer test-token" }, + cacheDir: tmpDir, + filename: "pasted.png", + cache, + platformUrl: "https://api.test", + fetchImpl: mockFetch, + }); + + // Both endpoints called exactly once with the right shape. + expect(calls.length).toBe(2); + expect(calls[0].method).toBe("GET"); + expect(calls[0].url).toBe( + "https://api.test/workspaces/ws-1/pending-uploads/file-abc/content", + ); + expect(calls[1].method).toBe("POST"); + expect(calls[1].url).toBe( + "https://api.test/workspaces/ws-1/pending-uploads/file-abc/ack", + ); + + // File written to disk with the expected size + mode. + expect(fs.existsSync(result.localPath)).toBe(true); + const stat = fs.statSync(result.localPath); + expect(stat.size).toBe(5); + // Filename has the 32-hex prefix + sanitized name. + expect(path.basename(result.localPath)).toMatch(/^[0-9a-f]{32}-pasted\.png$/); + + // Result envelope shape. + expect(result.size).toBe(5); + expect(result.mimeType).toBe("image/png"); + expect(result.localUri).toBe(`file://${result.localPath}`); + expect(result.cachedPendingUri).toBe("platform-pending:ws-1/file-abc"); + + // Cache populated. + expect(cache.get("platform-pending:ws-1/file-abc")).toBe(result.localUri); + }); + + it("throws on GET non-2xx", async () => { + const mockFetch: typeof fetch = async () => + new Response("denied", { status: 403, statusText: "Forbidden" }); + await expect( + resolvePendingUpload({ + workspaceId: "ws-1", + fileId: "file-abc", + authHeaders: {}, + cacheDir: tmpDir, + fetchImpl: mockFetch, + }), + ).rejects.toThrow(/403 Forbidden/); + }); + + it("throws on size-cap breach BEFORE writing", async () => { + const bigBytes = new Uint8Array(11); + const mockFetch: typeof fetch = async (url) => { + const u = (url as string).toString(); + if (u.endsWith("/content")) { + return new Response(bigBytes, { status: 200 }); + } + return new Response("", { status: 200 }); + }; + await expect( + resolvePendingUpload({ + workspaceId: "ws-1", + fileId: "file-abc", + authHeaders: {}, + cacheDir: tmpDir, + maxBytes: 10, + fetchImpl: mockFetch, + }), + ).rejects.toThrow(/exceeds maxBytes/); + // Tmpdir stayed empty — no partial write. + expect(fs.readdirSync(tmpDir).length).toBe(0); + }); + + it("logs but does not throw when ack fails", async () => { + const warn = jest.spyOn(console, "warn").mockImplementation(() => {}); + const mockFetch: typeof fetch = async (url) => { + const u = (url as string).toString(); + if (u.endsWith("/content")) { + return new Response(new Uint8Array([1]), { status: 200 }); + } + // Ack returns 500. + return new Response("server error", { status: 500, statusText: "Server Error" }); + }; + const result = await resolvePendingUpload({ + workspaceId: "ws-1", + fileId: "file-abc", + authHeaders: {}, + cacheDir: tmpDir, + fetchImpl: mockFetch, + }); + expect(result.size).toBe(1); + expect(warn).toHaveBeenCalledWith(expect.stringMatching(/POST .*\/ack returned 500/)); + warn.mockRestore(); + }); + + it("default filename + sanitizes traversal attempts", async () => { + const mockFetch: typeof fetch = async (url) => { + const u = (url as string).toString(); + if (u.endsWith("/content")) { + return new Response(new Uint8Array([0]), { status: 200 }); + } + return new Response("", { status: 200 }); + }; + const result = await resolvePendingUpload({ + workspaceId: "ws-1", + fileId: "file-abc", + authHeaders: {}, + cacheDir: tmpDir, + filename: "../../../etc/passwd", + fetchImpl: mockFetch, + }); + // Final filename strips the path components and keeps a safe name. + const base = path.basename(result.localPath); + expect(base).not.toContain("../"); + expect(base).toMatch(/^[0-9a-f]{32}-passwd$/); + }); + + it("uses workspaceId + fileId in URL encoding", async () => { + const calls: string[] = []; + const mockFetch: typeof fetch = async (url) => { + calls.push((url as string).toString()); + return new Response(new Uint8Array([1]), { status: 200 }); + }; + await resolvePendingUpload({ + workspaceId: "ws with space", + fileId: "file/with/slash", + authHeaders: {}, + cacheDir: tmpDir, + fetchImpl: mockFetch, + platformUrl: "https://api.test", + }); + // Both ws and fileId percent-encoded. + expect(calls[0]).toBe( + "https://api.test/workspaces/ws%20with%20space/pending-uploads/file%2Fwith%2Fslash/content", + ); + }); + + it("validates required workspaceId, fileId, cacheDir", async () => { + const noop: typeof fetch = async () => new Response("", { status: 200 }); + await expect( + resolvePendingUpload({ + workspaceId: "", + fileId: "f", + authHeaders: {}, + cacheDir: tmpDir, + fetchImpl: noop, + }), + ).rejects.toThrow(/workspaceId/); + await expect( + resolvePendingUpload({ + workspaceId: "w", + fileId: "", + authHeaders: {}, + cacheDir: tmpDir, + fetchImpl: noop, + }), + ).rejects.toThrow(/fileId/); + await expect( + resolvePendingUpload({ + workspaceId: "w", + fileId: "f", + authHeaders: {}, + cacheDir: "", + fetchImpl: noop, + }), + ).rejects.toThrow(/cacheDir/); + }); +}); + +// --------------------------------------------------------------------------- +// rewritePendingURIs +// --------------------------------------------------------------------------- + +describe("rewritePendingURIs", () => { + it("rewrites a bare platform-pending: string", () => { + const cache = new URICache(); + cache.set("platform-pending:ws/a", "file:///tmp/x"); + expect(rewritePendingURIs("platform-pending:ws/a", cache)).toBe("file:///tmp/x"); + }); + + it("preserves URI on cache miss (no silent drop)", () => { + const cache = new URICache(); + expect(rewritePendingURIs("platform-pending:ws/missing", cache)).toBe( + "platform-pending:ws/missing", + ); + }); + + it("rewrites top-level attachments[] uri", () => { + const cache = new URICache(); + cache.set("platform-pending:ws/a", "file:///tmp/a.png"); + const body = { + attachments: [ + { kind: "image", uri: "platform-pending:ws/a", name: "a.png", mime_type: "image/png" }, + ], + text: "hello", + }; + const out = rewritePendingURIs(body, cache) as typeof body; + expect(out.attachments[0].uri).toBe("file:///tmp/a.png"); + expect(out.attachments[0].name).toBe("a.png"); + expect(out.text).toBe("hello"); + }); + + it("rewrites embedded message.parts[*].file.uri", () => { + const cache = new URICache(); + cache.set("platform-pending:ws/img", "file:///tmp/img.png"); + cache.set("platform-pending:ws/aud", "file:///tmp/aud.mp3"); + const body = { + params: { + message: { + parts: [ + { kind: "text", text: "see attached" }, + { + kind: "image", + file: { uri: "platform-pending:ws/img", mime_type: "image/png", name: "img.png" }, + }, + { + kind: "audio", + file: { uri: "platform-pending:ws/aud", mime_type: "audio/mpeg", name: "aud.mp3" }, + }, + ], + }, + }, + }; + const out = rewritePendingURIs(body, cache) as typeof body; + expect(out.params.message.parts[0]).toEqual({ kind: "text", text: "see attached" }); + expect(out.params.message.parts[1].file!.uri).toBe("file:///tmp/img.png"); + expect(out.params.message.parts[2].file!.uri).toBe("file:///tmp/aud.mp3"); + }); + + it("non-URI strings pass through unchanged", () => { + const cache = new URICache(); + cache.set("platform-pending:ws/x", "file:///tmp/x"); + expect(rewritePendingURIs("hello world", cache)).toBe("hello world"); + expect(rewritePendingURIs("workspace:/tmp/foo.pdf", cache)).toBe( + "workspace:/tmp/foo.pdf", + ); + }); + + it("does not mutate the input", () => { + const cache = new URICache(); + cache.set("platform-pending:ws/a", "file:///tmp/a"); + const input = { x: "platform-pending:ws/a" }; + const out = rewritePendingURIs(input, cache) as typeof input; + // Input unchanged. + expect(input.x).toBe("platform-pending:ws/a"); + // Output rewritten. + expect(out.x).toBe("file:///tmp/a"); + // Different identity (new object). + expect(out).not.toBe(input); + }); + + it("handles null + undefined + primitives", () => { + const cache = new URICache(); + expect(rewritePendingURIs(null, cache)).toBeNull(); + expect(rewritePendingURIs(undefined, cache)).toBeUndefined(); + expect(rewritePendingURIs(42, cache)).toBe(42); + expect(rewritePendingURIs(true, cache)).toBe(true); + }); + + it("walks deep into nested arrays + objects", () => { + const cache = new URICache(); + cache.set("platform-pending:ws/deep", "file:///tmp/deep"); + const body = { + a: { b: { c: [{ d: "platform-pending:ws/deep" }] } }, + }; + const out = rewritePendingURIs(body, cache) as { + a: { b: { c: Array<{ d: string }> } }; + }; + expect(out.a.b.c[0].d).toBe("file:///tmp/deep"); + }); +}); diff --git a/src/inbox-uploads.ts b/src/inbox-uploads.ts new file mode 100644 index 0000000..ba4c9a6 --- /dev/null +++ b/src/inbox-uploads.ts @@ -0,0 +1,385 @@ +/** + * inbox-uploads — chat-upload resolution flow for /activity-polling adapters. + * + * MANDATORY contract surface for any TS adapter that consumes `chat_upload_receive` + * activity rows. Mirrors the Python reference at + * molecule_runtime/inbox_uploads.py + * in `molecule-ai-workspace-runtime` (724 LOC; the in-container runtime's + * upload-resolution module). + * + * IF YOU EDIT THIS FILE: + * - Mirror the change in the Python reference (`molecule_runtime/inbox_uploads.py`). + * - If the contract semantics change (steps, ordering, endpoint shape), + * ALSO update the spec section in + * `molecule_runtime/a2a_mcp_server.py::_build_channel_instructions` + * ("Upload resolution (MANDATORY...)" block). + * - The Layer D contract test in `__tests__/inbox-uploads-import-contract.test.ts` + * will fail-CI on any TS file that imports `apiCall` from + * `@molecule-ai/mcp-server` to poll /activity but does NOT also import + * `resolvePendingUpload` (or opts out via the documented magic comment). + * + * Bidirectional drift catchable from either side: + * - Python side: `tests/test_upload_resolution_contract.py` pins the + * spec text (steps named verbatim, references to BOTH this TS file + * AND the Python file, kind enumeration including video). + * - TS side: `__tests__/inbox-uploads.test.ts` pins URICache LRU + * semantics, fetch/persist/ack/cache/rewrite flow, JSON-walk rewrite + * across attachments[] and message.parts surfaces. + * + * Origin: RFC#640 4-layer cascade Layer B. CTO chat GO 2026-05-22T01:31:48Z. + * Empirical trigger: 2026-05-21 ~23:12Z agents-team canvas paste — + * channel plugin had no resolution code path and surfaced + * `platform-pending:` URIs the agent couldn't open. Layer B closes the + * asymmetry between Python SDK (full module) and TS base MCP (zero module). + */ + +import * as fs from "node:fs/promises"; +import * as path from "node:path"; +import * as crypto from "node:crypto"; + +import { PLATFORM_URL } from "./api.js"; + +// --------------------------------------------------------------------------- +// LRU cache (mirrors molecule_runtime/inbox_uploads.py::_URICache semantics) +// --------------------------------------------------------------------------- + +/** + * Default LRU bound for TS adapters. Tighter than the Python reference + * (which uses `URI_CACHE_MAX_ENTRIES=1024` because the in-container + * runtime has the workspace's full memory budget) because TS adapters + * — channel plugin, telegram-style adapters, codex bridges — typically + * run in a host shell or sidecar with less memory headroom. 32 entries + * comfortably covers a single agent session's upload count (the + * empirical canvas paste was 1 file; even an aggressive multi-file + * drag rarely exceeds 5-10). + * + * Adapters with looser budgets can override via the URICache constructor. + */ +export const URI_CACHE_MAX_ENTRIES = 32; + +/** + * Bounded LRU mapping `platform-pending:/` → local file URI. + * + * JS Maps preserve insertion order, so we use the Map's natural iteration + * order for LRU: on `set`, delete-and-reinsert promotes the entry to + * most-recent; on `get`, same delete-and-reinsert promotes; eviction + * pops the first (oldest) entry. + * + * Not thread-safe — Node.js is single-threaded with cooperative async + * scheduling, so the Python reference's `threading.Lock` doesn't apply. + * A future Worker-thread adapter would need to add synchronization. + */ +export class URICache { + private entries: Map = new Map(); + + constructor(private readonly maxEntries: number = URI_CACHE_MAX_ENTRIES) { + if (maxEntries < 1) { + throw new Error(`URICache maxEntries must be >= 1, got ${maxEntries}`); + } + } + + get(pendingUri: string): string | undefined { + const local = this.entries.get(pendingUri); + if (local !== undefined) { + // Promote to most-recent. + this.entries.delete(pendingUri); + this.entries.set(pendingUri, local); + } + return local; + } + + set(pendingUri: string, localUri: string): void { + // If already present, delete first so the re-set lands at most-recent. + if (this.entries.has(pendingUri)) { + this.entries.delete(pendingUri); + } + this.entries.set(pendingUri, localUri); + while (this.entries.size > this.maxEntries) { + const oldest = this.entries.keys().next().value; + if (oldest === undefined) break; + this.entries.delete(oldest); + } + } + + size(): number { + return this.entries.size; + } + + clear(): void { + this.entries.clear(); + } +} + +// --------------------------------------------------------------------------- +// Activity-row matcher (mirrors molecule_runtime/inbox_uploads.py::is_chat_upload_row) +// --------------------------------------------------------------------------- + +/** + * True iff `row` is a `chat_upload_receive` activity row. + * + * Adapters fork this row off the regular A2A message handling path — + * it's not a peer message; it's an instruction to fetch + stage bytes. + * Match on `method` only; the upstream `/activity` filter already + * scopes by `activity_type=a2a_receive` if needed. + */ +export function isChatUploadReceiveRow(row: unknown): boolean { + return ( + typeof row === "object" && + row !== null && + (row as { method?: unknown }).method === "chat_upload_receive" + ); +} + +// --------------------------------------------------------------------------- +// Fetch + persist + ack flow +// --------------------------------------------------------------------------- + +/** + * Result of a successful resolvePendingUpload call. + * + * - `localPath`: absolute path on the local filesystem where bytes were + * written. Adapters that surface a `file://` URI to the agent use + * this directly. + * - `localUri`: `file://...` URI variant of localPath; convenience for + * adapters that pass URIs through to the agent / model context. + * - `mimeType`: from the platform's Content-Type response header, if + * present and parseable. Undefined when the platform doesn't supply. + * - `size`: byte count of what was written. + * - `cachedPendingUri`: the `platform-pending:/` URI used + * as the cache key. Adapters that want to update an external URI + * cache (beyond the one passed in via opts.cache) use this. + */ +export interface ResolveUploadResult { + localPath: string; + localUri: string; + mimeType?: string; + size: number; + cachedPendingUri: string; +} + +/** + * Options for resolvePendingUpload. + * + * Required: + * - `workspaceId`: the workspace UUID — same one used for /activity polling. + * - `fileId`: the `` from `platform-pending:/` or + * from the activity row's request_body. + * - `authHeaders`: HTTP headers including the Bearer auth — adapters + * pass the SAME headers they use for /activity polling. The + * /pending-uploads//content + /ack endpoints are wsAuth-gated, so + * the workspace's bearer is sufficient (no separate handshake). + * - `cacheDir`: absolute directory path where bytes are persisted. + * Adapter-specific: + * - Claude Code channel plugin: `~/.claude/channels/molecule/inbox/` + * - In-container Python runtime: `/workspace/.molecule/chat-uploads/` + * - Other adapters: pick a stable, adapter-specific path. + * + * Optional: + * - `filename`: hint for the on-disk filename (without prefix). The + * final filename is `<32-hex-prefix>-` so that + * parallel uploads with the same source name don't collide. + * Default `upload.bin` if not supplied. + * - `cache`: a URICache instance to populate with the + * `platform-pending:/` → `file://` mapping + * on success. If omitted, no cache write happens (caller manages + * cache separately). + * - `platformUrl`: override the platform base URL (defaults to + * PLATFORM_URL from `./api.js` — `MOLECULE_API_URL` env var). + * - `fetchImpl`: override `globalThis.fetch` for testing. + * - `maxBytes`: per-file safety cap. Default 25 MiB matching the + * platform's same-side staging cap. + */ +export interface ResolveUploadOptions { + workspaceId: string; + fileId: string; + authHeaders: Record; + cacheDir: string; + filename?: string; + cache?: URICache; + platformUrl?: string; + fetchImpl?: typeof fetch; + maxBytes?: number; +} + +const DEFAULT_MAX_BYTES = 25 * 1024 * 1024; + +/** + * Fetch the bytes of a `platform-pending:/` upload, persist + * to a local cache dir, ack the platform-side `pending_uploads` row, + * and (if a cache is provided) record the URI mapping. + * + * Returns the full result envelope. On any failure (network, non-2xx, + * fs write error, size-cap breach) throws an Error with a structured + * message. The platform-side row stays unacked when the throw originates + * upstream of the ack POST — adapters' poll-loop retry semantics carry + * it through to a future invocation. + * + * This is the 5-step MANDATORY flow named in the + * `_build_channel_instructions` spec section. Skipping any step results + * in silent file loss — the agent sees `platform-pending:` URIs it + * cannot open with no error surfaced. The flow: + * + * 1. GET /workspaces//pending-uploads//content + * 2. mkdir + write to cacheDir/- (mode 0600) + * 3. POST /workspaces//pending-uploads//ack + * 4. cache.set("platform-pending:/", "file://") + * 5. (URI rewrite is the caller's concern — use rewritePendingURIs()) + */ +export async function resolvePendingUpload( + opts: ResolveUploadOptions, +): Promise { + const { + workspaceId, + fileId, + authHeaders, + cacheDir, + filename = "upload.bin", + cache, + platformUrl = PLATFORM_URL, + fetchImpl = fetch, + maxBytes = DEFAULT_MAX_BYTES, + } = opts; + + if (!workspaceId) throw new Error("resolvePendingUpload: workspaceId required"); + if (!fileId) throw new Error("resolvePendingUpload: fileId required"); + if (!cacheDir) throw new Error("resolvePendingUpload: cacheDir required"); + + const pendingUri = `platform-pending:${workspaceId}/${fileId}`; + const baseUrl = `${platformUrl}/workspaces/${encodeURIComponent(workspaceId)}/pending-uploads/${encodeURIComponent(fileId)}`; + const contentUrl = `${baseUrl}/content`; + const ackUrl = `${baseUrl}/ack`; + + // Step 1: fetch content + const res = await fetchImpl(contentUrl, { + method: "GET", + headers: authHeaders, + }); + if (!res.ok) { + throw new Error( + `resolvePendingUpload: GET ${contentUrl} returned ${res.status} ${res.statusText}`, + ); + } + const ab = await res.arrayBuffer(); + const bytes = new Uint8Array(ab); + if (bytes.byteLength > maxBytes) { + throw new Error( + `resolvePendingUpload: content size ${bytes.byteLength} exceeds maxBytes ${maxBytes}`, + ); + } + const mimeType = (res.headers.get("content-type") ?? undefined) || undefined; + + // Step 2: persist to local cache dir + await fs.mkdir(cacheDir, { recursive: true }); + const sanitized = sanitizeFilename(filename); + // 32-hex prefix matches Python's pysecrets.token_hex(16) — random + // enough that two parallel uploads of the same source filename can't + // collide; also defeats any "guess the on-disk name" attack from a + // stale agent that knows the original filename. + const prefix = crypto.randomBytes(16).toString("hex"); + const stored = `${prefix}-${sanitized}`; + const localPath = path.join(cacheDir, stored); + // mode 0o600 — only this process's user can read. Matches the Python + // reference's _open_safe pattern. wx mode rejects pre-existing files + // at the target (the 32-hex prefix makes collision astronomical, but + // defense-in-depth costs nothing). + await fs.writeFile(localPath, bytes, { mode: 0o600, flag: "wx" }); + + // Step 3: ack + const ackRes = await fetchImpl(ackUrl, { + method: "POST", + headers: authHeaders, + }); + if (!ackRes.ok) { + // Failure here means the bytes ARE on disk but the platform row + // stays in the pending queue. Phase 3 sweep will eventually + // surface the stale row; the agent already has the local file. + // We log + continue rather than throw, because the user-visible + // outcome (agent can read the file) is achieved. + // eslint-disable-next-line no-console + console.warn( + `resolvePendingUpload: POST ${ackUrl} returned ${ackRes.status} ${ackRes.statusText} ` + + `— bytes written locally but platform-side row not reclaimed`, + ); + } + + // Step 4: cache the mapping + const localUri = `file://${localPath}`; + if (cache) { + cache.set(pendingUri, localUri); + } + + return { + localPath, + localUri, + mimeType, + size: bytes.byteLength, + cachedPendingUri: pendingUri, + }; +} + +// --------------------------------------------------------------------------- +// URI rewrite (mirrors molecule_runtime/inbox_uploads.py::rewrite_request_body +// + the broader walk semantics) +// --------------------------------------------------------------------------- + +/** + * Walk `body` (arbitrary JSON-shaped value) and rewrite any + * `platform-pending:/` URIs to their cached local URIs. + * + * The walk is deep + non-destructive: returns a new value with + * substitutions applied; the input is not mutated. + * + * Two surfaces are explicitly handled because they're the documented + * inbound shapes that carry attachment URIs: + * - Top-level `attachments[]` array (peer_info-enriched activity rows) + * - Embedded `params.message.parts[*].file.uri` (a2a-sdk v1 message + * parts; the in-container runtime emits these for peer-agent + * attachments) + * + * The walk is conservative: it ONLY rewrites string values that exactly + * start with `platform-pending:` and are present in the cache. Other + * strings (text content, identity fields, etc.) pass through unchanged. + * A cache miss (URI not yet resolved) leaves the URI in place — the + * agent will see something it can't open, which is preferable to + * silently dropping the URI. + */ +export function rewritePendingURIs(body: unknown, cache: URICache): unknown { + if (body === null || body === undefined) return body; + if (typeof body === "string") { + if (body.startsWith("platform-pending:")) { + const local = cache.get(body); + return local ?? body; + } + return body; + } + if (Array.isArray(body)) { + return body.map((item) => rewritePendingURIs(item, cache)); + } + if (typeof body === "object") { + const out: Record = {}; + for (const [k, v] of Object.entries(body as Record)) { + out[k] = rewritePendingURIs(v, cache); + } + return out; + } + return body; +} + +// --------------------------------------------------------------------------- +// Internal helpers +// --------------------------------------------------------------------------- + +/** + * Sanitize a filename: keep alnum + dash + underscore + dot, collapse + * everything else to `_`. Defense against ../ traversal, shell-meta + * chars, and null bytes in user-supplied filenames. + */ +function sanitizeFilename(name: string): string { + if (!name) return "upload.bin"; + // Strip any directory components. + const base = name.replace(/^.*[/\\]/, ""); + // Drop null bytes + non-portable chars; collapse runs of `_`. + const cleaned = base.replace(/[^A-Za-z0-9._-]/g, "_").replace(/_+/g, "_"); + if (!cleaned || cleaned === "." || cleaned === "..") return "upload.bin"; + return cleaned.slice(0, 240); // ext4 NAME_MAX = 255; leave room for the prefix +} diff --git a/src/index.ts b/src/index.ts index 31584e2..ba23c7b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -33,6 +33,22 @@ import { registerRemoteAgentTools } from "./tools/remote_agents.js"; // export triggers a compile error instead of a silent undefined at import. export { PLATFORM_URL, apiCall, isApiError, platformGet, toMcpResult, toMcpText } from "./api.js"; export type { ApiError } from "./api.js"; +// RFC#640 Layer B — chat-upload resolution flow. MANDATORY surface for +// any /activity-polling adapter (channel plugin, telegram-style +// adapters, codex bridges) that consumes chat_upload_receive rows. +// See molecule_runtime/a2a_mcp_server.py::_build_channel_instructions +// "Upload resolution (MANDATORY...)" for the spec. +export { + URICache, + URI_CACHE_MAX_ENTRIES, + resolvePendingUpload, + rewritePendingURIs, + isChatUploadReceiveRow, +} from "./inbox-uploads.js"; +export type { + ResolveUploadOptions, + ResolveUploadResult, +} from "./inbox-uploads.js"; export { formatTargetSummary, parseWorkspaceTargets } from "./targets.js"; export type { WorkspaceTarget } from "./targets.js"; export { -- 2.52.0