Merge pull request #16 from Molecule-AI/feat/v0.2-cursor-poll-mode

feat: v0.2 — cursor-based polling + auto-register as poll-mode
This commit is contained in:
Hongming Wang 2026-04-29 22:35:58 -07:00 committed by GitHub
commit d862aa6c2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 215 additions and 31 deletions

View File

@ -12,7 +12,7 @@ Molecule peer ──A2A──> [your workspace] ──poll──> [this plugin]
└────────── POST /workspaces/:id/a2a ◄── reply_to_workspace tool ──┘
```
No tunnel. No public endpoint. The plugin polls your tenant for new A2A activity (using the `?since_secs=` filter on `/workspaces/:id/activity`); replies POST back to `/workspaces/:peer_id/a2a` via the same bearer token.
No tunnel. No public endpoint. The plugin self-registers each watched workspace as `delivery_mode=poll` on startup and then long-polls `/workspaces/:id/activity?since_id=<cursor>` for new A2A traffic. Replies POST back to `/workspaces/:peer_id/a2a` via the same bearer token.
## Install
@ -32,7 +32,10 @@ MOLECULE_WORKSPACE_TOKENS=tok-1,tok-2
# Optional
MOLECULE_POLL_INTERVAL_MS=5000 # default 5s
MOLECULE_POLL_WINDOW_SECS=30 # default 30s — overlap protects against missed ticks
MOLECULE_POLL_WINDOW_SECS=30 # default 30s — only used to seed the first-run cursor
MOLECULE_AGENT_NAME="Claude Code (channel)" # how the workspace appears in canvas
MOLECULE_AGENT_DESC="Local Claude Code session..."
MOLECULE_AUTO_REGISTER_POLL=true # set to "false" if you've configured the workspace another way
```
The `.env` file is `chmod 600` after first read; tokens never appear in environment-block-style `claude doctor` dumps.
@ -98,13 +101,23 @@ Claude can call `reply_to_workspace({peer_id, text})` to send the response back.
### Why polling instead of push?
The existing external-agent integration in Molecule uses **push**: register an inbound URL, platform POSTs A2A to that URL. That's lower latency but requires a tunnel (ngrok/Cloudflare) or a static IP — non-trivial for a laptop-launched Claude Code session.
The existing external-agent integration in Molecule originally used **push**: register an inbound URL, platform POSTs A2A to that URL. That's lower latency but requires a tunnel (ngrok/Cloudflare) or a static IP — non-trivial for a laptop-launched Claude Code session.
This plugin uses **polling** as the default because it works through every NAT/firewall with zero infra. The cost is up to `MOLECULE_POLL_INTERVAL_MS` (default 5s) of inbound latency. For production setups where lower latency matters, a future `MOLECULE_INBOUND_MODE=push` can opt into the existing register-and-receive flow.
The platform now supports `delivery_mode=poll` natively (`#2339` in `molecule-core`): when a workspace is registered with `delivery_mode=poll`, the platform's a2a_proxy short-circuits inbound A2A directly into `activity_logs` instead of attempting an HTTP dispatch. This plugin sets that mode automatically on startup, so peer messages land in `activity_logs` regardless of whether your laptop has a public URL.
### Why `since_secs=30` overlapping a `5s` poll interval?
### Cursor-based polling (v0.2+)
A single missed tick (transient network blip, GC pause, laptop sleep) shouldn't lose messages. The plugin re-fetches the last 30 seconds on every poll and dedups by `activity_id`, so 25 seconds of overlap is the recovery margin. Set `MOLECULE_POLL_WINDOW_SECS` higher for noisier networks.
v0.2 switched from a v0.1-style time-window dedup (`since_secs=30` + in-memory seen-id Set) to a Telegram-shaped cursor:
```
GET /workspaces/:id/activity?since_id=<last-delivered>&limit=100
→ ASC-ordered rows strictly after the cursor
→ 410 Gone if the cursor row was pruned (plugin re-seeds automatically)
```
The cursor is persisted to `~/.claude/channels/molecule/cursor.json` (`chmod 600`, atomic temp+rename writes), so a restart resumes exactly where the previous session left off — no replay window, no missed messages, no growing in-memory dedup set.
`MOLECULE_POLL_WINDOW_SECS` is only used to seed the first-ever cursor for a workspace: on the very first poll the plugin asks for the most-recent event in that window and remembers its id WITHOUT delivering it (events that arrived BEFORE you started this Claude session are out of context). Every subsequent poll uses the cursor.
### Singleton lock
@ -114,16 +127,16 @@ Only one channel server can poll a given workspace set at a time — multiple in
A2A messages can carry `Part` entries with `url` and `media_type`. The MVP delivers attachments by-reference (URL surfaces in the meta block, Claude can fetch via the workspace_secrets-scoped token); inline image-content delivery (mirroring telegram's `image_path` mechanism) is a v0.2 feature.
## Limitations (v0.1)
## Limitations (v0.2)
- **Polling-only inbound.** No push mode yet; latency floor is `MOLECULE_POLL_INTERVAL_MS`.
- **No pairing flow.** Tokens are configured manually via `.env`; no canvas-side approval handshake. Add `MOLECULE_ACCESS_MODE=pair` (mirroring telegram) in v0.2.
- **Polling-only inbound.** Latency floor is `MOLECULE_POLL_INTERVAL_MS` (default 5s). Push mode is still possible by setting `MOLECULE_AUTO_REGISTER_POLL=false` and configuring the workspace with `delivery_mode=push` + a routable URL via canvas.
- **No pairing flow.** Tokens are configured manually via `.env`; no canvas-side approval handshake.
- **No file-attachment download.** URLs surface in the meta block; the host fetches on-demand.
- **No outbound channel-init.** The plugin only sends replies (in response to inbound A2A); starting a fresh A2A conversation initiated FROM the channel side requires a future `start_workspace_chat` tool.
## Compatibility
- **molecule-runtime/workspace-server**: requires the `?since_secs=` query parameter on `GET /workspaces/:id/activity` (shipped in molecule-core PR #2300, available staging-onward).
- **molecule-runtime/workspace-server**: requires `delivery_mode=poll` support (`/registry/register` + a2a_proxy short-circuit, molecule-core PRs #2348 + #2353) and the `since_id` cursor on `GET /activity` (PR #2354). All three shipped under issue #2339, available staging-onward.
- **Claude Code**: tested against the channel-plugin contract that expects `notifications/claude/channel` with `{content, meta}` (matches `@claude-plugins-official/telegram` v0.0.6).
- **bun**: the MCP server runs under bun for fast startup; `package.json` `start` does `bun install --no-summary && bun server.ts` so no global install needed.

View File

@ -1,6 +1,6 @@
{
"name": "molecule-mcp-claude-channel",
"version": "0.1.0",
"version": "0.2.0",
"description": "Molecule AI channel for Claude Code — bridges A2A traffic into a Claude Code session via MCP",
"license": "Apache-2.0",
"type": "module",

211
server.ts
View File

@ -37,7 +37,7 @@ import {
CallToolRequestSchema,
} from '@modelcontextprotocol/sdk/types.js'
import { z } from 'zod'
import { readFileSync, writeFileSync, mkdirSync, chmodSync, existsSync } from 'fs'
import { readFileSync, writeFileSync, mkdirSync, chmodSync, existsSync, renameSync } from 'fs'
import { homedir } from 'os'
import { join } from 'path'
@ -46,6 +46,7 @@ import { join } from 'path'
const STATE_DIR = process.env.MOLECULE_STATE_DIR ?? join(homedir(), '.claude', 'channels', 'molecule')
const ENV_FILE = join(STATE_DIR, '.env')
const PID_FILE = join(STATE_DIR, 'bot.pid')
const CURSOR_FILE = join(STATE_DIR, 'cursor.json')
// Load ~/.claude/channels/molecule/.env into process.env. Real env wins.
// Plugin-spawned servers don't get an env block — this is where tokens live.
@ -67,7 +68,25 @@ const WORKSPACE_IDS = (process.env.MOLECULE_WORKSPACE_IDS ?? '')
const WORKSPACE_TOKENS = (process.env.MOLECULE_WORKSPACE_TOKENS ?? '')
.split(',').map(s => s.trim()).filter(Boolean)
const POLL_INTERVAL_MS = parseInt(process.env.MOLECULE_POLL_INTERVAL_MS ?? '5000', 10)
// POLL_WINDOW_SECS is only used for the initial "watch from now" cursor seed
// — after that, the cursor (since_id) drives every subsequent poll. Older
// versions of the plugin used since_secs as the primary filter; v0.2 keeps
// the env var for compat but its meaning is narrower.
const POLL_WINDOW_SECS = parseInt(process.env.MOLECULE_POLL_WINDOW_SECS ?? '30', 10)
// MOLECULE_AGENT_NAME / MOLECULE_AGENT_DESC populate the agent_card the plugin
// posts to /registry/register on startup. Both have sane defaults — set them
// only when you want the canvas tab to show something specific.
const AGENT_NAME = process.env.MOLECULE_AGENT_NAME ?? 'Claude Code (channel)'
const AGENT_DESC = process.env.MOLECULE_AGENT_DESC ??
'Local Claude Code session bridged via molecule-mcp-claude-channel'
// MOLECULE_AUTO_REGISTER_POLL controls the startup auto-register behavior.
// Default is "yes" — the plugin's whole point is to make a poll-mode
// workspace work without manual canvas configuration. Set to "0" / "false"
// if you've already configured the workspace another way and don't want
// the plugin overwriting agent_card on every restart.
const AUTO_REGISTER_POLL = !['0', 'false', 'no'].includes(
(process.env.MOLECULE_AUTO_REGISTER_POLL ?? 'true').toLowerCase()
)
if (!PLATFORM_URL || WORKSPACE_IDS.length === 0 || WORKSPACE_TOKENS.length === 0) {
process.stderr.write(
@ -150,15 +169,78 @@ interface ActivityEntry {
created_at: string
}
const seenIds = new Map<string, Set<string>>() // workspace_id → Set<activity.id>
// ─── Cursor persistence ────────────────────────────────────────────────
//
// v0.2 switches from the v0.1 since_secs+seenIds scheme to a Telegram-style
// since_id cursor. The cursor is the activity_logs.id of the last event
// this plugin successfully delivered to Claude. Server returns events
// strictly after that id in ASC order, so we never miss or replay.
//
// Persisted to ${CURSOR_FILE} as a JSON object keyed by workspace_id.
// Atomic write via temp + rename so a crash mid-write can't corrupt the
// file (the previous cursor stays valid; worst case is a few replays
// after the crash, which still beats the v0.1 30-second time-window).
//
// Schema: { "ws-uuid-1": "act-uuid-X", "ws-uuid-2": "act-uuid-Y", ... }
// Missing key = "first run" → seeds from most-recent without processing.
// 410 from server = cursor stale → drop key, re-seed on next tick.
const cursors = new Map<string, string>()
function loadCursors(): void {
if (!existsSync(CURSOR_FILE)) return
try {
const raw = readFileSync(CURSOR_FILE, 'utf8')
const parsed = JSON.parse(raw) as Record<string, unknown>
for (const [k, v] of Object.entries(parsed)) {
if (typeof v === 'string' && v.length > 0) cursors.set(k, v)
}
} catch (err) {
// Corrupt cursor file = treat as no cursors. Worst case: each watched
// workspace re-seeds from now on the next tick (no replay, no message
// loss for events arriving AFTER the seed). Don't fail-fast here —
// a poller that refuses to start because of one bad file is more
// annoying than the recovery cost.
process.stderr.write(`molecule channel: cursor file unreadable (${err}); starting fresh\n`)
}
}
function saveCursors(): void {
const obj: Record<string, string> = {}
for (const [k, v] of cursors) obj[k] = v
const tmp = `${CURSOR_FILE}.tmp.${process.pid}`
try {
writeFileSync(tmp, JSON.stringify(obj, null, 2), { mode: 0o600 })
renameSync(tmp, CURSOR_FILE)
} catch (err) {
// Cursor write failure is recoverable (next successful poll re-saves);
// log on stderr so the user sees disk-full / readonly-fs early.
process.stderr.write(`molecule channel: cursor save failed: ${err}\n`)
}
}
async function pollWorkspace(workspaceId: string, mcp: Server): Promise<void> {
const token = TOKEN_BY_WORKSPACE.get(workspaceId)!
const url = new URL(`${PLATFORM_URL}/workspaces/${workspaceId}/activity`)
url.searchParams.set('since_secs', String(POLL_WINDOW_SECS))
url.searchParams.set('type', 'a2a_receive')
url.searchParams.set('limit', '100')
const cursor = cursors.get(workspaceId)
if (cursor) {
// Steady-state: server returns rows strictly after cursor in ASC order.
url.searchParams.set('since_id', cursor)
} else {
// First run for this workspace — seed the cursor from the most-recent
// existing event WITHOUT delivering it. Without this seed the next tick
// would also have no cursor and we'd loop forever. Seed-then-skip is
// the right policy at startup: events that arrived BEFORE the operator
// started this Claude session are out of context and shouldn't be
// replayed as if they're new turns. Events arriving AFTER the seed
// have id > cursor and will be delivered on subsequent ticks.
url.searchParams.set('since_secs', String(POLL_WINDOW_SECS))
url.searchParams.set('limit', '1')
}
let resp: Response
try {
resp = await fetch(url, {
@ -179,6 +261,15 @@ async function pollWorkspace(workspaceId: string, mcp: Server): Promise<void> {
process.stderr.write(`molecule channel: poll ${workspaceId} fetch failed: ${err}\n`)
return
}
if (resp.status === 410) {
// Cursor row is gone (pruned, or never existed if the env var was
// hand-edited). Drop the cursor; next tick re-seeds from most-recent.
process.stderr.write(`molecule channel: poll ${workspaceId} cursor stale (410) — re-seeding\n`)
cursors.delete(workspaceId)
saveCursors()
return
}
if (!resp.ok) {
// 401/403 = bad token; 404 = workspace doesn't exist; 5xx = transient.
// Surface 4xx on stderr so the user sees auth/config issues immediately.
@ -198,25 +289,93 @@ async function pollWorkspace(workspaceId: string, mcp: Server): Promise<void> {
return
}
const seen = seenIds.get(workspaceId) ?? new Set<string>()
// Activities arrive newest-first per /activity contract. Reverse so we
// emit in chronological order — peers see "earliest unseen first" instead
// of out-of-order if multiple landed in one window.
for (const act of activities.slice().reverse()) {
if (seen.has(act.id)) continue
seen.add(act.id)
if (!cursor) {
// First-run seed: take the newest activity_id (the only one returned
// because we asked for limit=1) and remember it as our starting point.
// Don't deliver it — see comment above.
if (activities.length > 0) {
cursors.set(workspaceId, activities[0].id)
saveCursors()
}
return
}
// Steady-state: server returned ASC-ordered rows strictly after cursor.
// Deliver each in order; advance cursor only after we hand the event
// off to MCP. If the notification call rejects we still advance — the
// alternative (block on notification failure) would stall the channel
// entirely, and notification delivery is best-effort anyway.
if (activities.length === 0) return
for (const act of activities) {
emitNotification(mcp, workspaceId, act)
}
// Cap dedup set so it can't grow unbounded across multi-day sessions.
// Activity ids that age past POLL_WINDOW_SECS won't reappear in a future
// /activity response anyway (since_secs filters them out), so trimming
// is safe.
if (seen.size > 1000) {
const ids = Array.from(seen).slice(-500)
seen.clear()
for (const id of ids) seen.add(id)
const newest = activities[activities.length - 1].id
if (newest !== cursor) {
cursors.set(workspaceId, newest)
saveCursors()
}
}
// ─── Register-as-poll (startup self-register) ──────────────────────────
//
// On startup, register each watched workspace with delivery_mode=poll so
// the platform's a2a_proxy short-circuits to activity_logs (PR 2 / #2353)
// instead of trying to dispatch HTTP to a URL the operator's laptop
// doesn't have. Idempotent — the upsert in /registry/register's handler
// preserves existing values; we just declare delivery_mode and the
// agent_card.
//
// Failure here is non-fatal — the polling loop still works against a
// pre-poll-configured workspace, and a transient platform 5xx shouldn't
// block channel startup. Log loudly so misconfiguration is visible.
async function registerAsPoll(workspaceId: string): Promise<void> {
const token = TOKEN_BY_WORKSPACE.get(workspaceId)!
const body = {
id: workspaceId,
delivery_mode: 'poll',
agent_card: {
name: AGENT_NAME,
description: AGENT_DESC,
},
}
let resp: Response
try {
resp = await fetch(`${PLATFORM_URL}/registry/register`, {
method: 'POST',
headers: {
Authorization: `Bearer ${token}`,
'Content-Type': 'application/json',
Origin: PLATFORM_URL,
},
body: JSON.stringify(body),
signal: AbortSignal.timeout(15_000),
})
} catch (err) {
process.stderr.write(`molecule channel: register-as-poll ${workspaceId} fetch failed: ${err}\n`)
return
}
if (!resp.ok) {
const errText = await resp.text().catch(() => '')
process.stderr.write(
`molecule channel: register-as-poll ${workspaceId} HTTP ${resp.status}${errText.slice(0, 200)}\n`
)
return
}
// Sanity-check: the platform should echo back delivery_mode=poll.
// A push reply means an older controlplane that doesn't know about
// delivery_mode yet — log so the user can identify the mismatch.
try {
const j = (await resp.json()) as { delivery_mode?: string }
if (j.delivery_mode && j.delivery_mode !== 'poll') {
process.stderr.write(
`molecule channel: register-as-poll ${workspaceId} returned delivery_mode=${j.delivery_mode} ` +
`(expected poll). Platform may predate #2339.\n`
)
}
} catch {
// Non-JSON response. Don't fail; the 2xx already tells us the upsert
// landed, and the polling loop is the source of truth for steady-state.
}
seenIds.set(workspaceId, seen)
}
// ─── Notification emission ─────────────────────────────────────────────
@ -411,13 +570,25 @@ mcp.setRequestHandler(CallToolRequestSchema, async req => {
// ─── Boot ───────────────────────────────────────────────────────────────
loadCursors()
const transport = new StdioServerTransport()
await mcp.connect(transport)
// Self-register each workspace as poll-mode BEFORE the first poll fires.
// Sequenced (not Promise.all) so failures are surfaced one at a time and
// the operator can spot which workspace's token is bad.
if (AUTO_REGISTER_POLL) {
for (const id of WORKSPACE_IDS) {
await registerAsPoll(id)
}
}
process.stderr.write(
`molecule channel: connected — watching ${WORKSPACE_IDS.length} workspace(s) at ${PLATFORM_URL}\n` +
` workspaces: ${WORKSPACE_IDS.join(', ')}\n` +
` poll: every ${POLL_INTERVAL_MS}ms with ${POLL_WINDOW_SECS}s window\n`
` delivery_mode=poll cursor=${CURSOR_FILE} auto_register=${AUTO_REGISTER_POLL}\n` +
` poll: every ${POLL_INTERVAL_MS}ms (cursor-based; ${POLL_WINDOW_SECS}s window only used for first-run seed)\n`
)
// Stagger initial polls slightly so N-workspace watchers don't all hit the