diff --git a/workspace-server/internal/handlers/workspace_switch_provider.go b/workspace-server/internal/handlers/workspace_switch_provider.go new file mode 100644 index 000000000..991d46219 --- /dev/null +++ b/workspace-server/internal/handlers/workspace_switch_provider.go @@ -0,0 +1,213 @@ +package handlers + +import ( + "context" + "database/sql" + "encoding/json" + "io" + "log" + "net/http" + "strings" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models" + "github.com/gin-gonic/gin" +) + +// SwitchProvider handles POST /workspaces/:id/switch-provider — move an EXISTING +// workspace to a different cloud (compute.provider). The VM is cloud-specific, so +// this reprovisions the box on the new cloud; the agent's durable identity +// (agent_card, config, secrets, tokens, memory) lives in the tenant DB and is +// preserved because the row is never deleted. +// +// CRITICAL ORDERING: the stop must run with the OLD provider BEFORE the DB +// provider is changed. The stop helper reads the current row at call time; if we +// wrote the new provider first, the teardown request would target the wrong +// backend and the old box would leak. So the sequence is strictly: +// +// 1. stop OLD box (DB still has old provider + old instance_id) +// 2. clear instance_id + write new provider (jsonb_set, preserving the rest) +// 3. provision NEW box (withStoredCompute now reads the new provider) +// +// Clearing instance_id in step 2 also makes a retried switch safe: a second call +// finds no stale instance to act on with the new provider metadata. +func (h *WorkspaceHandler) SwitchProvider(c *gin.Context) { + id := c.Param("id") + ctx := c.Request.Context() + + var body struct { + Provider string `json:"provider"` + ConfirmDataLoss bool `json:"confirm_data_loss"` + // MigrateData is accepted for forward-compat (RFC #622 PR4 wires the + // filesystem migration). Until then it is a no-op and a persistent + // volume still requires confirm_data_loss. + MigrateData bool `json:"migrate_data"` + } + if err := c.ShouldBindJSON(&body); err != nil && err != io.EOF { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid JSON body"}) + return + } + newProvider := strings.ToLower(strings.TrimSpace(body.Provider)) + if newProvider == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "provider is required"}) + return + } + if _, ok := workspaceComputeProviderAllowlist[newProvider]; !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "unsupported provider (want aws|gcp|hetzner)"}) + return + } + + var status, wsName, dbRuntime, oldProvider, dataPersistence string + var oldInstanceID sql.NullString + var tier int + err := db.DB.QueryRowContext(ctx, ` + SELECT status, name, tier, COALESCE(runtime, 'claude-code'), + COALESCE(compute->>'provider', ''), COALESCE(compute->>'data_persistence', ''), + instance_id + FROM workspaces WHERE id = $1`, id, + ).Scan(&status, &wsName, &tier, &dbRuntime, &oldProvider, &dataPersistence, &oldInstanceID) + if err == sql.ErrNoRows || status == string(models.StatusRemoved) { + c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"}) + return + } + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) + return + } + + // Switching the cloud backend is a SaaS-only concept — a self-hosted Docker + // workspace has no cloud provider to switch. external/mock runtimes have no + // box at all. + if h.cpProv == nil { + c.JSON(http.StatusConflict, gin.H{"error": "provider switching is only available for cloud (SaaS) workspaces"}) + return + } + if isExternalLikeRuntime(dbRuntime) || dbRuntime == "mock" { + c.JSON(http.StatusConflict, gin.H{"error": dbRuntime + " workspaces have no cloud box to switch"}) + return + } + if paused, parentName := isParentPaused(ctx, id); paused { + c.JSON(http.StatusConflict, gin.H{"error": "parent workspace \"" + parentName + "\" is paused — resume it first"}) + return + } + + // "" provider means the default AWS path. + effectiveOld := oldProvider + if effectiveOld == "" { + effectiveOld = "aws" + } + if newProvider == effectiveOld { + c.JSON(http.StatusOK, gin.H{"status": "noop", "provider": newProvider, "message": "workspace is already on this provider"}) + return + } + + // Data gate: a persistent data volume is cloud-specific and cannot move as a + // block device. Until the filesystem migration lands (RFC #622 PR3/PR4), the + // switch is allowed only with an explicit confirm_data_loss. + if dataPersistence == "persist" && !body.ConfirmDataLoss { + c.JSON(http.StatusConflict, gin.H{ + "error": "DATA_LOSS_UNCONFIRMED", + "detail": "this workspace has a persistent data volume that cannot move across clouds; set confirm_data_loss=true to switch with a fresh volume (cross-cloud data migration is RFC #622, not yet wired). Identity/config/secrets/memory are preserved regardless.", + }) + return + } + + // --- ordered switch (see doc-comment) --- + + // 1. Stop the OLD box with the OLD provider. DB is unchanged here, so the + // stop helper reads the old provider + old instance_id. Bounded retry; on + // exhaustion it returns an error but we STILL proceed (a stuck old box + // must not strand the switch) — except we capture the failure so step 2.5 + // can emit a durable audit row, because step 2 nulls instance_id and flips + // provider, which otherwise leaves the old box untracked by normal + // lifecycle cleanup (review finding #3). + stopErr := h.cpStopWithRetryErr(ctx, id, "SwitchProvider", false) + + // 2. Atomically claim the switch AND clear instance_id + write the new + // provider. The CAS (status not already provisioning, provider still the + // one we read) makes concurrent/duplicate switch calls safe: only the + // first winner launches a provision; a racing call sees 0 rows → 409, + // never a second provision against a second backend (review finding #4). + // jsonb_set preserves instance_type/volume/display/data_persistence. + res, err := db.DB.ExecContext(ctx, ` + UPDATE workspaces + SET instance_id = NULL, + compute = jsonb_set(COALESCE(compute, '{}'::jsonb), '{provider}', to_jsonb($2::text)), + status = $3, url = '', updated_at = now() + WHERE id = $1 + AND status <> $3 + AND COALESCE(compute->>'provider', '') IS NOT DISTINCT FROM $4`, + id, newProvider, models.StatusProvisioning, oldProvider) + if err != nil { + log.Printf("SwitchProvider: failed to write new provider for %s: %v", id, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to switch provider"}) + return + } + if n, _ := res.RowsAffected(); n == 0 { + // Lost the CAS: another switch already flipped the provider / set + // provisioning, or the row changed under us. Do NOT launch a second + // provision (would leave an untracked box). + c.JSON(http.StatusConflict, gin.H{"error": "ALREADY_SWITCHING", "detail": "a provider switch or provision is already in progress for this workspace"}) + return + } + + // 2.5. If the old box never confirmed stopped, it may not be tracked by the + // normal lifecycle cleanup after instance_id was nulled. Emit a durable + // audit row carrying the old instance_id + provider so a platform cleanup + // process can locate and terminate it (review finding #3). + if stopErr != nil && oldInstanceID.Valid && oldInstanceID.String != "" { + h.emitSwitchProviderStopExhausted(ctx, id, oldInstanceID.String, effectiveOld, newProvider, stopErr) + } + log.Printf("SwitchProvider: %s %s → %s (old box stop err=%v, reprovisioning)", id, effectiveOld, newProvider, stopErr) + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisioning), id, map[string]interface{}{ + "name": wsName, + "tier": tier, + "runtime": dbRuntime, + "provider_from": effectiveOld, + "provider_to": newProvider, + }) + + // 3. Provision the NEW box. withStoredCompute re-reads compute (now carrying + // the new provider) → provisionWorkspaceCP routes to the new backend. + // Reuse the existing config volume (templatePath="") so identity/config + // are preserved. Detached context: the reprovision outlives the request. + payload := withStoredCompute(context.Background(), id, models.CreateWorkspacePayload{Name: wsName, Tier: tier, Runtime: dbRuntime}) + h.goAsync(func() { h.provisionWorkspaceCP(id, "", nil, payload) }) + + c.JSON(http.StatusAccepted, gin.H{ + "status": "switching", + "workspace_id": id, + "from": effectiveOld, + "to": newProvider, + }) +} + +// emitSwitchProviderStopExhausted records a durable audit row when a provider +// switch could not confirm the OLD box stopped before its instance_id was +// cleared from the row. Without it the old box may not be tracked by normal +// lifecycle cleanup; a platform cleanup process reads these rows by +// old_instance_id + old_provider to terminate the leaked box. Best-effort. +func (h *WorkspaceHandler) emitSwitchProviderStopExhausted(ctx context.Context, workspaceID, oldInstanceID, oldProvider, newProvider string, cause error) { + if db.DB == nil { + return + } + payload, err := json.Marshal(map[string]any{ + "workspace_id": workspaceID, + "old_instance_id": oldInstanceID, + "old_provider": oldProvider, + "new_provider": newProvider, + "last_error": cause.Error(), + "recovery_path": "platform_cleanup", + }) + if err != nil { + log.Printf("emitSwitchProviderStopExhausted: marshal failed for %s: %v", workspaceID, err) + return + } + if _, err := db.DB.ExecContext(ctx, ` + INSERT INTO structure_events (event_type, workspace_id, payload, created_at) + VALUES ($1, $2, $3, now()) + `, "workspace.provider.switch_stop_exhausted", workspaceID, payload); err != nil { + log.Printf("emitSwitchProviderStopExhausted: insert failed for %s: %v", workspaceID, err) + } +} diff --git a/workspace-server/internal/handlers/workspace_switch_provider_test.go b/workspace-server/internal/handlers/workspace_switch_provider_test.go new file mode 100644 index 000000000..73f0d6fce --- /dev/null +++ b/workspace-server/internal/handlers/workspace_switch_provider_test.go @@ -0,0 +1,106 @@ +package handlers + +import ( + "bytes" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/gin-gonic/gin" +) + +// TestSwitchProvider_StopBeforeProviderWrite is the load-bearing ordering pin. +// The stop helper MUST appear before the UPDATE that writes the new provider — +// otherwise the teardown uses the wrong backend metadata and the old box leaks. +// A source-level position check guards against a refactor reordering the two. +func TestSwitchProvider_StopBeforeProviderWrite(t *testing.T) { + wd, _ := os.Getwd() + src, err := os.ReadFile(filepath.Join(wd, "workspace_switch_provider.go")) + if err != nil { + t.Fatalf("read source: %v", err) + } + stripped := stripGoComments(src) + stopIdx := bytes.Index(stripped, []byte("cpStopWithRetryErr(ctx, id, \"SwitchProvider\"")) + if stopIdx < 0 { + t.Fatal("SwitchProvider must stop the old box via cpStopWithRetryErr before reprovisioning") + } + // the provider write is the jsonb_set on compute -> {provider} + writeIdx := bytes.Index(stripped, []byte("'{provider}'")) + if writeIdx < 0 { + t.Fatal("SwitchProvider must write the new provider via jsonb_set on compute->{provider}") + } + if stopIdx >= writeIdx { + t.Fatalf("ORDERING HAZARD: stop helper (idx %d) must come BEFORE the provider write (idx %d) — else the old box is torn down with wrong backend metadata and leaks", stopIdx, writeIdx) + } + // and the instance_id must be cleared in the same UPDATE (retry-safety) + if !bytes.Contains(stripped, []byte("instance_id = NULL")) { + t.Fatal("SwitchProvider must clear instance_id when writing the new provider (retry-safety)") + } +} + +// TestSwitchProvider_ConcurrencyGuardAndAudit pins the two hardening items from +// the correctness review: (a) the provider-write is an atomic CAS so two +// concurrent switches can't both launch a provision, and (b) stop-exhaustion +// emits a durable audit row carrying the old instance_id+provider so the old box +// remains discoverable after instance_id is nulled. +func TestSwitchProvider_ConcurrencyGuardAndAudit(t *testing.T) { + wd, _ := os.Getwd() + src, err := os.ReadFile(filepath.Join(wd, "workspace_switch_provider.go")) + if err != nil { + t.Fatalf("read source: %v", err) + } + s := stripGoComments(src) + if !bytes.Contains(s, []byte("status <> $3")) || !bytes.Contains(s, []byte("IS NOT DISTINCT FROM $4")) { + t.Error("the provider-write UPDATE must be a CAS (status not already provisioning AND provider unchanged) to prevent a double-provision race") + } + if !bytes.Contains(s, []byte("RowsAffected")) || !bytes.Contains(s, []byte("ALREADY_SWITCHING")) { + t.Error("SwitchProvider must 409 ALREADY_SWITCHING when the CAS affects 0 rows (lost the race)") + } + if !bytes.Contains(s, []byte("cpStopWithRetryErr")) { + t.Error("SwitchProvider must use cpStopWithRetryErr to detect stop exhaustion") + } + if !bytes.Contains(s, []byte("emitSwitchProviderStopExhausted")) { + t.Error("SwitchProvider must emit an audit row with old instance/provider metadata on stop exhaustion") + } +} + +// TestSwitchProvider_RejectsBadProvider: the allowlist check fires before any DB +// access, so a bad/missing provider is a clean 400 without touching the backend. +func TestSwitchProvider_RejectsBadProvider(t *testing.T) { + gin.SetMode(gin.TestMode) + h := &WorkspaceHandler{} + for _, tc := range []struct { + body string + want int + }{ + {`{"provider":"azure"}`, http.StatusBadRequest}, + {`{"provider":""}`, http.StatusBadRequest}, + {`{"provider":"AWS-typo"}`, http.StatusBadRequest}, + {`{}`, http.StatusBadRequest}, + } { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("POST", "/workspaces/ws-1/switch-provider", strings.NewReader(tc.body)) + c.Request.Header.Set("Content-Type", "application/json") + h.SwitchProvider(c) + if w.Code != tc.want { + t.Errorf("body %s: got %d want %d (%s)", tc.body, w.Code, tc.want, w.Body.String()) + } + } +} + +// TestSwitchProvider_RouteRegistered pins the route wiring. +func TestSwitchProvider_RouteRegistered(t *testing.T) { + wd, _ := os.Getwd() + src, err := os.ReadFile(filepath.Join(wd, "..", "router", "router.go")) + if err != nil { + t.Fatalf("read router: %v", err) + } + if !bytes.Contains(src, []byte(`POST("/switch-provider", wh.SwitchProvider)`)) { + t.Fatal("router must register POST /switch-provider → wh.SwitchProvider") + } +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index fb1cb36f8..e9b6a80e0 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -229,6 +229,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi // Lifecycle wsAuth.GET("/state", wh.State) wsAuth.POST("/restart", wh.Restart) + wsAuth.POST("/switch-provider", wh.SwitchProvider) wsAuth.POST("/pause", wh.Pause) wsAuth.POST("/resume", wh.Resume) // Manual hibernate (opt-in, #711) — stops the container and sets status