From 3de51faa1925d53c84b6e4b942b150a3fe490796 Mon Sep 17 00:00:00 2001 From: claude-ceo-assistant Date: Fri, 8 May 2026 15:04:47 -0700 Subject: [PATCH] fix(org-import): reconcile mode + audit-event emission MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the additive-import zombie bug — re-running /org/import with a tree shape that reparents same-named roles left the prior workspace online because lookupExistingChild's dedupe is parent-scoped (different parent_id → "different" workspace). Caught 2026-05-08 after a dev-tree re-import left 8 orphans co-existing with the new tree on canvas until manual cascade-delete. Three layers in this PR: - mode="reconcile" on /org/import — after the import loop, online workspaces whose name matches an imported name but whose id isn't in the result set are cascade-deleted. Default mode "" / "merge" preserves existing additive behavior. Empty-set guards prevent accidental "delete everything" if either array comes up empty. - WorkspaceHandler.CascadeDelete extracted as a callable helper from the existing Delete HTTP handler so OrgImport's reconcile path shares the same teardown sequence (#73 race guard, container stop, volume removal, token revocation, schedule disable, event broadcast). The HTTP Delete handler still inlines the same logic; deduplication tracked as tech-debt follow-up. - emitOrgEvent(structure_events) records org.import.started + org.import.completed with mode, created/skipped/reconcile_removed counts, duration_ms, error. Replaces the lost-on-restart stdout-only log shape for an audit-trail surface that's queryable by SQL. Closes the "what happened at 20:13?" debugging gap that motivated this fix. Verified live against the local platform: cascade-delete on an old tree's removed root cleared 8 surviving orphans; mode="reconcile" with a freshly-INSERTed fake orphan removed exactly the fake; idempotent re-run of reconcile is a no-op (0 removed, no errors); structure_events captures every started+completed pair with full payload. 7 new unit tests (walkOrgWorkspaceNames flat/nested/spawning:false/ empty-name; emitOrgEvent success + DB-error-swallow; errString). Full handler suite green. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace-server/internal/handlers/org.go | 177 +++++++++++++++++- .../handlers/org_import_reconcile_test.go | 158 ++++++++++++++++ .../internal/handlers/workspace_crud.go | 97 ++++++++++ 3 files changed, 431 insertions(+), 1 deletion(-) create mode 100644 workspace-server/internal/handlers/org_import_reconcile_test.go diff --git a/workspace-server/internal/handlers/org.go b/workspace-server/internal/handlers/org.go index 3f360faf..fe86c1e1 100644 --- a/workspace-server/internal/handlers/org.go +++ b/workspace-server/internal/handlers/org.go @@ -13,12 +13,15 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/channels" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner" "github.com/gin-gonic/gin" + "github.com/lib/pq" "gopkg.in/yaml.v3" ) @@ -568,11 +571,30 @@ func (h *OrgHandler) Import(c *gin.Context) { var body struct { Dir string `json:"dir"` // org template directory name Template OrgTemplate `json:"template"` // or inline template + // Mode controls cleanup behavior of pre-existing workspaces: + // "" / "merge" — additive (default; current behavior). + // Existing workspaces matched by + // (parent_id, name) are skipped; nothing + // outside the new tree is touched. + // "reconcile" — additive + cleanup. After import, any + // online workspace whose name matches an + // imported workspace's name but whose id + // isn't in the import result set is + // cascade-deleted. Catches "previous + // import survived a re-import" zombies + // (the 20:13→21:17 dev-tree case). + Mode string `json:"mode"` } if err := c.ShouldBindJSON(&body); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) return } + importStart := time.Now() + emitOrgEvent(c.Request.Context(), "org.import.started", map[string]any{ + "name": body.Template.Name, + "dir": body.Dir, + "mode": body.Mode, + }) var tmpl OrgTemplate var orgBaseDir string // base directory for files_dir resolution @@ -718,18 +740,171 @@ func (h *OrgHandler) Import(c *gin.Context) { } } + // Reconcile mode: prune workspaces present from a previous import that + // share a name with the new tree but are NOT in the new result set. + // Catches the additive-import bug where re-running /org/import with a + // changed tree shape (different parent_id for the same role name) leaves + // the prior workspace online — visible to the canvas, consuming + // containers, and looking like a duplicate. Default mode "" / "merge" + // preserves the old additive behavior. + reconcileRemovedCount := 0 + reconcileSkipped := 0 + reconcileErrs := []string{} + if body.Mode == "reconcile" && createErr == nil { + ctx := c.Request.Context() + importedNames := []string{} + walkOrgWorkspaceNames(tmpl.Workspaces, &importedNames) + + importedIDs := make([]string, 0, len(results)) + for _, r := range results { + if id, ok := r["id"].(string); ok && id != "" { + importedIDs = append(importedIDs, id) + } + } + + // Empty-set guards: if the import didn't produce any names or any + // IDs, skip — querying with empty arrays would either match + // nothing (harmless) or, worse, match every workspace if a future + // query rewrite drops the IN clause. Belt-and-suspenders. + if len(importedNames) > 0 && len(importedIDs) > 0 { + rows, err := db.DB.QueryContext(ctx, ` + SELECT id FROM workspaces + WHERE name = ANY($1::text[]) + AND id != ALL($2::uuid[]) + AND status != 'removed' + `, pq.Array(importedNames), pq.Array(importedIDs)) + if err != nil { + log.Printf("Org import reconcile: orphan query failed: %v", err) + reconcileErrs = append(reconcileErrs, fmt.Sprintf("orphan query: %v", err)) + } else { + orphanIDs := []string{} + for rows.Next() { + var orphanID string + if rows.Scan(&orphanID) == nil { + orphanIDs = append(orphanIDs, orphanID) + } + } + rows.Close() + + for _, oid := range orphanIDs { + cascadeCount, stopErrs, err := h.workspace.CascadeDelete(ctx, oid) + if err != nil { + log.Printf("Org import reconcile: CascadeDelete(%s) failed: %v", oid, err) + reconcileErrs = append(reconcileErrs, fmt.Sprintf("delete %s: %v", oid, err)) + reconcileSkipped++ + continue + } + reconcileRemovedCount += 1 + cascadeCount + if len(stopErrs) > 0 { + log.Printf("Org import reconcile: %s had %d stop errors (orphan sweeper will retry)", oid, len(stopErrs)) + } + } + log.Printf("Org import reconcile: %d orphans removed (%d cascade descendants), %d skipped", len(orphanIDs), reconcileRemovedCount-len(orphanIDs), reconcileSkipped) + } + } + } + status := http.StatusCreated resp := gin.H{ "org": tmpl.Name, "workspaces": results, "count": len(results), } + if body.Mode == "reconcile" { + resp["mode"] = "reconcile" + resp["reconcile_removed_count"] = reconcileRemovedCount + if len(reconcileErrs) > 0 { + resp["reconcile_errors"] = reconcileErrs + } + } if createErr != nil { status = http.StatusMultiStatus resp["error"] = createErr.Error() } - log.Printf("Org import: %s — %d workspaces created", tmpl.Name, len(results)) + // results contains both freshly-created AND lookupExistingChild skips + // (entries with "skipped":true). Splitting the count here so the audit + // row reflects "what changed" vs "what was already there" — telemetry + // readers shouldn't need to grep stdout to tell an idempotent re-run + // apart from a fresh-create. + createdCount, skippedCount := 0, 0 + for _, r := range results { + if skipped, _ := r["skipped"].(bool); skipped { + skippedCount++ + } else { + createdCount++ + } + } + log.Printf("Org import: %s — %d created, %d skipped, %d reconciled", + tmpl.Name, createdCount, skippedCount, reconcileRemovedCount) + emitOrgEvent(c.Request.Context(), "org.import.completed", map[string]any{ + "name": tmpl.Name, + "dir": body.Dir, + "mode": body.Mode, + "created_count": createdCount, + "skipped_count": skippedCount, + "reconcile_removed_count": reconcileRemovedCount, + "reconcile_errors": len(reconcileErrs), + "duration_ms": time.Since(importStart).Milliseconds(), + "create_error": errString(createErr), + }) c.JSON(status, resp) } +// walkOrgWorkspaceNames collects every Name in the tree (in any order) into +// names. Used by reconcile to detect orphan workspaces — workspaces with the +// same role name as a freshly-imported one but a different id, surviving from +// a prior import. +func walkOrgWorkspaceNames(workspaces []OrgWorkspace, names *[]string) { + for _, w := range workspaces { + // spawning:false subtrees are still part of the imported tree + // from a logical-tree perspective — DON'T skip the recursion, + // or reconcile would orphan the rest of the subtree on every + // re-import where spawning is toggled. Names of skipped + // workspaces remain registered so reconcile won't double-create + // them when spawning flips back to true. + if w.Name != "" { + *names = append(*names, w.Name) + } + walkOrgWorkspaceNames(w.Children, names) + } +} + +// emitOrgEvent records an org-lifecycle event in structure_events so the +// import history is queryable independent of stdout log retention. Errors +// are logged and swallowed — never block the request path on telemetry. +// +// Event-type taxonomy (extend by appending; never rename): +// +// org.import.started — handler entered, request body parsed +// org.import.completed — handler exiting (success or partial) +// org.import.failed — handler exiting with an unrecoverable error +// +// payload fields are documented at each call site. +func emitOrgEvent(ctx context.Context, eventType string, payload map[string]any) { + if payload == nil { + payload = map[string]any{} + } + payloadJSON, err := json.Marshal(payload) + if err != nil { + log.Printf("emitOrgEvent: marshal %s payload failed: %v", eventType, err) + return + } + if _, err := db.DB.ExecContext(ctx, ` + INSERT INTO structure_events (event_type, payload, created_at) + VALUES ($1, $2, now()) + `, eventType, payloadJSON); err != nil { + log.Printf("emitOrgEvent: insert %s failed: %v", eventType, err) + } +} + +// errString returns "" for a nil error, err.Error() otherwise. Lets us put +// nullable error strings in event payloads without checking for nil at every +// call site. +func errString(err error) string { + if err == nil { + return "" + } + return err.Error() +} + diff --git a/workspace-server/internal/handlers/org_import_reconcile_test.go b/workspace-server/internal/handlers/org_import_reconcile_test.go new file mode 100644 index 00000000..acb4ec5c --- /dev/null +++ b/workspace-server/internal/handlers/org_import_reconcile_test.go @@ -0,0 +1,158 @@ +package handlers + +import ( + "context" + "sort" + "testing" + + "github.com/DATA-DOG/go-sqlmock" +) + +// Tests for the reconcile-mode + audit-event additions to OrgHandler.Import. +// +// Background: /org/import was purely additive — re-running with a tree that +// renamed/reparented a role left the prior workspace online (different +// parent_id from the new one, so lookupExistingChild's parent-scoped dedupe +// missed it). The 2026-05-08 dev-tree case left 8 orphans surviving a +// re-import. mode="reconcile" closes the gap; emitOrgEvent makes "what +// happened at 20:13?" queryable instead of stdout-grep archaeology. + +func TestWalkOrgWorkspaceNames_FlatTree(t *testing.T) { + tree := []OrgWorkspace{ + {Name: "Dev Lead"}, + {Name: "Release Manager"}, + } + var names []string + walkOrgWorkspaceNames(tree, &names) + sort.Strings(names) + want := []string{"Dev Lead", "Release Manager"} + if !equalStrings(names, want) { + t.Errorf("flat tree: got %v, want %v", names, want) + } +} + +func TestWalkOrgWorkspaceNames_NestedTree(t *testing.T) { + tree := []OrgWorkspace{ + { + Name: "Dev Lead", + Children: []OrgWorkspace{ + {Name: "Core Platform Lead", Children: []OrgWorkspace{{Name: "Core-BE"}}}, + {Name: "SDK Lead"}, + }, + }, + } + var names []string + walkOrgWorkspaceNames(tree, &names) + sort.Strings(names) + want := []string{"Core Platform Lead", "Core-BE", "Dev Lead", "SDK Lead"} + if !equalStrings(names, want) { + t.Errorf("nested tree: got %v, want %v", names, want) + } +} + +// Pins the contract that spawning:false subtrees still contribute their names +// to the reconcile working set. If the walker started skipping them, a +// re-import that toggled spawning would orphan whichever workspaces had been +// previously imported with spawning:true — the inverse of the bug being +// fixed. Spawning gates *provisioning*, not *reconcile membership*. +func TestWalkOrgWorkspaceNames_SpawningFalseStillCounted(t *testing.T) { + f := false + tree := []OrgWorkspace{ + {Name: "Dev Lead", Children: []OrgWorkspace{ + {Name: "Skipped Lead", Spawning: &f, Children: []OrgWorkspace{ + {Name: "Skipped Child"}, + }}, + }}, + } + var names []string + walkOrgWorkspaceNames(tree, &names) + sort.Strings(names) + want := []string{"Dev Lead", "Skipped Child", "Skipped Lead"} + if !equalStrings(names, want) { + t.Errorf("spawning:false subtree: got %v, want %v", names, want) + } +} + +func TestWalkOrgWorkspaceNames_EmptyNamesSkipped(t *testing.T) { + tree := []OrgWorkspace{ + {Name: "Dev Lead"}, + {Name: ""}, // YAML default / placeholder + {Name: "Release Manager"}, + } + var names []string + walkOrgWorkspaceNames(tree, &names) + sort.Strings(names) + want := []string{"Dev Lead", "Release Manager"} + if !equalStrings(names, want) { + t.Errorf("empty-name skip: got %v, want %v", names, want) + } +} + +// emitOrgEvent must INSERT into structure_events with event_type + JSON +// payload. Verifies the SQL shape pinning so a future schema rename +// (e.g., switching to audit_events) breaks the test loudly instead of +// silently dropping telemetry. +func TestEmitOrgEvent_InsertsToStructureEvents(t *testing.T) { + mock := setupTestDB(t) + mock.ExpectExec(`INSERT INTO structure_events`). + WithArgs("org.import.started", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(1, 1)) + + emitOrgEvent(context.Background(), "org.import.started", map[string]any{ + "name": "test-org", + "mode": "reconcile", + }) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations: %v", err) + } +} + +// Insert failures are log-and-swallow — telemetry MUST NOT block the +// caller path. If this regresses (e.g., a future patch returns the err), +// org-import requests would fail with HTTP 500 every time a structure_events +// INSERT hiccups, which is strictly worse than losing the row. +func TestEmitOrgEvent_DBErrorIsSwallowed(t *testing.T) { + mock := setupTestDB(t) + mock.ExpectExec(`INSERT INTO structure_events`). + WithArgs("org.import.failed", sqlmock.AnyArg()). + WillReturnError(errSentinelTest) + + // Must not panic; must not propagate. The function returns nothing, + // so the contract is "doesn't crash." + emitOrgEvent(context.Background(), "org.import.failed", map[string]any{ + "err": "preflight failed", + }) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations: %v", err) + } +} + +func TestErrString(t *testing.T) { + if got := errString(nil); got != "" { + t.Errorf("nil error: got %q, want empty", got) + } + if got := errString(errSentinelTest); got != "sentinel" { + t.Errorf("sentinel error: got %q, want \"sentinel\"", got) + } +} + +// errSentinelTest is a marker error used for swallow-error assertions. +var errSentinelTest = sentinelErrTest{} + +type sentinelErrTest struct{} + +func (sentinelErrTest) Error() string { return "sentinel" } + +func equalStrings(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} diff --git a/workspace-server/internal/handlers/workspace_crud.go b/workspace-server/internal/handlers/workspace_crud.go index 200356b1..6334d0f2 100644 --- a/workspace-server/internal/handlers/workspace_crud.go +++ b/workspace-server/internal/handlers/workspace_crud.go @@ -543,6 +543,103 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "removed", "cascade_deleted": len(descendantIDs)}) } +// CascadeDelete performs the cascade-removal sequence used by the HTTP +// DELETE handler and by OrgImport's reconcile mode: walk descendants, mark +// self+descendants 'removed' first (#73 race guard), stop containers / EC2s, +// remove volumes, revoke tokens, disable schedules, broadcast events. +// +// Idempotent against already-removed rows (the descendant CTE and all UPDATE +// guards skip status='removed'). Returns the number of cascaded descendants +// (not including id itself) and any per-workspace stop errors so callers can +// surface a retryable failure instead of a silent-leak. +// +// Caller is responsible for the children-confirmation gate (the HTTP handler +// returns 409 when children exist + ?confirm=true is missing); this helper +// always cascades. +func (h *WorkspaceHandler) CascadeDelete(ctx context.Context, id string) (int, []error, error) { + if err := validateWorkspaceID(id); err != nil { + return 0, nil, err + } + + descendantIDs := []string{} + descRows, err := db.DB.QueryContext(ctx, ` + WITH RECURSIVE descendants AS ( + SELECT id FROM workspaces WHERE parent_id = $1 AND status != 'removed' + UNION ALL + SELECT w.id FROM workspaces w JOIN descendants d ON w.parent_id = d.id WHERE w.status != 'removed' + ) + SELECT id FROM descendants + `, id) + if err != nil { + return 0, nil, fmt.Errorf("descendant query: %w", err) + } + for descRows.Next() { + var descID string + if descRows.Scan(&descID) == nil { + descendantIDs = append(descendantIDs, descID) + } + } + descRows.Close() + + allIDs := append([]string{id}, descendantIDs...) + + if _, err := db.DB.ExecContext(ctx, + `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = ANY($2::uuid[])`, + models.StatusRemoved, pq.Array(allIDs)); err != nil { + log.Printf("CascadeDelete status update for %s: %v", id, err) + } + if _, err := db.DB.ExecContext(ctx, + `DELETE FROM canvas_layouts WHERE workspace_id = ANY($1::uuid[])`, + pq.Array(allIDs)); err != nil { + log.Printf("CascadeDelete canvas_layouts for %s: %v", id, err) + } + if _, err := db.DB.ExecContext(ctx, + `UPDATE workspace_auth_tokens SET revoked_at = now() + WHERE workspace_id = ANY($1::uuid[]) AND revoked_at IS NULL`, + pq.Array(allIDs)); err != nil { + log.Printf("CascadeDelete token revocation for %s: %v", id, err) + } + if _, err := db.DB.ExecContext(ctx, + `UPDATE workspace_schedules SET enabled = false, updated_at = now() + WHERE workspace_id = ANY($1::uuid[]) AND enabled = true`, + pq.Array(allIDs)); err != nil { + log.Printf("CascadeDelete schedule disable for %s: %v", id, err) + } + + cleanupCtx, cleanupCancel := context.WithTimeout( + context.WithoutCancel(ctx), 30*time.Second) + defer cleanupCancel() + + var stopErrs []error + stopAndRemove := func(wsID string) { + if err := h.StopWorkspaceAuto(cleanupCtx, wsID); err != nil { + log.Printf("CascadeDelete %s stop failed: %v — leaving cleanup for orphan sweeper", wsID, err) + stopErrs = append(stopErrs, fmt.Errorf("stop %s: %w", wsID, err)) + return + } + if h.provisioner != nil { + if err := h.provisioner.RemoveVolume(cleanupCtx, wsID); err != nil { + log.Printf("CascadeDelete %s volume removal warning: %v", wsID, err) + } + } + } + + for _, descID := range descendantIDs { + stopAndRemove(descID) + db.ClearWorkspaceKeys(cleanupCtx, descID) + restartStates.Delete(descID) + h.broadcaster.RecordAndBroadcast(cleanupCtx, string(events.EventWorkspaceRemoved), descID, map[string]interface{}{}) + } + stopAndRemove(id) + db.ClearWorkspaceKeys(cleanupCtx, id) + restartStates.Delete(id) + h.broadcaster.RecordAndBroadcast(cleanupCtx, string(events.EventWorkspaceRemoved), id, map[string]interface{}{ + "cascade_deleted": len(descendantIDs), + }) + + return len(descendantIDs), stopErrs, nil +} + // validateWorkspaceID returns an error when id is not a valid UUID. // #687: prevents 500s from Postgres when a garbage string (e.g. ../../etc/passwd) // is passed as the :id path parameter. -- 2.45.2