From f440e5b52d09bbd58c8f5a55b71e96df2f14b5b8 Mon Sep 17 00:00:00 2001 From: hongming Date: Thu, 28 May 2026 15:47:43 -0700 Subject: [PATCH] feat(session-cursor): session-namespaced cursor store for /activity adapters (v1.4.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `./session-cursor` — a shared, session-keyed durable since_id cursor store, beside `inbox-uploads`/`targets`, so every /activity-polling TS adapter (channel today; hermes-ts / codex-ts next) uses one implementation of the polling-cursor contract instead of re-implementing (and re-bugging) it inline. Why session-namespaced: a host can run more than one adapter session (two `claude` invocations both loading the plugin). The platform is fully concurrent — register/heartbeat are workspace-keyed last-writer-wins and /activity is read-only with a client-driven since_id (molecule-core registry.go / activity.go) — so the ONLY thing that races is a *shared* cursor file. Keying the file by session removes that race: - primary (no key) -> cursor.json (survives restarts; resumes) - secondary (key) -> cursor..json (independent; pruned when gone) Surface: `CursorStore` (load/get/has/set/delete/entries/save/unlink, atomic temp+rename, 0600), `cursorFileName(sessionKey?)`, `parseSessionKey`, `pruneOrphanCursors(stateDir, isAlive)`. Logging-agnostic: load() swallows corruption (optional onLoadError hook) and save() throws — the adapter owns its phrasing and fatal-vs-recoverable policy. Additive: new subpath export only; existing 1.3.x consumers unaffected. Context: molecule-mcp-claude-channel#26 (secondary) / internal#726. Co-Authored-By: Claude Opus 4.8 (1M context) --- package-lock.json | 4 +- package.json | 3 +- src/__tests__/session-cursor.test.ts | 174 ++++++++++++++++++++++ src/session-cursor.ts | 214 +++++++++++++++++++++++++++ 4 files changed, 392 insertions(+), 3 deletions(-) create mode 100644 src/__tests__/session-cursor.test.ts create mode 100644 src/session-cursor.ts diff --git a/package-lock.json b/package-lock.json index 9e6d348..9d68f4a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@molecule-ai/mcp-server", - "version": "1.3.1", + "version": "1.4.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@molecule-ai/mcp-server", - "version": "1.3.1", + "version": "1.4.0", "dependencies": { "@modelcontextprotocol/sdk": "^1.12.0", "pino": "^9.6.0", diff --git a/package.json b/package.json index 4365773..6bb0976 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,13 @@ { "name": "@molecule-ai/mcp-server", - "version": "1.3.1", + "version": "1.4.0", "description": "MCP server for Molecule AI Agent Team \u2014 manage workspaces, agents, and skills from any AI coding tool", "type": "module", "exports": { ".": "./dist/index.js", "./external-workspace-tools": "./dist/external_workspace_tools.js", "./inbox-uploads": "./dist/inbox-uploads.js", + "./session-cursor": "./dist/session-cursor.js", "./targets": "./dist/targets.js" }, "types": "./dist/index.d.ts", diff --git a/src/__tests__/session-cursor.test.ts b/src/__tests__/session-cursor.test.ts new file mode 100644 index 0000000..6d4ff23 --- /dev/null +++ b/src/__tests__/session-cursor.test.ts @@ -0,0 +1,174 @@ +import { mkdtempSync, rmSync, writeFileSync, readFileSync, readdirSync, existsSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { + CursorStore, + cursorFileName, + parseSessionKey, + pruneOrphanCursors, +} from "../session-cursor.js"; + +function freshDir(): string { + return mkdtempSync(join(tmpdir(), "session-cursor-test-")); +} + +describe("cursorFileName", () => { + it("maps absent/empty key to the shared primary file", () => { + expect(cursorFileName()).toBe("cursor.json"); + expect(cursorFileName(undefined)).toBe("cursor.json"); + expect(cursorFileName(null)).toBe("cursor.json"); + expect(cursorFileName("")).toBe("cursor.json"); + expect(cursorFileName(" ")).toBe("cursor.json"); + }); + + it("maps a session key to a per-session file", () => { + expect(cursorFileName("12345")).toBe("cursor.12345.json"); + expect(cursorFileName("a_b-9")).toBe("cursor.a_b-9.json"); + }); + + it("rejects keys that would break filename round-trip or escape the dir", () => { + expect(() => cursorFileName("../etc")).toThrow(); + expect(() => cursorFileName("a/b")).toThrow(); + expect(() => cursorFileName("a.b")).toThrow(); + }); +}); + +describe("parseSessionKey", () => { + it("extracts the key from a per-session file", () => { + expect(parseSessionKey("cursor.12345.json")).toBe("12345"); + expect(parseSessionKey("cursor.a_b-9.json")).toBe("a_b-9"); + }); + + it("returns null for the primary file and unrelated files (round-trips cursorFileName)", () => { + expect(parseSessionKey("cursor.json")).toBeNull(); + expect(parseSessionKey("bot.pid")).toBeNull(); + expect(parseSessionKey(".env")).toBeNull(); + expect(parseSessionKey("cursor.12345.json.tmp.999")).toBeNull(); + // Round-trip invariant for valid keys. + for (const key of ["12345", "a_b-9"]) { + expect(parseSessionKey(cursorFileName(key))).toBe(key); + } + }); +}); + +describe("CursorStore", () => { + let dir: string; + beforeEach(() => { + dir = freshDir(); + }); + afterEach(() => { + rmSync(dir, { recursive: true, force: true }); + }); + + it("primary vs secondary pick distinct files", () => { + expect(new CursorStore({ stateDir: dir }).fileName).toBe("cursor.json"); + expect(new CursorStore({ stateDir: dir, sessionKey: "777" }).fileName).toBe("cursor.777.json"); + }); + + it("load on a missing file yields an empty store (first run)", () => { + const store = new CursorStore({ stateDir: dir }).load(); + expect(store.size).toBe(0); + expect(store.get("ws-1")).toBeUndefined(); + }); + + it("round-trips set → save → reload", () => { + const a = new CursorStore({ stateDir: dir }); + a.set("ws-1", "act-100"); + a.set("ws-2", "act-200"); + a.save(); + + const b = new CursorStore({ stateDir: dir }).load(); + expect(b.get("ws-1")).toBe("act-100"); + expect(b.get("ws-2")).toBe("act-200"); + expect(b.size).toBe(2); + }); + + it("delete then save drops the key on disk", () => { + const a = new CursorStore({ stateDir: dir }); + a.set("ws-1", "act-100"); + a.set("ws-2", "act-200"); + a.save(); + expect(a.delete("ws-1")).toBe(true); + a.save(); + + const b = new CursorStore({ stateDir: dir }).load(); + expect(b.has("ws-1")).toBe(false); + expect(b.get("ws-2")).toBe("act-200"); + }); + + it("treats a corrupt file as first-run and reports via onLoadError", () => { + writeFileSync(join(dir, "cursor.json"), "{not json"); + const errs: unknown[] = []; + const store = new CursorStore({ stateDir: dir, onLoadError: (e) => errs.push(e) }).load(); + expect(store.size).toBe(0); + expect(errs).toHaveLength(1); + }); + + it("ignores non-string / empty values in the persisted object", () => { + writeFileSync( + join(dir, "cursor.json"), + JSON.stringify({ "ws-1": "act-1", "ws-2": 42, "ws-3": "", "ws-4": null }), + ); + const store = new CursorStore({ stateDir: dir }).load(); + expect(store.get("ws-1")).toBe("act-1"); + expect(store.has("ws-2")).toBe(false); + expect(store.has("ws-3")).toBe(false); + expect(store.has("ws-4")).toBe(false); + }); + + it("save is atomic — no temp file lingers and the JSON is well-formed", () => { + const a = new CursorStore({ stateDir: dir }); + a.set("ws-1", "act-100"); + a.save(); + const leftovers = readdirSync(dir).filter((n) => n.includes(".tmp.")); + expect(leftovers).toEqual([]); + expect(JSON.parse(readFileSync(join(dir, "cursor.json"), "utf8"))).toEqual({ "ws-1": "act-100" }); + }); + + it("unlink removes the backing file and is a no-op when already gone", () => { + const a = new CursorStore({ stateDir: dir, sessionKey: "777" }); + a.set("ws-1", "act-1"); + a.save(); + expect(existsSync(join(dir, "cursor.777.json"))).toBe(true); + a.unlink(); + expect(existsSync(join(dir, "cursor.777.json"))).toBe(false); + expect(() => a.unlink()).not.toThrow(); + }); +}); + +describe("pruneOrphanCursors", () => { + let dir: string; + beforeEach(() => { + dir = freshDir(); + }); + afterEach(() => { + rmSync(dir, { recursive: true, force: true }); + }); + + it("removes only dead per-session files; keeps primary, live sessions, and unrelated files", () => { + writeFileSync(join(dir, "cursor.json"), "{}"); // primary — never pruned + writeFileSync(join(dir, "cursor.111.json"), "{}"); // dead session + writeFileSync(join(dir, "cursor.222.json"), "{}"); // live session + writeFileSync(join(dir, "bot.pid"), "222"); // unrelated — never pruned + + const pruned = pruneOrphanCursors(dir, (key) => key === "222"); + + expect(pruned).toEqual(["cursor.111.json"]); + const remaining = readdirSync(dir).sort(); + expect(remaining).toEqual(["bot.pid", "cursor.222.json", "cursor.json"]); + }); + + it("never deletes a cursor whose liveness probe throws", () => { + writeFileSync(join(dir, "cursor.111.json"), "{}"); + const pruned = pruneOrphanCursors(dir, () => { + throw new Error("probe blew up"); + }); + expect(pruned).toEqual([]); + expect(existsSync(join(dir, "cursor.111.json"))).toBe(true); + }); + + it("tolerates a missing state dir", () => { + expect(pruneOrphanCursors(join(dir, "does-not-exist"), () => false)).toEqual([]); + }); +}); diff --git a/src/session-cursor.ts b/src/session-cursor.ts new file mode 100644 index 0000000..6e8c9fc --- /dev/null +++ b/src/session-cursor.ts @@ -0,0 +1,214 @@ +/** + * session-cursor — session-namespaced, durable since_id cursor store for + * /activity-polling adapters. + * + * Shared contract surface for any TS adapter that polls + * GET /workspaces/:id/activity?since_id= + * and must persist "the activity_logs.id of the last event I delivered" so a + * restart resumes without missing or replaying messages. The channel plugin + * had this inline; hermes-ts / codex-ts will need the identical behavior. + * Extracted here (beside `inbox-uploads` / `targets`) so the polling-cursor + * contract has one implementation, per the cross-adapter SSOT pattern. + * + * WHY SESSION-NAMESPACED: + * A single host can run more than one adapter session (two `claude` + * invocations both loading the plugin). They poll the same workspace_id, + * but the platform is fully concurrent (register/heartbeat are + * workspace-keyed last-writer-wins, /activity is read-only with a + * client-driven since_id — molecule-core registry.go / activity.go). The + * ONLY thing that races is a *shared* cursor file. Keying the cursor file + * by a session key removes that race so concurrent sessions don't clobber + * each other (molecule-mcp-claude-channel#26 / internal#726). + * + * - Primary (no session key) → `cursor.json` — survives restarts, so the + * common single-session case resumes from its last position. + * - Secondary (session key) → `cursor..json` — independent; pruned + * when its session is gone. + * + * Logging-agnostic on purpose: `load()` swallows corruption (optionally + * reporting via `onLoadError`) and `save()` throws — the adapter owns its + * stderr/pino phrasing and decides whether a failed tick should be fatal. + */ + +import { + existsSync, + readFileSync, + writeFileSync, + renameSync, + unlinkSync, + readdirSync, +} from "node:fs"; +import { join } from "node:path"; + +const PRIMARY_FILE = "cursor.json"; +const SESSION_RE = /^cursor\.([A-Za-z0-9_-]+)\.json$/; +const VALID_KEY = /^[A-Za-z0-9_-]+$/; + +/** + * Map a session key to its cursor filename. + * undefined / null / "" → "cursor.json" (primary; survives restarts) + * "12345" → "cursor.12345.json" (secondary; per-session) + * Throws on a key that would break filename round-tripping or escape the + * state dir (path separators, dots). Callers pass a PID string, always valid. + */ +export function cursorFileName(sessionKey?: string | null): string { + const key = (sessionKey ?? "").trim(); + if (!key) return PRIMARY_FILE; + if (!VALID_KEY.test(key)) { + throw new Error( + `session key must match ${VALID_KEY} (got ${JSON.stringify(sessionKey)})`, + ); + } + return `cursor.${key}.json`; +} + +/** + * Inverse of {@link cursorFileName} for secondary files. Returns the session + * key for a `cursor..json` file, or null for the primary `cursor.json` + * and any unrelated file. Used to identify prunable per-session files. + */ +export function parseSessionKey(fileName: string): string | null { + const m = SESSION_RE.exec(fileName); + return m ? m[1]! : null; +} + +/** + * Delete per-session cursor files whose session is no longer alive. Never + * touches the primary `cursor.json` or unrelated files. `isAlive(key)` is + * supplied by the adapter (e.g. a PID-liveness probe). Returns the list of + * removed filenames (for logging). Tolerant of a missing state dir. + */ +export function pruneOrphanCursors( + stateDir: string, + isAlive: (sessionKey: string) => boolean, +): string[] { + const pruned: string[] = []; + let names: string[]; + try { + names = readdirSync(stateDir); + } catch { + return pruned; + } + for (const name of names) { + const key = parseSessionKey(name); + if (key === null) continue; // primary or unrelated + let alive = true; + try { + alive = isAlive(key); + } catch { + // A probe that throws is treated as "alive" — never delete a cursor we + // can't prove is orphaned. + alive = true; + } + if (alive) continue; + try { + unlinkSync(join(stateDir, name)); + pruned.push(name); + } catch { + // Already gone or unreadable — nothing to do. + } + } + return pruned; +} + +export interface CursorStoreOptions { + /** Directory holding the cursor file(s). */ + stateDir: string; + /** Session key; null/undefined => the shared primary cursor. */ + sessionKey?: string | null; + /** File mode for the cursor file. Defaults to 0o600 (it's not secret, but cheap to lock down). */ + fileMode?: number; + /** Optional hook invoked when {@link CursorStore.load} hits an unreadable/corrupt file. */ + onLoadError?: (err: unknown) => void; +} + +/** + * A workspace_id → last-delivered-activity-id map backed by one JSON file. + * + * Schema on disk: `{ "ws-uuid-1": "act-uuid-X", "ws-uuid-2": "act-uuid-Y" }`. + * Atomic persistence via temp+rename so a crash mid-write can't corrupt the + * file (the previous cursor stays valid; worst case is a few replays). + */ +export class CursorStore { + /** Filename within the state dir (e.g. "cursor.json" or "cursor.123.json"). */ + readonly fileName: string; + /** Absolute path to the backing file. */ + readonly path: string; + private readonly fileMode: number; + private readonly onLoadError?: (err: unknown) => void; + private readonly cursors = new Map(); + + constructor(opts: CursorStoreOptions) { + this.fileName = cursorFileName(opts.sessionKey); + this.path = join(opts.stateDir, this.fileName); + this.fileMode = opts.fileMode ?? 0o600; + this.onLoadError = opts.onLoadError; + } + + /** + * Populate from disk. Missing file => empty (first run). Corrupt file => + * empty (treated as first run; `onLoadError` is invoked if provided). Never + * throws — a poller that refuses to start over one bad file is worse than + * the recovery cost (re-seed from now). Returns `this` for chaining. + */ + load(): this { + this.cursors.clear(); + if (!existsSync(this.path)) return this; + try { + const parsed = JSON.parse(readFileSync(this.path, "utf8")) as Record; + for (const [k, v] of Object.entries(parsed)) { + if (typeof v === "string" && v.length > 0) this.cursors.set(k, v); + } + } catch (err) { + this.cursors.clear(); + this.onLoadError?.(err); + } + return this; + } + + get(workspaceId: string): string | undefined { + return this.cursors.get(workspaceId); + } + + has(workspaceId: string): boolean { + return this.cursors.has(workspaceId); + } + + set(workspaceId: string, activityId: string): void { + this.cursors.set(workspaceId, activityId); + } + + delete(workspaceId: string): boolean { + return this.cursors.delete(workspaceId); + } + + entries(): Array<[string, string]> { + return Array.from(this.cursors.entries()); + } + + get size(): number { + return this.cursors.size; + } + + /** + * Atomically persist to disk (temp + rename). The temp name is PID-suffixed + * so two writers never collide on the temp path. Throws on write failure — + * the caller (typically a setInterval tick) decides whether to log+swallow. + */ + save(): void { + const obj: Record = {}; + for (const [k, v] of this.cursors) obj[k] = v; + const tmp = `${this.path}.tmp.${process.pid}`; + writeFileSync(tmp, JSON.stringify(obj, null, 2), { mode: this.fileMode }); + renameSync(tmp, this.path); + } + + /** Remove the backing file. Used by a secondary session on clean exit. No-op if already gone. */ + unlink(): void { + try { + unlinkSync(this.path); + } catch { + // Already removed or never written. + } + } +} -- 2.52.0