Compare commits

..

1 Commits

Author SHA1 Message Date
Molecule AI · core-uiux 3fd38e6deb fix(canvas): keep "agent is working" indicator alive for external/poll-mode turns
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 4s
CI / Detect changes (pull_request) Successful in 5s
E2E API Smoke Test / detect-changes (pull_request) Successful in 5s
E2E Chat / detect-changes (pull_request) Successful in 5s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 7s
Harness Replays / detect-changes (pull_request) Successful in 6s
CI / Platform (Go) (pull_request) Successful in 4m58s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m0s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 4s
gate-check-v3 / gate-check (pull_request) Successful in 4s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 6s
qa-review / approved (pull_request) Successful in 4s
security-review / approved (pull_request) Successful in 4s
sop-tier-check / tier-check (pull_request) Successful in 6s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 2s
CI / Python Lint & Test (pull_request) Successful in 2s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 2s
E2E Chat / E2E Chat (pull_request) Failing after 2s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2s
Harness Replays / Harness Replays (pull_request) Successful in 2s
CI / Canvas (Next.js) (pull_request) Successful in 6m49s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 1s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / all-required (pull_request) Successful in 2s
sop-checklist / all-items-acked (pull_request) [info tier:low] acked: 5/7 — missing: root-cause, no-backwards-compat
sop-checklist / na-declarations (pull_request) N/A: (none)
audit-force-merge / audit (pull_request) Successful in 6s
External / MCP-registered workspaces (delivery_mode=poll, e.g. the
local-Mac orchestrator) have no public URL: POST /workspaces/:id/a2a
short-circuits server-side (a2a_proxy.go:402) and returns a synthetic
{status:"queued", delivery_mode:"poll"} envelope IMMEDIATELY with no
reply. useChatSend treated that as a terminal response and called
releaseSendGuards() → `sending` went false the instant the POST
returned → the ChatTab thinking indicator vanished and the external
turn looked dead, even though the agent had not started. Internal
agents reply on the same synchronous POST so their indicator naturally
holds — that's the whole asymmetry; the transport is shared.

Fix (Tier A, minimal, client-only): detect the synthetic poll envelope
and KEEP `sending` true so the existing internal working-indicator
persists as a "received — agent is working" state. The eventual reply
already flows AgentMessageWriter → AGENT_MESSAGE WS push →
useChatSocket onAgentMessage/onSendComplete → releaseSendGuards, i.e.
the external reply now drives the indicator through the exact same path
an internal async reply uses — no parallel system. A generous 15-min
safety timer surfaces an honest, actionable error instead of an
infinite spinner if an offline poll agent never replies.

Tier B (lifecycle-driven interim progress) and Tier C (tool-call
parity — has an operator-privacy tradeoff, CTO decision required) are
designed in internal RFC external-workspace-canvas-progress-feedback,
not in this PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 11:12:16 -07:00
83 changed files with 802 additions and 4529 deletions
-1
View File
@@ -1 +0,0 @@
refire:1778784369
+4 -11
View File
@@ -203,17 +203,12 @@ def ci_jobs_all(ci_doc: dict) -> set[str]:
def ci_job_names(ci_doc: dict) -> set[str]:
"""Set of job keys in ci.yml MINUS the sentinel itself MINUS jobs
whose `if:` gates on `github.event_name` or `github.ref` (those are
event-scoped and can legitimately be `skipped` for a given trigger;
if we required them under the sentinel `needs:`, every PR-only job
whose `if:` gates on `github.event_name` (those are event-scoped
and can legitimately be `skipped` for a given trigger; if we
required them under the sentinel `needs:`, every PR-only job
would be `skipped` on push and the sentinel would interpret
`skipped != success` as failure). RFC §4 spec.
`github.ref` is the companion gate for jobs that run only on direct
pushes to specific branches (e.g. `github.ref == 'refs/heads/main'`).
These never execute in a PR context, so flagging them as missing
from `all-required.needs:` is a false positive (mc#958 / mc#959).
Used for F1 (jobs missing from sentinel needs). NOT used for F1b
(typos in needs) — see `ci_jobs_all` for that."""
jobs = ci_doc.get("jobs")
@@ -226,9 +221,7 @@ def ci_job_names(ci_doc: dict) -> set[str]:
continue
if isinstance(v, dict):
gate = v.get("if")
if isinstance(gate, str) and (
"github.event_name" in gate or "github.ref" in gate
):
if isinstance(gate, str) and "github.event_name" in gate:
continue
names.add(k)
return names
+18 -95
View File
@@ -65,11 +65,6 @@ class ApiError(RuntimeError):
pass
class MergePermissionError(ApiError):
"""Merge failed with a permanent permission error (403/404/405).
The queue should skip this PR and move to the next one."""
@dataclasses.dataclass(frozen=True)
class MergeDecision:
ready: bool
@@ -153,38 +148,15 @@ def latest_statuses_by_context(statuses: list[dict]) -> dict[str, dict]:
return latest
def _is_tier_low_pending_ok(
latest_statuses: dict[str, dict],
context: str,
pr_labels: set[str],
) -> bool:
"""Return True if tier:low PR can tolerate sop-checklist pending state.
Per sop-checklist-config.yaml tier_failure_mode, tier:low uses soft-fail:
sop-checklist posts state=pending when acks are satisfied (missing
manager/ceo acks are informational only). The queue should accept
pending instead of waiting for success.
"""
if "tier:low" not in pr_labels:
return False
if "sop-checklist" not in context:
return False
status = latest_statuses.get(context) or {}
return status_state(status) == "pending"
def required_contexts_green(
latest_statuses: dict[str, dict],
contexts: list[str],
pr_labels: set[str] | None = None,
) -> tuple[bool, list[str]]:
missing_or_bad: list[str] = []
for context in contexts:
status = latest_statuses.get(context)
state = status_state(status or {})
if state != "success":
if pr_labels and _is_tier_low_pending_ok(latest_statuses, context, pr_labels):
continue # tier:low soft-fail: accept pending sop-checklist
missing_or_bad.append(f"{context}={state or 'missing'}")
return not missing_or_bad, missing_or_bad
@@ -237,7 +209,6 @@ def evaluate_merge_readiness(
pr_status: dict,
required_contexts: list[str],
pr_has_current_base: bool,
pr_labels: set[str] | None = None,
) -> MergeDecision:
# Check push-required contexts explicitly instead of combined state.
# Combined state can be "failure" due to non-blocking jobs
@@ -257,7 +228,7 @@ def evaluate_merge_readiness(
# The required_contexts list is the authoritative gate — it includes only
# the checks that actually block merges.
latest = latest_statuses_by_context(pr_status.get("statuses") or [])
ok, missing_or_bad = required_contexts_green(latest, required_contexts, pr_labels)
ok, missing_or_bad = required_contexts_green(latest, required_contexts)
if not ok:
return MergeDecision(False, "wait", "required contexts not green: " + ", ".join(missing_or_bad))
return MergeDecision(True, "merge", "ready")
@@ -282,32 +253,27 @@ def get_combined_status(sha: str) -> dict:
_, combined = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status")
if not isinstance(combined, dict):
raise ApiError(f"status for {sha} response not object")
combined_statuses: list[dict] = combined.get("statuses") or []
# Fetch full statuses list; 200 covers >99% of real-world runs.
# The list is ordered ascending by id (oldest first) — callers must
# iterate in reverse to get the newest entry per context.
# Best-effort: large repos (main with 550+ statuses) may time out.
# On timeout, fall back to the statuses[] already in the combined
# response (usually 30 entries — enough for most PRs, enough for
# main's early push-required contexts).
try:
_, all_statuses_raw = api(
_, all_statuses = api(
"GET",
f"/repos/{OWNER}/{NAME}/commits/{sha}/statuses",
query={"limit": "50"},
)
if isinstance(all_statuses_raw, list):
all_statuses: list[dict] = list(all_statuses_raw)
else:
all_statuses = []
if isinstance(all_statuses, list):
combined["statuses"] = all_statuses
except (ApiError, urllib.error.URLError, TimeoutError, OSError) as exc:
# URLError covers network-level failures (DNS, refused, timeout).
# TimeoutError and OSError cover socket-level timeouts.
sys.stderr.write(f"::warning::could not fetch full statuses list for {sha[:8]}: {exc}\n")
all_statuses = []
# Build latest per context: process combined (ascending→reverse=newest
# first), then fill gaps from all_statuses (already newest-first).
latest: dict[str, dict] = {}
for status in reversed(sorted(combined_statuses, key=lambda s: s.get("id") or 0)):
ctx = status.get("context")
if isinstance(ctx, str) and ctx not in latest:
latest[ctx] = status
for status in all_statuses:
ctx = status.get("context")
if isinstance(ctx, str) and ctx not in latest:
latest[ctx] = status
combined["statuses"] = list(latest.values())
# Fall back to the statuses[] already in the combined response.
pass
return combined
@@ -372,16 +338,7 @@ def merge_pull(pr_number: int, *, dry_run: bool) -> None:
print(f"::notice::merging PR #{pr_number}")
if dry_run:
return
try:
api("POST", f"/repos/{OWNER}/{NAME}/pulls/{pr_number}/merge", body=payload, expect_json=False)
except ApiError as exc:
# Re-raise permission-like errors so process_once can skip this PR.
# 403 = no push access, 404 = repo/pr not found, 405 = not allowed.
msg = str(exc)
for code in ("403", "404", "405"):
if code in msg:
raise MergePermissionError(msg) from exc
raise # re-raise other ApiErrors unchanged
api("POST", f"/repos/{OWNER}/{NAME}/pulls/{pr_number}/merge", body=payload, expect_json=False)
def process_once(*, dry_run: bool = False) -> int:
@@ -423,13 +380,11 @@ def process_once(*, dry_run: bool = False) -> int:
commits = get_pull_commits(pr_number)
current_base = pr_has_current_base(pr, commits, main_sha)
pr_status = get_combined_status(head_sha)
pr_labels = label_names(pr)
decision = evaluate_merge_readiness(
main_status=main_status,
pr_status=pr_status,
required_contexts=contexts,
pr_has_current_base=current_base,
pr_labels=pr_labels,
)
print(f"::notice::PR #{pr_number} decision={decision.action}: {decision.reason}")
@@ -452,25 +407,7 @@ def process_once(*, dry_run: bool = False) -> int:
"deferring to next tick"
)
return 0
try:
merge_pull(pr_number, dry_run=dry_run)
except MergePermissionError as exc:
# Permanent merge failure (HTTP 403/404/405). Post a comment so
# maintainers know why, then return 0 so this tick is done.
# The PR stays in the queue; future ticks can retry after the
# permission issue is resolved.
sys.stderr.write(f"::error::merge permission error for PR #{pr_number}: {exc}\n")
post_comment(
pr_number,
(
"merge-queue: merge failed with HTTP 405 'User not allowed to merge PR'. "
"No available token has Can-merge permission on this repo. "
"Fix: grant Can-merge to a token, or add a maintain/admin collaborator. "
"Skipping to next queued PR on next tick."
),
dry_run=dry_run,
)
return 0
merge_pull(pr_number, dry_run=dry_run)
return 0
return 0
@@ -480,21 +417,7 @@ def main() -> int:
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
_require_runtime_env()
try:
return process_once(dry_run=args.dry_run)
except ApiError as exc:
# API errors (401/403/404/500) are transient for a queue tick —
# log and exit 0 so the workflow is not marked failed and the next
# tick can retry. Returning non-zero would permanently fail the
# workflow run, blocking future ticks.
sys.stderr.write(f"::error::queue API error: {exc}\n")
return 0
except urllib.error.URLError as exc:
sys.stderr.write(f"::error::queue network error: {exc}\n")
return 0
except TimeoutError as exc:
sys.stderr.write(f"::error::queue timeout: {exc}\n")
return 0
return process_once(dry_run=args.dry_run)
if __name__ == "__main__":
+25 -168
View File
@@ -68,7 +68,7 @@ import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Callable
from typing import Any
# ---------------------------------------------------------------------------
@@ -110,7 +110,7 @@ def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> s
# for /sop-revoke (RFC#351 open question 4 — reason is captured but not
# yet validated; future iteration may require a min-length).
_DIRECTIVE_RE = re.compile(
r"^[ \t]*/(sop-ack|sop-revoke|sop-n/a)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$",
r"^[ \t]*/(sop-ack|sop-revoke)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$",
re.MULTILINE,
)
@@ -118,21 +118,19 @@ _DIRECTIVE_RE = re.compile(
def parse_directives(
comment_body: str,
numeric_aliases: dict[int, str],
) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
"""Extract /sop-ack, /sop-revoke, and /sop-n/a directives from a comment body.
) -> tuple[list[tuple[str, str, str]], list]:
"""Extract /sop-ack and /sop-revoke directives from a comment body.
Returns (directives, na_directives) where each is a list of
(kind, canonical_slug, note) tuples:
kind is "sop-ack", "sop-revoke", or "sop-n/a"
canonical_slug is the normalized form (or "" if unparseable)
note is the trailing free-text (may be "")
The two lists are kept separate so call sites can unpack them
directly (e.g. directives, na_directives = parse_directives(...)).
Returns (directives, na_directives) where:
directives is a list of (kind, canonical_slug, note) tuples
kind is "sop-ack" or "sop-revoke"
canonical_slug is the normalized form (or "" if unparseable)
note is the trailing free-text (may be "")
na_directives is reserved for future N/A handling (always [] for now)
"""
directives: list[tuple[str, str, str]] = []
na_directives: list[tuple[str, str, str]] = []
out: list[tuple[str, str, str]] = []
if not comment_body:
return directives, na_directives
return out, []
for m in _DIRECTIVE_RE.finditer(comment_body):
kind = m.group(1)
raw_slug = (m.group(2) or "").strip()
@@ -162,12 +160,8 @@ def parse_directives(
note_from_group = (m.group(3) or "").strip()
# If we collapsed multi-word slug into kebab and there's a
# trailing-text group too, append it.
entry = (kind, canonical, note_from_group)
if kind == "sop-n/a":
na_directives.append(entry)
else:
directives.append(entry)
return directives, na_directives
out.append((kind, canonical, note_from_group))
return out, []
# ---------------------------------------------------------------------------
@@ -180,8 +174,8 @@ def section_marker_present(body: str, marker: str) -> bool:
on a non-empty line (i.e. the author actually filled it in).
We require the marker substring AND non-whitespace content on the
same line OR within the next non-blank line — this prevents
trivially-empty checklists like:
same line OR within the next line — this prevents trivially-empty
checklists like:
## SOP-Checklist
- [ ] **Comprehensive testing performed**:
@@ -190,18 +184,9 @@ def section_marker_present(body: str, marker: str) -> bool:
from auto-passing the section-present check. The peer-ack is still
required, but answering with empty content is captured as a soft
finding via the section-present test alone.
NOTE: we scan forward through blank lines (the markdown-header pattern
is ## Header\\n\\ncontent) so that a header + blank-line + content
structure still satisfies the check. The backward checkbox fallback
catches inline markers without a preceding checkbox (mc#1099).
"""
if not body or not marker:
return False
# Strip trailing whitespace so the blank-line scan below can find
# content that appears on the very last line of the body (without
# being misled by a trailing \n or spaces).
body = body.rstrip()
body_lower = body.lower()
marker_lower = marker.lower()
idx = body_lower.find(marker_lower)
@@ -217,44 +202,13 @@ def section_marker_present(body: str, marker: str) -> bool:
stripped = re.sub(r"[\s\*:\-\[\]]+", "", line)
if stripped:
return True
# Fall through: scan forward, skipping blank-only lines, until we find
# non-empty content or run out of body. Handles:
# ## Header ← marker line (empty after marker)
# ← blank line (skipped)
# - actual content ← found
pos = line_end
while True:
# Skip the current newline and any additional newlines (blank lines).
while pos < len(body) and body[pos] == "\n":
pos += 1
if pos >= len(body):
break
line_end = body.find("\n", pos)
if line_end < 0:
line_end = len(body)
line = body[pos:line_end]
stripped = re.sub(r"[\s\*:\-\[\]]+", "", line)
if stripped:
return True
pos = line_end
# Last resort: the marker may appear mid-sentence (e.g.
# **Memory/saved-feedback consulted**: No applicable...).
# Search backward within the CURRENT LINE only (not preceding lines)
# to find a checkbox on the same line before the marker text.
# mc#1099 follow-up: memory-consulted detection was failing because
# the checkbox was on the same line before the inline marker.
_CHECKBOX_RE = re.compile(r"- \[[ x\]]|<input", re.IGNORECASE)
line_start = body.rfind("\n", 0, idx) + 1 # 0 if no newline before idx
before = body[line_start:idx]
m = _CHECKBOX_RE.search(before)
if not m:
return False
# Require meaningful content between the checkbox and the marker text
# (markdown formatting like ** or * must also be stripped).
# If only whitespace/markdown chars remain, the checkbox line is empty.
between = before[m.end() :]
stripped_between = re.sub(r"[\s\*:#\[\]_\-]+", "", between)
return bool(stripped_between)
# Fall through: check the NEXT line (multi-line answers).
next_line_end = body.find("\n", line_end + 1)
if next_line_end < 0:
next_line_end = len(body)
next_line = body[line_end + 1:next_line_end]
stripped_next = re.sub(r"[\s\*:\-\[\]]+", "", next_line)
return bool(stripped_next)
# ---------------------------------------------------------------------------
@@ -297,7 +251,8 @@ def compute_ack_state(
user = (c.get("user") or {}).get("login", "")
if not user:
continue
for kind, slug, _note in parse_directives(body, numeric_aliases)[0]:
directives, _na = parse_directives(body, numeric_aliases)
for kind, slug, _note in directives:
if not slug:
unparseable_per_user[user] = unparseable_per_user.get(user, 0) + 1
continue
@@ -349,63 +304,6 @@ def compute_ack_state(
}
# ---------------------------------------------------------------------------
# N/A-gate evaluation
# ---------------------------------------------------------------------------
def compute_na_state(
comments: list[dict[str, Any]],
author: str,
na_gates: dict[str, Any],
probe: Callable[[str, list[str]], list[str]],
) -> dict[str, dict[str, Any]]:
"""Evaluate which N/A gates have a valid declaration from a team member.
Returns dict[gate_name, dict] where each dict has:
declared: bool — at least one valid non-author team-member declared N/A
decl_ackers: list[str] — usernames who declared this gate N/A
rejected: dict with keys:
not_in_team: list[str] — users who tried but aren't in required teams
"""
# Build per-user latest N/A directive (most-recent wins per RFC#324).
latest_na: dict[str, tuple[str, str]] = {} # user → (gate, note)
for c in comments:
body = c.get("body", "") or ""
user = (c.get("user") or {}).get("login", "")
if not user:
continue
for kind, gate, note in parse_directives(body, {})[1]:
# [1] = na_directives only
if gate in na_gates:
latest_na[user] = (gate, note)
result: dict[str, dict[str, Any]] = {}
for gate, gate_cfg in na_gates.items():
result[gate] = {
"declared": False,
"decl_ackers": [],
"rejected": {"not_in_team": []},
}
decl_ackers: list[str] = []
not_in_team: list[str] = []
for user, (g, _note) in latest_na.items():
if g != gate:
continue
if user == author:
continue # authors cannot self-declare N/A
approved = probe(gate, [user])
if approved:
decl_ackers.append(user)
else:
not_in_team.append(user)
result[gate]["declared"] = bool(decl_ackers)
result[gate]["decl_ackers"] = decl_ackers
result[gate]["rejected"]["not_in_team"] = not_in_team
return result
# ---------------------------------------------------------------------------
# Gitea API client
# ---------------------------------------------------------------------------
@@ -800,7 +698,6 @@ def main(argv: list[str] | None = None) -> int:
cfg = load_config(args.config)
items: list[dict[str, Any]] = cfg["items"]
items_by_slug = {it["slug"]: it for it in items}
na_gates: dict[str, Any] = cfg.get("n/a_gates", {})
numeric_aliases = {
int(it["numeric_alias"]): it["slug"] for it in items if it.get("numeric_alias")
}
@@ -921,46 +818,6 @@ def main(argv: list[str] | None = None) -> int:
description=description, target_url=target_url,
)
print(f"::notice::status posted: {args.status_context}{state}")
# --- N/A gate status (RFC#324 §N/A follow-up) ---
# Post a separate status so review-check.sh can discover N/A declarations
# and waive the Gitea-approve requirement for that gate.
na_state: dict[str, dict[str, Any]] = {}
if na_gates:
na_state = compute_na_state(comments, author, na_gates, probe)
na_descs: list[str] = []
for gate, s in na_state.items():
if s["declared"]:
na_descs.append(gate)
decl = s["decl_ackers"]
rej = s["rejected"]["not_in_team"]
if decl:
print(f"::notice:: [N/A OK] {gate} — declared by {','.join(decl)}")
if rej:
print(
f"::notice:: [N/A REJ] {gate} — not-in-team: {','.join(rej)}",
file=sys.stderr,
)
na_desc = ", ".join(sorted(na_descs)) if na_descs else "(none)"
na_status_state = "success" if na_descs else "pending"
# review-check.sh reads the description to discover which gates are N/A.
# Include the gate names so it can grep for them.
na_description = f"N/A: {na_desc}" if na_descs else "N/A: (none)"
if not args.dry_run:
client.post_status(
args.owner, args.repo, head_sha,
state=na_status_state,
context="sop-checklist / na-declarations (pull_request)",
description=na_description,
target_url=target_url,
)
print(
f"::notice::na-declarations status → {na_status_state}: {na_description}"
)
# By default exit 0 — the POSTed status IS the gate, NOT the job
# conclusion. If the job exits 1 BP will see TWO failure signals
# (one from the job's auto-status, one from our POST), making the
@@ -118,13 +118,3 @@ def test_merge_decision_updates_stale_pr_before_merge():
assert decision.ready is False
assert decision.action == "update"
def test_MergePermissionError_inherits_from_ApiError():
assert issubclass(mq.MergePermissionError, mq.ApiError)
def test_MergePermissionError_message_preserved():
exc = mq.MergePermissionError("POST /merge -> HTTP 405: User not allowed")
assert "405" in str(exc)
assert "User not allowed" in str(exc)
@@ -551,55 +551,3 @@ class TestEndToEndAckFlow(unittest.TestCase):
if __name__ == "__main__":
unittest.main(verbosity=2)
# ---------------------------------------------------------------------------
# compute_na_state
# ---------------------------------------------------------------------------
class TestComputeNaState(unittest.TestCase):
"""Tests for /sop-n/a directive evaluation."""
def test_no_na_declarations(self):
cfg = sop.load_config(CONFIG_PATH)
na_gates = cfg.get("n/a_gates", {})
comments = []
na_state = sop.compute_na_state(comments, "alice", na_gates, lambda *_: [])
self.assertFalse(na_state["qa-review"]["declared"])
self.assertFalse(na_state["security-review"]["declared"])
def test_na_declared_by_authorized_user(self):
cfg = sop.load_config(CONFIG_PATH)
na_gates = cfg.get("n/a_gates", {})
comments = [_comment("bob", "/sop-n/a qa-review N/A: pure tooling change")]
na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: u)
self.assertTrue(na_state["qa-review"]["declared"])
self.assertEqual(na_state["qa-review"]["decl_ackers"], ["bob"])
def test_na_declared_by_unauthorized_user_rejected(self):
cfg = sop.load_config(CONFIG_PATH)
na_gates = cfg.get("n/a_gates", {})
comments = [_comment("mallory", "/sop-n/a qa-review N/A: not real team")]
na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: [])
self.assertFalse(na_state["qa-review"]["declared"])
self.assertEqual(na_state["qa-review"]["rejected"]["not_in_team"], ["mallory"])
def test_author_cannot_self_declare_na(self):
cfg = sop.load_config(CONFIG_PATH)
na_gates = cfg.get("n/a_gates", {})
comments = [_comment("alice", "/sop-n/a qa-review N/A: I am the author")]
na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: u)
self.assertFalse(na_state["qa-review"]["declared"])
def test_parse_directives_separates_na_from_ack(self):
directives, na_directives = sop.parse_directives(
"/sop-ack comprehensive-testing\n/sop-n/a qa-review N/A: no surface",
{},
)
self.assertEqual(len(directives), 1)
self.assertEqual(directives[0][0], "sop-ack")
self.assertEqual(len(na_directives), 1)
self.assertEqual(na_directives[0][0], "sop-n/a")
self.assertEqual(na_directives[0][1], "qa-review")
self.assertIn("no surface", na_directives[0][2])
+93 -109
View File
@@ -348,15 +348,16 @@ jobs:
# Shellcheck (E2E scripts) — required check, always runs.
shellcheck:
name: Shellcheck (E2E scripts)
needs: changes
runs-on: ubuntu-latest
# Phase 4 (RFC #219 §1): confirmed green on main 2026-05-12.
continue-on-error: false
steps:
- if: false
- if: needs.changes.outputs.scripts != 'true'
run: echo "No tests/e2e/ or infra/scripts/ changes — skipping real shellcheck; this job always runs to satisfy the required-check name on branch protection."
- if: always()
- if: needs.changes.outputs.scripts == 'true'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- if: always()
- if: needs.changes.outputs.scripts == 'true'
name: Run shellcheck on tests/e2e/*.sh and infra/scripts/*.sh
# shellcheck is pre-installed on ubuntu-latest runners (via apt).
# infra/scripts/ is included because setup.sh + nuke.sh gate the
@@ -367,16 +368,16 @@ jobs:
find tests/e2e infra/scripts -type f -name '*.sh' -print0 \
| xargs -0 shellcheck --severity=warning
- if: always()
- if: needs.changes.outputs.scripts == 'true'
name: Lint cleanup-trap hygiene (RFC #2873)
run: bash tests/e2e/lint_cleanup_traps.sh
- if: always()
- if: needs.changes.outputs.scripts == 'true'
name: Run E2E bash unit tests (no live infra)
run: |
bash tests/e2e/test_model_slug.sh
- if: always()
- if: needs.changes.outputs.scripts == 'true'
name: Test ECR promote-tenant-image script (mock-driven, no live infra)
# Covers scripts/promote-tenant-image.sh — the codified
# :staging-latest → :latest ECR promote + tenant fleet redeploy
@@ -386,7 +387,7 @@ jobs:
run: |
bash scripts/test-promote-tenant-image.sh
- if: always()
- if: needs.changes.outputs.scripts == 'true'
name: Shellcheck promote-tenant-image script
# scripts/ is excluded from the bulk shellcheck pass above (legacy
# SC3040/SC3043 cleanup pending). Run shellcheck explicitly on
@@ -406,8 +407,8 @@ jobs:
# ci_job_names() detects this as github.ref-gated and skips it from F1.
# The step-level exit 0 handles the "not main push" case; the job-level
# `if:` makes the gating explicit so the drift script sees it.
# Runs on both main and staging pushes; step exits 0 when not applicable.
if: ${{ github.ref == 'refs/heads/main' || github.ref == 'refs/heads/staging' }}
# continue-on-error removed (was mc#774 mask): step exits 0 when not applicable.
if: ${{ github.ref == 'refs/heads/staging' }}
needs: [changes, canvas-build]
steps:
- name: Write deploy reminder to step summary
@@ -458,6 +459,7 @@ jobs:
# Python Lint & Test — required check, always runs.
python-lint:
name: Python Lint & Test
needs: changes
runs-on: ubuntu-latest
# Phase 4 (RFC #219 §1): confirmed green on main 2026-05-12.
continue-on-error: false
@@ -467,25 +469,25 @@ jobs:
run:
working-directory: workspace
steps:
- if: false
- if: needs.changes.outputs.python != 'true'
working-directory: .
run: echo "No workspace/** changes — skipping real lint+test; this job always runs to satisfy the required-check name on branch protection."
- if: always()
- if: needs.changes.outputs.python == 'true'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- if: always()
- if: needs.changes.outputs.python == 'true'
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:
python-version: '3.11'
cache: pip
cache-dependency-path: workspace/requirements.txt
- if: always()
- if: needs.changes.outputs.python == 'true'
run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov sqlalchemy>=2.0.0
# Coverage flags + fail-under floor moved into workspace/pytest.ini
# (issue #1817) so local `pytest` and CI use identical config.
- if: always()
- if: needs.changes.outputs.python == 'true'
run: python -m pytest --tb=short
- if: always()
- if: needs.changes.outputs.python == 'true'
name: Per-file critical-path coverage (MCP / inbox / auth)
# MCP-critical Python files have a per-file floor on top of the
# 86% total floor in pytest.ini. See issue #2790 for full rationale.
@@ -550,104 +552,86 @@ jobs:
# red silently merged through. See internal#286 for the three concrete
# tonight-of-2026-05-11 incidents that prompted the emergency bump.
#
# This job deliberately has no `needs:`. Gitea 1.22/act_runner can mark a
# job-level `if: always()` + `needs:` sentinel as skipped before upstream
# jobs settle, leaving branch protection with a permanent pending
# `CI / all-required` context. Instead, this independent sentinel polls the
# required commit-status contexts for this SHA and fails if any fail, skip,
# or never emit.
# Three properties of this job each close a failure mode:
#
# canvas-deploy-reminder is intentionally NOT included in all-required.needs.
# It is an informational main-push reminder, not a PR quality gate. Keeping
# it in this dependency list lets a skipped reminder skip the required
# sentinel before the `always()` guard can emit a branch-protection status.
# 1. `if: always()` — runs even when an upstream fails. Without it the
# sentinel is `skipped` and protection treats that as missing → merge
# ungated.
#
# 2. Assertion is `result == "success"` per dep, NOT `!= "failure"`.
# A `skipped` upstream (job gated by `if:` evaluating false, matrix
# entry that couldn't run) must NOT silently pass through.
# `skipped`-as-green is exactly the failure mode this gate closes.
#
# 3. `needs:` is the canonical list of "what counts as required."
# status_check_contexts will reference only `ci/all-required` (Step 5
# follow-up — branch-protection PATCH is Owners-tier per
# `feedback_never_admin_merge_bypass`, separate PR); a new job is
# added simply by listing it in `needs:` here.
# `.gitea/workflows/ci-required-drift.yml` files a [ci-drift] issue
# hourly if this list diverges from status_check_contexts or from
# audit-force-merge.yml's REQUIRED_CHECKS env (RFC §4 + §6).
#
# canvas-deploy-reminder is intentionally excluded from all-required.needs:
# it needs canvas-build, which is skipped on CI-only PRs (canvas=false).
# Including it in all-required.needs causes all-required to hang on
# every CI-only PR. Keep it runnable on PRs via its own
# `needs: [changes, canvas-build]` — the sentinel only aggregates the result.
#
# Phase 3 (RFC #219 §1) safety: underlying build jobs carry
# continue-on-error: true so their failures are masked to null (2026-05-12: re-enabled mc#774 interim)
# (Gitea suppresses status reporting for CoE jobs). This sentinel
# runs with continue-on-error: false so it always reports its
# result to the API — without this, the required-status entry
# (CI / all-required (pull_request)) is never created, which
# blocks PR merges. When Phase 3 ends, flip underlying jobs to
# continue-on-error: false; this sentinel can then be flipped to
# continue-on-error: true if a Phase-4 regression requires it.
continue-on-error: false
runs-on: ubuntu-latest
timeout-minutes: 45
timeout-minutes: 1
needs:
- changes
- platform-build
- canvas-build
- shellcheck
- python-lint
- canvas-deploy-reminder
if: ${{ always() }}
steps:
- name: Wait for required CI contexts
env:
GITEA_TOKEN: ${{ secrets.GITHUB_TOKEN }}
API_ROOT: ${{ github.server_url }}/api/v1
REPOSITORY: ${{ github.repository }}
COMMIT_SHA: ${{ github.sha }}
EVENT_NAME: ${{ github.event_name }}
- name: Assert every required dependency succeeded
run: |
set -euo pipefail
python3 - <<'PY'
import json
import os
import sys
import time
import urllib.error
import urllib.request
token = os.environ["GITEA_TOKEN"]
api_root = os.environ["API_ROOT"].rstrip("/")
repo = os.environ["REPOSITORY"]
sha = os.environ["COMMIT_SHA"]
event = os.environ["EVENT_NAME"]
required = [
f"CI / Detect changes ({event})",
f"CI / Platform (Go) ({event})",
f"CI / Canvas (Next.js) ({event})",
f"CI / Shellcheck (E2E scripts) ({event})",
f"CI / Python Lint & Test ({event})",
]
terminal_bad = {"failure", "error"}
deadline = time.time() + 40 * 60
last_summary = None
def fetch_statuses():
statuses = []
for page in range(1, 6):
url = f"{api_root}/repos/{repo}/commits/{sha}/statuses?page={page}&limit=100"
req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
with urllib.request.urlopen(req, timeout=10) as resp:
chunk = json.load(resp)
if not chunk:
break
statuses.extend(chunk)
latest = {}
for item in statuses:
ctx = item.get("context")
if not ctx:
continue
prev = latest.get(ctx)
if prev is None or (item.get("updated_at") or item.get("created_at") or "") >= (prev.get("updated_at") or prev.get("created_at") or ""):
latest[ctx] = item
return latest
while True:
try:
latest = fetch_statuses()
except (TimeoutError, OSError, urllib.error.URLError) as exc:
if time.time() >= deadline:
print(f"FAIL: status polling did not recover before deadline: {exc}", file=sys.stderr)
sys.exit(1)
print(f"WARN: status poll failed, retrying: {exc}", flush=True)
time.sleep(15)
continue
states = {ctx: (latest.get(ctx) or {}).get("status") or (latest.get(ctx) or {}).get("state") or "missing" for ctx in required}
summary = ", ".join(f"{ctx}={state}" for ctx, state in states.items())
if summary != last_summary:
print(summary, flush=True)
last_summary = summary
bad = {ctx: state for ctx, state in states.items() if state in terminal_bad}
if bad:
print("FAIL: required CI context failed:", file=sys.stderr)
for ctx, state in bad.items():
desc = (latest.get(ctx) or {}).get("description") or ""
print(f" - {ctx}: {state} {desc}", file=sys.stderr)
sys.exit(1)
if all(state == "success" for state in states.values()):
print(f"OK: all {len(required)} required CI contexts succeeded")
sys.exit(0)
if time.time() >= deadline:
print("FAIL: timed out waiting for required CI contexts:", file=sys.stderr)
for ctx, state in states.items():
print(f" - {ctx}: {state}", file=sys.stderr)
sys.exit(1)
time.sleep(15)
PY
# `needs.*.result` is one of: success | failure | cancelled | skipped | null.
# We assert success per dep (not != failure) — see RFC §2 reasoning above.
# Null results are skipped: they come from Phase 3 (continue-on-error: true
# suppresses status) or from jobs still in-flight. The sentinel succeeds
# rather than blocking PRs on Phase 3 noise.
results='${{ toJSON(needs) }}'
echo "$results"
echo "$results" | python3 -c '
import json, sys
ns = json.load(sys.stdin)
# Phase 3 masked: jobs with continue-on-error: true may report "failure"
# Remove when mc#774 handler test failures are resolved.
PHASE3_MASKED = {"platform-build"}
# Exclude null (Phase 3 suppressed / in-flight) from the bad list.
bad = [(k, v.get("result")) for k, v in ns.items()
if v.get("result") not in ("success", None, "cancelled", "skipped") and k not in PHASE3_MASKED]
if bad:
print(f"FAIL: jobs not green:", file=sys.stderr)
for k, r in bad:
print(f" - {k}: {r}", file=sys.stderr)
sys.exit(1)
pending = [(k, v.get("result")) for k, v in ns.items()
if v.get("result") is None]
cancelled = [(k, v.get("result")) for k, v in ns.items()
if v.get("result") == "cancelled"]
if pending:
print(f"WARN: {len(pending)} job(s) still in-flight (result=null): " +
", ".join(k for k, _ in pending), file=sys.stderr)
if cancelled:
print(f"INFO: {len(cancelled)} job(s) masked by continue-on-error: " +
", ".join(k for k, _ in cancelled), file=sys.stderr)
print(f"OK: all {len(ns)} required jobs succeeded (or Phase-3 suppressed)")
'
-37
View File
@@ -69,13 +69,6 @@ name: E2E API Smoke Test
# 2318) shows Postgres ready in 3s, Redis in 1s, Platform in 1s when
# they DO come up. Timeouts are not the bottleneck; not bumped.
#
# Item #1046 (fixed 2026-05-14): Stale platform-server from cancelled runs
# lingers on :8080 after "Stop platform" step is skipped (workflow cancelled
# before reaching line 335). Added a pre-start "Kill stale platform-server"
# step (line 286) that scans /proc for zombie platform-server processes
# and kills them before the port probe or bind. Makes the ephemeral port
# probe + start sequence deterministic.
#
# Item explicitly NOT fixed here: failing test `Status back online`
# fails because the platform's langgraph workspace template image
# (ghcr.io/molecule-ai/workspace-template-langgraph:latest) returns
@@ -290,35 +283,6 @@ jobs:
echo "PORT=${PLATFORM_PORT}" >> "$GITHUB_ENV"
echo "BASE=http://127.0.0.1:${PLATFORM_PORT}" >> "$GITHUB_ENV"
echo "Platform host port: ${PLATFORM_PORT}"
- name: Kill stale platform-server before start (issue #1046)
if: needs.detect-changes.outputs.api == 'true'
run: |
# Concurrent runs on the same host-network act_runner can leave a
# zombie platform-server from a cancelled/timeout run. Cancelled
# runs never reach the "Stop platform" step (line 335), so the
# old process lingers. Kill it before the ephemeral port probe
# or start so the port is definitively free.
#
# /proc scan — works on any Linux without pkill/lsof/ss.
# comm field is truncated to 15 chars: "platform-serve" matches
# "platform-server". Verify with cmdline to avoid false positives.
killed=0
for pid in $(grep -l "platform-serve" /proc/[0-9]*/comm 2>/dev/null); do
kpid="${pid%/comm}"
kpid="${kpid##*/}"
cmdline=$(cat "/proc/${kpid}/cmdline" 2>/dev/null | tr '\0' ' ')
if echo "$cmdline" | grep -q "platform-server"; then
echo "Killing stale platform-server pid ${kpid}: ${cmdline}"
kill "$kpid" 2>/dev/null || true
killed=$((killed + 1))
fi
done
if [ "$killed" -gt 0 ]; then
sleep 2
echo "Killed $killed stale process(es); port(s) released."
else
echo "No stale platform-server found."
fi
- name: Start platform (background)
if: needs.detect-changes.outputs.api == 'true'
working-directory: workspace-server
@@ -382,4 +346,3 @@ jobs:
run: |
docker rm -f "$PG_CONTAINER" 2>/dev/null || true
docker rm -f "$REDIS_CONTAINER" 2>/dev/null || true
+3 -18
View File
@@ -97,7 +97,7 @@ jobs:
cache-dependency-path: workspace-server/go.sum
- if: needs.detect-changes.outputs.chat == 'true'
uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0
uses: actions/setup-node@60edb5dd545a775178f52524783378180af0d6f5 # v4
with:
node-version: '22'
cache: 'npm'
@@ -175,19 +175,6 @@ jobs:
echo "E2E_PLATFORM_URL=http://127.0.0.1:${PLATFORM_PORT}" >> "$GITHUB_ENV"
echo "Platform host port: ${PLATFORM_PORT}"
- name: Pick canvas port
if: needs.detect-changes.outputs.chat == 'true'
run: |
CANVAS_PORT=$(python3 - <<'PY'
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
print(s.getsockname()[1])
PY
)
echo "CANVAS_PORT=${CANVAS_PORT}" >> "$GITHUB_ENV"
echo "Canvas host port: ${CANVAS_PORT}"
- name: Start platform (background)
if: needs.detect-changes.outputs.chat == 'true'
working-directory: workspace-server
@@ -196,7 +183,6 @@ jobs:
export DATABASE_URL="${DATABASE_URL}"
export REDIS_URL="${REDIS_URL}"
export PORT="${PLATFORM_PORT}"
export CORS_ORIGINS="http://localhost:3000,http://localhost:3001,http://localhost:${CANVAS_PORT},http://127.0.0.1:${CANVAS_PORT}"
./platform-server > platform.log 2>&1 &
echo $! > platform.pid
@@ -230,10 +216,10 @@ jobs:
run: |
export NEXT_PUBLIC_PLATFORM_URL="http://127.0.0.1:${PLATFORM_PORT}"
export NEXT_PUBLIC_WS_URL="ws://127.0.0.1:${PLATFORM_PORT}/ws"
npx next dev --turbopack -p "${CANVAS_PORT}" > canvas.log 2>&1 &
npm run dev > canvas.log 2>&1 &
echo $! > canvas.pid
for i in $(seq 1 30); do
if curl -sf "http://localhost:${CANVAS_PORT}" > /dev/null 2>&1; then
if curl -sf http://localhost:3000 > /dev/null 2>&1; then
echo "Canvas up after ${i}s"
exit 0
fi
@@ -249,7 +235,6 @@ jobs:
run: |
export E2E_PLATFORM_URL="http://127.0.0.1:${PLATFORM_PORT}"
export E2E_DATABASE_URL="${DATABASE_URL}"
export PLAYWRIGHT_BASE_URL="http://localhost:${CANVAS_PORT}"
npx playwright test e2e/chat-desktop.spec.ts e2e/chat-mobile.spec.ts
- name: Dump platform log on failure
-225
View File
@@ -1,225 +0,0 @@
name: E2E Peer Visibility (literal MCP list_peers)
# WHY A DEDICATED WORKFLOW (not folded into e2e-staging-saas.yml)
# --------------------------------------------------------------
# This is the systemic fix for a real trust failure. Hermes and OpenClaw
# were reported "fleet-verified / cascade-complete" because the *proxy*
# signals were green (registry registration + heartbeat for Hermes; model
# round-trip 200 for OpenClaw). A freshly-provisioned workspace asked on
# canvas "can you see your peers" actually FAILS:
# - Hermes: 401 on the molecule MCP `list_peers` call
# - OpenClaw: native `sessions_list` fallback, sees no platform peers
# Tasks #142/#159 were even marked "completed" under this proxy flaw.
#
# A dedicated workflow (vs extending e2e-staging-saas.yml) because:
# - It must provision MULTIPLE distinct runtimes (hermes, openclaw,
# claude-code) in ONE org and assert each sees the others. The
# full-saas script is single-runtime-per-run (E2E_RUNTIME) and folding
# a multi-runtime matrix into it would conflate concerns and bloat its
# already-45-min run.
# - It needs its own concurrency group so it doesn't fight full-saas /
# canvas for the staging org-creation quota.
# - It needs an independent, non-required status-context name so it can
# be RED today (the in-flight Hermes-401 / OpenClaw-MCP-wiring fixes
# have not landed) WITHOUT wedging unrelated merges — and flipped to
# REQUIRED in one branch-protection edit once it goes green
# (flip-to-required checklist: molecule-core#1296).
#
# THE ASSERTION IS NOT A PROXY. The driving script
# tests/e2e/test_peer_visibility_mcp_staging.sh issues the byte-for-byte
# JSON-RPC `tools/call name=list_peers` envelope to `POST
# /workspaces/:id/mcp` using each workspace's OWN bearer token, through
# the real WorkspaceAuth + MCPRateLimiter middleware chain — the exact
# call mcp_molecule_list_peers makes from a canvas agent. It does NOT
# read a registry row, /health, the heartbeat table, or
# GET /registry/:id/peers.
#
# HONEST GATE — NO continue-on-error. Per feedback_fix_root_not_symptom a
# fake-green mask would defeat the entire purpose. This workflow goes red
# on today's broken behavior and green only when the root-cause fixes
# actually land. It is intentionally NOT in branch_protections — see PR
# body for the required-vs-not decision + flip tracking issue.
#
# Gitea 1.22.6 / act_runner notes honored:
# - No cross-repo `uses:` (feedback_gitea_cross_repo_uses_blocked). The
# actions/checkout SHA is the one e2e-staging-canvas.yml already uses
# successfully (a mirrored SHA — see #1277/PR#1292 root-cause).
# - Per-SHA concurrency, not global (feedback_concurrency_group_per_sha).
# - Workflow-level GITHUB_SERVER_URL pinned
# (feedback_act_runner_github_server_url).
# - pr-validate posts a status under the same check name so a
# workflow-only PR is not silently statusless and the context is
# flip-to-required-ready (mirrors e2e-staging-saas.yml's proven shape;
# real EC2-provisioning E2E is push/dispatch/cron only — it is 30+ min
# and cannot run per-PR-update).
on:
push:
branches: [main]
paths:
- 'workspace-server/internal/handlers/mcp.go'
- 'workspace-server/internal/handlers/mcp_tools.go'
- 'workspace-server/internal/middleware/**'
- 'workspace-server/internal/handlers/registry.go'
- 'workspace-server/internal/handlers/workspace.go'
- 'workspace/a2a_mcp_server.py'
- 'workspace/platform_tools/registry.py'
- 'tests/e2e/test_peer_visibility_mcp_staging.sh'
- '.gitea/workflows/e2e-peer-visibility.yml'
pull_request:
branches: [main]
paths:
- 'workspace-server/internal/handlers/mcp.go'
- 'workspace-server/internal/handlers/mcp_tools.go'
- 'workspace-server/internal/middleware/**'
- 'workspace-server/internal/handlers/registry.go'
- 'workspace-server/internal/handlers/workspace.go'
- 'workspace/a2a_mcp_server.py'
- 'workspace/platform_tools/registry.py'
- 'tests/e2e/test_peer_visibility_mcp_staging.sh'
- '.gitea/workflows/e2e-peer-visibility.yml'
workflow_dispatch:
schedule:
# 07:30 UTC daily — catches AMI / template-hermes / template-openclaw
# drift even on quiet days. Offset 30m from e2e-staging-saas (07:00)
# so the two don't collide on the staging org-creation quota.
- cron: '30 7 * * *'
concurrency:
# Per-SHA (feedback_concurrency_group_per_sha). A single global group
# would let a queued staging/main push behind a PR run get cancelled,
# leaving any gate that reads "completed run at SHA" stuck.
group: e2e-peer-visibility-${{ github.event.pull_request.head.sha || github.sha }}
cancel-in-progress: false
env:
GITHUB_SERVER_URL: https://git.moleculesai.app
jobs:
# PR path: post a real status under the required-ready check name so a
# workflow-only PR is never silently statusless. The actual EC2 E2E is
# push/dispatch/cron only (30+ min). This is NOT a fake-green mask of
# the real assertion — it validates the driving script's bash syntax
# and inline-python so a broken test script fails at PR time.
pr-validate:
name: E2E Peer Visibility
runs-on: ubuntu-latest
if: github.event_name == 'pull_request'
timeout-minutes: 5
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Validate driving script
run: |
bash -n tests/e2e/test_peer_visibility_mcp_staging.sh
echo "test_peer_visibility_mcp_staging.sh — bash syntax OK"
echo "Real fresh-provision MCP list_peers E2E runs on push to"
echo "main / workflow_dispatch / daily cron (30+ min EC2 boot)."
# Real gate: provisions a throwaway org + sibling-per-runtime, drives
# the LITERAL list_peers MCP call per runtime, asserts 200 + expected
# peer set, then scoped teardown. push(main)/dispatch/cron only.
peer-visibility:
name: E2E Peer Visibility
runs-on: ubuntu-latest
if: github.event_name != 'pull_request'
timeout-minutes: 60
env:
MOLECULE_CP_URL: https://staging-api.moleculesai.app
MOLECULE_ADMIN_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }}
# LLM provider key so each runtime can authenticate at boot.
# Priority MiniMax → direct-Anthropic → OpenAI matches
# test_staging_full_saas.sh's secrets-injection chain.
E2E_MINIMAX_API_KEY: ${{ secrets.MOLECULE_STAGING_MINIMAX_API_KEY }}
E2E_ANTHROPIC_API_KEY: ${{ secrets.MOLECULE_STAGING_ANTHROPIC_API_KEY }}
E2E_OPENAI_API_KEY: ${{ secrets.MOLECULE_STAGING_OPENAI_API_KEY }}
E2E_RUN_ID: "${{ github.run_id }}-${{ github.run_attempt }}"
PV_RUNTIMES: "hermes openclaw claude-code"
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Verify admin token present
run: |
if [ -z "$MOLECULE_ADMIN_TOKEN" ]; then
echo "::error::CP_STAGING_ADMIN_API_TOKEN secret not set (Railway staging CP_ADMIN_API_TOKEN)"
exit 2
fi
echo "Admin token present"
- name: Verify an LLM key present
run: |
if [ -z "${E2E_MINIMAX_API_KEY:-}" ] && [ -z "${E2E_ANTHROPIC_API_KEY:-}" ] && [ -z "${E2E_OPENAI_API_KEY:-}" ]; then
echo "::error::No LLM provider key set — workspaces fail at boot with 'No provider API key found'. Set MOLECULE_STAGING_MINIMAX_API_KEY (or ANTHROPIC / OPENAI)."
exit 2
fi
echo "LLM key present"
- name: CP staging health preflight
run: |
code=$(curl -sS -o /dev/null -w "%{http_code}" --max-time 10 "$MOLECULE_CP_URL/health")
if [ "$code" != "200" ]; then
echo "::error::Staging CP unhealthy (HTTP $code) — infra, not a workspace bug. Failing loud per feedback_fix_root_not_symptom."
exit 1
fi
echo "Staging CP healthy"
- name: Run fresh-provision peer-visibility E2E (literal MCP list_peers)
run: bash tests/e2e/test_peer_visibility_mcp_staging.sh
# Belt-and-braces scoped teardown: the script installs an EXIT/INT/
# TERM trap, but if the runner itself is cancelled the trap may not
# fire. This always() step deletes ONLY the e2e-pv-<run_id> org this
# run created — never a cluster-wide sweep
# (feedback_never_run_cluster_cleanup_tests_on_live_platform). The
# admin DELETE is idempotent so double-invoking is safe;
# sweep-stale-e2e-orgs is the final net (slug starts with 'e2e-').
- name: Teardown safety net (runs on cancel/failure)
if: always()
env:
ADMIN_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }}
run: |
set +e
orgs=$(curl -sS "$MOLECULE_CP_URL/cp/admin/orgs?limit=500" \
-H "Authorization: Bearer $ADMIN_TOKEN" 2>/dev/null \
| python3 -c "
import json, sys, os, datetime
run_id = os.environ.get('GITHUB_RUN_ID', '')
try:
d = json.load(sys.stdin)
except Exception:
print(''); sys.exit(0)
# ONLY sweep slugs from THIS run. e2e-pv-<YYYYMMDD>-<run_id>-...
# Sweep today AND yesterday's UTC date so a midnight-crossing run
# still matches its own slug (same bug class as the saas/canvas
# safety nets).
today = datetime.date.today()
yest = today - datetime.timedelta(days=1)
dates = (today.strftime('%Y%m%d'), yest.strftime('%Y%m%d'))
if run_id:
prefixes = tuple(f'e2e-pv-{dt}-{run_id}-' for dt in dates)
else:
prefixes = tuple(f'e2e-pv-{dt}-' for dt in dates)
orgs = d if isinstance(d, list) else d.get('orgs', [])
cands = [o['slug'] for o in orgs
if any(o.get('slug','').startswith(p) for p in prefixes)
and o.get('instance_status') not in ('purged',)]
print('\n'.join(cands))
" 2>/dev/null)
for slug in $orgs; do
echo "Safety-net teardown: $slug"
set +e
curl -sS -o /tmp/pv-cleanup.out -w "%{http_code}" \
-X DELETE "$MOLECULE_CP_URL/cp/admin/tenants/$slug" \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"confirm\":\"$slug\"}" >/tmp/pv-cleanup.code
set -e
code=$(cat /tmp/pv-cleanup.code 2>/dev/null || echo "000")
if [ "$code" = "200" ] || [ "$code" = "204" ]; then
echo "[teardown] deleted $slug (HTTP $code)"
else
echo "::warning::pv teardown for $slug returned HTTP $code — sweep-stale-e2e-orgs will catch it within MAX_AGE_MINUTES. Body: $(head -c 300 /tmp/pv-cleanup.out 2>/dev/null)"
fi
done
exit 0
+11 -27
View File
@@ -83,41 +83,25 @@ jobs:
REPO: ${{ github.repository }}
run: |
set -euo pipefail
# Fetch all open PRs and run gate-check on each. This scheduled
# refresher is advisory; a transient Gitea list timeout must not turn
# main red. PR-specific gate-check runs still use normal failure
# semantics.
# Fetch all open PRs and run gate-check on each
# socket.setdefaulttimeout(15): defence-in-depth for missing SOP_TIER_CHECK_TOKEN.
# gate_check.py uses timeout=15 on every urlopen call; this catches the
# inline Python polling loop too (issue #603).
pr_numbers=$(python3 <<'PY'
import json
import os
import socket
import sys
import time
import urllib.error
import urllib.request
socket.setdefaulttimeout(30)
socket.setdefaulttimeout(15)
token = os.environ["GITEA_TOKEN"]
repo = os.environ["REPO"]
url = f"https://git.moleculesai.app/api/v1/repos/{repo}/pulls?state=open&limit=100"
last_error = None
for attempt in range(1, 4):
req = urllib.request.Request(
url,
headers={"Authorization": f"token {token}", "Accept": "application/json"},
)
try:
with urllib.request.urlopen(req, timeout=30) as r:
prs = json.loads(r.read())
break
except (TimeoutError, OSError, urllib.error.URLError, urllib.error.HTTPError) as exc:
last_error = exc
print(f"warning: PR list fetch attempt {attempt}/3 failed: {exc}", file=sys.stderr)
if attempt < 3:
time.sleep(2 * attempt)
else:
print(f"warning: skipped scheduled gate-check refresh; failed to list open PRs after 3 attempts: {last_error}", file=sys.stderr)
raise SystemExit(0)
req = urllib.request.Request(
f"https://git.moleculesai.app/api/v1/repos/{repo}/pulls?state=open&limit=100",
headers={"Authorization": f"token {token}", "Accept": "application/json"},
)
with urllib.request.urlopen(req) as r:
prs = json.loads(r.read())
for pr in prs:
print(pr["number"])
PY
@@ -86,11 +86,7 @@ jobs:
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
# A full-history checkout can exceed the runner's quiet/startup
# window before the path filter emits logs. Fetch the common push
# case cheaply; the script below fetches the exact BASE SHA if it is
# not present in the shallow checkout.
fetch-depth: 2
fetch-depth: 0
- id: filter
# Inline replacement for dorny/paths-filter — see e2e-api.yml.
run: |
@@ -93,7 +93,7 @@ jobs:
lint:
name: lint-continue-on-error-tracking
runs-on: ubuntu-latest
timeout-minutes: 20
timeout-minutes: 10
# Phase 3 (RFC #219 §1): surface masked defects without blocking
# PRs. Pre-existing continue-on-error: true directives on main
# all violate this lint at first — intentional. Flip to false
+7 -11
View File
@@ -49,17 +49,13 @@ jobs:
# bp-exempt: post-merge image publication side effect; CI / all-required gates source changes.
build-and-push:
name: Build & push canvas image
# Dedicated publish/release lane (internal#462 / #394 / #399). Ship
# path (on: push:main, canvas/**) — reserved capacity so a merged
# canvas fix's image build never FIFO-queues behind PR required-CI.
# The `publish` label resolves ONLY to the molecule-runner-publish-*
# sub-pool (config.publish.yaml). HARD DEPENDENCY: this MUST land
# AFTER the publish-lane runners are registered/advertising `publish`
# — the earlier #599 `docker` label attempt queued indefinitely with
# zero eligible runners precisely because the label was targeted
# before any runner advertised it (see #576). The lane is registered
# in this rollout (internal#462) so the precondition holds.
runs-on: publish
# REVERTED (infra/revert-docker-runner-label): `runs-on: ubuntu-latest` restored.
# The `docker` label is not registered on any act_runner. `runs-on: [ubuntu-latest, docker]`
# causes jobs to queue indefinitely with zero eligible runners — strictly worse than the
# pre-#599 coin-flip (50% success rate). Once the `docker` label is registered on
# ≥2 runners, re-apply the fix from #599 (infra/docker-runner-label).
# See issue #576 + infra-lead pulse ~00:30Z.
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
+2 -8
View File
@@ -66,10 +66,7 @@ concurrency:
jobs:
publish:
# Dedicated publish/release lane (internal#462 / #394 / #399). Ship
# path (on: push tag runtime-v*) — reserved capacity, never FIFO
# behind PR-CI. `publish` resolves only to molecule-runner-publish-*.
runs-on: publish
runs-on: ubuntu-latest
outputs:
version: ${{ steps.version.outputs.version }}
wheel_sha256: ${{ steps.wheel_hash.outputs.wheel_sha256 }}
@@ -162,7 +159,6 @@ jobs:
exit 1
fi
python -m twine upload \
--verbose \
--repository pypi \
--username __token__ \
--password "$PYPI_TOKEN" \
@@ -170,9 +166,7 @@ jobs:
cascade:
needs: publish
# Publish/release lane (internal#462) — downstream of the runtime
# publish ship job; keep it on the reserved lane too.
runs-on: publish
runs-on: ubuntu-latest
steps:
- name: Wait for PyPI to propagate the new version
env:
@@ -54,14 +54,7 @@ env:
jobs:
build-and-push:
# Dedicated publish/release lane (internal#462 / #394 / #399). This
# is a post-merge ship job (on: push:main) — it must NOT FIFO-compete
# with PR required-CI on the shared pool (PR#1350's prod image build
# was delayed ~25min this way). The `publish` label resolves ONLY to
# the reserved molecule-runner-publish-* sub-pool (config.publish.yaml,
# OUTSIDE the managed 1..20 range) so a merged fix's image build
# starts immediately while PR-CI keeps the general pool.
runs-on: publish
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
@@ -188,9 +181,7 @@ jobs:
name: Production auto-deploy
needs: build-and-push
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }}
# Publish/release lane (internal#462) — production deploy of a merged
# fix; reserved capacity, never queued behind PR-CI.
runs-on: publish
runs-on: ubuntu-latest
timeout-minutes: 75
env:
CP_URL: ${{ vars.PROD_CP_URL || 'https://api.moleculesai.app' }}
@@ -68,10 +68,7 @@ jobs:
# bp-exempt: production redeploy is a side-effect workflow, not a merge gate.
redeploy:
if: ${{ github.event_name == 'workflow_dispatch' }}
# Dedicated publish/release lane (internal#462 / #394 / #399).
# Production tenant redeploy — a deploy action, reserved capacity so
# it never queues behind PR-CI. `publish` -> molecule-runner-publish-*.
runs-on: publish
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
@@ -75,10 +75,7 @@ env:
jobs:
# bp-exempt: post-merge staging redeploy side effect; CI / all-required gates source changes.
redeploy:
# Dedicated publish/release lane (internal#462 / #394 / #399).
# Post-merge staging redeploy — a deploy action, reserved capacity.
# `publish` -> molecule-runner-publish-* sub-pool.
runs-on: publish
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
@@ -18,10 +18,6 @@ permissions:
pull-requests: read
statuses: write
concurrency:
group: ${{ github.repository }}-${{ github.workflow }}-${{ github.event.issue.number || github.ref }}
cancel-in-progress: true
jobs:
dispatch:
runs-on: ubuntu-latest
+1 -1
View File
@@ -70,7 +70,7 @@ name: sop-checklist
# Cancel any in-progress runs for the same PR to prevent
# stale runs from overwriting newer status contexts.
concurrency:
group: ${{ github.repository }}-${{ github.workflow }}-${{ github.event.pull_request.number || github.event.issue.number || github.ref }}
group: ${{ github.repository }}-${{ github.event.pull_request.number }}
cancel-in-progress: true
# bp-required: yes ← emits sop-checklist / all-items-acked (pull_request)
-4
View File
@@ -61,10 +61,6 @@ on:
pull_request_review:
types: [submitted, dismissed, edited]
concurrency:
group: ${{ github.repository }}-${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
tier-check:
runs-on: ubuntu-latest
+1 -1
View File
@@ -1 +1 @@
staging trigger 2026-05-14T17:35:02Z
staging trigger
-1
View File
@@ -1 +0,0 @@
trigger
+1 -1
View File
@@ -8,7 +8,7 @@ export default defineConfig({
workers: 1,
retries: 0,
use: {
baseURL: process.env.PLAYWRIGHT_BASE_URL || "http://localhost:3000",
baseURL: "http://localhost:3000",
headless: true,
screenshot: "only-on-failure",
},
@@ -24,12 +24,8 @@ vi.mock("@/lib/theme-provider", () => ({
})),
}));
// Wrap cleanup in act() so any pending React state updates (e.g. from
// keyDown handlers that call setTheme) flush before DOM unmount. Without
// this, cleanup() can race against pending renders and cause INDEX_SIZE_ERR
// when the handleKeyDown callback tries to query the DOM mid-teardown.
afterEach(() => {
act(() => { cleanup(); });
cleanup();
vi.clearAllMocks();
});
@@ -150,7 +146,7 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
const radios = screen.getAllByRole("radio");
// dark (index 2) is current; ArrowRight should wrap to light (index 0)
act(() => { radios[2].focus(); });
act(() => { fireEvent.keyDown(radios[2], { key: "ArrowRight" }); });
fireEvent.keyDown(radios[2], { key: "ArrowRight" });
expect(mockSetTheme).toHaveBeenCalledWith("light");
});
@@ -164,7 +160,7 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
const radios = screen.getAllByRole("radio");
// light (index 0) is current; ArrowLeft should go to dark (index 2)
act(() => { radios[0].focus(); });
act(() => { fireEvent.keyDown(radios[0], { key: "ArrowLeft" }); });
fireEvent.keyDown(radios[0], { key: "ArrowLeft" });
expect(mockSetTheme).toHaveBeenCalledWith("dark");
});
@@ -178,7 +174,7 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
const radios = screen.getAllByRole("radio");
// light (index 0) is current; ArrowDown should go to system (index 1)
act(() => { radios[0].focus(); });
act(() => { fireEvent.keyDown(radios[0], { key: "ArrowDown" }); });
fireEvent.keyDown(radios[0], { key: "ArrowDown" });
expect(mockSetTheme).toHaveBeenCalledWith("system");
});
@@ -191,7 +187,7 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
render(<ThemeToggle />);
const radios = screen.getAllByRole("radio");
act(() => { radios[2].focus(); });
act(() => { fireEvent.keyDown(radios[2], { key: "Home" }); });
fireEvent.keyDown(radios[2], { key: "Home" });
expect(mockSetTheme).toHaveBeenCalledWith("light");
});
@@ -204,14 +200,14 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
render(<ThemeToggle />);
const radios = screen.getAllByRole("radio");
act(() => { radios[0].focus(); });
act(() => { fireEvent.keyDown(radios[0], { key: "End" }); });
fireEvent.keyDown(radios[0], { key: "End" });
expect(mockSetTheme).toHaveBeenCalledWith("dark");
});
it("does nothing on unrelated keys", () => {
render(<ThemeToggle />);
const radios = screen.getAllByRole("radio");
act(() => { fireEvent.keyDown(radios[0], { key: "Enter" }); });
fireEvent.keyDown(radios[0], { key: "Enter" });
expect(mockSetTheme).not.toHaveBeenCalled();
});
});
+17 -64
View File
@@ -2,11 +2,8 @@
// 04 · Chat — message thread + composer + sub-tabs.
// Wired to the same /workspaces/:id/a2a (method message/send) endpoint
// that the desktop ChatTab uses. Render parity with desktop ChatTab is
// achieved by reusing its renderers rather than forking a reduced
// mobile path: the Agent Comms sub-tab mounts the same AgentCommsPanel,
// and message attachments route through the same AttachmentPreview
// dispatch the desktop My-Chat bubble uses (#231/#232).
// that the desktop ChatTab uses, but with a slimmer surface: no
// attachments, no A2A topology overlay, no conversation tracing.
import { useEffect, useMemo, useRef, useState } from "react";
import ReactMarkdown from "react-markdown";
@@ -19,9 +16,6 @@ import {
useChatSend,
useChatSocket,
} from "@/components/tabs/chat/hooks";
import { AgentCommsPanel } from "@/components/tabs/chat/AgentCommsPanel";
import { AttachmentPreview } from "@/components/tabs/chat/AttachmentPreview";
import { downloadChatFile } from "@/components/tabs/chat/uploads";
import { toMobileAgent } from "./components";
import { MOBILE_FONT_MONO, MOBILE_FONT_SANS, usePalette } from "./palette";
@@ -310,17 +304,6 @@ export function MobileChat({
const removePendingFile = (index: number) =>
setPendingFiles((prev) => prev.filter((_, i) => i !== index));
// Route attachment downloads through the same authenticated helper
// the desktop ChatTab uses (downloadChatFile) so platform-scheme
// URIs get a real Blob with auth headers instead of about:blank.
const downloadAttachment = (att: ChatAttachment) => {
downloadChatFile(agentId, att).catch(() => {
// AttachmentPreview's own error affordance covers the in-bubble
// failure state; matches ChatTab's behaviour of not double-
// reporting a download failure.
});
};
const send = async () => {
const text = draft.trim();
if ((!text && pendingFiles.length === 0) || sending || !reachable) return;
@@ -450,19 +433,7 @@ export function MobileChat({
</div>
</div>
{/* Agent Comms — reuse the desktop AgentCommsPanel verbatim so
mobile renders the identical peer/A2A + delegation feed
(history GET + live socket events) instead of a placeholder
(#231). The panel owns its own scroll/load/error/empty
states, matching ChatTab's agent-comms tabpanel. */}
{tab === "a2a" && (
<div style={{ flex: 1, minHeight: 0, overflow: "hidden" }}>
<AgentCommsPanel workspaceId={agentId} />
</div>
)}
{/* Messages */}
{tab === "my" && (
<div
ref={scrollRef}
style={{
@@ -474,6 +445,18 @@ export function MobileChat({
gap: 8,
}}
>
{tab === "a2a" && (
<div
style={{
padding: "20px 4px",
textAlign: "center",
color: p.text3,
fontSize: 13,
}}
>
Agent Comms peer-to-peer A2A traffic surfaces in the Comms tab.
</div>
)}
{tab === "my" && historyLoading && (
<div style={{ padding: "20px 4px", textAlign: "center", color: p.text3, fontSize: 13 }}>
Loading chat history
@@ -538,31 +521,9 @@ export function MobileChat({
overflowWrap: "anywhere",
}}
>
{m.content && (
<MarkdownBubble dark={dark} accent={p.accent}>
{m.content}
</MarkdownBubble>
)}
{m.attachments && m.attachments.length > 0 && (
<div
style={{
display: "flex",
flexWrap: "wrap",
gap: 4,
marginTop: m.content ? 6 : 0,
}}
>
{m.attachments.map((att, i) => (
<AttachmentPreview
key={`${m.id}-${i}`}
workspaceId={agentId}
attachment={att}
onDownload={downloadAttachment}
tone={mine ? "user" : "agent"}
/>
))}
</div>
)}
<MarkdownBubble dark={dark} accent={p.accent}>
{m.content}
</MarkdownBubble>
<div
style={{
fontSize: 10,
@@ -593,13 +554,7 @@ export function MobileChat({
</div>
)}
</div>
)}
{/* Footer ID + composer belong to My Chat only. The Agent Comms
tab is a read-only peer/A2A feed (parity with desktop
ChatTab, where the agent-comms tabpanel has no composer). */}
{tab === "my" && (
<>
{/* Footer ID */}
<div
style={{
@@ -791,8 +746,6 @@ export function MobileChat({
</button>
</div>
</div>
</>
)}
</div>
);
}
+4 -6
View File
@@ -12,7 +12,6 @@ import { useEffect, useState } from "react";
import { api } from "@/lib/api";
import { type Template } from "@/lib/deploy-preflight";
import { isSaaSTenant } from "@/lib/tenant";
import { tierCode } from "./palette";
import { MOBILE_FONT_MONO, MOBILE_FONT_SANS, type MobilePalette, usePalette } from "./palette";
@@ -27,7 +26,6 @@ const TIER_LABEL: Record<"T1" | "T2" | "T3" | "T4", string> = {
export function MobileSpawn({ dark, onClose }: { dark: boolean; onClose: () => void }) {
const p = usePalette(dark);
const isSaaS = isSaaSTenant();
const [templates, setTemplates] = useState<Template[]>([]);
const [loadingTemplates, setLoadingTemplates] = useState(true);
const [tplId, setTplId] = useState<string | null>(null);
@@ -45,7 +43,7 @@ export function MobileSpawn({ dark, onClose }: { dark: boolean; onClose: () => v
setTemplates(list);
if (list.length > 0) {
setTplId(list[0].id);
setTier(isSaaS ? "T4" : tierCode(list[0].tier));
setTier(tierCode(list[0].tier));
}
})
.catch(() => {
@@ -57,7 +55,7 @@ export function MobileSpawn({ dark, onClose }: { dark: boolean; onClose: () => v
return () => {
cancelled = true;
};
}, [isSaaS]);
}, []);
const handleSpawn = async () => {
if (busy || !tplId) return;
@@ -69,7 +67,7 @@ export function MobileSpawn({ dark, onClose }: { dark: boolean; onClose: () => v
await api.post<{ id: string }>("/workspaces", {
name: (name.trim() || chosen.name),
template: chosen.id,
tier: isSaaS ? 4 : Number(tier.slice(1)),
tier: Number(tier.slice(1)),
canvas: {
x: Math.random() * 400 + 100,
y: Math.random() * 300 + 100,
@@ -205,7 +203,7 @@ export function MobileSpawn({ dark, onClose }: { dark: boolean; onClose: () => v
>
{templates.map((t) => {
const on = tplId === t.id;
const tCode = isSaaS ? "T4" : tierCode(t.tier);
const tCode = tierCode(t.tier);
return (
<button
key={t.id}
@@ -21,14 +21,6 @@ import { MobileChat } from "../MobileChat";
vi.mock("@/lib/api");
import { api } from "@/lib/api";
// AgentCommsPanel (mounted by the Agent Comms sub-tab, #231) subscribes
// to the global socket via useSocketEvent. Stub it to a no-op so the
// panel mounts without the real ReconnectingSocket — the parity tests
// only assert the panel renders (vs the old static placeholder).
vi.mock("@/hooks/useSocketEvent", () => ({
useSocketEvent: vi.fn(),
}));
// ─── Mock store ───────────────────────────────────────────────────────────────
const mockAgentId = "ws-chat-test";
@@ -163,12 +155,6 @@ beforeEach(() => {
mockOnBack.mockClear();
mockStoreState.nodes = [];
mockStoreState.agentMessages = {};
// jsdom doesn't implement scrollIntoView. The Agent Comms tab now
// mounts AgentCommsPanel (#231), which scrolls its feed to bottom on
// arrival; a no-op stub keeps the panel from throwing under jsdom
// (same stub AgentCommsPanel's own render test installs).
Element.prototype.scrollIntoView =
vi.fn() as unknown as Element["scrollIntoView"];
// Set up spies on the real api methods. Tests override these per-call.
const getSpy = vi.spyOn(api, "get");
const postSpy = vi.spyOn(api, "post");
@@ -488,146 +474,3 @@ describe("MobileChat — chat history", () => {
expect(getSpy).toHaveBeenCalledTimes(2);
});
});
// ─── #232 · Attachment render parity with desktop ChatTab ────────────────────
//
// Regression for the CTO-reported mobile bug: MobileChat used to render
// only m.content (no attachment surface), so files sent/received in a
// conversation were invisible on mobile while desktop showed them. The
// fix routes m.attachments through the same AttachmentPreview the
// desktop ChatTab bubble uses.
describe("MobileChat — attachment render parity (#232)", () => {
beforeEach(() => {
mockStoreState.nodes = [onlineNode];
});
it("renders an attachment from a history message via AttachmentPreview", async () => {
const getSpy = vi.spyOn(api, "get");
// useChatHistory reads { messages, reached_end }.
getSpy.mockResolvedValueOnce({
messages: [
{
id: "m-att-1",
role: "agent",
content: "Here is the report",
attachments: [
{
name: "report.csv",
uri: "workspace://out/report.csv",
mimeType: "text/csv",
size: 2048,
},
],
timestamp: new Date().toISOString(),
},
],
reached_end: true,
});
let rr: ReturnType<typeof renderChat>;
await act(async () => {
rr = renderChat(mockAgentId);
});
const { container } = rr!;
// A non-image attachment renders the AttachmentChip download button
// with title="Download <name>" — same component the desktop bubble
// dispatches through AttachmentPreview.
await waitFor(() => {
const chip = container.querySelector('[title="Download report.csv"]');
expect(chip).toBeTruthy();
});
expect(container.textContent ?? "").toContain("report.csv");
});
});
// ─── #231 · Agent Comms (A2A/peer) render parity with desktop ChatTab ────────
//
// Regression for the CTO-reported mobile bug: the Agent Comms sub-tab
// rendered a static placeholder string ("peer-to-peer A2A traffic
// surfaces in the Comms tab") instead of the real feed. The fix mounts
// the same AgentCommsPanel the desktop ChatTab agent-comms tabpanel
// uses, so peer/A2A + delegation activity is visible on mobile.
describe("MobileChat — Agent Comms render parity (#231)", () => {
beforeEach(() => {
mockStoreState.nodes = [onlineNode];
});
it("mounts AgentCommsPanel on the Agent Comms tab (not the old placeholder)", async () => {
const getSpy = vi.spyOn(api, "get");
// 1st GET: useChatHistory (My Chat) on mount.
getSpy.mockResolvedValueOnce({ messages: [], reached_end: true });
// 2nd GET: AgentCommsPanel's activity load when the tab is shown.
// Empty list → panel renders its own empty state, which still
// proves AgentCommsPanel mounted (vs. the removed placeholder).
getSpy.mockResolvedValueOnce([]);
let rr: ReturnType<typeof renderChat>;
await act(async () => {
rr = renderChat(mockAgentId);
});
const { container } = rr!;
const commsTab = Array.from(container.querySelectorAll("button")).find(
(b) => b.textContent?.trim() === "Agent Comms",
);
expect(commsTab).toBeTruthy();
await act(async () => {
commsTab!.click();
});
await waitFor(() => {
const text = container.textContent ?? "";
// The panel's empty state — proves AgentCommsPanel mounted.
expect(text).toContain("No agent-to-agent communications yet.");
});
// The old hard-coded placeholder must be gone.
expect(container.textContent ?? "").not.toContain(
"peer-to-peer A2A traffic surfaces in the Comms tab",
);
// The panel hit its activity endpoint.
expect(getSpy).toHaveBeenCalledWith(
expect.stringContaining(`/workspaces/${mockAgentId}/activity`),
);
});
it("renders a peer message on the Agent Comms tab", async () => {
const getSpy = vi.spyOn(api, "get");
getSpy.mockResolvedValueOnce({ messages: [], reached_end: true });
// a2a_receive from a peer → AgentCommsPanel.toCommMessage maps it
// to an inbound bubble with the request text.
getSpy.mockResolvedValueOnce([
{
id: "act-1",
activity_type: "a2a_receive",
source_id: "peer-ws-uuid",
target_id: mockAgentId,
method: "message/send",
summary: "peer asked something",
request_body: { task: "Please review PR 42" },
response_body: null,
status: "ok",
created_at: new Date().toISOString(),
},
]);
let rr: ReturnType<typeof renderChat>;
await act(async () => {
rr = renderChat(mockAgentId);
});
const { container } = rr!;
const commsTab = Array.from(container.querySelectorAll("button")).find(
(b) => b.textContent?.trim() === "Agent Comms",
);
await act(async () => {
commsTab!.click();
});
await waitFor(() => {
expect(container.textContent ?? "").toContain("Please review PR 42");
});
});
});
@@ -1,102 +0,0 @@
import { describe, it, expect, beforeEach } from "vitest";
import { useCanvasStore } from "@/store/canvas";
import { resolveWorkspaceName } from "../hooks/resolveWorkspaceName";
beforeEach(() => {
// Reset store to a clean slate between tests so node lookup is deterministic.
useCanvasStore.setState({ nodes: [] });
});
describe("resolveWorkspaceName", () => {
it("returns the workspace name when a node with that ID exists", () => {
useCanvasStore.setState({
nodes: [
{
id: "ws-alpha-001",
type: "workspace",
data: { name: "Alpha Agent" },
position: { x: 0, y: 0 },
},
],
});
expect(resolveWorkspaceName("ws-alpha-001")).toBe("Alpha Agent");
});
it("falls back to the first 8 chars of the ID when no matching node exists", () => {
expect(resolveWorkspaceName("ws-zzz-not-found")).toBe("ws-zzz-n");
});
it("falls back to the first 8 chars when the node exists but has no name", () => {
useCanvasStore.setState({
nodes: [
{
id: "ws-no-name",
type: "workspace",
// data.name is deliberately absent
data: {},
position: { x: 0, y: 0 },
},
],
});
expect(resolveWorkspaceName("ws-no-name")).toBe("ws-no-na");
});
it("returns the first 8 chars for a very short ID", () => {
expect(resolveWorkspaceName("ab")).toBe("ab");
});
it("returns the first 8 chars when the ID is exactly 8 characters", () => {
// slice(0,8) of an 8-char string is the full string
const id = "12345678";
expect(resolveWorkspaceName(id)).toBe(id);
});
it("picks the right node when multiple workspaces share a prefix", () => {
useCanvasStore.setState({
nodes: [
{
id: "00000000-0000-0000-0000-000000000001",
type: "workspace",
data: { name: "Backend Agent" },
position: { x: 0, y: 0 },
},
{
id: "00000000-0000-0000-0000-000000000002",
type: "workspace",
data: { name: "Frontend Agent" },
position: { x: 100, y: 0 },
},
],
});
expect(resolveWorkspaceName("00000000-0000-0000-0000-000000000002")).toBe(
"Frontend Agent"
);
expect(resolveWorkspaceName("00000000-0000-0000-0000-000000000001")).toBe(
"Backend Agent"
);
});
it("does not mutate store state between calls", () => {
useCanvasStore.setState({
nodes: [
{
id: "stable-id",
type: "workspace",
data: { name: "Stable Workspace" },
position: { x: 0, y: 0 },
},
],
});
resolveWorkspaceName("stable-id");
resolveWorkspaceName("unknown-id");
// Store nodes must be unchanged — resolveWorkspaceName is read-only.
const nodes = useCanvasStore.getState().nodes;
expect(nodes).toHaveLength(1);
expect((nodes[0] as { id: string }).id).toBe("stable-id");
});
});
@@ -0,0 +1,209 @@
// @vitest-environment jsdom
/**
* Tests for useChatSend — the canvas user→agent send hook.
*
* Behavioural focus: the poll-mode ("queued") path. When the target
* workspace is an external / MCP-registered agent (delivery_mode=poll,
* e.g. an operator laptop running the molecule MCP channel), the
* platform's POST /workspaces/:id/a2a returns a synthetic
* {status:"queued", delivery_mode:"poll"} envelope IMMEDIATELY with no
* reply — the real reply arrives later over the AGENT_MESSAGE
* WebSocket push.
*
* Pre-fix the hook treated that synthetic envelope as a terminal
* response and called releaseSendGuards() → `sending` went false the
* instant the POST returned → the "agent is working" indicator
* vanished and the external turn looked dead. This suite pins the
* fixed contract:
*
* - a real reply still clears `sending` (regression guard)
* - a poll "queued" envelope KEEPS `sending` true (no terminal
* clear) so the existing thinking indicator persists
* - the eventual reply path (releaseSendGuards, the same call the
* AGENT_MESSAGE WS push makes via useChatSocket) clears it
* - an offline poll agent that never replies eventually surfaces an
* honest error instead of an infinite spinner
*
* Plus pure-function coverage for the poll-envelope detector.
*
* Root cause: workspace-server a2a_proxy.go:402 poll-mode
* short-circuit returns {status:"queued"} synchronously.
*/
import {
describe,
it,
expect,
vi,
beforeEach,
afterEach,
type Mock,
} from "vitest";
import { act, renderHook, cleanup } from "@testing-library/react";
const { mockApiPost } = vi.hoisted(() => ({ mockApiPost: vi.fn() }));
vi.mock("@/lib/api", () => ({
api: { post: mockApiPost },
}));
vi.mock("../uploads", () => ({
uploadChatFiles: vi.fn(),
}));
// Import AFTER mocks.
import {
useChatSend,
isPollQueuedResponse,
extractReplyText,
POLL_QUEUED_REPLY_TIMEOUT_MS,
} from "../useChatSend";
const flush = () => act(async () => { await Promise.resolve(); });
describe("isPollQueuedResponse", () => {
it("is true only for the synthetic poll-mode queued envelope", () => {
expect(isPollQueuedResponse({ status: "queued", delivery_mode: "poll" })).toBe(true);
});
it("is false for a real agent reply", () => {
expect(
isPollQueuedResponse({ result: { parts: [{ kind: "text", text: "hi" }] } }),
).toBe(false);
});
it("is false for null / undefined / partial shapes", () => {
expect(isPollQueuedResponse(null)).toBe(false);
expect(isPollQueuedResponse(undefined)).toBe(false);
// status=queued without delivery_mode=poll is NOT the poll envelope
// — don't accidentally swallow a real reply that happens to carry
// an unrelated status field.
expect(isPollQueuedResponse({ status: "queued" })).toBe(false);
expect(isPollQueuedResponse({ delivery_mode: "poll" })).toBe(false);
});
});
describe("extractReplyText (regression guard — unchanged by fix)", () => {
it("collects text parts from result", () => {
expect(
extractReplyText({ result: { parts: [{ kind: "text", text: "hello" }] } }),
).toBe("hello");
});
it("returns empty for the poll-queued envelope", () => {
expect(extractReplyText({ status: "queued", delivery_mode: "poll" })).toBe("");
});
});
describe("useChatSend — poll-mode in-progress state", () => {
beforeEach(() => {
vi.useFakeTimers();
mockApiPost.mockReset();
});
afterEach(() => {
vi.runOnlyPendingTimers();
vi.useRealTimers();
cleanup();
});
const setup = () => {
const onUserMessage = vi.fn();
const onAgentMessage = vi.fn();
const { result } = renderHook(() =>
useChatSend("ws-ext-1", {
getHistoryMessages: () => [],
onUserMessage,
onAgentMessage,
}),
);
return { result, onUserMessage, onAgentMessage };
};
it("a real reply clears `sending` (regression guard)", async () => {
mockApiPost.mockResolvedValue({
result: { parts: [{ kind: "text", text: "real reply" }] },
});
const { result, onAgentMessage } = setup();
await act(async () => {
void result.current.sendMessage("hi");
});
await flush();
expect(onAgentMessage).toHaveBeenCalledTimes(1);
expect(result.current.sending).toBe(false);
});
it("keeps `sending` true on a poll 'queued' envelope (no terminal clear)", async () => {
mockApiPost.mockResolvedValue({ status: "queued", delivery_mode: "poll" });
const { result, onAgentMessage } = setup();
await act(async () => {
void result.current.sendMessage("hi external agent");
});
await flush();
// The POST resolved, but it was only a queued ack — the indicator
// must stay up and no agent bubble should be rendered yet.
expect(result.current.sending).toBe(true);
expect(onAgentMessage).not.toHaveBeenCalled();
expect(result.current.error).toBeNull();
});
it("releaseSendGuards (the AGENT_MESSAGE-push path) clears the poll in-progress state", async () => {
mockApiPost.mockResolvedValue({ status: "queued", delivery_mode: "poll" });
const { result } = setup();
await act(async () => {
void result.current.sendMessage("hi");
});
await flush();
expect(result.current.sending).toBe(true);
// Simulate the terminal AGENT_MESSAGE WebSocket push arriving:
// useChatSocket's onAgentMessage / onSendComplete call
// releaseSendGuards. That must clear the in-progress state AND the
// safety timer (asserted by the next test).
act(() => {
result.current.releaseSendGuards();
});
expect(result.current.sending).toBe(false);
});
it("surfaces an honest error if a poll agent never replies (safety timeout)", async () => {
mockApiPost.mockResolvedValue({ status: "queued", delivery_mode: "poll" });
const { result } = setup();
await act(async () => {
void result.current.sendMessage("hi");
});
await flush();
expect(result.current.sending).toBe(true);
act(() => {
vi.advanceTimersByTime(POLL_QUEUED_REPLY_TIMEOUT_MS + 1000);
});
expect(result.current.sending).toBe(false);
expect(result.current.error).toMatch(/queued/i);
});
it("does NOT fire the safety error when the reply arrives before timeout", async () => {
mockApiPost.mockResolvedValue({ status: "queued", delivery_mode: "poll" });
const { result } = setup();
await act(async () => {
void result.current.sendMessage("hi");
});
await flush();
// Reply arrives (releaseSendGuards) well before the timeout.
act(() => {
result.current.releaseSendGuards();
});
act(() => {
vi.advanceTimersByTime(POLL_QUEUED_REPLY_TIMEOUT_MS + 1000);
});
expect(result.current.error).toBeNull();
expect(result.current.sending).toBe(false);
});
});
@@ -1,6 +1,6 @@
"use client";
import { useCallback, useRef, useState } from "react";
import { useCallback, useEffect, useRef, useState } from "react";
import { api } from "@/lib/api";
import { uploadChatFiles } from "../uploads";
import { createMessage, type ChatMessage, type ChatAttachment } from "../types";
@@ -22,8 +22,42 @@ interface A2AResponse {
parts?: A2APart[];
artifacts?: Array<{ parts: A2APart[] }>;
};
/** Synthetic poll-mode envelope. The platform returns this
* immediately (HTTP 200) when the target workspace is registered
* delivery_mode=poll — an external / MCP-registered agent with no
* public URL (e.g. an operator's laptop running the molecule MCP
* channel). The request has only been QUEUED into activity_logs;
* the agent will pick it up on its next poll and the real reply
* arrives asynchronously over the AGENT_MESSAGE WebSocket push
* (consumed by useChatSocket). See workspace-server
* a2a_proxy.go:402 (poll-mode short-circuit) and
* a2a_proxy_helpers.go:516 (logA2AReceiveQueued). */
status?: string;
delivery_mode?: string;
}
/** True when `resp` is the platform's synthetic poll-mode "queued"
* envelope rather than a real agent reply. For these the send is
* acknowledged-but-pending: the user's message landed and the agent
* is working, but there is no reply yet — the terminal AGENT_MESSAGE
* push will arrive later over the WebSocket. Treating this as a
* terminal response (the pre-fix behaviour) cleared the "agent is
* working" indicator the instant the POST returned, so an external
* workspace turn looked dead even though work had not started. */
export function isPollQueuedResponse(resp: A2AResponse | null | undefined): boolean {
return !!resp && resp.status === "queued" && resp.delivery_mode === "poll";
}
/** Hard ceiling on how long the "agent is working" indicator stays up
* for a poll-mode turn with no reply. The terminal AGENT_MESSAGE push
* normally clears it well before this. The cap exists so a poll-mode
* workspace that is offline / never consumes its queue doesn't pin a
* spinner forever — at which point we surface an honest, actionable
* error instead of an opaque dead spinner. Generous because poll
* agents (an operator laptop) can legitimately take minutes to wake,
* poll, and respond; the goal is "eventually honest", not fail-fast. */
export const POLL_QUEUED_REPLY_TIMEOUT_MS = 15 * 60 * 1000;
export function extractReplyText(resp: A2AResponse): string {
const collect = (parts: A2APart[] | undefined): string => {
if (!parts) return "";
@@ -59,14 +93,29 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
const sendInFlightRef = useRef(false);
const sendingFromAPIRef = useRef(false);
const sendTokenRef = useRef(0);
// Safety-net timer armed only for poll-mode ("queued") turns: the
// POST returns immediately with no reply, so the normal
// POST-resolves-→-clear-spinner path can't drive the indicator. The
// terminal AGENT_MESSAGE WebSocket push clears it via
// releaseSendGuards (which also clears this timer); the timer is the
// backstop for an offline poll agent that never consumes its queue.
const pollTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const optionsRef = useRef(options);
optionsRef.current = options;
const clearPollTimeout = useCallback(() => {
if (pollTimeoutRef.current !== null) {
clearTimeout(pollTimeoutRef.current);
pollTimeoutRef.current = null;
}
}, []);
const releaseSendGuards = useCallback(() => {
clearPollTimeout();
setSending(false);
sendingFromAPIRef.current = false;
sendInFlightRef.current = false;
}, []);
}, [clearPollTimeout]);
const clearError = useCallback(() => setError(null), []);
@@ -146,6 +195,33 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
sendInFlightRef.current = false;
return;
}
// Poll-mode ("queued") turn: the message landed and the
// external/MCP agent will pick it up on its next poll, but
// there is NO reply in this response. Pre-fix this fell
// through to releaseSendGuards() below and the "agent is
// working" indicator vanished the instant the POST returned —
// an external-workspace turn looked dead even though work had
// not started. Instead, keep `sending` true so the existing
// thinking indicator (the same one internal agents use)
// persists as a "received — agent is working" state; the
// terminal AGENT_MESSAGE WebSocket push (consumed by
// useChatSocket → onAgentMessage / onSendComplete →
// releaseSendGuards) clears it when the real reply arrives,
// exactly the path an internal async reply already uses.
if (isPollQueuedResponse(resp)) {
clearPollTimeout();
pollTimeoutRef.current = setTimeout(() => {
if (sendTokenRef.current !== myToken) return;
if (!sendingFromAPIRef.current) return;
releaseSendGuards();
setError(
"No response yet from this agent — it may be offline or " +
"busy. Your message was delivered and is queued; the " +
"reply will appear here if the agent picks it up.",
);
}, POLL_QUEUED_REPLY_TIMEOUT_MS);
return;
}
const replyText = extractReplyText(resp);
const replyFiles = extractFilesFromTask(
(resp?.result ?? {}) as Record<string, unknown>,
@@ -167,9 +243,15 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
setError("Failed to send message — agent may be unreachable");
});
},
[workspaceId, sending, uploading],
[workspaceId, sending, uploading, clearPollTimeout],
);
// Drop the poll-mode safety timer on unmount / workspace switch so a
// stale timeout can't fire setError against a panel the user has
// already navigated away from. sendTokenRef guards correctness if it
// ever did fire; this just avoids the wasted timer + setState churn.
useEffect(() => clearPollTimeout, [clearPollTimeout]);
return {
sending,
uploading,
+1 -2
View File
@@ -8,7 +8,6 @@ import {
type PreflightResult,
type Template,
} from "@/lib/deploy-preflight";
import { isSaaSTenant } from "@/lib/tenant";
import { MissingKeysModal } from "@/components/MissingKeysModal";
/**
@@ -106,7 +105,7 @@ export function useTemplateDeploy(
const ws = await api.post<{ id: string }>("/workspaces", {
name: template.name,
template: template.id,
tier: isSaaSTenant() ? 4 : template.tier,
tier: template.tier,
canvas: coords,
...(model ? { model } : {}),
});
+4 -1
View File
@@ -30,7 +30,10 @@
{"name": "openclaw", "repo": "molecule-ai/molecule-ai-workspace-template-openclaw", "ref": "main"},
{"name": "codex", "repo": "molecule-ai/molecule-ai-workspace-template-codex", "ref": "main"},
{"name": "langgraph", "repo": "molecule-ai/molecule-ai-workspace-template-langgraph", "ref": "main"},
{"name": "autogen", "repo": "molecule-ai/molecule-ai-workspace-template-autogen", "ref": "main"}
{"name": "crewai", "repo": "molecule-ai/molecule-ai-workspace-template-crewai", "ref": "main"},
{"name": "autogen", "repo": "molecule-ai/molecule-ai-workspace-template-autogen", "ref": "main"},
{"name": "deepagents", "repo": "molecule-ai/molecule-ai-workspace-template-deepagents", "ref": "main"},
{"name": "gemini-cli", "repo": "molecule-ai/molecule-ai-workspace-template-gemini-cli", "ref": "main"}
],
"org_templates": [
{"name": "molecule-dev", "repo": "molecule-ai/molecule-ai-org-template-molecule-dev", "ref": "main"},
@@ -1,376 +0,0 @@
#!/usr/bin/env bash
# Staging E2E — fresh-provision peer-visibility gate via the LITERAL MCP path.
#
# WHY THIS EXISTS
# ---------------
# Hermes and OpenClaw were repeatedly reported "fleet-verified / cascade-
# complete" because the *proxy* signals were green:
# - registry-registration + heartbeat (Hermes), and
# - model round-trip 200 (OpenClaw).
# But a freshly-provisioned workspace, asked on canvas "can you see your
# peers", actually FAILS:
# - Hermes: 401 on the molecule MCP `list_peers` call,
# - OpenClaw: falls back to native `sessions_list`, sees no platform peers.
# Tasks #142/#159 were even marked "completed" under this same proxy flaw.
#
# This script codifies the LITERAL user-facing path so it can never silently
# regress: it provisions a brand-new throwaway org + sibling workspaces via
# the real control-plane provisioning path, then for each runtime that should
# have platform peer-visibility it drives the EXACT MCP call the canvas agent
# makes — `POST /workspaces/:id/mcp` JSON-RPC tools/call name=list_peers,
# authenticated by that workspace's own bearer token through the real
# WorkspaceAuth + MCPRateLimiter middleware chain. It then asserts:
# (1) HTTP 200,
# (2) JSON-RPC `result` present (NOT an `error` object — a -32000
# "tool call failed" or a 401 from WorkspaceAuth fails here),
# (3) the returned peer set CONTAINS the other provisioned sibling
# workspace IDs — not an empty list, not a native-sessions fallback.
#
# This is NOT a proxy. It does not look at a registry row, /health, the
# heartbeat table, or `GET /registry/:id/peers`. It drives the byte-for-byte
# JSON-RPC envelope that mcp_molecule_list_peers issues from a real agent.
#
# It is written to FAIL on today's broken Hermes/OpenClaw behavior and go
# green only when the in-flight root-cause fixes (Hermes-401, OpenClaw MCP
# wiring) actually land. That is the point: it is the objective proof gate.
#
# AUTH MODEL (mirrors tests/e2e/test_staging_full_saas.sh)
# --------------------------------------------------------
# Single MOLECULE_ADMIN_TOKEN (= CP_ADMIN_API_TOKEN on Railway staging)
# drives: POST /cp/admin/orgs (provision), GET
# /cp/admin/orgs/:slug/admin-token (per-tenant token), DELETE
# /cp/admin/tenants/:slug (teardown). The per-tenant admin token drives
# tenant workspace creation; each workspace's OWN auth_token (returned by
# POST /workspaces) drives its MCP call.
#
# Required env:
# MOLECULE_ADMIN_TOKEN CP admin bearer — Railway staging CP_ADMIN_API_TOKEN
# Optional env:
# MOLECULE_CP_URL default https://staging-api.moleculesai.app
# E2E_RUN_ID slug suffix; CI passes ${GITHUB_RUN_ID}
# PV_RUNTIMES space list; default "hermes openclaw claude-code"
# E2E_PROVISION_TIMEOUT_SECS default 1800 (hermes/openclaw cold EC2 budget)
# E2E_MINIMAX_API_KEY / E2E_ANTHROPIC_API_KEY / E2E_OPENAI_API_KEY
# LLM provider key injected so the runtime can boot
# E2E_KEEP_ORG 1 → skip teardown (local debugging only)
#
# Exit codes:
# 0 every runtime saw its peers via the literal MCP call
# 1 generic failure
# 2 missing required env
# 3 provisioning timed out
# 4 teardown left orphan resources
# 10 peer-visibility regression reproduced (the gate firing as designed)
set -uo pipefail
CP_URL="${MOLECULE_CP_URL:-https://staging-api.moleculesai.app}"
ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:?MOLECULE_ADMIN_TOKEN required — Railway staging CP_ADMIN_API_TOKEN}"
RUN_ID_SUFFIX="${E2E_RUN_ID:-$(date +%H%M%S)-$$}"
PV_RUNTIMES="${PV_RUNTIMES:-hermes openclaw claude-code}"
PROVISION_TIMEOUT_SECS="${E2E_PROVISION_TIMEOUT_SECS:-1800}"
# Slug MUST start with 'e2e-' so the sweep-stale-e2e-orgs safety net
# (EPHEMERAL_PREFIXES) catches any leak this run fails to tear down.
SLUG="e2e-pv-$(date +%Y%m%d)-${RUN_ID_SUFFIX}"
SLUG=$(echo "$SLUG" | tr '[:upper:]' '[:lower:]' | tr -cd 'a-z0-9-' | head -c 32)
ORG_ID=""
TENANT_URL=""
TENANT_TOKEN=""
log() { echo "[$(date +%H:%M:%S)] $*"; }
fail() { echo "[$(date +%H:%M:%S)] ❌ $*" >&2; exit 1; }
ok() { echo "[$(date +%H:%M:%S)] ✅ $*"; }
admin_call() {
local method="$1" path="$2"; shift 2
curl -sS -X "$method" "$CP_URL$path" \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" "$@"
}
tenant_call() {
local method="$1" path="$2"; shift 2
curl -sS -X "$method" "$TENANT_URL$path" \
-H "Authorization: Bearer $TENANT_TOKEN" \
-H "X-Molecule-Org-Id: $ORG_ID" \
-H "Content-Type: application/json" "$@"
}
# ─── Scoped teardown ───────────────────────────────────────────────────
# Deletes ONLY the org this run created (DELETE /cp/admin/tenants/$SLUG
# with the {"confirm":$SLUG} fat-finger guard). Never a cluster-wide
# sweep — honors feedback_cleanup_after_each_test and
# feedback_never_run_cluster_cleanup_tests_on_live_platform. The
# workflow's always() step + sweep-stale-e2e-orgs are the outer nets.
teardown() {
local rc=$?
set +e
if [ "${E2E_KEEP_ORG:-0}" = "1" ]; then
echo ""
log "[teardown] E2E_KEEP_ORG=1 — leaving $SLUG for debugging (REMEMBER TO DELETE)"
exit $rc
fi
echo ""
log "[teardown] DELETE /cp/admin/tenants/$SLUG (scoped to this run only)"
admin_call DELETE "/cp/admin/tenants/$SLUG" --max-time 120 \
-d "{\"confirm\":\"$SLUG\"}" >/dev/null 2>&1
for j in $(seq 1 24); do
LIST=$(admin_call GET "/cp/admin/orgs?limit=500" 2>/dev/null)
LEAK=$(echo "$LIST" | python3 -c "
import sys, json
try: d = json.load(sys.stdin)
except Exception: print(1); sys.exit(0)
orgs = d if isinstance(d, list) else d.get('orgs', [])
print(sum(1 for o in orgs if o.get('slug') == '$SLUG' and o.get('instance_status') not in ('purged',) and o.get('status') != 'purged'))
" 2>/dev/null || echo 1)
if [ "$LEAK" = "0" ]; then
log "[teardown] ✓ $SLUG purged (after ${j}x5s)"
exit $rc
fi
sleep 5
done
echo "::warning::[teardown] $SLUG still present after 120s — sweep-stale-e2e-orgs will catch it within MAX_AGE_MINUTES" >&2
[ $rc -eq 0 ] && rc=4
exit $rc
}
trap teardown EXIT INT TERM
# ─── 1. Provision the throwaway org ────────────────────────────────────
log "1/6 POST /cp/admin/orgs — slug=$SLUG"
CREATE=$(admin_call POST /cp/admin/orgs \
-d "{\"slug\":\"$SLUG\",\"name\":\"E2E peer-visibility $SLUG\",\"owner_user_id\":\"e2e-runner:$SLUG\"}")
ORG_ID=$(echo "$CREATE" | python3 -c "import sys,json; print(json.load(sys.stdin).get('id',''))" 2>/dev/null)
[ -n "$ORG_ID" ] || fail "org creation failed: $(echo "$CREATE" | head -c 300)"
log " ORG_ID=$ORG_ID"
# ─── 2. Wait for tenant EC2 + DNS ──────────────────────────────────────
log "2/6 waiting for tenant instance_status=running (cold EC2 + cloudflared)..."
DEADLINE=$(( $(date +%s) + PROVISION_TIMEOUT_SECS ))
while true; do
[ "$(date +%s)" -gt "$DEADLINE" ] && fail "tenant never came up within ${PROVISION_TIMEOUT_SECS}s"
STATUS=$(admin_call GET "/cp/admin/orgs?limit=500" 2>/dev/null | python3 -c "
import sys, json
try: d = json.load(sys.stdin)
except Exception: sys.exit(0)
orgs = d if isinstance(d, list) else d.get('orgs', [])
for o in orgs:
if o.get('slug') == '$SLUG':
print(o.get('instance_status') or o.get('status') or 'unknown'); break
" 2>/dev/null)
case "$STATUS" in running|online|ready) break ;; esac
sleep 10
done
log " tenant status=$STATUS"
# ─── 3. Per-tenant admin token + tenant URL ────────────────────────────
log "3/6 fetching per-tenant admin token..."
TT_RESP=$(admin_call GET "/cp/admin/orgs/$SLUG/admin-token")
TENANT_TOKEN=$(echo "$TT_RESP" | python3 -c "import sys,json; print(json.load(sys.stdin).get('admin_token',''))" 2>/dev/null)
[ -n "$TENANT_TOKEN" ] || fail "tenant token fetch failed: $(echo "$TT_RESP" | head -c 200)"
CP_HOST=$(echo "$CP_URL" | sed -E 's#^https?://##; s#/.*$##')
case "$CP_HOST" in
api.*) DERIVED_DOMAIN="${CP_HOST#api.}" ;;
staging-api.*) DERIVED_DOMAIN="staging.${CP_HOST#staging-api.}" ;;
*) DERIVED_DOMAIN="$CP_HOST" ;;
esac
TENANT_URL="https://${SLUG}.${DERIVED_DOMAIN}"
log " tenant url: $TENANT_URL"
log "3b. waiting for tenant /health (TLS/DNS, up to 10min)..."
for i in $(seq 1 120); do
curl -fsS "$TENANT_URL/health" -m 5 -k >/dev/null 2>&1 && { log " /health ok (attempt $i)"; break; }
sleep 5
done
# ─── 4. Provision the parent + one sibling per runtime under test ──────
# Inject the LLM provider key so each runtime can authenticate at boot.
# Priority: MiniMax → direct-Anthropic → OpenAI (mirrors
# test_staging_full_saas.sh's secrets-injection chain).
SECRETS_JSON='{}'
if [ -n "${E2E_MINIMAX_API_KEY:-}" ]; then
SECRETS_JSON=$(python3 -c "import json,os;k=os.environ['E2E_MINIMAX_API_KEY'];print(json.dumps({'ANTHROPIC_BASE_URL':'https://api.minimax.io/anthropic','ANTHROPIC_AUTH_TOKEN':k,'MINIMAX_API_KEY':k}))")
elif [ -n "${E2E_ANTHROPIC_API_KEY:-}" ]; then
SECRETS_JSON=$(python3 -c "import json,os;k=os.environ['E2E_ANTHROPIC_API_KEY'];print(json.dumps({'ANTHROPIC_API_KEY':k}))")
elif [ -n "${E2E_OPENAI_API_KEY:-}" ]; then
SECRETS_JSON=$(python3 -c "import json,os;k=os.environ['E2E_OPENAI_API_KEY'];print(json.dumps({'OPENAI_API_KEY':k,'OPENAI_BASE_URL':'https://api.openai.com/v1','MODEL_PROVIDER':'openai:gpt-4o','HERMES_INFERENCE_PROVIDER':'custom','HERMES_CUSTOM_BASE_URL':'https://api.openai.com/v1','HERMES_CUSTOM_API_KEY':k,'HERMES_CUSTOM_API_MODE':'chat_completions'}))")
fi
log "4/6 provisioning parent (claude-code) + one sibling per runtime under test..."
P_RESP=$(tenant_call POST /workspaces \
-d "{\"name\":\"pv-parent\",\"runtime\":\"claude-code\",\"tier\":3,\"secrets\":$SECRETS_JSON}")
PARENT_ID=$(echo "$P_RESP" | python3 -c "import sys,json; print(json.load(sys.stdin).get('id',''))" 2>/dev/null)
[ -n "$PARENT_ID" ] || fail "parent create failed: $(echo "$P_RESP" | head -c 300)"
log " PARENT_ID=$PARENT_ID"
# WS_IDS[runtime]=id ; WS_TOKENS[runtime]=auth_token (the MCP bearer)
declare -A WS_IDS WS_TOKENS
ALL_WS_IDS="$PARENT_ID"
for rt in $PV_RUNTIMES; do
R=$(tenant_call POST /workspaces \
-d "{\"name\":\"pv-$rt\",\"runtime\":\"$rt\",\"tier\":2,\"parent_id\":\"$PARENT_ID\",\"secrets\":$SECRETS_JSON}")
WID=$(echo "$R" | python3 -c "import sys,json; print(json.load(sys.stdin).get('id',''))" 2>/dev/null)
# auth_token is top-level for container runtimes; external-like nest it
# under connection.auth_token (verified vs staging response shape).
WTOK=$(echo "$R" | python3 -c "
import sys, json
try: d = json.load(sys.stdin)
except Exception: print(''); sys.exit(0)
print(d.get('auth_token') or d.get('connection', {}).get('auth_token') or '')
" 2>/dev/null)
[ -n "$WID" ] || fail "$rt workspace create failed: $(echo "$R" | head -c 300)"
[ -n "$WTOK" ] || fail "$rt workspace did not return an auth_token — cannot drive its MCP call (resp: $(echo "$R" | head -c 300))"
WS_IDS[$rt]="$WID"
WS_TOKENS[$rt]="$WTOK"
ALL_WS_IDS="$ALL_WS_IDS $WID"
log " $rt$WID"
done
# ─── 5. Wait for every sibling online ──────────────────────────────────
log "5/6 waiting for all workspaces status=online (up to ${PROVISION_TIMEOUT_SECS}s — cold boot)..."
WS_DEADLINE=$(( $(date +%s) + PROVISION_TIMEOUT_SECS ))
for rt in $PV_RUNTIMES; do
wid="${WS_IDS[$rt]}"
LAST=""
while true; do
[ "$(date +%s)" -gt "$WS_DEADLINE" ] && fail "$rt ($wid) never reached online (last=$LAST)"
S=$(tenant_call GET "/workspaces/$wid" 2>/dev/null | python3 -c "
import sys, json
try: d = json.load(sys.stdin)
except Exception: sys.exit(0)
w = d.get('workspace') if isinstance(d.get('workspace'), dict) else d
print(w.get('status') or '')
" 2>/dev/null)
[ "$S" != "$LAST" ] && { log " $rt$S"; LAST="$S"; }
case "$S" in
online) break ;;
failed) sleep 10 ;; # transient: bootstrap-watcher 5-min deadline, heartbeat recovers
*) sleep 10 ;;
esac
done
ok " $rt online"
done
# ─── 6. THE GATE — literal mcp_molecule_list_peers via POST /:id/mcp ────
# This is the byte-for-byte user-facing call. NOT GET /registry/:id/peers,
# NOT /health, NOT the heartbeat table. JSON-RPC 2.0 tools/call,
# name=list_peers, authenticated by the workspace's OWN bearer token
# through WorkspaceAuth + MCPRateLimiter.
log "6/6 driving the LITERAL list_peers MCP call per runtime..."
echo ""
RPC_BODY='{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"list_peers","arguments":{}}}'
REGRESSED=0
declare -A VERDICT
for rt in $PV_RUNTIMES; do
wid="${WS_IDS[$rt]}"
wtok="${WS_TOKENS[$rt]}"
# The expected peer set = every OTHER provisioned workspace (parent +
# the sibling runtimes), excluding the caller itself.
EXPECT_IDS=$(echo "$ALL_WS_IDS" | tr ' ' '\n' | grep -v "^${wid}$" | grep -v '^$')
set +e
RESP=$(curl -sS -X POST "$TENANT_URL/workspaces/$wid/mcp" \
-H "Authorization: Bearer $wtok" \
-H "X-Molecule-Org-Id: $ORG_ID" \
-H "Content-Type: application/json" \
-d "$RPC_BODY" \
-o /tmp/pv_mcp_body.json -w "%{http_code}" 2>/dev/null)
set -e
HTTP_CODE="$RESP"
BODY=$(cat /tmp/pv_mcp_body.json 2>/dev/null || echo '')
echo "--- $rt (ws=$wid) ---"
echo " HTTP $HTTP_CODE"
echo " body: $(echo "$BODY" | head -c 600)"
# (1) HTTP 200 — a 401 (WorkspaceAuth reject, the Hermes symptom) fails here.
if [ "$HTTP_CODE" != "200" ]; then
echo "$rt: list_peers MCP call returned HTTP $HTTP_CODE (expected 200)"
VERDICT[$rt]="FAIL(http=$HTTP_CODE)"
REGRESSED=1
continue
fi
# (2) JSON-RPC result present, not an error object.
PARSE=$(echo "$BODY" | python3 -c "
import sys, json
expect = set(filter(None, '''$EXPECT_IDS'''.split()))
try:
d = json.load(sys.stdin)
except Exception as e:
print('PARSE_ERROR:' + str(e)); sys.exit(0)
if isinstance(d, dict) and d.get('error') is not None:
print('RPC_ERROR:' + json.dumps(d['error'])[:200]); sys.exit(0)
res = d.get('result') if isinstance(d, dict) else None
if res is None:
print('NO_RESULT'); sys.exit(0)
# MCP tools/call result shape: {content:[{type:text,text:'<json or prose>'}]}
text = ''
if isinstance(res, dict):
for c in res.get('content', []):
if c.get('type') == 'text':
text += c.get('text', '')
text_l = text.lower()
# Native-sessions fallback signature (the OpenClaw symptom): the agent
# answered from its own runtime session list, not the platform peer set.
if 'sessions_list' in text_l or 'no platform peers' in text_l or 'native session' in text_l:
print('NATIVE_FALLBACK:' + text[:200]); sys.exit(0)
# The expected sibling IDs must literally appear in the returned peer text.
found = sorted(i for i in expect if i in text)
missing = sorted(expect - set(found))
if not expect:
print('NO_EXPECTED_PEERS_CONFIGURED'); sys.exit(0)
if missing:
print('MISSING_PEERS:found=%d/%d missing=%s' % (len(found), len(expect), ','.join(m[:8] for m in missing)))
sys.exit(0)
print('OK:found=%d/%d' % (len(found), len(expect)))
" 2>/dev/null)
case "$PARSE" in
OK:*)
echo "$rt: list_peers returned 200 and contains all expected peers ($PARSE)"
VERDICT[$rt]="OK"
;;
NATIVE_FALLBACK:*)
echo "$rt: list_peers fell back to NATIVE sessions — sees no platform peers ($PARSE)"
VERDICT[$rt]="FAIL(native-fallback)"
REGRESSED=1
;;
RPC_ERROR:*|NO_RESULT|PARSE_ERROR:*)
echo "$rt: list_peers MCP call did not return a usable result ($PARSE)"
VERDICT[$rt]="FAIL(rpc=$PARSE)"
REGRESSED=1
;;
MISSING_PEERS:*)
echo "$rt: list_peers returned 200 but peer set is wrong/empty ($PARSE)"
VERDICT[$rt]="FAIL(peers=$PARSE)"
REGRESSED=1
;;
*)
echo "$rt: unexpected verdict '$PARSE'"
VERDICT[$rt]="FAIL(unknown)"
REGRESSED=1
;;
esac
echo ""
done
echo "=== SUMMARY — fresh-provision peer-visibility (literal MCP list_peers) ==="
for rt in $PV_RUNTIMES; do
printf ' %-14s %s\n' "$rt" "${VERDICT[$rt]:-NO_RUN}"
done
echo ""
if [ "$REGRESSED" -ne 0 ]; then
echo "✗ GATE FAILED — at least one runtime cannot see its peers via the"
echo " literal mcp_molecule_list_peers call. This is the real user-facing"
echo " failure the proxy signals (registry row / heartbeat / model 200)"
echo " were hiding. Expected RED until the Hermes-401 + OpenClaw-MCP-wiring"
echo " root-cause fixes land; goes green only when they actually do."
exit 10
fi
ok "GATE PASSED — every runtime under test sees its platform peers via the literal MCP call."
exit 0
@@ -1,160 +0,0 @@
package handlers
// Regression coverage for the POLL-mode arm of the canvas user-message
// data-loss bug (internal#470 sibling — tracked on internal#471).
//
// Bug (reported 2026-05-16 by CTO Hongming): "in canvas i sometimes lose
// my own message when i exit chat". The push-mode arm was fixed by
// #1347 (persistUserMessageAtIngest — a SYNCHRONOUS, before-dispatch,
// context.WithoutCancel INSERT). #1347's framing asserted "poll-mode
// workspaces were never affected — logA2AReceiveQueued already persists
// at ingest". That assertion is OVERSTATED.
//
// Hongming's tenant (slug `hongming`, org 2c940477-...) has 4 workspaces,
// ALL runtime=external with empty URL → ALL delivery_mode=poll (proven
// empirically: a benign A2A probe returns the synthetic
// {"delivery_mode":"poll","status":"queued"} envelope for every one).
// So his reported loss is the POLL path, NOT the push path #1347 fixes.
//
// Root cause (poll arm): the poll-mode short-circuit (a2a_proxy.go ~402)
// calls logA2AReceiveQueued and then IMMEDIATELY returns the synthetic
// 200 {status:"queued"} to the canvas. But logA2AReceiveQueued's durable
// INSERT runs inside h.goAsync(...) — a DETACHED goroutine with NO
// happens-before barrier against the HTTP response. The canvas sees 200
// ("message accepted") while the activity_logs row may not yet be — and,
// on a workspace-server restart / deploy / OOM / EC2 hibernation between
// the 200 and the goroutine's commit, NEVER will be — durable. There is
// also no fallback (unlike push-mode's legacy-INSERT fallback): a
// swallowed LogActivity error loses the message with only a log line.
// Chat-history reads activity_logs (postgres_store.go:165-187); a missing
// row = message gone on reopen. That is exactly Hongming's symptom.
//
// Fix (parity with push-mode): the poll-mode ingest persist of the
// canvas user message must be SYNCHRONOUS — committed before the queued
// 200 is returned — on a context.WithoutCancel derived context, so a
// client disconnect on chat-exit and a post-response restart cannot lose
// it. Behavior is never worse than today (best-effort; a persist error
// still returns queued).
//
// TEST DESIGN NOTE: sqlmock.ExpectationsWereMet() hangs indefinitely if
// the expected query never fires. We use a select+default+time.After
// pattern so the test FAILS fast (not hangs) when the production code
// regresses to async (the INSERT never fires before handler returns),
// while still returning promptly when all expectations are met. The
// insertDelay is kept small (50ms) to minimise suite-level timing
// impact under -race detection, where mock delays are amplified by
// the instrumenter's goroutine overhead.
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// TestProxyA2A_PollMode_PersistsUserMessageSynchronouslyBeforeQueuedResponse
// is the defining contract: for a poll-mode workspace, the canvas user
// message MUST be durably INSERTed into activity_logs BEFORE the synthetic
// queued 200 is returned to the client — with NO reliance on a detached
// async goroutine completing later.
//
// The test proves the ordering by making the INSERT block briefly and
// asserting the handler does NOT return until the INSERT has completed.
// Pre-fix (INSERT in h.goAsync, response returned immediately) the
// handler returns ~instantly while the INSERT is still pending in the
// goroutine → the elapsed time is far below the injected INSERT delay and
// ExpectationsWereMet() is racy/unmet at return. Post-fix (synchronous
// persist before the queued response) the handler return is gated on the
// INSERT, so elapsed >= the injected delay and the expectation is met
// deterministically at return WITHOUT any waitAsyncForTest()/sleep.
func TestProxyA2A_PollMode_PersistsUserMessageSynchronouslyBeforeQueuedResponse(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
const wsID = "ws-poll-sync-persist"
// Keep delay small: -race detection amplifies mock delays significantly.
// A 50ms delay is sufficient to prove synchronous blocking (~50× the
// normal INSERT latency) without bloating the full ./... suite runtime.
const insertDelay = 50 * time.Millisecond
expectBudgetCheck(mock, wsID)
// lookupDeliveryMode → poll, triggering the short-circuit.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll"))
// workspace-name lookup inside logA2AReceiveQueued.
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Poll WS"))
// The durable user-message write. We delay it so a synchronous
// persist visibly gates the handler return; a detached-goroutine
// persist (pre-fix) does not. The fix must keep using
// context.WithoutCancel so this write survives a chat-exit cancel.
mock.ExpectExec("INSERT INTO activity_logs").
WillDelayFor(insertDelay).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
// callerID == "" (no X-Workspace-ID) → this is a canvas_user message,
// exactly Hongming's case.
body := `{"jsonrpc":"2.0","id":"poll-canvas-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"my own message"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
start := time.Now()
handler.ProxyA2A(c)
elapsed := time.Since(start)
// Defining assertion #1: the handler must not have returned the
// queued response before the durable INSERT committed. Pre-fix this
// fails (elapsed ≈ 0, INSERT still racing in goAsync).
if elapsed < insertDelay {
t.Fatalf("poll-mode queued response returned in %v, before the %v user-message INSERT — "+
"the message is not durable when the client/process goes away (DATA LOSS). "+
"Persist must be synchronous before the queued 200.", elapsed, insertDelay)
}
// Defining assertion #2: the durable write actually happened by the
// time the handler returned. ExpectionsWereMet() hangs indefinitely if
// the mock never fires (e.g. production code regressed to async),
// so we check it in a goroutine with a hard 2s timeout — fails fast
// (no CI hang) on regression while returning promptly on success.
expectDone := make(chan error, 1)
go func() { expectDone <- mock.ExpectationsWereMet() }()
select {
case err := <-expectDone:
if err != nil {
t.Fatalf("user-message INSERT was not durable at handler return (unmet sqlmock expectations): %v", err)
}
case <-time.After(2 * time.Second):
t.Fatalf("ExpectationsWereMet() hung for >2s — INSERT mock never fired. " +
"Likely cause: production code regressed logA2AReceiveQueued to goAsync " +
"(INSERT fires after handler returns, not before).")
}
// Sanity: still the correct poll-mode envelope + status.
if w.Code != http.StatusOK {
t.Fatalf("expected 200 (queued), got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
}
if resp["status"] != "queued" || resp["delivery_mode"] != "poll" {
t.Errorf("poll envelope changed: got status=%v delivery_mode=%v, want queued/poll",
resp["status"], resp["delivery_mode"])
}
}
+21 -35
View File
@@ -97,28 +97,28 @@ const maxProxyResponseBody = 10 << 20
//
// Timeout model — three independent budgets, none of which gets in each other's way:
//
// 1. Client.Timeout — DELIBERATELY UNSET. Client.Timeout is a hard wall on
// the entire request including streamed body reads, and would pre-empt
// legitimate slow cold-start flows (Claude Code first-token over OAuth
// can take 30-60s on boot; long-running agent synthesis can stream
// tokens for minutes). Total-request budget is enforced per-request
// via context deadline (canvas = idle-only, agent-to-agent = 30 min ceiling).
// 1. Client.Timeout — DELIBERATELY UNSET. Client.Timeout is a hard wall on
// the entire request including streamed body reads, and would pre-empt
// legitimate slow cold-start flows (Claude Code first-token over OAuth
// can take 30-60s on boot; long-running agent synthesis can stream
// tokens for minutes). Total-request budget is enforced per-request
// via context deadline (canvas = idle-only, agent-to-agent = 30 min ceiling).
//
// 2. Transport.DialContext — 10s connect timeout. When a workspace's EC2
// black-holes TCP connects (instance terminated mid-flight, security group
// flipped, NACL bug), the OS default is 75s on Linux / 21s on macOS — long
// enough that Cloudflare's ~100s edge timeout can fire first and surface
// a generic 502 page to canvas. 10s is well above realistic intra-region
// latencies and well below CF's edge timeout.
// 2. Transport.DialContext — 10s connect timeout. When a workspace's EC2
// black-holes TCP connects (instance terminated mid-flight, security group
// flipped, NACL bug), the OS default is 75s on Linux / 21s on macOS — long
// enough that Cloudflare's ~100s edge timeout can fire first and surface
// a generic 502 page to canvas. 10s is well above realistic intra-region
// latencies and well below CF's edge timeout.
//
// 3. Transport.ResponseHeaderTimeout — 180s default. From request-body-end
// to response-headers-start. Configurable via
// A2A_PROXY_RESPONSE_HEADER_TIMEOUT (envx.Duration). Covers cold-start
// first-byte (30-60s OAuth flow above) with enough room for Opus agent
// turns (big context + internal delegate_task round-trips routinely exceed
// the old 60s ceiling). Body streaming after headers is governed by the
// per-request context deadline, NOT this timeout — so multi-minute agent
// responses still work fine.
// 3. Transport.ResponseHeaderTimeout — 180s default. From request-body-end
// to response-headers-start. Configurable via
// A2A_PROXY_RESPONSE_HEADER_TIMEOUT (envx.Duration). Covers cold-start
// first-byte (30-60s OAuth flow above) with enough room for Opus agent
// turns (big context + internal delegate_task round-trips routinely exceed
// the old 60s ceiling). Body streaming after headers is governed by the
// per-request context deadline, NOT this timeout — so multi-minute agent
// responses still work fine.
//
// The point of (2) and (3) is to surface a *structured* 503 from
// handleA2ADispatchError when the workspace agent is unreachable, so canvas
@@ -399,21 +399,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
// (no Do(), no maybeMarkContainerDead). The response is a synthetic
// {status:"queued"} envelope so the caller (canvas, another workspace)
// knows delivery is acknowledged but pending consumption.
deliveryMode, deliveryModeErr := lookupDeliveryMode(ctx, workspaceID)
if deliveryModeErr != nil {
// internal#497 fail-closed: a real DB/context error on the
// delivery-mode read MUST NOT silently fall through to the push
// dispatch path — that is exactly what silently misrouted every
// poll-mode peer for 5 days under the ce2db75f regression. Surface
// a structured error so the delegation is marked failed (loud +
// retryable) instead of dispatched to the wrong path.
log.Printf("ProxyA2A: delivery-mode lookup failed for %s: %v — failing closed", workspaceID, deliveryModeErr)
return 0, nil, &proxyA2AError{
Status: http.StatusServiceUnavailable,
Response: gin.H{"error": "delivery-mode lookup failed; refusing to dispatch to avoid silent misrouting"},
}
}
if deliveryMode == models.DeliveryModePoll {
if lookupDeliveryMode(ctx, workspaceID) == models.DeliveryModePoll {
if logActivity {
h.logA2AReceiveQueued(ctx, workspaceID, callerID, body, a2aMethod)
}
@@ -468,64 +468,40 @@ func parseUsageFromA2AResponse(body []byte) (inputTokens, outputTokens int64) {
return 0, 0
}
// lookupDeliveryMode returns the workspace's delivery_mode.
//
// internal#497 / RFC#497 fail-closed (SURGICAL scope): the *specific*
// failure mode that hid the ce2db75f regression for 5 days is now
// propagated instead of silently swallowed — a CONTEXT error
// (context.Canceled / context.DeadlineExceeded). Under ce2db75f the
// detached delegation goroutine ran on a cancelled request context, every
// `SELECT delivery_mode` failed `context canceled`, this function returned
// push, the poll-mode short-circuit in proxyA2ARequest was skipped, and
// poll-mode peers (e.g. an operator laptop on molecule-mcp-claude-channel)
// silently never got their a2a_receive inbox row. A transient,
// systematic-once-triggered context cancellation became permanent
// invisible misrouting. Returning that error lets the caller fail loud
// (mark the delegation failed) instead of mis-dispatching.
//
// Scope is deliberately narrow: only ctx errors propagate. Other DB
// errors retain the long-standing documented "fall back to push (today's
// synchronous behavior)" contract — that path is loud + recoverable
// (502 / SSRF reject / restart), unlike the silent poll-mode drop, and
// the surrounding proxy (incl. the sibling checkWorkspaceBudget) is
// intentionally built around that fail-open-to-push behavior. Widening
// further is an RFC#497 follow-up, not part of this P0 fix.
//
// A genuinely *absent* configuration is NOT an error and still resolves to
// push (the safe synchronous default): sql.ErrNoRows, a NULL/empty column,
// or an unrecognised value all return (push, nil).
// lookupDeliveryMode returns the workspace's delivery_mode. On any DB
// error or missing row it returns DeliveryModePush — the fail-closed
// default. "Closed" here means "fall back to today's behavior (synchronous
// dispatch)" rather than "fall back to drop the request silently into
// activity_logs where the agent might never see it." A poll-mode workspace
// that briefly reads as push will get its A2A request dispatched to the
// stored URL (or a 502 if no URL); a push-mode workspace that briefly
// reads as poll would get its request silently queued with no dispatch.
// The first failure is loud + recoverable; the second is silent.
//
// The function is intentionally lookup-only — it never mutates the row.
// The register handler (registry.go) is the only writer for delivery_mode.
//
// See #2339 PR 1 for the column + register-flow side; this is the
// proxy-side read used for the short-circuit in proxyA2ARequest.
func lookupDeliveryMode(ctx context.Context, workspaceID string) (string, error) {
func lookupDeliveryMode(ctx context.Context, workspaceID string) string {
var mode sql.NullString
err := db.DB.QueryRowContext(ctx,
`SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&mode)
if err != nil {
// internal#497: a context cancellation/deadline MUST NOT be
// swallowed into a silent push default — that is the exact 5-day
// silent-misrouting vector. Propagate so the caller fails closed.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
log.Printf("ProxyA2A: lookupDeliveryMode(%s) context error (%v) — failing closed (NOT defaulting to push)", workspaceID, err)
return "", err
}
if !errors.Is(err, sql.ErrNoRows) {
log.Printf("ProxyA2A: lookupDeliveryMode(%s) failed (%v) — defaulting to push (non-ctx DB error; legacy fail-open-to-push contract)", workspaceID, err)
log.Printf("ProxyA2A: lookupDeliveryMode(%s) failed (%v) — defaulting to push", workspaceID, err)
}
return models.DeliveryModePush, nil
return models.DeliveryModePush
}
if !mode.Valid || mode.String == "" {
return models.DeliveryModePush, nil
return models.DeliveryModePush
}
if !models.IsValidDeliveryMode(mode.String) {
log.Printf("ProxyA2A: workspace %s has invalid delivery_mode=%q — defaulting to push", workspaceID, mode.String)
return models.DeliveryModePush, nil
return models.DeliveryModePush
}
return mode.String, nil
return mode.String
}
// logA2AReceiveQueued records a poll-mode "queued" A2A receive into
@@ -538,49 +514,26 @@ func lookupDeliveryMode(ctx context.Context, workspaceID string) (string, error)
// reads in PR 3 — that's how a poll-mode workspace receives inbound A2A
// without a public URL.
func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) {
// DATA-LOSS FIX (internal#471 — poll-mode sibling of #1347/internal#470):
// this is the ONLY durable write of a poll-mode inbound message,
// including a canvas_user message (callerID == "") typed in the canvas
// chat. It MUST be SYNCHRONOUS and complete BEFORE the caller returns
// the synthetic {status:"queued"} 200 — otherwise the canvas sees the
// send acknowledged while the activity_logs row is still racing in a
// detached goroutine, and a workspace-server restart / deploy / OOM /
// EC2 hibernation between the 200 and the goroutine's commit loses the
// user's message permanently (chat-history reads activity_logs, so a
// missing row = message gone on reopen). Hongming's tenant is entirely
// poll-mode (4 external workspaces, no URL — verified empirically), so
// his reported loss is THIS path; #1347 (push-mode, persists AFTER the
// poll short-circuit) structurally cannot cover it.
//
// Mirrors persistUserMessageAtIngest's discipline:
// - context.WithoutCancel: a client disconnect on chat-exit (which
// cancels the inbound request ctx) MUST NOT abort this write.
// - SYNCHRONOUS (no goAsync): the row must be durable before the
// queued 200 is returned to the caller.
// - Best-effort: LogActivity already logs+swallows INSERT errors, so
// a hiccup never blocks or fails the user's send (behavior for
// that one request is never worse than the pre-fix async path).
// The post-commit broadcast still fires inside LogActivity; a missed
// WebSocket event is not data loss (the durable row is the truth the
// canvas re-reads on reopen).
insCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cancel()
var wsName string
db.DB.QueryRowContext(insCtx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
if wsName == "" {
wsName = workspaceID
}
summary := a2aMethod + " → " + wsName + " (queued for poll)"
LogActivity(insCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
RequestBody: json.RawMessage(body),
Status: "ok",
parent := ctx
h.goAsync(func() {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
RequestBody: json.RawMessage(body),
Status: "ok",
})
})
}
@@ -54,7 +54,6 @@ func TestPreflight_ContainerRunning_ReturnsNil(t *testing.T) {
_ = setupTestDB(t)
stub := &preflightLocalProv{running: true, err: nil}
h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, h)
h.provisioner = stub
if err := h.preflightContainerHealth(context.Background(), "ws-running-123"); err != nil {
@@ -187,8 +186,8 @@ func TestProxyA2A_Preflight_RoutesThroughProvisionerSSOT(t *testing.T) {
}
var (
callsIsRunning bool
callsContainerInspectRaw bool
callsIsRunning bool
callsContainerInspectRaw bool
callsRunningContainerNameDirect bool
)
ast.Inspect(fn.Body, func(n ast.Node) bool {
@@ -262,7 +262,6 @@ func TestProxyA2A_Upstream502_TriggersContainerDeadCheck(t *testing.T) {
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
cp := &fakeCPProv{running: false}
handler.SetCPProvisioner(cp)
@@ -325,7 +324,6 @@ func TestProxyA2A_Upstream502_AliveAgent_PropagatesAsIs(t *testing.T) {
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
cp := &fakeCPProv{running: true}
handler.SetCPProvisioner(cp)
@@ -515,7 +513,6 @@ func TestProxyA2A_AllowedSelf_SkipsAccessCheck(t *testing.T) {
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
@@ -664,18 +661,18 @@ func TestProxyA2A_CallerIDDerivedFromBearer(t *testing.T) {
// (column order: workspace_id, activity_type, source_id, target_id, ...)
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs(
"ws-target", // $1 workspace_id
"a2a_receive", // $2 activity_type
sqlmock.AnyArg(), // $3 source_id — *string("ws-caller"), checked below
sqlmock.AnyArg(), // $4 target_id
sqlmock.AnyArg(), // $5 method
sqlmock.AnyArg(), // $6 summary
sqlmock.AnyArg(), // $7 request_body
sqlmock.AnyArg(), // $8 response_body
sqlmock.AnyArg(), // $9 tool_trace
sqlmock.AnyArg(), // $10 duration_ms
sqlmock.AnyArg(), // $11 status
sqlmock.AnyArg(), // $12 error_detail
"ws-target", // $1 workspace_id
"a2a_receive", // $2 activity_type
sqlmock.AnyArg(), // $3 source_id — *string("ws-caller"), checked below
sqlmock.AnyArg(), // $4 target_id
sqlmock.AnyArg(), // $5 method
sqlmock.AnyArg(), // $6 summary
sqlmock.AnyArg(), // $7 request_body
sqlmock.AnyArg(), // $8 response_body
sqlmock.AnyArg(), // $9 tool_trace
sqlmock.AnyArg(), // $10 duration_ms
sqlmock.AnyArg(), // $11 status
sqlmock.AnyArg(), // $12 error_detail
).
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -1719,6 +1716,7 @@ func TestDispatchA2A_RejectsUnsafeURL(t *testing.T) {
}
}
// --- handleA2ADispatchError ---
func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) {
@@ -1805,7 +1803,6 @@ func TestMaybeMarkContainerDead_CPOnly_NotRunning(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
cp := &fakeCPProv{running: false}
handler.SetCPProvisioner(cp)
@@ -1958,7 +1955,6 @@ func TestLogA2AFailure_Smoke(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
// Sync workspace-name lookup (called in the caller goroutine).
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
@@ -1977,7 +1973,6 @@ func TestLogA2AFailure_EmptyNameFallback(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
// Empty name from DB → summary uses the workspaceID as the name.
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
@@ -1994,7 +1989,6 @@ func TestLogA2ASuccess_Smoke(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
WithArgs("ws-ok").
@@ -2011,7 +2005,6 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
WithArgs("ws-err").
@@ -2235,18 +2228,12 @@ func TestProxyA2A_PushMode_NoShortCircuit(t *testing.T) {
}
}
// TestProxyA2A_PollMode_FailsClosedToPush verifies the LEGACY safety
// contract is PRESERVED for non-context DB errors: a generic DB error
// reading delivery_mode still defaults to push (today's behavior), NOT
// poll. Failing to push means a poll-mode workspace briefly attempts a
// real dispatch — visible failure (502 / SSRF rejection / restart
// cascade), not a silent drop into activity_logs where the agent might
// never look. Loud > silent, recoverable > lost.
//
// internal#497 narrows the fail-closed change to *context* errors only
// (the actual ce2db75f regression vector); generic DB errors keep this
// long-standing fail-open-to-push contract. The ctx-error fail-closed is
// covered by TestLookupDeliveryMode_ContextCanceled_FailsClosed.
// TestProxyA2A_PollMode_FailsClosedToPush verifies the safety contract:
// a DB error reading delivery_mode must default to push (the existing
// behavior), NOT poll. Failing to push means a poll-mode workspace
// briefly attempts a real dispatch — visible failure (502 / SSRF
// rejection / restart cascade), not a silent drop into activity_logs
// where the agent might never look. Loud > silent, recoverable > lost.
func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t) // empty Redis — forces resolveAgentURL DB lookup
@@ -2257,8 +2244,7 @@ func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) {
expectBudgetCheck(mock, wsID)
// lookupDeliveryMode hits a generic (non-context) DB error → must
// still default push (legacy contract preserved by internal#497).
// lookupDeliveryMode hits a transient DB error → must default push.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnError(sql.ErrConnDone)
@@ -2282,7 +2268,7 @@ func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) {
var resp map[string]interface{}
_ = json.Unmarshal(w.Body.Bytes(), &resp)
if resp["status"] == "queued" {
t.Errorf("generic DB error on delivery_mode lookup silently queued the request — must fail-open-to-push, got body: %s", w.Body.String())
t.Errorf("DB error on delivery_mode lookup silently queued the request — must fail-closed-to-push, got body: %s", w.Body.String())
}
}
@@ -2291,37 +2277,6 @@ func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) {
}
}
// TestLookupDeliveryMode_ContextCanceled_FailsClosed is the internal#497
// regression test for the SECONDARY defect. It pins the exact invariant
// that hid the ce2db75f regression for 5 days: when the delivery_mode read
// fails because the context was cancelled (precisely what happened in the
// detached delegation goroutine running on a returned request context),
// lookupDeliveryMode MUST return an error and MUST NOT silently return
// "push". Returning push there is what skipped the poll-mode short-circuit
// and silently dropped 100% of poll-mode peer deliveries.
//
// A pre-cancelled context makes QueryRowContext fail with
// context.Canceled deterministically — no DB rows are mocked because the
// query never reaches a result.
func TestLookupDeliveryMode_ContextCanceled_FailsClosed(t *testing.T) {
mock := setupTestDB(t)
// The query fails on the cancelled ctx before matching; provide a
// permissive expectation so sqlmock doesn't complain about the attempt.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WillReturnError(context.Canceled)
ctx, cancel := context.WithCancel(context.Background())
cancel() // simulate the HTTP handler having returned (request ctx dead)
mode, err := lookupDeliveryMode(ctx, "ws-poll-peer")
if err == nil {
t.Fatalf("internal#497 regression: lookupDeliveryMode swallowed a context error and returned mode=%q with nil err — this is the exact 5-day silent-misrouting vector", mode)
}
if mode == models.DeliveryModePush {
t.Errorf("internal#497 regression: context error must NOT default to push (got mode=%q)", mode)
}
}
// ==================== a2aClient ResponseHeaderTimeout config ====================
func TestA2AClientResponseHeaderTimeout(t *testing.T) {
@@ -26,10 +26,6 @@ import (
// setupTestDBForQueueTests creates a sqlmock DB using QueryMatcherEqual (exact
// string matching) so that ExpectQuery/ExpectExec patterns are compared verbatim.
// Uses the same global db.DB as setupTestDB so the handler can use it.
//
// IMPORTANT: db.DB is saved before assignment and restored via t.Cleanup so
// that tests running after this one are not polluted by a closed mock.
// Same fix as setupTestDB (handlers_test.go); same root cause as mc#975.
func setupTestDBForQueueTests(t *testing.T) sqlmock.Sqlmock {
t.Helper()
mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
@@ -44,8 +44,8 @@ func NewWorkspaceImageService(docker *dockerclient.Client) *WorkspaceImageServic
// AllRuntimes is the canonical list mirroring docs/workspace-runtime-package.md.
// Update both when a new template is added.
var AllRuntimes = []string{
"claude-code", "langgraph", "autogen",
"hermes", "openclaw",
"claude-code", "langgraph", "crewai", "autogen",
"deepagents", "hermes", "gemini-cli", "openclaw",
}
// RefreshResult is the per-call outcome surfaced to HTTP callers AND logged
@@ -2,7 +2,6 @@ package handlers
import (
"context"
"database/sql"
"encoding/json"
"log"
"net/http"
@@ -163,32 +162,8 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
},
})
// Fire-and-forget: send A2A in a background goroutine.
//
// internal#497 — the goroutine MUST NOT inherit the HTTP request's
// cancellation. `ctx` here is c.Request.Context(); the handler returns
// 202 a few lines below, which cancels that context immediately. Before
// this fix (regression ce2db75f) executeDelegation ran on the
// request-scoped ctx, so every DB op + proxy call in the detached
// goroutine failed `context canceled` the instant the 202 was written.
// That silently broke 100% of A2A peer delegations fleet-wide since
// 2026-05-12 (poll-mode peers never got their a2a_receive inbox row;
// lookupDeliveryMode swallowed the ctx error and defaulted to push).
//
// context.WithoutCancel detaches cancellation/deadline while PRESERVING
// all context values (trace/correlation/tenant ids that proxyA2ARequest
// and the broadcaster read off ctx) — this is the established pattern in
// this package (a2a_proxy.go:850, a2a_proxy_helpers.go:525,
// registry.go:822). The 30-minute ceiling matches the prior internal
// budget executeDelegation used before ce2db75f and the proxy's own
// absolute agent-dispatch ceiling (a2a_proxy.go forwardCtx).
delegationCtx, cancelDelegation := context.WithTimeout(
context.WithoutCancel(ctx), 30*time.Minute,
)
go func() {
defer cancelDelegation()
h.executeDelegation(delegationCtx, sourceID, body.TargetID, delegationID, a2aBody)
}()
// Fire-and-forget: send A2A in background goroutine
go h.executeDelegation(ctx, sourceID, body.TargetID, delegationID, a2aBody)
// Broadcast event so canvas shows delegation in real-time
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationSent), sourceID, map[string]interface{}{
@@ -723,8 +698,7 @@ func (h *DelegationHandler) listDelegationsFromLedger(ctx context.Context, works
var result []map[string]interface{}
for rows.Next() {
var delegationID, callerID, calleeID, taskPreview, status string
var resultPreview, errorDetail sql.NullString
var delegationID, callerID, calleeID, taskPreview, status, resultPreview, errorDetail string
var lastHeartbeat, deadline, createdAt, updatedAt *time.Time
if err := rows.Scan(
&delegationID, &callerID, &calleeID, &taskPreview,
@@ -743,11 +717,11 @@ func (h *DelegationHandler) listDelegationsFromLedger(ctx context.Context, works
"updated_at": updatedAt,
"_ledger": true, // marker so callers know this row is from the ledger
}
if resultPreview.Valid && resultPreview.String != "" {
entry["response_preview"] = textutil.TruncateBytes(resultPreview.String, 300)
if resultPreview != "" {
entry["response_preview"] = textutil.TruncateBytes(resultPreview, 300)
}
if errorDetail.Valid && errorDetail.String != "" {
entry["error"] = errorDetail.String
if errorDetail != "" {
entry["error"] = errorDetail
}
if lastHeartbeat != nil {
entry["last_heartbeat"] = lastHeartbeat
@@ -145,54 +145,6 @@ func TestListDelegationsFromLedger_MultipleRows(t *testing.T) {
}
}
func TestListDelegationsFromLedger_NullsOmitted(t *testing.T) {
// last_heartbeat, deadline, result_preview, error_detail are all NULL.
// Handler must not panic and must omit those keys from the map.
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
now := time.Now()
rows := sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail",
"last_heartbeat", "deadline", "created_at", "updated_at",
}).
AddRow("del-1", "ws-1", "ws-2", "task", "queued", nil, nil, nil, nil, now, now)
mock.ExpectQuery("SELECT .+ FROM delegations").
WithArgs("ws-1").
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
got := dh.listDelegationsFromLedger(context.Background(), "ws-1")
if len(got) != 1 {
t.Fatalf("expected 1 entry, got %d", len(got))
}
e := got[0]
if _, ok := e["last_heartbeat"]; ok {
t.Error("last_heartbeat should be absent when NULL")
}
if _, ok := e["deadline"]; ok {
t.Error("deadline should be absent when NULL")
}
if _, ok := e["response_preview"]; ok {
t.Error("response_preview should be absent when NULL result_preview")
}
if _, ok := e["error"]; ok {
t.Error("error should be absent when NULL error_detail")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
func TestListDelegationsFromLedger_QueryError(t *testing.T) {
// Query failure returns nil — graceful fallback, no panic.
mockDB, mock, err := sqlmock.New()
@@ -16,65 +16,6 @@ import (
"github.com/gin-gonic/gin"
)
// ---------- internal#497 regression: detached goroutine ctx must outlive the handler ----------
// TestDelegate_DetachedContext_SurvivesRequestCancellation pins the
// load-bearing invariant that regression ce2db75f violated: the context
// handed to executeDelegation in the fire-and-forget goroutine must NOT be
// cancelled when the HTTP handler returns 202 (which cancels
// c.Request.Context()). Before the fix, executeDelegation ran on the
// request-scoped ctx, so every DB op + proxy call failed `context
// canceled` the instant the 202 was written — silently breaking 100% of
// A2A peer delegations fleet-wide since 2026-05-12.
//
// This test asserts the exact ctx-derivation contract used by Delegate
// (context.WithoutCancel(parent) + a timeout budget): the derived context
// (a) stays alive after the parent is cancelled, and (b) still carries
// parent values (trace/correlation/tenant ids the downstream proxy +
// broadcaster read off ctx). It is intentionally DB-free and fast.
func TestDelegate_DetachedContext_SurvivesRequestCancellation(t *testing.T) {
type ctxKey string
const traceKey ctxKey = "trace-id"
// Simulate c.Request.Context() carrying a correlation value.
parent, cancelParent := context.WithCancel(
context.WithValue(context.Background(), traceKey, "trace-abc-123"),
)
// Exact derivation Delegate uses for the detached goroutine.
delegationCtx, cancelDelegation := context.WithTimeout(
context.WithoutCancel(parent), 30*time.Minute,
)
defer cancelDelegation()
// The HTTP handler "returns 202" → request context is cancelled.
cancelParent()
if err := parent.Err(); err == nil {
t.Fatal("precondition: parent context should be cancelled after the handler returns")
}
// (a) Cancellation MUST NOT propagate to the detached context.
select {
case <-delegationCtx.Done():
t.Fatalf("regression: detached delegation ctx was cancelled by the handler returning (err=%v) — executeDelegation would fail every DB op with `context canceled`", delegationCtx.Err())
default:
// alive — correct
}
// (b) Parent values MUST still be readable (WithoutCancel preserves
// values; trace/correlation/tenant ids the proxy + broadcaster use).
if got, _ := delegationCtx.Value(traceKey).(string); got != "trace-abc-123" {
t.Errorf("detached ctx lost the parent trace value: got %q, want %q", got, "trace-abc-123")
}
// And it still has a real deadline (the 30m budget), so it is not an
// unbounded background context.
if _, hasDeadline := delegationCtx.Deadline(); !hasDeadline {
t.Error("detached ctx must carry the 30-minute timeout budget, but has no deadline")
}
}
// ---------- Delegate: missing target_id → 400 ----------
func TestDelegate_MissingTargetID(t *testing.T) {
@@ -646,12 +646,8 @@ const externalOpenClawTemplate = `# OpenClaw MCP config — outbound tool path.
# external machine today, pair with the Python SDK tab.
# 1. Install openclaw CLI + the workspace runtime wheel:
# The version pin (>=0.1.999) ensures the "molecule-mcp" console
# script is present it is what keeps the workspace ALIVE on canvas
# (register-on-startup + 20s heartbeat). Older versions only ship
# a2a_mcp_server which does not heartbeat.
npm install -g openclaw@latest
pip install "molecule-ai-workspace-runtime>=0.1.999"
pip install molecule-ai-workspace-runtime
# 2. Onboard openclaw against your model provider (one-time setup).
# --non-interactive needs an explicit --provider + --model so it
@@ -61,11 +61,6 @@ func drainTestAsync() {
// setupTestDB creates a sqlmock DB and assigns it to the global db.DB.
// It also disables the SSRF URL check so that httptest.NewServer loopback
// URLs and fake hostnames (*.example) used in tests don't trigger rejections.
//
// IMPORTANT: db.DB is saved before assignment and restored via t.Cleanup so
// that tests running after this one are not polluted by a closed mock.
// This is the single root cause of the systemic CI/Platform (Go) failures on
// main HEAD 8026f020 (mc#975).
func setupTestDB(t *testing.T) sqlmock.Sqlmock {
t.Helper()
mockDB, mock, err := sqlmock.New()
@@ -103,11 +98,6 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock {
return mock
}
func waitForHandlerAsyncBeforeDBCleanup(t *testing.T, h *WorkspaceHandler) {
t.Helper()
t.Cleanup(h.waitAsyncForTest)
}
// setupTestRedis creates a miniredis instance and assigns it to the global db.RDB.
func setupTestRedis(t *testing.T) *miniredis.Miniredis {
t.Helper()
@@ -407,11 +397,6 @@ func TestWorkspaceCreate(t *testing.T) {
}
func TestBuildProvisionerConfig_IncludesAwarenessSettings(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT digest FROM runtime_image_pins`).
WithArgs("claude-code").
WillReturnError(sql.ErrNoRows)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", "/tmp/configs")
@@ -2,12 +2,10 @@ package handlers
import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"regexp"
"testing"
"time"
@@ -82,135 +80,117 @@ func TestInstructionsList_ByWorkspaceID(t *testing.T) {
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var result []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &result); err != nil {
t.Fatalf("invalid JSON: %v", err)
var out []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &out); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
if len(result) != 2 {
t.Fatalf("expected 2 instructions, got %d", len(result))
if len(out) != 2 {
t.Errorf("expected 2 instructions, got %d", len(out))
}
if result[0].Scope != "global" || result[1].Scope != "workspace" {
t.Fatalf("expected global then workspace instructions, got %#v", result)
if out[0].Scope != "global" {
t.Errorf("first row scope: expected global, got %s", out[0].Scope)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
t.Errorf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_List_WithScopeFilter(t *testing.T) {
func TestInstructionsList_ByScope(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
rows := sqlmock.NewRows([]string{
"id", "scope", "scope_target", "title", "content", "priority", "enabled", "created_at", "updated_at",
}).AddRow("inst-1", "global", nil, "Be kind", "Always be kind", 10, true,
time.Now(), time.Now())
w, c := newGetRequest("/instructions?scope=global")
c.Request = httptest.NewRequest(http.MethodGet, "/instructions?scope=global", nil)
mock.ExpectQuery(regexp.QuoteMeta("SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE 1=1 AND scope = $1 ORDER BY scope, priority DESC, created_at")).
rows := sqlmock.NewRows(instructionCols).
AddRow("inst-g", "global", nil, "Global Rule", "Follow policy.", 10, true, time.Now(), time.Now())
mock.ExpectQuery("SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE 1=1").
WithArgs("global").
WillReturnRows(rows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/instructions?scope=global", nil)
handler.List(c)
h.List(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var result []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &result); err != nil {
t.Fatalf("invalid JSON: %v", err)
var out []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &out); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
if len(result) != 1 {
t.Fatalf("expected 1 instruction, got %d", len(result))
}
if result[0].Scope != "global" {
t.Errorf("expected scope 'global', got %q", result[0].Scope)
if len(out) != 1 || out[0].Scope != "global" {
t.Errorf("unexpected response: %v", out)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
t.Errorf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_List_WithWorkspaceID(t *testing.T) {
func TestInstructionsList_AllNoParams(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
wsID := "ws-test-123"
h := NewInstructionsHandler()
rows := sqlmock.NewRows([]string{
"id", "scope", "scope_target", "title", "content", "priority", "enabled", "created_at", "updated_at",
}).AddRow("inst-1", "global", nil, "Global rule", "Stay safe", 5, true,
time.Now(), time.Now()).
AddRow("inst-2", "workspace", &wsID, "WS rule", "Use HTTPS", 10, true,
time.Now(), time.Now())
w, c := newGetRequest("/instructions")
mock.ExpectQuery("SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE enabled = true AND \\(").
WithArgs(wsID).
rows := sqlmock.NewRows(instructionCols)
mock.ExpectQuery("SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE 1=1").
WillReturnRows(rows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/instructions?workspace_id="+wsID, nil)
handler.List(c)
h.List(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var result []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &result); err != nil {
t.Fatalf("invalid JSON: %v", err)
var out []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &out); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
if len(result) != 2 {
t.Fatalf("expected 2 instructions, got %d", len(result))
// Empty slice, not nil
if out == nil {
t.Error("expected empty slice, got nil")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
t.Errorf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_List_QueryError(t *testing.T) {
func TestInstructionsList_DBError(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
w, c := newGetRequest("/instructions")
c.Request = httptest.NewRequest(http.MethodGet, "/instructions", nil)
mock.ExpectQuery("SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE 1=1").
WillReturnError(context.DeadlineExceeded)
WillReturnError(errors.New("connection refused"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/instructions", nil)
handler.List(c)
h.List(c)
if w.Code != http.StatusInternalServerError {
t.Fatalf("expected 500, got %d", w.Code)
t.Fatalf("expected 500, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// ── Create ──────────────────────────────────────────────────────────────────────
// ── Create ───────────────────────────────────────────────────────────────────
func TestInstructionsHandler_Create_Success(t *testing.T) {
func TestInstructionsCreate_ValidGlobal(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
w, c := newPostRequest("/instructions", map[string]interface{}{
"scope": "global",
"title": "Be Helpful",
"content": "Always be helpful to the user.",
"priority": 10,
})
mock.ExpectQuery("INSERT INTO platform_instructions").
WithArgs("global", nil, "Be kind", "Always be kind", 5).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("new-inst-id"))
WithArgs("global", nil, "Be Helpful", "Always be helpful to the user.", 10).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("new-inst-1"))
body, _ := json.Marshal(map[string]interface{}{
"scope": "global",
"title": "Be kind",
"content": "Always be kind",
"priority": 5,
})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/instructions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
h.Create(c)
if w.Code != http.StatusCreated {
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
@@ -219,8 +199,8 @@ func TestInstructionsHandler_Create_Success(t *testing.T) {
if err := json.Unmarshal(w.Body.Bytes(), &out); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
if out["id"] != "new-inst-id" {
t.Errorf("expected id new-inst-id, got %s", out["id"])
if out["id"] != "new-inst-1" {
t.Errorf("expected id new-inst-1, got %s", out["id"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
@@ -319,65 +299,56 @@ func TestInstructionsCreate_InvalidScope(t *testing.T) {
}
}
func TestInstructionsHandler_Create_WorkspaceScopeMissingScopeTarget(t *testing.T) {
func TestInstructionsCreate_WorkspaceScopeNoTarget(t *testing.T) {
setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
body, _ := json.Marshal(map[string]interface{}{
w, c := newPostRequest("/instructions", map[string]interface{}{
"scope": "workspace",
"title": "Test",
"content": "Test content",
"title": "Missing Target",
"content": "Workspace scope without scope_target.",
})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/instructions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
h.Create(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestInstructionsHandler_Create_ContentTooLong(t *testing.T) {
func TestInstructionsCreate_ContentTooLong(t *testing.T) {
setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
longContent := string(bytes.Repeat([]byte("x"), 8193))
body, _ := json.Marshal(map[string]interface{}{
// Build a string longer than maxInstructionContentLen (8192).
longContent := string(make([]byte, maxInstructionContentLen+1))
w, c := newPostRequest("/instructions", map[string]interface{}{
"scope": "global",
"title": "Test",
"title": "Too Long",
"content": longContent,
})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/instructions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
h.Create(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestInstructionsHandler_Create_TitleTooLong(t *testing.T) {
func TestInstructionsCreate_TitleTooLong(t *testing.T) {
setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
longTitle := string(bytes.Repeat([]byte("x"), 201))
body, _ := json.Marshal(map[string]interface{}{
longTitle := string(make([]byte, 201))
w, c := newPostRequest("/instructions", map[string]interface{}{
"scope": "global",
"title": longTitle,
"content": "Short content",
"content": "Short content.",
})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/instructions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
h.Create(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
@@ -871,250 +842,43 @@ func TestInstructionsResolve_ScopeTransitionOnlyGlobal(t *testing.T) {
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
var out struct {
Instructions string `json:"instructions"`
}
}
func TestInstructionsHandler_Update_NotFound(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
mock.ExpectExec(regexp.QuoteMeta("UPDATE platform_instructions SET\n\t\t\t\ttitle = COALESCE($2, title),\n\t\t\t\tcontent = COALESCE($3, content),\n\t\t\t\tpriority = COALESCE($4, priority),\n\t\t\t\tenabled = COALESCE($5, enabled),\n\t\t\t\tupdated_at = NOW()\n\t\t\t\tWHERE id = $1")).
WithArgs("nonexistent", sqlmock.AnyArg(), nil, nil, nil).
WillReturnResult(sqlmock.NewResult(0, 0))
body, _ := json.Marshal(map[string]interface{}{"title": "Updated title"})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "nonexistent"}}
c.Request = httptest.NewRequest("PUT", "/instructions/nonexistent", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Update(c)
if w.Code != http.StatusNotFound {
t.Fatalf("expected 404, got %d: %s", w.Code, w.Body.String())
if err := json.Unmarshal(w.Body.Bytes(), &out); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
// Two global instructions share one section header.
if bytes.Count([]byte(out.Instructions), []byte("Platform-Wide Rules")) != 1 {
t.Error("expect exactly one 'Platform-Wide Rules' header for consecutive global rows")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
t.Errorf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_Update_ContentTooLong(t *testing.T) {
setupTestDB(t)
handler := NewInstructionsHandler()
// ─── Update: empty body (all nil — no-op update) ─────────────────────────────
longContent := string(bytes.Repeat([]byte("x"), 8193))
body, _ := json.Marshal(map[string]interface{}{"content": longContent})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "inst-1"}}
c.Request = httptest.NewRequest("PUT", "/instructions/inst-1", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Update(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestInstructionsHandler_Update_TitleTooLong(t *testing.T) {
setupTestDB(t)
handler := NewInstructionsHandler()
longTitle := string(bytes.Repeat([]byte("x"), 201))
body, _ := json.Marshal(map[string]interface{}{"title": longTitle})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "inst-1"}}
c.Request = httptest.NewRequest("PUT", "/instructions/inst-1", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Update(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
// ── Delete ─────────────────────────────────────────────────────────────────────
func TestInstructionsHandler_Delete_Success(t *testing.T) {
func TestInstructionsUpdate_EmptyBody(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
mock.ExpectExec(regexp.QuoteMeta("DELETE FROM platform_instructions WHERE id = $1")).
WithArgs("inst-1").
instID := "inst-empty-update"
w, c := newPutRequest("/instructions/"+instID, map[string]interface{}{})
c.Params = []gin.Param{{Key: "id", Value: instID}}
// COALESCE(nil, ...) = unchanged; still updates updated_at.
// Args order: ($1=id, $2=title, $3=content, $4=priority, $5=enabled)
mock.ExpectExec("UPDATE platform_instructions SET").
WithArgs(instID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "inst-1"}}
c.Request = httptest.NewRequest("DELETE", "/instructions/inst-1", nil)
handler.Delete(c)
h.Update(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
t.Fatalf("expected 200 for empty body, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_Delete_NotFound(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
mock.ExpectExec(regexp.QuoteMeta("DELETE FROM platform_instructions WHERE id = $1")).
WithArgs("nonexistent").
WillReturnResult(sqlmock.NewResult(0, 0))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "nonexistent"}}
c.Request = httptest.NewRequest("DELETE", "/instructions/nonexistent", nil)
handler.Delete(c)
if w.Code != http.StatusNotFound {
t.Fatalf("expected 404, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// ── Resolve ────────────────────────────────────────────────────────────────────
func TestInstructionsHandler_Resolve_Empty(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
wsID := "ws-resolve-1"
mock.ExpectQuery("SELECT scope, title, content FROM platform_instructions WHERE enabled = true AND").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"scope", "title", "content"}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
c.Request = httptest.NewRequest("GET", "/workspaces/"+wsID+"/instructions/resolve", nil)
handler.Resolve(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
if resp["workspace_id"] != wsID {
t.Errorf("expected workspace_id %q, got %v", wsID, resp["workspace_id"])
}
if resp["instructions"] != "" {
t.Errorf("expected empty instructions, got %q", resp["instructions"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_Resolve_WithInstructions(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
wsID := "ws-resolve-2"
rows := sqlmock.NewRows([]string{"scope", "title", "content"}).
AddRow("global", "Be safe", "No SSRF").
AddRow("workspace", "WS Rule", "Use HTTPS")
mock.ExpectQuery("SELECT scope, title, content FROM platform_instructions WHERE enabled = true AND").
WithArgs(wsID).
WillReturnRows(rows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
c.Request = httptest.NewRequest("GET", "/workspaces/"+wsID+"/instructions/resolve", nil)
handler.Resolve(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
instructions, ok := resp["instructions"].(string)
if !ok {
t.Fatalf("instructions field is not a string: %T", resp["instructions"])
}
if instructions == "" {
t.Fatalf("expected non-empty instructions")
}
// Verify scope headers are present
if !bytes.Contains([]byte(instructions), []byte("Platform-Wide Rules")) {
t.Errorf("expected 'Platform-Wide Rules' header in instructions")
}
if !bytes.Contains([]byte(instructions), []byte("Role-Specific Rules")) {
t.Errorf("expected 'Role-Specific Rules' header in instructions")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_Resolve_MissingWorkspaceID(t *testing.T) {
setupTestDB(t)
handler := NewInstructionsHandler()
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: ""}}
c.Request = httptest.NewRequest("GET", "/workspaces//instructions/resolve", nil)
handler.Resolve(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
// scanInstructions is called by the List handler — verify it handles
// rows.Err() gracefully without panicking.
func TestInstructionsHandler_List_ScanErrorContinues(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
rows := sqlmock.NewRows([]string{
"id", "scope", "scope_target", "title", "content", "priority", "enabled", "created_at", "updated_at",
}).AddRow("inst-1", "global", nil, "Good", "Content here", 5, true, time.Now(), time.Now()).
RowError(1, context.DeadlineExceeded) // error on row 2 (if it existed)
mock.ExpectQuery("SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE 1=1").
WillReturnRows(rows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/instructions", nil)
handler.List(c)
// Should still return 200 and the one valid row
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var result []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &result); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
// The valid row should still be returned (error is logged, not fatal)
if len(result) != 1 {
t.Fatalf("expected 1 instruction despite row error, got %d", len(result))
t.Errorf("unmet expectations: %v", err)
}
}
@@ -15,7 +15,6 @@ import (
"gopkg.in/yaml.v3"
)
// resolvePromptRef reads a prompt body from either an inline string or a
// file ref relative to the workspace's files_dir. Inline always wins when
// both are non-empty (caller-provided inline is more authoritative than a
@@ -177,7 +176,7 @@ func isEnvIdentPart(c byte) bool {
return isEnvIdentStart(c) || (c >= '0' && c <= '9')
}
// loadWorkspaceEnv reads the org root .env and the workspace-specific .env
// loadWorkspaceEnv reads the org root .env and the workspace-specific .env .env and the workspace-specific .env
// (workspace overrides org root). Used by both secret injection and channel
// config expansion.
//
@@ -104,8 +104,8 @@ func TestHasUnresolvedVarRef_Resolved(t *testing.T) {
// documents this design choice; callers who need empty=resolved should
// pre-process the output before calling hasUnresolvedVarRef.
{"${VAR}", "", true},
{"${VAR}", "value", false}, // var replaced
{"$VAR", "value", false}, // bare var replaced
{"${VAR}", "value", false}, // var replaced
{"$VAR", "value", false}, // bare var replaced
{"prefix${VAR}suffix", "prefixvaluesuffix", false},
{"${A}${B}", "ab", false},
// FOO=FOO and BAR=BAR — both vars found and replaced. Expanded output
@@ -125,14 +125,14 @@ func TestHasUnresolvedVarRef_Resolved(t *testing.T) {
func TestHasUnresolvedVarRef_Unresolved(t *testing.T) {
// Expansion left the refs intact → unresolved.
cases := []struct {
orig string
orig string
expanded string
}{
{"${VAR}", "${VAR}"}, // untouched
{"$VAR", "$VAR"}, // bare untouched
{"${VAR}", "${VAR}"}, // untouched
{"$VAR", "$VAR"}, // bare untouched
{"prefix${VAR}suffix", "prefix${VAR}suffix"},
{"${A}${B}", "${A}${B}"}, // both unresolved
{"${FOO}", ""}, // empty result with var ref in original
{"${A}${B}", "${A}${B}"}, // both unresolved
{"${FOO}", ""}, // empty result with var ref in original
}
for _, tc := range cases {
t.Run(tc.orig, func(t *testing.T) {
@@ -205,8 +205,8 @@ func TestMergeCategoryRouting_WorkspaceOverrides(t *testing.T) {
"ui": {"Frontend Engineer"},
}
ws := map[string][]string{
"security": {"SRE Team"}, // narrows
"ui": {}, // drops
"security": {"SRE Team"}, // narrows
"ui": {}, // drops
"infra": {"Platform Team"}, // adds
}
r := mergeCategoryRouting(defaults, ws)
@@ -467,44 +467,6 @@ func TestExpandWithEnv_PartiallyPresent(t *testing.T) {
assert.Equal(t, "yes and ${NOT_SET}", result)
}
func TestExpandWithEnv_EmbeddedMissingProcessEnvStaysLiteral(t *testing.T) {
t.Setenv("MOL_TEST_EMBEDDED_MISSING", "")
result := expandWithEnv("prefix/${MOL_TEST_EMBEDDED_MISSING}/suffix", map[string]string{})
assert.Equal(t, "prefix/${MOL_TEST_EMBEDDED_MISSING}/suffix", result)
}
// POSIX identifier guard regression tests (CWE-78 fix).
// Keys not starting with [a-zA-Z_] must not be looked up in env or os.Getenv.
func TestExpandWithEnv_DigitPrefix_NotExpanded(t *testing.T) {
// ${0}, ${5}, ${1VAR} — numeric prefix → not a valid shell identifier.
// Guard must return "$0", "$5", "$1VAR" literally; no env lookup.
cases := []struct {
input string
want string
}{
{"${0}", "$0"},
{"${5}", "$5"},
{"${1VAR}", "$1VAR"},
{"prefix ${0} suffix", "prefix $0 suffix"},
{"$0", "$0"},
{"$5", "$5"},
{"HOME=${HOME}", "HOME=${HOME}"}, // HOME is valid but embedded in larger string
}
for _, tc := range cases {
t.Run(tc.input, func(t *testing.T) {
got := expandWithEnv(tc.input, map[string]string{})
assert.Equal(t, tc.want, got)
})
}
}
func TestExpandWithEnv_EmptyKey_ReturnsDollar(t *testing.T) {
// ${} → "$" (empty key, guard returns "$")
result := expandWithEnv("value=${}", map[string]string{})
assert.Equal(t, "value=$", result)
}
// mergeCategoryRouting tests — unions defaults with per-workspace routing.
// ── Additional coverage: mergeCategoryRouting ──────────────────────
@@ -584,8 +546,8 @@ func TestRenderCategoryRoutingYAML_SingleCategory(t *testing.T) {
func TestRenderCategoryRoutingYAML_MultipleCategoriesSorted(t *testing.T) {
routing := map[string][]string{
"zebra": {"RoleZ"},
"alpha": {"RoleA"},
"zebra": {"RoleZ"},
"alpha": {"RoleA"},
"middleware": {"RoleM"},
}
result, err := renderCategoryRoutingYAML(routing)
@@ -342,11 +342,6 @@ func TestPluginInstall_InstanceLookupError_Returns503(t *testing.T) {
// ---------- dispatch: uninstall ----------
func TestPluginUninstall_SaaS_DispatchesToEIC(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectExec("DELETE FROM workspace_plugins WHERE workspace_id").
WithArgs("ws-1", "browser-automation").
WillReturnResult(sqlmock.NewResult(0, 1))
stubReadPluginManifestViaEIC(t, func(ctx context.Context, instanceID, runtime, pluginName string) ([]byte, error) {
return []byte("name: browser-automation\nskills:\n - browse\n"), nil
})
@@ -1,53 +0,0 @@
package handlers
// plugins_install_test.go — additional coverage for plugins_install.go.
//
// Gaps filled vs. existing test files:
// - plugins_install_external_test.go: Install + Uninstall 422 (external runtime) ✓ covered
// - plugins_test.go: Install 400 (missing source, invalid body, etc.) ✓ covered
// Uninstall 400 (invalid plugin name, empty name) ✓ covered
// Download auth gate ✓ covered
// - org_import_helpers_test.go: countWorkspaces, envRequirementKey, sanitizeEnvMembers,
// flattenAndSortRequirements, collectOrgEnv ✓ covered
//
// New test added here:
// - Uninstall 503: container not running, no SaaS dispatch.
//
// NOTE: validateWorkspaceID is not called inside the Install/Uninstall handlers.
// UUID validation is the responsibility of the WorkspaceAuth middleware, so no
// 400 test is needed here for UUID format.
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
)
// TestPluginUninstall_ContainerNotRunning_Returns503 exercises the 503 path
// where neither a local Docker container nor a SaaS instance-id dispatch
// resolves. The handler must return "workspace container not running" — NOT a
// generic 500 or a misleading 422 (external-runtime) message.
func TestPluginUninstall_ContainerNotRunning_Returns503(t *testing.T) {
// No docker client + no instance-id lookup → falls through to 503.
h := NewPluginsHandler(t.TempDir(), nil, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{
{Key: "id", Value: "550e8400-e29b-41d4-a716-446655440000"},
{Key: "name", Value: "some-plugin"},
}
c.Request = httptest.NewRequest("DELETE",
"/workspaces/550e8400-e29b-41d4-a716-446655440000/plugins/some-plugin", nil)
h.Uninstall(c)
require.Equal(t, http.StatusServiceUnavailable, w.Code)
var body map[string]string
json.Unmarshal(w.Body.Bytes(), &body)
require.Equal(t, "workspace container not running", body["error"])
}
@@ -1,472 +0,0 @@
package handlers
// Unit tests for plugins_listing.go:
// - parseManifestYAML: full YAML, missing fields, empty YAML
// - listRegistryFiltered: empty/missing dir, no yaml, valid yaml, runtime filter
// - ListRegistry (GET /plugins): no filter, with runtime filter
// - ListAvailableForWorkspace (GET /workspaces/:id/plugins/available): runtimeLookup stub
import (
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"github.com/gin-gonic/gin"
)
// -------- parseManifestYAML --------
func TestParseManifestYAML_FullPlugin(t *testing.T) {
data := []byte(`
name: molecule-audit
version: 1.2.3
description: Security audit plugin for Claude Code
author: Molecule AI
tags:
- security
- audit
skills:
- security-scan
- compliance-check
runtimes:
- claude_code
- hermes
`)
info := parseManifestYAML("fallback-name", data)
if info.Name != "fallback-name" {
t.Errorf("Name = %q; want fallback-name", info.Name)
}
if info.Version != "1.2.3" {
t.Errorf("Version = %q; want 1.2.3", info.Version)
}
if info.Description != "Security audit plugin for Claude Code" {
t.Errorf("Description = %q; want full description", info.Description)
}
if info.Author != "Molecule AI" {
t.Errorf("Author = %q; want Molecule AI", info.Author)
}
if len(info.Tags) != 2 || info.Tags[0] != "security" || info.Tags[1] != "audit" {
t.Errorf("Tags = %v; want [security audit]", info.Tags)
}
if len(info.Skills) != 2 || info.Skills[0] != "security-scan" || info.Skills[1] != "compliance-check" {
t.Errorf("Skills = %v; want [security-scan compliance-check]", info.Skills)
}
if len(info.Runtimes) != 2 || info.Runtimes[0] != "claude_code" || info.Runtimes[1] != "hermes" {
t.Errorf("Runtimes = %v; want [claude_code hermes]", info.Runtimes)
}
}
func TestParseManifestYAML_MinimalFields(t *testing.T) {
// Only name field; all others should be zero-value.
data := []byte(`name: minimal-plugin`)
info := parseManifestYAML("fallback", data)
if info.Name != "fallback" {
t.Errorf("Name = %q; want fallback", info.Name)
}
if info.Version != "" {
t.Errorf("Version = %q; want empty", info.Version)
}
if info.Description != "" {
t.Errorf("Description = %q; want empty", info.Description)
}
if len(info.Tags) != 0 {
t.Errorf("Tags = %v; want []", info.Tags)
}
if len(info.Skills) != 0 {
t.Errorf("Skills = %v; want []", info.Skills)
}
if len(info.Runtimes) != 0 {
t.Errorf("Runtimes = %v; want []", info.Runtimes)
}
}
func TestParseManifestYAML_MissingPluginYAML(t *testing.T) {
// No plugin.yaml present → returns fallback name only.
info := parseManifestYAML("no-file", nil)
if info.Name != "no-file" {
t.Errorf("Name = %q; want no-file", info.Name)
}
}
func TestParseManifestYAML_BadYAML(t *testing.T) {
// Malformed YAML → returns fallback name only (no panic).
info := parseManifestYAML("bad-yaml", []byte("not: [yaml: at all"))
if info.Name != "bad-yaml" {
t.Errorf("Name = %q; want bad-yaml", info.Name)
}
if info.Version != "" {
t.Errorf("Version = %q; want empty after bad YAML", info.Version)
}
}
func TestParseManifestYAML_PartialFields(t *testing.T) {
// Present tags/skills/runtimes that are not []interface{} (e.g. wrong type)
// should not panic and should leave the field empty.
data := []byte(`
name: partial
tags: "not-an-array"
skills: 123
runtimes: true
`)
info := parseManifestYAML("partial", data)
if info.Name != "partial" {
t.Errorf("Name = %q; want partial", info.Name)
}
if len(info.Tags) != 0 {
t.Errorf("Tags = %v; want [] (wrong type)", info.Tags)
}
if len(info.Skills) != 0 {
t.Errorf("Skills = %v; want [] (wrong type)", info.Skills)
}
if len(info.Runtimes) != 0 {
t.Errorf("Runtimes = %v; want [] (wrong type)", info.Runtimes)
}
}
// -------- listRegistryFiltered --------
func makeTestHandler(t *testing.T, pluginsDir string) *PluginsHandler {
// Construct a minimal PluginsHandler with a nil docker client
// (filesystem paths are tested directly; container-dependent paths are
// tested separately or skipped in this file).
h := &PluginsHandler{pluginsDir: pluginsDir}
return h
}
func writePluginYAML(t *testing.T, dir, name, content string) {
path := filepath.Join(dir, name, "plugin.yaml")
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
t.Fatalf("mkdir: %v", err)
}
if err := os.WriteFile(path, []byte(content), 0o644); err != nil {
t.Fatalf("writeFile: %v", err)
}
}
func TestListRegistryFiltered_EmptyDir(t *testing.T) {
dir := t.TempDir()
h := makeTestHandler(t, dir)
got := h.listRegistryFiltered("")
if len(got) != 0 {
t.Errorf("expected empty list for empty dir; got %d plugins", len(got))
}
}
func TestListRegistryFiltered_NonExistentDir(t *testing.T) {
h := makeTestHandler(t, "/does/not/exist")
got := h.listRegistryFiltered("")
if len(got) != 0 {
t.Errorf("expected empty list for nonexistent dir; got %d plugins", len(got))
}
}
func TestListRegistryFiltered_NoPluginYAML(t *testing.T) {
// Plugin directory exists but has no plugin.yaml → fallback name only.
dir := t.TempDir()
writePluginYAML(t, dir, "no-manifest-plugin", "")
h := makeTestHandler(t, dir)
got := h.listRegistryFiltered("")
if len(got) != 1 {
t.Fatalf("expected 1 plugin; got %d", len(got))
}
if got[0].Name != "no-manifest-plugin" {
t.Errorf("Name = %q; want no-manifest-plugin", got[0].Name)
}
}
func TestListRegistryFiltered_ValidPlugin(t *testing.T) {
dir := t.TempDir()
writePluginYAML(t, dir, "molecule-audit", `
name: molecule-audit
version: 1.0.0
description: Security audit plugin
author: Molecule AI
tags:
- security
skills:
- audit
runtimes:
- hermes
`)
h := makeTestHandler(t, dir)
got := h.listRegistryFiltered("")
if len(got) != 1 {
t.Fatalf("expected 1 plugin; got %d", len(got))
}
if got[0].Name != "molecule-audit" {
t.Errorf("Name = %q; want molecule-audit", got[0].Name)
}
if got[0].Version != "1.0.0" {
t.Errorf("Version = %q; want 1.0.0", got[0].Version)
}
if len(got[0].Tags) != 1 || got[0].Tags[0] != "security" {
t.Errorf("Tags = %v; want [security]", got[0].Tags)
}
}
func TestListRegistryFiltered_FilesIgnored(t *testing.T) {
// Regular files in pluginsDir are skipped (only directories are scanned).
dir := t.TempDir()
writePluginYAML(t, dir, "real-plugin", `
name: real-plugin
version: 1.0.0
`)
f, err := os.Create(filepath.Join(dir, "not-a-plugin.txt"))
if err != nil {
t.Fatal(err)
}
f.Close()
h := makeTestHandler(t, dir)
got := h.listRegistryFiltered("")
if len(got) != 1 || got[0].Name != "real-plugin" {
t.Errorf("expected only real-plugin; got %v", got)
}
}
func TestListRegistryFiltered_RuntimeFilterMatches(t *testing.T) {
dir := t.TempDir()
writePluginYAML(t, dir, "cc-plugin", `
name: cc-plugin
runtimes: [claude_code]
`)
writePluginYAML(t, dir, "hermes-plugin", `
name: hermes-plugin
runtimes: [hermes]
`)
h := makeTestHandler(t, dir)
// With hermes filter → only hermes-plugin returned.
got := h.listRegistryFiltered("hermes")
if len(got) != 1 || got[0].Name != "hermes-plugin" {
t.Errorf("expected [hermes-plugin]; got %v", got)
}
// With claude-code filter → hyphen normalises to underscore → cc-plugin returned.
got2 := h.listRegistryFiltered("claude-code")
if len(got2) != 1 || got2[0].Name != "cc-plugin" {
t.Errorf("expected [cc-plugin] with claude-code filter; got %v", got2)
}
}
func TestListRegistryFiltered_RuntimeFilterExcludes(t *testing.T) {
// Plugin declares hermes; query asks for claude-code → plugin excluded.
dir := t.TempDir()
writePluginYAML(t, dir, "hermes-only", `
name: hermes-only
runtimes: [hermes]
`)
h := makeTestHandler(t, dir)
got := h.listRegistryFiltered("claude_code")
if len(got) != 0 {
t.Errorf("expected empty list for mismatched runtime; got %v", got)
}
}
func TestListRegistryFiltered_UnspecifiedRuntimeIncluded(t *testing.T) {
// Plugin with no runtimes field is included in any filtered query
// ("unspecified = try it" contract).
dir := t.TempDir()
writePluginYAML(t, dir, "universal-plugin", `
name: universal-plugin
runtimes: []
`)
h := makeTestHandler(t, dir)
got := h.listRegistryFiltered("any-runtime")
if len(got) != 1 || got[0].Name != "universal-plugin" {
t.Errorf("expected [universal-plugin] with any runtime filter; got %v", got)
}
}
func TestListRegistryFiltered_MultipleMatching(t *testing.T) {
dir := t.TempDir()
for _, name := range []string{"plugin-a", "plugin-b", "plugin-c"} {
writePluginYAML(t, dir, name, `name: `+name+`
runtimes: [hermes, claude_code]
`)
}
h := makeTestHandler(t, dir)
got := h.listRegistryFiltered("hermes")
if len(got) != 3 {
t.Errorf("expected 3 plugins; got %d: %v", len(got), got)
}
}
// -------- ListRegistry (GET /plugins) --------
func listRegistryReq(runtime string) (*http.Request, *httptest.ResponseRecorder, *gin.Context) {
url := "/plugins"
if runtime != "" {
url += "?runtime=" + runtime
}
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", url, nil)
return c.Request, w, c
}
func TestListRegistry_NoFilter(t *testing.T) {
dir := t.TempDir()
writePluginYAML(t, dir, "test-plugin", `
name: test-plugin
version: 0.1.0
`)
h := makeTestHandler(t, dir)
_, w, c := listRegistryReq("")
h.ListRegistry(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200; got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("unmarshal: %v", err)
}
if len(resp) != 1 || resp[0]["name"] != "test-plugin" {
t.Errorf("unexpected response: %v", resp)
}
}
func TestListRegistry_WithRuntimeFilter(t *testing.T) {
dir := t.TempDir()
writePluginYAML(t, dir, "hermes-plugin", `
name: hermes-plugin
runtimes: [hermes]
`)
writePluginYAML(t, dir, "cc-plugin", `
name: cc-plugin
runtimes: [claude_code]
`)
h := makeTestHandler(t, dir)
_, w, c := listRegistryReq("hermes")
h.ListRegistry(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200; got %d", w.Code)
}
var resp []map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 1 || resp[0]["name"] != "hermes-plugin" {
t.Errorf("expected [hermes-plugin]; got %v", resp)
}
}
func TestListRegistry_EmptyOnNoMatches(t *testing.T) {
dir := t.TempDir()
writePluginYAML(t, dir, "cc-plugin", `name: cc-plugin
runtimes: [claude_code]
`)
h := makeTestHandler(t, dir)
_, w, c := listRegistryReq("nonexistent")
h.ListRegistry(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200; got %d", w.Code)
}
var resp []map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 0 {
t.Errorf("expected empty list; got %v", resp)
}
}
// -------- ListAvailableForWorkspace (GET /workspaces/:id/plugins/available) --------
func listAvailableReq(workspaceID string) (*http.Request, *httptest.ResponseRecorder, *gin.Context) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: workspaceID}}
c.Request = httptest.NewRequest("GET", "/workspaces/"+workspaceID+"/plugins/available", nil)
return c.Request, w, c
}
func TestListAvailableForWorkspace_RuntimeLookupReturnsRuntime(t *testing.T) {
dir := t.TempDir()
writePluginYAML(t, dir, "hermes-plugin", `
name: hermes-plugin
runtimes: [hermes]
`)
writePluginYAML(t, dir, "cc-plugin", `
name: cc-plugin
runtimes: [claude_code]
`)
h := makeTestHandler(t, dir)
h.runtimeLookup = func(workspaceID string) (string, error) {
return "hermes", nil
}
_, w, c := listAvailableReq("00000000-0000-0000-0000-000000000001")
h.ListAvailableForWorkspace(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200; got %d", w.Code)
}
var resp []map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 1 || resp[0]["name"] != "hermes-plugin" {
t.Errorf("expected [hermes-plugin]; got %v", resp)
}
}
func TestListAvailableForWorkspace_RuntimeLookupErrors(t *testing.T) {
// runtimeLookup error → runtime="" → full registry returned.
dir := t.TempDir()
writePluginYAML(t, dir, "plugin-a", `name: plugin-a
runtimes: [hermes]
`)
writePluginYAML(t, dir, "plugin-b", `name: plugin-b
runtimes: [claude_code]
`)
h := makeTestHandler(t, dir)
h.runtimeLookup = func(workspaceID string) (string, error) {
return "", errors.New("runtime lookup failed")
}
_, w, c := listAvailableReq("00000000-0000-0000-0000-000000000002")
h.ListAvailableForWorkspace(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200; got %d", w.Code)
}
var resp []map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 2 {
t.Errorf("expected 2 plugins (full registry fallback); got %d: %v", len(resp), resp)
}
}
func TestListAvailableForWorkspace_NoRuntimeLookup(t *testing.T) {
// runtimeLookup nil → full registry (no filter).
dir := t.TempDir()
writePluginYAML(t, dir, "plugin-x", `name: plugin-x`)
h := makeTestHandler(t, dir)
// runtimeLookup is nil by default from makeTestHandler.
_, w, c := listAvailableReq("00000000-0000-0000-0000-000000000003")
h.ListAvailableForWorkspace(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200; got %d", w.Code)
}
var resp []map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 1 || resp[0]["name"] != "plugin-x" {
t.Errorf("expected [plugin-x]; got %v", resp)
}
}
func TestListAvailableForWorkspace_UnspecifiedRuntimePluginsAlwaysIncluded(t *testing.T) {
// Plugins with empty runtimes list should always be included
// regardless of workspace runtime.
dir := t.TempDir()
writePluginYAML(t, dir, "universal", `name: universal
runtimes: []
`)
writePluginYAML(t, dir, "cc-only", `name: cc-only
runtimes: [claude_code]
`)
h := makeTestHandler(t, dir)
h.runtimeLookup = func(id string) (string, error) { return "hermes", nil }
_, w, c := listAvailableReq("ws-001")
h.ListAvailableForWorkspace(c)
var resp []map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
// "universal" has no runtimes (try-it); "cc-only" doesn't support hermes.
if len(resp) != 1 || resp[0]["name"] != "universal" {
t.Errorf("expected [universal]; got %v", resp)
}
}
@@ -629,9 +629,6 @@ func TestPluginInstall_RejectsUnknownScheme(t *testing.T) {
}
func TestPluginInstall_LocalSourceReachesContainerLookup(t *testing.T) {
mock := setupTestDB(t)
expectAllowlistAllowAll(mock)
base := t.TempDir()
pluginDir := filepath.Join(base, "demo")
_ = os.MkdirAll(pluginDir, 0o755)
@@ -958,14 +955,14 @@ func TestLogInstallLimitsOnce(t *testing.T) {
func TestRegexpEscapeForAwk(t *testing.T) {
cases := map[string]string{
"my-plugin": `my-plugin`,
"# Plugin: foo /": `# Plugin: foo \/`,
"# Plugin: a.b /": `# Plugin: a\.b \/`,
"foo[bar]": `foo\[bar\]`,
"a*b+c?": `a\*b\+c\?`,
"path|with|pipes": `path\|with\|pipes`,
`back\slash`: `back\\slash`,
"": ``,
"my-plugin": `my-plugin`,
"# Plugin: foo /": `# Plugin: foo \/`,
"# Plugin: a.b /": `# Plugin: a\.b \/`,
"foo[bar]": `foo\[bar\]`,
"a*b+c?": `a\*b\+c\?`,
"path|with|pipes": `path\|with\|pipes`,
`back\slash`: `back\\slash`,
"": ``,
}
for in, want := range cases {
got := regexpEscapeForAwk(in)
@@ -1250,7 +1247,7 @@ func TestPluginDownload_GithubSchemeStreamsTarball(t *testing.T) {
scheme: "github",
fetchFn: func(_ context.Context, _ string, dst string) (string, error) {
files := map[string]string{
"plugin.yaml": "name: remote-plugin\nversion: 1.0.0\n",
"plugin.yaml": "name: remote-plugin\nversion: 1.0.0\n",
"skills/x/SKILL.md": "---\nname: x\n---\n",
"adapters/claude_code.py": "from plugins_registry.builtins import AgentskillsAdaptor as Adaptor\n",
}
@@ -271,7 +271,6 @@ func TestGracefulPreRestart_URLResolutionError(t *testing.T) {
WorkspaceHandler: newHandlerWithTestDeps(t),
errToReturn: context.DeadlineExceeded,
}
waitForHandlerAsyncBeforeDBCleanup(t, hWrapper.WorkspaceHandler)
hWrapper.gracefulPreRestart(context.Background(), "ws-url-err-111")
time.Sleep(200 * time.Millisecond)
@@ -210,16 +210,11 @@ func (h *TemplatesHandler) List(c *gin.Context) {
model = raw.RuntimeConfig.Model
}
tier := raw.Tier
if h.wh != nil && h.wh.IsSaaS() {
tier = h.wh.DefaultTier()
}
templates = append(templates, templateSummary{
ID: id,
Name: raw.Name,
Description: raw.Description,
Tier: tier,
Tier: raw.Tier,
Runtime: raw.Runtime,
Model: model,
Models: raw.RuntimeConfig.Models,
@@ -376,11 +371,6 @@ func (h *TemplatesHandler) ListFiles(c *gin.Context) {
if err != nil || path == walkRoot {
return nil
}
// Skip symlinks to prevent path traversal via malicious symlinks
// inside the workspace config directory (OFFSEC-010).
if info.Mode()&os.ModeSymlink != 0 {
return nil
}
rel, _ := filepath.Rel(walkRoot, path)
// Enforce depth limit
if strings.Count(rel, string(filepath.Separator))+1 > depth {
@@ -847,58 +847,6 @@ func TestListFiles_FallbackToHost_WithTemplate(t *testing.T) {
}
}
func TestListFiles_FallbackToHost_SkipsSymlinks(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
tmpDir := t.TempDir()
tmplDir := filepath.Join(tmpDir, "test-agent")
if err := os.MkdirAll(tmplDir, 0755); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(tmplDir, "config.yaml"), []byte("name: Test Agent\n"), 0644); err != nil {
t.Fatal(err)
}
secret := filepath.Join(t.TempDir(), "secret.txt")
if err := os.WriteFile(secret, []byte("do-not-list"), 0600); err != nil {
t.Fatal(err)
}
if err := os.Symlink(secret, filepath.Join(tmplDir, "leaked-secret")); err != nil {
t.Fatal(err)
}
handler := NewTemplatesHandler(tmpDir, nil, nil)
mock.ExpectQuery(`SELECT name, COALESCE\(instance_id, ''\), COALESCE\(runtime, ''\) FROM workspaces WHERE id =`).
WithArgs("ws-tmpl").
WillReturnRows(sqlmock.NewRows([]string{"name", "instance_id", "runtime"}).AddRow("Test Agent", "", ""))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-tmpl"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-tmpl/files", nil)
handler.ListFiles(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatal(err)
}
for _, file := range resp {
if file["path"] == "leaked-secret" {
t.Fatalf("symlink should not be listed: %#v", resp)
}
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ==================== GET /workspaces/:id/files/*path ====================
func TestReadFile_PathTraversal(t *testing.T) {
@@ -1252,3 +1200,4 @@ func TestCWE78_DeleteFile_TraversalVariants(t *testing.T) {
})
}
}
@@ -340,11 +340,6 @@ func TestSSHCommandCmd_BuildsArgv(t *testing.T) {
// a workspace must still be able to access its own terminal. The CanCommunicate
// fast-path returns true when callerID == targetID.
func TestTerminalConnect_KI005_AllowsOwnTerminal(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery("SELECT COALESCE").
WithArgs("ws-alice").
WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow(""))
// CanCommunicate fast-path: callerID == targetID → returns true without DB.
prev := canCommunicateCheck
canCommunicateCheck = func(callerID, targetID string) bool { return callerID == targetID }
@@ -372,11 +367,6 @@ func TestTerminalConnect_KI005_AllowsOwnTerminal(t *testing.T) {
// skip the CanCommunicate check entirely and fall through to the Docker auth path.
// We assert they get the nil-docker 503 instead of 403.
func TestTerminalConnect_KI005_SkipsCheckWithoutHeader(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery("SELECT COALESCE").
WithArgs("ws-any").
WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow(""))
h := NewTerminalHandler(nil) // nil docker → 503 if reached
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -449,9 +439,6 @@ func TestTerminalConnect_KI005_AllowsSiblingWorkspace(t *testing.T) {
mock.ExpectExec(`UPDATE workspace_auth_tokens SET last_used_at`).
WithArgs(sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT COALESCE").
WithArgs("ws-dev").
WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow(""))
h := NewTerminalHandler(nil)
w := httptest.NewRecorder()
@@ -476,10 +463,7 @@ func TestTerminalConnect_KI005_AllowsSiblingWorkspace(t *testing.T) {
// introduced in GH#1885: internal routing uses org tokens which are not in
// workspace_auth_tokens, so ValidateToken would always fail for them.
func TestKI005_OrgToken_SkipsValidateToken(t *testing.T) {
mock := setupTestDB(t) // no ValidateToken ExpectQuery — none should fire
mock.ExpectQuery("SELECT COALESCE").
WithArgs("ws-target").
WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow(""))
setupTestDB(t) // no ValidateToken ExpectQuery — none should fire
prev := canCommunicateCheck
canCommunicateCheck = func(callerID, targetID string) bool {
// Simulate platform agent → target workspace (same org).
@@ -560,3 +544,4 @@ func TestSSHCommandCmd_ConnectTimeoutPresent(t *testing.T) {
args)
}
}
@@ -176,14 +176,15 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
id := uuid.New().String()
awarenessNamespace := workspaceAwarenessNamespace(id)
if h.IsSaaS() {
// SaaS hard gate: every hosted workspace gets its own sibling
// EC2 instance, so T4 is the only meaningful runtime boundary.
// Do not trust stale clients/templates that still send T1/T2/T3.
payload.Tier = 4
} else if payload.Tier == 0 {
// Self-hosted default remains T3. Lower tiers (T1 sandboxed,
// T2 standard) stay explicit opt-ins for low-trust local agents.
if payload.Tier == 0 {
// SaaS-aware default. SaaS → T4 (full host access; each
// workspace runs on its own sibling EC2 so the tier boundary
// is a Docker resource limit on the only container present —
// no neighbour to protect from). Self-hosted → T3 (read-write
// workspace mount + Docker daemon access, most templates'
// baseline). Lower tiers (T1 sandboxed, T2 standard) remain
// explicit opt-ins for low-trust agents. Matches the canvas
// CreateWorkspaceDialog defaults so the API and the UI agree.
payload.Tier = h.DefaultTier()
}
@@ -1,193 +0,0 @@
package handlers
import (
"bytes"
"database/sql"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// patchReq builds a gin context for a PATCH request to /workspaces/:id/abilities.
func patchReq(id, body string) (*http.Request, *httptest.ResponseRecorder, *gin.Context) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: id}}
c.Request = httptest.NewRequest("PATCH", "/workspaces/"+id+"/abilities", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
return c.Request, w, c
}
func TestPatchAbilities_InvalidWorkspaceID(t *testing.T) {
setupTestDB(t)
// "not-a-uuid" fails validateWorkspaceID
_, w, c := patchReq("not-a-uuid", `{"broadcast_enabled":true}`)
PatchAbilities(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestPatchAbilities_EmptyBody(t *testing.T) {
setupTestDB(t)
id := "00000000-0000-0000-0000-000000000001"
// Empty JSON object — no ability fields present
_, w, c := patchReq(id, `{}`)
PatchAbilities(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]string
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["error"] != "at least one ability field required" {
t.Errorf("expected 'at least one ability field required', got %v", resp["error"])
}
}
func TestPatchAbilities_WorkspaceNotFound(t *testing.T) {
mock := setupTestDB(t)
id := "00000000-0000-0000-0000-000000000002"
// SELECT EXISTS returns false (workspace does not exist)
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
_, w, c := patchReq(id, `{"broadcast_enabled":true}`)
PatchAbilities(c)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
}
}
func TestPatchAbilities_SetBroadcastEnabledTrue(t *testing.T) {
mock := setupTestDB(t)
id := "00000000-0000-0000-0000-000000000003"
// SELECT EXISTS → true
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// UPDATE broadcast_enabled = true
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(id, true).
WillReturnResult(sqlmock.NewResult(0, 1))
_, w, c := patchReq(id, `{"broadcast_enabled":true}`)
PatchAbilities(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]string
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["status"] != "updated" {
t.Errorf("expected status=updated, got %v", resp["status"])
}
}
func TestPatchAbilities_SetTalkToUserEnabledFalse(t *testing.T) {
mock := setupTestDB(t)
id := "00000000-0000-0000-0000-000000000004"
// SELECT EXISTS → true
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// UPDATE talk_to_user_enabled = false
mock.ExpectExec(`UPDATE workspaces SET talk_to_user_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(id, false).
WillReturnResult(sqlmock.NewResult(0, 1))
_, w, c := patchReq(id, `{"talk_to_user_enabled":false}`)
PatchAbilities(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
}
func TestPatchAbilities_BothFields(t *testing.T) {
mock := setupTestDB(t)
id := "00000000-0000-0000-0000-000000000005"
// SELECT EXISTS → true
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// UPDATE broadcast_enabled = false
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(id, false).
WillReturnResult(sqlmock.NewResult(0, 1))
// UPDATE talk_to_user_enabled = true
mock.ExpectExec(`UPDATE workspaces SET talk_to_user_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(id, true).
WillReturnResult(sqlmock.NewResult(0, 1))
_, w, c := patchReq(id, `{"broadcast_enabled":false,"talk_to_user_enabled":true}`)
PatchAbilities(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
}
func TestPatchAbilities_BroadcastUpdateFails(t *testing.T) {
mock := setupTestDB(t)
id := "00000000-0000-0000-0000-000000000006"
// SELECT EXISTS → true
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// UPDATE fails
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(id, true).
WillReturnError(sql.ErrConnDone)
_, w, c := patchReq(id, `{"broadcast_enabled":true}`)
PatchAbilities(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
}
}
func TestPatchAbilities_TalkToUserUpdateFails(t *testing.T) {
mock := setupTestDB(t)
id := "00000000-0000-0000-0000-000000000007"
// SELECT EXISTS → true
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// UPDATE broadcast_enabled skipped (not in payload)
// UPDATE talk_to_user_enabled fails
mock.ExpectExec(`UPDATE workspaces SET talk_to_user_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(id, false).
WillReturnError(sql.ErrConnDone)
_, w, c := patchReq(id, `{"talk_to_user_enabled":false}`)
PatchAbilities(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
}
}
@@ -3,7 +3,7 @@ package handlers
// workspace_broadcast.go — POST /workspaces/:id/broadcast
//
// Allows a workspace with broadcast_enabled=true to send a message to every
// non-removed agent workspace in the SAME ORG. The message is:
// non-removed agent workspace in the org. The message is:
//
// • Persisted in each recipient's activity_logs (type='broadcast_receive')
// so poll-mode agents pick it up via GET /activity.
@@ -16,11 +16,6 @@ package handlers
// Auth: WorkspaceAuth (the agent triggers this with its own bearer token).
// The handler re-validates broadcast_enabled inside the DB lookup to prevent
// TOCTOU — the middleware only proved the token is valid, not the ability.
//
// Org isolation (OFFSEC-015): recipients are scoped to the sender's org using
// a recursive CTE that walks the parent_id chain to find the org root. This
// prevents a compromised or misconfigured workspace from broadcasting to
// workspaces in other tenants' orgs.
import (
"log"
@@ -79,49 +74,11 @@ func (h *BroadcastHandler) Broadcast(c *gin.Context) {
return
}
// Find the sender's org root by walking the parent_id chain.
// Workspaces with parent_id = NULL are org roots; every other workspace
// belongs to the org identified by its topmost ancestor.
var orgRootID string
err = db.DB.QueryRowContext(ctx, `
WITH RECURSIVE org_chain AS (
SELECT id, parent_id, id AS root_id
FROM workspaces
WHERE id = $1
UNION ALL
SELECT w.id, w.parent_id, c.root_id
FROM workspaces w
JOIN org_chain c ON w.id = c.parent_id
)
SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1
`, senderID).Scan(&orgRootID)
if err != nil {
log.Printf("Broadcast: org root lookup for %s: %v", senderID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
return
}
// Collect all non-removed agent workspaces in the SAME ORG (same root_id),
// excluding the sender itself.
rows, err := db.DB.QueryContext(ctx, `
WITH RECURSIVE org_chain AS (
SELECT id, parent_id, id AS root_id
FROM workspaces
WHERE parent_id IS NULL
UNION ALL
SELECT w.id, w.parent_id, c.root_id
FROM workspaces w
JOIN org_chain c ON w.parent_id = c.id
)
SELECT c.id
FROM org_chain c
WHERE c.root_id = $1
AND c.id != $2
AND EXISTS (
SELECT 1 FROM workspaces w
WHERE w.id = c.id AND w.status != 'removed'
)
`, orgRootID, senderID)
// Collect all non-removed agent workspaces (excludes the sender itself).
rows, err := db.DB.QueryContext(ctx,
`SELECT id FROM workspaces WHERE status != 'removed' AND id != $1`,
senderID,
)
if err != nil {
log.Printf("Broadcast: recipient query failed for %s: %v", senderID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
@@ -1,428 +0,0 @@
package handlers
import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// -------- Org-scoped recipient query tests (OFFSEC-015) --------
// TestBroadcast_OrgScopedRecipients verifies that a broadcast from Org-A does
// NOT reach workspaces belonging to Org-B. This is the core regression test
// for OFFSEC-015: the original query had no org filter, so a workspace in
// Org-A could broadcast to every non-removed workspace in the entire DB,
// including workspaces owned by other tenants.
func TestBroadcast_OrgScopedRecipients(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
// Org-A structure:
// org-a-root (parent_id = NULL) ← sender
// ├── ws-a-child
// Org-B structure:
// org-b-root (parent_id = NULL)
// └── ws-b-child
senderID := "00000000-0000-0000-0000-000000000001" // org-a-root
wsAChild := "00000000-0000-0000-0000-000000000002"
// ws-b-child is in Org-B (different root); the org-scoped query MUST NOT include it.
// 1. Sender lookup
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Org-A Root", true))
// 2. Org root lookup — sender is its own root (parent_id = NULL)
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// 3. Org-scoped recipient query — MUST include org filter so ws-b-child is NOT included.
// The query joins on org_chain.root_id = orgRootID, which scopes to Org-A only.
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID). // orgRootID, senderID (EXCLUDED)
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(wsAChild)) // only Org-A child
// Activity log inserts
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(wsAChild, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"hello from org-a"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to unmarshal response: %v", err)
}
if resp["status"] != "sent" {
t.Errorf("expected status 'sent', got %v", resp["status"])
}
// ws-b-child is in a DIFFERENT org — the org-scoped query MUST NOT include it.
// If it were included, the mock would have an unmet expectation.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations — cross-org workspace was included in broadcast: %v", err)
}
}
// TestBroadcast_OrgScoped_OrgRootSender verifies that when the sender IS the
// org root (parent_id = NULL), broadcasts still reach sibling workspaces.
func TestBroadcast_OrgScoped_OrgRootSender(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001" // org-a-root
siblingID := "00000000-0000-0000-0000-000000000002"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
// Sender is the org root — CTE returns sender's own ID as root
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// Recipients in same org, excluding sender
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"hello siblings"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestBroadcast_OrgScoped_ChildWorkspaceSender verifies that a non-root child
// workspace can broadcast to siblings in the same org.
func TestBroadcast_OrgScoped_ChildWorkspaceSender(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
orgRootID := "00000000-0000-0000-0000-000000000001"
senderID := "00000000-0000-0000-0000-000000000002" // child workspace
siblingID := "00000000-0000-0000-0000-000000000003"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Child Agent", true))
// Org root lookup — walk up to find org-a-root
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(orgRootID))
// Recipients: same org, excluding sender
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(orgRootID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"child broadcasting"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// -------- Non-regression cases --------
func TestBroadcast_NotFound(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000099"
// UUID is valid, but no workspace row matches
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnError(errors.New("workspace not found"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"test"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
func TestBroadcast_Disabled(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Disabled Agent", false))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"should not send"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusForbidden {
t.Errorf("expected 403, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to unmarshal: %v", err)
}
if resp["error"] != "broadcast_disabled" {
t.Errorf("expected error 'broadcast_disabled', got %v", resp["error"])
}
}
func TestBroadcast_EmptyOrg_NoRecipients(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001" // org root, only workspace in org
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Lone Root", true))
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// No other workspaces in this org
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"hello org"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to unmarshal: %v", err)
}
if resp["delivered"] != float64(0) {
t.Errorf("expected delivered=0, got %v", resp["delivered"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
func TestBroadcast_InvalidWorkspaceID(t *testing.T) {
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "not-a-uuid"}}
body := `{"message":"test"}`
c.Request = httptest.NewRequest("POST", "/workspaces/not-a-uuid/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestBroadcast_MissingMessage(t *testing.T) {
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-000000000001"}}
c.Request = httptest.NewRequest("POST", "/workspaces/00000000-0000-0000-0000-000000000001/broadcast", bytes.NewBufferString("{}"))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
// TestBroadcast_OrgRootLookupFails verifies that if the recursive CTE for
// finding the org root errors, the handler returns 500 instead of proceeding
// with an un-scoped query that would broadcast to all orgs.
func TestBroadcast_OrgRootLookupFails(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
// Org root CTE fails
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnError(context.DeadlineExceeded)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"should not broadcast"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
}
// The recipient query MUST NOT be called — it would broadcast cross-org
// if the org root lookup failed silently.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestBroadcast_OrgScoped_SelfBroadcastExcluded verifies that broadcasting
// from a workspace does not send a broadcast_receive to the sender itself
// (the sender logs broadcast_sent, not broadcast_receive).
func TestBroadcast_OrgScoped_SelfBroadcastExcluded(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
peerID := "00000000-0000-0000-0000-000000000002"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// Recipient query MUST exclude sender via id != senderID
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(peerID))
// Peer receives broadcast_receive
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(peerID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
// Sender logs broadcast_sent (NOT broadcast_receive)
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"no echo to self"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestBroadcast_Truncate tests that messages are truncated with the Unicode ellipsis
// TestBroadcast_Truncate tests that messages are truncated with the Unicode ellipsis
// character (U+2026) when len(msg) > max. The truncated output is max runes + "…",
// so truncating a 48-char string at max=20 produces 21 characters (20 runes + "…").
func TestBroadcast_Truncate(t *testing.T) {
cases := []struct {
msg string
max int
expect string
}{
{"short", 120, "short"}, // under max — no truncation
// exactly120chars (15) + 105 ones = 120 chars; at max=120 → unchanged
{"exactly120chars1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111", 120, "exactly120chars111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111…"},
// "this is a longer mes" = 20 runes; + "…" = 21 chars
{"this is a longer message that needs truncating", 20, "this is a longer mes…"},
// at-max boundary: 20 chars at max=20 → no truncation
{"exactly twenty chars", 20, "exactly twenty chars"},
// over max: 11 chars at max=10 → 10 + "…" = 11
{"hello world!", 10, "hello worl…"},
}
for _, tc := range cases {
result := broadcastTruncate(tc.msg, tc.max)
if result != tc.expect {
t.Errorf("broadcastTruncate(%q, %d) = %q; want %q", tc.msg, tc.max, result, tc.expect)
}
}
}
@@ -15,7 +15,6 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"gopkg.in/yaml.v3"
)
// logProvisionPanic is the deferred recover at the top of every provision
@@ -473,10 +472,9 @@ func configDirName(workspaceID string) string {
// runtime means bumping both this list and the Docker image tags.
// knownRuntimes is populated from manifest.json at service init (see
// runtime_registry.go). The package init order is:
// 1. var knownRuntimes = fallbackRuntimes
// 2. init() calls initKnownRuntimes() which replaces it if
// manifest.json is readable.
//
// 1. var knownRuntimes = fallbackRuntimes
// 2. init() calls initKnownRuntimes() which replaces it if
// manifest.json is readable.
// The fallback matters for unit tests that don't mount the manifest.
//
// "external" is a first-class runtime that intentionally does NOT
@@ -541,9 +539,6 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
// org_import.go; consolidating prevents silent drift.
model = models.DefaultModel(runtime)
}
if runtime == "claude-code" {
model = normalizeClaudeCodeModel(model)
}
// Sanitize name/role/model for YAML safety — always double-quote so
// a crafted value with a newline or colon can't terminate the scalar
@@ -559,11 +554,6 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
quoteModel := yamlQuote(model)
configYAML := fmt.Sprintf("name: %s\ndescription: %s\nversion: 1.0.0\ntier: %d\nruntime: %s\n",
quoteName, quoteRole, payload.Tier, runtime)
if runtime == "claude-code" {
if providersYAML := h.defaultTemplateProvidersYAML(runtime); providersYAML != "" {
configYAML += providersYAML + "\n"
}
}
// Model always at top level — config.py reads raw["model"] for all runtimes.
configYAML += fmt.Sprintf("model: %s\n", quoteModel)
@@ -573,11 +563,7 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
// and preflight already validates that the env vars are present before
// the agent loop starts. Hardcoding token names here caused #1028
// (expired CLAUDE_CODE_OAUTH_TOKEN baked into config.yaml).
configYAML += "runtime_config:\n"
if runtime == "claude-code" {
configYAML += fmt.Sprintf(" model: %s\n", quoteModel)
}
configYAML += " timeout: 0\n"
configYAML += "runtime_config:\n timeout: 0\n"
files["config.yaml"] = []byte(configYAML)
@@ -585,60 +571,6 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
return files
}
func normalizeClaudeCodeModel(model string) string {
model = strings.TrimSpace(model)
if before, after, ok := strings.Cut(model, "/"); ok && before != "" && after != "" {
return after
}
return model
}
func (h *WorkspaceHandler) defaultTemplateProvidersYAML(runtime string) string {
if h.configsDir == "" {
return ""
}
templateName := runtime + "-default"
templatePath, err := resolveInsideRoot(h.configsDir, templateName)
if err != nil {
log.Printf("Provisioner: default template providers skipped for runtime %s: %v", runtime, err)
return ""
}
data, err := os.ReadFile(filepath.Join(templatePath, "config.yaml"))
if err != nil {
return ""
}
var root yaml.Node
if err := yaml.Unmarshal(data, &root); err != nil {
log.Printf("Provisioner: default template providers skipped for runtime %s: invalid YAML: %v", runtime, err)
return ""
}
if len(root.Content) == 0 || root.Content[0].Kind != yaml.MappingNode {
return ""
}
mapping := root.Content[0]
for i := 0; i+1 < len(mapping.Content); i += 2 {
if mapping.Content[i].Value != "providers" {
continue
}
out := yaml.Node{
Kind: yaml.MappingNode,
Content: []*yaml.Node{
{Kind: yaml.ScalarNode, Value: "providers"},
mapping.Content[i+1],
},
}
encoded, err := yaml.Marshal(&out)
if err != nil {
log.Printf("Provisioner: default template providers skipped for runtime %s: marshal failed: %v", runtime, err)
return ""
}
return strings.TrimRight(string(encoded), "\n")
}
return ""
}
// deriveProviderFromModelSlug maps a hermes-agent model slug prefix to
// its provider name — a Go translation of the case statement in
// workspace-configs-templates/hermes/scripts/derive-provider.sh that we
@@ -144,7 +144,6 @@ func TestProvisionWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) {
rec := &trackingCPProv{startErr: errors.New("simulated CP rejection")}
bcast := &concurrentSafeBroadcaster{}
h := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, h)
h.SetCPProvisioner(rec)
wsID := "ws-routes-to-cp-0123456789abcdef"
@@ -596,7 +595,6 @@ func TestRestartWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) {
// Mock DB so cpStopWithRetry can run without a real Postgres.
mock := setupTestDB(t)
waitForHandlerAsyncBeforeDBCleanup(t, h)
mock.MatchExpectationsInOrder(false)
// provisionWorkspaceCP runs in the goroutine and will hit secrets
// SELECTs + UPDATE workspace as failed (we make CP Start return
@@ -672,7 +670,6 @@ func TestRestartWorkspaceAuto_RoutesToDockerWhenOnlyDocker(t *testing.T) {
bcast := &concurrentSafeBroadcaster{}
h := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, h)
stub := &stoppingLocalProv{}
h.provisioner = stub
@@ -2,7 +2,6 @@ package handlers
import (
"context"
"database/sql"
"fmt"
"net/http"
"os"
@@ -261,67 +260,6 @@ func TestEnsureDefaultConfig_ClaudeCode(t *testing.T) {
}
}
func TestEnsureDefaultConfig_ClaudeCodeCopiesProviderRegistry(t *testing.T) {
broadcaster := newTestBroadcaster()
configsDir := t.TempDir()
templateDir := filepath.Join(configsDir, "claude-code-default")
if err := os.MkdirAll(templateDir, 0o755); err != nil {
t.Fatalf("mkdir template: %v", err)
}
if err := os.WriteFile(filepath.Join(templateDir, "config.yaml"), []byte(`
name: Claude Code Agent
runtime: claude-code
providers:
- name: anthropic-oauth
auth_mode: oauth
model_aliases: [sonnet]
auth_env: [CLAUDE_CODE_OAUTH_TOKEN]
- name: minimax
auth_mode: third_party_anthropic_compat
model_prefixes: [minimax-]
base_url: https://api.minimax.io/anthropic
auth_env: [MINIMAX_API_KEY, ANTHROPIC_AUTH_TOKEN]
runtime_config:
model: sonnet
`), 0o644); err != nil {
t.Fatalf("write template: %v", err)
}
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", configsDir)
files := handler.ensureDefaultConfig("ws-code-123", models.CreateWorkspacePayload{
Name: "Code Agent",
Tier: 4,
Runtime: "claude-code",
Model: "minimax/MiniMax-M2.7",
})
var parsed struct {
Model string `yaml:"model"`
Providers []struct {
Name string `yaml:"name"`
ModelPrefixes []string `yaml:"model_prefixes"`
} `yaml:"providers"`
RuntimeConfig struct {
Model string `yaml:"model"`
} `yaml:"runtime_config"`
}
if err := yaml.Unmarshal(files["config.yaml"], &parsed); err != nil {
t.Fatalf("generated YAML invalid: %v\n%s", err, files["config.yaml"])
}
if parsed.Model != "MiniMax-M2.7" {
t.Fatalf("top-level model = %q, want MiniMax-M2.7\n%s", parsed.Model, files["config.yaml"])
}
if parsed.RuntimeConfig.Model != "MiniMax-M2.7" {
t.Fatalf("runtime_config.model = %q, want MiniMax-M2.7\n%s", parsed.RuntimeConfig.Model, files["config.yaml"])
}
if len(parsed.Providers) != 2 {
t.Fatalf("providers len = %d, want 2\n%s", len(parsed.Providers), files["config.yaml"])
}
if parsed.Providers[1].Name != "minimax" || len(parsed.Providers[1].ModelPrefixes) != 1 || parsed.Providers[1].ModelPrefixes[0] != "minimax-" {
t.Fatalf("minimax provider registry not preserved: %+v\n%s", parsed.Providers, files["config.yaml"])
}
}
func TestEnsureDefaultConfig_CustomModel(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@@ -696,11 +634,6 @@ func TestSeedInitialMemories_EmptyMemoriesNil(t *testing.T) {
// ==================== buildProvisionerConfig ====================
func TestBuildProvisionerConfig_BasicFields(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT COALESCE\(workspace_dir`).
WithArgs("ws-basic").
WillReturnRows(sqlmock.NewRows([]string{"workspace_dir", "workspace_access"}).AddRow("", "none"))
broadcaster := newTestBroadcaster()
tmpDir := t.TempDir()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", tmpDir)
@@ -745,14 +678,6 @@ func TestBuildProvisionerConfig_BasicFields(t *testing.T) {
}
func TestBuildProvisionerConfig_WorkspacePathFromEnv(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT COALESCE\(workspace_dir`).
WithArgs("ws-env").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT digest FROM runtime_image_pins`).
WithArgs("claude-code").
WillReturnError(sql.ErrNoRows)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@@ -414,44 +414,6 @@ func TestWorkspaceCreate_DefaultsApplied(t *testing.T) {
}
}
func TestWorkspaceCreate_SaaSHardForcesTier4(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
handler.SetCPProvisioner(&trackingCPProv{})
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "SaaS External Agent", nil, 4, "external", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").
WithArgs(sqlmock.AnyArg(), float64(0), float64(0)).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("UPDATE workspaces SET url").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"name":"SaaS External Agent","runtime":"external","external":true,"url":"https://example.com/agent","tier":2}`
c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
if w.Code != http.StatusCreated {
t.Errorf("expected status 201, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestWorkspaceCreate_WithSecrets_Persists asserts that secrets in the create
// payload are written to workspace_secrets inside the same transaction as the
// workspace row, and that the handler returns 201.
@@ -23,8 +23,8 @@ package models
// - claude-code: "sonnet" — Anthropic's CLI accepts the short
// name and resolves it via the operator's anthropic-oauth or
// ANTHROPIC_API_KEY chain.
// - everything else (hermes, langgraph, autogen, codex, openclaw,
// external, ""): a fully-qualified
// - everything else (hermes, langgraph, crewai, autogen, deepagents,
// codex, openclaw, gemini-cli, external, ""): a fully-qualified
// vendor:model slug that the universal MODEL_PROVIDER chain in
// molecule-core PR #247 can route via per-vendor required_env.
//
@@ -21,9 +21,12 @@ func TestDefaultModel(t *testing.T) {
// as a generic "unknown" failure.
{"hermes", "anthropic:claude-opus-4-7"},
{"langgraph", "anthropic:claude-opus-4-7"},
{"crewai", "anthropic:claude-opus-4-7"},
{"autogen", "anthropic:claude-opus-4-7"},
{"deepagents", "anthropic:claude-opus-4-7"},
{"codex", "anthropic:claude-opus-4-7"},
{"openclaw", "anthropic:claude-opus-4-7"},
{"gemini-cli", "anthropic:claude-opus-4-7"},
{"external", "anthropic:claude-opus-4-7"},
// Unknown / empty — fall through to universal default rather
@@ -217,59 +217,6 @@ func TestStart_HappyPath(t *testing.T) {
}
}
func TestStart_SendsTemplateAndGeneratedConfigFiles(t *testing.T) {
tmpl := t.TempDir()
if err := os.WriteFile(filepath.Join(tmpl, "config.yaml"), []byte("name: template\n"), 0o600); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(tmpl, "adapter.py"), bytes.Repeat([]byte("x"), cpConfigFilesMaxBytes), 0o600); err != nil {
t.Fatal(err)
}
if err := os.Mkdir(filepath.Join(tmpl, "prompts"), 0o700); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(tmpl, "prompts", "system.md"), []byte("hello"), 0o600); err != nil {
t.Fatal(err)
}
var body cpProvisionRequest
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Errorf("decode request: %v", err)
}
w.WriteHeader(http.StatusCreated)
_, _ = io.WriteString(w, `{"instance_id":"i-abc123","state":"pending"}`)
}))
defer srv.Close()
p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()}
_, err := p.Start(context.Background(), WorkspaceConfig{
WorkspaceID: "ws-1",
Runtime: "claude-code",
Tier: 4,
PlatformURL: "http://tenant",
TemplatePath: tmpl,
ConfigFiles: map[string][]byte{
"config.yaml": []byte("name: generated\n"),
},
})
if err != nil {
t.Fatalf("Start: %v", err)
}
wantConfig := base64.StdEncoding.EncodeToString([]byte("name: generated\n"))
if got := body.ConfigFiles["config.yaml"]; got != wantConfig {
t.Errorf("config.yaml payload = %q, want generated override %q", got, wantConfig)
}
wantPrompt := base64.StdEncoding.EncodeToString([]byte("hello"))
if got := body.ConfigFiles["prompts/system.md"]; got != wantPrompt {
t.Errorf("prompt payload = %q, want %q", got, wantPrompt)
}
if _, ok := body.ConfigFiles["adapter.py"]; ok {
t.Error("non-config template file adapter.py must not be sent to CP")
}
}
// TestStart_Non201ReturnsStructuredError — when CP returns 401 with a
// structured {"error":"..."} body, Start surfaces that error message.
// Verifies the defense against log-leaking raw upstream bodies.
@@ -572,9 +519,9 @@ func TestStop_4xxResponseSurfacesError(t *testing.T) {
func TestStop_2xxVariantsAllSucceed(t *testing.T) {
primeInstanceIDLookup(t, map[string]string{"ws-1": "i-ok"})
for _, code := range []int{
http.StatusOK, // 200
http.StatusAccepted, // 202
http.StatusNoContent, // 204
http.StatusOK, // 200
http.StatusAccepted, // 202
http.StatusNoContent, // 204
} {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(code)
@@ -642,11 +589,11 @@ func TestIsRunning_ParsesStateField(t *testing.T) {
_, _ = io.WriteString(w, `{"state":"`+state+`"}`)
}))
p := &CPProvisioner{
baseURL: srv.URL,
orgID: "org-1",
baseURL: srv.URL,
orgID: "org-1",
sharedSecret: "s3cret",
adminToken: "tok-xyz",
httpClient: srv.Client(),
httpClient: srv.Client(),
}
got, err := p.IsRunning(context.Background(), "ws-1")
srv.Close()
@@ -190,7 +190,7 @@ func TestEnsureLocalImage_RepoNotFound(t *testing.T) {
opts.HTTPClient = srv.Client()
opts.remoteHeadSha = nil // exercise real HTTP path
_, err := ensureLocalImageWithOpts(context.Background(), "hermes", opts)
_, err := ensureLocalImageWithOpts(context.Background(), "crewai", opts)
if err == nil {
t.Fatalf("expected error, got nil")
}
@@ -35,19 +35,6 @@ import (
// drift-risk #6.
var ErrNoBackend = errors.New("provisioner: no backend configured (zero-valued receiver)")
// ErrUnresolvableRuntime is returned by selectImage when a workspace
// names a runtime that has no resolvable image (not in RuntimeImages and
// no operator-pinned cfg.Image). RFC internal#483 + security review 4269:
// previously such a request silently fell through to DefaultImage
// (langgraph) — a user asking for crewai would get a langgraph container
// with no signal. The CTO standing directive
// (feedback_platform_must_hardgate_base_contract) is fail-closed: a
// named-but-unresolvable runtime must reject with a structured,
// runtime-naming error so the existing provision-failed notify/log path
// surfaces it, NOT silently degrade. The genuinely-unspecified (empty)
// runtime is still a distinct, legitimate path that keeps DefaultImage.
var ErrUnresolvableRuntime = errors.New("provisioner: requested runtime has no resolvable image")
// RuntimeImages maps runtime names to their Docker image refs.
// Each standalone template repo publishes its image via the reusable
// publish-template-image workflow in molecule-ci on every main merge.
@@ -117,33 +104,20 @@ type WorkspaceConfig struct {
// selectImage resolves the final Docker image ref for a workspace. The handler
// layer is the source of truth — if it set cfg.Image (the digest-pinned form
// from runtime_image_pins, #2272), honor that. Otherwise fall back to the
// runtime→tag lookup in RuntimeImages (legacy `:latest` behavior).
//
// Fail-closed contract (RFC internal#483 / security review 4269 /
// feedback_platform_must_hardgate_base_contract): if the workspace NAMES a
// runtime that resolves to no image (not in RuntimeImages, no pinned
// cfg.Image), reject with ErrUnresolvableRuntime instead of silently
// substituting DefaultImage. Pre-fix, removing crewai/deepagents/gemini-cli
// from the catalog left those create requests silently provisioning a
// langgraph container — the user asked for crewai and got langgraph with no
// signal. The error propagates through Start → markProvisionFailed, which
// already broadcasts WorkspaceProvisionFailed and records the message.
//
// The genuinely-unspecified runtime (empty cfg.Runtime, e.g. an org template
// that doesn't pin one) is an intended distinct path and still resolves to
// DefaultImage — only a NAMED-but-unresolvable runtime is rejected.
func selectImage(cfg WorkspaceConfig) (string, error) {
// runtime→tag lookup in RuntimeImages (legacy `:latest` behavior). When the
// runtime isn't recognized either, fall back to DefaultImage so Start() still
// has something to hand Docker — surfacing a "No such image" later is more
// actionable than a silent "" panic in ContainerCreate.
func selectImage(cfg WorkspaceConfig) string {
if cfg.Image != "" {
return cfg.Image, nil
return cfg.Image
}
if cfg.Runtime != "" {
if img, ok := RuntimeImages[cfg.Runtime]; ok {
return img, nil
return img
}
return "", fmt.Errorf("%w: runtime %q (known runtimes: %v)",
ErrUnresolvableRuntime, cfg.Runtime, knownRuntimes)
}
return DefaultImage, nil
return DefaultImage
}
// Workspace-access constants for #65. Matches the CHECK constraint on
@@ -215,24 +189,6 @@ const containerNamePrefix = "ws-"
// (the wiped-DB case after `docker compose down -v`).
const LabelManaged = "molecule.platform.managed"
// AgentUID / AgentGID are the uid/gid of the unprivileged `agent` user that
// every workspace template creates and drops to via `gosu agent` before
// exec'ing the runtime (the a2a_mcp_server runs under this uid). The value is
// fixed at 1000:1000 across all templates — see:
// - workspace-configs-templates/claude-code-default/Dockerfile (`useradd -u 1000 ... agent`)
// - workspace-configs-templates/hermes/Dockerfile (`useradd -u 1000 ... agent`)
// - workspace/entrypoint.sh (`exec gosu agent` — "uid 1000")
//
// Files the platform injects into /configs AFTER the entrypoint's
// `chown -R agent:agent /configs` (the post-start #418 re-injection and the
// pre-start #1877 volume write) must be owned by this uid/gid, otherwise the
// agent-uid MCP server hits EACCES reading /configs/.auth_token, sends an
// empty bearer, and the platform 401s on /registry/{id}/peers (list_peers).
const (
AgentUID = 1000
AgentGID = 1000
)
// managedLabels is the canonical label map applied to every workspace
// container + volume. Pulled out so a future addition (e.g. instance
// UUID for multi-platform-shared-daemon disambiguation) is one edit.
@@ -362,15 +318,7 @@ func (p *Provisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, e
env := buildContainerEnv(cfg)
image, imgErr := selectImage(cfg)
if imgErr != nil {
// Fail-closed: a named-but-unresolvable runtime must not silently
// become DefaultImage (RFC internal#483 / review 4269). The caller's
// error path (markProvisionFailed) broadcasts the failure + records
// the message so the canvas surfaces it.
log.Printf("Provisioner: refusing to start %s: %v", cfg.WorkspaceID, imgErr)
return "", imgErr
}
image := selectImage(cfg)
// Local-build mode (issue #63 / Task #194): when MOLECULE_IMAGE_REGISTRY
// is unset, the OSS contributor path skips the registry pull entirely
@@ -862,15 +810,6 @@ func ApplyTierConfig(hostCfg *container.HostConfig, cfg WorkspaceConfig, configM
// CopyTemplateToContainer copies files from a host directory into /configs in the container.
func (p *Provisioner) CopyTemplateToContainer(ctx context.Context, containerID, templatePath string) error {
buf, err := buildTemplateTar(templatePath)
if err != nil {
return err
}
return p.cli.CopyToContainer(ctx, containerID, "/configs", buf, container.CopyToContainerOptions{})
}
func buildTemplateTar(templatePath string) (*bytes.Buffer, error) {
// Resolve symlinks at the root before walking. filepath.Walk does
// NOT follow a symlink that IS the root — it Lstats the path, sees
// a symlink (non-directory), and emits exactly one entry without
@@ -893,15 +832,6 @@ func buildTemplateTar(templatePath string) (*bytes.Buffer, error) {
if err != nil {
return err
}
// OFFSEC-010: skip symlinks to prevent path traversal via malicious
// template symlinks (e.g. template/.ssh → /root/.ssh). filepath.Walk
// follows symlinks by default, so without this guard a crafted symlink
// inside the template directory could escape to include arbitrary host
// files in the tar archive. We intentionally skip rather than error so
// a broken symlink in an org template is a silent no-op.
if info.Mode()&os.ModeSymlink != 0 {
return nil
}
rel, err := filepath.Rel(templatePath, path)
if err != nil {
return err
@@ -942,27 +872,17 @@ func buildTemplateTar(templatePath string) (*bytes.Buffer, error) {
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to create tar from %s: %w", templatePath, err)
return fmt.Errorf("failed to create tar from %s: %w", templatePath, err)
}
if err := tw.Close(); err != nil {
return nil, fmt.Errorf("failed to close tar writer: %w", err)
return fmt.Errorf("failed to close tar writer: %w", err)
}
return &buf, nil
return p.cli.CopyToContainer(ctx, containerID, "/configs", &buf, container.CopyToContainerOptions{})
}
// buildConfigFilesTar builds the tar stream that WriteFilesToContainer streams
// into /configs via CopyToContainer. Every entry is stamped Uid/Gid = agent
// (AgentUID/AgentGID) so the files land agent-owned after extraction. This is
// the issue #418 post-start re-injection path: it runs AFTER the template
// entrypoint's `chown -R agent:agent /configs`, so without explicit ownership
// in the tar header the files extract as root:root (tar Uid/Gid default 0) and
// the agent-uid MCP server can no longer read /configs/.auth_token (and
// /configs/.platform_inbound_secret) → empty bearer → list_peers 401.
//
// Pulled out as a pure function so the ownership contract is unit-testable
// without a live Docker daemon (mirrors buildTemplateTar).
func buildConfigFilesTar(files map[string][]byte) (*bytes.Buffer, error) {
// WriteFilesToContainer writes in-memory files into /configs in the container.
func (p *Provisioner) WriteFilesToContainer(ctx context.Context, containerID string, files map[string][]byte) error {
var buf bytes.Buffer
tw := tar.NewWriter(&buf)
@@ -975,10 +895,8 @@ func buildConfigFilesTar(files map[string][]byte) (*bytes.Buffer, error) {
Typeflag: tar.TypeDir,
Name: dir + "/",
Mode: 0755,
Uid: AgentUID,
Gid: AgentGID,
}); err != nil {
return nil, fmt.Errorf("failed to write tar dir header for %s: %w", dir, err)
return fmt.Errorf("failed to write tar dir header for %s: %w", dir, err)
}
createdDirs[dir] = true
}
@@ -987,30 +905,19 @@ func buildConfigFilesTar(files map[string][]byte) (*bytes.Buffer, error) {
Name: name,
Mode: 0644,
Size: int64(len(data)),
Uid: AgentUID,
Gid: AgentGID,
}
if err := tw.WriteHeader(header); err != nil {
return nil, fmt.Errorf("failed to write tar header for %s: %w", name, err)
return fmt.Errorf("failed to write tar header for %s: %w", name, err)
}
if _, err := tw.Write(data); err != nil {
return nil, fmt.Errorf("failed to write tar data for %s: %w", name, err)
return fmt.Errorf("failed to write tar data for %s: %w", name, err)
}
}
if err := tw.Close(); err != nil {
return nil, fmt.Errorf("failed to close tar writer: %w", err)
return fmt.Errorf("failed to close tar writer: %w", err)
}
return &buf, nil
}
// WriteFilesToContainer writes in-memory files into /configs in the container,
// agent-owned (see buildConfigFilesTar).
func (p *Provisioner) WriteFilesToContainer(ctx context.Context, containerID string, files map[string][]byte) error {
buf, err := buildConfigFilesTar(files)
if err != nil {
return err
}
return p.cli.CopyToContainer(ctx, containerID, "/configs", buf, container.CopyToContainerOptions{})
return p.cli.CopyToContainer(ctx, containerID, "/configs", &buf, container.CopyToContainerOptions{})
}
// CopyToContainer exposes CopyToContainer from the Docker client for use by other packages.
@@ -1100,28 +1007,13 @@ func (p *Provisioner) ReadFromVolume(ctx context.Context, volumeName, filePath s
return clean, nil
}
// writeAuthTokenVolumeCmd is the shell command the throwaway alpine container
// runs to seed /vol/.auth_token. alpine runs it as root, so without the
// explicit `chown 1000:1000` the file stays root:root after the template
// entrypoint's `chown -R agent:agent /configs` has already run — the agent-uid
// (AgentUID) MCP server then gets EACCES reading it → empty bearer →
// list_peers 401. Pulled out as a pure function so the ownership contract is
// unit-testable without a live Docker daemon. Issue #1877.
func writeAuthTokenVolumeCmd() string {
return fmt.Sprintf(
"mkdir -p /vol && printf '%%s' $TOKEN > /vol/.auth_token && chmod 0600 /vol/.auth_token && chown %d:%d /vol/.auth_token",
AgentUID, AgentGID,
)
}
// WriteAuthTokenToVolume writes the workspace auth token into the config volume
// BEFORE the container starts, eliminating the token-injection race window where
// a restarted container could read a stale token from /configs/.auth_token before
// WriteFilesToContainer writes the new one. Issue #1877.
//
// Uses a throwaway alpine container to write directly to the named volume,
// bypassing the container lifecycle entirely. The written file is chowned to
// the agent uid/gid (see writeAuthTokenVolumeCmd).
// bypassing the container lifecycle entirely.
func (p *Provisioner) WriteAuthTokenToVolume(ctx context.Context, workspaceID, token string) error {
if p == nil || p.cli == nil {
return ErrNoBackend
@@ -1129,7 +1021,7 @@ func (p *Provisioner) WriteAuthTokenToVolume(ctx context.Context, workspaceID, t
volName := ConfigVolumeName(workspaceID)
resp, err := p.cli.ContainerCreate(ctx, &container.Config{
Image: "alpine",
Cmd: []string{"sh", "-c", writeAuthTokenVolumeCmd()},
Cmd: []string{"sh", "-c", "mkdir -p /vol && printf '%s' $TOKEN > /vol/.auth_token && chmod 0600 /vol/.auth_token"},
Env: []string{"TOKEN=" + token},
}, &container.HostConfig{
Binds: []string{volName + ":/vol"},
@@ -1,9 +1,7 @@
package provisioner
import (
"archive/tar"
"errors"
"io"
"os"
"path/filepath"
"strings"
@@ -64,72 +62,6 @@ func TestValidateConfigSource_TemplateIsDirName(t *testing.T) {
}
}
func TestStartSeedsConfigsBeforeContainerStart(t *testing.T) {
src, err := os.ReadFile("provisioner.go")
if err != nil {
t.Fatalf("read provisioner.go: %v", err)
}
text := string(src)
copyTemplate := strings.Index(text, "p.CopyTemplateToContainer(ctx, resp.ID, cfg.TemplatePath)")
writeFiles := strings.Index(text, "p.WriteFilesToContainer(ctx, resp.ID, cfg.ConfigFiles)")
start := strings.Index(text, "p.cli.ContainerStart(ctx, resp.ID, container.StartOptions{})")
if copyTemplate < 0 || writeFiles < 0 || start < 0 {
t.Fatalf("expected Start to copy template, write config files, and start container")
}
if copyTemplate >= start || writeFiles >= start {
t.Fatalf("config seeding must happen before ContainerStart: copyTemplate=%d writeFiles=%d start=%d", copyTemplate, writeFiles, start)
}
}
func TestBuildTemplateTar_SkipsSymlinks(t *testing.T) {
dir := t.TempDir()
if err := os.WriteFile(filepath.Join(dir, "config.yaml"), []byte("name: safe\n"), 0644); err != nil {
t.Fatalf("write config: %v", err)
}
outside := filepath.Join(t.TempDir(), "secret.txt")
if err := os.WriteFile(outside, []byte("do-not-copy\n"), 0644); err != nil {
t.Fatalf("write outside target: %v", err)
}
if err := os.Symlink(outside, filepath.Join(dir, "linked-secret.txt")); err != nil {
t.Fatalf("create symlink: %v", err)
}
buf, err := buildTemplateTar(dir)
if err != nil {
t.Fatalf("buildTemplateTar: %v", err)
}
names := map[string]string{}
tr := tar.NewReader(buf)
for {
hdr, err := tr.Next()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
t.Fatalf("read tar: %v", err)
}
body, err := io.ReadAll(tr)
if err != nil {
t.Fatalf("read body for %s: %v", hdr.Name, err)
}
names[hdr.Name] = string(body)
}
if got := names["config.yaml"]; got != "name: safe\n" {
t.Fatalf("config.yaml body = %q, want safe config", got)
}
if _, ok := names["linked-secret.txt"]; ok {
t.Fatalf("symlink entry was copied into template tar: %#v", names)
}
for name, body := range names {
if strings.Contains(body, "do-not-copy") {
t.Fatalf("symlink target leaked through %s: %q", name, body)
}
}
}
// baseHostConfig returns a fresh HostConfig with typical pre-tier binds,
// mimicking what Start() builds before calling ApplyTierConfig.
func baseHostConfig(pluginsPath string) *container.HostConfig {
@@ -513,10 +445,7 @@ func TestWorkspaceConfig_ResetClaudeSessionFieldPresent(t *testing.T) {
// we lose the "one bad publish doesn't break every workspace" guarantee.
func TestSelectImage_PrefersExplicitImage(t *testing.T) {
pinned := "ghcr.io/molecule-ai/workspace-template-claude-code@sha256:3d6761a97ed07d7d33cfc19a8fbab81175d9d9179618d493dbc00c5f7ef076a3"
got, err := selectImage(WorkspaceConfig{Runtime: "claude-code", Image: pinned})
if err != nil {
t.Fatalf("selectImage with cfg.Image=pinned: unexpected error %v", err)
}
got := selectImage(WorkspaceConfig{Runtime: "claude-code", Image: pinned})
if got != pinned {
t.Errorf("selectImage with cfg.Image=pinned: got %q, want %q", got, pinned)
}
@@ -526,46 +455,28 @@ func TestSelectImage_PrefersExplicitImage(t *testing.T) {
// pin lookup deliberately bypassed via WORKSPACE_IMAGE_LOCAL_OVERRIDE).
// selectImage must use the legacy runtime→:latest map.
func TestSelectImage_FallsBackToRuntimeMap(t *testing.T) {
got, err := selectImage(WorkspaceConfig{Runtime: "claude-code", Image: ""})
if err != nil {
t.Fatalf("selectImage with empty Image: unexpected error %v", err)
}
got := selectImage(WorkspaceConfig{Runtime: "claude-code", Image: ""})
want := RuntimeImages["claude-code"]
if got != want {
t.Errorf("selectImage with empty Image: got %q, want %q", got, want)
}
}
// TestSelectImage_NamedUnresolvableRuntimeRejects pins the fail-closed
// contract (RFC internal#483 / security review 4269 /
// feedback_platform_must_hardgate_base_contract): a NAMED runtime with no
// resolvable image must reject with ErrUnresolvableRuntime, NOT silently
// substitute DefaultImage. Pre-fix this returned langgraph — a user asking
// for a removed runtime (crewai/deepagents/gemini-cli) silently got a
// langgraph container. "crewai" is the concrete regression from the
// security finding.
func TestSelectImage_NamedUnresolvableRuntimeRejects(t *testing.T) {
for _, rt := range []string{"no-such-runtime", "crewai", "deepagents", "gemini-cli"} {
got, err := selectImage(WorkspaceConfig{Runtime: rt})
if !errors.Is(err, ErrUnresolvableRuntime) {
t.Errorf("selectImage(%q): got err %v, want ErrUnresolvableRuntime", rt, err)
}
if got != "" {
t.Errorf("selectImage(%q): got image %q, want \"\" on reject", rt, got)
}
if err != nil && !strings.Contains(err.Error(), rt) {
t.Errorf("selectImage(%q): error must name the offending runtime, got %v", rt, err)
}
// TestSelectImage_UnknownRuntimeFallsBackToDefault preserves today's
// behavior — an unrecognized runtime resolves to DefaultImage rather than
// "" so ContainerCreate gets a usable arg and surfaces a meaningful
// "No such image" error if the default itself is missing.
func TestSelectImage_UnknownRuntimeFallsBackToDefault(t *testing.T) {
got := selectImage(WorkspaceConfig{Runtime: "no-such-runtime"})
if got != DefaultImage {
t.Errorf("selectImage with unknown runtime: got %q, want DefaultImage %q", got, DefaultImage)
}
}
// TestSelectImage_EmptyRuntimeFallsBackToDefault: same invariant for the
// no-runtime-supplied path (legacy callers / older handler code).
func TestSelectImage_EmptyRuntimeFallsBackToDefault(t *testing.T) {
got, err := selectImage(WorkspaceConfig{})
if err != nil {
t.Fatalf("selectImage with zero cfg: unexpected error %v (empty runtime is a legitimate DefaultImage path)", err)
}
got := selectImage(WorkspaceConfig{})
if got != DefaultImage {
t.Errorf("selectImage with zero cfg: got %q, want DefaultImage %q", got, DefaultImage)
}
@@ -957,7 +868,7 @@ func TestIsImageNotFoundErr(t *testing.T) {
{"nil", nil, false},
{"moby no such image", fmtErr(`Error response from daemon: No such image: workspace-template:openclaw`), true},
{"no such image lowercase", fmtErr(`error: no such image: foo:bar`), true},
{"image not found", fmtErr(`Error: image "workspace-template:hermes" not found`), true},
{"image not found", fmtErr(`Error: image "workspace-template:crewai" not found`), true},
{"generic not found without image", fmtErr(`container not found`), false},
{"unrelated error", fmtErr(`connection refused`), false},
{"permission denied", fmtErr(`permission denied`), false},
@@ -21,6 +21,9 @@ var knownRuntimes = []string{
"autogen",
"claude-code",
"codex",
"crewai",
"deepagents",
"gemini-cli",
"hermes",
"langgraph",
"openclaw",
@@ -53,8 +53,8 @@ func TestRuntimeImage_AllKnownRuntimes(t *testing.T) {
}
}
// Pin the count so adding a runtime requires explicit test acknowledgement.
if len(knownRuntimes) != 6 {
t.Errorf("knownRuntimes length = %d, want 6 (autogen, claude-code, codex, hermes, langgraph, openclaw)", len(knownRuntimes))
if len(knownRuntimes) != 9 {
t.Errorf("knownRuntimes length = %d, want 9 (autogen, claude-code, codex, crewai, deepagents, gemini-cli, hermes, langgraph, openclaw)", len(knownRuntimes))
}
}
@@ -1,95 +0,0 @@
package provisioner
import (
"archive/tar"
"errors"
"io"
"strings"
"testing"
)
// These tests pin the P0 fix for the fleet-wide list_peers 401 (Hermes and
// every other template): the workspace-server token-injection paths wrote
// /configs/.auth_token (and /configs/.platform_inbound_secret) as root:root
// AFTER the template entrypoint's `chown -R agent:agent /configs` ran, so the
// agent-uid (1000) MCP server (a2a_mcp_server, running via `gosu agent`) hit
// `[Errno 13] Permission denied` reading the bearer → empty bearer → platform
// 401 on /registry/{id}/peers (the literal tool_list_peers path).
//
// The agent uid is 1000:1000, verified from the templates:
// - workspace-configs-templates/claude-code-default/Dockerfile: `useradd -u 1000 ... agent`
// - workspace-configs-templates/hermes/Dockerfile: `useradd -u 1000 ... agent`
// - workspace/entrypoint.sh / claude-code-default/entrypoint.sh: `exec gosu agent` ("uid 1000")
//
// Both tests assert the real artifact (the tar headers Docker's CopyToContainer
// honours for ownership, and the literal shell command the throwaway alpine
// container runs), not a mock that bypasses ownership. They FAIL on pre-fix
// code (no Uid/Gid in tar headers; no chown in the alpine command → root:root)
// and PASS post-fix (agent-owned).
// TestWriteFilesToContainerTar_FilesAreAgentOwned covers the issue #418
// post-start re-injection path (WriteFilesToContainer): the tar it streams
// into /configs via CopyToContainer must carry Uid/Gid = agent (1000) so the
// extracted files land agent-readable, not root:root. This is the path that
// (re)writes BOTH .auth_token and .platform_inbound_secret on a cadence.
func TestWriteFilesToContainerTar_FilesAreAgentOwned(t *testing.T) {
files := map[string][]byte{
".auth_token": []byte("tok-abc123"),
".platform_inbound_secret": []byte("inbound-secret-xyz"),
"nested/dir/file.txt": []byte("data"),
}
buf, err := buildConfigFilesTar(files)
if err != nil {
t.Fatalf("buildConfigFilesTar: %v", err)
}
tr := tar.NewReader(buf)
seen := map[string]bool{}
for {
hdr, err := tr.Next()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
t.Fatalf("read tar: %v", err)
}
if _, err := io.Copy(io.Discard, tr); err != nil {
t.Fatalf("drain %s: %v", hdr.Name, err)
}
seen[hdr.Name] = true
if hdr.Uid != AgentUID {
t.Fatalf("tar entry %q Uid = %d, want %d (agent) — root-owned injection causes the list_peers 401",
hdr.Name, hdr.Uid, AgentUID)
}
if hdr.Gid != AgentGID {
t.Fatalf("tar entry %q Gid = %d, want %d (agent)", hdr.Name, hdr.Gid, AgentGID)
}
}
for _, want := range []string{".auth_token", ".platform_inbound_secret"} {
if !seen[want] {
t.Fatalf("tar missing %q (seen: %v)", want, seen)
}
}
}
// TestWriteAuthTokenVolumeCmd_ChownsToAgent covers the issue #1877 pre-start
// volume-write path (WriteAuthTokenToVolume): the throwaway alpine container
// writes /vol/.auth_token then chmod 0600 but, pre-fix, never chowns it, so it
// stays root:root (alpine runs the command as root). The literal command must
// chown the file to the agent uid:gid so the agent-uid MCP server can read it.
func TestWriteAuthTokenVolumeCmd_ChownsToAgent(t *testing.T) {
cmd := writeAuthTokenVolumeCmd()
if !strings.Contains(cmd, "chmod 0600 /vol/.auth_token") {
t.Fatalf("alpine cmd lost the 0600 chmod (regression): %q", cmd)
}
wantChown := "chown 1000:1000 /vol/.auth_token"
if !strings.Contains(cmd, wantChown) {
t.Fatalf("alpine cmd = %q, missing %q — without it .auth_token stays root:root "+
"and the agent-uid MCP server gets EACCES → empty bearer → list_peers 401",
cmd, wantChown)
}
}
@@ -14,9 +14,8 @@ func setupMockDB(t *testing.T) sqlmock.Sqlmock {
if err != nil {
t.Fatalf("sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
t.Cleanup(func() { mockDB.Close() })
return mock
}
@@ -31,9 +31,8 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock {
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
t.Cleanup(func() { mockDB.Close() })
return mock
}
@@ -17,9 +17,8 @@ func setupHibernationMock(t *testing.T) sqlmock.Sqlmock {
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
t.Cleanup(func() { mockDB.Close() })
return mock
}
@@ -18,9 +18,8 @@ func setupLivenessTestDB(t *testing.T) sqlmock.Sqlmock {
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
t.Cleanup(func() { mockDB.Close() })
return mock
}
@@ -24,9 +24,8 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock {
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
t.Cleanup(func() { mockDB.Close() })
return mock
}
+3 -7
View File
@@ -698,8 +698,8 @@ def _format_channel_content(
# --- MCP Server (JSON-RPC over stdio) ---
def _assert_stdio_is_pipe_compatible(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
"""Assert that stdio fds are pipe/socket/char-device compatible.
def _warn_if_stdio_not_pipe(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
"""Warn when stdio isn't a pipe — but continue anyway.
The legacy asyncio.connect_read_pipe / connect_write_pipe transport
rejected regular files, PTYs, and sockets with:
@@ -723,10 +723,6 @@ def _assert_stdio_is_pipe_compatible(stdin_fd: int = 0, stdout_fd: int = 1) -> N
)
# Deprecated alias — the canonical name is _assert_stdio_is_pipe_compatible.
_warn_if_stdio_not_pipe = _assert_stdio_is_pipe_compatible
async def main(): # pragma: no cover
"""Run MCP server on stdio — reads JSON-RPC requests, writes responses.
@@ -983,7 +979,7 @@ def cli_main(transport: str = "stdio", port: int = 9100) -> None: # pragma: no
if transport == "http":
asyncio.run(_run_http_server(port))
else:
_assert_stdio_is_pipe_compatible()
_warn_if_stdio_not_pipe()
asyncio.run(main())
-47
View File
@@ -431,43 +431,6 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool:
return source_id is None or source_id == ""
def _is_self_echo_row(row: dict[str, Any], workspace_id: str) -> bool:
"""Return True if ``row`` is a self-originated a2a_receive row.
Internal #469: when a workspace delegates to a target that never picks
up the task, ``tool_delegate_task`` calls ``report_activity`` which
POSTs to the platform with source_id set to the *sender's* workspace
UUID (mandated by spoof-defense in workspace-server's a2a_proxy). The
activity API exposes that row under type=a2a_receive, so the inbox
poller re-fetches it. Without this guard the row is surfaced as
kind='peer_agent' with the workspace's own identity as peer_id —
the workspace sees its own delegation-failure echoed back as if a
peer had delegated to it.
The guard mirrors the existing _is_self_notify_row pattern: both
skip rows that would otherwise create spurious inbound signal. The
long-term fix (making the platform write a distinct activity_type
for agent-outbound rows) is tracked separately; this guard stays
because it only excludes rows the agent never wants.
``workspace_id`` must be non-empty an empty-string workspace_id
(single-workspace legacy path) can never match a UUID source_id, so
the predicate is always False there, which is safe.
RFC #2829 PR-2 note: rows with method="delegate_result" are excluded
from the self-echo guard even when source_id matches our workspace_id.
The platform may write a delegation-result row with source_id set to
our workspace_id (e.g. a self-delegation or edge case in the platform's
result-writing path). Such rows must reach the inbox so that
message_from_activity can surface them as peer_agent inbound and the
runtime receives the delegation result. Silently filtering them as
self-echo would break delegation result delivery.
"""
if not workspace_id:
return False
return row.get("source_id") == workspace_id and row.get("method") != "delegate_result"
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
"""Convert one /activity row into an InboxMessage.
@@ -660,16 +623,6 @@ def _poll_once(
# the same self-notify on every iteration.
last_id = str(row.get("id", "")) or last_id
continue
if _is_self_echo_row(row, workspace_id):
# Internal #469: tool_delegate_task writes its own a2a_receive
# row with source_id = this workspace's UUID (spoof-defense).
# The poll fetches it back as kind='peer_agent', making the
# workspace echo its own delegation-failure as an inbound from
# a phantom peer. Skip it — the real delegation-result path
# (delegate_result push) is separate and unaffected. Cursor
# still advances so the next poll doesn't re-seen this row.
last_id = str(row.get("id", "")) or last_id
continue
message = message_from_activity(row)
if not message.activity_id:
continue
+10 -10
View File
@@ -1826,8 +1826,8 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error():
class TestStdioPipeAssertion:
"""Pin _assert_stdio_is_pipe_compatible — the canonical function name.
_warn_if_stdio_not_pipe is a deprecated alias.
"""Pin _warn_if_stdio_not_pipe — the diagnostic warning that replaces
the old fatal _assert_stdio_is_pipe_compatible guard.
The universal stdio transport now works with ANY file descriptor
(pipes, regular files, PTYs, sockets), so the old exit-2 behavior
@@ -1838,12 +1838,12 @@ class TestStdioPipeAssertion:
def test_pipe_pair_passes_silently(self, caplog):
"""Happy path — both fds are pipes. No warning emitted."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
from a2a_mcp_server import _warn_if_stdio_not_pipe
r, w = os.pipe()
try:
with caplog.at_level("WARNING"):
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
assert "not a pipe" not in caplog.text
finally:
os.close(r)
@@ -1852,14 +1852,14 @@ class TestStdioPipeAssertion:
def test_regular_file_stdout_warns(self, tmp_path, caplog):
"""Reproducer for runtime#61: stdout redirected to a regular file.
Now emits a warning instead of exiting."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
from a2a_mcp_server import _warn_if_stdio_not_pipe
r, _w = os.pipe()
regular = tmp_path / "captured.log"
f = open(regular, "wb")
try:
with caplog.at_level("WARNING"):
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=f.fileno())
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=f.fileno())
assert "stdout" in caplog.text
assert "not a pipe" in caplog.text
finally:
@@ -1868,7 +1868,7 @@ class TestStdioPipeAssertion:
def test_regular_file_stdin_warns(self, tmp_path, caplog):
"""Symmetric case — stdin redirected from a regular file."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
from a2a_mcp_server import _warn_if_stdio_not_pipe
regular = tmp_path / "input.json"
regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n')
@@ -1876,7 +1876,7 @@ class TestStdioPipeAssertion:
_r, w = os.pipe()
try:
with caplog.at_level("WARNING"):
_assert_stdio_is_pipe_compatible(stdin_fd=f.fileno(), stdout_fd=w)
_warn_if_stdio_not_pipe(stdin_fd=f.fileno(), stdout_fd=w)
assert "stdin" in caplog.text
assert "not a pipe" in caplog.text
finally:
@@ -1886,13 +1886,13 @@ class TestStdioPipeAssertion:
def test_closed_fd_warns_about_stat_error(self, caplog):
"""If stdio is closed, os.fstat raises OSError. Warning is
skipped silently (can't stat the fd)."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
from a2a_mcp_server import _warn_if_stdio_not_pipe
r, w = os.pipe()
os.close(w) # Now `w` is a stale fd — fstat will fail.
try:
with caplog.at_level("WARNING"):
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
# No warning emitted because fstat failed before the check
assert "not a pipe" not in caplog.text
finally:
-145
View File
@@ -495,151 +495,6 @@ def test_poll_once_skips_self_notify_rows(state: inbox.InboxState):
assert [m.activity_id for m in queue] == ["act-real"]
# ---------------------------------------------------------------------------
# _is_self_echo_row — internal #469 fix
# ---------------------------------------------------------------------------
#
# When a workspace delegates to a target that never picks up the task,
# tool_delegate_task calls report_activity("a2a_receive", ...) which POSTs
# to the platform with source_id set to the *sender's* workspace UUID
# (spoof-defense). The activity API returns that row under type=a2a_receive
# on the next poll, so message_from_activity sets peer_id = workspace's own
# UUID — the workspace sees its own delegation-failure as an inbound from
# a phantom peer. _is_self_echo_row guards against this.
#
# Internal #469 was live-reproduced on hongming.moleculesai.app 2026-05-16.
def test_is_self_echo_row_true_when_source_id_matches_workspace():
row = {"source_id": "ws-abc123", "method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "ws-abc123") is True
def test_is_self_echo_row_false_when_source_id_differs():
"""A real peer agent (different workspace_id) must NOT be filtered."""
row = {"source_id": "ws-peer", "method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_is_self_echo_row_false_when_source_id_is_none():
"""Canvas-user inbound has no source_id — never an echo."""
row = {"source_id": None, "method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_is_self_echo_row_false_when_workspace_id_is_empty():
"""Single-workspace legacy path with empty workspace_id cannot
match a UUID source_id predicate is always False, which is safe."""
row = {"source_id": "ws-abc123", "method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "") is False
def test_is_self_echo_row_false_when_source_id_key_absent():
row = {"method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_is_self_echo_row_false_for_delegate_result():
"""RFC #2829 PR-2 regression pin: a row with source_id matching our
workspace_id but method=delegate_result must NOT be filtered as a
self-echo. The platform may write a delegation-result row with our
workspace_id as source_id; such rows must reach the inbox so the
runtime receives the delegation result. Silently filtering them would
break delegate_result delivery."""
row = {"source_id": "ws-1", "method": "delegate_result"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_poll_once_skips_self_echo_rows(state: inbox.InboxState):
"""Internal #469 regression pin: a row with source_id matching our
workspace_id must NOT land in the inbox queue it is our own
delegation-report echoing back, not a real peer inbound."""
rows = [
{
"id": "act-real-peer",
"source_id": "ws-peer",
"method": "a2a_receive",
"summary": None,
"request_body": {"parts": [{"type": "text", "text": "real peer inbound"}]},
"created_at": "2026-04-30T22:00:00Z",
},
{
"id": "act-self-echo",
"source_id": "ws-1",
"method": "a2a_receive",
"summary": "task result: target timed out",
"request_body": None,
"created_at": "2026-04-30T22:00:01Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
# Only the real peer inbound counted; self-echo silently dropped.
assert n == 1
queue = state.peek(10)
assert [m.activity_id for m in queue] == ["act-real-peer"]
assert queue[0].peer_id == "ws-peer"
def test_poll_once_advances_cursor_past_self_echo(state: inbox.InboxState):
"""Cursor must advance past self-echo rows even though we don't
enqueue them. Otherwise the next poll re-fetches the same self-echo
on every iteration, wasting requests and blocking real inbound."""
state.save_cursor("act-old")
rows = [
{
"id": "act-self-echo",
"source_id": "ws-1",
"method": "a2a_receive",
"summary": "task result: timeout",
"request_body": None,
"created_at": "2026-04-30T22:00:00Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
assert n == 0
assert state.peek(10) == []
# Cursor must move past the skipped row so we don't re-poll it.
assert state.load_cursor() == "act-self-echo"
def test_poll_once_self_echo_does_not_fire_notification(state: inbox.InboxState):
"""The notification callback (channel push to Claude Code etc.)
must not fire for self-echo rows. Same rationale as self-notify:
push-capable hosts would see the echo loop on the push channel."""
rows = [
{
"id": "act-self-echo",
"source_id": "ws-1",
"method": "a2a_receive",
"summary": "task result: timeout",
"request_body": None,
"created_at": "2026-04-30T22:00:00Z",
},
]
received: list[dict] = []
inbox.set_notification_callback(received.append)
try:
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
inbox._poll_once(state, "http://platform", "ws-1", {})
finally:
inbox.set_notification_callback(None)
assert received == [], (
"self-echo rows must not surface as MCP notifications — "
"doing so re-creates the echo loop on push-capable hosts"
)
def test_poll_once_advances_cursor_past_self_notify(state: inbox.InboxState):
"""Cursor must advance past self-notify rows even though we don't
enqueue them. Otherwise the next poll re-fetches the same self-