fix(platform): stop leaking workspace containers on delete

Symptom: deleting workspaces from the canvas marked the DB rows
status='removed' but left their Docker containers running
indefinitely. After a session of org imports + cancellations we
counted 10 running ws-* containers, all backed by 'removed' DB rows,
together eating ~1100% CPU on the Docker VM.

Two compounding bugs in handlers/workspace_crud.go's delete cascade:

1. The cleanup loop used `c.Request.Context()` for the Docker
   stop/remove calls. When the canvas's `api.del` resolved on the
   platform's 200, gin cancelled the request ctx, and any in-flight
   Docker call was aborted with `context canceled`, leaving the
   container alive (see the stdlib sketch after this list). Old logs:
       "Delete descendant <id> volume removal warning:
        ... context canceled"

2. `provisioner.Stop`'s error return was discarded and `RemoveVolume`
   ran unconditionally afterward. When Stop didn't actually kill the
   container (transient daemon error, ctx cancellation as in #1), the
   volume removal would predictably fail with "volume in use" and
   the container kept running with the volume mounted. Old logs:
       "Delete descendant <id> volume removal warning:
        Error response from daemon: remove ... volume is in use"
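
The mechanics of #1 reduce to a stdlib-only repro (illustrative
sketch, not the handler code; slowDockerStop stands in for the real
provisioner call, and context.WithoutCancel assumes Go 1.21+):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // slowDockerStop fakes a 2s daemon round-trip that aborts as soon
    // as its ctx is cancelled, the way the real SDK call does.
    func slowDockerStop(ctx context.Context) error {
        select {
        case <-time.After(2 * time.Second):
            return nil
        case <-ctx.Done():
            return ctx.Err() // context.Canceled; container stays up
        }
    }

    func main() {
        reqCtx, cancel := context.WithCancel(context.Background())

        // Buggy shape: cleanup inherits the request ctx.
        errCh := make(chan error, 1)
        go func() { errCh <- slowDockerStop(reqCtx) }()
        cancel() // what gin does to c.Request.Context() once we respond
        fmt.Println("inherited:", <-errCh) // context canceled

        // Fixed shape: detached from cancellation, still time-bounded.
        cleanupCtx, done := context.WithTimeout(
            context.WithoutCancel(reqCtx), 30*time.Second)
        defer done()
        fmt.Println("detached:", slowDockerStop(cleanupCtx)) // <nil>
    }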

Fix layered in two parts:

- workspace_crud.go: detach cleanup with `context.WithoutCancel(ctx)`
  + a 30s bounded timeout. Stop's error is now checked and on
  failure we skip RemoveVolume entirely (the orphan sweeper below
  catches what we deferred).

- New registry/orphan_sweeper.go: periodic reconcile pass (every 60s,
  plus an initial run on boot). Lists running ws-* containers via a
  Docker name filter, intersects that set with DB rows where
  status='removed', and stops + removes volumes for the leaks (toy
  sketch of the set logic after this list). Defence in depth: even a
  brand-new Stop failure mode heals on the next sweep instead of
  leaking forever.
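
At heart the reconcile step is a set intersection. A toy version of
just the set logic (illustrative only; the function name is not from
the patch, and the real sweeper pushes the prefix match into a single
LIKE ANY query, shown in orphan_sweeper.go below):

    // leakedIDs picks the 'removed' rows whose 12-char id prefix
    // still has a ws-* container behind it.
    func leakedIDs(runningPrefixes, removedIDs []string) []string {
        running := make(map[string]bool, len(runningPrefixes))
        for _, p := range runningPrefixes {
            running[p] = true
        }
        var leaks []string
        for _, id := range removedIDs {
            if len(id) >= 12 && running[id[:12]] {
                leaks = append(leaks, id)
            }
        }
        return leaks
    }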

Provisioner gains a tiny ListWorkspaceContainerIDPrefixes helper
that wraps ContainerList with the `name=ws-` filter; the sweeper
takes an OrphanReaper interface (matches the ContainerChecker
pattern in healthsweep.go) so unit tests don't need a real Docker
daemon.
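
A cheap guard worth pairing with that interface (not part of this
diff; the provisioner import path is an assumption) is a compile-time
assertion, so signature drift breaks the build rather than a sweep
cycle:

    package registry_test

    import (
        "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
        "github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
    )

    // Fails at compile time if *provisioner.Provisioner stops
    // satisfying registry.OrphanReaper.
    var _ registry.OrphanReaper = (*provisioner.Provisioner)(nil)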

main.go wires the sweeper alongside the existing liveness +
health-sweep + provisioning-timeout monitors, all under
supervised.RunWithRecover so a panic restarts the goroutine.

6 new sweeper tests cover the reconcile path, the
no-running-containers short-circuit, the daemon-error skip, the
Stop-failure-leaves-volume invariant (the same trap that motivated
this fix), the volume-remove-error-is-non-fatal continuation,
and the nil-reaper no-op.

Verified: full Go test suite passes; manually purged the 10 leaked
containers + their orphan volumes from the dev host with `docker
rm -f` + `docker volume rm` (one-off cleanup; the sweeper would
have caught them on the next cycle once deployed).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@@ -229,6 +229,18 @@ func main() {
 		})
 	}
+
+	// Orphan-container reconcile sweep — finds running containers
+	// whose workspace row is already status='removed' and stops
+	// them. Defence in depth on top of the inline cleanup in
+	// handlers/workspace_crud.go: any Docker hiccup that left a
+	// container alive after the user clicked delete heals on the
+	// next sweep instead of leaking forever.
+	if prov != nil {
+		go supervised.RunWithRecover(ctx, "orphan-sweeper", func(c context.Context) {
+			registry.StartOrphanSweeper(c, prov)
+		})
+	}
 	// Provision-timeout sweep — flips workspaces that have been stuck in
 	// status='provisioning' past the timeout window to 'failed' and emits
 	// WORKSPACE_PROVISION_TIMEOUT. Without this the UI banner is cosmetic

@@ -5,12 +5,14 @@ package handlers
 // Delete (cascade + purge), and input validation helpers.
 
 import (
+	"context"
 	"database/sql"
 	"fmt"
 	"log"
 	"net/http"
 	"path/filepath"
 	"strings"
+	"time"
 
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
@@ -388,25 +390,47 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) {
 	// Now stop containers + remove volumes for all descendants (any depth).
 	// Any concurrent heartbeat / registration / liveness-triggered restart
 	// will see status='removed' and bail out early.
-	for _, descID := range descendantIDs {
-		if h.provisioner != nil {
-			h.provisioner.Stop(ctx, descID)
-			if err := h.provisioner.RemoveVolume(ctx, descID); err != nil {
-				log.Printf("Delete descendant %s volume removal warning: %v", descID, err)
-			}
-		}
-		db.ClearWorkspaceKeys(ctx, descID)
+	//
+	// IMPORTANT: detach from the request ctx via WithoutCancel so that
+	// when the canvas's `api.del` resolves on our 200 (and gin cancels
+	// `c.Request.Context()`), in-flight Docker stop/remove calls don't
+	// get cancelled mid-operation. The previous shape leaked containers
+	// every time the canvas hung up promptly: Stop returned
+	// `context canceled`, the container stayed up, and the next
+	// RemoveVolume call failed with `volume in use`. The 30s bound is
+	// generous for Docker daemon round-trips (typical: <2s) and keeps
+	// a stuck daemon from holding a goroutine forever.
+	cleanupCtx, cleanupCancel := context.WithTimeout(
+		context.WithoutCancel(ctx), 30*time.Second)
+	defer cleanupCancel()
+	stopAndRemove := func(wsID string) {
+		if h.provisioner == nil {
+			return
+		}
+		// Check Stop's error before attempting RemoveVolume — the
+		// previous code discarded it and immediately tried the
+		// volume remove, which always fails with "volume in use"
+		// when Stop didn't actually kill the container. The orphan
+		// sweeper (registry/orphan_sweeper.go) catches what we
+		// skip here on the next reconcile pass.
+		if err := h.provisioner.Stop(cleanupCtx, wsID); err != nil {
+			log.Printf("Delete %s container stop failed: %v — leaving volume for orphan sweeper", wsID, err)
+			return
+		}
+		if err := h.provisioner.RemoveVolume(cleanupCtx, wsID); err != nil {
+			log.Printf("Delete %s volume removal warning: %v", wsID, err)
+		}
+	}
+	for _, descID := range descendantIDs {
+		stopAndRemove(descID)
+		db.ClearWorkspaceKeys(cleanupCtx, descID)
 		h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_REMOVED", descID, map[string]interface{}{})
 	}
-	// Stop + remove volume for the workspace itself
-	if h.provisioner != nil {
-		h.provisioner.Stop(ctx, id)
-		if err := h.provisioner.RemoveVolume(ctx, id); err != nil {
-			log.Printf("Delete %s volume removal warning: %v", id, err)
-		}
-	}
-	db.ClearWorkspaceKeys(ctx, id)
+	stopAndRemove(id)
+	db.ClearWorkspaceKeys(cleanupCtx, id)
 	h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_REMOVED", id, map[string]interface{}{
 		"cascade_deleted": len(descendantIDs),

@@ -16,6 +16,7 @@ import (
 	"time"
 
 	"github.com/docker/docker/api/types/container"
+	"github.com/docker/docker/api/types/filters"
 	dockerimage "github.com/docker/docker/api/types/image"
 	"github.com/docker/docker/api/types/network"
 	"github.com/docker/docker/api/types/volume"
@@ -133,6 +134,56 @@ func ContainerName(workspaceID string) string {
 	return fmt.Sprintf("ws-%s", id)
 }
+
+// containerNamePrefix is the shared prefix every workspace container
+// name carries (`ws-`). Used by ListWorkspaceContainerIDPrefixes for
+// the Docker name-filter, and by the orphan sweeper to recognise our
+// own containers vs. anything else on the host.
+const containerNamePrefix = "ws-"
+
+// ListWorkspaceContainerIDPrefixes returns the 12-char workspace ID
+// prefixes of every ws-* container the Docker daemon knows about,
+// running or stopped (see All below). The 12-char form matches
+// ContainerName's truncation, so the orphan sweeper can intersect
+// this set against `SELECT substring(id::text, 1, 12) FROM workspaces
+// WHERE status = 'removed'` without an extra round-trip per row.
+//
+// On any Docker error the caller gets (nil, err); the sweeper treats
+// that as "skip this round" — better than a partial scan that misses
+// leaks.
+func (p *Provisioner) ListWorkspaceContainerIDPrefixes(ctx context.Context) ([]string, error) {
+	if p == nil || p.cli == nil {
+		return nil, nil
+	}
+	containers, err := p.cli.ContainerList(ctx, container.ListOptions{
+		// All=true catches stopped-but-not-removed containers too —
+		// those still hold their volume references and would block
+		// RemoveVolume just like a running container would.
+		All:     true,
+		Filters: filters.NewArgs(filters.Arg("name", containerNamePrefix)),
+	})
+	if err != nil {
+		return nil, err
+	}
+	prefixes := make([]string, 0, len(containers))
+	for _, c := range containers {
+		// Container names from the API include a leading slash:
+		// "/ws-abc123def456". Strip both the slash and our prefix
+		// to recover the 12-char workspace ID.
+		for _, name := range c.Names {
+			n := strings.TrimPrefix(name, "/")
+			if !strings.HasPrefix(n, containerNamePrefix) {
+				continue
+			}
+			id := strings.TrimPrefix(n, containerNamePrefix)
+			if id == "" {
+				continue
+			}
+			prefixes = append(prefixes, id)
+			break // one name is enough; multiple aliases would dup
+		}
+	}
+	return prefixes, nil
+}
+
 // InternalURL returns the Docker-internal URL for a workspace container.
 func InternalURL(workspaceID string) string {
 	return fmt.Sprintf("http://%s:%s", ContainerName(workspaceID), DefaultPort)

@@ -0,0 +1,148 @@
+package registry
+
+// orphan_sweeper.go — periodic reconcile pass that cleans up Docker
+// containers whose corresponding workspace row in Postgres has
+// status='removed'. Defence in depth on top of the inline cleanup
+// in handlers/workspace_crud.go.
+//
+// Why this exists: the inline cleanup is one-shot — if Docker hiccups
+// (daemon restart, host load, transient API error), the container
+// silently stays alive while the DB row is already 'removed'. Without
+// a reconcile pass those leaks accumulate forever. With one, every
+// missed cleanup heals on the next sweep.
+//
+// Cost: O(running containers) per cycle, not O(historical removed
+// rows). The Docker name filter trims the candidate set to ws-* only
+// (typically the same handful as ContainerList without a filter on a
+// dev host); the DB lookup is one indexed query against the
+// idx_workspaces_status btree.
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
+	"github.com/lib/pq"
+)
+
+// OrphanReaper is the dependency the sweeper takes from the
+// provisioner. Extracted as an interface so the sweeper is
+// unit-testable without a real Docker daemon — matches the
+// ContainerChecker pattern in healthsweep.go.
+// *provisioner.Provisioner satisfies this naturally.
+type OrphanReaper interface {
+	ListWorkspaceContainerIDPrefixes(ctx context.Context) ([]string, error)
+	Stop(ctx context.Context, workspaceID string) error
+	RemoveVolume(ctx context.Context, workspaceID string) error
+}
+
+// OrphanSweepInterval is the cadence of the reconcile loop. 60s is
+// twice the 30s heartbeat cadence — a single missed cleanup surfaces
+// within ~90s end-to-end (canvas delete → next sweep tick → container
+// gone). Faster cycles would just pay Docker API cost for no UX win;
+// slower would let leaks linger long enough to compound CPU pressure
+// on dev hosts.
+const OrphanSweepInterval = 60 * time.Second
+
+// orphanSweepDeadline bounds a single sweep cycle. A daemon at the
+// edge of timing out shouldn't accumulate goroutines. 30s is generous
+// for a dev host with dozens of containers and a busy daemon.
+const orphanSweepDeadline = 30 * time.Second
+
+// StartOrphanSweeper runs the reconcile loop until ctx is cancelled.
+// A nil reaper makes the loop a no-op (matches handlers'
+// nil-provisioner-tolerant pattern — some test harnesses run without
+// Docker available).
+func StartOrphanSweeper(ctx context.Context, reaper OrphanReaper) {
+	if reaper == nil {
+		log.Println("Orphan sweeper: reaper is nil — sweeper disabled")
+		return
+	}
+	log.Printf("Orphan sweeper started — reconciling every %s", OrphanSweepInterval)
+	ticker := time.NewTicker(OrphanSweepInterval)
+	defer ticker.Stop()
+	// Run once immediately so a platform restart cleans up any
+	// containers leaked while we were down — don't make the user
+	// wait 60s for the first reconcile.
+	sweepOnce(ctx, reaper)
+	for {
+		select {
+		case <-ctx.Done():
+			log.Println("Orphan sweeper: shutdown")
+			return
+		case <-ticker.C:
+			sweepOnce(ctx, reaper)
+		}
+	}
+}
+
+func sweepOnce(parent context.Context, reaper OrphanReaper) {
+	ctx, cancel := context.WithTimeout(parent, orphanSweepDeadline)
+	defer cancel()
+	prefixes, err := reaper.ListWorkspaceContainerIDPrefixes(ctx)
+	if err != nil {
+		log.Printf("Orphan sweeper: ListWorkspaceContainerIDPrefixes failed: %v — skipping cycle", err)
+		return
+	}
+	if len(prefixes) == 0 {
+		return
+	}
+	// Resolve each prefix to a full workspace_id whose status is
+	// 'removed'. The platform's workspace IDs are full UUIDs but
+	// container names are truncated to 12 chars; collisions in the
+	// first 12 chars across active rows are statistically
+	// negligible, so each prefix resolves to at most one row in
+	// practice. Use a single IN-style query so the cost is one
+	// round-trip regardless of leak count.
+	//
+	// LIKE patterns built from prefixes go through pq.Array → no
+	// SQL injection vector (prefixes come from Docker, not user
+	// input, but defence in depth).
+	likes := make([]string, 0, len(prefixes))
+	for _, p := range prefixes {
+		likes = append(likes, p+"%")
+	}
+	rows, err := db.DB.QueryContext(ctx, `
+		SELECT id::text
+		FROM workspaces
+		WHERE status = 'removed'
+		  AND id::text LIKE ANY($1::text[])
+	`, pq.Array(likes))
+	if err != nil {
+		log.Printf("Orphan sweeper: DB query failed: %v — skipping cycle", err)
+		return
+	}
+	defer rows.Close()
+	var orphanIDs []string
+	for rows.Next() {
+		var id string
+		if scanErr := rows.Scan(&id); scanErr != nil {
+			log.Printf("Orphan sweeper: row scan failed: %v", scanErr)
+			continue
+		}
+		orphanIDs = append(orphanIDs, id)
+	}
+	if err := rows.Err(); err != nil {
+		log.Printf("Orphan sweeper: rows iteration failed: %v", err)
+		return
+	}
+	for _, id := range orphanIDs {
+		log.Printf("Orphan sweeper: stopping leaked container for removed workspace %s", id)
+		if stopErr := reaper.Stop(ctx, id); stopErr != nil {
+			// Stop() currently returns nil even when the container
+			// is already gone, but a future change could surface
+			// real errors. Keep the volume around for the next
+			// sweep so we don't fall into the same
+			// Stop-failed-then-volume-in-use trap that motivated
+			// this sweeper.
+			log.Printf("Orphan sweeper: Stop failed for %s: %v — leaving volume", id, stopErr)
+			continue
+		}
+		if rmErr := reaper.RemoveVolume(ctx, id); rmErr != nil {
+			log.Printf("Orphan sweeper: RemoveVolume warning for %s: %v", id, rmErr)
+		}
+	}
+}

@@ -0,0 +1,194 @@
+package registry
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/DATA-DOG/go-sqlmock"
+)
+
+// fakeReaper is a hand-rolled OrphanReaper for the sweeper tests.
+// Records every Stop / RemoveVolume call so tests can assert which
+// workspace IDs got reconciled.
+type fakeReaper struct {
+	mu             sync.Mutex
+	listResponse   []string
+	listErr        error
+	stopErr        map[string]error
+	removeVolErr   map[string]error
+	stopCalls      []string
+	removeVolCalls []string
+}
+
+func (f *fakeReaper) ListWorkspaceContainerIDPrefixes(_ context.Context) ([]string, error) {
+	if f.listErr != nil {
+		return nil, f.listErr
+	}
+	return f.listResponse, nil
+}
+
+func (f *fakeReaper) Stop(_ context.Context, wsID string) error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.stopCalls = append(f.stopCalls, wsID)
+	return f.stopErr[wsID]
+}
+
+func (f *fakeReaper) RemoveVolume(_ context.Context, wsID string) error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.removeVolCalls = append(f.removeVolCalls, wsID)
+	return f.removeVolErr[wsID]
+}
+
+// TestSweepOnce_ReconcilesRunningRemovedRows — the core reconcile
+// behavior: a container running for a workspace whose DB row is
+// 'removed' gets stopped + volume removed.
+func TestSweepOnce_ReconcilesRunningRemovedRows(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+	// Docker reports two ws-* containers; one's row is 'removed'
+	// (the leak), the other's is 'online' (the WHERE clause rightly
+	// excludes it and we should NOT reap it).
+	reaper := &fakeReaper{
+		listResponse: []string{"abc123def456", "xyz789ghi012"},
+	}
+	// The query asks for status='removed' rows whose id matches the
+	// LIKE patterns built from the running container prefixes. Mock
+	// returns only the leaked one as a UUID-shaped full id.
+	mock.ExpectQuery(`SELECT id::text\s+FROM workspaces`).
+		WillReturnRows(sqlmock.NewRows([]string{"id"}).
+			AddRow("abc123def456-0000-0000-0000-000000000000"))
+	sweepOnce(context.Background(), reaper)
+	if len(reaper.stopCalls) != 1 || reaper.stopCalls[0] != "abc123def456-0000-0000-0000-000000000000" {
+		t.Errorf("Stop calls = %v, want exactly the leaked id", reaper.stopCalls)
+	}
+	if len(reaper.removeVolCalls) != 1 || reaper.removeVolCalls[0] != "abc123def456-0000-0000-0000-000000000000" {
+		t.Errorf("RemoveVolume calls = %v, want exactly the leaked id", reaper.removeVolCalls)
+	}
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("unmet sqlmock expectations: %v", err)
+	}
+}
+
+// TestSweepOnce_NoRunningContainers — Docker returns nothing, sweeper
+// short-circuits without a DB query (no leak possible if no
+// containers exist).
+func TestSweepOnce_NoRunningContainers(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+	reaper := &fakeReaper{listResponse: nil}
+	// No DB query expected — if sweepOnce makes one anyway, the
+	// sqlmock will fail with "unexpected query".
+	sweepOnce(context.Background(), reaper)
+	if len(reaper.stopCalls) != 0 {
+		t.Errorf("Stop should not fire when no containers exist; got %v", reaper.stopCalls)
+	}
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("unmet sqlmock expectations: %v", err)
+	}
+}
+
+// TestSweepOnce_DockerListErrorSkipsCycle — a Docker daemon hiccup
+// must not cascade into a DB query (otherwise we'd reap based on
+// stale information). Skip the cycle, retry next tick.
+func TestSweepOnce_DockerListErrorSkipsCycle(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+	reaper := &fakeReaper{listErr: errors.New("daemon unreachable")}
+	sweepOnce(context.Background(), reaper)
+	if len(reaper.stopCalls) != 0 {
+		t.Errorf("Stop must not fire when Docker list failed; got %v", reaper.stopCalls)
+	}
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("unmet sqlmock expectations: %v", err)
+	}
+}
+
+// TestSweepOnce_StopFailureLeavesVolume — if Stop fails, RemoveVolume
+// MUST NOT fire. This is the same trap that motivated the sweeper:
+// removing a volume held by a still-running container always errors
+// with "volume in use", and we'd accumulate noise in the log without
+// actually fixing anything. Leave the volume for the next sweep
+// (which will retry Stop).
+func TestSweepOnce_StopFailureLeavesVolume(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+	reaper := &fakeReaper{
+		listResponse: []string{"abc123def456"},
+		stopErr: map[string]error{
+			"abc123def456-0000-0000-0000-000000000000": errors.New("docker daemon timeout"),
+		},
+	}
+	mock.ExpectQuery(`SELECT id::text\s+FROM workspaces`).
+		WillReturnRows(sqlmock.NewRows([]string{"id"}).
+			AddRow("abc123def456-0000-0000-0000-000000000000"))
+	sweepOnce(context.Background(), reaper)
+	if len(reaper.stopCalls) != 1 {
+		t.Errorf("Stop should have been attempted exactly once, got %v", reaper.stopCalls)
+	}
+	if len(reaper.removeVolCalls) != 0 {
+		t.Errorf("RemoveVolume must not fire when Stop failed; got %v", reaper.removeVolCalls)
+	}
+}
+
+// TestSweepOnce_VolumeRemoveErrorIsNonFatal — RemoveVolume failures
+// are logged but don't prevent processing other orphans in the same
+// cycle. Belt + braces against a transient daemon issue mid-loop.
+func TestSweepOnce_VolumeRemoveErrorIsNonFatal(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+	reaper := &fakeReaper{
+		listResponse: []string{"aaa111bbb222", "ccc333ddd444"},
+		removeVolErr: map[string]error{
+			"aaa111bbb222-0000-0000-0000-000000000000": errors.New("volume not found"),
+		},
+	}
+	mock.ExpectQuery(`SELECT id::text\s+FROM workspaces`).
+		WillReturnRows(sqlmock.NewRows([]string{"id"}).
+			AddRow("aaa111bbb222-0000-0000-0000-000000000000").
+			AddRow("ccc333ddd444-0000-0000-0000-000000000000"))
+	sweepOnce(context.Background(), reaper)
+	if len(reaper.stopCalls) != 2 {
+		t.Errorf("both orphans should have been Stopped; got %v", reaper.stopCalls)
+	}
+	if len(reaper.removeVolCalls) != 2 {
+		t.Errorf("both orphans should have had RemoveVolume attempted; got %v", reaper.removeVolCalls)
+	}
+}
+
+// TestStartOrphanSweeper_NilReaperIsNoOp — tolerance for the
+// nil-provisioner path used by some test harnesses.
+func TestStartOrphanSweeper_NilReaperIsNoOp(t *testing.T) {
+	// Should return immediately without panicking. Wrap in a goroutine
+	// + done-channel so we can assert it didn't block.
+	done := make(chan struct{})
+	go func() {
+		StartOrphanSweeper(context.Background(), nil)
+		close(done)
+	}()
+	select {
+	case <-done:
+		// expected
+	case <-time.After(500 * time.Millisecond):
+		t.Fatal("StartOrphanSweeper(nil) blocked instead of returning immediately")
+	}
+}