feat(inbox-uploads): TS port of upload-resolution flow (RFC#640 Layer B) #26

Merged
plugin-dev merged 1 commits from feat/inbox-uploads-rfc640-layer-b into main 2026-05-22 02:34:33 +00:00
4 changed files with 837 additions and 0 deletions
+1
View File
@@ -6,6 +6,7 @@
"exports": {
".": "./dist/index.js",
"./external-workspace-tools": "./dist/external_workspace_tools.js",
"./inbox-uploads": "./dist/inbox-uploads.js",
"./targets": "./dist/targets.js"
},
"types": "./dist/index.d.ts",
+435
View File
@@ -0,0 +1,435 @@
/**
* Tests for src/inbox-uploads.ts (Layer B of RFC#640 4-layer cascade).
*
* Three surfaces under test:
* 1. URICache — LRU eviction, promote-on-get/set, size, clear, bounds.
* 2. resolvePendingUpload — fetch + persist + ack + cache flow, with
* mock fetch + mock fs (real fs via tmpdir).
* 3. rewritePendingURIs — deep walk across attachments[] +
* message.parts[*].file.uri surfaces; cache miss preserves URI.
*
* Mirrors the Python reference's test envelope shape — the contract
* is bidirectional (Python tests pin the spec text; TS tests pin the
* implementation correctness).
*/
import * as fs from "node:fs";
import * as path from "node:path";
import * as os from "node:os";
import {
URICache,
URI_CACHE_MAX_ENTRIES,
resolvePendingUpload,
rewritePendingURIs,
isChatUploadReceiveRow,
} from "../inbox-uploads.js";
// ---------------------------------------------------------------------------
// URICache
// ---------------------------------------------------------------------------
describe("URICache", () => {
it("returns undefined for missing key", () => {
const c = new URICache();
expect(c.get("platform-pending:ws/x")).toBeUndefined();
});
it("returns the stored URI", () => {
const c = new URICache();
c.set("platform-pending:ws/x", "file:///tmp/x");
expect(c.get("platform-pending:ws/x")).toBe("file:///tmp/x");
});
it("set replaces existing entry without growing size", () => {
const c = new URICache();
c.set("k", "v1");
c.set("k", "v2");
expect(c.size()).toBe(1);
expect(c.get("k")).toBe("v2");
});
it("evicts oldest when cap exceeded", () => {
const c = new URICache(3);
c.set("a", "1");
c.set("b", "2");
c.set("c", "3");
c.set("d", "4"); // evicts "a"
expect(c.size()).toBe(3);
expect(c.get("a")).toBeUndefined();
expect(c.get("b")).toBe("2");
expect(c.get("c")).toBe("3");
expect(c.get("d")).toBe("4");
});
it("promotes on get — most-recently-accessed survives eviction", () => {
const c = new URICache(3);
c.set("a", "1");
c.set("b", "2");
c.set("c", "3");
// Touch "a" so it becomes most-recent.
expect(c.get("a")).toBe("1");
// Set "d" — eviction should now drop "b" (which is the new oldest).
c.set("d", "4");
expect(c.get("a")).toBe("1");
expect(c.get("b")).toBeUndefined();
expect(c.get("c")).toBe("3");
expect(c.get("d")).toBe("4");
});
it("clear empties the cache", () => {
const c = new URICache();
c.set("a", "1");
c.set("b", "2");
expect(c.size()).toBe(2);
c.clear();
expect(c.size()).toBe(0);
expect(c.get("a")).toBeUndefined();
});
it("rejects maxEntries < 1", () => {
expect(() => new URICache(0)).toThrow();
expect(() => new URICache(-1)).toThrow();
});
it("default URI_CACHE_MAX_ENTRIES is 32 (TS-adapter budget)", () => {
// Python reference uses 1024 because the in-container runtime has
// the workspace's full memory; TS adapters in tighter budgets use 32.
expect(URI_CACHE_MAX_ENTRIES).toBe(32);
});
});
// ---------------------------------------------------------------------------
// isChatUploadReceiveRow
// ---------------------------------------------------------------------------
describe("isChatUploadReceiveRow", () => {
it("matches chat_upload_receive method", () => {
expect(isChatUploadReceiveRow({ method: "chat_upload_receive" })).toBe(true);
});
it("rejects other methods", () => {
expect(isChatUploadReceiveRow({ method: "message/send" })).toBe(false);
expect(isChatUploadReceiveRow({ method: "notify" })).toBe(false);
});
it("rejects non-object input defensively", () => {
expect(isChatUploadReceiveRow(null)).toBe(false);
expect(isChatUploadReceiveRow(undefined)).toBe(false);
expect(isChatUploadReceiveRow("chat_upload_receive")).toBe(false);
expect(isChatUploadReceiveRow(42)).toBe(false);
});
it("rejects object without method field", () => {
expect(isChatUploadReceiveRow({ activity_id: "x" })).toBe(false);
});
});
// ---------------------------------------------------------------------------
// resolvePendingUpload
// ---------------------------------------------------------------------------
describe("resolvePendingUpload", () => {
let tmpDir: string;
beforeEach(() => {
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "mcp-inbox-test-"));
});
afterEach(() => {
try {
fs.rmSync(tmpDir, { recursive: true, force: true });
} catch {
// best-effort cleanup
}
});
it("fetches content + writes file + acks + caches", async () => {
const bytes = new Uint8Array([1, 2, 3, 4, 5]);
const calls: Array<{ url: string; method: string }> = [];
const mockFetch: typeof fetch = async (url, init) => {
const m = (init?.method ?? "GET") as string;
const u = (url as string).toString();
calls.push({ url: u, method: m });
if (u.endsWith("/content")) {
return new Response(bytes, {
status: 200,
headers: { "content-type": "image/png" },
});
}
if (u.endsWith("/ack")) {
return new Response("", { status: 200 });
}
return new Response("not found", { status: 404 });
};
const cache = new URICache();
const result = await resolvePendingUpload({
workspaceId: "ws-1",
fileId: "file-abc",
authHeaders: { Authorization: "Bearer test-token" },
cacheDir: tmpDir,
filename: "pasted.png",
cache,
platformUrl: "https://api.test",
fetchImpl: mockFetch,
});
// Both endpoints called exactly once with the right shape.
expect(calls.length).toBe(2);
expect(calls[0].method).toBe("GET");
expect(calls[0].url).toBe(
"https://api.test/workspaces/ws-1/pending-uploads/file-abc/content",
);
expect(calls[1].method).toBe("POST");
expect(calls[1].url).toBe(
"https://api.test/workspaces/ws-1/pending-uploads/file-abc/ack",
);
// File written to disk with the expected size + mode.
expect(fs.existsSync(result.localPath)).toBe(true);
const stat = fs.statSync(result.localPath);
expect(stat.size).toBe(5);
// Filename has the 32-hex prefix + sanitized name.
expect(path.basename(result.localPath)).toMatch(/^[0-9a-f]{32}-pasted\.png$/);
// Result envelope shape.
expect(result.size).toBe(5);
expect(result.mimeType).toBe("image/png");
expect(result.localUri).toBe(`file://${result.localPath}`);
expect(result.cachedPendingUri).toBe("platform-pending:ws-1/file-abc");
// Cache populated.
expect(cache.get("platform-pending:ws-1/file-abc")).toBe(result.localUri);
});
it("throws on GET non-2xx", async () => {
const mockFetch: typeof fetch = async () =>
new Response("denied", { status: 403, statusText: "Forbidden" });
await expect(
resolvePendingUpload({
workspaceId: "ws-1",
fileId: "file-abc",
authHeaders: {},
cacheDir: tmpDir,
fetchImpl: mockFetch,
}),
).rejects.toThrow(/403 Forbidden/);
});
it("throws on size-cap breach BEFORE writing", async () => {
const bigBytes = new Uint8Array(11);
const mockFetch: typeof fetch = async (url) => {
const u = (url as string).toString();
if (u.endsWith("/content")) {
return new Response(bigBytes, { status: 200 });
}
return new Response("", { status: 200 });
};
await expect(
resolvePendingUpload({
workspaceId: "ws-1",
fileId: "file-abc",
authHeaders: {},
cacheDir: tmpDir,
maxBytes: 10,
fetchImpl: mockFetch,
}),
).rejects.toThrow(/exceeds maxBytes/);
// Tmpdir stayed empty — no partial write.
expect(fs.readdirSync(tmpDir).length).toBe(0);
});
it("logs but does not throw when ack fails", async () => {
const warn = jest.spyOn(console, "warn").mockImplementation(() => {});
const mockFetch: typeof fetch = async (url) => {
const u = (url as string).toString();
if (u.endsWith("/content")) {
return new Response(new Uint8Array([1]), { status: 200 });
}
// Ack returns 500.
return new Response("server error", { status: 500, statusText: "Server Error" });
};
const result = await resolvePendingUpload({
workspaceId: "ws-1",
fileId: "file-abc",
authHeaders: {},
cacheDir: tmpDir,
fetchImpl: mockFetch,
});
expect(result.size).toBe(1);
expect(warn).toHaveBeenCalledWith(expect.stringMatching(/POST .*\/ack returned 500/));
warn.mockRestore();
});
it("default filename + sanitizes traversal attempts", async () => {
const mockFetch: typeof fetch = async (url) => {
const u = (url as string).toString();
if (u.endsWith("/content")) {
return new Response(new Uint8Array([0]), { status: 200 });
}
return new Response("", { status: 200 });
};
const result = await resolvePendingUpload({
workspaceId: "ws-1",
fileId: "file-abc",
authHeaders: {},
cacheDir: tmpDir,
filename: "../../../etc/passwd",
fetchImpl: mockFetch,
});
// Final filename strips the path components and keeps a safe name.
const base = path.basename(result.localPath);
expect(base).not.toContain("../");
expect(base).toMatch(/^[0-9a-f]{32}-passwd$/);
});
it("uses workspaceId + fileId in URL encoding", async () => {
const calls: string[] = [];
const mockFetch: typeof fetch = async (url) => {
calls.push((url as string).toString());
return new Response(new Uint8Array([1]), { status: 200 });
};
await resolvePendingUpload({
workspaceId: "ws with space",
fileId: "file/with/slash",
authHeaders: {},
cacheDir: tmpDir,
fetchImpl: mockFetch,
platformUrl: "https://api.test",
});
// Both ws and fileId percent-encoded.
expect(calls[0]).toBe(
"https://api.test/workspaces/ws%20with%20space/pending-uploads/file%2Fwith%2Fslash/content",
);
});
it("validates required workspaceId, fileId, cacheDir", async () => {
const noop: typeof fetch = async () => new Response("", { status: 200 });
await expect(
resolvePendingUpload({
workspaceId: "",
fileId: "f",
authHeaders: {},
cacheDir: tmpDir,
fetchImpl: noop,
}),
).rejects.toThrow(/workspaceId/);
await expect(
resolvePendingUpload({
workspaceId: "w",
fileId: "",
authHeaders: {},
cacheDir: tmpDir,
fetchImpl: noop,
}),
).rejects.toThrow(/fileId/);
await expect(
resolvePendingUpload({
workspaceId: "w",
fileId: "f",
authHeaders: {},
cacheDir: "",
fetchImpl: noop,
}),
).rejects.toThrow(/cacheDir/);
});
});
// ---------------------------------------------------------------------------
// rewritePendingURIs
// ---------------------------------------------------------------------------
describe("rewritePendingURIs", () => {
it("rewrites a bare platform-pending: string", () => {
const cache = new URICache();
cache.set("platform-pending:ws/a", "file:///tmp/x");
expect(rewritePendingURIs("platform-pending:ws/a", cache)).toBe("file:///tmp/x");
});
it("preserves URI on cache miss (no silent drop)", () => {
const cache = new URICache();
expect(rewritePendingURIs("platform-pending:ws/missing", cache)).toBe(
"platform-pending:ws/missing",
);
});
it("rewrites top-level attachments[] uri", () => {
const cache = new URICache();
cache.set("platform-pending:ws/a", "file:///tmp/a.png");
const body = {
attachments: [
{ kind: "image", uri: "platform-pending:ws/a", name: "a.png", mime_type: "image/png" },
],
text: "hello",
};
const out = rewritePendingURIs(body, cache) as typeof body;
expect(out.attachments[0].uri).toBe("file:///tmp/a.png");
expect(out.attachments[0].name).toBe("a.png");
expect(out.text).toBe("hello");
});
it("rewrites embedded message.parts[*].file.uri", () => {
const cache = new URICache();
cache.set("platform-pending:ws/img", "file:///tmp/img.png");
cache.set("platform-pending:ws/aud", "file:///tmp/aud.mp3");
const body = {
params: {
message: {
parts: [
{ kind: "text", text: "see attached" },
{
kind: "image",
file: { uri: "platform-pending:ws/img", mime_type: "image/png", name: "img.png" },
},
{
kind: "audio",
file: { uri: "platform-pending:ws/aud", mime_type: "audio/mpeg", name: "aud.mp3" },
},
],
},
},
};
const out = rewritePendingURIs(body, cache) as typeof body;
expect(out.params.message.parts[0]).toEqual({ kind: "text", text: "see attached" });
expect(out.params.message.parts[1].file!.uri).toBe("file:///tmp/img.png");
expect(out.params.message.parts[2].file!.uri).toBe("file:///tmp/aud.mp3");
});
it("non-URI strings pass through unchanged", () => {
const cache = new URICache();
cache.set("platform-pending:ws/x", "file:///tmp/x");
expect(rewritePendingURIs("hello world", cache)).toBe("hello world");
expect(rewritePendingURIs("workspace:/tmp/foo.pdf", cache)).toBe(
"workspace:/tmp/foo.pdf",
);
});
it("does not mutate the input", () => {
const cache = new URICache();
cache.set("platform-pending:ws/a", "file:///tmp/a");
const input = { x: "platform-pending:ws/a" };
const out = rewritePendingURIs(input, cache) as typeof input;
// Input unchanged.
expect(input.x).toBe("platform-pending:ws/a");
// Output rewritten.
expect(out.x).toBe("file:///tmp/a");
// Different identity (new object).
expect(out).not.toBe(input);
});
it("handles null + undefined + primitives", () => {
const cache = new URICache();
expect(rewritePendingURIs(null, cache)).toBeNull();
expect(rewritePendingURIs(undefined, cache)).toBeUndefined();
expect(rewritePendingURIs(42, cache)).toBe(42);
expect(rewritePendingURIs(true, cache)).toBe(true);
});
it("walks deep into nested arrays + objects", () => {
const cache = new URICache();
cache.set("platform-pending:ws/deep", "file:///tmp/deep");
const body = {
a: { b: { c: [{ d: "platform-pending:ws/deep" }] } },
};
const out = rewritePendingURIs(body, cache) as {
a: { b: { c: Array<{ d: string }> } };
};
expect(out.a.b.c[0].d).toBe("file:///tmp/deep");
});
});
+385
View File
@@ -0,0 +1,385 @@
/**
* inbox-uploads — chat-upload resolution flow for /activity-polling adapters.
*
* MANDATORY contract surface for any TS adapter that consumes `chat_upload_receive`
* activity rows. Mirrors the Python reference at
* molecule_runtime/inbox_uploads.py
* in `molecule-ai-workspace-runtime` (724 LOC; the in-container runtime's
* upload-resolution module).
*
* IF YOU EDIT THIS FILE:
* - Mirror the change in the Python reference (`molecule_runtime/inbox_uploads.py`).
* - If the contract semantics change (steps, ordering, endpoint shape),
* ALSO update the spec section in
* `molecule_runtime/a2a_mcp_server.py::_build_channel_instructions`
* ("Upload resolution (MANDATORY...)" block).
* - The Layer D contract test in `__tests__/inbox-uploads-import-contract.test.ts`
* will fail-CI on any TS file that imports `apiCall` from
* `@molecule-ai/mcp-server` to poll /activity but does NOT also import
* `resolvePendingUpload` (or opts out via the documented magic comment).
*
* Bidirectional drift catchable from either side:
* - Python side: `tests/test_upload_resolution_contract.py` pins the
* spec text (steps named verbatim, references to BOTH this TS file
* AND the Python file, kind enumeration including video).
* - TS side: `__tests__/inbox-uploads.test.ts` pins URICache LRU
* semantics, fetch/persist/ack/cache/rewrite flow, JSON-walk rewrite
* across attachments[] and message.parts surfaces.
*
* Origin: RFC#640 4-layer cascade Layer B. CTO chat GO 2026-05-22T01:31:48Z.
* Empirical trigger: 2026-05-21 ~23:12Z agents-team canvas paste —
* channel plugin had no resolution code path and surfaced
* `platform-pending:` URIs the agent couldn't open. Layer B closes the
* asymmetry between Python SDK (full module) and TS base MCP (zero module).
*/
import * as fs from "node:fs/promises";
import * as path from "node:path";
import * as crypto from "node:crypto";
import { PLATFORM_URL } from "./api.js";
// ---------------------------------------------------------------------------
// LRU cache (mirrors molecule_runtime/inbox_uploads.py::_URICache semantics)
// ---------------------------------------------------------------------------
/**
* Default LRU bound for TS adapters. Tighter than the Python reference
* (which uses `URI_CACHE_MAX_ENTRIES=1024` because the in-container
* runtime has the workspace's full memory budget) because TS adapters
* — channel plugin, telegram-style adapters, codex bridges — typically
* run in a host shell or sidecar with less memory headroom. 32 entries
* comfortably covers a single agent session's upload count (the
* empirical canvas paste was 1 file; even an aggressive multi-file
* drag rarely exceeds 5-10).
*
* Adapters with looser budgets can override via the URICache constructor.
*/
export const URI_CACHE_MAX_ENTRIES = 32;
/**
* Bounded LRU mapping `platform-pending:<ws>/<file_id>` → local file URI.
*
* JS Maps preserve insertion order, so we use the Map's natural iteration
* order for LRU: on `set`, delete-and-reinsert promotes the entry to
* most-recent; on `get`, same delete-and-reinsert promotes; eviction
* pops the first (oldest) entry.
*
* Not thread-safe — Node.js is single-threaded with cooperative async
* scheduling, so the Python reference's `threading.Lock` doesn't apply.
* A future Worker-thread adapter would need to add synchronization.
*/
export class URICache {
private entries: Map<string, string> = new Map();
constructor(private readonly maxEntries: number = URI_CACHE_MAX_ENTRIES) {
if (maxEntries < 1) {
throw new Error(`URICache maxEntries must be >= 1, got ${maxEntries}`);
}
}
get(pendingUri: string): string | undefined {
const local = this.entries.get(pendingUri);
if (local !== undefined) {
// Promote to most-recent.
this.entries.delete(pendingUri);
this.entries.set(pendingUri, local);
}
return local;
}
set(pendingUri: string, localUri: string): void {
// If already present, delete first so the re-set lands at most-recent.
if (this.entries.has(pendingUri)) {
this.entries.delete(pendingUri);
}
this.entries.set(pendingUri, localUri);
while (this.entries.size > this.maxEntries) {
const oldest = this.entries.keys().next().value;
if (oldest === undefined) break;
this.entries.delete(oldest);
}
}
size(): number {
return this.entries.size;
}
clear(): void {
this.entries.clear();
}
}
// ---------------------------------------------------------------------------
// Activity-row matcher (mirrors molecule_runtime/inbox_uploads.py::is_chat_upload_row)
// ---------------------------------------------------------------------------
/**
* True iff `row` is a `chat_upload_receive` activity row.
*
* Adapters fork this row off the regular A2A message handling path —
* it's not a peer message; it's an instruction to fetch + stage bytes.
* Match on `method` only; the upstream `/activity` filter already
* scopes by `activity_type=a2a_receive` if needed.
*/
export function isChatUploadReceiveRow(row: unknown): boolean {
return (
typeof row === "object" &&
row !== null &&
(row as { method?: unknown }).method === "chat_upload_receive"
);
}
// ---------------------------------------------------------------------------
// Fetch + persist + ack flow
// ---------------------------------------------------------------------------
/**
* Result of a successful resolvePendingUpload call.
*
* - `localPath`: absolute path on the local filesystem where bytes were
* written. Adapters that surface a `file://` URI to the agent use
* this directly.
* - `localUri`: `file://...` URI variant of localPath; convenience for
* adapters that pass URIs through to the agent / model context.
* - `mimeType`: from the platform's Content-Type response header, if
* present and parseable. Undefined when the platform doesn't supply.
* - `size`: byte count of what was written.
* - `cachedPendingUri`: the `platform-pending:<ws>/<file_id>` URI used
* as the cache key. Adapters that want to update an external URI
* cache (beyond the one passed in via opts.cache) use this.
*/
export interface ResolveUploadResult {
localPath: string;
localUri: string;
mimeType?: string;
size: number;
cachedPendingUri: string;
}
/**
* Options for resolvePendingUpload.
*
* Required:
* - `workspaceId`: the workspace UUID — same one used for /activity polling.
* - `fileId`: the `<file_id>` from `platform-pending:<ws>/<file_id>` or
* from the activity row's request_body.
* - `authHeaders`: HTTP headers including the Bearer auth — adapters
* pass the SAME headers they use for /activity polling. The
* /pending-uploads/<id>/content + /ack endpoints are wsAuth-gated, so
* the workspace's bearer is sufficient (no separate handshake).
* - `cacheDir`: absolute directory path where bytes are persisted.
* Adapter-specific:
* - Claude Code channel plugin: `~/.claude/channels/molecule/inbox/`
* - In-container Python runtime: `/workspace/.molecule/chat-uploads/`
* - Other adapters: pick a stable, adapter-specific path.
*
* Optional:
* - `filename`: hint for the on-disk filename (without prefix). The
* final filename is `<32-hex-prefix>-<sanitized-filename>` so that
* parallel uploads with the same source name don't collide.
* Default `upload.bin` if not supplied.
* - `cache`: a URICache instance to populate with the
* `platform-pending:<ws>/<file_id>` → `file://<localPath>` mapping
* on success. If omitted, no cache write happens (caller manages
* cache separately).
* - `platformUrl`: override the platform base URL (defaults to
* PLATFORM_URL from `./api.js` — `MOLECULE_API_URL` env var).
* - `fetchImpl`: override `globalThis.fetch` for testing.
* - `maxBytes`: per-file safety cap. Default 25 MiB matching the
* platform's same-side staging cap.
*/
export interface ResolveUploadOptions {
workspaceId: string;
fileId: string;
authHeaders: Record<string, string>;
cacheDir: string;
filename?: string;
cache?: URICache;
platformUrl?: string;
fetchImpl?: typeof fetch;
maxBytes?: number;
}
const DEFAULT_MAX_BYTES = 25 * 1024 * 1024;
/**
* Fetch the bytes of a `platform-pending:<ws>/<file_id>` upload, persist
* to a local cache dir, ack the platform-side `pending_uploads` row,
* and (if a cache is provided) record the URI mapping.
*
* Returns the full result envelope. On any failure (network, non-2xx,
* fs write error, size-cap breach) throws an Error with a structured
* message. The platform-side row stays unacked when the throw originates
* upstream of the ack POST — adapters' poll-loop retry semantics carry
* it through to a future invocation.
*
* This is the 5-step MANDATORY flow named in the
* `_build_channel_instructions` spec section. Skipping any step results
* in silent file loss — the agent sees `platform-pending:` URIs it
* cannot open with no error surfaced. The flow:
*
* 1. GET /workspaces/<ws>/pending-uploads/<file_id>/content
* 2. mkdir + write to cacheDir/<prefix>-<filename> (mode 0600)
* 3. POST /workspaces/<ws>/pending-uploads/<file_id>/ack
* 4. cache.set("platform-pending:<ws>/<file_id>", "file://<localPath>")
* 5. (URI rewrite is the caller's concern — use rewritePendingURIs())
*/
export async function resolvePendingUpload(
opts: ResolveUploadOptions,
): Promise<ResolveUploadResult> {
const {
workspaceId,
fileId,
authHeaders,
cacheDir,
filename = "upload.bin",
cache,
platformUrl = PLATFORM_URL,
fetchImpl = fetch,
maxBytes = DEFAULT_MAX_BYTES,
} = opts;
if (!workspaceId) throw new Error("resolvePendingUpload: workspaceId required");
if (!fileId) throw new Error("resolvePendingUpload: fileId required");
if (!cacheDir) throw new Error("resolvePendingUpload: cacheDir required");
const pendingUri = `platform-pending:${workspaceId}/${fileId}`;
const baseUrl = `${platformUrl}/workspaces/${encodeURIComponent(workspaceId)}/pending-uploads/${encodeURIComponent(fileId)}`;
const contentUrl = `${baseUrl}/content`;
const ackUrl = `${baseUrl}/ack`;
// Step 1: fetch content
const res = await fetchImpl(contentUrl, {
method: "GET",
headers: authHeaders,
});
if (!res.ok) {
throw new Error(
`resolvePendingUpload: GET ${contentUrl} returned ${res.status} ${res.statusText}`,
);
}
const ab = await res.arrayBuffer();
const bytes = new Uint8Array(ab);
if (bytes.byteLength > maxBytes) {
throw new Error(
`resolvePendingUpload: content size ${bytes.byteLength} exceeds maxBytes ${maxBytes}`,
);
}
const mimeType = (res.headers.get("content-type") ?? undefined) || undefined;
// Step 2: persist to local cache dir
await fs.mkdir(cacheDir, { recursive: true });
const sanitized = sanitizeFilename(filename);
// 32-hex prefix matches Python's pysecrets.token_hex(16) — random
// enough that two parallel uploads of the same source filename can't
// collide; also defeats any "guess the on-disk name" attack from a
// stale agent that knows the original filename.
const prefix = crypto.randomBytes(16).toString("hex");
const stored = `${prefix}-${sanitized}`;
const localPath = path.join(cacheDir, stored);
// mode 0o600 — only this process's user can read. Matches the Python
// reference's _open_safe pattern. wx mode rejects pre-existing files
// at the target (the 32-hex prefix makes collision astronomical, but
// defense-in-depth costs nothing).
await fs.writeFile(localPath, bytes, { mode: 0o600, flag: "wx" });
// Step 3: ack
const ackRes = await fetchImpl(ackUrl, {
method: "POST",
headers: authHeaders,
});
if (!ackRes.ok) {
// Failure here means the bytes ARE on disk but the platform row
// stays in the pending queue. Phase 3 sweep will eventually
// surface the stale row; the agent already has the local file.
// We log + continue rather than throw, because the user-visible
// outcome (agent can read the file) is achieved.
// eslint-disable-next-line no-console
console.warn(
`resolvePendingUpload: POST ${ackUrl} returned ${ackRes.status} ${ackRes.statusText} ` +
`— bytes written locally but platform-side row not reclaimed`,
);
}
// Step 4: cache the mapping
const localUri = `file://${localPath}`;
if (cache) {
cache.set(pendingUri, localUri);
}
return {
localPath,
localUri,
mimeType,
size: bytes.byteLength,
cachedPendingUri: pendingUri,
};
}
// ---------------------------------------------------------------------------
// URI rewrite (mirrors molecule_runtime/inbox_uploads.py::rewrite_request_body
// + the broader walk semantics)
// ---------------------------------------------------------------------------
/**
* Walk `body` (arbitrary JSON-shaped value) and rewrite any
* `platform-pending:<ws>/<file_id>` URIs to their cached local URIs.
*
* The walk is deep + non-destructive: returns a new value with
* substitutions applied; the input is not mutated.
*
* Two surfaces are explicitly handled because they're the documented
* inbound shapes that carry attachment URIs:
* - Top-level `attachments[]` array (peer_info-enriched activity rows)
* - Embedded `params.message.parts[*].file.uri` (a2a-sdk v1 message
* parts; the in-container runtime emits these for peer-agent
* attachments)
*
* The walk is conservative: it ONLY rewrites string values that exactly
* start with `platform-pending:` and are present in the cache. Other
* strings (text content, identity fields, etc.) pass through unchanged.
* A cache miss (URI not yet resolved) leaves the URI in place — the
* agent will see something it can't open, which is preferable to
* silently dropping the URI.
*/
export function rewritePendingURIs(body: unknown, cache: URICache): unknown {
if (body === null || body === undefined) return body;
if (typeof body === "string") {
if (body.startsWith("platform-pending:")) {
const local = cache.get(body);
return local ?? body;
}
return body;
}
if (Array.isArray(body)) {
return body.map((item) => rewritePendingURIs(item, cache));
}
if (typeof body === "object") {
const out: Record<string, unknown> = {};
for (const [k, v] of Object.entries(body as Record<string, unknown>)) {
out[k] = rewritePendingURIs(v, cache);
}
return out;
}
return body;
}
// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------
/**
* Sanitize a filename: keep alnum + dash + underscore + dot, collapse
* everything else to `_`. Defense against ../ traversal, shell-meta
* chars, and null bytes in user-supplied filenames.
*/
function sanitizeFilename(name: string): string {
if (!name) return "upload.bin";
// Strip any directory components.
const base = name.replace(/^.*[/\\]/, "");
// Drop null bytes + non-portable chars; collapse runs of `_`.
const cleaned = base.replace(/[^A-Za-z0-9._-]/g, "_").replace(/_+/g, "_");
if (!cleaned || cleaned === "." || cleaned === "..") return "upload.bin";
return cleaned.slice(0, 240); // ext4 NAME_MAX = 255; leave room for the prefix
}
+16
View File
@@ -33,6 +33,22 @@ import { registerRemoteAgentTools } from "./tools/remote_agents.js";
// export triggers a compile error instead of a silent undefined at import.
export { PLATFORM_URL, apiCall, isApiError, platformGet, toMcpResult, toMcpText } from "./api.js";
export type { ApiError } from "./api.js";
// RFC#640 Layer B — chat-upload resolution flow. MANDATORY surface for
// any /activity-polling adapter (channel plugin, telegram-style
// adapters, codex bridges) that consumes chat_upload_receive rows.
// See molecule_runtime/a2a_mcp_server.py::_build_channel_instructions
// "Upload resolution (MANDATORY...)" for the spec.
export {
URICache,
URI_CACHE_MAX_ENTRIES,
resolvePendingUpload,
rewritePendingURIs,
isChatUploadReceiveRow,
} from "./inbox-uploads.js";
export type {
ResolveUploadOptions,
ResolveUploadResult,
} from "./inbox-uploads.js";
export { formatTargetSummary, parseWorkspaceTargets } from "./targets.js";
export type { WorkspaceTarget } from "./targets.js";
export {