fix(org-import): reconcile mode + audit-event emission

Closes the additive-import zombie bug: re-running /org/import with a
tree shape that reparents same-named roles leaves the prior workspace
online, because lookupExistingChild's dedupe is parent-scoped
(different parent_id → "different" workspace). Caught 2026-05-08 after
a dev-tree re-import left 8 orphans co-existing with the new tree on
canvas until a manual cascade-delete.

Three layers in this PR:

- mode="reconcile" on /org/import — after the import loop, online
  workspaces whose name matches an imported name but whose id isn't in
  the result set are cascade-deleted. Default mode "" / "merge"
  preserves existing additive behavior. Empty-set guards prevent
  accidental "delete everything" if either array comes up empty.

- WorkspaceHandler.CascadeDelete extracted as a callable helper from
  the existing Delete HTTP handler so OrgImport's reconcile path shares
  the same teardown sequence (#73 race guard, container stop, volume
  removal, token revocation, schedule disable, event broadcast). The
  HTTP Delete handler still inlines the same logic; deduplication
  tracked as tech-debt follow-up.

- emitOrgEvent(structure_events) records org.import.started +
  org.import.completed with mode, created/skipped/reconcile_removed
  counts, duration_ms, and error. Replaces the stdout-only log line
  (lost on restart) with an audit-trail surface queryable by SQL, and
  closes the "what happened at 20:13?" debugging gap that motivated
  this fix.

Verified live against the local platform: cascade-delete on an old
tree's removed root cleared 8 surviving orphans; mode="reconcile" with
a freshly-INSERTed fake orphan removed exactly the fake; idempotent
re-run of reconcile is a no-op (0 removed, no errors); structure_events
captures every started+completed pair with full payload.

7 new unit tests (walkOrgWorkspaceNames flat/nested/spawning:false/
empty-name; emitOrgEvent success + DB-error-swallow; errString). Full
handler suite green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
claude-ceo-assistant 2026-05-08 15:04:47 -07:00
parent 6f861926bd
commit 3de51faa19
3 changed files with 431 additions and 1 deletions


@@ -13,12 +13,15 @@ import (
"path/filepath"
"strconv"
"strings"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/channels"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/gin-gonic/gin"
"github.com/lib/pq"
"gopkg.in/yaml.v3"
)
@@ -568,11 +571,30 @@ func (h *OrgHandler) Import(c *gin.Context) {
var body struct {
Dir string `json:"dir"` // org template directory name
Template OrgTemplate `json:"template"` // or inline template
// Mode controls cleanup behavior of pre-existing workspaces:
// "" / "merge" — additive (default; current behavior).
// Existing workspaces matched by
// (parent_id, name) are skipped; nothing
// outside the new tree is touched.
// "reconcile" — additive + cleanup. After import, any
// online workspace whose name matches an
// imported workspace's name but whose id
// isn't in the import result set is
// cascade-deleted. Catches "previous
// import survived a re-import" zombies
// (the 20:13→21:17 dev-tree case).
Mode string `json:"mode"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
importStart := time.Now()
emitOrgEvent(c.Request.Context(), "org.import.started", map[string]any{
"name": body.Template.Name,
"dir": body.Dir,
"mode": body.Mode,
})
var tmpl OrgTemplate
var orgBaseDir string // base directory for files_dir resolution
@@ -718,18 +740,171 @@ func (h *OrgHandler) Import(c *gin.Context) {
}
}
// Reconcile mode: prune workspaces present from a previous import that
// share a name with the new tree but are NOT in the new result set.
// Catches the additive-import bug where re-running /org/import with a
// changed tree shape (different parent_id for the same role name) leaves
// the prior workspace online — visible to the canvas, consuming
// containers, and looking like a duplicate. Default mode "" / "merge"
// preserves the old additive behavior.
reconcileRemovedCount := 0
reconcileSkipped := 0
reconcileErrs := []string{}
if body.Mode == "reconcile" && createErr == nil {
ctx := c.Request.Context()
importedNames := []string{}
walkOrgWorkspaceNames(tmpl.Workspaces, &importedNames)
importedIDs := make([]string, 0, len(results))
for _, r := range results {
if id, ok := r["id"].(string); ok && id != "" {
importedIDs = append(importedIDs, id)
}
}
// Empty-set guards: if the import didn't produce any names or any
// IDs, skip — querying with empty arrays would either match
// nothing (harmless) or, worse, match every workspace if a future
// query rewrite drops the IN clause. Belt-and-suspenders.
if len(importedNames) > 0 && len(importedIDs) > 0 {
rows, err := db.DB.QueryContext(ctx, `
SELECT id FROM workspaces
WHERE name = ANY($1::text[])
AND id != ALL($2::uuid[])
AND status != 'removed'
`, pq.Array(importedNames), pq.Array(importedIDs))
if err != nil {
log.Printf("Org import reconcile: orphan query failed: %v", err)
reconcileErrs = append(reconcileErrs, fmt.Sprintf("orphan query: %v", err))
} else {
orphanIDs := []string{}
for rows.Next() {
var orphanID string
if rows.Scan(&orphanID) == nil {
orphanIDs = append(orphanIDs, orphanID)
}
}
rows.Close()
for _, oid := range orphanIDs {
cascadeCount, stopErrs, err := h.workspace.CascadeDelete(ctx, oid)
if err != nil {
log.Printf("Org import reconcile: CascadeDelete(%s) failed: %v", oid, err)
reconcileErrs = append(reconcileErrs, fmt.Sprintf("delete %s: %v", oid, err))
reconcileSkipped++
continue
}
reconcileRemovedCount += 1 + cascadeCount
if len(stopErrs) > 0 {
log.Printf("Org import reconcile: %s had %d stop errors (orphan sweeper will retry)", oid, len(stopErrs))
}
}
log.Printf("Org import reconcile: %d orphans removed (%d cascade descendants), %d skipped", len(orphanIDs), reconcileRemovedCount-len(orphanIDs), reconcileSkipped)
}
}
}
status := http.StatusCreated
resp := gin.H{
"org": tmpl.Name,
"workspaces": results,
"count": len(results),
}
if body.Mode == "reconcile" {
resp["mode"] = "reconcile"
resp["reconcile_removed_count"] = reconcileRemovedCount
if len(reconcileErrs) > 0 {
resp["reconcile_errors"] = reconcileErrs
}
}
if createErr != nil {
status = http.StatusMultiStatus
resp["error"] = createErr.Error()
}
log.Printf("Org import: %s — %d workspaces created", tmpl.Name, len(results))
// results contains both freshly-created AND lookupExistingChild skips
// (entries with "skipped":true). Splitting the count here so the audit
// row reflects "what changed" vs "what was already there" — telemetry
// readers shouldn't need to grep stdout to tell an idempotent re-run
// apart from a fresh-create.
createdCount, skippedCount := 0, 0
for _, r := range results {
if skipped, _ := r["skipped"].(bool); skipped {
skippedCount++
} else {
createdCount++
}
}
log.Printf("Org import: %s — %d created, %d skipped, %d reconciled",
tmpl.Name, createdCount, skippedCount, reconcileRemovedCount)
emitOrgEvent(c.Request.Context(), "org.import.completed", map[string]any{
"name": tmpl.Name,
"dir": body.Dir,
"mode": body.Mode,
"created_count": createdCount,
"skipped_count": skippedCount,
"reconcile_removed_count": reconcileRemovedCount,
"reconcile_errors": len(reconcileErrs),
"duration_ms": time.Since(importStart).Milliseconds(),
"create_error": errString(createErr),
})
c.JSON(status, resp)
}
// walkOrgWorkspaceNames collects every Name in the tree (in any order) into
// names. Used by reconcile to detect orphan workspaces — workspaces with the
// same role name as a freshly-imported one but a different id, surviving from
// a prior import.
func walkOrgWorkspaceNames(workspaces []OrgWorkspace, names *[]string) {
for _, w := range workspaces {
// spawning:false subtrees are still part of the imported tree
// from a logical-tree perspective — DON'T skip the recursion,
// or reconcile would orphan the rest of the subtree on every
// re-import where spawning is toggled. Names of skipped
// workspaces remain registered so reconcile won't double-create
// them when spawning flips back to true.
if w.Name != "" {
*names = append(*names, w.Name)
}
walkOrgWorkspaceNames(w.Children, names)
}
}
// emitOrgEvent records an org-lifecycle event in structure_events so the
// import history is queryable independent of stdout log retention. Errors
// are logged and swallowed — never block the request path on telemetry.
//
// Event-type taxonomy (extend by appending; never rename):
//
// org.import.started — handler entered, request body parsed
// org.import.completed — handler exiting (success or partial)
// org.import.failed — handler exiting with an unrecoverable error
//
// payload fields are documented at each call site.
func emitOrgEvent(ctx context.Context, eventType string, payload map[string]any) {
if payload == nil {
payload = map[string]any{}
}
payloadJSON, err := json.Marshal(payload)
if err != nil {
log.Printf("emitOrgEvent: marshal %s payload failed: %v", eventType, err)
return
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO structure_events (event_type, payload, created_at)
VALUES ($1, $2, now())
`, eventType, payloadJSON); err != nil {
log.Printf("emitOrgEvent: insert %s failed: %v", eventType, err)
}
}
// errString returns "" for a nil error, err.Error() otherwise. Lets us put
// nullable error strings in event payloads without checking for nil at every
// call site.
func errString(err error) string {
if err == nil {
return ""
}
return err.Error()
}


@@ -0,0 +1,158 @@
package handlers
import (
"context"
"sort"
"testing"
"github.com/DATA-DOG/go-sqlmock"
)
// Tests for the reconcile-mode + audit-event additions to OrgHandler.Import.
//
// Background: /org/import was purely additive — re-running with a tree that
// renamed/reparented a role left the prior workspace online (different
// parent_id from the new one, so lookupExistingChild's parent-scoped dedupe
// missed it). The 2026-05-08 dev-tree case left 8 orphans surviving a
// re-import. mode="reconcile" closes the gap; emitOrgEvent makes "what
// happened at 20:13?" queryable instead of stdout-grep archaeology.
func TestWalkOrgWorkspaceNames_FlatTree(t *testing.T) {
tree := []OrgWorkspace{
{Name: "Dev Lead"},
{Name: "Release Manager"},
}
var names []string
walkOrgWorkspaceNames(tree, &names)
sort.Strings(names)
want := []string{"Dev Lead", "Release Manager"}
if !equalStrings(names, want) {
t.Errorf("flat tree: got %v, want %v", names, want)
}
}
func TestWalkOrgWorkspaceNames_NestedTree(t *testing.T) {
tree := []OrgWorkspace{
{
Name: "Dev Lead",
Children: []OrgWorkspace{
{Name: "Core Platform Lead", Children: []OrgWorkspace{{Name: "Core-BE"}}},
{Name: "SDK Lead"},
},
},
}
var names []string
walkOrgWorkspaceNames(tree, &names)
sort.Strings(names)
want := []string{"Core Platform Lead", "Core-BE", "Dev Lead", "SDK Lead"}
if !equalStrings(names, want) {
t.Errorf("nested tree: got %v, want %v", names, want)
}
}
// Pins the contract that spawning:false subtrees still contribute their names
// to the reconcile working set. If the walker started skipping them, a
// re-import that toggled spawning would orphan whichever workspaces had been
// previously imported with spawning:true — the inverse of the bug being
// fixed. Spawning gates *provisioning*, not *reconcile membership*.
func TestWalkOrgWorkspaceNames_SpawningFalseStillCounted(t *testing.T) {
f := false
tree := []OrgWorkspace{
{Name: "Dev Lead", Children: []OrgWorkspace{
{Name: "Skipped Lead", Spawning: &f, Children: []OrgWorkspace{
{Name: "Skipped Child"},
}},
}},
}
var names []string
walkOrgWorkspaceNames(tree, &names)
sort.Strings(names)
want := []string{"Dev Lead", "Skipped Child", "Skipped Lead"}
if !equalStrings(names, want) {
t.Errorf("spawning:false subtree: got %v, want %v", names, want)
}
}
func TestWalkOrgWorkspaceNames_EmptyNamesSkipped(t *testing.T) {
tree := []OrgWorkspace{
{Name: "Dev Lead"},
{Name: ""}, // YAML default / placeholder
{Name: "Release Manager"},
}
var names []string
walkOrgWorkspaceNames(tree, &names)
sort.Strings(names)
want := []string{"Dev Lead", "Release Manager"}
if !equalStrings(names, want) {
t.Errorf("empty-name skip: got %v, want %v", names, want)
}
}
// emitOrgEvent must INSERT into structure_events with event_type + JSON
// payload. Verifies the SQL shape pinning so a future schema rename
// (e.g., switching to audit_events) breaks the test loudly instead of
// silently dropping telemetry.
func TestEmitOrgEvent_InsertsToStructureEvents(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectExec(`INSERT INTO structure_events`).
WithArgs("org.import.started", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(1, 1))
emitOrgEvent(context.Background(), "org.import.started", map[string]any{
"name": "test-org",
"mode": "reconcile",
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
// Insert failures are log-and-swallow — telemetry MUST NOT block the
// caller path. If this regresses (e.g., a future patch returns the err),
// org-import requests would fail with HTTP 500 every time a structure_events
// INSERT hiccups, which is strictly worse than losing the row.
func TestEmitOrgEvent_DBErrorIsSwallowed(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectExec(`INSERT INTO structure_events`).
WithArgs("org.import.failed", sqlmock.AnyArg()).
WillReturnError(errSentinelTest)
// Must not panic; must not propagate. The function returns nothing,
// so the contract is "doesn't crash."
emitOrgEvent(context.Background(), "org.import.failed", map[string]any{
"err": "preflight failed",
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
func TestErrString(t *testing.T) {
if got := errString(nil); got != "" {
t.Errorf("nil error: got %q, want empty", got)
}
if got := errString(errSentinelTest); got != "sentinel" {
t.Errorf("sentinel error: got %q, want \"sentinel\"", got)
}
}
// errSentinelTest is a marker error used for swallow-error assertions.
var errSentinelTest = sentinelErrTest{}
type sentinelErrTest struct{}
func (sentinelErrTest) Error() string { return "sentinel" }
func equalStrings(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}


@@ -543,6 +543,103 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"status": "removed", "cascade_deleted": len(descendantIDs)})
}
// CascadeDelete performs the cascade-removal sequence used by the HTTP
// DELETE handler and by OrgImport's reconcile mode: walk descendants, mark
// self+descendants 'removed' first (#73 race guard), stop containers / EC2s,
// remove volumes, revoke tokens, disable schedules, broadcast events.
//
// Idempotent against already-removed rows (the descendant CTE and all UPDATE
// guards skip status='removed'). Returns the number of cascaded descendants
// (not including id itself) and any per-workspace stop errors so callers can
// surface a retryable failure instead of a silent-leak.
//
// Caller is responsible for the children-confirmation gate (the HTTP handler
// returns 409 when children exist + ?confirm=true is missing); this helper
// always cascades.
func (h *WorkspaceHandler) CascadeDelete(ctx context.Context, id string) (int, []error, error) {
if err := validateWorkspaceID(id); err != nil {
return 0, nil, err
}
descendantIDs := []string{}
descRows, err := db.DB.QueryContext(ctx, `
WITH RECURSIVE descendants AS (
SELECT id FROM workspaces WHERE parent_id = $1 AND status != 'removed'
UNION ALL
SELECT w.id FROM workspaces w JOIN descendants d ON w.parent_id = d.id WHERE w.status != 'removed'
)
SELECT id FROM descendants
`, id)
if err != nil {
return 0, nil, fmt.Errorf("descendant query: %w", err)
}
for descRows.Next() {
var descID string
if descRows.Scan(&descID) == nil {
descendantIDs = append(descendantIDs, descID)
}
}
descRows.Close()
allIDs := append([]string{id}, descendantIDs...)
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, updated_at = now() WHERE id = ANY($2::uuid[])`,
models.StatusRemoved, pq.Array(allIDs)); err != nil {
log.Printf("CascadeDelete status update for %s: %v", id, err)
}
if _, err := db.DB.ExecContext(ctx,
`DELETE FROM canvas_layouts WHERE workspace_id = ANY($1::uuid[])`,
pq.Array(allIDs)); err != nil {
log.Printf("CascadeDelete canvas_layouts for %s: %v", id, err)
}
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspace_auth_tokens SET revoked_at = now()
WHERE workspace_id = ANY($1::uuid[]) AND revoked_at IS NULL`,
pq.Array(allIDs)); err != nil {
log.Printf("CascadeDelete token revocation for %s: %v", id, err)
}
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspace_schedules SET enabled = false, updated_at = now()
WHERE workspace_id = ANY($1::uuid[]) AND enabled = true`,
pq.Array(allIDs)); err != nil {
log.Printf("CascadeDelete schedule disable for %s: %v", id, err)
}
cleanupCtx, cleanupCancel := context.WithTimeout(
context.WithoutCancel(ctx), 30*time.Second)
defer cleanupCancel()
var stopErrs []error
stopAndRemove := func(wsID string) {
if err := h.StopWorkspaceAuto(cleanupCtx, wsID); err != nil {
log.Printf("CascadeDelete %s stop failed: %v — leaving cleanup for orphan sweeper", wsID, err)
stopErrs = append(stopErrs, fmt.Errorf("stop %s: %w", wsID, err))
return
}
if h.provisioner != nil {
if err := h.provisioner.RemoveVolume(cleanupCtx, wsID); err != nil {
log.Printf("CascadeDelete %s volume removal warning: %v", wsID, err)
}
}
}
for _, descID := range descendantIDs {
stopAndRemove(descID)
db.ClearWorkspaceKeys(cleanupCtx, descID)
restartStates.Delete(descID)
h.broadcaster.RecordAndBroadcast(cleanupCtx, string(events.EventWorkspaceRemoved), descID, map[string]interface{}{})
}
stopAndRemove(id)
db.ClearWorkspaceKeys(cleanupCtx, id)
restartStates.Delete(id)
h.broadcaster.RecordAndBroadcast(cleanupCtx, string(events.EventWorkspaceRemoved), id, map[string]interface{}{
"cascade_deleted": len(descendantIDs),
})
return len(descendantIDs), stopErrs, nil
}
// validateWorkspaceID returns an error when id is not a valid UUID.
// #687: prevents 500s from Postgres when a garbage string (e.g. ../../etc/passwd)
// is passed as the :id path parameter.