From b3b79a5efcd173dc8c93aa155ed03380ddc532bc Mon Sep 17 00:00:00 2001 From: "claude-ceo-assistant (Claude Opus 4.7 on Hongming's MacBook)" Date: Thu, 7 May 2026 08:28:24 -0700 Subject: [PATCH] fix(presence): POST /registry/heartbeat ticker so canvas badge stays online (closes #6, closes molecule-core#24) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit v0.4.0-gitea.1 polled /workspaces/:id/activity but never sent /registry/heartbeat. The platform's healthsweep (workspace-server/internal/registry/healthsweep.go) flipped any runtime='external' workspace whose last_heartbeat_at was older than 90s back to status='awaiting_agent', so the canvas presence badge stuck on awaiting_agent within 90s of plugin start even while A2A traffic flowed fine over the long-poll loop. Fix: per-workspace heartbeat ticker (default 30s, three ticks inside the 90s stale window) POSTs the minimal HeartbeatPayload shape (workspace_id only) — same path the Python runtime in workspace/heartbeat.py uses when it has nothing else to report. heartbeat.ts pure module + Bun.serve fixture test pin the wire shape (POST + bearer + Origin + workspace_id body) so a future refactor that drops any of those silently re-breaks the badge. Bump 0.4.0-gitea.1 → 0.4.0-gitea.2 and document MOLECULE_HEARTBEAT_INTERVAL_MS in README. Co-Authored-By: Claude Opus 4.7 (1M context) --- .claude-plugin/marketplace.json | 2 +- .claude-plugin/plugin.json | 2 +- README.md | 1 + heartbeat.test.ts | 172 ++++++++++++++++++++++++++++++++ heartbeat.ts | 109 ++++++++++++++++++++ package.json | 2 +- server.ts | 56 ++++++++++- 7 files changed, 340 insertions(+), 4 deletions(-) create mode 100644 heartbeat.test.ts create mode 100644 heartbeat.ts diff --git a/.claude-plugin/marketplace.json b/.claude-plugin/marketplace.json index c6beb49..d5b77c9 100644 --- a/.claude-plugin/marketplace.json +++ b/.claude-plugin/marketplace.json @@ -13,7 +13,7 @@ "url": "https://git.moleculesai.app/molecule-ai/molecule-mcp-claude-channel.git" }, "description": "Bridges Molecule A2A traffic into a Claude Code session via MCP. Subscribe to one or more Molecule workspaces; A2A messages from peers surface as conversation turns; replies route back through Molecule's A2A endpoints.", - "version": "0.4.0-gitea.1", + "version": "0.4.0-gitea.2", "homepage": "https://git.moleculesai.app/molecule-ai/molecule-mcp-claude-channel", "license": "Apache-2.0", "keywords": [ diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json index c96c907..895170a 100644 --- a/.claude-plugin/plugin.json +++ b/.claude-plugin/plugin.json @@ -1,7 +1,7 @@ { "name": "molecule", "description": "Molecule AI channel for Claude Code — bridges Molecule A2A traffic into a Claude Code session via MCP. Subscribe to one or more Molecule workspaces; A2A messages from peers surface as conversation turns; replies route back through Molecule's A2A endpoints.", - "version": "0.4.0-gitea.1", + "version": "0.4.0-gitea.2", "keywords": [ "molecule", "molecule-ai", diff --git a/README.md b/README.md index c6d5e41..6e8b231 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ MOLECULE_POLL_WINDOW_SECS=30 # default 30s — only used to seed the first MOLECULE_AGENT_NAME="Claude Code (channel)" # how the workspace appears in canvas MOLECULE_AGENT_DESC="Local Claude Code session..." MOLECULE_AUTO_REGISTER_POLL=true # set to "false" if you've configured the workspace another way +MOLECULE_HEARTBEAT_INTERVAL_MS=30000 # default 30s — keeps the canvas presence badge on "online"; set to 0 to disable ``` The `.env` file is `chmod 600` after first read; tokens never appear in environment-block-style `claude doctor` dumps. diff --git a/heartbeat.test.ts b/heartbeat.test.ts new file mode 100644 index 0000000..ec82288 --- /dev/null +++ b/heartbeat.test.ts @@ -0,0 +1,172 @@ +// heartbeat.test.ts — pin the POST /registry/heartbeat shape against a +// local Bun.serve fixture. Closes #6 / molecule-core#24 — the v0.4.0-gitea.1 +// channel plugin polled /workspaces/:id/activity but never POSTed +// /registry/heartbeat, so the platform's healthsweep flipped the canvas +// presence badge to `awaiting_agent` within 90s of plugin start. +// +// The poll loop is read-only on the platform side (activity.go is a SELECT +// — /workspaces/:id/activity does NOT bump last_heartbeat_at), so without +// a dedicated keepalive POST the row stales out and the badge looks +// offline even while A2A traffic flows fine. +// +// Asserts the actual HTTP wire shape: +// - method = POST +// - path = /registry/heartbeat +// - Authorization: Bearer +// - Content-Type: application/json +// - Origin: (SaaS edge WAF — same as register) +// - body.workspace_id = +// +// Pre-fix code path: heartbeat.ts does not exist. Post-fix: this test +// passes against the real function and would FAIL if a refactor swapped +// POST→GET, dropped the bearer token, renamed workspace_id, or stopped +// drainage on the success path — all of which would silently re-break +// the presence badge or leak sockets. + +import { afterAll, afterEach, beforeAll, describe, expect, it } from 'bun:test' + +import { sendHeartbeat } from './heartbeat.ts' + +interface CapturedRequest { + method: string + pathname: string + headers: Record + body: unknown +} + +let captured: CapturedRequest[] = [] +let nextStatus = 200 +let nextResponseBody: string = '{}' + +const fixture = Bun.serve({ + port: 0, + async fetch(req) { + const url = new URL(req.url) + let body: unknown = undefined + try { + body = await req.json() + } catch { + body = await req.text().catch(() => undefined) + } + const hdrs: Record = {} + req.headers.forEach((v, k) => { hdrs[k.toLowerCase()] = v }) + captured.push({ method: req.method, pathname: url.pathname, headers: hdrs, body }) + return new Response(nextResponseBody, { + status: nextStatus, + headers: { 'content-type': 'application/json' }, + }) + }, +}) + +const platformUrl = `http://127.0.0.1:${fixture.port}` + +beforeAll(() => { + captured = [] + nextStatus = 200 + nextResponseBody = '{}' +}) + +afterEach(() => { + captured = [] + nextStatus = 200 + nextResponseBody = '{}' +}) + +afterAll(() => { + fixture.stop(true) +}) + +describe('sendHeartbeat — POST /registry/heartbeat shape (closes #6 / molecule-core#24)', () => { + it('POSTs the workspace_id payload with the per-workspace bearer token + Origin header', async () => { + nextStatus = 200 + await sendHeartbeat({ + platformUrl, + workspaceId: 'ws-heartbeat-test-id', + token: 'tok-heartbeat-test', + }) + + expect(captured).toHaveLength(1) + const req = captured[0]! + expect(req.method).toBe('POST') + expect(req.pathname).toBe('/registry/heartbeat') + expect(req.headers['authorization']).toBe('Bearer tok-heartbeat-test') + expect(req.headers['content-type']).toContain('application/json') + // Origin pinned because SaaS edge WAF rewrites /workspaces/* and + // /registry/* to the Next.js front-end without it (per saved memory + // `reference_saas_waf_origin_header.md`). Heartbeat would silently + // 404 on saas tenants without it; pin so a refactor that drops it + // surfaces here, not in production. + expect(req.headers['origin']).toBe(platformUrl) + expect(req.body).toEqual({ workspace_id: 'ws-heartbeat-test-id' }) + }) + + it('does not throw on platform 5xx — logs and returns so the next tick retries', async () => { + nextStatus = 503 + nextResponseBody = 'service unavailable' + const logs: string[] = [] + // sendHeartbeat must not propagate — the setInterval caller relies on + // resolution-not-rejection so a transient platform 503 doesn't kill + // the heartbeat loop for the rest of the plugin's lifetime. + await expect(sendHeartbeat({ + platformUrl, + workspaceId: 'ws-x', + token: 'tok-x', + log: (line) => { logs.push(line) }, + })).resolves.toBeUndefined() + expect(captured).toHaveLength(1) + expect(logs.join('')).toContain('HTTP 503') + expect(logs.join('')).toContain('service unavailable') + }) + + it('does not throw on platform 401 — auth-token revocation surfaces in stderr but does not crash', async () => { + nextStatus = 401 + nextResponseBody = '{"error":"invalid token"}' + const logs: string[] = [] + await expect(sendHeartbeat({ + platformUrl, + workspaceId: 'ws-y', + token: 'tok-revoked', + log: (line) => { logs.push(line) }, + })).resolves.toBeUndefined() + expect(captured).toHaveLength(1) + expect(logs.join('')).toContain('HTTP 401') + }) + + it('does not throw on network error — fetch failure logged, next tick retries', async () => { + const logs: string[] = [] + // Use a port that's almost certainly closed (port 1 is reserved/usually + // unreachable in user space). On any plausible test host the connection + // refuses immediately, surfacing the fetch-failed branch. + await expect(sendHeartbeat({ + platformUrl: 'http://127.0.0.1:1', + workspaceId: 'ws-net', + token: 'tok', + log: (line) => { logs.push(line) }, + timeoutMs: 1_000, + })).resolves.toBeUndefined() + expect(logs.join('')).toContain('fetch failed') + }) + + it('drains the response body on success so connections can be reused', async () => { + // Pre-fix concern: a body-not-drained refactor would leak sockets in + // production over the lifetime of a long-running session. The + // contract the production code relies on is "after sendHeartbeat + // resolves, the body is consumed" — verifiable indirectly by + // observing that a follow-up call still sees a fresh fixture entry. + nextStatus = 200 + nextResponseBody = '{"ok":true,"some":"large-response-body-with-content"}' + await sendHeartbeat({ + platformUrl, + workspaceId: 'ws-1', + token: 'tok-1', + }) + await sendHeartbeat({ + platformUrl, + workspaceId: 'ws-2', + token: 'tok-2', + }) + expect(captured).toHaveLength(2) + expect(captured[0]!.body).toEqual({ workspace_id: 'ws-1' }) + expect(captured[1]!.body).toEqual({ workspace_id: 'ws-2' }) + }) +}) diff --git a/heartbeat.ts b/heartbeat.ts new file mode 100644 index 0000000..e3d8a81 --- /dev/null +++ b/heartbeat.ts @@ -0,0 +1,109 @@ +// heartbeat.ts — POST /registry/heartbeat keepalive that flips the +// canvas presence badge from `awaiting_agent` to `online`. Closes #6 +// and molecule-core#24. +// +// Why this file exists: +// +// The platform's healthsweep (workspace-server's +// internal/registry/healthsweep.go) flips any `runtime='external'` +// workspace whose `last_heartbeat_at` is older than 90s back to +// `status='awaiting_agent'`. The v0.4.0-gitea.1 channel plugin only +// POSTed /registry/register at startup (which DOES bump +// last_heartbeat_at via registry.go:369) but never heartbeated again. +// Within 90s of plugin start the row goes stale, the canvas badge +// flips to `awaiting_agent`, and the workspace looks offline even +// though A2A traffic flows fine over the long-poll loop. +// +// /workspaces/:id/activity GET (the poll loop) is read-only on the +// platform side — it does NOT touch presence. /registry/heartbeat is +// the only endpoint the platform's healthsweep actually watches. +// +// Why a separate module: +// +// server.ts has top-level side effects (PID-file lock, MCP connect, +// compat probe, register-as-poll, ticker start). Importing it from a +// test triggers all of them. Pure helpers — formatRemovedWorkspaceError, +// computeJitteredInterval, resolvePlatformUrls — already live in +// their own modules so tests can pin contracts without booting the +// server. This file follows the same pattern: heartbeat is a +// fetch-and-log function with a single dependency (workspace_id + +// token + base URL), trivially testable against a Bun.serve fixture. + +/** + * Send one POST /registry/heartbeat to the platform. + * + * On success: 2xx, body drained. + * On platform 4xx/5xx: logged to stderr with status + truncated body, + * resolves cleanly so the next caller's setInterval tick retries. + * On network error: logged to stderr, resolves cleanly. + * + * The function NEVER throws — the typical caller is a setInterval + * tick, and an unhandled rejection there would kill the heartbeat + * loop for the rest of the plugin's lifetime, leaving the canvas + * badge stuck on awaiting_agent with no log to point at. + * + * Wire shape (pinned by heartbeat.test.ts): + * POST {platformUrl}/registry/heartbeat + * Authorization: Bearer {token} + * Content-Type: application/json + * Origin: {platformUrl} -- SaaS edge WAF requires this + * {"workspace_id": ""} -- minimal HeartbeatPayload + * + * The body is the smallest valid HeartbeatPayload — workspace_id is the + * only required field, everything else (error_rate, sample_error, + * active_tasks, uptime_seconds, current_task) is `omitempty`-friendly + * on the platform side. The Python runtime in workspace/heartbeat.py + * sends the same shape when it has no per-tick metrics to attach. + */ +export interface HeartbeatOptions { + /** Platform base URL, no trailing slash. e.g. https://tenant.staging.moleculesai.app */ + platformUrl: string + /** Workspace UUID being heartbeated. */ + workspaceId: string + /** Bearer token issued for this workspace by /registry/register. */ + token: string + /** Optional fetch override for tests. Defaults to globalThis.fetch. */ + fetchImpl?: typeof fetch + /** Optional stderr override for tests. Defaults to writing to process.stderr. */ + log?: (line: string) => void + /** Optional request timeout in ms. Defaults to 10s — heartbeat is a thin + * DB UPDATE; if it can't land in 10s the network is wedged enough that + * the next tick fires sooner than waiting longer would help. */ + timeoutMs?: number +} + +export async function sendHeartbeat(opts: HeartbeatOptions): Promise { + const fetchImpl = opts.fetchImpl ?? fetch + const log = opts.log ?? ((line: string) => { process.stderr.write(line) }) + const timeoutMs = opts.timeoutMs ?? 10_000 + + let resp: Response + try { + resp = await fetchImpl(`${opts.platformUrl}/registry/heartbeat`, { + method: 'POST', + headers: { + Authorization: `Bearer ${opts.token}`, + 'Content-Type': 'application/json', + Origin: opts.platformUrl, + }, + body: JSON.stringify({ workspace_id: opts.workspaceId }), + signal: AbortSignal.timeout(timeoutMs), + }) + } catch (err) { + log(`molecule channel: heartbeat ${opts.workspaceId} fetch failed: ${err}\n`) + return + } + + if (!resp.ok) { + const errText = await resp.text().catch(() => '') + log( + `molecule channel: heartbeat ${opts.workspaceId} HTTP ${resp.status} — ${errText.slice(0, 200)}\n`, + ) + return + } + + // 2xx — drain body so the connection can be reused. We don't consume + // any field from the heartbeat response; /registry/register is where + // platform_inbound_secret + auth_token are surfaced. + await resp.text().catch(() => '') +} diff --git a/package.json b/package.json index 19c9c91..d26c59c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "molecule-mcp-claude-channel", - "version": "0.4.0-gitea.1", + "version": "0.4.0-gitea.2", "description": "Molecule AI channel for Claude Code — bridges A2A traffic into a Claude Code session via MCP", "license": "Apache-2.0", "type": "module", diff --git a/server.ts b/server.ts index 259cdd9..2b35359 100644 --- a/server.ts +++ b/server.ts @@ -41,6 +41,7 @@ import { readFileSync, writeFileSync, mkdirSync, chmodSync, existsSync, renameSy import { homedir } from 'os' import { join } from 'path' import { extractText, type ActivityEntry } from './extract-text.ts' +import { sendHeartbeat } from './heartbeat.ts' // ─── Config ───────────────────────────────────────────────────────────── @@ -88,6 +89,23 @@ const AGENT_DESC = process.env.MOLECULE_AGENT_DESC ?? const AUTO_REGISTER_POLL = !['0', 'false', 'no'].includes( (process.env.MOLECULE_AUTO_REGISTER_POLL ?? 'true').toLowerCase() ) +// MOLECULE_HEARTBEAT_INTERVAL_MS — cadence for the per-workspace +// /registry/heartbeat ping that keeps the canvas presence badge on +// "online" (closes #6 / molecule-core#24). +// +// Default 30_000ms (30s) matches the Python runtime's HEARTBEAT_INTERVAL +// in workspace/heartbeat.py and is well under the platform's 90s +// `REMOTE_LIVENESS_STALE_AFTER` window — three heartbeat ticks fit +// inside the staleness budget so a single dropped POST doesn't flap +// the workspace to `awaiting_agent`. +// +// Set to 0 to disable the heartbeat loop entirely (useful for tests +// or for operators who run a separate heartbeat daemon). Negative +// values are clamped to 0. +const HEARTBEAT_INTERVAL_MS = Math.max( + 0, + parseInt(process.env.MOLECULE_HEARTBEAT_INTERVAL_MS ?? '30000', 10) || 0, +) if (!PLATFORM_URL || WORKSPACE_IDS.length === 0 || WORKSPACE_TOKENS.length === 0) { process.stderr.write( @@ -1261,7 +1279,11 @@ process.stderr.write( `molecule channel: connected — watching ${WORKSPACE_IDS.length} workspace(s) at ${PLATFORM_URL}\n` + ` workspaces: ${WORKSPACE_IDS.join(', ')}\n` + ` delivery_mode=poll cursor=${CURSOR_FILE} auto_register=${AUTO_REGISTER_POLL}\n` + - ` poll: every ${POLL_INTERVAL_MS}ms (cursor-based; ${POLL_WINDOW_SECS}s window only used for first-run seed)\n` + ` poll: every ${POLL_INTERVAL_MS}ms (cursor-based; ${POLL_WINDOW_SECS}s window only used for first-run seed)\n` + + ` heartbeat: ` + + (HEARTBEAT_INTERVAL_MS > 0 + ? `every ${HEARTBEAT_INTERVAL_MS}ms (POST /registry/heartbeat — keeps canvas presence on 'online')\n` + : `disabled (MOLECULE_HEARTBEAT_INTERVAL_MS=0; canvas will flip to 'awaiting_agent' after 90s)\n`) ) // Stagger initial polls slightly so N-workspace watchers don't all hit the @@ -1273,6 +1295,38 @@ WORKSPACE_IDS.forEach((id, i) => { }, i * 500) }) +// Per-workspace heartbeat ticker — closes #6 / molecule-core#24. +// +// The startup `registerAsPoll` upsert already bumped `last_heartbeat_at` +// on each row, so the workspace is "online" from boot. The first heartbeat +// fires after one full HEARTBEAT_INTERVAL_MS so we don't double-pump on +// startup; subsequent ticks keep the row fresh inside the 90s stale +// window enforced by workspace-server's healthsweep. +// +// Stagger by i * 500ms so N-workspace plugins don't fan-spike the +// platform — same shape as the poll-loop staggering above. +// +// Conditional on HEARTBEAT_INTERVAL_MS > 0 so tests / unusual deploys +// can disable the loop without hacking around the ticker. .unref() so +// the heartbeat doesn't keep the event loop alive at shutdown. +// +// `sendHeartbeat` is imported from ./heartbeat.ts — see that file for +// the full presence-bug rationale + wire-shape contract. +if (HEARTBEAT_INTERVAL_MS > 0) { + WORKSPACE_IDS.forEach((id, i) => { + setTimeout(() => { + setInterval( + () => void sendHeartbeat({ + platformUrl: PLATFORM_URL, + workspaceId: id, + token: TOKEN_BY_WORKSPACE.get(id)!, + }), + HEARTBEAT_INTERVAL_MS, + ).unref() + }, i * 500) + }) +} + // Clean shutdown — fire-and-forget a "disconnected" notice on each watched // workspace's A2A so peers don't sit waiting on a silent channel. const shutdown = (sig: string) => {