feat(ws-server): POST /workspaces/:id/switch-provider (ordered, leak-safe) [switch-provider PR2] #2422

Closed
devops-engineer wants to merge 3 commits from feat/ws-switch-provider-endpoint into main
3 changed files with 320 additions and 0 deletions
@@ -0,0 +1,213 @@
package handlers
import (
"context"
"database/sql"
"encoding/json"
"io"
"log"
"net/http"
"strings"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"github.com/gin-gonic/gin"
)
// SwitchProvider handles POST /workspaces/:id/switch-provider — move an EXISTING
// workspace to a different cloud (compute.provider). The VM is cloud-specific, so
// this reprovisions the box on the new cloud; the agent's durable identity
// (agent_card, config, secrets, tokens, memory) lives in the tenant DB and is
// preserved because the row is never deleted.
//
// CRITICAL ORDERING: the stop must run with the OLD provider BEFORE the DB
// provider is changed. The stop helper reads the current row at call time; if we
// wrote the new provider first, the teardown request would target the wrong
// backend and the old box would leak. So the sequence is strictly:
//
// 1. stop OLD box (DB still has old provider + old instance_id)
// 2. clear instance_id + write new provider (jsonb_set, preserving the rest)
// 3. provision NEW box (withStoredCompute now reads the new provider)
//
// Clearing instance_id in step 2 also makes a retried switch safe: a second call
// finds no stale instance to act on with the new provider metadata.
func (h *WorkspaceHandler) SwitchProvider(c *gin.Context) {
id := c.Param("id")
ctx := c.Request.Context()
var body struct {
Provider string `json:"provider"`
ConfirmDataLoss bool `json:"confirm_data_loss"`
// MigrateData is accepted for forward-compat (RFC #622 PR4 wires the
// filesystem migration). Until then it is a no-op and a persistent
// volume still requires confirm_data_loss.
MigrateData bool `json:"migrate_data"`
}
if err := c.ShouldBindJSON(&body); err != nil && err != io.EOF {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid JSON body"})
return
}
newProvider := strings.ToLower(strings.TrimSpace(body.Provider))
if newProvider == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "provider is required"})
return
}
if _, ok := workspaceComputeProviderAllowlist[newProvider]; !ok {
c.JSON(http.StatusBadRequest, gin.H{"error": "unsupported provider (want aws|gcp|hetzner)"})
return
}
var status, wsName, dbRuntime, oldProvider, dataPersistence string
var oldInstanceID sql.NullString
var tier int
err := db.DB.QueryRowContext(ctx, `
SELECT status, name, tier, COALESCE(runtime, 'claude-code'),
COALESCE(compute->>'provider', ''), COALESCE(compute->>'data_persistence', ''),
instance_id
FROM workspaces WHERE id = $1`, id,
).Scan(&status, &wsName, &tier, &dbRuntime, &oldProvider, &dataPersistence, &oldInstanceID)
if err == sql.ErrNoRows || status == string(models.StatusRemoved) {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return
}
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
return
}
// Switching the cloud backend is a SaaS-only concept — a self-hosted Docker
// workspace has no cloud provider to switch. external/mock runtimes have no
// box at all.
if h.cpProv == nil {
c.JSON(http.StatusConflict, gin.H{"error": "provider switching is only available for cloud (SaaS) workspaces"})
return
}
if isExternalLikeRuntime(dbRuntime) || dbRuntime == "mock" {
c.JSON(http.StatusConflict, gin.H{"error": dbRuntime + " workspaces have no cloud box to switch"})
return
}
if paused, parentName := isParentPaused(ctx, id); paused {
c.JSON(http.StatusConflict, gin.H{"error": "parent workspace \"" + parentName + "\" is paused — resume it first"})
return
}
// "" provider means the default AWS path.
effectiveOld := oldProvider
if effectiveOld == "" {
effectiveOld = "aws"
}
if newProvider == effectiveOld {
c.JSON(http.StatusOK, gin.H{"status": "noop", "provider": newProvider, "message": "workspace is already on this provider"})
return
}
// Data gate: a persistent data volume is cloud-specific and cannot move as a
// block device. Until the filesystem migration lands (RFC #622 PR3/PR4), the
// switch is allowed only with an explicit confirm_data_loss.
if dataPersistence == "persist" && !body.ConfirmDataLoss {
c.JSON(http.StatusConflict, gin.H{
"error": "DATA_LOSS_UNCONFIRMED",
"detail": "this workspace has a persistent data volume that cannot move across clouds; set confirm_data_loss=true to switch with a fresh volume (cross-cloud data migration is RFC #622, not yet wired). Identity/config/secrets/memory are preserved regardless.",
})
return
}
// --- ordered switch (see doc-comment) ---
// 1. Stop the OLD box with the OLD provider. DB is unchanged here, so the
// stop helper reads the old provider + old instance_id. Bounded retry; on
// exhaustion it returns an error but we STILL proceed (a stuck old box
// must not strand the switch) — except we capture the failure so step 2.5
// can emit a durable audit row, because step 2 nulls instance_id and flips
// provider, which otherwise leaves the old box untracked by normal
// lifecycle cleanup (review finding #3).
stopErr := h.cpStopWithRetryErr(ctx, id, "SwitchProvider", false)
// 2. Atomically claim the switch AND clear instance_id + write the new
// provider. The CAS (status not already provisioning, provider still the
// one we read) makes concurrent/duplicate switch calls safe: only the
// first winner launches a provision; a racing call sees 0 rows → 409,
// never a second provision against a second backend (review finding #4).
// jsonb_set preserves instance_type/volume/display/data_persistence.
res, err := db.DB.ExecContext(ctx, `
UPDATE workspaces
SET instance_id = NULL,
compute = jsonb_set(COALESCE(compute, '{}'::jsonb), '{provider}', to_jsonb($2::text)),
status = $3, url = '', updated_at = now()
WHERE id = $1
AND status <> $3
AND COALESCE(compute->>'provider', '') IS NOT DISTINCT FROM $4`,
id, newProvider, models.StatusProvisioning, oldProvider)
if err != nil {
log.Printf("SwitchProvider: failed to write new provider for %s: %v", id, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to switch provider"})
return
}
if n, _ := res.RowsAffected(); n == 0 {
// Lost the CAS: another switch already flipped the provider / set
// provisioning, or the row changed under us. Do NOT launch a second
// provision (would leave an untracked box).
c.JSON(http.StatusConflict, gin.H{"error": "ALREADY_SWITCHING", "detail": "a provider switch or provision is already in progress for this workspace"})
return
}
// 2.5. If the old box never confirmed stopped, it may not be tracked by the
// normal lifecycle cleanup after instance_id was nulled. Emit a durable
// audit row carrying the old instance_id + provider so a platform cleanup
// process can locate and terminate it (review finding #3).
if stopErr != nil && oldInstanceID.Valid && oldInstanceID.String != "" {
h.emitSwitchProviderStopExhausted(ctx, id, oldInstanceID.String, effectiveOld, newProvider, stopErr)
}
log.Printf("SwitchProvider: %s %s → %s (old box stop err=%v, reprovisioning)", id, effectiveOld, newProvider, stopErr)
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisioning), id, map[string]interface{}{
"name": wsName,
"tier": tier,
"runtime": dbRuntime,
"provider_from": effectiveOld,
"provider_to": newProvider,
})
// 3. Provision the NEW box. withStoredCompute re-reads compute (now carrying
// the new provider) → provisionWorkspaceCP routes to the new backend.
// Reuse the existing config volume (templatePath="") so identity/config
// are preserved. Detached context: the reprovision outlives the request.
payload := withStoredCompute(context.Background(), id, models.CreateWorkspacePayload{Name: wsName, Tier: tier, Runtime: dbRuntime})
h.goAsync(func() { h.provisionWorkspaceCP(id, "", nil, payload) })
c.JSON(http.StatusAccepted, gin.H{
"status": "switching",
"workspace_id": id,
"from": effectiveOld,
"to": newProvider,
})
}
// emitSwitchProviderStopExhausted records a durable audit row when a provider
// switch could not confirm the OLD box stopped before its instance_id was
// cleared from the row. Without it the old box may not be tracked by normal
// lifecycle cleanup; a platform cleanup process reads these rows by
// old_instance_id + old_provider to terminate the leaked box. Best-effort.
func (h *WorkspaceHandler) emitSwitchProviderStopExhausted(ctx context.Context, workspaceID, oldInstanceID, oldProvider, newProvider string, cause error) {
if db.DB == nil {
return
}
payload, err := json.Marshal(map[string]any{
"workspace_id": workspaceID,
"old_instance_id": oldInstanceID,
"old_provider": oldProvider,
"new_provider": newProvider,
"last_error": cause.Error(),
"recovery_path": "platform_cleanup",
})
if err != nil {
log.Printf("emitSwitchProviderStopExhausted: marshal failed for %s: %v", workspaceID, err)
return
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO structure_events (event_type, workspace_id, payload, created_at)
VALUES ($1, $2, $3, now())
`, "workspace.provider.switch_stop_exhausted", workspaceID, payload); err != nil {
log.Printf("emitSwitchProviderStopExhausted: insert failed for %s: %v", workspaceID, err)
}
}
@@ -0,0 +1,106 @@
package handlers
import (
"bytes"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"github.com/gin-gonic/gin"
)
// TestSwitchProvider_StopBeforeProviderWrite is the load-bearing ordering pin.
// The stop helper MUST appear before the UPDATE that writes the new provider —
// otherwise the teardown uses the wrong backend metadata and the old box leaks.
// A source-level position check guards against a refactor reordering the two.
func TestSwitchProvider_StopBeforeProviderWrite(t *testing.T) {
wd, _ := os.Getwd()
src, err := os.ReadFile(filepath.Join(wd, "workspace_switch_provider.go"))
if err != nil {
t.Fatalf("read source: %v", err)
}
stripped := stripGoComments(src)
stopIdx := bytes.Index(stripped, []byte("cpStopWithRetryErr(ctx, id, \"SwitchProvider\""))
if stopIdx < 0 {
t.Fatal("SwitchProvider must stop the old box via cpStopWithRetryErr before reprovisioning")
}
// the provider write is the jsonb_set on compute -> {provider}
writeIdx := bytes.Index(stripped, []byte("'{provider}'"))
if writeIdx < 0 {
t.Fatal("SwitchProvider must write the new provider via jsonb_set on compute->{provider}")
}
if stopIdx >= writeIdx {
t.Fatalf("ORDERING HAZARD: stop helper (idx %d) must come BEFORE the provider write (idx %d) — else the old box is torn down with wrong backend metadata and leaks", stopIdx, writeIdx)
}
// and the instance_id must be cleared in the same UPDATE (retry-safety)
if !bytes.Contains(stripped, []byte("instance_id = NULL")) {
t.Fatal("SwitchProvider must clear instance_id when writing the new provider (retry-safety)")
}
}
// TestSwitchProvider_ConcurrencyGuardAndAudit pins the two hardening items from
// the correctness review: (a) the provider-write is an atomic CAS so two
// concurrent switches can't both launch a provision, and (b) stop-exhaustion
// emits a durable audit row carrying the old instance_id+provider so the old box
// remains discoverable after instance_id is nulled.
func TestSwitchProvider_ConcurrencyGuardAndAudit(t *testing.T) {
wd, _ := os.Getwd()
src, err := os.ReadFile(filepath.Join(wd, "workspace_switch_provider.go"))
if err != nil {
t.Fatalf("read source: %v", err)
}
s := stripGoComments(src)
if !bytes.Contains(s, []byte("status <> $3")) || !bytes.Contains(s, []byte("IS NOT DISTINCT FROM $4")) {
t.Error("the provider-write UPDATE must be a CAS (status not already provisioning AND provider unchanged) to prevent a double-provision race")
}
if !bytes.Contains(s, []byte("RowsAffected")) || !bytes.Contains(s, []byte("ALREADY_SWITCHING")) {
t.Error("SwitchProvider must 409 ALREADY_SWITCHING when the CAS affects 0 rows (lost the race)")
}
if !bytes.Contains(s, []byte("cpStopWithRetryErr")) {
t.Error("SwitchProvider must use cpStopWithRetryErr to detect stop exhaustion")
}
if !bytes.Contains(s, []byte("emitSwitchProviderStopExhausted")) {
t.Error("SwitchProvider must emit an audit row with old instance/provider metadata on stop exhaustion")
}
}
// TestSwitchProvider_RejectsBadProvider: the allowlist check fires before any DB
// access, so a bad/missing provider is a clean 400 without touching the backend.
func TestSwitchProvider_RejectsBadProvider(t *testing.T) {
gin.SetMode(gin.TestMode)
h := &WorkspaceHandler{}
for _, tc := range []struct {
body string
want int
}{
{`{"provider":"azure"}`, http.StatusBadRequest},
{`{"provider":""}`, http.StatusBadRequest},
{`{"provider":"AWS-typo"}`, http.StatusBadRequest},
{`{}`, http.StatusBadRequest},
} {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest("POST", "/workspaces/ws-1/switch-provider", strings.NewReader(tc.body))
c.Request.Header.Set("Content-Type", "application/json")
h.SwitchProvider(c)
if w.Code != tc.want {
t.Errorf("body %s: got %d want %d (%s)", tc.body, w.Code, tc.want, w.Body.String())
}
}
}
// TestSwitchProvider_RouteRegistered pins the route wiring.
func TestSwitchProvider_RouteRegistered(t *testing.T) {
wd, _ := os.Getwd()
src, err := os.ReadFile(filepath.Join(wd, "..", "router", "router.go"))
if err != nil {
t.Fatalf("read router: %v", err)
}
if !bytes.Contains(src, []byte(`POST("/switch-provider", wh.SwitchProvider)`)) {
t.Fatal("router must register POST /switch-provider → wh.SwitchProvider")
}
}
@@ -229,6 +229,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// Lifecycle
wsAuth.GET("/state", wh.State)
wsAuth.POST("/restart", wh.Restart)
wsAuth.POST("/switch-provider", wh.SwitchProvider)
wsAuth.POST("/pause", wh.Pause)
wsAuth.POST("/resume", wh.Resume)
// Manual hibernate (opt-in, #711) — stops the container and sets status