fix(cp-provisioner): look up real EC2 instance_id for Stop + IsRunning (#1738)
Resolves a "Save & Restart cascade" failure on SaaS tenants. Observed
2026-04-22 on hongmingwang workspace a8af9d79 after a Config-tab save:
03:13:20 workspace deprovision: TerminateInstances
InvalidInstanceID.Malformed: a8af9d79-... is malformed
03:13:21 workspace provision: CreateSecurityGroup
InvalidGroup.Duplicate: workspace-a8af9d79-394 already
exists for VPC vpc-09f85513b85d7acee
Root cause: CPProvisioner.Stop and IsRunning passed the workspace UUID
as the `instance_id` query param to CP. CP forwarded it to EC2
TerminateInstances, which rejected it (EC2 ids are i-…, not UUIDs).
The failed terminate left the workspace's security group (SG)
attached → the immediate re-provision hit InvalidGroup.Duplicate →
the user saw `provisioning failed`.
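(For illustration only — not part of this patch: the two id shapes are
mechanically distinguishable, so a guard at the CP call site could have
failed fast locally instead of round-tripping to EC2. A minimal sketch;
looksLikeEC2InstanceID and both example ids are made up:

    package main

    import (
        "fmt"
        "regexp"
    )

    // EC2 instance ids are "i-" plus 8 (legacy) or 17 hex chars;
    // workspace ids are dash-separated UUIDs, which EC2 rejects with
    // InvalidInstanceID.Malformed.
    var ec2InstanceID = regexp.MustCompile(`^i-[0-9a-f]{8}(?:[0-9a-f]{9})?$`)

    func looksLikeEC2InstanceID(id string) bool {
        return ec2InstanceID.MatchString(id)
    }

    func main() {
        fmt.Println(looksLikeEC2InstanceID("i-0123456789abcdef0"))                  // true
        fmt.Println(looksLikeEC2InstanceID("00000000-0000-0000-0000-000000000000")) // false
    }
)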
Fix: both methods now call a new `resolveInstanceID` that reads
`workspaces.instance_id` from the tenant DB and passes the real EC2
id downstream. When no row / no instance_id exists, Stop is a no-op
and IsRunning returns (false, nil) so restart cascades can freshly
re-provision.
resolveInstanceID is exposed as a package-level `var` func so tests
can swap it for a pairs-map stub without standing up sqlmock — the
per-table DB scaffolding would cost more than it buys, since these
tests exercise the CP HTTP flow downstream of the lookup, not the
lookup SQL itself.
Adds regression tests:
- TestStop_EmptyInstanceIDIsNoop: no DB row → no CP call
- TestIsRunning_UsesDBInstanceID: DB id round-trips to CP
- TestIsRunning_EmptyInstanceIDReturnsFalse: no instance → false/nil
Updates existing tests to assert the resolved instance_id (i-abc123
variants) instead of the previous buggy workspaceID.
After this lands, existing workspaces with stale instance_id
bindings still need a manual cleanup of the orphaned EC2 instance +
SG (done for a8af9d79 today). Future restarts use the correct id.
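For anyone doing that cleanup elsewhere: terminate the orphaned
instance, wait for it to reach terminated (the SG can't be deleted
while still attached — EC2 answers DependencyViolation), then delete
the group. A minimal sketch with aws-sdk-go-v2; both ids are
placeholders to be read off the tenant's workspaces row and the CP
logs:

    package main

    import (
        "context"
        "log"
        "time"

        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/service/ec2"
    )

    func main() {
        ctx := context.Background()
        cfg, err := config.LoadDefaultConfig(ctx)
        if err != nil {
            log.Fatal(err)
        }
        client := ec2.NewFromConfig(cfg)

        orphan := "i-0123456789abcdef0" // placeholder: the orphaned instance
        if _, err := client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{
            InstanceIds: []string{orphan},
        }); err != nil {
            log.Fatal(err)
        }

        // SG deletion fails with DependencyViolation until the instance
        // is fully terminated, so block on the terminated state first.
        waiter := ec2.NewInstanceTerminatedWaiter(client)
        if err := waiter.Wait(ctx, &ec2.DescribeInstancesInput{
            InstanceIds: []string{orphan},
        }, 5*time.Minute); err != nil {
            log.Fatal(err)
        }

        if _, err := client.DeleteSecurityGroup(ctx, &ec2.DeleteSecurityGroupInput{
            // placeholder: id of the group named workspace-a8af9d79-394
            GroupId: aws.String("sg-0123456789abcdef0"),
        }); err != nil {
            log.Fatal(err)
        }
    }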
Co-authored-by: Hongming Wang <hongmingwang.rabbit@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent 925a71887d
commit c23ff848aa
@@ -3,6 +3,7 @@ package provisioner
 import (
     "bytes"
     "context"
+    "database/sql"
     "encoding/json"
     "fmt"
     "io"
@@ -10,6 +11,8 @@ import (
     "net/http"
     "os"
     "time"
+
+    "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
 )

 // CPProvisioner provisions workspace agents by calling the control plane's
@@ -182,8 +185,26 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
 }

 // Stop terminates the workspace's EC2 instance via the control plane.
+//
+// Looks up the actual EC2 instance_id from the workspaces table before
+// calling CP — earlier versions passed workspaceID (a UUID) as the
+// instance_id query param, which CP forwarded to EC2 TerminateInstances,
+// which rejected it with InvalidInstanceID.Malformed (EC2 IDs are i-…,
+// not UUIDs). The terminate failure then left the workspace's SG attached,
+// blocking the next provision with InvalidGroup.Duplicate — a full
+// "Save & Restart" crash on SaaS.
 func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
-    url := fmt.Sprintf("%s/cp/workspaces/%s?instance_id=%s", p.baseURL, workspaceID, workspaceID)
+    instanceID, err := resolveInstanceID(ctx, workspaceID)
+    if err != nil {
+        return fmt.Errorf("cp provisioner: stop: resolve instance_id: %w", err)
+    }
+    if instanceID == "" {
+        // No instance was ever provisioned (or already deprovisioned and
+        // the column was cleared). Nothing to terminate — idempotent.
+        log.Printf("CP provisioner: Stop for %s — no instance_id on file, nothing to do", workspaceID)
+        return nil
+    }
+    url := fmt.Sprintf("%s/cp/workspaces/%s?instance_id=%s", p.baseURL, workspaceID, instanceID)
     req, _ := http.NewRequestWithContext(ctx, "DELETE", url, nil)
     p.provisionAuthHeaders(req)
     resp, err := p.httpClient.Do(req)
@@ -194,6 +215,35 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
     return nil
 }

+// resolveInstanceID reads workspaces.instance_id for the given workspace.
+// Returns ("", nil) when the row exists but has no instance_id recorded
+// (edge case for external workspaces or stale rows). Returns an error
+// only on real DB failures, not on missing rows — callers (Stop,
+// IsRunning) treat the empty string as "nothing to act on."
+//
+// Exposed as a package var so tests can substitute a stub without
+// standing up a sqlmock just to unblock the Stop/IsRunning code path.
+// Production code never reassigns it.
+var resolveInstanceID = func(ctx context.Context, workspaceID string) (string, error) {
+    if db.DB == nil {
+        // Defensive: NewCPProvisioner never runs without db.DB being
+        // set in main(). If somehow nil, treat as "no instance" rather
+        // than panicking in the Stop/IsRunning path.
+        return "", nil
+    }
+    var instanceID sql.NullString
+    err := db.DB.QueryRowContext(ctx,
+        `SELECT instance_id FROM workspaces WHERE id = $1`, workspaceID,
+    ).Scan(&instanceID)
+    if err != nil && err != sql.ErrNoRows {
+        return "", err
+    }
+    if !instanceID.Valid {
+        return "", nil
+    }
+    return instanceID.String, nil
+}
+
 // IsRunning checks workspace EC2 instance state via the control plane.
 //
 // Contract (matches the Docker Provisioner.IsRunning contract —
@@ -219,7 +269,18 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
 // Both callers are happy with (true, err); callers that need the
 // previous (false, err) shape must inspect err themselves.
 func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) {
-    url := fmt.Sprintf("%s/cp/workspaces/%s/status?instance_id=%s", p.baseURL, workspaceID, workspaceID)
+    instanceID, err := resolveInstanceID(ctx, workspaceID)
+    if err != nil {
+        // Treat DB errors the same as transport errors — (true, err) keeps
+        // a2a_proxy on the alive path and logs the signal.
+        return true, fmt.Errorf("cp provisioner: status: resolve instance_id: %w", err)
+    }
+    if instanceID == "" {
+        // No instance recorded. Report "not running" cleanly (no error)
+        // so restart cascades can trigger a fresh provision.
+        return false, nil
+    }
+    url := fmt.Sprintf("%s/cp/workspaces/%s/status?instance_id=%s", p.baseURL, workspaceID, instanceID)
     req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
     p.provisionAuthHeaders(req)
     resp, err := p.httpClient.Do(req)

@@ -11,6 +11,23 @@ import (
     "time"
 )

+// primeInstanceIDLookup swaps resolveInstanceID for a stub that returns
+// the mapped instance_id for the given workspace_id, or "" for anything
+// not in the map. Cheaper than standing up a sqlmock, since Stop/IsRunning
+// tests mostly don't care about the SQL path — they're testing the CP
+// HTTP interaction downstream of the lookup.
+func primeInstanceIDLookup(t *testing.T, pairs map[string]string) {
+    t.Helper()
+    prev := resolveInstanceID
+    resolveInstanceID = func(_ context.Context, wsID string) (string, error) {
+        if id, ok := pairs[wsID]; ok {
+            return id, nil
+        }
+        return "", nil
+    }
+    t.Cleanup(func() { resolveInstanceID = prev })
+}
+
 // TestNewCPProvisioner_RequiresOrgID — self-hosted deployments don't
 // have a MOLECULE_ORG_ID, and the provisioner must refuse to construct
 // rather than silently phone home to the prod CP with an empty tenant.
@@ -267,6 +284,12 @@ func TestStart_TransportFailureSurfaces(t *testing.T) {
 // platform-wide shared secret AND the per-tenant admin token, or the
 // CP will 401.
 func TestStop_SendsBothAuthHeaders(t *testing.T) {
+    // resolveInstanceID looks up the real EC2 id from the workspaces
+    // table; previously this test passed because the code buggily
+    // reused workspaceID as instance_id. Now we assert the correct
+    // EC2 id round-trips.
+    primeInstanceIDLookup(t, map[string]string{"ws-1": "i-abc123"})
+
     var sawBearer, sawAdminToken, sawMethod, sawPath string
     var sawInstance string
     srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -295,8 +318,8 @@ func TestStop_SendsBothAuthHeaders(t *testing.T) {
     if sawPath != "/cp/workspaces/ws-1" {
         t.Errorf("path = %q, want /cp/workspaces/ws-1", sawPath)
     }
-    if sawInstance != "ws-1" {
-        t.Errorf("instance_id query = %q, want ws-1", sawInstance)
+    if sawInstance != "i-abc123" {
+        t.Errorf("instance_id query = %q, want i-abc123 (from DB lookup, NOT the workspace UUID)", sawInstance)
     }
     if sawBearer != "Bearer s3cret" {
         t.Errorf("bearer = %q, want Bearer s3cret", sawBearer)
@@ -310,6 +333,7 @@ func TestStop_SendsBothAuthHeaders(t *testing.T) {
 // teardown call hits a dead CP, the error must surface so the caller
 // knows the workspace might still be running and needs retry.
 func TestStop_TransportErrorSurfaces(t *testing.T) {
+    primeInstanceIDLookup(t, map[string]string{"ws-1": "i-abc123"})
     p := &CPProvisioner{
         baseURL: "http://127.0.0.1:1",
         orgID:   "org-1",
@@ -327,6 +351,7 @@ func TestStop_TransportErrorSurfaces(t *testing.T) {
 // TestIsRunning_ParsesStateField — CP returns the EC2 state, we expose
 // a bool ("running"/"pending"/"terminated" → true only for "running").
 func TestIsRunning_ParsesStateField(t *testing.T) {
+    primeInstanceIDLookup(t, map[string]string{"ws-1": "i-parsesstatefield"})
     cases := map[string]bool{
         "running": true,
         "pending": false,
@@ -364,6 +389,7 @@ func TestIsRunning_ParsesStateField(t *testing.T) {
 // require the same per-tenant auth because they leak public_ip +
 // private_ip to the caller.
 func TestIsRunning_SendsBothAuthHeaders(t *testing.T) {
+    primeInstanceIDLookup(t, map[string]string{"ws-1": "i-auth-headers"})
     var sawBearer, sawAdminToken string
     srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
         sawBearer = r.Header.Get("Authorization")
@@ -398,6 +424,7 @@ func TestIsRunning_SendsBothAuthHeaders(t *testing.T) {
 // The sweeper (healthsweep.go) inspects err independently and skips
 // on any error, so (true, err) is equally safe for that caller.
 func TestIsRunning_TransportErrorReturnsTrue(t *testing.T) {
+    primeInstanceIDLookup(t, map[string]string{"ws-1": "i-transport"})
     p := &CPProvisioner{
         baseURL: "http://127.0.0.1:1",
         orgID:   "org-1",
@@ -418,6 +445,7 @@ func TestIsRunning_TransportErrorReturnsTrue(t *testing.T) {
 // the sweeper would see the workspace as not-running. Now every
 // non-2xx is an error the caller can log + retry.
 func TestIsRunning_Non2xxSurfacesError(t *testing.T) {
+    primeInstanceIDLookup(t, map[string]string{"ws-1": "i-non2xx"})
     cases := []struct {
         name   string
         status int
@@ -459,6 +487,7 @@ func TestIsRunning_Non2xxSurfacesError(t *testing.T) {
 // a middleware glitch (HTML error page with 200) from looking like
 // "workspace stopped".
 func TestIsRunning_MalformedJSONBodyReturnsError(t *testing.T) {
+    primeInstanceIDLookup(t, map[string]string{"ws-1": "i-malformed"})
     srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
         w.WriteHeader(200)
         _, _ = io.WriteString(w, "<html>maintenance mode</html>")
@@ -486,6 +515,7 @@ func TestIsRunning_MalformedJSONBodyReturnsError(t *testing.T) {
 // the documented contract values rather than simulating the whole
 // a2a_proxy flow.
 func TestIsRunning_ContractCompat_A2AProxy(t *testing.T) {
+    primeInstanceIDLookup(t, map[string]string{"ws-1": "i-contract"})
     // Simulate every error path and assert running==true for each.
     t.Run("transport error", func(t *testing.T) {
         p := &CPProvisioner{
@@ -559,6 +589,7 @@ func TestIsRunning_ContractCompat_A2AProxy(t *testing.T) {
 // only needs the prefix to produce a value, so the decode succeeds —
 // and the LimitReader enforces the cap regardless.
 func TestIsRunning_BoundedBodyRead(t *testing.T) {
+    primeInstanceIDLookup(t, map[string]string{"ws-1": "i-bounded"})
     srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
         w.WriteHeader(200)
         // Valid JSON prefix, then pad well past the 64 KiB cap.
@@ -628,3 +659,67 @@ func TestGetConsoleOutput_UsesAdminBearer(t *testing.T) {
         t.Errorf("bearer = %q, want Bearer admin-api-key (NOT the provision secret)", sawBearer)
     }
 }
+
+// TestStop_EmptyInstanceIDIsNoop — when workspaces.instance_id is NULL
+// (e.g. a row that was never provisioned, or deprovisioned and cleared),
+// Stop should be a no-op instead of sending a malformed CP request.
+// Regression guard: previously Stop sent workspaceID as instance_id
+// even when no EC2 instance had ever been created, causing CP → EC2 to 400.
+func TestStop_EmptyInstanceIDIsNoop(t *testing.T) {
+    // Empty map → lookup returns ("", nil) for any workspace.
+    primeInstanceIDLookup(t, map[string]string{})
+
+    hit := false
+    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        hit = true
+        w.WriteHeader(http.StatusOK)
+    }))
+    defer srv.Close()
+
+    p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()}
+    if err := p.Stop(context.Background(), "ws-ghost"); err != nil {
+        t.Fatalf("Stop with empty instance_id should no-op, got err %v", err)
+    }
+    if hit {
+        t.Errorf("Stop contacted CP even though instance_id was empty")
+    }
+}
+
+// TestIsRunning_UsesDBInstanceID — IsRunning must also look up
+// instance_id from the workspaces table, same as Stop. Mirror of
+// TestStop_SendsBothAuthHeaders.
+func TestIsRunning_UsesDBInstanceID(t *testing.T) {
+    primeInstanceIDLookup(t, map[string]string{"ws-1": "i-xyz789"})
+
+    var sawInstance string
+    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        sawInstance = r.URL.Query().Get("instance_id")
+        w.WriteHeader(http.StatusOK)
+        _, _ = io.WriteString(w, `{"state":"running"}`)
+    }))
+    defer srv.Close()
+
+    p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()}
+    if _, err := p.IsRunning(context.Background(), "ws-1"); err != nil {
+        t.Fatalf("IsRunning: %v", err)
+    }
+    if sawInstance != "i-xyz789" {
+        t.Errorf("instance_id query = %q, want i-xyz789 (from DB lookup)", sawInstance)
+    }
+}
+
+// TestIsRunning_EmptyInstanceIDReturnsFalse — IsRunning on a
+// workspace with no recorded EC2 instance must report (false, nil) so
+// restart cascades re-provision fresh instead of looping on a stale
+// row with no backing instance.
+func TestIsRunning_EmptyInstanceIDReturnsFalse(t *testing.T) {
+    primeInstanceIDLookup(t, map[string]string{})
+    p := &CPProvisioner{baseURL: "http://unused", orgID: "org-1"}
+    running, err := p.IsRunning(context.Background(), "ws-ghost")
+    if err != nil {
+        t.Errorf("IsRunning with empty instance_id should return (false, nil), got err %v", err)
+    }
+    if running {
+        t.Errorf("IsRunning with empty instance_id should return running=false, got true")
+    }
+}