feat(v0.3): mirror universal-MCP surface for external workspaces
Brings the channel's tool surface up to parity with the in-container
universal MCP (workspace/platform_tools/registry.py). External agents
driven through this channel now get the same 9 tools that an in-container
agent gets — same names, same input shapes, same semantics — so a hermes
or codex session bridged via this channel can do anything a containerized
claude-code can.
Tools:
- reply_to_workspace — smart-routed (canvas_user → /notify, peer_agent → /a2a).
Was peer-only; now handles canvas chat replies too. Works for the most
common case (user types in My Chat, agent replies in My Chat) which the
v0.2 channel silently dropped.
- delegate_task / delegate_task_async / check_task_status — proactive A2A.
Previously only inbound peer messages could trigger an outbound a2a;
the agent could not initiate.
- list_peers — peer discovery. Backed by GET /registry/:id/peers.
- get_workspace_info — self-introspection. Needed for memory scope checks
(only tier-0 roots can write GLOBAL).
- send_message_to_user — canvas push with attachments. Multipart upload via
/chat/uploads then /notify, mirrors the universal tool's two-phase shape.
- commit_memory / recall_memory — persistent memory across sessions, with
LOCAL/TEAM/GLOBAL scopes. Platform enforces RBAC + scope.
Inbound delivery:
- Drop seed-then-skip on first run. The previous policy assumed events
before session start were "out of context"; in practice operators
restart Claude Code mid-conversation and EXPECT the queued messages.
Cold start now delivers everything in POLL_WINDOW_SECS and advances
the cursor past it.
- emitNotification adds meta.kind ('canvas_user' | 'peer_agent') so
Claude can pick the right reply form without re-parsing the row.
Channel-specific:
- New tools take an optional _as_workspace param to disambiguate when
watching multiple workspaces. Underscore-prefixed so it can't collide
with the universal contract (which is bound to a single workspace).
This commit is contained in:
parent
fef6a6210e
commit
bcc7db5c86
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "molecule-mcp-claude-channel",
|
||||
"version": "0.2.2",
|
||||
"version": "0.3.0",
|
||||
"description": "Molecule AI channel for Claude Code — bridges A2A traffic into a Claude Code session via MCP",
|
||||
"license": "Apache-2.0",
|
||||
"type": "module",
|
||||
|
||||
697
server.ts
697
server.ts
@ -247,15 +247,18 @@ async function pollWorkspace(workspaceId: string, mcp: Server): Promise<void> {
|
||||
// Steady-state: server returns rows strictly after cursor in ASC order.
|
||||
url.searchParams.set('since_id', cursor)
|
||||
} else {
|
||||
// First run for this workspace — seed the cursor from the most-recent
|
||||
// existing event WITHOUT delivering it. Without this seed the next tick
|
||||
// would also have no cursor and we'd loop forever. Seed-then-skip is
|
||||
// the right policy at startup: events that arrived BEFORE the operator
|
||||
// started this Claude session are out of context and shouldn't be
|
||||
// replayed as if they're new turns. Events arriving AFTER the seed
|
||||
// have id > cursor and will be delivered on subsequent ticks.
|
||||
// First run for this workspace — deliver every event in the POLL_WINDOW_SECS
|
||||
// backfill window, then advance the cursor past the newest. The previous
|
||||
// policy was seed-then-skip on the assumption that pre-session events
|
||||
// were "out of context", but operators routinely restart Claude Code
|
||||
// mid-conversation and EXPECT the queued message to be delivered (otherwise
|
||||
// the user typed something, restarted to enable replies, and got silence
|
||||
// — exactly the friction this channel is supposed to remove).
|
||||
//
|
||||
// Backfill is bounded by POLL_WINDOW_SECS so a long-idle restart doesn't
|
||||
// replay weeks of conversation. Set POLL_WINDOW_SECS=0 to opt out and
|
||||
// restore the old skip-on-cold-start behavior.
|
||||
url.searchParams.set('since_secs', String(POLL_WINDOW_SECS))
|
||||
url.searchParams.set('limit', '1')
|
||||
}
|
||||
|
||||
let resp: Response
|
||||
@ -306,22 +309,15 @@ async function pollWorkspace(workspaceId: string, mcp: Server): Promise<void> {
|
||||
return
|
||||
}
|
||||
|
||||
if (!cursor) {
|
||||
// First-run seed: take the newest activity_id (the only one returned
|
||||
// because we asked for limit=1) and remember it as our starting point.
|
||||
// Don't deliver it — see comment above.
|
||||
if (activities.length > 0) {
|
||||
cursors.set(workspaceId, activities[0].id)
|
||||
saveCursors()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Steady-state: server returned ASC-ordered rows strictly after cursor.
|
||||
// Deliver each in order; advance cursor only after we hand the event
|
||||
// off to MCP. If the notification call rejects we still advance — the
|
||||
// alternative (block on notification failure) would stall the channel
|
||||
// entirely, and notification delivery is best-effort anyway.
|
||||
// Cold-start AND steady-state share the same delivery shape: walk
|
||||
// ASC-ordered events, emit each, advance cursor past the newest. The
|
||||
// only difference is whether we got rows by since_id (steady-state) or
|
||||
// since_secs (cold start backfill); the platform returns the same
|
||||
// column shape and ordering either way.
|
||||
//
|
||||
// Advance cursor even on emit failure — the alternative (block on
|
||||
// notification failure) would stall the channel entirely, and
|
||||
// notification delivery is best-effort anyway.
|
||||
if (activities.length === 0) return
|
||||
for (const act of activities) {
|
||||
emitNotification(mcp, workspaceId, act)
|
||||
@ -485,6 +481,15 @@ function extractText(act: ActivityEntry): string {
|
||||
|
||||
function emitNotification(mcp: Server, workspaceId: string, act: ActivityEntry): void {
|
||||
const text = extractText(act)
|
||||
// Discriminate canvas-user messages (typed in the canvas chat panel) from
|
||||
// peer-agent A2A traffic. The canvas wraps user chat as JSON-RPC
|
||||
// message/send with source_id=null; real peers carry their workspace_id
|
||||
// in source_id. The reply tool routes differently on this — canvas_user
|
||||
// → /notify (lands in the user's chat), peer_agent → /a2a (proper JSON-RPC
|
||||
// response to the calling peer).
|
||||
const peerId = act.source_id ?? ''
|
||||
const kind: 'canvas_user' | 'peer_agent' = peerId ? 'peer_agent' : 'canvas_user'
|
||||
|
||||
// Per the telegram channel reference: notifications/claude/channel is the
|
||||
// host's hook. content becomes the conversation turn; meta is structured
|
||||
// metadata Claude can reason about (workspace_id, peer_id, ts, etc.).
|
||||
@ -496,9 +501,10 @@ function emitNotification(mcp: Server, workspaceId: string, act: ActivityEntry):
|
||||
content: text,
|
||||
meta: {
|
||||
source: 'molecule',
|
||||
kind,
|
||||
workspace_id: act.workspace_id,
|
||||
watching_as: workspaceId,
|
||||
peer_id: act.source_id ?? '',
|
||||
peer_id: peerId,
|
||||
method: act.method ?? '',
|
||||
activity_id: act.id,
|
||||
ts: act.created_at,
|
||||
@ -512,26 +518,35 @@ function emitNotification(mcp: Server, workspaceId: string, act: ActivityEntry):
|
||||
// ─── MCP server ─────────────────────────────────────────────────────────
|
||||
|
||||
const mcp = new Server(
|
||||
{ name: 'molecule', version: '0.2.2' },
|
||||
{ name: 'molecule', version: '0.3.0' },
|
||||
{ capabilities: { tools: {} } },
|
||||
)
|
||||
|
||||
// Tool: reply_to_workspace ----------------------------------------------
|
||||
//
|
||||
// Sends an A2A message FROM one of our watched workspaces TO the peer that
|
||||
// last messaged us (or to an explicit peer_id). Used by Claude when the
|
||||
// human operator authors a reply in this session.
|
||||
// Sends a reply from one of our watched workspaces. The destination is
|
||||
// picked from `peer_id`:
|
||||
//
|
||||
// - peer_id absent / empty → canvas-user reply via POST /workspaces/:our/notify
|
||||
// (lands in the My Chat panel — what users see when
|
||||
// they type in the canvas)
|
||||
// - peer_id present → peer-agent A2A reply via POST /workspaces/:peer/a2a
|
||||
// with a proper JSON-RPC message/send envelope
|
||||
//
|
||||
// The notification meta.kind tells Claude which to use; this tool just
|
||||
// honors whichever peer_id the caller passes.
|
||||
|
||||
const ReplyArgsSchema = z.object({
|
||||
workspace_id: z.string().describe(
|
||||
"Watched workspace_id to reply AS (must be in MOLECULE_WORKSPACE_IDS). " +
|
||||
"Defaults to the workspace whose A2A message Claude is responding to — " +
|
||||
"Defaults to the workspace whose message Claude is responding to — " +
|
||||
"if there's only one watched workspace, omit this."
|
||||
).optional(),
|
||||
peer_id: z.string().describe(
|
||||
"Workspace_id of the peer to send TO. Look at the most recent " +
|
||||
"notifications/claude/channel meta.peer_id."
|
||||
),
|
||||
"Workspace_id of the peer to send TO (for peer_agent inbound — " +
|
||||
"use notification meta.peer_id). Omit or pass empty string to reply " +
|
||||
"to the canvas user via /notify (for canvas_user inbound)."
|
||||
).optional(),
|
||||
text: z.string().describe('Reply text. Plain text or markdown.'),
|
||||
})
|
||||
|
||||
@ -551,16 +566,38 @@ async function replyToWorkspace(args: z.infer<typeof ReplyArgsSchema>): Promise<
|
||||
`Configured: ${WORKSPACE_IDS.join(', ')}`
|
||||
)
|
||||
}
|
||||
// A2A request shape — proper JSON-RPC 2.0 envelope as the platform's a2a_proxy
|
||||
// expects. Empirically (verified 2026-04-29 against workspace-server's
|
||||
// ProxyA2A handler), shorthand `{parts:[...]}` gets accepted but the platform
|
||||
// strips params before forwarding to the peer's URL — the peer then sees an
|
||||
// envelope with `params: null` and no message text. Wrapping in proper
|
||||
// JSON-RPC preserves the message all the way through.
|
||||
|
||||
const peerId = args.peer_id?.trim() ?? ''
|
||||
if (!peerId) {
|
||||
// Canvas-user reply — POST /workspaces/:our/notify with {message: text}.
|
||||
// The platform appends to the user-facing chat panel; no JSON-RPC envelope
|
||||
// because there's no peer URL on the other side, just the canvas UI.
|
||||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspace_id}/notify`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
Authorization: `Bearer ${token}`,
|
||||
'Content-Type': 'application/json',
|
||||
Origin: PLATFORM_URL,
|
||||
},
|
||||
body: JSON.stringify({ message: args.text }),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
})
|
||||
if (!resp.ok) {
|
||||
const errText = await resp.text().catch(() => '')
|
||||
throw new Error(`notify failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||||
}
|
||||
return `Replied to canvas user as ${workspace_id} via /notify.`
|
||||
}
|
||||
|
||||
// Peer-agent A2A reply — proper JSON-RPC 2.0 envelope as the platform's
|
||||
// a2a_proxy expects. Empirically (verified 2026-04-29 against workspace-
|
||||
// server's ProxyA2A handler), shorthand `{parts:[...]}` gets accepted but
|
||||
// the platform strips params before forwarding to the peer's URL — the
|
||||
// peer then sees an envelope with `params: null` and no message text.
|
||||
// Wrapping in proper JSON-RPC preserves the message all the way through.
|
||||
//
|
||||
// `messageId` is generated client-side; the platform doesn't require it but
|
||||
// peers may use it for idempotency / dedup. Random hex matches the a2a-sdk
|
||||
// convention.
|
||||
// `messageId` is generated client-side; the platform doesn't require it
|
||||
// but peers may use it for idempotency / dedup.
|
||||
const body = {
|
||||
jsonrpc: '2.0',
|
||||
id: crypto.randomUUID(),
|
||||
@ -572,7 +609,7 @@ async function replyToWorkspace(args: z.infer<typeof ReplyArgsSchema>): Promise<
|
||||
},
|
||||
},
|
||||
}
|
||||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${args.peer_id}/a2a`, {
|
||||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${peerId}/a2a`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
Authorization: `Bearer ${token}`,
|
||||
@ -590,45 +627,591 @@ async function replyToWorkspace(args: z.infer<typeof ReplyArgsSchema>): Promise<
|
||||
const errText = await resp.text().catch(() => '')
|
||||
throw new Error(`reply failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||||
}
|
||||
return `Reply sent from ${workspace_id} to ${args.peer_id}.`
|
||||
return `Reply sent from ${workspace_id} to ${peerId}.`
|
||||
}
|
||||
|
||||
// ─── Universal-tool helpers ────────────────────────────────────────────
|
||||
//
|
||||
// Resolves "act AS which watched workspace" for tools that take an
|
||||
// optional workspace_id distinguishing the channel-side caller from the
|
||||
// target. When watching exactly one workspace it's an obvious default;
|
||||
// for multi-watch, the caller must specify.
|
||||
|
||||
function resolveWatching(asWorkspaceId?: string): { workspaceId: string; token: string } {
|
||||
let workspaceId = asWorkspaceId
|
||||
if (!workspaceId) {
|
||||
if (WORKSPACE_IDS.length === 1) workspaceId = WORKSPACE_IDS[0]
|
||||
else throw new Error(
|
||||
`_as_workspace required when watching multiple workspaces. ` +
|
||||
`Watching: ${WORKSPACE_IDS.join(', ')}`
|
||||
)
|
||||
}
|
||||
const token = TOKEN_BY_WORKSPACE.get(workspaceId)
|
||||
if (!token) {
|
||||
throw new Error(
|
||||
`${workspaceId} is not in MOLECULE_WORKSPACE_IDS. ` +
|
||||
`Configured: ${WORKSPACE_IDS.join(', ')}`
|
||||
)
|
||||
}
|
||||
return { workspaceId, token }
|
||||
}
|
||||
|
||||
// Standard auth headers shared by every platform call. Origin is required
|
||||
// by the SaaS edge WAF — see pollWorkspace's fetch for the full story.
|
||||
function platformHeaders(token: string, extra: Record<string, string> = {}): Record<string, string> {
|
||||
return {
|
||||
Authorization: `Bearer ${token}`,
|
||||
Origin: PLATFORM_URL!,
|
||||
...extra,
|
||||
}
|
||||
}
|
||||
|
||||
// Tool: list_peers ------------------------------------------------------
|
||||
//
|
||||
// Returns the watched workspace's view of the team — siblings, children,
|
||||
// parent — so Claude can answer "who are my peers?" without a separate
|
||||
// HTTP detour. Mirrors the registry endpoint backed by GET /registry/:id/peers
|
||||
// (workspace-server/internal/handlers/discovery.go:Peers).
|
||||
|
||||
const ListPeersArgsSchema = z.object({
|
||||
workspace_id: z.string().describe(
|
||||
"Watched workspace_id to query peers FOR. Omit if only one watched."
|
||||
).optional(),
|
||||
q: z.string().describe(
|
||||
"Optional case-insensitive substring filter on peer name or role."
|
||||
).optional(),
|
||||
})
|
||||
|
||||
interface Peer {
|
||||
id: string
|
||||
name: string
|
||||
role: string | null
|
||||
tier: number | null
|
||||
status: string
|
||||
url: string
|
||||
parent_id: string | null
|
||||
active_tasks: number
|
||||
agent_card?: unknown
|
||||
}
|
||||
|
||||
async function listPeers(args: z.infer<typeof ListPeersArgsSchema>): Promise<Peer[]> {
|
||||
let { workspace_id } = args
|
||||
if (!workspace_id) {
|
||||
if (WORKSPACE_IDS.length === 1) workspace_id = WORKSPACE_IDS[0]
|
||||
else throw new Error(
|
||||
`workspace_id required when watching multiple workspaces. ` +
|
||||
`Watching: ${WORKSPACE_IDS.join(', ')}`
|
||||
)
|
||||
}
|
||||
const token = TOKEN_BY_WORKSPACE.get(workspace_id)
|
||||
if (!token) {
|
||||
throw new Error(
|
||||
`workspace_id ${workspace_id} is not in MOLECULE_WORKSPACE_IDS. ` +
|
||||
`Configured: ${WORKSPACE_IDS.join(', ')}`
|
||||
)
|
||||
}
|
||||
const url = new URL(`${PLATFORM_URL}/registry/${workspace_id}/peers`)
|
||||
if (args.q) url.searchParams.set('q', args.q)
|
||||
const resp = await fetch(url, {
|
||||
headers: {
|
||||
Authorization: `Bearer ${token}`,
|
||||
Origin: PLATFORM_URL,
|
||||
},
|
||||
signal: AbortSignal.timeout(15_000),
|
||||
})
|
||||
if (!resp.ok) {
|
||||
const errText = await resp.text().catch(() => '')
|
||||
throw new Error(`list_peers failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||||
}
|
||||
return (await resp.json()) as Peer[]
|
||||
}
|
||||
|
||||
// Tool: get_workspace_info ---------------------------------------------
|
||||
//
|
||||
// Mirrors the universal `get_workspace_info` tool — returns the watched
|
||||
// workspace's own identity (id, name, role, tier, parent, status, agent_card).
|
||||
// Backed by GET /workspaces/:id (workspace-server's WorkspaceHandler.Get).
|
||||
|
||||
const GetWorkspaceInfoArgsSchema = z.object({
|
||||
_as_workspace: z.string().describe(
|
||||
"Watched workspace_id to introspect (omit if only one watched)."
|
||||
).optional(),
|
||||
})
|
||||
|
||||
async function getWorkspaceInfo(args: z.infer<typeof GetWorkspaceInfoArgsSchema>): Promise<unknown> {
|
||||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}`, {
|
||||
headers: platformHeaders(token),
|
||||
signal: AbortSignal.timeout(15_000),
|
||||
})
|
||||
if (!resp.ok) {
|
||||
const errText = await resp.text().catch(() => '')
|
||||
throw new Error(`get_workspace_info failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||||
}
|
||||
return resp.json()
|
||||
}
|
||||
|
||||
// Tool: send_message_to_user -------------------------------------------
|
||||
//
|
||||
// Mirrors the universal `send_message_to_user` tool — POST /workspaces/:id/notify.
|
||||
// Lands as a chat bubble in the canvas My Chat panel. The universal tool
|
||||
// also supports `attachments` (file paths inside the workspace container)
|
||||
// uploaded via /chat/uploads; this channel runs on the user's local FS and
|
||||
// uploads from there. Same contract — paths are absolute on whichever side
|
||||
// the tool runs from.
|
||||
|
||||
const SendMessageToUserArgsSchema = z.object({
|
||||
_as_workspace: z.string().describe(
|
||||
"Watched workspace_id to send AS (omit if only one watched)."
|
||||
).optional(),
|
||||
message: z.string().describe(
|
||||
"Caption text for the chat bubble. Required even with attachments — " +
|
||||
"set to a short label like 'Here's the build:' or 'Done — see attached.'\n\n" +
|
||||
"DO NOT paste file URLs in this string. Files MUST go through `attachments` " +
|
||||
"so they render as a clickable download chip."
|
||||
),
|
||||
attachments: z.array(z.string()).describe(
|
||||
"Absolute file paths on the user's local machine (e.g. ['/tmp/build.zip']). " +
|
||||
"Each gets uploaded via /chat/uploads and surfaces as a download chip in " +
|
||||
"the canvas. 25 MB per file cap."
|
||||
).optional(),
|
||||
})
|
||||
|
||||
async function sendMessageToUser(args: z.infer<typeof SendMessageToUserArgsSchema>): Promise<string> {
|
||||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||||
let attachmentRefs: unknown[] = []
|
||||
if (args.attachments && args.attachments.length > 0) {
|
||||
// Multipart upload — same shape as workspace/a2a_tools.py:_upload_chat_files.
|
||||
// The platform stages files under /workspace/.molecule/chat-uploads (a
|
||||
// canvas "allowed root") and returns metadata the notify body references.
|
||||
const form = new FormData()
|
||||
for (const path of args.attachments) {
|
||||
const file = Bun.file(path)
|
||||
if (!(await file.exists())) {
|
||||
throw new Error(`attachment not found: ${path}`)
|
||||
}
|
||||
// Bun.file is a Blob; FormData accepts Blob with filename.
|
||||
form.append('files', file, path.split('/').pop() ?? 'attachment')
|
||||
}
|
||||
const upResp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/chat/uploads`, {
|
||||
method: 'POST',
|
||||
headers: platformHeaders(token),
|
||||
body: form,
|
||||
signal: AbortSignal.timeout(60_000),
|
||||
})
|
||||
if (!upResp.ok) {
|
||||
const errText = await upResp.text().catch(() => '')
|
||||
throw new Error(`chat/uploads failed: HTTP ${upResp.status} — ${errText.slice(0, 200)}`)
|
||||
}
|
||||
const upJson = (await upResp.json()) as { files?: unknown[] }
|
||||
attachmentRefs = upJson.files ?? []
|
||||
}
|
||||
const body: Record<string, unknown> = { message: args.message }
|
||||
if (attachmentRefs.length > 0) body.attachments = attachmentRefs
|
||||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/notify`, {
|
||||
method: 'POST',
|
||||
headers: platformHeaders(token, { 'Content-Type': 'application/json' }),
|
||||
body: JSON.stringify(body),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
})
|
||||
if (!resp.ok) {
|
||||
const errText = await resp.text().catch(() => '')
|
||||
throw new Error(`notify failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||||
}
|
||||
return `Sent to canvas user as ${workspaceId}${attachmentRefs.length > 0 ? ` with ${attachmentRefs.length} attachment(s)` : ''}.`
|
||||
}
|
||||
|
||||
// Tool: delegate_task (sync) -------------------------------------------
|
||||
//
|
||||
// Mirrors the universal `delegate_task` tool — sends an A2A message to a
|
||||
// peer and waits inline for the response. POSTs to /workspaces/:peer/a2a;
|
||||
// the platform's a2a_proxy forwards to the peer's URL and returns the
|
||||
// peer's reply body. Use for QUICK questions; for long-running work use
|
||||
// delegate_task_async + check_task_status.
|
||||
|
||||
const DelegateTaskArgsSchema = z.object({
|
||||
_as_workspace: z.string().describe(
|
||||
"Watched workspace_id to send AS (omit if only one watched)."
|
||||
).optional(),
|
||||
workspace_id: z.string().describe("Target peer workspace ID (from list_peers)."),
|
||||
task: z.string().describe("Task description to send to the peer."),
|
||||
})
|
||||
|
||||
async function delegateTask(args: z.infer<typeof DelegateTaskArgsSchema>): Promise<unknown> {
|
||||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||||
if (!args.workspace_id) throw new Error('workspace_id (target peer) is required')
|
||||
if (!args.task) throw new Error('task is required')
|
||||
const body = {
|
||||
jsonrpc: '2.0',
|
||||
id: crypto.randomUUID(),
|
||||
method: 'message/send',
|
||||
params: {
|
||||
message: {
|
||||
messageId: crypto.randomUUID(),
|
||||
parts: [{ type: 'text', text: args.task }],
|
||||
},
|
||||
},
|
||||
}
|
||||
// 60s timeout because sync delegation waits for the peer to actually
|
||||
// produce a response. Long-running peer work should use the async path.
|
||||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${args.workspace_id}/a2a`, {
|
||||
method: 'POST',
|
||||
headers: platformHeaders(token, {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Source-Workspace-Id': workspaceId,
|
||||
}),
|
||||
body: JSON.stringify(body),
|
||||
signal: AbortSignal.timeout(60_000),
|
||||
})
|
||||
if (!resp.ok) {
|
||||
const errText = await resp.text().catch(() => '')
|
||||
throw new Error(`delegate_task failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||||
}
|
||||
return resp.json()
|
||||
}
|
||||
|
||||
// Tool: delegate_task_async --------------------------------------------
|
||||
//
|
||||
// Mirrors the universal `delegate_task_async` tool — POST /workspaces/:self/delegate
|
||||
// with target_id + task + idempotency_key. Returns 202 with delegation_id;
|
||||
// the platform runs the A2A round-trip in the background and stores the
|
||||
// result in the delegations table. Poll via check_task_status.
|
||||
|
||||
async function sha256Hex(s: string): Promise<string> {
|
||||
const buf = await crypto.subtle.digest('SHA-256', new TextEncoder().encode(s))
|
||||
return Array.from(new Uint8Array(buf)).map(b => b.toString(16).padStart(2, '0')).join('')
|
||||
}
|
||||
|
||||
const DelegateTaskAsyncArgsSchema = DelegateTaskArgsSchema
|
||||
|
||||
async function delegateTaskAsync(args: z.infer<typeof DelegateTaskAsyncArgsSchema>): Promise<unknown> {
|
||||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||||
if (!args.workspace_id) throw new Error('workspace_id (target peer) is required')
|
||||
if (!args.task) throw new Error('task is required')
|
||||
// Idempotency key: SHA-256 of (target, task) so a restart firing the same
|
||||
// delegation gets the existing delegation_id back instead of creating a
|
||||
// duplicate (mirrors workspace/a2a_tools.py — fixes #1456 there).
|
||||
const idem = (await sha256Hex(`${args.workspace_id}:${args.task}`)).slice(0, 32)
|
||||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/delegate`, {
|
||||
method: 'POST',
|
||||
headers: platformHeaders(token, { 'Content-Type': 'application/json' }),
|
||||
body: JSON.stringify({
|
||||
target_id: args.workspace_id,
|
||||
task: args.task,
|
||||
idempotency_key: idem,
|
||||
}),
|
||||
signal: AbortSignal.timeout(15_000),
|
||||
})
|
||||
if (resp.status !== 202 && !resp.ok) {
|
||||
const errText = await resp.text().catch(() => '')
|
||||
throw new Error(`delegate_task_async failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||||
}
|
||||
return resp.json()
|
||||
}
|
||||
|
||||
// Tool: check_task_status ----------------------------------------------
|
||||
//
|
||||
// Mirrors the universal `check_task_status` tool — GET /workspaces/:self/delegations,
|
||||
// optionally filtered by delegation_id. Returns peer-reply summary + status
|
||||
// (pending / in_progress / queued / completed / failed).
|
||||
|
||||
const CheckTaskStatusArgsSchema = z.object({
|
||||
_as_workspace: z.string().describe(
|
||||
"Watched workspace_id whose delegations to inspect (omit if only one watched)."
|
||||
).optional(),
|
||||
task_id: z.string().describe(
|
||||
"delegation_id returned by delegate_task_async. Omit to list recent delegations."
|
||||
).optional(),
|
||||
})
|
||||
|
||||
async function checkTaskStatus(args: z.infer<typeof CheckTaskStatusArgsSchema>): Promise<unknown> {
|
||||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/delegations`, {
|
||||
headers: platformHeaders(token),
|
||||
signal: AbortSignal.timeout(15_000),
|
||||
})
|
||||
if (!resp.ok) {
|
||||
const errText = await resp.text().catch(() => '')
|
||||
throw new Error(`check_task_status failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||||
}
|
||||
const all = (await resp.json()) as Array<{ delegation_id?: string }>
|
||||
if (args.task_id) {
|
||||
const match = all.find(d => d.delegation_id === args.task_id)
|
||||
return match ?? { status: 'not_found', delegation_id: args.task_id }
|
||||
}
|
||||
return { delegations: all.slice(0, 10), count: all.length }
|
||||
}
|
||||
|
||||
// Tool: commit_memory --------------------------------------------------
|
||||
//
|
||||
// Mirrors the universal `commit_memory` tool — POST /workspaces/:self/memories.
|
||||
// Persists across sessions. RBAC + scope (LOCAL/TEAM/GLOBAL) enforcement
|
||||
// is platform-side; this tool just plumbs the call.
|
||||
|
||||
const CommitMemoryArgsSchema = z.object({
|
||||
_as_workspace: z.string().describe(
|
||||
"Watched workspace_id to commit AS (omit if only one watched)."
|
||||
).optional(),
|
||||
content: z.string().describe("What to remember — be specific."),
|
||||
scope: z.enum(['LOCAL', 'TEAM', 'GLOBAL']).describe(
|
||||
"Memory scope (default LOCAL)."
|
||||
).optional(),
|
||||
})
|
||||
|
||||
async function commitMemory(args: z.infer<typeof CommitMemoryArgsSchema>): Promise<unknown> {
|
||||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||||
if (!args.content) throw new Error('content is required')
|
||||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${workspaceId}/memories`, {
|
||||
method: 'POST',
|
||||
headers: platformHeaders(token, { 'Content-Type': 'application/json' }),
|
||||
body: JSON.stringify({
|
||||
content: args.content,
|
||||
scope: (args.scope ?? 'LOCAL').toUpperCase(),
|
||||
// Platform cross-validates this against the bearer for namespace
|
||||
// isolation (workspace-server fix for GH#1610).
|
||||
workspace_id: workspaceId,
|
||||
}),
|
||||
signal: AbortSignal.timeout(15_000),
|
||||
})
|
||||
if (!resp.ok) {
|
||||
const errText = await resp.text().catch(() => '')
|
||||
throw new Error(`commit_memory failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||||
}
|
||||
return resp.json()
|
||||
}
|
||||
|
||||
// Tool: recall_memory --------------------------------------------------
|
||||
//
|
||||
// Mirrors the universal `recall_memory` tool — GET /workspaces/:self/memories.
|
||||
// Returns rows accessible by scope; empty query returns all accessible.
|
||||
|
||||
const RecallMemoryArgsSchema = z.object({
|
||||
_as_workspace: z.string().describe(
|
||||
"Watched workspace_id to recall FROM (omit if only one watched)."
|
||||
).optional(),
|
||||
query: z.string().describe("Search query (empty returns all).").optional(),
|
||||
scope: z.enum(['LOCAL', 'TEAM', 'GLOBAL', '']).describe(
|
||||
"Filter by scope (empty = all accessible)."
|
||||
).optional(),
|
||||
})
|
||||
|
||||
async function recallMemory(args: z.infer<typeof RecallMemoryArgsSchema>): Promise<unknown> {
|
||||
const { workspaceId, token } = resolveWatching(args._as_workspace)
|
||||
const url = new URL(`${PLATFORM_URL}/workspaces/${workspaceId}/memories`)
|
||||
url.searchParams.set('workspace_id', workspaceId)
|
||||
if (args.query) url.searchParams.set('q', args.query)
|
||||
if (args.scope) url.searchParams.set('scope', args.scope.toUpperCase())
|
||||
const resp = await fetch(url, {
|
||||
headers: platformHeaders(token),
|
||||
signal: AbortSignal.timeout(15_000),
|
||||
})
|
||||
if (!resp.ok) {
|
||||
const errText = await resp.text().catch(() => '')
|
||||
throw new Error(`recall_memory failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||||
}
|
||||
return resp.json()
|
||||
}
|
||||
|
||||
// The tool surface mirrors workspace/platform_tools/registry.py — same
|
||||
// names, same input shapes, same semantics — so an external agent driven
|
||||
// through this channel has parity with an in-container agent driven by the
|
||||
// universal MCP. The one channel-specific addition is `_as_workspace`,
|
||||
// which disambiguates which watched workspace the tool acts AS when this
|
||||
// MCP is configured to watch more than one. Underscore-prefixed so it
|
||||
// can't collide with the universal contract.
|
||||
|
||||
mcp.setRequestHandler(ListToolsRequestSchema, async () => ({
|
||||
tools: [
|
||||
{
|
||||
name: 'reply_to_workspace',
|
||||
description:
|
||||
'Reply to a Molecule A2A peer that messaged one of our watched workspaces. ' +
|
||||
'Use after seeing a notifications/claude/channel inbound message.',
|
||||
'Reply to whoever sent the most recent inbound message. Pass peer_id ' +
|
||||
'from notification meta.peer_id for peer_agent inbound (routes via /a2a); ' +
|
||||
'omit peer_id (or pass empty string) for canvas_user inbound (routes via ' +
|
||||
'/notify into the My Chat panel). Check meta.kind on the notification to ' +
|
||||
'pick the right form.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
workspace_id: {
|
||||
type: 'string',
|
||||
description: 'Watched workspace_id to reply as (omit if only one watched).',
|
||||
},
|
||||
workspace_id: { type: 'string', description: 'Watched workspace_id to reply as (omit if only one watched).' },
|
||||
peer_id: {
|
||||
type: 'string',
|
||||
description: 'Workspace_id of the peer to reply to (from notification meta.peer_id).',
|
||||
description:
|
||||
'Workspace_id of the peer to A2A-reply to (from notification meta.peer_id). ' +
|
||||
'Omit or pass empty string to /notify the canvas user instead.',
|
||||
},
|
||||
text: {
|
||||
type: 'string',
|
||||
description: 'Reply text (plain text or markdown).',
|
||||
text: { type: 'string', description: 'Reply text (plain text or markdown).' },
|
||||
},
|
||||
required: ['text'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'delegate_task',
|
||||
description:
|
||||
'Delegate a task to a peer workspace via A2A and WAIT for the response (synchronous). ' +
|
||||
'Use for QUICK questions and small sub-tasks; for long-running work use ' +
|
||||
'delegate_task_async + check_task_status so this session does not block.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
_as_workspace: { type: 'string', description: 'Watched workspace_id to send AS (omit if only one watched).' },
|
||||
workspace_id: { type: 'string', description: 'Target peer workspace ID (from list_peers).' },
|
||||
task: { type: 'string', description: 'Task description to send to the peer.' },
|
||||
},
|
||||
required: ['workspace_id', 'task'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'delegate_task_async',
|
||||
description:
|
||||
'Send a task to a peer and return immediately with a task_id (non-blocking). ' +
|
||||
'Poll with check_task_status. The platform A2A queue handles delivery + retries.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
_as_workspace: { type: 'string', description: 'Watched workspace_id to send AS (omit if only one watched).' },
|
||||
workspace_id: { type: 'string', description: 'Target peer workspace ID (from list_peers).' },
|
||||
task: { type: 'string', description: 'Task description to send to the peer.' },
|
||||
},
|
||||
required: ['workspace_id', 'task'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'check_task_status',
|
||||
description:
|
||||
'Poll the status of a task started with delegate_task_async; returns the result when done. ' +
|
||||
'Statuses: pending/in_progress (peer working — wait), queued (peer busy with prior task — ' +
|
||||
'do NOT retry), completed (result available), failed (real error).',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
_as_workspace: { type: 'string', description: 'Watched workspace_id whose delegations to inspect (omit if only one watched).' },
|
||||
task_id: { type: 'string', description: 'task_id (delegation_id) returned by delegate_task_async. Omit to list recent.' },
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'list_peers',
|
||||
description:
|
||||
'List the watched workspace\'s peer agents (siblings, children, parent) as registered ' +
|
||||
'in the canvas. Use first when you need to delegate but don\'t know the target\'s ID. ' +
|
||||
'Access control is enforced — you only see peers your workspace can reach.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
workspace_id: { type: 'string', description: 'Watched workspace_id to query peers for (omit if only one watched).' },
|
||||
q: { type: 'string', description: 'Optional case-insensitive substring filter on peer name or role.' },
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'get_workspace_info',
|
||||
description:
|
||||
'Get the watched workspace\'s own info — id, name, role, tier, parent, status, agent_card. ' +
|
||||
'Use to introspect identity (e.g. before reporting back to the user, or to determine if ' +
|
||||
'this is a tier-0 root that can write GLOBAL memory).',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
_as_workspace: { type: 'string', description: 'Watched workspace_id to introspect (omit if only one watched).' },
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'send_message_to_user',
|
||||
description:
|
||||
'Send a message to the user\'s canvas chat — pushed instantly via WebSocket. Use to ' +
|
||||
'(1) acknowledge a task immediately, (2) post mid-flight progress updates, (3) deliver ' +
|
||||
'follow-up results, (4) attach files via the `attachments` field. NEVER paste file URLs ' +
|
||||
'in `message`; always pass absolute paths in `attachments` so the platform serves them ' +
|
||||
'as download chips (works on SaaS where external file hosts are unreachable).',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
_as_workspace: { type: 'string', description: 'Watched workspace_id to send AS (omit if only one watched).' },
|
||||
message: { type: 'string', description: 'Caption text for the chat bubble. Required even with attachments.' },
|
||||
attachments: {
|
||||
type: 'array',
|
||||
items: { type: 'string' },
|
||||
description: 'Absolute file paths on the user\'s local machine. Each is uploaded via /chat/uploads and surfaces as a download chip. 25 MB cap per file.',
|
||||
},
|
||||
},
|
||||
required: ['peer_id', 'text'],
|
||||
required: ['message'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'commit_memory',
|
||||
description:
|
||||
'Save a fact to persistent memory; survives across sessions and restarts. ' +
|
||||
'Scopes: LOCAL (private to this workspace), TEAM (shared with parent + siblings), ' +
|
||||
'GLOBAL (entire org — only tier-0 roots can write).',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
_as_workspace: { type: 'string', description: 'Watched workspace_id to commit AS (omit if only one watched).' },
|
||||
content: { type: 'string', description: 'What to remember — be specific.' },
|
||||
scope: { type: 'string', enum: ['LOCAL', 'TEAM', 'GLOBAL'], description: 'Memory scope (default LOCAL).' },
|
||||
},
|
||||
required: ['content'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'recall_memory',
|
||||
description:
|
||||
'Search persistent memory; returns matching LOCAL + TEAM + GLOBAL rows. ' +
|
||||
'Empty query returns ALL accessible memories — cheap and avoids missing rows that ' +
|
||||
'don\'t match a narrow keyword.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
_as_workspace: { type: 'string', description: 'Watched workspace_id to recall FROM (omit if only one watched).' },
|
||||
query: { type: 'string', description: 'Search query (empty returns all).' },
|
||||
scope: { type: 'string', enum: ['LOCAL', 'TEAM', 'GLOBAL', ''], description: 'Filter by scope (empty = all accessible).' },
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
}))
|
||||
|
||||
mcp.setRequestHandler(CallToolRequestSchema, async req => {
|
||||
const args = req.params.arguments ?? {}
|
||||
switch (req.params.name) {
|
||||
case 'reply_to_workspace': {
|
||||
const args = ReplyArgsSchema.parse(req.params.arguments ?? {})
|
||||
const result = await replyToWorkspace(args)
|
||||
const result = await replyToWorkspace(ReplyArgsSchema.parse(args))
|
||||
return { content: [{ type: 'text', text: result }] }
|
||||
}
|
||||
case 'delegate_task': {
|
||||
const result = await delegateTask(DelegateTaskArgsSchema.parse(args))
|
||||
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
|
||||
}
|
||||
case 'delegate_task_async': {
|
||||
const result = await delegateTaskAsync(DelegateTaskAsyncArgsSchema.parse(args))
|
||||
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
|
||||
}
|
||||
case 'check_task_status': {
|
||||
const result = await checkTaskStatus(CheckTaskStatusArgsSchema.parse(args))
|
||||
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
|
||||
}
|
||||
case 'list_peers': {
|
||||
const peers = await listPeers(ListPeersArgsSchema.parse(args))
|
||||
return { content: [{ type: 'text', text: JSON.stringify(peers, null, 2) }] }
|
||||
}
|
||||
case 'get_workspace_info': {
|
||||
const info = await getWorkspaceInfo(GetWorkspaceInfoArgsSchema.parse(args))
|
||||
return { content: [{ type: 'text', text: JSON.stringify(info, null, 2) }] }
|
||||
}
|
||||
case 'send_message_to_user': {
|
||||
const result = await sendMessageToUser(SendMessageToUserArgsSchema.parse(args))
|
||||
return { content: [{ type: 'text', text: result }] }
|
||||
}
|
||||
case 'commit_memory': {
|
||||
const result = await commitMemory(CommitMemoryArgsSchema.parse(args))
|
||||
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
|
||||
}
|
||||
case 'recall_memory': {
|
||||
const result = await recallMemory(RecallMemoryArgsSchema.parse(args))
|
||||
return { content: [{ type: 'text', text: JSON.stringify(result, null, 2) }] }
|
||||
}
|
||||
default:
|
||||
throw new Error(`unknown tool: ${req.params.name}`)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user