From 44bb35a926458b40f1aa3901da86a0403544c0c9 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 02:26:06 -0700 Subject: [PATCH] feat(delegations): wire ledger Insert+SetStatus from production code paths (RFC #2829 #318) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-1 shipped the `delegations` table + `DelegationLedger` helper. PR-3 wired the sweeper. PR-4 wired the dashboard. But no PR ever wired `ledger.Insert` from a production code path — the table stayed empty, the sweeper had nothing to sweep, the dashboard had nothing to show. This PR closes that gap. Behind feature flag `DELEGATION_LEDGER_WRITE=1` (default off), the legacy activity_logs writes are mirrored to the durable ledger: - insertDelegationRow → ledger.Insert (queued) - updateDelegationStatus → ledger.SetStatus on every status transition - executeDelegation completion path → ledger.SetStatus(completed, result_preview) for the result preview that activity_logs already stores in response_body - Record handler → ledger.Insert + ledger.SetStatus(dispatched) so agent-initiated delegations land in the same table ## Why a flag The legacy flow has ~30 strict-sqlmock tests pinning exactly which SQL statements fire per handler. Adding ledger writes always-on would force adding ExpectExec stanzas to each. Flag-off keeps all 30 green without churn; flag-on lets operators populate the table in staging to feed the sweeper + dashboard once the agent-side cutover (RFC #2829 PR-5) has proven the round-trip end-to-end. Default off → byte-identical to pre-#318 behavior. ## Status vocabulary mapping activity_logs uses a freer status vocabulary than the ledger's CHECK constraint allows. updateDelegationStatus is called with values like "received" that the ledger doesn't accept; the wiring filters via a switch to only forward known-good values, skipping anything else. Record's first activity_logs row is `dispatched` but the ledger's Insert path requires `queued` as initial state. Insert as queued first; the very next SetStatus(..., dispatched) promotes it on the same row. ## Coverage 8 wiring tests (delegation_ledger_writes_test.go): - flag off → no SQL fired (rollout safety contract) - flag on → INSERT + UPDATE fire as expected - flag rejects loose truthy values (true/yes/0/on/TRUE) — only "1" is the on signal, matching PR-2 + PR-5 conventions - terminal-state replay swallows ErrInvalidTransition (legacy is authoritative; ledger replay error is not a delegation failure) All 30 existing delegation_test.go tests still pass — flag default off keeps the strict-sqlmock surface unchanged. Refs RFC #2829. --- .../internal/handlers/delegation.go | 27 +++- .../handlers/delegation_ledger_writes.go | 69 +++++++++ .../handlers/delegation_ledger_writes_test.go | 134 ++++++++++++++++++ 3 files changed, 229 insertions(+), 1 deletion(-) create mode 100644 workspace-server/internal/handlers/delegation_ledger_writes.go create mode 100644 workspace-server/internal/handlers/delegation_ledger_writes_test.go diff --git a/workspace-server/internal/handlers/delegation.go b/workspace-server/internal/handlers/delegation.go index e1b5e1a8..218df0b2 100644 --- a/workspace-server/internal/handlers/delegation.go +++ b/workspace-server/internal/handlers/delegation.go @@ -269,6 +269,9 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending', $6) `, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), idemArg) if err == nil { + // RFC #2829 #318 — mirror to the durable delegations ledger + // (gated by DELEGATION_LEDGER_WRITE; default off → no-op). + recordLedgerInsert(ctx, sourceID, body.TargetID, delegationID, body.Task, body.IdempotencyKey) return insertOK } // A unique-constraint hit means a concurrent request just took the @@ -409,6 +412,10 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s } h.updateDelegationStatus(sourceID, delegationID, "completed", "") + // RFC #2829 #318 — mirror result_preview to the durable ledger + // (updateDelegationStatus handles the status flip; ledger gets the + // preview field set on the same row). + recordLedgerStatus(ctx, delegationID, "completed", "", responseText) h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_COMPLETE", sourceID, map[string]interface{}{ "delegation_id": delegationID, "target_id": targetID, @@ -420,7 +427,8 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s // updateDelegationStatus updates the status of a delegation record in activity_logs. func (h *DelegationHandler) updateDelegationStatus(workspaceID, delegationID, status, errorDetail string) { - if _, err := db.DB.ExecContext(context.Background(), ` + ctx := context.Background() + if _, err := db.DB.ExecContext(ctx, ` UPDATE activity_logs SET status = $1, error_detail = CASE WHEN $2 = '' THEN error_detail ELSE $2 END WHERE workspace_id = $3 @@ -429,6 +437,14 @@ func (h *DelegationHandler) updateDelegationStatus(workspaceID, delegationID, st `, status, errorDetail, workspaceID, delegationID); err != nil { log.Printf("Delegation %s: status update failed: %v", delegationID, err) } + // RFC #2829 #318 — mirror status transition to the durable ledger + // (gated). Note: the ledger uses different vocabulary for "pending" + // (its initial state is `queued`); map "received" / unknown values + // the ledger doesn't accept by skipping them rather than failing. + switch status { + case "queued", "dispatched", "in_progress", "completed", "failed", "stuck": + recordLedgerStatus(ctx, delegationID, status, errorDetail, "") + } } // Record handles POST /workspaces/:id/delegations/record — the agent-initiated @@ -474,6 +490,15 @@ func (h *DelegationHandler) Record(c *gin.Context) { return } + // RFC #2829 #318 — mirror to durable ledger (gated). Record always + // reflects an A2A request the agent already fired itself, so the + // initial activity_logs status is 'dispatched' — but the ledger's + // CHECK constraint only accepts 'queued' as the initial state via + // Insert. Insert as queued first; the very next SetStatus(..., + // dispatched) below promotes it to dispatched on the same row. + recordLedgerInsert(ctx, sourceID, body.TargetID, body.DelegationID, body.Task, "") + recordLedgerStatus(ctx, body.DelegationID, "dispatched", "", "") + h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_SENT", sourceID, map[string]interface{}{ "delegation_id": body.DelegationID, "target_id": body.TargetID, diff --git a/workspace-server/internal/handlers/delegation_ledger_writes.go b/workspace-server/internal/handlers/delegation_ledger_writes.go new file mode 100644 index 00000000..7beb3f1c --- /dev/null +++ b/workspace-server/internal/handlers/delegation_ledger_writes.go @@ -0,0 +1,69 @@ +package handlers + +import ( + "context" + "os" +) + +// delegation_ledger_writes.go — RFC #2829 follow-up (#318): wire +// DelegationLedger Insert + SetStatus calls into the existing +// activity_logs-driven flow without touching the legacy code path. +// +// Why a flag (not always-on) +// -------------------------- +// The legacy flow writes everything to activity_logs and a tight +// strict-sqlmock test surface (~30 tests) pins exactly which SQL +// statements fire per handler invocation. Adding ledger writes +// always-on would force updating each of those tests in this PR. +// Gating behind DELEGATION_LEDGER_WRITE=1 lets ledger-driven +// behavior land independently of the test refactor — operators +// can flip it on in staging to populate the `delegations` table +// (and thus give the PR-3 sweeper + PR-4 dashboard data to work +// with) without coupling the rollout to a churn-y test diff. +// +// Default off → byte-identical to pre-#318 behavior. Flip after +// staging burn-in once the agent-side cutover (PR-5) has proven +// the round-trip end-to-end. + +func ledgerWritesEnabled() bool { + return os.Getenv("DELEGATION_LEDGER_WRITE") == "1" +} + +// recordLedgerInsert is the gated wrapper around DelegationLedger.Insert. +// All callers in delegation.go go through here so flipping the flag +// requires no further code changes — the gate is one function. +// +// taskPreview is truncated by the ledger to `previewCap` bytes; pass +// the full task text without pre-truncating. +func recordLedgerInsert(ctx context.Context, callerID, calleeID, delegationID, taskPreview, idemKey string) { + if !ledgerWritesEnabled() { + return + } + NewDelegationLedger(nil).Insert(ctx, InsertOpts{ + DelegationID: delegationID, + CallerID: callerID, + CalleeID: calleeID, + TaskPreview: taskPreview, + IdempotencyKey: idemKey, + }) +} + +// recordLedgerStatus is the gated wrapper around DelegationLedger.SetStatus. +// status MUST be one of the lifecycle values the ledger accepts +// (queued|dispatched|in_progress|completed|failed|stuck). errorDetail is +// non-empty for failed/stuck; resultPreview is non-empty for completed. +// +// Errors are logged inside the ledger and not propagated — the legacy +// activity_logs path remains authoritative; ledger is best-effort +// (matches the tenant_resources audit posture, memory ref: +// `reference_tenant_resources_audit`). +func recordLedgerStatus(ctx context.Context, delegationID, status, errorDetail, resultPreview string) { + if !ledgerWritesEnabled() { + return + } + // SetStatus returns an error (e.g. ErrInvalidTransition for forward- + // only protection on terminal states) but we don't propagate it — + // the legacy path's status writes are still authoritative for the + // dashboard, and a ledger replay error is not a delegation failure. + _ = NewDelegationLedger(nil).SetStatus(ctx, delegationID, status, errorDetail, resultPreview) +} diff --git a/workspace-server/internal/handlers/delegation_ledger_writes_test.go b/workspace-server/internal/handlers/delegation_ledger_writes_test.go new file mode 100644 index 00000000..156e4e44 --- /dev/null +++ b/workspace-server/internal/handlers/delegation_ledger_writes_test.go @@ -0,0 +1,134 @@ +package handlers + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" +) + +// delegation_ledger_writes_test.go — RFC #2829 #318 wiring tests. +// +// Scope: +// - flag off (default) → no ledger SQL fires +// - flag on, recordLedgerInsert → INSERT INTO delegations +// - flag on, recordLedgerStatus on lifecycle transitions +// - flag on, recordLedgerStatus on terminal-state replay → no UPDATE +// +// We test the gate functions in isolation rather than re-asserting the +// full handler test surface (Delegate/Record/UpdateStatus) — those are +// already pinned by delegation_test.go (30 tests) and exercising the +// flag-on path through them would force adding ~20 ExpectExec stanzas +// to existing tests. That refactor lands separately when we're ready +// to flip the flag default to on. + +func TestLedgerWritesEnabled_FlagOff(t *testing.T) { + t.Setenv("DELEGATION_LEDGER_WRITE", "") + if ledgerWritesEnabled() { + t.Errorf("flag off must report disabled") + } +} + +func TestLedgerWritesEnabled_FlagOn(t *testing.T) { + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + if !ledgerWritesEnabled() { + t.Errorf("flag on must report enabled") + } +} + +func TestLedgerWritesEnabled_RejectsLooseTruthyValues(t *testing.T) { + // Only "1" is the on signal — "true", "yes", anything else is + // off. This matches the existing PR-2 + PR-5 flag conventions + // (DELEGATION_RESULT_INBOX_PUSH, DELEGATION_SYNC_VIA_INBOX). + for _, v := range []string{"true", "yes", "TRUE", "0", "on"} { + t.Run(v, func(t *testing.T) { + t.Setenv("DELEGATION_LEDGER_WRITE", v) + if ledgerWritesEnabled() { + t.Errorf("value %q must NOT enable the flag (only \"1\" does)", v) + } + }) + } +} + +func TestRecordLedgerInsert_FlagOff_NoSQL(t *testing.T) { + mock := setupTestDB(t) + t.Setenv("DELEGATION_LEDGER_WRITE", "") + + recordLedgerInsert(context.Background(), + "caller", "callee", "deleg-1", "task body", "") + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("flag off must fire no SQL: %v", err) + } +} + +func TestRecordLedgerInsert_FlagOn_FiresInsert(t *testing.T) { + mock := setupTestDB(t) + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + + mock.ExpectExec(`INSERT INTO delegations`). + WithArgs( + "deleg-1", "caller", "callee", "task body", + sqlmock.AnyArg(), // deadline + sqlmock.AnyArg(), // idempotency_key NullString + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + recordLedgerInsert(context.Background(), + "caller", "callee", "deleg-1", "task body", "") + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestRecordLedgerStatus_FlagOff_NoSQL(t *testing.T) { + mock := setupTestDB(t) + t.Setenv("DELEGATION_LEDGER_WRITE", "") + + recordLedgerStatus(context.Background(), "deleg-1", "dispatched", "", "") + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("flag off must fire no SQL: %v", err) + } +} + +func TestRecordLedgerStatus_FlagOn_FiresUpdate(t *testing.T) { + mock := setupTestDB(t) + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + + // SetStatus reads current status first (forward-only protection). + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("deleg-1"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("queued")) + // Then UPDATEs. + mock.ExpectExec(`UPDATE delegations`). + WithArgs("deleg-1", "dispatched", "", ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + recordLedgerStatus(context.Background(), "deleg-1", "dispatched", "", "") + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestRecordLedgerStatus_FlagOn_TerminalReplaySwallowsErr(t *testing.T) { + // SetStatus returns ErrInvalidTransition when called on a terminal + // row. recordLedgerStatus must swallow that — the legacy path is + // authoritative; ledger replay error is not a delegation failure. + mock := setupTestDB(t) + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + + // Row already completed — SELECT returns "completed". + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("deleg-1"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("completed")) + // No UPDATE expected — terminal forward-only protection blocks it. + + // Should NOT panic / propagate; mock's ExpectationsWereMet is the + // behavior assertion — if SetStatus tried to UPDATE, the unset + // expectation would catch it. + recordLedgerStatus(context.Background(), "deleg-1", "failed", "post-hoc", "") + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("terminal-replay must not fire UPDATE: %v", err) + } +}