From 12c49183183e3b8b81b5cc86e49e7fba59caa63f Mon Sep 17 00:00:00 2001
From: Hongming Wang
Date: Sat, 25 Apr 2026 12:36:22 -0700
Subject: [PATCH] fix(platform): stop leaking workspace containers on delete
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Symptom: deleting workspaces from the canvas marked DB rows
status='removed' but left Docker containers running indefinitely. After
a session of org imports + cancellations, we counted 10 running ws-*
containers all backed by 'removed' DB rows, eating ~1100% CPU on the
Docker VM.

Two compounding bugs in handlers/workspace_crud.go's delete cascade:

1. The cleanup loop used `c.Request.Context()` for the Docker
   stop/remove calls. When the canvas's `api.del` resolved on the
   platform's 200, gin cancelled the request ctx — and any in-flight
   Docker call then failed with `context canceled`, leaving the
   container alive. Old logs: "Delete descendant volume removal
   warning: ... context canceled"

2. `provisioner.Stop`'s error return was discarded and `RemoveVolume`
   ran unconditionally afterward. When Stop didn't actually kill the
   container (transient daemon error, ctx cancellation as in #1), the
   volume removal would predictably fail with "volume in use" and the
   container kept running with the volume mounted. Old logs: "Delete
   descendant volume removal warning: Error response from daemon:
   remove ... volume is in use"

The fix is layered in two parts:

- workspace_crud.go: detach cleanup with `context.WithoutCancel(ctx)`
  plus a 30s bounded timeout. Stop's error is now checked, and on
  failure we skip RemoveVolume entirely (the orphan sweeper below
  catches what we deferred).

- New registry/orphan_sweeper.go: periodic reconcile pass (every 60s,
  initial run on boot). Lists ws-* containers via a Docker name
  filter, intersects them with DB rows where status='removed', and
  stops + removes volumes for the leaks. Defence in depth — even a
  brand-new Stop failure mode heals on the next sweep instead of
  leaking forever.

Provisioner gains a tiny ListWorkspaceContainerIDPrefixes helper that
wraps ContainerList with the `name=ws-` filter; the sweeper takes an
OrphanReaper interface (matching the ContainerChecker pattern in
healthsweep.go) so unit tests don't need a real Docker daemon. main.go
wires the sweeper alongside the existing liveness + health-sweep +
provisioning-timeout monitors, all under supervised.RunWithRecover so
a panic restarts the goroutine.

6 new sweeper tests cover the reconcile path, the no-running-containers
short-circuit, the daemon-error skip, the Stop-failure-leaves-volume
invariant (the same trap that motivated this fix), the
volume-remove-error-is-non-fatal continuation, and the nil-reaper
no-op.

Verified: full Go test suite passes; manually purged the 10 leaked
containers + their orphan volumes from the dev host with `docker rm -f`
+ `docker volume rm` (one-off cleanup; the sweeper would have caught
them on the next cycle once deployed).
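
Distilled, the ctx bug and its fix look like this (illustrative sketch
only; `doCleanup` is a stand-in name, and the real shape is the
stopAndRemove closure in the workspace_crud.go hunk below):

    // Before: cleanup tied to the request context, which gin cancels
    // as soon as the canvas hangs up; Docker calls die mid-flight.
    doCleanup(c.Request.Context())

    // After: inherit the request's values but not its cancellation
    // (context.WithoutCancel, Go 1.21+), with a hard 30s bound so a
    // wedged Docker daemon can't pin the goroutine forever.
    cleanupCtx, cancel := context.WithTimeout(
        context.WithoutCancel(c.Request.Context()), 30*time.Second)
    defer cancel()
    doCleanup(cleanupCtx)
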
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 workspace-server/cmd/server/main.go                |  12 ++
 .../internal/handlers/workspace_crud.go            |  54 +++--
 .../internal/provisioner/provisioner.go            |  51 +++++
 .../internal/registry/orphan_sweeper.go            | 148 +++++++++++++
 .../internal/registry/orphan_sweeper_test.go       | 194 ++++++++++++++++++
 5 files changed, 444 insertions(+), 15 deletions(-)
 create mode 100644 workspace-server/internal/registry/orphan_sweeper.go
 create mode 100644 workspace-server/internal/registry/orphan_sweeper_test.go

diff --git a/workspace-server/cmd/server/main.go b/workspace-server/cmd/server/main.go
index e47780b8..3805452b 100644
--- a/workspace-server/cmd/server/main.go
+++ b/workspace-server/cmd/server/main.go
@@ -229,6 +229,18 @@ func main() {
 		})
 	}
 
+	// Orphan-container reconcile sweep — finds running containers
+	// whose workspace row is already status='removed' and stops
+	// them. Defence in depth on top of the inline cleanup in
+	// handlers/workspace_crud.go: any Docker hiccup that left a
+	// container alive after the user clicked delete heals on the
+	// next sweep instead of leaking forever.
+	if prov != nil {
+		go supervised.RunWithRecover(ctx, "orphan-sweeper", func(c context.Context) {
+			registry.StartOrphanSweeper(c, prov)
+		})
+	}
+
 	// Provision-timeout sweep — flips workspaces that have been stuck in
 	// status='provisioning' past the timeout window to 'failed' and emits
 	// WORKSPACE_PROVISION_TIMEOUT. Without this the UI banner is cosmetic
diff --git a/workspace-server/internal/handlers/workspace_crud.go b/workspace-server/internal/handlers/workspace_crud.go
index 1d428183..6d7c79a2 100644
--- a/workspace-server/internal/handlers/workspace_crud.go
+++ b/workspace-server/internal/handlers/workspace_crud.go
@@ -5,12 +5,14 @@ package handlers
 // Delete (cascade + purge), and input validation helpers.
 
 import (
+	"context"
 	"database/sql"
 	"fmt"
 	"log"
 	"net/http"
 	"path/filepath"
 	"strings"
+	"time"
 
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
@@ -388,25 +390,47 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) {
 	// Now stop containers + remove volumes for all descendants (any depth).
 	// Any concurrent heartbeat / registration / liveness-triggered restart
 	// will see status='removed' and bail out early.
-	for _, descID := range descendantIDs {
-		if h.provisioner != nil {
-			h.provisioner.Stop(ctx, descID)
-			if err := h.provisioner.RemoveVolume(ctx, descID); err != nil {
-				log.Printf("Delete descendant %s volume removal warning: %v", descID, err)
-			}
+	//
+	// IMPORTANT: detach from the request ctx via WithoutCancel so that
+	// when the canvas's `api.del` resolves on our 200 (and gin cancels
+	// `c.Request.Context()`), in-flight Docker stop/remove calls don't
+	// get cancelled mid-operation. The previous shape leaked containers
+	// every time the canvas hung up promptly: Stop returned
+	// `context canceled`, the container stayed up, and the next
+	// RemoveVolume call failed with `volume in use`. The 30s bound is
+	// generous for Docker daemon round-trips (typical: <2s) and keeps
+	// a stuck daemon from holding a goroutine forever.
+	cleanupCtx, cleanupCancel := context.WithTimeout(
+		context.WithoutCancel(ctx), 30*time.Second)
+	defer cleanupCancel()
+
+	stopAndRemove := func(wsID string) {
+		if h.provisioner == nil {
+			return
 		}
-		db.ClearWorkspaceKeys(ctx, descID)
+		// Check Stop's error before attempting RemoveVolume — the
+		// previous code discarded it and immediately tried the
+		// volume remove, which always fails with "volume in use"
+		// when Stop didn't actually kill the container. The orphan
+		// sweeper (registry/orphan_sweeper.go) catches what we
+		// skip here on the next reconcile pass.
+		if err := h.provisioner.Stop(cleanupCtx, wsID); err != nil {
+			log.Printf("Delete %s container stop failed: %v — leaving volume for orphan sweeper", wsID, err)
+			return
+		}
+		if err := h.provisioner.RemoveVolume(cleanupCtx, wsID); err != nil {
+			log.Printf("Delete %s volume removal warning: %v", wsID, err)
+		}
+	}
+
+	for _, descID := range descendantIDs {
+		stopAndRemove(descID)
+		db.ClearWorkspaceKeys(cleanupCtx, descID)
 		h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_REMOVED", descID, map[string]interface{}{})
 	}
 
-	// Stop + remove volume for the workspace itself
-	if h.provisioner != nil {
-		h.provisioner.Stop(ctx, id)
-		if err := h.provisioner.RemoveVolume(ctx, id); err != nil {
-			log.Printf("Delete %s volume removal warning: %v", id, err)
-		}
-	}
-	db.ClearWorkspaceKeys(ctx, id)
+	stopAndRemove(id)
+	db.ClearWorkspaceKeys(cleanupCtx, id)
 	h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_REMOVED", id, map[string]interface{}{
 		"cascade_deleted": len(descendantIDs),
diff --git a/workspace-server/internal/provisioner/provisioner.go b/workspace-server/internal/provisioner/provisioner.go
index ac04b15f..412859b3 100644
--- a/workspace-server/internal/provisioner/provisioner.go
+++ b/workspace-server/internal/provisioner/provisioner.go
@@ -16,6 +16,7 @@ import (
 	"time"
 
 	"github.com/docker/docker/api/types/container"
+	"github.com/docker/docker/api/types/filters"
 	dockerimage "github.com/docker/docker/api/types/image"
 	"github.com/docker/docker/api/types/network"
 	"github.com/docker/docker/api/types/volume"
@@ -133,6 +134,56 @@ func ContainerName(workspaceID string) string {
 	return fmt.Sprintf("ws-%s", id)
 }
 
+// containerNamePrefix is the shared prefix every workspace container
+// name carries (`ws-`). Used by ListWorkspaceContainerIDPrefixes for
+// the Docker name-filter, and by the orphan sweeper to recognise our
+// own containers vs. anything else on the host.
+const containerNamePrefix = "ws-"
+
+// ListWorkspaceContainerIDPrefixes returns the 12-char workspace ID
+// prefixes of every ws-* container the Docker daemon knows about,
+// running or stopped. The 12-char form matches ContainerName's
+// truncation, so the orphan sweeper can intersect this set against
+// `SELECT substring(id::text, 1, 12) FROM workspaces WHERE status =
+// 'removed'` without an extra round-trip per row.
+//
+// On any Docker error it returns a nil slice and the error (the
+// sweeper treats that as "skip this round" — better than a partial
+// scan that misses leaks).
+func (p *Provisioner) ListWorkspaceContainerIDPrefixes(ctx context.Context) ([]string, error) {
+	if p == nil || p.cli == nil {
+		return nil, nil
+	}
+	containers, err := p.cli.ContainerList(ctx, container.ListOptions{
+		// All=true catches stopped-but-not-removed containers too —
+		// those still hold their volume references and would block
+		// RemoveVolume just like a running container would.
+		All:     true,
+		Filters: filters.NewArgs(filters.Arg("name", containerNamePrefix)),
+	})
+	if err != nil {
+		return nil, err
+	}
+	prefixes := make([]string, 0, len(containers))
+	for _, c := range containers {
+		// Container names from the API include a leading slash:
+		// "/ws-abc123def456". Strip both the slash and our prefix
+		// to recover the 12-char workspace ID.
+		for _, name := range c.Names {
+			n := strings.TrimPrefix(name, "/")
+			if !strings.HasPrefix(n, containerNamePrefix) {
+				continue
+			}
+			id := strings.TrimPrefix(n, containerNamePrefix)
+			if id == "" {
+				continue
+			}
+			prefixes = append(prefixes, id)
+			break // one name is enough; multiple aliases would dup
+		}
+	}
+	return prefixes, nil
+}
+
 // InternalURL returns the Docker-internal URL for a workspace container.
 func InternalURL(workspaceID string) string {
 	return fmt.Sprintf("http://%s:%s", ContainerName(workspaceID), DefaultPort)
diff --git a/workspace-server/internal/registry/orphan_sweeper.go b/workspace-server/internal/registry/orphan_sweeper.go
new file mode 100644
index 00000000..6fbbc0fd
--- /dev/null
+++ b/workspace-server/internal/registry/orphan_sweeper.go
@@ -0,0 +1,148 @@
+package registry
+
+// orphan_sweeper.go — periodic reconcile pass that cleans up Docker
+// containers whose corresponding workspace row in Postgres has
+// status='removed'. Defence in depth on top of the inline cleanup
+// in handlers/workspace_crud.go.
+//
+// Why this exists: the inline cleanup is one-shot — if Docker hiccups
+// (daemon restart, host load, transient API error), the container
+// silently stays alive while the DB row is already 'removed'. Without
+// a reconcile pass those leaks accumulate forever. With one, every
+// missed cleanup heals on the next sweep.
+//
+// Cost: O(ws-* containers) per cycle, not O(historical removed
+// rows). The Docker name filter trims the candidate set to ws-* only
+// (typically the same handful ContainerList would return unfiltered
+// on a dev host); the DB lookup is one indexed query against the
+// idx_workspaces_status btree.
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
+	"github.com/lib/pq"
+)
+
+// OrphanReaper is the dependency the sweeper takes from provisioner.
+// Extracted as an interface so the sweeper is unit-testable without
+// a real Docker daemon — matches the ContainerChecker pattern in
+// healthsweep.go. *provisioner.Provisioner satisfies this naturally.
+type OrphanReaper interface {
+	ListWorkspaceContainerIDPrefixes(ctx context.Context) ([]string, error)
+	Stop(ctx context.Context, workspaceID string) error
+	RemoveVolume(ctx context.Context, workspaceID string) error
+}
+
+// OrphanSweepInterval is the cadence of the reconcile loop. 60s is
+// twice the 30s heartbeat cadence — a single missed cleanup surfaces
+// within ~90s end-to-end (canvas delete → next sweep tick → container
+// gone). Faster cycles would just pay Docker API cost for no UX win;
+// slower ones would let leaks linger long enough to compound CPU
+// pressure on dev hosts.
+const OrphanSweepInterval = 60 * time.Second
+
+// orphanSweepDeadline bounds a single sweep cycle. A daemon at the
+// edge of timing out shouldn't accumulate goroutines. 30s is generous
+// for a dev host with dozens of containers and a busy daemon.
+const orphanSweepDeadline = 30 * time.Second
+
+// StartOrphanSweeper runs the reconcile loop until ctx is cancelled.
+// A nil reaper makes the loop a no-op (matching handlers'
+// nil-provisioner-tolerant pattern — some test harnesses run without
+// Docker available).
+func StartOrphanSweeper(ctx context.Context, reaper OrphanReaper) {
+	if reaper == nil {
+		log.Println("Orphan sweeper: reaper is nil — sweeper disabled")
+		return
+	}
+	log.Printf("Orphan sweeper started — reconciling every %s", OrphanSweepInterval)
+	ticker := time.NewTicker(OrphanSweepInterval)
+	defer ticker.Stop()
+	// Run once immediately so a platform restart cleans up any
+	// containers leaked while we were down — don't make the user
+	// wait 60s for the first reconcile.
+	sweepOnce(ctx, reaper)
+	for {
+		select {
+		case <-ctx.Done():
+			log.Println("Orphan sweeper: shutdown")
+			return
+		case <-ticker.C:
+			sweepOnce(ctx, reaper)
+		}
+	}
+}
+
+func sweepOnce(parent context.Context, reaper OrphanReaper) {
+	ctx, cancel := context.WithTimeout(parent, orphanSweepDeadline)
+	defer cancel()
+
+	prefixes, err := reaper.ListWorkspaceContainerIDPrefixes(ctx)
+	if err != nil {
+		log.Printf("Orphan sweeper: ListWorkspaceContainerIDPrefixes failed: %v — skipping cycle", err)
+		return
+	}
+	if len(prefixes) == 0 {
+		return
+	}
+
+	// Resolve each prefix to a full workspace_id whose status is
+	// 'removed'. The platform's workspace IDs are full UUIDs but
+	// container names are truncated to 12 chars. In practice each
+	// prefix matches at most one row (UUID v4 collisions in the
+	// first 12 chars across active rows are statistically
+	// negligible), and if two 'removed' rows ever did share a
+	// prefix, reaping both is still correct. A single IN-style
+	// query keeps the cost at one round-trip regardless of leak
+	// count.
+	//
+	// LIKE patterns built from prefixes go through pq.Array → no
+	// SQL injection vector (prefixes come from Docker, not user
+	// input, but defence in depth).
+	likes := make([]string, 0, len(prefixes))
+	for _, p := range prefixes {
+		likes = append(likes, p+"%")
+	}
+	rows, err := db.DB.QueryContext(ctx, `
+		SELECT id::text
+		  FROM workspaces
+		 WHERE status = 'removed'
+		   AND id::text LIKE ANY($1::text[])
+	`, pq.Array(likes))
+	if err != nil {
+		log.Printf("Orphan sweeper: DB query failed: %v — skipping cycle", err)
+		return
+	}
+	defer rows.Close()
+
+	var orphanIDs []string
+	for rows.Next() {
+		var id string
+		if scanErr := rows.Scan(&id); scanErr != nil {
+			log.Printf("Orphan sweeper: row scan failed: %v", scanErr)
+			continue
+		}
+		orphanIDs = append(orphanIDs, id)
+	}
+	if err := rows.Err(); err != nil {
+		log.Printf("Orphan sweeper: rows iteration failed: %v", err)
+		return
+	}
+
+	for _, id := range orphanIDs {
+		log.Printf("Orphan sweeper: stopping leaked container for removed workspace %s", id)
+		if stopErr := reaper.Stop(ctx, id); stopErr != nil {
+			// Stop currently returns nil even when the container is
+			// already gone, but a future change could surface real
+			// errors. Keep the volume around for the next sweep so
+			// we don't fall into the same Stop-failed-then-volume-
+			// in-use trap that motivated this sweeper.
+ log.Printf("Orphan sweeper: Stop failed for %s: %v — leaving volume", id, stopErr) + continue + } + if rmErr := reaper.RemoveVolume(ctx, id); rmErr != nil { + log.Printf("Orphan sweeper: RemoveVolume warning for %s: %v", id, rmErr) + } + } +} diff --git a/workspace-server/internal/registry/orphan_sweeper_test.go b/workspace-server/internal/registry/orphan_sweeper_test.go new file mode 100644 index 00000000..99eea4a9 --- /dev/null +++ b/workspace-server/internal/registry/orphan_sweeper_test.go @@ -0,0 +1,194 @@ +package registry + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" +) + +// fakeReaper is a hand-rolled OrphanReaper for the sweeper tests. +// Records every Stop / RemoveVolume call so tests can assert which +// workspace IDs got reconciled. +type fakeReaper struct { + mu sync.Mutex + listResponse []string + listErr error + stopErr map[string]error + removeVolErr map[string]error + stopCalls []string + removeVolCalls []string +} + +func (f *fakeReaper) ListWorkspaceContainerIDPrefixes(_ context.Context) ([]string, error) { + if f.listErr != nil { + return nil, f.listErr + } + return f.listResponse, nil +} + +func (f *fakeReaper) Stop(_ context.Context, wsID string) error { + f.mu.Lock() + defer f.mu.Unlock() + f.stopCalls = append(f.stopCalls, wsID) + return f.stopErr[wsID] +} + +func (f *fakeReaper) RemoveVolume(_ context.Context, wsID string) error { + f.mu.Lock() + defer f.mu.Unlock() + f.removeVolCalls = append(f.removeVolCalls, wsID) + return f.removeVolErr[wsID] +} + +// TestSweepOnce_ReconcilesRunningRemovedRows — the core reconcile +// behavior: a container running for a workspace whose DB row is +// 'removed' gets stopped + volume removed. +func TestSweepOnce_ReconcilesRunningRemovedRows(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + // Docker reports two ws-* containers; one's row is 'removed' + // (the leak), the other's is 'online' (the DB rightly excludes + // it from the WHERE clause and we should NOT reap it). + reaper := &fakeReaper{ + listResponse: []string{"abc123def456", "xyz789ghi012"}, + } + + // The query asks for status='removed' rows whose id matches the + // LIKE patterns built from the running container prefixes. Mock + // returns only the leaked one as a UUID-shaped full id. + mock.ExpectQuery(`SELECT id::text\s+FROM workspaces`). + WillReturnRows(sqlmock.NewRows([]string{"id"}). + AddRow("abc123def456-0000-0000-0000-000000000000")) + + sweepOnce(context.Background(), reaper) + + if len(reaper.stopCalls) != 1 || reaper.stopCalls[0] != "abc123def456-0000-0000-0000-000000000000" { + t.Errorf("Stop calls = %v, want exactly the leaked id", reaper.stopCalls) + } + if len(reaper.removeVolCalls) != 1 || reaper.removeVolCalls[0] != "abc123def456-0000-0000-0000-000000000000" { + t.Errorf("RemoveVolume calls = %v, want exactly the leaked id", reaper.removeVolCalls) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestSweepOnce_NoRunningContainers — Docker returns nothing, sweeper +// short-circuits without a DB query (no leak possible if no +// containers exist). +func TestSweepOnce_NoRunningContainers(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + reaper := &fakeReaper{listResponse: nil} + + // No DB query expected — if sweepOnce makes one anyway the + // sqlmock will fail "unexpected query". 
+	sweepOnce(context.Background(), reaper)
+
+	if len(reaper.stopCalls) != 0 {
+		t.Errorf("Stop should not fire when no containers exist; got %v", reaper.stopCalls)
+	}
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("unmet sqlmock expectations: %v", err)
+	}
+}
+
+// TestSweepOnce_DockerListErrorSkipsCycle — a Docker daemon hiccup
+// must not cascade into a DB query (otherwise we'd reap based on
+// stale information). Skip the cycle, retry next tick.
+func TestSweepOnce_DockerListErrorSkipsCycle(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+
+	reaper := &fakeReaper{listErr: errors.New("daemon unreachable")}
+	sweepOnce(context.Background(), reaper)
+
+	if len(reaper.stopCalls) != 0 {
+		t.Errorf("Stop must not fire when Docker list failed; got %v", reaper.stopCalls)
+	}
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("unmet sqlmock expectations: %v", err)
+	}
+}
+
+// TestSweepOnce_StopFailureLeavesVolume — if Stop fails, RemoveVolume
+// MUST NOT fire. This is the same trap that motivated the sweeper:
+// removing a volume held by a still-running container always errors
+// with "volume in use", and we'd accumulate noise in the log without
+// actually fixing anything. Leave the volume for the next sweep
+// (which will retry Stop).
+func TestSweepOnce_StopFailureLeavesVolume(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+
+	reaper := &fakeReaper{
+		listResponse: []string{"abc123def456"},
+		stopErr: map[string]error{
+			"abc123def456-0000-0000-0000-000000000000": errors.New("docker daemon timeout"),
+		},
+	}
+	mock.ExpectQuery(`SELECT id::text\s+FROM workspaces`).
+		WillReturnRows(sqlmock.NewRows([]string{"id"}).
+			AddRow("abc123def456-0000-0000-0000-000000000000"))
+
+	sweepOnce(context.Background(), reaper)
+
+	if len(reaper.stopCalls) != 1 {
+		t.Errorf("Stop should have been attempted exactly once, got %v", reaper.stopCalls)
+	}
+	if len(reaper.removeVolCalls) != 0 {
+		t.Errorf("RemoveVolume must not fire when Stop failed; got %v", reaper.removeVolCalls)
+	}
+}
+
+// TestSweepOnce_VolumeRemoveErrorIsNonFatal — RemoveVolume failures
+// are logged but don't prevent processing other orphans in the same
+// cycle. Belt + braces against a transient daemon issue mid-loop.
+func TestSweepOnce_VolumeRemoveErrorIsNonFatal(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+
+	reaper := &fakeReaper{
+		listResponse: []string{"aaa111bbb222", "ccc333ddd444"},
+		removeVolErr: map[string]error{
+			"aaa111bbb222-0000-0000-0000-000000000000": errors.New("volume not found"),
+		},
+	}
+	mock.ExpectQuery(`SELECT id::text\s+FROM workspaces`).
+		WillReturnRows(sqlmock.NewRows([]string{"id"}).
+			AddRow("aaa111bbb222-0000-0000-0000-000000000000").
+			AddRow("ccc333ddd444-0000-0000-0000-000000000000"))
+
+	sweepOnce(context.Background(), reaper)
+
+	if len(reaper.stopCalls) != 2 {
+		t.Errorf("both orphans should have been Stopped; got %v", reaper.stopCalls)
+	}
+	if len(reaper.removeVolCalls) != 2 {
+		t.Errorf("both orphans should have had RemoveVolume attempted; got %v", reaper.removeVolCalls)
+	}
+}
+
+// TestStartOrphanSweeper_NilReaperIsNoOp — tolerance for the
+// nil-provisioner path used by some test harnesses.
+func TestStartOrphanSweeper_NilReaperIsNoOp(t *testing.T) {
+	// Should return immediately without panicking. Wrap in a
+	// goroutine + done-channel so we can assert it didn't block.
+	done := make(chan struct{})
+	go func() {
+		StartOrphanSweeper(context.Background(), nil)
+		close(done)
+	}()
+	select {
+	case <-done:
+		// expected
+	case <-time.After(500 * time.Millisecond):
+		t.Fatal("StartOrphanSweeper(nil) blocked instead of returning immediately")
+	}
+}