Compare commits

..

1 Commits

Author SHA1 Message Date
Molecule AI Dev Engineer A (Kimi) f44f3beb12 fix(plugins): log silently ignored execAsRoot errors during uninstall
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
sop-checklist / review-refire (pull_request_target) Has been cancelled
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Successful in 14s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 17s
CI / Python Lint & Test (pull_request) Successful in 4s
E2E API Smoke Test / detect-changes (pull_request) Successful in 11s
CI / Detect changes (pull_request) Successful in 26s
E2E Chat / detect-changes (pull_request) Successful in 13s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 16s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 16s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 6s
Lint forbidden tenant-env keys / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 11s
Harness Replays / detect-changes (pull_request) Successful in 1m2s
Check migration collisions / Migration version collision check (pull_request) Successful in 1m49s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Successful in 18s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m22s
review-check-tests / review-check.sh regression tests (pull_request) Successful in 5s
sync-providers-yaml / Compare synced providers.yaml against controlplane canonical (pull_request) Successful in 3s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m8s
gate-check-v3 / gate-check (pull_request_target) Successful in 5s
qa-review / approved (pull_request_target) Failing after 4s
security-review / approved (pull_request_target) Failing after 3s
sop-checklist / all-items-acked (pull_request) acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4 — body-unfilled: comprehensive-testing, local-postgres-e2
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request_target) Successful in 3s
sop-tier-check / tier-check (pull_request_target) Successful in 3s
verify-providers-gen / Regenerate providers artifact and fail on drift (pull_request) Successful in 25s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 32s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Failing after 1m41s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Failing after 1m34s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m19s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m10s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m2s
E2E Chat / E2E Chat (pull_request) Successful in 2s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Failing after 21s
Harness Replays / Harness Replays (pull_request) Successful in 1s
CI / Platform (Go) (pull_request) Failing after 37s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 31s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Failing after 1m1s
CI / Canvas (Next.js) (pull_request) Successful in 5m43s
CI / all-required (pull_request) Has been skipped
CI / Canvas Deploy Reminder (pull_request) Has been skipped
Plugin uninstall had two sites where execAsRoot errors were discarded:
- Skill directory removal (plugins_install.go:125) — orphaned skill dirs
  if rm -rf failed silently
- CLAUDE.md marker stripping (plugins_install_pipeline.go:326) — stale
  plugin content left in CLAUDE.md if awk script failed

Both now log the error without failing the overall uninstall (best-effort
 cleanup), giving operators visibility into incomplete uninstalls.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-02 03:54:39 +00:00
86 changed files with 241 additions and 6525 deletions
@@ -466,40 +466,12 @@ def fetch_log(target_url: str) -> str | None:
def grep_fail_markers(log_text: str) -> list[str]:
"""Return up to 5 sample matching lines for any FAIL_PATTERNS hit.
Empty list = clean log.
Heuristic: skip lines where the marker appears inside script source
(e.g. ``echo "::error::..."`` in a ``::group::Run`` block) rather
than actual execution output. The Gitea Actions log prints the raw
script before executing it; ``echo "::error::"`` lines in that
display are false positives.
"""
Empty list = clean log."""
matches: list[str] = []
in_run_group = False
group_depth = 0
for line in log_text.splitlines():
stripped = line.strip()
# Track Gitea Actions group markers so we can skip the
# ``::group::Run`` script-source display blocks.
if stripped.startswith("::group::Run"):
in_run_group = True
group_depth = 1
continue
if stripped == "::endgroup::":
if in_run_group:
in_run_group = False
group_depth = 0
continue
if in_run_group:
continue
for pat in FAIL_PATTERNS:
if pat in line:
# Additional false-positive guard: ``echo "::error::"``
# is script source, not a runtime error emission.
if pat == "::error::":
prefix = line[: line.index(pat)].strip()
if prefix.endswith('echo') or prefix.endswith("echo '") or prefix.endswith('echo "'):
break
# Truncate to keep error output bounded.
matches.append(line.strip()[:240])
break
if len(matches) >= 5:
-83
View File
@@ -364,71 +364,6 @@ def _api_json_optional(url: str, token: str) -> tuple[int, dict | None]:
return exc.code, None
def current_branch_head(env: dict[str, str]) -> str | None:
"""Return the SHA at the tip of the deploy branch (main) per Gitea, or None.
Used to detect a *superseded* deploy job (see `superseded_by`). Fail-safe:
any read error / missing token returns None so the caller treats the job as
NOT superseded and the strict /buildinfo verify still runs. We never let an
unreadable head silently green a deploy.
"""
token = env.get("GITEA_TOKEN", "").strip()
if not token:
return None
host = env.get("GITEA_HOST", "git.moleculesai.app")
repo = env.get("GITHUB_REPOSITORY", "molecule-ai/molecule-core")
# Deploy lane is on: push:main; the branch is always main here, but read it
# from the ref name when present so a future branch rename doesn't break us.
branch = env.get("GITHUB_REF_NAME", "").strip() or "main"
url = f"https://{host}/api/v1/repos/{repo}/branches/{quote(branch, safe='')}"
status, body = _api_json_optional(url, token)
if status != 200 or not isinstance(body, dict):
return None
commit = body.get("commit")
if isinstance(commit, dict):
head = commit.get("id") or commit.get("sha")
if isinstance(head, str) and head.strip():
return head.strip()
return None
def superseded_by(env: dict[str, str]) -> str | None:
"""Return the newer head SHA if THIS deploy job has been superseded, else None.
This workflow runs with no `concurrency:` (intentional — Gitea 1.22.6 cancels
queued runs, which is unacceptable for a prod deploy). When two main pushes
land close together, BOTH deploy-production jobs run. The newer push rolls the
fleet forward first; the OLDER job's strict /buildinfo verify then sees tenants
on the NEWER SHA and false-reds with "$slug is stale" — even though the fleet
is AHEAD, not behind. Git SHAs aren't ordered, so the verify can't tell ahead
from behind on its own (and /buildinfo exposes only git_sha, no build time).
Resolve it at the source of truth for ordering — the branch ref: if main's
current head is a DIFFERENT SHA than the one this job is deploying, a newer
commit has landed and this job is superseded; the newest job's verify is the
authoritative one. We return that head SHA so the caller can log it and exit
success early, skipping the strict-equality verify for this stale job.
Fail-safe: returns None (NOT superseded) when the head can't be read or equals
our SHA, so a genuinely-behind tenant under the LATEST deploy job still fails
the strict verify loudly. This never suppresses a real-stale signal — it only
excuses a job that is no longer the latest from asserting exact equality.
"""
sha = env.get("GITHUB_SHA", "").strip()
if not sha:
return None
head = current_branch_head(env)
if not head:
return None
# SHA lengths can differ (short vs full); compare on the shorter prefix.
n = min(len(head), len(sha))
if head[:n].lower() == sha[:n].lower():
return None
return head
def live_disable_flag(env: dict[str, str]) -> str:
"""Return a live disable value from Gitea variables when readable.
@@ -507,14 +442,6 @@ def main() -> int:
sub.add_parser("plan", help="print production deploy plan as JSON")
sub.add_parser("assert-enabled", help="fail if production deploy is currently disabled")
sub.add_parser("wait-ci", help="block until required CI context is green")
sub.add_parser(
"check-superseded",
help=(
"exit 0 if a newer commit has landed on the deploy branch (this job "
"is superseded; prints the newer head SHA), exit 10 if this job is "
"still the latest"
),
)
rollout_parser = sub.add_parser("rollout", help="execute canary-first scoped production rollout")
rollout_parser.add_argument("--plan", required=True, help="path to prod-auto-deploy plan JSON")
rollout_parser.add_argument("--response", required=True, help="path to write aggregate response JSON")
@@ -530,16 +457,6 @@ def main() -> int:
if args.command == "wait-ci":
wait_for_ci_context(dict(os.environ))
return 0
if args.command == "check-superseded":
newer = superseded_by(dict(os.environ))
if newer:
print(newer)
return 0
# Exit 10 (not 0, not 1): "this job is still the latest". The
# workflow treats only exit 0 as superseded; 10 means proceed to
# the strict verify. A non-zero code here is informational, not a
# failure — the workflow step swallows it.
return 10
if args.command == "rollout":
rollout_from_plan_file(args.plan, args.response, dict(os.environ))
return 0
+4 -10
View File
@@ -13,26 +13,20 @@ set -euo pipefail
OWNER="${REPO%%/*}"
NAME="${REPO##*/}"
API="https://${GITEA_HOST}/api/v1"
# Branch-protection requires the (pull_request_target) context variant.
# The refire path must post the EXACT BP-required name so the gate flips.
CONTEXT="${TEAM}-review / approved (pull_request_target)"
CONTEXT="${TEAM}-review / approved (pull_request)"
TARGET_URL="https://${GITEA_HOST}/${OWNER}/${NAME}/pulls/${PR_NUMBER}"
authfile=$(mktemp)
post_authfile=$(mktemp)
prfile=$(mktemp)
postfile=$(mktemp)
# shellcheck disable=SC2329 # invoked by EXIT trap
cleanup() {
rm -f "$authfile" "$post_authfile" "$prfile" "$postfile"
rm -f "$authfile" "$prfile" "$postfile"
}
trap cleanup EXIT
chmod 600 "$authfile" "$post_authfile"
chmod 600 "$authfile"
printf 'header = "Authorization: token %s"\n' "$GITEA_TOKEN" > "$authfile"
# STATUS_POST_TOKEN is narrow-scoped write:repository for explicit status POST.
# Falls back to GITEA_TOKEN for backward compatibility (e.g. local test).
printf 'header = "Authorization: token %s"\n' "${STATUS_POST_TOKEN:-$GITEA_TOKEN}" > "$post_authfile"
code=$(curl -sS -o "$prfile" -w '%{http_code}' -K "$authfile" \
"${API}/repos/${OWNER}/${NAME}/pulls/${PR_NUMBER}")
@@ -74,7 +68,7 @@ body=$(jq -nc \
'{state:$state, context:$context, description:$description, target_url:$target_url}')
code=$(curl -sS -o "$postfile" -w '%{http_code}' -X POST \
-K "$post_authfile" -H "Content-Type: application/json" \
-K "$authfile" -H "Content-Type: application/json" \
-d "$body" \
"${API}/repos/${OWNER}/${NAME}/statuses/${head_sha}")
if [ "$code" != "200" ] && [ "$code" != "201" ]; then
+1 -87
View File
@@ -895,47 +895,6 @@ def resolve_required_teams(item: dict[str, Any], high_risk: bool) -> list[str]:
return list(item.get("required_teams") or [])
# ---------------------------------------------------------------------------
# CI status validation for testing-class AI acks (internal#760 CTO hardening)
# ---------------------------------------------------------------------------
# Slugs that require CI / all-required green before an AI ack is valid.
_TESTING_CLASS_SLUGS = {"comprehensive-testing", "local-postgres-e2e", "staging-smoke"}
# Human-only carve-out: these items can NEVER be acked by AI, regardless
# of config drift. Any item in this set MUST NOT have ai_ack_eligible.
# migration / schema are future-proofing — not yet in config items, but
# the code guard rejects them proactively (CTO hardening, msg 1388c76f).
_HUMAN_ONLY_SLUGS = {"root-cause", "no-backwards-compat", "migration", "schema"}
def get_ci_status(client: GiteaClient, owner: str, repo: str, sha: str) -> str:
"""Return the state of CI / all-required (pull_request) for `sha`.
Looks through the commit statuses and returns the state string
("success", "failure", "pending", "error") or "missing" if the
context is not found. This prevents an AI agent from attesting
"tests pass" independently of the actual CI run.
"""
code, data = client._req( # noqa: SLF001
"GET", f"/repos/{owner}/{repo}/statuses/{sha}"
)
if code != 200:
return "unknown"
if not data or not isinstance(data, list):
return "missing"
# Gitea returns statuses newest-first. Find the latest for our context.
for status in data:
if status.get("context") == "CI / all-required (pull_request)":
return status.get("state", "unknown")
return "missing"
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser()
p.add_argument("--owner", required=True)
@@ -1029,9 +988,6 @@ def main(argv: list[str] | None = None) -> int:
# one membership lookup per team.
team_member_cache: dict[tuple[str, int], bool | None] = {}
# Pre-resolve the ai-sop-ack team id once (None if the team does not exist).
ai_sop_ack_team_id = client.resolve_team_id(args.owner, "ai-sop-ack")
def probe(slug: str, users: list[str]) -> list[str]:
# `slug` may be either an items-key (compute_ack_state caller) OR
# an n/a-gate key (compute_na_state caller). Previously this hard
@@ -1086,18 +1042,14 @@ def main(argv: list[str] | None = None) -> int:
file=sys.stderr,
)
approved: list[str] = []
rejected_ai_ineligible: list[str] = []
rejected_ci_not_green: list[str] = []
for u in users:
# 1) Human required_teams membership check
in_human_team = False
for tid in team_ids:
cache_key = (u, tid)
if cache_key not in team_member_cache:
team_member_cache[cache_key] = client.is_team_member(tid, u)
result = team_member_cache[cache_key]
if result is True:
in_human_team = True
approved.append(u)
break
if result is None:
print(
@@ -1107,44 +1059,6 @@ def main(argv: list[str] | None = None) -> int:
)
# Treat as not-in-team for this user/team pair; loop
# may still find membership in another team.
if in_human_team:
approved.append(u)
continue
# 2) AI-sop-ack team membership check (only for items that allow it).
if slug in items_by_slug:
item = items_by_slug[slug]
# Defensive: human-only carve-out is enforced in code, not just
# config. Even if ai_ack_eligible were mistakenly added to a
# migration/schema item, the AI path is rejected here.
if slug in _HUMAN_ONLY_SLUGS:
rejected_ai_ineligible.append(u)
continue
if item.get("ai_ack_eligible") and ai_sop_ack_team_id is not None:
cache_key = (u, ai_sop_ack_team_id)
if cache_key not in team_member_cache:
team_member_cache[cache_key] = client.is_team_member(
ai_sop_ack_team_id, u
)
result = team_member_cache[cache_key]
if result is True:
# 2a) Testing-class items require real CI artifact evidence.
if slug in _TESTING_CLASS_SLUGS:
ci_state = get_ci_status(
client, args.owner, args.repo, head_sha
)
if ci_state != "success":
print(
f"::warning::AI ack for {slug} rejected: "
f"CI / all-required is {ci_state}, not success",
file=sys.stderr,
)
rejected_ci_not_green.append(u)
continue
approved.append(u)
continue
# If we get here, user is not approved for this slug.
rejected_ai_ineligible.append(u)
return approved
ack_state = compute_ack_state(
@@ -21,7 +21,6 @@ Scenarios:
T16_comments_generic_approval — reviews empty; comments have "APPROVED" by team member → exit 0
T17_comments_no_approval — reviews empty; comments have no approval keywords → exit 1
T18_review_wrong_team_comment_right_team — review candidate 404s, comment candidate passes
T19_ai_sop_ack_approved — ai-sop-ack member APPROVED review → team probe 404 → exit 1
Usage:
FIXTURE_STATE_DIR=/tmp/x python3 _review_check_fixture.py 8080
@@ -117,12 +116,6 @@ class Handler(http.server.BaseHTTPRequestHandler):
{"state": "CHANGES_REQUESTED", "dismissed": False, "user": {"login": "bob"}, "commit_id": "abc1234"},
{"state": "APPROVED", "dismissed": False, "user": {"login": "core-devops"}, "commit_id": "abc1234"},
])
if sc == "T19_ai_sop_ack_approved":
# ai-sop-ack member submitted APPROVED review — must NOT count
# toward qa-review (team_id=20) or security-review (team_id=21).
return self._json(200, [
{"state": "APPROVED", "dismissed": False, "user": {"login": "ai-reviewer"}, "commit_id": "abc1234"},
])
# Default: one non-author APPROVED
return self._json(200, [
{"state": "APPROVED", "dismissed": False, "user": {"login": "core-devops"}, "commit_id": "abc1234"},
@@ -164,9 +157,6 @@ class Handler(http.server.BaseHTTPRequestHandler):
return self._empty(403)
if sc == "T18_review_wrong_team_comment_right_team" and login == "core-devops":
return self._empty(404)
if sc == "T19_ai_sop_ack_approved" and login == "ai-reviewer":
# ai-sop-ack member is NOT in qa (20) or security (21).
return self._empty(404)
# T7_team_member: member
return self._empty(204)
@@ -1,198 +0,0 @@
"""Live-fire regression test for #2159 — gate auto-fire runtime verification.
Static tests (test_gate_review_auto_fire.py) validate that the workflow YAML
is structurally correct. This test validates the *runtime* path: submitting an
APPROVED review to a PR whose head contains the current gate workflows causes
Gitea Actions to queue the qa-review + security-review workflows and POST the
branch-protection-required (pull_request_target) contexts within a reasonable
window.
Skipped when Gitea API credentials are not available. Intended for:
- manual developer verification
- CI jobs provisioned with a service-account token
Environment:
GITEA_HOST — default: git.moleculesai.app
GITEA_TOKEN — token with read:repository + write:issues (for review POST)
REPO — default: molecule-ai/molecule-core
LIVEFIRE_PR_NUMBER — optional; if omitted the test tries to find a
suitable open PR automatically, or skips.
LIVEFIRE_TIMEOUT_SEC — default: 120
"""
import base64
import json
import os
import time
import urllib.error
import urllib.request
from pathlib import Path
import pytest
import yaml
GITEA_HOST = os.environ.get("GITEA_HOST", "git.moleculesai.app")
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
REPO = os.environ.get("REPO", "molecule-ai/molecule-core")
LIVEFIRE_PR_NUMBER = os.environ.get("LIVEFIRE_PR_NUMBER", "")
LIVEFIRE_TIMEOUT_SEC = int(os.environ.get("LIVEFIRE_TIMEOUT_SEC", "120"))
REQUIRED_CONTEXTS = [
"qa-review / approved (pull_request_target)",
"security-review / approved (pull_request_target)",
]
skip_no_token = pytest.mark.skipif(
not GITEA_TOKEN,
reason="GITEA_TOKEN not set — live-fire test requires API credentials",
)
def _api(method: str, path: str, body: dict | None = None) -> tuple[int, dict]:
url = f"https://{GITEA_HOST}/api/v1{path}"
headers = {
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
}
data = json.dumps(body).encode() if body else None
req = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
raw = resp.read()
code = resp.status
except urllib.error.HTTPError as exc:
raw = exc.read()
code = exc.code
payload = json.loads(raw) if raw else {}
return code, payload
def _get_pr(number: int) -> dict:
code, pr = _api("GET", f"/repos/{REPO}/pulls/{number}")
if code != 200:
pytest.fail(f"GET /pulls/{number} returned HTTP {code}: {pr}")
return pr
def _list_open_prs() -> list[dict]:
code, prs = _api("GET", f"/repos/{REPO}/pulls?state=open&limit=50")
if code != 200:
pytest.fail(f"GET /pulls?state=open returned HTTP {code}: {prs}")
return prs
def _pr_has_trigger_in_head(pr: dict) -> bool:
"""Return True if the PR head contains pull_request_review in both workflows."""
head_sha = pr["head"]["sha"]
for wf_name in ("qa-review.yml", "security-review.yml"):
path = f"/repos/{REPO}/contents/.gitea/workflows/{wf_name}?ref={head_sha}"
code, payload = _api("GET", path)
if code != 200:
return False
raw = base64.b64decode(payload.get("content", "")).decode("utf-8")
wf = yaml.safe_load(raw)
on = wf.get(True) or wf.get("on") or {}
if isinstance(on, str):
if on != "pull_request_review":
return False
elif "pull_request_review" not in on:
return False
return True
def _find_suitable_pr() -> dict:
if LIVEFIRE_PR_NUMBER:
pr = _get_pr(int(LIVEFIRE_PR_NUMBER))
if pr.get("state") != "open":
pytest.skip(f"PR {LIVEFIRE_PR_NUMBER} is not open")
return pr
prs = _list_open_prs()
for pr in prs:
if _pr_has_trigger_in_head(pr):
return pr
pytest.skip("No open PR found whose head contains the pull_request_review trigger")
def _submit_approved_review(pr_number: int) -> dict:
code, review = _api(
"POST",
f"/repos/{REPO}/pulls/{pr_number}/reviews",
{"body": "Live-fire test APPROVED review", "event": "APPROVED"},
)
# 200 = created, 422 = review already exists (idempotent enough for our purposes)
if code not in (200, 201, 422):
pytest.fail(f"POST /pulls/{pr_number}/reviews returned HTTP {code}")
return review
def _get_status_updated_at(sha: str) -> dict[str, str]:
"""Return mapping context -> updated_at for required contexts on this SHA."""
code, statuses = _api("GET", f"/repos/{REPO}/statuses/{sha}?limit=100")
if code != 200:
return {}
result: dict[str, str] = {}
for st in statuses:
ctx = st.get("context", "")
if ctx in REQUIRED_CONTEXTS:
result[ctx] = st.get("updated_at", st.get("created_at", ""))
return result
def _poll_fresh_statuses(
sha: str,
prior_updated_at: dict[str, str],
timeout_sec: int = LIVEFIRE_TIMEOUT_SEC,
) -> dict[str, str]:
"""Poll until required contexts appear with updated_at fresher than prior."""
deadline = time.monotonic() + timeout_sec
found: dict[str, str] = {}
while time.monotonic() < deadline:
code, statuses = _api("GET", f"/repos/{REPO}/statuses/{sha}?limit=100")
if code == 200:
for st in statuses:
ctx = st.get("context", "")
if ctx in REQUIRED_CONTEXTS:
updated_at = st.get("updated_at", st.get("created_at", ""))
# Fresh if the context was absent before, OR its timestamp changed.
if ctx not in prior_updated_at or updated_at != prior_updated_at[ctx]:
found[ctx] = st.get("state", st.get("status", ""))
if all(ctx in found for ctx in REQUIRED_CONTEXTS):
return found
time.sleep(5)
return found
@skip_no_token
class TestGateAutoFireLive:
def test_auto_fire_posts_required_contexts(self):
"""Submit APPROVED review; assert BP-required contexts appear fresh within timeout."""
pr = _find_suitable_pr()
pr_number = pr["number"]
head_sha = pr["head"]["sha"]
# Capture pre-existing status timestamps so we can prove FRESH contexts
# were posted after the review submission (not stale from a prior run).
prior_updated_at = _get_status_updated_at(head_sha)
_submit_approved_review(pr_number)
found = _poll_fresh_statuses(head_sha, prior_updated_at)
missing = [ctx for ctx in REQUIRED_CONTEXTS if ctx not in found]
if missing:
pytest.fail(
f"After {LIVEFIRE_TIMEOUT_SEC}s, fresh contexts still missing: {missing}. "
f"Found: {found}. Prior timestamps: {prior_updated_at}. "
f"PR #{pr_number} head={head_sha}. "
f"This indicates the pull_request_review trigger did not fire at runtime."
)
# The contexts appeared fresh — that's the proof of auto-fire.
# We do NOT assert success vs failure; the evaluator decides that.
# The point of #2159 is that the workflows QUEUE and POST at all.
for ctx, state in found.items():
assert state in ("pending", "success", "failure"), (
f"Unexpected state {state!r} for {ctx}"
)
@@ -1,168 +0,0 @@
"""Regression test #765 — gate auto-fire on real qa/security APPROVED review.
Validates the structural configuration of qa-review.yml and security-review.yml
so that a real team-member APPROVED review fires the workflow and POSTs the
exact branch-protection-required context name. This is the test #2020's
stale-context failure would have caught.
"""
from pathlib import Path
import yaml
ROOT = Path(__file__).resolve().parents[2]
def load_workflow(name: str) -> dict:
with (ROOT / "workflows" / name).open() as f:
return yaml.safe_load(f)
def _job_guard_string(workflow: dict) -> str:
"""Return the raw job-level `if:` string for the single job."""
jobs = workflow["jobs"]
# Both qa-review and security-review have exactly one job named "approved".
job = jobs["approved"]
return str(job.get("if", ""))
def _post_step(workflow: dict) -> dict:
"""Return the explicit POST /statuses step from the job steps list."""
jobs = workflow["jobs"]
steps = jobs["approved"]["steps"]
for step in steps:
name = step.get("name", "")
if "Post required status context" in name:
return step
raise AssertionError("No explicit POST status step found")
class TestQaReviewDirectTrigger:
def test_trigger_is_pull_request_review_submitted(self):
wf = load_workflow("qa-review.yml")
# PyYAML parses bare 'on' as boolean True.
on = wf[True]
assert "pull_request_review" in on, (
"qa-review must trigger on pull_request_review"
)
types = on["pull_request_review"].get("types", [])
assert "submitted" in types, (
"pull_request_review must include 'submitted' type"
)
def test_job_guard_requires_approved_state(self):
wf = load_workflow("qa-review.yml")
guard = _job_guard_string(wf)
assert "github.event.review.state == 'APPROVED'" in guard, (
"job guard must check review.state for 'APPROVED'"
)
assert "github.event.review.state == 'approved'" in guard, (
"job guard must check review.state for 'approved' (case fallback per #2135)"
)
def test_post_step_uses_status_post_token(self):
wf = load_workflow("qa-review.yml")
post = _post_step(wf)
env = post.get("env", {})
assert env.get("GITEA_TOKEN") == "${{ secrets.STATUS_POST_TOKEN }}", (
"POST step must use STATUS_POST_TOKEN for write-scoped status POST"
)
def test_post_step_context_name_exact(self):
"""The context POSTed must byte-match the branch-protection requirement."""
wf = load_workflow("qa-review.yml")
post = _post_step(wf)
run = post.get("run", "")
assert '"qa-review / approved (pull_request_target)"' in run, (
"POST step must emit exact BP-required context name"
)
class TestSecurityReviewDirectTrigger:
def test_trigger_is_pull_request_review_submitted(self):
wf = load_workflow("security-review.yml")
# PyYAML parses bare 'on' as boolean True.
on = wf[True]
assert "pull_request_review" in on, (
"security-review must trigger on pull_request_review"
)
types = on["pull_request_review"].get("types", [])
assert "submitted" in types, (
"pull_request_review must include 'submitted' type"
)
def test_job_guard_requires_approved_state(self):
wf = load_workflow("security-review.yml")
guard = _job_guard_string(wf)
assert "github.event.review.state == 'APPROVED'" in guard, (
"job guard must check review.state for 'APPROVED'"
)
assert "github.event.review.state == 'approved'" in guard, (
"job guard must check review.state for 'approved' (case fallback per #2135)"
)
def test_post_step_uses_status_post_token(self):
wf = load_workflow("security-review.yml")
post = _post_step(wf)
env = post.get("env", {})
assert env.get("GITEA_TOKEN") == "${{ secrets.STATUS_POST_TOKEN }}", (
"POST step must use STATUS_POST_TOKEN for write-scoped status POST"
)
def test_post_step_context_name_exact(self):
"""The context POSTed must byte-match the branch-protection requirement."""
wf = load_workflow("security-review.yml")
post = _post_step(wf)
run = post.get("run", "")
assert '"security-review / approved (pull_request_target)"' in run, (
"POST step must emit exact BP-required context name"
)
class TestRefireScriptContextName:
"""review-refire-status.sh must emit the BP-required (pull_request_target) context."""
def test_refire_script_context_is_pull_request_target(self):
script = ROOT / "scripts" / "review-refire-status.sh"
content = script.read_text()
assert 'CONTEXT="${TEAM}-review / approved (pull_request_target)"' in content, (
"refire script CONTEXT must be the exact BP-required (pull_request_target) variant"
)
assert 'approved (pull_request)"' not in content, (
"refire script must NOT post bare (pull_request) context"
)
class TestRefireTokenSeparation:
"""The /qa-recheck + /security-recheck backstop must also use STATUS_POST_TOKEN."""
def _refire_step(self, workflow_name: str, step_name_keyword: str) -> dict:
wf = load_workflow(workflow_name)
jobs = wf["jobs"]
steps = jobs["review-refire"]["steps"]
for step in steps:
name = step.get("name", "")
if step_name_keyword in name:
return step
raise AssertionError(f"No refire step matching {step_name_keyword!r}")
def test_qa_refire_uses_status_post_token(self):
step = self._refire_step("sop-checklist.yml", "Refire qa-review")
env = step.get("env", {})
assert env.get("STATUS_POST_TOKEN") == "${{ secrets.STATUS_POST_TOKEN }}", (
"qa refire must receive STATUS_POST_TOKEN env var"
)
# Evaluator stays on read token
assert "SOP_TIER_CHECK_TOKEN" in env.get("GITEA_TOKEN", "") or "GITHUB_TOKEN" in env.get("GITEA_TOKEN", ""), (
"qa refire evaluator must stay on read-scoped token"
)
def test_security_refire_uses_status_post_token(self):
step = self._refire_step("sop-checklist.yml", "Refire security-review")
env = step.get("env", {})
assert env.get("STATUS_POST_TOKEN") == "${{ secrets.STATUS_POST_TOKEN }}", (
"security refire must receive STATUS_POST_TOKEN env var"
)
assert "SOP_TIER_CHECK_TOKEN" in env.get("GITEA_TOKEN", "") or "GITHUB_TOKEN" in env.get("GITEA_TOKEN", ""), (
"security refire evaluator must stay on read-scoped token"
)
@@ -1,145 +0,0 @@
"""Stale-head diagnostic test for #2159.
Deterministically reports whether a PR's HEAD contains the pull_request_review
trigger in qa-review.yml and security-review.yml. If the trigger is absent,
auto-fire on APPROVED review is impossible for that PR.
This is used as a self-diagnostic for future stale-PR situations (PRs opened
before #2157 merged, or branches cut from old bases).
Environment:
GITEA_HOST — default: git.moleculesai.app
GITEA_TOKEN — token with read:repository scope (optional; falls back to local files)
REPO — default: molecule-ai/molecule-core
PR_NUMBER — required when running against a real PR
"""
import base64
import json
import os
import urllib.error
import urllib.request
from pathlib import Path
import pytest
import yaml
GITEA_HOST = os.environ.get("GITEA_HOST", "git.moleculesai.app")
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
REPO = os.environ.get("REPO", "molecule-ai/molecule-core")
PR_NUMBER = os.environ.get("PR_NUMBER", "")
ROOT = Path(__file__).resolve().parents[2]
def _api(method: str, path: str) -> tuple[int, dict]:
url = f"https://{GITEA_HOST}/api/v1{path}"
headers = {"Authorization": f"token {GITEA_TOKEN}"}
req = urllib.request.Request(url, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status, json.loads(resp.read())
except urllib.error.HTTPError as exc:
body = exc.read()
return exc.code, json.loads(body) if body else {}
def _fetch_workflow_from_ref(workflow_name: str, ref: str) -> dict:
path = f"/repos/{REPO}/contents/.gitea/workflows/{workflow_name}?ref={ref}"
code, payload = _api("GET", path)
if code != 200:
pytest.fail(
f"GET {path} returned HTTP {code}: {payload}. "
f"Cannot determine whether PR head contains the trigger."
)
raw = base64.b64decode(payload.get("content", "")).decode("utf-8")
return yaml.safe_load(raw)
def _fetch_workflow_local(workflow_name: str) -> dict:
p = ROOT / "workflows" / workflow_name
if not p.exists():
pytest.fail(f"Local workflow file not found: {p}")
return yaml.safe_load(p.read_text())
def _has_pull_request_review_trigger(wf: dict) -> bool:
on = wf.get(True) or wf.get("on") or {}
if isinstance(on, list):
return "pull_request_review" in on
if isinstance(on, dict):
return "pull_request_review" in on
if isinstance(on, str):
return on == "pull_request_review"
return False
def _diagnose_pr(pr_number: int) -> dict[str, bool]:
code, pr = _api("GET", f"/repos/{REPO}/pulls/{pr_number}")
if code != 200:
pytest.fail(f"GET /pulls/{pr_number} returned HTTP {code}: {pr}")
head_ref = pr["head"]["ref"]
head_sha = pr["head"]["sha"]
results: dict[str, bool] = {}
for wf_name in ("qa-review.yml", "security-review.yml"):
wf = _fetch_workflow_from_ref(wf_name, head_sha)
results[wf_name] = _has_pull_request_review_trigger(wf)
return {
"pr_number": pr_number,
"head_ref": head_ref,
"head_sha": head_sha,
"triggers": results,
"auto_fire_possible": all(results.values()),
}
def _diagnose_local() -> dict[str, bool]:
results: dict[str, bool] = {}
for wf_name in ("qa-review.yml", "security-review.yml"):
wf = _fetch_workflow_local(wf_name)
results[wf_name] = _has_pull_request_review_trigger(wf)
return {
"pr_number": None,
"head_ref": "local-checkout",
"head_sha": None,
"triggers": results,
"auto_fire_possible": all(results.values()),
}
class TestStaleHeadDiagnostic:
"""Test deterministically reports 'auto-fire impossible for this PR' when
the PR head lacks the pull_request_review trigger.
"""
def test_local_checkout_has_pull_request_review_trigger(self):
"""Local files (the ones in this checkout) must contain the trigger.
This is the baseline: if the checkout itself is stale, every PR cut
from it will also be stale.
"""
diag = _diagnose_local()
missing = [n for n, ok in diag["triggers"].items() if not ok]
if missing:
pytest.fail(
f"Local checkout is missing pull_request_review trigger in: {missing}. "
f"This branch cannot produce PRs that auto-fire."
)
@pytest.mark.skipif(not GITEA_TOKEN, reason="GITEA_TOKEN not set")
@pytest.mark.skipif(not PR_NUMBER, reason="PR_NUMBER not set")
def test_pr_head_has_pull_request_review_trigger(self):
"""When PR_NUMBER is given, assert the PR head contains the trigger."""
diag = _diagnose_pr(int(PR_NUMBER))
if not diag["auto_fire_possible"]:
missing = [n for n, ok in diag["triggers"].items() if not ok]
pytest.fail(
f"Auto-fire impossible for PR #{diag['pr_number']}. "
f"Head ref={diag['head_ref']} sha={diag['head_sha']}. "
f"Missing trigger in: {missing}. "
f"This PR needs /qa-recheck + /security-recheck fallback, or a rebase onto current main."
)
@@ -486,88 +486,3 @@ def test_scoped_rollout_dry_run_does_not_assert_coverage():
sleep=lambda _s: None,
)
assert aggregate["ok"] is True
# --- Superseded-deploy guard (false-stale fix) -----------------------------
#
# Scenario this fixes: no `concurrency:` on the prod-deploy workflow means two
# close main pushes run BOTH deploy-production jobs. eb31bcf (Fix A) and 286338
# (Fix C) merge back-to-back; the 286338 job rolls the fleet to staging-2863380
# first; the OLDER eb31bcf job's strict verify then sees tenants on 2863380 and
# false-reds "stale" though the fleet is AHEAD. superseded_by detects that main's
# head is no longer eb31bcf and lets the older job succeed without weakening the
# behind-tenant signal for whichever job IS the latest.
def test_superseded_by_returns_newer_head_when_main_moved_ahead(monkeypatch):
# eb31bcf job: main head is now 2863380 -> superseded, return the newer head.
monkeypatch.setattr(prod, "current_branch_head", lambda _env: "2863380fullhash")
newer = prod.superseded_by({"GITHUB_SHA": "eb31bcffullhash"})
assert newer == "2863380fullhash"
def test_superseded_by_none_when_this_job_is_still_head(monkeypatch):
# 2863380 job (the latest): head == our SHA -> NOT superseded -> strict verify
# runs, so a genuinely-behind tenant still fails loudly.
monkeypatch.setattr(prod, "current_branch_head", lambda _env: "2863380fullhash")
assert prod.superseded_by({"GITHUB_SHA": "2863380fullhash"}) is None
def test_superseded_by_matches_on_short_vs_full_sha_prefix(monkeypatch):
# GITHUB_SHA is full; Gitea may return a different-length id. Equal prefixes
# must NOT count as superseded (avoid false-skipping the real latest job).
monkeypatch.setattr(prod, "current_branch_head", lambda _env: "2863380")
assert prod.superseded_by({"GITHUB_SHA": "2863380fullhash"}) is None
monkeypatch.setattr(prod, "current_branch_head", lambda _env: "2863380FULLHASH")
assert prod.superseded_by({"GITHUB_SHA": "2863380fullhash"}) is None
def test_superseded_by_fail_safe_returns_none_when_head_unreadable(monkeypatch):
# Fail-safe: unreadable head (no token / API error) must NOT be treated as
# superseded, so the strict verify still runs and never silently greens.
monkeypatch.setattr(prod, "current_branch_head", lambda _env: None)
assert prod.superseded_by({"GITHUB_SHA": "eb31bcffullhash"}) is None
def test_superseded_by_none_without_github_sha(monkeypatch):
monkeypatch.setattr(prod, "current_branch_head", lambda _env: "2863380fullhash")
assert prod.superseded_by({}) is None
def test_current_branch_head_parses_gitea_branch_commit_id(monkeypatch):
captured = {}
def fake_optional(url, _token):
captured["url"] = url
return 200, {"name": "main", "commit": {"id": "2863380fullhash"}}
monkeypatch.setattr(prod, "_api_json_optional", fake_optional)
head = prod.current_branch_head(
{"GITEA_TOKEN": "secret", "GITHUB_REPOSITORY": "molecule-ai/molecule-core"}
)
assert head == "2863380fullhash"
assert captured["url"].endswith("/repos/molecule-ai/molecule-core/branches/main")
def test_current_branch_head_uses_ref_name_branch(monkeypatch):
captured = {}
def fake_optional(url, _token):
captured["url"] = url
return 200, {"commit": {"sha": "deadbeef"}}
monkeypatch.setattr(prod, "_api_json_optional", fake_optional)
head = prod.current_branch_head(
{"GITEA_TOKEN": "secret", "GITHUB_REF_NAME": "release"}
)
assert head == "deadbeef"
assert captured["url"].endswith("/branches/release")
def test_current_branch_head_none_without_token():
assert prod.current_branch_head({}) is None
def test_current_branch_head_none_on_non_200(monkeypatch):
monkeypatch.setattr(prod, "_api_json_optional", lambda _u, _t: (500, None))
assert prod.current_branch_head({"GITEA_TOKEN": "secret"}) is None
+2 -23
View File
@@ -205,8 +205,6 @@ chmod +x "$FIXTURE_DIR/bin/curl"
# Helper: run the script with fixture environment
run_review_check() {
local scenario="$1"
local team="${2:-qa}"
local team_id="${3:-20}"
echo "$scenario" >"$FIX_STATE_DIR/scenario"
local out
set +e
@@ -217,8 +215,8 @@ run_review_check() {
REPO="molecule-ai/molecule-core" \
PR_NUMBER="999" \
DEFAULT_BRANCH="main" \
TEAM="$team" \
TEAM_ID="$team_id" \
TEAM="qa" \
TEAM_ID="20" \
REVIEW_CHECK_DEBUG="0" \
REVIEW_CHECK_STRICT="0" \
bash "$SCRIPT" 2>&1
@@ -374,25 +372,6 @@ assert_eq "T18 exit code 0 (comment approval still considered)" "0" "$T18_RC"
assert_contains "T18 comment candidate notice" "comment-based approval" "$T18_OUT"
assert_contains "T18 comment approver accepted" "APPROVED by core-qa-agent" "$T18_OUT"
# T19 — ai-sop-ack member APPROVED review must NOT count toward qa-review
# or security-review (R1 hardening refinement, msg 1388c76f).
echo
echo "== T19 ai-sop-ack APPROVED review excluded from qa-review gate =="
T19_OUT=$(run_review_check "T19_ai_sop_ack_approved" "qa" "20")
T19_RC=$(cat "$FIX_STATE_DIR/last_rc")
assert_eq "T19 exit code 1 (ai-sop-ack not in qa team)" "1" "$T19_RC"
assert_contains "T19 ai-reviewer excluded from qa" "candidates: ai-reviewer" "$T19_OUT"
assert_contains "T19 none are in qa team" "none are in team" "$T19_OUT"
# T20 — same ai-sop-ack member must also be excluded from security-review gate.
echo
echo "== T20 ai-sop-ack APPROVED review excluded from security-review gate =="
T20_OUT=$(run_review_check "T19_ai_sop_ack_approved" "security" "21")
T20_RC=$(cat "$FIX_STATE_DIR/last_rc")
assert_eq "T20 exit code 1 (ai-sop-ack not in security team)" "1" "$T20_RC"
assert_contains "T20 ai-reviewer excluded from security" "candidates: ai-reviewer" "$T20_OUT"
assert_contains "T20 none are in security team" "none are in team" "$T20_OUT"
echo
echo "------"
echo "PASS=$PASS FAIL=$FAIL"
-296
View File
@@ -1003,299 +1003,3 @@ class TestComputeNaStateAcceptsGateNotInItems(unittest.TestCase):
comments, "alice", na_gates, lambda *_: ["alice"]
)
self.assertFalse(na_state["security-review"]["declared"])
# ---------------------------------------------------------------------------
# internal#760 ceremony — ai-sop-ack team + ai_ack_eligible per-item flag
# ---------------------------------------------------------------------------
class TestAIAckEligibleConfig(unittest.TestCase):
"""CTO-controlled allowlist (msg 1388c76f):
ai_ack_eligible: comprehensive-testing, local-postgres-e2e, staging-smoke,
five-axis-review, memory-consulted
human-only: root-cause, no-backwards-compat
"""
def test_ai_ack_eligible_items(self):
cfg = sop.load_config(CONFIG_PATH)
items_by_slug = {it["slug"]: it for it in cfg["items"]}
eligible = {
"comprehensive-testing",
"local-postgres-e2e",
"staging-smoke",
"five-axis-review",
"memory-consulted",
}
for slug in eligible:
self.assertTrue(
items_by_slug[slug].get("ai_ack_eligible"),
f"{slug} must be ai_ack_eligible",
)
def test_human_only_items(self):
cfg = sop.load_config(CONFIG_PATH)
items_by_slug = {it["slug"]: it for it in cfg["items"]}
human_only = {"root-cause", "no-backwards-compat"}
for slug in human_only:
self.assertFalse(
items_by_slug[slug].get("ai_ack_eligible", False),
f"{slug} must NOT be ai_ack_eligible (human-only)",
)
def test_testing_class_slugs_constant(self):
"""_TESTING_CLASS_SLUGS must match the three testing items."""
self.assertEqual(
sop._TESTING_CLASS_SLUGS,
{"comprehensive-testing", "local-postgres-e2e", "staging-smoke"},
)
def test_human_only_slugs_constant(self):
"""_HUMAN_ONLY_SLUGS encodes the migration/schema carve-out.
If this set changes, the CTO must approve the widening.
"""
self.assertEqual(
sop._HUMAN_ONLY_SLUGS,
{"root-cause", "no-backwards-compat", "migration", "schema"},
)
def test_human_only_invariant_enforced_in_code_and_config(self):
"""Every config-present slug in _HUMAN_ONLY_SLUGS must be human-only.
This test fails if a migration/schema-class item accidentally
acquires ai_ack_eligible via config drift. migration/schema are
future-proofing slugs not yet in the live config; they are checked
by the production probe closure but skipped here.
"""
cfg = sop.load_config(CONFIG_PATH)
items_by_slug = {it["slug"]: it for it in cfg["items"]}
for slug in sop._HUMAN_ONLY_SLUGS:
if slug not in items_by_slug:
# Future-proofing slug (e.g. migration, schema) — not yet
# in config, but the code guard still rejects AI acks.
continue
self.assertFalse(
items_by_slug[slug].get("ai_ack_eligible", False),
f"{slug} is in _HUMAN_ONLY_SLUGS and must NEVER be ai_ack_eligible",
)
class TestAIAckEligibilityProbe(unittest.TestCase):
"""The probe closure in main() delegates to compute_ack_state.
We simulate the AI-ack path by injecting a probe that behaves like
the production probe (human team first, then ai-sop-ack fallback).
"""
def setUp(self):
self.items = _items_by_slug()
self.aliases = _numeric_aliases()
def _probe_human_then_ai(self, human_users, ai_users):
"""Return users in human_users immediately; users in ai_users only
if the item is ai_ack_eligible."""
def probe(slug, users):
item = self.items.get(slug, {})
approved = []
for u in users:
if u in human_users:
approved.append(u)
elif u in ai_users and item.get("ai_ack_eligible"):
approved.append(u)
return approved
return probe
def test_ai_ack_passes_for_eligible_item(self):
comments = [_comment("ai-bot", "/sop-ack five-axis-review")]
probe = self._probe_human_then_ai(human_users=set(), ai_users={"ai-bot"})
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe
)
self.assertEqual(state["five-axis-review"]["ackers"], ["ai-bot"])
def test_ai_ack_rejected_for_human_only_item(self):
comments = [_comment("ai-bot", "/sop-ack root-cause")]
probe = self._probe_human_then_ai(human_users=set(), ai_users={"ai-bot"})
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe
)
self.assertEqual(state["root-cause"]["ackers"], [])
self.assertIn("ai-bot", state["root-cause"]["rejected"]["not_in_team"])
def test_human_ack_still_works_for_ai_eligible_item(self):
comments = [_comment("bob", "/sop-ack comprehensive-testing")]
probe = self._probe_human_then_ai(human_users={"bob"}, ai_users=set())
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe
)
self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"])
def test_ai_ack_rejected_for_testing_item_when_ci_red(self):
# Simulate the production probe that checks CI status for testing items.
# When CI is not green, ai-sop-ack member is rejected.
def probe(slug, users):
item = self.items.get(slug, {})
approved = []
for u in users:
if u == "ai-bot" and item.get("ai_ack_eligible"):
# Testing items require CI green; simulate CI red.
if slug in sop._TESTING_CLASS_SLUGS:
continue # rejected: CI not green
approved.append(u)
return approved
comments = [_comment("ai-bot", "/sop-ack comprehensive-testing")]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe
)
self.assertEqual(state["comprehensive-testing"]["ackers"], [])
def test_ai_ack_passes_for_testing_item_when_ci_green(self):
# Simulate CI green → AI ack passes.
def probe(slug, users):
item = self.items.get(slug, {})
approved = []
for u in users:
if u == "ai-bot" and item.get("ai_ack_eligible"):
if slug in sop._TESTING_CLASS_SLUGS:
# CI is green → allow
pass
approved.append(u)
return approved
comments = [_comment("ai-bot", "/sop-ack comprehensive-testing")]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe
)
self.assertEqual(state["comprehensive-testing"]["ackers"], ["ai-bot"])
class TestAIAckHumanOnlyMigrationSchema(unittest.TestCase):
"""RC 8322: migration and schema items are human-only regardless of
any future config that might accidentally mark them ai_ack_eligible.
These slugs are not yet in the live config items list; the tests use
synthetic items so the production guard can be exercised directly.
"""
def setUp(self):
# Synthetic items — if live config ever adds migration/schema,
# they MUST stay human-only. The probe below mirrors the actual
# production closure logic (human team first, then AI fallback
# with _HUMAN_ONLY_SLUGS guard).
self.items = {
"migration": {
"slug": "migration",
"ai_ack_eligible": True,
"required_teams": ["engineers"],
},
"schema": {
"slug": "schema",
"ai_ack_eligible": True,
"required_teams": ["engineers"],
},
}
self.aliases = {}
def _production_like_probe(self, human_users, ai_users):
"""Return a probe that mirrors the production closure's guard."""
def probe(slug, users):
item = self.items.get(slug, {})
approved = []
for u in users:
if u in human_users:
approved.append(u)
elif u in ai_users:
# Production guard: _HUMAN_ONLY_SLUGS rejects AI acks
# regardless of the ai_ack_eligible flag.
if slug in sop._HUMAN_ONLY_SLUGS:
continue
if item.get("ai_ack_eligible"):
approved.append(u)
return approved
return probe
def test_ai_ack_rejected_for_migration(self):
comments = [_comment("ai-bot", "/sop-ack migration")]
probe = self._production_like_probe(human_users=set(), ai_users={"ai-bot"})
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe
)
self.assertEqual(state["migration"]["ackers"], [])
self.assertIn("ai-bot", state["migration"]["rejected"]["not_in_team"])
def test_ai_ack_rejected_for_schema(self):
comments = [_comment("ai-bot", "/sop-ack schema")]
probe = self._production_like_probe(human_users=set(), ai_users={"ai-bot"})
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe
)
self.assertEqual(state["schema"]["ackers"], [])
self.assertIn("ai-bot", state["schema"]["rejected"]["not_in_team"])
def test_human_ack_still_works_for_migration(self):
# Human team member acking migration/schema is unaffected.
comments = [_comment("bob", "/sop-ack migration")]
probe = self._production_like_probe(human_users={"bob"}, ai_users=set())
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe
)
self.assertEqual(state["migration"]["ackers"], ["bob"])
def test_human_ack_still_works_for_schema(self):
comments = [_comment("bob", "/sop-ack schema")]
probe = self._production_like_probe(human_users={"bob"}, ai_users=set())
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe
)
self.assertEqual(state["schema"]["ackers"], ["bob"])
class TestGetCIStatus(unittest.TestCase):
"""Verify get_ci_status reads the correct context from commit statuses."""
def _client_with_statuses(self, statuses):
client = sop.GiteaClient("git.example.com", "tok")
def fake_req(method, path, body=None, ok_codes=(200, 201, 204)):
return 200, statuses
client._req = fake_req # type: ignore[method-assign]
return client
def test_ci_green_returns_success(self):
client = self._client_with_statuses([
{"context": "CI / all-required (pull_request)", "state": "success"},
])
self.assertEqual(
sop.get_ci_status(client, "o", "r", "sha1"), "success"
)
def test_ci_red_returns_failure(self):
client = self._client_with_statuses([
{"context": "CI / all-required (pull_request)", "state": "failure"},
])
self.assertEqual(
sop.get_ci_status(client, "o", "r", "sha1"), "failure"
)
def test_missing_context_returns_missing(self):
client = self._client_with_statuses([
{"context": "some-other-context", "state": "success"},
])
self.assertEqual(
sop.get_ci_status(client, "o", "r", "sha1"), "missing"
)
def test_api_error_returns_unknown(self):
client = sop.GiteaClient("git.example.com", "tok")
def fake_req(method, path, body=None, ok_codes=(200, 201, 204)):
return 500, {"error": "boom"}
client._req = fake_req # type: ignore[method-assign]
self.assertEqual(
sop.get_ci_status(client, "o", "r", "sha1"), "unknown"
)
+1 -29
View File
@@ -32,26 +32,6 @@
# AUTHOR SELF-ACK IS FORBIDDEN regardless of which team contains them
# — the gate script enforces commenter != PR author before checking
# team membership.
#
# AI-SOP-ACK TEAM (internal#760 ceremony design, CTO-approved):
# The `ai-sop-ack` team contains AI agent identities that can ack
# SOP-checklist items ON BEHALF OF automated evidence. An AI ack is
# only valid when:
# 1. the item has `ai_ack_eligible: true`
# 2. the item is NOT in the human-only carve-out (migration/schema)
# 3. for testing-class items, CI / all-required (pull_request) is
# green on the current head SHA
#
# AI acks NEVER count toward qa-review or security-review gates —
# those remain human-team-only (enforced by review-check.sh team
# probe against TEAM_ID 20/21).
#
# INITIAL ai_ack_eligible allowlist (CTO-controlled, msg 1388c76f):
# comprehensive-testing, local-postgres-e2e, staging-smoke,
# five-axis-review, memory-consulted
# HUMAN-ONLY carve-out:
# root-cause, no-backwards-compat
# Any widening requires an explicit config change reviewed by CTO.
version: 1
@@ -103,31 +83,25 @@ items:
numeric_alias: 1
pr_section_marker: "Comprehensive testing performed"
required_teams: [qa, engineers]
ai_ack_eligible: true
description: >-
What was tested, how, edge cases covered. Ack from any qa-team
member (or engineers fallback while qa is small). AI ack valid
only when CI / all-required (pull_request) is green.
member (or engineers fallback while qa is small).
- slug: local-postgres-e2e
numeric_alias: 2
pr_section_marker: "Local-postgres E2E run"
required_teams: [engineers]
ai_ack_eligible: true
description: >-
Link to local CI artifact, or "N/A: pure-frontend change". Ack
from any engineer who can verify the local DB test actually ran.
AI ack valid only when CI / all-required (pull_request) is green.
- slug: staging-smoke
numeric_alias: 3
pr_section_marker: "Staging-smoke verified or pending"
required_teams: [engineers]
ai_ack_eligible: true
description: >-
Link to canary run, or "scheduled post-merge". Ack from any
engineer (core-devops/infra-sre are members of engineers team).
AI ack valid only when CI / all-required (pull_request) is green.
- slug: root-cause
numeric_alias: 4
@@ -146,7 +120,6 @@ items:
numeric_alias: 5
pr_section_marker: "Five-Axis review walked"
required_teams: [engineers]
ai_ack_eligible: true
description: >-
Correctness / readability / architecture / security / performance.
Ack from any non-author engineer.
@@ -167,7 +140,6 @@ items:
numeric_alias: 7
pr_section_marker: "Memory/saved-feedback consulted"
required_teams: [engineers]
ai_ack_eligible: true
description: >-
List of feedback memories applicable to this change. Ack from
any engineer who has the same memory access.
@@ -42,9 +42,11 @@ jobs:
check:
name: Migration version collision check
runs-on: ubuntu-latest
# Phase 4 (RFC #219 §1): 22 days green since 2026-05-11 port.
# mc#1982 mask removed — no surfaced defects in this lane.
continue-on-error: false
# Phase 3 (RFC #219 §1): surface broken workflows without blocking
# the PR. Follow-up PR flips this off after surfaced defects are
# triaged.
# mc#1982: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
timeout-minutes: 5
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+4 -6
View File
@@ -123,9 +123,8 @@ jobs:
# integration). See internal#512 for the class defect.
runs-on: docker-host
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
# mc#1982: mask removed. If regressions appear, root-fix the underlying
# test — do NOT renew the mask silently.
continue-on-error: false
# mc#1982: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
outputs:
api: ${{ steps.decide.outputs.api }}
steps:
@@ -161,9 +160,8 @@ jobs:
# detect-changes for the full rationale.
runs-on: docker-host
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
# mc#1982: mask removed. If regressions appear, root-fix the underlying
# test — do NOT renew the mask silently.
continue-on-error: false
# mc#1982: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
timeout-minutes: 15
env:
# Unique per-run container names so concurrent runs on the host-
+8 -30
View File
@@ -278,38 +278,16 @@ jobs:
export NEXT_PUBLIC_WS_URL="ws://127.0.0.1:${PLATFORM_PORT}/ws"
npx next dev --turbopack -p "${CANVAS_PORT}" > canvas.log 2>&1 &
echo $! > canvas.pid
# Readiness must wait for the actual chat route to *compile*, not
# just for the dev server to bind the port. `next dev --turbopack`
# accepts the TCP connection well before it has compiled a route
# on first request, so a bare `curl /` can 200 (or hang) while the
# page the tests load is still building. We therefore probe the
# real route the specs navigate to (`/?m=chat`) and require a 2xx,
# which only happens once Turbopack has finished the first
# compile. The previous 30s budget was also too tight for a cold
# Turbopack first-compile on a loaded operator-host runner — the
# `Canvas did not start in 30s` flake. Raise to 120s (job
# timeout-minutes is 15, so this is comfortably bounded) and probe
# every 2s.
READY=""
for i in $(seq 1 60); do
# Tempfile-routed -w + set +e/-e prevents curl-exit-code
# pollution of the captured status (lint-curl-status-capture.yml).
set +e
curl -s -o /dev/null -w '%{http_code}' "http://localhost:${CANVAS_PORT}/?m=chat" > /tmp/canvas-ready.code
set -e
CODE=$(cat /tmp/canvas-ready.code 2>/dev/null || echo "000")
if [ "$CODE" -ge 200 ] && [ "$CODE" -lt 400 ]; then
echo "Canvas (chat route compiled) up after ~$((i*2))s (HTTP ${CODE})"
READY=1
break
for i in $(seq 1 30); do
if curl -sf "http://localhost:${CANVAS_PORT}" > /dev/null 2>&1; then
echo "Canvas up after ${i}s"
exit 0
fi
sleep 2
sleep 1
done
if [ -z "$READY" ]; then
echo "::error::Canvas chat route did not compile in 120s (last HTTP ${CODE})"
cat canvas.log || true
exit 1
fi
echo "::error::Canvas did not start in 30s"
cat canvas.log || true
exit 1
- name: Run Playwright E2E tests
if: needs.detect-changes.outputs.chat == 'true'
@@ -88,9 +88,8 @@ jobs:
# surprises and keeps the routing rule discoverable in one place.
runs-on: docker-host
# mc#1982 Phase 3 (RFC §1): surface broken workflows without blocking.
# mc#1982: mask removed. If regressions appear, root-fix the underlying
# test — do NOT renew the mask silently.
continue-on-error: false
# mc#1982: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
outputs:
handlers: ${{ steps.filter.outputs.handlers }}
steps:
@@ -120,9 +119,8 @@ jobs:
# exists). See detect-changes for the full routing rationale.
runs-on: docker-host
# mc#1982 Phase 3 (RFC §1): surface broken workflows without blocking.
# mc#1982: mask removed. If regressions appear, root-fix the underlying
# test — do NOT renew the mask silently.
continue-on-error: false
# mc#1982: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
env:
# Unique name per run so concurrent jobs don't collide on the
# bridge network. ${RUN_ID}-${RUN_ATTEMPT} is unique even across
@@ -91,10 +91,10 @@ jobs:
name: lint-bp-context-emit-match
runs-on: ubuntu-latest
timeout-minutes: 5
# Phase 4 (RFC #219 §1): 22 days green since 2026-05-11 port,
# well past the 7-clean-run threshold. Scheduled failure is now
# a hard CI signal.
continue-on-error: false
# Phase 3 (RFC #219 §1): surface drift without blocking. After 7
# clean scheduled runs on main, flip to false so a scheduled
# failure is a hard CI signal.
continue-on-error: true # mc#1982 Phase 3 — flip to false after 7 clean main runs
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
@@ -48,9 +48,11 @@ jobs:
scan:
name: Scan workflows for curl status-capture pollution
runs-on: ubuntu-latest
# Phase 4 (RFC #219 §1): 22 days green since 2026-05-11 port.
# mc#1982 mask removed — no surfaced defects in this lane.
continue-on-error: false
# Phase 3 (RFC #219 §1): surface broken workflows without blocking
# the PR. Follow-up PR flips this off after surfaced defects are
# triaged.
# mc#1982: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Find curl ... -w '%{http_code}' ... || echo "000" subshells
@@ -81,10 +81,10 @@ jobs:
name: lint-required-context-exists-in-bp
runs-on: ubuntu-latest
timeout-minutes: 5
# Phase 4 (RFC #219 §1): 22 days green since 2026-05-11 port,
# well past the 7-clean-day threshold. PR-time failure is now
# a hard CI signal.
continue-on-error: false
# Phase 3 (RFC #219 §1): surface the pattern without blocking PRs
# while the directive convention beds in. Follow-up flip to false
# after 7 clean days on main. mc#1982.
continue-on-error: true # mc#1982 Phase 3 — flip to false after 7 clean main runs
steps:
- name: Check out PR head with full history (need base SHA blobs)
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
@@ -49,56 +49,37 @@ jobs:
GITHUB_SERVER_URL: https://git.moleculesai.app
steps:
- name: Identify runner
id: identify
continue-on-error: true
run: |
set -eu
echo "arch=$(uname -m)"
echo "kernel=$(uname -sr)"
echo "shell=$BASH_VERSION"
# Sanity: must actually be arm64. If amd64 sneaks in here,
# the job skips gracefully rather than hard-failing, because
# a mislabelled runner is an ops concern, not a code defect.
# Pilot lane must not make main red (#2146).
# fail fast — that means the label routing is wrong.
case "$(uname -m)" in
aarch64|arm64)
echo "arm64 confirmed"
echo "arm64=true" >> "$GITHUB_OUTPUT"
;;
*)
echo "ERROR: expected arm64, got $(uname -m) — label routing may be wrong"
echo "arm64=false" >> "$GITHUB_OUTPUT"
exit 1
;;
aarch64|arm64) echo "arm64 confirmed" ;;
*) echo "ERROR: expected arm64, got $(uname -m)"; exit 1 ;;
esac
- name: Checkout
if: steps.identify.outputs.arm64 == 'true'
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Install shellcheck (arm64)
if: steps.identify.outputs.arm64 == 'true'
continue-on-error: true
run: |
set -eu
if command -v shellcheck >/dev/null 2>&1; then
echo "shellcheck already present: $(shellcheck --version | head -1)"
else
# Prefer apt if the runner base ships it; else download the
# correct platform binary (darwin vs linux).
# Prefer apt if the runner base ships it; else download arm64 binary.
if command -v apt-get >/dev/null 2>&1; then
sudo apt-get update -qq
sudo apt-get install -y --no-install-recommends shellcheck
else
SC_VER=v0.10.0
if [ "$(uname -s)" = "Darwin" ]; then
SC_PKG="shellcheck-${SC_VER}.darwin.aarch64.tar.xz"
else
SC_PKG="shellcheck-${SC_VER}.linux.aarch64.tar.xz"
fi
curl -fsSL "https://github.com/koalaman/shellcheck/releases/download/${SC_VER}/${SC_PKG}" \
curl -fsSL "https://github.com/koalaman/shellcheck/releases/download/${SC_VER}/shellcheck-${SC_VER}.linux.aarch64.tar.xz" \
| tar -xJf - --strip-components=1
sudo mv shellcheck /usr/local/bin/
fi
@@ -106,15 +87,14 @@ jobs:
shellcheck --version | head -2
- name: Run shellcheck on .gitea/scripts/*.sh
if: steps.identify.outputs.arm64 == 'true'
continue-on-error: true
run: |
set -eu
# Only the scripts we control under .gitea/scripts. Pilot
# scope is intentionally narrow — broaden in a follow-up
# once the lane is proven.
if ! command -v shellcheck >/dev/null 2>&1 || ! shellcheck --version >/dev/null 2>&1; then
echo "WARN: shellcheck not functional — skipping (pilot mode)"
if ! command -v shellcheck >/dev/null 2>&1; then
echo "WARN: shellcheck binary not found — skipping (pilot mode)"
exit 0
fi
# NOTE: macOS ships Bash 3.2 (Apple license), no `mapfile`
@@ -16,24 +16,14 @@ name: publish-workspace-server-image
#
# Image tags produced:
# :staging-<sha> — per-commit digest, stable for canary verify
# :staging-latest — tracks most recent BUILD on this branch (set by the
# build job, last-writer-wins, NOT prod-gated)
# :latest — tracks the most recent PROD-PROMOTED build. Re-pointed by the
# deploy-production job ONLY after green main CI + canary +
# fleet rollout + /buildinfo verification pass. So :latest ==
# "current prod image", never the raw build. (Added 2026-06-03
# after a stale :latest — last moved 2026-05-10 — reverted a
# production tenant on a no-arg redeploy.)
# :staging-latest — tracks most recent build on this branch
#
# Production auto-deploy:
# After both platform and tenant images are pushed, deploy-production waits
# for strict required push contexts on the same SHA to go green, then
# calls the production CP redeploy-fleet endpoint with target_tag=
# staging-<sha>. On success (rollout + buildinfo verified) it re-points
# :latest to the same SHA. Set repo variable or secret
# PROD_AUTO_DEPLOY_DISABLED=true to stop production rollout while keeping
# image publishing enabled — in which case :latest is NOT advanced either
# (correct: an unpromoted build must not become :latest).
# staging-<sha>. Set repo variable or secret PROD_AUTO_DEPLOY_DISABLED=true
# to stop production rollout while keeping image publishing enabled.
#
# Primary ECR target: 153263036946.dkr.ecr.us-east-2.amazonaws.com/molecule-ai/*
# Optional staging tenant mirror target:
@@ -263,19 +253,6 @@ jobs:
PROD_AUTO_DEPLOY_DRY_RUN: ${{ vars.PROD_AUTO_DEPLOY_DRY_RUN || '' }}
PROD_ALLOW_NON_PROD_CP_URL: ${{ vars.PROD_ALLOW_NON_PROD_CP_URL || '' }}
steps:
# The publish runner's default HOME (/home/hongming) is not writable, so
# git/docker credential saves fail (`Error saving credentials: mkdir
# /home/hongming: permission denied`) and halt the production rollout
# (#2193). Point HOME + DOCKER_CONFIG at the writable job temp dir —
# mirrors build-and-push's "Prepare writable Docker config" fix above.
- name: Prepare writable HOME + Docker config
run: |
set -euo pipefail
H="$RUNNER_TEMP/auto-deploy-home"
mkdir -p "$H/.docker"
echo "HOME=$H" >> "$GITHUB_ENV"
echo "DOCKER_CONFIG=$H/.docker" >> "$GITHUB_ENV"
- name: Checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
@@ -386,36 +363,6 @@ jobs:
run: |
set -euo pipefail
RESP="$RUNNER_TEMP/prod-redeploy-response.json"
# Superseded-job guard. This workflow has no `concurrency:` (header
# explains why: Gitea 1.22.6 cancels queued prod deploys). So two
# close main pushes run BOTH deploy-production jobs. The newer one
# rolls the fleet to its (newer) build first; this older job's strict
# equality check below would then see tenants on the NEWER SHA and
# false-red "$slug is stale" even though the fleet is AHEAD, not
# behind (git SHAs aren't ordered; /buildinfo exposes only git_sha).
#
# If main's current head is no longer THIS job's SHA, a newer commit
# has landed and this deploy is superseded — the newest job's verify
# is authoritative. Skip strict verify and succeed. exit 0 => newer
# head printed (superseded); exit 10 => still the latest, proceed to
# the strict verify so a genuinely-behind tenant still fails loudly.
set +e
NEWER_HEAD="$(python3 .gitea/scripts/prod-auto-deploy.py check-superseded)"
SUPERSEDED_EXIT=$?
set -e
if [ "$SUPERSEDED_EXIT" -eq 0 ] && [ -n "$NEWER_HEAD" ]; then
echo "::notice::Superseded deploy: main head is now ${NEWER_HEAD:0:7} (this job deployed ${GITHUB_SHA:0:7}). The fleet is at or ahead of this build; the newer deploy job's verify is authoritative. Skipping strict SHA verify."
{
echo ""
echo "### Buildinfo verification skipped — superseded deploy"
echo ""
echo "This deploy job's SHA \`${GITHUB_SHA:0:7}\` is no longer the head of \`main\` (now \`${NEWER_HEAD:0:7}\`)."
echo "A newer deploy job is rolling the fleet forward; its verify is authoritative."
} >> "$GITHUB_STEP_SUMMARY"
exit 0
fi
mapfile -t SLUGS < <(jq -r '.results[]? | .slug' "$RESP")
if [ ${#SLUGS[@]} -eq 0 ]; then
echo "::error::No tenants returned from redeploy-fleet; refusing to mark production deploy verified."
@@ -462,65 +409,3 @@ jobs:
if [ "$STALE_COUNT" -gt 0 ] || [ "$UNHEALTHY_COUNT" -gt 0 ] || [ "$UNREACHABLE_COUNT" -gt 0 ]; then
exit 1
fi
# Re-point :latest to the just-promoted image — ONLY after the
# production rollout + buildinfo verification above have passed.
#
# WHY HERE (promote point), not at build time:
# The platform-tenant ECR `:latest` tag was last moved 2026-05-10
# and went 3.5 weeks stale because the build step only pushes
# :staging-<sha> + :staging-latest and never re-points :latest. A
# no-arg POST /cp/admin/tenants/:slug/redeploy (whose default tag
# fell through to "latest") then pulled the 3.5-week-old image and
# REVERTED the tenant (incident: molecule-adk-demo, 2026-06-03).
#
# The defense-in-depth half of this fix changes that redeploy
# default to :staging-latest, but :latest itself must also be
# kept meaningful. We make :latest track the PROD-BLESSED build,
# not the raw build: by living at the end of deploy-production —
# after `wait-ci` (green main CI), the canary-first batched fleet
# rollout, AND the /buildinfo SHA verification — :latest only ever
# advances to a SHA that is actually green and confirmed running
# across the live fleet. So `:latest` == "current prod image",
# and any consumer that pulls :latest (legacy callers, manual
# `docker pull`, a redeploy that somehow still resolves "latest")
# gets the blessed image instead of whatever happened to build.
#
# Re-tag is digest-level (imagetools create), so no rebuild and
# :latest is byte-identical to :staging-<sha> for this commit.
- name: Promote :latest to the verified prod image
if: ${{ steps.plan.outputs.enabled == 'true' }}
env:
TENANT_IMAGE_NAME: ${{ env.TENANT_IMAGE_NAME }}
STAGING_TENANT_IMAGE_NAME: ${{ env.STAGING_TENANT_IMAGE_NAME }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_DEFAULT_REGION: us-east-2
run: |
set -euo pipefail
SHA_TAG="staging-${GITHUB_SHA::7}"
PROD_ECR_REGISTRY="${TENANT_IMAGE_NAME%%/*}"
STAGING_ECR_REGISTRY="${STAGING_TENANT_IMAGE_NAME%%/*}"
aws ecr get-login-password --region us-east-2 | \
docker login --username AWS --password-stdin "${PROD_ECR_REGISTRY}"
aws ecr get-login-password --region us-east-2 | \
docker login --username AWS --password-stdin "${STAGING_ECR_REGISTRY}"
# imagetools create copies the source manifest to the new tag by
# digest (no pull/rebuild). :latest now points at the exact image
# that just passed the prod gate.
docker buildx imagetools create \
--tag "${TENANT_IMAGE_NAME}:latest" \
"${TENANT_IMAGE_NAME}:${SHA_TAG}"
docker buildx imagetools create \
--tag "${STAGING_TENANT_IMAGE_NAME}:latest" \
"${STAGING_TENANT_IMAGE_NAME}:${SHA_TAG}"
{
echo ""
echo "### :latest promoted"
echo ""
echo "Re-pointed \`platform-tenant:latest\` → \`${SHA_TAG}\` (prod + staging ECR)."
echo ":latest now tracks the prod-blessed, fleet-verified image."
} >> "$GITHUB_STEP_SUMMARY"
+8 -89
View File
@@ -9,22 +9,10 @@
# Triggers on:
# - `pull_request_target`: opened, synchronize, reopened
# → initial status posts when PR opens / re-pushes
# - `pull_request_review` types: [submitted]
# → re-evaluate when a team member submits an APPROVE review so
# the gate flips immediately (no wait for the next push or
# slash-command). Verified live: sop-tier-check.yml uses this
# same event and provably fires (produces
# `sop-tier-check / tier-check (pull_request_review)` contexts).
# The job-level `if:` guard checks
# `github.event.review.state == 'APPROVED' || 'approved'` so
# only APPROVE reviews run the evaluator; COMMENT and
# REQUEST_CHANGES are skipped at the job level.
# Branch-protection requires the `(pull_request_target)`
# context variant, so the review-event path EXPLICITLY POSTS
# the required context via the API. Trust boundary preserved
# (BASE ref, no PR-head).
# - comment refires are handled by `sop-checklist.yml` review-refire job
# → `/qa-recheck` slash-command re-evaluates this gate.
# - comment refires are handled by `review-refire-comments.yml`
# → a single issue_comment dispatcher prevents every SOP/review
# comment from enqueueing separate qa/security/tier jobs on
# Gitea 1.22.6 before job-level `if:` can skip them.
# Workflow name = `qa-review` ; job name = `approved`.
# The job's own pass/fail conclusion publishes the status context
# `qa-review / approved (<event>)` — NO `POST /statuses` call → NO
@@ -97,26 +85,21 @@ name: qa-review
on:
pull_request_target:
types: [opened, synchronize, reopened]
pull_request_review:
types: [submitted]
permissions:
contents: read
pull-requests: read
statuses: write
secrets: read
jobs:
# bp-exempt: PR review bot signal; required merge state is enforced by CI / all-required.
approved:
# Gate the job:
# - On pull_request_target events: always run.
# - On pull_request_review_approved events: run so the gate flips
# immediately when a team member submits an APPROVE review.
# Comment-triggered refires live in sop-checklist.yml review-refire job.
# Comment-triggered refires live in review-refire-comments.yml. Keeping
# this workflow PR-only avoids comment-triggered queue storms.
if: |
github.event_name == 'pull_request_target' ||
(github.event_name == 'pull_request_review' &&
(github.event.review.state == 'APPROVED' || github.event.review.state == 'approved'))
github.event_name == 'pull_request_target'
runs-on: ubuntu-latest
steps:
- name: Privilege check (A1.1 — INFORMATIONAL log only, NOT a gate)
@@ -160,7 +143,6 @@ jobs:
ref: ${{ github.event.repository.default_branch }}
- name: Evaluate qa-review
id: eval
env:
GITEA_TOKEN: ${{ secrets.SOP_TIER_CHECK_TOKEN || secrets.GITHUB_TOKEN }}
GITEA_HOST: git.moleculesai.app
@@ -175,66 +157,3 @@ jobs:
REVIEW_CHECK_DEBUG: '0'
REVIEW_CHECK_STRICT: '0'
run: bash .gitea/scripts/review-check.sh
- name: Post required status context on pull_request_review
# Gitea Actions auto-publishes (pull_request_review) context
# for this event, but branch-protection requires (pull_request_target).
# We explicitly POST the BP-required context so the gate flips.
# Trust boundary: same BASE-ref script result, no PR-head code.
#
# TOKEN FIX (RC 8326): uses STATUS_POST_TOKEN (CTO-granted,
# msg d52cc72a). Dedicated narrow-scoped write:repository token
# for the explicit status POST. Evaluator step stays on
# SOP_TIER_CHECK_TOKEN (read-only) per deliberate security
# separation: eval computes, POST writes, never the same cred.
if: github.event_name == 'pull_request_review' && always()
env:
GITEA_TOKEN: ${{ secrets.STATUS_POST_TOKEN }}
GITEA_HOST: git.moleculesai.app
REPO: ${{ github.repository }}
PR_NUMBER: ${{ github.event.pull_request.number || github.event.issue.number }}
EVAL_OUTCOME: ${{ steps.eval.outcome }}
run: |
set -euo pipefail
authfile=$(mktemp)
chmod 600 "$authfile"
printf 'header = "Authorization: token %s"\n' "$GITEA_TOKEN" > "$authfile"
prfile=$(mktemp)
code=$(curl -sS -o "$prfile" -w '%{http_code}' -K "$authfile" \
"https://${GITEA_HOST}/api/v1/repos/${REPO}/pulls/${PR_NUMBER}")
if [ "$code" != "200" ]; then
echo "::error::GET /pulls/${PR_NUMBER} returned HTTP ${code}"
rm -f "$prfile" "$authfile"
exit 1
fi
head_sha=$(jq -r '.head.sha // ""' "$prfile")
rm -f "$prfile"
if [ "$EVAL_OUTCOME" = "success" ]; then
status_state="success"
description="Approved via pull_request_review trigger"
else
status_state="failure"
description="Review check failed via pull_request_review trigger"
fi
body=$(jq -nc \
--arg state "$status_state" \
--arg context "qa-review / approved (pull_request_target)" \
--arg description "$description" \
'{state:$state, context:$context, description:$description}')
post_code=$(curl -sS -o /dev/null -w '%{http_code}' -X POST \
-K "$authfile" -H "Content-Type: application/json" \
-d "$body" \
"https://${GITEA_HOST}/api/v1/repos/${REPO}/statuses/${head_sha}")
rm -f "$authfile"
if [ "$post_code" != "200" ] && [ "$post_code" != "201" ]; then
echo "::error::POST /statuses/${head_sha} returned HTTP ${post_code}"
exit 1
fi
echo "::notice::posted ${status_state} for context=\"qa-review / approved (pull_request_target)\" on sha=${head_sha}"
+4 -87
View File
@@ -6,44 +6,25 @@
#
# See `qa-review.yml` header for the full A1-α / A1.1 / A4 / A5 design
# rationale; everything below is identical in shape.
#
# A1-α addendum (internal#760): review-event trigger added so the security
# gate flips immediately when a team member submits an APPROVE review.
# Uses `pull_request_review` types: [submitted] — verified live via
# sop-tier-check.yml which provably fires this event (produces
# `sop-tier-check / tier-check (pull_request_review)` contexts).
# The job-level `if:` guard checks
# `github.event.review.state == 'APPROVED' || 'approved'` so only APPROVE
# reviews run the evaluator; COMMENT and REQUEST_CHANGES are skipped at
# the job level. Branch-protection requires the `(pull_request_target)`
# context variant, so the review-event path EXPLICITLY POSTS the required
# context via the API. Trust boundary preserved (BASE ref, no PR-head).
name: security-review
on:
pull_request_target:
types: [opened, synchronize, reopened]
pull_request_review:
types: [submitted]
permissions:
contents: read
pull-requests: read
statuses: write
secrets: read
jobs:
# bp-exempt: PR security review bot signal; required merge state is enforced by CI / all-required.
approved:
# Gate the job:
# - On pull_request_target events: always run.
# - On pull_request_review_approved events: run so the gate flips
# immediately when a team member submits an APPROVE review.
# Comment-triggered refires live in sop-checklist.yml review-refire job.
# Comment-triggered refires live in review-refire-comments.yml. Keeping
# this workflow PR-only avoids comment-triggered queue storms.
if: |
github.event_name == 'pull_request_target' ||
(github.event_name == 'pull_request_review' &&
(github.event.review.state == 'APPROVED' || github.event.review.state == 'approved'))
github.event_name == 'pull_request_target'
runs-on: ubuntu-latest
steps:
- name: Privilege check (A1.1 — INFORMATIONAL log only, NOT a gate)
@@ -76,7 +57,6 @@ jobs:
ref: ${{ github.event.repository.default_branch }}
- name: Evaluate security-review
id: eval
env:
GITEA_TOKEN: ${{ secrets.SOP_TIER_CHECK_TOKEN || secrets.GITHUB_TOKEN }}
GITEA_HOST: git.moleculesai.app
@@ -88,66 +68,3 @@ jobs:
REVIEW_CHECK_DEBUG: '0'
REVIEW_CHECK_STRICT: '0'
run: bash .gitea/scripts/review-check.sh
- name: Post required status context on pull_request_review
# Gitea Actions auto-publishes (pull_request_review) context
# for this event, but branch-protection requires (pull_request_target).
# We explicitly POST the BP-required context so the gate flips.
# Trust boundary: same BASE-ref script result, no PR-head code.
#
# TOKEN FIX (RC 8326): uses STATUS_POST_TOKEN (CTO-granted,
# msg d52cc72a). Dedicated narrow-scoped write:repository token
# for the explicit status POST. Evaluator step stays on
# SOP_TIER_CHECK_TOKEN (read-only) per deliberate security
# separation: eval computes, POST writes, never the same cred.
if: github.event_name == 'pull_request_review' && always()
env:
GITEA_TOKEN: ${{ secrets.STATUS_POST_TOKEN }}
GITEA_HOST: git.moleculesai.app
REPO: ${{ github.repository }}
PR_NUMBER: ${{ github.event.pull_request.number || github.event.issue.number }}
EVAL_OUTCOME: ${{ steps.eval.outcome }}
run: |
set -euo pipefail
authfile=$(mktemp)
chmod 600 "$authfile"
printf 'header = "Authorization: token %s"\n' "$GITEA_TOKEN" > "$authfile"
prfile=$(mktemp)
code=$(curl -sS -o "$prfile" -w '%{http_code}' -K "$authfile" \
"https://${GITEA_HOST}/api/v1/repos/${REPO}/pulls/${PR_NUMBER}")
if [ "$code" != "200" ]; then
echo "::error::GET /pulls/${PR_NUMBER} returned HTTP ${code}"
rm -f "$prfile" "$authfile"
exit 1
fi
head_sha=$(jq -r '.head.sha // ""' "$prfile")
rm -f "$prfile"
if [ "$EVAL_OUTCOME" = "success" ]; then
status_state="success"
description="Approved via pull_request_review trigger"
else
status_state="failure"
description="Review check failed via pull_request_review trigger"
fi
body=$(jq -nc \
--arg state "$status_state" \
--arg context "security-review / approved (pull_request_target)" \
--arg description "$description" \
'{state:$state, context:$context, description:$description}')
post_code=$(curl -sS -o /dev/null -w '%{http_code}' -X POST \
-K "$authfile" -H "Content-Type: application/json" \
-d "$body" \
"https://${GITEA_HOST}/api/v1/repos/${REPO}/statuses/${head_sha}")
rm -f "$authfile"
if [ "$post_code" != "200" ] && [ "$post_code" != "201" ]; then
echo "::error::POST /statuses/${head_sha} returned HTTP ${post_code}"
exit 1
fi
echo "::notice::posted ${status_state} for context=\"security-review / approved (pull_request_target)\" on sha=${head_sha}"
+6 -6
View File
@@ -179,10 +179,10 @@ jobs:
- name: Refire qa-review status
if: steps.classify.outputs.run_qa == 'true'
env:
# Evaluator (review-check.sh + GET /pulls) stays on read-scoped token.
# RFC_324_TEAM_READ_TOKEN is read-only (team membership read scope only).
# review-refire-status.sh POSTs to /statuses — requires write scope.
# SOP_TIER_CHECK_TOKEN carries write:repository + write:issue + read:organization.
GITEA_TOKEN: ${{ secrets.SOP_TIER_CHECK_TOKEN || secrets.GITHUB_TOKEN }}
# Explicit POST /statuses uses narrow-scoped write:repository token.
STATUS_POST_TOKEN: ${{ secrets.STATUS_POST_TOKEN }}
GITEA_HOST: git.moleculesai.app
REPO: ${{ github.repository }}
PR_NUMBER: ${{ github.event.issue.number }}
@@ -198,10 +198,10 @@ jobs:
- name: Refire security-review status
if: steps.classify.outputs.run_security == 'true'
env:
# Evaluator (review-check.sh + GET /pulls) stays on read-scoped token.
# RFC_324_TEAM_READ_TOKEN is read-only (team membership read scope only).
# review-refire-status.sh POSTs to /statuses — requires write scope.
# SOP_TIER_CHECK_TOKEN carries write:repository + write:issue + read:organization.
GITEA_TOKEN: ${{ secrets.SOP_TIER_CHECK_TOKEN || secrets.GITHUB_TOKEN }}
# Explicit POST /statuses uses narrow-scoped write:repository token.
STATUS_POST_TOKEN: ${{ secrets.STATUS_POST_TOKEN }}
GITEA_HOST: git.moleculesai.app
REPO: ${{ github.repository }}
PR_NUMBER: ${{ github.event.issue.number }}
+2 -17
View File
@@ -60,26 +60,11 @@ test.describe("MobileChat", () => {
await expect(page.getByText("Echo: Mobile persistence")).toBeVisible({ timeout: 15_000 });
// Reload and deterministically wait for the chat-history GET that
// rehydrates the transcript to come back 2xx, rather than racing a
// fixed-timeout render assertion against an in-flight fetch. The
// server now persists the a2a_receive row SYNCHRONOUSLY before the
// send's 200 (workspace-server logA2ASuccess), so the row is
// guaranteed present by the time this GET runs — the wait is for
// hydration latency, not for a still-racing write.
const historyResponse = page.waitForResponse(
(resp) =>
resp.url().includes("/chat-history") &&
resp.request().method() === "GET" &&
resp.status() === 200,
{ timeout: 15_000 },
);
await page.reload();
await page.waitForSelector("[data-testid='chat-panel']", { timeout: 10_000 });
await historyResponse;
await expect(page.getByText("Mobile persistence", { exact: true })).toBeVisible();
await expect(page.getByText("Echo: Mobile persistence")).toBeVisible();
await expect(page.getByText("Mobile persistence", { exact: true })).toBeVisible({ timeout: 5_000 });
await expect(page.getByText("Echo: Mobile persistence")).toBeVisible({ timeout: 5_000 });
});
test("composer auto-grows with multi-line text", async ({ page }) => {
@@ -8,12 +8,9 @@ import { ExternalConnectModal, type ExternalConnectionInfo } from "./ExternalCon
import {
ProviderModelSelector,
buildProviderCatalog,
buildProviderCatalogFromRegistry,
findProviderForModel,
type SelectorModel,
type SelectorValue,
type RegistryProvider,
type RegistryModel,
} from "./ProviderModelSelector";
interface WorkspaceOption {
@@ -35,16 +32,6 @@ interface TemplateSpec {
model?: string;
models?: SelectorModel[];
providers?: string[];
// internal#718 P3 registry-served fields (additive; absent on older
// backends and for non-registry runtimes). When registry_backed is true the
// provider→model catalog is built from registry_providers/registry_models so
// each model's DERIVED provider (e.g. moonshot/kimi-k2.6 → "platform") drives
// the dropdown bucket and the create payload's llm_provider — instead of the
// legacy inferVendor heuristic that slash-splits the id into "moonshot".
// Mirrors ConfigTab's RuntimeOption loader (RFC#340 Fix C).
registry_backed?: boolean;
registry_providers?: RegistryProvider[];
registry_models?: RegistryModel[];
}
const DEFAULT_RUNTIME = "claude-code";
@@ -181,53 +168,15 @@ export function CreateWorkspaceButton() {
}),
[runtime, templateSpecs],
);
// The /templates row backing the LLM picker: an explicitly-selected
// workspace template wins, else the base runtime template row.
const llmSourceSpec = useMemo<TemplateSpec | null>(
() => selectedTemplateSpec ?? selectedRuntimeTemplateSpec,
const llmModels = useMemo(
() => {
const sourceSpec = selectedTemplateSpec ?? selectedRuntimeTemplateSpec;
if (!sourceSpec?.models?.length) return [];
return sourceSpec.models;
},
[selectedRuntimeTemplateSpec, selectedTemplateSpec],
);
// internal#718 P3 / RFC#340 Fix C: a runtime is registry-backed when the
// /templates row says so AND it served a non-empty registry_models set.
// Mirrors ConfigTab's `registryBacked` derivation exactly.
const registryBacked = useMemo(
() =>
llmSourceSpec?.registry_backed === true &&
(llmSourceSpec.registry_models?.length ?? 0) > 0,
[llmSourceSpec],
);
// Models fed to the selector dropdown. For a registry-backed runtime use the
// registry-served native set, carrying each model's DERIVED provider so the
// selector buckets it correctly (moonshot/kimi-k2.6 → "platform", not the
// inferVendor "moonshot"). Otherwise fall back to the template-served
// models[] + the legacy heuristic — same fallback ConfigTab keeps.
const llmModels = useMemo<SelectorModel[]>(
() => {
if (registryBacked) {
return (llmSourceSpec?.registry_models ?? []).map((m) => ({
id: m.id,
name: m.name,
...(m.provider ? { provider: m.provider } : {}),
}));
}
return llmSourceSpec?.models?.length ? llmSourceSpec.models : [];
},
[registryBacked, llmSourceSpec],
);
// Registry-backed path: build the catalog from registry_providers/
// registry_models so dropdown labels + billing + the derived provider come
// from the provider-registry SSOT (restores the "Platform" bucket). Legacy
// path: re-infer from models[] via buildProviderCatalog (inferVendor).
const llmCatalog = useMemo(
() =>
registryBacked
? buildProviderCatalogFromRegistry(
llmSourceSpec?.registry_providers ?? [],
llmSourceSpec?.registry_models ?? [],
)
: buildProviderCatalog(llmModels),
[registryBacked, llmSourceSpec, llmModels],
);
const llmCatalog = useMemo(() => buildProviderCatalog(llmModels), [llmModels]);
const selectedLLMProvider = useMemo(
() => llmCatalog.find((p) => p.id === llmSelection.providerId) ?? llmCatalog[0],
[llmCatalog, llmSelection.providerId],
@@ -235,7 +184,7 @@ export function CreateWorkspaceButton() {
useEffect(() => {
if (llmCatalog.length === 0) return;
const sourceDefault = llmSourceSpec?.model?.trim();
const sourceDefault = (selectedTemplateSpec ?? selectedRuntimeTemplateSpec)?.model?.trim();
const platformProvider = llmCatalog.find((p) => p.vendor === "platform");
const matched = sourceDefault ? findProviderForModel(llmCatalog, sourceDefault) : null;
const next = platformProvider ?? matched ?? llmCatalog[0];
@@ -248,7 +197,7 @@ export function CreateWorkspaceButton() {
envVars: next.envVars,
});
setLLMSecret("");
}, [llmCatalog, llmSourceSpec]);
}, [llmCatalog, selectedRuntimeTemplateSpec, selectedTemplateSpec]);
// Reset form and load workspaces whenever dialog opens
useEffect(() => {
@@ -512,7 +461,6 @@ export function CreateWorkspaceButton() {
</div>
<ProviderModelSelector
models={llmModels}
catalog={registryBacked ? llmCatalog : undefined}
value={llmSelection}
onChange={(next) => {
setLLMSelection(next);
@@ -454,128 +454,6 @@ describe("CreateWorkspaceDialog — dynamic runtime provider picker", () => {
});
});
// ---------------------------------------------------------------------------
// Registry-backed provider catalog (RFC#340 Fix C)
//
// Regression guard for the mis-bucketing bug: when a registry-backed
// claude-code template serves `moonshot/kimi-k2.6` whose DERIVED provider is
// `platform`, the dialog must build the dropdown from registry_providers/
// registry_models (buildProviderCatalogFromRegistry) — NOT the legacy
// inferVendor heuristic which slash-splits the id into "moonshot". The
// distinguishing trait of this fixture: the plain `models[]` array does NOT
// carry an explicit `provider` field, so the LEGACY path would bucket the
// model under "moonshot" and send llm_provider:"moonshot". Only the
// registry-backed path yields the Platform bucket + llm_provider:"platform".
// ---------------------------------------------------------------------------
// claude-code template whose plain models[] is UN-annotated (no explicit
// provider). The derived-provider annotation lives ONLY in registry_models.
const REGISTRY_TEMPLATE = {
id: "claude-code-default",
name: "Claude Code Agent",
runtime: "claude-code",
model: "moonshot/kimi-k2.6",
// Legacy fields — note: NO explicit provider on the platform model, so the
// legacy inferVendor path would slash-split it into "moonshot".
providers: ["platform", "minimax", "anthropic"],
models: [
{ id: "moonshot/kimi-k2.6", name: "Kimi K2.6", required_env: [] },
{ id: "MiniMax-M2.7", name: "MiniMax M2.7", required_env: ["MINIMAX_API_KEY"] },
{ id: "claude-sonnet-4-6", name: "Claude Sonnet 4.6", required_env: ["ANTHROPIC_API_KEY"] },
],
// Registry-served SSOT (internal#718 P3). DeriveProvider resolved
// moonshot/kimi-k2.6 → "platform"; MiniMax-M2.7 → "minimax".
registry_backed: true,
registry_providers: [
{ name: "platform", display_name: "Platform", auth_env: [], billing_mode: "platform_managed" },
{ name: "minimax", display_name: "MiniMax", auth_env: ["MINIMAX_API_KEY"], billing_mode: "byok" },
{ name: "anthropic", display_name: "Anthropic API", auth_env: ["ANTHROPIC_API_KEY"], billing_mode: "byok" },
],
registry_models: [
{ id: "moonshot/kimi-k2.6", name: "Kimi K2.6", provider: "platform", billing_mode: "platform_managed" },
{ id: "MiniMax-M2.7", name: "MiniMax M2.7", provider: "minimax", billing_mode: "byok" },
{ id: "claude-sonnet-4-6", name: "Claude Sonnet 4.6", provider: "anthropic", billing_mode: "byok" },
],
};
describe("CreateWorkspaceDialog — registry-backed provider catalog (RFC#340 Fix C)", () => {
beforeEach(() => {
mockGet.mockImplementation(async (url: string) => {
if (url === "/templates") {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return [REGISTRY_TEMPLATE] as any;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return SAMPLE_WORKSPACES as any;
});
});
it("shows the Platform provider bucket for the registry-backed claude-code runtime", async () => {
await openDialog();
const providerSelect = await waitFor(() => {
const sel = document.querySelector("[data-testid='provider-select']") as HTMLSelectElement;
expect(sel).toBeTruthy();
return sel;
});
const labels = Array.from(providerSelect.options).map((o) => o.text.trim());
// Registry display_name "Platform" appears — NOT "moonshot" from the
// legacy slash-split heuristic.
expect(labels).toContain("Platform");
expect(labels).not.toContain("moonshot");
// Bucket id is the registry-keyed id, vendor is the bare provider name.
const values = Array.from(providerSelect.options).map((o) => o.value);
expect(values).toContain("registry|platform");
});
it("sends llm_provider: platform (not moonshot) for moonshot/kimi-k2.6", async () => {
await openDialog();
fireEvent.change(screen.getByPlaceholderText("e.g. SEO Agent"), {
target: { value: "Kimi Agent" },
});
// Wait for the registry default to settle on the Platform bucket + model.
await waitFor(() => {
const modelSelect = document.querySelector("[data-testid='model-select']") as HTMLSelectElement;
expect(modelSelect?.value).toBe("moonshot/kimi-k2.6");
});
const createBtn = screen.getAllByRole("button").find((b) => b.textContent === "Create");
fireEvent.click(createBtn!);
await waitFor(() => expect(mockPost).toHaveBeenCalled());
const body = mockPost.mock.calls[0][1] as Record<string, unknown>;
expect(body.model).toBe("moonshot/kimi-k2.6");
expect(body.llm_provider).toBe("platform");
// Platform is auth-env-free → no BYOK secret.
expect(body.secrets).toBeUndefined();
});
it("buckets MiniMax-M2.7 under its derived provider and sends llm_provider: minimax", async () => {
await openDialog();
fireEvent.change(screen.getByPlaceholderText("e.g. SEO Agent"), {
target: { value: "MiniMax Agent" },
});
await waitFor(() => {
const sel = document.querySelector("[data-testid='provider-select']") as HTMLSelectElement;
expect(Array.from(sel.options).map((o) => o.value)).toContain("registry|minimax");
});
fireEvent.change(document.querySelector("[data-testid='provider-select']") as HTMLSelectElement, {
target: { value: "registry|minimax" },
});
fireEvent.change(document.getElementById("llm-secret-input") as HTMLInputElement, {
target: { value: "sk-minimax-test" },
});
const createBtn = screen.getAllByRole("button").find((b) => b.textContent === "Create");
fireEvent.click(createBtn!);
await waitFor(() => expect(mockPost).toHaveBeenCalled());
const body = mockPost.mock.calls[0][1] as Record<string, unknown>;
expect(body.model).toBe("MiniMax-M2.7");
expect(body.llm_provider).toBe("minimax");
expect(body.secrets).toEqual({ MINIMAX_API_KEY: "sk-minimax-test" });
});
});
// ---------------------------------------------------------------------------
// budget_limit field tests (#541)
// ---------------------------------------------------------------------------
+2
View File
@@ -34,6 +34,8 @@
],
"org_templates": [
{"name": "molecule-dev", "repo": "molecule-ai/molecule-ai-org-template-molecule-dev", "ref": "main"},
{"name": "free-beats-all", "repo": "molecule-ai/molecule-ai-org-template-free-beats-all", "ref": "main"},
{"name": "medo-smoke", "repo": "molecule-ai/molecule-ai-org-template-medo-smoke", "ref": "main"},
{"name": "molecule-worker-gemini", "repo": "molecule-ai/molecule-ai-org-template-molecule-worker-gemini", "ref": "main"},
{"name": "ux-ab-lab", "repo": "molecule-ai/molecule-ai-org-template-ux-ab-lab", "ref": "main"}
]
-131
View File
@@ -1,131 +0,0 @@
# Developer SOP — PR review gate auto-fire and stale-head handling
> Last updated: 2026-06-03 (cp#2159 follow-up)
>
> Applies to: all core-PR authors and reviewers on `molecule-core` and sibling
> repos using the `qa-review` + `security-review` branch-protection gates.
---
## 1. Gitea PR-head workflow-selection rule
**Rule:** For `pull_request_target` and `pull_request_review` events, Gitea
loads the workflow definition from the **PR's HEAD branch**, not from the
base (`main`) branch.
This is different from GitHub Actions, where `pull_request_target` always
loads workflows from the base branch. Gitea's behaviour means:
- A PR that was opened **before** the `pull_request_review` trigger was added
to `qa-review.yml` / `security-review.yml` will **NOT** auto-fire on review,
because its HEAD still contains the old workflow YAML (no trigger).
- A PR that was opened **after** the trigger was added (or that has been
rebased onto a commit containing the trigger) **WILL** auto-fire, because its
HEAD contains the new workflow YAML.
### Ops implication
| PR head contains `pull_request_review` trigger? | Behaviour on APPROVED review |
|---|---|
| **Yes** (cut from current main, or rebased) | Workflows auto-queue, evaluate, and POST the `(pull_request_target)` context automatically. No slash-command needed. |
| **No** (stale head, opened before #2157) | Nothing fires. Use `/qa-recheck` + `/security-recheck` slash-commands in a PR comment, OR rebase onto current main. |
---
## 2. Standard core-PR flow (post-#2157)
```
1. Author opens PR from a branch based on current main
→ qa-review + security-review workflows run on pull_request_target
→ status contexts post (initial eval, usually red until reviews land)
2. Reviewers submit real APPROVED reviews
→ If PR head has the trigger: workflows AUTO-FIRE on pull_request_review
→ Contexts flip green (or stay red if reviewer is not in team)
3. [Optional] If contexts did not flip (stale head, event lost, etc.):
→ Anyone can comment `/qa-recheck` or `/security-recheck`
→ sop-checklist.yml refires the evaluator (read-only, idempotent)
4. Both qa-review + security-review contexts are green
→ Plain Do:merge (no force-merge needed)
```
### Key point
The `/qa-recheck` and `/security-recheck` commands are a **backstop**, not the
primary path. PRs cut from current main should auto-fire without manual
intervention.
---
## 3. Diagnosing a stale head
If a PR has real team-member APPROVED reviews but the qa/security contexts
remain red and no workflow run appears on the PR's "Actions" tab for the
review event, the PR head is likely stale.
### Quick check
```bash
# From the PR page, look at the head commit SHA, then:
curl -sS "https://git.moleculesai.app/api/v1/repos/molecule-ai/molecule-core/contents/.gitea/workflows/qa-review.yml?ref=<HEAD_SHA>" \
| jq -r '.content' | base64 -d | grep -c 'pull_request_review'
# 0 → stale head (no trigger in that version of the workflow)
# >0 → trigger present; auto-fire SHOULD work (if it didn't, file a tracker)
```
### Automated diagnostic
The test suite includes `test_gate_stale_head_diagnostic.py`, which reports
"auto-fire impossible for this PR" when the head lacks the trigger. Run it
in CI or locally with:
```bash
PR_NUMBER=123 python -m pytest .gitea/scripts/tests/test_gate_stale_head_diagnostic.py -v
```
---
## 4. Rebasing vs. slash-refire
| Approach | When to use | Trade-off |
|---|---|---|
| **Rebase onto current main** | PR is genuinely stale (head lacks trigger OR head is far behind main) | Clean history, gets all recent fixes, but requires force-push and re-approval if the branch was protected |
| **`/qa-recheck` + `/security-recheck`** | PR head is recent but the review event was missed, or you want to avoid rebase churn | Quick, no force-push, but does NOT fix a missing trigger in the head |
**Do not** use slash-refire as a substitute for rebasing a stale head. If the
workflow YAML in the PR head does not contain `pull_request_review`, no amount
of rechecking will make auto-fire work.
---
## 5. Live-fire verification
The `test_gate_auto_fire_live.py` regression test exercises the full runtime
path: it submits an APPROVED review to a test PR and polls for the
`(pull_request_target)` status contexts. It is skipped when no API token is
available, and is intended to catch runtime non-fire that static structural
tests (e.g. `test_gate_review_auto_fire.py`) cannot detect.
Run manually with:
```bash
export GITEA_HOST=git.moleculesai.app
export GITEA_TOKEN=<your-token>
export REPO=molecule-ai/molecule-core
export LIVEFIRE_PR_NUMBER=<test-pr-number>
python -m pytest .gitea/scripts/tests/test_gate_auto_fire_live.py -v
```
---
## References
- #2159 — gate auto-trigger not firing (root cause: stale PR heads lacking
the `pull_request_review` trigger, NOT a workflow code defect)
- #765 — static structural regression test for gate configuration
- #2157 — merged trigger addition (`pull_request_review` types: [submitted])
- #2020 — milestone confirming gate infrastructure is stable
- RFC#324 — qa-review + security-review design
-18
View File
@@ -858,24 +858,6 @@ fi
if echo "$AGENT_TEXT" | grep -qiE "exceeded your current quota|insufficient_quota"; then
fail "A2A — PROVIDER QUOTA EXHAUSTED (NOT a platform regression). Operator action: top up MOLECULE_STAGING_OPENAI_API_KEY billing or rotate to a higher-quota org at Settings → Secrets and Variables → Actions. Tracked in #2578. Raw: $AGENT_TEXT"
fi
# Empty-completion class — the agent runtime reached the LLM and got a
# 2xx back, but the assistant turn carried NO text part (empty content,
# or tool_calls/reasoning-only with no surfaced text), so the runtime
# returns the literal "Error: message contained no text content." as its
# reply text. Steps 0-7 passing means the platform is healthy (CP up,
# tenant provisioned, workspace online + routable, A2A delivery e2e); the
# break is the configured completion BACKEND returning an empty turn — a
# model/provider-side regression, NOT a workspace-server or harness bug,
# and NOT NOT_CONFIGURED (that fails earlier, at boot). Name it explicitly
# so the canary alert points at the model, not the platform: a generic
# "error-shaped response" misdirects triage to workspace-server. Observed
# 2026-06-03/04 across every staging canary on MODEL_SLUG=MiniMax-M2 (the
# canary default since #2710) — 100% on the parent's first cold turn,
# identical on main's scheduled synthetic E2E and on PRs (so it is an
# environmental backend regression, never PR-introduced).
if echo "$AGENT_TEXT" | grep -qiF "message contained no text content"; then
fail "A2A — EMPTY COMPLETION (backend regression, NOT a platform/workspace-server bug). The configured model (MODEL_SLUG=${MODEL_SLUG:-?}) returned a 2xx completion with no text part; the runtime surfaced 'message contained no text content.'. Operator action: check the staging LLM backend / proxy for the canary model (MiniMax-M2 since #2710) — empty assistant turns, not an auth/quota/boot fault. Raw: $AGENT_TEXT"
fi
# Generic catch-all — falls through if none of the known regressions hit.
if echo "$AGENT_TEXT" | grep -qiE "error|exception"; then
fail "A2A returned an error-shaped response: $AGENT_TEXT"
+5 -56
View File
@@ -26,12 +26,11 @@ import (
// the update cycle — no ssh, no re-provision, no ops toil.
//
// Contract (paired with cp-side GET /cp/tenants/config):
//
// Request: GET {MOLECULE_CP_URL or https://api.moleculesai.app}/cp/tenants/config
// Authorization: Bearer <ADMIN_TOKEN>
// X-Molecule-Org-Id: <MOLECULE_ORG_ID>
// Response: 200 {"MOLECULE_CP_SHARED_SECRET":"…","MOLECULE_CP_URL":"…", …}
// 401 on bearer mismatch or unknown org
// Request: GET {MOLECULE_CP_URL or https://api.moleculesai.app}/cp/tenants/config
// Authorization: Bearer <ADMIN_TOKEN>
// X-Molecule-Org-Id: <MOLECULE_ORG_ID>
// Response: 200 {"MOLECULE_CP_SHARED_SECRET":"…","MOLECULE_CP_URL":"…", …}
// 401 on bearer mismatch or unknown org
//
// Best-effort: any failure logs and returns — main() keeps booting.
// Self-hosted deploys without MOLECULE_ORG_ID or ADMIN_TOKEN set
@@ -106,53 +105,3 @@ func refreshEnvFromCP() error {
log.Printf("CP env refresh: applied %d values from %s/cp/tenants/config", applied, base)
return nil
}
// requiredLLMEnvVars is the set of LLM proxy env vars a managed SaaS
// tenant must have populated after refreshEnvFromCP. cp#469 (tenant
// proxy-env delivery) — guaranteed CP-delivered creds reach the
// tenant process env on boot. Per Researcher Task #37 / Spec 2 and
// Task #46 (watch-fail-first test).
//
// Key set byte-matched against Researcher's verified emission in
// controlplane tenant_config.go:140-144 (Researcher REQUEST_CHANGES
// iterate body, 3987f59c). The four keys below ARE the LLM-proxy
// subset of the 8 CP-emitted keys; OPENAI_BASE_URL / OPENAI_API_KEY /
// ANTHROPIC_BASE_URL / ANTHROPIC_API_KEY are out of scope for cp#469
// (different feature surfaces — direct-to-provider fallbacks, not
// the proxy). v2 fix: MOLECULE_LLM_USAGE_TOKEN, MOLECULE_LLM_USAGE_URL,
// MOLECULE_LLM_BASE_URL, MOLECULE_LLM_ANTHROPIC_BASE_URL — note the
// 4th key is namespaced MOLECULE_LLM_ANTHROPIC_BASE_URL, NOT bare
// ANTHROPIC_BASE_URL. Bare ANTHROPIC_BASE_URL is a separate CP-emitted
// key for direct-provider use, not the LLM proxy.
var requiredLLMEnvVars = []string{
"MOLECULE_LLM_USAGE_TOKEN",
"MOLECULE_LLM_USAGE_URL", // CRITICAL fix v2: was MOLECULE_LLM_URL in v1
"MOLECULE_LLM_BASE_URL",
"MOLECULE_LLM_ANTHROPIC_BASE_URL", // CRITICAL fix v3: was ANTHROPIC_BASE_URL in v2 (different key!)
}
// assertManagedTenantHasLLMEnv verifies that, when running as a
// managed SaaS tenant (MOLECULE_ORG_ID + ADMIN_TOKEN both set), all
// required LLM proxy env vars are populated after refreshEnvFromCP.
//
// Self-hosted (no orgID/adminToken) is exempt — dev must not be
// blocked here. Managed tenants with missing LLM keys fail with
// MISSING_CP_LLM_ENV so they do not silently boot with broken proxy
// creds. Caller in main.go decides whether to log and continue or
// log.Fatalf depending on deployment context.
func assertManagedTenantHasLLMEnv() error {
if os.Getenv("MOLECULE_ORG_ID") == "" || os.Getenv("ADMIN_TOKEN") == "" {
// Self-hosted dev / not yet provisioned — not a managed tenant.
return nil
}
var missing []string
for _, k := range requiredLLMEnvVars {
if os.Getenv(k) == "" {
missing = append(missing, k)
}
}
if len(missing) > 0 {
return fmt.Errorf("MISSING_CP_LLM_ENV: required LLM proxy keys not set after refreshEnvFromCP: %v", missing)
}
return nil
}
@@ -5,7 +5,6 @@ import (
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
)
@@ -60,138 +59,6 @@ func TestRefreshEnvFromCP_AppliesCPResponse(t *testing.T) {
}
}
// TestRefreshEnvFromCP_ManagedTenantRequiresLLMKeys: watch-fail-first
// per Researcher Task #46. When running as a managed tenant
// (MOLECULE_ORG_ID + ADMIN_TOKEN set), missing LLM proxy env vars
// after refreshEnvFromCP MUST surface as MISSING_CP_LLM_ENV, not be
// silently accepted. Without this guard, a CP that loses its LLM
// creds (e.g. during an incident) would let a tenant boot and then
// fail later at first LLM call — worse than a loud refusal here.
func TestRefreshEnvFromCP_ManagedTenantRequiresLLMKeys(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Stub CP returns a CP response WITHOUT any of the required
// LLM keys — simulates the failure mode where the CP side
// dropped or never had the LLM creds for this org.
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"MOLECULE_CP_SHARED_SECRET":"x","MOLECULE_CP_URL":"https://api.moleculesai.app"}`)
}))
defer srv.Close()
t.Setenv("MOLECULE_ORG_ID", "org-managed-1")
t.Setenv("ADMIN_TOKEN", "admin-tok")
t.Setenv("MOLECULE_CP_URL", srv.URL)
// Clear all LLM keys to simulate the boot-without-LLM-env failure mode.
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "")
t.Setenv("MOLECULE_LLM_USAGE_URL", "")
t.Setenv("MOLECULE_LLM_BASE_URL", "")
t.Setenv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "")
// refreshEnvFromCP itself should succeed — CP is reachable, returned 200.
if err := refreshEnvFromCP(); err != nil {
t.Fatalf("refreshEnvFromCP: %v", err)
}
// The boot assertion must catch the missing LLM keys.
err := assertManagedTenantHasLLMEnv()
if err == nil {
t.Fatal("expected MISSING_CP_LLM_ENV error for managed tenant without LLM keys, got nil")
}
if !strings.Contains(err.Error(), "MISSING_CP_LLM_ENV") {
t.Errorf("expected error to contain MISSING_CP_LLM_ENV, got: %v", err)
}
}
// TestRefreshEnvFromCP_ManagedTenantHappyPath: when the CP returns
// all 4 LLM-proxy keys, the gate must PASS — no MISSING_CP_LLM_ENV
// for a properly-configured managed tenant. Watch-fail counterpart
// to TestRefreshEnvFromCP_ManagedTenantRequiresLLMKeys: if THIS test
// ever fires MISSING_CP_LLM_ENV on the byte-correct key set, the
// requiredLLMEnvVars list has drifted from the CP emission again.
// Per Researcher REQUEST_CHANGES TEST ADEQUACY note.
func TestRefreshEnvFromCP_ManagedTenantHappyPath(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
// Return ALL 4 LLM-proxy keys — names byte-matched to
// tenant_config.go:140-144 CP emission.
fmt.Fprint(w, `{"MOLECULE_LLM_USAGE_TOKEN":"tok-1","MOLECULE_LLM_USAGE_URL":"https://llm.example.com/usage","MOLECULE_LLM_BASE_URL":"https://llm.example.com","MOLECULE_LLM_ANTHROPIC_BASE_URL":"https://llm.example.com/anthropic"}`)
}))
defer srv.Close()
t.Setenv("MOLECULE_ORG_ID", "org-managed-happy")
t.Setenv("ADMIN_TOKEN", "admin-tok")
t.Setenv("MOLECULE_CP_URL", srv.URL)
// Pre-clear so we can verify the refresh actually populated them.
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "")
t.Setenv("MOLECULE_LLM_USAGE_URL", "")
t.Setenv("MOLECULE_LLM_BASE_URL", "")
t.Setenv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "")
if err := refreshEnvFromCP(); err != nil {
t.Fatalf("refreshEnvFromCP: %v", err)
}
// Sanity: refresh actually applied the keys.
if got := os.Getenv("MOLECULE_LLM_USAGE_TOKEN"); got != "tok-1" {
t.Errorf("refresh did not apply USAGE_TOKEN: got %q", got)
}
// The boot assertion must pass — no MISSING_CP_LLM_ENV.
if err := assertManagedTenantHasLLMEnv(); err != nil {
t.Errorf("managed happy path must not MISSING_CP_LLM_ENV, got: %v", err)
}
}
// TestRefreshEnvFromCP_ManagedTenantPartialEnv: when the CP returns
// 3 of 4 LLM-proxy keys (one missing), the gate must STILL catch it
// and the error must name the missing key. Per Researcher
// REQUEST_CHANGES TEST ADEQUACY note — partial-env coverage is
// critical because the production failure mode is usually "one
// key dropped" not "all keys dropped".
func TestRefreshEnvFromCP_ManagedTenantPartialEnv(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
// 3 of 4 — MOLECULE_LLM_ANTHROPIC_BASE_URL is missing.
fmt.Fprint(w, `{"MOLECULE_LLM_USAGE_TOKEN":"tok-1","MOLECULE_LLM_USAGE_URL":"https://llm.example.com/usage","MOLECULE_LLM_BASE_URL":"https://llm.example.com"}`)
}))
defer srv.Close()
t.Setenv("MOLECULE_ORG_ID", "org-managed-partial")
t.Setenv("ADMIN_TOKEN", "admin-tok")
t.Setenv("MOLECULE_CP_URL", srv.URL)
// Pre-clear all 4 so the 3 that come back from CP are the only
// ones set; the 4th (MOLECULE_LLM_ANTHROPIC_BASE_URL) stays empty.
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "")
t.Setenv("MOLECULE_LLM_USAGE_URL", "")
t.Setenv("MOLECULE_LLM_BASE_URL", "")
t.Setenv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "")
if err := refreshEnvFromCP(); err != nil {
t.Fatalf("refreshEnvFromCP: %v", err)
}
err := assertManagedTenantHasLLMEnv()
if err == nil {
t.Fatal("expected MISSING_CP_LLM_ENV for partial env (3 of 4 keys), got nil")
}
if !strings.Contains(err.Error(), "MISSING_CP_LLM_ENV") {
t.Errorf("expected error to contain MISSING_CP_LLM_ENV, got: %v", err)
}
if !strings.Contains(err.Error(), "MOLECULE_LLM_ANTHROPIC_BASE_URL") {
t.Errorf("expected error to name the missing key MOLECULE_LLM_ANTHROPIC_BASE_URL, got: %v", err)
}
}
// TestAssertManagedTenantHasLLMEnv_NotManagedIsNoop: self-hosted
// (no orgID/adminToken) must NOT block on missing LLM keys — dev
// ergonomics matter and the assertion's contract is "managed only".
func TestAssertManagedTenantHasLLMEnv_NotManagedIsNoop(t *testing.T) {
t.Setenv("MOLECULE_ORG_ID", "")
t.Setenv("ADMIN_TOKEN", "")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "")
t.Setenv("MOLECULE_LLM_USAGE_URL", "")
t.Setenv("MOLECULE_LLM_BASE_URL", "")
t.Setenv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "")
if err := assertManagedTenantHasLLMEnv(); err != nil {
t.Errorf("self-hosted (not managed) must not block, got: %v", err)
}
}
// TestRefreshEnvFromCP_CPUnreachableDoesNotFailBoot: network errors must
// return non-nil BUT main.go treats that as warn-and-continue. We assert
// the function returns an error (not a panic) so the caller can log.
-20
View File
@@ -82,16 +82,6 @@ func main() {
log.Printf("CP env refresh: %v (continuing with baked-in env)", err)
}
// Managed-tenant boot assertion (cp#469 — tenant proxy-env delivery).
// If we're a managed SaaS tenant (orgID + adminToken set), all required
// LLM proxy env vars must be present after refresh. Missing keys block
// the tenant from booting with broken LLM creds — silent-fail is worse
// than a loud refusal. Self-hosted (no orgID/adminToken) short-circuits
// inside the assertion, so this never fires for dev.
if err := assertManagedTenantHasLLMEnv(); err != nil {
log.Fatalf("Managed tenant boot assertion: %v", err)
}
// Secrets encryption. In MOLECULE_ENV=prod, boot refuses to start
// without a valid SECRETS_ENCRYPTION_KEY (fail-secure — Top-5 #5).
// In any other environment, missing keys just log a warning and
@@ -359,16 +349,6 @@ func main() {
codexauth.StartCodexAuthRefresher(c, db.DB)
})
// RFC internal#742 Part 2: wire the boot-failure rescue capture into
// the provision-timeout sweep's failure verdict. When the sweep flips
// a stuck workspace to `failed`, this hook captures a forensic rescue
// bundle off the still-running (but boot-failed) EC2 and ships it to
// obs/Loki before the control plane reaps the instance. Best-effort +
// non-blocking (handlers.BootFailureRescueHook dispatches on its own
// goroutine + timeout). The handler-side boot-failure path
// (WorkspaceHandler.BootstrapFailed) wires its own capture inline.
registry.BootFailureRescueHook = handlers.BootFailureRescueHook
// Provision-timeout sweep — flips workspaces that have been stuck in
// status='provisioning' past the timeout window to 'failed' and emits
// WORKSPACE_PROVISION_TIMEOUT. Without this the UI banner is cosmetic
+2 -4
View File
@@ -149,11 +149,9 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste
models.StatusFailed, msg, wsID); dbErr != nil {
log.Printf("bundle import: failed to mark workspace %s as failed: %v", wsID, dbErr)
}
if bcErr := broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
"error": msg,
}); bcErr != nil {
log.Printf("bundle import: failed to broadcast provision failed for %s: %v", wsID, bcErr)
}
})
}
func nilIfEmpty(s string) interface{} {
@@ -407,14 +407,12 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
// Broadcast event
if m.broadcaster != nil {
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
"channel_id": ch.ID,
"channel_type": ch.ChannelType,
"username": msg.Username,
"direction": "inbound",
}); err != nil {
log.Printf("Channels: failed to broadcast inbound event: %v", err)
}
})
}
return nil
@@ -455,13 +453,11 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
}
if m.broadcaster != nil {
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
"channel_id": ch.ID,
"channel_type": ch.ChannelType,
"direction": "outbound",
}); err != nil {
log.Printf("Channels: failed to broadcast outbound event: %v", err)
}
})
}
return nil
@@ -517,9 +517,7 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
// Acknowledge the button press (removes loading spinner)
ackCfg := tgbotapi.NewCallback(cb.ID, "Received")
if _, err := bot.Send(ackCfg); err != nil {
log.Printf("telegram: failed to send callback ack: %v", err)
}
bot.Send(ackCfg)
// Update the message to show what was clicked
decision := "approved"
@@ -531,9 +529,7 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
cb.Message.MessageID,
cb.Message.Text+"\n\n✅ CEO "+decision,
)
if _, err := bot.Send(editMsg); err != nil {
log.Printf("telegram: failed to send edit message: %v", err)
}
bot.Send(editMsg)
// Route the decision as an inbound message to the agent
inbound := &InboundMessage{
@@ -383,48 +383,23 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
}
summary := a2aMethod + " → " + wsNameForLog
toolTrace := extractToolTrace(respBody)
// DATA-LOSS FIX (internal#470 / #1347 push-mode sibling): this
// a2a_receive row is the ONLY durable record of a push-mode chat
// round-trip — request_body carries the user's message, response_body
// carries the agent's reply, and chat-history hydration
// (messagestore.PostgresMessageStore) reads BOTH back to rebuild the
// transcript on canvas reopen / reload. It MUST be written
// SYNCHRONOUSLY, before proxyA2ARequest returns and ProxyA2A flushes
// the 200 to the canvas — otherwise the canvas sees the reply
// acknowledged (and rendered optimistically) while the row is still
// racing in a detached goroutine, and a reload (or a workspace-server
// restart / deploy / OOM) between the 200 and the goroutine's commit
// loses the message permanently on reopen.
//
// This mirrors the discipline already applied to the poll-mode ingest
// path (logA2AReceiveQueued / persistUserMessageAtIngest); the
// push-mode counterpart was left async, which the E2E Chat
// "history persists across reload" test surfaced as an intermittent
// red (the reload out-raced the INSERT).
//
// - context.WithoutCancel: a client disconnect on chat-exit (which
// cancels the inbound request ctx) MUST NOT abort this write.
// - SYNCHRONOUS (no goAsync): the row must be durable before the 200.
// - Best-effort: LogActivity logs+swallows INSERT errors internally,
// so a DB hiccup never blocks or fails the user's send — behaviour
// for that one request is never worse than the pre-fix async path.
// - The post-commit ACTIVITY_LOGGED broadcast still fires inside
// LogActivity; the durable row is the truth the canvas re-reads.
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
RequestBody: json.RawMessage(body),
ResponseBody: json.RawMessage(respBody),
ToolTrace: toolTrace,
DurationMs: &durationMs,
Status: logStatus,
parent := ctx
h.goAsync(func() {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
RequestBody: json.RawMessage(body),
ResponseBody: json.RawMessage(respBody),
ToolTrace: toolTrace,
DurationMs: &durationMs,
Status: logStatus,
})
})
if callerID == "" && statusCode < 400 {
@@ -60,10 +60,10 @@ func sanitizeErrorDetailForBroadcast(s string) string {
}
type ActivityHandler struct {
broadcaster events.EventEmitter
broadcaster *events.Broadcaster
}
func NewActivityHandler(b events.EventEmitter) *ActivityHandler {
func NewActivityHandler(b *events.Broadcaster) *ActivityHandler {
return &ActivityHandler{broadcaster: b}
}
File diff suppressed because it is too large Load Diff
@@ -54,29 +54,23 @@ func (h *ApprovalsHandler) Create(c *gin.Context) {
return
}
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
"approval_id": approvalID,
"action": body.Action,
"reason": body.Reason,
"task_id": body.TaskID,
}); err != nil {
log.Printf("approvals: failed to broadcast approval requested: %v", err)
}
})
// Auto-escalate to parent
var parentID *string
if err := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); err != nil {
log.Printf("approvals: failed to lookup parent for escalation: %v", err)
}
db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID)
if parentID != nil {
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
"approval_id": approvalID,
"from_workspace_id": workspaceID,
"action": body.Action,
"reason": body.Reason,
}); err != nil {
log.Printf("approvals: failed to broadcast approval escalated: %v", err)
}
})
}
c.JSON(http.StatusCreated, gin.H{"approval_id": approvalID, "status": "pending"})
@@ -227,13 +221,11 @@ func (h *ApprovalsHandler) Decide(c *gin.Context) {
eventType = "APPROVAL_DENIED"
}
if err := h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
"approval_id": approvalID,
"decision": body.Decision,
"decided_by": decidedBy,
}); err != nil {
log.Printf("approvals: failed to broadcast approval decision: %v", err)
}
})
c.JSON(http.StatusOK, gin.H{"status": body.Decision, "approval_id": approvalID})
}
@@ -196,15 +196,10 @@ func resolveWorkspaceForwardCreds(c *gin.Context, ctx context.Context, workspace
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "workspace url not registered yet"})
return "", "", false
}
// Defense-in-depth for #2316: workspaces.url is validated at
// registration time, but the DB row can be stale/tampered and the
// SSRF policy can tighten. Re-validate immediately before attaching
// the inbound secret to an outbound forward.
if err := isSafeURL(wsURL); err != nil {
log.Printf("chat_files %s: unsafe workspace URL for %s rejected: %v", op, workspaceID, err)
c.JSON(http.StatusBadRequest, gin.H{"error": "workspace URL not allowed"})
return "", "", false
}
// Trust note: workspaces.url passes validateAgentURL at /registry/
// register write time, blocking SSRF-shaped URLs. We rely on that
// upstream gate rather than re-validating here. Tracked at #2316
// for follow-up: forward-time re-validation as defense-in-depth.
secret, healed, err := readOrLazyHealInboundSecret(ctx, workspaceID, "chat_files "+op)
if err != nil {
@@ -414,56 +414,6 @@ func TestChatUpload_WorkspaceUnreachable(t *testing.T) {
}
}
func TestChatUpload_RejectsMetadataWorkspaceURL(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
restore := setSSRFCheckForTest(true)
t.Cleanup(restore)
wsID := "00000000-0000-0000-0000-000000000047"
expectURL(mock, wsID, "http://169.254.169.254/latest/meta-data")
h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil, nil))
body, ct := uploadFixture(t)
c, w := makeUploadRequest(t, wsID, body, ct)
h.Upload(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400 for metadata workspace URL, got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "workspace URL not allowed") {
t.Errorf("expected unsafe URL error, got: %s", w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
func TestChatUpload_RejectsNonHTTPWorkspaceURL(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
restore := setSSRFCheckForTest(true)
t.Cleanup(restore)
wsID := "00000000-0000-0000-0000-000000000048"
expectURL(mock, wsID, "file:///etc/passwd")
h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil, nil))
body, ct := uploadFixture(t)
c, w := makeUploadRequest(t, wsID, body, ct)
h.Upload(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400 for non-HTTP workspace URL, got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "workspace URL not allowed") {
t.Errorf("expected unsafe URL error, got: %s", w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// TestChatUpload_BodyUnderCap_Forwards pins the lower edge of the new
// 100 MB body cap (CTO 2026-05-19 directive on forensic a99ab0a1).
// A multipart payload comfortably under the cap must reach the
@@ -696,54 +646,6 @@ func TestChatDownload_NoInboundSecret_LazyHealFailure(t *testing.T) {
}
}
func TestChatDownload_RejectsMetadataWorkspaceURL(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
restore := setSSRFCheckForTest(true)
t.Cleanup(restore)
wsID := "00000000-0000-0000-0000-000000000054"
expectURL(mock, wsID, "http://169.254.169.254/latest/meta-data")
h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil, nil))
c, w := makeDownloadRequest(t, wsID, "/workspace/foo.txt")
h.Download(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400 for metadata workspace URL, got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "workspace URL not allowed") {
t.Errorf("expected unsafe URL error, got: %s", w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
func TestChatDownload_RejectsNonHTTPWorkspaceURL(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
restore := setSSRFCheckForTest(true)
t.Cleanup(restore)
wsID := "00000000-0000-0000-0000-000000000055"
expectURL(mock, wsID, "file:///etc/passwd")
h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil, nil))
c, w := makeDownloadRequest(t, wsID, "/workspace/foo.txt")
h.Download(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400 for non-HTTP workspace URL, got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "workspace URL not allowed") {
t.Errorf("expected unsafe URL error, got: %s", w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
func TestChatDownload_ForwardsToWorkspace_HappyPath(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
@@ -102,10 +102,10 @@ func pushDelegationResultToInbox(ctx context.Context, sourceID, delegationID, st
// and the A2A request runs in the background.
type DelegationHandler struct {
workspace *WorkspaceHandler
broadcaster events.EventEmitter
broadcaster *events.Broadcaster
}
func NewDelegationHandler(wh *WorkspaceHandler, b events.EventEmitter) *DelegationHandler {
func NewDelegationHandler(wh *WorkspaceHandler, b *events.Broadcaster) *DelegationHandler {
return &DelegationHandler{workspace: wh, broadcaster: b}
}
@@ -372,78 +372,3 @@ func TestApplyPlatformManagedLLMEnv_WorkspaceOriginCredExemptFromStrip(t *testin
t.Errorf("sqlmock expectations: %v", err)
}
}
// TestApplyPlatformManagedLLMEnv_MissingProxyEnvFailClosed is the #2162
// regression guard. A platform-managed workspace whose CP proxy env is absent
// must NOT start credential-less. The empty-proxy path must return
// HasUsableLLMCred=false so the caller aborts with MISSING_PLATFORM_PROXY.
//
// Mutation: revert the early-return from HasUsableLLMCred=false to true
// → workspace starts with zero credential → "container started but never
// called /registry/register" (600s provision-timeout sweep) → this test RED.
func TestApplyPlatformManagedLLMEnv_MissingProxyEnvFailClosed(t *testing.T) {
ctx := context.Background()
const wsID = "29b95be9-811e-4857-be36-1dafdbf4f697" // adk-demo failure workspace
mock := setupTestDB(t)
expectOverrideQuery(mock, wsID, "")
// No proxy env present — simulates the boot-race / misconfig path.
envVars := map[string]string{}
res := applyPlatformManagedLLMEnv(ctx, envVars, wsID, "claude-code", "moonshot/kimi-k2.6", nil)
if res.ResolvedMode != LLMBillingModePlatformManaged {
t.Fatalf("platform-managed model must stay platform_managed, got %q (source=%s)", res.ResolvedMode, res.Source)
}
// THE FIX: must NOT report usable credential when none was injected.
if res.HasUsableLLMCred {
t.Fatalf("empty proxy env → HasUsableLLMCred must be false (fail-closed), got true — the #2162 dark-wedge class")
}
// No credential env must be present.
if _, present := envVars["ANTHROPIC_API_KEY"]; present {
t.Errorf("empty proxy env must NOT inject ANTHROPIC_API_KEY")
}
if _, present := envVars["MOLECULE_LLM_USAGE_TOKEN"]; present {
t.Errorf("empty proxy env must NOT inject MOLECULE_LLM_USAGE_TOKEN")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
// TestApplyPlatformManagedLLMEnv_ProxyEnvPresentInjectsCredential is the
// positive-path pair to the #2162 regression guard: when the CP proxy env IS
// present, the platform-managed path must inject ANTHROPIC_API_KEY +
// ANTHROPIC_BASE_URL for an Anthropic-native runtime and report
// HasUsableLLMCred=true.
func TestApplyPlatformManagedLLMEnv_ProxyEnvPresentInjectsCredential(t *testing.T) {
ctx := context.Background()
const wsID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
mock := setupTestDB(t)
expectOverrideQuery(mock, wsID, "")
envVars := map[string]string{}
// Simulate the CP proxy env being present (as it is in production).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.moleculesai.app/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "https://api.moleculesai.app/api/v1/internal/llm/anthropic/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "PLATFORM-PROXY-TOKEN")
res := applyPlatformManagedLLMEnv(ctx, envVars, wsID, "claude-code", "moonshot/kimi-k2.6", nil)
if res.ResolvedMode != LLMBillingModePlatformManaged {
t.Fatalf("expected platform_managed, got %q", res.ResolvedMode)
}
if !res.HasUsableLLMCred {
t.Fatalf("proxy env present → HasUsableLLMCred must be true, got false")
}
if envVars["ANTHROPIC_API_KEY"] != "PLATFORM-PROXY-TOKEN" {
t.Errorf("ANTHROPIC_API_KEY must be injected with the platform proxy token; got %q", envVars["ANTHROPIC_API_KEY"])
}
if envVars["ANTHROPIC_BASE_URL"] != "https://api.moleculesai.app/api/v1/internal/llm/anthropic/v1" {
t.Errorf("ANTHROPIC_BASE_URL must be injected with the platform anthropic proxy; got %q", envVars["ANTHROPIC_BASE_URL"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
@@ -17,7 +17,6 @@ package handlers
import (
"fmt"
"sort"
"strings"
)
@@ -56,95 +55,3 @@ func validateRegisteredModelForRuntime(runtime, model string) (bool, string) {
"model %q is not a registered model for runtime %q; pick one of the runtime's registered models (provider-registry SSOT, internal#718)",
model, runtime)
}
// validateDerivedProviderInRegistry (issue #2172) is the provider-side companion
// to validateRegisteredModelForRuntime. The model-side check asks "is this
// (runtime, model) in the registry?"; the provider-side check asks "is the
// provider this model DERIVES to — the same one the adapter will resolve at
// boot — a known provider in providers.yaml?"
//
// Live trigger (adk-demo Assistant, 2026-06-03): workspace config
// `model=moonshot/kimi-k2.6` (claude-code) → adapter derives `provider=moonshot`
// → `ValueError: provider=moonshot not in providers registry` at BOOT. The
// save was accepted (no validation at the API boundary), and the failure only
// surfaced when the agent tried to register. CI never saw it. The drift gate
// (RFC#580) validates TEMPLATES against the registry, NOT per-workspace
// configs; the existing model-side check rejects a model the runtime doesn't
// own but says nothing about the DERIVED provider's registry membership.
//
// Returns:
//
// (true, "") — pass: model is empty (MODEL_REQUIRED owns it), the
// runtime is not in the registry (fail-open for
// federated / non-first-party runtimes — mirror of the
// model-side check's federation contract), the registry
// failed to load (build-time gate owns it), OR the
// derived provider name is a known provider in the
// registry's `providers:` list.
// (false, reason) — reject: a known (runtime, model) pair derives to a
// provider name absent from the providers list. This is
// the structural class the adk-demo boot failure belongs
// to — the registry's `runtimes:` block references a
// provider not declared in `providers:`, which by
// construction is a registry-data bug. Catching it at
// config-SAVE keeps it out of the agent-boot path.
//
// Defense-in-depth: by construction, a model in a runtime's native provider set
// has a provider that IS in the catalog (the runtime ref names a provider from
// the providers list). So the rejection path is primarily a registry-consistency
// guard. The real value is the FAIL-LOUD semantics — any future drift between
// `providers:` and `runtimes:` fails the create call with a clear pointer to
// the missing provider, instead of silently wedging the agent at boot.
func validateDerivedProviderInRegistry(runtime, model string) (bool, string) {
model = strings.TrimSpace(model)
if model == "" {
return true, "" // MODEL_REQUIRED owns this.
}
m, err := providerRegistry()
if err != nil || m == nil {
// Registry unavailable (build-time defect the gates catch). Fail open —
// do not block create on a registry-load failure.
return true, ""
}
// DeriveProvider is fail-closed for unknown runtimes. Mirror the
// model-side check's federation contract: a runtime the registry does
// NOT know (langgraph / external / kimi / mock / federated) is allowed
// to pass through. DeriveProvider's `unknown runtime` error IS that
// signal — treat it as fail-open, identical to ModelsForRuntime's
// not-found behavior above.
p, err := m.DeriveProvider(runtime, model, nil)
if err != nil {
// Either the runtime is unknown (fail-open by contract) OR the model
// is not native to the runtime (the model-side validator already
// rejected this — DeriveProvider's error here means
// validateRegisteredModelForRuntime should have caught it. Don't
// double-reject: pass through and let the model-side response own
// the message).
return true, ""
}
// Defense-in-depth: confirm the DERIVED provider is a known entry in the
// providers list. By construction it should be (DeriveProvider only
// returns a Provider that was looked up by name from `providers:`), but
// a future federation merge could introduce a runtime ref pointing at a
// contributed provider absent from the core catalog. Reject loudly here
// rather than letting the save reach the agent-boot path and wedge with
// "provider=X not in providers registry" (the original adk-demo class).
for _, candidate := range m.Providers {
if candidate.Name == p.Name {
return true, ""
}
}
// Build a sorted, comma-separated list of valid provider names so the
// operator/caller sees the actionable list (the boot-time error message
// the adk-demo class produced does NOT include this — the fix is to
// surface it at the API boundary, where the caller can fix the request
// without a stuck workspace + operator page).
valid := make([]string, 0, len(m.Providers))
for _, c := range m.Providers {
valid = append(valid, c.Name)
}
sort.Strings(valid)
return false, fmt.Sprintf(
"derived provider %q (for model %q on runtime %q) is not in the providers registry; pick a model whose derived provider is one of: %s",
p.Name, model, runtime, strings.Join(valid, ", "))
}
@@ -6,17 +6,8 @@ package handlers
// fail OPEN (allow) for a runtime the registry doesn't know yet (federation /
// langgraph/etc. not in the first-party registry) so the existing knownRuntimes
// gate stays authoritative there.
//
// TestValidateDerivedProviderInRegistry (issue #2172) is the provider-side
// companion: once the model-side check passes, confirm the DERIVED provider
// (the one the adapter will resolve at boot) is a known provider in
// providers.yaml. Catches the adk-demo "provider=X not in providers registry"
// class at config-SAVE time instead of letting it wedge the agent at boot.
import (
"strings"
"testing"
)
import "testing"
func TestValidateRegisteredModelForRuntime(t *testing.T) {
type tc struct {
@@ -89,163 +80,3 @@ func TestValidateRegisteredModelForRuntime(t *testing.T) {
})
}
}
func TestValidateDerivedProviderInRegistry(t *testing.T) {
type tc struct {
name string
runtime string
model string
wantOK bool
// wantReasonContains: a substring the rejection reason must include
// (skipped for OK cases). Pins the actionable list / derivation pointer
// so the caller knows which provider was missing and what the valid
// set looks like — this is the fix that distinguishes the new gate
// from the boot-time "provider=X not in providers registry" string
// it replaces.
wantReasonContains string
}
cases := []tc{
// PASS — every native (runtime, model) in the catalog derives to a
// provider that IS in the providers list. These are the live corpus
// entries; the test pins the registry-consistency invariant.
{
name: "claude_code_anthropic_api_native",
runtime: "claude-code",
model: "claude-sonnet-4-6",
wantOK: true,
},
{
name: "claude_code_kimi_coding_native",
runtime: "claude-code",
model: "kimi-for-coding",
wantOK: true,
},
{
name: "claude_code_minimax_native",
runtime: "claude-code",
model: "MiniMax-M2.7",
wantOK: true,
},
{
name: "claude_code_platform_namespaced",
runtime: "claude-code",
model: "moonshot/kimi-k2.6",
wantOK: true,
},
{
name: "codex_openai_subscription_default_arm",
runtime: "codex",
model: "gpt-5.5",
wantOK: true,
},
{
name: "codex_platform_namespaced",
runtime: "codex",
model: "openai/gpt-5.4-mini",
wantOK: true,
},
{
name: "hermes_kimi_coding",
runtime: "hermes",
model: "kimi-coding/kimi-k2",
wantOK: true,
},
{
name: "hermes_platform_namespaced",
runtime: "hermes",
model: "moonshot/kimi-k2.6",
wantOK: true,
},
{
name: "openclaw_kimi_coding",
runtime: "openclaw",
model: "moonshot:kimi-k2.6",
wantOK: true,
},
// FAIL — model-side validator catches this, but the provider-side
// gate is called AFTER it in Create and inherits the fail-open
// contract for "model is not native to runtime" (DeriveProvider
// errors → allow, letting the model-side response own the message).
// This is the deliberate "don't double-reject" decision.
{
name: "unregistered_model_pass_through_to_model_side",
runtime: "claude-code",
model: "totally-made-up-model-xyz",
wantOK: true, // pass-through: model-side validator owns the rejection
},
// Federation contract — mirror of the model-side test above.
{
name: "langgraph_runtime_failopen",
runtime: "langgraph",
model: "anything-goes",
wantOK: true,
},
{
name: "external_runtime_failopen",
runtime: "external",
model: "whatever",
wantOK: true,
},
// Empty model — MODEL_REQUIRED owns it; allow.
{
name: "empty_model_allowed_other_gate_owns_it",
runtime: "claude-code",
model: "",
wantOK: true,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
ok, why := validateDerivedProviderInRegistry(c.runtime, c.model)
if ok != c.wantOK {
t.Errorf("validateDerivedProviderInRegistry(%q,%q) ok=%v want %v (reason=%q)",
c.runtime, c.model, ok, c.wantOK, why)
}
if !c.wantOK && c.wantReasonContains != "" && !strings.Contains(why, c.wantReasonContains) {
t.Errorf("rejection reason missing %q: got %q", c.wantReasonContains, why)
}
})
}
}
// TestRegistryConsistency_AllNativeModelsDeriveToKnownProvider walks every
// (runtime, model) pair in the registry's native model sets and asserts each
// one derives to a provider that IS in the providers list. This is the
// static regression gate the issue calls for ("a CI test fails if any shipped
// demo/template config references an unregistered provider") — generalized
// to the catalog as a whole: if anyone edits providers.yaml such that a
// `runtimes:` block names a provider absent from `providers:`, this test
// fires before the bad config can reach a customer workspace.
//
// By construction this invariant should always hold (DeriveProvider only
// returns a Provider that was looked up by name from `providers:`), so the
// test primarily guards against future federation merges that introduce a
// runtime ref pointing at a contributed provider absent from the core
// catalog — exactly the failure shape the adk-demo Assistant wedge
// belongs to.
func TestRegistryConsistency_AllNativeModelsDeriveToKnownProvider(t *testing.T) {
m, err := providerRegistry()
if err != nil || m == nil {
t.Skipf("providerRegistry unavailable in test env (err=%v); skipping consistency walk", err)
}
providerNames := make(map[string]struct{}, len(m.Providers))
for _, p := range m.Providers {
providerNames[p.Name] = struct{}{}
}
for runtimeName, runtime := range m.Runtimes {
for _, ref := range runtime.Providers {
for _, modelID := range ref.Models {
p, err := m.DeriveProvider(runtimeName, modelID, nil)
if err != nil {
t.Errorf("catalog invariant broken: runtime=%q model=%q failed DeriveProvider: %v",
runtimeName, modelID, err)
continue
}
if _, ok := providerNames[p.Name]; !ok {
t.Errorf("catalog invariant broken: runtime=%q model=%q derives to provider %q which is not in the providers list (refs=%q)",
runtimeName, modelID, p.Name, ref.Name)
}
}
}
}
}
@@ -171,9 +171,11 @@ func (h *PluginsHandler) uninstallViaDocker(ctx context.Context, c *gin.Context,
log.Printf("Plugin uninstall: skipping invalid skill name %q in %s: %v", skill, pluginName, err)
continue
}
_, _ = h.execAsRoot(ctx, containerName, []string{
if _, rmErr := h.execAsRoot(ctx, containerName, []string{
"rm", "-rf", "/configs/skills/" + skill,
})
}); rmErr != nil {
log.Printf("Plugin uninstall: failed to remove skill %s from %s: %v", skill, workspaceID, rmErr)
}
}
// 3. Delete the plugin directory itself (as root to handle file ownership).
@@ -417,7 +417,9 @@ func (h *PluginsHandler) stripPluginMarkersFromMemory(ctx context.Context, conta
`awk 'BEGIN{skip=0; blanks=0} /^%s/{skip=1; blanks=0; next} skip==1 && /^[[:space:]]*$/{blanks++; if(blanks>=2){skip=0; print; next} next} /^# Plugin: /{if(skip==1)skip=0} skip==1{next} {print}' /configs/CLAUDE.md > /tmp/claude.new && mv /tmp/claude.new /configs/CLAUDE.md`,
regexpEscapeForAwk(marker),
)
_, _ = h.execAsRoot(ctx, containerName, []string{"bash", "-c", script})
if _, awkErr := h.execAsRoot(ctx, containerName, []string{"bash", "-c", script}); awkErr != nil {
log.Printf("Plugin uninstall: failed to strip markers from CLAUDE.md for %s in %s: %v", pluginName, workspaceID, awkErr)
}
}
// regexpEscapeForAwk escapes characters that have special meaning inside an
@@ -1,21 +0,0 @@
package handlers
import (
"testing"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/providers"
)
// Proper-SSOT (task #65): required_env is DERIVED from the resolved provider's
// serving classification (IsPlatform), not hand-authored — platform injects
// creds server-side (none required), BYOK requires its auth_env.
func TestRequiredEnvForRegistryProvider(t *testing.T) {
if got := requiredEnvForRegistryProvider(providers.Provider{Name: providers.PlatformProviderName}); got != nil {
t.Errorf("platform provider requiredEnv = %v; want nil (creds injected server-side)", got)
}
byok := providers.Provider{Name: "google", AuthEnv: []string{"GEMINI_API_KEY", "GOOGLE_API_KEY"}}
got := requiredEnvForRegistryProvider(byok)
if len(got) != 2 || got[0] != "GEMINI_API_KEY" {
t.Errorf("byok requiredEnv = %v; want its auth_env", got)
}
}
@@ -1,168 +0,0 @@
package handlers
// rescue_read.go — GET /workspaces/:id/rescue (RFC internal#742 Part 3).
//
// Serves the LATEST post-mortem rescue bundle captured for a
// boot-failed/terminated workspace, so "why won't my agent boot" is
// answerable WITHOUT a live instance. Powers the future canvas
// "Why did this fail?" panel.
//
// Read-path: the bundle is read from the queryable rescue_bundles table
// (internal/rescuestore), NOT from obs/Loki. Part 2 ships the bundle via
// internal/audit (Loki-only); reading from Loki would require obs read
// creds the tenant deliberately lacks. Part 3 persists the
// already-redacted bundle on capture and serves it here — see the
// migration header for the full rationale.
//
// Auth/scoping: registered on the WorkspaceAuth-guarded /workspaces/:id
// group (same gate as /files/* and /exec), so the caller must hold a
// valid per-workspace or org bearer token for :id. TenantGuard already
// 404s cross-org requests at the routing layer; on top of that the store
// read is org-scoped by MOLECULE_ORG_ID, so a row written under a
// different org is never returned (defense in depth).
//
// Redaction: the stored sections were already scrubbed at capture time
// (Part 2's SAFE-T1201 secret-scan). This handler returns them verbatim
// — it never re-ships or re-derives secrets.
import (
"log"
"net/http"
"os"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescuestore"
"github.com/gin-gonic/gin"
)
// maxResponseSections bounds how many sections the read response
// returns. The fixed capture set is small (6), so this is a backstop
// against a future capture set growth or a hand-written row — keeps the
// JSON response bounded regardless of what's stored. Per-section content
// is already clamped at persist time (rescuestore.maxSectionBytes).
const maxResponseSections = 64
// RescueReadHandler serves GET /workspaces/:id/rescue. The store is
// injected so tests fake it; production wires a Postgres store over
// db.DB (see NewRescueReadHandler).
type RescueReadHandler struct {
store rescuestore.Store
}
// NewRescueReadHandler builds the handler over the package db.DB. db.DB
// is nil in some unit-test binaries; the handler tolerates that by
// returning 503 rather than nil-deref (the store guards nil db).
func NewRescueReadHandler() *RescueReadHandler {
return &RescueReadHandler{store: rescuestore.NewPostgres(db.DB)}
}
// WithStore overrides the store (test seam). Returns the handler for
// chaining.
func (h *RescueReadHandler) WithStore(s rescuestore.Store) *RescueReadHandler {
h.store = s
return h
}
// rescueSection is one labelled chunk in the read response.
type rescueSection struct {
Name string `json:"name"`
Content string `json:"content"`
Redacted bool `json:"redacted"`
}
// rescueReadResponse is the JSON shape returned for a found bundle.
// `sections` is an ordered array (capture reading order), not a map, so
// the order config→logs→state→env is preserved for the canvas panel.
type rescueReadResponse struct {
WorkspaceID string `json:"workspace_id"`
CapturedAt time.Time `json:"captured_at"`
Reason string `json:"reason"`
InstanceID string `json:"instance_id"`
Sections []rescueSection `json:"sections"`
// Truncated is true when the stored bundle had more sections than
// maxResponseSections and the response was capped.
Truncated bool `json:"truncated,omitempty"`
}
// GetRescue handles GET /workspaces/:id/rescue.
//
// 200 — latest rescue bundle for the workspace (org-scoped).
// 404 — no rescue bundle on file for this workspace (or wrong org).
// 503 — store/datastore unavailable.
func (h *RescueReadHandler) GetRescue(c *gin.Context) {
workspaceID := c.Param("id")
ctx := c.Request.Context()
if h.store == nil {
log.Printf("GetRescue: store not configured for ws=%s", workspaceID)
c.JSON(http.StatusServiceUnavailable, gin.H{
"error": "rescue store unavailable",
"code": "platform_unavailable",
})
return
}
// org_id is the tenant's configured org (one tenant = one org).
// Fail closed: an empty org_id disables org isolation and must not
// reach the store (#2020).
orgID := os.Getenv("MOLECULE_ORG_ID")
if orgID == "" {
log.Printf("GetRescue: missing MOLECULE_ORG_ID for ws=%s", workspaceID)
c.JSON(http.StatusServiceUnavailable, gin.H{
"error": "rescue org not configured",
"code": "platform_misconfigured",
})
return
}
stored, err := h.store.GetLatest(ctx, workspaceID, orgID)
if err != nil {
// Per the Store contract a missing bundle is (nil, nil), NOT an
// error — so any error here is a genuine datastore fault → 503,
// never a masquerading 404 that would hide an outage.
log.Printf("GetRescue: store query failed for ws=%s: %v", workspaceID, err)
c.JSON(http.StatusServiceUnavailable, gin.H{
"error": "rescue store query failed",
"code": "platform_unavailable",
})
return
}
if stored == nil {
// No bundle captured (workspace never boot-failed, or its grace
// window lapsed). 404 — existence-non-inferring; a workspace in a
// sibling org reaches the same 404 via the org filter.
c.JSON(http.StatusNotFound, gin.H{"error": "no rescue bundle for this workspace"})
return
}
resp := buildRescueResponse(workspaceID, stored)
c.JSON(http.StatusOK, resp)
}
// buildRescueResponse maps a stored bundle to the read response, bounding
// the section count. Split out so the mapping/limit is unit-testable.
func buildRescueResponse(workspaceID string, stored *rescuestore.StoredBundle) rescueReadResponse {
secs := stored.Bundle.Sections
truncated := false
if len(secs) > maxResponseSections {
secs = secs[:maxResponseSections]
truncated = true
}
out := make([]rescueSection, 0, len(secs))
for _, s := range secs {
// rescue.Section and rescueSection are field-identical; the
// explicit conversion keeps the handler's JSON shape independent
// of the leaf package's struct (which could gain non-response
// fields later).
out = append(out, rescueSection(s))
}
return rescueReadResponse{
WorkspaceID: workspaceID,
CapturedAt: stored.CapturedAt,
Reason: stored.Bundle.Reason,
InstanceID: stored.Bundle.InstanceID,
Sections: out,
Truncated: truncated,
}
}
@@ -1,238 +0,0 @@
package handlers
// Tests for GET /workspaces/:id/rescue (RFC internal#742 Part 3).
//
// These exercise the handler against a FAKE store (no DB) so every path
// is deterministic without external infra:
// - returns the latest bundle in the documented shape
// - 404 when no bundle exists for the workspace
// - org-scoping: the handler passes the tenant's MOLECULE_ORG_ID to
// the store, so a fake that returns nil for a mismatched org proves a
// sibling org cannot read another org's bundle
// - 503 on a store/datastore error (not a 404 masquerade)
// - redaction/shape preserved: stored sections are returned verbatim,
// no re-derivation
//
// WorkspaceAuth gating itself is covered by the middleware tests; here we
// invoke the handler directly (the route is registered on the wsAuth
// group in router.go).
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescuestore"
"github.com/gin-gonic/gin"
)
func init() { gin.SetMode(gin.TestMode) }
// fakeRescueStore records the args it was called with and returns a
// scripted result. Implements rescuestore.Store.
type fakeRescueStore struct {
// gotWorkspaceID/gotOrgID capture what the handler passed.
gotWorkspaceID string
gotOrgID string
// ret/err are the scripted GetLatest result.
ret *rescuestore.StoredBundle
err error
}
func (f *fakeRescueStore) Persist(_ context.Context, _ rescue.Bundle) error { return nil }
func (f *fakeRescueStore) GetLatest(_ context.Context, workspaceID, orgID string) (*rescuestore.StoredBundle, error) {
f.gotWorkspaceID = workspaceID
f.gotOrgID = orgID
return f.ret, f.err
}
// doRescueGet runs the handler for ws against the given fake and returns
// the recorder. orgEnv sets MOLECULE_ORG_ID for the duration.
func doRescueGet(t *testing.T, ws, orgEnv string, fake *fakeRescueStore) *httptest.ResponseRecorder {
t.Helper()
t.Setenv("MOLECULE_ORG_ID", orgEnv)
h := (&RescueReadHandler{}).WithStore(fake)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: ws}}
c.Request = httptest.NewRequest("GET", "/workspaces/"+ws+"/rescue", nil)
h.GetRescue(c)
return w
}
// sampleStored builds a representative stored bundle with a redacted +
// a failure-marker section.
func sampleStored() *rescuestore.StoredBundle {
return &rescuestore.StoredBundle{
CapturedAt: time.Date(2026, 5, 31, 12, 0, 0, 0, time.UTC),
Bundle: rescue.Bundle{
WorkspaceID: "ws-1",
OrgID: "org-9",
InstanceID: "i-abc123",
Reason: "provision_timeout_sweep",
Sections: []rescue.Section{
{Name: "config.yaml", Content: "model: gpt-4\nANTHROPIC_API_KEY=[REDACTED]", Redacted: true},
{Name: "docker-ps", Content: "(rescue: section collection failed: ssh blip)", Redacted: false},
},
},
}
}
// TestGetRescue_ReturnsLatestBundle — happy path: 200 with the full
// documented shape, sections in order, redaction-preserved.
func TestGetRescue_ReturnsLatestBundle(t *testing.T) {
fake := &fakeRescueStore{ret: sampleStored()}
w := doRescueGet(t, "ws-1", "org-9", fake)
if w.Code != http.StatusOK {
t.Fatalf("status = %d, want 200; body=%s", w.Code, w.Body.String())
}
var resp struct {
WorkspaceID string `json:"workspace_id"`
CapturedAt time.Time `json:"captured_at"`
Reason string `json:"reason"`
InstanceID string `json:"instance_id"`
Sections []struct {
Name string `json:"name"`
Content string `json:"content"`
Redacted bool `json:"redacted"`
} `json:"sections"`
}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("decode: %v; body=%s", err, w.Body.String())
}
if resp.WorkspaceID != "ws-1" {
t.Errorf("workspace_id = %q, want ws-1", resp.WorkspaceID)
}
if resp.Reason != "provision_timeout_sweep" {
t.Errorf("reason = %q", resp.Reason)
}
if resp.InstanceID != "i-abc123" {
t.Errorf("instance_id = %q", resp.InstanceID)
}
if !resp.CapturedAt.Equal(time.Date(2026, 5, 31, 12, 0, 0, 0, time.UTC)) {
t.Errorf("captured_at = %v", resp.CapturedAt)
}
if len(resp.Sections) != 2 {
t.Fatalf("sections = %d, want 2", len(resp.Sections))
}
// Order preserved: config first, docker-ps second.
if resp.Sections[0].Name != "config.yaml" || resp.Sections[1].Name != "docker-ps" {
t.Errorf("section order wrong: %q, %q", resp.Sections[0].Name, resp.Sections[1].Name)
}
// Redaction-preserved: the redacted flag rides through untouched, and
// the failure marker stays a non-redacted marker.
if !resp.Sections[0].Redacted {
t.Error("config.yaml section should be redacted=true")
}
if resp.Sections[1].Redacted {
t.Error("failure-marker section should be redacted=false")
}
// Handler does NOT re-derive secrets; stored [REDACTED] verbatim.
if want := "ANTHROPIC_API_KEY=[REDACTED]"; !strings.Contains(resp.Sections[0].Content, want) {
t.Errorf("section content = %q, want it to contain %q", resp.Sections[0].Content, want)
}
}
// TestGetRescue_404WhenNone — no bundle on file → 404, not 500/200.
func TestGetRescue_404WhenNone(t *testing.T) {
fake := &fakeRescueStore{ret: nil} // store returns (nil, nil)
w := doRescueGet(t, "ws-none", "org-9", fake)
if w.Code != http.StatusNotFound {
t.Fatalf("status = %d, want 404; body=%s", w.Code, w.Body.String())
}
}
// TestGetRescue_OrgScopingPassedToStore — the handler must hand the
// tenant's MOLECULE_ORG_ID to the store, and a store that returns nil for
// a mismatched org yields 404. This is the sibling-org isolation: a
// caller in org B (a different tenant process, MOLECULE_ORG_ID=org-B)
// reading ws-1 (which belongs to org-9) gets the org filter applied → no
// row → 404.
func TestGetRescue_OrgScopingPassedToStore(t *testing.T) {
// Tenant configured as a DIFFERENT org than the bundle's owner.
// Fake mimics the Postgres org filter: returns nil because org-B
// doesn't match the row's org-9.
fake := &fakeRescueStore{ret: nil}
w := doRescueGet(t, "ws-1", "org-B", fake)
if fake.gotOrgID != "org-B" {
t.Errorf("store got org_id = %q, want the tenant's org-B", fake.gotOrgID)
}
if fake.gotWorkspaceID != "ws-1" {
t.Errorf("store got workspace_id = %q, want ws-1", fake.gotWorkspaceID)
}
if w.Code != http.StatusNotFound {
t.Fatalf("sibling-org read: status = %d, want 404", w.Code)
}
}
// TestGetRescue_EmptyOrgEnvRejected — empty MOLECULE_ORG_ID is a
// fail-closed security violation (#2020). The handler must 503 before
// calling the store, so the org filter cannot be bypassed.
func TestGetRescue_EmptyOrgEnvRejected(t *testing.T) {
fake := &fakeRescueStore{ret: sampleStored()}
w := doRescueGet(t, "ws-1", "", fake)
if fake.gotOrgID != "" {
t.Errorf("store was called with org_id = %q; want no call when env empty", fake.gotOrgID)
}
if w.Code != http.StatusServiceUnavailable {
t.Fatalf("status = %d, want 503; body=%s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "platform_misconfigured") {
t.Fatalf("body = %s, want platform_misconfigured code", w.Body.String())
}
}
// TestGetRescue_StoreErrorIs503 — an actual datastore error must surface
// as 503, never a 404 (which would hide an outage as "no bundle").
func TestGetRescue_StoreErrorIs503(t *testing.T) {
fake := &fakeRescueStore{err: errors.New("connection refused")}
w := doRescueGet(t, "ws-1", "org-9", fake)
if w.Code != http.StatusServiceUnavailable {
t.Fatalf("status = %d, want 503; body=%s", w.Code, w.Body.String())
}
}
// TestGetRescue_NilStoreIs503 — defensive: a handler with no store wired
// (db.DB nil in a degraded boot) returns 503, never panics.
func TestGetRescue_NilStoreIs503(t *testing.T) {
t.Setenv("MOLECULE_ORG_ID", "org-9")
h := &RescueReadHandler{} // store == nil
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/rescue", nil)
h.GetRescue(c)
if w.Code != http.StatusServiceUnavailable {
t.Fatalf("status = %d, want 503", w.Code)
}
}
// TestBuildRescueResponse_BoundsSections — a stored bundle with more than
// maxResponseSections sections is capped + flagged truncated.
func TestBuildRescueResponse_BoundsSections(t *testing.T) {
many := make([]rescue.Section, maxResponseSections+5)
for i := range many {
many[i] = rescue.Section{Name: "s", Content: "c", Redacted: true}
}
stored := &rescuestore.StoredBundle{
CapturedAt: time.Now(),
Bundle: rescue.Bundle{WorkspaceID: "ws-1", Sections: many},
}
resp := buildRescueResponse("ws-1", stored)
if len(resp.Sections) != maxResponseSections {
t.Errorf("sections = %d, want capped at %d", len(resp.Sections), maxResponseSections)
}
if !resp.Truncated {
t.Error("truncated flag should be set when sections were capped")
}
}
@@ -1,168 +0,0 @@
package handlers
// rescue_wiring.go — bridges the leaf internal/rescue package to the
// handlers package's EIC/SSH runner + secret redactor, and exposes the
// boot-failure rescue hook used by both boot-failure verdict paths
// (handlers.BootstrapFailed here, registry.sweepStuckProvisioning via
// an injected hook wired in main.go).
//
// Why the indirection: internal/rescue is a leaf so registry (which
// must NOT import handlers — that's an import cycle) can call it. The
// two heavy dependencies live here in handlers — `withEICTunnel`
// (the EIC keypair → push → tunnel → ssh dance) and `redactSecrets`
// (the SAFE-T1201 secret-scan) — so we inject them into rescue's
// package-level func vars at init().
//
// RFC internal#742 Part 2.
import (
"bytes"
"context"
"database/sql"
"fmt"
"os"
"os/exec"
"strings"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescuestore"
)
func init() {
// Wire the leaf rescue package to handlers' EIC runner + redactor.
// Done in init() (not main.go) so the binding is present for any
// caller of rescue.Capture, including the registry sweeper hook and
// the handler path, without each call site re-wiring it.
rescue.RunRemote = rescueRunRemoteViaEIC
rescue.Redact = func(workspaceID, content string) string {
out, _ := redactSecrets(workspaceID, content)
return out
}
// Part 3: persist the redacted bundle to the queryable store on
// capture so GET /workspaces/:id/rescue can serve it without obs/Loki
// read creds. db.DB is resolved per-call (rescuestore guards a nil
// handle) so wiring at init() is safe even before InitPostgres has
// run; a capture before the DB is up logs + skips the persist rather
// than failing the boot-failure path.
rescue.PersistBundle = func(ctx context.Context, b rescue.Bundle) error {
return rescuestore.NewPostgres(db.DB).Persist(ctx, b)
}
}
// rescueRunRemoteViaEIC runs a single shell command on the still-running
// (but boot-failed) workspace EC2 over an EIC tunnel and returns its
// combined stdout+stderr. Reuses the same `withEICTunnel` dance as the
// canvas file ops (ephemeral keypair → SendSSHPublicKey → open-tunnel →
// ssh) so the rescue path inherits every fix to the EIC mechanism (e.g.
// PR #2822's LogLevel=ERROR shim) for free.
//
// Combined output (2>&1) is intentional: a boot-failed box's most
// useful signal is often on stderr (a panic, a missing-file error), and
// the rescue bundle is a forensic blob, not a parsed value — we want
// everything the command emitted.
func rescueRunRemoteViaEIC(ctx context.Context, instanceID, command string) (string, error) {
var combined []byte
runErr := withEICTunnel(ctx, instanceID, func(s eicSSHSession) error {
sshCmd := exec.CommandContext(ctx, "ssh", s.sshArgs(command)...)
sshCmd.Env = os.Environ()
var buf bytes.Buffer
sshCmd.Stdout = &buf
sshCmd.Stderr = &buf
// A non-zero remote exit is NOT a transport error for the rescue
// path — each section command already falls back to an
// `|| echo '(...)'` marker, so a clean exit is expected. Only
// surface an error when ssh/tunnel itself failed AND produced no
// output to ship.
err := sshCmd.Run()
combined = buf.Bytes()
if err != nil && len(combined) == 0 {
return fmt.Errorf("rescue ssh exec: %w", err)
}
return nil
})
if runErr != nil {
return "", runErr
}
return strings.TrimRight(string(combined), "\n"), nil
}
// captureRescueBundle fires a best-effort, non-blocking rescue capture
// for a boot-failed workspace. It is the single entry point both
// boot-failure verdict paths funnel through.
//
// NON-BLOCKING: the actual collection runs in its own goroutine with
// its own timeout (rescue.CaptureTimeout), detached from the caller's
// request/sweep context so it can't add latency to — or be cancelled
// by — the failure-handling path that triggered it. We snapshot the
// identity into a fresh context.Background() for the same reason: a
// gin request context is cancelled the instant the HTTP handler
// returns, which would kill the EIC tunnel mid-collection.
//
// instanceID/orgID are resolved here (best-effort) so the two call
// sites only need the workspace id. A missing instance id → rescue.Capture
// no-ops (logged), so an early-failure workspace that never got an EC2
// is handled cleanly.
func captureRescueBundle(workspaceID, reason string) {
rescueDispatch(func() {
ctx := context.Background()
instanceID, err := rescueResolveInstanceID(ctx, workspaceID)
if err != nil {
// Best-effort: a resolve failure is logged inside Capture's
// caller chain; pass empty so Capture no-ops cleanly.
instanceID = ""
}
rescue.Capture(ctx, rescue.Input{
InstanceID: instanceID,
WorkspaceID: workspaceID,
OrgID: os.Getenv("MOLECULE_ORG_ID"),
Reason: reason,
})
})
}
// rescueDispatch runs the rescue collection off the request path. In
// production it's `go fn()` so the capture never blocks or adds latency
// to the boot-failure handler. Tests swap it for a synchronous runner so
// they can assert the capture fired (or didn't) deterministically
// without racing the goroutine.
var rescueDispatch = func(fn func()) { go fn() }
// BootFailureRescueHook is the registry-facing adapter wired into
// registry.BootFailureRescueHook from main.go. The registry sweeper
// already resolved the instance id (it's in the candidate row), so this
// path uses it directly rather than re-querying — symmetric with the
// captureRescueBundle handler path but skipping the lookup.
//
// Best-effort + non-blocking: dispatches the capture on its own
// goroutine with its own timeout, so the sweep loop is never slowed.
func BootFailureRescueHook(workspaceID, instanceID, reason string) {
go rescue.Capture(context.Background(), rescue.Input{
InstanceID: instanceID,
WorkspaceID: workspaceID,
OrgID: os.Getenv("MOLECULE_ORG_ID"),
Reason: reason,
})
}
// rescueResolveInstanceID looks up the EC2 instance id for a workspace.
// Package var so tests can stub it without a sqlmock. Mirrors
// provisioner.resolveInstanceID (same query) but lives here to keep the
// rescue wiring self-contained and avoid widening the provisioner
// surface.
var rescueResolveInstanceID = func(ctx context.Context, workspaceID string) (string, error) {
if db.DB == nil {
return "", nil // nil in unit tests
}
var instanceID sql.NullString
err := db.DB.QueryRowContext(ctx,
`SELECT instance_id FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&instanceID)
if err != nil && err != sql.ErrNoRows {
return "", err
}
if !instanceID.Valid {
return "", nil
}
return instanceID.String, nil
}
@@ -1,119 +0,0 @@
package handlers
import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"testing"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// rescueTestHarness makes the otherwise-async rescue capture
// deterministic + observable for handler tests:
// - rescueDispatch runs synchronously (no goroutine race).
// - rescueResolveInstanceID returns a fixed instance id.
// - rescue.RunRemote / rescue.Redact are stubbed so no real EIC/SSH
// fires; runCalls counts how many remote-command collections ran,
// which is the proxy for "did the capture fire".
//
// All originals are restored on cleanup.
func rescueTestHarness(t *testing.T, instanceID string) (runCalls *int) {
t.Helper()
n := 0
runCalls = &n
prevDispatch := rescueDispatch
rescueDispatch = func(fn func()) { fn() } // synchronous
prevResolve := rescueResolveInstanceID
rescueResolveInstanceID = func(_ context.Context, _ string) (string, error) { return instanceID, nil }
prevRun, prevRedact := rescue.RunRemote, rescue.Redact
rescue.RunRemote = func(_ context.Context, _ string, _ string) (string, error) { n++; return "out", nil }
rescue.Redact = func(_ws, c string) string { return c }
t.Cleanup(func() {
rescueDispatch = prevDispatch
rescueResolveInstanceID = prevResolve
rescue.RunRemote = prevRun
rescue.Redact = prevRedact
})
return runCalls
}
// TestBootstrapFailed_FiresRescueOnFlip — the RFC internal#742 handler
// hook: when BootstrapFailed actually flips a workspace to `failed`
// (affected==1), the rescue capture fires against the resolved instance.
func TestBootstrapFailed_FiresRescueOnFlip(t *testing.T) {
h, mock := setupBootstrapHandler(t)
runCalls := rescueTestHarness(t, "i-failed01")
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-crashed", sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO structure_events`).
WithArgs("WORKSPACE_PROVISION_FAILED", "ws-crashed", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-crashed"}}
c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-crashed/bootstrap-failed",
bytes.NewBufferString(`{"error":"codex provider derivation failed","log_tail":"panic"}`))
c.Request.Header.Set("Content-Type", "application/json")
h.BootstrapFailed(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200, got %d: %s", w.Code, w.Body.String())
}
if *runCalls != len(rescueBundleSectionCount()) {
t.Errorf("rescue capture ran %d remote commands, want %d (one per bundle section)", *runCalls, len(rescueBundleSectionCount()))
}
}
// TestBootstrapFailed_NoRescueOnNoChange — an already-transitioned
// workspace (affected==0: raced to online, or double-report) is NOT a
// boot-failure verdict here, so the rescue capture must NOT fire.
func TestBootstrapFailed_NoRescueOnNoChange(t *testing.T) {
h, mock := setupBootstrapHandler(t)
runCalls := rescueTestHarness(t, "i-online01")
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-online", sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 0)) // already transitioned
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-online"}}
c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-online/bootstrap-failed",
bytes.NewBufferString(`{"error":"late report","log_tail":""}`))
c.Request.Header.Set("Content-Type", "application/json")
h.BootstrapFailed(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200, got %d", w.Code)
}
if *runCalls != 0 {
t.Errorf("rescue capture fired (%d cmds) on a no-change report; it must only fire on a real flip", *runCalls)
}
}
// rescueBundleSectionCount returns the production rescue bundle section
// list length by running a capture against a counting runner once. It's
// a small indirection so the handler test stays decoupled from the exact
// section set in internal/rescue (which has its own tests).
func rescueBundleSectionCount() []struct{} {
count := 0
prevRun, prevRedact := rescue.RunRemote, rescue.Redact
rescue.RunRemote = func(_ context.Context, _ string, _ string) (string, error) { count++; return "", nil }
rescue.Redact = func(_ws, c string) string { return c }
rescue.Capture(context.Background(), rescue.Input{InstanceID: "i-probe", WorkspaceID: "w", OrgID: "o"})
rescue.RunRemote = prevRun
rescue.Redact = prevRedact
return make([]struct{}, count)
}
@@ -176,10 +176,6 @@ func TestResolveAgentURLForRestartSignal_CacheMiss(t *testing.T) {
// TestGracefulPreRestart_Success verifies that when the workspace returns 200,
// the signal is logged as acknowledged without error.
func TestGracefulPreRestart_Success(t *testing.T) {
hWrapper := &resolveURLTestWrapper{
WorkspaceHandler: newHandlerWithTestDeps(t),
testURL: "http://fake-agent.example/agent",
}
_ = setupTestDB(t)
// httptest server simulating the workspace container's /signals/restart_pending
@@ -209,15 +205,18 @@ func TestGracefulPreRestart_Success(t *testing.T) {
})
}))
defer srv.Close()
hWrapper.testURL = srv.URL + "/agent"
// Pre-populate Redis cache with the test server URL
_ = setupTestRedisWithURL(t, srv.URL)
// gracefulPreRestart runs in a goroutine; wait for it before db.DB is restored.
// Must be registered AFTER setupTestDB (LIFO: async wait → db.DB restore).
waitForHandlerAsyncBeforeDBCleanup(t, hWrapper.WorkspaceHandler)
// Use a wrapper so gracefulPreRestart runs through the embedded handler.
hWrapper := &resolveURLTestWrapper{
WorkspaceHandler: newHandlerWithTestDeps(t),
testURL: srv.URL + "/agent",
}
// gracefulPreRestart runs in a goroutine with its own timeout.
// We give it time to complete before the test ends.
hWrapper.gracefulPreRestart(context.Background(), "ws-ack-789")
time.Sleep(200 * time.Millisecond)
}
@@ -225,22 +224,19 @@ func TestGracefulPreRestart_Success(t *testing.T) {
// TestGracefulPreRestart_NotImplemented verifies that when the workspace returns
// 404 (old SDK version), the platform proceeds gracefully (log + no error).
func TestGracefulPreRestart_NotImplemented(t *testing.T) {
hWrapper := &resolveURLTestWrapper{
WorkspaceHandler: newHandlerWithTestDeps(t),
testURL: "http://fake-agent.example/agent",
}
_ = setupTestDB(t)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
defer srv.Close()
hWrapper.testURL = srv.URL + "/agent"
_ = setupTestRedisWithURL(t, srv.URL)
// Must be registered AFTER setupTestDB so LIFO order is: async wait → db.DB restore.
waitForHandlerAsyncBeforeDBCleanup(t, hWrapper.WorkspaceHandler)
hWrapper := &resolveURLTestWrapper{
WorkspaceHandler: newHandlerWithTestDeps(t),
testURL: srv.URL + "/agent",
}
hWrapper.gracefulPreRestart(context.Background(), "ws-noimpl-999")
time.Sleep(200 * time.Millisecond)
@@ -250,18 +246,15 @@ func TestGracefulPreRestart_NotImplemented(t *testing.T) {
// TestGracefulPreRestart_ConnectionRefused verifies that when the workspace
// is unreachable, the platform proceeds gracefully without error.
func TestGracefulPreRestart_ConnectionRefused(t *testing.T) {
_ = setupTestDB(t)
mr := setupTestRedisWithURL(t, "http://localhost:19999/agent") // nothing listening on 19999
_ = mr
hWrapper := &resolveURLTestWrapper{
WorkspaceHandler: newHandlerWithTestDeps(t),
testURL: "http://localhost:19999/agent",
}
_ = setupTestDB(t)
// Nothing listening on 19999 — deliberate connection failure.
mr := setupTestRedisWithURL(t, "http://localhost:19999/agent")
_ = mr
// Must be registered AFTER setupTestDB so LIFO order is: async wait → db.DB restore.
waitForHandlerAsyncBeforeDBCleanup(t, hWrapper.WorkspaceHandler)
hWrapper.gracefulPreRestart(context.Background(), "ws-unreachable-000")
time.Sleep(200 * time.Millisecond)
@@ -271,17 +264,13 @@ func TestGracefulPreRestart_ConnectionRefused(t *testing.T) {
// TestGracefulPreRestart_URLResolutionError verifies that when URL resolution
// fails, the platform proceeds gracefully without blocking the restart.
func TestGracefulPreRestart_URLResolutionError(t *testing.T) {
_ = setupTestDB(t)
_ = setupTestRedis(t) // empty → URL resolution will fail in resolveAgentURLForRestartSignal
hWrapper := &resolveURLTestWrapper{
WorkspaceHandler: newHandlerWithTestDeps(t),
errToReturn: context.DeadlineExceeded,
}
_ = setupTestDB(t)
_ = setupTestRedis(t) // empty → URL resolution will fail in resolveAgentURLForRestartSignal
// Must be registered AFTER setupTestDB so LIFO order is: async wait → db.DB restore.
// This ensures goroutines (which access both DB and Redis) are drained before
// any cleanup fires. setupTestRedis comes after newHandlerWithTestDeps
// so the handler holds the correct Redis client reference.
waitForHandlerAsyncBeforeDBCleanup(t, hWrapper.WorkspaceHandler)
hWrapper.gracefulPreRestart(context.Background(), "ws-url-err-111")
@@ -44,20 +44,6 @@ func billingModeForRegistryProvider(p providers.Provider) string {
return LLMBillingModeBYOK
}
// requiredEnvForRegistryProvider derives the env vars the USER must supply for
// a model owned by the resolved provider — the proper-SSOT single fact behind
// the canvas "Missing API Keys" preflight (task #65). The closed platform
// provider injects credentials server-side (the metered proxy) -> nothing
// required; BYOK providers require their auth_env. Derived from IsPlatform +
// AuthEnv so a template can no longer hand-author a required_env that drifts
// from the registry's serving classification.
func requiredEnvForRegistryProvider(p providers.Provider) []string {
if p.IsPlatform() {
return nil
}
return p.AuthEnv
}
// enrichFromRegistry populates the registry-served fields on a templateSummary
// when its runtime is known to the provider registry. It is a no-op (leaves
// RegistryBacked=false and the registry slices nil) for a runtime the registry
@@ -112,7 +98,6 @@ func enrichFromRegistry(summary *templateSummary, runtime string) {
if derived, derr := m.DeriveProvider(runtime, id, nil); derr == nil {
ms.Provider = derived.Name
ms.BillingMode = billingModeForRegistryProvider(derived)
ms.RequiredEnv = requiredEnvForRegistryProvider(derived)
}
// If DeriveProvider errors (ambiguous/overlap — a manifest defect the
// loader's tests pin against), still serve the id without a provider
+11 -57
View File
@@ -1,15 +1,12 @@
package handlers
import (
"context"
"fmt"
"io"
"net/http"
"os"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/crypto"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"github.com/gin-gonic/gin"
)
@@ -21,69 +18,30 @@ func NewTracesHandler() *TracesHandler {
return &TracesHandler{}
}
type langfuseConfig struct {
Host string
Public string
Secret string
}
// resolveLangfuseConfig resolves Langfuse connection settings from
// admin-controlled global secrets first, then process env for legacy/dev use.
// Workspace secrets are intentionally excluded: a workspace-controlled
// LANGFUSE_HOST would allow SSRF with BasicAuth attached (#2029).
func resolveLangfuseConfig(ctx context.Context) (*langfuseConfig, error) {
cfg := &langfuseConfig{}
resolve := func(key string) string {
var val []byte
var ver int
err := db.DB.QueryRowContext(ctx,
`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = $1`,
key).Scan(&val, &ver)
if err == nil {
decrypted, decErr := crypto.DecryptVersioned(val, ver)
if decErr == nil {
return string(decrypted)
}
}
return os.Getenv(key)
}
cfg.Host = resolve("LANGFUSE_HOST")
cfg.Public = resolve("LANGFUSE_PUBLIC_KEY")
cfg.Secret = resolve("LANGFUSE_SECRET_KEY")
if cfg.Host == "" || cfg.Public == "" || cfg.Secret == "" {
return nil, nil
}
return cfg, nil
}
// List handles GET /workspaces/:id/traces
// Proxies to Langfuse API to get recent traces for a workspace.
func (h *TracesHandler) List(c *gin.Context) {
workspaceID := c.Param("id")
cfg, err := resolveLangfuseConfig(c.Request.Context())
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to resolve trace config"})
return
}
if cfg == nil {
langfuseHost := os.Getenv("LANGFUSE_HOST")
langfusePublic := os.Getenv("LANGFUSE_PUBLIC_KEY")
langfuseSecret := os.Getenv("LANGFUSE_SECRET_KEY")
if langfuseHost == "" || langfusePublic == "" || langfuseSecret == "" {
c.JSON(http.StatusOK, []interface{}{})
return
}
// Fetch traces from Langfuse, filtered by workspace tag or name
url := fmt.Sprintf("%s/api/public/traces?limit=20&orderBy=timestamp&orderDir=desc&tags=%s",
cfg.Host, workspaceID)
langfuseHost, workspaceID)
req, err := http.NewRequestWithContext(c.Request.Context(), "GET", url, nil)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create request"})
return
}
req.SetBasicAuth(cfg.Public, cfg.Secret)
req.SetBasicAuth(langfusePublic, langfuseSecret)
resp, err := langfuseClient.Do(req)
if err != nil {
@@ -93,14 +51,10 @@ func (h *TracesHandler) List(c *gin.Context) {
}
defer func() { _ = resp.Body.Close() }()
body, readErr := io.ReadAll(resp.Body)
if readErr != nil {
c.JSON(http.StatusOK, []interface{}{})
body, err := io.ReadAll(resp.Body)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read response body"})
return
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
c.JSON(http.StatusOK, []interface{}{})
return
}
c.Data(http.StatusOK, "application/json", body)
c.Data(resp.StatusCode, "application/json", body)
}
@@ -1,31 +1,27 @@
package handlers
import (
"database/sql"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"testing"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/crypto"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// ==================== GET /workspaces/:id/traces ====================
func TestTracesList_NoLangfuseConfig(t *testing.T) {
mock := setupTestDB(t)
setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
// Ensure Langfuse env vars are not set
os.Unsetenv("LANGFUSE_HOST")
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
os.Unsetenv("LANGFUSE_SECRET_KEY")
expectMissingGlobalLangfuseSecrets(mock)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces"}}
@@ -45,16 +41,14 @@ func TestTracesList_NoLangfuseConfig(t *testing.T) {
if len(resp) != 0 {
t.Errorf("expected empty list when Langfuse not configured, got %d items", len(resp))
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
func TestTracesList_PartialLangfuseConfig(t *testing.T) {
mock := setupTestDB(t)
setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
// Set only host, missing keys
os.Setenv("LANGFUSE_HOST", "http://localhost:3000")
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
os.Unsetenv("LANGFUSE_SECRET_KEY")
@@ -62,8 +56,6 @@ func TestTracesList_PartialLangfuseConfig(t *testing.T) {
os.Unsetenv("LANGFUSE_HOST")
}()
expectMissingGlobalLangfuseSecrets(mock)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-partial"}}
@@ -80,13 +72,10 @@ func TestTracesList_PartialLangfuseConfig(t *testing.T) {
if len(resp) != 0 {
t.Errorf("expected empty list with partial config, got %d items", len(resp))
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
func TestTracesList_LangfuseUnreachable(t *testing.T) {
mock := setupTestDB(t)
setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
@@ -100,8 +89,6 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) {
os.Unsetenv("LANGFUSE_SECRET_KEY")
}()
expectMissingGlobalLangfuseSecrets(mock)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-down"}}
@@ -119,171 +106,4 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) {
if len(resp) != 0 {
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
func TestTracesList_GlobalSecretsFallback(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
os.Unsetenv("LANGFUSE_HOST")
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
os.Unsetenv("LANGFUSE_SECRET_KEY")
expectGlobalLangfuseSecret(mock, "LANGFUSE_HOST", "http://localhost:3000")
expectGlobalLangfuseSecret(mock, "LANGFUSE_PUBLIC_KEY", "pk-global")
expectGlobalLangfuseSecret(mock, "LANGFUSE_SECRET_KEY", "sk-global")
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-global"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-global/traces", nil)
handler.List(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 0 {
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
func TestTracesList_GlobalPartialConfig(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
os.Unsetenv("LANGFUSE_HOST")
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
os.Unsetenv("LANGFUSE_SECRET_KEY")
expectGlobalLangfuseSecret(mock, "LANGFUSE_HOST", "http://localhost:3000")
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-partial"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-partial/traces", nil)
handler.List(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 0 {
t.Errorf("expected empty list with partial config, got %d items", len(resp))
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
func TestTracesList_LangfuseUpstreamError(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("<html><body>Internal Server Error</body></html>"))
}))
defer upstream.Close()
os.Setenv("LANGFUSE_HOST", upstream.URL)
os.Setenv("LANGFUSE_PUBLIC_KEY", "pk-test")
os.Setenv("LANGFUSE_SECRET_KEY", "sk-test")
defer func() {
os.Unsetenv("LANGFUSE_HOST")
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
os.Unsetenv("LANGFUSE_SECRET_KEY")
}()
expectMissingGlobalLangfuseSecrets(mock)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-500"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-500/traces", nil)
handler.List(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 0 {
t.Errorf("expected empty list on upstream error, got %d items", len(resp))
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
func TestTracesList_WorkspaceSecretsIgnored(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
os.Unsetenv("LANGFUSE_HOST")
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
os.Unsetenv("LANGFUSE_SECRET_KEY")
expectMissingGlobalLangfuseSecrets(mock)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-ssrf"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-ssrf/traces", nil)
handler.List(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 0 {
t.Errorf("expected empty list when workspace secrets ignored, got %d items", len(resp))
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
func expectMissingGlobalLangfuseSecrets(mock sqlmock.Sqlmock) {
for _, key := range []string{"LANGFUSE_HOST", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY"} {
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs(key).
WillReturnError(sql.ErrNoRows)
}
}
func expectGlobalLangfuseSecret(mock sqlmock.Sqlmock, key, value string) {
enc, _ := crypto.Encrypt([]byte(value))
ver := crypto.CurrentEncryptionVersion()
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs(key).
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(enc, ver))
}
@@ -474,32 +474,6 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
})
return
}
// issue #2172 (provider-side companion): once the (runtime, model)
// pair is known to be registered, confirm the DERIVED provider
// (the one the adapter will resolve at boot) is a known provider
// in the providers.yaml catalog. Live trigger (adk-demo Assistant,
// 2026-06-03): the model-side check passed for a registry-resident
// model whose derived provider name was NOT in the providers list,
// so the save was accepted and the agent wedged at boot with
// "provider=X not in providers registry". This check is a
// defense-in-depth registry-consistency guard: by construction a
// model in a runtime's native set derives to a provider that IS in
// the catalog, so the rejection path is primarily a registry-data
// invariant — any future drift between `providers:` and `runtimes:`
// fails the create with a clear pointer to the missing provider
// rather than silently wedging the agent. Fails open for runtimes
// the registry doesn't know (langgraph/external/kimi/mock/federated
// — the federation contract the model-side check also honors).
if ok, why := validateDerivedProviderInRegistry(payload.Runtime, payload.Model); !ok {
log.Printf("Create: 422 DERIVED_PROVIDER_NOT_IN_REGISTRY (runtime=%q model=%q): %s [issue #2172 hard-reject]", payload.Runtime, payload.Model, why)
c.JSON(http.StatusUnprocessableEntity, gin.H{
"error": why,
"runtime": payload.Runtime,
"model": payload.Model,
"code": "DERIVED_PROVIDER_NOT_IN_REGISTRY",
})
return
}
}
ctx := c.Request.Context()
@@ -56,20 +56,7 @@ func PatchAbilities(c *gin.Context) {
return
}
// Atomic update: when both fields are supplied, apply them in one SQL
// statement so the request is all-or-nothing (#2131). A partial mutation
// (e.g. broadcast_enabled updated but talk_to_user_enabled failing) would
// leave the workspace in an ambiguous capability state.
if body.BroadcastEnabled != nil && body.TalkToUserEnabled != nil {
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET broadcast_enabled = $2, talk_to_user_enabled = $3, updated_at = now() WHERE id = $1`,
id, *body.BroadcastEnabled, *body.TalkToUserEnabled,
); err != nil {
log.Printf("PatchAbilities both-fields for %s: %v", id, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
return
}
} else if body.BroadcastEnabled != nil {
if body.BroadcastEnabled != nil {
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET broadcast_enabled = $2, updated_at = now() WHERE id = $1`,
id, *body.BroadcastEnabled,
@@ -78,7 +65,9 @@ func PatchAbilities(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
return
}
} else if body.TalkToUserEnabled != nil {
}
if body.TalkToUserEnabled != nil {
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET talk_to_user_enabled = $2, updated_at = now() WHERE id = $1`,
id, *body.TalkToUserEnabled,
@@ -130,8 +130,11 @@ func TestPatchAbilities_BothFields(t *testing.T) {
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(wsUUID1).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, talk_to_user_enabled = \$3, updated_at = now\(\) WHERE id = \$1`).
WithArgs(wsUUID1, true, true).
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(wsUUID1, true).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`UPDATE workspaces SET talk_to_user_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(wsUUID1, true).
WillReturnResult(sqlmock.NewResult(0, 1))
w := patchAbilitiesReq(t, wsUUID1, `{"broadcast_enabled":true,"talk_to_user_enabled":true}`)
@@ -179,25 +182,19 @@ func TestPatchAbilities_TalkToUserUpdateError(t *testing.T) {
}
}
// TestPatchAbilities_BothFields_UpdateError — regression for #2131. When
// both fields are supplied the handler uses a single combined UPDATE. A
// failure of that UPDATE must leave the workspace unchanged (atomic).
func TestPatchAbilities_BothFields_UpdateError(t *testing.T) {
func TestPatchAbilities_BothFields_BroadcastFails(t *testing.T) {
mock, cleanup := withMockDB(t)
defer cleanup()
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(wsUUID1).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, talk_to_user_enabled = \$3, updated_at = now\(\) WHERE id = \$1`).
WithArgs(wsUUID1, true, true).
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(wsUUID1, true).
WillReturnError(errors.New("disk full"))
w := patchAbilitiesReq(t, wsUUID1, `{"broadcast_enabled":true,"talk_to_user_enabled":true}`)
if w.Code != http.StatusInternalServerError {
t.Fatalf("expected 500, got %d: %s", w.Code, w.Body.String())
}
// Because only one UPDATE is issued, there is no partial-mutation
// path to assert against; sqlmock implicitly verifies no second
// exec occurred.
}
@@ -91,18 +91,6 @@ func (h *WorkspaceHandler) BootstrapFailed(c *gin.Context) {
"log_tail": tail,
"source": "bootstrap_watcher",
})
// RFC internal#742 Part 2: this is one of the two boot-failure
// verdict points. We've just flipped a still-running (but
// unconfigured) workspace EC2 to `failed`; the control plane will
// reap the instance shortly. Capture a forensic rescue bundle off
// the live box NOW, before it's torn down, so a wedged workspace is
// post-mortem-inspectable. Best-effort + non-blocking: runs in its
// own goroutine with its own timeout, detached from this request's
// context (which is cancelled the instant this handler returns).
// Failure to capture never changes the boot-failure handling.
captureRescueBundle(id, "bootstrap_watcher")
log.Printf("BootstrapFailed: marked %s failed (tail=%d bytes, err=%q)", id, len(tail), errMsg)
c.JSON(http.StatusOK, gin.H{"ok": true})
}
@@ -95,14 +95,6 @@ func TestIntegration_BroadcastOrgRoot_NonRootSenderResolvesToRoot(t *testing.T)
}
})
// Pre-test hygiene: if a prior run crashed or was killed, its rows may
// still be in the shared integration DB. Remove them before inserting so
// the unique index workspaces_parent_name_uniq does not conflict.
if _, err := conn.ExecContext(ctx,
`DELETE FROM workspaces WHERE name LIKE $1`, prefix+"%"); err != nil {
t.Logf("pre-test cleanup (non-fatal): %v", err)
}
rootID := uuid.New().String()
midID := uuid.New().String()
leafID := uuid.New().String()
@@ -93,16 +93,3 @@ func formatMissingBYOKCredentialError(mode string) string {
mode,
)
}
// formatMissingPlatformProxyError builds the user-facing message for a
// provision failure caused by a platform-managed workspace whose control-plane
// proxy environment is absent (#2162). The platform-managed path requires
// MOLECULE_LLM_BASE_URL + MOLECULE_LLM_USAGE_TOKEN (or their OPENAI_*
// fallbacks) to inject a usable credential; without them the workspace must
// NOT start credential-less.
func formatMissingPlatformProxyError() string {
return "this workspace is configured for platform-managed LLM billing but the control-plane proxy is not ready. " +
"The required platform proxy env (MOLECULE_LLM_BASE_URL + MOLECULE_LLM_USAGE_TOKEN) is absent. " +
"This is usually a transient boot-race; retry in 30 seconds. If it persists, verify the platform proxy " +
"is configured for this tenant/runtime and contact the platform team."
}
@@ -613,32 +613,6 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
if model == "" {
log.Printf("ensureDefaultConfig: workspace %s reached provisioning with empty model — Create handler should have rejected this; rendering empty model: \"\" in config.yaml (workspace will boot not_configured)", workspaceID)
}
// Derive the provider from the providers manifest and stamp it into the
// generated config BEFORE claude-code model normalization strips the
// slash-prefix. DeriveProvider needs the FULL, un-normalized model id
// (e.g. "moonshot/kimi-k2.6") for the exact-id match that resolves the
// canvas claude-code case to provider=platform — normalizing to
// "kimi-k2.6" first would lose that match.
//
// Why this exists (RFC#340 Fix A): a canvas-created claude-code workspace
// with model "moonshot/kimi-k2.6" booted NOT_CONFIGURED — the adapter
// derived provider="moonshot" (slash-split of the model id) which is not
// in the providers registry. CP bakes `provider: platform` via heredoc,
// but the cp#329 config-bundle fetch overwrites /configs/config.yaml with
// THIS (previously providerless) bundle version, so molecule-runtime
// config.py re-derived the wrong provider. Stamping the manifest-derived
// provider here (mirroring CP's buildModelProviderYAML shape) makes the
// config the adapter reads carry the canonical provider.
//
// Reuses the SAME manifest path the config-SAVE validators use
// (providerRegistry() + Manifest.DeriveProvider; see
// model_registry_validation.go). On a derive MISS (unknown/unregistered
// model, or registry unavailable) provider is left empty and the field is
// omitted below — preserving today's behavior; never fail provisioning on
// a derive miss.
derivedProvider := deriveDefaultConfigProvider(runtime, model)
if runtime == "claude-code" {
model = normalizeClaudeCodeModel(model)
}
@@ -666,14 +640,6 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
// Model always at top level — config.py reads raw["model"] for all runtimes.
configYAML += fmt.Sprintf("model: %s\n", quoteModel)
// Stamp the manifest-derived provider at top level (mirroring CP's
// buildModelProviderYAML). Omitted entirely on a derive miss so the prior
// behavior — no `provider:` key, runtime re-derives — is preserved for
// unregistered models (requirement #3).
if derivedProvider != "" {
configYAML += fmt.Sprintf("provider: '%s'\n", yamlEscapeSingleQuotedProvider(derivedProvider))
}
// Add runtime_config. required_env is intentionally omitted — the
// platform injects secrets at container-start time via the secrets API,
// and preflight already validates that the env vars are present before
@@ -683,10 +649,6 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
if runtime == "claude-code" {
configYAML += fmt.Sprintf(" model: %s\n", quoteModel)
}
// Mirror the top-level provider under runtime_config (CP writes both).
if derivedProvider != "" {
configYAML += fmt.Sprintf(" provider: '%s'\n", yamlEscapeSingleQuotedProvider(derivedProvider))
}
configYAML += " timeout: 0\n"
files["config.yaml"] = []byte(configYAML)
@@ -695,48 +657,6 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
return files
}
// deriveDefaultConfigProvider resolves the provider name the adapter should
// see for (runtime, model) using the SAME providers manifest the config-SAVE
// validators use (providerRegistry() + Manifest.DeriveProvider; see
// model_registry_validation.go). It is intentionally fail-OPEN: any miss
// (empty model, registry unavailable, unknown runtime, or a model the runtime
// does not own) returns "" so the caller omits the `provider:` field and the
// generated config keeps its pre-fix shape. It NEVER fails provisioning.
//
// `model` must be the FULL, un-normalized id (e.g. "moonshot/kimi-k2.6") so
// DeriveProvider's exact-id match resolves the canvas claude-code case to
// provider=platform. The availableAuthEnv arg is nil here — config-generation
// has no per-workspace auth context yet (secrets are injected at container
// start), matching the validators' nil call.
func deriveDefaultConfigProvider(runtime, model string) string {
if strings.TrimSpace(model) == "" {
return ""
}
m, err := providerRegistry()
if err != nil || m == nil {
// Registry unavailable (a build-time defect the gen/sync gates catch).
// Fail open — do not stamp a provider, do not block provisioning.
return ""
}
p, err := m.DeriveProvider(runtime, model, nil)
if err != nil {
// Unknown runtime (federation / non-first-party) or a model the
// runtime does not own. Either way, omit the provider and let the
// runtime fall back to its prior derivation — preserving today's
// behavior for unregistered models.
return ""
}
return p.Name
}
// yamlEscapeSingleQuotedProvider escapes a value for a YAML single-quoted
// scalar, mirroring CP's buildModelProviderYAML (a literal single quote is
// doubled). Provider names are registry-controlled identifiers, so this is a
// defense-in-depth measure rather than a hot path.
func yamlEscapeSingleQuotedProvider(v string) string {
return strings.ReplaceAll(v, "'", "''")
}
func normalizeClaudeCodeModel(model string) string {
model = strings.TrimSpace(model)
if before, after, ok := strings.Cut(model, "/"); ok && before != "" && after != "" {
@@ -1083,13 +1003,12 @@ func applyPlatformManagedLLMEnv(ctx context.Context, envVars map[string]string,
anthropicBaseURL := firstNonEmptyEnv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "ANTHROPIC_BASE_URL")
token := firstNonEmptyEnv("MOLECULE_LLM_USAGE_TOKEN", "OPENAI_API_KEY")
if baseURL == "" || token == "" {
// Proxy not configured (boot race / misconfig). The platform_managed
// path REQUIRES the CP proxy env to inject a usable credential.
// Reporting HasUsableLLMCred=true here would start the workspace
// credential-less — the adk-demo dark-wedge class (#2162).
// Return false so the caller's fail-closed branch aborts with
// MISSING_PLATFORM_PROXY.
return platformLLMEnvResult{ResolvedMode: res.ResolvedMode, HasUsableLLMCred: false, Source: res.Source}
// Proxy not configured (boot race / misconfig). On the platform_managed
// path the workspace IS entitled to platform creds, so we do NOT strip
// here — but we report HasUsableLLMCred from whatever survived so the
// caller's fail-closed branch (non-platform only) is never reached on
// this path.
return platformLLMEnvResult{ResolvedMode: res.ResolvedMode, HasUsableLLMCred: true, Source: res.Source}
}
stripPlatformManagedLLMBypassEnv(envVars)
@@ -134,11 +134,6 @@ func TestProvisionWorkspaceAuto_NoBackendMarksFailed(t *testing.T) {
// This is the regression-prevention test for the Design Director bug
// where 7-of-7 sub-agents went down the Docker path on SaaS.
func TestProvisionWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) {
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
mock := setupTestDB(t)
mock.MatchExpectationsInOrder(false)
@@ -602,11 +597,6 @@ func TestNoCallSiteCallsBareStop(t *testing.T) {
// count without mocking out the retry helper itself, which would
// invert the test contract — the retry IS the dispatcher's job here).
func TestRestartWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) {
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
rec := &trackingCPProv{}
bcast := &concurrentSafeBroadcaster{}
h := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
@@ -805,11 +795,6 @@ func TestResumeHandler_UsesProvisionWorkspaceAuto(t *testing.T) {
// the async tests; the absence of `go` semantics is the load-bearing
// distinction we're pinning.
func TestProvisionWorkspaceAutoSync_RoutesToCPWhenSet(t *testing.T) {
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
mock := setupTestDB(t)
mock.MatchExpectationsInOrder(false)
// provisionWorkspaceCP runs prepareProvisionContext synchronously, which
@@ -98,11 +98,6 @@ func (r *recordingCPProv) startedSet() map[string]struct{} {
func TestProvisionWorkspaceCP_ConcurrentBurst_NoSilentDrop(t *testing.T) {
const numWorkspaces = 7
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
mock := setupTestDB(t)
// Every goroutine runs prepareProvisionContext → mintWorkspaceSecrets
@@ -230,18 +230,6 @@ func (h *WorkspaceHandler) prepareProvisionContext(
Extra: map[string]interface{}{"error": msg, "code": "MISSING_BYOK_CREDENTIAL", "billing_mode": llmRes.ResolvedMode, "issue": "1994"},
}
}
// Fail closed for a platform-managed workspace whose CP proxy env is
// absent: do NOT start it credential-less (adk-demo dark-wedge class,
// #2162). The platform_managed path requires the proxy injection to
// produce a usable credential.
if llmRes.ResolvedMode == LLMBillingModePlatformManaged && !llmRes.HasUsableLLMCred {
msg := formatMissingPlatformProxyError()
log.Printf("Provisioner: ABORT workspace=%s — platform-managed billing mode but CP proxy env absent (MISSING_PLATFORM_PROXY, molecule-core#2162)", workspaceID)
return nil, &provisionAbort{
Msg: msg,
Extra: map[string]interface{}{"error": msg, "code": "MISSING_PLATFORM_PROXY", "billing_mode": llmRes.ResolvedMode, "issue": "2162"},
}
}
applyRuntimeModelEnv(envVars, payload.Runtime, payload.Model)
if payload.Role != "" {
envVars["MOLECULE_AGENT_ROLE"] = payload.Role
@@ -264,11 +264,6 @@ func TestPrepareProvisionContext_ParentIDInjection(t *testing.T) {
},
}
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
mock := setupTestDB(t)
@@ -336,10 +331,6 @@ func TestPrepareProvisionContext_InjectsGitHTTPCredsFromPersonaToken(t *testing.
}
}
t.Setenv("MOLECULE_PERSONA_ROOT", root)
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
cases := []struct {
name string
@@ -468,10 +459,6 @@ func TestPrepareProvisionContext_WorkspaceSecretWinsOverPersonaToken(t *testing.
t.Fatal(err)
}
t.Setenv("MOLECULE_PERSONA_ROOT", root)
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT key, encrypted_value, encryption_version FROM global_secrets`).
@@ -363,74 +363,6 @@ runtime_config:
}
}
// TestEnsureDefaultConfig_StampsDerivedProvider pins RFC#340 Fix A: a
// canvas-created claude-code workspace with model "moonshot/kimi-k2.6" must
// have the manifest-derived provider stamped into config.yaml at BOTH the top
// level and under runtime_config, so the cp#329 config-bundle the adapter
// reads no longer leaves the runtime to slash-split "moonshot/..." → an
// unregistered provider="moonshot" (the original NOT_CONFIGURED boot). The
// canonical manifest exact-id-matches "moonshot/kimi-k2.6" to provider=platform.
func TestEnsureDefaultConfig_StampsDerivedProvider(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
files := handler.ensureDefaultConfig("ws-moonshot", models.CreateWorkspacePayload{
Name: "Kimi Agent",
Tier: 2,
Runtime: "claude-code",
Model: "moonshot/kimi-k2.6",
})
var parsed struct {
Model string `yaml:"model"`
Provider string `yaml:"provider"`
RuntimeConfig struct {
Model string `yaml:"model"`
Provider string `yaml:"provider"`
} `yaml:"runtime_config"`
}
if err := yaml.Unmarshal(files["config.yaml"], &parsed); err != nil {
t.Fatalf("generated YAML invalid: %v\n%s", err, files["config.yaml"])
}
if parsed.Provider != "platform" {
t.Errorf("top-level provider = %q, want platform\n%s", parsed.Provider, files["config.yaml"])
}
if parsed.RuntimeConfig.Provider != "platform" {
t.Errorf("runtime_config.provider = %q, want platform\n%s", parsed.RuntimeConfig.Provider, files["config.yaml"])
}
// The claude-code model normalization still strips the slash prefix.
if parsed.Model != "kimi-k2.6" {
t.Errorf("top-level model = %q, want kimi-k2.6\n%s", parsed.Model, files["config.yaml"])
}
}
// TestEnsureDefaultConfig_DeriveMissOmitsProvider pins requirement #3: a model
// the providers manifest does NOT recognize for the runtime (a derive miss)
// must NOT write any `provider:` key — neither top-level nor under
// runtime_config — preserving the pre-fix behavior (no empty `provider:`,
// provisioning never fails on a miss). "gpt-4o" is not a registered
// claude-code model, so DeriveProvider errors and the field is omitted.
func TestEnsureDefaultConfig_DeriveMissOmitsProvider(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
files := handler.ensureDefaultConfig("ws-derivemiss", models.CreateWorkspacePayload{
Name: "Unregistered Agent",
Tier: 1,
Runtime: "claude-code",
Model: "gpt-4o",
})
content := string(files["config.yaml"])
if strings.Contains(content, "provider:") {
t.Errorf("derive miss must NOT write any provider: key, got:\n%s", content)
}
// Sanity: a derive miss must still produce a valid, model-bearing config.
if !strings.Contains(content, `model: "gpt-4o"`) {
t.Errorf("derive miss should still render the model, got:\n%s", content)
}
}
func TestEnsureDefaultConfig_CustomModel(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@@ -1492,11 +1424,6 @@ func (s *stubFailingCPProv) IsRunning(_ context.Context, _ string) (bool, error)
// the broadcast payload would surface every marker; the canned
// "provisioning failed" message must surface none of them.
func TestProvisionWorkspaceCP_NoInternalErrorsInBroadcast(t *testing.T) {
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
mock := setupTestDB(t)
// loadWorkspaceSecrets queries global_secrets and workspace_secrets
@@ -876,9 +876,8 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
h.provisionWorkspaceAutoSync(workspaceID, "", nil, payload)
// sendRestartContext is a one-way notification to the new container; safe
// to fire async — the next restart cycle won't depend on it completing.
// Tracked via h.goAsync so tests can wait for it via h.asyncWG before
// closing the sqlmock. Without this, untracked goroutines hit the restored
// mock and cause "was not expected" errors in parallel CI execution (mc#1264).
// Tracked via goAsync so the test harness can drain it before the
// global db.DB swap (sendRestartContext reads db.DB).
h.goAsync(func() { h.sendRestartContext(workspaceID, restartData) })
}
@@ -16,7 +16,7 @@ const SchemaVersion = 1
// Fingerprint is a stable content hash of the generated projection (schema
// version + provider catalog + runtime native sets). It changes iff the
// registry DATA changes (comment-only YAML edits do not churn it).
const Fingerprint = "a491f5ff8a17ef59"
const Fingerprint = "8f733b112695b926"
// GenProvider is the generated projection of one provider catalog entry —
// the subset a downstream consumer needs to derive + display a provider.
@@ -50,7 +50,7 @@ var Providers = []GenProvider{
{Name: "openai-api", DisplayName: "OpenAI API", Protocol: "openai", AuthMode: "anthropic_api", AuthEnv: []string{"OPENAI_API_KEY"}, ModelPrefixMatch: "^openai-api[:/]", IsPlatform: false, UpstreamVendor: "openai"},
{Name: "moonshot", DisplayName: "Moonshot (Kimi)", Protocol: "openai", AuthMode: "third_party_anthropic_compat", AuthEnv: []string{"MOONSHOT_API_KEY", "KIMI_API_KEY"}, ModelPrefixMatch: "^moonshot[:/-]", IsPlatform: false, UpstreamVendor: "moonshot"},
{Name: "minimax", DisplayName: "MiniMax", Protocol: "openai", AuthMode: "third_party_anthropic_compat", AuthEnv: []string{"MINIMAX_API_KEY", "ANTHROPIC_AUTH_TOKEN", "ANTHROPIC_API_KEY"}, ModelPrefixMatch: "(?i)^minimax-m", IsPlatform: false, UpstreamVendor: "minimax"},
{Name: "platform", DisplayName: "Platform", Protocol: "anthropic", AuthMode: "third_party_anthropic_compat", AuthEnv: []string{"MOLECULE_LLM_USAGE_TOKEN"}, ModelPrefixMatch: "^platform/", IsPlatform: true},
{Name: "platform", DisplayName: "Platform", Protocol: "anthropic", AuthMode: "third_party_anthropic_compat", AuthEnv: []string{"ANTHROPIC_API_KEY", "MOLECULE_LLM_USAGE_TOKEN"}, ModelPrefixMatch: "^platform/", IsPlatform: true},
{Name: "xiaomi-mimo", DisplayName: "Xiaomi MiMo", Protocol: "anthropic", AuthMode: "third_party_anthropic_compat", AuthEnv: []string{"ANTHROPIC_AUTH_TOKEN", "ANTHROPIC_API_KEY"}, ModelPrefixMatch: "^mimo-", IsPlatform: false},
{Name: "zai", DisplayName: "Z.ai (GLM)", Protocol: "anthropic", AuthMode: "third_party_anthropic_compat", AuthEnv: []string{"GLM_API_KEY", "ANTHROPIC_AUTH_TOKEN", "ANTHROPIC_API_KEY"}, ModelPrefixMatch: "(?i)^glm-", IsPlatform: false},
{Name: "kimi-coding", DisplayName: "Moonshot Kimi (coding-tuned)", Protocol: "anthropic", AuthMode: "third_party_anthropic_compat", AuthEnv: []string{"KIMI_API_KEY", "ANTHROPIC_API_KEY", "ANTHROPIC_AUTH_TOKEN"}, ModelPrefixMatch: "^kimi-", IsPlatform: false},
@@ -89,9 +89,8 @@ var Runtimes = map[string][]GenRuntimeRef{
{Name: "platform", Models: []string{"openai/gpt-5.4", "openai/gpt-5.4-mini"}},
},
"google-adk": {
{Name: "platform", Models: []string{"platform:gemini-2.5-pro", "platform:gemini-2.5-flash"}},
{Name: "google", Models: []string{"gemini-2.5-pro", "gemini-2.5-flash"}},
{Name: "vertex", Models: []string{"vertex:gemini-2.5-pro"}},
{Name: "google", Models: []string{"gemini-2.5-pro"}},
},
"hermes": {
{Name: "kimi-coding", Models: []string{"kimi-coding/kimi-k2"}},
@@ -1,42 +0,0 @@
package providers
import "testing"
// Proper-SSOT (task #65): google-adk keyless Gemini resolves to the closed
// platform provider -> IsPlatform=true; BYOK AI Studio -> google. The
// platform: select ids are registered so workspace-create accepts them
// (was 422 UNREGISTERED_MODEL_FOR_RUNTIME).
func TestGoogleADK_PlatformGeminiResolvesToPlatform(t *testing.T) {
m, err := LoadManifest()
if err != nil {
t.Fatal(err)
}
for _, id := range []string{"platform:gemini-2.5-pro", "platform:gemini-2.5-flash"} {
p, err := m.DeriveProvider("google-adk", id, nil)
if err != nil {
t.Fatalf("%s: %v", id, err)
}
if p.Name != PlatformProviderName || !p.IsPlatform() {
t.Errorf("%s -> %q IsPlatform=%v; want platform", id, p.Name, p.IsPlatform())
}
}
p, err := m.DeriveProvider("google-adk", "gemini-2.5-pro", nil)
if err != nil {
t.Fatal(err)
}
if p.IsPlatform() || p.Name != "google" {
t.Errorf("gemini-2.5-pro -> %q IsPlatform=%v; want google byok", p.Name, p.IsPlatform())
}
models, _ := m.ModelsForRuntime("google-adk")
want := map[string]bool{"platform:gemini-2.5-pro": false, "platform:gemini-2.5-flash": false}
for _, id := range models {
if _, ok := want[id]; ok {
want[id] = true
}
}
for id, ok := range want {
if !ok {
t.Errorf("%s not registered for google-adk — create would 422", id)
}
}
}
@@ -292,7 +292,7 @@ providers:
# PR-1 simplification when only claude-code referenced platform.
base_url_template: "https://api.moleculesai.app/api/v1/internal/llm/openai/v1"
base_url_anthropic: "https://api.moleculesai.app/api/v1/internal/llm/anthropic/v1"
auth_env: [MOLECULE_LLM_USAGE_TOKEN]
auth_env: [ANTHROPIC_API_KEY, MOLECULE_LLM_USAGE_TOKEN]
auth_token_env: ANTHROPIC_API_KEY
# Adapter routes kimi- / moonshot/ through platform by default. No bare
# vendor prefix of its own; it multiplexes other vendors' slugs. Match
@@ -412,24 +412,14 @@ providers:
model_prefix_match: "^gemini-"
model_aliases: []
# Google Vertex AI — served via the Molecule CP LLM proxy (NOT on-box ADC).
# google-adk routes ALL Gemini through the proxy, which mints a Vertex token
# server-side over Workload Identity Federation (AWS -> GCP STS -> SA; see
# internal/vertexauth + llm_proxy.go google/vertex case) and meters usage to
# org credits. The former on-box ADC delivery (a NON-SECRET external_account
# cred-config written to /configs/gcp-adc.json via GOOGLE_APPLICATION_CREDENTIALS,
# injected by the provisioner) was REMOVED: a tenant has EC2 root and could
# have used that credential to call Vertex DIRECTLY, bypassing metering
# (keyless-Vertex billing leak — task #64 / RFC internal#763; provisioner
# force-off + template routes vertex: through the proxy). The `vertex:`
# namespace + this entry remain for proxy routing/billing of Vertex-upstream
# requests, distinct from the API-key `google` vendor's ^gemini-
# (TestNoAmbiguousModelMatch).
#
# NOTE: display_name ("keyless ADC") and auth_env (GOOGLE_APPLICATION_CREDENTIALS)
# are now VESTIGIAL — no consumer reads auth_env post-leak-fix, but it must stay
# non-empty (providers.go validate). Left as-is to keep this a comment-only,
# regen-free change; retiring them is a registry-regen follow-up.
# Google Vertex AI — KEYLESS arm (mirrors the anthropic-oauth / anthropic-api
# and openai-subscription / openai-api split: same vendor, distinct auth).
# google-adk serves Gemini via Vertex using Application Default Credentials
# over Workload Identity Federation (AWS EC2 role -> GCP STS -> SA), injected
# by the provisioner (cp#416 + envs.yaml vertex block) as a NON-SECRET
# external_account cred-config at GOOGLE_APPLICATION_CREDENTIALS. No API key.
# Distinct `vertex:` model namespace keeps it unambiguous vs the API-key
# `google` vendor's ^gemini- (TestNoAmbiguousModelMatch).
- name: vertex
display_name: "Google Vertex AI (keyless ADC)"
vendor_logo: "google"
@@ -854,24 +844,11 @@ runtimes:
# this runtimes entry declares the selectable model set.
google-adk:
providers:
# Platform-managed (keyless, metered) Gemini via the Molecule LLM proxy ->
# Vertex AI (server-side WIF mint; NO credential on the tenant box — the
# keyless-Vertex leak fix). Resolves to the closed `platform` provider ->
# IsPlatform=true -> platform_managed billing + required_env=[] (derived).
# The org-compliant default. The runtime translates the `platform:` select
# id to the bare wire id the proxy routes to Vertex.
- name: platform
models:
- platform:gemini-2.5-pro
- platform:gemini-2.5-flash
# API-key BYOK arm: AI Studio (the tenant's OWN GOOGLE_API_KEY).
- name: google
models:
- gemini-2.5-pro
- gemini-2.5-flash
# DEPRECATED transitional: vertex: ids stay registered until templates
# move to platform: (superseded by the platform arm above). Remove in a
# cleanup once no template references vertex:gemini-*.
# Keyless Vertex (org-compliant default): Gemini via Vertex AI + ADC/WIF.
- name: vertex
models:
- vertex:gemini-2.5-pro
- vertex:gemini-2.5-pro
# API-key BYOK arm: AI Studio GEMINI_API_KEY/GOOGLE_API_KEY.
- name: google
models:
- gemini-2.5-pro
@@ -29,7 +29,7 @@ import (
// canonicalProvidersYAMLSHA256 is the sha256 of the canonical providers.yaml as
// synced from molecule-controlplane. Bumped deliberately on each re-sync (see
// file doc). Cross-checked live by the sync-providers-yaml CI workflow.
const canonicalProvidersYAMLSHA256 = "021ae082c2bbbbb61c406cae03205ac6b7fff160ae5976cfc64de3de676d02b2"
const canonicalProvidersYAMLSHA256 = "dec73199e26cee2d395a0acece99771618d3879dc5ca724ba57cb5b38079c6ce"
func TestSyncedYAMLMatchesCanonicalSHA(t *testing.T) {
sum := sha256.Sum256(embeddedYAML)
@@ -10,15 +10,8 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
)
// rescueVolumeGraceHours surfaces the rescue grace as whole hours for
// the retention-contract assertion (RFC internal#742 Part 2).
func rescueVolumeGraceHours() int {
return int(rescue.RescueVolumeGrace.Hours())
}
// fakeCPReaper is a hand-rolled CPOrphanReaper for the SaaS-mode
// sweeper tests. Records every Stop call so tests can assert which
// workspace IDs were re-issued.
@@ -104,55 +97,6 @@ func TestCPSweepOnce_NoOrphans(t *testing.T) {
}
}
// TestCPSweepOnce_DoesNotReapFailedWorkspace — RFC internal#742 Part 2
// volume-retention guarantee, molecule-core side.
//
// A boot-FAILED workspace (status='failed') must NOT be terminated by
// the platform's orphan sweeper: its instance + /configs data volume are
// retained through the rescue grace (rescue.RescueVolumeGrace) so a live
// rescue read is possible, distinct from the user-prune erase path. The
// sweeper reaps ONLY status='removed' (the explicit deprovision path),
// so a `failed` row is structurally excluded at the SELECT — it never
// reaches reaper.Stop. We assert the predicate filters to 'removed'
// (so the failed instance survives) and that no Stop fires for a DB
// whose only orphan-shaped row is `failed`.
//
// This is the "if the sweeper already keeps volumes by default, confirm
// + add a test asserting it" branch of the RFC: it does, by construction.
func TestCPSweepOnce_DoesNotReapFailedWorkspace(t *testing.T) {
mock := setupTestDB(t)
reaper := &fakeCPReaper{}
// The sweeper's SELECT carries `status = 'removed'`. A boot-failed
// workspace (status='failed') does not match that predicate, so the
// real DB returns it nowhere in this result set — modelled as the
// empty result the `removed`-only filter produces when the only
// instance-bearing row is `failed`. The regex pins the retention-
// critical predicate so a future widening to include 'failed' (which
// would terminate boot-failed boxes mid-rescue) fails this test.
mock.ExpectQuery(`(?s)WHERE status = 'removed'\s+AND instance_id IS NOT NULL`).
WithArgs(cpSweepLimit).
WillReturnRows(sqlmock.NewRows([]string{"id"})) // failed row excluded by predicate
cpSweepOnce(context.Background(), reaper)
if len(reaper.stopCalls) != 0 {
t.Fatalf("boot-failed workspace must be RETAINED (no terminate); got Stop calls %v", reaper.stopCalls)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestRescueVolumeGraceIsDistinctFromPrune documents that the rescue
// grace is its own contract (24h) and not coupled to any prune timing —
// the value is the SSOT the control-plane reaper must honour.
func TestRescueVolumeGraceIsDistinctFromPrune(t *testing.T) {
if rescueVolumeGraceHours() != 24 {
t.Errorf("rescue volume grace = %dh, want 24h (RFC internal#742)", rescueVolumeGraceHours())
}
}
// TestCPSweepOnce_MultipleOrphans — all rows in the batch get Stop'd
// independently; one failure doesn't block others.
func TestCPSweepOnce_MultipleOrphans(t *testing.T) {
@@ -92,23 +92,6 @@ func provisioningTimeoutFor(runtime string, lookup RuntimeTimeoutLookup) time.Du
return DefaultProvisioningTimeout
}
// BootFailureRescueHook, when wired, is invoked once per workspace the
// sweep flips from `provisioning` to `failed` — i.e. on the boot-failure
// verdict, BEFORE the control plane reaps the instance. It captures a
// forensic rescue bundle off the still-running (but boot-failed) EC2 and
// ships it to obs/Loki (RFC internal#742 Part 2). Wired in main.go to
// handlers.captureRescueBundle via a thin adapter; nil in tests + on
// self-hosted deploys (no rescue shipping there).
//
// Function-typed injection (not an import of handlers) keeps the
// existing handlers→registry import direction intact — registry must not
// import handlers.
//
// MUST be best-effort + non-blocking: the hook itself dispatches the
// capture on its own goroutine with its own timeout, so the sweep loop
// is never slowed or blocked by a hung EIC tunnel on the dead box.
var BootFailureRescueHook func(workspaceID, instanceID, reason string)
// StartProvisioningTimeoutSweep periodically scans for workspaces stuck in
// `status='provisioning'` past the timeout window, flips them to `failed`,
// and broadcasts a WORKSPACE_PROVISION_TIMEOUT event so the canvas can
@@ -161,7 +144,7 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
// flight, not historical) and the partial index on status keeps
// it fast.
rows, err := db.DB.QueryContext(ctx, `
SELECT id, COALESCE(runtime, ''), COALESCE(instance_id, ''), EXTRACT(EPOCH FROM (now() - updated_at))::int
SELECT id, COALESCE(runtime, ''), EXTRACT(EPOCH FROM (now() - updated_at))::int
FROM workspaces
WHERE status = 'provisioning'
`)
@@ -172,15 +155,14 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
defer rows.Close()
type candidate struct {
id string
runtime string
instanceID string
ageSec int
id string
runtime string
ageSec int
}
var ids []candidate
for rows.Next() {
var c candidate
if err := rows.Scan(&c.id, &c.runtime, &c.instanceID, &c.ageSec); err == nil {
if err := rows.Scan(&c.id, &c.runtime, &c.ageSec); err == nil {
ids = append(ids, c)
}
}
@@ -218,19 +200,6 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
continue
}
log.Printf("Provision-timeout sweep: %s (runtime=%q) stuck in provisioning > %s — marked failed", c.id, c.runtime, timeout)
// RFC internal#742 Part 2: this flip is a boot-failure verdict.
// The instance is still running (the CP reaps it shortly after);
// capture a forensic rescue bundle off it NOW, before teardown.
// Best-effort + non-blocking — the hook dispatches on its own
// goroutine + timeout, so a hung EIC tunnel on the dead box can't
// slow the sweep. Only fires on a real flip (affected==1), never
// on a race (affected==0) or a non-overdue row — guaranteeing it
// runs once per boot-failure verdict and never on a healthy row.
if BootFailureRescueHook != nil {
BootFailureRescueHook(c.id, c.instanceID, "provision_timeout_sweep")
}
// Emit as WORKSPACE_PROVISION_FAILED, not _TIMEOUT, because the
// canvas event handler only flips node state on the _FAILED case.
// A separate event type was considered but the UI reaction is
@@ -1,130 +0,0 @@
package registry
import (
"context"
"sync"
"testing"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"github.com/DATA-DOG/go-sqlmock"
)
// rescueHookRecorder captures the args of every BootFailureRescueHook
// invocation so tests can assert the rescue capture fires exactly on the
// boot-failure verdict — and never on a healthy/raced row.
type rescueHookRecorder struct {
mu sync.Mutex
calls [][3]string // {workspaceID, instanceID, reason}
}
func (r *rescueHookRecorder) hook() func(workspaceID, instanceID, reason string) {
return func(workspaceID, instanceID, reason string) {
r.mu.Lock()
defer r.mu.Unlock()
r.calls = append(r.calls, [3]string{workspaceID, instanceID, reason})
}
}
func (r *rescueHookRecorder) count() int {
r.mu.Lock()
defer r.mu.Unlock()
return len(r.calls)
}
// withRescueHook installs a recorder as the package-level
// BootFailureRescueHook for the test's duration.
func withRescueHook(t *testing.T) *rescueHookRecorder {
t.Helper()
rec := &rescueHookRecorder{}
prev := BootFailureRescueHook
BootFailureRescueHook = rec.hook()
t.Cleanup(func() { BootFailureRescueHook = prev })
return rec
}
// TestSweep_RescueFiresOnBootFailureVerdict — the core RFC internal#742
// assertion: when the sweep flips a stuck workspace to `failed`, the
// rescue hook fires once with the workspace + instance id and the
// provision_timeout_sweep reason, BEFORE teardown.
func TestSweep_RescueFiresOnBootFailureVerdict(t *testing.T) {
mock := setupTestDB(t)
rec := withRescueHook(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-stuck", "codex", "i-0badf00d", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil)
if rec.count() != 1 {
t.Fatalf("rescue hook should fire once on a boot-failure flip, got %d", rec.count())
}
got := rec.calls[0]
if got[0] != "ws-stuck" || got[1] != "i-0badf00d" || got[2] != "provision_timeout_sweep" {
t.Errorf("rescue hook args = %v, want {ws-stuck i-0badf00d provision_timeout_sweep}", got)
}
}
// TestSweep_RescueDoesNotFireOnRace — affected==0 means the row raced to
// online/restart between SELECT and UPDATE. That is NOT a boot-failure
// verdict, so the rescue capture must NOT fire (we'd be snapshotting a
// healthy box that's about to come online).
func TestSweep_RescueDoesNotFireOnRace(t *testing.T) {
mock := setupTestDB(t)
rec := withRescueHook(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-raced", "codex", "i-raced", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 0)) // raced — 0 rows
sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil)
if rec.count() != 0 {
t.Errorf("rescue hook must NOT fire on a raced flip (affected==0), got %d calls", rec.count())
}
}
// TestSweep_RescueDoesNotFireOnHealthyRow — a not-yet-overdue row is
// never flipped, so the rescue capture must not fire. Guards against the
// hook being attached above the age gate.
func TestSweep_RescueDoesNotFireOnHealthyRow(t *testing.T) {
mock := setupTestDB(t)
rec := withRescueHook(t)
// hermes at 11 min (660s) < 30 min hermes budget → not overdue, no flip.
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-healthy", "hermes", "i-healthy", 660}))
sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil)
if rec.count() != 0 {
t.Errorf("rescue hook must NOT fire on a non-overdue (healthy) row, got %d calls", rec.count())
}
}
// TestSweep_RescueNilHookIsSafe — on a deploy where the hook is unwired
// (self-hosted / no rescue shipping), the sweep must still flip + emit
// without panicking on the nil hook.
func TestSweep_RescueNilHookIsSafe(t *testing.T) {
mock := setupTestDB(t)
prev := BootFailureRescueHook
BootFailureRescueHook = nil
t.Cleanup(func() { BootFailureRescueHook = prev })
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-stuck", "codex", "i-x", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
emit := &fakeEmitter{}
sweepStuckProvisioning(context.Background(), emit, nil) // must not panic
if emit.count() != 1 {
t.Errorf("flip+emit must still happen with a nil rescue hook, got %d events", emit.count())
}
}
@@ -7,8 +7,8 @@ import (
"testing"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"github.com/DATA-DOG/go-sqlmock"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
)
// fakeEmitter records every RecordAndBroadcast call so tests can assert
@@ -42,15 +42,12 @@ func (f *fakeEmitter) count() int {
return len(f.events)
}
// candidateRows builds the query result (id, runtime, instance_id,
// age_sec). instance_id was added for the RFC internal#742 rescue hook —
// it rides alongside runtime so the boot-failure capture can reach the
// still-running box. Tests that don't care about the rescue path pass
// "" for instance_id. Use this in every sweep test to match the SELECT.
func candidateRows(rows ...[4]any) *sqlmock.Rows {
r := sqlmock.NewRows([]string{"id", "runtime", "instance_id", "age_sec"})
// candidateRows builds the new-shape query result (id, runtime, age_sec).
// Use this in every sweep test to match the runtime-aware SELECT.
func candidateRows(rows ...[3]any) *sqlmock.Rows {
r := sqlmock.NewRows([]string{"id", "runtime", "age_sec"})
for _, row := range rows {
r = r.AddRow(row[0], row[1], row[2], row[3])
r = r.AddRow(row[0], row[1], row[2])
}
return r
}
@@ -61,8 +58,8 @@ func TestSweepStuckProvisioning_FlipsOverdue(t *testing.T) {
mock := setupTestDB(t)
// claude-code workspace, 700s old > 600s default timeout → flipped.
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-stuck", "claude-code", "i-stuck", 700}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-stuck", "claude-code", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
@@ -95,8 +92,8 @@ func TestSweepStuckProvisioning_HermesGets30MinSlack(t *testing.T) {
// 11 min = 660 sec. < HermesProvisioningTimeout (1800s).
// No UPDATE should fire — hermes still has time.
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-hermes-booting", "hermes", "i-h1", 660}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-hermes-booting", "hermes", 660}))
emit := &fakeEmitter{}
sweepStuckProvisioning(context.Background(), emit, nil)
@@ -117,8 +114,8 @@ func TestSweepStuckProvisioning_HermesPastDeadline(t *testing.T) {
mock := setupTestDB(t)
// 31 min = 1860 sec > HermesProvisioningTimeout (1800s).
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-hermes-stuck", "hermes", "i-h2", 1860}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-hermes-stuck", "hermes", 1860}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-hermes-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -153,8 +150,8 @@ func TestSweepStuckProvisioning_HermesPastDeadline(t *testing.T) {
func TestSweepStuckProvisioning_ManifestOverrideSparesRow(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-claude-templated", "claude-code", "i-ct", 660}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-claude-templated", "claude-code", 660}))
// No ExpectExec — if the sweeper still flips the row, sqlmock will
// fail with an unexpected-query error.
@@ -186,8 +183,8 @@ func TestSweepStuckProvisioning_ManifestOverrideStillFlipsPastDeadline(t *testin
mock := setupTestDB(t)
// 21 min = 1260s > 1200s manifest override → flipped.
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-claude-truly-stuck", "claude-code", "i-cts", 1260}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-claude-truly-stuck", "claude-code", 1260}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-claude-truly-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -224,8 +221,8 @@ func TestSweepStuckProvisioning_ManifestOverrideStillFlipsPastDeadline(t *testin
func TestSweepStuckProvisioning_RaceSafe(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-raced", "claude-code", "i-raced", 700}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-raced", "claude-code", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
@@ -247,7 +244,7 @@ func TestSweepStuckProvisioning_RaceSafe(t *testing.T) {
func TestSweepStuckProvisioning_NoStuck(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows())
emit := &fakeEmitter{}
@@ -268,10 +265,10 @@ func TestSweepStuckProvisioning_NoStuck(t *testing.T) {
func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows(
[4]any{"ws-claude-code", "claude-code", "i-cc", 700},
[4]any{"ws-hermes", "hermes", "i-hh", 1860},
[3]any{"ws-claude-code", "claude-code", 700},
[3]any{"ws-hermes", "hermes", 1860},
))
mock.ExpectExec(`UPDATE workspaces`).
@@ -295,8 +292,8 @@ func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) {
func TestSweepStuckProvisioning_BroadcastFailureDoesNotCrash(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-stuck", "claude-code", "i-stuck", 700}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-stuck", "claude-code", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
-321
View File
@@ -1,321 +0,0 @@
// Package rescue captures a fixed post-mortem "rescue bundle" off a
// workspace EC2 whose boot FAILED — before the platform's sweeper /
// control-plane reaps the instance — and ships it to obs/Loki so a
// wedged workspace (e.g. the codex provider-derivation failure that
// motivated RFC internal#742) is inspectable instead of an
// uninspectable wall.
//
// Design constraints (RFC internal#742, Part 2):
//
// - BEST-EFFORT + NON-BLOCKING. Capture MUST NOT change boot-failure
// semantics or add latency to the failure path. Callers fire
// Capture in its own goroutine; Capture additionally bounds itself
// with CaptureTimeout so a hung EIC tunnel can't wedge the
// goroutine forever.
// - FIRES ON THE BOOT-FAILURE VERDICT ONLY. The two hook points are
// the provision-timeout sweep (registry.sweepStuckProvisioning) and
// the out-of-band bootstrap-watcher signal
// (handlers.WorkspaceHandler.BootstrapFailed). Normal teardown /
// deprovision / recreate / billing-suspend / hibernate paths do NOT
// call Capture — see the RFC's path enumeration.
// - REDACT BEFORE ANYTHING LEAVES THE BOX. Every collected section is
// run through the injected Redact func (wired to the existing
// handlers.redactSecrets secret-scan) before it is shipped. Raw
// tokens/keys never reach Loki.
//
// The package is a LEAF: it imports only internal/audit (the obs
// shipper) so it can be called from both handlers and registry without
// an import cycle (registry must not import handlers). The two heavy
// dependencies — the EIC/SSH remote-command runner and the redactor —
// are injected as package-level func vars, wired once at boot from the
// handlers package (which owns withEICTunnel + redactSecrets). Tests
// swap them for fakes.
package rescue
import (
"context"
"fmt"
"log"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/audit"
)
// CaptureTimeout bounds the whole bundle collection. The sweeper runs
// every 30s and the CP reap follows the failure verdict; 45s gives the
// EIC dance (~3-5s) plus six short remote commands (<2s each) generous
// headroom while still finishing well before the instance is torn down.
// Distinct from the per-op eicFileOpTimeout so a slow box that already
// failed to boot can't hang the capture goroutine indefinitely.
const CaptureTimeout = 45 * time.Second
// LokiKind is the Loki stream label value that tags every rescue
// record. Queryable as `kind="rescue"` (RFC internal#742 §Loki labels).
const LokiKind = "rescue"
// RescueVolumeGrace is how long a boot-failed workspace's /configs data
// volume (and its still-running instance) must be RETAINED past the
// boot-failure verdict so a live rescue read is possible — distinct from
// the user-requested prune path (cp#415), which is an explicit erase.
//
// In molecule-core (the tenant platform) the boot-failure verdict only
// flips workspaces.status to `failed`; it never issues a terminate. The
// platform's two reapers (registry.StartCPOrphanSweeper +
// handlers deprovision) act ONLY on status='removed', so a `failed`
// workspace's instance + /configs volume are retained here by
// construction — see TestCPSweepOnce_DoesNotReapFailedWorkspace. The
// time-bounded reap of the failed instance is the control plane's
// bootstrap-watcher concern; this constant is the SSOT for the grace
// the CP must honour (24h covers an operator's next-business-day
// post-mortem without leaking the volume indefinitely).
const RescueVolumeGrace = 24 * time.Hour
// rescueEventType is the audit event_type carried in the shipped
// record. The obs shipper (internal/audit) already maps event_type to a
// low-cardinality Loki label; "rescue.bundle" keeps the rescue stream
// trivially filterable alongside the existing audit taxonomy.
const rescueEventType = "rescue.bundle"
// RunRemote runs a single shell command on the still-running (but
// unconfigured) workspace EC2 over EIC/SSH and returns its combined
// output. Wired at boot to the handlers EIC runner
// (rescueRunRemoteViaEIC). nil until wired — Capture degrades to a
// logged no-op rather than panicking, so an operator who hasn't wired
// the hook still gets a clear signal instead of a crash on the failure
// path.
var RunRemote func(ctx context.Context, instanceID, command string) (string, error)
// Redact scrubs secret-shaped substrings from a collected section
// before it leaves the box. Wired at boot to handlers.redactSecrets.
// nil until wired — Capture refuses to ship un-redacted content if the
// redactor is missing (fails closed: logs + aborts rather than leaking
// raw config).
var Redact func(workspaceID, content string) string
// section is one labelled chunk of the rescue bundle: a human-readable
// name + the remote command that produces it.
type section struct {
name string
command string
}
// bundleSections is the FIXED set collected on every boot-failure
// rescue (RFC internal#742 §Build.1). Order is the post-mortem reading
// order: config first, then boot logs, then container state, then the
// resolved model/provider env that drove the codex derivation failure.
//
// - /configs/config.yaml + system-prompt.md: the managed config the
// runtime booted against (redacted; system-prompt can embed keys).
// - cloud-init-output.log tail: the user-data execution trace — where
// a wedged boot actually died.
// - docker ps -a: container state (did the agent container even
// start, exit-code, restart loop).
// - agent container logs: the runtime's own stderr (the codex
// provider-derivation panic lives here).
// - MODEL|PROVIDER|RUNTIME env: the resolved routing that motivated
// the RFC. `sudo cat` of the container env via docker inspect-style
// grep — see the command.
//
// All commands use `sudo -n` (the box's /configs is root-owned; ubuntu
// has passwordless sudo) and swallow missing-target stderr so a section
// that can't be produced ships as a short marker instead of failing the
// whole bundle. Kept as data (not inlined) so the redaction + ship loop
// is uniform and the set is reviewable in one place.
var bundleSections = []section{
{
name: "config.yaml",
command: "sudo -n cat /configs/config.yaml 2>/dev/null || echo '(/configs/config.yaml absent)'",
},
{
name: "system-prompt.md",
command: "sudo -n cat /configs/system-prompt.md 2>/dev/null || echo '(/configs/system-prompt.md absent)'",
},
{
name: "cloud-init-output.log.tail",
command: "sudo -n tail -200 /var/log/cloud-init-output.log 2>/dev/null || echo '(cloud-init-output.log absent)'",
},
{
name: "docker-ps",
command: "sudo -n docker ps -a 2>/dev/null || echo '(docker unavailable)'",
},
{
// The agent container is the first non-infra container; grab the
// most recently created one and tail its logs. `head -1` of
// `docker ps -a -q` is creation-ordered newest-first, which is
// the agent runtime on a workspace box.
name: "agent-container.logs.tail",
command: "cid=$(sudo -n docker ps -a -q 2>/dev/null | head -1); [ -n \"$cid\" ] && sudo -n docker logs --tail 200 \"$cid\" 2>&1 || echo '(no agent container)'",
},
{
// Resolved model/provider/runtime env from the agent container.
// `docker inspect` the env array and grep the routing keys. This
// is the field that pinpoints a provider-derivation failure.
name: "model-provider-runtime.env",
command: "cid=$(sudo -n docker ps -a -q 2>/dev/null | head -1); [ -n \"$cid\" ] && sudo -n docker inspect --format '{{range .Config.Env}}{{println .}}{{end}}' \"$cid\" 2>/dev/null | grep -E 'MODEL|PROVIDER|RUNTIME' || echo '(no env)'",
},
}
// Input is the identity of the failed workspace being rescued.
type Input struct {
InstanceID string // EC2 instance id of the still-running failed box
WorkspaceID string
OrgID string
// Reason is a short tag for WHY the rescue fired (e.g.
// "provision_timeout_sweep" or "bootstrap_watcher") — carried into
// the Loki record so an operator can correlate the bundle with the
// failure verdict that triggered it.
Reason string
}
// Section is one labelled, already-redacted chunk of the persisted
// rescue bundle. It mirrors what ship() emits to Loki per-section, but
// is the unit the queryable store (and the read endpoint) returns.
// `Redacted` is false only for collection-failure markers (the section
// command couldn't run); true sections passed through the secret-scan.
type Section struct {
Name string `json:"name"`
Content string `json:"content"`
Redacted bool `json:"redacted"`
}
// Bundle is the full, already-redacted post-mortem bundle for ONE
// boot-failure capture — the unit persisted to the queryable store on
// capture (RFC internal#742 Part 3) and served by
// GET /workspaces/:id/rescue. Sections are in fixed reading order
// (config → boot logs → container state → resolved routing env).
type Bundle struct {
WorkspaceID string `json:"workspace_id"`
OrgID string `json:"org_id"`
InstanceID string `json:"instance_id"`
Reason string `json:"reason"`
Sections []Section `json:"sections"`
}
// PersistBundle writes the fully-collected, already-redacted bundle to
// the queryable per-tenant store (rescue_bundles table) so the rescue
// READ endpoint can serve it without obs/Loki read creds (RFC
// internal#742 Part 3 read-path decision — see the migration header).
//
// Wired at boot from the handlers package (which owns db.DB) to keep
// internal/rescue a leaf: it must NOT import internal/db, or registry —
// which imports rescue — would inherit a db dependency it can call
// without a cycle, but more importantly the leaf stays trivially
// testable with a fake. nil until wired: a capture with no store wired
// still ships to Loki (Part 2 behavior preserved) and logs that it
// skipped the DB persist, rather than failing the capture.
var PersistBundle func(ctx context.Context, b Bundle) error
// Capture collects the fixed rescue bundle off the failed instance,
// redacts each section, and ships it to Loki under
// {kind="rescue", org=<OrgID>, workspace_id=<WorkspaceID>}.
//
// BEST-EFFORT: every failure mode (missing wiring, EIC error, a single
// section that won't collect) is logged and does NOT propagate — Capture
// never returns an error and never panics, so the boot-failure handling
// at the call site is unaffected. The caller is expected to invoke this
// in its own goroutine; Capture additionally self-bounds with
// CaptureTimeout.
func Capture(ctx context.Context, in Input) {
defer func() {
// A logging helper on the failure path must never take the
// process down. Recover defensively — the redactor / shipper are
// injected and a future mis-wire shouldn't crash the sweeper.
if r := recover(); r != nil {
log.Printf("rescue: capture panicked for ws=%s instance=%s: %v", in.WorkspaceID, in.InstanceID, r)
}
}()
if in.InstanceID == "" {
// No live box to read — nothing to rescue (e.g. failure before
// any EC2 was launched). Not an error; just skip.
log.Printf("rescue: skip ws=%s — no instance_id (nothing to capture)", in.WorkspaceID)
return
}
if RunRemote == nil {
log.Printf("rescue: skip ws=%s instance=%s — RunRemote not wired (best-effort no-op)", in.WorkspaceID, in.InstanceID)
return
}
if Redact == nil {
// Fail CLOSED: without a redactor we could leak raw tokens to
// Loki. Abort rather than ship unredacted.
log.Printf("rescue: ABORT ws=%s instance=%s — Redact not wired; refusing to ship un-redacted bundle", in.WorkspaceID, in.InstanceID)
return
}
ctx, cancel := context.WithTimeout(ctx, CaptureTimeout)
defer cancel()
log.Printf("rescue: capturing bundle ws=%s instance=%s reason=%s", in.WorkspaceID, in.InstanceID, in.Reason)
collected := 0
// Accumulate the per-section result alongside shipping each to Loki,
// so the same already-redacted content is persisted to the queryable
// store as one bundle row after the loop. Shipping stays per-section
// (Part 2 Loki behavior unchanged); persistence is the single
// bundle the read endpoint serves.
bundle := Bundle{
WorkspaceID: in.WorkspaceID,
OrgID: in.OrgID,
InstanceID: in.InstanceID,
Reason: in.Reason,
Sections: make([]Section, 0, len(bundleSections)),
}
for _, sec := range bundleSections {
raw, err := RunRemote(ctx, in.InstanceID, sec.command)
if err != nil {
// One section failing (e.g. ssh blip mid-collection) must not
// abort the rest — ship a marker for it and continue.
log.Printf("rescue: section %q failed for ws=%s: %v", sec.name, in.WorkspaceID, err)
marker := fmt.Sprintf("(rescue: section collection failed: %v)", err)
ship(ctx, in, sec.name, marker, false)
bundle.Sections = append(bundle.Sections, Section{Name: sec.name, Content: marker, Redacted: false})
continue
}
redacted := Redact(in.WorkspaceID, raw)
ship(ctx, in, sec.name, redacted, true)
bundle.Sections = append(bundle.Sections, Section{Name: sec.name, Content: redacted, Redacted: true})
collected++
}
log.Printf("rescue: shipped %d/%d sections ws=%s instance=%s kind=%s", collected, len(bundleSections), in.WorkspaceID, in.InstanceID, LokiKind)
// Persist the redacted bundle to the queryable store so the rescue
// READ endpoint can serve it without obs/Loki read creds. Best-effort
// and last: a persist failure (or no store wired) must NOT undo the
// Loki ship that already succeeded, and never panics the failure path.
persistBundle(ctx, bundle)
}
// persistBundle writes the collected bundle to the queryable store if a
// store is wired. Best-effort: a nil store (operator hasn't wired the
// READ path) or a DB error is logged and swallowed — the Loki ship is
// the durable cross-tenant copy, and the failure path must never be
// disturbed by the post-mortem read store.
func persistBundle(ctx context.Context, b Bundle) {
if PersistBundle == nil {
log.Printf("rescue: store not wired — bundle for ws=%s shipped to Loki only (no queryable copy)", b.WorkspaceID)
return
}
if err := PersistBundle(ctx, b); err != nil {
log.Printf("rescue: persist bundle for ws=%s failed (shipped to Loki regardless): %v", b.WorkspaceID, err)
}
}
// ship emits one rescue section to Loki via the audit shipper. The
// org / workspace_id / kind ride in the record body (queryable via
// LogQL `| json`); event_type ("rescue.bundle") is the low-cardinality
// Loki label the shipper already promotes. `redacted` records whether
// the content passed through the secret-scan, so an operator can tell a
// shipped-but-redacted section from a collection-failure marker.
func ship(ctx context.Context, in Input, name, content string, redacted bool) {
audit.Emit(ctx, rescueEventType, map[string]any{
"kind": LokiKind,
"org": in.OrgID,
"workspace_id": in.WorkspaceID,
"instance_id": in.InstanceID,
"reason": in.Reason,
"section": name,
"redacted": redacted,
"content": content,
})
}
@@ -1,136 +0,0 @@
package rescue
// Part 3 coverage: Capture, after collecting + redacting every section,
// persists the bundle exactly once to the queryable store (in addition
// to the per-section Loki ship verified in rescue_test.go).
import (
"context"
"errors"
"strings"
"testing"
)
// withPersist swaps the injected PersistBundle for the test and restores
// it after.
func withPersist(t *testing.T, fn func(ctx context.Context, b Bundle) error) {
t.Helper()
prev := PersistBundle
PersistBundle = fn
t.Cleanup(func() { PersistBundle = prev })
}
// TestCapture_PersistsBundleOnce: the happy path persists one bundle
// carrying every section, with identity + redacted content matching what
// was shipped.
func TestCapture_PersistsBundleOnce(t *testing.T) {
_ = captureLoki(t) // keep Loki transport pointed at a temp file
withFakes(t,
func(_ context.Context, instanceID, cmd string) (string, error) {
return "OUT:" + instanceID, nil
},
func(_ws, c string) string { return "RED:" + c },
)
var persisted []Bundle
withPersist(t, func(_ context.Context, b Bundle) error {
persisted = append(persisted, b)
return nil
})
Capture(context.Background(), Input{
InstanceID: "i-abc",
WorkspaceID: "ws-1",
OrgID: "org-9",
Reason: "provision_timeout_sweep",
})
if len(persisted) != 1 {
t.Fatalf("PersistBundle called %d times, want exactly 1", len(persisted))
}
b := persisted[0]
if b.WorkspaceID != "ws-1" || b.OrgID != "org-9" || b.InstanceID != "i-abc" || b.Reason != "provision_timeout_sweep" {
t.Errorf("bundle identity wrong: %+v", b)
}
if len(b.Sections) != len(bundleSections) {
t.Fatalf("persisted %d sections, want %d", len(b.Sections), len(bundleSections))
}
for _, s := range b.Sections {
if !s.Redacted {
t.Errorf("section %q persisted with redacted=false on the happy path", s.Name)
}
// Redactor ("RED:" prefix) must have run on persisted content.
if !strings.HasPrefix(s.Content, "RED:") {
t.Errorf("section %q persisted un-redacted content: %q", s.Name, s.Content)
}
}
}
// TestCapture_PersistFailureDoesNotPanic: a store error is swallowed —
// Capture still completes (the Loki ship already succeeded).
func TestCapture_PersistFailureDoesNotPanic(t *testing.T) {
_ = captureLoki(t)
withFakes(t,
func(_ context.Context, _ string, _ string) (string, error) { return "ok", nil },
func(_ws, c string) string { return c },
)
withPersist(t, func(_ context.Context, _ Bundle) error {
return errors.New("db down")
})
// Must not panic / must return normally.
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-2", OrgID: "o"})
}
// TestCapture_NoPersistWiredIsSafe: with PersistBundle unwired (operator
// hasn't wired the read path), Capture still ships to Loki and does not
// panic.
func TestCapture_NoPersistWiredIsSafe(t *testing.T) {
readLoki := captureLoki(t)
withFakes(t,
func(_ context.Context, _ string, _ string) (string, error) { return "ok", nil },
func(_ws, c string) string { return c },
)
prev := PersistBundle
PersistBundle = nil
t.Cleanup(func() { PersistBundle = prev })
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-3", OrgID: "o"})
// Loki ship still happened for every section.
if recs := readLoki(); len(recs) != len(bundleSections) {
t.Errorf("shipped %d records, want %d (Loki unaffected by missing store)", len(recs), len(bundleSections))
}
}
// TestCapture_FailureMarkerPersistedAsNonRedacted: a section whose
// collection fails is persisted with redacted=false + a marker, matching
// the Loki record.
func TestCapture_FailureMarkerPersistedAsNonRedacted(t *testing.T) {
_ = captureLoki(t)
withFakes(t,
func(_ context.Context, _ string, cmd string) (string, error) {
if strings.Contains(cmd, "config.yaml") {
return "", errors.New("ssh blip")
}
return "ok", nil
},
func(_ws, c string) string { return c },
)
var got Bundle
withPersist(t, func(_ context.Context, b Bundle) error { got = b; return nil })
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-4", OrgID: "o"})
var markers int
for _, s := range got.Sections {
if !s.Redacted {
markers++
if !strings.Contains(s.Content, "section collection failed") {
t.Errorf("non-redacted section %q content = %q, want a failure marker", s.Name, s.Content)
}
}
}
if markers != 1 {
t.Errorf("want exactly 1 failure marker persisted, got %d", markers)
}
}
@@ -1,226 +0,0 @@
package rescue
import (
"context"
"encoding/json"
"errors"
"os"
"path/filepath"
"strings"
"testing"
)
// withFakes swaps the injected RunRemote + Redact for the duration of a
// test and restores them after. Mirrors the provisioner test-fake
// pattern (package-var swap + t.Cleanup).
func withFakes(t *testing.T, run func(ctx context.Context, instanceID, cmd string) (string, error), redact func(ws, c string) string) {
t.Helper()
prevRun, prevRedact := RunRemote, Redact
RunRemote = run
Redact = redact
t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact })
}
// captureLoki points the audit shipper at a temp JSONL file and returns
// a reader that decodes the records the rescue ship() loop wrote. This
// is the same transport the production rescue stream uses (audit.Emit →
// Loki via the tenant Vector source), so asserting on it proves the
// shipper-reuse + labels end to end.
func captureLoki(t *testing.T) func() []map[string]any {
t.Helper()
dir := t.TempDir()
path := filepath.Join(dir, "audit.jsonl")
t.Setenv("MOLECULE_AUDIT_LOG_PATH", path)
return func() []map[string]any {
b, err := os.ReadFile(path)
if err != nil {
return nil
}
var out []map[string]any
for _, line := range strings.Split(strings.TrimSpace(string(b)), "\n") {
if line == "" {
continue
}
var rec map[string]any
if err := json.Unmarshal([]byte(line), &rec); err != nil {
t.Fatalf("bad audit jsonl line %q: %v", line, err)
}
out = append(out, rec)
}
return out
}
}
func fields(rec map[string]any) map[string]any {
f, _ := rec["fields"].(map[string]any)
return f
}
// TestCapture_ShipsAllSectionsWithRescueLabels is the happy path: a
// boot-failure capture collects every fixed section, runs each through
// the redactor, and ships it to Loki under {kind="rescue", org, ws}.
func TestCapture_ShipsAllSectionsWithRescueLabels(t *testing.T) {
readLoki := captureLoki(t)
var seenCmds []string
withFakes(t,
func(_ context.Context, instanceID, cmd string) (string, error) {
seenCmds = append(seenCmds, cmd)
return "OUTPUT for " + instanceID, nil
},
func(_ws, c string) string { return c }, // identity redactor
)
Capture(context.Background(), Input{
InstanceID: "i-abc123",
WorkspaceID: "ws-1",
OrgID: "org-9",
Reason: "provision_timeout_sweep",
})
recs := readLoki()
if len(recs) != len(bundleSections) {
t.Fatalf("want %d shipped sections, got %d", len(bundleSections), len(recs))
}
if len(seenCmds) != len(bundleSections) {
t.Fatalf("want %d remote commands run, got %d", len(bundleSections), len(seenCmds))
}
for _, rec := range recs {
if rec["event_type"] != rescueEventType {
t.Errorf("event_type = %v, want %q", rec["event_type"], rescueEventType)
}
// workspace_id is promoted to the top-level record position by
// the audit shipper.
if rec["workspace_id"] != "ws-1" {
t.Errorf("top-level workspace_id = %v, want ws-1", rec["workspace_id"])
}
f := fields(rec)
if f["kind"] != LokiKind {
t.Errorf("kind = %v, want %q", f["kind"], LokiKind)
}
if f["org"] != "org-9" {
t.Errorf("org = %v, want org-9", f["org"])
}
if f["instance_id"] != "i-abc123" {
t.Errorf("instance_id = %v, want i-abc123", f["instance_id"])
}
if f["redacted"] != true {
t.Errorf("redacted = %v, want true for a collected section", f["redacted"])
}
}
}
// TestCapture_Redacts proves the bundle is scrubbed before it leaves the
// box: a remote section that contains a secret-shaped token must ship
// with the token replaced, never raw.
func TestCapture_Redacts(t *testing.T) {
readLoki := captureLoki(t)
const secret = "sk-ant-SUPERSECRETTOKENVALUE0001"
withFakes(t,
func(_ context.Context, _ string, _ string) (string, error) {
return "ANTHROPIC_API_KEY=" + secret, nil
},
// redactor that mangles anything containing the secret shape
func(_ws, c string) string {
if strings.Contains(c, secret) {
return strings.ReplaceAll(c, secret, "[REDACTED]")
}
return c
},
)
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-2", OrgID: "o"})
for _, rec := range readLoki() {
content, _ := fields(rec)["content"].(string)
if strings.Contains(content, secret) {
t.Fatalf("raw secret leaked to Loki in section %v: %q", fields(rec)["section"], content)
}
}
}
// TestCapture_SkipsWhenNoInstance: a failure with no provisioned EC2 has
// nothing to read — Capture must no-op (ship nothing) rather than dial a
// blank instance id.
func TestCapture_SkipsWhenNoInstance(t *testing.T) {
readLoki := captureLoki(t)
called := false
withFakes(t,
func(_ context.Context, _ string, _ string) (string, error) { called = true; return "", nil },
func(_ws, c string) string { return c },
)
Capture(context.Background(), Input{InstanceID: "", WorkspaceID: "ws-3", OrgID: "o"})
if called {
t.Error("RunRemote called for an empty instance id")
}
if recs := readLoki(); len(recs) != 0 {
t.Errorf("shipped %d records for an empty instance id, want 0", len(recs))
}
}
// TestCapture_FailsClosedWithoutRedactor: if the redactor is not wired,
// Capture must NOT ship anything (would leak raw config). Fail closed.
func TestCapture_FailsClosedWithoutRedactor(t *testing.T) {
readLoki := captureLoki(t)
prevRun, prevRedact := RunRemote, Redact
RunRemote = func(_ context.Context, _ string, _ string) (string, error) { return "raw config", nil }
Redact = nil
t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact })
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-4", OrgID: "o"})
if recs := readLoki(); len(recs) != 0 {
t.Errorf("shipped %d records without a redactor wired, want 0 (fail closed)", len(recs))
}
}
// TestCapture_SectionFailureIsIsolated: one section's RunRemote error
// must not abort the rest — the failing section ships a marker and the
// others still ship.
func TestCapture_SectionFailureIsIsolated(t *testing.T) {
readLoki := captureLoki(t)
withFakes(t,
func(_ context.Context, _ string, cmd string) (string, error) {
if strings.Contains(cmd, "config.yaml") {
return "", errors.New("ssh blip")
}
return "ok", nil
},
func(_ws, c string) string { return c },
)
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-5", OrgID: "o"})
recs := readLoki()
if len(recs) != len(bundleSections) {
t.Fatalf("want all %d sections shipped (incl. failure marker), got %d", len(bundleSections), len(recs))
}
var failureMarkers int
for _, rec := range recs {
if fields(rec)["redacted"] == false {
failureMarkers++
content, _ := fields(rec)["content"].(string)
if !strings.Contains(content, "section collection failed") {
t.Errorf("failure marker content = %q, want a collection-failed marker", content)
}
}
}
if failureMarkers != 1 {
t.Errorf("want exactly 1 failure marker, got %d", failureMarkers)
}
}
// TestCapture_NoWiringIsSafeNoOp: with RunRemote unwired (operator hasn't
// called the boot wiring), Capture must be a logged no-op, never a panic.
func TestCapture_NoWiringIsSafeNoOp(t *testing.T) {
readLoki := captureLoki(t)
prevRun, prevRedact := RunRemote, Redact
RunRemote = nil
Redact = func(_ws, c string) string { return c }
t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact })
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-6", OrgID: "o"})
if recs := readLoki(); len(recs) != 0 {
t.Errorf("shipped %d records with RunRemote unwired, want 0", len(recs))
}
}
@@ -1,153 +0,0 @@
// Package rescuestore is the queryable persistence layer for rescue
// bundles (RFC internal#742 Part 3). It is the DB side of the read-path
// decision: because internal/audit (Part 2's ship transport) is
// Loki-only and tenants hold no obs read creds, the redacted bundle is
// ALSO written here on capture so GET /workspaces/:id/rescue can serve
// the latest one with a plain Postgres read.
//
// The package owns both the write (Persist, wired into
// rescue.PersistBundle at boot) and the read (GetLatest, used by the
// handler). It depends on internal/db and internal/rescue (for the
// Bundle/Section types); it is imported by handlers, never by the leaf
// internal/rescue or by registry — so no import cycle.
package rescuestore
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
)
// maxSectionBytes bounds a single persisted section's content so a
// pathological capture (e.g. a multi-megabyte container log) can't bloat
// the row or the read response. Capture already tails to ~200 lines per
// section, so this is a backstop, not the primary limit. Truncated
// content is suffixed with a marker so a reader knows it was clipped.
const maxSectionBytes = 64 * 1024 // 64 KiB per section
// truncationMarker is appended to any section clipped at maxSectionBytes.
const truncationMarker = "\n…(rescue: section truncated at 64KiB)"
// StoredBundle is a persisted bundle plus its capture timestamp (the DB
// assigns captured_at on write). The handler maps this to the read
// response shape.
type StoredBundle struct {
Bundle rescue.Bundle
CapturedAt time.Time
}
// Store is the read/write surface the handler and the capture wiring
// depend on. An interface so the handler test can fake it without a
// sqlmock; the production implementation is Postgres.
type Store interface {
// Persist writes one bundle row (captured_at = now()).
Persist(ctx context.Context, b rescue.Bundle) error
// GetLatest returns the most recent bundle for workspaceID. When
// orgID is non-empty the row must also match org_id (cross-org
// defense-in-depth behind TenantGuard). Returns (nil, nil) — NOT an
// error — when no bundle exists, so the handler can 404 cleanly.
GetLatest(ctx context.Context, workspaceID, orgID string) (*StoredBundle, error)
}
// Postgres is the production Store backed by the rescue_bundles table.
type Postgres struct{ db *sql.DB }
// NewPostgres builds a Postgres-backed store over the given handle.
func NewPostgres(db *sql.DB) *Postgres { return &Postgres{db: db} }
// Persist writes the bundle as one row. Sections are stored as JSONB.
// Each section's content is clamped to maxSectionBytes before write.
func (p *Postgres) Persist(ctx context.Context, b rescue.Bundle) error {
if p.db == nil {
return fmt.Errorf("rescuestore: nil db")
}
clamped := clampSections(b.Sections)
payload, err := json.Marshal(clamped)
if err != nil {
return fmt.Errorf("rescuestore: marshal sections: %w", err)
}
_, err = p.db.ExecContext(ctx,
`INSERT INTO rescue_bundles (workspace_id, org_id, instance_id, reason, sections)
VALUES ($1, $2, $3, $4, $5::jsonb)`,
b.WorkspaceID, b.OrgID, b.InstanceID, b.Reason, string(payload),
)
if err != nil {
return fmt.Errorf("rescuestore: insert: %w", err)
}
return nil
}
// GetLatest returns the newest bundle for workspaceID, org-scoped. The
// (workspace_id, captured_at DESC, id DESC) index serves this directly.
// sql.ErrNoRows maps to (nil, nil) so the handler 404s.
func (p *Postgres) GetLatest(ctx context.Context, workspaceID, orgID string) (*StoredBundle, error) {
if p.db == nil {
return nil, fmt.Errorf("rescuestore: nil db")
}
if orgID == "" {
return nil, fmt.Errorf("rescuestore: org_id required")
}
var (
instanceID string
reason string
capturedAt time.Time
sectionsRaw []byte
)
err := p.db.QueryRowContext(ctx,
`SELECT instance_id, reason, captured_at, sections
FROM rescue_bundles
WHERE workspace_id = $1
AND org_id = $2
ORDER BY captured_at DESC, id DESC
LIMIT 1`,
workspaceID, orgID,
).Scan(&instanceID, &reason, &capturedAt, &sectionsRaw)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("rescuestore: query latest: %w", err)
}
var sections []rescue.Section
if len(sectionsRaw) > 0 {
if err := json.Unmarshal(sectionsRaw, &sections); err != nil {
return nil, fmt.Errorf("rescuestore: unmarshal sections: %w", err)
}
}
return &StoredBundle{
Bundle: rescue.Bundle{
WorkspaceID: workspaceID,
OrgID: orgID,
InstanceID: instanceID,
Reason: reason,
Sections: sections,
},
CapturedAt: capturedAt,
}, nil
}
// clampSections returns a copy with each section's content clamped to
// maxSectionBytes. Clamps on a rune boundary so the marker doesn't split
// a multibyte sequence — the content is a forensic blob, never parsed.
func clampSections(in []rescue.Section) []rescue.Section {
out := make([]rescue.Section, len(in))
for i, s := range in {
if len(s.Content) > maxSectionBytes {
b := []byte(s.Content[:maxSectionBytes])
// Back off to a valid utf-8 boundary (at most 3 bytes).
for len(b) > 0 && b[len(b)-1]&0xC0 == 0x80 {
b = b[:len(b)-1]
}
s.Content = string(b) + truncationMarker
}
out[i] = s
}
return out
}
@@ -1,211 +0,0 @@
package rescuestore
// Sqlmock-backed coverage for the rescue_bundles store (RFC internal#742
// Part 3). Exercises Persist (incl. section clamp) + GetLatest (happy
// path, no-rows→nil, org-scoping, query error) without a real DB.
import (
"context"
"database/sql"
"encoding/json"
"errors"
"regexp"
"strings"
"testing"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
"github.com/DATA-DOG/go-sqlmock"
)
func newMock(t *testing.T) (*sql.DB, sqlmock.Sqlmock) {
t.Helper()
dbh, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock: %v", err)
}
t.Cleanup(func() { _ = dbh.Close() })
return dbh, mock
}
func sampleBundle() rescue.Bundle {
return rescue.Bundle{
WorkspaceID: "ws-1",
OrgID: "org-9",
InstanceID: "i-abc",
Reason: "bootstrap_watcher",
Sections: []rescue.Section{
{Name: "config.yaml", Content: "model: gpt-4", Redacted: true},
{Name: "docker-ps", Content: "(no agent container)", Redacted: false},
},
}
}
// TestPersist_InsertsRow asserts Persist issues one INSERT with the
// bundle fields and a JSON sections payload.
func TestPersist_InsertsRow(t *testing.T) {
dbh, mock := newMock(t)
b := sampleBundle()
mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO rescue_bundles`)).
WithArgs("ws-1", "org-9", "i-abc", "bootstrap_watcher", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(1, 1))
if err := NewPostgres(dbh).Persist(context.Background(), b); err != nil {
t.Fatalf("Persist: %v", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestClampSections: a section over maxSectionBytes is truncated +
// marker-suffixed; a small section is untouched.
func TestClampSections(t *testing.T) {
huge := strings.Repeat("x", maxSectionBytes+5000)
in := []rescue.Section{
{Name: "container.logs", Content: huge, Redacted: true},
{Name: "small", Content: "ok", Redacted: true},
}
out := clampSections(in)
if len(out[0].Content) > maxSectionBytes+len(truncationMarker) {
t.Errorf("clamped content len = %d, want <= %d", len(out[0].Content), maxSectionBytes+len(truncationMarker))
}
if !strings.HasSuffix(out[0].Content, truncationMarker) {
t.Error("clamped section missing truncation marker suffix")
}
if out[1].Content != "ok" {
t.Errorf("small section was modified: %q", out[1].Content)
}
}
// TestPersist_WritesClampedPayload: Persist marshals the clamped
// sections into the JSONB arg (the INSERT carries the truncation marker).
func TestPersist_WritesClampedPayload(t *testing.T) {
dbh, mock := newMock(t)
huge := strings.Repeat("x", maxSectionBytes+5000)
b := rescue.Bundle{
WorkspaceID: "ws-1",
Sections: []rescue.Section{{Name: "container.logs", Content: huge, Redacted: true}},
}
want, _ := json.Marshal(clampSections(b.Sections))
mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO rescue_bundles`)).
WithArgs("ws-1", "", "", "", string(want)).
WillReturnResult(sqlmock.NewResult(1, 1))
if err := NewPostgres(dbh).Persist(context.Background(), b); err != nil {
t.Fatalf("Persist: %v", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet: %v", err)
}
}
// TestGetLatest_ReturnsBundle: a found row decodes back into the bundle.
func TestGetLatest_ReturnsBundle(t *testing.T) {
dbh, mock := newMock(t)
ts := time.Date(2026, 5, 31, 12, 0, 0, 0, time.UTC)
secs, _ := json.Marshal([]rescue.Section{
{Name: "config.yaml", Content: "redacted", Redacted: true},
})
mock.ExpectQuery(regexp.QuoteMeta(`SELECT instance_id, reason, captured_at, sections`)).
WithArgs("ws-1", "org-9").
WillReturnRows(sqlmock.NewRows([]string{"instance_id", "reason", "captured_at", "sections"}).
AddRow("i-abc", "bootstrap_watcher", ts, secs))
got, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-1", "org-9")
if err != nil {
t.Fatalf("GetLatest: %v", err)
}
if got == nil {
t.Fatal("got nil, want a bundle")
}
if !got.CapturedAt.Equal(ts) {
t.Errorf("captured_at = %v, want %v", got.CapturedAt, ts)
}
if got.Bundle.InstanceID != "i-abc" || got.Bundle.Reason != "bootstrap_watcher" {
t.Errorf("bundle meta wrong: %+v", got.Bundle)
}
if len(got.Bundle.Sections) != 1 || got.Bundle.Sections[0].Name != "config.yaml" {
t.Errorf("sections decoded wrong: %+v", got.Bundle.Sections)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet: %v", err)
}
}
// TestGetLatest_NoRowsReturnsNil: no bundle → (nil, nil), so the handler
// can 404 without treating it as an error.
func TestGetLatest_NoRowsReturnsNil(t *testing.T) {
dbh, mock := newMock(t)
mock.ExpectQuery(regexp.QuoteMeta(`SELECT instance_id, reason, captured_at, sections`)).
WithArgs("ws-none", "org-9").
WillReturnError(sql.ErrNoRows)
got, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-none", "org-9")
if err != nil {
t.Fatalf("GetLatest err = %v, want nil for no-rows", err)
}
if got != nil {
t.Fatalf("got %+v, want nil for no-rows", got)
}
}
// TestGetLatest_OrgScopingArg: the org id is passed as the $2 filter arg
// with strict equality, so a row in a sibling org is excluded by the query
// itself. A mismatched org → no row → nil (same as no-rows).
func TestGetLatest_OrgScopingArg(t *testing.T) {
dbh, mock := newMock(t)
// Tenant org-B asks for ws-1 (owned by org-9). The strict predicate
// filters it out → ErrNoRows → nil.
mock.ExpectQuery(regexp.QuoteMeta(`AND org_id = $2`)).
WithArgs("ws-1", "org-B").
WillReturnError(sql.ErrNoRows)
got, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-1", "org-B")
if err != nil {
t.Fatalf("GetLatest: %v", err)
}
if got != nil {
t.Fatal("sibling-org read returned a bundle; want nil")
}
}
// TestGetLatest_EmptyOrgIDRejected: an empty orgID must fail closed with
// an error rather than disabling the org filter (#2020).
func TestGetLatest_EmptyOrgIDRejected(t *testing.T) {
dbh, _ := newMock(t)
_, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-1", "")
if err == nil {
t.Fatal("GetLatest(empty orgID) should error")
}
}
// TestGetLatest_QueryErrorPropagates: a real DB error (not ErrNoRows)
// surfaces as an error so the handler returns 503, not a false 404.
func TestGetLatest_QueryErrorPropagates(t *testing.T) {
dbh, mock := newMock(t)
mock.ExpectQuery(regexp.QuoteMeta(`SELECT instance_id, reason, captured_at, sections`)).
WithArgs("ws-1", "org-9").
WillReturnError(errors.New("connection reset"))
_, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-1", "org-9")
if err == nil {
t.Fatal("want an error for a non-ErrNoRows DB failure")
}
}
// TestNilDB: both methods return an error (never panic) when the db
// handle is nil — the degraded-boot guard the wiring relies on.
func TestNilDB(t *testing.T) {
p := NewPostgres(nil)
if err := p.Persist(context.Background(), sampleBundle()); err == nil {
t.Error("Persist(nil db) should error")
}
if _, err := p.GetLatest(context.Background(), "ws-1", "org-9"); err == nil {
t.Error("GetLatest(nil db) should error")
}
}
@@ -703,14 +703,6 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
wsAuth.PUT("/files/*path", tmplh.WriteFile)
wsAuth.DELETE("/files/*path", tmplh.DeleteFile)
// Rescue read (RFC internal#742 Part 3) — latest post-mortem bundle
// for a boot-failed/terminated workspace, so "why won't my agent
// boot" is answerable without a live instance. Same WorkspaceAuth
// gate as /files/*; the handler org-scopes the store read by
// MOLECULE_ORG_ID so a sibling org cannot read another org's bundle.
rescueReadH := handlers.NewRescueReadHandler()
wsAuth.GET("/rescue", rescueReadH.GetRescue)
// Chat attachments — file upload (user → agent) and binary-safe
// streaming download (agent → user). Namespaced under /chat/ so
// the security model is obviously distinct from /files/* (which
@@ -1,4 +0,0 @@
-- Reverse RFC internal#742 Part 3 rescue_bundles table.
-- Forensic-only table; dropping it loses post-mortem read history but
-- does not affect boot-failure semantics (capture still ships to Loki).
DROP TABLE IF EXISTS rescue_bundles;
@@ -1,59 +0,0 @@
-- 20260531000000_rescue_bundles.up.sql — RFC internal#742 Part 3.
--
-- A queryable, post-mortem-inspectable copy of the rescue bundle that
-- Part 2 (internal/rescue.Capture) collects off a boot-failed workspace
-- EC2 before the control plane reaps it.
--
-- WHY a DB table (the Part 3 read-path decision):
-- Part 2 ships the bundle via internal/audit (audit.Emit), which is
-- stdout→Vector→Loki + a best-effort local JSONL on the tenant
-- container's EPHEMERAL rootfs — NOT a queryable store. Serving
-- GET /workspaces/:id/rescue from Loki would require giving the
-- tenant process a Loki *query* client + obs read creds, which it
-- deliberately does not have (and must not — RFC internal#742 keeps
-- obs read creds out of tenants). So Part 3 ALSO persists the
-- already-redacted bundle to this small per-tenant table on capture,
-- and the read endpoint serves the latest row. The Loki stream
-- remains the cross-tenant operator firehose; this table is the
-- tenant-local, org-scoped read surface that powers the future
-- canvas "Why did this fail?" panel.
--
-- REDACTION: the `sections` payload written here is the SAME content
-- the Loki ship loop emits — i.e. already run through the SAFE-T1201
-- secret-scan (handlers.redactSecrets) at capture time. This table
-- never holds raw tokens; the read endpoint returns the stored content
-- verbatim without re-redacting.
--
-- ORG SCOPING: org_id is denormalized onto the row so the read handler
-- can filter by (workspace_id, org_id) and a row whose org doesn't
-- match the tenant's MOLECULE_ORG_ID is never returned — defense in
-- depth behind TenantGuard (which already 404s cross-org requests at
-- the routing layer).
--
-- RETENTION: bounded by RescueVolumeGrace semantics on the capture
-- side; rows are small (a redacted forensic blob, capped at capture).
-- A future sweeper can prune rows past the grace window — out of scope
-- for Part 3; the table is append-only here.
CREATE TABLE IF NOT EXISTS rescue_bundles (
id BIGSERIAL PRIMARY KEY,
workspace_id TEXT NOT NULL,
org_id TEXT NOT NULL DEFAULT '',
instance_id TEXT NOT NULL DEFAULT '',
reason TEXT NOT NULL DEFAULT '',
captured_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- sections is the ordered, already-redacted bundle:
-- [{ "name": "config.yaml", "content": "...", "redacted": true }, ...]
-- Stored as JSONB so the read handler returns it as a structured map
-- and a future query can index into a single section if needed.
sections JSONB NOT NULL DEFAULT '[]'::jsonb
);
-- Read hot path: "latest bundle for this workspace" — the only query
-- the GET /workspaces/:id/rescue endpoint runs.
-- SELECT ... WHERE workspace_id = $1 [AND org_id = $2]
-- ORDER BY captured_at DESC, id DESC LIMIT 1
-- Partial-free composite index; (workspace_id, captured_at DESC) covers
-- the filter + ordering. id DESC tiebreaks same-timestamp captures.
CREATE INDEX IF NOT EXISTS idx_rescue_bundles_ws_captured
ON rescue_bundles (workspace_id, captured_at DESC, id DESC);