diff --git a/workspace-server/internal/provisioner/cp_provisioner.go b/workspace-server/internal/provisioner/cp_provisioner.go index 0c0e6c9c..6f6ae58d 100644 --- a/workspace-server/internal/provisioner/cp_provisioner.go +++ b/workspace-server/internal/provisioner/cp_provisioner.go @@ -3,6 +3,7 @@ package provisioner import ( "bytes" "context" + "database/sql" "encoding/json" "fmt" "io" @@ -10,6 +11,8 @@ import ( "net/http" "os" "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" ) // CPProvisioner provisions workspace agents by calling the control plane's @@ -182,8 +185,26 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, } // Stop terminates the workspace's EC2 instance via the control plane. +// +// Looks up the actual EC2 instance_id from the workspaces table before +// calling CP — earlier versions passed workspaceID (a UUID) as the +// instance_id query param, which CP forwarded to EC2 TerminateInstances, +// which rejected with InvalidInstanceID.Malformed (EC2 IDs are i-… not +// UUIDs). The terminate failure then left the workspace's SG attached, +// blocking the next provision with InvalidGroup.Duplicate — a full +// "Save & Restart" crash on SaaS. func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error { - url := fmt.Sprintf("%s/cp/workspaces/%s?instance_id=%s", p.baseURL, workspaceID, workspaceID) + instanceID, err := resolveInstanceID(ctx, workspaceID) + if err != nil { + return fmt.Errorf("cp provisioner: stop: resolve instance_id: %w", err) + } + if instanceID == "" { + // No instance was ever provisioned (or already deprovisioned and + // the column was cleared). Nothing to terminate — idempotent. + log.Printf("CP provisioner: Stop for %s — no instance_id on file, nothing to do", workspaceID) + return nil + } + url := fmt.Sprintf("%s/cp/workspaces/%s?instance_id=%s", p.baseURL, workspaceID, instanceID) req, _ := http.NewRequestWithContext(ctx, "DELETE", url, nil) p.provisionAuthHeaders(req) resp, err := p.httpClient.Do(req) @@ -194,6 +215,35 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error { return nil } +// resolveInstanceID reads workspaces.instance_id for the given workspace. +// Returns ("", nil) when the row exists but has no instance_id recorded +// (edge case for external workspaces or stale rows). Returns an error +// only on real DB failures, not on missing rows — callers (Stop, +// IsRunning) treat the empty string as "nothing to act on." +// +// Exposed as a package var so tests can substitute a stub without +// standing up a sqlmock just to unblock the Stop/IsRunning code path. +// Production code never reassigns it. +var resolveInstanceID = func(ctx context.Context, workspaceID string) (string, error) { + if db.DB == nil { + // Defensive: NewCPProvisioner never runs without db.DB being + // set in main(). If somehow nil, treat as "no instance" rather + // than panicking in the Stop/IsRunning path. + return "", nil + } + var instanceID sql.NullString + err := db.DB.QueryRowContext(ctx, + `SELECT instance_id FROM workspaces WHERE id = $1`, workspaceID, + ).Scan(&instanceID) + if err != nil && err != sql.ErrNoRows { + return "", err + } + if !instanceID.Valid { + return "", nil + } + return instanceID.String, nil +} + // IsRunning checks workspace EC2 instance state via the control plane. // // Contract (matches the Docker Provisioner.IsRunning contract — @@ -219,7 +269,18 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error { // Both callers are happy with (true, err); callers that need the // previous (false, err) shape must inspect err themselves. func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) { - url := fmt.Sprintf("%s/cp/workspaces/%s/status?instance_id=%s", p.baseURL, workspaceID, workspaceID) + instanceID, err := resolveInstanceID(ctx, workspaceID) + if err != nil { + // Treat DB errors the same as transport errors — (true, err) keeps + // a2a_proxy on the alive path and logs the signal. + return true, fmt.Errorf("cp provisioner: status: resolve instance_id: %w", err) + } + if instanceID == "" { + // No instance recorded. Report "not running" cleanly (no error) + // so restart cascades can trigger a fresh provision. + return false, nil + } + url := fmt.Sprintf("%s/cp/workspaces/%s/status?instance_id=%s", p.baseURL, workspaceID, instanceID) req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) p.provisionAuthHeaders(req) resp, err := p.httpClient.Do(req) diff --git a/workspace-server/internal/provisioner/cp_provisioner_test.go b/workspace-server/internal/provisioner/cp_provisioner_test.go index 247863e3..c8553adf 100644 --- a/workspace-server/internal/provisioner/cp_provisioner_test.go +++ b/workspace-server/internal/provisioner/cp_provisioner_test.go @@ -11,6 +11,23 @@ import ( "time" ) +// primeInstanceIDLookup swaps resolveInstanceID for a stub that returns +// the mapped instance_id for the given workspace_id, or "" for anything +// not in the map. Cheaper than standing up a sqlmock since Stop/IsRunning +// tests mostly don't care about the SQL path — they're testing the CP +// HTTP interaction downstream of the lookup. +func primeInstanceIDLookup(t *testing.T, pairs map[string]string) { + t.Helper() + prev := resolveInstanceID + resolveInstanceID = func(_ context.Context, wsID string) (string, error) { + if id, ok := pairs[wsID]; ok { + return id, nil + } + return "", nil + } + t.Cleanup(func() { resolveInstanceID = prev }) +} + // TestNewCPProvisioner_RequiresOrgID — self-hosted deployments don't // have a MOLECULE_ORG_ID, and the provisioner must refuse to construct // rather than silently phone home to the prod CP with an empty tenant. @@ -267,6 +284,12 @@ func TestStart_TransportFailureSurfaces(t *testing.T) { // platform-wide shared secret AND the per-tenant admin token, or the // CP will 401. func TestStop_SendsBothAuthHeaders(t *testing.T) { + // resolveInstanceID looks up the real EC2 id from the workspaces + // table; previously this test passed when the tenant buggily + // reused workspaceID AS instance_id. Now we assert the correct + // EC2 id round-trips. + primeInstanceIDLookup(t, map[string]string{"ws-1": "i-abc123"}) + var sawBearer, sawAdminToken, sawMethod, sawPath string var sawInstance string srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -295,8 +318,8 @@ func TestStop_SendsBothAuthHeaders(t *testing.T) { if sawPath != "/cp/workspaces/ws-1" { t.Errorf("path = %q, want /cp/workspaces/ws-1", sawPath) } - if sawInstance != "ws-1" { - t.Errorf("instance_id query = %q, want ws-1", sawInstance) + if sawInstance != "i-abc123" { + t.Errorf("instance_id query = %q, want i-abc123 (from DB lookup, NOT the workspace UUID)", sawInstance) } if sawBearer != "Bearer s3cret" { t.Errorf("bearer = %q, want Bearer s3cret", sawBearer) @@ -310,6 +333,7 @@ func TestStop_SendsBothAuthHeaders(t *testing.T) { // teardown call hits a dead CP, the error must surface so the caller // knows the workspace might still be running and needs retry. func TestStop_TransportErrorSurfaces(t *testing.T) { + primeInstanceIDLookup(t, map[string]string{"ws-1": "i-abc123"}) p := &CPProvisioner{ baseURL: "http://127.0.0.1:1", orgID: "org-1", @@ -327,6 +351,7 @@ func TestStop_TransportErrorSurfaces(t *testing.T) { // TestIsRunning_ParsesStateField — CP returns the EC2 state, we expose // a bool ("running"/"pending"/"terminated" → true only for "running"). func TestIsRunning_ParsesStateField(t *testing.T) { + primeInstanceIDLookup(t, map[string]string{"ws-1": "i-parsesstatefield"}) cases := map[string]bool{ "running": true, "pending": false, @@ -364,6 +389,7 @@ func TestIsRunning_ParsesStateField(t *testing.T) { // require the same per-tenant auth because they leak public_ip + // private_ip to the caller. func TestIsRunning_SendsBothAuthHeaders(t *testing.T) { + primeInstanceIDLookup(t, map[string]string{"ws-1": "i-auth-headers"}) var sawBearer, sawAdminToken string srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { sawBearer = r.Header.Get("Authorization") @@ -398,6 +424,7 @@ func TestIsRunning_SendsBothAuthHeaders(t *testing.T) { // The sweeper (healthsweep.go) inspects err independently and skips // on any error, so (true, err) is equally safe for that caller. func TestIsRunning_TransportErrorReturnsTrue(t *testing.T) { + primeInstanceIDLookup(t, map[string]string{"ws-1": "i-transport"}) p := &CPProvisioner{ baseURL: "http://127.0.0.1:1", orgID: "org-1", @@ -418,6 +445,7 @@ func TestIsRunning_TransportErrorReturnsTrue(t *testing.T) { // the sweeper would see the workspace as not-running. Now every // non-2xx is an error the caller can log + retry. func TestIsRunning_Non2xxSurfacesError(t *testing.T) { + primeInstanceIDLookup(t, map[string]string{"ws-1": "i-non2xx"}) cases := []struct { name string status int @@ -459,6 +487,7 @@ func TestIsRunning_Non2xxSurfacesError(t *testing.T) { // a middleware glitch (HTML error page with 200) from looking like // "workspace stopped". func TestIsRunning_MalformedJSONBodyReturnsError(t *testing.T) { + primeInstanceIDLookup(t, map[string]string{"ws-1": "i-malformed"}) srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) _, _ = io.WriteString(w, "maintenance mode") @@ -486,6 +515,7 @@ func TestIsRunning_MalformedJSONBodyReturnsError(t *testing.T) { // the documented contract values rather than simulating the whole // a2a_proxy flow. func TestIsRunning_ContractCompat_A2AProxy(t *testing.T) { + primeInstanceIDLookup(t, map[string]string{"ws-1": "i-contract"}) // Simulate every error path and assert running==true for each. t.Run("transport error", func(t *testing.T) { p := &CPProvisioner{ @@ -559,6 +589,7 @@ func TestIsRunning_ContractCompat_A2AProxy(t *testing.T) { // only needs the prefix to produce a value, so the decode succeeds — // and the LimitReader enforces the cap regardless. func TestIsRunning_BoundedBodyRead(t *testing.T) { + primeInstanceIDLookup(t, map[string]string{"ws-1": "i-bounded"}) srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(200) // Valid JSON prefix, then pad well past the 64 KiB cap. @@ -628,3 +659,67 @@ func TestGetConsoleOutput_UsesAdminBearer(t *testing.T) { t.Errorf("bearer = %q, want Bearer admin-api-key (NOT the provision secret)", sawBearer) } } + +// TestStop_EmptyInstanceIDIsNoop — when workspaces.instance_id is NULL +// (e.g. a row that was never provisioned, or deprovisioned and cleared), +// Stop should be a no-op instead of sending a malformed CP request. +// Regression guard: previously Stop sent workspaceID as instance_id +// even when no EC2 had been booked, causing CP → EC2 to 400. +func TestStop_EmptyInstanceIDIsNoop(t *testing.T) { + // Empty map → lookup returns ("", nil) for any workspace. + primeInstanceIDLookup(t, map[string]string{}) + + hit := false + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + hit = true + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()} + if err := p.Stop(context.Background(), "ws-ghost"); err != nil { + t.Fatalf("Stop with empty instance_id should no-op, got err %v", err) + } + if hit { + t.Errorf("Stop contacted CP even though instance_id was empty") + } +} + +// TestIsRunning_UsesDBInstanceID — IsRunning must also look up +// instance_id from the workspaces table, same as Stop. Mirror of +// TestStop_SendsBothAuthHeaders. +func TestIsRunning_UsesDBInstanceID(t *testing.T) { + primeInstanceIDLookup(t, map[string]string{"ws-1": "i-xyz789"}) + + var sawInstance string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sawInstance = r.URL.Query().Get("instance_id") + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, `{"state":"running"}`) + })) + defer srv.Close() + + p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()} + if _, err := p.IsRunning(context.Background(), "ws-1"); err != nil { + t.Fatalf("IsRunning: %v", err) + } + if sawInstance != "i-xyz789" { + t.Errorf("instance_id query = %q, want i-xyz789 (from DB lookup)", sawInstance) + } +} + +// TestIsRunning_EmptyInstanceIDReturnsFalse — IsRunning on a +// workspace with no recorded EC2 instance must report (false, nil) so +// restart cascades re-provision fresh instead of looping on a stale +// row with no backing instance. +func TestIsRunning_EmptyInstanceIDReturnsFalse(t *testing.T) { + primeInstanceIDLookup(t, map[string]string{}) + p := &CPProvisioner{baseURL: "http://unused", orgID: "org-1"} + running, err := p.IsRunning(context.Background(), "ws-ghost") + if err != nil { + t.Errorf("IsRunning with empty instance_id should return (false, nil), got err %v", err) + } + if running { + t.Errorf("IsRunning with empty instance_id should return running=false, got true") + } +}