fix(presence): POST /registry/heartbeat ticker so canvas badge stays online (closes #6, closes molecule-core#24)
All checks were successful
Test / bun test (pull_request) Successful in 20s
v0.4.0-gitea.1 polled /workspaces/:id/activity but never sent /registry/heartbeat. The platform's healthsweep (workspace-server/internal/registry/healthsweep.go) flips any runtime='external' workspace whose last_heartbeat_at is older than 90s back to status='awaiting_agent', so the canvas presence badge got stuck on awaiting_agent within 90s of plugin start even while A2A traffic flowed fine over the long-poll loop.

Fix: a per-workspace heartbeat ticker (default 30s, giving three ticks inside the 90s stale window) POSTs the minimal HeartbeatPayload shape (workspace_id only) — the same path the Python runtime in workspace/heartbeat.py uses when it has nothing else to report. A pure heartbeat.ts module plus a Bun.serve fixture test pin the wire shape (POST + bearer + Origin + workspace_id body), so a future refactor that drops any of those fails the test instead of silently re-breaking the badge.

Bump 0.4.0-gitea.1 → 0.4.0-gitea.2 and document MOLECULE_HEARTBEAT_INTERVAL_MS in the README.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
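For quick review, the wire shape the new ticker emits, sketched as a standalone fetch call (it mirrors heartbeat.ts in this diff; the tenant URL, token, and workspace id below are placeholders, not real values):

```ts
// Illustrative sketch only: real values come from the plugin's register flow.
const platformUrl = 'https://tenant.example.moleculesai.app'  // placeholder tenant URL
await fetch(`${platformUrl}/registry/heartbeat`, {
  method: 'POST',
  headers: {
    Authorization: 'Bearer <token-from-/registry/register>',  // per-workspace token
    'Content-Type': 'application/json',
    Origin: platformUrl,  // SaaS edge WAF requires this, same as register
  },
  body: JSON.stringify({ workspace_id: '<workspace-uuid>' }),  // minimal HeartbeatPayload
})
```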
parent ed5cbe02b5, commit b3b79a5efc
@@ -13,7 +13,7 @@
     "url": "https://git.moleculesai.app/molecule-ai/molecule-mcp-claude-channel.git"
   },
   "description": "Bridges Molecule A2A traffic into a Claude Code session via MCP. Subscribe to one or more Molecule workspaces; A2A messages from peers surface as conversation turns; replies route back through Molecule's A2A endpoints.",
-  "version": "0.4.0-gitea.1",
+  "version": "0.4.0-gitea.2",
   "homepage": "https://git.moleculesai.app/molecule-ai/molecule-mcp-claude-channel",
   "license": "Apache-2.0",
   "keywords": [
@@ -1,7 +1,7 @@
 {
   "name": "molecule",
   "description": "Molecule AI channel for Claude Code — bridges Molecule A2A traffic into a Claude Code session via MCP. Subscribe to one or more Molecule workspaces; A2A messages from peers surface as conversation turns; replies route back through Molecule's A2A endpoints.",
-  "version": "0.4.0-gitea.1",
+  "version": "0.4.0-gitea.2",
   "keywords": [
     "molecule",
     "molecule-ai",
@@ -48,6 +48,7 @@ MOLECULE_POLL_WINDOW_SECS=30 # default 30s — only used to seed the first
 MOLECULE_AGENT_NAME="Claude Code (channel)" # how the workspace appears in canvas
 MOLECULE_AGENT_DESC="Local Claude Code session..."
 MOLECULE_AUTO_REGISTER_POLL=true # set to "false" if you've configured the workspace another way
+MOLECULE_HEARTBEAT_INTERVAL_MS=30000 # default 30s — keeps the canvas presence badge on "online"; set to 0 to disable
 ```
 
 The `.env` file is `chmod 600` after first read; tokens never appear in environment-block-style `claude doctor` dumps.
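The 30s default is sized against the platform's 90s staleness window; a quick sanity check of that budget (constants taken from the commit message, not read from platform code):

```ts
// 90s stale window / 30s interval = 3 ticks per window, so one dropped
// POST still leaves two chances before healthsweep flips the badge.
const STALE_AFTER_MS = 90_000  // platform's REMOTE_LIVENESS_STALE_AFTER, per the commit message
const INTERVAL_MS = 30_000     // MOLECULE_HEARTBEAT_INTERVAL_MS default
console.log(Math.floor(STALE_AFTER_MS / INTERVAL_MS))  // 3
```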
heartbeat.test.ts (new file, 172 lines)
@@ -0,0 +1,172 @@
// heartbeat.test.ts — pin the POST /registry/heartbeat shape against a
// local Bun.serve fixture. Closes #6 / molecule-core#24 — the v0.4.0-gitea.1
// channel plugin polled /workspaces/:id/activity but never POSTed
// /registry/heartbeat, so the platform's healthsweep flipped the canvas
// presence badge to `awaiting_agent` within 90s of plugin start.
//
// The poll loop is read-only on the platform side (activity.go is a SELECT
// — /workspaces/:id/activity does NOT bump last_heartbeat_at), so without
// a dedicated keepalive POST the row stales out and the badge looks
// offline even while A2A traffic flows fine.
//
// Asserts the actual HTTP wire shape:
//   - method = POST
//   - path = /registry/heartbeat
//   - Authorization: Bearer <token-for-workspace>
//   - Content-Type: application/json
//   - Origin: <platformUrl> (SaaS edge WAF — same as register)
//   - body.workspace_id = <id>
//
// Pre-fix code path: heartbeat.ts does not exist. Post-fix: this test
// passes against the real function and would FAIL if a refactor swapped
// POST→GET, dropped the bearer token, renamed workspace_id, or stopped
// draining on the success path — all of which would silently re-break
// the presence badge or leak sockets.

import { afterAll, afterEach, beforeAll, describe, expect, it } from 'bun:test'

import { sendHeartbeat } from './heartbeat.ts'

interface CapturedRequest {
  method: string
  pathname: string
  headers: Record<string, string>
  body: unknown
}

let captured: CapturedRequest[] = []
let nextStatus = 200
let nextResponseBody: string = '{}'

const fixture = Bun.serve({
  port: 0,
  async fetch(req) {
    const url = new URL(req.url)
    let body: unknown = undefined
    try {
      body = await req.json()
    } catch {
      body = await req.text().catch(() => undefined)
    }
    const hdrs: Record<string, string> = {}
    req.headers.forEach((v, k) => { hdrs[k.toLowerCase()] = v })
    captured.push({ method: req.method, pathname: url.pathname, headers: hdrs, body })
    return new Response(nextResponseBody, {
      status: nextStatus,
      headers: { 'content-type': 'application/json' },
    })
  },
})

const platformUrl = `http://127.0.0.1:${fixture.port}`

beforeAll(() => {
  captured = []
  nextStatus = 200
  nextResponseBody = '{}'
})

afterEach(() => {
  captured = []
  nextStatus = 200
  nextResponseBody = '{}'
})

afterAll(() => {
  fixture.stop(true)
})

describe('sendHeartbeat — POST /registry/heartbeat shape (closes #6 / molecule-core#24)', () => {
  it('POSTs the workspace_id payload with the per-workspace bearer token + Origin header', async () => {
    nextStatus = 200
    await sendHeartbeat({
      platformUrl,
      workspaceId: 'ws-heartbeat-test-id',
      token: 'tok-heartbeat-test',
    })

    expect(captured).toHaveLength(1)
    const req = captured[0]!
    expect(req.method).toBe('POST')
    expect(req.pathname).toBe('/registry/heartbeat')
    expect(req.headers['authorization']).toBe('Bearer tok-heartbeat-test')
    expect(req.headers['content-type']).toContain('application/json')
    // Origin pinned because SaaS edge WAF rewrites /workspaces/* and
    // /registry/* to the Next.js front-end without it (per saved memory
    // `reference_saas_waf_origin_header.md`). Heartbeat would silently
    // 404 on SaaS tenants without it; pin so a refactor that drops it
    // surfaces here, not in production.
    expect(req.headers['origin']).toBe(platformUrl)
    expect(req.body).toEqual({ workspace_id: 'ws-heartbeat-test-id' })
  })

  it('does not throw on platform 5xx — logs and returns so the next tick retries', async () => {
    nextStatus = 503
    nextResponseBody = 'service unavailable'
    const logs: string[] = []
    // sendHeartbeat must not propagate — the setInterval caller relies on
    // resolution-not-rejection so a transient platform 503 doesn't kill
    // the heartbeat loop for the rest of the plugin's lifetime.
    await expect(sendHeartbeat({
      platformUrl,
      workspaceId: 'ws-x',
      token: 'tok-x',
      log: (line) => { logs.push(line) },
    })).resolves.toBeUndefined()
    expect(captured).toHaveLength(1)
    expect(logs.join('')).toContain('HTTP 503')
    expect(logs.join('')).toContain('service unavailable')
  })

  it('does not throw on platform 401 — auth-token revocation surfaces in stderr but does not crash', async () => {
    nextStatus = 401
    nextResponseBody = '{"error":"invalid token"}'
    const logs: string[] = []
    await expect(sendHeartbeat({
      platformUrl,
      workspaceId: 'ws-y',
      token: 'tok-revoked',
      log: (line) => { logs.push(line) },
    })).resolves.toBeUndefined()
    expect(captured).toHaveLength(1)
    expect(logs.join('')).toContain('HTTP 401')
  })

  it('does not throw on network error — fetch failure logged, next tick retries', async () => {
    const logs: string[] = []
    // Use a port that's almost certainly closed (port 1 is reserved/usually
    // unreachable in user space). On any plausible test host the connection
    // refuses immediately, surfacing the fetch-failed branch.
    await expect(sendHeartbeat({
      platformUrl: 'http://127.0.0.1:1',
      workspaceId: 'ws-net',
      token: 'tok',
      log: (line) => { logs.push(line) },
      timeoutMs: 1_000,
    })).resolves.toBeUndefined()
    expect(logs.join('')).toContain('fetch failed')
  })

  it('drains the response body on success so connections can be reused', async () => {
    // Pre-fix concern: a body-not-drained refactor would leak sockets in
    // production over the lifetime of a long-running session. The
    // contract the production code relies on is "after sendHeartbeat
    // resolves, the body is consumed" — verifiable indirectly by
    // observing that a follow-up call still sees a fresh fixture entry.
    nextStatus = 200
    nextResponseBody = '{"ok":true,"some":"large-response-body-with-content"}'
    await sendHeartbeat({
      platformUrl,
      workspaceId: 'ws-1',
      token: 'tok-1',
    })
    await sendHeartbeat({
      platformUrl,
      workspaceId: 'ws-2',
      token: 'tok-2',
    })
    expect(captured).toHaveLength(2)
    expect(captured[0]!.body).toEqual({ workspace_id: 'ws-1' })
    expect(captured[1]!.body).toEqual({ workspace_id: 'ws-2' })
  })
})
heartbeat.ts (new file, 109 lines)
@@ -0,0 +1,109 @@
// heartbeat.ts — POST /registry/heartbeat keepalive that flips the
// canvas presence badge from `awaiting_agent` to `online`. Closes #6
// and molecule-core#24.
//
// Why this file exists:
//
// The platform's healthsweep (workspace-server's
// internal/registry/healthsweep.go) flips any `runtime='external'`
// workspace whose `last_heartbeat_at` is older than 90s back to
// `status='awaiting_agent'`. The v0.4.0-gitea.1 channel plugin only
// POSTed /registry/register at startup (which DOES bump
// last_heartbeat_at via registry.go:369) but never heartbeated again.
// Within 90s of plugin start the row goes stale, the canvas badge
// flips to `awaiting_agent`, and the workspace looks offline even
// though A2A traffic flows fine over the long-poll loop.
//
// /workspaces/:id/activity GET (the poll loop) is read-only on the
// platform side — it does NOT touch presence. /registry/heartbeat is
// the only endpoint the platform's healthsweep actually watches.
//
// Why a separate module:
//
// server.ts has top-level side effects (PID-file lock, MCP connect,
// compat probe, register-as-poll, ticker start). Importing it from a
// test triggers all of them. Pure helpers — formatRemovedWorkspaceError,
// computeJitteredInterval, resolvePlatformUrls — already live in
// their own modules so tests can pin contracts without booting the
// server. This file follows the same pattern: heartbeat is a
// fetch-and-log function with a small dependency surface (workspace_id +
// token + base URL), trivially testable against a Bun.serve fixture.

/**
 * Send one POST /registry/heartbeat to the platform.
 *
 * On success: 2xx, body drained.
 * On platform 4xx/5xx: logged to stderr with status + truncated body,
 * resolves cleanly so the next caller's setInterval tick retries.
 * On network error: logged to stderr, resolves cleanly.
 *
 * The function NEVER throws — the typical caller is a setInterval
 * tick, and an unhandled rejection there would kill the heartbeat
 * loop for the rest of the plugin's lifetime, leaving the canvas
 * badge stuck on awaiting_agent with no log to point at.
 *
 * Wire shape (pinned by heartbeat.test.ts):
 *   POST {platformUrl}/registry/heartbeat
 *   Authorization: Bearer {token}
 *   Content-Type: application/json
 *   Origin: {platformUrl}      -- SaaS edge WAF requires this
 *   {"workspace_id": "<id>"}   -- minimal HeartbeatPayload
 *
 * The body is the smallest valid HeartbeatPayload — workspace_id is the
 * only required field; everything else (error_rate, sample_error,
 * active_tasks, uptime_seconds, current_task) is `omitempty`-friendly
 * on the platform side. The Python runtime in workspace/heartbeat.py
 * sends the same shape when it has no per-tick metrics to attach.
 */
export interface HeartbeatOptions {
  /** Platform base URL, no trailing slash. e.g. https://tenant.staging.moleculesai.app */
  platformUrl: string
  /** Workspace UUID being heartbeated. */
  workspaceId: string
  /** Bearer token issued for this workspace by /registry/register. */
  token: string
  /** Optional fetch override for tests. Defaults to globalThis.fetch. */
  fetchImpl?: typeof fetch
  /** Optional stderr override for tests. Defaults to writing to process.stderr. */
  log?: (line: string) => void
  /** Optional request timeout in ms. Defaults to 10s — heartbeat is a thin
   * DB UPDATE; if it can't land in 10s the network is wedged enough that
   * the next tick fires sooner than waiting longer would help. */
  timeoutMs?: number
}

export async function sendHeartbeat(opts: HeartbeatOptions): Promise<void> {
  const fetchImpl = opts.fetchImpl ?? fetch
  const log = opts.log ?? ((line: string) => { process.stderr.write(line) })
  const timeoutMs = opts.timeoutMs ?? 10_000

  let resp: Response
  try {
    resp = await fetchImpl(`${opts.platformUrl}/registry/heartbeat`, {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${opts.token}`,
        'Content-Type': 'application/json',
        Origin: opts.platformUrl,
      },
      body: JSON.stringify({ workspace_id: opts.workspaceId }),
      signal: AbortSignal.timeout(timeoutMs),
    })
  } catch (err) {
    log(`molecule channel: heartbeat ${opts.workspaceId} fetch failed: ${err}\n`)
    return
  }

  if (!resp.ok) {
    const errText = await resp.text().catch(() => '')
    log(
      `molecule channel: heartbeat ${opts.workspaceId} HTTP ${resp.status} — ${errText.slice(0, 200)}\n`,
    )
    return
  }

  // 2xx — drain body so the connection can be reused. We don't consume
  // any field from the heartbeat response; /registry/register is where
  // platform_inbound_secret + auth_token are surfaced.
  await resp.text().catch(() => '')
}
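A usage sketch of the exported helper outside the server's ticker, e.g. a one-off manual keepalive; the env variable names here are hypothetical, not ones server.ts defines:

```ts
import { sendHeartbeat } from './heartbeat.ts'

// Hypothetical env names, for illustration only; the plugin wires these
// from its own config, not from these variables.
await sendHeartbeat({
  platformUrl: process.env.PLATFORM_URL!,         // e.g. https://tenant.example.moleculesai.app
  workspaceId: process.env.WORKSPACE_ID!,         // workspace UUID
  token: process.env.WORKSPACE_TOKEN!,            // bearer token from /registry/register
  log: (line) => { process.stderr.write(line) },  // optional; this is the default anyway
  timeoutMs: 5_000,                               // optional; default 10s
})
```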
@@ -1,6 +1,6 @@
 {
   "name": "molecule-mcp-claude-channel",
-  "version": "0.4.0-gitea.1",
+  "version": "0.4.0-gitea.2",
   "description": "Molecule AI channel for Claude Code — bridges A2A traffic into a Claude Code session via MCP",
   "license": "Apache-2.0",
   "type": "module",
server.ts (56 lines changed)
@@ -41,6 +41,7 @@ import { readFileSync, writeFileSync, mkdirSync, chmodSync, existsSync, renameSy
 import { homedir } from 'os'
 import { join } from 'path'
 import { extractText, type ActivityEntry } from './extract-text.ts'
+import { sendHeartbeat } from './heartbeat.ts'
 
 // ─── Config ─────────────────────────────────────────────────────────────
 
@@ -88,6 +89,23 @@ const AGENT_DESC = process.env.MOLECULE_AGENT_DESC ??
 const AUTO_REGISTER_POLL = !['0', 'false', 'no'].includes(
   (process.env.MOLECULE_AUTO_REGISTER_POLL ?? 'true').toLowerCase()
 )
+// MOLECULE_HEARTBEAT_INTERVAL_MS — cadence for the per-workspace
+// /registry/heartbeat ping that keeps the canvas presence badge on
+// "online" (closes #6 / molecule-core#24).
+//
+// Default 30_000ms (30s) matches the Python runtime's HEARTBEAT_INTERVAL
+// in workspace/heartbeat.py and is well under the platform's 90s
+// `REMOTE_LIVENESS_STALE_AFTER` window — three heartbeat ticks fit
+// inside the staleness budget so a single dropped POST doesn't flap
+// the workspace to `awaiting_agent`.
+//
+// Set to 0 to disable the heartbeat loop entirely (useful for tests
+// or for operators who run a separate heartbeat daemon). Negative
+// values are clamped to 0.
+const HEARTBEAT_INTERVAL_MS = Math.max(
+  0,
+  parseInt(process.env.MOLECULE_HEARTBEAT_INTERVAL_MS ?? '30000', 10) || 0,
+)
 
 if (!PLATFORM_URL || WORKSPACE_IDS.length === 0 || WORKSPACE_TOKENS.length === 0) {
   process.stderr.write(
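The clamp-and-parse above degrades malformed or negative values to 0 (disabled) rather than crashing; a few illustrative evaluations of the same expression:

```ts
// Math.max(0, parseInt(x, 10) || 0), illustrative cases:
//   '30000' -> 30000  (default cadence)
//   '0'     -> 0      (explicitly disabled)
//   '-5000' -> 0      (negative clamped to 0, i.e. disabled)
//   'abc'   -> 0      (parseInt yields NaN; NaN || 0 is 0)
const parseIntervalMs = (x: string) => Math.max(0, parseInt(x, 10) || 0)
```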
@@ -1261,7 +1279,11 @@ process.stderr.write(
   `molecule channel: connected — watching ${WORKSPACE_IDS.length} workspace(s) at ${PLATFORM_URL}\n` +
   ` workspaces: ${WORKSPACE_IDS.join(', ')}\n` +
   ` delivery_mode=poll cursor=${CURSOR_FILE} auto_register=${AUTO_REGISTER_POLL}\n` +
-  ` poll: every ${POLL_INTERVAL_MS}ms (cursor-based; ${POLL_WINDOW_SECS}s window only used for first-run seed)\n`
+  ` poll: every ${POLL_INTERVAL_MS}ms (cursor-based; ${POLL_WINDOW_SECS}s window only used for first-run seed)\n` +
+  ` heartbeat: ` +
+  (HEARTBEAT_INTERVAL_MS > 0
+    ? `every ${HEARTBEAT_INTERVAL_MS}ms (POST /registry/heartbeat — keeps canvas presence on 'online')\n`
+    : `disabled (MOLECULE_HEARTBEAT_INTERVAL_MS=0; canvas will flip to 'awaiting_agent' after 90s)\n`)
 )
 
 // Stagger initial polls slightly so N-workspace watchers don't all hit the
@@ -1273,6 +1295,38 @@ WORKSPACE_IDS.forEach((id, i) => {
   }, i * 500)
 })
 
+// Per-workspace heartbeat ticker — closes #6 / molecule-core#24.
+//
+// The startup `registerAsPoll` upsert already bumped `last_heartbeat_at`
+// on each row, so the workspace is "online" from boot. The first heartbeat
+// fires after one full HEARTBEAT_INTERVAL_MS so we don't double-pump on
+// startup; subsequent ticks keep the row fresh inside the 90s stale
+// window enforced by workspace-server's healthsweep.
+//
+// Stagger by i * 500ms so N-workspace plugins don't fan-spike the
+// platform — same shape as the poll-loop staggering above.
+//
+// Conditional on HEARTBEAT_INTERVAL_MS > 0 so tests / unusual deploys
+// can disable the loop without hacking around the ticker. .unref() so
+// the heartbeat doesn't keep the event loop alive at shutdown.
+//
+// `sendHeartbeat` is imported from ./heartbeat.ts — see that file for
+// the full presence-bug rationale + wire-shape contract.
+if (HEARTBEAT_INTERVAL_MS > 0) {
+  WORKSPACE_IDS.forEach((id, i) => {
+    setTimeout(() => {
+      setInterval(
+        () => void sendHeartbeat({
+          platformUrl: PLATFORM_URL,
+          workspaceId: id,
+          token: TOKEN_BY_WORKSPACE.get(id)!,
+        }),
+        HEARTBEAT_INTERVAL_MS,
+      ).unref()
+    }, i * 500)
+  })
+}
+
 // Clean shutdown — fire-and-forget a "disconnected" notice on each watched
 // workspace's A2A so peers don't sit waiting on a silent channel.
 const shutdown = (sig: string) => {
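For context, the platform-side rule this ticker feeds, re-expressed as a TypeScript-flavored sketch (the real healthsweep is Go, in workspace-server/internal/registry/healthsweep.go; the row shape below is a guess from the commit description, not the actual schema):

```ts
// Sketch of the staleness rule described in the commit message, not the real Go code.
const REMOTE_LIVENESS_STALE_AFTER_MS = 90_000

interface WorkspaceRow {      // hypothetical shape, for illustration
  runtime: string             // 'external' for plugin-backed workspaces
  status: string              // 'online' | 'awaiting_agent' | ...
  last_heartbeat_at: number   // epoch ms of the last /registry/heartbeat (or register)
}

function sweep(rows: WorkspaceRow[], now = Date.now()): void {
  for (const row of rows) {
    if (row.runtime === 'external' && now - row.last_heartbeat_at > REMOTE_LIVENESS_STALE_AFTER_MS) {
      row.status = 'awaiting_agent'  // canvas presence badge flips offline
    }
  }
}
```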