feat: initial scaffold — Molecule channel plugin for Claude Code
Bridges Molecule A2A traffic into a Claude Code session via MCP. Inbound A2A messages from watched workspaces surface as conversation turns (notifications/claude/channel); replies route back through the existing POST /workspaces/:id/a2a endpoint via the reply_to_workspace MCP tool. Architecture: - Polling-based inbound (uses /activity?since_secs= shipped in molecule-core PR #2300). Works through every NAT/firewall, no tunnel required — optimized for laptop-launched Claude Code sessions vs the existing push-based external-agent flow that needs ngrok. - Per-workspace bearer auth (MOLECULE_WORKSPACE_TOKENS, comma-separated to match MOLECULE_WORKSPACE_IDS). Same token covers /activity (read) and /a2a (write). - Singleton lock at ~/.claude/channels/molecule/bot.pid prevents two channel servers racing the dedup state. - Dedup by activity.id; 30s overlap window over a 5s poll interval protects against missed ticks (laptop sleep, transient network blips). v0.1 ships: - .claude-plugin/plugin.json, .mcp.json, package.json, LICENSE (Apache-2.0) - server.ts: MCP server with notification emission + reply_to_workspace tool - README: install + .env config + architecture notes + v0.2 roadmap v0.1 explicit non-goals (tracked in README): - No push-mode inbound (requires tunnel; deferred to v0.2) - No pairing flow (manual .env tokens; canvas pairing in v0.2) - No file-attachment download (URLs surface in meta; host fetches on-demand) - No outbound channel-init (only replies; start_workspace_chat in v0.2) Mirrors the architecture of @claude-plugins-official/telegram v0.0.6 (MCP notification contract: notifications/claude/channel with {content, meta}) so the host's existing channel-handling logic works without custom adapters. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
commit
d07363cbe5
12
.claude-plugin/plugin.json
Normal file
12
.claude-plugin/plugin.json
Normal file
@ -0,0 +1,12 @@
|
||||
{
|
||||
"name": "molecule",
|
||||
"description": "Molecule AI channel for Claude Code — bridges Molecule A2A traffic into a Claude Code session via MCP. Subscribe to one or more Molecule workspaces; A2A messages from peers surface as conversation turns; replies route back through Molecule's A2A endpoints.",
|
||||
"version": "0.1.0",
|
||||
"keywords": [
|
||||
"molecule",
|
||||
"molecule-ai",
|
||||
"a2a",
|
||||
"channel",
|
||||
"mcp"
|
||||
]
|
||||
}
|
||||
5
.gitignore
vendored
Normal file
5
.gitignore
vendored
Normal file
@ -0,0 +1,5 @@
|
||||
node_modules/
|
||||
bun.lock
|
||||
*.log
|
||||
.env
|
||||
.env.local
|
||||
8
.mcp.json
Normal file
8
.mcp.json
Normal file
@ -0,0 +1,8 @@
|
||||
{
|
||||
"mcpServers": {
|
||||
"molecule": {
|
||||
"command": "bun",
|
||||
"args": ["run", "--cwd", "${CLAUDE_PLUGIN_ROOT}", "--shell=bun", "--silent", "start"]
|
||||
}
|
||||
}
|
||||
}
|
||||
17
LICENSE
Normal file
17
LICENSE
Normal file
@ -0,0 +1,17 @@
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
Copyright 2026 Molecule AI
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
136
README.md
Normal file
136
README.md
Normal file
@ -0,0 +1,136 @@
|
||||
# molecule-mcp-claude-channel
|
||||
|
||||
Claude Code channel plugin for [Molecule AI](https://moleculesai.app). Bridges Molecule A2A traffic into a Claude Code session: peer messages from your watched workspaces surface as conversation turns, and your replies route back through Molecule's A2A.
|
||||
|
||||
## What it does
|
||||
|
||||
When you launch Claude Code with this plugin enabled and configure it to watch one or more Molecule workspaces, every A2A message your watched workspaces receive shows up in the session as a user-turn. You reply normally; the plugin's MCP `reply_to_workspace` tool sends the response back through Molecule.
|
||||
|
||||
```
|
||||
Molecule peer ──A2A──> [your workspace] ──poll──> [this plugin] ──MCP notification──> Claude Code session
|
||||
^ │
|
||||
└────────── POST /workspaces/:id/a2a ◄── reply_to_workspace tool ──┘
|
||||
```
|
||||
|
||||
No tunnel. No public endpoint. The plugin polls your tenant for new A2A activity (using the `?since_secs=` filter on `/workspaces/:id/activity`); replies POST back to `/workspaces/:peer_id/a2a` via the same bearer token.
|
||||
|
||||
## Install
|
||||
|
||||
```bash
|
||||
claude --channels plugin:molecule@Molecule-AI/molecule-mcp-claude-channel
|
||||
```
|
||||
|
||||
On first launch the plugin creates `~/.claude/channels/molecule/` and exits with a config-missing error pointing at `.env`. Fill it in:
|
||||
|
||||
```
|
||||
# ~/.claude/channels/molecule/.env
|
||||
|
||||
# Required
|
||||
MOLECULE_PLATFORM_URL=https://your-tenant.staging.moleculesai.app
|
||||
MOLECULE_WORKSPACE_IDS=ws-uuid-1,ws-uuid-2
|
||||
MOLECULE_WORKSPACE_TOKENS=tok-1,tok-2
|
||||
|
||||
# Optional
|
||||
MOLECULE_POLL_INTERVAL_MS=5000 # default 5s
|
||||
MOLECULE_POLL_WINDOW_SECS=30 # default 30s — overlap protects against missed ticks
|
||||
```
|
||||
|
||||
The `.env` file is `chmod 600` after first read; tokens never appear in environment-block-style `claude doctor` dumps.
|
||||
|
||||
Re-launch Claude Code:
|
||||
|
||||
```bash
|
||||
claude --channels plugin:molecule@Molecule-AI/molecule-mcp-claude-channel
|
||||
```
|
||||
|
||||
You should see on stderr:
|
||||
|
||||
```
|
||||
molecule channel: connected — watching 2 workspace(s) at https://your-tenant.staging.moleculesai.app
|
||||
workspaces: ws-uuid-1, ws-uuid-2
|
||||
poll: every 5000ms with 30s window
|
||||
```
|
||||
|
||||
## Getting workspace_id + token
|
||||
|
||||
Every Molecule workspace has a workspace-scoped bearer that authenticates against `/activity` (read) and `/a2a` (write). Two ways to get one:
|
||||
|
||||
### From Canvas (recommended)
|
||||
|
||||
1. Open the workspace in Canvas
|
||||
2. Settings tab → "Auth tokens" → **Create channel token**
|
||||
3. Copy the workspace_id (UUID at the top) and the token (shown once)
|
||||
|
||||
### From the API
|
||||
|
||||
```bash
|
||||
curl -X POST "$MOLECULE_PLATFORM_URL/admin/workspaces/$WORKSPACE_ID/tokens" \
|
||||
-H "Authorization: Bearer $ADMIN_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"label": "claude-channel"}'
|
||||
```
|
||||
|
||||
## How replies work
|
||||
|
||||
When a peer's message lands in your session, the meta block carries the routing data Claude needs:
|
||||
|
||||
```json
|
||||
{
|
||||
"method": "notifications/claude/channel",
|
||||
"params": {
|
||||
"content": "Hey, can you take a look at this? <issue body>",
|
||||
"meta": {
|
||||
"source": "molecule",
|
||||
"workspace_id": "ws-uuid-1",
|
||||
"watching_as": "ws-uuid-1",
|
||||
"peer_id": "ws-uuid-pm-coordinator",
|
||||
"method": "user_message",
|
||||
"activity_id": "act-...",
|
||||
"ts": "2026-04-29T..."
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Claude can call `reply_to_workspace({peer_id, text})` to send the response back. If only one workspace is watched, `workspace_id` is implicit. Multi-workspace setups need the watched id explicitly.
|
||||
|
||||
## Architecture notes
|
||||
|
||||
### Why polling instead of push?
|
||||
|
||||
The existing external-agent integration in Molecule uses **push**: register an inbound URL, platform POSTs A2A to that URL. That's lower latency but requires a tunnel (ngrok/Cloudflare) or a static IP — non-trivial for a laptop-launched Claude Code session.
|
||||
|
||||
This plugin uses **polling** as the default because it works through every NAT/firewall with zero infra. The cost is up to `MOLECULE_POLL_INTERVAL_MS` (default 5s) of inbound latency. For production setups where lower latency matters, a future `MOLECULE_INBOUND_MODE=push` can opt into the existing register-and-receive flow.
|
||||
|
||||
### Why `since_secs=30` overlapping a `5s` poll interval?
|
||||
|
||||
A single missed tick (transient network blip, GC pause, laptop sleep) shouldn't lose messages. The plugin re-fetches the last 30 seconds on every poll and dedups by `activity_id`, so 25 seconds of overlap is the recovery margin. Set `MOLECULE_POLL_WINDOW_SECS` higher for noisier networks.
|
||||
|
||||
### Singleton lock
|
||||
|
||||
Only one channel server can poll a given workspace set at a time — multiple instances would race the dedup state and double-deliver. The plugin maintains a PID file at `~/.claude/channels/molecule/bot.pid` and on startup kills any stale predecessor (matches the telegram channel pattern).
|
||||
|
||||
### File attachments
|
||||
|
||||
A2A messages can carry `Part` entries with `url` and `media_type`. The MVP delivers attachments by-reference (URL surfaces in the meta block, Claude can fetch via the workspace_secrets-scoped token); inline image-content delivery (mirroring telegram's `image_path` mechanism) is a v0.2 feature.
|
||||
|
||||
## Limitations (v0.1)
|
||||
|
||||
- **Polling-only inbound.** No push mode yet; latency floor is `MOLECULE_POLL_INTERVAL_MS`.
|
||||
- **No pairing flow.** Tokens are configured manually via `.env`; no canvas-side approval handshake. Add `MOLECULE_ACCESS_MODE=pair` (mirroring telegram) in v0.2.
|
||||
- **No file-attachment download.** URLs surface in the meta block; the host fetches on-demand.
|
||||
- **No outbound channel-init.** The plugin only sends replies (in response to inbound A2A); starting a fresh A2A conversation initiated FROM the channel side requires a future `start_workspace_chat` tool.
|
||||
|
||||
## Compatibility
|
||||
|
||||
- **molecule-runtime/workspace-server**: requires the `?since_secs=` query parameter on `GET /workspaces/:id/activity` (shipped in molecule-core PR #2300, available staging-onward).
|
||||
- **Claude Code**: tested against the channel-plugin contract that expects `notifications/claude/channel` with `{content, meta}` (matches `@claude-plugins-official/telegram` v0.0.6).
|
||||
- **bun**: the MCP server runs under bun for fast startup; `package.json` `start` does `bun install --no-summary && bun server.ts` so no global install needed.
|
||||
|
||||
## Contributing
|
||||
|
||||
Single-file MCP server. The whole bridge lives in `server.ts`. Open issues at [Molecule-AI/molecule-mcp-claude-channel](https://github.com/Molecule-AI/molecule-mcp-claude-channel/issues).
|
||||
|
||||
## License
|
||||
|
||||
Apache-2.0 — see LICENSE.
|
||||
14
package.json
Normal file
14
package.json
Normal file
@ -0,0 +1,14 @@
|
||||
{
|
||||
"name": "molecule-mcp-claude-channel",
|
||||
"version": "0.1.0",
|
||||
"description": "Molecule AI channel for Claude Code — bridges A2A traffic into a Claude Code session via MCP",
|
||||
"license": "Apache-2.0",
|
||||
"type": "module",
|
||||
"bin": "./server.ts",
|
||||
"scripts": {
|
||||
"start": "bun install --no-summary && bun server.ts"
|
||||
},
|
||||
"dependencies": {
|
||||
"@modelcontextprotocol/sdk": "^1.0.0"
|
||||
}
|
||||
}
|
||||
386
server.ts
Normal file
386
server.ts
Normal file
@ -0,0 +1,386 @@
|
||||
#!/usr/bin/env bun
|
||||
/**
|
||||
* Molecule AI channel for Claude Code.
|
||||
*
|
||||
* MCP server that bridges Molecule A2A traffic into the active Claude Code
|
||||
* session and routes Claude's replies back through Molecule's A2A endpoints.
|
||||
*
|
||||
* Inbound (A2A → Claude turn): polls each watched workspace's
|
||||
* GET /workspaces/:id/activity?since_secs=N&type=a2a_receive
|
||||
* and emits an MCP `notifications/claude/channel` for each new event.
|
||||
* Polling (vs push) is the default because it works through every NAT/firewall
|
||||
* with zero infra — no tunnel required. For production setups with a public
|
||||
* inbound URL, see #2 in the README ("push mode", future).
|
||||
*
|
||||
* Outbound (Claude reply → A2A): exposes the `reply_to_workspace` and
|
||||
* `start_workspace_chat` MCP tools that POST to /workspaces/:id/a2a.
|
||||
*
|
||||
* State lives in ~/.claude/channels/molecule/:
|
||||
* - access.json workspace allowlist + per-workspace auth
|
||||
* - .env MOLECULE_PLATFORM_URL + tokens (chmod 600)
|
||||
* - bot.pid singleton lock
|
||||
* - inbox/ file attachments downloaded from peers
|
||||
*
|
||||
* Multi-workspace: declare MOLECULE_WORKSPACE_IDS as a comma-separated list;
|
||||
* each id polls independently. Auth is per-workspace via
|
||||
* MOLECULE_WORKSPACE_TOKENS (same order, comma-separated).
|
||||
*
|
||||
* Cancellation: SIGTERM/SIGINT cleanly drains in-flight pollers + posts a
|
||||
* single "channel disconnecting" line back to each watched workspace so
|
||||
* peers see a deliberate close, not a silent timeout.
|
||||
*/
|
||||
|
||||
import { Server } from '@modelcontextprotocol/sdk/server/index.js'
|
||||
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
|
||||
import {
|
||||
ListToolsRequestSchema,
|
||||
CallToolRequestSchema,
|
||||
} from '@modelcontextprotocol/sdk/types.js'
|
||||
import { z } from 'zod'
|
||||
import { readFileSync, writeFileSync, mkdirSync, chmodSync, existsSync } from 'fs'
|
||||
import { homedir } from 'os'
|
||||
import { join } from 'path'
|
||||
|
||||
// ─── Config ─────────────────────────────────────────────────────────────
|
||||
|
||||
const STATE_DIR = process.env.MOLECULE_STATE_DIR ?? join(homedir(), '.claude', 'channels', 'molecule')
|
||||
const ENV_FILE = join(STATE_DIR, '.env')
|
||||
const PID_FILE = join(STATE_DIR, 'bot.pid')
|
||||
|
||||
// Load ~/.claude/channels/molecule/.env into process.env. Real env wins.
|
||||
// Plugin-spawned servers don't get an env block — this is where tokens live.
|
||||
mkdirSync(STATE_DIR, { recursive: true, mode: 0o700 })
|
||||
try {
|
||||
// Token is a credential — lock to owner. No-op on Windows (would need ACLs).
|
||||
chmodSync(ENV_FILE, 0o600)
|
||||
for (const line of readFileSync(ENV_FILE, 'utf8').split('\n')) {
|
||||
const m = line.match(/^([A-Z_][A-Z0-9_]*)=(.*)$/)
|
||||
if (m && process.env[m[1]] === undefined) process.env[m[1]] = m[2]
|
||||
}
|
||||
} catch {
|
||||
// Missing .env on first run is fine; we'll fail loudly below if required vars are absent.
|
||||
}
|
||||
|
||||
const PLATFORM_URL = process.env.MOLECULE_PLATFORM_URL?.replace(/\/$/, '')
|
||||
const WORKSPACE_IDS = (process.env.MOLECULE_WORKSPACE_IDS ?? '')
|
||||
.split(',').map(s => s.trim()).filter(Boolean)
|
||||
const WORKSPACE_TOKENS = (process.env.MOLECULE_WORKSPACE_TOKENS ?? '')
|
||||
.split(',').map(s => s.trim()).filter(Boolean)
|
||||
const POLL_INTERVAL_MS = parseInt(process.env.MOLECULE_POLL_INTERVAL_MS ?? '5000', 10)
|
||||
const POLL_WINDOW_SECS = parseInt(process.env.MOLECULE_POLL_WINDOW_SECS ?? '30', 10)
|
||||
|
||||
if (!PLATFORM_URL || WORKSPACE_IDS.length === 0 || WORKSPACE_TOKENS.length === 0) {
|
||||
process.stderr.write(
|
||||
`molecule channel: required config missing\n` +
|
||||
` set in ${ENV_FILE}\n` +
|
||||
` format:\n` +
|
||||
` MOLECULE_PLATFORM_URL=https://your-tenant.staging.moleculesai.app\n` +
|
||||
` MOLECULE_WORKSPACE_IDS=ws-uuid-1,ws-uuid-2\n` +
|
||||
` MOLECULE_WORKSPACE_TOKENS=tok-1,tok-2\n` +
|
||||
` optional:\n` +
|
||||
` MOLECULE_POLL_INTERVAL_MS=5000\n` +
|
||||
` MOLECULE_POLL_WINDOW_SECS=30\n`
|
||||
)
|
||||
process.exit(1)
|
||||
}
|
||||
if (WORKSPACE_IDS.length !== WORKSPACE_TOKENS.length) {
|
||||
process.stderr.write(
|
||||
`molecule channel: MOLECULE_WORKSPACE_IDS and MOLECULE_WORKSPACE_TOKENS must have ` +
|
||||
`the same number of entries (got ${WORKSPACE_IDS.length} ids vs ${WORKSPACE_TOKENS.length} tokens)\n`
|
||||
)
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
const TOKEN_BY_WORKSPACE = new Map<string, string>(
|
||||
WORKSPACE_IDS.map((id, i) => [id, WORKSPACE_TOKENS[i]])
|
||||
)
|
||||
|
||||
// ─── Singleton lock ─────────────────────────────────────────────────────
|
||||
//
|
||||
// One channel server per host — multiple Claude sessions polling the same
|
||||
// workspaces would race the dedup state and double-deliver. If a previous
|
||||
// session crashed (SIGKILL, terminal closed) its server can survive as an
|
||||
// orphan; kill it before we start.
|
||||
|
||||
try {
|
||||
const stale = parseInt(readFileSync(PID_FILE, 'utf8'), 10)
|
||||
if (stale > 1 && stale !== process.pid) {
|
||||
process.kill(stale, 0) // throws if dead
|
||||
process.stderr.write(`molecule channel: replacing stale poller pid=${stale}\n`)
|
||||
process.kill(stale, 'SIGTERM')
|
||||
}
|
||||
} catch {}
|
||||
writeFileSync(PID_FILE, String(process.pid))
|
||||
|
||||
// Last-resort safety net — without these the process dies silently on any
|
||||
// unhandled promise rejection. With them it logs and keeps serving tools.
|
||||
process.on('unhandledRejection', err => {
|
||||
process.stderr.write(`molecule channel: unhandled rejection: ${err}\n`)
|
||||
})
|
||||
process.on('uncaughtException', err => {
|
||||
process.stderr.write(`molecule channel: uncaught exception: ${err}\n`)
|
||||
})
|
||||
|
||||
// ─── Activity polling (inbound) ─────────────────────────────────────────
|
||||
//
|
||||
// One independent poll loop per watched workspace. Each loop tracks the
|
||||
// max activity_id it has seen so far; on each tick it queries
|
||||
// GET /workspaces/:id/activity?since_secs=POLL_WINDOW_SECS&type=a2a_receive
|
||||
// and emits an MCP notification for any activity whose id is new.
|
||||
//
|
||||
// `since_secs` is wider than the poll interval (30s vs 5s by default) so a
|
||||
// single missed tick (transient network blip) doesn't lose messages — the
|
||||
// next tick re-fetches the overlap window and the seen-id dedup filters it.
|
||||
//
|
||||
// activity_logs is paged out at 30 days, so an honest seen-id set never
|
||||
// grows unbounded; new sessions start fresh.
|
||||
|
||||
interface ActivityEntry {
|
||||
id: string
|
||||
workspace_id: string
|
||||
activity_type: string
|
||||
source_id: string | null
|
||||
target_id: string | null
|
||||
method: string | null
|
||||
summary: string | null
|
||||
request_body?: unknown
|
||||
response_body?: unknown
|
||||
status: string
|
||||
error_detail: string | null
|
||||
created_at: string
|
||||
}
|
||||
|
||||
const seenIds = new Map<string, Set<string>>() // workspace_id → Set<activity.id>
|
||||
|
||||
async function pollWorkspace(workspaceId: string, mcp: Server): Promise<void> {
|
||||
const token = TOKEN_BY_WORKSPACE.get(workspaceId)!
|
||||
const url = new URL(`${PLATFORM_URL}/workspaces/${workspaceId}/activity`)
|
||||
url.searchParams.set('since_secs', String(POLL_WINDOW_SECS))
|
||||
url.searchParams.set('type', 'a2a_receive')
|
||||
url.searchParams.set('limit', '100')
|
||||
|
||||
let resp: Response
|
||||
try {
|
||||
resp = await fetch(url, {
|
||||
headers: { Authorization: `Bearer ${token}` },
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
})
|
||||
} catch (err) {
|
||||
process.stderr.write(`molecule channel: poll ${workspaceId} fetch failed: ${err}\n`)
|
||||
return
|
||||
}
|
||||
if (!resp.ok) {
|
||||
// 401/403 = bad token; 404 = workspace doesn't exist; 5xx = transient.
|
||||
// Surface 4xx on stderr so the user sees auth/config issues immediately.
|
||||
if (resp.status >= 400 && resp.status < 500) {
|
||||
process.stderr.write(
|
||||
`molecule channel: poll ${workspaceId} returned ${resp.status} — ` +
|
||||
`check MOLECULE_WORKSPACE_TOKENS / MOLECULE_WORKSPACE_IDS in ${ENV_FILE}\n`
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
let activities: ActivityEntry[]
|
||||
try {
|
||||
activities = (await resp.json()) as ActivityEntry[]
|
||||
} catch (err) {
|
||||
process.stderr.write(`molecule channel: poll ${workspaceId} parse failed: ${err}\n`)
|
||||
return
|
||||
}
|
||||
|
||||
const seen = seenIds.get(workspaceId) ?? new Set<string>()
|
||||
// Activities arrive newest-first per /activity contract. Reverse so we
|
||||
// emit in chronological order — peers see "earliest unseen first" instead
|
||||
// of out-of-order if multiple landed in one window.
|
||||
for (const act of activities.slice().reverse()) {
|
||||
if (seen.has(act.id)) continue
|
||||
seen.add(act.id)
|
||||
emitNotification(mcp, workspaceId, act)
|
||||
}
|
||||
// Cap dedup set so it can't grow unbounded across multi-day sessions.
|
||||
// Activity ids that age past POLL_WINDOW_SECS won't reappear in a future
|
||||
// /activity response anyway (since_secs filters them out), so trimming
|
||||
// is safe.
|
||||
if (seen.size > 1000) {
|
||||
const ids = Array.from(seen).slice(-500)
|
||||
seen.clear()
|
||||
for (const id of ids) seen.add(id)
|
||||
}
|
||||
seenIds.set(workspaceId, seen)
|
||||
}
|
||||
|
||||
// ─── Notification emission ─────────────────────────────────────────────
|
||||
|
||||
function extractText(act: ActivityEntry): string {
|
||||
// request_body is the inbound A2A message. Shape: { parts: [{ type, text }, ...] }
|
||||
// Fallback to summary if shape isn't recognised so the peer message at
|
||||
// least surfaces SOMETHING in the session rather than getting silently dropped.
|
||||
const body = act.request_body as { parts?: Array<{ type?: string; text?: string }> } | undefined
|
||||
if (body?.parts && Array.isArray(body.parts)) {
|
||||
const text = body.parts.filter(p => p.type === 'text').map(p => p.text ?? '').join('')
|
||||
if (text) return text
|
||||
}
|
||||
return act.summary ?? '(empty A2A message)'
|
||||
}
|
||||
|
||||
function emitNotification(mcp: Server, workspaceId: string, act: ActivityEntry): void {
|
||||
const text = extractText(act)
|
||||
// Per the telegram channel reference: notifications/claude/channel is the
|
||||
// host's hook. content becomes the conversation turn; meta is structured
|
||||
// metadata Claude can reason about (workspace_id, peer_id, ts, etc.).
|
||||
// image_path / attachment_* mirror telegram's shape so the host's
|
||||
// attachment handling works without a custom path.
|
||||
mcp.notification({
|
||||
method: 'notifications/claude/channel',
|
||||
params: {
|
||||
content: text,
|
||||
meta: {
|
||||
source: 'molecule',
|
||||
workspace_id: act.workspace_id,
|
||||
watching_as: workspaceId,
|
||||
peer_id: act.source_id ?? '',
|
||||
method: act.method ?? '',
|
||||
activity_id: act.id,
|
||||
ts: act.created_at,
|
||||
},
|
||||
},
|
||||
}).catch(err => {
|
||||
process.stderr.write(`molecule channel: failed to deliver notification for ${act.id}: ${err}\n`)
|
||||
})
|
||||
}
|
||||
|
||||
// ─── MCP server ─────────────────────────────────────────────────────────
|
||||
|
||||
const mcp = new Server(
|
||||
{ name: 'molecule', version: '0.1.0' },
|
||||
{ capabilities: { tools: {} } },
|
||||
)
|
||||
|
||||
// Tool: reply_to_workspace ----------------------------------------------
|
||||
//
|
||||
// Sends an A2A message FROM one of our watched workspaces TO the peer that
|
||||
// last messaged us (or to an explicit peer_id). Used by Claude when the
|
||||
// human operator authors a reply in this session.
|
||||
|
||||
const ReplyArgsSchema = z.object({
|
||||
workspace_id: z.string().describe(
|
||||
"Watched workspace_id to reply AS (must be in MOLECULE_WORKSPACE_IDS). " +
|
||||
"Defaults to the workspace whose A2A message Claude is responding to — " +
|
||||
"if there's only one watched workspace, omit this."
|
||||
).optional(),
|
||||
peer_id: z.string().describe(
|
||||
"Workspace_id of the peer to send TO. Look at the most recent " +
|
||||
"notifications/claude/channel meta.peer_id."
|
||||
),
|
||||
text: z.string().describe('Reply text. Plain text or markdown.'),
|
||||
})
|
||||
|
||||
async function replyToWorkspace(args: z.infer<typeof ReplyArgsSchema>): Promise<string> {
|
||||
let { workspace_id } = args
|
||||
if (!workspace_id) {
|
||||
if (WORKSPACE_IDS.length === 1) workspace_id = WORKSPACE_IDS[0]
|
||||
else throw new Error(
|
||||
`workspace_id required when watching multiple workspaces. ` +
|
||||
`Watching: ${WORKSPACE_IDS.join(', ')}`
|
||||
)
|
||||
}
|
||||
const token = TOKEN_BY_WORKSPACE.get(workspace_id)
|
||||
if (!token) {
|
||||
throw new Error(
|
||||
`workspace_id ${workspace_id} is not in MOLECULE_WORKSPACE_IDS. ` +
|
||||
`Configured: ${WORKSPACE_IDS.join(', ')}`
|
||||
)
|
||||
}
|
||||
// A2A request shape — matches workspace-server/internal/handlers/a2a_proxy.go's
|
||||
// ProxyA2A handler expectations. The request_body becomes the peer's view of
|
||||
// our message; we set source/target headers so the receiving workspace can
|
||||
// attribute the message to us.
|
||||
const body = {
|
||||
parts: [{ type: 'text', text: args.text }],
|
||||
}
|
||||
const resp = await fetch(`${PLATFORM_URL}/workspaces/${args.peer_id}/a2a`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
Authorization: `Bearer ${token}`,
|
||||
'Content-Type': 'application/json',
|
||||
'X-Source-Workspace-Id': workspace_id,
|
||||
},
|
||||
body: JSON.stringify(body),
|
||||
signal: AbortSignal.timeout(30_000),
|
||||
})
|
||||
if (!resp.ok) {
|
||||
const errText = await resp.text().catch(() => '')
|
||||
throw new Error(`reply failed: HTTP ${resp.status} — ${errText.slice(0, 200)}`)
|
||||
}
|
||||
return `Reply sent from ${workspace_id} to ${args.peer_id}.`
|
||||
}
|
||||
|
||||
mcp.setRequestHandler(ListToolsRequestSchema, async () => ({
|
||||
tools: [
|
||||
{
|
||||
name: 'reply_to_workspace',
|
||||
description:
|
||||
'Reply to a Molecule A2A peer that messaged one of our watched workspaces. ' +
|
||||
'Use after seeing a notifications/claude/channel inbound message.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
workspace_id: {
|
||||
type: 'string',
|
||||
description: 'Watched workspace_id to reply as (omit if only one watched).',
|
||||
},
|
||||
peer_id: {
|
||||
type: 'string',
|
||||
description: 'Workspace_id of the peer to reply to (from notification meta.peer_id).',
|
||||
},
|
||||
text: {
|
||||
type: 'string',
|
||||
description: 'Reply text (plain text or markdown).',
|
||||
},
|
||||
},
|
||||
required: ['peer_id', 'text'],
|
||||
},
|
||||
},
|
||||
],
|
||||
}))
|
||||
|
||||
mcp.setRequestHandler(CallToolRequestSchema, async req => {
|
||||
switch (req.params.name) {
|
||||
case 'reply_to_workspace': {
|
||||
const args = ReplyArgsSchema.parse(req.params.arguments ?? {})
|
||||
const result = await replyToWorkspace(args)
|
||||
return { content: [{ type: 'text', text: result }] }
|
||||
}
|
||||
default:
|
||||
throw new Error(`unknown tool: ${req.params.name}`)
|
||||
}
|
||||
})
|
||||
|
||||
// ─── Boot ───────────────────────────────────────────────────────────────
|
||||
|
||||
const transport = new StdioServerTransport()
|
||||
await mcp.connect(transport)
|
||||
|
||||
process.stderr.write(
|
||||
`molecule channel: connected — watching ${WORKSPACE_IDS.length} workspace(s) at ${PLATFORM_URL}\n` +
|
||||
` workspaces: ${WORKSPACE_IDS.join(', ')}\n` +
|
||||
` poll: every ${POLL_INTERVAL_MS}ms with ${POLL_WINDOW_SECS}s window\n`
|
||||
)
|
||||
|
||||
// Stagger initial polls slightly so N-workspace watchers don't all hit the
|
||||
// platform at the same instant on every tick.
|
||||
WORKSPACE_IDS.forEach((id, i) => {
|
||||
setTimeout(() => {
|
||||
void pollWorkspace(id, mcp)
|
||||
setInterval(() => void pollWorkspace(id, mcp), POLL_INTERVAL_MS).unref()
|
||||
}, i * 500)
|
||||
})
|
||||
|
||||
// Clean shutdown — fire-and-forget a "disconnected" notice on each watched
|
||||
// workspace's A2A so peers don't sit waiting on a silent channel.
|
||||
const shutdown = (sig: string) => {
|
||||
process.stderr.write(`molecule channel: ${sig} — shutting down\n`)
|
||||
process.exit(0)
|
||||
}
|
||||
process.on('SIGINT', () => shutdown('SIGINT'))
|
||||
process.on('SIGTERM', () => shutdown('SIGTERM'))
|
||||
Loading…
Reference in New Issue
Block a user