Compare commits

..

6 Commits

Author SHA1 Message Date
Molecule AI Dev Engineer A (Kimi) 4f57115fa8 fix(ci-drift): add REQUIRED_CHECKS_JSON variant support (internal#804)
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 4s
CI / Python Lint & Test (pull_request) Successful in 2s
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Failing after 2s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 4s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 5s
E2E Chat / detect-changes (pull_request) Successful in 9s
Lint forbidden tenant-env keys / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 3s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 10s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Successful in 3s
Harness Replays / detect-changes (pull_request) Successful in 12s
E2E API Smoke Test / detect-changes (pull_request) Successful in 14s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 13s
CI / Detect changes (pull_request) Successful in 15s
security-review / approved (pull_request_target) Failing after 12s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 15s
qa-review / approved (pull_request_target) Failing after 14s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m0s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m18s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 1m53s
CI / Platform (Go) (pull_request) Successful in 4m1s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m2s
CI / all-required (pull_request) Successful in 1s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 58s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m13s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m11s
E2E Chat / E2E Chat (pull_request) Successful in 2s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 4s
CI / Canvas (Next.js) (pull_request) Successful in 2s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 1s
Harness Replays / Harness Replays (pull_request) Successful in 3s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m50s
sop-checklist / review-refire (pull_request_target) Has been skipped
gate-check-v3 / gate-check (pull_request_target) Successful in 4s
sop-checklist / all-items-acked (pull_request) acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request_target) Successful in 5s
sop-tier-check / tier-check (pull_request_target) Successful in 7s
The ci-required-drift parser only looked for REQUIRED_CHECKS while
audit-force-merge.yml switched to REQUIRED_CHECKS_JSON (branch-aware
dict). This caused F3 drift detection to fail on repos using the JSON
variant.

Changes:
- required_checks_env() now detects both REQUIRED_CHECKS_JSON (preferred)
  and REQUIRED_CHECKS (legacy fallback).
- For JSON variant: parse the dict, extract the array for the target
  branch, validate structure, return as a set of context names.
- For legacy variant: unchanged newline-split behavior.
- Error messages updated to mention both env vars.
- render_body() resolution text updated to mention both variants.
- Tests added for JSON precedence, fallback, missing branch, malformed
  JSON, and full drift-class coverage (F3a/F3b/happy-path).

Closes internal#804

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-03 21:32:28 +00:00
Molecule AI Dev Engineer A (Kimi) 8547a7d845 fix(integration): avoid invalid-UTF-8 insert into workspace_schedules.prompt
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 2m38s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2m37s
CI / Platform (Go) (pull_request) Successful in 6m19s
CI / all-required (pull_request) Successful in 7s
qa-review / approved (pull_request_target) Review check failed via pull_request_review_approved trigger
security-review / approved (pull_request_target) Review check failed via pull_request_review_approved trigger
qa-review / approved (pull_request_review_approved) Failing after 3s
sop-tier-check / tier-check (pull_request_review) Successful in 3s
security-review / approved (pull_request_review_approved) Failing after 3s
CI / Python Lint & Test (pull_request) Successful in 3s
CI / Detect changes (pull_request) Successful in 6s
E2E API Smoke Test / detect-changes (pull_request) Successful in 6s
Harness Replays / detect-changes (pull_request) Successful in 4s
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Failing after 1s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 8s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 5s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 6s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 3s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 3s
E2E Chat / detect-changes (pull_request) Successful in 9s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Successful in 4s
Lint forbidden tenant-env keys / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 8s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 9s
sop-checklist / all-items-acked (pull_request_target) Has been cancelled
sop-checklist / review-refire (pull_request_target) Has been cancelled
gate-check-v3 / gate-check (pull_request_target) Successful in 7s
sop-tier-check / tier-check (pull_request_target) Successful in 4s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 2s
CI / Canvas (Next.js) (pull_request) Successful in 2s
Harness Replays / Harness Replays (pull_request) Successful in 6s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 4s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 55s
E2E Chat / E2E Chat (pull_request) Successful in 2s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m6s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m3s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m12s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m11s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m14s
Postgres TEXT columns in a UTF-8 database reject raw bytes like 0x80 and
0xff. The test was trying to insert these into workspace_schedules.prompt
via insertSchedule, which failed with:

  pq: invalid byte sequence for encoding "UTF8": 0x80

Fix: insert a valid prompt into the DB fixture, then call fireSchedule
directly with a scheduleRow whose Prompt field carries the invalid bytes.
This still exercises the #2026 regression path (sanitizeUTF8 before jsonb
INSERT) without tripping Postgres TEXT validation.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-03 20:50:16 +00:00
Molecule AI Dev Engineer A (Kimi) b1178c968d test(scheduler): fix fixture enum drift — 'active' → 'online' (internal#795)
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 3s
CI / Python Lint & Test (pull_request) Successful in 3s
Harness Replays / detect-changes (pull_request) Successful in 4s
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Failing after 2s
Lint forbidden tenant-env keys / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 5s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 5s
CI / Detect changes (pull_request) Successful in 9s
E2E API Smoke Test / detect-changes (pull_request) Successful in 9s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Successful in 5s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 6s
Harness Replays / Harness Replays (pull_request) Successful in 2s
gate-check-v3 / gate-check (pull_request_target) Successful in 5s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 19s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 23s
E2E Chat / detect-changes (pull_request) Successful in 23s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 23s
sop-checklist / all-items-acked (pull_request_target) Has been cancelled
sop-checklist / review-refire (pull_request_target) Has been cancelled
CI / Shellcheck (E2E scripts) (pull_request) Successful in 23s
qa-review / approved (pull_request_target) Failing after 29s
CI / Canvas (Next.js) (pull_request) Successful in 31s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 57s
E2E Chat / E2E Chat (pull_request) Successful in 1s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 1s
security-review / approved (pull_request_target) Failing after 26s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
sop-tier-check / tier-check (pull_request_target) Successful in 19s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m4s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m17s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m14s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m22s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m40s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m47s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Failing after 1m43s
CI / Platform (Go) (pull_request) Successful in 3m55s
CI / all-required (pull_request) Successful in 1s
The workspace_status enum migrated away from 'active' in migration
043_workspace_status_enum.up.sql; valid values are provisioning/online/
offline/degraded/failed/removed/paused/hibernated/awaiting_agent/
hibernating. Inserting 'active' caused all five scheduler integration
tests to fail at fixture setup with:

  invalid input value for enum workspace_status: "active"

Fix: use 'online' (a valid enum member) for runnable fixture workspaces.
Also updates the helper comment to cite enum validity.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-03 20:21:31 +00:00
molecule-code-reviewer 5e4577cfe7 ci(handlers-pg): run scheduler real-PG integration tests (#2149)
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Failing after 1s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 3s
CI / Python Lint & Test (pull_request) Successful in 2s
CI / Detect changes (pull_request) Successful in 6s
Harness Replays / detect-changes (pull_request) Successful in 4s
E2E Chat / detect-changes (pull_request) Successful in 8s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 8s
E2E API Smoke Test / detect-changes (pull_request) Successful in 11s
Lint forbidden tenant-env keys / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 3s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 3s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 8s
sop-tier-check / tier-check (pull_request_target) Successful in 4s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 12s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Successful in 3s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 4s
qa-review / approved (pull_request_target) Failing after 5s
security-review / approved (pull_request_target) Failing after 4s
gate-check-v3 / gate-check (pull_request_target) Successful in 4s
sop-checklist / all-items-acked (pull_request) acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4 — body-unfilled: comprehensive-testing, local-postgres-e2
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / review-refire (pull_request_target) Has been skipped
sop-checklist / all-items-acked (pull_request_target) Successful in 4s
CI / Canvas (Next.js) (pull_request) Successful in 1s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 1s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 3s
Harness Replays / Harness Replays (pull_request) Successful in 2s
E2E Chat / E2E Chat (pull_request) Successful in 12s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 51s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Failing after 1m0s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 58s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 53s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m39s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m46s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m47s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Failing after 2m1s
CI / Platform (Go) (pull_request) Successful in 4m13s
CI / all-required (pull_request) Successful in 4s
2026-06-03 00:39:42 +00:00
molecule-code-reviewer 7d1cf6cc35 ci(detect-changes): trigger handlers-postgres profile on scheduler pkg (#2149) 2026-06-03 00:39:41 +00:00
molecule-code-reviewer 25f601f8f8 test(scheduler): real-PG regression tests for cron firing loop (#2149)
Closes #2149
2026-06-03 00:39:31 +00:00
38 changed files with 946 additions and 2482 deletions
+78 -29
View File
@@ -8,7 +8,8 @@ pair diverges.
Sources:
A. `.gitea/workflows/ci.yml` jobs (CI source — the actual job set)
B. `status_check_contexts` in branch_protections (the merge gate)
C. `REQUIRED_CHECKS` env in audit-force-merge.yml (the audit env)
C. `REQUIRED_CHECKS_JSON` (preferred) or `REQUIRED_CHECKS` (legacy)
env in audit-force-merge.yml (the audit env)
Three failure classes:
F1 Job in (A) is not under the sentinel's `needs:` — sentinel
@@ -250,13 +251,21 @@ def sentinel_needs(ci_doc: dict) -> set[str]:
return set(needs)
def required_checks_env(audit_doc: dict) -> set[str]:
"""Pull the REQUIRED_CHECKS env value from audit-force-merge.yml.
def required_checks_env(audit_doc: dict, branch: str) -> set[str]:
"""Pull the required-checks env value from audit-force-merge.yml.
Walks the YAML AST per `feedback_behavior_based_ast_gates`: we do
NOT grep for `REQUIRED_CHECKS:` — that breaks under reformatting,
NOT grep for env keys — that breaks under reformatting,
multi-job workflows, or a future move of the env to a different
step. Instead, look inside every job's every step's `env:` map."""
found: list[str] = []
step. Instead, look inside every job's every step's `env:` map.
Supports two variants:
- REQUIRED_CHECKS_JSON (preferred): JSON dict keyed by branch name.
We extract the array for the target branch.
- REQUIRED_CHECKS (legacy): newline-separated list of context names.
"""
found_json: list[str] = []
found_legacy: list[str] = []
jobs = audit_doc.get("jobs", {})
if not isinstance(jobs, dict):
sys.stderr.write(f"::warning::{AUDIT_WORKFLOW_PATH} has no jobs: mapping\n")
@@ -268,27 +277,67 @@ def required_checks_env(audit_doc: dict) -> set[str]:
if not isinstance(step, dict):
continue
step_env = step.get("env") or {}
if isinstance(step_env, dict) and "REQUIRED_CHECKS" in step_env:
v = step_env["REQUIRED_CHECKS"]
if isinstance(v, str):
found.append(v)
if not found:
sys.stderr.write(
f"::error::REQUIRED_CHECKS env not found in any step of "
f"{AUDIT_WORKFLOW_PATH}\n"
)
sys.exit(3)
if len(found) > 1:
# Defensive: refuse to guess which one is canonical.
sys.stderr.write(
f"::error::REQUIRED_CHECKS env present in {len(found)} steps; ambiguous\n"
)
sys.exit(3)
raw = found[0]
# YAML block-scalars (`|`) leave a trailing newline + blanks; trim
# consistently with audit-force-merge.sh's parser so both sides
# produce identical sets.
return {line.strip() for line in raw.splitlines() if line.strip()}
if isinstance(step_env, dict):
if "REQUIRED_CHECKS_JSON" in step_env:
v = step_env["REQUIRED_CHECKS_JSON"]
if isinstance(v, str):
found_json.append(v)
if "REQUIRED_CHECKS" in step_env:
v = step_env["REQUIRED_CHECKS"]
if isinstance(v, str):
found_legacy.append(v)
# JSON variant takes precedence.
if found_json:
if len(found_json) > 1:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON env present in {len(found_json)} steps; ambiguous\n"
)
sys.exit(3)
try:
parsed = json.loads(found_json[0])
except json.JSONDecodeError as e:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON is not valid JSON: {e}\n"
)
sys.exit(3)
if not isinstance(parsed, dict):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON parsed to {type(parsed).__name__}, expected dict\n"
)
sys.exit(3)
branch_checks = parsed.get(branch)
if branch_checks is None:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON has no entry for branch '{branch}'\n"
)
sys.exit(3)
if not isinstance(branch_checks, list):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'] is {type(branch_checks).__name__}, expected list\n"
)
sys.exit(3)
return {str(item).strip() for item in branch_checks if str(item).strip()}
# Legacy variant fallback.
if found_legacy:
if len(found_legacy) > 1:
# Defensive: refuse to guess which one is canonical.
sys.stderr.write(
f"::error::REQUIRED_CHECKS env present in {len(found_legacy)} steps; ambiguous\n"
)
sys.exit(3)
raw = found_legacy[0]
# YAML block-scalars (`|`) leave a trailing newline + blanks; trim
# consistently with audit-force-merge.sh's parser so both sides
# produce identical sets.
return {line.strip() for line in raw.splitlines() if line.strip()}
sys.stderr.write(
f"::error::Neither REQUIRED_CHECKS_JSON nor REQUIRED_CHECKS env found in any step of "
f"{AUDIT_WORKFLOW_PATH}\n"
)
sys.exit(3)
# --------------------------------------------------------------------------
@@ -330,7 +379,7 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
jobs = ci_job_names(ci_doc)
jobs_all = ci_jobs_all(ci_doc)
needs = sentinel_needs(ci_doc)
env_set = required_checks_env(audit_doc)
env_set = required_checks_env(audit_doc, branch)
# Protection
# api() raises ApiError on non-2xx. Transient 5xx should fail loud.
@@ -524,7 +573,7 @@ def render_body(branch: str, findings: list[str], debug: dict) -> str:
"- **F2**: rename the protection context to match an emitter, "
"or remove it from `status_check_contexts` "
"(PATCH `/api/v1/repos/{owner}/{repo}/branch_protections/{branch}`).",
"- **F3a / F3b**: bring `REQUIRED_CHECKS` env in "
"- **F3a / F3b**: bring `REQUIRED_CHECKS_JSON` (or `REQUIRED_CHECKS` legacy) env in "
"`.gitea/workflows/audit-force-merge.yml` into set-equality with "
"`status_check_contexts` (single PR, both files).",
"",
+5
View File
@@ -26,6 +26,10 @@ PROFILES: dict[str, dict[str, str]] = {
"handlers": (
r"^workspace-server/internal/handlers/"
r"|^workspace-server/internal/wsauth/"
# #2149: the scheduler real-PG integration tests run in this same
# workflow (they reuse its migrated Postgres), so changes to the
# scheduler package must trigger the job too.
r"|^workspace-server/internal/scheduler/"
r"|^workspace-server/migrations/"
r"|^\.gitea/workflows/handlers-postgres-integration\.yml$"
),
@@ -174,3 +178,4 @@ def main(argv: list[str]) -> int:
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
+4 -10
View File
@@ -13,26 +13,20 @@ set -euo pipefail
OWNER="${REPO%%/*}"
NAME="${REPO##*/}"
API="https://${GITEA_HOST}/api/v1"
# Branch-protection requires the (pull_request_target) context variant.
# The refire path must post the EXACT BP-required name so the gate flips.
CONTEXT="${TEAM}-review / approved (pull_request_target)"
CONTEXT="${TEAM}-review / approved (pull_request)"
TARGET_URL="https://${GITEA_HOST}/${OWNER}/${NAME}/pulls/${PR_NUMBER}"
authfile=$(mktemp)
post_authfile=$(mktemp)
prfile=$(mktemp)
postfile=$(mktemp)
# shellcheck disable=SC2329 # invoked by EXIT trap
cleanup() {
rm -f "$authfile" "$post_authfile" "$prfile" "$postfile"
rm -f "$authfile" "$prfile" "$postfile"
}
trap cleanup EXIT
chmod 600 "$authfile" "$post_authfile"
chmod 600 "$authfile"
printf 'header = "Authorization: token %s"\n' "$GITEA_TOKEN" > "$authfile"
# STATUS_POST_TOKEN is narrow-scoped write:repository for explicit status POST.
# Falls back to GITEA_TOKEN for backward compatibility (e.g. local test).
printf 'header = "Authorization: token %s"\n' "${STATUS_POST_TOKEN:-$GITEA_TOKEN}" > "$post_authfile"
code=$(curl -sS -o "$prfile" -w '%{http_code}' -K "$authfile" \
"${API}/repos/${OWNER}/${NAME}/pulls/${PR_NUMBER}")
@@ -74,7 +68,7 @@ body=$(jq -nc \
'{state:$state, context:$context, description:$description, target_url:$target_url}')
code=$(curl -sS -o "$postfile" -w '%{http_code}' -X POST \
-K "$post_authfile" -H "Content-Type: application/json" \
-K "$authfile" -H "Content-Type: application/json" \
-d "$body" \
"${API}/repos/${OWNER}/${NAME}/statuses/${head_sha}")
if [ "$code" != "200" ] && [ "$code" != "201" ]; then
@@ -1,4 +1,5 @@
import importlib.util
import json
import sys
from pathlib import Path
from unittest.mock import patch
@@ -36,6 +37,76 @@ def _make_audit_doc(required_checks: list[str]) -> dict:
}
def _make_audit_doc_json(required_checks_json: dict) -> dict:
return {
"jobs": {
"audit": {
"steps": [
{"env": {"REQUIRED_CHECKS_JSON": json.dumps(required_checks_json)}}
]
}
}
}
# ---------------------------------------------------------------------------
# required_checks_env — dual-variant parsing
# ---------------------------------------------------------------------------
def test_required_checks_env_prefers_json_over_legacy():
doc = {
"jobs": {
"audit": {
"steps": [
{
"env": {
"REQUIRED_CHECKS_JSON": json.dumps(
{"main": ["ctx-a"], "staging": ["ctx-b"]}
),
"REQUIRED_CHECKS": "ctx-legacy\nctx-old",
}
}
]
}
}
}
assert drift.required_checks_env(doc, "main") == {"ctx-a"}
assert drift.required_checks_env(doc, "staging") == {"ctx-b"}
def test_required_checks_env_falls_back_to_legacy():
doc = _make_audit_doc(["legacy-ctx"])
assert drift.required_checks_env(doc, "main") == {"legacy-ctx"}
def test_required_checks_env_json_missing_branch_fails():
doc = _make_audit_doc_json({"staging": ["ctx-b"]})
try:
drift.required_checks_env(doc, "main")
except SystemExit as exc:
assert exc.code == 3
else:
raise AssertionError("expected SystemExit(3)")
def test_required_checks_env_json_malformed_fails():
doc = {
"jobs": {
"audit": {
"steps": [
{"env": {"REQUIRED_CHECKS_JSON": "not-json"}}
]
}
}
}
try:
drift.required_checks_env(doc, "main")
except SystemExit as exc:
assert exc.code == 3
else:
raise AssertionError("expected SystemExit(3)")
# ---------------------------------------------------------------------------
# sentinel_needs
# ---------------------------------------------------------------------------
@@ -1,168 +0,0 @@
"""Regression test #765 — gate auto-fire on real qa/security APPROVED review.
Validates the structural configuration of qa-review.yml and security-review.yml
so that a real team-member APPROVED review fires the workflow and POSTs the
exact branch-protection-required context name. This is the test #2020's
stale-context failure would have caught.
"""
from pathlib import Path
import yaml
ROOT = Path(__file__).resolve().parents[2]
def load_workflow(name: str) -> dict:
with (ROOT / "workflows" / name).open() as f:
return yaml.safe_load(f)
def _job_guard_string(workflow: dict) -> str:
"""Return the raw job-level `if:` string for the single job."""
jobs = workflow["jobs"]
# Both qa-review and security-review have exactly one job named "approved".
job = jobs["approved"]
return str(job.get("if", ""))
def _post_step(workflow: dict) -> dict:
"""Return the explicit POST /statuses step from the job steps list."""
jobs = workflow["jobs"]
steps = jobs["approved"]["steps"]
for step in steps:
name = step.get("name", "")
if "Post required status context" in name:
return step
raise AssertionError("No explicit POST status step found")
class TestQaReviewDirectTrigger:
def test_trigger_is_pull_request_review_submitted(self):
wf = load_workflow("qa-review.yml")
# PyYAML parses bare 'on' as boolean True.
on = wf[True]
assert "pull_request_review" in on, (
"qa-review must trigger on pull_request_review"
)
types = on["pull_request_review"].get("types", [])
assert "submitted" in types, (
"pull_request_review must include 'submitted' type"
)
def test_job_guard_requires_approved_state(self):
wf = load_workflow("qa-review.yml")
guard = _job_guard_string(wf)
assert "github.event.review.state == 'APPROVED'" in guard, (
"job guard must check review.state for 'APPROVED'"
)
assert "github.event.review.state == 'approved'" in guard, (
"job guard must check review.state for 'approved' (case fallback per #2135)"
)
def test_post_step_uses_status_post_token(self):
wf = load_workflow("qa-review.yml")
post = _post_step(wf)
env = post.get("env", {})
assert env.get("GITEA_TOKEN") == "${{ secrets.STATUS_POST_TOKEN }}", (
"POST step must use STATUS_POST_TOKEN for write-scoped status POST"
)
def test_post_step_context_name_exact(self):
"""The context POSTed must byte-match the branch-protection requirement."""
wf = load_workflow("qa-review.yml")
post = _post_step(wf)
run = post.get("run", "")
assert '"qa-review / approved (pull_request_target)"' in run, (
"POST step must emit exact BP-required context name"
)
class TestSecurityReviewDirectTrigger:
def test_trigger_is_pull_request_review_submitted(self):
wf = load_workflow("security-review.yml")
# PyYAML parses bare 'on' as boolean True.
on = wf[True]
assert "pull_request_review" in on, (
"security-review must trigger on pull_request_review"
)
types = on["pull_request_review"].get("types", [])
assert "submitted" in types, (
"pull_request_review must include 'submitted' type"
)
def test_job_guard_requires_approved_state(self):
wf = load_workflow("security-review.yml")
guard = _job_guard_string(wf)
assert "github.event.review.state == 'APPROVED'" in guard, (
"job guard must check review.state for 'APPROVED'"
)
assert "github.event.review.state == 'approved'" in guard, (
"job guard must check review.state for 'approved' (case fallback per #2135)"
)
def test_post_step_uses_status_post_token(self):
wf = load_workflow("security-review.yml")
post = _post_step(wf)
env = post.get("env", {})
assert env.get("GITEA_TOKEN") == "${{ secrets.STATUS_POST_TOKEN }}", (
"POST step must use STATUS_POST_TOKEN for write-scoped status POST"
)
def test_post_step_context_name_exact(self):
"""The context POSTed must byte-match the branch-protection requirement."""
wf = load_workflow("security-review.yml")
post = _post_step(wf)
run = post.get("run", "")
assert '"security-review / approved (pull_request_target)"' in run, (
"POST step must emit exact BP-required context name"
)
class TestRefireScriptContextName:
"""review-refire-status.sh must emit the BP-required (pull_request_target) context."""
def test_refire_script_context_is_pull_request_target(self):
script = ROOT / "scripts" / "review-refire-status.sh"
content = script.read_text()
assert 'CONTEXT="${TEAM}-review / approved (pull_request_target)"' in content, (
"refire script CONTEXT must be the exact BP-required (pull_request_target) variant"
)
assert 'approved (pull_request)"' not in content, (
"refire script must NOT post bare (pull_request) context"
)
class TestRefireTokenSeparation:
"""The /qa-recheck + /security-recheck backstop must also use STATUS_POST_TOKEN."""
def _refire_step(self, workflow_name: str, step_name_keyword: str) -> dict:
wf = load_workflow(workflow_name)
jobs = wf["jobs"]
steps = jobs["review-refire"]["steps"]
for step in steps:
name = step.get("name", "")
if step_name_keyword in name:
return step
raise AssertionError(f"No refire step matching {step_name_keyword!r}")
def test_qa_refire_uses_status_post_token(self):
step = self._refire_step("sop-checklist.yml", "Refire qa-review")
env = step.get("env", {})
assert env.get("STATUS_POST_TOKEN") == "${{ secrets.STATUS_POST_TOKEN }}", (
"qa refire must receive STATUS_POST_TOKEN env var"
)
# Evaluator stays on read token
assert "SOP_TIER_CHECK_TOKEN" in env.get("GITEA_TOKEN", "") or "GITHUB_TOKEN" in env.get("GITEA_TOKEN", ""), (
"qa refire evaluator must stay on read-scoped token"
)
def test_security_refire_uses_status_post_token(self):
step = self._refire_step("sop-checklist.yml", "Refire security-review")
env = step.get("env", {})
assert env.get("STATUS_POST_TOKEN") == "${{ secrets.STATUS_POST_TOKEN }}", (
"security refire must receive STATUS_POST_TOKEN env var"
)
assert "SOP_TIER_CHECK_TOKEN" in env.get("GITEA_TOKEN", "") or "GITHUB_TOKEN" in env.get("GITEA_TOKEN", ""), (
"security refire evaluator must stay on read-scoped token"
)
@@ -241,7 +241,8 @@ jobs:
# MUST exist for the integration tests to be meaningful. Hard-
# fail if any didn't land — that would be a real regression we
# want loud.
for tbl in delegations workspaces activity_logs pending_uploads; do
# workspace_schedules added for the #2149 scheduler integration tests.
for tbl in delegations workspaces activity_logs pending_uploads workspace_schedules; do
if ! psql -h "${PG_HOST}" -U postgres -d molecule -tA \
-c "SELECT 1 FROM information_schema.tables WHERE table_name = '$tbl'" \
| grep -q 1; then
@@ -259,6 +260,16 @@ jobs:
# workflow runs don't fight over a host-net 5432 port.
go test -tags=integration -timeout 5m -v ./internal/handlers/ -run "^TestIntegration_"
- if: needs.detect-changes.outputs.handlers == 'true'
name: Run scheduler integration tests (#2149)
run: |
# #2149: real-PG regression coverage for the scheduler firing loop
# (tick → A2A fire → write-back of last_run_at/next_run_at/run_count/
# activity_logs jsonb incl. invalid-UTF-8 sanitization + sweepPhantomBusy).
# Reuses the same migrated Postgres (workspace_schedules / activity_logs
# / workspaces all landed by the migration replay step above).
go test -tags=integration -timeout 5m -v ./internal/scheduler/ -run "^TestIntegration_"
- if: failure() && needs.detect-changes.outputs.handlers == 'true'
name: Diagnostic dump on failure
env:
+18 -20
View File
@@ -9,19 +9,19 @@
# Triggers on:
# - `pull_request_target`: opened, synchronize, reopened
# → initial status posts when PR opens / re-pushes
# - `pull_request_review` types: [submitted]
# - `pull_request_review_approved`
# → re-evaluate when a team member submits an APPROVE review so
# the gate flips immediately (no wait for the next push or
# slash-command). Verified live: sop-tier-check.yml uses this
# same event and provably fires (produces
# `sop-tier-check / tier-check (pull_request_review)` contexts).
# The job-level `if:` guard checks
# `github.event.review.state == 'APPROVED' || 'approved'` so
# only APPROVE reviews run the evaluator; COMMENT and
# REQUEST_CHANGES are skipped at the job level.
# Branch-protection requires the `(pull_request_target)`
# context variant, so the review-event path EXPLICITLY POSTS
# the required context via the API. Trust boundary preserved
# slash-command). Gitea Actions uses the specific event name
# `pull_request_review_approved` (not the GitHub-style
# `pull_request_review` catch-all). Verified via Gitea source
# code audit (go-gitea/gitea main, modules/webhook/type.go +
# services/actions/notifier.go). The event payload carries
# `review.type="pull_request_review_approved"`; there is no
# `review.state` field. Branch-protection requires the
# `(pull_request_target)` context variant, so the
# pull_request_review_approved path EXPLICITLY POSTS the
# required context via the API. Trust boundary preserved
# (BASE ref, no PR-head).
# - comment refires are handled by `sop-checklist.yml` review-refire job
# → `/qa-recheck` slash-command re-evaluates this gate.
@@ -97,8 +97,7 @@ name: qa-review
on:
pull_request_target:
types: [opened, synchronize, reopened]
pull_request_review:
types: [submitted]
pull_request_review_approved:
permissions:
contents: read
@@ -115,8 +114,7 @@ jobs:
# Comment-triggered refires live in sop-checklist.yml review-refire job.
if: |
github.event_name == 'pull_request_target' ||
(github.event_name == 'pull_request_review' &&
(github.event.review.state == 'APPROVED' || github.event.review.state == 'approved'))
github.event_name == 'pull_request_review_approved'
runs-on: ubuntu-latest
steps:
- name: Privilege check (A1.1 — INFORMATIONAL log only, NOT a gate)
@@ -176,8 +174,8 @@ jobs:
REVIEW_CHECK_STRICT: '0'
run: bash .gitea/scripts/review-check.sh
- name: Post required status context on pull_request_review
# Gitea Actions auto-publishes (pull_request_review) context
- name: Post required status context on pull_request_review_approved
# Gitea Actions auto-publishes (pull_request_review_approved) context
# for this event, but branch-protection requires (pull_request_target).
# We explicitly POST the BP-required context so the gate flips.
# Trust boundary: same BASE-ref script result, no PR-head code.
@@ -187,7 +185,7 @@ jobs:
# for the explicit status POST. Evaluator step stays on
# SOP_TIER_CHECK_TOKEN (read-only) per deliberate security
# separation: eval computes, POST writes, never the same cred.
if: github.event_name == 'pull_request_review' && always()
if: github.event_name == 'pull_request_review_approved' && always()
env:
GITEA_TOKEN: ${{ secrets.STATUS_POST_TOKEN }}
GITEA_HOST: git.moleculesai.app
@@ -213,10 +211,10 @@ jobs:
if [ "$EVAL_OUTCOME" = "success" ]; then
status_state="success"
description="Approved via pull_request_review trigger"
description="Approved via pull_request_review_approved trigger"
else
status_state="failure"
description="Review check failed via pull_request_review trigger"
description="Review check failed via pull_request_review_approved trigger"
fi
body=$(jq -nc \
+18 -20
View File
@@ -7,25 +7,24 @@
# See `qa-review.yml` header for the full A1-α / A1.1 / A4 / A5 design
# rationale; everything below is identical in shape.
#
# A1-α addendum (internal#760): review-event trigger added so the security
# gate flips immediately when a team member submits an APPROVE review.
# Uses `pull_request_review` types: [submitted] — verified live via
# sop-tier-check.yml which provably fires this event (produces
# `sop-tier-check / tier-check (pull_request_review)` contexts).
# The job-level `if:` guard checks
# `github.event.review.state == 'APPROVED' || 'approved'` so only APPROVE
# reviews run the evaluator; COMMENT and REQUEST_CHANGES are skipped at
# the job level. Branch-protection requires the `(pull_request_target)`
# context variant, so the review-event path EXPLICITLY POSTS the required
# context via the API. Trust boundary preserved (BASE ref, no PR-head).
# A1-α addendum (internal#760): `pull_request_review_approved` trigger
# added so the security gate flips immediately when a team member submits
# an APPROVE review. Gitea Actions uses the specific event name
# `pull_request_review_approved` (not the GitHub-style `pull_request_review`
# catch-all). Verified via Gitea source code audit (go-gitea/gitea main,
# modules/webhook/type.go + services/actions/notifier.go). The event
# payload carries `review.type="pull_request_review_approved"`; there is
# no `review.state` field. Branch-protection requires the
# `(pull_request_target)` context variant, so the
# pull_request_review_approved path EXPLICITLY POSTS the required context
# via the API. Trust boundary preserved (BASE ref, no PR-head).
name: security-review
on:
pull_request_target:
types: [opened, synchronize, reopened]
pull_request_review:
types: [submitted]
pull_request_review_approved:
permissions:
contents: read
@@ -42,8 +41,7 @@ jobs:
# Comment-triggered refires live in sop-checklist.yml review-refire job.
if: |
github.event_name == 'pull_request_target' ||
(github.event_name == 'pull_request_review' &&
(github.event.review.state == 'APPROVED' || github.event.review.state == 'approved'))
github.event_name == 'pull_request_review_approved'
runs-on: ubuntu-latest
steps:
- name: Privilege check (A1.1 — INFORMATIONAL log only, NOT a gate)
@@ -89,8 +87,8 @@ jobs:
REVIEW_CHECK_STRICT: '0'
run: bash .gitea/scripts/review-check.sh
- name: Post required status context on pull_request_review
# Gitea Actions auto-publishes (pull_request_review) context
- name: Post required status context on pull_request_review_approved
# Gitea Actions auto-publishes (pull_request_review_approved) context
# for this event, but branch-protection requires (pull_request_target).
# We explicitly POST the BP-required context so the gate flips.
# Trust boundary: same BASE-ref script result, no PR-head code.
@@ -100,7 +98,7 @@ jobs:
# for the explicit status POST. Evaluator step stays on
# SOP_TIER_CHECK_TOKEN (read-only) per deliberate security
# separation: eval computes, POST writes, never the same cred.
if: github.event_name == 'pull_request_review' && always()
if: github.event_name == 'pull_request_review_approved' && always()
env:
GITEA_TOKEN: ${{ secrets.STATUS_POST_TOKEN }}
GITEA_HOST: git.moleculesai.app
@@ -126,10 +124,10 @@ jobs:
if [ "$EVAL_OUTCOME" = "success" ]; then
status_state="success"
description="Approved via pull_request_review trigger"
description="Approved via pull_request_review_approved trigger"
else
status_state="failure"
description="Review check failed via pull_request_review trigger"
description="Review check failed via pull_request_review_approved trigger"
fi
body=$(jq -nc \
+6 -6
View File
@@ -179,10 +179,10 @@ jobs:
- name: Refire qa-review status
if: steps.classify.outputs.run_qa == 'true'
env:
# Evaluator (review-check.sh + GET /pulls) stays on read-scoped token.
# RFC_324_TEAM_READ_TOKEN is read-only (team membership read scope only).
# review-refire-status.sh POSTs to /statuses — requires write scope.
# SOP_TIER_CHECK_TOKEN carries write:repository + write:issue + read:organization.
GITEA_TOKEN: ${{ secrets.SOP_TIER_CHECK_TOKEN || secrets.GITHUB_TOKEN }}
# Explicit POST /statuses uses narrow-scoped write:repository token.
STATUS_POST_TOKEN: ${{ secrets.STATUS_POST_TOKEN }}
GITEA_HOST: git.moleculesai.app
REPO: ${{ github.repository }}
PR_NUMBER: ${{ github.event.issue.number }}
@@ -198,10 +198,10 @@ jobs:
- name: Refire security-review status
if: steps.classify.outputs.run_security == 'true'
env:
# Evaluator (review-check.sh + GET /pulls) stays on read-scoped token.
# RFC_324_TEAM_READ_TOKEN is read-only (team membership read scope only).
# review-refire-status.sh POSTs to /statuses — requires write scope.
# SOP_TIER_CHECK_TOKEN carries write:repository + write:issue + read:organization.
GITEA_TOKEN: ${{ secrets.SOP_TIER_CHECK_TOKEN || secrets.GITHUB_TOKEN }}
# Explicit POST /statuses uses narrow-scoped write:repository token.
STATUS_POST_TOKEN: ${{ secrets.STATUS_POST_TOKEN }}
GITEA_HOST: git.moleculesai.app
REPO: ${{ github.repository }}
PR_NUMBER: ${{ github.event.issue.number }}
+127
View File
@@ -18,6 +18,7 @@ No network. No live Gitea calls.
from __future__ import annotations
import importlib.util
import json
import os
import textwrap
from pathlib import Path
@@ -117,6 +118,31 @@ def _write_audit_yaml(tmp_path: Path, required_checks: list[str]) -> Path:
return p
def _write_audit_yaml_json(tmp_path: Path, required_checks_json: dict) -> Path:
"""Write a synthetic audit-force-merge.yml with REQUIRED_CHECKS_JSON env."""
block = json.dumps(required_checks_json, indent=2)
text = textwrap.dedent(
f"""\
name: audit-force-merge
on:
schedule:
- cron: '*/30 * * * *'
jobs:
audit:
runs-on: ubuntu-latest
steps:
- name: Run audit
env:
REQUIRED_CHECKS_JSON: |
{block.replace(chr(10), chr(10) + ' ')}
run: bash .gitea/scripts/audit-force-merge.sh
"""
)
p = tmp_path / "audit-force-merge.yml"
p.write_text(text, encoding="utf-8")
return p
def _make_stub_api(responses: dict):
"""Build a fake `api()` callable.
@@ -363,6 +389,107 @@ def test_happy_path_no_drift(drift_module, tmp_path, monkeypatch):
assert findings == [], findings
# --------------------------------------------------------------------------
# REQUIRED_CHECKS_JSON variant drift tests
# --------------------------------------------------------------------------
def test_f3a_env_wider_than_protection_json_variant(drift_module, tmp_path, monkeypatch):
"""F3a: REQUIRED_CHECKS_JSON env has a context NOT in protection."""
ci = _write_ci_yaml(
tmp_path,
jobs={"build": {"runs-on": "ubuntu-latest"}},
sentinel_needs=["build"],
)
audit = _write_audit_yaml_json(
tmp_path,
{"main": ["ci / build (pull_request)", "ci / ghost (pull_request)"]},
)
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{"status_check_contexts": ["ci / build (pull_request)"]},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert any("F3a" in f and "ghost" in f for f in findings), findings
def test_f3b_protection_wider_than_env_json_variant(drift_module, tmp_path, monkeypatch):
"""F3b: protection has a context NOT in REQUIRED_CHECKS_JSON env."""
ci = _write_ci_yaml(
tmp_path,
jobs={
"build": {"runs-on": "ubuntu-latest"},
"test": {"runs-on": "ubuntu-latest"},
},
sentinel_needs=["build", "test"],
)
audit = _write_audit_yaml_json(
tmp_path,
{"main": ["ci / build (pull_request)"]},
)
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"ci / build (pull_request)",
"ci / test (pull_request)",
]
},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert any("F3b" in f and "ci / test (pull_request)" in f for f in findings), findings
def test_happy_path_no_drift_json_variant(drift_module, tmp_path, monkeypatch):
"""Happy path with REQUIRED_CHECKS_JSON: all aligned."""
ci = _write_ci_yaml(
tmp_path,
jobs={
"build": {"runs-on": "ubuntu-latest"},
"test": {"runs-on": "ubuntu-latest"},
},
sentinel_needs=["build", "test"],
)
audit = _write_audit_yaml_json(
tmp_path,
{
"main": [
"ci / build (pull_request)",
"ci / test (pull_request)",
"ci / all-required (pull_request)",
]
},
)
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"ci / build (pull_request)",
"ci / test (pull_request)",
"ci / all-required (pull_request)",
]
},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert findings == [], findings
# --------------------------------------------------------------------------
# MUST-FIX 1: find_open_issue must raise on transient HTTP errors
# --------------------------------------------------------------------------
-11
View File
@@ -349,17 +349,6 @@ func main() {
codexauth.StartCodexAuthRefresher(c, db.DB)
})
// RFC internal#742 Part 2: wire the boot-failure rescue capture into
// the provision-timeout sweep's failure verdict. When the sweep flips
// a stuck workspace to `failed`, this hook captures a forensic rescue
// bundle off the still-running (but boot-failed) EC2 and ships it to
// obs/Loki before the control plane reaps the instance. Best-effort +
// non-blocking (handlers.BootFailureRescueHook dispatches on its own
// goroutine + timeout). The handler-side boot-failure path
// (WorkspaceHandler.BootstrapFailed) wires its own capture inline.
registry.BootFailureRescueHook = handlers.BootFailureRescueHook
// Provision-timeout sweep — flips workspaces that have been stuck in
// status='provisioning' past the timeout window to 'failed' and emits
// WORKSPACE_PROVISION_TIMEOUT. Without this the UI banner is cosmetic
@@ -246,6 +246,20 @@ func MarkQueueItemFailed(ctx context.Context, id, errMsg string) {
}
}
// QueueDepth returns the number of currently-queued (not dispatched/completed)
// items for a workspace. Used by the busy-return response body so callers
// can see how many ahead of them.
func QueueDepth(ctx context.Context, workspaceID string) int {
var n int
if err := db.DB.QueryRowContext(ctx,
`SELECT COUNT(*) FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued'`,
workspaceID,
).Scan(&n); err != nil {
log.Printf("A2AQueue: QueueDepth query failed for workspace %s: %v", workspaceID, err)
}
return n
}
// DropStaleQueueItems marks queued items older than maxAge as 'dropped' with a
// system-generated reason so PM agents stop processing stale post-incident noise.
// Called with a workspaceID to scope cleanup to one workspace, or empty to sweep
@@ -372,78 +372,3 @@ func TestApplyPlatformManagedLLMEnv_WorkspaceOriginCredExemptFromStrip(t *testin
t.Errorf("sqlmock expectations: %v", err)
}
}
// TestApplyPlatformManagedLLMEnv_MissingProxyEnvFailClosed is the #2162
// regression guard. A platform-managed workspace whose CP proxy env is absent
// must NOT start credential-less. The empty-proxy path must return
// HasUsableLLMCred=false so the caller aborts with MISSING_PLATFORM_PROXY.
//
// Mutation: revert the early-return from HasUsableLLMCred=false to true
// → workspace starts with zero credential → "container started but never
// called /registry/register" (600s provision-timeout sweep) → this test RED.
func TestApplyPlatformManagedLLMEnv_MissingProxyEnvFailClosed(t *testing.T) {
ctx := context.Background()
const wsID = "29b95be9-811e-4857-be36-1dafdbf4f697" // adk-demo failure workspace
mock := setupTestDB(t)
expectOverrideQuery(mock, wsID, "")
// No proxy env present — simulates the boot-race / misconfig path.
envVars := map[string]string{}
res := applyPlatformManagedLLMEnv(ctx, envVars, wsID, "claude-code", "moonshot/kimi-k2.6", nil)
if res.ResolvedMode != LLMBillingModePlatformManaged {
t.Fatalf("platform-managed model must stay platform_managed, got %q (source=%s)", res.ResolvedMode, res.Source)
}
// THE FIX: must NOT report usable credential when none was injected.
if res.HasUsableLLMCred {
t.Fatalf("empty proxy env → HasUsableLLMCred must be false (fail-closed), got true — the #2162 dark-wedge class")
}
// No credential env must be present.
if _, present := envVars["ANTHROPIC_API_KEY"]; present {
t.Errorf("empty proxy env must NOT inject ANTHROPIC_API_KEY")
}
if _, present := envVars["MOLECULE_LLM_USAGE_TOKEN"]; present {
t.Errorf("empty proxy env must NOT inject MOLECULE_LLM_USAGE_TOKEN")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
// TestApplyPlatformManagedLLMEnv_ProxyEnvPresentInjectsCredential is the
// positive-path pair to the #2162 regression guard: when the CP proxy env IS
// present, the platform-managed path must inject ANTHROPIC_API_KEY +
// ANTHROPIC_BASE_URL for an Anthropic-native runtime and report
// HasUsableLLMCred=true.
func TestApplyPlatformManagedLLMEnv_ProxyEnvPresentInjectsCredential(t *testing.T) {
ctx := context.Background()
const wsID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
mock := setupTestDB(t)
expectOverrideQuery(mock, wsID, "")
envVars := map[string]string{}
// Simulate the CP proxy env being present (as it is in production).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.moleculesai.app/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "https://api.moleculesai.app/api/v1/internal/llm/anthropic/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "PLATFORM-PROXY-TOKEN")
res := applyPlatformManagedLLMEnv(ctx, envVars, wsID, "claude-code", "moonshot/kimi-k2.6", nil)
if res.ResolvedMode != LLMBillingModePlatformManaged {
t.Fatalf("expected platform_managed, got %q", res.ResolvedMode)
}
if !res.HasUsableLLMCred {
t.Fatalf("proxy env present → HasUsableLLMCred must be true, got false")
}
if envVars["ANTHROPIC_API_KEY"] != "PLATFORM-PROXY-TOKEN" {
t.Errorf("ANTHROPIC_API_KEY must be injected with the platform proxy token; got %q", envVars["ANTHROPIC_API_KEY"])
}
if envVars["ANTHROPIC_BASE_URL"] != "https://api.moleculesai.app/api/v1/internal/llm/anthropic/v1" {
t.Errorf("ANTHROPIC_BASE_URL must be injected with the platform anthropic proxy; got %q", envVars["ANTHROPIC_BASE_URL"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
@@ -1,168 +0,0 @@
package handlers
// rescue_read.go — GET /workspaces/:id/rescue (RFC internal#742 Part 3).
//
// Serves the LATEST post-mortem rescue bundle captured for a
// boot-failed/terminated workspace, so "why won't my agent boot" is
// answerable WITHOUT a live instance. Powers the future canvas
// "Why did this fail?" panel.
//
// Read-path: the bundle is read from the queryable rescue_bundles table
// (internal/rescuestore), NOT from obs/Loki. Part 2 ships the bundle via
// internal/audit (Loki-only); reading from Loki would require obs read
// creds the tenant deliberately lacks. Part 3 persists the
// already-redacted bundle on capture and serves it here — see the
// migration header for the full rationale.
//
// Auth/scoping: registered on the WorkspaceAuth-guarded /workspaces/:id
// group (same gate as /files/* and /exec), so the caller must hold a
// valid per-workspace or org bearer token for :id. TenantGuard already
// 404s cross-org requests at the routing layer; on top of that the store
// read is org-scoped by MOLECULE_ORG_ID, so a row written under a
// different org is never returned (defense in depth).
//
// Redaction: the stored sections were already scrubbed at capture time
// (Part 2's SAFE-T1201 secret-scan). This handler returns them verbatim
// — it never re-ships or re-derives secrets.
import (
"log"
"net/http"
"os"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescuestore"
"github.com/gin-gonic/gin"
)
// maxResponseSections bounds how many sections the read response
// returns. The fixed capture set is small (6), so this is a backstop
// against a future capture set growth or a hand-written row — keeps the
// JSON response bounded regardless of what's stored. Per-section content
// is already clamped at persist time (rescuestore.maxSectionBytes).
const maxResponseSections = 64
// RescueReadHandler serves GET /workspaces/:id/rescue. The store is
// injected so tests fake it; production wires a Postgres store over
// db.DB (see NewRescueReadHandler).
type RescueReadHandler struct {
store rescuestore.Store
}
// NewRescueReadHandler builds the handler over the package db.DB. db.DB
// is nil in some unit-test binaries; the handler tolerates that by
// returning 503 rather than nil-deref (the store guards nil db).
func NewRescueReadHandler() *RescueReadHandler {
return &RescueReadHandler{store: rescuestore.NewPostgres(db.DB)}
}
// WithStore overrides the store (test seam). Returns the handler for
// chaining.
func (h *RescueReadHandler) WithStore(s rescuestore.Store) *RescueReadHandler {
h.store = s
return h
}
// rescueSection is one labelled chunk in the read response.
type rescueSection struct {
Name string `json:"name"`
Content string `json:"content"`
Redacted bool `json:"redacted"`
}
// rescueReadResponse is the JSON shape returned for a found bundle.
// `sections` is an ordered array (capture reading order), not a map, so
// the order config→logs→state→env is preserved for the canvas panel.
type rescueReadResponse struct {
WorkspaceID string `json:"workspace_id"`
CapturedAt time.Time `json:"captured_at"`
Reason string `json:"reason"`
InstanceID string `json:"instance_id"`
Sections []rescueSection `json:"sections"`
// Truncated is true when the stored bundle had more sections than
// maxResponseSections and the response was capped.
Truncated bool `json:"truncated,omitempty"`
}
// GetRescue handles GET /workspaces/:id/rescue.
//
// 200 — latest rescue bundle for the workspace (org-scoped).
// 404 — no rescue bundle on file for this workspace (or wrong org).
// 503 — store/datastore unavailable.
func (h *RescueReadHandler) GetRescue(c *gin.Context) {
workspaceID := c.Param("id")
ctx := c.Request.Context()
if h.store == nil {
log.Printf("GetRescue: store not configured for ws=%s", workspaceID)
c.JSON(http.StatusServiceUnavailable, gin.H{
"error": "rescue store unavailable",
"code": "platform_unavailable",
})
return
}
// org_id is the tenant's configured org (one tenant = one org).
// Fail closed: an empty org_id disables org isolation and must not
// reach the store (#2020).
orgID := os.Getenv("MOLECULE_ORG_ID")
if orgID == "" {
log.Printf("GetRescue: missing MOLECULE_ORG_ID for ws=%s", workspaceID)
c.JSON(http.StatusServiceUnavailable, gin.H{
"error": "rescue org not configured",
"code": "platform_misconfigured",
})
return
}
stored, err := h.store.GetLatest(ctx, workspaceID, orgID)
if err != nil {
// Per the Store contract a missing bundle is (nil, nil), NOT an
// error — so any error here is a genuine datastore fault → 503,
// never a masquerading 404 that would hide an outage.
log.Printf("GetRescue: store query failed for ws=%s: %v", workspaceID, err)
c.JSON(http.StatusServiceUnavailable, gin.H{
"error": "rescue store query failed",
"code": "platform_unavailable",
})
return
}
if stored == nil {
// No bundle captured (workspace never boot-failed, or its grace
// window lapsed). 404 — existence-non-inferring; a workspace in a
// sibling org reaches the same 404 via the org filter.
c.JSON(http.StatusNotFound, gin.H{"error": "no rescue bundle for this workspace"})
return
}
resp := buildRescueResponse(workspaceID, stored)
c.JSON(http.StatusOK, resp)
}
// buildRescueResponse maps a stored bundle to the read response, bounding
// the section count. Split out so the mapping/limit is unit-testable.
func buildRescueResponse(workspaceID string, stored *rescuestore.StoredBundle) rescueReadResponse {
secs := stored.Bundle.Sections
truncated := false
if len(secs) > maxResponseSections {
secs = secs[:maxResponseSections]
truncated = true
}
out := make([]rescueSection, 0, len(secs))
for _, s := range secs {
// rescue.Section and rescueSection are field-identical; the
// explicit conversion keeps the handler's JSON shape independent
// of the leaf package's struct (which could gain non-response
// fields later).
out = append(out, rescueSection(s))
}
return rescueReadResponse{
WorkspaceID: workspaceID,
CapturedAt: stored.CapturedAt,
Reason: stored.Bundle.Reason,
InstanceID: stored.Bundle.InstanceID,
Sections: out,
Truncated: truncated,
}
}
@@ -1,238 +0,0 @@
package handlers
// Tests for GET /workspaces/:id/rescue (RFC internal#742 Part 3).
//
// These exercise the handler against a FAKE store (no DB) so every path
// is deterministic without external infra:
// - returns the latest bundle in the documented shape
// - 404 when no bundle exists for the workspace
// - org-scoping: the handler passes the tenant's MOLECULE_ORG_ID to
// the store, so a fake that returns nil for a mismatched org proves a
// sibling org cannot read another org's bundle
// - 503 on a store/datastore error (not a 404 masquerade)
// - redaction/shape preserved: stored sections are returned verbatim,
// no re-derivation
//
// WorkspaceAuth gating itself is covered by the middleware tests; here we
// invoke the handler directly (the route is registered on the wsAuth
// group in router.go).
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescuestore"
"github.com/gin-gonic/gin"
)
func init() { gin.SetMode(gin.TestMode) }
// fakeRescueStore records the args it was called with and returns a
// scripted result. Implements rescuestore.Store.
type fakeRescueStore struct {
// gotWorkspaceID/gotOrgID capture what the handler passed.
gotWorkspaceID string
gotOrgID string
// ret/err are the scripted GetLatest result.
ret *rescuestore.StoredBundle
err error
}
func (f *fakeRescueStore) Persist(_ context.Context, _ rescue.Bundle) error { return nil }
func (f *fakeRescueStore) GetLatest(_ context.Context, workspaceID, orgID string) (*rescuestore.StoredBundle, error) {
f.gotWorkspaceID = workspaceID
f.gotOrgID = orgID
return f.ret, f.err
}
// doRescueGet runs the handler for ws against the given fake and returns
// the recorder. orgEnv sets MOLECULE_ORG_ID for the duration.
func doRescueGet(t *testing.T, ws, orgEnv string, fake *fakeRescueStore) *httptest.ResponseRecorder {
t.Helper()
t.Setenv("MOLECULE_ORG_ID", orgEnv)
h := (&RescueReadHandler{}).WithStore(fake)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: ws}}
c.Request = httptest.NewRequest("GET", "/workspaces/"+ws+"/rescue", nil)
h.GetRescue(c)
return w
}
// sampleStored builds a representative stored bundle with a redacted +
// a failure-marker section.
func sampleStored() *rescuestore.StoredBundle {
return &rescuestore.StoredBundle{
CapturedAt: time.Date(2026, 5, 31, 12, 0, 0, 0, time.UTC),
Bundle: rescue.Bundle{
WorkspaceID: "ws-1",
OrgID: "org-9",
InstanceID: "i-abc123",
Reason: "provision_timeout_sweep",
Sections: []rescue.Section{
{Name: "config.yaml", Content: "model: gpt-4\nANTHROPIC_API_KEY=[REDACTED]", Redacted: true},
{Name: "docker-ps", Content: "(rescue: section collection failed: ssh blip)", Redacted: false},
},
},
}
}
// TestGetRescue_ReturnsLatestBundle — happy path: 200 with the full
// documented shape, sections in order, redaction-preserved.
func TestGetRescue_ReturnsLatestBundle(t *testing.T) {
fake := &fakeRescueStore{ret: sampleStored()}
w := doRescueGet(t, "ws-1", "org-9", fake)
if w.Code != http.StatusOK {
t.Fatalf("status = %d, want 200; body=%s", w.Code, w.Body.String())
}
var resp struct {
WorkspaceID string `json:"workspace_id"`
CapturedAt time.Time `json:"captured_at"`
Reason string `json:"reason"`
InstanceID string `json:"instance_id"`
Sections []struct {
Name string `json:"name"`
Content string `json:"content"`
Redacted bool `json:"redacted"`
} `json:"sections"`
}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("decode: %v; body=%s", err, w.Body.String())
}
if resp.WorkspaceID != "ws-1" {
t.Errorf("workspace_id = %q, want ws-1", resp.WorkspaceID)
}
if resp.Reason != "provision_timeout_sweep" {
t.Errorf("reason = %q", resp.Reason)
}
if resp.InstanceID != "i-abc123" {
t.Errorf("instance_id = %q", resp.InstanceID)
}
if !resp.CapturedAt.Equal(time.Date(2026, 5, 31, 12, 0, 0, 0, time.UTC)) {
t.Errorf("captured_at = %v", resp.CapturedAt)
}
if len(resp.Sections) != 2 {
t.Fatalf("sections = %d, want 2", len(resp.Sections))
}
// Order preserved: config first, docker-ps second.
if resp.Sections[0].Name != "config.yaml" || resp.Sections[1].Name != "docker-ps" {
t.Errorf("section order wrong: %q, %q", resp.Sections[0].Name, resp.Sections[1].Name)
}
// Redaction-preserved: the redacted flag rides through untouched, and
// the failure marker stays a non-redacted marker.
if !resp.Sections[0].Redacted {
t.Error("config.yaml section should be redacted=true")
}
if resp.Sections[1].Redacted {
t.Error("failure-marker section should be redacted=false")
}
// Handler does NOT re-derive secrets; stored [REDACTED] verbatim.
if want := "ANTHROPIC_API_KEY=[REDACTED]"; !strings.Contains(resp.Sections[0].Content, want) {
t.Errorf("section content = %q, want it to contain %q", resp.Sections[0].Content, want)
}
}
// TestGetRescue_404WhenNone — no bundle on file → 404, not 500/200.
func TestGetRescue_404WhenNone(t *testing.T) {
fake := &fakeRescueStore{ret: nil} // store returns (nil, nil)
w := doRescueGet(t, "ws-none", "org-9", fake)
if w.Code != http.StatusNotFound {
t.Fatalf("status = %d, want 404; body=%s", w.Code, w.Body.String())
}
}
// TestGetRescue_OrgScopingPassedToStore — the handler must hand the
// tenant's MOLECULE_ORG_ID to the store, and a store that returns nil for
// a mismatched org yields 404. This is the sibling-org isolation: a
// caller in org B (a different tenant process, MOLECULE_ORG_ID=org-B)
// reading ws-1 (which belongs to org-9) gets the org filter applied → no
// row → 404.
func TestGetRescue_OrgScopingPassedToStore(t *testing.T) {
// Tenant configured as a DIFFERENT org than the bundle's owner.
// Fake mimics the Postgres org filter: returns nil because org-B
// doesn't match the row's org-9.
fake := &fakeRescueStore{ret: nil}
w := doRescueGet(t, "ws-1", "org-B", fake)
if fake.gotOrgID != "org-B" {
t.Errorf("store got org_id = %q, want the tenant's org-B", fake.gotOrgID)
}
if fake.gotWorkspaceID != "ws-1" {
t.Errorf("store got workspace_id = %q, want ws-1", fake.gotWorkspaceID)
}
if w.Code != http.StatusNotFound {
t.Fatalf("sibling-org read: status = %d, want 404", w.Code)
}
}
// TestGetRescue_EmptyOrgEnvRejected — empty MOLECULE_ORG_ID is a
// fail-closed security violation (#2020). The handler must 503 before
// calling the store, so the org filter cannot be bypassed.
func TestGetRescue_EmptyOrgEnvRejected(t *testing.T) {
fake := &fakeRescueStore{ret: sampleStored()}
w := doRescueGet(t, "ws-1", "", fake)
if fake.gotOrgID != "" {
t.Errorf("store was called with org_id = %q; want no call when env empty", fake.gotOrgID)
}
if w.Code != http.StatusServiceUnavailable {
t.Fatalf("status = %d, want 503; body=%s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "platform_misconfigured") {
t.Fatalf("body = %s, want platform_misconfigured code", w.Body.String())
}
}
// TestGetRescue_StoreErrorIs503 — an actual datastore error must surface
// as 503, never a 404 (which would hide an outage as "no bundle").
func TestGetRescue_StoreErrorIs503(t *testing.T) {
fake := &fakeRescueStore{err: errors.New("connection refused")}
w := doRescueGet(t, "ws-1", "org-9", fake)
if w.Code != http.StatusServiceUnavailable {
t.Fatalf("status = %d, want 503; body=%s", w.Code, w.Body.String())
}
}
// TestGetRescue_NilStoreIs503 — defensive: a handler with no store wired
// (db.DB nil in a degraded boot) returns 503, never panics.
func TestGetRescue_NilStoreIs503(t *testing.T) {
t.Setenv("MOLECULE_ORG_ID", "org-9")
h := &RescueReadHandler{} // store == nil
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/rescue", nil)
h.GetRescue(c)
if w.Code != http.StatusServiceUnavailable {
t.Fatalf("status = %d, want 503", w.Code)
}
}
// TestBuildRescueResponse_BoundsSections — a stored bundle with more than
// maxResponseSections sections is capped + flagged truncated.
func TestBuildRescueResponse_BoundsSections(t *testing.T) {
many := make([]rescue.Section, maxResponseSections+5)
for i := range many {
many[i] = rescue.Section{Name: "s", Content: "c", Redacted: true}
}
stored := &rescuestore.StoredBundle{
CapturedAt: time.Now(),
Bundle: rescue.Bundle{WorkspaceID: "ws-1", Sections: many},
}
resp := buildRescueResponse("ws-1", stored)
if len(resp.Sections) != maxResponseSections {
t.Errorf("sections = %d, want capped at %d", len(resp.Sections), maxResponseSections)
}
if !resp.Truncated {
t.Error("truncated flag should be set when sections were capped")
}
}
@@ -1,168 +0,0 @@
package handlers
// rescue_wiring.go — bridges the leaf internal/rescue package to the
// handlers package's EIC/SSH runner + secret redactor, and exposes the
// boot-failure rescue hook used by both boot-failure verdict paths
// (handlers.BootstrapFailed here, registry.sweepStuckProvisioning via
// an injected hook wired in main.go).
//
// Why the indirection: internal/rescue is a leaf so registry (which
// must NOT import handlers — that's an import cycle) can call it. The
// two heavy dependencies live here in handlers — `withEICTunnel`
// (the EIC keypair → push → tunnel → ssh dance) and `redactSecrets`
// (the SAFE-T1201 secret-scan) — so we inject them into rescue's
// package-level func vars at init().
//
// RFC internal#742 Part 2.
import (
"bytes"
"context"
"database/sql"
"fmt"
"os"
"os/exec"
"strings"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescuestore"
)
func init() {
// Wire the leaf rescue package to handlers' EIC runner + redactor.
// Done in init() (not main.go) so the binding is present for any
// caller of rescue.Capture, including the registry sweeper hook and
// the handler path, without each call site re-wiring it.
rescue.RunRemote = rescueRunRemoteViaEIC
rescue.Redact = func(workspaceID, content string) string {
out, _ := redactSecrets(workspaceID, content)
return out
}
// Part 3: persist the redacted bundle to the queryable store on
// capture so GET /workspaces/:id/rescue can serve it without obs/Loki
// read creds. db.DB is resolved per-call (rescuestore guards a nil
// handle) so wiring at init() is safe even before InitPostgres has
// run; a capture before the DB is up logs + skips the persist rather
// than failing the boot-failure path.
rescue.PersistBundle = func(ctx context.Context, b rescue.Bundle) error {
return rescuestore.NewPostgres(db.DB).Persist(ctx, b)
}
}
// rescueRunRemoteViaEIC runs a single shell command on the still-running
// (but boot-failed) workspace EC2 over an EIC tunnel and returns its
// combined stdout+stderr. Reuses the same `withEICTunnel` dance as the
// canvas file ops (ephemeral keypair → SendSSHPublicKey → open-tunnel →
// ssh) so the rescue path inherits every fix to the EIC mechanism (e.g.
// PR #2822's LogLevel=ERROR shim) for free.
//
// Combined output (2>&1) is intentional: a boot-failed box's most
// useful signal is often on stderr (a panic, a missing-file error), and
// the rescue bundle is a forensic blob, not a parsed value — we want
// everything the command emitted.
func rescueRunRemoteViaEIC(ctx context.Context, instanceID, command string) (string, error) {
var combined []byte
runErr := withEICTunnel(ctx, instanceID, func(s eicSSHSession) error {
sshCmd := exec.CommandContext(ctx, "ssh", s.sshArgs(command)...)
sshCmd.Env = os.Environ()
var buf bytes.Buffer
sshCmd.Stdout = &buf
sshCmd.Stderr = &buf
// A non-zero remote exit is NOT a transport error for the rescue
// path — each section command already falls back to an
// `|| echo '(...)'` marker, so a clean exit is expected. Only
// surface an error when ssh/tunnel itself failed AND produced no
// output to ship.
err := sshCmd.Run()
combined = buf.Bytes()
if err != nil && len(combined) == 0 {
return fmt.Errorf("rescue ssh exec: %w", err)
}
return nil
})
if runErr != nil {
return "", runErr
}
return strings.TrimRight(string(combined), "\n"), nil
}
// captureRescueBundle fires a best-effort, non-blocking rescue capture
// for a boot-failed workspace. It is the single entry point both
// boot-failure verdict paths funnel through.
//
// NON-BLOCKING: the actual collection runs in its own goroutine with
// its own timeout (rescue.CaptureTimeout), detached from the caller's
// request/sweep context so it can't add latency to — or be cancelled
// by — the failure-handling path that triggered it. We snapshot the
// identity into a fresh context.Background() for the same reason: a
// gin request context is cancelled the instant the HTTP handler
// returns, which would kill the EIC tunnel mid-collection.
//
// instanceID/orgID are resolved here (best-effort) so the two call
// sites only need the workspace id. A missing instance id → rescue.Capture
// no-ops (logged), so an early-failure workspace that never got an EC2
// is handled cleanly.
func captureRescueBundle(workspaceID, reason string) {
rescueDispatch(func() {
ctx := context.Background()
instanceID, err := rescueResolveInstanceID(ctx, workspaceID)
if err != nil {
// Best-effort: a resolve failure is logged inside Capture's
// caller chain; pass empty so Capture no-ops cleanly.
instanceID = ""
}
rescue.Capture(ctx, rescue.Input{
InstanceID: instanceID,
WorkspaceID: workspaceID,
OrgID: os.Getenv("MOLECULE_ORG_ID"),
Reason: reason,
})
})
}
// rescueDispatch runs the rescue collection off the request path. In
// production it's `go fn()` so the capture never blocks or adds latency
// to the boot-failure handler. Tests swap it for a synchronous runner so
// they can assert the capture fired (or didn't) deterministically
// without racing the goroutine.
var rescueDispatch = func(fn func()) { go fn() }
// BootFailureRescueHook is the registry-facing adapter wired into
// registry.BootFailureRescueHook from main.go. The registry sweeper
// already resolved the instance id (it's in the candidate row), so this
// path uses it directly rather than re-querying — symmetric with the
// captureRescueBundle handler path but skipping the lookup.
//
// Best-effort + non-blocking: dispatches the capture on its own
// goroutine with its own timeout, so the sweep loop is never slowed.
func BootFailureRescueHook(workspaceID, instanceID, reason string) {
go rescue.Capture(context.Background(), rescue.Input{
InstanceID: instanceID,
WorkspaceID: workspaceID,
OrgID: os.Getenv("MOLECULE_ORG_ID"),
Reason: reason,
})
}
// rescueResolveInstanceID looks up the EC2 instance id for a workspace.
// Package var so tests can stub it without a sqlmock. Mirrors
// provisioner.resolveInstanceID (same query) but lives here to keep the
// rescue wiring self-contained and avoid widening the provisioner
// surface.
var rescueResolveInstanceID = func(ctx context.Context, workspaceID string) (string, error) {
if db.DB == nil {
return "", nil // nil in unit tests
}
var instanceID sql.NullString
err := db.DB.QueryRowContext(ctx,
`SELECT instance_id FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&instanceID)
if err != nil && err != sql.ErrNoRows {
return "", err
}
if !instanceID.Valid {
return "", nil
}
return instanceID.String, nil
}
@@ -1,119 +0,0 @@
package handlers
import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"testing"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// rescueTestHarness makes the otherwise-async rescue capture
// deterministic + observable for handler tests:
// - rescueDispatch runs synchronously (no goroutine race).
// - rescueResolveInstanceID returns a fixed instance id.
// - rescue.RunRemote / rescue.Redact are stubbed so no real EIC/SSH
// fires; runCalls counts how many remote-command collections ran,
// which is the proxy for "did the capture fire".
//
// All originals are restored on cleanup.
func rescueTestHarness(t *testing.T, instanceID string) (runCalls *int) {
t.Helper()
n := 0
runCalls = &n
prevDispatch := rescueDispatch
rescueDispatch = func(fn func()) { fn() } // synchronous
prevResolve := rescueResolveInstanceID
rescueResolveInstanceID = func(_ context.Context, _ string) (string, error) { return instanceID, nil }
prevRun, prevRedact := rescue.RunRemote, rescue.Redact
rescue.RunRemote = func(_ context.Context, _ string, _ string) (string, error) { n++; return "out", nil }
rescue.Redact = func(_ws, c string) string { return c }
t.Cleanup(func() {
rescueDispatch = prevDispatch
rescueResolveInstanceID = prevResolve
rescue.RunRemote = prevRun
rescue.Redact = prevRedact
})
return runCalls
}
// TestBootstrapFailed_FiresRescueOnFlip — the RFC internal#742 handler
// hook: when BootstrapFailed actually flips a workspace to `failed`
// (affected==1), the rescue capture fires against the resolved instance.
func TestBootstrapFailed_FiresRescueOnFlip(t *testing.T) {
h, mock := setupBootstrapHandler(t)
runCalls := rescueTestHarness(t, "i-failed01")
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-crashed", sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO structure_events`).
WithArgs("WORKSPACE_PROVISION_FAILED", "ws-crashed", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-crashed"}}
c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-crashed/bootstrap-failed",
bytes.NewBufferString(`{"error":"codex provider derivation failed","log_tail":"panic"}`))
c.Request.Header.Set("Content-Type", "application/json")
h.BootstrapFailed(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200, got %d: %s", w.Code, w.Body.String())
}
if *runCalls != len(rescueBundleSectionCount()) {
t.Errorf("rescue capture ran %d remote commands, want %d (one per bundle section)", *runCalls, len(rescueBundleSectionCount()))
}
}
// TestBootstrapFailed_NoRescueOnNoChange — an already-transitioned
// workspace (affected==0: raced to online, or double-report) is NOT a
// boot-failure verdict here, so the rescue capture must NOT fire.
func TestBootstrapFailed_NoRescueOnNoChange(t *testing.T) {
h, mock := setupBootstrapHandler(t)
runCalls := rescueTestHarness(t, "i-online01")
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-online", sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 0)) // already transitioned
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-online"}}
c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-online/bootstrap-failed",
bytes.NewBufferString(`{"error":"late report","log_tail":""}`))
c.Request.Header.Set("Content-Type", "application/json")
h.BootstrapFailed(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200, got %d", w.Code)
}
if *runCalls != 0 {
t.Errorf("rescue capture fired (%d cmds) on a no-change report; it must only fire on a real flip", *runCalls)
}
}
// rescueBundleSectionCount returns the production rescue bundle section
// list length by running a capture against a counting runner once. It's
// a small indirection so the handler test stays decoupled from the exact
// section set in internal/rescue (which has its own tests).
func rescueBundleSectionCount() []struct{} {
count := 0
prevRun, prevRedact := rescue.RunRemote, rescue.Redact
rescue.RunRemote = func(_ context.Context, _ string, _ string) (string, error) { count++; return "", nil }
rescue.Redact = func(_ws, c string) string { return c }
rescue.Capture(context.Background(), rescue.Input{InstanceID: "i-probe", WorkspaceID: "w", OrgID: "o"})
rescue.RunRemote = prevRun
rescue.Redact = prevRedact
return make([]struct{}, count)
}
@@ -91,18 +91,6 @@ func (h *WorkspaceHandler) BootstrapFailed(c *gin.Context) {
"log_tail": tail,
"source": "bootstrap_watcher",
})
// RFC internal#742 Part 2: this is one of the two boot-failure
// verdict points. We've just flipped a still-running (but
// unconfigured) workspace EC2 to `failed`; the control plane will
// reap the instance shortly. Capture a forensic rescue bundle off
// the live box NOW, before it's torn down, so a wedged workspace is
// post-mortem-inspectable. Best-effort + non-blocking: runs in its
// own goroutine with its own timeout, detached from this request's
// context (which is cancelled the instant this handler returns).
// Failure to capture never changes the boot-failure handling.
captureRescueBundle(id, "bootstrap_watcher")
log.Printf("BootstrapFailed: marked %s failed (tail=%d bytes, err=%q)", id, len(tail), errMsg)
c.JSON(http.StatusOK, gin.H{"ok": true})
}
@@ -93,16 +93,3 @@ func formatMissingBYOKCredentialError(mode string) string {
mode,
)
}
// formatMissingPlatformProxyError builds the user-facing message for a
// provision failure caused by a platform-managed workspace whose control-plane
// proxy environment is absent (#2162). The platform-managed path requires
// MOLECULE_LLM_BASE_URL + MOLECULE_LLM_USAGE_TOKEN (or their OPENAI_*
// fallbacks) to inject a usable credential; without them the workspace must
// NOT start credential-less.
func formatMissingPlatformProxyError() string {
return "this workspace is configured for platform-managed LLM billing but the control-plane proxy is not ready. " +
"The required platform proxy env (MOLECULE_LLM_BASE_URL + MOLECULE_LLM_USAGE_TOKEN) is absent. " +
"This is usually a transient boot-race; retry in 30 seconds. If it persists, verify the platform proxy " +
"is configured for this tenant/runtime and contact the platform team."
}
@@ -1003,13 +1003,12 @@ func applyPlatformManagedLLMEnv(ctx context.Context, envVars map[string]string,
anthropicBaseURL := firstNonEmptyEnv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "ANTHROPIC_BASE_URL")
token := firstNonEmptyEnv("MOLECULE_LLM_USAGE_TOKEN", "OPENAI_API_KEY")
if baseURL == "" || token == "" {
// Proxy not configured (boot race / misconfig). The platform_managed
// path REQUIRES the CP proxy env to inject a usable credential.
// Reporting HasUsableLLMCred=true here would start the workspace
// credential-less — the adk-demo dark-wedge class (#2162).
// Return false so the caller's fail-closed branch aborts with
// MISSING_PLATFORM_PROXY.
return platformLLMEnvResult{ResolvedMode: res.ResolvedMode, HasUsableLLMCred: false, Source: res.Source}
// Proxy not configured (boot race / misconfig). On the platform_managed
// path the workspace IS entitled to platform creds, so we do NOT strip
// here — but we report HasUsableLLMCred from whatever survived so the
// caller's fail-closed branch (non-platform only) is never reached on
// this path.
return platformLLMEnvResult{ResolvedMode: res.ResolvedMode, HasUsableLLMCred: true, Source: res.Source}
}
stripPlatformManagedLLMBypassEnv(envVars)
@@ -134,11 +134,6 @@ func TestProvisionWorkspaceAuto_NoBackendMarksFailed(t *testing.T) {
// This is the regression-prevention test for the Design Director bug
// where 7-of-7 sub-agents went down the Docker path on SaaS.
func TestProvisionWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) {
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
mock := setupTestDB(t)
mock.MatchExpectationsInOrder(false)
@@ -602,11 +597,6 @@ func TestNoCallSiteCallsBareStop(t *testing.T) {
// count without mocking out the retry helper itself, which would
// invert the test contract — the retry IS the dispatcher's job here).
func TestRestartWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) {
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
rec := &trackingCPProv{}
bcast := &concurrentSafeBroadcaster{}
h := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
@@ -805,11 +795,6 @@ func TestResumeHandler_UsesProvisionWorkspaceAuto(t *testing.T) {
// the async tests; the absence of `go` semantics is the load-bearing
// distinction we're pinning.
func TestProvisionWorkspaceAutoSync_RoutesToCPWhenSet(t *testing.T) {
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
mock := setupTestDB(t)
mock.MatchExpectationsInOrder(false)
// provisionWorkspaceCP runs prepareProvisionContext synchronously, which
@@ -98,11 +98,6 @@ func (r *recordingCPProv) startedSet() map[string]struct{} {
func TestProvisionWorkspaceCP_ConcurrentBurst_NoSilentDrop(t *testing.T) {
const numWorkspaces = 7
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
mock := setupTestDB(t)
// Every goroutine runs prepareProvisionContext → mintWorkspaceSecrets
@@ -230,18 +230,6 @@ func (h *WorkspaceHandler) prepareProvisionContext(
Extra: map[string]interface{}{"error": msg, "code": "MISSING_BYOK_CREDENTIAL", "billing_mode": llmRes.ResolvedMode, "issue": "1994"},
}
}
// Fail closed for a platform-managed workspace whose CP proxy env is
// absent: do NOT start it credential-less (adk-demo dark-wedge class,
// #2162). The platform_managed path requires the proxy injection to
// produce a usable credential.
if llmRes.ResolvedMode == LLMBillingModePlatformManaged && !llmRes.HasUsableLLMCred {
msg := formatMissingPlatformProxyError()
log.Printf("Provisioner: ABORT workspace=%s — platform-managed billing mode but CP proxy env absent (MISSING_PLATFORM_PROXY, molecule-core#2162)", workspaceID)
return nil, &provisionAbort{
Msg: msg,
Extra: map[string]interface{}{"error": msg, "code": "MISSING_PLATFORM_PROXY", "billing_mode": llmRes.ResolvedMode, "issue": "2162"},
}
}
applyRuntimeModelEnv(envVars, payload.Runtime, payload.Model)
if payload.Role != "" {
envVars["MOLECULE_AGENT_ROLE"] = payload.Role
@@ -264,11 +264,6 @@ func TestPrepareProvisionContext_ParentIDInjection(t *testing.T) {
},
}
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
mock := setupTestDB(t)
@@ -336,10 +331,6 @@ func TestPrepareProvisionContext_InjectsGitHTTPCredsFromPersonaToken(t *testing.
}
}
t.Setenv("MOLECULE_PERSONA_ROOT", root)
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
cases := []struct {
name string
@@ -468,10 +459,6 @@ func TestPrepareProvisionContext_WorkspaceSecretWinsOverPersonaToken(t *testing.
t.Fatal(err)
}
t.Setenv("MOLECULE_PERSONA_ROOT", root)
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT key, encrypted_value, encryption_version FROM global_secrets`).
@@ -1424,11 +1424,6 @@ func (s *stubFailingCPProv) IsRunning(_ context.Context, _ string) (bool, error)
// the broadcast payload would surface every marker; the canned
// "provisioning failed" message must surface none of them.
func TestProvisionWorkspaceCP_NoInternalErrorsInBroadcast(t *testing.T) {
// Supply the CP proxy env so the platform-managed default does not abort
// with MISSING_PLATFORM_PROXY (molecule-core#2162).
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
mock := setupTestDB(t)
// loadWorkspaceSecrets queries global_secrets and workspace_secrets
@@ -10,15 +10,8 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
)
// rescueVolumeGraceHours surfaces the rescue grace as whole hours for
// the retention-contract assertion (RFC internal#742 Part 2).
func rescueVolumeGraceHours() int {
return int(rescue.RescueVolumeGrace.Hours())
}
// fakeCPReaper is a hand-rolled CPOrphanReaper for the SaaS-mode
// sweeper tests. Records every Stop call so tests can assert which
// workspace IDs were re-issued.
@@ -104,55 +97,6 @@ func TestCPSweepOnce_NoOrphans(t *testing.T) {
}
}
// TestCPSweepOnce_DoesNotReapFailedWorkspace — RFC internal#742 Part 2
// volume-retention guarantee, molecule-core side.
//
// A boot-FAILED workspace (status='failed') must NOT be terminated by
// the platform's orphan sweeper: its instance + /configs data volume are
// retained through the rescue grace (rescue.RescueVolumeGrace) so a live
// rescue read is possible, distinct from the user-prune erase path. The
// sweeper reaps ONLY status='removed' (the explicit deprovision path),
// so a `failed` row is structurally excluded at the SELECT — it never
// reaches reaper.Stop. We assert the predicate filters to 'removed'
// (so the failed instance survives) and that no Stop fires for a DB
// whose only orphan-shaped row is `failed`.
//
// This is the "if the sweeper already keeps volumes by default, confirm
// + add a test asserting it" branch of the RFC: it does, by construction.
func TestCPSweepOnce_DoesNotReapFailedWorkspace(t *testing.T) {
mock := setupTestDB(t)
reaper := &fakeCPReaper{}
// The sweeper's SELECT carries `status = 'removed'`. A boot-failed
// workspace (status='failed') does not match that predicate, so the
// real DB returns it nowhere in this result set — modelled as the
// empty result the `removed`-only filter produces when the only
// instance-bearing row is `failed`. The regex pins the retention-
// critical predicate so a future widening to include 'failed' (which
// would terminate boot-failed boxes mid-rescue) fails this test.
mock.ExpectQuery(`(?s)WHERE status = 'removed'\s+AND instance_id IS NOT NULL`).
WithArgs(cpSweepLimit).
WillReturnRows(sqlmock.NewRows([]string{"id"})) // failed row excluded by predicate
cpSweepOnce(context.Background(), reaper)
if len(reaper.stopCalls) != 0 {
t.Fatalf("boot-failed workspace must be RETAINED (no terminate); got Stop calls %v", reaper.stopCalls)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestRescueVolumeGraceIsDistinctFromPrune documents that the rescue
// grace is its own contract (24h) and not coupled to any prune timing —
// the value is the SSOT the control-plane reaper must honour.
func TestRescueVolumeGraceIsDistinctFromPrune(t *testing.T) {
if rescueVolumeGraceHours() != 24 {
t.Errorf("rescue volume grace = %dh, want 24h (RFC internal#742)", rescueVolumeGraceHours())
}
}
// TestCPSweepOnce_MultipleOrphans — all rows in the batch get Stop'd
// independently; one failure doesn't block others.
func TestCPSweepOnce_MultipleOrphans(t *testing.T) {
@@ -92,23 +92,6 @@ func provisioningTimeoutFor(runtime string, lookup RuntimeTimeoutLookup) time.Du
return DefaultProvisioningTimeout
}
// BootFailureRescueHook, when wired, is invoked once per workspace the
// sweep flips from `provisioning` to `failed` — i.e. on the boot-failure
// verdict, BEFORE the control plane reaps the instance. It captures a
// forensic rescue bundle off the still-running (but boot-failed) EC2 and
// ships it to obs/Loki (RFC internal#742 Part 2). Wired in main.go to
// handlers.captureRescueBundle via a thin adapter; nil in tests + on
// self-hosted deploys (no rescue shipping there).
//
// Function-typed injection (not an import of handlers) keeps the
// existing handlers→registry import direction intact — registry must not
// import handlers.
//
// MUST be best-effort + non-blocking: the hook itself dispatches the
// capture on its own goroutine with its own timeout, so the sweep loop
// is never slowed or blocked by a hung EIC tunnel on the dead box.
var BootFailureRescueHook func(workspaceID, instanceID, reason string)
// StartProvisioningTimeoutSweep periodically scans for workspaces stuck in
// `status='provisioning'` past the timeout window, flips them to `failed`,
// and broadcasts a WORKSPACE_PROVISION_TIMEOUT event so the canvas can
@@ -161,7 +144,7 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
// flight, not historical) and the partial index on status keeps
// it fast.
rows, err := db.DB.QueryContext(ctx, `
SELECT id, COALESCE(runtime, ''), COALESCE(instance_id, ''), EXTRACT(EPOCH FROM (now() - updated_at))::int
SELECT id, COALESCE(runtime, ''), EXTRACT(EPOCH FROM (now() - updated_at))::int
FROM workspaces
WHERE status = 'provisioning'
`)
@@ -172,15 +155,14 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
defer rows.Close()
type candidate struct {
id string
runtime string
instanceID string
ageSec int
id string
runtime string
ageSec int
}
var ids []candidate
for rows.Next() {
var c candidate
if err := rows.Scan(&c.id, &c.runtime, &c.instanceID, &c.ageSec); err == nil {
if err := rows.Scan(&c.id, &c.runtime, &c.ageSec); err == nil {
ids = append(ids, c)
}
}
@@ -218,19 +200,6 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
continue
}
log.Printf("Provision-timeout sweep: %s (runtime=%q) stuck in provisioning > %s — marked failed", c.id, c.runtime, timeout)
// RFC internal#742 Part 2: this flip is a boot-failure verdict.
// The instance is still running (the CP reaps it shortly after);
// capture a forensic rescue bundle off it NOW, before teardown.
// Best-effort + non-blocking — the hook dispatches on its own
// goroutine + timeout, so a hung EIC tunnel on the dead box can't
// slow the sweep. Only fires on a real flip (affected==1), never
// on a race (affected==0) or a non-overdue row — guaranteeing it
// runs once per boot-failure verdict and never on a healthy row.
if BootFailureRescueHook != nil {
BootFailureRescueHook(c.id, c.instanceID, "provision_timeout_sweep")
}
// Emit as WORKSPACE_PROVISION_FAILED, not _TIMEOUT, because the
// canvas event handler only flips node state on the _FAILED case.
// A separate event type was considered but the UI reaction is
@@ -1,130 +0,0 @@
package registry
import (
"context"
"sync"
"testing"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"github.com/DATA-DOG/go-sqlmock"
)
// rescueHookRecorder captures the args of every BootFailureRescueHook
// invocation so tests can assert the rescue capture fires exactly on the
// boot-failure verdict — and never on a healthy/raced row.
type rescueHookRecorder struct {
mu sync.Mutex
calls [][3]string // {workspaceID, instanceID, reason}
}
func (r *rescueHookRecorder) hook() func(workspaceID, instanceID, reason string) {
return func(workspaceID, instanceID, reason string) {
r.mu.Lock()
defer r.mu.Unlock()
r.calls = append(r.calls, [3]string{workspaceID, instanceID, reason})
}
}
func (r *rescueHookRecorder) count() int {
r.mu.Lock()
defer r.mu.Unlock()
return len(r.calls)
}
// withRescueHook installs a recorder as the package-level
// BootFailureRescueHook for the test's duration.
func withRescueHook(t *testing.T) *rescueHookRecorder {
t.Helper()
rec := &rescueHookRecorder{}
prev := BootFailureRescueHook
BootFailureRescueHook = rec.hook()
t.Cleanup(func() { BootFailureRescueHook = prev })
return rec
}
// TestSweep_RescueFiresOnBootFailureVerdict — the core RFC internal#742
// assertion: when the sweep flips a stuck workspace to `failed`, the
// rescue hook fires once with the workspace + instance id and the
// provision_timeout_sweep reason, BEFORE teardown.
func TestSweep_RescueFiresOnBootFailureVerdict(t *testing.T) {
mock := setupTestDB(t)
rec := withRescueHook(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-stuck", "codex", "i-0badf00d", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil)
if rec.count() != 1 {
t.Fatalf("rescue hook should fire once on a boot-failure flip, got %d", rec.count())
}
got := rec.calls[0]
if got[0] != "ws-stuck" || got[1] != "i-0badf00d" || got[2] != "provision_timeout_sweep" {
t.Errorf("rescue hook args = %v, want {ws-stuck i-0badf00d provision_timeout_sweep}", got)
}
}
// TestSweep_RescueDoesNotFireOnRace — affected==0 means the row raced to
// online/restart between SELECT and UPDATE. That is NOT a boot-failure
// verdict, so the rescue capture must NOT fire (we'd be snapshotting a
// healthy box that's about to come online).
func TestSweep_RescueDoesNotFireOnRace(t *testing.T) {
mock := setupTestDB(t)
rec := withRescueHook(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-raced", "codex", "i-raced", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 0)) // raced — 0 rows
sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil)
if rec.count() != 0 {
t.Errorf("rescue hook must NOT fire on a raced flip (affected==0), got %d calls", rec.count())
}
}
// TestSweep_RescueDoesNotFireOnHealthyRow — a not-yet-overdue row is
// never flipped, so the rescue capture must not fire. Guards against the
// hook being attached above the age gate.
func TestSweep_RescueDoesNotFireOnHealthyRow(t *testing.T) {
mock := setupTestDB(t)
rec := withRescueHook(t)
// hermes at 11 min (660s) < 30 min hermes budget → not overdue, no flip.
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-healthy", "hermes", "i-healthy", 660}))
sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil)
if rec.count() != 0 {
t.Errorf("rescue hook must NOT fire on a non-overdue (healthy) row, got %d calls", rec.count())
}
}
// TestSweep_RescueNilHookIsSafe — on a deploy where the hook is unwired
// (self-hosted / no rescue shipping), the sweep must still flip + emit
// without panicking on the nil hook.
func TestSweep_RescueNilHookIsSafe(t *testing.T) {
mock := setupTestDB(t)
prev := BootFailureRescueHook
BootFailureRescueHook = nil
t.Cleanup(func() { BootFailureRescueHook = prev })
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-stuck", "codex", "i-x", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
emit := &fakeEmitter{}
sweepStuckProvisioning(context.Background(), emit, nil) // must not panic
if emit.count() != 1 {
t.Errorf("flip+emit must still happen with a nil rescue hook, got %d events", emit.count())
}
}
@@ -7,8 +7,8 @@ import (
"testing"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"github.com/DATA-DOG/go-sqlmock"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
)
// fakeEmitter records every RecordAndBroadcast call so tests can assert
@@ -42,15 +42,12 @@ func (f *fakeEmitter) count() int {
return len(f.events)
}
// candidateRows builds the query result (id, runtime, instance_id,
// age_sec). instance_id was added for the RFC internal#742 rescue hook —
// it rides alongside runtime so the boot-failure capture can reach the
// still-running box. Tests that don't care about the rescue path pass
// "" for instance_id. Use this in every sweep test to match the SELECT.
func candidateRows(rows ...[4]any) *sqlmock.Rows {
r := sqlmock.NewRows([]string{"id", "runtime", "instance_id", "age_sec"})
// candidateRows builds the new-shape query result (id, runtime, age_sec).
// Use this in every sweep test to match the runtime-aware SELECT.
func candidateRows(rows ...[3]any) *sqlmock.Rows {
r := sqlmock.NewRows([]string{"id", "runtime", "age_sec"})
for _, row := range rows {
r = r.AddRow(row[0], row[1], row[2], row[3])
r = r.AddRow(row[0], row[1], row[2])
}
return r
}
@@ -61,8 +58,8 @@ func TestSweepStuckProvisioning_FlipsOverdue(t *testing.T) {
mock := setupTestDB(t)
// claude-code workspace, 700s old > 600s default timeout → flipped.
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-stuck", "claude-code", "i-stuck", 700}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-stuck", "claude-code", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
@@ -95,8 +92,8 @@ func TestSweepStuckProvisioning_HermesGets30MinSlack(t *testing.T) {
// 11 min = 660 sec. < HermesProvisioningTimeout (1800s).
// No UPDATE should fire — hermes still has time.
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-hermes-booting", "hermes", "i-h1", 660}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-hermes-booting", "hermes", 660}))
emit := &fakeEmitter{}
sweepStuckProvisioning(context.Background(), emit, nil)
@@ -117,8 +114,8 @@ func TestSweepStuckProvisioning_HermesPastDeadline(t *testing.T) {
mock := setupTestDB(t)
// 31 min = 1860 sec > HermesProvisioningTimeout (1800s).
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-hermes-stuck", "hermes", "i-h2", 1860}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-hermes-stuck", "hermes", 1860}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-hermes-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -153,8 +150,8 @@ func TestSweepStuckProvisioning_HermesPastDeadline(t *testing.T) {
func TestSweepStuckProvisioning_ManifestOverrideSparesRow(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-claude-templated", "claude-code", "i-ct", 660}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-claude-templated", "claude-code", 660}))
// No ExpectExec — if the sweeper still flips the row, sqlmock will
// fail with an unexpected-query error.
@@ -186,8 +183,8 @@ func TestSweepStuckProvisioning_ManifestOverrideStillFlipsPastDeadline(t *testin
mock := setupTestDB(t)
// 21 min = 1260s > 1200s manifest override → flipped.
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-claude-truly-stuck", "claude-code", "i-cts", 1260}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-claude-truly-stuck", "claude-code", 1260}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-claude-truly-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -224,8 +221,8 @@ func TestSweepStuckProvisioning_ManifestOverrideStillFlipsPastDeadline(t *testin
func TestSweepStuckProvisioning_RaceSafe(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-raced", "claude-code", "i-raced", 700}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-raced", "claude-code", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
@@ -247,7 +244,7 @@ func TestSweepStuckProvisioning_RaceSafe(t *testing.T) {
func TestSweepStuckProvisioning_NoStuck(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows())
emit := &fakeEmitter{}
@@ -268,10 +265,10 @@ func TestSweepStuckProvisioning_NoStuck(t *testing.T) {
func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows(
[4]any{"ws-claude-code", "claude-code", "i-cc", 700},
[4]any{"ws-hermes", "hermes", "i-hh", 1860},
[3]any{"ws-claude-code", "claude-code", 700},
[3]any{"ws-hermes", "hermes", 1860},
))
mock.ExpectExec(`UPDATE workspaces`).
@@ -295,8 +292,8 @@ func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) {
func TestSweepStuckProvisioning_BroadcastFailureDoesNotCrash(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-stuck", "claude-code", "i-stuck", 700}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-stuck", "claude-code", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
-321
View File
@@ -1,321 +0,0 @@
// Package rescue captures a fixed post-mortem "rescue bundle" off a
// workspace EC2 whose boot FAILED — before the platform's sweeper /
// control-plane reaps the instance — and ships it to obs/Loki so a
// wedged workspace (e.g. the codex provider-derivation failure that
// motivated RFC internal#742) is inspectable instead of an
// uninspectable wall.
//
// Design constraints (RFC internal#742, Part 2):
//
// - BEST-EFFORT + NON-BLOCKING. Capture MUST NOT change boot-failure
// semantics or add latency to the failure path. Callers fire
// Capture in its own goroutine; Capture additionally bounds itself
// with CaptureTimeout so a hung EIC tunnel can't wedge the
// goroutine forever.
// - FIRES ON THE BOOT-FAILURE VERDICT ONLY. The two hook points are
// the provision-timeout sweep (registry.sweepStuckProvisioning) and
// the out-of-band bootstrap-watcher signal
// (handlers.WorkspaceHandler.BootstrapFailed). Normal teardown /
// deprovision / recreate / billing-suspend / hibernate paths do NOT
// call Capture — see the RFC's path enumeration.
// - REDACT BEFORE ANYTHING LEAVES THE BOX. Every collected section is
// run through the injected Redact func (wired to the existing
// handlers.redactSecrets secret-scan) before it is shipped. Raw
// tokens/keys never reach Loki.
//
// The package is a LEAF: it imports only internal/audit (the obs
// shipper) so it can be called from both handlers and registry without
// an import cycle (registry must not import handlers). The two heavy
// dependencies — the EIC/SSH remote-command runner and the redactor —
// are injected as package-level func vars, wired once at boot from the
// handlers package (which owns withEICTunnel + redactSecrets). Tests
// swap them for fakes.
package rescue
import (
"context"
"fmt"
"log"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/audit"
)
// CaptureTimeout bounds the whole bundle collection. The sweeper runs
// every 30s and the CP reap follows the failure verdict; 45s gives the
// EIC dance (~3-5s) plus six short remote commands (<2s each) generous
// headroom while still finishing well before the instance is torn down.
// Distinct from the per-op eicFileOpTimeout so a slow box that already
// failed to boot can't hang the capture goroutine indefinitely.
const CaptureTimeout = 45 * time.Second
// LokiKind is the Loki stream label value that tags every rescue
// record. Queryable as `kind="rescue"` (RFC internal#742 §Loki labels).
const LokiKind = "rescue"
// RescueVolumeGrace is how long a boot-failed workspace's /configs data
// volume (and its still-running instance) must be RETAINED past the
// boot-failure verdict so a live rescue read is possible — distinct from
// the user-requested prune path (cp#415), which is an explicit erase.
//
// In molecule-core (the tenant platform) the boot-failure verdict only
// flips workspaces.status to `failed`; it never issues a terminate. The
// platform's two reapers (registry.StartCPOrphanSweeper +
// handlers deprovision) act ONLY on status='removed', so a `failed`
// workspace's instance + /configs volume are retained here by
// construction — see TestCPSweepOnce_DoesNotReapFailedWorkspace. The
// time-bounded reap of the failed instance is the control plane's
// bootstrap-watcher concern; this constant is the SSOT for the grace
// the CP must honour (24h covers an operator's next-business-day
// post-mortem without leaking the volume indefinitely).
const RescueVolumeGrace = 24 * time.Hour
// rescueEventType is the audit event_type carried in the shipped
// record. The obs shipper (internal/audit) already maps event_type to a
// low-cardinality Loki label; "rescue.bundle" keeps the rescue stream
// trivially filterable alongside the existing audit taxonomy.
const rescueEventType = "rescue.bundle"
// RunRemote runs a single shell command on the still-running (but
// unconfigured) workspace EC2 over EIC/SSH and returns its combined
// output. Wired at boot to the handlers EIC runner
// (rescueRunRemoteViaEIC). nil until wired — Capture degrades to a
// logged no-op rather than panicking, so an operator who hasn't wired
// the hook still gets a clear signal instead of a crash on the failure
// path.
var RunRemote func(ctx context.Context, instanceID, command string) (string, error)
// Redact scrubs secret-shaped substrings from a collected section
// before it leaves the box. Wired at boot to handlers.redactSecrets.
// nil until wired — Capture refuses to ship un-redacted content if the
// redactor is missing (fails closed: logs + aborts rather than leaking
// raw config).
var Redact func(workspaceID, content string) string
// section is one labelled chunk of the rescue bundle: a human-readable
// name + the remote command that produces it.
type section struct {
name string
command string
}
// bundleSections is the FIXED set collected on every boot-failure
// rescue (RFC internal#742 §Build.1). Order is the post-mortem reading
// order: config first, then boot logs, then container state, then the
// resolved model/provider env that drove the codex derivation failure.
//
// - /configs/config.yaml + system-prompt.md: the managed config the
// runtime booted against (redacted; system-prompt can embed keys).
// - cloud-init-output.log tail: the user-data execution trace — where
// a wedged boot actually died.
// - docker ps -a: container state (did the agent container even
// start, exit-code, restart loop).
// - agent container logs: the runtime's own stderr (the codex
// provider-derivation panic lives here).
// - MODEL|PROVIDER|RUNTIME env: the resolved routing that motivated
// the RFC. `sudo cat` of the container env via docker inspect-style
// grep — see the command.
//
// All commands use `sudo -n` (the box's /configs is root-owned; ubuntu
// has passwordless sudo) and swallow missing-target stderr so a section
// that can't be produced ships as a short marker instead of failing the
// whole bundle. Kept as data (not inlined) so the redaction + ship loop
// is uniform and the set is reviewable in one place.
var bundleSections = []section{
{
name: "config.yaml",
command: "sudo -n cat /configs/config.yaml 2>/dev/null || echo '(/configs/config.yaml absent)'",
},
{
name: "system-prompt.md",
command: "sudo -n cat /configs/system-prompt.md 2>/dev/null || echo '(/configs/system-prompt.md absent)'",
},
{
name: "cloud-init-output.log.tail",
command: "sudo -n tail -200 /var/log/cloud-init-output.log 2>/dev/null || echo '(cloud-init-output.log absent)'",
},
{
name: "docker-ps",
command: "sudo -n docker ps -a 2>/dev/null || echo '(docker unavailable)'",
},
{
// The agent container is the first non-infra container; grab the
// most recently created one and tail its logs. `head -1` of
// `docker ps -a -q` is creation-ordered newest-first, which is
// the agent runtime on a workspace box.
name: "agent-container.logs.tail",
command: "cid=$(sudo -n docker ps -a -q 2>/dev/null | head -1); [ -n \"$cid\" ] && sudo -n docker logs --tail 200 \"$cid\" 2>&1 || echo '(no agent container)'",
},
{
// Resolved model/provider/runtime env from the agent container.
// `docker inspect` the env array and grep the routing keys. This
// is the field that pinpoints a provider-derivation failure.
name: "model-provider-runtime.env",
command: "cid=$(sudo -n docker ps -a -q 2>/dev/null | head -1); [ -n \"$cid\" ] && sudo -n docker inspect --format '{{range .Config.Env}}{{println .}}{{end}}' \"$cid\" 2>/dev/null | grep -E 'MODEL|PROVIDER|RUNTIME' || echo '(no env)'",
},
}
// Input is the identity of the failed workspace being rescued.
type Input struct {
InstanceID string // EC2 instance id of the still-running failed box
WorkspaceID string
OrgID string
// Reason is a short tag for WHY the rescue fired (e.g.
// "provision_timeout_sweep" or "bootstrap_watcher") — carried into
// the Loki record so an operator can correlate the bundle with the
// failure verdict that triggered it.
Reason string
}
// Section is one labelled, already-redacted chunk of the persisted
// rescue bundle. It mirrors what ship() emits to Loki per-section, but
// is the unit the queryable store (and the read endpoint) returns.
// `Redacted` is false only for collection-failure markers (the section
// command couldn't run); true sections passed through the secret-scan.
type Section struct {
Name string `json:"name"`
Content string `json:"content"`
Redacted bool `json:"redacted"`
}
// Bundle is the full, already-redacted post-mortem bundle for ONE
// boot-failure capture — the unit persisted to the queryable store on
// capture (RFC internal#742 Part 3) and served by
// GET /workspaces/:id/rescue. Sections are in fixed reading order
// (config → boot logs → container state → resolved routing env).
type Bundle struct {
WorkspaceID string `json:"workspace_id"`
OrgID string `json:"org_id"`
InstanceID string `json:"instance_id"`
Reason string `json:"reason"`
Sections []Section `json:"sections"`
}
// PersistBundle writes the fully-collected, already-redacted bundle to
// the queryable per-tenant store (rescue_bundles table) so the rescue
// READ endpoint can serve it without obs/Loki read creds (RFC
// internal#742 Part 3 read-path decision — see the migration header).
//
// Wired at boot from the handlers package (which owns db.DB) to keep
// internal/rescue a leaf: it must NOT import internal/db, or registry —
// which imports rescue — would inherit a db dependency it can call
// without a cycle, but more importantly the leaf stays trivially
// testable with a fake. nil until wired: a capture with no store wired
// still ships to Loki (Part 2 behavior preserved) and logs that it
// skipped the DB persist, rather than failing the capture.
var PersistBundle func(ctx context.Context, b Bundle) error
// Capture collects the fixed rescue bundle off the failed instance,
// redacts each section, and ships it to Loki under
// {kind="rescue", org=<OrgID>, workspace_id=<WorkspaceID>}.
//
// BEST-EFFORT: every failure mode (missing wiring, EIC error, a single
// section that won't collect) is logged and does NOT propagate — Capture
// never returns an error and never panics, so the boot-failure handling
// at the call site is unaffected. The caller is expected to invoke this
// in its own goroutine; Capture additionally self-bounds with
// CaptureTimeout.
func Capture(ctx context.Context, in Input) {
defer func() {
// A logging helper on the failure path must never take the
// process down. Recover defensively — the redactor / shipper are
// injected and a future mis-wire shouldn't crash the sweeper.
if r := recover(); r != nil {
log.Printf("rescue: capture panicked for ws=%s instance=%s: %v", in.WorkspaceID, in.InstanceID, r)
}
}()
if in.InstanceID == "" {
// No live box to read — nothing to rescue (e.g. failure before
// any EC2 was launched). Not an error; just skip.
log.Printf("rescue: skip ws=%s — no instance_id (nothing to capture)", in.WorkspaceID)
return
}
if RunRemote == nil {
log.Printf("rescue: skip ws=%s instance=%s — RunRemote not wired (best-effort no-op)", in.WorkspaceID, in.InstanceID)
return
}
if Redact == nil {
// Fail CLOSED: without a redactor we could leak raw tokens to
// Loki. Abort rather than ship unredacted.
log.Printf("rescue: ABORT ws=%s instance=%s — Redact not wired; refusing to ship un-redacted bundle", in.WorkspaceID, in.InstanceID)
return
}
ctx, cancel := context.WithTimeout(ctx, CaptureTimeout)
defer cancel()
log.Printf("rescue: capturing bundle ws=%s instance=%s reason=%s", in.WorkspaceID, in.InstanceID, in.Reason)
collected := 0
// Accumulate the per-section result alongside shipping each to Loki,
// so the same already-redacted content is persisted to the queryable
// store as one bundle row after the loop. Shipping stays per-section
// (Part 2 Loki behavior unchanged); persistence is the single
// bundle the read endpoint serves.
bundle := Bundle{
WorkspaceID: in.WorkspaceID,
OrgID: in.OrgID,
InstanceID: in.InstanceID,
Reason: in.Reason,
Sections: make([]Section, 0, len(bundleSections)),
}
for _, sec := range bundleSections {
raw, err := RunRemote(ctx, in.InstanceID, sec.command)
if err != nil {
// One section failing (e.g. ssh blip mid-collection) must not
// abort the rest — ship a marker for it and continue.
log.Printf("rescue: section %q failed for ws=%s: %v", sec.name, in.WorkspaceID, err)
marker := fmt.Sprintf("(rescue: section collection failed: %v)", err)
ship(ctx, in, sec.name, marker, false)
bundle.Sections = append(bundle.Sections, Section{Name: sec.name, Content: marker, Redacted: false})
continue
}
redacted := Redact(in.WorkspaceID, raw)
ship(ctx, in, sec.name, redacted, true)
bundle.Sections = append(bundle.Sections, Section{Name: sec.name, Content: redacted, Redacted: true})
collected++
}
log.Printf("rescue: shipped %d/%d sections ws=%s instance=%s kind=%s", collected, len(bundleSections), in.WorkspaceID, in.InstanceID, LokiKind)
// Persist the redacted bundle to the queryable store so the rescue
// READ endpoint can serve it without obs/Loki read creds. Best-effort
// and last: a persist failure (or no store wired) must NOT undo the
// Loki ship that already succeeded, and never panics the failure path.
persistBundle(ctx, bundle)
}
// persistBundle writes the collected bundle to the queryable store if a
// store is wired. Best-effort: a nil store (operator hasn't wired the
// READ path) or a DB error is logged and swallowed — the Loki ship is
// the durable cross-tenant copy, and the failure path must never be
// disturbed by the post-mortem read store.
func persistBundle(ctx context.Context, b Bundle) {
if PersistBundle == nil {
log.Printf("rescue: store not wired — bundle for ws=%s shipped to Loki only (no queryable copy)", b.WorkspaceID)
return
}
if err := PersistBundle(ctx, b); err != nil {
log.Printf("rescue: persist bundle for ws=%s failed (shipped to Loki regardless): %v", b.WorkspaceID, err)
}
}
// ship emits one rescue section to Loki via the audit shipper. The
// org / workspace_id / kind ride in the record body (queryable via
// LogQL `| json`); event_type ("rescue.bundle") is the low-cardinality
// Loki label the shipper already promotes. `redacted` records whether
// the content passed through the secret-scan, so an operator can tell a
// shipped-but-redacted section from a collection-failure marker.
func ship(ctx context.Context, in Input, name, content string, redacted bool) {
audit.Emit(ctx, rescueEventType, map[string]any{
"kind": LokiKind,
"org": in.OrgID,
"workspace_id": in.WorkspaceID,
"instance_id": in.InstanceID,
"reason": in.Reason,
"section": name,
"redacted": redacted,
"content": content,
})
}
@@ -1,136 +0,0 @@
package rescue
// Part 3 coverage: Capture, after collecting + redacting every section,
// persists the bundle exactly once to the queryable store (in addition
// to the per-section Loki ship verified in rescue_test.go).
import (
"context"
"errors"
"strings"
"testing"
)
// withPersist swaps the injected PersistBundle for the test and restores
// it after.
func withPersist(t *testing.T, fn func(ctx context.Context, b Bundle) error) {
t.Helper()
prev := PersistBundle
PersistBundle = fn
t.Cleanup(func() { PersistBundle = prev })
}
// TestCapture_PersistsBundleOnce: the happy path persists one bundle
// carrying every section, with identity + redacted content matching what
// was shipped.
func TestCapture_PersistsBundleOnce(t *testing.T) {
_ = captureLoki(t) // keep Loki transport pointed at a temp file
withFakes(t,
func(_ context.Context, instanceID, cmd string) (string, error) {
return "OUT:" + instanceID, nil
},
func(_ws, c string) string { return "RED:" + c },
)
var persisted []Bundle
withPersist(t, func(_ context.Context, b Bundle) error {
persisted = append(persisted, b)
return nil
})
Capture(context.Background(), Input{
InstanceID: "i-abc",
WorkspaceID: "ws-1",
OrgID: "org-9",
Reason: "provision_timeout_sweep",
})
if len(persisted) != 1 {
t.Fatalf("PersistBundle called %d times, want exactly 1", len(persisted))
}
b := persisted[0]
if b.WorkspaceID != "ws-1" || b.OrgID != "org-9" || b.InstanceID != "i-abc" || b.Reason != "provision_timeout_sweep" {
t.Errorf("bundle identity wrong: %+v", b)
}
if len(b.Sections) != len(bundleSections) {
t.Fatalf("persisted %d sections, want %d", len(b.Sections), len(bundleSections))
}
for _, s := range b.Sections {
if !s.Redacted {
t.Errorf("section %q persisted with redacted=false on the happy path", s.Name)
}
// Redactor ("RED:" prefix) must have run on persisted content.
if !strings.HasPrefix(s.Content, "RED:") {
t.Errorf("section %q persisted un-redacted content: %q", s.Name, s.Content)
}
}
}
// TestCapture_PersistFailureDoesNotPanic: a store error is swallowed —
// Capture still completes (the Loki ship already succeeded).
func TestCapture_PersistFailureDoesNotPanic(t *testing.T) {
_ = captureLoki(t)
withFakes(t,
func(_ context.Context, _ string, _ string) (string, error) { return "ok", nil },
func(_ws, c string) string { return c },
)
withPersist(t, func(_ context.Context, _ Bundle) error {
return errors.New("db down")
})
// Must not panic / must return normally.
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-2", OrgID: "o"})
}
// TestCapture_NoPersistWiredIsSafe: with PersistBundle unwired (operator
// hasn't wired the read path), Capture still ships to Loki and does not
// panic.
func TestCapture_NoPersistWiredIsSafe(t *testing.T) {
readLoki := captureLoki(t)
withFakes(t,
func(_ context.Context, _ string, _ string) (string, error) { return "ok", nil },
func(_ws, c string) string { return c },
)
prev := PersistBundle
PersistBundle = nil
t.Cleanup(func() { PersistBundle = prev })
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-3", OrgID: "o"})
// Loki ship still happened for every section.
if recs := readLoki(); len(recs) != len(bundleSections) {
t.Errorf("shipped %d records, want %d (Loki unaffected by missing store)", len(recs), len(bundleSections))
}
}
// TestCapture_FailureMarkerPersistedAsNonRedacted: a section whose
// collection fails is persisted with redacted=false + a marker, matching
// the Loki record.
func TestCapture_FailureMarkerPersistedAsNonRedacted(t *testing.T) {
_ = captureLoki(t)
withFakes(t,
func(_ context.Context, _ string, cmd string) (string, error) {
if strings.Contains(cmd, "config.yaml") {
return "", errors.New("ssh blip")
}
return "ok", nil
},
func(_ws, c string) string { return c },
)
var got Bundle
withPersist(t, func(_ context.Context, b Bundle) error { got = b; return nil })
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-4", OrgID: "o"})
var markers int
for _, s := range got.Sections {
if !s.Redacted {
markers++
if !strings.Contains(s.Content, "section collection failed") {
t.Errorf("non-redacted section %q content = %q, want a failure marker", s.Name, s.Content)
}
}
}
if markers != 1 {
t.Errorf("want exactly 1 failure marker persisted, got %d", markers)
}
}
@@ -1,226 +0,0 @@
package rescue
import (
"context"
"encoding/json"
"errors"
"os"
"path/filepath"
"strings"
"testing"
)
// withFakes swaps the injected RunRemote + Redact for the duration of a
// test and restores them after. Mirrors the provisioner test-fake
// pattern (package-var swap + t.Cleanup).
func withFakes(t *testing.T, run func(ctx context.Context, instanceID, cmd string) (string, error), redact func(ws, c string) string) {
t.Helper()
prevRun, prevRedact := RunRemote, Redact
RunRemote = run
Redact = redact
t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact })
}
// captureLoki points the audit shipper at a temp JSONL file and returns
// a reader that decodes the records the rescue ship() loop wrote. This
// is the same transport the production rescue stream uses (audit.Emit →
// Loki via the tenant Vector source), so asserting on it proves the
// shipper-reuse + labels end to end.
func captureLoki(t *testing.T) func() []map[string]any {
t.Helper()
dir := t.TempDir()
path := filepath.Join(dir, "audit.jsonl")
t.Setenv("MOLECULE_AUDIT_LOG_PATH", path)
return func() []map[string]any {
b, err := os.ReadFile(path)
if err != nil {
return nil
}
var out []map[string]any
for _, line := range strings.Split(strings.TrimSpace(string(b)), "\n") {
if line == "" {
continue
}
var rec map[string]any
if err := json.Unmarshal([]byte(line), &rec); err != nil {
t.Fatalf("bad audit jsonl line %q: %v", line, err)
}
out = append(out, rec)
}
return out
}
}
func fields(rec map[string]any) map[string]any {
f, _ := rec["fields"].(map[string]any)
return f
}
// TestCapture_ShipsAllSectionsWithRescueLabels is the happy path: a
// boot-failure capture collects every fixed section, runs each through
// the redactor, and ships it to Loki under {kind="rescue", org, ws}.
func TestCapture_ShipsAllSectionsWithRescueLabels(t *testing.T) {
readLoki := captureLoki(t)
var seenCmds []string
withFakes(t,
func(_ context.Context, instanceID, cmd string) (string, error) {
seenCmds = append(seenCmds, cmd)
return "OUTPUT for " + instanceID, nil
},
func(_ws, c string) string { return c }, // identity redactor
)
Capture(context.Background(), Input{
InstanceID: "i-abc123",
WorkspaceID: "ws-1",
OrgID: "org-9",
Reason: "provision_timeout_sweep",
})
recs := readLoki()
if len(recs) != len(bundleSections) {
t.Fatalf("want %d shipped sections, got %d", len(bundleSections), len(recs))
}
if len(seenCmds) != len(bundleSections) {
t.Fatalf("want %d remote commands run, got %d", len(bundleSections), len(seenCmds))
}
for _, rec := range recs {
if rec["event_type"] != rescueEventType {
t.Errorf("event_type = %v, want %q", rec["event_type"], rescueEventType)
}
// workspace_id is promoted to the top-level record position by
// the audit shipper.
if rec["workspace_id"] != "ws-1" {
t.Errorf("top-level workspace_id = %v, want ws-1", rec["workspace_id"])
}
f := fields(rec)
if f["kind"] != LokiKind {
t.Errorf("kind = %v, want %q", f["kind"], LokiKind)
}
if f["org"] != "org-9" {
t.Errorf("org = %v, want org-9", f["org"])
}
if f["instance_id"] != "i-abc123" {
t.Errorf("instance_id = %v, want i-abc123", f["instance_id"])
}
if f["redacted"] != true {
t.Errorf("redacted = %v, want true for a collected section", f["redacted"])
}
}
}
// TestCapture_Redacts proves the bundle is scrubbed before it leaves the
// box: a remote section that contains a secret-shaped token must ship
// with the token replaced, never raw.
func TestCapture_Redacts(t *testing.T) {
readLoki := captureLoki(t)
const secret = "sk-ant-SUPERSECRETTOKENVALUE0001"
withFakes(t,
func(_ context.Context, _ string, _ string) (string, error) {
return "ANTHROPIC_API_KEY=" + secret, nil
},
// redactor that mangles anything containing the secret shape
func(_ws, c string) string {
if strings.Contains(c, secret) {
return strings.ReplaceAll(c, secret, "[REDACTED]")
}
return c
},
)
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-2", OrgID: "o"})
for _, rec := range readLoki() {
content, _ := fields(rec)["content"].(string)
if strings.Contains(content, secret) {
t.Fatalf("raw secret leaked to Loki in section %v: %q", fields(rec)["section"], content)
}
}
}
// TestCapture_SkipsWhenNoInstance: a failure with no provisioned EC2 has
// nothing to read — Capture must no-op (ship nothing) rather than dial a
// blank instance id.
func TestCapture_SkipsWhenNoInstance(t *testing.T) {
readLoki := captureLoki(t)
called := false
withFakes(t,
func(_ context.Context, _ string, _ string) (string, error) { called = true; return "", nil },
func(_ws, c string) string { return c },
)
Capture(context.Background(), Input{InstanceID: "", WorkspaceID: "ws-3", OrgID: "o"})
if called {
t.Error("RunRemote called for an empty instance id")
}
if recs := readLoki(); len(recs) != 0 {
t.Errorf("shipped %d records for an empty instance id, want 0", len(recs))
}
}
// TestCapture_FailsClosedWithoutRedactor: if the redactor is not wired,
// Capture must NOT ship anything (would leak raw config). Fail closed.
func TestCapture_FailsClosedWithoutRedactor(t *testing.T) {
readLoki := captureLoki(t)
prevRun, prevRedact := RunRemote, Redact
RunRemote = func(_ context.Context, _ string, _ string) (string, error) { return "raw config", nil }
Redact = nil
t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact })
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-4", OrgID: "o"})
if recs := readLoki(); len(recs) != 0 {
t.Errorf("shipped %d records without a redactor wired, want 0 (fail closed)", len(recs))
}
}
// TestCapture_SectionFailureIsIsolated: one section's RunRemote error
// must not abort the rest — the failing section ships a marker and the
// others still ship.
func TestCapture_SectionFailureIsIsolated(t *testing.T) {
readLoki := captureLoki(t)
withFakes(t,
func(_ context.Context, _ string, cmd string) (string, error) {
if strings.Contains(cmd, "config.yaml") {
return "", errors.New("ssh blip")
}
return "ok", nil
},
func(_ws, c string) string { return c },
)
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-5", OrgID: "o"})
recs := readLoki()
if len(recs) != len(bundleSections) {
t.Fatalf("want all %d sections shipped (incl. failure marker), got %d", len(bundleSections), len(recs))
}
var failureMarkers int
for _, rec := range recs {
if fields(rec)["redacted"] == false {
failureMarkers++
content, _ := fields(rec)["content"].(string)
if !strings.Contains(content, "section collection failed") {
t.Errorf("failure marker content = %q, want a collection-failed marker", content)
}
}
}
if failureMarkers != 1 {
t.Errorf("want exactly 1 failure marker, got %d", failureMarkers)
}
}
// TestCapture_NoWiringIsSafeNoOp: with RunRemote unwired (operator hasn't
// called the boot wiring), Capture must be a logged no-op, never a panic.
func TestCapture_NoWiringIsSafeNoOp(t *testing.T) {
readLoki := captureLoki(t)
prevRun, prevRedact := RunRemote, Redact
RunRemote = nil
Redact = func(_ws, c string) string { return c }
t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact })
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-6", OrgID: "o"})
if recs := readLoki(); len(recs) != 0 {
t.Errorf("shipped %d records with RunRemote unwired, want 0", len(recs))
}
}
@@ -1,153 +0,0 @@
// Package rescuestore is the queryable persistence layer for rescue
// bundles (RFC internal#742 Part 3). It is the DB side of the read-path
// decision: because internal/audit (Part 2's ship transport) is
// Loki-only and tenants hold no obs read creds, the redacted bundle is
// ALSO written here on capture so GET /workspaces/:id/rescue can serve
// the latest one with a plain Postgres read.
//
// The package owns both the write (Persist, wired into
// rescue.PersistBundle at boot) and the read (GetLatest, used by the
// handler). It depends on internal/db and internal/rescue (for the
// Bundle/Section types); it is imported by handlers, never by the leaf
// internal/rescue or by registry — so no import cycle.
package rescuestore
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
)
// maxSectionBytes bounds a single persisted section's content so a
// pathological capture (e.g. a multi-megabyte container log) can't bloat
// the row or the read response. Capture already tails to ~200 lines per
// section, so this is a backstop, not the primary limit. Truncated
// content is suffixed with a marker so a reader knows it was clipped.
const maxSectionBytes = 64 * 1024 // 64 KiB per section
// truncationMarker is appended to any section clipped at maxSectionBytes.
const truncationMarker = "\n…(rescue: section truncated at 64KiB)"
// StoredBundle is a persisted bundle plus its capture timestamp (the DB
// assigns captured_at on write). The handler maps this to the read
// response shape.
type StoredBundle struct {
Bundle rescue.Bundle
CapturedAt time.Time
}
// Store is the read/write surface the handler and the capture wiring
// depend on. An interface so the handler test can fake it without a
// sqlmock; the production implementation is Postgres.
type Store interface {
// Persist writes one bundle row (captured_at = now()).
Persist(ctx context.Context, b rescue.Bundle) error
// GetLatest returns the most recent bundle for workspaceID. When
// orgID is non-empty the row must also match org_id (cross-org
// defense-in-depth behind TenantGuard). Returns (nil, nil) — NOT an
// error — when no bundle exists, so the handler can 404 cleanly.
GetLatest(ctx context.Context, workspaceID, orgID string) (*StoredBundle, error)
}
// Postgres is the production Store backed by the rescue_bundles table.
type Postgres struct{ db *sql.DB }
// NewPostgres builds a Postgres-backed store over the given handle.
func NewPostgres(db *sql.DB) *Postgres { return &Postgres{db: db} }
// Persist writes the bundle as one row. Sections are stored as JSONB.
// Each section's content is clamped to maxSectionBytes before write.
func (p *Postgres) Persist(ctx context.Context, b rescue.Bundle) error {
if p.db == nil {
return fmt.Errorf("rescuestore: nil db")
}
clamped := clampSections(b.Sections)
payload, err := json.Marshal(clamped)
if err != nil {
return fmt.Errorf("rescuestore: marshal sections: %w", err)
}
_, err = p.db.ExecContext(ctx,
`INSERT INTO rescue_bundles (workspace_id, org_id, instance_id, reason, sections)
VALUES ($1, $2, $3, $4, $5::jsonb)`,
b.WorkspaceID, b.OrgID, b.InstanceID, b.Reason, string(payload),
)
if err != nil {
return fmt.Errorf("rescuestore: insert: %w", err)
}
return nil
}
// GetLatest returns the newest bundle for workspaceID, org-scoped. The
// (workspace_id, captured_at DESC, id DESC) index serves this directly.
// sql.ErrNoRows maps to (nil, nil) so the handler 404s.
func (p *Postgres) GetLatest(ctx context.Context, workspaceID, orgID string) (*StoredBundle, error) {
if p.db == nil {
return nil, fmt.Errorf("rescuestore: nil db")
}
if orgID == "" {
return nil, fmt.Errorf("rescuestore: org_id required")
}
var (
instanceID string
reason string
capturedAt time.Time
sectionsRaw []byte
)
err := p.db.QueryRowContext(ctx,
`SELECT instance_id, reason, captured_at, sections
FROM rescue_bundles
WHERE workspace_id = $1
AND org_id = $2
ORDER BY captured_at DESC, id DESC
LIMIT 1`,
workspaceID, orgID,
).Scan(&instanceID, &reason, &capturedAt, &sectionsRaw)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("rescuestore: query latest: %w", err)
}
var sections []rescue.Section
if len(sectionsRaw) > 0 {
if err := json.Unmarshal(sectionsRaw, &sections); err != nil {
return nil, fmt.Errorf("rescuestore: unmarshal sections: %w", err)
}
}
return &StoredBundle{
Bundle: rescue.Bundle{
WorkspaceID: workspaceID,
OrgID: orgID,
InstanceID: instanceID,
Reason: reason,
Sections: sections,
},
CapturedAt: capturedAt,
}, nil
}
// clampSections returns a copy with each section's content clamped to
// maxSectionBytes. Clamps on a rune boundary so the marker doesn't split
// a multibyte sequence — the content is a forensic blob, never parsed.
func clampSections(in []rescue.Section) []rescue.Section {
out := make([]rescue.Section, len(in))
for i, s := range in {
if len(s.Content) > maxSectionBytes {
b := []byte(s.Content[:maxSectionBytes])
// Back off to a valid utf-8 boundary (at most 3 bytes).
for len(b) > 0 && b[len(b)-1]&0xC0 == 0x80 {
b = b[:len(b)-1]
}
s.Content = string(b) + truncationMarker
}
out[i] = s
}
return out
}
@@ -1,211 +0,0 @@
package rescuestore
// Sqlmock-backed coverage for the rescue_bundles store (RFC internal#742
// Part 3). Exercises Persist (incl. section clamp) + GetLatest (happy
// path, no-rows→nil, org-scoping, query error) without a real DB.
import (
"context"
"database/sql"
"encoding/json"
"errors"
"regexp"
"strings"
"testing"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
"github.com/DATA-DOG/go-sqlmock"
)
func newMock(t *testing.T) (*sql.DB, sqlmock.Sqlmock) {
t.Helper()
dbh, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock: %v", err)
}
t.Cleanup(func() { _ = dbh.Close() })
return dbh, mock
}
func sampleBundle() rescue.Bundle {
return rescue.Bundle{
WorkspaceID: "ws-1",
OrgID: "org-9",
InstanceID: "i-abc",
Reason: "bootstrap_watcher",
Sections: []rescue.Section{
{Name: "config.yaml", Content: "model: gpt-4", Redacted: true},
{Name: "docker-ps", Content: "(no agent container)", Redacted: false},
},
}
}
// TestPersist_InsertsRow asserts Persist issues one INSERT with the
// bundle fields and a JSON sections payload.
func TestPersist_InsertsRow(t *testing.T) {
dbh, mock := newMock(t)
b := sampleBundle()
mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO rescue_bundles`)).
WithArgs("ws-1", "org-9", "i-abc", "bootstrap_watcher", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(1, 1))
if err := NewPostgres(dbh).Persist(context.Background(), b); err != nil {
t.Fatalf("Persist: %v", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestClampSections: a section over maxSectionBytes is truncated +
// marker-suffixed; a small section is untouched.
func TestClampSections(t *testing.T) {
huge := strings.Repeat("x", maxSectionBytes+5000)
in := []rescue.Section{
{Name: "container.logs", Content: huge, Redacted: true},
{Name: "small", Content: "ok", Redacted: true},
}
out := clampSections(in)
if len(out[0].Content) > maxSectionBytes+len(truncationMarker) {
t.Errorf("clamped content len = %d, want <= %d", len(out[0].Content), maxSectionBytes+len(truncationMarker))
}
if !strings.HasSuffix(out[0].Content, truncationMarker) {
t.Error("clamped section missing truncation marker suffix")
}
if out[1].Content != "ok" {
t.Errorf("small section was modified: %q", out[1].Content)
}
}
// TestPersist_WritesClampedPayload: Persist marshals the clamped
// sections into the JSONB arg (the INSERT carries the truncation marker).
func TestPersist_WritesClampedPayload(t *testing.T) {
dbh, mock := newMock(t)
huge := strings.Repeat("x", maxSectionBytes+5000)
b := rescue.Bundle{
WorkspaceID: "ws-1",
Sections: []rescue.Section{{Name: "container.logs", Content: huge, Redacted: true}},
}
want, _ := json.Marshal(clampSections(b.Sections))
mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO rescue_bundles`)).
WithArgs("ws-1", "", "", "", string(want)).
WillReturnResult(sqlmock.NewResult(1, 1))
if err := NewPostgres(dbh).Persist(context.Background(), b); err != nil {
t.Fatalf("Persist: %v", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet: %v", err)
}
}
// TestGetLatest_ReturnsBundle: a found row decodes back into the bundle.
func TestGetLatest_ReturnsBundle(t *testing.T) {
dbh, mock := newMock(t)
ts := time.Date(2026, 5, 31, 12, 0, 0, 0, time.UTC)
secs, _ := json.Marshal([]rescue.Section{
{Name: "config.yaml", Content: "redacted", Redacted: true},
})
mock.ExpectQuery(regexp.QuoteMeta(`SELECT instance_id, reason, captured_at, sections`)).
WithArgs("ws-1", "org-9").
WillReturnRows(sqlmock.NewRows([]string{"instance_id", "reason", "captured_at", "sections"}).
AddRow("i-abc", "bootstrap_watcher", ts, secs))
got, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-1", "org-9")
if err != nil {
t.Fatalf("GetLatest: %v", err)
}
if got == nil {
t.Fatal("got nil, want a bundle")
}
if !got.CapturedAt.Equal(ts) {
t.Errorf("captured_at = %v, want %v", got.CapturedAt, ts)
}
if got.Bundle.InstanceID != "i-abc" || got.Bundle.Reason != "bootstrap_watcher" {
t.Errorf("bundle meta wrong: %+v", got.Bundle)
}
if len(got.Bundle.Sections) != 1 || got.Bundle.Sections[0].Name != "config.yaml" {
t.Errorf("sections decoded wrong: %+v", got.Bundle.Sections)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet: %v", err)
}
}
// TestGetLatest_NoRowsReturnsNil: no bundle → (nil, nil), so the handler
// can 404 without treating it as an error.
func TestGetLatest_NoRowsReturnsNil(t *testing.T) {
dbh, mock := newMock(t)
mock.ExpectQuery(regexp.QuoteMeta(`SELECT instance_id, reason, captured_at, sections`)).
WithArgs("ws-none", "org-9").
WillReturnError(sql.ErrNoRows)
got, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-none", "org-9")
if err != nil {
t.Fatalf("GetLatest err = %v, want nil for no-rows", err)
}
if got != nil {
t.Fatalf("got %+v, want nil for no-rows", got)
}
}
// TestGetLatest_OrgScopingArg: the org id is passed as the $2 filter arg
// with strict equality, so a row in a sibling org is excluded by the query
// itself. A mismatched org → no row → nil (same as no-rows).
func TestGetLatest_OrgScopingArg(t *testing.T) {
dbh, mock := newMock(t)
// Tenant org-B asks for ws-1 (owned by org-9). The strict predicate
// filters it out → ErrNoRows → nil.
mock.ExpectQuery(regexp.QuoteMeta(`AND org_id = $2`)).
WithArgs("ws-1", "org-B").
WillReturnError(sql.ErrNoRows)
got, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-1", "org-B")
if err != nil {
t.Fatalf("GetLatest: %v", err)
}
if got != nil {
t.Fatal("sibling-org read returned a bundle; want nil")
}
}
// TestGetLatest_EmptyOrgIDRejected: an empty orgID must fail closed with
// an error rather than disabling the org filter (#2020).
func TestGetLatest_EmptyOrgIDRejected(t *testing.T) {
dbh, _ := newMock(t)
_, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-1", "")
if err == nil {
t.Fatal("GetLatest(empty orgID) should error")
}
}
// TestGetLatest_QueryErrorPropagates: a real DB error (not ErrNoRows)
// surfaces as an error so the handler returns 503, not a false 404.
func TestGetLatest_QueryErrorPropagates(t *testing.T) {
dbh, mock := newMock(t)
mock.ExpectQuery(regexp.QuoteMeta(`SELECT instance_id, reason, captured_at, sections`)).
WithArgs("ws-1", "org-9").
WillReturnError(errors.New("connection reset"))
_, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-1", "org-9")
if err == nil {
t.Fatal("want an error for a non-ErrNoRows DB failure")
}
}
// TestNilDB: both methods return an error (never panic) when the db
// handle is nil — the degraded-boot guard the wiring relies on.
func TestNilDB(t *testing.T) {
p := NewPostgres(nil)
if err := p.Persist(context.Background(), sampleBundle()); err == nil {
t.Error("Persist(nil db) should error")
}
if _, err := p.GetLatest(context.Background(), "ws-1", "org-9"); err == nil {
t.Error("GetLatest(nil db) should error")
}
}
@@ -703,14 +703,6 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
wsAuth.PUT("/files/*path", tmplh.WriteFile)
wsAuth.DELETE("/files/*path", tmplh.DeleteFile)
// Rescue read (RFC internal#742 Part 3) — latest post-mortem bundle
// for a boot-failed/terminated workspace, so "why won't my agent
// boot" is answerable without a live instance. Same WorkspaceAuth
// gate as /files/*; the handler org-scopes the store read by
// MOLECULE_ORG_ID so a sibling org cannot read another org's bundle.
rescueReadH := handlers.NewRescueReadHandler()
wsAuth.GET("/rescue", rescueReadH.GetRescue)
// Chat attachments — file upload (user → agent) and binary-safe
// streaming download (agent → user). Namespaced under /chat/ so
// the security model is obviously distinct from /files/* (which
@@ -0,0 +1,558 @@
//go:build integration
// +build integration
// scheduler_integration_test.go — REAL Postgres integration tests for the
// workspace-server cron scheduler firing loop. Regression coverage for
// molecule-core issue #2149 (filed under SOP rule internal#765).
//
// Run with:
//
// docker run --rm -d --name pg-integration \
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
// -p 55432:5432 postgres:15-alpine
// sleep 4
// # apply every migration up.sql / legacy .sql in lexicographic order
// for f in $(ls workspace-server/migrations/*.sql | grep -v '\.down\.sql$' | sort); do \
// psql "postgres://postgres:test@localhost:55432/molecule?sslmode=disable" -f "$f"; done
// cd workspace-server
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
// go test -tags=integration ./internal/scheduler/ -run '^TestIntegration_'
//
// CI: .gitea/workflows/handlers-postgres-integration.yml runs these on every
// PR/push that touches workspace-server/internal/scheduler/ (the
// `handlers-postgres` detect-changes profile was extended to include the
// scheduler package + this workflow file).
//
// Why these are NOT the existing sqlmock unit tests (scheduler_test.go)
// --------------------------------------------------------------------
// The strict-sqlmock unit tests pin which SQL statements fire — fast, no DB.
// But sqlmock CANNOT validate:
// - the activity_logs `$3::jsonb` cast (#2026 wedge) — sqlmock never parses
// the payload, so an invalid-UTF-8 jsonb body that wedges a real INSERT
// looks "green" under mock.ExpectExec(`INSERT INTO activity_logs`).
// - the ROW STATE after tick()/fireSchedule run: that last_run_at,
// next_run_at, run_count, last_status actually landed on the row.
// - sweepPhantomBusy's NOT IN (SELECT … activity_logs) subquery semantics
// against real rows — it has no unit test at all (#2149).
//
// A SQL regression here = a fleet-wide silent cron outage (#85 ran 12h before
// detection). These tests boot a real Postgres, insert real rows, run the
// production tick()/sweepPhantomBusy, and SELECT the rows back to assert the
// observable end state — the gap sqlmock structurally cannot cover.
//
// Watch-fail intent: each test is written to FAIL on a regression of the
// behavior under test (e.g. drop the activity_logs INSERT, drop the
// write-back UPDATE, drop the UTF-8 sanitize, or break the phantom-busy
// subquery) and to PASS against the current-correct scheduler.go.
package scheduler
import (
"context"
"database/sql"
"os"
"testing"
"time"
mdb "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
_ "github.com/lib/pq"
)
// ── test doubles ──────────────────────────────────────────────────────────
// recordingProxy is an A2AProxy that records each fire and returns a
// configurable response. Used to assert that tick()/fireSchedule actually
// reached the A2A boundary for the due schedule.
type recordingProxy struct {
status int
body []byte
err error
fires int
lastBody []byte
lastCaller string
lastLogFlag bool
lastWSID string
}
func (p *recordingProxy) ProxyA2ARequest(
_ context.Context, workspaceID string, body []byte, callerID string, logActivity bool,
) (int, []byte, error) {
p.fires++
p.lastWSID = workspaceID
p.lastBody = body
p.lastCaller = callerID
p.lastLogFlag = logActivity
if p.err != nil {
return 0, nil, p.err
}
return p.status, p.body, nil
}
// ── connection + fixture helpers ──────────────────────────────────────────
// integrationDB returns the configured integration-test connection or skips
// the test if INTEGRATION_DB_URL is unset. Hot-swaps the package-level
// mdb.DB so the production scheduler helpers (tick, fireSchedule,
// sweepPhantomBusy) operate on this connection; restores it via t.Cleanup.
//
// NOT SAFE FOR t.Parallel(): the package-global swap races across tests.
func integrationDB(t *testing.T) *sql.DB {
t.Helper()
url := os.Getenv("INTEGRATION_DB_URL")
if url == "" {
t.Skip("INTEGRATION_DB_URL not set; skipping (local devs: see file header)")
}
conn, err := sql.Open("postgres", url)
if err != nil {
t.Fatalf("open: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := conn.PingContext(ctx); err != nil {
t.Fatalf("ping: %v", err)
}
// Clean slate. activity_logs + workspace_schedules cascade off workspaces,
// but we DELETE explicitly (and in FK order) so a partial prior run can't
// leave orphan rows that perturb the next test's assertions.
cctx, ccancel := context.WithTimeout(context.Background(), 10*time.Second)
defer ccancel()
for _, q := range []string{
`DELETE FROM activity_logs`,
`DELETE FROM workspace_schedules`,
`DELETE FROM workspaces`,
} {
if _, err := conn.ExecContext(cctx, q); err != nil {
t.Fatalf("cleanup %q: %v", q, err)
}
}
prev := mdb.DB
mdb.DB = conn
t.Cleanup(func() {
mdb.DB = prev
conn.Close()
})
return conn
}
// insertWorkspace inserts a workspace row and returns its UUID. active is the
// initial active_tasks value; status defaults to 'online' (valid workspace_status enum).
func insertWorkspace(t *testing.T, conn *sql.DB, name string, active int) string {
t.Helper()
var id string
err := conn.QueryRowContext(context.Background(), `
INSERT INTO workspaces (name, status, active_tasks, max_concurrent_tasks)
VALUES ($1, 'online', $2, 1)
RETURNING id
`, name, active).Scan(&id)
if err != nil {
t.Fatalf("insertWorkspace(%s): %v", name, err)
}
return id
}
// insertSchedule inserts an enabled workspace_schedules row whose next_run_at
// is in the past (so tick() picks it up immediately) and returns its UUID.
func insertSchedule(t *testing.T, conn *sql.DB, wsID, name, cronExpr, prompt string) string {
t.Helper()
var id string
err := conn.QueryRowContext(context.Background(), `
INSERT INTO workspace_schedules
(workspace_id, name, cron_expr, timezone, prompt, enabled, next_run_at, source)
VALUES ($1, $2, $3, 'UTC', $4, true, now() - interval '1 minute', 'runtime')
RETURNING id
`, wsID, name, cronExpr, prompt).Scan(&id)
if err != nil {
t.Fatalf("insertSchedule(%s): %v", name, err)
}
return id
}
type scheduleState struct {
lastRunAt sql.NullTime
nextRunAt sql.NullTime
runCount int
lastStatus string
lastError string
}
func readScheduleState(t *testing.T, conn *sql.DB, id string) scheduleState {
t.Helper()
var st scheduleState
var status, errStr sql.NullString
err := conn.QueryRowContext(context.Background(), `
SELECT last_run_at, next_run_at, run_count, last_status, last_error
FROM workspace_schedules WHERE id = $1
`, id).Scan(&st.lastRunAt, &st.nextRunAt, &st.runCount, &status, &errStr)
if err != nil {
t.Fatalf("readScheduleState(%s): %v", id, err)
}
st.lastStatus = status.String
st.lastError = errStr.String
return st
}
// ── TestIntegration_TickFiresAndWritesBack (#2149 core) ───────────────────
//
// Insert one due schedule, run tick() once, and assert the full firing
// loop landed against a REAL Postgres:
// - the A2A proxy was invoked exactly once for the schedule's workspace
// - the post-fire UPDATE wrote last_run_at (was NULL), advanced next_run_at
// into the future, bumped run_count to 1, set last_status='ok'
// - a cron_run activity_logs row was inserted with VALID jsonb request_body
// (the `$3::jsonb` cast #2026 path) carrying the schedule metadata
//
// Regression watch-fail: if a refactor drops the write-back UPDATE, the
// activity_logs INSERT, or breaks the jsonb cast, this test fails where every
// sqlmock unit test stays green.
func TestIntegration_TickFiresAndWritesBack(t *testing.T) {
conn := integrationDB(t)
wsID := insertWorkspace(t, conn, "cron-fire-ws", 0)
schedID := insertSchedule(t, conn, wsID, "hourly-audit", "0 * * * *", "run the hourly audit")
proxy := &recordingProxy{
status: 200,
body: []byte(`{"jsonrpc":"2.0","result":{"kind":"message","parts":[{"kind":"text","text":"done"}]},"id":"1"}`),
}
s := New(proxy, nil)
s.tick(context.Background())
// 1. A2A boundary reached exactly once for the right workspace.
if proxy.fires != 1 {
t.Fatalf("proxy fires = %d, want 1 (tick must fire the one due schedule)", proxy.fires)
}
if proxy.lastWSID != wsID {
t.Errorf("proxy fired for workspace %q, want %q", proxy.lastWSID, wsID)
}
// Empty callerID = canvas-style (bypasses access control); logActivity=true.
if proxy.lastCaller != "" {
t.Errorf("callerID = %q, want empty (canvas-style scheduler fire)", proxy.lastCaller)
}
if !proxy.lastLogFlag {
t.Error("logActivity flag = false, want true")
}
// 2. Row write-back.
st := readScheduleState(t, conn, schedID)
if !st.lastRunAt.Valid {
t.Error("last_run_at is NULL after fire, want set (write-back UPDATE did not land)")
}
if !st.nextRunAt.Valid {
t.Fatal("next_run_at is NULL after fire, want a future timestamp")
}
if !st.nextRunAt.Time.After(time.Now()) {
t.Errorf("next_run_at = %v, want a time in the future (schedule would tight-loop otherwise)", st.nextRunAt.Time)
}
if st.runCount != 1 {
t.Errorf("run_count = %d, want 1", st.runCount)
}
if st.lastStatus != "ok" {
t.Errorf("last_status = %q, want \"ok\"", st.lastStatus)
}
// 3. activity_logs cron_run row with valid jsonb request_body.
var actCount int
var summary, status string
var reqBody []byte
err := conn.QueryRowContext(context.Background(), `
SELECT count(*) OVER (), summary, status, request_body
FROM activity_logs
WHERE workspace_id = $1 AND activity_type = 'cron_run'
LIMIT 1
`, wsID).Scan(&actCount, &summary, &status, &reqBody)
if err == sql.ErrNoRows {
t.Fatal("no cron_run activity_logs row inserted after fire (#152/#2026 path missing)")
}
if err != nil {
t.Fatalf("read activity_logs: %v", err)
}
if actCount != 1 {
t.Errorf("cron_run activity_logs rows = %d, want 1", actCount)
}
if status != "ok" {
t.Errorf("activity_logs.status = %q, want \"ok\"", status)
}
// request_body must be valid jsonb carrying the schedule_id — proves the
// `$3::jsonb` cast accepted the payload (the #2026 wedge surface).
var sid string
if err := conn.QueryRowContext(context.Background(), `
SELECT request_body->>'schedule_id'
FROM activity_logs WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1
`, wsID).Scan(&sid); err != nil {
t.Fatalf("request_body is not queryable jsonb: %v", err)
}
if sid != schedID {
t.Errorf("activity_logs request_body->>'schedule_id' = %q, want %q", sid, schedID)
}
}
// ── TestIntegration_InvalidUTF8PromptSanitizedIntoJsonb (#2026 / #2149) ────
//
// The agent-editable prompt can carry raw invalid-UTF-8 bytes. Postgres jsonb
// columns REJECT invalid UTF-8, which (pre-#2026) wedged the activity_logs
// INSERT and held the transaction open — stalling the whole scheduler.
// fireSchedule now sanitizeUTF8()s every string before the `$3::jsonb` insert.
//
// Postgres TEXT columns (workspace_schedules.prompt) also reject invalid UTF-8
// in a UTF-8 database, so we cannot INSERT the bad bytes through the fixture.
// Instead we insert a valid prompt, then call fireSchedule directly with a
// scheduleRow whose Prompt field contains the invalid bytes — this simulates
// the real regression path (e.g. truncation splitting a multi-byte rune, or
// an agent-edited template arriving via a path that bypasses DB validation).
//
// Assertions:
// - the fire still completed (write-back UPDATE landed)
// - the cron_run activity_logs row was inserted (the jsonb cast accepted
// the SANITIZED payload — the INSERT did not wedge)
// - the stored request_body is queryable jsonb (valid UTF-8 on disk)
//
// Watch-fail: remove the sanitizeUTF8() wrapping around the jsonb payload and
// this test fails on a real Postgres (INSERT errors / row absent), while the
// sqlmock unit test that only checks "an INSERT fired" stays green.
func TestIntegration_InvalidUTF8PromptSanitizedIntoJsonb(t *testing.T) {
conn := integrationDB(t)
wsID := insertWorkspace(t, conn, "utf8-ws", 0)
// Insert with valid UTF-8 — Postgres TEXT rejects 0x80/0xff.
schedID := insertSchedule(t, conn, wsID, "utf8-job", "0 * * * *", "valid prompt")
// Prompt with invalid UTF-8: orphan continuation byte + bare 0xff.
badPrompt := "audit \x80 report \xff end"
row := scheduleRow{
ID: schedID,
WorkspaceID: wsID,
Name: "utf8-job",
CronExpr: "0 * * * *",
Timezone: "UTC",
Prompt: badPrompt,
}
proxy := &recordingProxy{
status: 200,
body: []byte(`{"result":{"kind":"message","parts":[{"kind":"text","text":"ok"}]}}`),
}
s := New(proxy, nil)
s.fireSchedule(context.Background(), row)
if proxy.fires != 1 {
t.Fatalf("proxy fires = %d, want 1", proxy.fires)
}
// Write-back must have landed despite the bad prompt bytes.
st := readScheduleState(t, conn, schedID)
if st.runCount != 1 || st.lastStatus != "ok" {
t.Errorf("post-fire state run_count=%d last_status=%q, want 1/\"ok\" "+
"(invalid-UTF-8 prompt must not block the fire)", st.runCount, st.lastStatus)
}
// The cron_run activity_logs row MUST exist — proving the `$3::jsonb`
// INSERT accepted the sanitized payload (did not wedge on invalid UTF-8).
var n int
if err := conn.QueryRowContext(context.Background(), `
SELECT count(*) FROM activity_logs
WHERE workspace_id = $1 AND activity_type = 'cron_run'
`, wsID).Scan(&n); err != nil {
t.Fatalf("count cron_run rows: %v", err)
}
if n != 1 {
t.Fatalf("cron_run activity_logs rows = %d, want 1 — the jsonb INSERT wedged "+
"on invalid UTF-8 (the #2026 regression)", n)
}
// The stored prompt inside request_body must be queryable + valid UTF-8.
var storedPrompt string
if err := conn.QueryRowContext(context.Background(), `
SELECT request_body->>'prompt'
FROM activity_logs WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1
`, wsID).Scan(&storedPrompt); err != nil {
t.Fatalf("request_body->>'prompt' not queryable jsonb: %v", err)
}
if storedPrompt == "" {
t.Error("stored prompt is empty, want the sanitized prompt text")
}
// Round-trip through Postgres jsonb guarantees valid UTF-8; assert the
// replacement character replaced the bad bytes rather than them surviving.
for i := 0; i < len(storedPrompt); i++ {
if storedPrompt[i] == 0x80 || storedPrompt[i] == 0xff {
t.Fatalf("stored prompt still contains raw invalid byte 0x%x at %d", storedPrompt[i], i)
}
}
}
// ── TestIntegration_TickErrorStatusWriteBack (#2149) ──────────────────────
//
// When the A2A proxy returns a transport error, fireSchedule must still write
// back: last_status='error', last_error populated, next_run_at advanced (so
// the schedule does not get stuck re-firing), run_count bumped. Verifies the
// error path persists to a real row, not just that "an UPDATE fired".
func TestIntegration_TickErrorStatusWriteBack(t *testing.T) {
conn := integrationDB(t)
wsID := insertWorkspace(t, conn, "err-ws", 0)
schedID := insertSchedule(t, conn, wsID, "err-job", "0 * * * *", "do work")
proxy := &recordingProxy{err: context.DeadlineExceeded}
s := New(proxy, nil)
s.tick(context.Background())
st := readScheduleState(t, conn, schedID)
if st.lastStatus != "error" {
t.Errorf("last_status = %q, want \"error\"", st.lastStatus)
}
if st.lastError == "" {
t.Error("last_error is empty, want the proxy error text persisted (#152)")
}
if st.runCount != 1 {
t.Errorf("run_count = %d, want 1 (run still counted on error)", st.runCount)
}
if !st.nextRunAt.Valid || !st.nextRunAt.Time.After(time.Now()) {
t.Errorf("next_run_at not advanced to future on error path (= %v) — schedule would tight-loop", st.nextRunAt)
}
// The error activity_logs row must carry status='error' + error_detail.
var status, errDetail string
if err := conn.QueryRowContext(context.Background(), `
SELECT status, COALESCE(error_detail,'') FROM activity_logs
WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1
`, wsID).Scan(&status, &errDetail); err != nil {
t.Fatalf("read error activity_logs: %v", err)
}
if status != "error" {
t.Errorf("activity_logs.status = %q, want \"error\"", status)
}
if errDetail == "" {
t.Error("activity_logs.error_detail empty on error fire, want the error message (#152)")
}
}
// ── TestIntegration_SweepPhantomBusy (#2149 — no prior test) ──────────────
//
// sweepPhantomBusy resets active_tasks=0 for workspaces stuck busy with NO
// activity_logs row in the last phantomStaleThreshold window, and must LEAVE
// ALONE workspaces that have recent activity. The NOT IN (SELECT DISTINCT
// workspace_id FROM activity_logs WHERE created_at > now() - interval) subquery
// is exactly the kind of set-semantics that sqlmock cannot validate — there is
// no unit test for this method at all (#2149).
//
// Fixture:
// - phantomWS: active_tasks=3, NO recent activity_log → must reset to 0
// - recentWS: active_tasks=2, activity_log 1 min ago → must stay at 2
// - staleWS: active_tasks=1, activity_log 30 min ago → must reset to 0
// - removedWS: active_tasks=4, status='removed', no activity → must stay (status guard)
// - idleWS: active_tasks=0 → untouched (not >0)
//
// Watch-fail: break the subquery (e.g. drop the status!='removed' guard, or
// invert the NOT IN), and the asserted end-state diverges on a real Postgres.
func TestIntegration_SweepPhantomBusy(t *testing.T) {
conn := integrationDB(t)
phantomWS := insertWorkspace(t, conn, "phantom-ws", 3)
recentWS := insertWorkspace(t, conn, "recent-ws", 2)
staleWS := insertWorkspace(t, conn, "stale-ws", 1)
idleWS := insertWorkspace(t, conn, "idle-ws", 0)
// removedWS: busy but status='removed' — the sweep must skip it.
var removedWS string
if err := conn.QueryRowContext(context.Background(), `
INSERT INTO workspaces (name, status, active_tasks, max_concurrent_tasks)
VALUES ('removed-ws', 'removed', 4, 1) RETURNING id
`).Scan(&removedWS); err != nil {
t.Fatalf("insert removedWS: %v", err)
}
// recentWS has a fresh activity_log (1 min ago → inside the 10-min window).
if _, err := conn.ExecContext(context.Background(), `
INSERT INTO activity_logs (workspace_id, activity_type, status, created_at)
VALUES ($1, 'a2a_receive', 'ok', now() - interval '1 minute')
`, recentWS); err != nil {
t.Fatalf("insert recent activity_log: %v", err)
}
// staleWS has only an OLD activity_log (30 min ago → outside the window).
if _, err := conn.ExecContext(context.Background(), `
INSERT INTO activity_logs (workspace_id, activity_type, status, created_at)
VALUES ($1, 'a2a_receive', 'ok', now() - interval '30 minutes')
`, staleWS); err != nil {
t.Fatalf("insert stale activity_log: %v", err)
}
s := New(nil, nil)
s.sweepPhantomBusy(context.Background())
active := func(id string) int {
var n int
if err := conn.QueryRowContext(context.Background(),
`SELECT active_tasks FROM workspaces WHERE id = $1`, id).Scan(&n); err != nil {
t.Fatalf("read active_tasks(%s): %v", id, err)
}
return n
}
if got := active(phantomWS); got != 0 {
t.Errorf("phantomWS active_tasks = %d, want 0 (busy + no recent activity → must be swept)", got)
}
if got := active(staleWS); got != 0 {
t.Errorf("staleWS active_tasks = %d, want 0 (only stale activity → must be swept)", got)
}
if got := active(recentWS); got != 2 {
t.Errorf("recentWS active_tasks = %d, want 2 (recent activity → must NOT be swept)", got)
}
if got := active(removedWS); got != 4 {
t.Errorf("removedWS active_tasks = %d, want 4 (status='removed' → sweep must skip it)", got)
}
if got := active(idleWS); got != 0 {
t.Errorf("idleWS active_tasks = %d, want 0 (was never busy)", got)
}
// The swept rows must also have current_task cleared.
var ct string
if err := conn.QueryRowContext(context.Background(),
`SELECT COALESCE(current_task,'') FROM workspaces WHERE id = $1`, phantomWS).Scan(&ct); err != nil {
t.Fatalf("read current_task: %v", err)
}
if ct != "" {
t.Errorf("phantomWS current_task = %q, want empty after sweep", ct)
}
}
// ── TestIntegration_NativeSchedulerSkipAdvancesNextRunAt (#2149) ──────────
//
// When a workspace's adapter owns scheduling natively, tick() must SKIP the
// fire but still advance next_run_at (so the row doesn't tight-loop on every
// poll) — observability (next_run_at) is preserved while the fire is dropped.
// Asserts the native-skip UPDATE landed on a real row and the proxy was NOT
// invoked. This is the native-skip UPDATE path #2149 calls out — sqlmock can
// only assert an UPDATE fired, not that next_run_at moved forward.
func TestIntegration_NativeSchedulerSkipAdvancesNextRunAt(t *testing.T) {
conn := integrationDB(t)
wsID := insertWorkspace(t, conn, "native-ws", 0)
schedID := insertSchedule(t, conn, wsID, "native-job", "0 * * * *", "native run")
// Capture the pre-tick next_run_at (it is in the past by construction).
before := readScheduleState(t, conn, schedID)
if !before.nextRunAt.Valid || before.nextRunAt.Time.After(time.Now()) {
t.Fatalf("precondition: next_run_at should start in the past, got %v", before.nextRunAt)
}
proxy := &recordingProxy{status: 200, body: []byte(`{}`)}
s := New(proxy, nil)
// Every workspace reports native scheduling → fire must be skipped.
s.SetNativeSchedulerCheck(func(string) bool { return true })
s.tick(context.Background())
if proxy.fires != 0 {
t.Errorf("proxy fires = %d, want 0 (native-scheduler workspace must NOT fire)", proxy.fires)
}
after := readScheduleState(t, conn, schedID)
if !after.nextRunAt.Valid || !after.nextRunAt.Time.After(time.Now()) {
t.Errorf("next_run_at = %v, want advanced into the future (native-skip UPDATE must still run)", after.nextRunAt)
}
// Skip path does NOT bump run_count or write last_run_at (no fire happened).
if after.runCount != 0 {
t.Errorf("run_count = %d, want 0 (skip must not count as a run)", after.runCount)
}
if after.lastRunAt.Valid {
t.Error("last_run_at set on native-skip, want NULL (no fire occurred)")
}
}
@@ -1,4 +0,0 @@
-- Reverse RFC internal#742 Part 3 rescue_bundles table.
-- Forensic-only table; dropping it loses post-mortem read history but
-- does not affect boot-failure semantics (capture still ships to Loki).
DROP TABLE IF EXISTS rescue_bundles;
@@ -1,59 +0,0 @@
-- 20260531000000_rescue_bundles.up.sql — RFC internal#742 Part 3.
--
-- A queryable, post-mortem-inspectable copy of the rescue bundle that
-- Part 2 (internal/rescue.Capture) collects off a boot-failed workspace
-- EC2 before the control plane reaps it.
--
-- WHY a DB table (the Part 3 read-path decision):
-- Part 2 ships the bundle via internal/audit (audit.Emit), which is
-- stdout→Vector→Loki + a best-effort local JSONL on the tenant
-- container's EPHEMERAL rootfs — NOT a queryable store. Serving
-- GET /workspaces/:id/rescue from Loki would require giving the
-- tenant process a Loki *query* client + obs read creds, which it
-- deliberately does not have (and must not — RFC internal#742 keeps
-- obs read creds out of tenants). So Part 3 ALSO persists the
-- already-redacted bundle to this small per-tenant table on capture,
-- and the read endpoint serves the latest row. The Loki stream
-- remains the cross-tenant operator firehose; this table is the
-- tenant-local, org-scoped read surface that powers the future
-- canvas "Why did this fail?" panel.
--
-- REDACTION: the `sections` payload written here is the SAME content
-- the Loki ship loop emits — i.e. already run through the SAFE-T1201
-- secret-scan (handlers.redactSecrets) at capture time. This table
-- never holds raw tokens; the read endpoint returns the stored content
-- verbatim without re-redacting.
--
-- ORG SCOPING: org_id is denormalized onto the row so the read handler
-- can filter by (workspace_id, org_id) and a row whose org doesn't
-- match the tenant's MOLECULE_ORG_ID is never returned — defense in
-- depth behind TenantGuard (which already 404s cross-org requests at
-- the routing layer).
--
-- RETENTION: bounded by RescueVolumeGrace semantics on the capture
-- side; rows are small (a redacted forensic blob, capped at capture).
-- A future sweeper can prune rows past the grace window — out of scope
-- for Part 3; the table is append-only here.
CREATE TABLE IF NOT EXISTS rescue_bundles (
id BIGSERIAL PRIMARY KEY,
workspace_id TEXT NOT NULL,
org_id TEXT NOT NULL DEFAULT '',
instance_id TEXT NOT NULL DEFAULT '',
reason TEXT NOT NULL DEFAULT '',
captured_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- sections is the ordered, already-redacted bundle:
-- [{ "name": "config.yaml", "content": "...", "redacted": true }, ...]
-- Stored as JSONB so the read handler returns it as a structured map
-- and a future query can index into a single section if needed.
sections JSONB NOT NULL DEFAULT '[]'::jsonb
);
-- Read hot path: "latest bundle for this workspace" — the only query
-- the GET /workspaces/:id/rescue endpoint runs.
-- SELECT ... WHERE workspace_id = $1 [AND org_id = $2]
-- ORDER BY captured_at DESC, id DESC LIMIT 1
-- Partial-free composite index; (workspace_id, captured_at DESC) covers
-- the filter + ordering. id DESC tiebreaks same-timestamp captures.
CREATE INDEX IF NOT EXISTS idx_rescue_bundles_ws_captured
ON rescue_bundles (workspace_id, captured_at DESC, id DESC);