feat(session-cursor): session-namespaced cursor store for /activity adapters (v1.4.0) #30

Merged
hongming merged 1 commits from feat/session-cursor-module into main 2026-05-28 23:50:13 +00:00
4 changed files with 392 additions and 3 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "@molecule-ai/mcp-server",
"version": "1.3.1",
"version": "1.4.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@molecule-ai/mcp-server",
"version": "1.3.1",
"version": "1.4.0",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.12.0",
"pino": "^9.6.0",
+2 -1
View File
@@ -1,12 +1,13 @@
{
"name": "@molecule-ai/mcp-server",
"version": "1.3.1",
"version": "1.4.0",
"description": "MCP server for Molecule AI Agent Team \u2014 manage workspaces, agents, and skills from any AI coding tool",
"type": "module",
"exports": {
".": "./dist/index.js",
"./external-workspace-tools": "./dist/external_workspace_tools.js",
"./inbox-uploads": "./dist/inbox-uploads.js",
"./session-cursor": "./dist/session-cursor.js",
"./targets": "./dist/targets.js"
},
"types": "./dist/index.d.ts",
+174
View File
@@ -0,0 +1,174 @@
import { mkdtempSync, rmSync, writeFileSync, readFileSync, readdirSync, existsSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
CursorStore,
cursorFileName,
parseSessionKey,
pruneOrphanCursors,
} from "../session-cursor.js";
function freshDir(): string {
return mkdtempSync(join(tmpdir(), "session-cursor-test-"));
}
describe("cursorFileName", () => {
it("maps absent/empty key to the shared primary file", () => {
expect(cursorFileName()).toBe("cursor.json");
expect(cursorFileName(undefined)).toBe("cursor.json");
expect(cursorFileName(null)).toBe("cursor.json");
expect(cursorFileName("")).toBe("cursor.json");
expect(cursorFileName(" ")).toBe("cursor.json");
});
it("maps a session key to a per-session file", () => {
expect(cursorFileName("12345")).toBe("cursor.12345.json");
expect(cursorFileName("a_b-9")).toBe("cursor.a_b-9.json");
});
it("rejects keys that would break filename round-trip or escape the dir", () => {
expect(() => cursorFileName("../etc")).toThrow();
expect(() => cursorFileName("a/b")).toThrow();
expect(() => cursorFileName("a.b")).toThrow();
});
});
describe("parseSessionKey", () => {
it("extracts the key from a per-session file", () => {
expect(parseSessionKey("cursor.12345.json")).toBe("12345");
expect(parseSessionKey("cursor.a_b-9.json")).toBe("a_b-9");
});
it("returns null for the primary file and unrelated files (round-trips cursorFileName)", () => {
expect(parseSessionKey("cursor.json")).toBeNull();
expect(parseSessionKey("bot.pid")).toBeNull();
expect(parseSessionKey(".env")).toBeNull();
expect(parseSessionKey("cursor.12345.json.tmp.999")).toBeNull();
// Round-trip invariant for valid keys.
for (const key of ["12345", "a_b-9"]) {
expect(parseSessionKey(cursorFileName(key))).toBe(key);
}
});
});
describe("CursorStore", () => {
let dir: string;
beforeEach(() => {
dir = freshDir();
});
afterEach(() => {
rmSync(dir, { recursive: true, force: true });
});
it("primary vs secondary pick distinct files", () => {
expect(new CursorStore({ stateDir: dir }).fileName).toBe("cursor.json");
expect(new CursorStore({ stateDir: dir, sessionKey: "777" }).fileName).toBe("cursor.777.json");
});
it("load on a missing file yields an empty store (first run)", () => {
const store = new CursorStore({ stateDir: dir }).load();
expect(store.size).toBe(0);
expect(store.get("ws-1")).toBeUndefined();
});
it("round-trips set → save → reload", () => {
const a = new CursorStore({ stateDir: dir });
a.set("ws-1", "act-100");
a.set("ws-2", "act-200");
a.save();
const b = new CursorStore({ stateDir: dir }).load();
expect(b.get("ws-1")).toBe("act-100");
expect(b.get("ws-2")).toBe("act-200");
expect(b.size).toBe(2);
});
it("delete then save drops the key on disk", () => {
const a = new CursorStore({ stateDir: dir });
a.set("ws-1", "act-100");
a.set("ws-2", "act-200");
a.save();
expect(a.delete("ws-1")).toBe(true);
a.save();
const b = new CursorStore({ stateDir: dir }).load();
expect(b.has("ws-1")).toBe(false);
expect(b.get("ws-2")).toBe("act-200");
});
it("treats a corrupt file as first-run and reports via onLoadError", () => {
writeFileSync(join(dir, "cursor.json"), "{not json");
const errs: unknown[] = [];
const store = new CursorStore({ stateDir: dir, onLoadError: (e) => errs.push(e) }).load();
expect(store.size).toBe(0);
expect(errs).toHaveLength(1);
});
it("ignores non-string / empty values in the persisted object", () => {
writeFileSync(
join(dir, "cursor.json"),
JSON.stringify({ "ws-1": "act-1", "ws-2": 42, "ws-3": "", "ws-4": null }),
);
const store = new CursorStore({ stateDir: dir }).load();
expect(store.get("ws-1")).toBe("act-1");
expect(store.has("ws-2")).toBe(false);
expect(store.has("ws-3")).toBe(false);
expect(store.has("ws-4")).toBe(false);
});
it("save is atomic — no temp file lingers and the JSON is well-formed", () => {
const a = new CursorStore({ stateDir: dir });
a.set("ws-1", "act-100");
a.save();
const leftovers = readdirSync(dir).filter((n) => n.includes(".tmp."));
expect(leftovers).toEqual([]);
expect(JSON.parse(readFileSync(join(dir, "cursor.json"), "utf8"))).toEqual({ "ws-1": "act-100" });
});
it("unlink removes the backing file and is a no-op when already gone", () => {
const a = new CursorStore({ stateDir: dir, sessionKey: "777" });
a.set("ws-1", "act-1");
a.save();
expect(existsSync(join(dir, "cursor.777.json"))).toBe(true);
a.unlink();
expect(existsSync(join(dir, "cursor.777.json"))).toBe(false);
expect(() => a.unlink()).not.toThrow();
});
});
describe("pruneOrphanCursors", () => {
let dir: string;
beforeEach(() => {
dir = freshDir();
});
afterEach(() => {
rmSync(dir, { recursive: true, force: true });
});
it("removes only dead per-session files; keeps primary, live sessions, and unrelated files", () => {
writeFileSync(join(dir, "cursor.json"), "{}"); // primary — never pruned
writeFileSync(join(dir, "cursor.111.json"), "{}"); // dead session
writeFileSync(join(dir, "cursor.222.json"), "{}"); // live session
writeFileSync(join(dir, "bot.pid"), "222"); // unrelated — never pruned
const pruned = pruneOrphanCursors(dir, (key) => key === "222");
expect(pruned).toEqual(["cursor.111.json"]);
const remaining = readdirSync(dir).sort();
expect(remaining).toEqual(["bot.pid", "cursor.222.json", "cursor.json"]);
});
it("never deletes a cursor whose liveness probe throws", () => {
writeFileSync(join(dir, "cursor.111.json"), "{}");
const pruned = pruneOrphanCursors(dir, () => {
throw new Error("probe blew up");
});
expect(pruned).toEqual([]);
expect(existsSync(join(dir, "cursor.111.json"))).toBe(true);
});
it("tolerates a missing state dir", () => {
expect(pruneOrphanCursors(join(dir, "does-not-exist"), () => false)).toEqual([]);
});
});
+214
View File
@@ -0,0 +1,214 @@
/**
* session-cursor — session-namespaced, durable since_id cursor store for
* /activity-polling adapters.
*
* Shared contract surface for any TS adapter that polls
* GET /workspaces/:id/activity?since_id=<cursor>
* and must persist "the activity_logs.id of the last event I delivered" so a
* restart resumes without missing or replaying messages. The channel plugin
* had this inline; hermes-ts / codex-ts will need the identical behavior.
* Extracted here (beside `inbox-uploads` / `targets`) so the polling-cursor
* contract has one implementation, per the cross-adapter SSOT pattern.
*
* WHY SESSION-NAMESPACED:
* A single host can run more than one adapter session (two `claude`
* invocations both loading the plugin). They poll the same workspace_id,
* but the platform is fully concurrent (register/heartbeat are
* workspace-keyed last-writer-wins, /activity is read-only with a
* client-driven since_id — molecule-core registry.go / activity.go). The
* ONLY thing that races is a *shared* cursor file. Keying the cursor file
* by a session key removes that race so concurrent sessions don't clobber
* each other (molecule-mcp-claude-channel#26 / internal#726).
*
* - Primary (no session key) → `cursor.json` — survives restarts, so the
* common single-session case resumes from its last position.
* - Secondary (session key) → `cursor.<key>.json` — independent; pruned
* when its session is gone.
*
* Logging-agnostic on purpose: `load()` swallows corruption (optionally
* reporting via `onLoadError`) and `save()` throws — the adapter owns its
* stderr/pino phrasing and decides whether a failed tick should be fatal.
*/
import {
existsSync,
readFileSync,
writeFileSync,
renameSync,
unlinkSync,
readdirSync,
} from "node:fs";
import { join } from "node:path";
const PRIMARY_FILE = "cursor.json";
const SESSION_RE = /^cursor\.([A-Za-z0-9_-]+)\.json$/;
const VALID_KEY = /^[A-Za-z0-9_-]+$/;
/**
* Map a session key to its cursor filename.
* undefined / null / "" → "cursor.json" (primary; survives restarts)
* "12345" → "cursor.12345.json" (secondary; per-session)
* Throws on a key that would break filename round-tripping or escape the
* state dir (path separators, dots). Callers pass a PID string, always valid.
*/
export function cursorFileName(sessionKey?: string | null): string {
const key = (sessionKey ?? "").trim();
if (!key) return PRIMARY_FILE;
if (!VALID_KEY.test(key)) {
throw new Error(
`session key must match ${VALID_KEY} (got ${JSON.stringify(sessionKey)})`,
);
}
return `cursor.${key}.json`;
}
/**
* Inverse of {@link cursorFileName} for secondary files. Returns the session
* key for a `cursor.<key>.json` file, or null for the primary `cursor.json`
* and any unrelated file. Used to identify prunable per-session files.
*/
export function parseSessionKey(fileName: string): string | null {
const m = SESSION_RE.exec(fileName);
return m ? m[1]! : null;
}
/**
* Delete per-session cursor files whose session is no longer alive. Never
* touches the primary `cursor.json` or unrelated files. `isAlive(key)` is
* supplied by the adapter (e.g. a PID-liveness probe). Returns the list of
* removed filenames (for logging). Tolerant of a missing state dir.
*/
export function pruneOrphanCursors(
stateDir: string,
isAlive: (sessionKey: string) => boolean,
): string[] {
const pruned: string[] = [];
let names: string[];
try {
names = readdirSync(stateDir);
} catch {
return pruned;
}
for (const name of names) {
const key = parseSessionKey(name);
if (key === null) continue; // primary or unrelated
let alive = true;
try {
alive = isAlive(key);
} catch {
// A probe that throws is treated as "alive" — never delete a cursor we
// can't prove is orphaned.
alive = true;
}
if (alive) continue;
try {
unlinkSync(join(stateDir, name));
pruned.push(name);
} catch {
// Already gone or unreadable — nothing to do.
}
}
return pruned;
}
export interface CursorStoreOptions {
/** Directory holding the cursor file(s). */
stateDir: string;
/** Session key; null/undefined => the shared primary cursor. */
sessionKey?: string | null;
/** File mode for the cursor file. Defaults to 0o600 (it's not secret, but cheap to lock down). */
fileMode?: number;
/** Optional hook invoked when {@link CursorStore.load} hits an unreadable/corrupt file. */
onLoadError?: (err: unknown) => void;
}
/**
* A workspace_id → last-delivered-activity-id map backed by one JSON file.
*
* Schema on disk: `{ "ws-uuid-1": "act-uuid-X", "ws-uuid-2": "act-uuid-Y" }`.
* Atomic persistence via temp+rename so a crash mid-write can't corrupt the
* file (the previous cursor stays valid; worst case is a few replays).
*/
export class CursorStore {
/** Filename within the state dir (e.g. "cursor.json" or "cursor.123.json"). */
readonly fileName: string;
/** Absolute path to the backing file. */
readonly path: string;
private readonly fileMode: number;
private readonly onLoadError?: (err: unknown) => void;
private readonly cursors = new Map<string, string>();
constructor(opts: CursorStoreOptions) {
this.fileName = cursorFileName(opts.sessionKey);
this.path = join(opts.stateDir, this.fileName);
this.fileMode = opts.fileMode ?? 0o600;
this.onLoadError = opts.onLoadError;
}
/**
* Populate from disk. Missing file => empty (first run). Corrupt file =>
* empty (treated as first run; `onLoadError` is invoked if provided). Never
* throws — a poller that refuses to start over one bad file is worse than
* the recovery cost (re-seed from now). Returns `this` for chaining.
*/
load(): this {
this.cursors.clear();
if (!existsSync(this.path)) return this;
try {
const parsed = JSON.parse(readFileSync(this.path, "utf8")) as Record<string, unknown>;
for (const [k, v] of Object.entries(parsed)) {
if (typeof v === "string" && v.length > 0) this.cursors.set(k, v);
}
} catch (err) {
this.cursors.clear();
this.onLoadError?.(err);
}
return this;
}
get(workspaceId: string): string | undefined {
return this.cursors.get(workspaceId);
}
has(workspaceId: string): boolean {
return this.cursors.has(workspaceId);
}
set(workspaceId: string, activityId: string): void {
this.cursors.set(workspaceId, activityId);
}
delete(workspaceId: string): boolean {
return this.cursors.delete(workspaceId);
}
entries(): Array<[string, string]> {
return Array.from(this.cursors.entries());
}
get size(): number {
return this.cursors.size;
}
/**
* Atomically persist to disk (temp + rename). The temp name is PID-suffixed
* so two writers never collide on the temp path. Throws on write failure —
* the caller (typically a setInterval tick) decides whether to log+swallow.
*/
save(): void {
const obj: Record<string, string> = {};
for (const [k, v] of this.cursors) obj[k] = v;
const tmp = `${this.path}.tmp.${process.pid}`;
writeFileSync(tmp, JSON.stringify(obj, null, 2), { mode: this.fileMode });
renameSync(tmp, this.path);
}
/** Remove the backing file. Used by a secondary session on clean exit. No-op if already gone. */
unlink(): void {
try {
unlinkSync(this.path);
} catch {
// Already removed or never written.
}
}
}