Merge pull request 'fix(org-import): reconcile mode + audit-event emission' (#137) from fix/org-import-reconcile-and-audit into main
All checks were successful
CodeQL / Analyze (${{ matrix.language }}) (python) (push) Successful in 0s
CodeQL / Analyze (${{ matrix.language }}) (go) (push) Successful in 0s
CodeQL / Analyze (${{ matrix.language }}) (javascript-typescript) (push) Successful in 0s
Block internal-flavored paths / Block forbidden paths (push) Successful in 5s
E2E API Smoke Test / detect-changes (push) Successful in 6s
CI / Detect changes (push) Successful in 8s
Handlers Postgres Integration / detect-changes (push) Successful in 8s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 8s
Harness Replays / detect-changes (push) Successful in 8s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 6s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 8s
CI / Shellcheck (E2E scripts) (push) Successful in 3s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Successful in 3s
CI / Python Lint & Test (push) Successful in 36s
CI / Canvas (Next.js) (push) Successful in 58s
CI / Canvas Deploy Reminder (push) Has been skipped
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 59s
Harness Replays / Harness Replays (push) Successful in 1m10s
publish-workspace-server-image / build-and-push (push) Successful in 2m11s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 2m23s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 2m25s
CI / Platform (Go) (push) Successful in 3m26s
All checks were successful
CodeQL / Analyze (${{ matrix.language }}) (python) (push) Successful in 0s
CodeQL / Analyze (${{ matrix.language }}) (go) (push) Successful in 0s
CodeQL / Analyze (${{ matrix.language }}) (javascript-typescript) (push) Successful in 0s
Block internal-flavored paths / Block forbidden paths (push) Successful in 5s
E2E API Smoke Test / detect-changes (push) Successful in 6s
CI / Detect changes (push) Successful in 8s
Handlers Postgres Integration / detect-changes (push) Successful in 8s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 8s
Harness Replays / detect-changes (push) Successful in 8s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 6s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 8s
CI / Shellcheck (E2E scripts) (push) Successful in 3s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Successful in 3s
CI / Python Lint & Test (push) Successful in 36s
CI / Canvas (Next.js) (push) Successful in 58s
CI / Canvas Deploy Reminder (push) Has been skipped
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 59s
Harness Replays / Harness Replays (push) Successful in 1m10s
publish-workspace-server-image / build-and-push (push) Successful in 2m11s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 2m23s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 2m25s
CI / Platform (Go) (push) Successful in 3m26s
This commit is contained in:
commit
c94ead1953
@ -13,12 +13,15 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/channels"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/channels"
|
||||||
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/lib/pq"
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -568,11 +571,30 @@ func (h *OrgHandler) Import(c *gin.Context) {
|
|||||||
var body struct {
|
var body struct {
|
||||||
Dir string `json:"dir"` // org template directory name
|
Dir string `json:"dir"` // org template directory name
|
||||||
Template OrgTemplate `json:"template"` // or inline template
|
Template OrgTemplate `json:"template"` // or inline template
|
||||||
|
// Mode controls cleanup behavior of pre-existing workspaces:
|
||||||
|
// "" / "merge" — additive (default; current behavior).
|
||||||
|
// Existing workspaces matched by
|
||||||
|
// (parent_id, name) are skipped; nothing
|
||||||
|
// outside the new tree is touched.
|
||||||
|
// "reconcile" — additive + cleanup. After import, any
|
||||||
|
// online workspace whose name matches an
|
||||||
|
// imported workspace's name but whose id
|
||||||
|
// isn't in the import result set is
|
||||||
|
// cascade-deleted. Catches "previous
|
||||||
|
// import survived a re-import" zombies
|
||||||
|
// (the 20:13→21:17 dev-tree case).
|
||||||
|
Mode string `json:"mode"`
|
||||||
}
|
}
|
||||||
if err := c.ShouldBindJSON(&body); err != nil {
|
if err := c.ShouldBindJSON(&body); err != nil {
|
||||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
importStart := time.Now()
|
||||||
|
emitOrgEvent(c.Request.Context(), "org.import.started", map[string]any{
|
||||||
|
"name": body.Template.Name,
|
||||||
|
"dir": body.Dir,
|
||||||
|
"mode": body.Mode,
|
||||||
|
})
|
||||||
|
|
||||||
var tmpl OrgTemplate
|
var tmpl OrgTemplate
|
||||||
var orgBaseDir string // base directory for files_dir resolution
|
var orgBaseDir string // base directory for files_dir resolution
|
||||||
@ -718,18 +740,171 @@ func (h *OrgHandler) Import(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reconcile mode: prune workspaces present from a previous import that
|
||||||
|
// share a name with the new tree but are NOT in the new result set.
|
||||||
|
// Catches the additive-import bug where re-running /org/import with a
|
||||||
|
// changed tree shape (different parent_id for the same role name) leaves
|
||||||
|
// the prior workspace online — visible to the canvas, consuming
|
||||||
|
// containers, and looking like a duplicate. Default mode "" / "merge"
|
||||||
|
// preserves the old additive behavior.
|
||||||
|
reconcileRemovedCount := 0
|
||||||
|
reconcileSkipped := 0
|
||||||
|
reconcileErrs := []string{}
|
||||||
|
if body.Mode == "reconcile" && createErr == nil {
|
||||||
|
ctx := c.Request.Context()
|
||||||
|
importedNames := []string{}
|
||||||
|
walkOrgWorkspaceNames(tmpl.Workspaces, &importedNames)
|
||||||
|
|
||||||
|
importedIDs := make([]string, 0, len(results))
|
||||||
|
for _, r := range results {
|
||||||
|
if id, ok := r["id"].(string); ok && id != "" {
|
||||||
|
importedIDs = append(importedIDs, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Empty-set guards: if the import didn't produce any names or any
|
||||||
|
// IDs, skip — querying with empty arrays would either match
|
||||||
|
// nothing (harmless) or, worse, match every workspace if a future
|
||||||
|
// query rewrite drops the IN clause. Belt-and-suspenders.
|
||||||
|
if len(importedNames) > 0 && len(importedIDs) > 0 {
|
||||||
|
rows, err := db.DB.QueryContext(ctx, `
|
||||||
|
SELECT id FROM workspaces
|
||||||
|
WHERE name = ANY($1::text[])
|
||||||
|
AND id != ALL($2::uuid[])
|
||||||
|
AND status != 'removed'
|
||||||
|
`, pq.Array(importedNames), pq.Array(importedIDs))
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Org import reconcile: orphan query failed: %v", err)
|
||||||
|
reconcileErrs = append(reconcileErrs, fmt.Sprintf("orphan query: %v", err))
|
||||||
|
} else {
|
||||||
|
orphanIDs := []string{}
|
||||||
|
for rows.Next() {
|
||||||
|
var orphanID string
|
||||||
|
if rows.Scan(&orphanID) == nil {
|
||||||
|
orphanIDs = append(orphanIDs, orphanID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rows.Close()
|
||||||
|
|
||||||
|
for _, oid := range orphanIDs {
|
||||||
|
cascadeCount, stopErrs, err := h.workspace.CascadeDelete(ctx, oid)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Org import reconcile: CascadeDelete(%s) failed: %v", oid, err)
|
||||||
|
reconcileErrs = append(reconcileErrs, fmt.Sprintf("delete %s: %v", oid, err))
|
||||||
|
reconcileSkipped++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
reconcileRemovedCount += 1 + cascadeCount
|
||||||
|
if len(stopErrs) > 0 {
|
||||||
|
log.Printf("Org import reconcile: %s had %d stop errors (orphan sweeper will retry)", oid, len(stopErrs))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Printf("Org import reconcile: %d orphans removed (%d cascade descendants), %d skipped", len(orphanIDs), reconcileRemovedCount-len(orphanIDs), reconcileSkipped)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
status := http.StatusCreated
|
status := http.StatusCreated
|
||||||
resp := gin.H{
|
resp := gin.H{
|
||||||
"org": tmpl.Name,
|
"org": tmpl.Name,
|
||||||
"workspaces": results,
|
"workspaces": results,
|
||||||
"count": len(results),
|
"count": len(results),
|
||||||
}
|
}
|
||||||
|
if body.Mode == "reconcile" {
|
||||||
|
resp["mode"] = "reconcile"
|
||||||
|
resp["reconcile_removed_count"] = reconcileRemovedCount
|
||||||
|
if len(reconcileErrs) > 0 {
|
||||||
|
resp["reconcile_errors"] = reconcileErrs
|
||||||
|
}
|
||||||
|
}
|
||||||
if createErr != nil {
|
if createErr != nil {
|
||||||
status = http.StatusMultiStatus
|
status = http.StatusMultiStatus
|
||||||
resp["error"] = createErr.Error()
|
resp["error"] = createErr.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Org import: %s — %d workspaces created", tmpl.Name, len(results))
|
// results contains both freshly-created AND lookupExistingChild skips
|
||||||
|
// (entries with "skipped":true). Splitting the count here so the audit
|
||||||
|
// row reflects "what changed" vs "what was already there" — telemetry
|
||||||
|
// readers shouldn't need to grep stdout to tell an idempotent re-run
|
||||||
|
// apart from a fresh-create.
|
||||||
|
createdCount, skippedCount := 0, 0
|
||||||
|
for _, r := range results {
|
||||||
|
if skipped, _ := r["skipped"].(bool); skipped {
|
||||||
|
skippedCount++
|
||||||
|
} else {
|
||||||
|
createdCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Printf("Org import: %s — %d created, %d skipped, %d reconciled",
|
||||||
|
tmpl.Name, createdCount, skippedCount, reconcileRemovedCount)
|
||||||
|
emitOrgEvent(c.Request.Context(), "org.import.completed", map[string]any{
|
||||||
|
"name": tmpl.Name,
|
||||||
|
"dir": body.Dir,
|
||||||
|
"mode": body.Mode,
|
||||||
|
"created_count": createdCount,
|
||||||
|
"skipped_count": skippedCount,
|
||||||
|
"reconcile_removed_count": reconcileRemovedCount,
|
||||||
|
"reconcile_errors": len(reconcileErrs),
|
||||||
|
"duration_ms": time.Since(importStart).Milliseconds(),
|
||||||
|
"create_error": errString(createErr),
|
||||||
|
})
|
||||||
c.JSON(status, resp)
|
c.JSON(status, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// walkOrgWorkspaceNames collects every Name in the tree (in any order) into
|
||||||
|
// names. Used by reconcile to detect orphan workspaces — workspaces with the
|
||||||
|
// same role name as a freshly-imported one but a different id, surviving from
|
||||||
|
// a prior import.
|
||||||
|
func walkOrgWorkspaceNames(workspaces []OrgWorkspace, names *[]string) {
|
||||||
|
for _, w := range workspaces {
|
||||||
|
// spawning:false subtrees are still part of the imported tree
|
||||||
|
// from a logical-tree perspective — DON'T skip the recursion,
|
||||||
|
// or reconcile would orphan the rest of the subtree on every
|
||||||
|
// re-import where spawning is toggled. Names of skipped
|
||||||
|
// workspaces remain registered so reconcile won't double-create
|
||||||
|
// them when spawning flips back to true.
|
||||||
|
if w.Name != "" {
|
||||||
|
*names = append(*names, w.Name)
|
||||||
|
}
|
||||||
|
walkOrgWorkspaceNames(w.Children, names)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// emitOrgEvent records an org-lifecycle event in structure_events so the
|
||||||
|
// import history is queryable independent of stdout log retention. Errors
|
||||||
|
// are logged and swallowed — never block the request path on telemetry.
|
||||||
|
//
|
||||||
|
// Event-type taxonomy (extend by appending; never rename):
|
||||||
|
//
|
||||||
|
// org.import.started — handler entered, request body parsed
|
||||||
|
// org.import.completed — handler exiting (success or partial)
|
||||||
|
// org.import.failed — handler exiting with an unrecoverable error
|
||||||
|
//
|
||||||
|
// payload fields are documented at each call site.
|
||||||
|
func emitOrgEvent(ctx context.Context, eventType string, payload map[string]any) {
|
||||||
|
if payload == nil {
|
||||||
|
payload = map[string]any{}
|
||||||
|
}
|
||||||
|
payloadJSON, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("emitOrgEvent: marshal %s payload failed: %v", eventType, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, err := db.DB.ExecContext(ctx, `
|
||||||
|
INSERT INTO structure_events (event_type, payload, created_at)
|
||||||
|
VALUES ($1, $2, now())
|
||||||
|
`, eventType, payloadJSON); err != nil {
|
||||||
|
log.Printf("emitOrgEvent: insert %s failed: %v", eventType, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// errString returns "" for a nil error, err.Error() otherwise. Lets us put
|
||||||
|
// nullable error strings in event payloads without checking for nil at every
|
||||||
|
// call site.
|
||||||
|
func errString(err error) string {
|
||||||
|
if err == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
|||||||
158
workspace-server/internal/handlers/org_import_reconcile_test.go
Normal file
158
workspace-server/internal/handlers/org_import_reconcile_test.go
Normal file
@ -0,0 +1,158 @@
|
|||||||
|
package handlers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sort"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/DATA-DOG/go-sqlmock"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Tests for the reconcile-mode + audit-event additions to OrgHandler.Import.
|
||||||
|
//
|
||||||
|
// Background: /org/import was purely additive — re-running with a tree that
|
||||||
|
// renamed/reparented a role left the prior workspace online (different
|
||||||
|
// parent_id from the new one, so lookupExistingChild's parent-scoped dedupe
|
||||||
|
// missed it). The 2026-05-08 dev-tree case left 8 orphans surviving a
|
||||||
|
// re-import. mode="reconcile" closes the gap; emitOrgEvent makes "what
|
||||||
|
// happened at 20:13?" queryable instead of stdout-grep archaeology.
|
||||||
|
|
||||||
|
func TestWalkOrgWorkspaceNames_FlatTree(t *testing.T) {
|
||||||
|
tree := []OrgWorkspace{
|
||||||
|
{Name: "Dev Lead"},
|
||||||
|
{Name: "Release Manager"},
|
||||||
|
}
|
||||||
|
var names []string
|
||||||
|
walkOrgWorkspaceNames(tree, &names)
|
||||||
|
sort.Strings(names)
|
||||||
|
want := []string{"Dev Lead", "Release Manager"}
|
||||||
|
if !equalStrings(names, want) {
|
||||||
|
t.Errorf("flat tree: got %v, want %v", names, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWalkOrgWorkspaceNames_NestedTree(t *testing.T) {
|
||||||
|
tree := []OrgWorkspace{
|
||||||
|
{
|
||||||
|
Name: "Dev Lead",
|
||||||
|
Children: []OrgWorkspace{
|
||||||
|
{Name: "Core Platform Lead", Children: []OrgWorkspace{{Name: "Core-BE"}}},
|
||||||
|
{Name: "SDK Lead"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var names []string
|
||||||
|
walkOrgWorkspaceNames(tree, &names)
|
||||||
|
sort.Strings(names)
|
||||||
|
want := []string{"Core Platform Lead", "Core-BE", "Dev Lead", "SDK Lead"}
|
||||||
|
if !equalStrings(names, want) {
|
||||||
|
t.Errorf("nested tree: got %v, want %v", names, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pins the contract that spawning:false subtrees still contribute their names
|
||||||
|
// to the reconcile working set. If the walker started skipping them, a
|
||||||
|
// re-import that toggled spawning would orphan whichever workspaces had been
|
||||||
|
// previously imported with spawning:true — the inverse of the bug being
|
||||||
|
// fixed. Spawning gates *provisioning*, not *reconcile membership*.
|
||||||
|
func TestWalkOrgWorkspaceNames_SpawningFalseStillCounted(t *testing.T) {
|
||||||
|
f := false
|
||||||
|
tree := []OrgWorkspace{
|
||||||
|
{Name: "Dev Lead", Children: []OrgWorkspace{
|
||||||
|
{Name: "Skipped Lead", Spawning: &f, Children: []OrgWorkspace{
|
||||||
|
{Name: "Skipped Child"},
|
||||||
|
}},
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
var names []string
|
||||||
|
walkOrgWorkspaceNames(tree, &names)
|
||||||
|
sort.Strings(names)
|
||||||
|
want := []string{"Dev Lead", "Skipped Child", "Skipped Lead"}
|
||||||
|
if !equalStrings(names, want) {
|
||||||
|
t.Errorf("spawning:false subtree: got %v, want %v", names, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWalkOrgWorkspaceNames_EmptyNamesSkipped(t *testing.T) {
|
||||||
|
tree := []OrgWorkspace{
|
||||||
|
{Name: "Dev Lead"},
|
||||||
|
{Name: ""}, // YAML default / placeholder
|
||||||
|
{Name: "Release Manager"},
|
||||||
|
}
|
||||||
|
var names []string
|
||||||
|
walkOrgWorkspaceNames(tree, &names)
|
||||||
|
sort.Strings(names)
|
||||||
|
want := []string{"Dev Lead", "Release Manager"}
|
||||||
|
if !equalStrings(names, want) {
|
||||||
|
t.Errorf("empty-name skip: got %v, want %v", names, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// emitOrgEvent must INSERT into structure_events with event_type + JSON
|
||||||
|
// payload. Verifies the SQL shape pinning so a future schema rename
|
||||||
|
// (e.g., switching to audit_events) breaks the test loudly instead of
|
||||||
|
// silently dropping telemetry.
|
||||||
|
func TestEmitOrgEvent_InsertsToStructureEvents(t *testing.T) {
|
||||||
|
mock := setupTestDB(t)
|
||||||
|
mock.ExpectExec(`INSERT INTO structure_events`).
|
||||||
|
WithArgs("org.import.started", sqlmock.AnyArg()).
|
||||||
|
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||||
|
|
||||||
|
emitOrgEvent(context.Background(), "org.import.started", map[string]any{
|
||||||
|
"name": "test-org",
|
||||||
|
"mode": "reconcile",
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := mock.ExpectationsWereMet(); err != nil {
|
||||||
|
t.Errorf("sqlmock expectations: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert failures are log-and-swallow — telemetry MUST NOT block the
|
||||||
|
// caller path. If this regresses (e.g., a future patch returns the err),
|
||||||
|
// org-import requests would fail with HTTP 500 every time a structure_events
|
||||||
|
// INSERT hiccups, which is strictly worse than losing the row.
|
||||||
|
func TestEmitOrgEvent_DBErrorIsSwallowed(t *testing.T) {
|
||||||
|
mock := setupTestDB(t)
|
||||||
|
mock.ExpectExec(`INSERT INTO structure_events`).
|
||||||
|
WithArgs("org.import.failed", sqlmock.AnyArg()).
|
||||||
|
WillReturnError(errSentinelTest)
|
||||||
|
|
||||||
|
// Must not panic; must not propagate. The function returns nothing,
|
||||||
|
// so the contract is "doesn't crash."
|
||||||
|
emitOrgEvent(context.Background(), "org.import.failed", map[string]any{
|
||||||
|
"err": "preflight failed",
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := mock.ExpectationsWereMet(); err != nil {
|
||||||
|
t.Errorf("sqlmock expectations: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestErrString(t *testing.T) {
|
||||||
|
if got := errString(nil); got != "" {
|
||||||
|
t.Errorf("nil error: got %q, want empty", got)
|
||||||
|
}
|
||||||
|
if got := errString(errSentinelTest); got != "sentinel" {
|
||||||
|
t.Errorf("sentinel error: got %q, want \"sentinel\"", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// errSentinelTest is a marker error used for swallow-error assertions.
|
||||||
|
var errSentinelTest = sentinelErrTest{}
|
||||||
|
|
||||||
|
type sentinelErrTest struct{}
|
||||||
|
|
||||||
|
func (sentinelErrTest) Error() string { return "sentinel" }
|
||||||
|
|
||||||
|
func equalStrings(a, b []string) bool {
|
||||||
|
if len(a) != len(b) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for i := range a {
|
||||||
|
if a[i] != b[i] {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
@ -543,6 +543,103 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) {
|
|||||||
c.JSON(http.StatusOK, gin.H{"status": "removed", "cascade_deleted": len(descendantIDs)})
|
c.JSON(http.StatusOK, gin.H{"status": "removed", "cascade_deleted": len(descendantIDs)})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CascadeDelete performs the cascade-removal sequence used by the HTTP
|
||||||
|
// DELETE handler and by OrgImport's reconcile mode: walk descendants, mark
|
||||||
|
// self+descendants 'removed' first (#73 race guard), stop containers / EC2s,
|
||||||
|
// remove volumes, revoke tokens, disable schedules, broadcast events.
|
||||||
|
//
|
||||||
|
// Idempotent against already-removed rows (the descendant CTE and all UPDATE
|
||||||
|
// guards skip status='removed'). Returns the number of cascaded descendants
|
||||||
|
// (not including id itself) and any per-workspace stop errors so callers can
|
||||||
|
// surface a retryable failure instead of a silent-leak.
|
||||||
|
//
|
||||||
|
// Caller is responsible for the children-confirmation gate (the HTTP handler
|
||||||
|
// returns 409 when children exist + ?confirm=true is missing); this helper
|
||||||
|
// always cascades.
|
||||||
|
func (h *WorkspaceHandler) CascadeDelete(ctx context.Context, id string) (int, []error, error) {
|
||||||
|
if err := validateWorkspaceID(id); err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
descendantIDs := []string{}
|
||||||
|
descRows, err := db.DB.QueryContext(ctx, `
|
||||||
|
WITH RECURSIVE descendants AS (
|
||||||
|
SELECT id FROM workspaces WHERE parent_id = $1 AND status != 'removed'
|
||||||
|
UNION ALL
|
||||||
|
SELECT w.id FROM workspaces w JOIN descendants d ON w.parent_id = d.id WHERE w.status != 'removed'
|
||||||
|
)
|
||||||
|
SELECT id FROM descendants
|
||||||
|
`, id)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("descendant query: %w", err)
|
||||||
|
}
|
||||||
|
for descRows.Next() {
|
||||||
|
var descID string
|
||||||
|
if descRows.Scan(&descID) == nil {
|
||||||
|
descendantIDs = append(descendantIDs, descID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
descRows.Close()
|
||||||
|
|
||||||
|
allIDs := append([]string{id}, descendantIDs...)
|
||||||
|
|
||||||
|
if _, err := db.DB.ExecContext(ctx,
|
||||||
|
`UPDATE workspaces SET status = $1, updated_at = now() WHERE id = ANY($2::uuid[])`,
|
||||||
|
models.StatusRemoved, pq.Array(allIDs)); err != nil {
|
||||||
|
log.Printf("CascadeDelete status update for %s: %v", id, err)
|
||||||
|
}
|
||||||
|
if _, err := db.DB.ExecContext(ctx,
|
||||||
|
`DELETE FROM canvas_layouts WHERE workspace_id = ANY($1::uuid[])`,
|
||||||
|
pq.Array(allIDs)); err != nil {
|
||||||
|
log.Printf("CascadeDelete canvas_layouts for %s: %v", id, err)
|
||||||
|
}
|
||||||
|
if _, err := db.DB.ExecContext(ctx,
|
||||||
|
`UPDATE workspace_auth_tokens SET revoked_at = now()
|
||||||
|
WHERE workspace_id = ANY($1::uuid[]) AND revoked_at IS NULL`,
|
||||||
|
pq.Array(allIDs)); err != nil {
|
||||||
|
log.Printf("CascadeDelete token revocation for %s: %v", id, err)
|
||||||
|
}
|
||||||
|
if _, err := db.DB.ExecContext(ctx,
|
||||||
|
`UPDATE workspace_schedules SET enabled = false, updated_at = now()
|
||||||
|
WHERE workspace_id = ANY($1::uuid[]) AND enabled = true`,
|
||||||
|
pq.Array(allIDs)); err != nil {
|
||||||
|
log.Printf("CascadeDelete schedule disable for %s: %v", id, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanupCtx, cleanupCancel := context.WithTimeout(
|
||||||
|
context.WithoutCancel(ctx), 30*time.Second)
|
||||||
|
defer cleanupCancel()
|
||||||
|
|
||||||
|
var stopErrs []error
|
||||||
|
stopAndRemove := func(wsID string) {
|
||||||
|
if err := h.StopWorkspaceAuto(cleanupCtx, wsID); err != nil {
|
||||||
|
log.Printf("CascadeDelete %s stop failed: %v — leaving cleanup for orphan sweeper", wsID, err)
|
||||||
|
stopErrs = append(stopErrs, fmt.Errorf("stop %s: %w", wsID, err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if h.provisioner != nil {
|
||||||
|
if err := h.provisioner.RemoveVolume(cleanupCtx, wsID); err != nil {
|
||||||
|
log.Printf("CascadeDelete %s volume removal warning: %v", wsID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, descID := range descendantIDs {
|
||||||
|
stopAndRemove(descID)
|
||||||
|
db.ClearWorkspaceKeys(cleanupCtx, descID)
|
||||||
|
restartStates.Delete(descID)
|
||||||
|
h.broadcaster.RecordAndBroadcast(cleanupCtx, string(events.EventWorkspaceRemoved), descID, map[string]interface{}{})
|
||||||
|
}
|
||||||
|
stopAndRemove(id)
|
||||||
|
db.ClearWorkspaceKeys(cleanupCtx, id)
|
||||||
|
restartStates.Delete(id)
|
||||||
|
h.broadcaster.RecordAndBroadcast(cleanupCtx, string(events.EventWorkspaceRemoved), id, map[string]interface{}{
|
||||||
|
"cascade_deleted": len(descendantIDs),
|
||||||
|
})
|
||||||
|
|
||||||
|
return len(descendantIDs), stopErrs, nil
|
||||||
|
}
|
||||||
|
|
||||||
// validateWorkspaceID returns an error when id is not a valid UUID.
|
// validateWorkspaceID returns an error when id is not a valid UUID.
|
||||||
// #687: prevents 500s from Postgres when a garbage string (e.g. ../../etc/passwd)
|
// #687: prevents 500s from Postgres when a garbage string (e.g. ../../etc/passwd)
|
||||||
// is passed as the :id path parameter.
|
// is passed as the :id path parameter.
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user