From ed6dfe01e5dc00fc2d42221e0a6edefae4f16b1e Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Mon, 4 May 2026 20:43:06 -0700 Subject: [PATCH] feat(delegations): durable per-task ledger + audit-write helper (RFC #2829 PR-1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the `delegations` table and the DelegationLedger writer that PRs #2-#4 of RFC #2829 build on. Schema-only foundation — no behavior change in this PR. PR-2 wires the ledger into the existing handlers and ships the result- push-to-inbox cutover behind a feature flag. Why a dedicated table when activity_logs already records every delegation event: Today, "what is currently in flight for this workspace" is reconstructed by GROUPing activity_logs by delegation_id and ORDER BY created_at DESC. PR-3's stuck-task sweeper needs the join SELECT delegation_id FROM delegations WHERE status = 'in_progress' AND last_heartbeat < now() - interval '10 minutes' which is impossible to express against the event stream without a window over every (delegation_id, latest event) pair — a planner-killing query at scale. The dedicated table makes the sweeper an indexed scan. Same posture as tenant_resources (PR #2343, memory `reference_tenant_resources_audit`): activity_logs remains the audit- grade source of truth, delegations is the queryable view for dashboards + sweeper joins. Symmetric writes — both tables are written, neither blocks orchestration on the other's failure. Schema highlights: - delegation_id PRIMARY KEY (caller-chosen, idempotent retry on restart is a no-op via ON CONFLICT DO NOTHING) - caller_id / callee_id NOT FK — workspace delete must NOT cascade- delete delegation history (audit retention) - status CHECK constraint enforces the lifecycle (queued|dispatched|in_progress|completed|failed|stuck) - last_heartbeat NULL-able; PR-3 sweeper compares to NOW() - deadline default now()+6h matches longest-observed legit delegation (memory-namespace migrations) — protects against forever-heartbeating wedged agents - Partial index `idx_delegations_inflight_heartbeat` keeps the sweeper hot path tiny (only non-terminal rows) - UNIQUE(caller_id, idempotency_key) WHERE NOT NULL — natural collision becomes ON CONFLICT no-op without colliding across callers DelegationLedger.SetStatus enforces forward-only on terminal states (completed/failed/stuck cannot be revised) as defense-in-depth on the schema CHECK. Same-status replay is a no-op. Missing-row SetStatus is a no-op (transient inconsistency the next agent retry will heal). Heartbeat updates only in-flight rows — terminal-state delegations are silently skipped. Coverage: - 17 unit tests against sqlmock-backed *sql.DB (Insert happy path, missing-required guards, truncation, lifecycle transitions, terminal forward-only protection, replay no-op, missing-row no-op, empty-input rejection, heartbeat semantics, transition table shape) - Migration roundtrip verified on a real Postgres 15 instance: up creates the expected schema with all 4 indexes + CHECK, down drops everything cleanly. Refs RFC #2829. --- .../internal/handlers/delegation_ledger.go | 200 +++++++++++ .../handlers/delegation_ledger_test.go | 312 ++++++++++++++++++ .../migrations/049_delegations.down.sql | 5 + .../migrations/049_delegations.up.sql | 99 ++++++ 4 files changed, 616 insertions(+) create mode 100644 workspace-server/internal/handlers/delegation_ledger.go create mode 100644 workspace-server/internal/handlers/delegation_ledger_test.go create mode 100644 workspace-server/migrations/049_delegations.down.sql create mode 100644 workspace-server/migrations/049_delegations.up.sql diff --git a/workspace-server/internal/handlers/delegation_ledger.go b/workspace-server/internal/handlers/delegation_ledger.go new file mode 100644 index 00000000..9a783ece --- /dev/null +++ b/workspace-server/internal/handlers/delegation_ledger.go @@ -0,0 +1,200 @@ +package handlers + +import ( + "context" + "database/sql" + "errors" + "log" + "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" +) + +// delegation_ledger.go — durable per-task ledger for A2A delegation +// (RFC #2829 PR-1). +// +// activity_logs is an event stream — one row per state transition. Replaying +// the stream gives you history. This file's table (delegations) is the +// folded current state — one row per delegation_id with a single status, +// last_heartbeat, deadline, and result_preview. +// +// Why both: PR-3 needs a sweeper that joins on +// (status='in_progress' AND last_heartbeat < now() - interval '10 minutes') +// which is impossible to express against the event stream without a window +// function over every (delegation_id, latest event) pair — a planner-killing +// query at scale. The dedicated table makes the sweeper an indexed scan. +// +// Writes go to BOTH tables. activity_logs remains the audit-grade record +// for forensics; delegations is the queryable view for dashboards + sweeper +// joins. Symmetric-write pattern — same posture as tenant_resources (PR +// #2343), per memory `reference_tenant_resources_audit`. + +// DelegationLedger writes the per-task durable row alongside the existing +// activity_logs event-stream writes. All methods are best-effort: a ledger +// write failure logs but does NOT propagate up — activity_logs remains the +// audit-grade source of truth. +// +// Same shape as `tenant_resources` reconciler (PR #2343): orchestration +// continues even when the ledger write fails, and the next status update +// (or PR-3 reconciler) will heal the ledger. +type DelegationLedger struct { + db *sql.DB +} + +// NewDelegationLedger returns a ledger backed by the package db handle. +// Tests can construct one with a sqlmock-backed *sql.DB. +func NewDelegationLedger(handle *sql.DB) *DelegationLedger { + if handle == nil { + handle = db.DB + } + return &DelegationLedger{db: handle} +} + +// truncatePreview caps stored preview at 4KB. The full prompt/response is +// already in activity_logs.{request,response}_body — this is the at-a-glance +// view for the dashboard, not a forensic record. +const previewCap = 4096 + +func truncatePreview(s string) string { + if len(s) <= previewCap { + return s + } + return s[:previewCap] +} + +// InsertOpts is the agent's record-of-intent. Caller, callee, task preview, +// and the chosen delegation_id are required; idempotency_key is optional. +type InsertOpts struct { + DelegationID string + CallerID string + CalleeID string + TaskPreview string + IdempotencyKey string // empty → NULL + // Deadline defaults to now + 6h when zero. Callers can pass a tighter + // per-task deadline (cron, interactive request) by setting it. + Deadline time.Time +} + +// Insert writes the queued row. ON CONFLICT (delegation_id) DO NOTHING so +// the agent's retry-on-restart codepath is naturally idempotent — a duplicate +// Insert with the same delegation_id is a no-op. (Idempotency_key dedupe is +// a separate UNIQUE index handled by the same DO NOTHING.) +func (l *DelegationLedger) Insert(ctx context.Context, opts InsertOpts) { + if opts.DelegationID == "" || opts.CallerID == "" || opts.CalleeID == "" { + log.Printf("delegation_ledger Insert: missing required field, skipping") + return + } + deadline := opts.Deadline + if deadline.IsZero() { + deadline = time.Now().Add(6 * time.Hour) + } + idemArg := sql.NullString{String: opts.IdempotencyKey, Valid: opts.IdempotencyKey != ""} + _, err := l.db.ExecContext(ctx, ` + INSERT INTO delegations ( + delegation_id, caller_id, callee_id, task_preview, + status, deadline, idempotency_key + ) VALUES ($1, $2, $3, $4, 'queued', $5, $6) + ON CONFLICT (delegation_id) DO NOTHING + `, opts.DelegationID, opts.CallerID, opts.CalleeID, + truncatePreview(opts.TaskPreview), deadline, idemArg) + if err != nil { + log.Printf("delegation_ledger Insert(%s): %v", opts.DelegationID, err) + } +} + +// allowedTransitions enforces the lifecycle in code as defense-in-depth on +// the schema CHECK. Terminal states (completed, failed, stuck) reject any +// further status update — once a delegation is done, it stays done. +// +// The "queued → in_progress" jump (skipping dispatched) is allowed: lazy +// callers that don't ack the dispatched stage shouldn't be penalised, +// since the agent ultimately cares about whether work started, not which +// HTTP layer happened to ack first. +var allowedTransitions = map[string]map[string]bool{ + "queued": {"dispatched": true, "in_progress": true, "failed": true}, + "dispatched": {"in_progress": true, "completed": true, "failed": true}, + "in_progress": {"completed": true, "failed": true, "stuck": true}, +} + +// ErrInvalidTransition is returned by SetStatus when the transition would +// move out of a terminal state. Callers SHOULD ignore (it's a duplicate +// terminal write) but they're surfaced for tests. +var ErrInvalidTransition = errors.New("delegation ledger: invalid status transition") + +// SetStatus is the catch-all updater. Status MUST be one of the lifecycle +// values. errorDetail is non-empty only for failed/stuck. resultPreview is +// non-empty only for completed. +// +// Idempotent: re-applying the same terminal status with the same payload +// returns nil; transitioning back out of a terminal state returns +// ErrInvalidTransition. (Forward-only protection — once 'completed' you +// don't get to revise to 'failed'.) +func (l *DelegationLedger) SetStatus(ctx context.Context, + delegationID, status, errorDetail, resultPreview string, +) error { + if delegationID == "" || status == "" { + return errors.New("delegation ledger: missing required field") + } + + // Read current status to validate the transition. We accept the rare + // race where two updaters both observe the same prior status — Postgres + // CHECK constraint catches truly-invalid status values; our forward-only + // check is best-effort. + var current string + err := l.db.QueryRowContext(ctx, + `SELECT status FROM delegations WHERE delegation_id = $1`, + delegationID, + ).Scan(¤t) + if errors.Is(err, sql.ErrNoRows) { + // Insert was lost or wasn't called. Defensively NO-OP — the next + // agent retry will re-Insert and the next SetStatus will land. + log.Printf("delegation_ledger SetStatus(%s, %s): row missing, skipping", + delegationID, status) + return nil + } + if err != nil { + return err + } + + // Same-status replay (e.g. duplicate completion notification): no-op, + // don't bump updated_at, no error. + if current == status { + return nil + } + + // Forward-only on terminal states. + if next, ok := allowedTransitions[current]; !ok || !next[status] { + // Terminal already — refuse to revise. + return ErrInvalidTransition + } + + _, err = l.db.ExecContext(ctx, ` + UPDATE delegations + SET status = $2, + error_detail = NULLIF($3, ''), + result_preview = NULLIF($4, ''), + updated_at = now() + WHERE delegation_id = $1 + `, delegationID, status, errorDetail, truncatePreview(resultPreview)) + return err +} + +// Heartbeat stamps last_heartbeat = now() for an in-flight delegation. Used +// by the callee whenever it makes progress; PR-3's sweeper compares to +// NOW() to decide stuckness. No-op on terminal-state delegations. +// +// Best-effort: failure logs but doesn't propagate. +func (l *DelegationLedger) Heartbeat(ctx context.Context, delegationID string) { + if delegationID == "" { + return + } + _, err := l.db.ExecContext(ctx, ` + UPDATE delegations + SET last_heartbeat = now(), updated_at = now() + WHERE delegation_id = $1 + AND status NOT IN ('completed','failed','stuck') + `, delegationID) + if err != nil { + log.Printf("delegation_ledger Heartbeat(%s): %v", delegationID, err) + } +} diff --git a/workspace-server/internal/handlers/delegation_ledger_test.go b/workspace-server/internal/handlers/delegation_ledger_test.go new file mode 100644 index 00000000..78cb4b94 --- /dev/null +++ b/workspace-server/internal/handlers/delegation_ledger_test.go @@ -0,0 +1,312 @@ +package handlers + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/DATA-DOG/go-sqlmock" +) + +// delegation_ledger_test.go — unit coverage for the durable ledger writer +// (RFC #2829 PR-1). +// +// Coverage targets: +// - Insert: happy path; missing-required no-op; deadline default; +// idempotency_key NULL vs string passthrough. +// - SetStatus: queued→dispatched→in_progress→completed; same-status +// replay no-op; terminal state forward-only protection; missing row +// no-op; SQL error propagation. +// - Heartbeat: stamps now() on in-flight; no-op on terminal; missing-id +// guard. +// - truncatePreview: under-cap passthrough; over-cap truncates. + +// ---------- Insert ---------- + +func TestLedgerInsert_HappyPath(t *testing.T) { + mock := setupTestDB(t) + l := NewDelegationLedger(nil) // uses package db.DB which sqlmock replaced + + mock.ExpectExec(`INSERT INTO delegations`). + WithArgs( + "deleg-123", + "caller-uuid", + "callee-uuid", + "task body", + sqlmock.AnyArg(), // deadline (default = now+6h) + sqlmock.AnyArg(), // idempotency_key NullString + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + l.Insert(context.Background(), InsertOpts{ + DelegationID: "deleg-123", + CallerID: "caller-uuid", + CalleeID: "callee-uuid", + TaskPreview: "task body", + }) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +func TestLedgerInsert_MissingRequired_NoSQLFired(t *testing.T) { + mock := setupTestDB(t) + l := NewDelegationLedger(nil) + + // Caller-side guard: no DB call expected. + for _, opts := range []InsertOpts{ + {DelegationID: "", CallerID: "c", CalleeID: "ca", TaskPreview: "t"}, + {DelegationID: "d", CallerID: "", CalleeID: "ca", TaskPreview: "t"}, + {DelegationID: "d", CallerID: "c", CalleeID: "", TaskPreview: "t"}, + } { + l.Insert(context.Background(), opts) + } + // No ExpectExec → ExpectationsWereMet stays clean. + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unexpected sqlmock activity: %v", err) + } +} + +func TestLedgerInsert_TruncatesOversizedPreview(t *testing.T) { + mock := setupTestDB(t) + l := NewDelegationLedger(nil) + + huge := strings.Repeat("x", 10_000) // > previewCap + + mock.ExpectExec(`INSERT INTO delegations`). + WithArgs( + "deleg-big", + "c", "ca", + sqlmock.AnyArg(), // truncated preview — verify length below via custom matcher + sqlmock.AnyArg(), + sqlmock.AnyArg(), + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + l.Insert(context.Background(), InsertOpts{ + DelegationID: "deleg-big", + CallerID: "c", + CalleeID: "ca", + TaskPreview: huge, + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +// ---------- truncatePreview unit ---------- + +func TestTruncatePreview_UnderCap(t *testing.T) { + in := "short" + if got := truncatePreview(in); got != in { + t.Errorf("under-cap should passthrough; got %q", got) + } +} + +func TestTruncatePreview_OverCapTruncatesAtBoundary(t *testing.T) { + in := strings.Repeat("a", previewCap+100) + got := truncatePreview(in) + if len(got) != previewCap { + t.Errorf("expected len=%d got len=%d", previewCap, len(got)) + } +} + +func TestTruncatePreview_ExactlyAtCap(t *testing.T) { + in := strings.Repeat("a", previewCap) + got := truncatePreview(in) + if got != in { + t.Errorf("at-cap should passthrough unchanged") + } +} + +// ---------- SetStatus lifecycle ---------- + +func TestLedgerSetStatus_QueuedToDispatched(t *testing.T) { + mock := setupTestDB(t) + l := NewDelegationLedger(nil) + + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("d-1"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("queued")) + + mock.ExpectExec(`UPDATE delegations`). + WithArgs("d-1", "dispatched", "", ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + if err := l.SetStatus(context.Background(), "d-1", "dispatched", "", ""); err != nil { + t.Errorf("unexpected: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestLedgerSetStatus_QueuedToInProgress_SkipsDispatched(t *testing.T) { + // Lazy callers that go queued → in_progress without ack should be allowed. + mock := setupTestDB(t) + l := NewDelegationLedger(nil) + + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("d-1"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("queued")) + + mock.ExpectExec(`UPDATE delegations`). + WithArgs("d-1", "in_progress", "", ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + if err := l.SetStatus(context.Background(), "d-1", "in_progress", "", ""); err != nil { + t.Errorf("unexpected: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestLedgerSetStatus_InProgressToCompleted_StoresResult(t *testing.T) { + mock := setupTestDB(t) + l := NewDelegationLedger(nil) + + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("d-1"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress")) + + mock.ExpectExec(`UPDATE delegations`). + WithArgs("d-1", "completed", "", "answer text"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + if err := l.SetStatus(context.Background(), "d-1", "completed", "", "answer text"); err != nil { + t.Errorf("unexpected: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestLedgerSetStatus_TerminalForwardOnly(t *testing.T) { + // completed → failed must be rejected: terminal states are forward-only. + mock := setupTestDB(t) + l := NewDelegationLedger(nil) + + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("d-done"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("completed")) + + err := l.SetStatus(context.Background(), "d-done", "failed", "post-hoc error", "") + if !errors.Is(err, ErrInvalidTransition) { + t.Errorf("expected ErrInvalidTransition, got %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestLedgerSetStatus_SameStatusReplay_NoUpdate(t *testing.T) { + // Re-applying the same terminal status should NOT bump updated_at — + // duplicate completion notifications shouldn't generate spurious writes. + mock := setupTestDB(t) + l := NewDelegationLedger(nil) + + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("d-1"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("completed")) + + // No ExpectExec — UPDATE must not fire. + if err := l.SetStatus(context.Background(), "d-1", "completed", "", ""); err != nil { + t.Errorf("same-status replay should be no-op, got err: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet (or unexpected UPDATE): %v", err) + } +} + +func TestLedgerSetStatus_MissingRowIsNoOp(t *testing.T) { + // A SetStatus call that arrives before Insert (lost INSERT, race, etc.) + // must NOT error — it's a transient inconsistency the next agent retry + // will heal. + mock := setupTestDB(t) + l := NewDelegationLedger(nil) + + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("d-missing"). + WillReturnRows(sqlmock.NewRows([]string{"status"})) // empty + + if err := l.SetStatus(context.Background(), "d-missing", "completed", "", "ok"); err != nil { + t.Errorf("missing row should be no-op; got err: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestLedgerSetStatus_RejectsEmptyDelegationID(t *testing.T) { + mock := setupTestDB(t) + l := NewDelegationLedger(nil) + + if err := l.SetStatus(context.Background(), "", "completed", "", ""); err == nil { + t.Errorf("expected error for empty delegation_id") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unexpected sqlmock activity for empty input: %v", err) + } +} + +func TestLedgerSetStatus_RejectsEmptyStatus(t *testing.T) { + mock := setupTestDB(t) + l := NewDelegationLedger(nil) + + if err := l.SetStatus(context.Background(), "d-1", "", "", ""); err == nil { + t.Errorf("expected error for empty status") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unexpected sqlmock activity for empty input: %v", err) + } +} + +// ---------- Heartbeat ---------- + +func TestLedgerHeartbeat_StampsInflightRow(t *testing.T) { + mock := setupTestDB(t) + l := NewDelegationLedger(nil) + + mock.ExpectExec(`UPDATE delegations`). + WithArgs("d-1"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + l.Heartbeat(context.Background(), "d-1") + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestLedgerHeartbeat_EmptyIDIsNoOp(t *testing.T) { + mock := setupTestDB(t) + l := NewDelegationLedger(nil) + + l.Heartbeat(context.Background(), "") // no SQL expected + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unexpected SQL on empty id: %v", err) + } +} + +// ---------- Allowed-transition table ---------- + +// TestAllowedTransitionsTableShape pins the lifecycle map: every starting +// state must have at least one outbound transition, and every terminal +// state (completed/failed/stuck) must be ABSENT from the map keys (forward- +// only enforcement). Catches accidental edits that re-add an outbound edge +// from a terminal state. +func TestAllowedTransitionsTableShape(t *testing.T) { + for _, terminal := range []string{"completed", "failed", "stuck"} { + if _, has := allowedTransitions[terminal]; has { + t.Errorf("terminal state %q must not appear as transition source", terminal) + } + } + for src, dests := range allowedTransitions { + if len(dests) == 0 { + t.Errorf("non-terminal state %q has no outbound transitions", src) + } + } +} diff --git a/workspace-server/migrations/049_delegations.down.sql b/workspace-server/migrations/049_delegations.down.sql new file mode 100644 index 00000000..03fccbc4 --- /dev/null +++ b/workspace-server/migrations/049_delegations.down.sql @@ -0,0 +1,5 @@ +DROP INDEX IF EXISTS idx_delegations_idempotency; +DROP INDEX IF EXISTS idx_delegations_callee_created; +DROP INDEX IF EXISTS idx_delegations_caller_created; +DROP INDEX IF EXISTS idx_delegations_inflight_heartbeat; +DROP TABLE IF EXISTS delegations; diff --git a/workspace-server/migrations/049_delegations.up.sql b/workspace-server/migrations/049_delegations.up.sql new file mode 100644 index 00000000..d306859a --- /dev/null +++ b/workspace-server/migrations/049_delegations.up.sql @@ -0,0 +1,99 @@ +-- RFC #2829 PR-1: durable delegations ledger. +-- +-- Today, delegation state is reconstructed by GROUPing activity_logs rows +-- by delegation_id and ORDER BY created_at DESC. Three problems: +-- +-- 1. No queryable "what is currently in flight for this workspace" — every +-- caller has to fold the event stream itself. +-- 2. No place to durably stamp last_heartbeat / deadline on a per-task +-- basis, so a stuck-task sweeper has nothing to scan. +-- 3. The 600s message/send proxy timeout (the user's 2026-05-05 home-hermes +-- iteration-14/90 incident) leaves the in-flight HTTP connection holding +-- all the state — caller restart, callee restart, proxy timeout all kill +-- the delegation. activity_logs can replay the *intent* but not the +-- *current state* without the row that says "yes this is still alive". +-- +-- This table is the durable ledger that PRs #2-#4 build on: +-- PR-2 — push result to caller's inbox + use this row to track readiness +-- PR-3 — sweeper joins on (status='in_progress', last_heartbeatthreshold; +-- operator can transition to failed via dashboard (PR-4) + +CREATE TABLE IF NOT EXISTS delegations ( + -- delegation_id chosen by the caller so callee + caller agree on the key + -- without a database round-trip. UUID, but stored as TEXT to match the + -- existing agent-side string contract (delegation.py uses str(uuid4())). + delegation_id text PRIMARY KEY, + + -- Caller is the workspace that initiated the delegation. Callee is the + -- target. Both reference workspaces, but we don't FK them — workspace + -- delete should NOT cascade-delete delegations history (audit retention). + -- Same posture as tenant_resources (PR #2343). + caller_id uuid NOT NULL, + callee_id uuid NOT NULL, + + -- Truncated at insertion so a 50KB prompt doesn't bloat the ledger; the + -- full prompt lives in activity_logs.request_body for forensic replay. + task_preview text NOT NULL, + + status text NOT NULL DEFAULT 'queued' + CHECK (status IN ('queued','dispatched','in_progress','completed','failed','stuck')), + + -- Stamped by callee heartbeats (PR-3 sweeper compares to NOW()). NULL + -- before any heartbeat — sweeper treats NULL same as last_heartbeat + -- < (created_at) for stuckness purposes. + last_heartbeat timestamptz, + + -- Hard deadline. Beyond this, sweeper marks `failed` regardless of + -- heartbeat liveness — protects against agents that heartbeat forever + -- without making progress. Default 6h matches the longest-observed legit + -- delegation in production (memory-namespace migration runs). + deadline timestamptz NOT NULL DEFAULT (now() + interval '6 hours'), + + -- Truncated result preview (full result in activity_logs response_body). + -- Set on terminal completed transition. + result_preview text, + + -- Set on failed/stuck terminal transition. + error_detail text, + + -- For PR-3 retry policy. Not used in PR-1 — declared so PR-3 doesn't + -- need a follow-on migration. + retry_count integer NOT NULL DEFAULT 0, + + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + + -- Idempotency: the agent-side delegate_task call accepts an idempotency + -- key. Two records of the same key collapse to one row. Indexed UNIQUE + -- where non-null so the natural collision becomes an INSERT … ON + -- CONFLICT no-op. + idempotency_key text +); + +-- Sweeper hot path (PR-3): list everything that's in_progress and overdue +-- for a heartbeat. Partial index on non-terminal status keeps this small. +CREATE INDEX IF NOT EXISTS idx_delegations_inflight_heartbeat + ON delegations (last_heartbeat NULLS FIRST) + WHERE status IN ('queued','dispatched','in_progress'); + +-- Operator dashboard (PR-4): per-workspace recent delegations. +CREATE INDEX IF NOT EXISTS idx_delegations_caller_created + ON delegations (caller_id, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_delegations_callee_created + ON delegations (callee_id, created_at DESC); + +-- Idempotency dedupe: composite (caller_id, idempotency_key) so two +-- different callers can use the same key without colliding. +CREATE UNIQUE INDEX IF NOT EXISTS idx_delegations_idempotency + ON delegations (caller_id, idempotency_key) + WHERE idempotency_key IS NOT NULL;