Merge pull request #19 from Molecule-AI/feat/v0.3-canvas-user-reply-and-peer-discovery

feat(v0.3): mirror universal-MCP surface for external workspaces
This commit is contained in:
Hongming Wang 2026-04-30 15:05:52 -07:00 committed by GitHub
commit 40fa2d3810
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 641 additions and 58 deletions

View File

@ -1,6 +1,6 @@
{
"name": "molecule-mcp-claude-channel",
"version": "0.2.2",
"version": "0.3.0",
"description": "Molecule AI channel for Claude Code — bridges A2A traffic into a Claude Code session via MCP",
"license": "Apache-2.0",
"type": "module",

697
server.ts
View File

@ -247,15 +247,18 @@ async function pollWorkspace(workspaceId: string, mcp: Server): Promise<void> {
// Steady-state: server returns rows strictly after cursor in ASC order.
url.searchParams.set('since_id', cursor)
} else {
// First run for this workspace — seed the cursor from the most-recent
// existing event WITHOUT delivering it. Without this seed the next tick
// would also have no cursor and we'd loop forever. Seed-then-skip is
// the right policy at startup: events that arrived BEFORE the operator
// started this Claude session are out of context and shouldn't be
// replayed as if they're new turns. Events arriving AFTER the seed
// have id > cursor and will be delivered on subsequent ticks.
// First run for this workspace — deliver every event in the POLL_WINDOW_SECS
// backfill window, then advance the cursor past the newest. The previous
// policy was seed-then-skip on the assumption that pre-session events
// were "out of context", but operators routinely restart Claude Code
// mid-conversation and EXPECT the queued message to be delivered (otherwise
// the user typed something, restarted to enable replies, and got silence
// — exactly the friction this channel is supposed to remove).
//
// Backfill is bounded by POLL_WINDOW_SECS so a long-idle restart doesn't
// replay weeks of conversation. Set POLL_WINDOW_SECS=0 to opt out and
// restore the old skip-on-cold-start behavior.
url.searchParams.set('since_secs', String(POLL_WINDOW_SECS))
url.searchParams.set('limit', '1')
}
let resp: Response
@ -306,22 +309,15 @@ async function pollWorkspace(workspaceId: string, mcp: Server): Promise<void> {
return
}
if (!cursor) {
// First-run seed: take the newest activity_id (the only one returned
// because we asked for limit=1) and remember it as our starting point.
// Don't deliver it — see comment above.
if (activities.length > 0) {
cursors.set(workspaceId, activities[0].id)
saveCursors()
}
return
}
// Steady-state: server returned ASC-ordered rows strictly after cursor.
// Deliver each in order; advance cursor only after we hand the event
// off to MCP. If the notification call rejects we still advance — the
// alternative (block on notification failure) would stall the channel
// entirely, and notification delivery is best-effort anyway.
// Cold-start AND steady-state share the same delivery shape: walk
// ASC-ordered events, emit each, advance cursor past the newest. The
// only difference is whether we got rows by since_id (steady-state) or
// since_secs (cold start backfill); the platform returns the same
// column shape and ordering either way.
//
// Advance cursor even on emit failure — the alternative (block on
// notification failure) would stall the channel entirely, and
// notification delivery is best-effort anyway.
if (activities.length === 0) return
for (const act of activities) {
emitNotification(mcp, workspaceId, act)
@ -485,6 +481,15 @@ function extractText(act: ActivityEntry): string {
function emitNotification(mcp: Server, workspaceId: string, act: ActivityEntry): void {
const text = extractText(act)
// Discriminate canvas-user messages (typed in the canvas chat panel) from
// peer-agent A2A traffic. The canvas wraps user chat as JSON-RPC
// message/send with source_id=null; real peers carry their workspace_id
// in source_id. The reply tool routes differently on this — canvas_user
// → /notify (lands in the user's chat), peer_agent → /a2a (proper JSON-RPC
// response to the calling peer).
const peerId = act.source_id ?? ''
const kind: 'canvas_user' | 'peer_agent' = peerId ? 'peer_agent' : 'canvas_user'
// Per the telegram channel reference: notifications/claude/channel is the
// host's hook. content becomes the conversation turn; meta is structured
// metadata Claude can reason about (workspace_id, peer_id, ts, etc.).
@ -496,9 +501,10 @@ function emitNotification(mcp: Server, workspaceId: string, act: ActivityEntry):
content: text,
meta: {
source: 'molecule',
kind,
workspace_id: act.workspace_id,
watching_as: workspaceId,
peer_id: act.source_id ?? '',
peer_id: peerId,
method: act.method ?? '',
activity_id: act.id,
ts: act.created_at,
@ -512,26 +518,35 @@ function emitNotification(mcp: Server, workspaceId: string, act: ActivityEntry):
// ─── MCP server ─────────────────────────────────────────────────────────
const mcp = new Server(
{ name: 'molecule', version: '0.2.2' },
{ name: 'molecule', version: '0.3.0' },
{ capabilities: { tools: {} } },
)
// Tool: reply_to_workspace ----------------------------------------------
//
// Sends an A2A message FROM one of our watched workspaces TO the peer that
// last messaged us (or to an explicit peer_id). Used by Claude when the
// human operator authors a reply in this session.
// Sends a reply from one of our watched workspaces. The destination is
// picked from `peer_id`:
//
// - peer_id absent / empty → canvas-user reply via POST /workspaces/:our/notify
// (lands in the My Chat panel — what users see when
// they type in the canvas)
// - peer_id present → peer-agent A2A reply via POST /workspaces/:peer/a2a
// with a proper JSON-RPC message/send envelope
//
// The notification meta.kind tells Claude which to use; this tool just
// honors whichever peer_id the caller passes.
const ReplyArgsSchema = z.object({
workspace_id: z.string().describe(
"Watched workspace_id to reply AS (must be in MOLECULE_WORKSPACE_IDS). " +
"Defaults to the workspace whose A2A message Claude is responding to — " +
"Defaults to the workspace whose message Claude is responding to — " +
"if there's only one watched workspace, omit this."
).optional(),
peer_id: z.string().describe(
"Workspace_id of the peer to send TO. Look at the most recent " +
"notifications/claude/channel meta.peer_id."
),
"Workspace_id of the peer to send TO (for peer_agent inbound — " +
"use notification meta.peer_id). Omit or pass empty string to reply " +
"to the canvas user via /notify (for canvas_user inbound)."
).optional(),
text: z.string().describe('Reply text. Plain text or markdown.'),
})
@ -551,16 +566,38 @@ async function replyToWorkspace(args: z.infer<typeof ReplyArgsSchema>): Promise<
`Configured: ${WORKSPACE_IDS.join(', ')}`
)
}
// A2A request shape — proper JSON-RPC 2.0 envelope as the platform's a2a_proxy
// expects. Empirically (verified 2026-04-29 against workspace-server's
// ProxyA2A handler), shorthand `{parts:[...]}` gets accepted but the platform
// strips params before forwarding to the peer's URL — the peer then sees an
// envelope with `params: null` and no message text. Wrapping in proper
// JSON-RPC preserves the message all the way through.
const peerId = args.peer_id?.trim() ?? ''
if (!peerId) {
// Canvas-user reply — POST /workspaces/:our/notify with {message: text}.
// The platform appends to the user-facing chat panel; no JSON-RPC envelope
// because there's no peer URL on the other side, just the canvas UI.
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspace_id}/notify`, {
method: 'POST',
headers: {
Authorization: `Bearer ${token}`,
'Content-Type': 'application/json',
Origin: PLATFORM_URL,
},
body: JSON.stringify({ message: args.text }),
signal: AbortSignal.timeout(30_000),
})
if (!resp.ok) {
const errText = await resp.text().catch(() => '')
throw new Error(`notify failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
}
return `Replied to canvas user as ${workspace_id} via /notify.`
}
// Peer-agent A2A reply — proper JSON-RPC 2.0 envelope as the platform's
// a2a_proxy expects. Empirically (verified 2026-04-29 against workspace-
// server's ProxyA2A handler), shorthand `{parts:[...]}` gets accepted but
// the platform strips params before forwarding to the peer's URL — the
// peer then sees an envelope with `params: null` and no message text.
// Wrapping in proper JSON-RPC preserves the message all the way through.
//
// `messageId` is generated client-side; the platform doesn't require it but
// peers may use it for idempotency / dedup. Random hex matches the a2a-sdk
// convention.
// `messageId` is generated client-side; the platform doesn't require it
// but peers may use it for idempotency / dedup.
const body = {
jsonrpc: '2.0',
id: crypto.randomUUID(),
@ -572,7 +609,7 @@ async function replyToWorkspace(args: z.infer<typeof ReplyArgsSchema>): Promise<
},
},
}
const resp = await fetch(`${PLATFORM_URL}/workspaces/${args.peer_id}/a2a`, {
const resp = await fetch(`${PLATFORM_URL}/workspaces/${peerId}/a2a`, {
method: 'POST',
headers: {
Authorization: `Bearer ${token}`,
@ -590,45 +627,591 @@ async function replyToWorkspace(args: z.infer<typeof ReplyArgsSchema>): Promise<
const errText = await resp.text().catch(() => '')
throw new Error(`reply failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
}
return `Reply sent from ${workspace_id} to ${args.peer_id}.`
return `Reply sent from ${workspace_id} to ${peerId}.`
}
// ─── Universal-tool helpers ────────────────────────────────────────────
//
// Picks which watched workspace a tool call acts AS. With exactly one
// watched workspace the answer is implied; with several, the caller must
// name one via the tool's `_as_workspace` (or equivalent) argument.
//
// Throws when no default can be chosen, or when the chosen workspace has
// no configured token (i.e. it isn't in MOLECULE_WORKSPACE_IDS).
function resolveWatching(asWorkspaceId?: string): { workspaceId: string; token: string } {
  let workspaceId = asWorkspaceId
  // Falsy (undefined or '') means "use the default" — only unambiguous
  // when a single workspace is watched.
  if (!workspaceId && WORKSPACE_IDS.length === 1) {
    workspaceId = WORKSPACE_IDS[0]
  }
  if (!workspaceId) {
    throw new Error(
      `_as_workspace required when watching multiple workspaces. ` +
      `Watching: ${WORKSPACE_IDS.join(', ')}`
    )
  }
  const token = TOKEN_BY_WORKSPACE.get(workspaceId)
  if (!token) {
    throw new Error(
      `${workspaceId} is not in MOLECULE_WORKSPACE_IDS. ` +
      `Configured: ${WORKSPACE_IDS.join(', ')}`
    )
  }
  return { workspaceId, token }
}
// Standard auth headers shared by every platform call. Origin is required
// by the SaaS edge WAF — see pollWorkspace's fetch for the full story.
// Caller-supplied `extra` entries win on key collision.
function platformHeaders(token: string, extra: Record<string, string> = {}): Record<string, string> {
  const base: Record<string, string> = {
    Authorization: `Bearer ${token}`,
    // Non-null assertion: PLATFORM_URL is presumably validated at startup —
    // NOTE(review): that validation is outside this chunk; confirm.
    Origin: PLATFORM_URL!,
  }
  return { ...base, ...extra }
}
// Tool: list_peers ------------------------------------------------------
//
// Returns the watched workspace's view of the team — siblings, children,
// parent — so Claude can answer "who are my peers?" without a separate
// HTTP detour. Mirrors the registry endpoint backed by GET /registry/:id/peers
// (workspace-server/internal/handlers/discovery.go:Peers).
const ListPeersArgsSchema = z.object({
  // Which watched workspace to act as. Optional: listPeers falls back to the
  // sole entry of WORKSPACE_IDS when only one workspace is watched.
  workspace_id: z.string().describe(
    "Watched workspace_id to query peers FOR. Omit if only one watched."
  ).optional(),
  // Forwarded verbatim as the `q` query param — filtering is server-side.
  q: z.string().describe(
    "Optional case-insensitive substring filter on peer name or role."
  ).optional(),
})

// Row shape returned by GET /registry/:id/peers. Nullable fields mirror
// optional registry columns.
interface Peer {
  id: string                // peer workspace_id — the target for delegate_task / replies
  name: string
  role: string | null       // human-readable role label, when the registry has one
  tier: number | null       // org tier — NOTE(review): 0 appears to mean root (see commit_memory tool text); confirm
  status: string
  url: string               // peer's A2A endpoint URL
  parent_id: string | null
  active_tasks: number
  agent_card?: unknown      // opaque agent-card blob when the registry includes it
}
async function listPeers(args: z.infer<typeof ListPeersArgsSchema>): Promise<Peer[]> {
let { workspace_id } = args
if (!workspace_id) {
if (WORKSPACE_IDS.length === 1) workspace_id = WORKSPACE_IDS[0]
else throw new Error(
`workspace_id required when watching multiple workspaces. ` +
`Watching: ${WORKSPACE_IDS.join(', ')}`
)
}
const token = TOKEN_BY_WORKSPACE.get(workspace_id)
if (!token) {
throw new Error(
`workspace_id ${workspace_id} is not in MOLECULE_WORKSPACE_IDS. ` +
`Configured: ${WORKSPACE_IDS.join(', ')}`
)
}
const url = new URL(`${PLATFORM_URL}/registry/${workspace_id}/peers`)
if (args.q) url.searchParams.set('q', args.q)
const resp = await fetch(url, {
headers: {
Authorization: `Bearer ${token}`,
Origin: PLATFORM_URL,
},
signal: AbortSignal.timeout(15_000),
})
if (!resp.ok) {
const errText = await resp.text().catch(() => '')
throw new Error(`list_peers failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
}
return (await resp.json()) as Peer[]
}
// Tool: get_workspace_info ---------------------------------------------
//
// Mirrors the universal `get_workspace_info` tool — returns the watched
// workspace's own identity (id, name, role, tier, parent, status, agent_card).
// Backed by GET /workspaces/:id (workspace-server's WorkspaceHandler.Get).
const GetWorkspaceInfoArgsSchema = z.object({
  // Channel-specific disambiguator: which watched workspace to introspect.
  // Underscore-prefixed so it can't collide with the universal tool contract.
  _as_workspace: z.string().describe(
    "Watched workspace_id to introspect (omit if only one watched)."
  ).optional(),
})
// GET /workspaces/:id for the resolved watched workspace and return the
// parsed JSON body. Throws on any non-2xx response (first 200 chars of the
// error body included for debugging).
async function getWorkspaceInfo(args: z.infer<typeof GetWorkspaceInfoArgsSchema>): Promise<unknown> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  const endpoint = `${PLATFORM_URL}/workspaces/${workspaceId}`
  const resp = await fetch(endpoint, {
    headers: platformHeaders(token),
    signal: AbortSignal.timeout(15_000),
  })
  if (resp.ok) return resp.json()
  const errText = await resp.text().catch(() => '')
  throw new Error(`get_workspace_info failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
}
// Tool: send_message_to_user -------------------------------------------
//
// Mirrors the universal `send_message_to_user` tool — POST /workspaces/:id/notify.
// Lands as a chat bubble in the canvas My Chat panel. The universal tool
// also supports `attachments` (file paths inside the workspace container)
// uploaded via /chat/uploads; this channel runs on the user's local FS and
// uploads from there. Same contract — paths are absolute on whichever side
// the tool runs from.
const SendMessageToUserArgsSchema = z.object({
  // Which watched workspace to send AS (resolved via resolveWatching).
  _as_workspace: z.string().describe(
    "Watched workspace_id to send AS (omit if only one watched)."
  ).optional(),
  message: z.string().describe(
    "Caption text for the chat bubble. Required even with attachments — " +
    "set to a short label like 'Here's the build:' or 'Done — see attached.'\n\n" +
    "DO NOT paste file URLs in this string. Files MUST go through `attachments` " +
    "so they render as a clickable download chip."
  ),
  // Read with Bun.file in sendMessageToUser — paths must exist on the local
  // filesystem where this channel process runs.
  attachments: z.array(z.string()).describe(
    "Absolute file paths on the user's local machine (e.g. ['/tmp/build.zip']). " +
    "Each gets uploaded via /chat/uploads and surfaces as a download chip in " +
    "the canvas. 25 MB per file cap."
  ).optional(),
})
// Post a chat bubble (optionally with file attachments) to the canvas
// "My Chat" panel of one watched workspace.
//
// Flow: (1) if attachments were given, multipart-upload each local file via
// POST /workspaces/:id/chat/uploads and collect the returned refs; (2) POST
// /workspaces/:id/notify with {message, attachments?}. Throws on a missing
// local file or any non-2xx platform response.
async function sendMessageToUser(args: z.infer<typeof SendMessageToUserArgsSchema>): Promise<string> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  let attachmentRefs: unknown[] = []
  if (args.attachments && args.attachments.length > 0) {
    // Multipart upload — same shape as workspace/a2a_tools.py:_upload_chat_files.
    // The platform stages files under /workspace/.molecule/chat-uploads (a
    // canvas "allowed root") and returns metadata the notify body references.
    const form = new FormData()
    for (const path of args.attachments) {
      const file = Bun.file(path)
      if (!(await file.exists())) {
        throw new Error(`attachment not found: ${path}`)
      }
      // Bun.file is a Blob; FormData accepts Blob with filename. Use || (not
      // ??): for a path ending in '/', split().pop() yields '' — a defined
      // value that ?? would pass through — and the upload would carry an
      // empty filename. || falls back to 'attachment' for that case too.
      form.append('files', file, path.split('/').pop() || 'attachment')
    }
    // No manual Content-Type here — fetch must set the multipart boundary.
    const upResp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/chat/uploads`, {
      method: 'POST',
      headers: platformHeaders(token),
      body: form,
      signal: AbortSignal.timeout(60_000),
    })
    if (!upResp.ok) {
      const errText = await upResp.text().catch(() => '')
      throw new Error(`chat/uploads failed: HTTP ${upResp.status}${errText.slice(0, 200)}`)
    }
    const upJson = (await upResp.json()) as { files?: unknown[] }
    attachmentRefs = upJson.files ?? []
  }
  const body: Record<string, unknown> = { message: args.message }
  if (attachmentRefs.length > 0) body.attachments = attachmentRefs
  const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/notify`, {
    method: 'POST',
    headers: platformHeaders(token, { 'Content-Type': 'application/json' }),
    body: JSON.stringify(body),
    signal: AbortSignal.timeout(30_000),
  })
  if (!resp.ok) {
    const errText = await resp.text().catch(() => '')
    throw new Error(`notify failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
  }
  return `Sent to canvas user as ${workspaceId}${attachmentRefs.length > 0 ? ` with ${attachmentRefs.length} attachment(s)` : ''}.`
}
// Tool: delegate_task (sync) -------------------------------------------
//
// Mirrors the universal `delegate_task` tool — sends an A2A message to a
// peer and waits inline for the response. POSTs to /workspaces/:peer/a2a;
// the platform's a2a_proxy forwards to the peer's URL and returns the
// peer's reply body. Use for QUICK questions; for long-running work use
// delegate_task_async + check_task_status.
const DelegateTaskArgsSchema = z.object({
  // Channel-side disambiguator (underscore-prefixed so it can't collide with
  // the universal contract): which watched workspace to send AS.
  _as_workspace: z.string().describe(
    "Watched workspace_id to send AS (omit if only one watched)."
  ).optional(),
  // NOTE: here `workspace_id` is the TARGET peer, whereas in list_peers it
  // names the watched workspace — the asymmetry mirrors the universal surface.
  workspace_id: z.string().describe("Target peer workspace ID (from list_peers)."),
  task: z.string().describe("Task description to send to the peer."),
})
// Synchronous delegation: POST a JSON-RPC message/send envelope to the
// target peer's /a2a proxy and wait inline for the peer's reply body.
async function delegateTask(args: z.infer<typeof DelegateTaskArgsSchema>): Promise<unknown> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  // Reject empty strings too — the schema only enforces presence.
  if (!args.workspace_id) throw new Error('workspace_id (target peer) is required')
  if (!args.task) throw new Error('task is required')
  const envelope = {
    jsonrpc: '2.0',
    id: crypto.randomUUID(),
    method: 'message/send',
    params: {
      message: {
        messageId: crypto.randomUUID(),
        parts: [{ type: 'text', text: args.task }],
      },
    },
  }
  const target = `${PLATFORM_URL}/workspaces/${args.workspace_id}/a2a`
  // 60s timeout because sync delegation waits for the peer to actually
  // produce a response. Long-running peer work should use the async path.
  const resp = await fetch(target, {
    method: 'POST',
    headers: platformHeaders(token, {
      'Content-Type': 'application/json',
      'X-Source-Workspace-Id': workspaceId,
    }),
    body: JSON.stringify(envelope),
    signal: AbortSignal.timeout(60_000),
  })
  if (resp.ok) return resp.json()
  const errText = await resp.text().catch(() => '')
  throw new Error(`delegate_task failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
}
// Tool: delegate_task_async --------------------------------------------
//
// Mirrors the universal `delegate_task_async` tool — POST /workspaces/:self/delegate
// with target_id + task + idempotency_key. Returns 202 with delegation_id;
// the platform runs the A2A round-trip in the background and stores the
// result in the delegations table. Poll via check_task_status.
// Hex-encoded SHA-256 of a UTF-8 string, via WebCrypto (global in Bun and
// modern Node). Used below to derive deterministic idempotency keys.
async function sha256Hex(s: string): Promise<string> {
  const digest = await crypto.subtle.digest('SHA-256', new TextEncoder().encode(s))
  let hex = ''
  for (const byte of new Uint8Array(digest)) {
    hex += byte.toString(16).padStart(2, '0')
  }
  return hex
}
// Same argument shape as the sync tool — only the delivery path differs.
const DelegateTaskAsyncArgsSchema = DelegateTaskArgsSchema

// Fire-and-forget delegation: POST /workspaces/:self/delegate and return the
// platform's (202) body, which carries delegation_id for check_task_status.
async function delegateTaskAsync(args: z.infer<typeof DelegateTaskAsyncArgsSchema>): Promise<unknown> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  // Reject empty strings too — the schema only enforces presence.
  if (!args.workspace_id) throw new Error('workspace_id (target peer) is required')
  if (!args.task) throw new Error('task is required')
  // Idempotency key: SHA-256 of (target, task) so a restart firing the same
  // delegation gets the existing delegation_id back instead of creating a
  // duplicate (mirrors workspace/a2a_tools.py — fixes #1456 there).
  const digest = await sha256Hex(`${args.workspace_id}:${args.task}`)
  const payload = {
    target_id: args.workspace_id,
    task: args.task,
    idempotency_key: digest.slice(0, 32),
  }
  const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/delegate`, {
    method: 'POST',
    headers: platformHeaders(token, { 'Content-Type': 'application/json' }),
    body: JSON.stringify(payload),
    signal: AbortSignal.timeout(15_000),
  })
  // 202 Accepted is the expected success status; the explicit check documents
  // the contract (resp.ok already covers 202 since it is in the 2xx range).
  if (!resp.ok && resp.status !== 202) {
    const errText = await resp.text().catch(() => '')
    throw new Error(`delegate_task_async failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
  }
  return resp.json()
}
// Tool: check_task_status ----------------------------------------------
//
// Mirrors the universal `check_task_status` tool — GET /workspaces/:self/delegations,
// optionally filtered by delegation_id. Returns peer-reply summary + status
// (pending / in_progress / queued / completed / failed).
const CheckTaskStatusArgsSchema = z.object({
  _as_workspace: z.string().describe(
    "Watched workspace_id whose delegations to inspect (omit if only one watched)."
  ).optional(),
  // Matched against each row's delegation_id client-side — the HTTP call in
  // checkTaskStatus always fetches the full list.
  task_id: z.string().describe(
    "delegation_id returned by delegate_task_async. Omit to list recent delegations."
  ).optional(),
})
// Poll the delegations list for one watched workspace. With task_id, return
// the single matching row (or a not_found marker); otherwise a capped listing.
async function checkTaskStatus(args: z.infer<typeof CheckTaskStatusArgsSchema>): Promise<unknown> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/delegations`, {
    headers: platformHeaders(token),
    signal: AbortSignal.timeout(15_000),
  })
  if (!resp.ok) {
    const errText = await resp.text().catch(() => '')
    throw new Error(`check_task_status failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
  }
  const rows = (await resp.json()) as Array<{ delegation_id?: string }>
  if (args.task_id) {
    // Targeted lookup: first row whose delegation_id matches wins.
    for (const row of rows) {
      if (row.delegation_id === args.task_id) return row
    }
    return { status: 'not_found', delegation_id: args.task_id }
  }
  // Unfiltered: cap the listing at the first 10 rows the platform returned,
  // but report the true total.
  return { delegations: rows.slice(0, 10), count: rows.length }
}
// Tool: commit_memory --------------------------------------------------
//
// Mirrors the universal `commit_memory` tool — POST /workspaces/:self/memories.
// Persists across sessions. RBAC + scope (LOCAL/TEAM/GLOBAL) enforcement
// is platform-side; this tool just plumbs the call.
const CommitMemoryArgsSchema = z.object({
  _as_workspace: z.string().describe(
    "Watched workspace_id to commit AS (omit if only one watched)."
  ).optional(),
  // Required by the schema; commitMemory additionally rejects the empty string.
  content: z.string().describe("What to remember — be specific."),
  // Omitted scope defaults to LOCAL inside commitMemory, not here.
  scope: z.enum(['LOCAL', 'TEAM', 'GLOBAL']).describe(
    "Memory scope (default LOCAL)."
  ).optional(),
})
// Persist one memory row for the resolved watched workspace via
// POST /workspaces/:self/memories. Returns the platform's JSON response.
async function commitMemory(args: z.infer<typeof CommitMemoryArgsSchema>): Promise<unknown> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  // Schema requires content; this also rejects the empty string.
  if (!args.content) throw new Error('content is required')
  const scope = (args.scope ?? 'LOCAL').toUpperCase()
  const payload = {
    content: args.content,
    scope,
    // Platform cross-validates this against the bearer for namespace
    // isolation (workspace-server fix for GH#1610).
    workspace_id: workspaceId,
  }
  const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/memories`, {
    method: 'POST',
    headers: platformHeaders(token, { 'Content-Type': 'application/json' }),
    body: JSON.stringify(payload),
    signal: AbortSignal.timeout(15_000),
  })
  if (resp.ok) return resp.json()
  const errText = await resp.text().catch(() => '')
  throw new Error(`commit_memory failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
}
// Tool: recall_memory --------------------------------------------------
//
// Mirrors the universal `recall_memory` tool — GET /workspaces/:self/memories.
// Returns rows accessible by scope; empty query returns all accessible.
const RecallMemoryArgsSchema = z.object({
  _as_workspace: z.string().describe(
    "Watched workspace_id to recall FROM (omit if only one watched)."
  ).optional(),
  // Omitted/empty query is simply not forwarded as a `q` param — the
  // platform then returns everything accessible.
  query: z.string().describe("Search query (empty returns all).").optional(),
  // '' is a deliberate enum member meaning "no scope filter".
  scope: z.enum(['LOCAL', 'TEAM', 'GLOBAL', '']).describe(
    "Filter by scope (empty = all accessible)."
  ).optional(),
})
// Search persistent memory for the resolved watched workspace via
// GET /workspaces/:self/memories, with optional query/scope filters.
async function recallMemory(args: z.infer<typeof RecallMemoryArgsSchema>): Promise<unknown> {
  const { workspaceId, token } = resolveWatching(args._as_workspace)
  const endpoint = new URL(`${PLATFORM_URL}/workspaces/${workspaceId}/memories`)
  // Echoed workspace_id param — the platform matches it against the bearer.
  endpoint.searchParams.set('workspace_id', workspaceId)
  // Falsy filters (undefined or '') are intentionally omitted: empty means
  // "all accessible" on the platform side.
  if (args.query) endpoint.searchParams.set('q', args.query)
  if (args.scope) endpoint.searchParams.set('scope', args.scope.toUpperCase())
  const resp = await fetch(endpoint, {
    headers: platformHeaders(token),
    signal: AbortSignal.timeout(15_000),
  })
  if (resp.ok) return resp.json()
  const errText = await resp.text().catch(() => '')
  throw new Error(`recall_memory failed: HTTP ${resp.status}${errText.slice(0, 200)}`)
}
// The tool surface mirrors workspace/platform_tools/registry.py — same
// names, same input shapes, same semantics — so an external agent driven
// through this channel has parity with an in-container agent driven by the
// universal MCP. The one channel-specific addition is `_as_workspace`,
// which disambiguates which watched workspace the tool acts AS when this
// MCP is configured to watch more than one. Underscore-prefixed so it
// can't collide with the universal contract.
mcp.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: 'reply_to_workspace',
description:
'Reply to a Molecule A2A peer that messaged one of our watched workspaces. ' +
'Use after seeing a notifications/claude/channel inbound message.',
'Reply to whoever sent the most recent inbound message. Pass peer_id ' +
'from notification meta.peer_id for peer_agent inbound (routes via /a2a); ' +
'omit peer_id (or pass empty string) for canvas_user inbound (routes via ' +
'/notify into the My Chat panel). Check meta.kind on the notification to ' +
'pick the right form.',
inputSchema: {
type: 'object',
properties: {
workspace_id: {
type: 'string',
description: 'Watched workspace_id to reply as (omit if only one watched).',
},
workspace_id: { type: 'string', description: 'Watched workspace_id to reply as (omit if only one watched).' },
peer_id: {
type: 'string',
description: 'Workspace_id of the peer to reply to (from notification meta.peer_id).',
description:
'Workspace_id of the peer to A2A-reply to (from notification meta.peer_id). ' +
'Omit or pass empty string to /notify the canvas user instead.',
},
text: {
type: 'string',
description: 'Reply text (plain text or markdown).',
text: { type: 'string', description: 'Reply text (plain text or markdown).' },
},
required: ['text'],
},
},
required: ['peer_id', 'text'],
{
name: 'delegate_task',
description:
'Delegate a task to a peer workspace via A2A and WAIT for the response (synchronous). ' +
'Use for QUICK questions and small sub-tasks; for long-running work use ' +
'delegate_task_async + check_task_status so this session does not block.',
inputSchema: {
type: 'object',
properties: {
_as_workspace: { type: 'string', description: 'Watched workspace_id to send AS (omit if only one watched).' },
workspace_id: { type: 'string', description: 'Target peer workspace ID (from list_peers).' },
task: { type: 'string', description: 'Task description to send to the peer.' },
},
required: ['workspace_id', 'task'],
},
},
{
name: 'delegate_task_async',
description:
'Send a task to a peer and return immediately with a task_id (non-blocking). ' +
'Poll with check_task_status. The platform A2A queue handles delivery + retries.',
inputSchema: {
type: 'object',
properties: {
_as_workspace: { type: 'string', description: 'Watched workspace_id to send AS (omit if only one watched).' },
workspace_id: { type: 'string', description: 'Target peer workspace ID (from list_peers).' },
task: { type: 'string', description: 'Task description to send to the peer.' },
},
required: ['workspace_id', 'task'],
},
},
{
name: 'check_task_status',
description:
'Poll the status of a task started with delegate_task_async; returns the result when done. ' +
'Statuses: pending/in_progress (peer working — wait), queued (peer busy with prior task — ' +
'do NOT retry), completed (result available), failed (real error).',
inputSchema: {
type: 'object',
properties: {
_as_workspace: { type: 'string', description: 'Watched workspace_id whose delegations to inspect (omit if only one watched).' },
task_id: { type: 'string', description: 'task_id (delegation_id) returned by delegate_task_async. Omit to list recent.' },
},
},
},
{
name: 'list_peers',
description:
'List the watched workspace\'s peer agents (siblings, children, parent) as registered ' +
'in the canvas. Use first when you need to delegate but don\'t know the target\'s ID. ' +
'Access control is enforced — you only see peers your workspace can reach.',
inputSchema: {
type: 'object',
properties: {
workspace_id: { type: 'string', description: 'Watched workspace_id to query peers for (omit if only one watched).' },
q: { type: 'string', description: 'Optional case-insensitive substring filter on peer name or role.' },
},
},
},
{
name: 'get_workspace_info',
description:
'Get the watched workspace\'s own info — id, name, role, tier, parent, status, agent_card. ' +
'Use to introspect identity (e.g. before reporting back to the user, or to determine if ' +
'this is a tier-0 root that can write GLOBAL memory).',
inputSchema: {
type: 'object',
properties: {
_as_workspace: { type: 'string', description: 'Watched workspace_id to introspect (omit if only one watched).' },
},
},
},
{
name: 'send_message_to_user',
description:
'Send a message to the user\'s canvas chat — pushed instantly via WebSocket. Use to ' +
'(1) acknowledge a task immediately, (2) post mid-flight progress updates, (3) deliver ' +
'follow-up results, (4) attach files via the `attachments` field. NEVER paste file URLs ' +
'in `message`; always pass absolute paths in `attachments` so the platform serves them ' +
'as download chips (works on SaaS where external file hosts are unreachable).',
inputSchema: {
type: 'object',
properties: {
_as_workspace: { type: 'string', description: 'Watched workspace_id to send AS (omit if only one watched).' },
message: { type: 'string', description: 'Caption text for the chat bubble. Required even with attachments.' },
attachments: {
type: 'array',
items: { type: 'string' },
description: 'Absolute file paths on the user\'s local machine. Each is uploaded via /chat/uploads and surfaces as a download chip. 25 MB cap per file.',
},
},
required: ['message'],
},
},
{
name: 'commit_memory',
description:
'Save a fact to persistent memory; survives across sessions and restarts. ' +
'Scopes: LOCAL (private to this workspace), TEAM (shared with parent + siblings), ' +
'GLOBAL (entire org — only tier-0 roots can write).',
inputSchema: {
type: 'object',
properties: {
_as_workspace: { type: 'string', description: 'Watched workspace_id to commit AS (omit if only one watched).' },
content: { type: 'string', description: 'What to remember — be specific.' },
scope: { type: 'string', enum: ['LOCAL', 'TEAM', 'GLOBAL'], description: 'Memory scope (default LOCAL).' },
},
required: ['content'],
},
},
{
name: 'recall_memory',
description:
'Search persistent memory; returns matching LOCAL + TEAM + GLOBAL rows. ' +
'Empty query returns ALL accessible memories — cheap and avoids missing rows that ' +
'don\'t match a narrow keyword.',
inputSchema: {
type: 'object',
properties: {
_as_workspace: { type: 'string', description: 'Watched workspace_id to recall FROM (omit if only one watched).' },
query: { type: 'string', description: 'Search query (empty returns all).' },
scope: { type: 'string', enum: ['LOCAL', 'TEAM', 'GLOBAL', ''], description: 'Filter by scope (empty = all accessible).' },
},
},
},
],
}))
mcp.setRequestHandler(CallToolRequestSchema, async req => {
const args = req.params.arguments ?? {}
switch (req.params.name) {
case 'reply_to_workspace': {
const args = ReplyArgsSchema.parse(req.params.arguments ?? {})
const result = await replyToWorkspace(args)
const result = await replyToWorkspace(ReplyArgsSchema.parse(args))
return { content: [{ type: 'text', text: result }] }
}
case 'delegate_task': {
const result = await delegateTask(DelegateTaskArgsSchema.parse(args))
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
}
case 'delegate_task_async': {
const result = await delegateTaskAsync(DelegateTaskAsyncArgsSchema.parse(args))
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
}
case 'check_task_status': {
const result = await checkTaskStatus(CheckTaskStatusArgsSchema.parse(args))
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
}
case 'list_peers': {
const peers = await listPeers(ListPeersArgsSchema.parse(args))
return { content: [{ type: 'text', text: JSON.stringify(peers, null, 2) }] }
}
case 'get_workspace_info': {
const info = await getWorkspaceInfo(GetWorkspaceInfoArgsSchema.parse(args))
return { content: [{ type: 'text', text: JSON.stringify(info, null, 2) }] }
}
case 'send_message_to_user': {
const result = await sendMessageToUser(SendMessageToUserArgsSchema.parse(args))
return { content: [{ type: 'text', text: result }] }
}
case 'commit_memory': {
const result = await commitMemory(CommitMemoryArgsSchema.parse(args))
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
}
case 'recall_memory': {
const result = await recallMemory(RecallMemoryArgsSchema.parse(args))
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
}
default:
throw new Error(`unknown tool: ${req.params.name}`)
}