Follow-up to molecule-core#2449 (which taught the platform to return 410 Gone for status='removed'). Without this branch the operator sees `get_workspace_info failed: HTTP 410 — workspace removed` and has to guess what to do — exactly the 2026-04-30 silent-fail UX hit on the hongmingwang tenant. The new code path: 1. Detect resp.status === 410 explicitly 2. Best-effort parse the body for id / removed_at / hint 3. Throw `Workspace <id> was deleted on the platform at <ts>. <hint>` The 410-message-formatting is extracted into a pure `formatRemovedWorkspaceError` helper so it can be unit-tested without mocking fetch + resolveWatching. Four new bun:test cases: - prefers platform-supplied id, removed_at, hint - falls back to local workspaceId + default hint when body is empty - tolerates null/undefined body (unparseable response) - omits ' at <ts>' clause when removed_at is missing Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1284 lines
55 KiB
TypeScript
1284 lines
55 KiB
TypeScript
#!/usr/bin/env bun
|
||
/**
|
||
* Molecule AI channel for Claude Code.
|
||
*
|
||
* MCP server that bridges Molecule A2A traffic into the active Claude Code
|
||
* session and routes Claude's replies back through Molecule's A2A endpoints.
|
||
*
|
||
* Inbound (A2A → Claude turn): polls each watched workspace's
|
||
* GET /workspaces/:id/activity?since_secs=N&type=a2a_receive
|
||
* and emits an MCP `notifications/claude/channel` for each new event.
|
||
* Polling (vs push) is the default because it works through every NAT/firewall
|
||
* with zero infra — no tunnel required. For production setups with a public
|
||
* inbound URL, see #2 in the README ("push mode", future).
|
||
*
|
||
* Outbound (Claude reply → A2A): exposes the `reply_to_workspace` and
|
||
* `start_workspace_chat` MCP tools that POST to /workspaces/:id/a2a.
|
||
*
|
||
* State lives in ~/.claude/channels/molecule/:
|
||
* - access.json workspace allowlist + per-workspace auth
|
||
* - .env MOLECULE_PLATFORM_URL + tokens (chmod 600)
|
||
* - bot.pid singleton lock
|
||
* - inbox/ file attachments downloaded from peers
|
||
*
|
||
* Multi-workspace: declare MOLECULE_WORKSPACE_IDS as a comma-separated list;
|
||
* each id polls independently. Auth is per-workspace via
|
||
* MOLECULE_WORKSPACE_TOKENS (same order, comma-separated).
|
||
*
|
||
* Cancellation: SIGTERM/SIGINT cleanly drains in-flight pollers + posts a
|
||
* single "channel disconnecting" line back to each watched workspace so
|
||
* peers see a deliberate close, not a silent timeout.
|
||
*/
|
||
|
||
import { Server } from '@modelcontextprotocol/sdk/server/index.js'
|
||
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
|
||
import {
|
||
ListToolsRequestSchema,
|
||
CallToolRequestSchema,
|
||
} from '@modelcontextprotocol/sdk/types.js'
|
||
import { z } from 'zod'
|
||
import { readFileSync, writeFileSync, mkdirSync, chmodSync, existsSync, renameSync, unlinkSync } from 'fs'
|
||
import { homedir } from 'os'
|
||
import { join } from 'path'
|
||
import { extractText, type ActivityEntry } from './extract-text.ts'
|
||
|
||
// ─── Config ─────────────────────────────────────────────────────────────
|
||
|
||
const STATE_DIR = process.env.MOLECULE_STATE_DIR ?? join(homedir(), '.claude', 'channels', 'molecule')
|
||
const ENV_FILE = join(STATE_DIR, '.env')
|
||
const PID_FILE = join(STATE_DIR, 'bot.pid')
|
||
const CURSOR_FILE = join(STATE_DIR, 'cursor.json')
|
||
|
||
// Load ~/.claude/channels/molecule/.env into process.env. Real env wins.
|
||
// Plugin-spawned servers don't get an env block — this is where tokens live.
|
||
mkdirSync(STATE_DIR, { recursive: true, mode: 0o700 })
|
||
try {
|
||
// Token is a credential — lock to owner. No-op on Windows (would need ACLs).
|
||
chmodSync(ENV_FILE, 0o600)
|
||
for (const line of readFileSync(ENV_FILE, 'utf8').split('\n')) {
|
||
const m = line.match(/^([A-Z_][A-Z0-9_]*)=(.*)$/)
|
||
if (m && process.env[m[1]] === undefined) process.env[m[1]] = m[2]
|
||
}
|
||
} catch {
|
||
// Missing .env on first run is fine; we'll fail loudly below if required vars are absent.
|
||
}
|
||
|
||
const PLATFORM_URL = process.env.MOLECULE_PLATFORM_URL?.replace(/\/$/, '')
|
||
const WORKSPACE_IDS = (process.env.MOLECULE_WORKSPACE_IDS ?? '')
|
||
.split(',').map(s => s.trim()).filter(Boolean)
|
||
const WORKSPACE_TOKENS = (process.env.MOLECULE_WORKSPACE_TOKENS ?? '')
|
||
.split(',').map(s => s.trim()).filter(Boolean)
|
||
const POLL_INTERVAL_MS = parseInt(process.env.MOLECULE_POLL_INTERVAL_MS ?? '5000', 10)
|
||
// POLL_WINDOW_SECS is only used for the initial "watch from now" cursor seed
|
||
// — after that, the cursor (since_id) drives every subsequent poll. Older
|
||
// versions of the plugin used since_secs as the primary filter; v0.2 keeps
|
||
// the env var for compat but its meaning is narrower.
|
||
const POLL_WINDOW_SECS = parseInt(process.env.MOLECULE_POLL_WINDOW_SECS ?? '30', 10)
|
||
// MOLECULE_AGENT_NAME / MOLECULE_AGENT_DESC populate the agent_card the plugin
|
||
// posts to /registry/register on startup. Both have sane defaults — set them
|
||
// only when you want the canvas tab to show something specific.
|
||
const AGENT_NAME = process.env.MOLECULE_AGENT_NAME ?? 'Claude Code (channel)'
|
||
const AGENT_DESC = process.env.MOLECULE_AGENT_DESC ??
|
||
'Local Claude Code session bridged via molecule-mcp-claude-channel'
|
||
// MOLECULE_AUTO_REGISTER_POLL controls the startup auto-register behavior.
|
||
// Default is "yes" — the plugin's whole point is to make a poll-mode
|
||
// workspace work without manual canvas configuration. Set to "0" / "false"
|
||
// if you've already configured the workspace another way and don't want
|
||
// the plugin overwriting agent_card on every restart.
|
||
const AUTO_REGISTER_POLL = !['0', 'false', 'no'].includes(
|
||
(process.env.MOLECULE_AUTO_REGISTER_POLL ?? 'true').toLowerCase()
|
||
)
|
||
|
||
if (!PLATFORM_URL || WORKSPACE_IDS.length === 0 || WORKSPACE_TOKENS.length === 0) {
|
||
process.stderr.write(
|
||
`molecule channel: required config missing\n` +
|
||
` set in ${ENV_FILE}\n` +
|
||
` format:\n` +
|
||
` MOLECULE_PLATFORM_URL=https://your-tenant.staging.moleculesai.app\n` +
|
||
` MOLECULE_WORKSPACE_IDS=ws-uuid-1,ws-uuid-2\n` +
|
||
` MOLECULE_WORKSPACE_TOKENS=tok-1,tok-2\n` +
|
||
` optional:\n` +
|
||
` MOLECULE_POLL_INTERVAL_MS=5000\n` +
|
||
` MOLECULE_POLL_WINDOW_SECS=30\n`
|
||
)
|
||
process.exit(1)
|
||
}
|
||
if (WORKSPACE_IDS.length !== WORKSPACE_TOKENS.length) {
|
||
process.stderr.write(
|
||
`molecule channel: MOLECULE_WORKSPACE_IDS and MOLECULE_WORKSPACE_TOKENS must have ` +
|
||
`the same number of entries (got ${WORKSPACE_IDS.length} ids vs ${WORKSPACE_TOKENS.length} tokens)\n`
|
||
)
|
||
process.exit(1)
|
||
}
|
||
|
||
const TOKEN_BY_WORKSPACE = new Map<string, string>(
|
||
WORKSPACE_IDS.map((id, i) => [id, WORKSPACE_TOKENS[i]])
|
||
)
|
||
|
||
// ─── Singleton lock ─────────────────────────────────────────────────────
|
||
//
|
||
// One channel server per host — multiple Claude sessions polling the same
|
||
// workspaces would race the dedup state and double-deliver. If a previous
|
||
// session crashed (SIGKILL, terminal closed) its server can survive as an
|
||
// orphan; kill it before we start.
|
||
|
||
try {
|
||
const stale = parseInt(readFileSync(PID_FILE, 'utf8'), 10)
|
||
if (stale > 1 && stale !== process.pid) {
|
||
process.kill(stale, 0) // throws if dead
|
||
process.stderr.write(`molecule channel: replacing stale poller pid=${stale}\n`)
|
||
process.kill(stale, 'SIGTERM')
|
||
}
|
||
} catch {}
|
||
writeFileSync(PID_FILE, String(process.pid))
|
||
|
||
// Unlink the PID file on every exit path — including process.exit(N)
|
||
// from the cursor-support probe (v0.2.1) which doesn't go through the
|
||
// SIGINT/SIGTERM handlers. Without this, a non-clean exit leaves a
|
||
// stale pid in PID_FILE pointing at a dead pid; the next launch's
|
||
// `process.kill(stale, 'SIGTERM')` (above) would deliver the signal to
|
||
// whatever unrelated process now owns that PID — exactly the cross-
|
||
// process-kill hazard the singleton lock exists to prevent. exit
|
||
// listeners only run synchronous code; unlinkSync is the right tool.
|
||
process.on('exit', () => {
|
||
try {
|
||
const owned = parseInt(readFileSync(PID_FILE, 'utf8'), 10)
|
||
if (owned === process.pid) unlinkSync(PID_FILE)
|
||
} catch {
|
||
// Already gone, or another process took ownership — leave it alone.
|
||
}
|
||
})
|
||
|
||
// Last-resort safety net — without these the process dies silently on any
|
||
// unhandled promise rejection. With them it logs and keeps serving tools.
|
||
process.on('unhandledRejection', err => {
|
||
process.stderr.write(`molecule channel: unhandled rejection: ${err}\n`)
|
||
})
|
||
process.on('uncaughtException', err => {
|
||
process.stderr.write(`molecule channel: uncaught exception: ${err}\n`)
|
||
})
|
||
|
||
// ─── Activity polling (inbound) ─────────────────────────────────────────
|
||
//
|
||
// One independent poll loop per watched workspace. Each loop tracks the
|
||
// max activity_id it has seen so far; on each tick it queries
|
||
// GET /workspaces/:id/activity?since_secs=POLL_WINDOW_SECS&type=a2a_receive
|
||
// and emits an MCP notification for any activity whose id is new.
|
||
//
|
||
// `since_secs` is wider than the poll interval (30s vs 5s by default) so a
|
||
// single missed tick (transient network blip) doesn't lose messages — the
|
||
// next tick re-fetches the overlap window and the seen-id dedup filters it.
|
||
//
|
||
// activity_logs is paged out at 30 days, so an honest seen-id set never
|
||
// grows unbounded; new sessions start fresh.
|
||
|
||
// ActivityEntry lives in extract-text.ts (imported above) so unit
|
||
// tests can import the type + helper without triggering server.ts's
|
||
// boot-time side-effects (cursor load, MCP transport connect).
|
||
|
||
// ─── Cursor persistence ────────────────────────────────────────────────
|
||
//
|
||
// v0.2 switches from the v0.1 since_secs+seenIds scheme to a Telegram-style
|
||
// since_id cursor. The cursor is the activity_logs.id of the last event
|
||
// this plugin successfully delivered to Claude. Server returns events
|
||
// strictly after that id in ASC order, so we never miss or replay.
|
||
//
|
||
// Persisted to ${CURSOR_FILE} as a JSON object keyed by workspace_id.
|
||
// Atomic write via temp + rename so a crash mid-write can't corrupt the
|
||
// file (the previous cursor stays valid; worst case is a few replays
|
||
// after the crash, which still beats the v0.1 30-second time-window).
|
||
//
|
||
// Schema: { "ws-uuid-1": "act-uuid-X", "ws-uuid-2": "act-uuid-Y", ... }
|
||
// Missing key = "first run" → seeds from most-recent without processing.
|
||
// 410 from server = cursor stale → drop key, re-seed on next tick.
|
||
|
||
const cursors = new Map<string, string>()
|
||
|
||
function loadCursors(): void {
|
||
if (!existsSync(CURSOR_FILE)) return
|
||
try {
|
||
const raw = readFileSync(CURSOR_FILE, 'utf8')
|
||
const parsed = JSON.parse(raw) as Record<string, unknown>
|
||
for (const [k, v] of Object.entries(parsed)) {
|
||
if (typeof v === 'string' && v.length > 0) cursors.set(k, v)
|
||
}
|
||
} catch (err) {
|
||
// Corrupt cursor file = treat as no cursors. Worst case: each watched
|
||
// workspace re-seeds from now on the next tick (no replay, no message
|
||
// loss for events arriving AFTER the seed). Don't fail-fast here —
|
||
// a poller that refuses to start because of one bad file is more
|
||
// annoying than the recovery cost.
|
||
process.stderr.write(`molecule channel: cursor file unreadable (${err}); starting fresh\n`)
|
||
}
|
||
}
|
||
|
||
function saveCursors(): void {
|
||
const obj: Record<string, string> = {}
|
||
for (const [k, v] of cursors) obj[k] = v
|
||
const tmp = `${CURSOR_FILE}.tmp.${process.pid}`
|
||
try {
|
||
writeFileSync(tmp, JSON.stringify(obj, null, 2), { mode: 0o600 })
|
||
renameSync(tmp, CURSOR_FILE)
|
||
} catch (err) {
|
||
// Cursor write failure is recoverable (next successful poll re-saves);
|
||
// log on stderr so the user sees disk-full / readonly-fs early.
|
||
process.stderr.write(`molecule channel: cursor save failed: ${err}\n`)
|
||
}
|
||
}
|
||
|
||
async function pollWorkspace(workspaceId: string, mcp: Server): Promise<void> {
|
||
const token = TOKEN_BY_WORKSPACE.get(workspaceId)!
|
||
const url = new URL(`${PLATFORM_URL}/workspaces/${workspaceId}/activity`)
|
||
url.searchParams.set('type', 'a2a_receive')
|
||
url.searchParams.set('limit', '100')
|
||
|
||
const cursor = cursors.get(workspaceId)
|
||
if (cursor) {
|
||
// Steady-state: server returns rows strictly after cursor in ASC order.
|
||
url.searchParams.set('since_id', cursor)
|
||
} else {
|
||
// First run for this workspace — deliver every event in the POLL_WINDOW_SECS
|
||
// backfill window, then advance the cursor past the newest. The previous
|
||
// policy was seed-then-skip on the assumption that pre-session events
|
||
// were "out of context", but operators routinely restart Claude Code
|
||
// mid-conversation and EXPECT the queued message to be delivered (otherwise
|
||
// the user typed something, restarted to enable replies, and got silence
|
||
// — exactly the friction this channel is supposed to remove).
|
||
//
|
||
// Backfill is bounded by POLL_WINDOW_SECS so a long-idle restart doesn't
|
||
// replay weeks of conversation. Set POLL_WINDOW_SECS=0 to opt out and
|
||
// restore the old skip-on-cold-start behavior.
|
||
url.searchParams.set('since_secs', String(POLL_WINDOW_SECS))
|
||
}
|
||
|
||
let resp: Response
|
||
try {
|
||
resp = await fetch(url, {
|
||
headers: {
|
||
Authorization: `Bearer ${token}`,
|
||
// Same-origin header — required by the tenant's edge WAF on hosted
|
||
// SaaS deployments. Without it the WAF rewrites the request and
|
||
// /workspaces/* returns an empty 404 (it's silently routed to the
|
||
// canvas Next.js, which has no /workspaces page). Node/Bun fetch
|
||
// doesn't auto-set Origin (that's a browser-only concern), so we
|
||
// set it explicitly to PLATFORM_URL — the only origin the bearer
|
||
// is valid against anyway, so no risk of leaking it elsewhere.
|
||
Origin: PLATFORM_URL,
|
||
},
|
||
signal: AbortSignal.timeout(10_000),
|
||
})
|
||
} catch (err) {
|
||
process.stderr.write(`molecule channel: poll ${workspaceId} fetch failed: ${err}\n`)
|
||
return
|
||
}
|
||
|
||
if (resp.status === 410) {
|
||
// Cursor row is gone (pruned, or never existed if the env var was
|
||
// hand-edited). Drop the cursor; next tick re-seeds from most-recent.
|
||
process.stderr.write(`molecule channel: poll ${workspaceId} cursor stale (410) — re-seeding\n`)
|
||
cursors.delete(workspaceId)
|
||
saveCursors()
|
||
return
|
||
}
|
||
if (!resp.ok) {
|
||
// 401/403 = bad token; 404 = workspace doesn't exist; 5xx = transient.
|
||
// Surface 4xx on stderr so the user sees auth/config issues immediately.
|
||
if (resp.status >= 400 && resp.status < 500) {
|
||
process.stderr.write(
|
||
`molecule channel: poll ${workspaceId} returned ${resp.status} — ` +
|
||
`check MOLECULE_WORKSPACE_TOKENS / MOLECULE_WORKSPACE_IDS in ${ENV_FILE}\n`
|
||
)
|
||
}
|
||
return
|
||
}
|
||
let activities: ActivityEntry[]
|
||
try {
|
||
activities = (await resp.json()) as ActivityEntry[]
|
||
} catch (err) {
|
||
process.stderr.write(`molecule channel: poll ${workspaceId} parse failed: ${err}\n`)
|
||
return
|
||
}
|
||
|
||
// Cold-start AND steady-state share the same delivery shape: walk
|
||
// ASC-ordered events, emit each, advance cursor past the newest. The
|
||
// only difference is whether we got rows by since_id (steady-state) or
|
||
// since_secs (cold start backfill); the platform returns the same
|
||
// column shape and ordering either way.
|
||
//
|
||
// Advance cursor even on emit failure — the alternative (block on
|
||
// notification failure) would stall the channel entirely, and
|
||
// notification delivery is best-effort anyway.
|
||
if (activities.length === 0) return
|
||
for (const act of activities) {
|
||
emitNotification(mcp, workspaceId, act)
|
||
}
|
||
const newest = activities[activities.length - 1].id
|
||
if (newest !== cursor) {
|
||
cursors.set(workspaceId, newest)
|
||
saveCursors()
|
||
}
|
||
}
|
||
|
||
// ─── Cursor-support probe (startup compat check) ──────────────────────
|
||
//
|
||
// v0.2 relies on the since_id cursor on /activity (Molecule-AI/molecule-core
|
||
// PR #2354). Older platforms silently ignore the query param and return
|
||
// whatever the default time window covers, which would make us re-deliver
|
||
// the same activities on every tick — a worse silent-duplicate bug than
|
||
// any failure mode v0.1 had.
|
||
//
|
||
// Detect at startup with a known-invalid UUID. PR-#2354+ answers 410 Gone
|
||
// for any cursor that doesn't resolve to an activity_logs row. Pre-#2354
|
||
// servers ignore the param and answer 200 OK. We use the all-zero UUID
|
||
// because gen_random_uuid() will never produce it (per RFC 4122 §4.4 the
|
||
// version + variant bits are non-zero), so a 410 is unambiguous.
|
||
//
|
||
// Probe failure is fatal — the user MUST upgrade. Falling back to v0.1
|
||
// behavior would re-introduce the message-loss-on-restart bug v0.2 was
|
||
// written to fix; failing loudly is the better default.
|
||
const PROBE_CURSOR = '00000000-0000-0000-0000-000000000000'
|
||
|
||
async function probeCursorSupport(workspaceId: string): Promise<'ok' | 'too_old' | 'inconclusive'> {
|
||
const token = TOKEN_BY_WORKSPACE.get(workspaceId)!
|
||
const url = new URL(`${PLATFORM_URL}/workspaces/${workspaceId}/activity`)
|
||
url.searchParams.set('type', 'a2a_receive')
|
||
url.searchParams.set('since_id', PROBE_CURSOR)
|
||
url.searchParams.set('limit', '1')
|
||
|
||
let resp: Response
|
||
try {
|
||
resp = await fetch(url, {
|
||
headers: { Authorization: `Bearer ${token}`, Origin: PLATFORM_URL },
|
||
signal: AbortSignal.timeout(15_000),
|
||
})
|
||
} catch (err) {
|
||
process.stderr.write(`molecule channel: probe ${workspaceId} fetch failed: ${err}\n`)
|
||
return 'inconclusive'
|
||
}
|
||
|
||
if (resp.status === 410) return 'ok'
|
||
if (resp.status === 200) return 'too_old'
|
||
|
||
// 401/403/404/5xx — orthogonal to cursor support. Probe is inconclusive;
|
||
// let the normal poll loop surface the real failure.
|
||
process.stderr.write(
|
||
`molecule channel: probe ${workspaceId} returned HTTP ${resp.status} (expected 410); ` +
|
||
`cursor support unverifiable, continuing\n`
|
||
)
|
||
return 'inconclusive'
|
||
}
|
||
|
||
// ─── Register-as-poll (startup self-register) ──────────────────────────
|
||
//
|
||
// On startup, register each watched workspace with delivery_mode=poll so
|
||
// the platform's a2a_proxy short-circuits to activity_logs (PR 2 / #2353)
|
||
// instead of trying to dispatch HTTP to a URL the operator's laptop
|
||
// doesn't have. Idempotent — the upsert in /registry/register's handler
|
||
// preserves existing values; we just declare delivery_mode and the
|
||
// agent_card.
|
||
//
|
||
// Failure here is non-fatal — the polling loop still works against a
|
||
// pre-poll-configured workspace, and a transient platform 5xx shouldn't
|
||
// block channel startup. Log loudly so misconfiguration is visible.
|
||
async function registerAsPoll(workspaceId: string): Promise<void> {
|
||
const token = TOKEN_BY_WORKSPACE.get(workspaceId)!
|
||
const body = {
|
||
id: workspaceId,
|
||
delivery_mode: 'poll',
|
||
agent_card: {
|
||
name: AGENT_NAME,
|
||
description: AGENT_DESC,
|
||
},
|
||
}
|
||
let resp: Response
|
||
try {
|
||
resp = await fetch(`${PLATFORM_URL}/registry/register`, {
|
||
method: 'POST',
|
||
headers: {
|
||
Authorization: `Bearer ${token}`,
|
||
'Content-Type': 'application/json',
|
||
Origin: PLATFORM_URL,
|
||
},
|
||
body: JSON.stringify(body),
|
||
signal: AbortSignal.timeout(15_000),
|
||
})
|
||
} catch (err) {
|
||
process.stderr.write(`molecule channel: register-as-poll ${workspaceId} fetch failed: ${err}\n`)
|
||
return
|
||
}
|
||
if (!resp.ok) {
|
||
const errText = await resp.text().catch(() => '')
|
||
process.stderr.write(
|
||
`molecule channel: register-as-poll ${workspaceId} HTTP ${resp.status} — ${errText.slice(0, 200)}\n`
|
||
)
|
||
return
|
||
}
|
||
// Sanity-check: the platform should echo back delivery_mode=poll.
|
||
// A push reply means an older controlplane that doesn't know about
|
||
// delivery_mode yet — log so the user can identify the mismatch.
|
||
try {
|
||
const j = (await resp.json()) as { delivery_mode?: string }
|
||
if (j.delivery_mode && j.delivery_mode !== 'poll') {
|
||
process.stderr.write(
|
||
`molecule channel: register-as-poll ${workspaceId} returned delivery_mode=${j.delivery_mode} ` +
|
||
`(expected poll). Platform may predate #2339.\n`
|
||
)
|
||
}
|
||
} catch {
|
||
// Non-JSON response. Don't fail; the 2xx already tells us the upsert
|
||
// landed, and the polling loop is the source of truth for steady-state.
|
||
}
|
||
}
|
||
|
||
// ─── Notification emission ─────────────────────────────────────────────
|
||
|
||
function emitNotification(mcp: Server, workspaceId: string, act: ActivityEntry): void {
|
||
const text = extractText(act)
|
||
// Discriminate canvas-user messages (typed in the canvas chat panel) from
|
||
// peer-agent A2A traffic. The canvas wraps user chat as JSON-RPC
|
||
// message/send with source_id=null; real peers carry their workspace_id
|
||
// in source_id. The reply tool routes differently on this — canvas_user
|
||
// → /notify (lands in the user's chat), peer_agent → /a2a (proper JSON-RPC
|
||
// response to the calling peer).
|
||
const peerId = act.source_id ?? ''
|
||
const kind: 'canvas_user' | 'peer_agent' = peerId ? 'peer_agent' : 'canvas_user'
|
||
|
||
// Per the telegram channel reference: notifications/claude/channel is the
|
||
// host's hook. content becomes the conversation turn; meta is structured
|
||
// metadata Claude can reason about (workspace_id, peer_id, ts, etc.).
|
||
// image_path / attachment_* mirror telegram's shape so the host's
|
||
// attachment handling works without a custom path.
|
||
mcp.notification({
|
||
method: 'notifications/claude/channel',
|
||
params: {
|
||
content: text,
|
||
meta: {
|
||
source: 'molecule',
|
||
kind,
|
||
workspace_id: act.workspace_id,
|
||
watching_as: workspaceId,
|
||
peer_id: peerId,
|
||
method: act.method ?? '',
|
||
activity_id: act.id,
|
||
ts: act.created_at,
|
||
},
|
||
},
|
||
}).catch(err => {
|
||
process.stderr.write(`molecule channel: failed to deliver notification for ${act.id}: ${err}\n`)
|
||
})
|
||
}
|
||
|
||
// ─── MCP server ─────────────────────────────────────────────────────────
|
||
|
||
const mcp = new Server(
|
||
{ name: 'molecule', version: '0.3.0' },
|
||
{ capabilities: { tools: {} } },
|
||
)
|
||
|
||
// Tool: reply_to_workspace ----------------------------------------------
|
||
//
|
||
// Sends a reply from one of our watched workspaces. The destination is
|
||
// picked from `peer_id`:
|
||
//
|
||
// - peer_id absent / empty → canvas-user reply via POST /workspaces/:our/notify
|
||
// (lands in the My Chat panel — what users see when
|
||
// they type in the canvas)
|
||
// - peer_id present → peer-agent A2A reply via POST /workspaces/:peer/a2a
|
||
// with a proper JSON-RPC message/send envelope
|
||
//
|
||
// The notification meta.kind tells Claude which to use; this tool just
|
||
// honors whichever peer_id the caller passes.
|
||
|
||
const ReplyArgsSchema = z.object({
|
||
workspace_id: z.string().describe(
|
||
"Watched workspace_id to reply AS (must be in MOLECULE_WORKSPACE_IDS). " +
|
||
"Defaults to the workspace whose message Claude is responding to — " +
|
||
"if there's only one watched workspace, omit this."
|
||
).optional(),
|
||
peer_id: z.string().describe(
|
||
"Workspace_id of the peer to send TO (for peer_agent inbound — " +
|
||
"use notification meta.peer_id). Omit or pass empty string to reply " +
|
||
"to the canvas user via /notify (for canvas_user inbound)."
|
||
).optional(),
|
||
text: z.string().describe('Reply text. Plain text or markdown.'),
|
||
})
|
||
|
||
async function replyToWorkspace(args: z.infer<typeof ReplyArgsSchema>): Promise<string> {
|
||
let { workspace_id } = args
|
||
if (!workspace_id) {
|
||
if (WORKSPACE_IDS.length === 1) workspace_id = WORKSPACE_IDS[0]
|
||
else throw new Error(
|
||
`workspace_id required when watching multiple workspaces. ` +
|
||
`Watching: ${WORKSPACE_IDS.join(', ')}`
|
||
)
|
||
}
|
||
const token = TOKEN_BY_WORKSPACE.get(workspace_id)
|
||
if (!token) {
|
||
throw new Error(
|
||
`workspace_id ${workspace_id} is not in MOLECULE_WORKSPACE_IDS. ` +
|
||
`Configured: ${WORKSPACE_IDS.join(', ')}`
|
||
)
|
||
}
|
||
|
||
const peerId = args.peer_id?.trim() ?? ''
|
||
if (!peerId) {
|
||
// Canvas-user reply — POST /workspaces/:our/notify with {message: text}.
|
||
// The platform appends to the user-facing chat panel; no JSON-RPC envelope
|
||
// because there's no peer URL on the other side, just the canvas UI.
|
||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspace_id}/notify`, {
|
||
method: 'POST',
|
||
headers: {
|
||
Authorization: `Bearer ${token}`,
|
||
'Content-Type': 'application/json',
|
||
Origin: PLATFORM_URL,
|
||
},
|
||
body: JSON.stringify({ message: args.text }),
|
||
signal: AbortSignal.timeout(30_000),
|
||
})
|
||
if (!resp.ok) {
|
||
const errText = await resp.text().catch(() => '')
|
||
throw new Error(`notify failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||
}
|
||
return `Replied to canvas user as ${workspace_id} via /notify.`
|
||
}
|
||
|
||
// Peer-agent A2A reply — proper JSON-RPC 2.0 envelope as the platform's
|
||
// a2a_proxy expects. Empirically (verified 2026-04-29 against workspace-
|
||
// server's ProxyA2A handler), shorthand `{parts:[...]}` gets accepted but
|
||
// the platform strips params before forwarding to the peer's URL — the
|
||
// peer then sees an envelope with `params: null` and no message text.
|
||
// Wrapping in proper JSON-RPC preserves the message all the way through.
|
||
//
|
||
// `messageId` is generated client-side; the platform doesn't require it
|
||
// but peers may use it for idempotency / dedup.
|
||
const body = {
|
||
jsonrpc: '2.0',
|
||
id: crypto.randomUUID(),
|
||
method: 'message/send',
|
||
params: {
|
||
message: {
|
||
messageId: crypto.randomUUID(),
|
||
parts: [{ type: 'text', text: args.text }],
|
||
},
|
||
},
|
||
}
|
||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${peerId}/a2a`, {
|
||
method: 'POST',
|
||
headers: {
|
||
Authorization: `Bearer ${token}`,
|
||
'Content-Type': 'application/json',
|
||
'X-Source-Workspace-Id': workspace_id,
|
||
// Same-origin header for SaaS edge WAF — see pollWorkspace fetch
|
||
// for the full explanation. /workspaces/* requires it on hosted
|
||
// tenants; localhost ignores it.
|
||
Origin: PLATFORM_URL,
|
||
},
|
||
body: JSON.stringify(body),
|
||
signal: AbortSignal.timeout(30_000),
|
||
})
|
||
if (!resp.ok) {
|
||
const errText = await resp.text().catch(() => '')
|
||
throw new Error(`reply failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||
}
|
||
return `Reply sent from ${workspace_id} to ${peerId}.`
|
||
}
|
||
|
||
// ─── Universal-tool helpers ────────────────────────────────────────────
|
||
//
|
||
// Resolves "act AS which watched workspace" for tools that take an
|
||
// optional workspace_id distinguishing the channel-side caller from the
|
||
// target. When watching exactly one workspace it's an obvious default;
|
||
// for multi-watch, the caller must specify.
|
||
|
||
function resolveWatching(asWorkspaceId?: string): { workspaceId: string; token: string } {
|
||
let workspaceId = asWorkspaceId
|
||
if (!workspaceId) {
|
||
if (WORKSPACE_IDS.length === 1) workspaceId = WORKSPACE_IDS[0]
|
||
else throw new Error(
|
||
`_as_workspace required when watching multiple workspaces. ` +
|
||
`Watching: ${WORKSPACE_IDS.join(', ')}`
|
||
)
|
||
}
|
||
const token = TOKEN_BY_WORKSPACE.get(workspaceId)
|
||
if (!token) {
|
||
throw new Error(
|
||
`${workspaceId} is not in MOLECULE_WORKSPACE_IDS. ` +
|
||
`Configured: ${WORKSPACE_IDS.join(', ')}`
|
||
)
|
||
}
|
||
return { workspaceId, token }
|
||
}
|
||
|
||
// Standard auth headers shared by every platform call. Origin is required
|
||
// by the SaaS edge WAF — see pollWorkspace's fetch for the full story.
|
||
function platformHeaders(token: string, extra: Record<string, string> = {}): Record<string, string> {
|
||
return {
|
||
Authorization: `Bearer ${token}`,
|
||
Origin: PLATFORM_URL!,
|
||
...extra,
|
||
}
|
||
}
|
||
|
||
// Tool: list_peers ------------------------------------------------------
|
||
//
|
||
// Returns the watched workspace's view of the team — siblings, children,
|
||
// parent — so Claude can answer "who are my peers?" without a separate
|
||
// HTTP detour. Mirrors the registry endpoint backed by GET /registry/:id/peers
|
||
// (workspace-server/internal/handlers/discovery.go:Peers).
|
||
|
||
const ListPeersArgsSchema = z.object({
|
||
workspace_id: z.string().describe(
|
||
"Watched workspace_id to query peers FOR. Omit if only one watched."
|
||
).optional(),
|
||
q: z.string().describe(
|
||
"Optional case-insensitive substring filter on peer name or role."
|
||
).optional(),
|
||
})
|
||
|
||
interface Peer {
|
||
id: string
|
||
name: string
|
||
role: string | null
|
||
tier: number | null
|
||
status: string
|
||
url: string
|
||
parent_id: string | null
|
||
active_tasks: number
|
||
agent_card?: unknown
|
||
}
|
||
|
||
async function listPeers(args: z.infer<typeof ListPeersArgsSchema>): Promise<Peer[]> {
|
||
let { workspace_id } = args
|
||
if (!workspace_id) {
|
||
if (WORKSPACE_IDS.length === 1) workspace_id = WORKSPACE_IDS[0]
|
||
else throw new Error(
|
||
`workspace_id required when watching multiple workspaces. ` +
|
||
`Watching: ${WORKSPACE_IDS.join(', ')}`
|
||
)
|
||
}
|
||
const token = TOKEN_BY_WORKSPACE.get(workspace_id)
|
||
if (!token) {
|
||
throw new Error(
|
||
`workspace_id ${workspace_id} is not in MOLECULE_WORKSPACE_IDS. ` +
|
||
`Configured: ${WORKSPACE_IDS.join(', ')}`
|
||
)
|
||
}
|
||
const url = new URL(`${PLATFORM_URL}/registry/${workspace_id}/peers`)
|
||
if (args.q) url.searchParams.set('q', args.q)
|
||
const resp = await fetch(url, {
|
||
headers: {
|
||
Authorization: `Bearer ${token}`,
|
||
Origin: PLATFORM_URL,
|
||
},
|
||
signal: AbortSignal.timeout(15_000),
|
||
})
|
||
if (!resp.ok) {
|
||
const errText = await resp.text().catch(() => '')
|
||
throw new Error(`list_peers failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||
}
|
||
return (await resp.json()) as Peer[]
|
||
}
|
||
|
||
// Tool: get_workspace_info ---------------------------------------------
|
||
//
|
||
// Mirrors the universal `get_workspace_info` tool — returns the watched
|
||
// workspace's own identity (id, name, role, tier, parent, status, agent_card).
|
||
// Backed by GET /workspaces/:id (workspace-server's WorkspaceHandler.Get).
|
||
|
||
const GetWorkspaceInfoArgsSchema = z.object({
|
||
_as_workspace: z.string().describe(
|
||
"Watched workspace_id to introspect (omit if only one watched)."
|
||
).optional(),
|
||
})
|
||
|
||
// Pure formatter — kept exportable so server.test.ts can pin the
|
||
// message shape without mocking fetch + resolveWatching just to read
|
||
// one string. molecule-core#2429.
|
||
export function formatRemovedWorkspaceError(
|
||
workspaceId: string,
|
||
body: { id?: string; removed_at?: string; hint?: string } | null | undefined,
|
||
): string {
|
||
const safeBody = body ?? {}
|
||
const id = safeBody.id ?? workspaceId
|
||
const hint = safeBody.hint ?? 'Regenerate workspace + token from the canvas → Tokens tab.'
|
||
const removed = safeBody.removed_at ? ` at ${safeBody.removed_at}` : ''
|
||
return `Workspace ${id} was deleted on the platform${removed}. ${hint}`
|
||
}
|
||
|
||
async function getWorkspaceInfo(args: z.infer<typeof GetWorkspaceInfoArgsSchema>): Promise<unknown> {
|
||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}`, {
|
||
headers: platformHeaders(token),
|
||
signal: AbortSignal.timeout(15_000),
|
||
})
|
||
if (resp.status === 410) {
|
||
// molecule-core#2429: platform returns 410 Gone when status='removed'.
|
||
// Surface a clear "your workspace was deleted, re-onboard" error
|
||
// instead of a generic HTTP error — without this branch the operator
|
||
// sees `get_workspace_info failed: HTTP 410` and has to guess why.
|
||
let body: { id?: string; removed_at?: string; hint?: string } = {}
|
||
try {
|
||
body = await resp.json() as typeof body
|
||
} catch {
|
||
// best-effort body parse; the error message stands alone
|
||
}
|
||
throw new Error(formatRemovedWorkspaceError(workspaceId, body))
|
||
}
|
||
if (!resp.ok) {
|
||
const errText = await resp.text().catch(() => '')
|
||
throw new Error(`get_workspace_info failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||
}
|
||
return resp.json()
|
||
}
|
||
|
||
// Tool: send_message_to_user -------------------------------------------
|
||
//
|
||
// Mirrors the universal `send_message_to_user` tool — POST /workspaces/:id/notify.
|
||
// Lands as a chat bubble in the canvas My Chat panel. The universal tool
|
||
// also supports `attachments` (file paths inside the workspace container)
|
||
// uploaded via /chat/uploads; this channel runs on the user's local FS and
|
||
// uploads from there. Same contract — paths are absolute on whichever side
|
||
// the tool runs from.
|
||
|
||
const SendMessageToUserArgsSchema = z.object({
|
||
_as_workspace: z.string().describe(
|
||
"Watched workspace_id to send AS (omit if only one watched)."
|
||
).optional(),
|
||
message: z.string().describe(
|
||
"Caption text for the chat bubble. Required even with attachments — " +
|
||
"set to a short label like 'Here's the build:' or 'Done — see attached.'\n\n" +
|
||
"DO NOT paste file URLs in this string. Files MUST go through `attachments` " +
|
||
"so they render as a clickable download chip."
|
||
),
|
||
attachments: z.array(z.string()).describe(
|
||
"Absolute file paths on the user's local machine (e.g. ['/tmp/build.zip']). " +
|
||
"Each gets uploaded via /chat/uploads and surfaces as a download chip in " +
|
||
"the canvas. 25 MB per file cap."
|
||
).optional(),
|
||
})
|
||
|
||
async function sendMessageToUser(args: z.infer<typeof SendMessageToUserArgsSchema>): Promise<string> {
|
||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||
let attachmentRefs: unknown[] = []
|
||
if (args.attachments && args.attachments.length > 0) {
|
||
// Multipart upload — same shape as workspace/a2a_tools.py:_upload_chat_files.
|
||
// The platform stages files under /workspace/.molecule/chat-uploads (a
|
||
// canvas "allowed root") and returns metadata the notify body references.
|
||
const form = new FormData()
|
||
for (const path of args.attachments) {
|
||
const file = Bun.file(path)
|
||
if (!(await file.exists())) {
|
||
throw new Error(`attachment not found: ${path}`)
|
||
}
|
||
// Bun.file is a Blob; FormData accepts Blob with filename.
|
||
form.append('files', file, path.split('/').pop() ?? 'attachment')
|
||
}
|
||
const upResp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/chat/uploads`, {
|
||
method: 'POST',
|
||
headers: platformHeaders(token),
|
||
body: form,
|
||
signal: AbortSignal.timeout(60_000),
|
||
})
|
||
if (!upResp.ok) {
|
||
const errText = await upResp.text().catch(() => '')
|
||
throw new Error(`chat/uploads failed: HTTP ${upResp.status} — ${errText.slice(0, 200)}`)
|
||
}
|
||
const upJson = (await upResp.json()) as { files?: unknown[] }
|
||
attachmentRefs = upJson.files ?? []
|
||
}
|
||
const body: Record<string, unknown> = { message: args.message }
|
||
if (attachmentRefs.length > 0) body.attachments = attachmentRefs
|
||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/notify`, {
|
||
method: 'POST',
|
||
headers: platformHeaders(token, { 'Content-Type': 'application/json' }),
|
||
body: JSON.stringify(body),
|
||
signal: AbortSignal.timeout(30_000),
|
||
})
|
||
if (!resp.ok) {
|
||
const errText = await resp.text().catch(() => '')
|
||
throw new Error(`notify failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||
}
|
||
return `Sent to canvas user as ${workspaceId}${attachmentRefs.length > 0 ? ` with ${attachmentRefs.length} attachment(s)` : ''}.`
|
||
}
|
||
|
||
// Tool: delegate_task (sync) -------------------------------------------
|
||
//
|
||
// Mirrors the universal `delegate_task` tool — sends an A2A message to a
|
||
// peer and waits inline for the response. POSTs to /workspaces/:peer/a2a;
|
||
// the platform's a2a_proxy forwards to the peer's URL and returns the
|
||
// peer's reply body. Use for QUICK questions; for long-running work use
|
||
// delegate_task_async + check_task_status.
|
||
|
||
const DelegateTaskArgsSchema = z.object({
|
||
_as_workspace: z.string().describe(
|
||
"Watched workspace_id to send AS (omit if only one watched)."
|
||
).optional(),
|
||
workspace_id: z.string().describe("Target peer workspace ID (from list_peers)."),
|
||
task: z.string().describe("Task description to send to the peer."),
|
||
})
|
||
|
||
async function delegateTask(args: z.infer<typeof DelegateTaskArgsSchema>): Promise<unknown> {
|
||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||
if (!args.workspace_id) throw new Error('workspace_id (target peer) is required')
|
||
if (!args.task) throw new Error('task is required')
|
||
const body = {
|
||
jsonrpc: '2.0',
|
||
id: crypto.randomUUID(),
|
||
method: 'message/send',
|
||
params: {
|
||
message: {
|
||
messageId: crypto.randomUUID(),
|
||
parts: [{ type: 'text', text: args.task }],
|
||
},
|
||
},
|
||
}
|
||
// 60s timeout because sync delegation waits for the peer to actually
|
||
// produce a response. Long-running peer work should use the async path.
|
||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${args.workspace_id}/a2a`, {
|
||
method: 'POST',
|
||
headers: platformHeaders(token, {
|
||
'Content-Type': 'application/json',
|
||
'X-Source-Workspace-Id': workspaceId,
|
||
}),
|
||
body: JSON.stringify(body),
|
||
signal: AbortSignal.timeout(60_000),
|
||
})
|
||
if (!resp.ok) {
|
||
const errText = await resp.text().catch(() => '')
|
||
throw new Error(`delegate_task failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||
}
|
||
return resp.json()
|
||
}
|
||
|
||
// Tool: delegate_task_async --------------------------------------------
|
||
//
|
||
// Mirrors the universal `delegate_task_async` tool — POST /workspaces/:self/delegate
|
||
// with target_id + task + idempotency_key. Returns 202 with delegation_id;
|
||
// the platform runs the A2A round-trip in the background and stores the
|
||
// result in the delegations table. Poll via check_task_status.
|
||
|
||
async function sha256Hex(s: string): Promise<string> {
|
||
const buf = await crypto.subtle.digest('SHA-256', new TextEncoder().encode(s))
|
||
return Array.from(new Uint8Array(buf)).map(b => b.toString(16).padStart(2, '0')).join('')
|
||
}
|
||
|
||
const DelegateTaskAsyncArgsSchema = DelegateTaskArgsSchema
|
||
|
||
async function delegateTaskAsync(args: z.infer<typeof DelegateTaskAsyncArgsSchema>): Promise<unknown> {
|
||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||
if (!args.workspace_id) throw new Error('workspace_id (target peer) is required')
|
||
if (!args.task) throw new Error('task is required')
|
||
// Idempotency key: SHA-256 of (target, task) so a restart firing the same
|
||
// delegation gets the existing delegation_id back instead of creating a
|
||
// duplicate (mirrors workspace/a2a_tools.py — fixes #1456 there).
|
||
const idem = (await sha256Hex(`${args.workspace_id}:${args.task}`)).slice(0, 32)
|
||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/delegate`, {
|
||
method: 'POST',
|
||
headers: platformHeaders(token, { 'Content-Type': 'application/json' }),
|
||
body: JSON.stringify({
|
||
target_id: args.workspace_id,
|
||
task: args.task,
|
||
idempotency_key: idem,
|
||
}),
|
||
signal: AbortSignal.timeout(15_000),
|
||
})
|
||
if (resp.status !== 202 && !resp.ok) {
|
||
const errText = await resp.text().catch(() => '')
|
||
throw new Error(`delegate_task_async failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||
}
|
||
return resp.json()
|
||
}
|
||
|
||
// Tool: check_task_status ----------------------------------------------
|
||
//
|
||
// Mirrors the universal `check_task_status` tool — GET /workspaces/:self/delegations,
|
||
// optionally filtered by delegation_id. Returns peer-reply summary + status
|
||
// (pending / in_progress / queued / completed / failed).
|
||
|
||
const CheckTaskStatusArgsSchema = z.object({
|
||
_as_workspace: z.string().describe(
|
||
"Watched workspace_id whose delegations to inspect (omit if only one watched)."
|
||
).optional(),
|
||
task_id: z.string().describe(
|
||
"delegation_id returned by delegate_task_async. Omit to list recent delegations."
|
||
).optional(),
|
||
})
|
||
|
||
async function checkTaskStatus(args: z.infer<typeof CheckTaskStatusArgsSchema>): Promise<unknown> {
|
||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/delegations`, {
|
||
headers: platformHeaders(token),
|
||
signal: AbortSignal.timeout(15_000),
|
||
})
|
||
if (!resp.ok) {
|
||
const errText = await resp.text().catch(() => '')
|
||
throw new Error(`check_task_status failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||
}
|
||
const all = (await resp.json()) as Array<{ delegation_id?: string }>
|
||
if (args.task_id) {
|
||
const match = all.find(d => d.delegation_id === args.task_id)
|
||
return match ?? { status: 'not_found', delegation_id: args.task_id }
|
||
}
|
||
return { delegations: all.slice(0, 10), count: all.length }
|
||
}
|
||
|
||
// Tool: commit_memory --------------------------------------------------
|
||
//
|
||
// Mirrors the universal `commit_memory` tool — POST /workspaces/:self/memories.
|
||
// Persists across sessions. RBAC + scope (LOCAL/TEAM/GLOBAL) enforcement
|
||
// is platform-side; this tool just plumbs the call.
|
||
|
||
const CommitMemoryArgsSchema = z.object({
|
||
_as_workspace: z.string().describe(
|
||
"Watched workspace_id to commit AS (omit if only one watched)."
|
||
).optional(),
|
||
content: z.string().describe("What to remember — be specific."),
|
||
scope: z.enum(['LOCAL', 'TEAM', 'GLOBAL']).describe(
|
||
"Memory scope (default LOCAL)."
|
||
).optional(),
|
||
})
|
||
|
||
async function commitMemory(args: z.infer<typeof CommitMemoryArgsSchema>): Promise<unknown> {
|
||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||
if (!args.content) throw new Error('content is required')
|
||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/memories`, {
|
||
method: 'POST',
|
||
headers: platformHeaders(token, { 'Content-Type': 'application/json' }),
|
||
body: JSON.stringify({
|
||
content: args.content,
|
||
scope: (args.scope ?? 'LOCAL').toUpperCase(),
|
||
// Platform cross-validates this against the bearer for namespace
|
||
// isolation (workspace-server fix for GH#1610).
|
||
workspace_id: workspaceId,
|
||
}),
|
||
signal: AbortSignal.timeout(15_000),
|
||
})
|
||
if (!resp.ok) {
|
||
const errText = await resp.text().catch(() => '')
|
||
throw new Error(`commit_memory failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||
}
|
||
return resp.json()
|
||
}
|
||
|
||
// Tool: recall_memory --------------------------------------------------
|
||
//
|
||
// Mirrors the universal `recall_memory` tool — GET /workspaces/:self/memories.
|
||
// Returns rows accessible by scope; empty query returns all accessible.
|
||
|
||
const RecallMemoryArgsSchema = z.object({
|
||
_as_workspace: z.string().describe(
|
||
"Watched workspace_id to recall FROM (omit if only one watched)."
|
||
).optional(),
|
||
query: z.string().describe("Search query (empty returns all).").optional(),
|
||
scope: z.enum(['LOCAL', 'TEAM', 'GLOBAL', '']).describe(
|
||
"Filter by scope (empty = all accessible)."
|
||
).optional(),
|
||
})
|
||
|
||
async function recallMemory(args: z.infer<typeof RecallMemoryArgsSchema>): Promise<unknown> {
|
||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||
const url = new URL(`${PLATFORM_URL}/workspaces/${workspaceId}/memories`)
|
||
url.searchParams.set('workspace_id', workspaceId)
|
||
if (args.query) url.searchParams.set('q', args.query)
|
||
if (args.scope) url.searchParams.set('scope', args.scope.toUpperCase())
|
||
const resp = await fetch(url, {
|
||
headers: platformHeaders(token),
|
||
signal: AbortSignal.timeout(15_000),
|
||
})
|
||
if (!resp.ok) {
|
||
const errText = await resp.text().catch(() => '')
|
||
throw new Error(`recall_memory failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||
}
|
||
return resp.json()
|
||
}
|
||
|
||
// The tool surface mirrors workspace/platform_tools/registry.py — same
|
||
// names, same input shapes, same semantics — so an external agent driven
|
||
// through this channel has parity with an in-container agent driven by the
|
||
// universal MCP. The one channel-specific addition is `_as_workspace`,
|
||
// which disambiguates which watched workspace the tool acts AS when this
|
||
// MCP is configured to watch more than one. Underscore-prefixed so it
|
||
// can't collide with the universal contract.
|
||
|
||
mcp.setRequestHandler(ListToolsRequestSchema, async () => ({
|
||
tools: [
|
||
{
|
||
name: 'reply_to_workspace',
|
||
description:
|
||
'Reply to whoever sent the most recent inbound message. Pass peer_id ' +
|
||
'from notification meta.peer_id for peer_agent inbound (routes via /a2a); ' +
|
||
'omit peer_id (or pass empty string) for canvas_user inbound (routes via ' +
|
||
'/notify into the My Chat panel). Check meta.kind on the notification to ' +
|
||
'pick the right form.',
|
||
inputSchema: {
|
||
type: 'object',
|
||
properties: {
|
||
workspace_id: { type: 'string', description: 'Watched workspace_id to reply as (omit if only one watched).' },
|
||
peer_id: {
|
||
type: 'string',
|
||
description:
|
||
'Workspace_id of the peer to A2A-reply to (from notification meta.peer_id). ' +
|
||
'Omit or pass empty string to /notify the canvas user instead.',
|
||
},
|
||
text: { type: 'string', description: 'Reply text (plain text or markdown).' },
|
||
},
|
||
required: ['text'],
|
||
},
|
||
},
|
||
{
|
||
name: 'delegate_task',
|
||
description:
|
||
'Delegate a task to a peer workspace via A2A and WAIT for the response (synchronous). ' +
|
||
'Use for QUICK questions and small sub-tasks; for long-running work use ' +
|
||
'delegate_task_async + check_task_status so this session does not block.',
|
||
inputSchema: {
|
||
type: 'object',
|
||
properties: {
|
||
_as_workspace: { type: 'string', description: 'Watched workspace_id to send AS (omit if only one watched).' },
|
||
workspace_id: { type: 'string', description: 'Target peer workspace ID (from list_peers).' },
|
||
task: { type: 'string', description: 'Task description to send to the peer.' },
|
||
},
|
||
required: ['workspace_id', 'task'],
|
||
},
|
||
},
|
||
{
|
||
name: 'delegate_task_async',
|
||
description:
|
||
'Send a task to a peer and return immediately with a task_id (non-blocking). ' +
|
||
'Poll with check_task_status. The platform A2A queue handles delivery + retries.',
|
||
inputSchema: {
|
||
type: 'object',
|
||
properties: {
|
||
_as_workspace: { type: 'string', description: 'Watched workspace_id to send AS (omit if only one watched).' },
|
||
workspace_id: { type: 'string', description: 'Target peer workspace ID (from list_peers).' },
|
||
task: { type: 'string', description: 'Task description to send to the peer.' },
|
||
},
|
||
required: ['workspace_id', 'task'],
|
||
},
|
||
},
|
||
{
|
||
name: 'check_task_status',
|
||
description:
|
||
'Poll the status of a task started with delegate_task_async; returns the result when done. ' +
|
||
'Statuses: pending/in_progress (peer working — wait), queued (peer busy with prior task — ' +
|
||
'do NOT retry), completed (result available), failed (real error).',
|
||
inputSchema: {
|
||
type: 'object',
|
||
properties: {
|
||
_as_workspace: { type: 'string', description: 'Watched workspace_id whose delegations to inspect (omit if only one watched).' },
|
||
task_id: { type: 'string', description: 'task_id (delegation_id) returned by delegate_task_async. Omit to list recent.' },
|
||
},
|
||
},
|
||
},
|
||
{
|
||
name: 'list_peers',
|
||
description:
|
||
'List the watched workspace\'s peer agents (siblings, children, parent) as registered ' +
|
||
'in the canvas. Use first when you need to delegate but don\'t know the target\'s ID. ' +
|
||
'Access control is enforced — you only see peers your workspace can reach.',
|
||
inputSchema: {
|
||
type: 'object',
|
||
properties: {
|
||
workspace_id: { type: 'string', description: 'Watched workspace_id to query peers for (omit if only one watched).' },
|
||
q: { type: 'string', description: 'Optional case-insensitive substring filter on peer name or role.' },
|
||
},
|
||
},
|
||
},
|
||
{
|
||
name: 'get_workspace_info',
|
||
description:
|
||
'Get the watched workspace\'s own info — id, name, role, tier, parent, status, agent_card. ' +
|
||
'Use to introspect identity (e.g. before reporting back to the user, or to determine if ' +
|
||
'this is a tier-0 root that can write GLOBAL memory).',
|
||
inputSchema: {
|
||
type: 'object',
|
||
properties: {
|
||
_as_workspace: { type: 'string', description: 'Watched workspace_id to introspect (omit if only one watched).' },
|
||
},
|
||
},
|
||
},
|
||
{
|
||
name: 'send_message_to_user',
|
||
description:
|
||
'Send a message to the user\'s canvas chat — pushed instantly via WebSocket. Use to ' +
|
||
'(1) acknowledge a task immediately, (2) post mid-flight progress updates, (3) deliver ' +
|
||
'follow-up results, (4) attach files via the `attachments` field. NEVER paste file URLs ' +
|
||
'in `message`; always pass absolute paths in `attachments` so the platform serves them ' +
|
||
'as download chips (works on SaaS where external file hosts are unreachable).',
|
||
inputSchema: {
|
||
type: 'object',
|
||
properties: {
|
||
_as_workspace: { type: 'string', description: 'Watched workspace_id to send AS (omit if only one watched).' },
|
||
message: { type: 'string', description: 'Caption text for the chat bubble. Required even with attachments.' },
|
||
attachments: {
|
||
type: 'array',
|
||
items: { type: 'string' },
|
||
description: 'Absolute file paths on the user\'s local machine. Each is uploaded via /chat/uploads and surfaces as a download chip. 25 MB cap per file.',
|
||
},
|
||
},
|
||
required: ['message'],
|
||
},
|
||
},
|
||
{
|
||
name: 'commit_memory',
|
||
description:
|
||
'Save a fact to persistent memory; survives across sessions and restarts. ' +
|
||
'Scopes: LOCAL (private to this workspace), TEAM (shared with parent + siblings), ' +
|
||
'GLOBAL (entire org — only tier-0 roots can write).',
|
||
inputSchema: {
|
||
type: 'object',
|
||
properties: {
|
||
_as_workspace: { type: 'string', description: 'Watched workspace_id to commit AS (omit if only one watched).' },
|
||
content: { type: 'string', description: 'What to remember — be specific.' },
|
||
scope: { type: 'string', enum: ['LOCAL', 'TEAM', 'GLOBAL'], description: 'Memory scope (default LOCAL).' },
|
||
},
|
||
required: ['content'],
|
||
},
|
||
},
|
||
{
|
||
name: 'recall_memory',
|
||
description:
|
||
'Search persistent memory; returns matching LOCAL + TEAM + GLOBAL rows. ' +
|
||
'Empty query returns ALL accessible memories — cheap and avoids missing rows that ' +
|
||
'don\'t match a narrow keyword.',
|
||
inputSchema: {
|
||
type: 'object',
|
||
properties: {
|
||
_as_workspace: { type: 'string', description: 'Watched workspace_id to recall FROM (omit if only one watched).' },
|
||
query: { type: 'string', description: 'Search query (empty returns all).' },
|
||
scope: { type: 'string', enum: ['LOCAL', 'TEAM', 'GLOBAL', ''], description: 'Filter by scope (empty = all accessible).' },
|
||
},
|
||
},
|
||
},
|
||
],
|
||
}))
|
||
|
||
mcp.setRequestHandler(CallToolRequestSchema, async req => {
|
||
const args = req.params.arguments ?? {}
|
||
switch (req.params.name) {
|
||
case 'reply_to_workspace': {
|
||
const result = await replyToWorkspace(ReplyArgsSchema.parse(args))
|
||
return { content: [{ type: 'text', text: result }] }
|
||
}
|
||
case 'delegate_task': {
|
||
const result = await delegateTask(DelegateTaskArgsSchema.parse(args))
|
||
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
|
||
}
|
||
case 'delegate_task_async': {
|
||
const result = await delegateTaskAsync(DelegateTaskAsyncArgsSchema.parse(args))
|
||
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
|
||
}
|
||
case 'check_task_status': {
|
||
const result = await checkTaskStatus(CheckTaskStatusArgsSchema.parse(args))
|
||
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
|
||
}
|
||
case 'list_peers': {
|
||
const peers = await listPeers(ListPeersArgsSchema.parse(args))
|
||
return { content: [{ type: 'text', text: JSON.stringify(peers, null, 2) }] }
|
||
}
|
||
case 'get_workspace_info': {
|
||
const info = await getWorkspaceInfo(GetWorkspaceInfoArgsSchema.parse(args))
|
||
return { content: [{ type: 'text', text: JSON.stringify(info, null, 2) }] }
|
||
}
|
||
case 'send_message_to_user': {
|
||
const result = await sendMessageToUser(SendMessageToUserArgsSchema.parse(args))
|
||
return { content: [{ type: 'text', text: result }] }
|
||
}
|
||
case 'commit_memory': {
|
||
const result = await commitMemory(CommitMemoryArgsSchema.parse(args))
|
||
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
|
||
}
|
||
case 'recall_memory': {
|
||
const result = await recallMemory(RecallMemoryArgsSchema.parse(args))
|
||
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
|
||
}
|
||
default:
|
||
throw new Error(`unknown tool: ${req.params.name}`)
|
||
}
|
||
})
|
||
|
||
// ─── Boot ───────────────────────────────────────────────────────────────
|
||
|
||
loadCursors()
|
||
|
||
// Compat probe FIRST — before we open the MCP transport or self-register
|
||
// any workspaces. v0.2.1 had this probe AFTER mcp.connect+registerAsPoll,
|
||
// which had two bugs:
|
||
// 1. mcp.connect already finished the initialize handshake, so a
|
||
// probe-failure exit looked like "MCP server crashed mid-session"
|
||
// to Claude Code (which swallows the stderr explanation) instead of
|
||
// the cleaner "server failed to start" with the upgrade message.
|
||
// 2. registerAsPoll() may have already mutated the platform's
|
||
// delivery_mode for a workspace whose workspace-server can't honor
|
||
// poll, leaving the workspace in a broken state if we then exit.
|
||
// Probing first is purely a startup-ordering fix; the probe semantics
|
||
// (410 → ok, 200 → too_old, anything else → inconclusive) are unchanged.
|
||
//
|
||
// Probes run in parallel (allSettled) — sequentially they were N × 15s
|
||
// at worst, which adds up for multi-workspace channels. Order doesn't
|
||
// matter for the verdict; we only care if any one came back too_old.
|
||
{
|
||
const results = await Promise.allSettled(
|
||
WORKSPACE_IDS.map(id => probeCursorSupport(id).then(r => ({ id, r }))),
|
||
)
|
||
let anyTooOld = false
|
||
for (const settled of results) {
|
||
if (settled.status !== 'fulfilled') continue
|
||
const { id, r } = settled.value
|
||
if (r === 'too_old') {
|
||
anyTooOld = true
|
||
process.stderr.write(
|
||
`molecule channel: workspace ${id} on a platform that predates ` +
|
||
`since_id cursor support (Molecule-AI/molecule-core PR #2354).\n` +
|
||
` Symptom would be: every poll re-delivers all recent activity as if it were new.\n` +
|
||
` Fix: upgrade workspace-server to a build with /activity ?since_id=… support.\n`
|
||
)
|
||
}
|
||
}
|
||
if (anyTooOld) {
|
||
process.stderr.write(
|
||
`molecule channel: refusing to start in poll mode against an older platform. ` +
|
||
`Pin MOLECULE_PLATFORM_URL to an upgraded tenant or downgrade to plugin v0.1.\n`
|
||
)
|
||
// exit triggers the 'exit' listener, which unlinks the PID file.
|
||
process.exit(2)
|
||
}
|
||
}
|
||
|
||
const transport = new StdioServerTransport()
|
||
await mcp.connect(transport)
|
||
|
||
// Self-register each workspace as poll-mode BEFORE the first poll fires.
|
||
// Sequenced (not Promise.all) so failures are surfaced one at a time and
|
||
// the operator can spot which workspace's token is bad.
|
||
if (AUTO_REGISTER_POLL) {
|
||
for (const id of WORKSPACE_IDS) {
|
||
await registerAsPoll(id)
|
||
}
|
||
}
|
||
|
||
process.stderr.write(
|
||
`molecule channel: connected — watching ${WORKSPACE_IDS.length} workspace(s) at ${PLATFORM_URL}\n` +
|
||
` workspaces: ${WORKSPACE_IDS.join(', ')}\n` +
|
||
` delivery_mode=poll cursor=${CURSOR_FILE} auto_register=${AUTO_REGISTER_POLL}\n` +
|
||
` poll: every ${POLL_INTERVAL_MS}ms (cursor-based; ${POLL_WINDOW_SECS}s window only used for first-run seed)\n`
|
||
)
|
||
|
||
// Stagger initial polls slightly so N-workspace watchers don't all hit the
|
||
// platform at the same instant on every tick.
|
||
WORKSPACE_IDS.forEach((id, i) => {
|
||
setTimeout(() => {
|
||
void pollWorkspace(id, mcp)
|
||
setInterval(() => void pollWorkspace(id, mcp), POLL_INTERVAL_MS).unref()
|
||
}, i * 500)
|
||
})
|
||
|
||
// Clean shutdown — fire-and-forget a "disconnected" notice on each watched
|
||
// workspace's A2A so peers don't sit waiting on a silent channel.
|
||
const shutdown = (sig: string) => {
|
||
process.stderr.write(`molecule channel: ${sig} — shutting down\n`)
|
||
process.exit(0)
|
||
}
|
||
process.on('SIGINT', () => shutdown('SIGINT'))
|
||
process.on('SIGTERM', () => shutdown('SIGTERM'))
|