forked from molecule-ai/molecule-core
Compare commits
No commits in common. "refactor/delete-uses-cascade-helper" and "main" have entirely different histories.
refactor/d
...
main
@ -26,14 +26,6 @@ func TestExtended_WorkspaceDelete(t *testing.T) {
|
||||
WithArgs(wsDelID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}))
|
||||
|
||||
// CascadeDelete walks descendants unconditionally (the 0-children
|
||||
// optimization in the old inline path was dropped during the
|
||||
// CascadeDelete extraction — descendant CTE returns 0 rows here,
|
||||
// same end state, one extra cheap query).
|
||||
mock.ExpectQuery("WITH RECURSIVE descendants").
|
||||
WithArgs(wsDelID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}))
|
||||
|
||||
// #73: batch UPDATE happens BEFORE any container teardown.
|
||||
// Uses ANY($1::uuid[]) even with a single ID for consistency.
|
||||
mock.ExpectExec("UPDATE workspaces SET status =").
|
||||
|
||||
@ -787,14 +787,14 @@ func (h *OrgHandler) Import(c *gin.Context) {
|
||||
rows.Close()
|
||||
|
||||
for _, oid := range orphanIDs {
|
||||
descendantIDs, stopErrs, err := h.workspace.CascadeDelete(ctx, oid)
|
||||
cascadeCount, stopErrs, err := h.workspace.CascadeDelete(ctx, oid)
|
||||
if err != nil {
|
||||
log.Printf("Org import reconcile: CascadeDelete(%s) failed: %v", oid, err)
|
||||
reconcileErrs = append(reconcileErrs, fmt.Sprintf("delete %s: %v", oid, err))
|
||||
reconcileSkipped++
|
||||
continue
|
||||
}
|
||||
reconcileRemovedCount += 1 + len(descendantIDs)
|
||||
reconcileRemovedCount += 1 + cascadeCount
|
||||
if len(stopErrs) > 0 {
|
||||
log.Printf("Org import reconcile: %s had %d stop errors (orphan sweeper will retry)", oid, len(stopErrs))
|
||||
}
|
||||
|
||||
@ -323,19 +323,161 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Delegate the cascade to CascadeDelete so the HTTP path and the
|
||||
// OrgImport reconcile path share one teardown sequence (#73 race
|
||||
// guard, container stop, volume removal, token revocation, schedule
|
||||
// disable, broadcast). The HTTP-specific bits — direct-children 409
|
||||
// gate above, ?purge=true hard-delete below, response shaping —
|
||||
// stay in this handler.
|
||||
descendantIDs, stopErrs, err := h.CascadeDelete(ctx, id)
|
||||
if err != nil {
|
||||
log.Printf("Delete: CascadeDelete(%s) failed: %v", id, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
// Cascade delete: collect ALL descendants (not just direct children) via
|
||||
// recursive CTE, then stop each container and remove each volume.
|
||||
// Previous bug: only direct children's containers were stopped, leaving
|
||||
// grandchildren as orphan running containers after a cascade delete.
|
||||
descendantIDs := []string{}
|
||||
if len(children) > 0 {
|
||||
descRows, err := db.DB.QueryContext(ctx, `
|
||||
WITH RECURSIVE descendants AS (
|
||||
SELECT id FROM workspaces WHERE parent_id = $1 AND status != 'removed'
|
||||
UNION ALL
|
||||
SELECT w.id FROM workspaces w JOIN descendants d ON w.parent_id = d.id WHERE w.status != 'removed'
|
||||
)
|
||||
SELECT id FROM descendants
|
||||
`, id)
|
||||
if err != nil {
|
||||
log.Printf("Delete: descendant query error for %s: %v", id, err)
|
||||
} else {
|
||||
for descRows.Next() {
|
||||
var descID string
|
||||
if descRows.Scan(&descID) == nil {
|
||||
descendantIDs = append(descendantIDs, descID)
|
||||
}
|
||||
}
|
||||
descRows.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// #73 fix: mark rows 'removed' in the DB FIRST, BEFORE stopping containers
|
||||
// or removing volumes. Previously the sequence was stop → update-status,
|
||||
// which left a gap where:
|
||||
// - the container's last pre-teardown heartbeat could resurrect the row
|
||||
// via the register-handler UPSERT (now also guarded in #73)
|
||||
// - the liveness monitor could observe 'online' status + expired Redis
|
||||
// TTL and trigger RestartByID, recreating a container we're trying
|
||||
// to destroy
|
||||
// Marking 'removed' first makes both of those paths no-op via their
|
||||
// existing `status NOT IN ('removed', ...)` guards.
|
||||
allIDs := append([]string{id}, descendantIDs...)
|
||||
if _, err := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET status = $1, updated_at = now() WHERE id = ANY($2::uuid[])`,
|
||||
models.StatusRemoved, pq.Array(allIDs)); err != nil {
|
||||
log.Printf("Delete status update error for %s: %v", id, err)
|
||||
}
|
||||
if _, err := db.DB.ExecContext(ctx,
|
||||
`DELETE FROM canvas_layouts WHERE workspace_id = ANY($1::uuid[])`,
|
||||
pq.Array(allIDs)); err != nil {
|
||||
log.Printf("Delete canvas_layouts error for %s: %v", id, err)
|
||||
}
|
||||
// Revoke all auth tokens for the deleted workspaces. Once the workspace is
|
||||
// gone its tokens are meaningless; leaving them alive would keep
|
||||
// HasAnyLiveTokenGlobal = true even after the platform is otherwise empty,
|
||||
// which prevents AdminAuth from returning to fail-open and breaks the E2E
|
||||
// test's count-zero assertion (and local re-run cleanup).
|
||||
if _, err := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspace_auth_tokens SET revoked_at = now()
|
||||
WHERE workspace_id = ANY($1::uuid[]) AND revoked_at IS NULL`,
|
||||
pq.Array(allIDs)); err != nil {
|
||||
log.Printf("Delete token revocation error for %s: %v", id, err)
|
||||
}
|
||||
// #1027: cascade-disable all schedules for the deleted workspaces so
|
||||
// the scheduler never fires a cron into a removed container.
|
||||
if _, err := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspace_schedules SET enabled = false, updated_at = now()
|
||||
WHERE workspace_id = ANY($1::uuid[]) AND enabled = true`,
|
||||
pq.Array(allIDs)); err != nil {
|
||||
log.Printf("Delete schedule disable error for %s: %v", id, err)
|
||||
}
|
||||
|
||||
// Now stop containers + remove volumes for all descendants (any depth).
|
||||
// Any concurrent heartbeat / registration / liveness-triggered restart
|
||||
// will see status='removed' and bail out early.
|
||||
//
|
||||
// Combines two concerns:
|
||||
//
|
||||
// 1. Detach cleanup from the request ctx via WithoutCancel + a 30s
|
||||
// timeout, so when the canvas's `api.del` resolves on our 200
|
||||
// (and gin cancels c.Request.Context()), in-flight Docker
|
||||
// stop/remove calls don't get cancelled mid-operation. The
|
||||
// previous shape leaked containers every time the canvas hung
|
||||
// up promptly: Stop returned "context canceled", the container
|
||||
// stayed up, and the next RemoveVolume failed with
|
||||
// "volume in use". 30s is generous for Docker daemon round-
|
||||
// trips (typical: <2s) and bounds a stuck daemon.
|
||||
//
|
||||
// 2. #1843: aggregate Stop() failures into stopErrs so the
|
||||
// post-deletion block surfaces them as 500. On the CP/EC2
|
||||
// backend, Stop() calls control plane's DELETE endpoint to
|
||||
// terminate the EC2; if that errors (transient 5xx, network),
|
||||
// the EC2 stays running with no DB row to track it (the
|
||||
// "orphan EC2 on a 0-customer account" scenario). Loud-fail
|
||||
// instead of silent-leak — clients retry, Stop's instance_id
|
||||
// lookup is idempotent against status='removed'. RemoveVolume
|
||||
// errors stay log-and-continue (local cleanup, not infra-leak).
|
||||
cleanupCtx, cleanupCancel := context.WithTimeout(
|
||||
context.WithoutCancel(ctx), 30*time.Second)
|
||||
defer cleanupCancel()
|
||||
|
||||
var stopErrs []error
|
||||
stopAndRemove := func(wsID string) {
|
||||
// Stop the workload first via the backend dispatcher (CP for
|
||||
// SaaS, Docker for self-hosted). Pre-2026-05-05 this gate was
|
||||
// `if h.provisioner == nil { return }` — early-returning on
|
||||
// every SaaS tenant left the EC2 running with no DB row to
|
||||
// track it (issue #2814; the comment below claimed "loud-fail
|
||||
// instead of silent-leak" but the early-return made it the
|
||||
// silent path on SaaS).
|
||||
//
|
||||
// Check Stop's error before any volume cleanup — the previous
|
||||
// code discarded it and immediately tried RemoveVolume, which
|
||||
// always fails with "volume in use" when Stop didn't actually
|
||||
// kill the container. The orphan sweeper
|
||||
// (registry/orphan_sweeper.go) catches what we skip here on
|
||||
// the next reconcile pass.
|
||||
if err := h.StopWorkspaceAuto(cleanupCtx, wsID); err != nil {
|
||||
log.Printf("Delete %s stop failed: %v — leaving cleanup for orphan sweeper", wsID, err)
|
||||
stopErrs = append(stopErrs, fmt.Errorf("stop %s: %w", wsID, err))
|
||||
return
|
||||
}
|
||||
// Volume cleanup is Docker-only — CP-managed workspaces have
|
||||
// no host-bind volumes to remove. Skip silently when no Docker
|
||||
// provisioner is wired (the SaaS path already terminated the
|
||||
// EC2 above; nothing left to do).
|
||||
if h.provisioner != nil {
|
||||
if err := h.provisioner.RemoveVolume(cleanupCtx, wsID); err != nil {
|
||||
log.Printf("Delete %s volume removal warning: %v", wsID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, descID := range descendantIDs {
|
||||
stopAndRemove(descID)
|
||||
db.ClearWorkspaceKeys(cleanupCtx, descID)
|
||||
// #2269: drop the per-workspace restartState entry so it
|
||||
// doesn't accumulate across the platform's lifetime. The
|
||||
// LoadOrStore that creates the entry (workspace_restart.go)
|
||||
// has no companion remove path; without this Delete, every
|
||||
// short-lived workspace leaks ~16 bytes forever.
|
||||
restartStates.Delete(descID)
|
||||
// Detach broadcaster ctx for the same reason as the cleanup
|
||||
// above — RecordAndBroadcast does an INSERT INTO
|
||||
// structure_events + Redis Publish. If the canvas hangs up,
|
||||
// a request-ctx-bound INSERT can be cancelled mid-write,
|
||||
// leaving other WS clients ignorant of the cascade. The DB
|
||||
// row is already 'removed' so it's recoverable, but the
|
||||
// inconsistency is avoidable.
|
||||
h.broadcaster.RecordAndBroadcast(cleanupCtx, string(events.EventWorkspaceRemoved), descID, map[string]interface{}{})
|
||||
}
|
||||
|
||||
stopAndRemove(id)
|
||||
db.ClearWorkspaceKeys(cleanupCtx, id)
|
||||
restartStates.Delete(id) // #2269: same as descendants above
|
||||
|
||||
h.broadcaster.RecordAndBroadcast(cleanupCtx, string(events.EventWorkspaceRemoved), id, map[string]interface{}{
|
||||
"cascade_deleted": len(descendantIDs),
|
||||
})
|
||||
|
||||
// If any Stop call failed, surface 500 so the client retries. The DB
|
||||
// row is already 'removed' (idempotent), and Stop's instance_id
|
||||
@ -407,17 +549,16 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) {
|
||||
// remove volumes, revoke tokens, disable schedules, broadcast events.
|
||||
//
|
||||
// Idempotent against already-removed rows (the descendant CTE and all UPDATE
|
||||
// guards skip status='removed'). Returns the descendant id list so the HTTP
|
||||
// caller can drive the optional `?purge=true` hard-delete path against the
|
||||
// same set the cascade just touched, plus any per-workspace stop errors so
|
||||
// callers can surface a retryable failure instead of a silent-leak.
|
||||
// guards skip status='removed'). Returns the number of cascaded descendants
|
||||
// (not including id itself) and any per-workspace stop errors so callers can
|
||||
// surface a retryable failure instead of a silent-leak.
|
||||
//
|
||||
// Caller is responsible for the children-confirmation gate (the HTTP handler
|
||||
// returns 409 when children exist + ?confirm=true is missing); this helper
|
||||
// always cascades.
|
||||
func (h *WorkspaceHandler) CascadeDelete(ctx context.Context, id string) ([]string, []error, error) {
|
||||
func (h *WorkspaceHandler) CascadeDelete(ctx context.Context, id string) (int, []error, error) {
|
||||
if err := validateWorkspaceID(id); err != nil {
|
||||
return nil, nil, err
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
descendantIDs := []string{}
|
||||
@ -430,7 +571,7 @@ func (h *WorkspaceHandler) CascadeDelete(ctx context.Context, id string) ([]stri
|
||||
SELECT id FROM descendants
|
||||
`, id)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("descendant query: %w", err)
|
||||
return 0, nil, fmt.Errorf("descendant query: %w", err)
|
||||
}
|
||||
for descRows.Next() {
|
||||
var descID string
|
||||
@ -496,7 +637,7 @@ func (h *WorkspaceHandler) CascadeDelete(ctx context.Context, id string) ([]stri
|
||||
"cascade_deleted": len(descendantIDs),
|
||||
})
|
||||
|
||||
return descendantIDs, stopErrs, nil
|
||||
return len(descendantIDs), stopErrs, nil
|
||||
}
|
||||
|
||||
// validateWorkspaceID returns an error when id is not a valid UUID.
|
||||
|
||||
@ -813,12 +813,6 @@ func TestWorkspaceDelete_DisablesSchedules(t *testing.T) {
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}))
|
||||
|
||||
// CascadeDelete walks descendants unconditionally — 0-children case
|
||||
// returns 0 rows here.
|
||||
mock.ExpectQuery("WITH RECURSIVE descendants").
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}))
|
||||
|
||||
// Mark workspace as removed
|
||||
mock.ExpectExec("UPDATE workspaces SET status =").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
@ -941,12 +935,6 @@ func TestWorkspaceDelete_ScheduleDisableOnlyTargetsDeletedWorkspace(t *testing.T
|
||||
WithArgs(wsA).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}))
|
||||
|
||||
// CascadeDelete walks descendants unconditionally — 0-children case
|
||||
// returns 0 rows here.
|
||||
mock.ExpectQuery("WITH RECURSIVE descendants").
|
||||
WithArgs(wsA).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}))
|
||||
|
||||
// Mark only workspace A as removed
|
||||
mock.ExpectExec("UPDATE workspaces SET status =").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user