forked from molecule-ai/molecule-core
Merge pull request #2897 from Molecule-AI/staging
staging → main: auto-promote 2cb1b26
This commit is contained in:
commit
33fabdf483
@ -55,6 +55,7 @@ TOP_LEVEL_MODULES = {
|
||||
"a2a_executor",
|
||||
"a2a_mcp_server",
|
||||
"a2a_tools",
|
||||
"a2a_tools_rbac",
|
||||
"adapter_base",
|
||||
"agent",
|
||||
"agents_md",
|
||||
@ -75,6 +76,9 @@ TOP_LEVEL_MODULES = {
|
||||
"internal_file_read",
|
||||
"main",
|
||||
"mcp_cli",
|
||||
"mcp_heartbeat",
|
||||
"mcp_inbox_pollers",
|
||||
"mcp_workspace_resolver",
|
||||
"molecule_ai_status",
|
||||
"not_configured_handler",
|
||||
"platform_auth",
|
||||
|
||||
@ -1,11 +1,15 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"go/ast"
|
||||
"go/parser"
|
||||
"go/token"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
@ -119,6 +123,60 @@ func TestLookupExistingChild_DBError_Propagates(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// workspacesInsertRE matches a SQL literal that begins (after optional
|
||||
// leading whitespace) with `INSERT INTO workspaces` followed by `(` —
|
||||
// requiring the open-paren rules out lookalikes like
|
||||
// `INSERT INTO workspaces_audit`, `INSERT INTO workspace_secrets`,
|
||||
// `INSERT INTO workspace_channels`, `INSERT INTO canvas_layouts`. The
|
||||
// previous bytes.Index gate accepted `workspaces_audit` as a prefix
|
||||
// match — see RFC #2872 Important-1 for the silent-false-pass shape.
|
||||
var workspacesInsertRE = regexp.MustCompile(`(?s)^\s*INSERT\s+INTO\s+workspaces\s*\(`)
|
||||
|
||||
// findLookupAndWorkspacesInsertPos walks the AST of `src` and returns
|
||||
// the source positions of (a) the first call to `lookupExistingChild`
|
||||
// and (b) the first CallExpr whose argument list contains a STRING
|
||||
// BasicLit matching workspacesInsertRE. Either may be token.NoPos if
|
||||
// not found.
|
||||
//
|
||||
// Extracted as a helper so the gate logic can be exercised against
|
||||
// synthetic source — TestGate_FailsWhenLookupAfterInsert below proves
|
||||
// the gate actually catches the bug shape, not just the happy path.
|
||||
func findLookupAndWorkspacesInsertPos(t *testing.T, fname string, src []byte) (lookupPos, insertPos token.Pos, fset *token.FileSet) {
|
||||
t.Helper()
|
||||
fset = token.NewFileSet()
|
||||
file, err := parser.ParseFile(fset, fname, src, parser.ParseComments)
|
||||
if err != nil {
|
||||
t.Fatalf("parse %s: %v", fname, err)
|
||||
}
|
||||
lookupPos, insertPos = token.NoPos, token.NoPos
|
||||
ast.Inspect(file, func(n ast.Node) bool {
|
||||
call, ok := n.(*ast.CallExpr)
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
if sel, ok := call.Fun.(*ast.SelectorExpr); ok {
|
||||
if sel.Sel.Name == "lookupExistingChild" && lookupPos == token.NoPos {
|
||||
lookupPos = call.Pos()
|
||||
}
|
||||
}
|
||||
for _, arg := range call.Args {
|
||||
lit, ok := arg.(*ast.BasicLit)
|
||||
if !ok || lit.Kind != token.STRING {
|
||||
continue
|
||||
}
|
||||
raw := lit.Value
|
||||
if unq, err := strconv.Unquote(raw); err == nil {
|
||||
raw = unq
|
||||
}
|
||||
if workspacesInsertRE.MatchString(raw) && insertPos == token.NoPos {
|
||||
insertPos = call.Pos()
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Source-level guard — pins that org_import.go calls
|
||||
// h.lookupExistingChild BEFORE its INSERT INTO workspaces.
|
||||
//
|
||||
@ -126,6 +184,11 @@ func TestLookupExistingChild_DBError_Propagates(t *testing.T) {
|
||||
// (idempotency check before INSERT), not just function names. If a
|
||||
// future refactor reintroduces the un-checked INSERT (the original
|
||||
// bug shape that leaked 72 workspaces in 4 days), this test fails.
|
||||
//
|
||||
// AST-walk implementation closes the silent-false-pass mode that the
|
||||
// previous bytes.Index gate had — see workspacesInsertRE comment for
|
||||
// the failure mode (workspaces_audit / workspace_secrets / etc.
|
||||
// shadowing the real target via prefix match).
|
||||
func TestCreateWorkspaceTree_CallsLookupBeforeInsert(t *testing.T) {
|
||||
wd, err := os.Getwd()
|
||||
if err != nil {
|
||||
@ -135,17 +198,189 @@ func TestCreateWorkspaceTree_CallsLookupBeforeInsert(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("read org_import.go: %v", err)
|
||||
}
|
||||
lookupPos, insertPos, fset := findLookupAndWorkspacesInsertPos(t, "org_import.go", src)
|
||||
|
||||
lookupAt := bytes.Index(src, []byte("h.lookupExistingChild("))
|
||||
insertAt := bytes.Index(src, []byte("INSERT INTO workspaces"))
|
||||
|
||||
if lookupAt < 0 {
|
||||
t.Fatalf("org_import.go missing call to h.lookupExistingChild — idempotency check removed?")
|
||||
if lookupPos == token.NoPos {
|
||||
t.Fatalf("AST: no call to lookupExistingChild in org_import.go — idempotency check removed?")
|
||||
}
|
||||
if insertAt < 0 {
|
||||
t.Fatalf("org_import.go missing INSERT INTO workspaces — schema change?")
|
||||
if insertPos == token.NoPos {
|
||||
t.Fatalf("AST: no SQL literal matching `^\\s*INSERT INTO workspaces\\s*\\(` in any CallExpr in org_import.go — schema change or rename?")
|
||||
}
|
||||
if lookupAt > insertAt {
|
||||
t.Errorf("h.lookupExistingChild must come BEFORE INSERT INTO workspaces in org_import.go (lookup@%d, insert@%d) — non-idempotent ordering would re-leak under repeat /org/import calls", lookupAt, insertAt)
|
||||
if lookupPos > insertPos {
|
||||
t.Errorf("lookupExistingChild call at %s must come BEFORE INSERT INTO workspaces at %s — non-idempotent ordering would re-leak under repeat /org/import calls",
|
||||
fset.Position(lookupPos), fset.Position(insertPos))
|
||||
}
|
||||
}
|
||||
|
||||
// TestGate_FailsWhenLookupAfterInsert proves the gate actually catches
|
||||
// the bug it's named after — running it against synthetic Go source
|
||||
// where the lookup call is positioned AFTER the workspaces INSERT must
|
||||
// produce lookupPos > insertPos, which the production gate flags as
|
||||
// an ERROR. Without this test the gate could regress to "always pass"
|
||||
// and we wouldn't notice until the bug shipped again.
|
||||
//
|
||||
// Per memory feedback_assert_exact_not_substring.md: verify a
|
||||
// tightened test FAILS on old code before merging.
|
||||
func TestGate_FailsWhenLookupAfterInsert(t *testing.T) {
|
||||
const buggySrc = `package handlers
|
||||
|
||||
import "context"
|
||||
|
||||
type fakeDB struct{}
|
||||
|
||||
func (fakeDB) ExecContext(ctx context.Context, sql string, args ...interface{}) {}
|
||||
|
||||
type fakeOrgHandler struct{}
|
||||
|
||||
func (h *fakeOrgHandler) lookupExistingChild(ctx context.Context, name string, parentID *string) (string, bool, error) {
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
func buggyCreate(h *fakeOrgHandler, db fakeDB, ctx context.Context, name string, parentID *string) {
|
||||
// Bug shape: INSERT runs FIRST, lookup runs AFTER. This is the
|
||||
// non-idempotent ordering the gate exists to forbid.
|
||||
db.ExecContext(ctx, ` + "`INSERT INTO workspaces (id, name) VALUES ($1, $2)`" + `, "x", name)
|
||||
h.lookupExistingChild(ctx, name, parentID)
|
||||
}
|
||||
`
|
||||
lookupPos, insertPos, _ := findLookupAndWorkspacesInsertPos(t, "buggy.go", []byte(buggySrc))
|
||||
if lookupPos == token.NoPos || insertPos == token.NoPos {
|
||||
t.Fatalf("synthetic buggy source missing expected nodes (lookupPos=%v insertPos=%v) — helper logic regression", lookupPos, insertPos)
|
||||
}
|
||||
if lookupPos < insertPos {
|
||||
t.Fatalf("synthetic bug shape (lookup AFTER insert) returned lookupPos=%d < insertPos=%d — gate would NOT fire on actual bug, regression!", lookupPos, insertPos)
|
||||
}
|
||||
// Implicit: lookupPos > insertPos here, which the production gate
|
||||
// flags via t.Errorf. This proves the gate is live, not vestigial.
|
||||
}
|
||||
|
||||
// TestGate_IgnoresAuditTableShadow proves the regex tightening
|
||||
// actually ignores `INSERT INTO workspaces_audit` literals — the
|
||||
// specific shape #2872 cited as the silent-false-pass failure mode
|
||||
// for the previous bytes.Index gate.
|
||||
func TestGate_IgnoresAuditTableShadow(t *testing.T) {
|
||||
// Synthetic source with audit-table INSERT at line 1 (would be
|
||||
// position 0 under prefix-match) and lookup + real INSERT at later
|
||||
// positions. With the tightened regex, the audit literal is
|
||||
// ignored: insertPos points at the REAL INSERT, lookup precedes it,
|
||||
// gate passes correctly.
|
||||
const src = `package handlers
|
||||
|
||||
import "context"
|
||||
|
||||
type fakeDB struct{}
|
||||
|
||||
func (fakeDB) ExecContext(ctx context.Context, sql string, args ...interface{}) {}
|
||||
|
||||
type fakeOrgHandler struct{}
|
||||
|
||||
func (h *fakeOrgHandler) lookupExistingChild(ctx context.Context, name string, parentID *string) (string, bool, error) {
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
func okCreateWithAudit(h *fakeOrgHandler, db fakeDB, ctx context.Context, name string, parentID *string) {
|
||||
// Audit-table INSERT — should be IGNORED by the tightened regex.
|
||||
db.ExecContext(ctx, ` + "`INSERT INTO workspaces_audit (id, action) VALUES ($1, $2)`" + `, "x", "create_attempt")
|
||||
// Lookup BEFORE real INSERT — correct order.
|
||||
h.lookupExistingChild(ctx, name, parentID)
|
||||
// Real INSERT.
|
||||
db.ExecContext(ctx, ` + "`INSERT INTO workspaces (id, name) VALUES ($1, $2)`" + `, "x", name)
|
||||
}
|
||||
`
|
||||
lookupPos, insertPos, fset := findLookupAndWorkspacesInsertPos(t, "shadow.go", []byte(src))
|
||||
if lookupPos == token.NoPos || insertPos == token.NoPos {
|
||||
t.Fatalf("expected to find lookup + real INSERT, got lookupPos=%v insertPos=%v", lookupPos, insertPos)
|
||||
}
|
||||
// The audit-table INSERT is at line ~16 (column ~20-ish), the
|
||||
// lookup is at line 19, the real INSERT is at line 21. If the
|
||||
// regex regressed to prefix-match, insertPos would point at the
|
||||
// audit literal at line 16, and the gate would falsely fail
|
||||
// (lookup at 19 > "insert" at 16). With the tightened regex,
|
||||
// insertPos correctly points at line 21, and the gate passes.
|
||||
insertLine := fset.Position(insertPos).Line
|
||||
lookupLine := fset.Position(lookupPos).Line
|
||||
if insertLine < lookupLine {
|
||||
t.Errorf("regex regressed: audit shadow at line %d swallowed real INSERT (lookup at line %d). insertPos should point at the real INSERT (line ~21), not the audit literal.",
|
||||
insertLine, lookupLine)
|
||||
}
|
||||
if lookupPos > insertPos {
|
||||
t.Errorf("synthetic source has lookup at line %d before real INSERT at line %d, gate should pass (lookupPos < insertPos), got lookupPos=%d > insertPos=%d",
|
||||
lookupLine, insertLine, lookupPos, insertPos)
|
||||
}
|
||||
}
|
||||
|
||||
// TestWorkspacesInsertRE_RejectsLookalikes pins the regex that
|
||||
// discriminates the real workspaces INSERT from prefix-matching
|
||||
// lookalikes. If this regex regresses to a substring match, the
|
||||
// AST gate above silently false-passes when a future refactor
|
||||
// shadows the real INSERT with a workspaces_audit / workspace_secrets
|
||||
// / canvas_layouts literal placed earlier in source.
|
||||
func TestWorkspacesInsertRE_RejectsLookalikes(t *testing.T) {
|
||||
cases := []struct {
|
||||
sql string
|
||||
want bool
|
||||
comment string
|
||||
}{
|
||||
{"INSERT INTO workspaces (id, name) VALUES ($1, $2)", true, "real target"},
|
||||
{"\n\t\tINSERT INTO workspaces (id, name)\n\t\tVALUES ($1, $2)", true, "real target with leading whitespace + newlines (raw string literal shape)"},
|
||||
{"INSERT INTO workspaces_audit (id) VALUES ($1)", false, "underscore-suffix lookalike (the #2872 specific failure mode)"},
|
||||
{"INSERT INTO workspace_secrets (key, value) VALUES ($1, $2)", false, "prefix without trailing 's' (workspace_*)"},
|
||||
{"INSERT INTO workspace_channels (id) VALUES ($1)", false, "another workspace_* prefix"},
|
||||
{"INSERT INTO canvas_layouts (workspace_id, x, y) VALUES ($1, $2, $3)", false, "unrelated table that contains 'workspace' in a column ref"},
|
||||
{"UPDATE workspaces SET status='running' WHERE id=$1", false, "UPDATE shouldn't match"},
|
||||
{"SELECT * FROM workspaces WHERE id=$1", false, "SELECT shouldn't match"},
|
||||
{"-- comment about INSERT INTO workspaces (\nSELECT 1", false, "comment shouldn't match"},
|
||||
}
|
||||
for _, c := range cases {
|
||||
got := workspacesInsertRE.MatchString(c.sql)
|
||||
if got != c.want {
|
||||
t.Errorf("workspacesInsertRE.MatchString(%q) = %v, want %v (%s)", c.sql, got, c.want, c.comment)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Confirm the regex actually matches the literal currently in
|
||||
// org_import.go. Pins the shape so `gofmt` reflows or trivial edits
|
||||
// to the SQL string don't silently disable the gate above.
|
||||
func TestWorkspacesInsertRE_MatchesActualSourceLiteral(t *testing.T) {
|
||||
wd, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("getwd: %v", err)
|
||||
}
|
||||
src, err := os.ReadFile(filepath.Join(wd, "org_import.go"))
|
||||
if err != nil {
|
||||
t.Fatalf("read org_import.go: %v", err)
|
||||
}
|
||||
// Strip backtick strings, find any whose content matches.
|
||||
// Walk the source via parser.ParseFile to avoid string-search
|
||||
// drift if the literal is reflowed.
|
||||
fset := token.NewFileSet()
|
||||
file, err := parser.ParseFile(fset, filepath.Join(wd, "org_import.go"), src, parser.ParseComments)
|
||||
if err != nil {
|
||||
t.Fatalf("parse org_import.go: %v", err)
|
||||
}
|
||||
var matched bool
|
||||
ast.Inspect(file, func(n ast.Node) bool {
|
||||
lit, ok := n.(*ast.BasicLit)
|
||||
if !ok || lit.Kind != token.STRING {
|
||||
return true
|
||||
}
|
||||
raw := lit.Value
|
||||
if unq, err := strconv.Unquote(raw); err == nil {
|
||||
raw = unq
|
||||
}
|
||||
if workspacesInsertRE.MatchString(raw) {
|
||||
matched = true
|
||||
}
|
||||
return true
|
||||
})
|
||||
if !matched {
|
||||
t.Fatalf("no SQL literal in org_import.go matches workspacesInsertRE — gate is dead. Either the INSERT was renamed (update the regex) or the file was restructured (review the gate logic).")
|
||||
}
|
||||
// strings.Contains keeps the test informative: if the regex
|
||||
// stopped matching but the literal source still contains the
|
||||
// magic phrase, that's a regex-side failure (test the fix above).
|
||||
if !strings.Contains(string(src), "INSERT INTO workspaces") {
|
||||
t.Fatalf("org_import.go has no `INSERT INTO workspaces` substring at all — schema change?")
|
||||
}
|
||||
}
|
||||
|
||||
@ -28,96 +28,20 @@ from platform_auth import list_registered_workspaces
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# RBAC helpers (mirror builtin_tools/audit.py for a2a_tools isolation)
|
||||
# RBAC + auth helpers — extracted to a2a_tools_rbac (RFC #2873 iter 4a).
|
||||
# Re-exported here under the legacy underscore names so existing tests'
|
||||
# patch("a2a_tools._check_memory_write_permission", …) and call sites
|
||||
# inside this module that resolve bare names against the module-level
|
||||
# namespace continue to work unchanged.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_ROLE_PERMISSIONS = {
|
||||
"admin": {"delegate", "approve", "memory.read", "memory.write"},
|
||||
"operator": {"delegate", "approve", "memory.read", "memory.write"},
|
||||
"read-only": {"memory.read"},
|
||||
"no-delegation": {"approve", "memory.read", "memory.write"},
|
||||
"no-approval": {"delegate", "memory.read", "memory.write"},
|
||||
"memory-readonly": {"memory.read"},
|
||||
}
|
||||
|
||||
|
||||
def _get_workspace_tier() -> int:
|
||||
"""Return the workspace tier from config (0 = root, 1+ = tenant)."""
|
||||
try:
|
||||
from config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
return getattr(cfg, "tier", 1)
|
||||
except Exception:
|
||||
return int(os.environ.get("WORKSPACE_TIER", 1))
|
||||
|
||||
|
||||
def _check_memory_write_permission() -> bool:
|
||||
"""Return True if this workspace's RBAC roles grant memory.write."""
|
||||
try:
|
||||
from config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
roles = list(getattr(cfg, "rbac", None).roles or ["operator"])
|
||||
allowed = dict(getattr(cfg, "rbac", None).allowed_actions or {})
|
||||
except Exception:
|
||||
# Fail closed: deny when config is unavailable
|
||||
roles = ["operator"]
|
||||
allowed = {}
|
||||
|
||||
for role in roles:
|
||||
if role == "admin":
|
||||
return True
|
||||
if role in allowed:
|
||||
if "memory.write" in allowed[role]:
|
||||
return True
|
||||
elif role in _ROLE_PERMISSIONS and "memory.write" in _ROLE_PERMISSIONS[role]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _check_memory_read_permission() -> bool:
|
||||
"""Return True if this workspace's RBAC roles grant memory.read."""
|
||||
try:
|
||||
from config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
roles = list(getattr(cfg, "rbac", None).roles or ["operator"])
|
||||
allowed = dict(getattr(cfg, "rbac", None).allowed_actions or {})
|
||||
except Exception:
|
||||
roles = ["operator"]
|
||||
allowed = {}
|
||||
|
||||
for role in roles:
|
||||
if role == "admin":
|
||||
return True
|
||||
if role in allowed:
|
||||
if "memory.read" in allowed[role]:
|
||||
return True
|
||||
elif role in _ROLE_PERMISSIONS and "memory.read" in _ROLE_PERMISSIONS[role]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _is_root_workspace() -> bool:
|
||||
"""Return True if this workspace is tier 0 (root/root-org)."""
|
||||
return _get_workspace_tier() == 0
|
||||
|
||||
|
||||
def _auth_headers_for_heartbeat(workspace_id: str | None = None) -> dict[str, str]:
|
||||
"""Return Phase 30.1 auth headers; tolerate platform_auth being absent
|
||||
in older installs (e.g. during rolling upgrade).
|
||||
|
||||
``workspace_id`` selects the per-workspace token from the multi-
|
||||
workspace registry when set (PR-1: external agent registered in
|
||||
multiple workspaces). With no arg the legacy single-token path is
|
||||
unchanged.
|
||||
"""
|
||||
try:
|
||||
from platform_auth import auth_headers
|
||||
return auth_headers(workspace_id) if workspace_id else auth_headers()
|
||||
except Exception:
|
||||
return {}
|
||||
from a2a_tools_rbac import ( # noqa: E402 (import after the from-a2a_client block)
|
||||
_auth_headers_for_heartbeat,
|
||||
_check_memory_read_permission,
|
||||
_check_memory_write_permission,
|
||||
_get_workspace_tier,
|
||||
_is_root_workspace,
|
||||
_ROLE_PERMISSIONS,
|
||||
)
|
||||
|
||||
|
||||
# Per-field caps on the heartbeat / activity payload. Borrowed from
|
||||
|
||||
138
workspace/a2a_tools_rbac.py
Normal file
138
workspace/a2a_tools_rbac.py
Normal file
@ -0,0 +1,138 @@
|
||||
"""RBAC + auth-header helpers shared by all a2a_tools tool handlers.
|
||||
|
||||
Extracted from ``a2a_tools.py`` (RFC #2873 iter 4a). Centralises the
|
||||
"what can this workspace do" + "how do I prove it on a platform call"
|
||||
concerns into a single module so:
|
||||
|
||||
* Future tools added under ``a2a_tools/`` see one obvious helper to
|
||||
call instead of re-implementing the role/tier check.
|
||||
* The role-permission table is in ONE place — adding a new role
|
||||
or capability touches one file, not every tool that gates on it.
|
||||
* Tests targeting these helpers don't have to import the whole
|
||||
991-LOC ``a2a_tools`` surface.
|
||||
|
||||
Public surface:
|
||||
|
||||
* ``ROLE_PERMISSIONS`` — canonical role → action set table.
|
||||
* ``get_workspace_tier()`` — config-resolved tier (0 = root).
|
||||
* ``check_memory_write_permission()`` — boolean.
|
||||
* ``check_memory_read_permission()`` — boolean.
|
||||
* ``is_root_workspace()`` — boolean (tier == 0).
|
||||
* ``auth_headers_for_heartbeat(workspace_id=None)`` — auth-header dict
|
||||
with the multi-workspace registry lookup; tolerates ``platform_auth``
|
||||
missing on older installs (returns ``{}``).
|
||||
|
||||
Underscore-prefixed back-compat aliases (``_ROLE_PERMISSIONS``,
|
||||
``_check_memory_write_permission``, etc.) match the names previously
|
||||
exposed in ``a2a_tools`` so existing tests'
|
||||
``patch("a2a_tools._foo", ...)`` continue to work via the re-exports
|
||||
in ``a2a_tools.py``.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
|
||||
# Mirror ``builtin_tools/audit.py`` for a2a_tools isolation. Listed as a
|
||||
# module-level constant rather than computed lazily so the table is
|
||||
# discoverable in static analysis + ``grep``.
|
||||
ROLE_PERMISSIONS: dict[str, set[str]] = {
|
||||
"admin": {"delegate", "approve", "memory.read", "memory.write"},
|
||||
"operator": {"delegate", "approve", "memory.read", "memory.write"},
|
||||
"read-only": {"memory.read"},
|
||||
"no-delegation": {"approve", "memory.read", "memory.write"},
|
||||
"no-approval": {"delegate", "memory.read", "memory.write"},
|
||||
"memory-readonly": {"memory.read"},
|
||||
}
|
||||
|
||||
|
||||
def get_workspace_tier() -> int:
|
||||
"""Return the workspace tier from config (0 = root, 1+ = tenant)."""
|
||||
try:
|
||||
from config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
return getattr(cfg, "tier", 1)
|
||||
except Exception:
|
||||
return int(os.environ.get("WORKSPACE_TIER", 1))
|
||||
|
||||
|
||||
def _resolve_role_state() -> tuple[list[str], dict]:
|
||||
"""Return (roles, allowed_actions) from config.
|
||||
|
||||
Fail-closed: if config is unavailable, fall back to an "operator"
|
||||
default with no per-role overrides. Operator has memory.read +
|
||||
memory.write but not the elevated approve/delegate over GLOBAL
|
||||
scope, so a config outage doesn't grant unexpected privileges.
|
||||
"""
|
||||
try:
|
||||
from config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
roles = list(getattr(cfg, "rbac", None).roles or ["operator"])
|
||||
allowed = dict(getattr(cfg, "rbac", None).allowed_actions or {})
|
||||
return roles, allowed
|
||||
except Exception:
|
||||
return ["operator"], {}
|
||||
|
||||
|
||||
def check_memory_write_permission() -> bool:
|
||||
"""Return True if this workspace's RBAC roles grant memory.write."""
|
||||
roles, allowed = _resolve_role_state()
|
||||
for role in roles:
|
||||
if role == "admin":
|
||||
return True
|
||||
if role in allowed:
|
||||
if "memory.write" in allowed[role]:
|
||||
return True
|
||||
elif role in ROLE_PERMISSIONS and "memory.write" in ROLE_PERMISSIONS[role]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def check_memory_read_permission() -> bool:
|
||||
"""Return True if this workspace's RBAC roles grant memory.read."""
|
||||
roles, allowed = _resolve_role_state()
|
||||
for role in roles:
|
||||
if role == "admin":
|
||||
return True
|
||||
if role in allowed:
|
||||
if "memory.read" in allowed[role]:
|
||||
return True
|
||||
elif role in ROLE_PERMISSIONS and "memory.read" in ROLE_PERMISSIONS[role]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def is_root_workspace() -> bool:
|
||||
"""Return True if this workspace is tier 0 (root/root-org)."""
|
||||
return get_workspace_tier() == 0
|
||||
|
||||
|
||||
def auth_headers_for_heartbeat(workspace_id: str | None = None) -> dict[str, str]:
|
||||
"""Return Phase 30.1 auth headers; tolerate platform_auth being absent
|
||||
in older installs (e.g. during rolling upgrade).
|
||||
|
||||
``workspace_id`` selects the per-workspace token from the multi-
|
||||
workspace registry when set (PR-1: external agent registered in
|
||||
multiple workspaces). With no arg the legacy single-token path is
|
||||
unchanged.
|
||||
"""
|
||||
try:
|
||||
from platform_auth import auth_headers
|
||||
return auth_headers(workspace_id) if workspace_id else auth_headers()
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
# ============== Back-compat aliases for the previous a2a_tools names ==============
|
||||
# Tests + downstream call sites refer to the pre-extract names; aliasing
|
||||
# keeps both forms valid. The new public names (no underscore prefix)
|
||||
# are preferred for new code.
|
||||
|
||||
_ROLE_PERMISSIONS = ROLE_PERMISSIONS
|
||||
_get_workspace_tier = get_workspace_tier
|
||||
_check_memory_write_permission = check_memory_write_permission
|
||||
_check_memory_read_permission = check_memory_read_permission
|
||||
_is_root_workspace = is_root_workspace
|
||||
_auth_headers_for_heartbeat = auth_headers_for_heartbeat
|
||||
@ -31,422 +31,53 @@ dependency via ``a2a-sdk``.
|
||||
In-container usage (``python -m molecule_runtime.a2a_mcp_server`` or
|
||||
direct import) bypasses this wrapper — the workspace runtime has its
|
||||
own heartbeat loop in ``heartbeat.py`` so we don't double-heartbeat.
|
||||
|
||||
Module layout (RFC #2873 iter 3 split):
|
||||
* ``mcp_heartbeat`` — register POST + heartbeat loop + auth-failure
|
||||
escalation + inbound-secret persistence.
|
||||
* ``mcp_workspace_resolver`` — env validation, single + multi-workspace
|
||||
resolution, operator-help printer, on-disk token-file read.
|
||||
* ``mcp_inbox_pollers`` — activate the inbox singleton + spawn one
|
||||
daemon poller per workspace.
|
||||
|
||||
This file keeps just ``main()`` plus thin re-exports of the private
|
||||
symbols so existing tests' imports (``mcp_cli._build_agent_card``,
|
||||
``mcp_cli._heartbeat_loop``, etc.) keep working without churn.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import configs_dir
|
||||
import mcp_heartbeat
|
||||
import mcp_inbox_pollers
|
||||
import mcp_workspace_resolver
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Heartbeat cadence. Must be tighter than healthsweep's stale window
|
||||
# (currently 60-90s — see registry/healthsweep.go) by a comfortable
|
||||
# margin so a single missed heartbeat doesn't flip awaiting_agent.
|
||||
# 20s gives the operator's network 3 attempts within the budget; long
|
||||
# enough that it doesn't spam, short enough to recover quickly after
|
||||
# laptop sleep.
|
||||
HEARTBEAT_INTERVAL_SECONDS = 20.0
|
||||
# Re-export public surface for back-compat with the pre-split callers
|
||||
# and tests. The underscore-prefixed names mirror the names that
|
||||
# existed in this module before the split — keeping them ensures
|
||||
# `mcp_cli._build_agent_card`, `mcp_cli._heartbeat_loop`, etc.
|
||||
# resolve identically to the new functions.
|
||||
HEARTBEAT_INTERVAL_SECONDS = mcp_heartbeat.HEARTBEAT_INTERVAL_SECONDS
|
||||
_HEARTBEAT_AUTH_LOUD_THRESHOLD = mcp_heartbeat.HEARTBEAT_AUTH_LOUD_THRESHOLD
|
||||
_HEARTBEAT_AUTH_RELOG_INTERVAL = mcp_heartbeat.HEARTBEAT_AUTH_RELOG_INTERVAL
|
||||
|
||||
# After this many consecutive 401/403 heartbeats, escalate from
|
||||
# WARNING to ERROR with re-onboard guidance. 3 ticks at 20s = ~1 minute
|
||||
# of sustained auth failure — enough to rule out a transient platform
|
||||
# blip but quick enough that an operator doesn't sit puzzled for 10
|
||||
# minutes wondering why their MCP tools 401. Same threshold used for
|
||||
# repeat-logging at 20-tick (~7 min) intervals so a long-running
|
||||
# session that missed the first ERROR still sees the message.
|
||||
_HEARTBEAT_AUTH_LOUD_THRESHOLD = 3
|
||||
_HEARTBEAT_AUTH_RELOG_INTERVAL = 20
|
||||
_build_agent_card = mcp_heartbeat.build_agent_card
|
||||
_platform_register = mcp_heartbeat.platform_register
|
||||
_heartbeat_loop = mcp_heartbeat.heartbeat_loop
|
||||
_log_heartbeat_auth_failure = mcp_heartbeat.log_heartbeat_auth_failure
|
||||
_persist_inbound_secret_from_heartbeat = mcp_heartbeat.persist_inbound_secret_from_heartbeat
|
||||
_start_heartbeat_thread = mcp_heartbeat.start_heartbeat_thread
|
||||
|
||||
_resolve_workspaces = mcp_workspace_resolver.resolve_workspaces
|
||||
_print_missing_env_help = mcp_workspace_resolver.print_missing_env_help
|
||||
_read_token_file = mcp_workspace_resolver.read_token_file
|
||||
|
||||
def _build_agent_card(workspace_id: str) -> dict:
|
||||
"""Build the ``agent_card`` payload sent to /registry/register.
|
||||
|
||||
Three optional env vars override the defaults so an operator can
|
||||
surface human-readable identity + capabilities to peers and the
|
||||
canvas Skills tab without code changes:
|
||||
|
||||
* ``MOLECULE_AGENT_NAME`` — display name (defaults to
|
||||
``molecule-mcp-{id[:8]}``). Surfaced in canvas workspace cards
|
||||
and ``list_peers`` output.
|
||||
* ``MOLECULE_AGENT_DESCRIPTION`` — one-liner about the agent's
|
||||
purpose. Rendered in canvas Details + Skills tabs.
|
||||
* ``MOLECULE_AGENT_SKILLS`` — comma-separated skill names
|
||||
(e.g. ``research,code-review,memory-curation``). Each name is
|
||||
expanded to a ``{"name": ...}`` skill object — the minimum
|
||||
shape that satisfies both ``shared_runtime.summarize_peers``
|
||||
(uses ``s["name"]``) and the canvas SkillsTab.tsx schema
|
||||
(id falls back to name when omitted). Empty / whitespace
|
||||
entries are dropped.
|
||||
|
||||
Defaults match the previous hardcoded behaviour exactly so this
|
||||
is a strict superset — an operator who sets none of the env vars
|
||||
sees no change.
|
||||
"""
|
||||
name = (os.environ.get("MOLECULE_AGENT_NAME") or "").strip()
|
||||
if not name:
|
||||
name = f"molecule-mcp-{workspace_id[:8]}"
|
||||
|
||||
description = (os.environ.get("MOLECULE_AGENT_DESCRIPTION") or "").strip()
|
||||
|
||||
skills_raw = (os.environ.get("MOLECULE_AGENT_SKILLS") or "").strip()
|
||||
skills: list[dict] = []
|
||||
if skills_raw:
|
||||
for s in skills_raw.split(","):
|
||||
label = s.strip()
|
||||
if label:
|
||||
skills.append({"name": label})
|
||||
|
||||
card: dict = {"name": name, "skills": skills}
|
||||
if description:
|
||||
card["description"] = description
|
||||
return card
|
||||
|
||||
|
||||
def _platform_register(platform_url: str, workspace_id: str, token: str) -> None:
    """One-shot register at startup; fails fast on auth errors.

    Lifts the workspace from ``awaiting_agent`` to ``online`` for
    operators who never ran the curl-register snippet. Safe to call
    repeatedly: the platform's register handler is an upsert that
    just refreshes ``url``, ``agent_card``, and ``status``.

    Failure model (post-review):
      - 401 / 403 → ``sys.exit(3)`` immediately. The operator's
        token is wrong; silently looping in a broken state would
        make this hard to diagnose because the MCP tools would 401
        on every call too. Hard-fail is the kindest option.
      - Other 4xx/5xx → log a warning + continue. The heartbeat
        thread will surface persistent failures; transient platform
        blips shouldn't abort the MCP loop.
      - Network / transport errors → log + continue. Same reasoning.

    Origin header is required by the SaaS edge WAF; without it
    /registry/register currently still works (it's on the WAF
    allowlist), but the heartbeat path needs Origin and we want one
    consistent header set across both calls.
    """
    try:
        import httpx
    except ImportError:
        # httpx is a transitive dep via a2a-sdk; if missing, the MCP
        # server won't import either. Let the caller's later import
        # surface the real error.
        return

    payload = {
        "id": workspace_id,
        "url": "",
        "agent_card": _build_agent_card(workspace_id),
        "delivery_mode": "poll",
    }
    headers = {
        "Authorization": f"Bearer {token}",
        "Origin": platform_url,
        "Content-Type": "application/json",
    }
    try:
        with httpx.Client(timeout=10.0) as client:
            resp = client.post(
                f"{platform_url}/registry/register",
                json=payload,
                headers=headers,
            )
            if resp.status_code in (401, 403):
                # Hard-fail path described in the docstring: the message
                # goes to stderr (not the logger) so the operator sees it
                # even with logging unconfigured.
                print(
                    f"molecule-mcp: register rejected with HTTP {resp.status_code} — "
                    f"the token in MOLECULE_WORKSPACE_TOKEN is invalid for workspace "
                    f"{workspace_id}. Regenerate from the canvas → Tokens tab.",
                    file=sys.stderr,
                )
                sys.exit(3)
            if resp.status_code >= 400:
                logger.warning(
                    "molecule-mcp: register POST returned HTTP %d: %s",
                    resp.status_code,
                    (resp.text or "")[:200],
                )
            else:
                logger.info(
                    "molecule-mcp: registered workspace %s with platform",
                    workspace_id,
                )
    except SystemExit:
        # Defensive re-raise of the sys.exit(3) above. SystemExit derives
        # from BaseException, so the `except Exception` below would not
        # catch it anyway; this clause documents the intent explicitly.
        raise
    except Exception as exc:  # noqa: BLE001
        logger.warning("molecule-mcp: register POST failed: %s", exc)
|
||||
|
||||
|
||||
def _heartbeat_loop(
    platform_url: str,
    workspace_id: str,
    token: str,
    interval: float = HEARTBEAT_INTERVAL_SECONDS,
) -> None:
    """Daemon thread body: POST /registry/heartbeat every ``interval``s.

    Failures are logged at WARNING and the loop continues. The thread
    exits when the main process does (daemon=True). Each iteration
    rebuilds the payload + headers — cheap, and keeps the per-tick
    ``uptime_seconds`` fresh.

    NOTE(review): an earlier version of this docstring claimed env-var
    token rotation is picked up on the next tick. It is not: ``token``
    is a bound parameter captured when the thread is spawned, so
    rotating MOLECULE_WORKSPACE_TOKEN in the environment requires a
    runtime restart. Confirm before relying on live rotation.
    """
    try:
        import httpx
    except ImportError:
        # Same reasoning as _platform_register: without httpx the MCP
        # server can't run at all, so let the real import error surface
        # elsewhere.
        return

    start_time = time.time()
    consecutive_auth_failures = 0
    while True:
        body = {
            "workspace_id": workspace_id,
            "error_rate": 0.0,
            "sample_error": "",
            "active_tasks": 0,
            "uptime_seconds": int(time.time() - start_time),
        }
        headers = {
            "Authorization": f"Bearer {token}",
            "Origin": platform_url,
            "Content-Type": "application/json",
        }
        try:
            with httpx.Client(timeout=10.0) as client:
                resp = client.post(
                    f"{platform_url}/registry/heartbeat",
                    json=body,
                    headers=headers,
                )
                if resp.status_code in (401, 403):
                    consecutive_auth_failures += 1
                    _log_heartbeat_auth_failure(
                        consecutive_auth_failures, workspace_id, resp.status_code,
                    )
                elif resp.status_code >= 400:
                    # Non-auth HTTP error — log, but DO NOT touch the
                    # auth-failure counter (5xx blips, 429, etc. are
                    # transient and unrelated to token validity).
                    logger.warning(
                        "molecule-mcp: heartbeat HTTP %d: %s",
                        resp.status_code,
                        (resp.text or "")[:200],
                    )
                else:
                    # Success resets the escalation counter and gives the
                    # secret-healing path a chance to persist state.
                    consecutive_auth_failures = 0
                    _persist_inbound_secret_from_heartbeat(resp)
        except Exception as exc:  # noqa: BLE001
            logger.warning("molecule-mcp: heartbeat failed: %s", exc)
        time.sleep(interval)
|
||||
|
||||
|
||||
def _log_heartbeat_auth_failure(count: int, workspace_id: str, status_code: int) -> None:
    """Log one heartbeat 401/403, escalating when failures persist.

    Below ``_HEARTBEAT_AUTH_LOUD_THRESHOLD`` consecutive failures each
    tick gets a WARNING — a transient platform blip shouldn't shout.
    At the threshold an ERROR with re-onboard instructions fires, and
    it re-fires every ``_HEARTBEAT_AUTH_RELOG_INTERVAL`` ticks after
    that so a log session opened later still scrolls past the
    actionable message. This escalation exists because a revoked token
    (e.g. workspace deleted server-side) used to fail silently.
    """
    if count >= _HEARTBEAT_AUTH_LOUD_THRESHOLD:
        # ticks_past == 0 at the threshold itself, so the modulo test
        # covers both the first ERROR and every periodic re-log.
        ticks_past = count - _HEARTBEAT_AUTH_LOUD_THRESHOLD
        if ticks_past % _HEARTBEAT_AUTH_RELOG_INTERVAL == 0:
            logger.error(
                "molecule-mcp: %d consecutive heartbeat auth failures (HTTP %d) — "
                "the token in MOLECULE_WORKSPACE_TOKEN has been REVOKED, likely "
                "because workspace %s was deleted server-side. The MCP server is "
                "still running but every platform call will fail. Regenerate the "
                "workspace + token from the canvas (Tokens tab), update your MCP "
                "config, and restart your runtime.",
                count, status_code, workspace_id,
            )
        return
    logger.warning(
        "molecule-mcp: heartbeat HTTP %d (auth failure %d/%d) — "
        "token may be revoked. Will retry; if persistent, regenerate "
        "from canvas → Tokens.",
        status_code, count, _HEARTBEAT_AUTH_LOUD_THRESHOLD,
    )
|
||||
|
||||
|
||||
def _persist_inbound_secret_from_heartbeat(resp: object) -> None:
|
||||
"""Persist ``platform_inbound_secret`` from a heartbeat response, if any.
|
||||
|
||||
The platform's heartbeat handler returns the secret on every beat
|
||||
(mirroring /registry/register) so a workspace that lazy-healed the
|
||||
secret on the platform side — typical recovery path for a workspace
|
||||
whose row had a NULL ``platform_inbound_secret`` after a partial
|
||||
bootstrap — picks it up within one heartbeat tick instead of
|
||||
requiring a runtime restart.
|
||||
|
||||
Without this delivery path the chat-upload code path's "secret was
|
||||
just minted, will pick up on next heartbeat" 503 message is a lie
|
||||
and the workspace stays 401-forever until the operator restarts
|
||||
the runtime. Caught 2026-04-30 on hongmingwang tenant.
|
||||
|
||||
Failure is non-fatal: if the body isn't JSON, doesn't carry the
|
||||
field, or the disk write fails, the next heartbeat retries. This
|
||||
matches the cold-start register flow in main.py:319-323.
|
||||
"""
|
||||
try:
|
||||
body = resp.json()
|
||||
except Exception: # noqa: BLE001
|
||||
return
|
||||
if not isinstance(body, dict):
|
||||
return
|
||||
secret = body.get("platform_inbound_secret")
|
||||
if not secret:
|
||||
return
|
||||
try:
|
||||
from platform_inbound_auth import save_inbound_secret
|
||||
|
||||
save_inbound_secret(secret)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning(
|
||||
"molecule-mcp: persist inbound secret from heartbeat failed: %s", exc
|
||||
)
|
||||
|
||||
|
||||
def _start_heartbeat_thread(
    platform_url: str,
    workspace_id: str,
    token: str,
) -> threading.Thread:
    """Spawn the heartbeat daemon thread and return its handle.

    The MCP stdio loop owns the foreground (asyncio); the heartbeat
    runs beside it. ``daemon=True`` ties the thread's lifetime to the
    process, so Ctrl-C or a runtime shutdown kills it instead of
    leaking a heartbeat that keeps writing to a stale workspace.
    """
    worker = threading.Thread(
        name="molecule-mcp-heartbeat",
        daemon=True,
        target=_heartbeat_loop,
        args=(platform_url, workspace_id, token),
    )
    worker.start()
    return worker
|
||||
|
||||
|
||||
def _resolve_workspaces() -> tuple[list[tuple[str, str]], list[str]]:
|
||||
"""Return the list of ``(workspace_id, token)`` pairs to register.
|
||||
|
||||
Resolution order:
|
||||
|
||||
1. ``MOLECULE_WORKSPACES`` env var — JSON array of
|
||||
``{"id": "...", "token": "..."}`` objects. Activates the
|
||||
multi-workspace external-agent path (one process registered into
|
||||
N workspaces). When set, ``WORKSPACE_ID`` / ``MOLECULE_WORKSPACE_TOKEN``
|
||||
are IGNORED — the JSON is the source of truth.
|
||||
|
||||
2. Single-workspace fallback — ``WORKSPACE_ID`` env var + token from
|
||||
``MOLECULE_WORKSPACE_TOKEN`` or ``${CONFIGS_DIR}/.auth_token``.
|
||||
This is the pre-existing path; back-compat exact.
|
||||
|
||||
Returns ``(workspaces, errors)``:
|
||||
* ``workspaces``: list of ``(workspace_id, token)`` — non-empty
|
||||
on the happy path.
|
||||
* ``errors``: human-readable strings describing what's missing /
|
||||
malformed. ``main()`` surfaces these with the same shape as
|
||||
``_print_missing_env_help`` so the operator's first run gives
|
||||
actionable output.
|
||||
|
||||
Why JSON env (not file): ergonomic for Claude Code MCP config (one
|
||||
string in ``mcpServers.molecule.env`` instead of a sidecar file)
|
||||
and for CI / launchers. A separate config-file path can be added
|
||||
later without breaking this.
|
||||
"""
|
||||
raw = os.environ.get("MOLECULE_WORKSPACES", "").strip()
|
||||
if raw:
|
||||
try:
|
||||
parsed = json.loads(raw)
|
||||
except json.JSONDecodeError as exc:
|
||||
return [], [
|
||||
f"MOLECULE_WORKSPACES is not valid JSON ({exc.msg} at pos "
|
||||
f"{exc.pos}). Expected: '[{{\"id\":\"<wsid>\",\"token\":"
|
||||
f"\"<tok>\"}},{{...}}]'"
|
||||
]
|
||||
if not isinstance(parsed, list) or not parsed:
|
||||
return [], [
|
||||
"MOLECULE_WORKSPACES must be a non-empty JSON array of "
|
||||
"{\"id\":\"...\",\"token\":\"...\"} objects"
|
||||
]
|
||||
out: list[tuple[str, str]] = []
|
||||
seen: set[str] = set()
|
||||
errors: list[str] = []
|
||||
for i, entry in enumerate(parsed):
|
||||
if not isinstance(entry, dict):
|
||||
errors.append(
|
||||
f"MOLECULE_WORKSPACES[{i}] is not an object — got {type(entry).__name__}"
|
||||
)
|
||||
continue
|
||||
wsid = str(entry.get("id", "")).strip()
|
||||
tok = str(entry.get("token", "")).strip()
|
||||
if not wsid or not tok:
|
||||
errors.append(
|
||||
f"MOLECULE_WORKSPACES[{i}] missing 'id' or 'token'"
|
||||
)
|
||||
continue
|
||||
if wsid in seen:
|
||||
errors.append(
|
||||
f"MOLECULE_WORKSPACES[{i}] duplicate workspace id {wsid!r}"
|
||||
)
|
||||
continue
|
||||
seen.add(wsid)
|
||||
out.append((wsid, tok))
|
||||
if errors:
|
||||
return [], errors
|
||||
return out, []
|
||||
|
||||
# Single-workspace back-compat path.
|
||||
wsid = os.environ.get("WORKSPACE_ID", "").strip()
|
||||
if not wsid:
|
||||
return [], ["WORKSPACE_ID (or MOLECULE_WORKSPACES) is required"]
|
||||
tok = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "").strip()
|
||||
if not tok:
|
||||
tok = _read_token_file()
|
||||
if not tok:
|
||||
return [], [
|
||||
"MOLECULE_WORKSPACE_TOKEN (or CONFIGS_DIR/.auth_token) is required"
|
||||
]
|
||||
return [(wsid, tok)], []
|
||||
|
||||
|
||||
def _print_missing_env_help(missing: list[str], have_token_file: bool) -> None:
|
||||
print("molecule-mcp: missing required environment.\n", file=sys.stderr)
|
||||
print("Set the following before running molecule-mcp:", file=sys.stderr)
|
||||
print(" WORKSPACE_ID — your workspace UUID (from canvas)", file=sys.stderr)
|
||||
print(
|
||||
" PLATFORM_URL — base URL of your Molecule platform "
|
||||
"(e.g. https://your-tenant.staging.moleculesai.app)",
|
||||
file=sys.stderr,
|
||||
)
|
||||
if not have_token_file:
|
||||
print(
|
||||
" MOLECULE_WORKSPACE_TOKEN — bearer token for this workspace "
|
||||
"(canvas → Tokens tab)",
|
||||
file=sys.stderr,
|
||||
)
|
||||
print("", file=sys.stderr)
|
||||
print(f"Currently missing: {', '.join(missing)}", file=sys.stderr)
|
||||
# Back-compat alias: the poller-spawn logic was extracted to
# mcp_inbox_pollers (RFC #2873 iter 3); the old private name is kept —
# presumably so existing in-file callers and monkeypatching tests keep
# working. TODO(review): confirm no caller relies on the old signature.
_start_inbox_pollers = mcp_inbox_pollers.start_inbox_pollers
|
||||
|
||||
|
||||
def main() -> None:
|
||||
@ -558,69 +189,5 @@ def main() -> None:
|
||||
cli_main()
|
||||
|
||||
|
||||
def _start_inbox_pollers(platform_url: str, workspace_ids: list[str]) -> None:
    """Activate the inbox singleton and spawn one poller thread per workspace.

    Deliberately lazy (not at module import): importing inbox drags in
    platform_auth, which only resolves cleanly after env validation.
    Activation is idempotent within a process, so a stray double-call
    (test harness re-entering main) is harmless. Poller threads are
    daemon=True and die with the main process.

    Single-workspace mode keeps the legacy cursor location and the
    ``cursor_path`` constructor argument, so an operator's existing
    on-disk cursor survives the upgrade. Multi-workspace mode gives
    each workspace its own cursor file (keyed inside
    ``default_cursor_path``) in configs_dir, all sharing one queue.
    """
    try:
        import inbox
    except ImportError as exc:
        logger.warning("molecule-mcp: inbox module unavailable: %s", exc)
        return

    if len(workspace_ids) > 1:
        # Multi-workspace: per-workspace cursor file, one shared queue.
        per_ws_cursors = {ws: inbox.default_cursor_path(ws) for ws in workspace_ids}
        shared_state = inbox.InboxState(cursor_paths=per_ws_cursors)
        inbox.activate(shared_state)
        for ws in workspace_ids:
            inbox.start_poller_thread(shared_state, platform_url, ws)
        return

    # Back-compat exact: legacy cursor filename + cursor_path argument.
    only_ws = workspace_ids[0]
    legacy_state = inbox.InboxState(cursor_path=inbox.default_cursor_path())
    inbox.activate(legacy_state)
    inbox.start_poller_thread(legacy_state, platform_url, only_ws)
|
||||
|
||||
|
||||
def _read_token_file() -> str:
    """Return the token from ``<configs_dir>/.auth_token``, or ``""``.

    Mirrors platform_auth's token-file location without importing that
    module: the import would pull in a2a_client's WORKSPACE_ID guard,
    and a four-line file read is cheaper than the whole stack just for
    a path. Any read failure degrades to the empty string so callers
    fall through to their own "token required" error.
    """
    token_path = configs_dir.resolve() / ".auth_token"
    try:
        if token_path.is_file():
            return token_path.read_text().strip()
    except OSError:
        pass
    return ""
|
||||
|
||||
|
||||
# Script-mode entry point: the packaged console script invokes main()
# directly, so this only fires when the module is executed as a file.
if __name__ == "__main__":  # pragma: no cover
    main()
|
||||
|
||||
325
workspace/mcp_heartbeat.py
Normal file
325
workspace/mcp_heartbeat.py
Normal file
@ -0,0 +1,325 @@
|
||||
"""Heartbeat + register thread for the standalone ``molecule-mcp`` wrapper.
|
||||
|
||||
Extracted from ``mcp_cli.py`` (RFC #2873 iter 3) so the heartbeat /
|
||||
register concern lives in its own module. The console-script entry
|
||||
``mcp_cli:main`` still drives the spawn, but the loop body, auth-failure
|
||||
escalation, and inbound-secret persistence now live here so they can be
|
||||
read, tested, and replaced independently of the orchestrator.
|
||||
|
||||
Public surface:
|
||||
|
||||
* ``HEARTBEAT_INTERVAL_SECONDS`` — cadence constant.
|
||||
* ``build_agent_card(workspace_id)`` — payload helper.
|
||||
* ``platform_register(platform_url, workspace_id, token)`` — one-shot
|
||||
POST /registry/register at startup.
|
||||
* ``start_heartbeat_thread(platform_url, workspace_id, token)`` — spawn
|
||||
the daemon thread.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
logger = logging.getLogger(__name__)

# Heartbeat cadence in SECONDS. Must be tighter than healthsweep's
# stale window (currently 60-90s — see registry/healthsweep.go) by a
# comfortable margin so a single missed heartbeat doesn't flip the
# workspace back to awaiting_agent. 20s gives the operator's network 3
# attempts within that budget; long enough not to spam, short enough
# to recover quickly after laptop sleep.
HEARTBEAT_INTERVAL_SECONDS = 20.0

# After this many CONSECUTIVE 401/403 heartbeats, escalate from
# WARNING to ERROR with re-onboard guidance. 3 ticks at 20s = ~1
# minute of sustained auth failure — enough to rule out a transient
# platform blip, quick enough that an operator doesn't sit puzzled for
# 10 minutes wondering why their MCP tools 401.
HEARTBEAT_AUTH_LOUD_THRESHOLD = 3

# Re-log the ERROR every this many ticks past the threshold. Units are
# heartbeat TICKS, not seconds: 20 ticks × 20s ≈ 7 minutes, so a
# long-running session that missed the first ERROR still sees it.
HEARTBEAT_AUTH_RELOG_INTERVAL = 20
|
||||
|
||||
|
||||
def build_agent_card(workspace_id: str) -> dict:
    """Assemble the ``agent_card`` payload for /registry/register.

    Three optional env vars let an operator surface identity and
    capabilities without code changes; unset vars reproduce the old
    hardcoded behaviour exactly (strict superset):

    * ``MOLECULE_AGENT_NAME`` — display name; defaults to
      ``molecule-mcp-{workspace_id[:8]}``.
    * ``MOLECULE_AGENT_DESCRIPTION`` — one-liner; only included in the
      card when non-empty.
    * ``MOLECULE_AGENT_SKILLS`` — comma-separated skill names, each
      expanded to ``{"name": ...}`` (the minimal shape accepted by
      both ``shared_runtime.summarize_peers`` and the canvas
      SkillsTab). Empty / whitespace-only entries are dropped.
    """
    env = os.environ.get
    display_name = (env("MOLECULE_AGENT_NAME") or "").strip()
    if not display_name:
        display_name = f"molecule-mcp-{workspace_id[:8]}"

    blurb = (env("MOLECULE_AGENT_DESCRIPTION") or "").strip()

    raw_skills = (env("MOLECULE_AGENT_SKILLS") or "").strip()
    skills: list[dict] = [
        {"name": piece.strip()}
        for piece in raw_skills.split(",")
        if piece.strip()
    ]

    card: dict = {"name": display_name, "skills": skills}
    if blurb:
        card["description"] = blurb
    return card
|
||||
|
||||
|
||||
def platform_register(platform_url: str, workspace_id: str, token: str) -> None:
    """One-shot register at startup; fails fast on auth errors.

    Lifts the workspace from ``awaiting_agent`` to ``online`` for
    operators who never ran the curl-register snippet. Safe to call
    repeatedly: the platform's register handler is an upsert that
    just refreshes ``url``, ``agent_card``, and ``status``.

    Failure model (post-review):
      - 401 / 403 → ``sys.exit(3)`` immediately. The operator's
        token is wrong; silently looping in a broken state would
        make this hard to diagnose because the MCP tools would 401
        on every call too. Hard-fail is the kindest option.
      - Other 4xx/5xx → log a warning + continue. The heartbeat
        thread will surface persistent failures; transient platform
        blips shouldn't abort the MCP loop.
      - Network / transport errors → log + continue. Same reasoning.

    Origin header is required by the SaaS edge WAF; without it
    /registry/register currently still works (it's on the WAF
    allowlist), but the heartbeat path needs Origin and we want one
    consistent header set across both calls.
    """
    try:
        import httpx
    except ImportError:
        # httpx is a transitive dep via a2a-sdk; if missing, the MCP
        # server won't import either. Let the caller's later import
        # surface the real error.
        return

    payload = {
        "id": workspace_id,
        "url": "",
        "agent_card": build_agent_card(workspace_id),
        "delivery_mode": "poll",
    }
    headers = {
        "Authorization": f"Bearer {token}",
        "Origin": platform_url,
        "Content-Type": "application/json",
    }
    try:
        with httpx.Client(timeout=10.0) as client:
            resp = client.post(
                f"{platform_url}/registry/register",
                json=payload,
                headers=headers,
            )
            if resp.status_code in (401, 403):
                # Hard-fail path from the docstring: stderr (not the
                # logger) so the operator sees it even with logging
                # unconfigured.
                print(
                    f"molecule-mcp: register rejected with HTTP {resp.status_code} — "
                    f"the token in MOLECULE_WORKSPACE_TOKEN is invalid for workspace "
                    f"{workspace_id}. Regenerate from the canvas → Tokens tab.",
                    file=sys.stderr,
                )
                sys.exit(3)
            if resp.status_code >= 400:
                logger.warning(
                    "molecule-mcp: register POST returned HTTP %d: %s",
                    resp.status_code,
                    (resp.text or "")[:200],
                )
            else:
                logger.info(
                    "molecule-mcp: registered workspace %s with platform",
                    workspace_id,
                )
    except SystemExit:
        # Defensive re-raise of the sys.exit(3) above. SystemExit derives
        # from BaseException, so the `except Exception` below would not
        # catch it anyway; this clause documents the intent explicitly.
        raise
    except Exception as exc:  # noqa: BLE001
        logger.warning("molecule-mcp: register POST failed: %s", exc)
|
||||
|
||||
|
||||
def heartbeat_loop(
    platform_url: str,
    workspace_id: str,
    token: str,
    interval: float = HEARTBEAT_INTERVAL_SECONDS,
) -> None:
    """Daemon thread body: POST /registry/heartbeat every ``interval``s.

    Failures are logged at WARNING and the loop continues. The thread
    exits when the main process does (daemon=True). Each iteration
    rebuilds the payload + headers — cheap, and keeps the per-tick
    ``uptime_seconds`` fresh.

    NOTE(review): an earlier version of this docstring claimed env-var
    token rotation is picked up on the next tick. It is not: ``token``
    is a bound parameter captured when the thread is spawned, so
    rotating MOLECULE_WORKSPACE_TOKEN in the environment requires a
    runtime restart. Confirm before relying on live rotation.
    """
    try:
        import httpx
    except ImportError:
        # Same reasoning as platform_register: without httpx the MCP
        # server can't run at all; let the real import error surface
        # elsewhere.
        return

    start_time = time.time()
    consecutive_auth_failures = 0
    while True:
        body = {
            "workspace_id": workspace_id,
            "error_rate": 0.0,
            "sample_error": "",
            "active_tasks": 0,
            "uptime_seconds": int(time.time() - start_time),
        }
        headers = {
            "Authorization": f"Bearer {token}",
            "Origin": platform_url,
            "Content-Type": "application/json",
        }
        try:
            with httpx.Client(timeout=10.0) as client:
                resp = client.post(
                    f"{platform_url}/registry/heartbeat",
                    json=body,
                    headers=headers,
                )
                if resp.status_code in (401, 403):
                    consecutive_auth_failures += 1
                    log_heartbeat_auth_failure(
                        consecutive_auth_failures, workspace_id, resp.status_code,
                    )
                elif resp.status_code >= 400:
                    # Non-auth HTTP error — log, but DO NOT touch the
                    # auth-failure counter (5xx blips, 429, etc. are
                    # transient and unrelated to token validity).
                    logger.warning(
                        "molecule-mcp: heartbeat HTTP %d: %s",
                        resp.status_code,
                        (resp.text or "")[:200],
                    )
                else:
                    # Success resets the escalation counter and gives
                    # the secret-healing path a chance to persist state.
                    consecutive_auth_failures = 0
                    persist_inbound_secret_from_heartbeat(resp)
        except Exception as exc:  # noqa: BLE001
            logger.warning("molecule-mcp: heartbeat failed: %s", exc)
        time.sleep(interval)
|
||||
|
||||
|
||||
def log_heartbeat_auth_failure(count: int, workspace_id: str, status_code: int) -> None:
    """Log one heartbeat 401/403, escalating when failures persist.

    Below ``HEARTBEAT_AUTH_LOUD_THRESHOLD`` consecutive failures each
    tick gets a WARNING — a transient platform blip shouldn't shout.
    At the threshold an ERROR with re-onboard instructions fires, and
    it re-fires every ``HEARTBEAT_AUTH_RELOG_INTERVAL`` ticks after
    that so a log session opened later still scrolls past the
    actionable message. This escalation exists because a revoked token
    (e.g. workspace deleted server-side) used to fail silently.
    """
    if count >= HEARTBEAT_AUTH_LOUD_THRESHOLD:
        # ticks_past == 0 at the threshold itself, so the modulo test
        # covers both the first ERROR and every periodic re-log.
        ticks_past = count - HEARTBEAT_AUTH_LOUD_THRESHOLD
        if ticks_past % HEARTBEAT_AUTH_RELOG_INTERVAL == 0:
            logger.error(
                "molecule-mcp: %d consecutive heartbeat auth failures (HTTP %d) — "
                "the token in MOLECULE_WORKSPACE_TOKEN has been REVOKED, likely "
                "because workspace %s was deleted server-side. The MCP server is "
                "still running but every platform call will fail. Regenerate the "
                "workspace + token from the canvas (Tokens tab), update your MCP "
                "config, and restart your runtime.",
                count, status_code, workspace_id,
            )
        return
    logger.warning(
        "molecule-mcp: heartbeat HTTP %d (auth failure %d/%d) — "
        "token may be revoked. Will retry; if persistent, regenerate "
        "from canvas → Tokens.",
        status_code, count, HEARTBEAT_AUTH_LOUD_THRESHOLD,
    )
|
||||
|
||||
|
||||
def persist_inbound_secret_from_heartbeat(resp: object) -> None:
    """Store ``platform_inbound_secret`` from a heartbeat body, if present.

    The platform echoes the secret on every beat (mirroring
    /registry/register), so a secret lazy-healed server-side reaches
    this runtime within one heartbeat tick instead of requiring a
    restart. Without this path a workspace whose row started with a
    NULL secret stays 401-forever (observed 2026-04-30 on the
    hongmingwang tenant).

    Every failure mode — non-JSON body, non-dict body, missing field,
    import or disk-write error — is deliberately non-fatal: the next
    heartbeat simply retries.
    """
    try:
        decoded = resp.json()
    except Exception:  # noqa: BLE001
        return
    secret = decoded.get("platform_inbound_secret") if isinstance(decoded, dict) else None
    if not secret:
        return
    try:
        from platform_inbound_auth import save_inbound_secret

        save_inbound_secret(secret)
    except Exception as exc:  # noqa: BLE001
        logger.warning(
            "molecule-mcp: persist inbound secret from heartbeat failed: %s", exc
        )
|
||||
|
||||
|
||||
def start_heartbeat_thread(
    platform_url: str,
    workspace_id: str,
    token: str,
) -> threading.Thread:
    """Spawn the heartbeat daemon thread and return its handle.

    The MCP stdio loop owns the foreground (asyncio); the heartbeat
    runs beside it. ``daemon=True`` ties the thread's lifetime to the
    process, so Ctrl-C or a runtime shutdown kills it instead of
    leaking a heartbeat that keeps writing to a stale workspace.
    """
    worker = threading.Thread(
        name="molecule-mcp-heartbeat",
        daemon=True,
        target=heartbeat_loop,
        args=(platform_url, workspace_id, token),
    )
    worker.start()
    return worker
|
||||
63
workspace/mcp_inbox_pollers.py
Normal file
63
workspace/mcp_inbox_pollers.py
Normal file
@ -0,0 +1,63 @@
|
||||
"""Inbox-poller spawn helpers for the standalone ``molecule-mcp`` wrapper.
|
||||
|
||||
Extracted from ``mcp_cli.py`` (RFC #2873 iter 3). The poller is the
|
||||
INBOUND side of the standalone path — without it, the universal MCP
|
||||
server is outbound-only (can call ``delegate_task`` /
|
||||
``send_message_to_user``, never observes canvas-user / peer-agent
|
||||
messages).
|
||||
|
||||
Public surface:
|
||||
|
||||
* ``start_inbox_pollers(platform_url, workspace_ids)`` — activate the
|
||||
inbox singleton and spawn one daemon poller per workspace.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def start_inbox_pollers(platform_url: str, workspace_ids: list[str]) -> None:
    """Activate the inbox singleton and spawn one poller thread per workspace.

    Deliberately lazy (not at module import): importing inbox drags in
    platform_auth, which only resolves cleanly after env validation.
    Activation is idempotent within a process, so a stray double-call
    (test harness re-entering main) is harmless. Poller threads are
    daemon=True and die with the main process.

    Single-workspace mode keeps the legacy cursor location and the
    ``cursor_path`` constructor argument, so an operator's existing
    on-disk cursor survives the upgrade. Multi-workspace mode gives
    each workspace its own cursor file (keyed inside
    ``default_cursor_path``) in configs_dir, all sharing one queue.
    """
    try:
        import inbox
    except ImportError as exc:
        logger.warning("molecule-mcp: inbox module unavailable: %s", exc)
        return

    if len(workspace_ids) > 1:
        # Multi-workspace: per-workspace cursor file, one shared queue.
        per_ws_cursors = {ws: inbox.default_cursor_path(ws) for ws in workspace_ids}
        shared_state = inbox.InboxState(cursor_paths=per_ws_cursors)
        inbox.activate(shared_state)
        for ws in workspace_ids:
            inbox.start_poller_thread(shared_state, platform_url, ws)
        return

    # Back-compat exact: legacy cursor filename + cursor_path argument.
    only_ws = workspace_ids[0]
    legacy_state = inbox.InboxState(cursor_path=inbox.default_cursor_path())
    inbox.activate(legacy_state)
    inbox.start_poller_thread(legacy_state, platform_url, only_ws)
|
||||
146
workspace/mcp_workspace_resolver.py
Normal file
146
workspace/mcp_workspace_resolver.py
Normal file
@ -0,0 +1,146 @@
|
||||
"""Env validation + workspace resolution for the standalone ``molecule-mcp``.
|
||||
|
||||
Extracted from ``mcp_cli.py`` (RFC #2873 iter 3). Deals with the two
|
||||
shapes ``molecule-mcp`` accepts:
|
||||
|
||||
* Single-workspace legacy shape: ``WORKSPACE_ID`` + token from
|
||||
``MOLECULE_WORKSPACE_TOKEN`` or ``${CONFIGS_DIR}/.auth_token``.
|
||||
* Multi-workspace JSON shape: ``MOLECULE_WORKSPACES`` env var carries a
|
||||
JSON array of ``{"id": ..., "token": ...}`` entries.
|
||||
|
||||
Public surface:
|
||||
|
||||
* ``resolve_workspaces()`` → ``(workspaces, errors)``.
|
||||
* ``read_token_file()`` → token text or ``""``.
|
||||
* ``print_missing_env_help(missing, have_token_file)`` — operator-help
|
||||
printer.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
import configs_dir
|
||||
|
||||
|
||||
def resolve_workspaces() -> tuple[list[tuple[str, str]], list[str]]:
    """Resolve the ``(workspace_id, token)`` pairs molecule-mcp should register.

    Two accepted shapes, checked in order:

    1. ``MOLECULE_WORKSPACES`` env var — a JSON array of
       ``{"id": "...", "token": "..."}`` objects. Activates the
       multi-workspace external-agent path (one process registered into
       N workspaces). When present it is the source of truth:
       ``WORKSPACE_ID`` / ``MOLECULE_WORKSPACE_TOKEN`` are IGNORED.
    2. Single-workspace fallback — ``WORKSPACE_ID`` env var plus a token
       from ``MOLECULE_WORKSPACE_TOKEN`` or ``${CONFIGS_DIR}/.auth_token``.
       This is the pre-existing path; back-compat exact.

    Returns ``(workspaces, errors)``:

    * ``workspaces`` — list of ``(workspace_id, token)``; non-empty on
      the happy path, empty whenever any error is reported.
    * ``errors`` — human-readable strings describing what is missing or
      malformed, surfaced by ``main()`` alongside
      ``print_missing_env_help`` so a first run gives actionable output.

    Why a JSON env var (not a file): ergonomic for Claude Code MCP
    config (one string in ``mcpServers.molecule.env``) and for CI /
    launchers; a config-file path can be added later without breaking this.
    """
    multi_raw = os.environ.get("MOLECULE_WORKSPACES", "").strip()
    if multi_raw:
        # Multi-workspace JSON shape wins outright when set.
        return _workspaces_from_json(multi_raw)

    # Legacy single-workspace path: id from env, token from env or file.
    single_id = os.environ.get("WORKSPACE_ID", "").strip()
    if not single_id:
        return [], ["WORKSPACE_ID (or MOLECULE_WORKSPACES) is required"]
    token = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "").strip() or read_token_file()
    if not token:
        return [], [
            "MOLECULE_WORKSPACE_TOKEN (or CONFIGS_DIR/.auth_token) is required"
        ]
    return [(single_id, token)], []


def _workspaces_from_json(raw: str) -> tuple[list[tuple[str, str]], list[str]]:
    """Parse the ``MOLECULE_WORKSPACES`` JSON payload into ``(pairs, errors)``.

    Any error (bad JSON, wrong top-level shape, bad entry, duplicate id)
    yields an empty workspace list plus one message per problem.
    """
    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [], [
            f"MOLECULE_WORKSPACES is not valid JSON ({exc.msg} at pos "
            f"{exc.pos}). Expected: '[{{\"id\":\"<wsid>\",\"token\":"
            f"\"<tok>\"}},{{...}}]'"
        ]
    if not isinstance(decoded, list) or not decoded:
        return [], [
            "MOLECULE_WORKSPACES must be a non-empty JSON array of "
            "{\"id\":\"...\",\"token\":\"...\"} objects"
        ]

    pairs: list[tuple[str, str]] = []
    seen_ids: set[str] = set()
    problems: list[str] = []
    for idx, item in enumerate(decoded):
        if not isinstance(item, dict):
            problems.append(
                f"MOLECULE_WORKSPACES[{idx}] is not an object — got {type(item).__name__}"
            )
            continue
        ws_id = str(item.get("id", "")).strip()
        ws_tok = str(item.get("token", "")).strip()
        if not ws_id or not ws_tok:
            problems.append(
                f"MOLECULE_WORKSPACES[{idx}] missing 'id' or 'token'"
            )
            continue
        if ws_id in seen_ids:
            problems.append(
                f"MOLECULE_WORKSPACES[{idx}] duplicate workspace id {ws_id!r}"
            )
            continue
        seen_ids.add(ws_id)
        pairs.append((ws_id, ws_tok))
    # All-or-nothing: any malformed entry invalidates the whole set.
    if problems:
        return [], problems
    return pairs, []
||||
|
||||
|
||||
def print_missing_env_help(missing: list[str], have_token_file: bool) -> None:
    """Print operator guidance to stderr when required env vars are absent.

    ``missing`` names the variables that were not found; ``have_token_file``
    suppresses the MOLECULE_WORKSPACE_TOKEN hint when a token file already
    exists on disk (the token requirement is then satisfied another way).
    """
    help_lines = [
        # Trailing \n is deliberate: blank line after the headline.
        "molecule-mcp: missing required environment.\n",
        "Set the following before running molecule-mcp:",
        "  WORKSPACE_ID — your workspace UUID (from canvas)",
        "  PLATFORM_URL — base URL of your Molecule platform "
        "(e.g. https://your-tenant.staging.moleculesai.app)",
    ]
    if not have_token_file:
        help_lines.append(
            "  MOLECULE_WORKSPACE_TOKEN — bearer token for this workspace "
            "(canvas → Tokens tab)"
        )
    help_lines.append("")
    help_lines.append(f"Currently missing: {', '.join(missing)}")
    for line in help_lines:
        print(line, file=sys.stderr)
||||
|
||||
|
||||
def read_token_file() -> str:
    """Return the token stored in ``${CONFIGS_DIR}/.auth_token``, or ``""``.

    Mirrors platform_auth._token_file's location resolution without
    importing that module: its import chain pulls in a2a_client's
    WORKSPACE_ID guard, and inlining a four-line file read is cheaper
    than the whole stack just for one path.
    """
    token_path = configs_dir.resolve() / ".auth_token"
    try:
        # Absent and unreadable are treated identically: caller falls
        # back to reporting the token as missing.
        return token_path.read_text().strip() if token_path.is_file() else ""
    except OSError:
        return ""
||||
281
workspace/tests/test_a2a_tools_rbac.py
Normal file
281
workspace/tests/test_a2a_tools_rbac.py
Normal file
@ -0,0 +1,281 @@
|
||||
"""Direct tests for ``a2a_tools_rbac`` (RFC #2873 iter 4a).
|
||||
|
||||
The full behavior matrix is exercised through ``a2a_tools._foo`` aliases
|
||||
in ``test_a2a_tools_impl.py``. This file pins:
|
||||
|
||||
1. **Drift gate** — ``a2a_tools._foo is a2a_tools_rbac.foo`` for every
|
||||
extracted symbol. A refactor that wraps or re-implements an alias
|
||||
fails this test.
|
||||
2. **Direct unit coverage** for each helper without going through the
|
||||
a2a_tools surface, so regressions in the small RBAC layer surface
|
||||
against THIS module's tests, not the 991-LOC tool-handler tests.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _require_workspace_id(monkeypatch):
    """Autouse fixture: seed WORKSPACE_ID/PLATFORM_URL for every test in this file."""
    # a2a_client raises at import-time without WORKSPACE_ID. Setting it
    # once per test isolates the env so an absent value in CI doesn't
    # surface as an opaque RuntimeError from a2a_tools' import.
    monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
    # .invalid TLD guarantees no accidental live network traffic.
    monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
    yield
||||
|
||||
|
||||
# ============== Drift gate ==============
|
||||
|
||||
class TestBackCompatAliases:
    """Pin that every legacy underscore name in ``a2a_tools`` is the
    EXACT same callable / object as the new public name in
    ``a2a_tools_rbac``. Catches accidental re-implementation in either
    direction."""

    # Imports are function-local throughout: a2a_tools' import has env
    # side effects, so it must run after the autouse env fixture.

    def test_role_permissions_is_same_object(self):
        # `is`, not `==`: a copied dict would drift from the original.
        import a2a_tools
        import a2a_tools_rbac
        assert a2a_tools._ROLE_PERMISSIONS is a2a_tools_rbac.ROLE_PERMISSIONS

    def test_get_workspace_tier_alias(self):
        import a2a_tools
        import a2a_tools_rbac
        assert a2a_tools._get_workspace_tier is a2a_tools_rbac.get_workspace_tier

    def test_check_memory_write_permission_alias(self):
        import a2a_tools
        import a2a_tools_rbac
        assert (
            a2a_tools._check_memory_write_permission
            is a2a_tools_rbac.check_memory_write_permission
        )

    def test_check_memory_read_permission_alias(self):
        import a2a_tools
        import a2a_tools_rbac
        assert (
            a2a_tools._check_memory_read_permission
            is a2a_tools_rbac.check_memory_read_permission
        )

    def test_is_root_workspace_alias(self):
        import a2a_tools
        import a2a_tools_rbac
        assert a2a_tools._is_root_workspace is a2a_tools_rbac.is_root_workspace

    def test_auth_headers_alias(self):
        import a2a_tools
        import a2a_tools_rbac
        assert (
            a2a_tools._auth_headers_for_heartbeat
            is a2a_tools_rbac.auth_headers_for_heartbeat
        )
||||
|
||||
|
||||
# ============== get_workspace_tier ==============
|
||||
|
||||
class TestGetWorkspaceTier:
    """Tier resolution order: config object's ``.tier`` → getattr default 1
    → (on config failure) WORKSPACE_TIER env var → literal 1."""

    def test_uses_config_when_available(self):
        """Happy path: load_config returns an object with .tier."""
        import a2a_tools_rbac

        class _Cfg:
            tier = 0

        with patch("config.load_config", return_value=_Cfg()):
            assert a2a_tools_rbac.get_workspace_tier() == 0

    def test_default_tier_when_config_lacks_attr(self):
        """Config loads but has no .tier attribute — default applies."""
        import a2a_tools_rbac

        class _Cfg:
            pass

        with patch("config.load_config", return_value=_Cfg()):
            # getattr default = 1
            assert a2a_tools_rbac.get_workspace_tier() == 1

    def test_falls_back_to_env_var(self, monkeypatch):
        """When load_config raises, read WORKSPACE_TIER from env."""
        import a2a_tools_rbac
        monkeypatch.setenv("WORKSPACE_TIER", "5")
        with patch("config.load_config", side_effect=RuntimeError("config unavailable")):
            assert a2a_tools_rbac.get_workspace_tier() == 5

    def test_fallback_default_one_when_env_unset(self, monkeypatch):
        """Config failure AND no env var — last-resort default is 1."""
        import a2a_tools_rbac
        monkeypatch.delenv("WORKSPACE_TIER", raising=False)
        with patch("config.load_config", side_effect=RuntimeError("boom")):
            assert a2a_tools_rbac.get_workspace_tier() == 1
||||
|
||||
|
||||
# ============== is_root_workspace ==============
|
||||
|
||||
class TestIsRootWorkspace:
    """``is_root_workspace`` is exactly ``tier == 0`` — nothing else."""

    def test_tier_zero_is_root(self):
        import a2a_tools_rbac
        with patch.object(a2a_tools_rbac, "get_workspace_tier", return_value=0):
            assert a2a_tools_rbac.is_root_workspace() is True

    def test_nonzero_tier_is_not_root(self):
        import a2a_tools_rbac
        # Spot-check several non-zero tiers, not just 1.
        for tier in (1, 2, 99):
            with patch.object(a2a_tools_rbac, "get_workspace_tier", return_value=tier):
                assert a2a_tools_rbac.is_root_workspace() is False, f"tier={tier}"
||||
|
||||
|
||||
# ============== check_memory_write_permission ==============
|
||||
|
||||
class _RBACCfg:
    """Stub matching the ``load_config().rbac`` shape: exposes
    ``.rbac.roles`` and ``.rbac.allowed_actions`` attributes only."""

    def __init__(self, roles=None, allowed_actions=None):
        # Anonymous attribute bag — production code only reads attrs.
        class _Bag:
            pass

        bag = _Bag()
        # `or` (not `is None`) keeps the original falsy-input defaulting.
        bag.roles = roles or ["operator"]
        bag.allowed_actions = allowed_actions or {}
        self.rbac = bag
||||
|
||||
|
||||
class TestCheckMemoryWritePermission:
    """memory.write decision: per-role allowed_actions override when
    present, else the canonical ROLE_PERMISSIONS table."""

    def test_admin_role_grants_write(self):
        """admin carries memory.write in the canonical table."""
        import a2a_tools_rbac
        with patch("config.load_config", return_value=_RBACCfg(roles=["admin"])):
            assert a2a_tools_rbac.check_memory_write_permission() is True

    def test_operator_role_grants_write(self):
        """Operator is in the canonical ROLE_PERMISSIONS table with
        memory.write — must work without per-role overrides."""
        import a2a_tools_rbac
        with patch("config.load_config", return_value=_RBACCfg(roles=["operator"])):
            assert a2a_tools_rbac.check_memory_write_permission() is True

    def test_read_only_role_denies_write(self):
        """read-only has memory.read only — write must be denied."""
        import a2a_tools_rbac
        with patch("config.load_config", return_value=_RBACCfg(roles=["read-only"])):
            assert a2a_tools_rbac.check_memory_write_permission() is False

    def test_per_role_override_grants(self):
        """Per-role override in allowed_actions wins over the canonical
        table — operators can grant write to memory-readonly via config."""
        import a2a_tools_rbac
        cfg = _RBACCfg(
            roles=["memory-readonly"],
            allowed_actions={"memory-readonly": {"memory.read", "memory.write"}},
        )
        with patch("config.load_config", return_value=cfg):
            assert a2a_tools_rbac.check_memory_write_permission() is True

    def test_per_role_override_denies(self):
        """Per-role override that drops write blocks an operator from
        writing — the override is the authoritative source when present."""
        import a2a_tools_rbac
        cfg = _RBACCfg(
            roles=["operator"],
            allowed_actions={"operator": {"memory.read"}},
        )
        with patch("config.load_config", return_value=cfg):
            assert a2a_tools_rbac.check_memory_write_permission() is False

    def test_fail_closed_when_config_unavailable(self):
        """Fail-closed contract: config outage falls back to ['operator']
        with no overrides — operator has memory.write in the canonical
        table, so write IS granted in this fallback. The fail-closed
        property is for ELEVATED ops (admin scope), not for the basic
        write that operator has by default. This test pins the contract:
        config errors do not silently grant admin."""
        import a2a_tools_rbac
        with patch("config.load_config", side_effect=RuntimeError("boom")):
            # operator has memory.write → True (preserved behavior)
            assert a2a_tools_rbac.check_memory_write_permission() is True
||||
|
||||
|
||||
# ============== check_memory_read_permission ==============
|
||||
|
||||
class TestCheckMemoryReadPermission:
    """memory.read decision mirrors the write check against the same
    role table / overrides."""

    def test_admin_grants_read(self):
        import a2a_tools_rbac
        with patch("config.load_config", return_value=_RBACCfg(roles=["admin"])):
            assert a2a_tools_rbac.check_memory_read_permission() is True

    def test_read_only_grants_read(self):
        import a2a_tools_rbac
        with patch("config.load_config", return_value=_RBACCfg(roles=["read-only"])):
            assert a2a_tools_rbac.check_memory_read_permission() is True

    def test_unknown_role_denies(self):
        """A role that's not in ROLE_PERMISSIONS and not in
        allowed_actions overrides denies by default."""
        import a2a_tools_rbac
        with patch("config.load_config", return_value=_RBACCfg(roles=["random-undefined-role"])):
            assert a2a_tools_rbac.check_memory_read_permission() is False
||||
|
||||
|
||||
# ============== auth_headers_for_heartbeat ==============
|
||||
|
||||
class TestAuthHeadersForHeartbeat:
    """Routing: no arg → legacy ``platform_auth.auth_headers()``;
    workspace id arg → per-workspace token lookup; missing platform_auth
    module → empty headers (degrade, don't crash)."""

    def test_no_workspace_id_uses_legacy_path(self):
        """No-arg call routes to platform_auth.auth_headers() — the
        legacy single-token path."""
        import a2a_tools_rbac
        called: dict[str, object] = {}

        def fake_auth_headers(*args):
            # Record the positional args so we can assert "no arg" below.
            called["args"] = args
            return {"Authorization": "Bearer legacy-token"}

        with patch("platform_auth.auth_headers", fake_auth_headers):
            out = a2a_tools_rbac.auth_headers_for_heartbeat()
        assert out == {"Authorization": "Bearer legacy-token"}
        # Legacy path is auth_headers() with no arg
        assert called["args"] == ()

    def test_with_workspace_id_routes_per_workspace(self):
        """Workspace-id call forwards the id to platform_auth."""
        import a2a_tools_rbac
        called: dict[str, object] = {}

        def fake_auth_headers(wsid):
            called["wsid"] = wsid
            return {"Authorization": f"Bearer tok-{wsid}"}

        with patch("platform_auth.auth_headers", fake_auth_headers):
            out = a2a_tools_rbac.auth_headers_for_heartbeat("ws-abc")
        assert out == {"Authorization": "Bearer tok-ws-abc"}
        assert called["wsid"] == "ws-abc"

    def test_returns_empty_when_platform_auth_missing(self, monkeypatch):
        """Older installs without platform_auth get {} so callers don't
        crash — they'll just send unauthed and the platform 401 handler
        surfaces the real error."""
        import a2a_tools_rbac
        # Force ImportError by setting sys.modules entry to None
        monkeypatch.setitem(sys.modules, "platform_auth", None)
        out = a2a_tools_rbac.auth_headers_for_heartbeat("ws-1")
        assert out == {}
||||
|
||||
|
||||
# ============== ROLE_PERMISSIONS canonical table ==============
|
||||
|
||||
class TestRolePermissionsTable:
    """Pin the canonical role→actions table so any edit to it is a
    deliberate, reviewed change rather than incidental drift."""

    def test_admin_has_all_actions(self):
        import a2a_tools_rbac
        expected = {"delegate", "approve", "memory.read", "memory.write"}
        assert a2a_tools_rbac.ROLE_PERMISSIONS["admin"] == expected

    def test_read_only_has_only_memory_read(self):
        import a2a_tools_rbac
        assert a2a_tools_rbac.ROLE_PERMISSIONS["read-only"] == {"memory.read"}

    def test_no_delegation_is_missing_delegate(self):
        import a2a_tools_rbac
        no_delegation = a2a_tools_rbac.ROLE_PERMISSIONS["no-delegation"]
        assert "delegate" not in no_delegation

    def test_no_approval_is_missing_approve(self):
        import a2a_tools_rbac
        no_approval = a2a_tools_rbac.ROLE_PERMISSIONS["no-approval"]
        assert "approve" not in no_approval
||||
@ -13,6 +13,7 @@ from pathlib import Path
|
||||
import pytest
|
||||
|
||||
import mcp_cli
|
||||
import mcp_heartbeat
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@ -739,8 +740,13 @@ def test_heartbeat_loop_calls_persist_on_success(monkeypatch):
|
||||
def fake_persist(resp):
|
||||
saw.append(resp)
|
||||
|
||||
# Patch on mcp_heartbeat — that's where heartbeat_loop's internal
|
||||
# name resolution looks up persist_inbound_secret_from_heartbeat
|
||||
# after the RFC #2873 iter 3 split. The mcp_cli._persist_…_from_heartbeat
|
||||
# back-compat re-export still exists, but patching it here would not
|
||||
# affect the loop body.
|
||||
monkeypatch.setattr(
|
||||
mcp_cli, "_persist_inbound_secret_from_heartbeat", fake_persist
|
||||
mcp_heartbeat, "persist_inbound_secret_from_heartbeat", fake_persist
|
||||
)
|
||||
|
||||
class FakeResp:
|
||||
@ -786,8 +792,8 @@ def test_heartbeat_loop_skips_persist_on_4xx(monkeypatch):
|
||||
"""Heartbeat 4xx error path must NOT invoke persist (no body to trust)."""
|
||||
saw: list[object] = []
|
||||
monkeypatch.setattr(
|
||||
mcp_cli,
|
||||
"_persist_inbound_secret_from_heartbeat",
|
||||
mcp_heartbeat,
|
||||
"persist_inbound_secret_from_heartbeat",
|
||||
lambda r: saw.append(r),
|
||||
)
|
||||
|
||||
@ -899,7 +905,7 @@ def test_heartbeat_single_401_logs_warning_not_error(monkeypatch, caplog):
|
||||
transient platform blip. Log at WARNING; don't shout."""
|
||||
import logging
|
||||
|
||||
caplog.set_level(logging.WARNING, logger="mcp_cli")
|
||||
caplog.set_level(logging.WARNING, logger="mcp_heartbeat")
|
||||
|
||||
_multi_iter_runner(monkeypatch, [401])
|
||||
|
||||
@ -923,7 +929,7 @@ def test_heartbeat_three_consecutive_401s_escalates_to_error(monkeypatch, caplog
|
||||
LOUD ERROR with re-onboard guidance — not buried at WARNING."""
|
||||
import logging
|
||||
|
||||
caplog.set_level(logging.WARNING, logger="mcp_cli")
|
||||
caplog.set_level(logging.WARNING, logger="mcp_heartbeat")
|
||||
|
||||
_multi_iter_runner(monkeypatch, [401, 401, 401])
|
||||
|
||||
@ -949,7 +955,7 @@ def test_heartbeat_403_treated_same_as_401(monkeypatch, caplog):
|
||||
not authorized for this workspace). Same escalation path."""
|
||||
import logging
|
||||
|
||||
caplog.set_level(logging.WARNING, logger="mcp_cli")
|
||||
caplog.set_level(logging.WARNING, logger="mcp_heartbeat")
|
||||
|
||||
_multi_iter_runner(monkeypatch, [403, 403, 403])
|
||||
|
||||
@ -963,7 +969,7 @@ def test_heartbeat_recovery_resets_consecutive_counter(monkeypatch, caplog):
|
||||
later should NOT immediately escalate."""
|
||||
import logging
|
||||
|
||||
caplog.set_level(logging.WARNING, logger="mcp_cli")
|
||||
caplog.set_level(logging.WARNING, logger="mcp_heartbeat")
|
||||
|
||||
# Two 401s, then 200, then one 401. If counter resets correctly,
|
||||
# the final 401 is "1 consecutive" and should NOT escalate.
|
||||
@ -982,7 +988,7 @@ def test_heartbeat_500_does_not_increment_auth_counter(monkeypatch, caplog):
|
||||
misleading the operator."""
|
||||
import logging
|
||||
|
||||
caplog.set_level(logging.WARNING, logger="mcp_cli")
|
||||
caplog.set_level(logging.WARNING, logger="mcp_heartbeat")
|
||||
|
||||
_multi_iter_runner(monkeypatch, [500, 500, 500])
|
||||
|
||||
|
||||
231
workspace/tests/test_mcp_cli_split.py
Normal file
231
workspace/tests/test_mcp_cli_split.py
Normal file
@ -0,0 +1,231 @@
|
||||
"""RFC #2873 iter 3 — drift gate + behavior tests for the post-split surface.
|
||||
|
||||
The bulk of the heartbeat / resolver behavior is exercised by
|
||||
``test_mcp_cli.py`` and ``test_mcp_cli_multi_workspace.py`` through the
|
||||
``mcp_cli._symbol`` back-compat aliases. This file pins:
|
||||
|
||||
1. The split is **behavior-neutral via aliasing** — every previously-
|
||||
exposed ``mcp_cli._foo`` symbol is the SAME callable as the new
|
||||
module's authoritative function. If a refactor accidentally drops
|
||||
an alias or points it at a stale copy, this fails.
|
||||
|
||||
2. ``mcp_inbox_pollers.start_inbox_pollers`` works for both single-
|
||||
workspace (legacy back-compat) and multi-workspace shapes.
|
||||
``mcp_cli`` had no direct test for this branch before the split.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import types
|
||||
|
||||
import pytest
|
||||
|
||||
import mcp_cli
|
||||
import mcp_heartbeat
|
||||
import mcp_inbox_pollers
|
||||
import mcp_workspace_resolver
|
||||
|
||||
|
||||
# ============== Drift gate: back-compat aliases point at the real fn ==============
|
||||
|
||||
class TestBackCompatAliases:
    """Pin that ``mcp_cli._foo is real_fn``. A test that re-implements
    the alias would still pass — the ``is`` check guarantees we didn't
    create a wrapper that drifts."""

    def test_heartbeat_aliases(self):
        """Every heartbeat symbol moved to mcp_heartbeat is re-exported as-is."""
        assert mcp_cli._build_agent_card is mcp_heartbeat.build_agent_card
        assert mcp_cli._platform_register is mcp_heartbeat.platform_register
        assert mcp_cli._heartbeat_loop is mcp_heartbeat.heartbeat_loop
        assert mcp_cli._log_heartbeat_auth_failure is mcp_heartbeat.log_heartbeat_auth_failure
        assert (
            mcp_cli._persist_inbound_secret_from_heartbeat
            is mcp_heartbeat.persist_inbound_secret_from_heartbeat
        )
        assert mcp_cli._start_heartbeat_thread is mcp_heartbeat.start_heartbeat_thread

    def test_resolver_aliases(self):
        """Workspace-resolution symbols live in mcp_workspace_resolver."""
        assert mcp_cli._resolve_workspaces is mcp_workspace_resolver.resolve_workspaces
        assert mcp_cli._print_missing_env_help is mcp_workspace_resolver.print_missing_env_help
        assert mcp_cli._read_token_file is mcp_workspace_resolver.read_token_file

    def test_inbox_pollers_alias(self):
        """Poller spawn entry point lives in mcp_inbox_pollers."""
        assert mcp_cli._start_inbox_pollers is mcp_inbox_pollers.start_inbox_pollers

    def test_constants_match(self):
        # Constants are value-compared (==), not identity-compared —
        # ints may be re-bound on re-export.
        assert (
            mcp_cli.HEARTBEAT_INTERVAL_SECONDS
            == mcp_heartbeat.HEARTBEAT_INTERVAL_SECONDS
        )
        assert (
            mcp_cli._HEARTBEAT_AUTH_LOUD_THRESHOLD
            == mcp_heartbeat.HEARTBEAT_AUTH_LOUD_THRESHOLD
        )
        assert (
            mcp_cli._HEARTBEAT_AUTH_RELOG_INTERVAL
            == mcp_heartbeat.HEARTBEAT_AUTH_RELOG_INTERVAL
        )
||||
|
||||
|
||||
# ============== mcp_inbox_pollers — both shapes + degraded import ==============
|
||||
|
||||
class _FakeInboxState:
    """Stand-in for inbox.InboxState: records its constructor kwargs so
    tests can assert which cursor shape (cursor_path vs cursor_paths)
    the code under test chose."""

    def __init__(self, **kwargs):
        # Captured verbatim; tests compare this mapping directly.
        self.kwargs = kwargs
||||
|
||||
|
||||
def _install_fake_inbox(monkeypatch):
    """Replace ``sys.modules['inbox']`` with an instrumented stand-in.

    Returns three capture lists — activate() calls, poller-spawn args,
    and every cursor path handed out — so tests can observe the spawn
    behavior without importing the real inbox module (and, through it,
    the platform_auth dependency tree).
    """
    seen_activations: list[_FakeInboxState] = []
    seen_spawns: list[tuple[_FakeInboxState, str, str]] = []
    handed_out_paths: list[str] = []

    def fake_cursor_path(wsid=None):
        # Same contract as the real helper: per-workspace suffix when a
        # wsid is given, the legacy single path when it is absent.
        if wsid:
            made = f"/tmp/.mcp_inbox_cursor.{wsid[:8]}"
        else:
            made = "/tmp/.mcp_inbox_cursor"
        handed_out_paths.append(made)
        return made

    stub = types.ModuleType("inbox")
    stub.InboxState = _FakeInboxState
    stub.default_cursor_path = fake_cursor_path
    stub.activate = seen_activations.append
    stub.start_poller_thread = lambda state, platform_url, wsid: seen_spawns.append(
        (state, platform_url, wsid)
    )
    # monkeypatch restores the real (or absent) entry after the test.
    monkeypatch.setitem(sys.modules, "inbox", stub)
    return seen_activations, seen_spawns, handed_out_paths
||||
|
||||
|
||||
class TestStartInboxPollers:
    """Behavior of ``mcp_inbox_pollers.start_inbox_pollers`` for the
    single-workspace (legacy), multi-workspace, and degraded-import
    shapes, observed through the fake ``inbox`` module."""

    def test_single_workspace_uses_legacy_cursor_path(self, monkeypatch):
        """Back-compat exact: single-workspace mode reuses the legacy
        cursor filename so an existing operator's on-disk state isn't
        invalidated by upgrade."""
        activations, spawned, cursor_paths = _install_fake_inbox(monkeypatch)

        mcp_inbox_pollers.start_inbox_pollers(
            "https://test.moleculesai.app", ["ws-only-one"]
        )

        assert len(activations) == 1, "exactly one inbox.activate call"
        assert len(spawned) == 1, "exactly one poller thread spawned"
        # Single-workspace path uses default_cursor_path() with no arg —
        # the cursor_path captured here must be the legacy filename
        # (no per-ws suffix).
        assert cursor_paths == ["/tmp/.mcp_inbox_cursor"]
        # State carries cursor_path, not cursor_paths
        state = activations[0]
        assert state.kwargs == {"cursor_path": "/tmp/.mcp_inbox_cursor"}
        # Spawned poller is for the right workspace
        assert spawned[0] == (state, "https://test.moleculesai.app", "ws-only-one")

    def test_multi_workspace_uses_per_workspace_cursor_paths(self, monkeypatch):
        """Multi-workspace path: per-workspace cursor file, one shared
        InboxState. N pollers, each pointed at the same state so the
        agent's inbox_peek/pop sees a merged view."""
        activations, spawned, _ = _install_fake_inbox(monkeypatch)

        wsids = ["ws-aaaaaaaa", "ws-bbbbbbbb", "ws-cccccccc"]
        mcp_inbox_pollers.start_inbox_pollers(
            "https://test.moleculesai.app", wsids
        )

        # One state, one activate, three pollers
        assert len(activations) == 1
        assert len(spawned) == 3
        state = activations[0]
        # Multi-workspace state carries cursor_paths (mapping)
        assert "cursor_paths" in state.kwargs
        assert set(state.kwargs["cursor_paths"].keys()) == set(wsids)
        # All pollers share the same state
        for s, _url, _wsid in spawned:
            assert s is state
        # All workspace ids covered
        assert sorted(t[2] for t in spawned) == sorted(wsids)

    def test_inbox_module_unavailable_logs_and_returns(self, monkeypatch, caplog):
        """If ``import inbox`` fails (older install or stripped
        runtime), spawn must NOT raise — log a warning and continue.
        The MCP server can still serve outbound tools."""
        import logging

        # Force the `import inbox` statement itself to raise. A truthy
        # broken object in sys.modules would simply be returned (the
        # import machinery reads sys.modules first), so poison the entry
        # with None instead — Python special-cases a None entry as an
        # explicit ImportError. (A raising sentinel class used here
        # previously was dead code for exactly that reason and has been
        # removed.)
        monkeypatch.setitem(sys.modules, "inbox", None)

        caplog.set_level(logging.WARNING, logger="mcp_inbox_pollers")
        # Should not raise.
        mcp_inbox_pollers.start_inbox_pollers(
            "https://test.moleculesai.app", ["ws-1"]
        )
        warnings = [r for r in caplog.records if r.levelno == logging.WARNING]
        assert any("inbox module unavailable" in r.message for r in warnings), (
            f"expected a 'inbox module unavailable' warning, got: "
            f"{[r.message for r in warnings]}"
        )
||||
|
||||
|
||||
# ============== mcp_heartbeat.build_agent_card — short direct tests ==============
|
||||
|
||||
class TestBuildAgentCardDirect:
    """Spot-check the new module's public surface; the full test matrix
    lives in ``test_mcp_cli.py`` reaching through ``mcp_cli._build_agent_card``.
    """

    def test_default_card_shape(self, monkeypatch):
        """With no MOLECULE_AGENT_* env, the card is name + empty skills;
        the name embeds the first 8 chars of the workspace UUID."""
        for v in ("MOLECULE_AGENT_NAME", "MOLECULE_AGENT_DESCRIPTION", "MOLECULE_AGENT_SKILLS"):
            monkeypatch.delenv(v, raising=False)
        card = mcp_heartbeat.build_agent_card("8dad3e29-c32a-4ec7-9ea7-94fe2d2d98ec")
        assert card == {"name": "molecule-mcp-8dad3e29", "skills": []}

    def test_skills_csv_split_and_trim(self, monkeypatch):
        """CSV skills env is split on commas, entries trimmed, empties dropped."""
        monkeypatch.setenv("MOLECULE_AGENT_SKILLS", "research, , code-review,memory-curation, ")
        card = mcp_heartbeat.build_agent_card("ws-1")
        assert card["skills"] == [
            {"name": "research"},
            {"name": "code-review"},
            {"name": "memory-curation"},
        ]
||||
|
||||
|
||||
# ============== mcp_workspace_resolver — short direct tests ==============
|
||||
|
||||
class TestResolveWorkspacesDirect:
    """Direct happy-path checks on mcp_workspace_resolver.resolve_workspaces;
    the error matrix lives in the mcp_cli-surface tests."""

    @pytest.fixture(autouse=True)
    def _isolate(self, monkeypatch, tmp_path):
        # Clear all resolution inputs and point CONFIGS_DIR at an empty
        # tmp dir so no ambient .auth_token leaks into the tests.
        for v in ("WORKSPACE_ID", "MOLECULE_WORKSPACE_TOKEN", "MOLECULE_WORKSPACES"):
            monkeypatch.delenv(v, raising=False)
        monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
        yield

    def test_single_workspace_via_env(self, monkeypatch):
        """Legacy shape: WORKSPACE_ID + MOLECULE_WORKSPACE_TOKEN → one pair."""
        monkeypatch.setenv("WORKSPACE_ID", "ws-1")
        monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
        out, errors = mcp_workspace_resolver.resolve_workspaces()
        assert out == [("ws-1", "tok")]
        assert errors == []

    def test_multi_workspace_via_json_env(self, monkeypatch):
        """JSON shape: MOLECULE_WORKSPACES array → ordered (id, token) pairs."""
        monkeypatch.setenv(
            "MOLECULE_WORKSPACES",
            '[{"id":"ws-a","token":"a"},{"id":"ws-b","token":"b"}]',
        )
        out, errors = mcp_workspace_resolver.resolve_workspaces()
        assert out == [("ws-a", "a"), ("ws-b", "b")]
        assert errors == []
||||
Loading…
Reference in New Issue
Block a user