molecule-mcp-claude-channel/server.ts
Hongming Wang 68d8ae981f test(extractText): bootstrap bun:test + pin v0/v1 part discriminator
Extracts the extractText helper + ActivityEntry type into their own
module so unit tests can import them without dragging server.ts's
top-level boot side-effects (cursor load, MCP transport connect, poll
loop) into the test runner. server.ts re-imports both — the wire
behavior is unchanged.

Tests cover the v0/v1 part-discriminator regression that landed on
2026-04-30 (every canvas peer message returning act.summary because
parts had `kind` instead of `type`):
- v1 `kind: text` parts (current production shape)
- v0 `type: text` back-compat
- multi-part text join, ignore non-text parts
- body-shape priority: params.message.parts > params.parts > body.parts
- empty-text-part recovery: skips empty candidate, tries next
- summary fallback when no shape matches
- `(empty A2A message)` fallback when summary is null

Adds:
- extract-text.ts (helper + type, no side-effects)
- extract-text.test.ts (9 tests)
- .github/workflows/test.yml (bun test on push/PR)
- "test": "bun test" script in package.json

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 20:15:31 -07:00

1257 lines
54 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env bun
/**
* Molecule AI channel for Claude Code.
*
* MCP server that bridges Molecule A2A traffic into the active Claude Code
* session and routes Claude's replies back through Molecule's A2A endpoints.
*
* Inbound (A2A → Claude turn): polls each watched workspace's
* GET /workspaces/:id/activity?since_secs=N&type=a2a_receive
* and emits an MCP `notifications/claude/channel` for each new event.
* Polling (vs push) is the default because it works through every NAT/firewall
* with zero infra — no tunnel required. For production setups with a public
* inbound URL, see #2 in the README ("push mode", future).
*
* Outbound (Claude reply → A2A): exposes the `reply_to_workspace` and
* `start_workspace_chat` MCP tools that POST to /workspaces/:id/a2a.
*
* State lives in ~/.claude/channels/molecule/:
* - access.json workspace allowlist + per-workspace auth
* - .env MOLECULE_PLATFORM_URL + tokens (chmod 600)
* - bot.pid singleton lock
* - inbox/ file attachments downloaded from peers
*
* Multi-workspace: declare MOLECULE_WORKSPACE_IDS as a comma-separated list;
* each id polls independently. Auth is per-workspace via
* MOLECULE_WORKSPACE_TOKENS (same order, comma-separated).
*
* Cancellation: SIGTERM/SIGINT cleanly drains in-flight pollers + posts a
* single "channel disconnecting" line back to each watched workspace so
* peers see a deliberate close, not a silent timeout.
*/
import { Server } from '@modelcontextprotocol/sdk/server/index.js'
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
import {
ListToolsRequestSchema,
CallToolRequestSchema,
} from '@modelcontextprotocol/sdk/types.js'
import { z } from 'zod'
import { readFileSync, writeFileSync, mkdirSync, chmodSync, existsSync, renameSync, unlinkSync } from 'fs'
import { homedir } from 'os'
import { join } from 'path'
import { extractText, type ActivityEntry } from './extract-text.ts'
// ─── Config ─────────────────────────────────────────────────────────────
//
// Order matters in this section: the .env hydration below must run BEFORE
// PLATFORM_URL / WORKSPACE_* are read out of process.env.
const STATE_DIR = process.env.MOLECULE_STATE_DIR ?? join(homedir(), '.claude', 'channels', 'molecule')
const ENV_FILE = join(STATE_DIR, '.env')
const PID_FILE = join(STATE_DIR, 'bot.pid')
const CURSOR_FILE = join(STATE_DIR, 'cursor.json')
// Load ~/.claude/channels/molecule/.env into process.env. Real env wins.
// Plugin-spawned servers don't get an env block — this is where tokens live.
mkdirSync(STATE_DIR, { recursive: true, mode: 0o700 })
try {
  // Token is a credential — lock to owner. No-op on Windows (would need ACLs).
  chmodSync(ENV_FILE, 0o600)
  for (const line of readFileSync(ENV_FILE, 'utf8').split('\n')) {
    // Accept only KEY=VALUE lines with CONSTANT_CASE keys; a key already
    // present in the real environment is never overwritten.
    const m = line.match(/^([A-Z_][A-Z0-9_]*)=(.*)$/)
    if (m && process.env[m[1]] === undefined) process.env[m[1]] = m[2]
  }
} catch {
  // Missing .env on first run is fine; we'll fail loudly below if required vars are absent.
}
// Trailing slash is stripped so the URL concatenations below never double it.
const PLATFORM_URL = process.env.MOLECULE_PLATFORM_URL?.replace(/\/$/, '')
const WORKSPACE_IDS = (process.env.MOLECULE_WORKSPACE_IDS ?? '')
  .split(',').map(s => s.trim()).filter(Boolean)
const WORKSPACE_TOKENS = (process.env.MOLECULE_WORKSPACE_TOKENS ?? '')
  .split(',').map(s => s.trim()).filter(Boolean)
// NOTE(review): parseInt yields NaN for a malformed value — presumably the
// poll scheduler treats that as misconfiguration; confirm before relying on it.
const POLL_INTERVAL_MS = parseInt(process.env.MOLECULE_POLL_INTERVAL_MS ?? '5000', 10)
// POLL_WINDOW_SECS is only used for the initial "watch from now" cursor seed
// — after that, the cursor (since_id) drives every subsequent poll. Older
// versions of the plugin used since_secs as the primary filter; v0.2 keeps
// the env var for compat but its meaning is narrower.
const POLL_WINDOW_SECS = parseInt(process.env.MOLECULE_POLL_WINDOW_SECS ?? '30', 10)
// MOLECULE_AGENT_NAME / MOLECULE_AGENT_DESC populate the agent_card the plugin
// posts to /registry/register on startup. Both have sane defaults — set them
// only when you want the canvas tab to show something specific.
const AGENT_NAME = process.env.MOLECULE_AGENT_NAME ?? 'Claude Code (channel)'
const AGENT_DESC = process.env.MOLECULE_AGENT_DESC ??
  'Local Claude Code session bridged via molecule-mcp-claude-channel'
// MOLECULE_AUTO_REGISTER_POLL controls the startup auto-register behavior.
// Default is "yes" — the plugin's whole point is to make a poll-mode
// workspace work without manual canvas configuration. Set to "0" / "false"
// if you've already configured the workspace another way and don't want
// the plugin overwriting agent_card on every restart.
const AUTO_REGISTER_POLL = !['0', 'false', 'no'].includes(
  (process.env.MOLECULE_AUTO_REGISTER_POLL ?? 'true').toLowerCase()
)
// Hard requirement check — the channel cannot run without a platform URL and
// at least one id+token pair; exit 1 with setup instructions on stderr.
if (!PLATFORM_URL || WORKSPACE_IDS.length === 0 || WORKSPACE_TOKENS.length === 0) {
  process.stderr.write(
    `molecule channel: required config missing\n` +
    ` set in ${ENV_FILE}\n` +
    ` format:\n` +
    ` MOLECULE_PLATFORM_URL=https://your-tenant.staging.moleculesai.app\n` +
    ` MOLECULE_WORKSPACE_IDS=ws-uuid-1,ws-uuid-2\n` +
    ` MOLECULE_WORKSPACE_TOKENS=tok-1,tok-2\n` +
    ` optional:\n` +
    ` MOLECULE_POLL_INTERVAL_MS=5000\n` +
    ` MOLECULE_POLL_WINDOW_SECS=30\n`
  )
  process.exit(1)
}
// Ids and tokens are positionally paired — a length mismatch would silently
// auth the wrong workspace, so fail fast instead.
if (WORKSPACE_IDS.length !== WORKSPACE_TOKENS.length) {
  process.stderr.write(
    `molecule channel: MOLECULE_WORKSPACE_IDS and MOLECULE_WORKSPACE_TOKENS must have ` +
    `the same number of entries (got ${WORKSPACE_IDS.length} ids vs ${WORKSPACE_TOKENS.length} tokens)\n`
  )
  process.exit(1)
}
const TOKEN_BY_WORKSPACE = new Map<string, string>(
  WORKSPACE_IDS.map((id, i) => [id, WORKSPACE_TOKENS[i]])
)
// ─── Singleton lock ─────────────────────────────────────────────────────
//
// One channel server per host — multiple Claude sessions polling the same
// workspaces would race the dedup state and double-deliver. If a previous
// session crashed (SIGKILL, terminal closed) its server can survive as an
// orphan; kill it before we start.
try {
  const stale = parseInt(readFileSync(PID_FILE, 'utf8'), 10)
  // `stale > 1` also filters NaN (corrupt file) and pids 0/1. Signal 0 is
  // a pure existence probe — it throws if the process is gone, so SIGTERM
  // is only ever sent to a live process that wrote our pid file.
  if (stale > 1 && stale !== process.pid) {
    process.kill(stale, 0) // throws if dead
    process.stderr.write(`molecule channel: replacing stale poller pid=${stale}\n`)
    process.kill(stale, 'SIGTERM')
  }
} catch {}
writeFileSync(PID_FILE, String(process.pid))
// Unlink the PID file on every exit path — including process.exit(N)
// from the cursor-support probe (v0.2.1) which doesn't go through the
// SIGINT/SIGTERM handlers. Without this, a non-clean exit leaves a
// stale pid in PID_FILE pointing at a dead pid; the next launch's
// `process.kill(stale, 'SIGTERM')` (above) would deliver the signal to
// whatever unrelated process now owns that PID — exactly the cross-
// process-kill hazard the singleton lock exists to prevent. exit
// listeners only run synchronous code; unlinkSync is the right tool.
process.on('exit', () => {
  try {
    // Only unlink if we still own the file — a newer instance may have
    // already overwritten it with its own pid.
    const owned = parseInt(readFileSync(PID_FILE, 'utf8'), 10)
    if (owned === process.pid) unlinkSync(PID_FILE)
  } catch {
    // Already gone, or another process took ownership — leave it alone.
  }
})
// Last-resort safety net — without these the process dies silently on any
// unhandled promise rejection. With them it logs and keeps serving tools.
process.on('unhandledRejection', err => {
  process.stderr.write(`molecule channel: unhandled rejection: ${err}\n`)
})
process.on('uncaughtException', err => {
  process.stderr.write(`molecule channel: uncaught exception: ${err}\n`)
})
// ─── Activity polling (inbound) ─────────────────────────────────────────
//
// One independent poll loop per watched workspace. Each loop tracks the
// max activity_id it has seen so far; on each tick it queries
// GET /workspaces/:id/activity?since_secs=POLL_WINDOW_SECS&type=a2a_receive
// and emits an MCP notification for any activity whose id is new.
//
// `since_secs` is wider than the poll interval (30s vs 5s by default) so a
// single missed tick (transient network blip) doesn't lose messages — the
// next tick re-fetches the overlap window and the seen-id dedup filters it.
//
// activity_logs is paged out at 30 days, so an honest seen-id set never
// grows unbounded; new sessions start fresh.
// ActivityEntry lives in extract-text.ts (imported above) so unit
// tests can import the type + helper without triggering server.ts's
// boot-time side-effects (cursor load, MCP transport connect).
// ─── Cursor persistence ────────────────────────────────────────────────
//
// v0.2 switches from the v0.1 since_secs+seenIds scheme to a Telegram-style
// since_id cursor. The cursor is the activity_logs.id of the last event
// this plugin successfully delivered to Claude. Server returns events
// strictly after that id in ASC order, so we never miss or replay.
//
// Persisted to ${CURSOR_FILE} as a JSON object keyed by workspace_id.
// Atomic write via temp + rename so a crash mid-write can't corrupt the
// file (the previous cursor stays valid; worst case is a few replays
// after the crash, which still beats the v0.1 30-second time-window).
//
// Schema: { "ws-uuid-1": "act-uuid-X", "ws-uuid-2": "act-uuid-Y", ... }
// Missing key = "first run" → seeds from most-recent without processing.
// 410 from server = cursor stale → drop key, re-seed on next tick.
// In-memory cursor map: workspace_id → last delivered activity id.
const cursors = new Map<string, string>()
/**
 * Hydrate `cursors` from CURSOR_FILE. Missing file is a no-op; an
 * unreadable/corrupt file logs once and leaves the map empty, so each
 * watched workspace simply re-seeds on its next tick.
 */
function loadCursors(): void {
  if (!existsSync(CURSOR_FILE)) return
  try {
    const parsed = JSON.parse(readFileSync(CURSOR_FILE, 'utf8')) as Record<string, unknown>
    // Keep only non-empty string values — anything else is treated as absent.
    Object.entries(parsed)
      .filter((entry): entry is [string, string] => typeof entry[1] === 'string' && entry[1].length > 0)
      .forEach(([workspaceId, activityId]) => cursors.set(workspaceId, activityId))
  } catch (err) {
    // Corrupt cursor file = treat as no cursors. Worst case: each watched
    // workspace re-seeds from now on the next tick. Don't fail-fast — a
    // poller that refuses to start over one bad file is worse than the
    // recovery cost.
    process.stderr.write(`molecule channel: cursor file unreadable (${err}); starting fresh\n`)
  }
}
/**
 * Persist `cursors` to CURSOR_FILE atomically (temp file + rename) so a
 * crash mid-write can never corrupt the previous cursor state.
 */
function saveCursors(): void {
  const snapshot = Object.fromEntries(cursors)
  const tmpPath = `${CURSOR_FILE}.tmp.${process.pid}`
  try {
    writeFileSync(tmpPath, JSON.stringify(snapshot, null, 2), { mode: 0o600 })
    renameSync(tmpPath, CURSOR_FILE)
  } catch (err) {
    // Cursor write failure is recoverable (next successful poll re-saves);
    // log on stderr so the user sees disk-full / readonly-fs early.
    process.stderr.write(`molecule channel: cursor save failed: ${err}\n`)
  }
}
/**
 * One poll tick for a single watched workspace.
 *
 * Steady-state: queries GET /workspaces/:id/activity with since_id=<cursor>
 * (server returns rows strictly after the cursor, ASC). First run (no
 * cursor): backfills the POLL_WINDOW_SECS window via since_secs. Each
 * returned activity is emitted as an MCP notification, then the cursor is
 * advanced past the newest row and persisted.
 *
 * All failure modes return early and leave the cursor untouched, except
 * HTTP 410 (stale cursor), which drops the cursor so the next tick re-seeds.
 */
async function pollWorkspace(workspaceId: string, mcp: Server): Promise<void> {
  const token = TOKEN_BY_WORKSPACE.get(workspaceId)!
  const url = new URL(`${PLATFORM_URL}/workspaces/${workspaceId}/activity`)
  url.searchParams.set('type', 'a2a_receive')
  url.searchParams.set('limit', '100')
  const cursor = cursors.get(workspaceId)
  if (cursor) {
    // Steady-state: server returns rows strictly after cursor in ASC order.
    url.searchParams.set('since_id', cursor)
  } else {
    // First run for this workspace — deliver every event in the
    // POLL_WINDOW_SECS backfill window, then advance the cursor past the
    // newest. Operators routinely restart Claude Code mid-conversation and
    // EXPECT the queued message to be delivered; backfill is bounded by
    // POLL_WINDOW_SECS so a long-idle restart doesn't replay weeks of
    // conversation. Set POLL_WINDOW_SECS=0 to restore skip-on-cold-start.
    url.searchParams.set('since_secs', String(POLL_WINDOW_SECS))
  }
  let resp: Response
  try {
    resp = await fetch(url, {
      headers: {
        Authorization: `Bearer ${token}`,
        // Same-origin header — required by the tenant's edge WAF on hosted
        // SaaS deployments. Without it the WAF silently routes /workspaces/*
        // to the canvas Next.js and returns an empty 404. Node/Bun fetch
        // doesn't auto-set Origin, so set it explicitly to PLATFORM_URL —
        // the only origin the bearer is valid against anyway.
        Origin: PLATFORM_URL,
      },
      signal: AbortSignal.timeout(10_000),
    })
  } catch (err) {
    process.stderr.write(`molecule channel: poll ${workspaceId} fetch failed: ${err}\n`)
    return
  }
  if (resp.status === 410) {
    // Cursor row is gone (pruned, or never existed if the env var was
    // hand-edited). Drop the cursor; next tick re-seeds from most-recent.
    process.stderr.write(`molecule channel: poll ${workspaceId} cursor stale (410) — re-seeding\n`)
    cursors.delete(workspaceId)
    saveCursors()
    return
  }
  if (!resp.ok) {
    // 401/403 = bad token; 404 = workspace doesn't exist; 5xx = transient.
    // Surface 4xx on stderr so the user sees auth/config issues immediately.
    if (resp.status >= 400 && resp.status < 500) {
      process.stderr.write(
        // FIX: the two template halves previously concatenated with no
        // separator, printing e.g. "returned 401check MOLECULE_…".
        `molecule channel: poll ${workspaceId} returned ${resp.status} — ` +
        `check MOLECULE_WORKSPACE_TOKENS / MOLECULE_WORKSPACE_IDS in ${ENV_FILE}\n`
      )
    }
    return
  }
  let activities: ActivityEntry[]
  try {
    activities = (await resp.json()) as ActivityEntry[]
  } catch (err) {
    process.stderr.write(`molecule channel: poll ${workspaceId} parse failed: ${err}\n`)
    return
  }
  // Cold-start and steady-state share the same delivery shape: walk
  // ASC-ordered events, emit each, advance cursor past the newest. Advance
  // even on emit failure — blocking on notification failure would stall the
  // channel, and notification delivery is best-effort anyway.
  if (activities.length === 0) return
  for (const act of activities) {
    emitNotification(mcp, workspaceId, act)
  }
  const newest = activities[activities.length - 1].id
  if (newest !== cursor) {
    cursors.set(workspaceId, newest)
    saveCursors()
  }
}
// ─── Cursor-support probe (startup compat check) ──────────────────────
//
// v0.2 relies on the since_id cursor on /activity (Molecule-AI/molecule-core
// PR #2354). Older platforms silently ignore the query param and return
// whatever the default time window covers, which would make us re-deliver
// the same activities on every tick — a worse silent-duplicate bug than
// any failure mode v0.1 had.
//
// Detect at startup with a known-invalid UUID. PR-#2354+ answers 410 Gone
// for any cursor that doesn't resolve to an activity_logs row. Pre-#2354
// servers ignore the param and answer 200 OK. We use the all-zero UUID
// because gen_random_uuid() will never produce it (per RFC 4122 §4.4 the
// version + variant bits are non-zero), so a 410 is unambiguous.
//
// Probe failure is fatal — the user MUST upgrade. Falling back to v0.1
// behavior would re-introduce the message-loss-on-restart bug v0.2 was
// written to fix; failing loudly is the better default.
// All-zero UUID: gen_random_uuid() can never produce it (RFC 4122 §4.4 —
// version + variant bits are non-zero), so a 410 answer is unambiguous.
const PROBE_CURSOR = '00000000-0000-0000-0000-000000000000'
/**
 * Startup compat probe: does /activity honor since_id (PR #2354+)?
 * 410 for a known-invalid cursor → 'ok'; 200 means the param was silently
 * ignored (pre-#2354) → 'too_old'; anything else → 'inconclusive'.
 */
async function probeCursorSupport(workspaceId: string): Promise<'ok' | 'too_old' | 'inconclusive'> {
  const token = TOKEN_BY_WORKSPACE.get(workspaceId)!
  const probeUrl = new URL(`${PLATFORM_URL}/workspaces/${workspaceId}/activity`)
  probeUrl.searchParams.set('type', 'a2a_receive')
  probeUrl.searchParams.set('since_id', PROBE_CURSOR)
  probeUrl.searchParams.set('limit', '1')
  let response: Response
  try {
    response = await fetch(probeUrl, {
      headers: { Authorization: `Bearer ${token}`, Origin: PLATFORM_URL },
      signal: AbortSignal.timeout(15_000),
    })
  } catch (err) {
    process.stderr.write(`molecule channel: probe ${workspaceId} fetch failed: ${err}\n`)
    return 'inconclusive'
  }
  switch (response.status) {
    case 410:
      return 'ok'
    case 200:
      return 'too_old'
    default:
      // 401/403/404/5xx are orthogonal to cursor support — let the normal
      // poll loop surface the real failure.
      process.stderr.write(
        `molecule channel: probe ${workspaceId} returned HTTP ${response.status} (expected 410); ` +
        `cursor support unverifiable, continuing\n`
      )
      return 'inconclusive'
  }
}
// ─── Register-as-poll (startup self-register) ──────────────────────────
//
// On startup, register each watched workspace with delivery_mode=poll so
// the platform's a2a_proxy short-circuits to activity_logs (PR 2 / #2353)
// instead of trying to dispatch HTTP to a URL the operator's laptop
// doesn't have. Idempotent — the upsert in /registry/register's handler
// preserves existing values; we just declare delivery_mode and the
// agent_card.
//
// Failure here is non-fatal — the polling loop still works against a
// pre-poll-configured workspace, and a transient platform 5xx shouldn't
// block channel startup. Log loudly so misconfiguration is visible.
/**
 * Declare delivery_mode=poll (plus our agent_card) for a watched workspace
 * via POST /registry/register. Best-effort: every failure is logged to
 * stderr and swallowed — a transient platform 5xx must not block startup,
 * and polling still works against a pre-configured workspace.
 */
async function registerAsPoll(workspaceId: string): Promise<void> {
  const token = TOKEN_BY_WORKSPACE.get(workspaceId)!
  const payload = {
    id: workspaceId,
    delivery_mode: 'poll',
    agent_card: { name: AGENT_NAME, description: AGENT_DESC },
  }
  let response: Response
  try {
    response = await fetch(`${PLATFORM_URL}/registry/register`, {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${token}`,
        'Content-Type': 'application/json',
        Origin: PLATFORM_URL,
      },
      body: JSON.stringify(payload),
      signal: AbortSignal.timeout(15_000),
    })
  } catch (err) {
    process.stderr.write(`molecule channel: register-as-poll ${workspaceId} fetch failed: ${err}\n`)
    return
  }
  if (!response.ok) {
    const detail = await response.text().catch(() => '')
    process.stderr.write(
      `molecule channel: register-as-poll ${workspaceId} HTTP ${response.status}${detail.slice(0, 200)}\n`
    )
    return
  }
  // Sanity-check the echo: an older controlplane that predates delivery_mode
  // answers with something other than "poll" — log so the user can spot it.
  try {
    const echoed = (await response.json()) as { delivery_mode?: string }
    if (echoed.delivery_mode && echoed.delivery_mode !== 'poll') {
      process.stderr.write(
        `molecule channel: register-as-poll ${workspaceId} returned delivery_mode=${echoed.delivery_mode} ` +
        `(expected poll). Platform may predate #2339.\n`
      )
    }
  } catch {
    // Non-JSON body — the 2xx already confirmed the upsert landed; the
    // polling loop is the source of truth for steady-state anyway.
  }
}
// ─── Notification emission ─────────────────────────────────────────────
/**
 * Deliver one activity to the host as a notifications/claude/channel
 * notification. Fire-and-forget: delivery failure is logged, never thrown.
 *
 * Canvas-user vs peer-agent discrimination: the canvas wraps user chat as
 * JSON-RPC message/send with source_id=null, while real peers carry their
 * workspace_id in source_id. The reply tool routes on meta.kind — canvas_user
 * → /notify, peer_agent → /a2a.
 */
function emitNotification(mcp: Server, workspaceId: string, act: ActivityEntry): void {
  const peerId = act.source_id ?? ''
  const kind: 'canvas_user' | 'peer_agent' = peerId === '' ? 'canvas_user' : 'peer_agent'
  // content becomes the conversation turn; meta is structured metadata the
  // host can reason about (mirrors the telegram channel reference shape).
  const meta = {
    source: 'molecule',
    kind,
    workspace_id: act.workspace_id,
    watching_as: workspaceId,
    peer_id: peerId,
    method: act.method ?? '',
    activity_id: act.id,
    ts: act.created_at,
  }
  mcp
    .notification({
      method: 'notifications/claude/channel',
      params: { content: extractText(act), meta },
    })
    .catch(err => {
      process.stderr.write(`molecule channel: failed to deliver notification for ${act.id}: ${err}\n`)
    })
}
// ─── MCP server ─────────────────────────────────────────────────────────
// Tool-capable MCP server instance. Transport wiring (stdio connect) and
// the request handlers are registered elsewhere in this file.
const mcp = new Server(
  { name: 'molecule', version: '0.3.0' },
  { capabilities: { tools: {} } },
)
// Tool: reply_to_workspace ----------------------------------------------
//
// Sends a reply from one of our watched workspaces. The destination is
// picked from `peer_id`:
//
// - peer_id absent / empty → canvas-user reply via POST /workspaces/:our/notify
// (lands in the My Chat panel — what users see when
// they type in the canvas)
// - peer_id present → peer-agent A2A reply via POST /workspaces/:peer/a2a
// with a proper JSON-RPC message/send envelope
//
// The notification meta.kind tells Claude which to use; this tool just
// honors whichever peer_id the caller passes.
// Zod schema for reply_to_workspace arguments. The .describe() strings are
// surfaced to the model as tool documentation — treat them as part of the
// wire contract, not as comments.
const ReplyArgsSchema = z.object({
  workspace_id: z.string().describe(
    "Watched workspace_id to reply AS (must be in MOLECULE_WORKSPACE_IDS). " +
    "Defaults to the workspace whose message Claude is responding to — " +
    "if there's only one watched workspace, omit this."
  ).optional(),
  peer_id: z.string().describe(
    "Workspace_id of the peer to send TO (for peer_agent inbound — " +
    "use notification meta.peer_id). Omit or pass empty string to reply " +
    "to the canvas user via /notify (for canvas_user inbound)."
  ).optional(),
  text: z.string().describe('Reply text. Plain text or markdown.'),
})
async function replyToWorkspace(args: z.infer<typeof ReplyArgsSchema>): Promise<string> {
let { workspace_id } = args
if (!workspace_id) {
if (WORKSPACE_IDS.length === 1) workspace_id = WORKSPACE_IDS[0]
else throw new Error(
`workspace_id required when watching multiple workspaces. ` +
`Watching: ${WORKSPACE_IDS.join(', ')}`
)
}
const token = TOKEN_BY_WORKSPACE.get(workspace_id)
if (!token) {
throw new Error(
`workspace_id ${workspace_id} is not in MOLECULE_WORKSPACE_IDS. ` +
`Configured: ${WORKSPACE_IDS.join(', ')}`
)
}
const peerId = args.peer_id?.trim() ?? ''
if (!peerId) {
// Canvas-user reply — POST /workspaces/:our/notify with {message: text}.
// The platform appends to the user-facing chat panel; no JSON-RPC envelope
// because there's no peer URL on the other side, just the canvas UI.
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspace_id}/notify`, {
method: 'POST',
headers: {
Authorization: `Bearer ${token}`,
'Content-Type': 'application/json',
Origin: PLATFORM_URL,
},
body: JSON.stringify({ message: args.text }),
signal: AbortSignal.timeout(30_000),
})
if (!resp.ok) {
const errText = await resp.text().catch(() => '')
throw new Error(`notify failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
}
return `Replied to canvas user as ${workspace_id} via /notify.`
}
// Peer-agent A2A reply — proper JSON-RPC 2.0 envelope as the platform's
// a2a_proxy expects. Empirically (verified 2026-04-29 against workspace-
// server's ProxyA2A handler), shorthand `{parts:[...]}` gets accepted but
// the platform strips params before forwarding to the peer's URL — the
// peer then sees an envelope with `params: null` and no message text.
// Wrapping in proper JSON-RPC preserves the message all the way through.
//
// `messageId` is generated client-side; the platform doesn't require it
// but peers may use it for idempotency / dedup.
const body = {
jsonrpc: '2.0',
id: crypto.randomUUID(),
method: 'message/send',
params: {
message: {
messageId: crypto.randomUUID(),
parts: [{ type: 'text', text: args.text }],
},
},
}
const resp = await fetch(`${PLATFORM_URL}/workspaces/${peerId}/a2a`, {
method: 'POST',
headers: {
Authorization: `Bearer ${token}`,
'Content-Type': 'application/json',
'X-Source-Workspace-Id': workspace_id,
// Same-origin header for SaaS edge WAF — see pollWorkspace fetch
// for the full explanation. /workspaces/* requires it on hosted
// tenants; localhost ignores it.
Origin: PLATFORM_URL,
},
body: JSON.stringify(body),
signal: AbortSignal.timeout(30_000),
})
if (!resp.ok) {
const errText = await resp.text().catch(() => '')
throw new Error(`reply failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
}
return `Reply sent from ${workspace_id} to ${peerId}.`
}
// ─── Universal-tool helpers ────────────────────────────────────────────
//
// Resolves "act AS which watched workspace" for tools that take an
// optional workspace_id distinguishing the channel-side caller from the
// target. When watching exactly one workspace it's an obvious default;
// for multi-watch, the caller must specify.
/**
 * Resolve "act AS which watched workspace" for the universal tools.
 * With exactly one watched workspace, an omitted/empty argument defaults
 * to it; otherwise the caller must specify. Returns the id plus its token.
 *
 * @throws Error when the argument is ambiguous or not a watched workspace.
 */
function resolveWatching(asWorkspaceId?: string): { workspaceId: string; token: string } {
  let workspaceId = asWorkspaceId
  if (!workspaceId) {
    if (WORKSPACE_IDS.length !== 1) {
      throw new Error(
        `_as_workspace required when watching multiple workspaces. ` +
        `Watching: ${WORKSPACE_IDS.join(', ')}`
      )
    }
    workspaceId = WORKSPACE_IDS[0]
  }
  const token = TOKEN_BY_WORKSPACE.get(workspaceId)
  if (!token) {
    throw new Error(
      `${workspaceId} is not in MOLECULE_WORKSPACE_IDS. ` +
      `Configured: ${WORKSPACE_IDS.join(', ')}`
    )
  }
  return { workspaceId, token }
}
// Standard auth headers shared by every platform call. Origin is required
// by the SaaS edge WAF — see pollWorkspace's fetch for the full story.
/**
 * Standard auth headers for every platform call. Origin is required by the
 * SaaS edge WAF — see pollWorkspace's fetch for the full story. `extra`
 * entries override the base headers on key collision.
 */
function platformHeaders(token: string, extra: Record<string, string> = {}): Record<string, string> {
  const base: Record<string, string> = {
    Authorization: `Bearer ${token}`,
    Origin: PLATFORM_URL!,
  }
  return Object.assign(base, extra)
}
// Tool: list_peers ------------------------------------------------------
//
// Returns the watched workspace's view of the team — siblings, children,
// parent — so Claude can answer "who are my peers?" without a separate
// HTTP detour. Mirrors the registry endpoint backed by GET /registry/:id/peers
// (workspace-server/internal/handlers/discovery.go:Peers).
// Zod schema for list_peers arguments; .describe() strings are tool-facing docs.
const ListPeersArgsSchema = z.object({
  workspace_id: z.string().describe(
    "Watched workspace_id to query peers FOR. Omit if only one watched."
  ).optional(),
  q: z.string().describe(
    "Optional case-insensitive substring filter on peer name or role."
  ).optional(),
})
// Shape of one row from GET /registry/:id/peers. NOTE(review): field
// nullability here mirrors the plugin's expectations — confirm against
// workspace-server's discovery handler (Peers) before relying on it.
interface Peer {
  id: string
  name: string
  role: string | null
  tier: number | null
  status: string
  url: string
  parent_id: string | null
  active_tasks: number
  agent_card?: unknown // opaque agent card; not interpreted by this channel
}
async function listPeers(args: z.infer<typeof ListPeersArgsSchema>): Promise<Peer[]> {
let { workspace_id } = args
if (!workspace_id) {
if (WORKSPACE_IDS.length === 1) workspace_id = WORKSPACE_IDS[0]
else throw new Error(
`workspace_id required when watching multiple workspaces. ` +
`Watching: ${WORKSPACE_IDS.join(', ')}`
)
}
const token = TOKEN_BY_WORKSPACE.get(workspace_id)
if (!token) {
throw new Error(
`workspace_id ${workspace_id} is not in MOLECULE_WORKSPACE_IDS. ` +
`Configured: ${WORKSPACE_IDS.join(', ')}`
)
}
const url = new URL(`${PLATFORM_URL}/registry/${workspace_id}/peers`)
if (args.q) url.searchParams.set('q', args.q)
const resp = await fetch(url, {
headers: {
Authorization: `Bearer ${token}`,
Origin: PLATFORM_URL,
},
signal: AbortSignal.timeout(15_000),
})
if (!resp.ok) {
const errText = await resp.text().catch(() => '')
throw new Error(`list_peers failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
}
return (await resp.json()) as Peer[]
}
// Tool: get_workspace_info ---------------------------------------------
//
// Mirrors the universal `get_workspace_info` tool — returns the watched
// workspace's own identity (id, name, role, tier, parent, status, agent_card).
// Backed by GET /workspaces/:id (workspace-server's WorkspaceHandler.Get).
// Zod schema for get_workspace_info arguments; .describe() is tool-facing docs.
const GetWorkspaceInfoArgsSchema = z.object({
  _as_workspace: z.string().describe(
    "Watched workspace_id to introspect (omit if only one watched)."
  ).optional(),
})
/**
 * GET /workspaces/:id — the watched workspace's own identity record
 * (id, name, role, tier, parent, status, agent_card).
 *
 * @throws Error on unknown/ambiguous _as_workspace or non-2xx reply.
 */
async function getWorkspaceInfo(args: z.infer<typeof GetWorkspaceInfoArgsSchema>): Promise<unknown> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  const response = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}`, {
    headers: platformHeaders(token),
    signal: AbortSignal.timeout(15_000),
  })
  if (response.ok) return response.json()
  const detail = await response.text().catch(() => '')
  throw new Error(`get_workspace_info failed: HTTP ${response.status}${detail.slice(0, 200)}`)
}
// Tool: send_message_to_user -------------------------------------------
//
// Mirrors the universal `send_message_to_user` tool — POST /workspaces/:id/notify.
// Lands as a chat bubble in the canvas My Chat panel. The universal tool
// also supports `attachments` (file paths inside the workspace container)
// uploaded via /chat/uploads; this channel runs on the user's local FS and
// uploads from there. Same contract — paths are absolute on whichever side
// the tool runs from.
// Zod schema for send_message_to_user arguments. The .describe() strings are
// tool-facing documentation the model reads — part of the wire contract.
const SendMessageToUserArgsSchema = z.object({
  _as_workspace: z.string().describe(
    "Watched workspace_id to send AS (omit if only one watched)."
  ).optional(),
  message: z.string().describe(
    "Caption text for the chat bubble. Required even with attachments — " +
    "set to a short label like 'Here's the build:' or 'Done — see attached.'\n\n" +
    "DO NOT paste file URLs in this string. Files MUST go through `attachments` " +
    "so they render as a clickable download chip."
  ),
  attachments: z.array(z.string()).describe(
    "Absolute file paths on the user's local machine (e.g. ['/tmp/build.zip']). " +
    "Each gets uploaded via /chat/uploads and surfaces as a download chip in " +
    "the canvas. 25 MB per file cap."
  ).optional(),
})
/**
 * Mirror of the universal `send_message_to_user` tool. Optionally uploads
 * local-file attachments via POST /workspaces/:id/chat/uploads (multipart),
 * then posts the chat bubble via POST /workspaces/:id/notify referencing
 * the returned upload metadata.
 *
 * @returns confirmation string including the attachment count.
 * @throws Error when an attachment path doesn't exist or either HTTP call fails.
 */
async function sendMessageToUser(args: z.infer<typeof SendMessageToUserArgsSchema>): Promise<string> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  let attachmentRefs: unknown[] = []
  if (args.attachments && args.attachments.length > 0) {
    // Multipart upload — same shape as workspace/a2a_tools.py:_upload_chat_files.
    // The platform stages files under /workspace/.molecule/chat-uploads (a
    // canvas "allowed root") and returns metadata the notify body references.
    const form = new FormData()
    for (const path of args.attachments) {
      const file = Bun.file(path)
      if (!(await file.exists())) {
        throw new Error(`attachment not found: ${path}`)
      }
      // FIX: derive the upload filename from the last path segment on BOTH
      // separators — the old split('/') leaked the entire path as the
      // filename for Windows-style 'C:\tmp\x.zip' inputs. Also use || so an
      // empty basename (trailing-separator path, where pop() returns '')
      // falls back to 'attachment'; the old `?? ` only caught undefined.
      form.append('files', file, path.split(/[\\/]/).pop() || 'attachment')
    }
    const upResp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/chat/uploads`, {
      method: 'POST',
      headers: platformHeaders(token),
      body: form,
      signal: AbortSignal.timeout(60_000),
    })
    if (!upResp.ok) {
      const errText = await upResp.text().catch(() => '')
      throw new Error(`chat/uploads failed: HTTP ${upResp.status}${errText.slice(0, 200)}`)
    }
    const upJson = (await upResp.json()) as { files?: unknown[] }
    attachmentRefs = upJson.files ?? []
  }
  const body: Record<string, unknown> = { message: args.message }
  if (attachmentRefs.length > 0) body.attachments = attachmentRefs
  const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/notify`, {
    method: 'POST',
    headers: platformHeaders(token, { 'Content-Type': 'application/json' }),
    body: JSON.stringify(body),
    signal: AbortSignal.timeout(30_000),
  })
  if (!resp.ok) {
    const errText = await resp.text().catch(() => '')
    throw new Error(`notify failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
  }
  return `Sent to canvas user as ${workspaceId}${attachmentRefs.length > 0 ? ` with ${attachmentRefs.length} attachment(s)` : ''}.`
}
// Tool: delegate_task (sync) -------------------------------------------
//
// Mirrors the universal `delegate_task` tool — sends an A2A message to a
// peer and waits inline for the response. POSTs to /workspaces/:peer/a2a;
// the platform's a2a_proxy forwards to the peer's URL and returns the
// peer's reply body. Use for QUICK questions; for long-running work use
// delegate_task_async + check_task_status.
const DelegateTaskArgsSchema = z.object({
  // Channel-specific disambiguator: which watched workspace this MCP
  // acts AS. Underscore-prefixed so it can't collide with the universal
  // tool contract.
  _as_workspace: z.string().describe(
    "Watched workspace_id to send AS (omit if only one watched)."
  ).optional(),
  // NOTE: here `workspace_id` means the TARGET peer, not the watched
  // workspace — unlike the reply/list tools where it names the sender.
  workspace_id: z.string().describe("Target peer workspace ID (from list_peers)."),
  task: z.string().describe("Task description to send to the peer."),
})
/**
 * Delegate a task to a peer workspace via A2A and wait for the reply.
 *
 * Builds a JSON-RPC `message/send` envelope and POSTs it through the
 * platform's a2a_proxy at /workspaces/:peer/a2a. Synchronous: blocks up
 * to 60s for the peer's reply body.
 *
 * @returns the peer's JSON-RPC response body, unmodified.
 * @throws Error if target/task are missing or the HTTP call fails.
 */
async function delegateTask(args: z.infer<typeof DelegateTaskArgsSchema>): Promise<unknown> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  if (!args.workspace_id) throw new Error('workspace_id (target peer) is required')
  if (!args.task) throw new Error('task is required')
  const body = {
    jsonrpc: '2.0',
    id: crypto.randomUUID(),
    method: 'message/send',
    params: {
      message: {
        messageId: crypto.randomUUID(),
        // Carry BOTH part discriminators: `kind` (v1, current A2A shape)
        // and `type` (v0 back-compat). A v1-only receiver that looks at
        // `kind` sees no text in a part tagged solely with `type` — the
        // exact extractText regression of 2026-04-30, from the other
        // direction. Sending both is additive and safe for v0 peers.
        parts: [{ kind: 'text', type: 'text', text: args.task }],
      },
    },
  }
  // 60s timeout because sync delegation waits for the peer to actually
  // produce a response. Long-running peer work should use the async path.
  const resp = await fetch(`${PLATFORM_URL}/workspaces/${args.workspace_id}/a2a`, {
    method: 'POST',
    headers: platformHeaders(token, {
      'Content-Type': 'application/json',
      // Lets the platform attribute the message to the watched workspace.
      'X-Source-Workspace-Id': workspaceId,
    }),
    body: JSON.stringify(body),
    signal: AbortSignal.timeout(60_000),
  })
  if (!resp.ok) {
    const errText = await resp.text().catch(() => '')
    throw new Error(`delegate_task failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
  }
  return resp.json()
}
// Tool: delegate_task_async --------------------------------------------
//
// Mirrors the universal `delegate_task_async` tool — POST /workspaces/:self/delegate
// with target_id + task + idempotency_key. Returns 202 with delegation_id;
// the platform runs the A2A round-trip in the background and stores the
// result in the delegations table. Poll via check_task_status.

/** Lowercase hex SHA-256 of a UTF-8 string (WebCrypto; used for idempotency keys). */
async function sha256Hex(s: string): Promise<string> {
  const digest = await crypto.subtle.digest('SHA-256', new TextEncoder().encode(s))
  let hex = ''
  for (const byte of new Uint8Array(digest)) {
    hex += byte.toString(16).padStart(2, '0')
  }
  return hex
}
// Async variant accepts exactly the same arguments as the sync tool.
const DelegateTaskAsyncArgsSchema = DelegateTaskArgsSchema
/**
 * Fire-and-poll delegation: POST /workspaces/:self/delegate and return
 * immediately with the platform's 202 body (contains delegation_id).
 * Results are later fetched via check_task_status.
 *
 * @returns the platform's JSON response (delegation_id et al.).
 * @throws Error if target/task are missing or the HTTP call fails.
 */
async function delegateTaskAsync(args: z.infer<typeof DelegateTaskAsyncArgsSchema>): Promise<unknown> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  if (!args.workspace_id) throw new Error('workspace_id (target peer) is required')
  if (!args.task) throw new Error('task is required')
  // Idempotency key: SHA-256 of (target, task) so a restart firing the same
  // delegation gets the existing delegation_id back instead of creating a
  // duplicate (mirrors workspace/a2a_tools.py — fixes #1456 there).
  const idem = (await sha256Hex(`${args.workspace_id}:${args.task}`)).slice(0, 32)
  const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/delegate`, {
    method: 'POST',
    headers: platformHeaders(token, { 'Content-Type': 'application/json' }),
    body: JSON.stringify({
      target_id: args.workspace_id,
      task: args.task,
      idempotency_key: idem,
    }),
    signal: AbortSignal.timeout(15_000),
  })
  // 202 Accepted is the expected success status; since 202 is within
  // resp.ok's 200-299 range, a bare !resp.ok covers the previous
  // `status !== 202 && !resp.ok` condition exactly (that extra clause
  // was redundant).
  if (!resp.ok) {
    const errText = await resp.text().catch(() => '')
    throw new Error(`delegate_task_async failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
  }
  return resp.json()
}
// Tool: check_task_status ----------------------------------------------
//
// Mirrors the universal `check_task_status` tool — GET /workspaces/:self/delegations,
// optionally filtered by delegation_id. Returns peer-reply summary + status
// (pending / in_progress / queued / completed / failed).
const CheckTaskStatusArgsSchema = z.object({
  _as_workspace: z.string().describe(
    "Watched workspace_id whose delegations to inspect (omit if only one watched)."
  ).optional(),
  // Public name is `task_id`; it is matched against the platform's
  // delegation_id rows.
  task_id: z.string().describe(
    "delegation_id returned by delegate_task_async. Omit to list recent delegations."
  ).optional(),
})
/**
 * Fetch the watched workspace's delegations and either return the one
 * matching args.task_id, or (with no task_id) the ten most recent plus a
 * total count.
 */
async function checkTaskStatus(args: z.infer<typeof CheckTaskStatusArgsSchema>): Promise<unknown> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/delegations`, {
    headers: platformHeaders(token),
    signal: AbortSignal.timeout(15_000),
  })
  if (!resp.ok) {
    const detail = await resp.text().catch(() => '')
    throw new Error(`check_task_status failed: HTTP ${resp.status}${detail.slice(0, 200)}`)
  }
  const delegations = (await resp.json()) as Array<{ delegation_id?: string }>
  if (!args.task_id) {
    return { delegations: delegations.slice(0, 10), count: delegations.length }
  }
  const hit = delegations.find(d => d.delegation_id === args.task_id)
  return hit ?? { status: 'not_found', delegation_id: args.task_id }
}
// Tool: commit_memory --------------------------------------------------
//
// Mirrors the universal `commit_memory` tool — POST /workspaces/:self/memories.
// Persists across sessions. RBAC + scope (LOCAL/TEAM/GLOBAL) enforcement
// is platform-side; this tool just plumbs the call.
const CommitMemoryArgsSchema = z.object({
  _as_workspace: z.string().describe(
    "Watched workspace_id to commit AS (omit if only one watched)."
  ).optional(),
  content: z.string().describe("What to remember — be specific."),
  // Scope enforcement (e.g. GLOBAL requiring a tier-0 root) happens
  // platform-side; this schema only constrains the enum.
  scope: z.enum(['LOCAL', 'TEAM', 'GLOBAL']).describe(
    "Memory scope (default LOCAL)."
  ).optional(),
})
/**
 * Persist a memory row for the watched workspace via POST
 * /workspaces/:self/memories. Scope defaults to LOCAL.
 */
async function commitMemory(args: z.infer<typeof CommitMemoryArgsSchema>): Promise<unknown> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  if (!args.content) throw new Error('content is required')
  const payload = {
    content: args.content,
    scope: (args.scope ?? 'LOCAL').toUpperCase(),
    // Platform cross-validates this against the bearer for namespace
    // isolation (workspace-server fix for GH#1610).
    workspace_id: workspaceId,
  }
  const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/memories`, {
    method: 'POST',
    headers: platformHeaders(token, { 'Content-Type': 'application/json' }),
    body: JSON.stringify(payload),
    signal: AbortSignal.timeout(15_000),
  })
  if (resp.ok) return resp.json()
  const detail = await resp.text().catch(() => '')
  throw new Error(`commit_memory failed: HTTP ${resp.status}${detail.slice(0, 200)}`)
}
// Tool: recall_memory --------------------------------------------------
//
// Mirrors the universal `recall_memory` tool — GET /workspaces/:self/memories.
// Returns rows accessible by scope; empty query returns all accessible.
const RecallMemoryArgsSchema = z.object({
  _as_workspace: z.string().describe(
    "Watched workspace_id to recall FROM (omit if only one watched)."
  ).optional(),
  query: z.string().describe("Search query (empty returns all).").optional(),
  // '' is an accepted enum member meaning "no scope filter".
  scope: z.enum(['LOCAL', 'TEAM', 'GLOBAL', '']).describe(
    "Filter by scope (empty = all accessible)."
  ).optional(),
})
/**
 * Search persistent memory for the watched workspace. Empty query/scope
 * filters are simply left off the querystring, so the platform returns
 * every accessible row.
 */
async function recallMemory(args: z.infer<typeof RecallMemoryArgsSchema>): Promise<unknown> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  const endpoint = new URL(`${PLATFORM_URL}/workspaces/${workspaceId}/memories`)
  endpoint.searchParams.set('workspace_id', workspaceId)
  if (args.query) endpoint.searchParams.set('q', args.query)
  if (args.scope) endpoint.searchParams.set('scope', args.scope.toUpperCase())
  const resp = await fetch(endpoint, {
    headers: platformHeaders(token),
    signal: AbortSignal.timeout(15_000),
  })
  if (resp.ok) return resp.json()
  const detail = await resp.text().catch(() => '')
  throw new Error(`recall_memory failed: HTTP ${resp.status}${detail.slice(0, 200)}`)
}
// The tool surface mirrors workspace/platform_tools/registry.py — same
// names, same input shapes, same semantics — so an external agent driven
// through this channel has parity with an in-container agent driven by the
// universal MCP. The one channel-specific addition is `_as_workspace`,
// which disambiguates which watched workspace the tool acts AS when this
// MCP is configured to watch more than one. Underscore-prefixed so it
// can't collide with the universal contract.
//
// NOTE(review): reply_to_workspace and list_peers spell the watched-id
// field `workspace_id` while the other tools use `_as_workspace` (where
// `workspace_id` instead names the TARGET peer for the delegation tools).
// Looks intentional, but confirm against the universal contract.
//
// This handler advertises metadata only; execution is dispatched by the
// CallToolRequestSchema handler later in this file.
mcp.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: 'reply_to_workspace',
      description:
        'Reply to whoever sent the most recent inbound message. Pass peer_id ' +
        'from notification meta.peer_id for peer_agent inbound (routes via /a2a); ' +
        'omit peer_id (or pass empty string) for canvas_user inbound (routes via ' +
        '/notify into the My Chat panel). Check meta.kind on the notification to ' +
        'pick the right form.',
      inputSchema: {
        type: 'object',
        properties: {
          workspace_id: { type: 'string', description: 'Watched workspace_id to reply as (omit if only one watched).' },
          peer_id: {
            type: 'string',
            description:
              'Workspace_id of the peer to A2A-reply to (from notification meta.peer_id). ' +
              'Omit or pass empty string to /notify the canvas user instead.',
          },
          text: { type: 'string', description: 'Reply text (plain text or markdown).' },
        },
        required: ['text'],
      },
    },
    {
      name: 'delegate_task',
      description:
        'Delegate a task to a peer workspace via A2A and WAIT for the response (synchronous). ' +
        'Use for QUICK questions and small sub-tasks; for long-running work use ' +
        'delegate_task_async + check_task_status so this session does not block.',
      inputSchema: {
        type: 'object',
        properties: {
          _as_workspace: { type: 'string', description: 'Watched workspace_id to send AS (omit if only one watched).' },
          workspace_id: { type: 'string', description: 'Target peer workspace ID (from list_peers).' },
          task: { type: 'string', description: 'Task description to send to the peer.' },
        },
        required: ['workspace_id', 'task'],
      },
    },
    {
      name: 'delegate_task_async',
      description:
        'Send a task to a peer and return immediately with a task_id (non-blocking). ' +
        'Poll with check_task_status. The platform A2A queue handles delivery + retries.',
      inputSchema: {
        type: 'object',
        properties: {
          _as_workspace: { type: 'string', description: 'Watched workspace_id to send AS (omit if only one watched).' },
          workspace_id: { type: 'string', description: 'Target peer workspace ID (from list_peers).' },
          task: { type: 'string', description: 'Task description to send to the peer.' },
        },
        required: ['workspace_id', 'task'],
      },
    },
    {
      name: 'check_task_status',
      description:
        'Poll the status of a task started with delegate_task_async; returns the result when done. ' +
        'Statuses: pending/in_progress (peer working — wait), queued (peer busy with prior task — ' +
        'do NOT retry), completed (result available), failed (real error).',
      inputSchema: {
        type: 'object',
        properties: {
          _as_workspace: { type: 'string', description: 'Watched workspace_id whose delegations to inspect (omit if only one watched).' },
          task_id: { type: 'string', description: 'task_id (delegation_id) returned by delegate_task_async. Omit to list recent.' },
        },
      },
    },
    {
      name: 'list_peers',
      description:
        'List the watched workspace\'s peer agents (siblings, children, parent) as registered ' +
        'in the canvas. Use first when you need to delegate but don\'t know the target\'s ID. ' +
        'Access control is enforced — you only see peers your workspace can reach.',
      inputSchema: {
        type: 'object',
        properties: {
          workspace_id: { type: 'string', description: 'Watched workspace_id to query peers for (omit if only one watched).' },
          q: { type: 'string', description: 'Optional case-insensitive substring filter on peer name or role.' },
        },
      },
    },
    {
      name: 'get_workspace_info',
      description:
        'Get the watched workspace\'s own info — id, name, role, tier, parent, status, agent_card. ' +
        'Use to introspect identity (e.g. before reporting back to the user, or to determine if ' +
        'this is a tier-0 root that can write GLOBAL memory).',
      inputSchema: {
        type: 'object',
        properties: {
          _as_workspace: { type: 'string', description: 'Watched workspace_id to introspect (omit if only one watched).' },
        },
      },
    },
    {
      name: 'send_message_to_user',
      description:
        'Send a message to the user\'s canvas chat — pushed instantly via WebSocket. Use to ' +
        '(1) acknowledge a task immediately, (2) post mid-flight progress updates, (3) deliver ' +
        'follow-up results, (4) attach files via the `attachments` field. NEVER paste file URLs ' +
        'in `message`; always pass absolute paths in `attachments` so the platform serves them ' +
        'as download chips (works on SaaS where external file hosts are unreachable).',
      inputSchema: {
        type: 'object',
        properties: {
          _as_workspace: { type: 'string', description: 'Watched workspace_id to send AS (omit if only one watched).' },
          message: { type: 'string', description: 'Caption text for the chat bubble. Required even with attachments.' },
          attachments: {
            type: 'array',
            items: { type: 'string' },
            description: 'Absolute file paths on the user\'s local machine. Each is uploaded via /chat/uploads and surfaces as a download chip. 25 MB cap per file.',
          },
        },
        required: ['message'],
      },
    },
    {
      name: 'commit_memory',
      description:
        'Save a fact to persistent memory; survives across sessions and restarts. ' +
        'Scopes: LOCAL (private to this workspace), TEAM (shared with parent + siblings), ' +
        'GLOBAL (entire org — only tier-0 roots can write).',
      inputSchema: {
        type: 'object',
        properties: {
          _as_workspace: { type: 'string', description: 'Watched workspace_id to commit AS (omit if only one watched).' },
          content: { type: 'string', description: 'What to remember — be specific.' },
          scope: { type: 'string', enum: ['LOCAL', 'TEAM', 'GLOBAL'], description: 'Memory scope (default LOCAL).' },
        },
        required: ['content'],
      },
    },
    {
      name: 'recall_memory',
      description:
        'Search persistent memory; returns matching LOCAL + TEAM + GLOBAL rows. ' +
        'Empty query returns ALL accessible memories — cheap and avoids missing rows that ' +
        'don\'t match a narrow keyword.',
      inputSchema: {
        type: 'object',
        properties: {
          _as_workspace: { type: 'string', description: 'Watched workspace_id to recall FROM (omit if only one watched).' },
          query: { type: 'string', description: 'Search query (empty returns all).' },
          scope: { type: 'string', enum: ['LOCAL', 'TEAM', 'GLOBAL', ''], description: 'Filter by scope (empty = all accessible).' },
        },
      },
    },
  ],
}))
// Dispatch a tool invocation to its implementation. Plain-string tool
// results are wrapped as a single text part; structured results are
// pretty-printed JSON. Unknown tool names throw.
mcp.setRequestHandler(CallToolRequestSchema, async req => {
  const asText = (text: string) => ({ content: [{ type: 'text' as const, text }] })
  const asJson = (value: unknown) => asText(JSON.stringify(value, null, 2))
  const rawArgs = req.params.arguments ?? {}
  switch (req.params.name) {
    case 'reply_to_workspace':
      return asText(await replyToWorkspace(ReplyArgsSchema.parse(rawArgs)))
    case 'delegate_task':
      return asJson(await delegateTask(DelegateTaskArgsSchema.parse(rawArgs)))
    case 'delegate_task_async':
      return asJson(await delegateTaskAsync(DelegateTaskAsyncArgsSchema.parse(rawArgs)))
    case 'check_task_status':
      return asJson(await checkTaskStatus(CheckTaskStatusArgsSchema.parse(rawArgs)))
    case 'list_peers':
      return asJson(await listPeers(ListPeersArgsSchema.parse(rawArgs)))
    case 'get_workspace_info':
      return asJson(await getWorkspaceInfo(GetWorkspaceInfoArgsSchema.parse(rawArgs)))
    case 'send_message_to_user':
      return asText(await sendMessageToUser(SendMessageToUserArgsSchema.parse(rawArgs)))
    case 'commit_memory':
      return asJson(await commitMemory(CommitMemoryArgsSchema.parse(rawArgs)))
    case 'recall_memory':
      return asJson(await recallMemory(RecallMemoryArgsSchema.parse(rawArgs)))
    default:
      throw new Error(`unknown tool: ${req.params.name}`)
  }
})
// ─── Boot ───────────────────────────────────────────────────────────────
loadCursors()
// Compat probe runs FIRST — before the MCP transport opens and before any
// workspace is self-registered. v0.2.1 probed AFTER mcp.connect +
// registerAsPoll, which had two bugs:
//   1. mcp.connect had already finished the initialize handshake, so a
//      probe-failure exit looked to Claude Code like "MCP server crashed
//      mid-session" (stderr swallowed) instead of a clean "failed to
//      start" with the upgrade message.
//   2. registerAsPoll() may have already flipped the platform's
//      delivery_mode for a workspace whose workspace-server can't honor
//      poll, leaving it broken if we then exit.
// Startup ordering is the only fix; probe semantics are unchanged
// (410 → ok, 200 → too_old, anything else → inconclusive).
//
// Probes run concurrently (allSettled) — sequentially they cost up to
// N × 15s for multi-workspace channels. The verdict only cares whether
// ANY workspace came back too_old; rejected probes count as inconclusive
// and are skipped.
{
  const outcomes = await Promise.allSettled(
    WORKSPACE_IDS.map(id => probeCursorSupport(id).then(r => ({ id, r }))),
  )
  let sawTooOld = false
  for (const outcome of outcomes) {
    if (outcome.status !== 'fulfilled' || outcome.value.r !== 'too_old') continue
    sawTooOld = true
    process.stderr.write(
      `molecule channel: workspace ${outcome.value.id} on a platform that predates ` +
      `since_id cursor support (Molecule-AI/molecule-core PR #2354).\n` +
      ` Symptom would be: every poll re-delivers all recent activity as if it were new.\n` +
      ` Fix: upgrade workspace-server to a build with /activity ?since_id=… support.\n`
    )
  }
  if (sawTooOld) {
    process.stderr.write(
      `molecule channel: refusing to start in poll mode against an older platform. ` +
      `Pin MOLECULE_PLATFORM_URL to an upgraded tenant or downgrade to plugin v0.1.\n`
    )
    // exit triggers the 'exit' listener, which unlinks the PID file.
    process.exit(2)
  }
}
const transport = new StdioServerTransport()
await mcp.connect(transport)
// Self-register each workspace as poll-mode BEFORE the first poll fires.
// Sequenced (not Promise.all) so failures are surfaced one at a time and
// the operator can spot which workspace's token is bad.
if (AUTO_REGISTER_POLL) {
for (const id of WORKSPACE_IDS) {
await registerAsPoll(id)
}
}
// Startup banner on stderr (stdout is the MCP transport).
process.stderr.write(
  [
    `molecule channel: connected — watching ${WORKSPACE_IDS.length} workspace(s) at ${PLATFORM_URL}`,
    ` workspaces: ${WORKSPACE_IDS.join(', ')}`,
    ` delivery_mode=poll cursor=${CURSOR_FILE} auto_register=${AUTO_REGISTER_POLL}`,
    ` poll: every ${POLL_INTERVAL_MS}ms (cursor-based; ${POLL_WINDOW_SECS}s window only used for first-run seed)`,
  ].join('\n') + '\n'
)
// Offset each workspace's first poll by 500ms × index so an N-workspace
// watcher doesn't hit the platform at the same instant on every tick.
// Intervals are unref()'d so they never keep the process alive on exit.
for (const [i, id] of WORKSPACE_IDS.entries()) {
  setTimeout(() => {
    void pollWorkspace(id, mcp)
    setInterval(() => void pollWorkspace(id, mcp), POLL_INTERVAL_MS).unref()
  }, i * 500)
}
// Clean shutdown on SIGINT/SIGTERM: log the signal and exit 0 (the 'exit'
// listener handles PID-file cleanup).
// NOTE(review): an earlier comment promised a fire-and-forget
// "disconnected" notice on each watched workspace's A2A so peers don't
// wait on a silent channel — no such notice is sent here; shutdown()
// only logs and exits. TODO: implement the notice or drop the claim.
const shutdown = (sig: string) => {
  process.stderr.write(`molecule channel: ${sig} — shutting down\n`)
  process.exit(0)
}
process.on('SIGINT', () => shutdown('SIGINT'))
process.on('SIGTERM', () => shutdown('SIGTERM'))