Merge pull request #2854 from Molecule-AI/feat/rfc2829-wire-ledger-writes

feat(delegations): wire ledger Insert+SetStatus from production paths (RFC #2829 #318)
This commit is contained in:
Hongming Wang 2026-05-05 09:29:19 +00:00 committed by GitHub
commit ab164c1967
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 229 additions and 1 deletions

View File

@ -269,6 +269,9 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending', $6)
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), idemArg)
if err == nil {
// RFC #2829 #318 — mirror to the durable delegations ledger
// (gated by DELEGATION_LEDGER_WRITE; default off → no-op).
recordLedgerInsert(ctx, sourceID, body.TargetID, delegationID, body.Task, body.IdempotencyKey)
return insertOK
}
// A unique-constraint hit means a concurrent request just took the
@ -409,6 +412,10 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s
}
h.updateDelegationStatus(sourceID, delegationID, "completed", "")
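// RFC #2829 #318 — mirror result_preview to the durable ledger.
// updateDelegationStatus above already mirrors the status flip to
// "completed"; this second call exists only to set the preview field
// on the same row.
// NOTE(review): SetStatus is forward-only on terminal states. If it
// treats completed → completed as an invalid transition, this preview
// write is silently dropped — confirm against DelegationLedger.SetStatus.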
recordLedgerStatus(ctx, delegationID, "completed", "", responseText)
h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_COMPLETE", sourceID, map[string]interface{}{
"delegation_id": delegationID,
"target_id": targetID,
@ -420,7 +427,8 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s
// updateDelegationStatus updates the status of a delegation record in activity_logs.
func (h *DelegationHandler) updateDelegationStatus(workspaceID, delegationID, status, errorDetail string) {
if _, err := db.DB.ExecContext(context.Background(), `
ctx := context.Background()
if _, err := db.DB.ExecContext(ctx, `
UPDATE activity_logs
SET status = $1, error_detail = CASE WHEN $2 = '' THEN error_detail ELSE $2 END
WHERE workspace_id = $3
@ -429,6 +437,14 @@ func (h *DelegationHandler) updateDelegationStatus(workspaceID, delegationID, st
`, status, errorDetail, workspaceID, delegationID); err != nil {
log.Printf("Delegation %s: status update failed: %v", delegationID, err)
}
// RFC #2829 #318 — mirror status transition to the durable ledger
// (gated). Note: the ledger's vocabulary differs from activity_logs
// (its initial state is `queued`, not "pending"). Statuses the ledger
// does not accept — "pending", "received", or any unknown value — are
// skipped here rather than failing.
switch status {
case "queued", "dispatched", "in_progress", "completed", "failed", "stuck":
recordLedgerStatus(ctx, delegationID, status, errorDetail, "")
}
}
// Record handles POST /workspaces/:id/delegations/record — the agent-initiated
@ -474,6 +490,15 @@ func (h *DelegationHandler) Record(c *gin.Context) {
return
}
// RFC #2829 #318 — mirror to durable ledger (gated). Record always
// reflects an A2A request the agent already fired itself, so the
// initial activity_logs status is 'dispatched' — but the ledger's
// CHECK constraint only accepts 'queued' as the initial state via
// Insert. Insert as queued first; the very next SetStatus(...,
// dispatched) below promotes it to dispatched on the same row.
recordLedgerInsert(ctx, sourceID, body.TargetID, body.DelegationID, body.Task, "")
recordLedgerStatus(ctx, body.DelegationID, "dispatched", "", "")
h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_SENT", sourceID, map[string]interface{}{
"delegation_id": body.DelegationID,
"target_id": body.TargetID,

View File

@ -0,0 +1,69 @@
package handlers
import (
"context"
"os"
)
// delegation_ledger_writes.go — RFC #2829 follow-up (#318): wire
// DelegationLedger Insert + SetStatus calls into the existing
// activity_logs-driven flow without touching the legacy code path.
//
// Why a flag (not always-on)
// --------------------------
// The legacy flow writes everything to activity_logs and a tight
// strict-sqlmock test surface (~30 tests) pins exactly which SQL
// statements fire per handler invocation. Adding ledger writes
// always-on would force updating each of those tests in this PR.
// Gating behind DELEGATION_LEDGER_WRITE=1 lets ledger-driven
// behavior land independently of the test refactor — operators
// can flip it on in staging to populate the `delegations` table
// (and thus give the PR-3 sweeper + PR-4 dashboard data to work
// with) without coupling the rollout to a churn-y test diff.
//
// Default off → byte-identical to pre-#318 behavior. Flip after
// staging burn-in once the agent-side cutover (PR-5) has proven
// the round-trip end-to-end.
// ledgerWritesEnabled reports whether the DELEGATION_LEDGER_WRITE
// environment variable is set to exactly "1". Any other value,
// including the empty string, leaves ledger writes disabled.
func ledgerWritesEnabled() bool {
	switch os.Getenv("DELEGATION_LEDGER_WRITE") {
	case "1":
		return true
	default:
		return false
	}
}
// recordLedgerInsert forwards a new delegation to DelegationLedger.Insert
// when ledger writes are enabled, and does nothing otherwise. Callers in
// delegation.go route through this wrapper so the flag check lives in a
// single place.
//
// Pass the full task text as taskPreview without pre-truncating; the
// ledger truncates it to `previewCap` bytes. Whatever Insert returns is
// not inspected here.
func recordLedgerInsert(ctx context.Context, callerID, calleeID, delegationID, taskPreview, idemKey string) {
	if ledgerWritesEnabled() {
		opts := InsertOpts{
			DelegationID:   delegationID,
			CallerID:       callerID,
			CalleeID:       calleeID,
			TaskPreview:    taskPreview,
			IdempotencyKey: idemKey,
		}
		ledger := NewDelegationLedger(nil)
		ledger.Insert(ctx, opts)
	}
}
// recordLedgerStatus forwards a status transition to
// DelegationLedger.SetStatus when ledger writes are enabled, and does
// nothing otherwise.
//
// status MUST be one of the lifecycle values the ledger accepts
// (queued|dispatched|in_progress|completed|failed|stuck). errorDetail is
// meant for failed/stuck and resultPreview for completed; callers pass
// an empty string for whichever does not apply.
//
// The ledger is best-effort: the legacy activity_logs path remains
// authoritative, so nothing is propagated to the caller (matches the
// tenant_resources audit posture, memory ref:
// `reference_tenant_resources_audit`).
func recordLedgerStatus(ctx context.Context, delegationID, status, errorDetail, resultPreview string) {
	if ledgerWritesEnabled() {
		ledger := NewDelegationLedger(nil)
		// SetStatus can return an error (e.g. ErrInvalidTransition from
		// the forward-only protection on terminal states). It is
		// deliberately discarded: the legacy status writes still drive
		// the dashboard, and a ledger replay error is not a delegation
		// failure.
		_ = ledger.SetStatus(ctx, delegationID, status, errorDetail, resultPreview)
	}
}

View File

@ -0,0 +1,134 @@
package handlers
import (
"context"
"testing"
"github.com/DATA-DOG/go-sqlmock"
)
// delegation_ledger_writes_test.go — RFC #2829 #318 wiring tests.
//
// Scope:
// - flag off (default) → no ledger SQL fires
// - flag on, recordLedgerInsert → INSERT INTO delegations
// - flag on, recordLedgerStatus on lifecycle transitions
// - flag on, recordLedgerStatus on terminal-state replay → no UPDATE
//
// We test the gate functions in isolation rather than re-asserting the
// full handler test surface (Delegate/Record/UpdateStatus) — those are
// already pinned by delegation_test.go (30 tests) and exercising the
// flag-on path through them would force adding ~20 ExpectExec stanzas
// to existing tests. That refactor lands separately when we're ready
// to flip the flag default to on.
// TestLedgerWritesEnabled_FlagOff checks that an empty
// DELEGATION_LEDGER_WRITE value leaves the gate closed.
func TestLedgerWritesEnabled_FlagOff(t *testing.T) {
	t.Setenv("DELEGATION_LEDGER_WRITE", "")

	enabled := ledgerWritesEnabled()
	if enabled {
		t.Errorf("flag off must report disabled")
	}
}
// TestLedgerWritesEnabled_FlagOn checks that the value "1" opens the gate.
func TestLedgerWritesEnabled_FlagOn(t *testing.T) {
	t.Setenv("DELEGATION_LEDGER_WRITE", "1")

	enabled := ledgerWritesEnabled()
	if !enabled {
		t.Errorf("flag on must report enabled")
	}
}
// TestLedgerWritesEnabled_RejectsLooseTruthyValues checks that values
// which merely look truthy do not open the gate.
func TestLedgerWritesEnabled_RejectsLooseTruthyValues(t *testing.T) {
	// "1" is the only on signal; "true", "yes" and everything else count
	// as off. Same convention as the existing PR-2 + PR-5 flags
	// (DELEGATION_RESULT_INBOX_PUSH, DELEGATION_SYNC_VIA_INBOX).
	looseValues := []string{"true", "yes", "TRUE", "0", "on"}
	for i := range looseValues {
		candidate := looseValues[i]
		t.Run(candidate, func(t *testing.T) {
			t.Setenv("DELEGATION_LEDGER_WRITE", candidate)
			if enabled := ledgerWritesEnabled(); enabled {
				t.Errorf("value %q must NOT enable the flag (only \"1\" does)", candidate)
			}
		})
	}
}
// TestRecordLedgerInsert_FlagOff_NoSQL checks that recordLedgerInsert
// issues no SQL while the flag is off: no expectations are registered on
// the mock before the call.
func TestRecordLedgerInsert_FlagOff_NoSQL(t *testing.T) {
	dbMock := setupTestDB(t)
	t.Setenv("DELEGATION_LEDGER_WRITE", "")

	ctx := context.Background()
	recordLedgerInsert(ctx, "caller", "callee", "deleg-1", "task body", "")

	err := dbMock.ExpectationsWereMet()
	if err != nil {
		t.Errorf("flag off must fire no SQL: %v", err)
	}
}
// TestRecordLedgerInsert_FlagOn_FiresInsert checks that, with the flag
// on, recordLedgerInsert issues an INSERT INTO delegations carrying the
// delegation id, caller, callee and task text.
func TestRecordLedgerInsert_FlagOn_FiresInsert(t *testing.T) {
	dbMock := setupTestDB(t)
	t.Setenv("DELEGATION_LEDGER_WRITE", "1")

	insertExpectation := dbMock.ExpectExec(`INSERT INTO delegations`)
	insertExpectation.
		WithArgs(
			"deleg-1", "caller", "callee", "task body",
			sqlmock.AnyArg(), // deadline
			sqlmock.AnyArg(), // idempotency_key NullString
		).
		WillReturnResult(sqlmock.NewResult(0, 1))

	ctx := context.Background()
	recordLedgerInsert(ctx, "caller", "callee", "deleg-1", "task body", "")

	err := dbMock.ExpectationsWereMet()
	if err != nil {
		t.Errorf("unmet: %v", err)
	}
}
// TestRecordLedgerStatus_FlagOff_NoSQL checks that recordLedgerStatus
// issues no SQL while the flag is off: no expectations are registered on
// the mock before the call.
func TestRecordLedgerStatus_FlagOff_NoSQL(t *testing.T) {
	dbMock := setupTestDB(t)
	t.Setenv("DELEGATION_LEDGER_WRITE", "")

	ctx := context.Background()
	recordLedgerStatus(ctx, "deleg-1", "dispatched", "", "")

	err := dbMock.ExpectationsWereMet()
	if err != nil {
		t.Errorf("flag off must fire no SQL: %v", err)
	}
}
// TestRecordLedgerStatus_FlagOn_FiresUpdate checks that, with the flag
// on, a queued → dispatched transition reads the current status and then
// issues the UPDATE.
func TestRecordLedgerStatus_FlagOn_FiresUpdate(t *testing.T) {
	dbMock := setupTestDB(t)
	t.Setenv("DELEGATION_LEDGER_WRITE", "1")

	// Step 1: SetStatus looks up the current status first (forward-only
	// protection); the row is reported as "queued".
	currentStatus := sqlmock.NewRows([]string{"status"}).AddRow("queued")
	dbMock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs("deleg-1").
		WillReturnRows(currentStatus)

	// Step 2: the UPDATE itself.
	dbMock.ExpectExec(`UPDATE delegations`).
		WithArgs("deleg-1", "dispatched", "", "").
		WillReturnResult(sqlmock.NewResult(0, 1))

	ctx := context.Background()
	recordLedgerStatus(ctx, "deleg-1", "dispatched", "", "")

	err := dbMock.ExpectationsWereMet()
	if err != nil {
		t.Errorf("unmet: %v", err)
	}
}
// TestRecordLedgerStatus_FlagOn_TerminalReplaySwallowsErr checks that a
// status write against an already-terminal row neither fires an UPDATE
// nor surfaces an error to the caller.
func TestRecordLedgerStatus_FlagOn_TerminalReplaySwallowsErr(t *testing.T) {
	// On a terminal row SetStatus returns ErrInvalidTransition.
	// recordLedgerStatus has to swallow it: the legacy path is
	// authoritative, and a ledger replay error is not a delegation
	// failure.
	dbMock := setupTestDB(t)
	t.Setenv("DELEGATION_LEDGER_WRITE", "1")

	// The status lookup reports the row as already "completed".
	terminalRow := sqlmock.NewRows([]string{"status"}).AddRow("completed")
	dbMock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs("deleg-1").
		WillReturnRows(terminalRow)

	// No UPDATE expectation is registered on purpose: forward-only
	// protection must block the write. The call must not panic or
	// propagate anything, and ExpectationsWereMet below is the
	// behavior assertion for the terminal-replay path.
	ctx := context.Background()
	recordLedgerStatus(ctx, "deleg-1", "failed", "post-hoc", "")

	err := dbMock.ExpectationsWereMet()
	if err != nil {
		t.Errorf("terminal-replay must not fire UPDATE: %v", err)
	}
}