Merge pull request #2824 from Molecule-AI/fix/stop-workspace-auto-saas-1777945000

fix(provision): StopWorkspaceAuto mirror — close SaaS EC2-leak class
This commit is contained in:
Hongming Wang 2026-05-05 03:05:09 +00:00 committed by GitHub
commit 46d79a3e3b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 288 additions and 37 deletions

View File

@ -11,7 +11,6 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"gopkg.in/yaml.v3"
@ -25,28 +24,21 @@ import (
// NULL auth_token — same drift class as the SaaS bug fixed in #2366.
type TeamHandler struct {
broadcaster *events.Broadcaster
// provisioner is interface-typed (#2369) for the same reason as
// WorkspaceHandler.provisioner — Stop is the only call site here
// and it's on the LocalProvisionerAPI surface, so widening is free
// and symmetric with WorkspaceHandler.
provisioner provisioner.LocalProvisionerAPI
wh *WorkspaceHandler
platformURL string
configsDir string
}
func NewTeamHandler(b *events.Broadcaster, p *provisioner.Provisioner, wh *WorkspaceHandler, platformURL, configsDir string) *TeamHandler {
h := &TeamHandler{
// NewTeamHandler constructs a TeamHandler. Backend selection (Docker vs
// CP) goes through h.wh.StopWorkspaceAuto + h.wh.provisionWorkspaceAuto;
// no per-handler provisioner field is needed here.
func NewTeamHandler(b *events.Broadcaster, wh *WorkspaceHandler, platformURL, configsDir string) *TeamHandler {
return &TeamHandler{
broadcaster: b,
wh: wh,
platformURL: platformURL,
configsDir: configsDir,
}
// Avoid the typed-nil interface trap (see NewWorkspaceHandler note).
if p != nil {
h.provisioner = p
}
return h
}
// Expand handles POST /workspaces/:id/expand
@ -203,9 +195,14 @@ func (h *TeamHandler) Collapse(c *gin.Context) {
continue
}
// Stop container if provisioner available
if h.provisioner != nil {
h.provisioner.Stop(ctx, childID)
// Stop the workload via the backend dispatcher (CP for SaaS,
// Docker for self-hosted). Pre-2026-05-05 this was
// `if h.provisioner != nil { h.provisioner.Stop(...) }`, which
// silently skipped on every SaaS tenant — child EC2s kept running
// after team-collapse until the orphan sweeper caught them
// (issue #2813).
if err := h.wh.StopWorkspaceAuto(ctx, childID); err != nil {
log.Printf("Team collapse: stop %s failed: %v — orphan sweeper will reconcile", childID, err)
}
// Mark as removed

View File

@ -34,7 +34,7 @@ func TestTeamCollapse_NoChildren(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewTeamHandler(broadcaster, nil, nil, "http://localhost:8080", "/tmp/configs")
handler := NewTeamHandler(broadcaster, NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()), "http://localhost:8080", "/tmp/configs")
// No children
mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id").
@ -66,7 +66,7 @@ func TestTeamCollapse_WithChildren(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewTeamHandler(broadcaster, nil, nil, "http://localhost:8080", "/tmp/configs")
handler := NewTeamHandler(broadcaster, NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()), "http://localhost:8080", "/tmp/configs")
// Two children
mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id").
@ -122,7 +122,7 @@ func TestTeamCollapse_WithChildren(t *testing.T) {
func TestTeamExpand_WorkspaceNotFound(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewTeamHandler(newTestBroadcaster(), nil, nil, "http://localhost:8080", "/tmp/configs")
handler := NewTeamHandler(newTestBroadcaster(), NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()), "http://localhost:8080", "/tmp/configs")
mock.ExpectQuery("SELECT name, tier, status FROM workspaces WHERE id").
WithArgs("ws-missing").
@ -143,7 +143,7 @@ func TestTeamExpand_WorkspaceNotFound(t *testing.T) {
func TestTeamExpand_NoConfigFound(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewTeamHandler(newTestBroadcaster(), nil, nil, "http://localhost:8080", t.TempDir())
handler := NewTeamHandler(newTestBroadcaster(), NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()), "http://localhost:8080", t.TempDir())
mock.ExpectQuery("SELECT name, tier, status FROM workspaces WHERE id").
WithArgs("ws-1").
@ -167,7 +167,7 @@ func TestTeamExpand_EmptySubWorkspaces(t *testing.T) {
setupTestRedis(t)
configDir := makeTeamConfigDir(t, "myagent", "name: MyAgent\nsub_workspaces: []\n")
handler := NewTeamHandler(newTestBroadcaster(), nil, nil, "http://localhost:8080", configDir)
handler := NewTeamHandler(newTestBroadcaster(), NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()), "http://localhost:8080", configDir)
mock.ExpectQuery("SELECT name, tier, status FROM workspaces WHERE id").
WithArgs("ws-1").
@ -199,7 +199,7 @@ sub_workspaces:
role: code-reviewer
`
configDir := makeTeamConfigDir(t, "teamlead", yaml)
handler := NewTeamHandler(broadcaster, nil, nil, "http://localhost:8080", configDir)
handler := NewTeamHandler(broadcaster, NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()), "http://localhost:8080", configDir)
mock.ExpectQuery("SELECT name, tier, status FROM workspaces WHERE id").
WithArgs("ws-lead").

View File

@ -170,6 +170,43 @@ func (h *WorkspaceHandler) provisionWorkspaceAuto(workspaceID, templatePath stri
return false
}
// StopWorkspaceAuto is the single source of truth for "stop a
// workspace" and the stop-side counterpart of provisionWorkspaceAuto.
// It stops the workspace synchronously on whichever backend is wired:
// the control plane (CP) for SaaS, local Docker for self-hosted.
//
// Dispatch order:
//  1. cpProv wired      -> cpProv.Stop
//  2. provisioner wired -> provisioner.Stop
//  3. neither wired     -> nil; a workspace nobody is running can't be
//     stopped, so that is a no-op rather than an error.
//
// CP is checked first to match provisionWorkspaceAuto. Production runs
// exactly one backend at a time (SaaS tenant: cpProv set, provisioner
// nil; self-hosted operator: provisioner set, cpProv nil), so the
// "both set" case only arises in test fixtures, and sharing the
// CP-wins ordering keeps those test stubs on a single side.
//
// History: before 2026-05-05 the stop side had no Auto dispatcher and
// every caller wrote `if h.provisioner != nil { Stop }`, which silently
// leaked EC2s on SaaS:
//   - team.go:208 (Collapse) — issue #2813
//   - workspace_crud.go:432 (stopAndRemove during Delete) — issue #2814
//
// Both bugs reproduced for ~6 months; it is the same drift class as the
// org-import provision bug closed by PR #2811.
//
// Scope: this abstraction means "stop the running workload", not "tear
// down all state". Volume cleanup (workspace_crud.go) stays Docker-only
// because CP-managed workspaces have no volumes to clean; callers that
// need it keep their `if h.provisioner != nil { RemoveVolume(...) }`
// gate AFTER calling StopWorkspaceAuto.
func (h *WorkspaceHandler) StopWorkspaceAuto(ctx context.Context, workspaceID string) error {
	switch {
	case h.cpProv != nil:
		return h.cpProv.Stop(ctx, workspaceID)
	case h.provisioner != nil:
		return h.provisioner.Stop(ctx, workspaceID)
	default:
		return nil
	}
}
// SetEnvMutators wires a provisionhook.Registry into the handler. Plugins
// living in separate repos register on the same Registry instance during
// boot (see cmd/server/main.go) and main.go calls this setter once before

View File

@ -420,22 +420,33 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) {
var stopErrs []error
stopAndRemove := func(wsID string) {
if h.provisioner == nil {
return
}
// Check Stop's error before attempting RemoveVolume — the
// previous code discarded it and immediately tried the
// volume remove, which always fails with "volume in use"
// when Stop didn't actually kill the container. The orphan
// sweeper (registry/orphan_sweeper.go) catches what we
// skip here on the next reconcile pass.
if err := h.provisioner.Stop(cleanupCtx, wsID); err != nil {
log.Printf("Delete %s container stop failed: %v — leaving volume for orphan sweeper", wsID, err)
// Stop the workload first via the backend dispatcher (CP for
// SaaS, Docker for self-hosted). Pre-2026-05-05 this gate was
// `if h.provisioner == nil { return }` — early-returning on
// every SaaS tenant left the EC2 running with no DB row to
// track it (issue #2814; the comment below claimed "loud-fail
// instead of silent-leak" but the early-return made it the
// silent path on SaaS).
//
// Check Stop's error before any volume cleanup — the previous
// code discarded it and immediately tried RemoveVolume, which
// always fails with "volume in use" when Stop didn't actually
// kill the container. The orphan sweeper
// (registry/orphan_sweeper.go) catches what we skip here on
// the next reconcile pass.
if err := h.StopWorkspaceAuto(cleanupCtx, wsID); err != nil {
log.Printf("Delete %s stop failed: %v — leaving cleanup for orphan sweeper", wsID, err)
stopErrs = append(stopErrs, fmt.Errorf("stop %s: %w", wsID, err))
return
}
if err := h.provisioner.RemoveVolume(cleanupCtx, wsID); err != nil {
log.Printf("Delete %s volume removal warning: %v", wsID, err)
// Volume cleanup is Docker-only — CP-managed workspaces have
// no host-bind volumes to remove. Skip silently when no Docker
// provisioner is wired (the SaaS path already terminated the
// EC2 above; nothing left to do).
if h.provisioner != nil {
if err := h.provisioner.RemoveVolume(cleanupCtx, wsID); err != nil {
log.Printf("Delete %s volume removal warning: %v", wsID, err)
}
}
}

View File

@ -41,7 +41,9 @@ import (
type trackingCPProv struct {
// mu is held by Stop and by the snapshot helpers while the recorded
// slices below are appended to or copied.
mu sync.Mutex
// started is exposed through startedSnapshot; presumably the workspace
// IDs passed to Start (Start's recording code is not visible here — confirm).
started []string
// stopped holds every workspace ID passed to Stop, in call order.
stopped []string
// startErr is presumably the error Start returns when set — TODO confirm
// against Start's body.
startErr error
// stopErr is returned as-is by every Stop call; the zero value (nil)
// makes Stop succeed.
stopErr error
}
func (r *trackingCPProv) Start(_ context.Context, cfg provisioner.WorkspaceConfig) (string, error) {
@ -53,12 +55,25 @@ func (r *trackingCPProv) Start(_ context.Context, cfg provisioner.WorkspaceConfi
}
return "i-stub-" + cfg.WorkspaceID, nil
}
func (r *trackingCPProv) Stop(_ context.Context, _ string) error { return nil }
// Stop records workspaceID in r.stopped (under r.mu) and reports the
// configured r.stopErr, letting tests both observe which workspaces
// were stopped and simulate a failing backend.
func (r *trackingCPProv) Stop(_ context.Context, workspaceID string) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.stopped = append(r.stopped, workspaceID)
	return r.stopErr
}
// GetConsoleOutput is a stub: it ignores both arguments and always
// returns an empty string with a nil error.
func (r *trackingCPProv) GetConsoleOutput(_ context.Context, _ string) (string, error) {
return "", nil
}
// IsRunning is a stub: it ignores both arguments and always reports
// true with a nil error.
func (r *trackingCPProv) IsRunning(_ context.Context, _ string) (bool, error) { return true, nil }
// stoppedSnapshot returns a copy of the workspace IDs recorded by Stop
// so far. The copy is taken while holding r.mu, so the returned slice
// does not alias r.stopped and is unaffected by later Stop calls.
func (r *trackingCPProv) stoppedSnapshot() []string {
	r.mu.Lock()
	defer r.mu.Unlock()

	snapshot := make([]string, 0, len(r.stopped))
	return append(snapshot, r.stopped...)
}
func (r *trackingCPProv) startedSnapshot() []string {
r.mu.Lock()
defer r.mu.Unlock()
@ -432,3 +447,194 @@ func TestOrgImportGate_UsesHasProvisionerNotBareField(t *testing.T) {
t.Errorf("org_import.go must call h.workspace.HasProvisioner() in the provisioning gate — current code does not")
}
}
// TestStopWorkspaceAuto_RoutesToCPWhenSet verifies the SaaS shape of the
// stop dispatcher: cpProv wired, local Docker provisioner nil. In that
// configuration StopWorkspaceAuto must call cpProv.Stop (= terminate the
// EC2) exactly once with the requested workspace ID. Symmetric with the
// provision dispatcher test above.
//
// Background: before 2026-05-05 this dispatcher did not exist, so
// team-collapse and workspace-delete called h.provisioner.Stop directly
// and no-oped on every SaaS tenant — issue #2813 (collapse) and #2814
// (delete) both leaked EC2s for ~6 months.
func TestStopWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) {
	const wsID = "ws-stop-routes-cp"

	cp := &trackingCPProv{}
	handler := NewWorkspaceHandler(&concurrentSafeBroadcaster{}, nil, "http://localhost:8080", t.TempDir())
	handler.SetCPProvisioner(cp)

	err := handler.StopWorkspaceAuto(context.Background(), wsID)
	if err != nil {
		t.Fatalf("StopWorkspaceAuto returned err with CP wired: %v", err)
	}

	stops := cp.stoppedSnapshot()
	if len(stops) == 1 && stops[0] == wsID {
		return
	}
	t.Errorf("expected cpProv.Stop invoked once with %q, got %v", wsID, stops)
}
// TestStopWorkspaceAuto_RoutesToDockerWhenOnlyDocker — self-hosted
// operators run with the local Docker provisioner wired and cpProv nil.
// Auto must route to Docker.
//
// Stub-injects a LocalProvisionerAPI via a private constructor pattern
// so we don't need a real Docker daemon. NewWorkspaceHandler's
// constructor takes *provisioner.Provisioner (concrete) so we set the
// interface field directly.
func TestStopWorkspaceAuto_RoutesToDockerWhenOnlyDocker(t *testing.T) {
bcast := &concurrentSafeBroadcaster{}
h := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
stub := &stoppingLocalProv{}
h.provisioner = stub
wsID := "ws-stop-routes-docker"
if err := h.StopWorkspaceAuto(context.Background(), wsID); err != nil {
t.Fatalf("StopWorkspaceAuto returned err with Docker wired: %v", err)
}
if len(stub.stopped) != 1 || stub.stopped[0] != wsID {
t.Errorf("expected Docker provisioner.Stop invoked once with %q, got %v", wsID, stub.stopped)
}
}
// TestStopWorkspaceAuto_NoBackendIsNoOp — when neither backend is wired
// (misconfigured deployment, or test fixture), StopWorkspaceAuto returns
// nil silently. Distinct from provisionWorkspaceAuto's mark-failed
// behavior: there's no row state to mark "failed to stop" against, and
// the absence of a backend means nothing was running to stop.
func TestStopWorkspaceAuto_NoBackendIsNoOp(t *testing.T) {
bcast := &concurrentSafeBroadcaster{}
h := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
// Neither SetCPProvisioner nor a Docker provisioner — both nil.
if err := h.StopWorkspaceAuto(context.Background(), "ws-noback"); err != nil {
t.Errorf("expected nil error on no-backend stop, got %v", err)
}
}
// stoppingLocalProv is a minimal LocalProvisionerAPI stub that records
// Stop invocations. Every other method panics, which guards against
// accidental use by tests that should be using a different stub.
// No locking is done around the recorded slice.
type stoppingLocalProv struct {
// stopped holds the workspace IDs passed to Stop, in call order.
stopped []string
}
// Stop appends workspaceID to s.stopped and always succeeds (returns
// nil). The context argument is ignored.
func (s *stoppingLocalProv) Stop(_ context.Context, workspaceID string) error {
s.stopped = append(s.stopped, workspaceID)
return nil
}
// Start is not supported by this stub and panics when called.
// Presumably present only to satisfy LocalProvisionerAPI.
func (s *stoppingLocalProv) Start(_ context.Context, _ provisioner.WorkspaceConfig) (string, error) {
panic("stoppingLocalProv: Start not implemented for this test")
}
// IsRunning is not supported by this stub and panics when called.
func (s *stoppingLocalProv) IsRunning(_ context.Context, _ string) (bool, error) {
panic("stoppingLocalProv: IsRunning not implemented for this test")
}
// ExecRead is not supported by this stub and panics when called.
func (s *stoppingLocalProv) ExecRead(_ context.Context, _, _ string) ([]byte, error) {
panic("stoppingLocalProv: ExecRead not implemented for this test")
}
// RemoveVolume is not supported by this stub and panics when called.
func (s *stoppingLocalProv) RemoveVolume(_ context.Context, _ string) error {
panic("stoppingLocalProv: RemoveVolume not implemented for this test")
}
// VolumeHasFile is not supported by this stub and panics when called.
func (s *stoppingLocalProv) VolumeHasFile(_ context.Context, _, _ string) (bool, error) {
panic("stoppingLocalProv: VolumeHasFile not implemented for this test")
}
// WriteAuthTokenToVolume is not supported by this stub and panics when
// called.
func (s *stoppingLocalProv) WriteAuthTokenToVolume(_ context.Context, _, _ string) error {
panic("stoppingLocalProv: WriteAuthTokenToVolume not implemented for this test")
}
// TestNoCallSiteCallsBareStop is a source-level pin against the bug
// pattern that motivated this PR. It scans every non-test .go file in
// the current working directory (the package directory under `go test`)
// and fails if, after comments are stripped, the file still contains a
// bare `.provisioner.Stop(` or `.cpProv.Stop(` call. Any non-test
// handler that wants to "stop the workload" must go through
// h.X.StopWorkspaceAuto instead.
//
// Background: before 2026-05-05 team.go and workspace_crud.go both
// called h.provisioner.Stop directly inside
// `if h.provisioner != nil { ... }` gates — a silent no-op on SaaS and
// therefore an EC2 leak (#2813, #2814).
//
// Files exempt from the scan:
//   - workspace.go: defines StopWorkspaceAuto (the dispatcher itself).
//   - workspace_provision.go: defines per-backend Start/Stop bodies.
//   - workspace_restart.go: pre-dates the dispatchers and uses manual
//     if-cpProv-else dispatch with retry semantics tuned for the
//     restart hot path. Functionally equivalent + wraps cpStopWithRetry,
//     so it's not the bug class this gate targets — but it IS
//     architectural duplication, tracked under #2799.
//   - container_files.go: drives the Docker daemon directly for
//     file-copy short-lived containers; no workspace-level Stop
//     semantics.
func TestNoCallSiteCallsBareStop(t *testing.T) {
	const testSuffix = "_test.go"

	forbidden := []string{
		".provisioner.Stop(",
		".cpProv.Stop(",
	}
	exempt := map[string]bool{
		"workspace.go":           true,
		"workspace_provision.go": true,
		"workspace_restart.go":   true,
		"container_files.go":     true,
	}

	dir, err := os.Getwd()
	if err != nil {
		t.Fatalf("getwd: %v", err)
	}
	files, err := os.ReadDir(dir)
	if err != nil {
		t.Fatalf("readdir: %v", err)
	}

	for _, f := range files {
		name := f.Name()
		isGo := filepath.Ext(name) == ".go"
		isTest := len(name) > len(testSuffix) && name[len(name)-len(testSuffix):] == testSuffix
		if !isGo || isTest || exempt[name] {
			continue
		}

		raw, err := os.ReadFile(filepath.Join(dir, name))
		if err != nil {
			t.Fatalf("read %s: %v", name, err)
		}

		// The gate targets call expressions in real code, so comments
		// are removed first. Historical references to the old buggy
		// shape are kept in comments on purpose (archaeology) and must
		// not trip the test.
		code := stripGoComments(raw)
		for _, shape := range forbidden {
			if !bytes.Contains(code, []byte(shape)) {
				continue
			}
			t.Errorf("%s contains bare `%s` — must go through h.X.StopWorkspaceAuto so SaaS tenants route to CP. "+
				"Pre-2026-05-05 team.go and workspace_crud.go did this and silently leaked EC2s on every SaaS collapse / delete (#2813, #2814).", name, shape)
		}
	}
}
// stripGoComments returns a copy of Go source with // line comments and
// /* */ block comments removed, for use by the source-level pin tests
// in this file.
//
//   - A line comment is dropped up to, but not including, its
//     terminating newline, so line counts stay sane.
//   - A block comment is dropped entirely, delimiters included; an
//     unterminated block comment swallows the rest of the input.
//   - Interpreted strings ("..."), raw strings (`...`) and rune literals
//     ('...') are copied through verbatim. Comment markers inside them
//     are NOT comments: without this, a literal such as "/tmp/*" would
//     open a bogus block comment and delete real code up to the next
//     "*/", which could hide exactly the call expressions the pin tests
//     look for, and a URL such as "http://host" would truncate its line.
//
// This is a lightweight lexer, not a full Go scanner: it assumes src is
// reasonably well-formed Go and does not validate literals.
func stripGoComments(src []byte) []byte {
	n := len(src)
	out := make([]byte, 0, n)
	for i := 0; i < n; {
		c := src[i]
		switch {
		case c == '/' && i+1 < n && src[i+1] == '*':
			// Block comment: advance to the closing "*/" (or EOF) and
			// step past it without emitting anything.
			i += 2
			for i+1 < n && !(src[i] == '*' && src[i+1] == '/') {
				i++
			}
			i += 2
		case c == '/' && i+1 < n && src[i+1] == '/':
			// Line comment: stop ON the newline so the next iteration
			// copies it to the output like any other byte.
			for i < n && src[i] != '\n' {
				i++
			}
		case c == '"' || c == '\'' || c == '`':
			// String or rune literal: copy it untouched.
			end := goLiteralEnd(src, i)
			out = append(out, src[i:end]...)
			i = end
		default:
			out = append(out, c)
			i++
		}
	}
	return out
}

// goLiteralEnd returns the index just past the string or rune literal
// whose opening quote is at src[start]. Backslash escapes are honored
// for "..." and '...' literals but not for raw `...` strings. An
// unterminated "..." or '...' literal ends at (and excludes) the next
// newline so a stray quote cannot swallow following lines; an
// unterminated raw string runs to the end of src. The result is always
// greater than start.
func goLiteralEnd(src []byte, start int) int {
	quote := src[start]
	raw := quote == '`'
	for i := start + 1; i < len(src); i++ {
		switch c := src[i]; {
		case c == quote:
			return i + 1
		case !raw && c == '\\':
			i++ // skip the escaped byte so \" and \' do not close the literal
		case !raw && c == '\n':
			return i
		}
	}
	return len(src)
}

View File

@ -230,7 +230,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
r.GET("/approvals/pending", middleware.AdminAuth(db.DB), apph.ListAll)
// Team Expansion
teamh := handlers.NewTeamHandler(broadcaster, prov, wh, platformURL, configsDir)
teamh := handlers.NewTeamHandler(broadcaster, wh, platformURL, configsDir)
wsAuth.POST("/expand", teamh.Expand)
wsAuth.POST("/collapse", teamh.Collapse)