Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 87dbee381c |
@@ -266,19 +266,6 @@ func main() {
|
||||
})
|
||||
}
|
||||
|
||||
// CP-mode orphan sweeper — SaaS counterpart to the Docker sweeper
|
||||
// above. Re-issues cpProv.Stop for any workspace at status='removed'
|
||||
// with a non-NULL instance_id, healing the deprovision split-write
|
||||
// race documented in #2989: tenant marks status='removed' BEFORE
|
||||
// calling CP DELETE, so a transient CP failure leaves the EC2
|
||||
// running with no retry path. cpProv.Stop is idempotent against
|
||||
// already-terminated instances; on success we clear instance_id.
|
||||
if cpProv != nil {
|
||||
go supervised.RunWithRecover(ctx, "cp-orphan-sweeper", func(c context.Context) {
|
||||
registry.StartCPOrphanSweeper(c, cpProv)
|
||||
})
|
||||
}
|
||||
|
||||
// Pending-uploads GC sweep — deletes acked rows past their retention
|
||||
// window plus unacked rows past expires_at. Without this the
|
||||
// pending_uploads table grows unbounded; even with the 24h hard TTL,
|
||||
|
||||
@@ -45,7 +45,7 @@ func NewWorkspaceImageService(docker *dockerclient.Client) *WorkspaceImageServic
|
||||
// Update both when a new template is added.
|
||||
var AllRuntimes = []string{
|
||||
"claude-code", "langgraph", "crewai", "autogen",
|
||||
"deepagents", "hermes", "gemini-cli", "openclaw", "codex",
|
||||
"deepagents", "hermes", "gemini-cli", "openclaw",
|
||||
}
|
||||
|
||||
// RefreshResult is the per-call outcome surfaced to HTTP callers AND logged
|
||||
@@ -56,17 +56,10 @@ type RefreshResult struct {
|
||||
Recreated []string `json:"recreated"`
|
||||
}
|
||||
|
||||
// TemplateImageRef returns the canonical image ref for a runtime's template,
|
||||
// using the configured registry (provisioner.RegistryPrefix()) and the
|
||||
// moving `:latest` tag. Single source of truth shared with imagewatch.
|
||||
//
|
||||
// Defaults to ghcr.io/molecule-ai/workspace-template-<runtime>:latest
|
||||
// (upstream OSS). When MOLECULE_IMAGE_REGISTRY is set in the environment
|
||||
// (typically the AWS ECR mirror in production), this returns the prefixed
|
||||
// equivalent so admin operations and image-watch checks hit the same
|
||||
// registry the provisioner pulls from.
|
||||
// TemplateImageRef returns the canonical GHCR ref for a runtime's template
|
||||
// image. Single source of truth shared with imagewatch.
|
||||
func TemplateImageRef(runtime string) string {
|
||||
return fmt.Sprintf("%s/workspace-template-%s:latest", provisioner.RegistryPrefix(), runtime)
|
||||
return fmt.Sprintf("ghcr.io/molecule-ai/workspace-template-%s:latest", runtime)
|
||||
}
|
||||
|
||||
// ghcrAuthHeader returns the base64-encoded JSON auth payload Docker's
|
||||
@@ -91,7 +84,7 @@ func ghcrAuthHeader() string {
|
||||
log.Printf("workspace-images: failed to marshal GHCR auth: %v", err)
|
||||
return ""
|
||||
}
|
||||
return base64.StdEncoding.EncodeToString(js)
|
||||
return base64.URLEncoding.EncodeToString(js)
|
||||
}
|
||||
|
||||
// Refresh pulls the requested runtimes' template images from GHCR and (if
|
||||
@@ -191,7 +184,7 @@ func (s *WorkspaceImageService) Refresh(ctx context.Context, runtimes []string,
|
||||
|
||||
// AdminWorkspaceImagesHandler serves POST /admin/workspace-images/refresh.
|
||||
//
|
||||
// ?runtime=claude-code (optional; default = all 9 templates)
|
||||
// ?runtime=claude-code (optional; default = all 8 templates)
|
||||
// &recreate=true|false (default true; false = pull only)
|
||||
//
|
||||
// Returns JSON {pulled: [...], failed: [...], recreated: [...]}
|
||||
|
||||
@@ -35,9 +35,9 @@ func TestGHCRAuthHeader_EncodesDockerEnginePayload(t *testing.T) {
|
||||
if got == "" {
|
||||
t.Fatal("expected non-empty auth header")
|
||||
}
|
||||
raw, err := base64.StdEncoding.DecodeString(got)
|
||||
raw, err := base64.URLEncoding.DecodeString(got)
|
||||
if err != nil {
|
||||
t.Fatalf("auth header is not valid base64: %v", err)
|
||||
t.Fatalf("auth header is not valid base64-url: %v", err)
|
||||
}
|
||||
var payload map[string]string
|
||||
if err := json.Unmarshal(raw, &payload); err != nil {
|
||||
@@ -61,7 +61,7 @@ func TestGHCRAuthHeader_TrimsWhitespace(t *testing.T) {
|
||||
t.Setenv("GHCR_USER", " alice ")
|
||||
t.Setenv("GHCR_TOKEN", "\tfake-tok-value\n")
|
||||
got := ghcrAuthHeader()
|
||||
raw, _ := base64.StdEncoding.DecodeString(got)
|
||||
raw, _ := base64.URLEncoding.DecodeString(got)
|
||||
var payload map[string]string
|
||||
_ = json.Unmarshal(raw, &payload)
|
||||
if payload["username"] != "alice" {
|
||||
|
||||
@@ -631,11 +631,6 @@ func (h *DelegationHandler) ListDelegations(c *gin.Context) {
|
||||
}
|
||||
delegations = append(delegations, entry)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("delegation list rows error: %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
|
||||
return
|
||||
}
|
||||
|
||||
if delegations == nil {
|
||||
delegations = []map[string]interface{}{}
|
||||
|
||||
@@ -348,10 +348,6 @@ func queryPeerMaps(query string, args ...interface{}) ([]map[string]interface{},
|
||||
|
||||
result = append(result, peer)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("queryPeerMaps rows error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -159,11 +159,15 @@ func generateAppInstallationToken() (string, time.Time, error) {
|
||||
req, _ := http.NewRequest("POST", fmt.Sprintf("https://api.github.com/app/installations/%d/access_tokens", installID), nil)
|
||||
req.Header.Set("Authorization", "Bearer "+signed)
|
||||
req.Header.Set("Accept", "application/vnd.github+json")
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
client := &http.Client{Timeout: 15 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return "", time.Time{}, err
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return "", time.Time{}, fmt.Errorf("github API returned status %d", resp.StatusCode)
|
||||
}
|
||||
var result struct {
|
||||
Token string `json:"token"`
|
||||
ExpiresAt time.Time `json:"expires_at"`
|
||||
|
||||
@@ -248,11 +248,6 @@ func (h *InstructionsHandler) Resolve(c *gin.Context) {
|
||||
b.WriteString(content)
|
||||
b.WriteString("\n\n")
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("Instructions list rows error: %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"workspace_id": workspaceID,
|
||||
|
||||
@@ -67,10 +67,6 @@ func (h *TokenHandler) List(c *gin.Context) {
|
||||
}
|
||||
tokens = append(tokens, t)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list tokens"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"tokens": tokens,
|
||||
|
||||
@@ -35,37 +35,36 @@ import (
|
||||
// drift-risk #6.
|
||||
var ErrNoBackend = errors.New("provisioner: no backend configured (zero-valued receiver)")
|
||||
|
||||
// RuntimeImages maps runtime names to their Docker image refs.
|
||||
// RuntimeImages maps runtime names to their Docker image refs on GHCR.
|
||||
// Each standalone template repo publishes its image via the reusable
|
||||
// publish-template-image workflow in molecule-ci on every main merge.
|
||||
// The provisioner pulls these on demand (see ensureImageLocal) — no
|
||||
// pre-build step on the tenant host.
|
||||
//
|
||||
// The registry prefix is determined by RegistryPrefix() in registry.go;
|
||||
// defaults to ghcr.io/molecule-ai (upstream OSS) and is overridden via the
|
||||
// MOLECULE_IMAGE_REGISTRY env var in production tenants that mirror to
|
||||
// AWS ECR or another registry. The map is computed at package init and
|
||||
// captures whatever prefix was active then.
|
||||
//
|
||||
// Legacy local-build path (`docker build -t workspace-template:<runtime>`
|
||||
// via scripts/build-images.sh) is still supported for development:
|
||||
// when a bare `workspace-template:<runtime>` image is present locally,
|
||||
// Docker's image resolver matches it before any pull is attempted. Set
|
||||
// the env var WORKSPACE_IMAGE_LOCAL_OVERRIDE=1 (enforced by callers) to
|
||||
// short-circuit pulls entirely if needed.
|
||||
var RuntimeImages = computeRuntimeImages()
|
||||
|
||||
// DefaultImage is the fallback workspace Docker image (langgraph is the
|
||||
// most common runtime). Computed via RegistryPrefix() so the prefix
|
||||
// override applies to the fallback path too.
|
||||
//
|
||||
// NOTE: Every runtime MUST have an entry in knownRuntimes (registry.go).
|
||||
// If a runtime is missing, it falls back to DefaultImage which may have
|
||||
// wrong deps. Add new runtimes to knownRuntimes AND create the standalone
|
||||
// template repo.
|
||||
var DefaultImage = RuntimeImage(defaultRuntime)
|
||||
var RuntimeImages = map[string]string{
|
||||
"langgraph": "ghcr.io/molecule-ai/workspace-template-langgraph:latest",
|
||||
"claude-code": "ghcr.io/molecule-ai/workspace-template-claude-code:latest",
|
||||
"openclaw": "ghcr.io/molecule-ai/workspace-template-openclaw:latest",
|
||||
"deepagents": "ghcr.io/molecule-ai/workspace-template-deepagents:latest",
|
||||
"crewai": "ghcr.io/molecule-ai/workspace-template-crewai:latest",
|
||||
"autogen": "ghcr.io/molecule-ai/workspace-template-autogen:latest",
|
||||
"hermes": "ghcr.io/molecule-ai/workspace-template-hermes:latest", // Hermes (Nous Research) — real hermes-agent behind A2A bridge
|
||||
"gemini-cli": "ghcr.io/molecule-ai/workspace-template-gemini-cli:latest", // Google Gemini CLI
|
||||
}
|
||||
|
||||
const (
|
||||
// DefaultImage is the fallback workspace Docker image (langgraph is the most common runtime).
|
||||
DefaultImage = "ghcr.io/molecule-ai/workspace-template-langgraph:latest"
|
||||
// NOTE: Every runtime MUST have an entry in RuntimeImages above. If a runtime is missing,
|
||||
// it falls back to DefaultImage which may have wrong deps. Add new runtimes to both
|
||||
// RuntimeImages AND create the standalone template repo.
|
||||
|
||||
// DefaultNetwork is the Docker network workspaces join.
|
||||
DefaultNetwork = "molecule-monorepo-net"
|
||||
|
||||
|
||||
@@ -1,95 +0,0 @@
|
||||
package provisioner
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
// defaultRegistryPrefix is the upstream OSS face for all workspace template
|
||||
// images. Self-hosted Molecule deployments without the MOLECULE_IMAGE_REGISTRY
|
||||
// override pull from here.
|
||||
const defaultRegistryPrefix = "ghcr.io/molecule-ai"
|
||||
|
||||
// knownRuntimes is the canonical list of workspace template runtimes shipped
|
||||
// in main. Any runtime added here MUST also have a standalone template repo
|
||||
// (Molecule-AI/molecule-ai-workspace-template-<name>) and an entry in the
|
||||
// publish-template-image workflow that builds it.
|
||||
//
|
||||
// Order matters for deterministic test snapshots; keep alphabetical.
|
||||
var knownRuntimes = []string{
|
||||
"autogen",
|
||||
"claude-code",
|
||||
"codex",
|
||||
"crewai",
|
||||
"deepagents",
|
||||
"gemini-cli",
|
||||
"hermes",
|
||||
"langgraph",
|
||||
"openclaw",
|
||||
}
|
||||
|
||||
// defaultRuntime is the fallback when a workspace's config doesn't specify a
|
||||
// runtime. Picked because LangGraph is the most common in our org templates
|
||||
// and has the smallest "first impression" cold-start surface.
|
||||
const defaultRuntime = "langgraph"
|
||||
|
||||
// RegistryPrefix returns the registry prefix all workspace-template image
|
||||
// references should use. Defaults to ghcr.io/molecule-ai (the upstream OSS
|
||||
// face) and is overridden by the MOLECULE_IMAGE_REGISTRY env var in
|
||||
// production tenants where we mirror images to a private registry.
|
||||
//
|
||||
// The override is set at deploy time (Railway env, EC2 user-data) — never
|
||||
// from user-supplied input — so the value is trusted by the time it reaches
|
||||
// this code. Validation is deliberately minimal: an operator-supplied
|
||||
// prefix that points at a registry the EC2 can't authenticate to will fail
|
||||
// loudly at docker-pull time, which is the right blast radius.
|
||||
//
|
||||
// Example values:
|
||||
//
|
||||
// (unset) → ghcr.io/molecule-ai (OSS default)
|
||||
// "123456789012.dkr.ecr.us-east-2.amazonaws.com/molecule-ai" → AWS ECR mirror
|
||||
// "git.moleculesai.app/molecule-ai" → self-hosted Gitea Container Registry (future)
|
||||
//
|
||||
// Auth is registry-specific and configured outside this function:
|
||||
// - GHCR: GHCR_USER/GHCR_TOKEN env vars consumed by ghcrAuthHeader()
|
||||
// - ECR: docker credential helper (amazon-ecr-credential-helper) configured
|
||||
// in EC2 user-data; ~/.docker/config.json has credHelpers entry; the
|
||||
// daemon resolves auth automatically on every pull.
|
||||
func RegistryPrefix() string {
|
||||
if v := os.Getenv("MOLECULE_IMAGE_REGISTRY"); v != "" {
|
||||
return v
|
||||
}
|
||||
return defaultRegistryPrefix
|
||||
}
|
||||
|
||||
// RuntimeImage returns the canonical image reference for the given runtime,
|
||||
// using the current RegistryPrefix() and the moving `:latest` tag.
|
||||
//
|
||||
// For SHA-pinned references (production thin-AMI launches), the
|
||||
// runtime_image_pins lookup in handlers/runtime_image_pin.go strips the
|
||||
// `:latest` suffix and appends an immutable `@sha256:<digest>` from the DB.
|
||||
// That code path naturally inherits any RegistryPrefix() change because it
|
||||
// reads from RuntimeImages[runtime] and only re-formats the tag suffix.
|
||||
//
|
||||
// Returns the empty string for unknown runtimes; callers should fall through
|
||||
// to DefaultImage in that case (matching legacy behavior).
|
||||
func RuntimeImage(runtime string) string {
|
||||
for _, r := range knownRuntimes {
|
||||
if r == runtime {
|
||||
return fmt.Sprintf("%s/workspace-template-%s:latest", RegistryPrefix(), runtime)
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// computeRuntimeImages returns the {runtime: image-ref} map evaluated against
|
||||
// the current RegistryPrefix(). Called at package init to populate the
|
||||
// exported RuntimeImages var. Tests that flip MOLECULE_IMAGE_REGISTRY between
|
||||
// expected values use this helper to rebuild the map mid-run.
|
||||
func computeRuntimeImages() map[string]string {
|
||||
out := make(map[string]string, len(knownRuntimes))
|
||||
for _, r := range knownRuntimes {
|
||||
out[r] = RuntimeImage(r)
|
||||
}
|
||||
return out
|
||||
}
|
||||
@@ -1,140 +0,0 @@
|
||||
package provisioner
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestRegistryPrefix_DefaultsToGHCR pins the OSS-default behavior. If a future
|
||||
// refactor accidentally drops the default, OSS users self-hosting Molecule
|
||||
// would silently lose image pulls — this test should fail loudly instead.
|
||||
func TestRegistryPrefix_DefaultsToGHCR(t *testing.T) {
|
||||
t.Setenv("MOLECULE_IMAGE_REGISTRY", "")
|
||||
got := RegistryPrefix()
|
||||
want := "ghcr.io/molecule-ai"
|
||||
if got != want {
|
||||
t.Fatalf("RegistryPrefix() = %q, want %q (default must remain GHCR for OSS users)", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegistryPrefix_RespectsEnv verifies the override path used in
|
||||
// production tenants where MOLECULE_IMAGE_REGISTRY points at a private
|
||||
// mirror (AWS ECR, self-hosted Harbor, etc.).
|
||||
func TestRegistryPrefix_RespectsEnv(t *testing.T) {
|
||||
t.Setenv("MOLECULE_IMAGE_REGISTRY", "123456789012.dkr.ecr.us-east-2.amazonaws.com/molecule-ai")
|
||||
got := RegistryPrefix()
|
||||
want := "123456789012.dkr.ecr.us-east-2.amazonaws.com/molecule-ai"
|
||||
if got != want {
|
||||
t.Fatalf("RegistryPrefix() = %q, want %q (env override path is the production cutover mechanism)", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegistryPrefix_EmptyEnvFallsBackToDefault — guard against an operator
|
||||
// setting MOLECULE_IMAGE_REGISTRY="" by mistake (e.g. unset deploy variable
|
||||
// becomes empty string, not literally absent). We treat "" as "use default"
|
||||
// so a misconfigured env doesn't mean an empty registry prefix.
|
||||
func TestRegistryPrefix_EmptyEnvFallsBackToDefault(t *testing.T) {
|
||||
t.Setenv("MOLECULE_IMAGE_REGISTRY", "")
|
||||
if RegistryPrefix() != defaultRegistryPrefix {
|
||||
t.Fatalf("empty MOLECULE_IMAGE_REGISTRY should fall back to %q, got %q", defaultRegistryPrefix, RegistryPrefix())
|
||||
}
|
||||
}
|
||||
|
||||
// TestRuntimeImage_AllKnownRuntimes — every runtime in the canonical list
|
||||
// must produce a properly-formatted image ref. If a new runtime is added to
|
||||
// knownRuntimes but the format changes, this catches it.
|
||||
func TestRuntimeImage_AllKnownRuntimes(t *testing.T) {
|
||||
t.Setenv("MOLECULE_IMAGE_REGISTRY", "")
|
||||
for _, r := range knownRuntimes {
|
||||
got := RuntimeImage(r)
|
||||
want := "ghcr.io/molecule-ai/workspace-template-" + r + ":latest"
|
||||
if got != want {
|
||||
t.Errorf("RuntimeImage(%q) = %q, want %q", r, got, want)
|
||||
}
|
||||
}
|
||||
// Pin the count so adding a runtime requires explicit test acknowledgement.
|
||||
if len(knownRuntimes) != 9 {
|
||||
t.Errorf("knownRuntimes length = %d, want 9 (autogen, claude-code, codex, crewai, deepagents, gemini-cli, hermes, langgraph, openclaw)", len(knownRuntimes))
|
||||
}
|
||||
}
|
||||
|
||||
// TestRuntimeImage_UnknownRuntime — defensive: callers must fall back to
|
||||
// DefaultImage when a runtime is unknown, never silently use the wrong
|
||||
// prefix. Returning "" enforces an explicit fallback at every call site.
|
||||
func TestRuntimeImage_UnknownRuntime(t *testing.T) {
|
||||
for _, name := range []string{"", "nonexistent", "WORKSPACE-TEMPLATE-FAKE", "../../../etc/passwd"} {
|
||||
if got := RuntimeImage(name); got != "" {
|
||||
t.Errorf("RuntimeImage(%q) = %q, want empty string for unknown runtime", name, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRuntimeImage_RegistryOverrideAppliesToAllRuntimes — the override
|
||||
// flips ALL runtimes consistently. If a refactor accidentally hardcoded
|
||||
// the prefix in some runtimes but not others (the failure mode that
|
||||
// triggered this whole rollout), this test catches it.
|
||||
func TestRuntimeImage_RegistryOverrideAppliesToAllRuntimes(t *testing.T) {
|
||||
const ecr = "999999999999.dkr.ecr.us-east-2.amazonaws.com/molecule-ai"
|
||||
t.Setenv("MOLECULE_IMAGE_REGISTRY", ecr)
|
||||
|
||||
for _, r := range knownRuntimes {
|
||||
got := RuntimeImage(r)
|
||||
if !strings.HasPrefix(got, ecr+"/workspace-template-") {
|
||||
t.Errorf("RuntimeImage(%q) = %q, must start with override prefix %q", r, got, ecr)
|
||||
}
|
||||
if !strings.HasSuffix(got, ":latest") {
|
||||
t.Errorf("RuntimeImage(%q) = %q, must keep :latest tag suffix", r, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestComputeRuntimeImages_AllRuntimesPresent — the map must contain every
|
||||
// known runtime. Drift between knownRuntimes and computeRuntimeImages would
|
||||
// silently break the runtime → image lookup that provisioner.Start uses.
|
||||
func TestComputeRuntimeImages_AllRuntimesPresent(t *testing.T) {
|
||||
t.Setenv("MOLECULE_IMAGE_REGISTRY", "")
|
||||
m := computeRuntimeImages()
|
||||
if len(m) != len(knownRuntimes) {
|
||||
t.Fatalf("computeRuntimeImages() has %d entries, want %d (one per knownRuntime)", len(m), len(knownRuntimes))
|
||||
}
|
||||
for _, r := range knownRuntimes {
|
||||
img, ok := m[r]
|
||||
if !ok {
|
||||
t.Errorf("computeRuntimeImages() missing runtime %q", r)
|
||||
continue
|
||||
}
|
||||
if img == "" {
|
||||
t.Errorf("computeRuntimeImages()[%q] is empty", r)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestComputeRuntimeImages_ReflectsCurrentEnv — calling computeRuntimeImages
|
||||
// after env change rebuilds the map with new prefix. Tests + ops procedures
|
||||
// that flip the env in-process rely on this.
|
||||
func TestComputeRuntimeImages_ReflectsCurrentEnv(t *testing.T) {
|
||||
t.Setenv("MOLECULE_IMAGE_REGISTRY", "")
|
||||
defaultMap := computeRuntimeImages()
|
||||
if !strings.HasPrefix(defaultMap["claude-code"], "ghcr.io/molecule-ai/") {
|
||||
t.Fatalf("default map should be GHCR-prefixed, got %q", defaultMap["claude-code"])
|
||||
}
|
||||
|
||||
const mirror = "registry.example.com/molecule-ai"
|
||||
t.Setenv("MOLECULE_IMAGE_REGISTRY", mirror)
|
||||
mirrorMap := computeRuntimeImages()
|
||||
if !strings.HasPrefix(mirrorMap["claude-code"], mirror+"/") {
|
||||
t.Fatalf("mirror-prefixed map should start with %q, got %q", mirror, mirrorMap["claude-code"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestKnownRuntimes_AlphabeticalOrder — pin the order so test snapshots
|
||||
// (and human readers diffing the file) see deterministic output. Adding a
|
||||
// new runtime out of alphabetical order will fail this test, which is the
|
||||
// nudge to keep the file readable.
|
||||
func TestKnownRuntimes_AlphabeticalOrder(t *testing.T) {
|
||||
for i := 1; i < len(knownRuntimes); i++ {
|
||||
if knownRuntimes[i-1] >= knownRuntimes[i] {
|
||||
t.Errorf("knownRuntimes not alphabetical: %q comes before %q", knownRuntimes[i-1], knownRuntimes[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,149 +0,0 @@
|
||||
package registry
|
||||
|
||||
// cp_orphan_sweeper.go — SaaS-mode counterpart to orphan_sweeper.go.
|
||||
//
|
||||
// The Docker sweeper (StartOrphanSweeper) runs only when prov != nil
|
||||
// (single-tenant Docker mode); SaaS tenants run cpProv != nil and prov
|
||||
// == nil, so they get no sweep coverage from that path. This file fills
|
||||
// the gap for the deprovision split-write race documented in #2989:
|
||||
//
|
||||
// 1. handlers/workspace_crud.go:365 marks workspaces.status = 'removed'.
|
||||
// 2. workspace_crud.go:439 calls StopWorkspaceAuto → cpProv.Stop, which
|
||||
// issues DELETE /cp/workspaces/:id?instance_id=… to controlplane.
|
||||
// 3. If step 2 fails (CP transient 5xx, network blip, AWS hiccup), the
|
||||
// inline path returns a 500 to the canvas — but the DB row is already
|
||||
// at status='removed' with instance_id still populated. There's no
|
||||
// retry, and the EC2 lives forever.
|
||||
//
|
||||
// This sweeper closes that gap by re-issuing cpProv.Stop on every cycle
|
||||
// for any workspace at status='removed' with a non-NULL instance_id.
|
||||
// Stop is idempotent: AWS TerminateInstance on an already-terminated
|
||||
// instance is a no-op (per AWS docs), and CP's Deprovision handler
|
||||
// (controlplane/internal/handlers/workspace_provision.go:289) handles
|
||||
// the already-terminated and already-deleted-DNS cases via best-effort
|
||||
// guards. On Stop success, the sweeper clears instance_id so the next
|
||||
// cycle skips the row.
|
||||
//
|
||||
// Cadence + safety filters mirror the Docker sweeper:
|
||||
// - 60s tick (OrphanSweepInterval)
|
||||
// - 30s per-cycle deadline (orphanSweepDeadline)
|
||||
// - LIMIT 100 per cycle so a sustained CP outage that backs up many
|
||||
// orphans doesn't blow the request timeout; subsequent cycles drain.
|
||||
//
|
||||
// SSOT note: Stop's idempotency (no-op on empty instance_id, AWS
|
||||
// terminate on already-terminated) is the load-bearing invariant. Any
|
||||
// future change that adds non-idempotent side effects to cpProv.Stop
|
||||
// must also gate this sweeper, or it will re-execute those side effects
|
||||
// every 60s for every cleared-but-not-yet-NULL row.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
)
|
||||
|
||||
// CPOrphanReaper is the dependency the SaaS-mode sweeper takes from
|
||||
// the CP provisioner. *provisioner.CPProvisioner satisfies this
|
||||
// naturally; tests inject fakes.
|
||||
type CPOrphanReaper interface {
|
||||
Stop(ctx context.Context, workspaceID string) error
|
||||
}
|
||||
|
||||
// cpSweepLimit caps the per-cycle row count so a sustained CP outage
|
||||
// can't make a single sweep cycle blow orphanSweepDeadline. With a
|
||||
// 60s cadence and 100-row limit, drain rate is up to 100 orphans/min,
|
||||
// which has never been approached even during the worst leak windows.
|
||||
const cpSweepLimit = 100
|
||||
|
||||
// StartCPOrphanSweeper runs the SaaS-mode reconcile loop until ctx is
|
||||
// cancelled. nil reaper makes the loop a no-op (matches the Docker
|
||||
// sweeper's nil-tolerant pattern).
|
||||
//
|
||||
// Caller is expected to gate on `cpProv != nil` (matching how
|
||||
// StartOrphanSweeper is gated on `prov != nil` at the call site in
|
||||
// cmd/server/main.go) — passing a nil *CPProvisioner here would also
|
||||
// short-circuit but the gate at the wiring site keeps the call shape
|
||||
// symmetric across the two sweepers.
|
||||
func StartCPOrphanSweeper(ctx context.Context, reaper CPOrphanReaper) {
|
||||
if reaper == nil {
|
||||
log.Println("CP orphan sweeper: reaper is nil — sweeper disabled")
|
||||
return
|
||||
}
|
||||
log.Printf("CP orphan sweeper started — reconciling every %s", OrphanSweepInterval)
|
||||
ticker := time.NewTicker(OrphanSweepInterval)
|
||||
defer ticker.Stop()
|
||||
cpSweepOnce(ctx, reaper)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("CP orphan sweeper: shutdown")
|
||||
return
|
||||
case <-ticker.C:
|
||||
cpSweepOnce(ctx, reaper)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cpSweepOnce executes one reconcile pass. Defensive against db.DB
|
||||
// being nil so a misconfigured boot doesn't panic.
|
||||
func cpSweepOnce(parent context.Context, reaper CPOrphanReaper) {
|
||||
if db.DB == nil {
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(parent, orphanSweepDeadline)
|
||||
defer cancel()
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id::text
|
||||
FROM workspaces
|
||||
WHERE status = 'removed'
|
||||
AND instance_id IS NOT NULL
|
||||
AND instance_id != ''
|
||||
ORDER BY updated_at DESC
|
||||
LIMIT $1
|
||||
`, cpSweepLimit)
|
||||
if err != nil {
|
||||
log.Printf("CP orphan sweeper: DB query failed: %v", err)
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var orphanIDs []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if scanErr := rows.Scan(&id); scanErr != nil {
|
||||
log.Printf("CP orphan sweeper: row scan failed: %v", scanErr)
|
||||
continue
|
||||
}
|
||||
orphanIDs = append(orphanIDs, id)
|
||||
}
|
||||
if iterErr := rows.Err(); iterErr != nil {
|
||||
log.Printf("CP orphan sweeper: rows iteration failed: %v", iterErr)
|
||||
return
|
||||
}
|
||||
|
||||
for _, id := range orphanIDs {
|
||||
log.Printf("CP orphan sweeper: terminating leaked EC2 for removed workspace %s", id)
|
||||
if stopErr := reaper.Stop(ctx, id); stopErr != nil {
|
||||
// CP-side error — transient 5xx, network, AWS hiccup. Leave
|
||||
// instance_id populated so the next cycle retries. Loud-fail
|
||||
// only at the log layer; the user-visible 500 was already
|
||||
// returned by the inline path that triggered this orphan.
|
||||
log.Printf("CP orphan sweeper: Stop failed for %s: %v — retry next cycle", id, stopErr)
|
||||
continue
|
||||
}
|
||||
// Stop succeeded — clear instance_id so the next cycle skips this
|
||||
// row. We can't use a tombstone column (no schema change in this
|
||||
// PR); NULL'ing instance_id is the SSOT signal for "no live
|
||||
// EC2 attached." The matching SELECT predicate above stays in
|
||||
// sync with this UPDATE.
|
||||
if _, updErr := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET instance_id = NULL, updated_at = now() WHERE id = $1`,
|
||||
id,
|
||||
); updErr != nil {
|
||||
log.Printf("CP orphan sweeper: clear instance_id failed for %s: %v — next cycle will re-Stop (idempotent)", id, updErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,266 +0,0 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
)
|
||||
|
||||
// fakeCPReaper is a hand-rolled CPOrphanReaper for the SaaS-mode
|
||||
// sweeper tests. Records every Stop call so tests can assert which
|
||||
// workspace IDs were re-issued.
|
||||
type fakeCPReaper struct {
|
||||
mu sync.Mutex
|
||||
stopErr map[string]error
|
||||
stopCalls []string
|
||||
}
|
||||
|
||||
func (f *fakeCPReaper) Stop(_ context.Context, wsID string) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.stopCalls = append(f.stopCalls, wsID)
|
||||
return f.stopErr[wsID]
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_StopSucceeds_ClearsInstanceID — happy path. Single
|
||||
// removed-row with non-NULL instance_id; Stop succeeds; instance_id
|
||||
// gets NULL'd so the next cycle won't re-sweep it.
|
||||
func TestCPSweepOnce_StopSucceeds_ClearsInstanceID(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces\s+WHERE status = 'removed'\s+AND instance_id IS NOT NULL\s+AND instance_id != ''\s+ORDER BY updated_at DESC\s+LIMIT \$1`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-uuid-1"))
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL, updated_at = now\(\) WHERE id = \$1`).
|
||||
WithArgs("ws-uuid-1").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 1 || reaper.stopCalls[0] != "ws-uuid-1" {
|
||||
t.Fatalf("expected Stop(ws-uuid-1), got %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_StopFails_KeepsInstanceID — CP transient failure.
|
||||
// Stop returns an error; instance_id MUST stay populated so the next
|
||||
// cycle retries. UPDATE must NOT fire.
|
||||
func TestCPSweepOnce_StopFails_KeepsInstanceID(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{
|
||||
stopErr: map[string]error{"ws-uuid-1": errors.New("CP returned 503")},
|
||||
}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-uuid-1"))
|
||||
// No ExpectExec for the UPDATE — sqlmock fails the test if the
|
||||
// UPDATE fires.
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 1 || reaper.stopCalls[0] != "ws-uuid-1" {
|
||||
t.Fatalf("expected Stop(ws-uuid-1), got %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations (UPDATE should NOT have fired): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_NoOrphans — empty result set is the steady state in
|
||||
// healthy operation. No Stop, no UPDATE.
|
||||
func TestCPSweepOnce_NoOrphans(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}))
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 0 {
|
||||
t.Fatalf("expected zero Stop calls, got %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_MultipleOrphans — all rows in the batch get Stop'd
|
||||
// independently; one failure doesn't block others.
|
||||
func TestCPSweepOnce_MultipleOrphans(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{
|
||||
stopErr: map[string]error{"ws-uuid-2": errors.New("CP 503 on ws-uuid-2")},
|
||||
}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).
|
||||
AddRow("ws-uuid-1").
|
||||
AddRow("ws-uuid-2").
|
||||
AddRow("ws-uuid-3"))
|
||||
// ws-uuid-1 succeeds → UPDATE fires.
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
|
||||
WithArgs("ws-uuid-1").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// ws-uuid-2 fails → no UPDATE.
|
||||
// ws-uuid-3 succeeds → UPDATE fires.
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
|
||||
WithArgs("ws-uuid-3").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 3 {
|
||||
t.Fatalf("expected Stop on all 3 ids, got %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_QueryError — DB transient failure. Sweep returns
|
||||
// without panicking. No Stop calls.
|
||||
func TestCPSweepOnce_QueryError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnError(errors.New("connection refused"))
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 0 {
|
||||
t.Fatalf("expected zero Stop calls on query error, got %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_UpdateError_LogsButContinues — Stop succeeded but
|
||||
// the UPDATE to clear instance_id failed. Subsequent rows in the batch
|
||||
// must still process; comment in cpSweepOnce promises idempotent re-Stop
|
||||
// next cycle.
|
||||
func TestCPSweepOnce_UpdateError_LogsButContinues(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).
|
||||
AddRow("ws-uuid-1").
|
||||
AddRow("ws-uuid-2"))
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
|
||||
WithArgs("ws-uuid-1").
|
||||
WillReturnError(errors.New("UPDATE timeout"))
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
|
||||
WithArgs("ws-uuid-2").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 2 {
|
||||
t.Fatalf("expected Stop on both ids despite UPDATE error on first, got %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_NilDB — defensive against db.DB being nil. Must not
|
||||
// panic; must not call Stop.
|
||||
func TestCPSweepOnce_NilDB(t *testing.T) {
|
||||
saved := db.DB
|
||||
db.DB = nil
|
||||
t.Cleanup(func() { db.DB = saved })
|
||||
|
||||
reaper := &fakeCPReaper{}
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 0 {
|
||||
t.Fatalf("expected zero Stop calls when db.DB is nil, got %v", reaper.stopCalls)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStartCPOrphanSweeper_NilReaperDisabled — boot-safety: a SaaS CP
|
||||
// without cpProv configured must not start the loop (immediate return,
|
||||
// no goroutine leak).
|
||||
func TestStartCPOrphanSweeper_NilReaperDisabled(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
StartCPOrphanSweeper(ctx, nil)
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
// expected — nil reaper short-circuits.
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
t.Fatal("StartCPOrphanSweeper(nil) did not return immediately")
|
||||
}
|
||||
}
|
||||
|
||||
// TestStartCPOrphanSweeper_RunsOnceImmediatelyAndOnTick — cadence
|
||||
// contract: kick off one sweep at boot (so a platform restart starts
|
||||
// healing immediately), then once per OrphanSweepInterval. Verifies
|
||||
// the loop terminates on ctx cancel.
|
||||
func TestStartCPOrphanSweeper_RunsOnceImmediatelyAndOnTick(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{}
|
||||
|
||||
// Two sweeps within the test window: one immediate, one on the
|
||||
// first tick. We can't shrink OrphanSweepInterval (it's a const),
|
||||
// so assert "at least one immediate sweep" and let cancel close
|
||||
// the loop.
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}))
|
||||
// The ticker may or may not fire in the test window depending on
|
||||
// scheduler; tolerate both shapes by registering a second optional
|
||||
// expectation. sqlmock fails on UNREGISTERED queries, so register
|
||||
// one more then accept either 1 or 2 fires.
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}))
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
StartCPOrphanSweeper(ctx, reaper)
|
||||
close(done)
|
||||
}()
|
||||
// 100ms is well past the boot-sweep but well shy of the 60s
|
||||
// interval, so the second query expectation is intentionally
|
||||
// unmet — that's fine, sqlmock distinguishes "expected but not
|
||||
// received" (we don't enforce here) from "unexpected query"
|
||||
// (which would fail).
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
cancel()
|
||||
select {
|
||||
case <-done:
|
||||
// expected
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("StartCPOrphanSweeper did not exit on ctx cancel")
|
||||
}
|
||||
|
||||
// Boot sweep must have happened — without it, an operator restart
|
||||
// after a CP outage would leave a 60s gap before the first heal.
|
||||
// We don't assert mock.ExpectationsWereMet() here because the
|
||||
// second query is intentionally optional.
|
||||
}
|
||||
Reference in New Issue
Block a user