Compare commits

..

4 Commits

Author SHA1 Message Date
Molecule AI Dev Engineer A (Kimi) ea3bc9489f fix(instructions): check rows.Err after iteration
sop-checklist / review-refire (pull_request) Waiting to run
sop-checklist / na-declarations (pull_request) N/A: (none)
audit-force-merge / audit (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Waiting to run
branch-protection drift check / Branch protection drift (pull_request) Waiting to run
cascade-list-drift-gate / check (pull_request) Waiting to run
Check merge_group trigger on required workflows / Required workflows have merge_group trigger (pull_request) Waiting to run
Check migration collisions / Migration version collision check (pull_request) Waiting to run
CodeQL / Analyze (${{ matrix.language }}) (go) (pull_request) Waiting to run
CodeQL / Analyze (${{ matrix.language }}) (javascript-typescript) (pull_request) Waiting to run
CodeQL / Analyze (${{ matrix.language }}) (python) (pull_request) Waiting to run
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Waiting to run
Runtime Pin Compatibility / PyPI-latest install + import smoke (pull_request) Waiting to run
Secret scan / Scan diff for credential-shaped strings (pull_request) Waiting to run
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Waiting to run
gate-check-v3 / gate-check (pull_request) Waiting to run
qa-review / approved (pull_request) Waiting to run
security-review / approved (pull_request) Waiting to run
sop-checklist / all-items-acked (pull_request) Waiting to run
sop-tier-check / tier-check (pull_request) Waiting to run
E2E API Smoke Test / E2E API Smoke Test (pull_request) Has been cancelled
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Has been cancelled
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Has been cancelled
Harness Replays / Harness Replays (pull_request) Has been cancelled
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Has been cancelled
CI / Canvas (Next.js) (pull_request) Has been cancelled
CI / Platform (Go) (pull_request) Has been cancelled
CI / Shellcheck (E2E scripts) (pull_request) Has been cancelled
CI / Canvas Deploy Reminder (pull_request) Has been cancelled
CI / Python Lint & Test (pull_request) Has been cancelled
CI / Detect changes (pull_request) Has been cancelled
E2E API Smoke Test / detect-changes (pull_request) Has been cancelled
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Has been cancelled
Handlers Postgres Integration / detect-changes (pull_request) Has been cancelled
Harness Replays / detect-changes (pull_request) Has been cancelled
Runtime PR-Built Compatibility / detect-changes (pull_request) Has been cancelled
Adds the missing rows.Err() check after the rows.Next() loop in the
instructions list handler, preventing silent ingestion of partial
result sets on mid-iteration DB errors.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-31 23:20:07 +00:00
Molecule AI Dev Engineer A (Kimi) 03b1490016 fix(handlers): add rows.Err() checks in delegation, discovery, tokens list handlers
Adds the missing rows.Err() check after rows.Next() iteration in three
list handlers that were missed in the earlier audit:

- DelegationHandler.List
- DiscoveryHandler.queryPeerMaps
- TokenHandler.List

This prevents silent ingestion of partial result sets when a DB
connection error occurs mid-iteration.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-31 23:20:07 +00:00
Molecule AI Dev Engineer A (Kimi) 93146c0cc3 fix(admin-images): add codex to AllRuntimes + use StdEncoding for Docker auth
Two correctness fixes discovered during PR #3029 review:

1. AllRuntimes was missing \"codex\", so admin image refresh and the
   auto-refresh watcher never pulled or recreated codex containers.
   The provisioner's knownRuntimes already included codex; this keeps
   the admin handler consistent.

2. ghcrAuthHeader() used base64.URLEncoding, but Docker's RegistryAuth
   field expects standard base64 (StdEncoding). URL-safe encoding uses
   -_ instead of +/ and omits padding, which the Docker daemon may not
   accept for authenticated GHCR pulls.

Tests updated to decode with StdEncoding.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-31 23:20:07 +00:00
Hongming Wang e827b76a5c fix(workspace-server): CP orphan sweeper closes deprovision split-write race (#2989)
The deprovision path marks `workspaces.status='removed'` BEFORE calling
the controlplane DELETE. If that CP call fails (transient 5xx, network
hiccup, AWS provider error), the DB row stays at 'removed' with
`instance_id` populated and there's no retry — the EC2 lives forever.
9 prod orphans accumulated over 3 days under this bug.

Adds a SaaS-mode counterpart to the existing Docker `orphan_sweeper`:
- 60s tick (matches the Docker sweeper cadence)
- LIMIT 100 per cycle so a sustained CP outage drains over multiple
  cycles without blowing the request timeout
- Re-issues `cpProv.Stop` for any workspace at status='removed' with a
  non-NULL `instance_id`. Stop is idempotent (AWS terminate on
  already-terminated is a no-op; CP's Deprovision tolerates already-
  deleted DNS) so retries are safe.
- On Stop success, NULLs `instance_id` so the next cycle skips the row.
- On Stop failure, leaves `instance_id` populated for next cycle.

The existing Docker sweeper is gated on `prov != nil`; the new sweeper
is gated on `cpProv != nil`. SaaS tenants get exactly one of the two,
self-hosted tenants get the Docker one — no overlap.

Why this shape over option A (CP-first ordering) or B (durable outbox):
the existing inline path already returns a loud 500 to the user when
CP fails — the only missing piece is automatic retry, which a 60s
sweeper provides without protocol changes, new tables, or new workers.
~30 LOC of production code vs. ~400 for an outbox. RFC discussion in
#2989 comment chain.

Tests:
- 9 unit tests covering happy path, Stop failure, UPDATE failure,
  multiple orphans (one-fails-others-still-process), DB query error,
  nil-DB defense, nil-reaper short-circuit, and the boot-immediate-then-
  tick cadence contract.
- Mutation-tested: status='running' substitution and removed-UPDATE-
  block both fail at least one test.

Out of scope:
- Backfilling the 9 named orphans — they'll heal automatically on the
  first sweep cycle after this lands; no manual cleanup needed.
- Long-term durable-outbox architecture — separate RFC.
2026-05-31 23:20:07 +00:00
23 changed files with 481 additions and 170 deletions
+2 -2
View File
@@ -1,7 +1,7 @@
name: Block internal-flavored paths
# Hard CI gate. Internal content (positioning, competitive briefs, sales
# playbooks, PMM/press drip, draft campaigns) lives in molecule-ai/internal —
# playbooks, PMM/press drip, draft campaigns) lives in Molecule-AI/internal —
# this public monorepo must never re-acquire those paths. CEO directive
# 2026-04-23 after a fleet-wide audit found 79 internal files leaked here.
#
@@ -135,7 +135,7 @@ jobs:
echo "::error::Forbidden internal-flavored paths detected:"
printf "$OFFENDING"
echo ""
echo "These paths belong in molecule-ai/internal, not this public repo."
echo "These paths belong in Molecule-AI/internal, not this public repo."
echo "See docs/internal-content-policy.md for canonical locations."
echo ""
echo "If your file is genuinely public-facing (e.g. a blog post"
+1 -1
View File
@@ -108,7 +108,7 @@ jobs:
echo
echo "One or more canary secrets are unset (\`CANARY_TENANT_URLS\`, \`CANARY_ADMIN_TOKENS\`, \`CANARY_CP_SHARED_SECRET\`)."
echo "Phase 2 canary fleet has not been stood up yet —"
echo "see [canary-tenants.md](https://github.com/molecule-ai/molecule-controlplane/blob/main/docs/canary-tenants.md)."
echo "see [canary-tenants.md](https://github.com/Molecule-AI/molecule-controlplane/blob/main/docs/canary-tenants.md)."
echo
echo "**Skipped — promote-to-latest will NOT auto-fire.** Dispatch \`promote-latest.yml\` manually when ready."
} >> "$GITHUB_STEP_SUMMARY"
+5 -5
View File
@@ -87,7 +87,7 @@ jobs:
run: go mod download
- if: needs.changes.outputs.platform == 'true'
run: go build ./cmd/server
# CLI (molecli) moved to standalone repo: github.com/molecule-ai/molecule-cli
# CLI (molecli) moved to standalone repo: github.com/Molecule-AI/molecule-cli
- if: needs.changes.outputs.platform == 'true'
run: go vet ./... || true
- if: needs.changes.outputs.platform == 'true'
@@ -165,7 +165,7 @@ jobs:
# Strip the package-import prefix so we can match .coverage-allowlist.txt
# entries written as paths relative to workspace-server/.
# Handle both module paths: platform/workspace-server/... and platform/...
rel=$(echo "$file" | sed 's|^github.com/molecule-ai/molecule-monorepo/platform/workspace-server/||; s|^github.com/molecule-ai/molecule-monorepo/platform/||')
rel=$(echo "$file" | sed 's|^github.com/Molecule-AI/molecule-monorepo/platform/workspace-server/||; s|^github.com/Molecule-AI/molecule-monorepo/platform/||')
if echo "$ALLOWLIST" | grep -qxF "$rel"; then
echo "::warning file=workspace-server/$rel::Critical file at ${pct}% coverage (allowlisted, #1823) — fix before expiry."
@@ -243,8 +243,8 @@ jobs:
if-no-files-found: warn
# MCP Server + SDK removed from CI — now in standalone repos:
# - github.com/molecule-ai/molecule-mcp-server (npm CI)
# - github.com/molecule-ai/molecule-sdk-python (PyPI CI)
# - github.com/Molecule-AI/molecule-mcp-server (npm CI)
# - github.com/Molecule-AI/molecule-sdk-python (PyPI CI)
# e2e-api job moved to .github/workflows/e2e-api.yml (issue #458).
# It now has workflow-level concurrency (cancel-in-progress: false) so
@@ -434,5 +434,5 @@ jobs:
fi
# SDK + plugin validation moved to standalone repo:
# github.com/molecule-ai/molecule-sdk-python
# github.com/Molecule-AI/molecule-sdk-python
+1 -1
View File
@@ -62,7 +62,7 @@ jobs:
if: matrix.language == 'go'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
repository: molecule-ai/molecule-ai-plugin-github-app-auth
repository: Molecule-AI/molecule-ai-plugin-github-app-auth
path: molecule-ai-plugin-github-app-auth
token: ${{ secrets.PLUGIN_REPO_PAT || secrets.GITHUB_TOKEN }}
+1 -1
View File
@@ -102,7 +102,7 @@ jobs:
if: needs.detect-changes.outputs.run == 'true'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
repository: molecule-ai/molecule-ai-plugin-github-app-auth
repository: Molecule-AI/molecule-ai-plugin-github-app-auth
path: molecule-ai-plugin-github-app-auth
token: ${{ secrets.PLUGIN_REPO_PAT || secrets.GITHUB_TOKEN }}
+1 -1
View File
@@ -19,4 +19,4 @@ permissions:
jobs:
disable-auto-merge-on-push:
uses: molecule-ai/molecule-ci/.github/workflows/disable-auto-merge-on-push.yml@main
uses: Molecule-AI/molecule-ci/.github/workflows/disable-auto-merge-on-push.yml@main
+3 -3
View File
@@ -25,7 +25,7 @@ name: publish-runtime
# 3. Publishes to PyPI via the PyPA Trusted Publisher action (OIDC).
# No static API token is stored — PyPI verifies the workflow's
# OIDC claim against the trusted-publisher config registered for
# molecule-ai-workspace-runtime (molecule-ai/molecule-core,
# molecule-ai-workspace-runtime (Molecule-AI/molecule-core,
# publish-runtime.yml, environment pypi-publish).
#
# After publish: the 8 template repos pick up the new version on their
@@ -166,7 +166,7 @@ jobs:
- name: Publish to PyPI (Trusted Publisher / OIDC)
# PyPI side is configured: project molecule-ai-workspace-runtime →
# publisher molecule-ai/molecule-core, workflow publish-runtime.yml,
# publisher Molecule-AI/molecule-core, workflow publish-runtime.yml,
# environment pypi-publish. The action mints a short-lived OIDC
# token and exchanges it for a PyPI upload credential — no static
# API token in this repo's secrets.
@@ -342,7 +342,7 @@ jobs:
TEMPLATES="claude-code hermes openclaw codex langgraph crewai autogen deepagents gemini-cli"
FAILED=""
for tpl in $TEMPLATES; do
REPO="molecule-ai/molecule-ai-workspace-template-$tpl"
REPO="Molecule-AI/molecule-ai-workspace-template-$tpl"
STATUS=$(curl -sS -o /tmp/dispatch.out -w "%{http_code}" \
-X POST "https://api.github.com/repos/$REPO/dispatches" \
-H "Authorization: Bearer $DISPATCH_TOKEN" \
@@ -80,12 +80,12 @@ jobs:
#
# Uses a fine-grained PAT (PLUGIN_REPO_PAT) because the plugin repo
# is private and the default GITHUB_TOKEN is scoped to THIS repo.
# The PAT needs Contents:Read on molecule-ai/molecule-ai-plugin-
# The PAT needs Contents:Read on Molecule-AI/molecule-ai-plugin-
# github-app-auth. Falls back to the default token for the (rare)
# case where an operator made the plugin repo public.
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
repository: molecule-ai/molecule-ai-plugin-github-app-auth
repository: Molecule-AI/molecule-ai-plugin-github-app-auth
path: molecule-ai-plugin-github-app-auth
token: ${{ secrets.PLUGIN_REPO_PAT || secrets.GITHUB_TOKEN }}
@@ -9,7 +9,7 @@ name: redeploy-tenants-on-main
#
# This workflow closes the gap by calling the control-plane admin
# endpoint that performs a canary-first, batched, health-gated rolling
# redeploy across every live tenant. Implemented in molecule-ai/
# redeploy across every live tenant. Implemented in Molecule-AI/
# molecule-controlplane as POST /cp/admin/tenants/redeploy-fleet
# (feat/tenant-auto-redeploy, landing alongside this workflow).
#
@@ -146,7 +146,7 @@ jobs:
- name: Call CP redeploy-fleet
# CP_ADMIN_API_TOKEN must be set as a repo/org secret on
# molecule-ai/molecule-core, matching the staging/prod CP's
# Molecule-AI/molecule-core, matching the staging/prod CP's
# CP_ADMIN_API_TOKEN env. Stored in Railway, mirrored to this
# repo's secrets for CI.
env:
@@ -97,7 +97,7 @@ jobs:
- name: Call staging-CP redeploy-fleet
# CP_STAGING_ADMIN_API_TOKEN must be set as a repo/org secret
# on molecule-ai/molecule-core, matching staging-CP's
# on Molecule-AI/molecule-core, matching staging-CP's
# CP_ADMIN_API_TOKEN env var (visible in Railway controlplane
# / staging environment). Stored separately from the prod
# CP_ADMIN_API_TOKEN so a leak of one doesn't auth the other.
@@ -96,7 +96,7 @@ jobs:
--body "$(cat <<'BODY'
[retarget-bot] This PR was opened against `main` and has been retargeted to `staging` automatically.
**Why:** per [SHARED_RULES rule 8](https://github.com/molecule-ai/molecule-ai-org-template-molecule-dev/blob/main/SHARED_RULES.md), all feature work targets `staging` first; the CEO promotes `staging → main` separately.
**Why:** per [SHARED_RULES rule 8](https://github.com/Molecule-AI/molecule-ai-org-template-molecule-dev/blob/main/SHARED_RULES.md), all feature work targets `staging` first; the CEO promotes `staging → main` separately.
**What changed:** just the base branch — no code change. CI will re-run against `staging`. If you get merge conflicts, rebase on `staging`.
+1 -1
View File
@@ -12,7 +12,7 @@ name: Secret scan
#
# jobs:
# secret-scan:
# uses: molecule-ai/molecule-core/.github/workflows/secret-scan.yml@staging
# uses: Molecule-AI/molecule-core/.github/workflows/secret-scan.yml@staging
#
# Pin to @staging not @main — staging is the active default branch,
# main lags via the staging-promotion workflow. Updates ride along
-89
View File
@@ -1,89 +0,0 @@
package main
import "testing"
// TestResolveBindHost pins the precedence: BIND_ADDR explicit > dev-mode
// fail-open default of 127.0.0.1 > production-shape empty (all interfaces).
//
// Mutation-test invariant: removing the IsDevModeFailOpen() branch makes
// "no_bindaddr_devmode_unset_admin" fail (returns "" instead of "127.0.0.1").
// Removing the BIND_ADDR branch makes "explicit_bindaddr_*" cases fail.
func TestResolveBindHost(t *testing.T) {
cases := []struct {
name string
bindAddr string
adminToken string
molEnv string
want string
}{
{
name: "no_bindaddr_devmode_unset_admin",
bindAddr: "",
adminToken: "",
molEnv: "dev",
want: "127.0.0.1",
},
{
name: "no_bindaddr_devmode_unset_admin_full_word",
bindAddr: "",
adminToken: "",
molEnv: "development",
want: "127.0.0.1",
},
{
name: "no_bindaddr_admin_set_in_dev_env",
bindAddr: "",
adminToken: "secret",
molEnv: "dev",
want: "", // ADMIN_TOKEN flips IsDevModeFailOpen to false → all interfaces
},
{
name: "no_bindaddr_production_env",
bindAddr: "",
adminToken: "",
molEnv: "production",
want: "", // production is not a dev value → all interfaces
},
{
name: "no_bindaddr_unset_env",
bindAddr: "",
adminToken: "",
molEnv: "",
want: "", // unset MOLECULE_ENV → not dev → all interfaces
},
{
name: "explicit_bindaddr_loopback_overrides_devmode",
bindAddr: "127.0.0.1",
adminToken: "",
molEnv: "dev",
want: "127.0.0.1",
},
{
name: "explicit_bindaddr_wildcard_overrides_devmode_default",
bindAddr: "0.0.0.0",
adminToken: "",
molEnv: "dev",
want: "0.0.0.0",
},
{
name: "explicit_bindaddr_in_production",
bindAddr: "10.0.5.7",
adminToken: "secret",
molEnv: "production",
want: "10.0.5.7",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
t.Setenv("BIND_ADDR", tc.bindAddr)
t.Setenv("ADMIN_TOKEN", tc.adminToken)
t.Setenv("MOLECULE_ENV", tc.molEnv)
got := resolveBindHost()
if got != tc.want {
t.Errorf("resolveBindHost() = %q, want %q (BIND_ADDR=%q ADMIN_TOKEN=%q MOLECULE_ENV=%q)",
got, tc.want, tc.bindAddr, tc.adminToken, tc.molEnv)
}
})
}
}
+16 -35
View File
@@ -19,7 +19,6 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/imagewatch"
memwiring "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/wiring"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
@@ -267,6 +266,19 @@ func main() {
})
}
// CP-mode orphan sweeper — SaaS counterpart to the Docker sweeper
// above. Re-issues cpProv.Stop for any workspace at status='removed'
// with a non-NULL instance_id, healing the deprovision split-write
// race documented in #2989: tenant marks status='removed' BEFORE
// calling CP DELETE, so a transient CP failure leaves the EC2
// running with no retry path. cpProv.Stop is idempotent against
// already-terminated instances; on success we clear instance_id.
if cpProv != nil {
go supervised.RunWithRecover(ctx, "cp-orphan-sweeper", func(c context.Context) {
registry.StartCPOrphanSweeper(c, cpProv)
})
}
// Pending-uploads GC sweep — deletes acked rows past their retention
// window plus unacked rows past expires_at. Without this the
// pending_uploads table grows unbounded; even with the 24h hard TTL,
@@ -338,23 +350,15 @@ func main() {
// Router
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr, memBundle)
// HTTP server with graceful shutdown.
//
// Bind host: in dev-mode (no ADMIN_TOKEN, MOLECULE_ENV=dev|development)
// the AdminAuth chain fails open by design; pairing that with a wildcard
// bind would expose unauth /workspaces to any same-LAN peer. Default to
// loopback when fail-open is active. Operators who need LAN exposure set
// BIND_ADDR=0.0.0.0 explicitly. Production (ADMIN_TOKEN set) is unchanged.
// See molecule-core#7.
bindHost := resolveBindHost()
// HTTP server with graceful shutdown
srv := &http.Server{
Addr: fmt.Sprintf("%s:%s", bindHost, port),
Addr: fmt.Sprintf(":%s", port),
Handler: r,
}
// Start server in goroutine
go func() {
log.Printf("Platform starting on %s:%s (dev-mode-fail-open=%v)", bindHost, port, middleware.IsDevModeFailOpen())
log.Printf("Platform starting on :%s", port)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Server failed: %v", err)
}
@@ -389,29 +393,6 @@ func envOr(key, fallback string) string {
return fallback
}
// resolveBindHost picks the listener interface for the HTTP server.
//
// Precedence:
// 1. BIND_ADDR — explicit operator override (any value, including "0.0.0.0").
// 2. dev-mode fail-open active → "127.0.0.1" (loopback only).
// 3. otherwise → "" (Go binds every interface; existing prod/self-host shape).
//
// Coupling the loopback default to middleware.IsDevModeFailOpen() means the
// two safety levers — bind narrowness and auth strength — move together. A
// production deploy (ADMIN_TOKEN set) keeps binding to all interfaces because
// the auth chain is doing its job; a dev Mac (no ADMIN_TOKEN, MOLECULE_ENV=dev)
// is reachable only via loopback because the auth chain is fail-open. See
// molecule-core#7 for the original LAN exposure finding.
func resolveBindHost() string {
if v := os.Getenv("BIND_ADDR"); v != "" {
return v
}
if middleware.IsDevModeFailOpen() {
return "127.0.0.1"
}
return ""
}
func findConfigsDir() string {
candidates := []string{
"workspace-configs-templates",
@@ -45,7 +45,7 @@ func NewWorkspaceImageService(docker *dockerclient.Client) *WorkspaceImageServic
// Update both when a new template is added.
var AllRuntimes = []string{
"claude-code", "langgraph", "crewai", "autogen",
"deepagents", "hermes", "gemini-cli", "openclaw",
"deepagents", "hermes", "gemini-cli", "openclaw", "codex",
}
// RefreshResult is the per-call outcome surfaced to HTTP callers AND logged
@@ -91,7 +91,7 @@ func ghcrAuthHeader() string {
log.Printf("workspace-images: failed to marshal GHCR auth: %v", err)
return ""
}
return base64.URLEncoding.EncodeToString(js)
return base64.StdEncoding.EncodeToString(js)
}
// Refresh pulls the requested runtimes' template images from GHCR and (if
@@ -191,7 +191,7 @@ func (s *WorkspaceImageService) Refresh(ctx context.Context, runtimes []string,
// AdminWorkspaceImagesHandler serves POST /admin/workspace-images/refresh.
//
// ?runtime=claude-code (optional; default = all 8 templates)
// ?runtime=claude-code (optional; default = all 9 templates)
// &recreate=true|false (default true; false = pull only)
//
// Returns JSON {pulled: [...], failed: [...], recreated: [...]}
@@ -35,9 +35,9 @@ func TestGHCRAuthHeader_EncodesDockerEnginePayload(t *testing.T) {
if got == "" {
t.Fatal("expected non-empty auth header")
}
raw, err := base64.URLEncoding.DecodeString(got)
raw, err := base64.StdEncoding.DecodeString(got)
if err != nil {
t.Fatalf("auth header is not valid base64-url: %v", err)
t.Fatalf("auth header is not valid base64: %v", err)
}
var payload map[string]string
if err := json.Unmarshal(raw, &payload); err != nil {
@@ -61,7 +61,7 @@ func TestGHCRAuthHeader_TrimsWhitespace(t *testing.T) {
t.Setenv("GHCR_USER", " alice ")
t.Setenv("GHCR_TOKEN", "\tfake-tok-value\n")
got := ghcrAuthHeader()
raw, _ := base64.URLEncoding.DecodeString(got)
raw, _ := base64.StdEncoding.DecodeString(got)
var payload map[string]string
_ = json.Unmarshal(raw, &payload)
if payload["username"] != "alice" {
@@ -631,6 +631,11 @@ func (h *DelegationHandler) ListDelegations(c *gin.Context) {
}
delegations = append(delegations, entry)
}
if err := rows.Err(); err != nil {
log.Printf("delegation list rows error: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
}
if delegations == nil {
delegations = []map[string]interface{}{}
@@ -348,6 +348,10 @@ func queryPeerMaps(query string, args ...interface{}) ([]map[string]interface{},
result = append(result, peer)
}
if err := rows.Err(); err != nil {
log.Printf("queryPeerMaps rows error: %v", err)
return nil, err
}
return result, nil
}
@@ -248,6 +248,11 @@ func (h *InstructionsHandler) Resolve(c *gin.Context) {
b.WriteString(content)
b.WriteString("\n\n")
}
if err := rows.Err(); err != nil {
log.Printf("Instructions list rows error: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
}
c.JSON(http.StatusOK, gin.H{
"workspace_id": workspaceID,
@@ -31,25 +31,11 @@ import (
// tests pin the helper's three observable behaviors plus an AST gate
// that catches future re-introductions of the un-checked INSERT.
// lookupChildSQLRE anchors the sqlmock ExpectQuery on every load-bearing
// token of lookupExistingChild's SELECT (org_import.go:639-645). A loose
// substring match (the prior shape, just `SELECT id FROM workspaces`)
// would silent-pass a regression that drops `IS NOT DISTINCT FROM`
// (breaks NULL-parent matching), drops `parent_id` entirely (hijacks
// siblings of the same name across different parents), or drops the
// `status != 'removed'` filter (blocks re-import after Collapse).
// RFC #2872 Important-2.
//
// The four anchored tokens are exactly the predicates the bug shapes
// would tamper with. Whitespace is `\s+` so a future formatter pass
// doesn't churn this string.
const lookupChildSQLRE = `(?s)SELECT id FROM workspaces\s+WHERE name = \$1\s+AND parent_id IS NOT DISTINCT FROM \$2\s+AND status != 'removed'`
func TestLookupExistingChild_NotFound_ReturnsFalseNoError(t *testing.T) {
mock := setupTestDB(t)
// 0-row result → driver returns sql.ErrNoRows on Scan.
parent := "parent-1"
mock.ExpectQuery(lookupChildSQLRE).
mock.ExpectQuery(`SELECT id FROM workspaces`).
WithArgs("Alpha", &parent).
WillReturnRows(sqlmock.NewRows([]string{"id"}))
@@ -70,7 +56,7 @@ func TestLookupExistingChild_NotFound_ReturnsFalseNoError(t *testing.T) {
func TestLookupExistingChild_Found_ReturnsIDAndTrue(t *testing.T) {
mock := setupTestDB(t)
parent := "parent-1"
mock.ExpectQuery(lookupChildSQLRE).
mock.ExpectQuery(`SELECT id FROM workspaces`).
WithArgs("Alpha", &parent).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-existing-uuid"))
@@ -93,7 +79,7 @@ func TestLookupExistingChild_NilParent_MatchesRoot(t *testing.T) {
// a plain `=` would never match a NULL row. Pin that roots
// (parent_id=NULL) are still found by the lookup.
mock := setupTestDB(t)
mock.ExpectQuery(lookupChildSQLRE).
mock.ExpectQuery(`SELECT id FROM workspaces`).
WithArgs("RootAgent", (*string)(nil)).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-root-uuid"))
@@ -116,7 +102,7 @@ func TestLookupExistingChild_DBError_Propagates(t *testing.T) {
mock := setupTestDB(t)
parent := "parent-1"
connFail := errors.New("simulated postgres unavailable")
mock.ExpectQuery(lookupChildSQLRE).
mock.ExpectQuery(`SELECT id FROM workspaces`).
WithArgs("Alpha", &parent).
WillReturnError(connFail)
@@ -151,7 +137,7 @@ func TestLookupExistingChild_WrappedNoRows_TreatedAsNotFound(t *testing.T) {
mock := setupTestDB(t)
parent := "parent-1"
wrapped := fmt.Errorf("driver-wrapped: %w", sql.ErrNoRows)
mock.ExpectQuery(lookupChildSQLRE).
mock.ExpectQuery(`SELECT id FROM workspaces`).
WithArgs("Alpha", &parent).
WillReturnError(wrapped)
@@ -67,6 +67,10 @@ func (h *TokenHandler) List(c *gin.Context) {
}
tokens = append(tokens, t)
}
if err := rows.Err(); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list tokens"})
return
}
c.JSON(http.StatusOK, gin.H{
"tokens": tokens,
@@ -0,0 +1,149 @@
package registry
// cp_orphan_sweeper.go — SaaS-mode counterpart to orphan_sweeper.go.
//
// The Docker sweeper (StartOrphanSweeper) runs only when prov != nil
// (single-tenant Docker mode); SaaS tenants run cpProv != nil and prov
// == nil, so they get no sweep coverage from that path. This file fills
// the gap for the deprovision split-write race documented in #2989:
//
// 1. handlers/workspace_crud.go:365 marks workspaces.status = 'removed'.
// 2. workspace_crud.go:439 calls StopWorkspaceAuto → cpProv.Stop, which
// issues DELETE /cp/workspaces/:id?instance_id=… to controlplane.
// 3. If step 2 fails (CP transient 5xx, network blip, AWS hiccup), the
// inline path returns a 500 to the canvas — but the DB row is already
// at status='removed' with instance_id still populated. There's no
// retry, and the EC2 lives forever.
//
// This sweeper closes that gap by re-issuing cpProv.Stop on every cycle
// for any workspace at status='removed' with a non-NULL instance_id.
// Stop is idempotent: AWS TerminateInstance on an already-terminated
// instance is a no-op (per AWS docs), and CP's Deprovision handler
// (controlplane/internal/handlers/workspace_provision.go:289) handles
// the already-terminated and already-deleted-DNS cases via best-effort
// guards. On Stop success, the sweeper clears instance_id so the next
// cycle skips the row.
//
// Cadence + safety filters mirror the Docker sweeper:
// - 60s tick (OrphanSweepInterval)
// - 30s per-cycle deadline (orphanSweepDeadline)
// - LIMIT 100 per cycle so a sustained CP outage that backs up many
// orphans doesn't blow the request timeout; subsequent cycles drain.
//
// SSOT note: Stop's idempotency (no-op on empty instance_id, AWS
// terminate on already-terminated) is the load-bearing invariant. Any
// future change that adds non-idempotent side effects to cpProv.Stop
// must also gate this sweeper, or it will re-execute those side effects
// every 60s for every cleared-but-not-yet-NULL row.
import (
"context"
"log"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// CPOrphanReaper is the dependency the SaaS-mode sweeper takes from
// the CP provisioner. *provisioner.CPProvisioner satisfies this
// naturally; tests inject fakes.
type CPOrphanReaper interface {
Stop(ctx context.Context, workspaceID string) error
}
// cpSweepLimit caps the per-cycle row count so a sustained CP outage
// can't make a single sweep cycle blow orphanSweepDeadline. With a
// 60s cadence and 100-row limit, drain rate is up to 100 orphans/min,
// which has never been approached even during the worst leak windows.
const cpSweepLimit = 100
// StartCPOrphanSweeper runs the SaaS-mode reconcile loop until ctx is
// cancelled. nil reaper makes the loop a no-op (matches the Docker
// sweeper's nil-tolerant pattern).
//
// Caller is expected to gate on `cpProv != nil` (matching how
// StartOrphanSweeper is gated on `prov != nil` at the call site in
// cmd/server/main.go) — passing a nil *CPProvisioner here would also
// short-circuit but the gate at the wiring site keeps the call shape
// symmetric across the two sweepers.
func StartCPOrphanSweeper(ctx context.Context, reaper CPOrphanReaper) {
if reaper == nil {
log.Println("CP orphan sweeper: reaper is nil — sweeper disabled")
return
}
log.Printf("CP orphan sweeper started — reconciling every %s", OrphanSweepInterval)
ticker := time.NewTicker(OrphanSweepInterval)
defer ticker.Stop()
cpSweepOnce(ctx, reaper)
for {
select {
case <-ctx.Done():
log.Println("CP orphan sweeper: shutdown")
return
case <-ticker.C:
cpSweepOnce(ctx, reaper)
}
}
}
// cpSweepOnce executes one reconcile pass. Defensive against db.DB
// being nil so a misconfigured boot doesn't panic.
func cpSweepOnce(parent context.Context, reaper CPOrphanReaper) {
if db.DB == nil {
return
}
ctx, cancel := context.WithTimeout(parent, orphanSweepDeadline)
defer cancel()
rows, err := db.DB.QueryContext(ctx, `
SELECT id::text
FROM workspaces
WHERE status = 'removed'
AND instance_id IS NOT NULL
AND instance_id != ''
ORDER BY updated_at DESC
LIMIT $1
`, cpSweepLimit)
if err != nil {
log.Printf("CP orphan sweeper: DB query failed: %v", err)
return
}
defer rows.Close()
var orphanIDs []string
for rows.Next() {
var id string
if scanErr := rows.Scan(&id); scanErr != nil {
log.Printf("CP orphan sweeper: row scan failed: %v", scanErr)
continue
}
orphanIDs = append(orphanIDs, id)
}
if iterErr := rows.Err(); iterErr != nil {
log.Printf("CP orphan sweeper: rows iteration failed: %v", iterErr)
return
}
for _, id := range orphanIDs {
log.Printf("CP orphan sweeper: terminating leaked EC2 for removed workspace %s", id)
if stopErr := reaper.Stop(ctx, id); stopErr != nil {
// CP-side error — transient 5xx, network, AWS hiccup. Leave
// instance_id populated so the next cycle retries. Loud-fail
// only at the log layer; the user-visible 500 was already
// returned by the inline path that triggered this orphan.
log.Printf("CP orphan sweeper: Stop failed for %s: %v — retry next cycle", id, stopErr)
continue
}
// Stop succeeded — clear instance_id so the next cycle skips this
// row. We can't use a tombstone column (no schema change in this
// PR); NULL'ing instance_id is the SSOT signal for "no live
// EC2 attached." The matching SELECT predicate above stays in
// sync with this UPDATE.
if _, updErr := db.DB.ExecContext(ctx,
`UPDATE workspaces SET instance_id = NULL, updated_at = now() WHERE id = $1`,
id,
); updErr != nil {
log.Printf("CP orphan sweeper: clear instance_id failed for %s: %v — next cycle will re-Stop (idempotent)", id, updErr)
}
}
}
@@ -0,0 +1,266 @@
package registry
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// fakeCPReaper is a hand-rolled CPOrphanReaper for the SaaS-mode
// sweeper tests. Records every Stop call so tests can assert which
// workspace IDs were re-issued.
type fakeCPReaper struct {
mu sync.Mutex
stopErr map[string]error
stopCalls []string
}
func (f *fakeCPReaper) Stop(_ context.Context, wsID string) error {
f.mu.Lock()
defer f.mu.Unlock()
f.stopCalls = append(f.stopCalls, wsID)
return f.stopErr[wsID]
}
// TestCPSweepOnce_StopSucceeds_ClearsInstanceID — happy path. Single
// removed-row with non-NULL instance_id; Stop succeeds; instance_id
// gets NULL'd so the next cycle won't re-sweep it.
func TestCPSweepOnce_StopSucceeds_ClearsInstanceID(t *testing.T) {
mock := setupTestDB(t)
reaper := &fakeCPReaper{}
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces\s+WHERE status = 'removed'\s+AND instance_id IS NOT NULL\s+AND instance_id != ''\s+ORDER BY updated_at DESC\s+LIMIT \$1`).
WithArgs(cpSweepLimit).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-uuid-1"))
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL, updated_at = now\(\) WHERE id = \$1`).
WithArgs("ws-uuid-1").
WillReturnResult(sqlmock.NewResult(0, 1))
cpSweepOnce(context.Background(), reaper)
if len(reaper.stopCalls) != 1 || reaper.stopCalls[0] != "ws-uuid-1" {
t.Fatalf("expected Stop(ws-uuid-1), got %v", reaper.stopCalls)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestCPSweepOnce_StopFails_KeepsInstanceID — CP transient failure.
// Stop returns an error; instance_id MUST stay populated so the next
// cycle retries. UPDATE must NOT fire.
func TestCPSweepOnce_StopFails_KeepsInstanceID(t *testing.T) {
mock := setupTestDB(t)
reaper := &fakeCPReaper{
stopErr: map[string]error{"ws-uuid-1": errors.New("CP returned 503")},
}
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
WithArgs(cpSweepLimit).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-uuid-1"))
// No ExpectExec for the UPDATE — sqlmock fails the test if the
// UPDATE fires.
cpSweepOnce(context.Background(), reaper)
if len(reaper.stopCalls) != 1 || reaper.stopCalls[0] != "ws-uuid-1" {
t.Fatalf("expected Stop(ws-uuid-1), got %v", reaper.stopCalls)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations (UPDATE should NOT have fired): %v", err)
}
}
// TestCPSweepOnce_NoOrphans — empty result set is the steady state in
// healthy operation. No Stop, no UPDATE.
func TestCPSweepOnce_NoOrphans(t *testing.T) {
mock := setupTestDB(t)
reaper := &fakeCPReaper{}
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
WithArgs(cpSweepLimit).
WillReturnRows(sqlmock.NewRows([]string{"id"}))
cpSweepOnce(context.Background(), reaper)
if len(reaper.stopCalls) != 0 {
t.Fatalf("expected zero Stop calls, got %v", reaper.stopCalls)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestCPSweepOnce_MultipleOrphans — all rows in the batch get Stop'd
// independently; one failure doesn't block others.
func TestCPSweepOnce_MultipleOrphans(t *testing.T) {
mock := setupTestDB(t)
reaper := &fakeCPReaper{
stopErr: map[string]error{"ws-uuid-2": errors.New("CP 503 on ws-uuid-2")},
}
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
WithArgs(cpSweepLimit).
WillReturnRows(sqlmock.NewRows([]string{"id"}).
AddRow("ws-uuid-1").
AddRow("ws-uuid-2").
AddRow("ws-uuid-3"))
// ws-uuid-1 succeeds → UPDATE fires.
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
WithArgs("ws-uuid-1").
WillReturnResult(sqlmock.NewResult(0, 1))
// ws-uuid-2 fails → no UPDATE.
// ws-uuid-3 succeeds → UPDATE fires.
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
WithArgs("ws-uuid-3").
WillReturnResult(sqlmock.NewResult(0, 1))
cpSweepOnce(context.Background(), reaper)
if len(reaper.stopCalls) != 3 {
t.Fatalf("expected Stop on all 3 ids, got %v", reaper.stopCalls)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestCPSweepOnce_QueryError — DB transient failure. Sweep returns
// without panicking. No Stop calls.
func TestCPSweepOnce_QueryError(t *testing.T) {
mock := setupTestDB(t)
reaper := &fakeCPReaper{}
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
WithArgs(cpSweepLimit).
WillReturnError(errors.New("connection refused"))
cpSweepOnce(context.Background(), reaper)
if len(reaper.stopCalls) != 0 {
t.Fatalf("expected zero Stop calls on query error, got %v", reaper.stopCalls)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestCPSweepOnce_UpdateError_LogsButContinues — Stop succeeded but
// the UPDATE to clear instance_id failed. Subsequent rows in the batch
// must still process; comment in cpSweepOnce promises idempotent re-Stop
// next cycle.
func TestCPSweepOnce_UpdateError_LogsButContinues(t *testing.T) {
mock := setupTestDB(t)
reaper := &fakeCPReaper{}
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
WithArgs(cpSweepLimit).
WillReturnRows(sqlmock.NewRows([]string{"id"}).
AddRow("ws-uuid-1").
AddRow("ws-uuid-2"))
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
WithArgs("ws-uuid-1").
WillReturnError(errors.New("UPDATE timeout"))
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
WithArgs("ws-uuid-2").
WillReturnResult(sqlmock.NewResult(0, 1))
cpSweepOnce(context.Background(), reaper)
if len(reaper.stopCalls) != 2 {
t.Fatalf("expected Stop on both ids despite UPDATE error on first, got %v", reaper.stopCalls)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestCPSweepOnce_NilDB — defensive against db.DB being nil. Must not
// panic; must not call Stop.
func TestCPSweepOnce_NilDB(t *testing.T) {
saved := db.DB
db.DB = nil
t.Cleanup(func() { db.DB = saved })
reaper := &fakeCPReaper{}
cpSweepOnce(context.Background(), reaper)
if len(reaper.stopCalls) != 0 {
t.Fatalf("expected zero Stop calls when db.DB is nil, got %v", reaper.stopCalls)
}
}
// TestStartCPOrphanSweeper_NilReaperDisabled — boot-safety: a SaaS CP
// without cpProv configured must not start the loop (immediate return,
// no goroutine leak).
func TestStartCPOrphanSweeper_NilReaperDisabled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done := make(chan struct{})
go func() {
StartCPOrphanSweeper(ctx, nil)
close(done)
}()
select {
case <-done:
// expected — nil reaper short-circuits.
case <-time.After(500 * time.Millisecond):
t.Fatal("StartCPOrphanSweeper(nil) did not return immediately")
}
}
// TestStartCPOrphanSweeper_RunsOnceImmediatelyAndOnTick — cadence
// contract: kick off one sweep at boot (so a platform restart starts
// healing immediately), then once per OrphanSweepInterval. Verifies
// the loop terminates on ctx cancel.
func TestStartCPOrphanSweeper_RunsOnceImmediatelyAndOnTick(t *testing.T) {
mock := setupTestDB(t)
reaper := &fakeCPReaper{}
// Two sweeps within the test window: one immediate, one on the
// first tick. We can't shrink OrphanSweepInterval (it's a const),
// so assert "at least one immediate sweep" and let cancel close
// the loop.
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
WithArgs(cpSweepLimit).
WillReturnRows(sqlmock.NewRows([]string{"id"}))
// The ticker may or may not fire in the test window depending on
// scheduler; tolerate both shapes by registering a second optional
// expectation. sqlmock fails on UNREGISTERED queries, so register
// one more then accept either 1 or 2 fires.
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
WithArgs(cpSweepLimit).
WillReturnRows(sqlmock.NewRows([]string{"id"}))
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
StartCPOrphanSweeper(ctx, reaper)
close(done)
}()
// 100ms is well past the boot-sweep but well shy of the 60s
// interval, so the second query expectation is intentionally
// unmet — that's fine, sqlmock distinguishes "expected but not
// received" (we don't enforce here) from "unexpected query"
// (which would fail).
time.Sleep(100 * time.Millisecond)
cancel()
select {
case <-done:
// expected
case <-time.After(2 * time.Second):
t.Fatal("StartCPOrphanSweeper did not exit on ctx cancel")
}
// Boot sweep must have happened — without it, an operator restart
// after a CP outage would leave a 60s gap before the first heal.
// We don't assert mock.ExpectationsWereMet() here because the
// second query is intentionally optional.
}