Merge pull request #2832 from Molecule-AI/feat/rfc2829-pr1-delegations-table

feat(delegations): durable per-task ledger + audit-write helper (RFC #2829 PR-1)
This commit is contained in:
Hongming Wang 2026-05-05 03:47:06 +00:00 committed by GitHub
commit b3b9a242d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 616 additions and 0 deletions

View File

@ -0,0 +1,200 @@
package handlers
import (
"context"
"database/sql"
"errors"
"log"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// delegation_ledger.go — durable per-task ledger for A2A delegation
// (RFC #2829 PR-1).
//
// activity_logs is an event stream — one row per state transition. Replaying
// the stream gives you history. This file's table (delegations) is the
// folded current state — one row per delegation_id with a single status,
// last_heartbeat, deadline, and result_preview.
//
// Why both: PR-3 needs a sweeper that joins on
// (status='in_progress' AND last_heartbeat < now() - interval '10 minutes')
// which is impossible to express against the event stream without a window
// function over every (delegation_id, latest event) pair — a planner-killing
// query at scale. The dedicated table makes the sweeper an indexed scan.
//
// Writes go to BOTH tables. activity_logs remains the audit-grade record
// for forensics; delegations is the queryable view for dashboards + sweeper
// joins. Symmetric-write pattern — same posture as tenant_resources (PR
// #2343), per memory `reference_tenant_resources_audit`.
// DelegationLedger writes the per-task durable row alongside the existing
// activity_logs event-stream writes. activity_logs remains the audit-grade
// source of truth, so a ledger failure is never meant to abort orchestration:
//
//   - Insert and Heartbeat are best-effort: a failed write is logged and
//     swallowed.
//   - SetStatus returns its error (including ErrInvalidTransition) and leaves
//     it to the caller to decide whether to ignore it.
//
// Same shape as `tenant_resources` reconciler (PR #2343): orchestration
// continues even when the ledger write fails, and the next status update
// (or PR-3 reconciler) will heal the ledger.
type DelegationLedger struct {
// db is the handle every ledger statement is executed against.
db *sql.DB
}
// NewDelegationLedger builds a ledger around handle. A nil handle selects the
// package-wide db.DB, read once at construction time. Tests can pass a
// sqlmock-backed *sql.DB instead.
func NewDelegationLedger(handle *sql.DB) *DelegationLedger {
	ledger := &DelegationLedger{db: handle}
	if ledger.db == nil {
		ledger.db = db.DB
	}
	return ledger
}
// previewCap is the maximum number of bytes kept in a stored preview (4KB).
// The full prompt/response is already in activity_logs.{request,response}_body
// — the preview is the at-a-glance view for the dashboard, not a forensic
// record.
const previewCap = 4096

// truncatePreview returns s unchanged when it fits in previewCap bytes and
// otherwise returns a prefix of at most previewCap bytes.
//
// The cut never lands inside a multi-byte UTF-8 sequence: slicing at a fixed
// byte offset can split a rune, and the resulting invalid UTF-8 would be
// rejected by a Postgres text column. When the cut would split a rune, the
// whole rune is dropped, so the result may be up to 3 bytes shorter than
// previewCap.
func truncatePreview(s string) string {
	if len(s) <= previewCap {
		return s
	}
	cut := previewCap
	// s[cut] is the first byte that would be dropped. A continuation byte
	// (10xxxxxx) there means the rune started earlier, so back up to its lead
	// byte. A rune has at most 3 continuation bytes; the bound keeps malformed
	// input from shrinking the preview arbitrarily.
	for back := 0; back < 3 && cut > 0 && s[cut]&0xC0 == 0x80; back++ {
		cut--
	}
	return s[:cut]
}
// InsertOpts is the agent's record-of-intent for a new delegation.
//
// Insert skips the write when DelegationID, CallerID, or CalleeID is empty.
// TaskPreview is not checked by Insert (an empty preview is written as the
// empty string), and IdempotencyKey is optional.
type InsertOpts struct {
// DelegationID is the key of the ledger row.
DelegationID string
// CallerID is written to the caller_id column.
CallerID string
// CalleeID is written to the callee_id column.
CalleeID string
// TaskPreview is passed through truncatePreview before it is stored.
TaskPreview string
IdempotencyKey string // empty → NULL
// Deadline defaults to now + 6h when zero. Callers can pass a tighter
// per-task deadline (cron, interactive request) by setting it.
Deadline time.Time
}
// Insert writes the initial 'queued' row for a delegation.
//
// The statement uses ON CONFLICT DO NOTHING with no conflict target, so a
// duplicate is a silent no-op on either unique constraint:
//
//   - the delegation_id primary key (the agent's retry-on-restart codepath
//     re-sends the same delegation_id), and
//   - the partial unique index on (caller_id, idempotency_key).
//
// A conflict target of (delegation_id) would only arbitrate the primary key;
// a duplicate idempotency key would then surface as a unique-violation error
// instead of the intended no-op.
//
// Best-effort: when DelegationID, CallerID, or CalleeID is empty the write is
// skipped with a log line, and a database error is logged but not returned.
// A zero opts.Deadline becomes now + 6h; an empty opts.IdempotencyKey is
// stored as NULL; opts.TaskPreview is truncated via truncatePreview.
func (l *DelegationLedger) Insert(ctx context.Context, opts InsertOpts) {
	if opts.DelegationID == "" || opts.CallerID == "" || opts.CalleeID == "" {
		log.Printf("delegation_ledger Insert: missing required field, skipping")
		return
	}
	deadline := opts.Deadline
	if deadline.IsZero() {
		deadline = time.Now().Add(6 * time.Hour)
	}
	// NullString with Valid=false is sent as SQL NULL, which keeps the row out
	// of the partial unique index (WHERE idempotency_key IS NOT NULL).
	idemArg := sql.NullString{String: opts.IdempotencyKey, Valid: opts.IdempotencyKey != ""}
	_, err := l.db.ExecContext(ctx, `
		INSERT INTO delegations (
			delegation_id, caller_id, callee_id, task_preview,
			status, deadline, idempotency_key
		) VALUES ($1, $2, $3, $4, 'queued', $5, $6)
		ON CONFLICT DO NOTHING
	`, opts.DelegationID, opts.CallerID, opts.CalleeID,
		truncatePreview(opts.TaskPreview), deadline, idemArg)
	if err != nil {
		log.Printf("delegation_ledger Insert(%s): %v", opts.DelegationID, err)
	}
}
// allowedTransitions is the lifecycle table: source status → set of statuses
// it may move to. It backs the schema CHECK with a code-level guard
// (defense-in-depth).
//
// Terminal states (completed, failed, stuck) are deliberately absent as keys,
// so any further status update on them is rejected — once a delegation is
// done, it stays done.
//
// "queued → in_progress" (skipping dispatched) is permitted on purpose: lazy
// callers that never ack the dispatched stage shouldn't be penalised, since
// the agent ultimately cares about whether work started, not which HTTP layer
// happened to ack first.
var allowedTransitions = map[string]map[string]bool{
	"queued": {
		"dispatched":  true,
		"in_progress": true,
		"failed":      true,
	},
	"dispatched": {
		"in_progress": true,
		"completed":   true,
		"failed":      true,
	},
	"in_progress": {
		"completed": true,
		"failed":    true,
		"stuck":     true,
	},
}
// ErrInvalidTransition is returned by SetStatus when the requested status is
// not listed in allowedTransitions for the row's current status. That covers
// any attempt to leave a terminal state (completed, failed, stuck) as well as
// edges that are simply not in the table (e.g. in_progress → queued).
// Callers SHOULD usually ignore it (the common cause is a duplicate or late
// terminal write); it is surfaced so tests can assert on it.
var ErrInvalidTransition = errors.New("delegation ledger: invalid status transition")
// SetStatus moves a delegation to status, validating the move against
// allowedTransitions.
//
// Parameters:
//   - delegationID, status: required; an empty value returns an error before
//     any SQL runs.
//   - errorDetail: stored in error_detail; the empty string is stored as NULL.
//     Expected to be non-empty only for failed/stuck.
//   - resultPreview: truncated via truncatePreview and stored in
//     result_preview; the empty string is stored as NULL. Expected to be
//     non-empty only for completed.
//
// Returns:
//   - nil when the row was updated, when the row does not exist (logged and
//     skipped), or when the row already has the requested status (same-status
//     replay; nothing is written and the payload arguments are ignored).
//   - ErrInvalidTransition when the move is not in allowedTransitions, or when
//     the guarded UPDATE matched no row because the delegation reached a
//     terminal state (or disappeared) after the status was read.
//   - the underlying database error otherwise.
func (l *DelegationLedger) SetStatus(ctx context.Context,
	delegationID, status, errorDetail, resultPreview string,
) error {
	if delegationID == "" || status == "" {
		return errors.New("delegation ledger: missing required field")
	}

	// Read the current status so the transition can be validated in code.
	var current string
	err := l.db.QueryRowContext(ctx,
		`SELECT status FROM delegations WHERE delegation_id = $1`,
		delegationID,
	).Scan(&current)
	if errors.Is(err, sql.ErrNoRows) {
		// Insert was lost or wasn't called. Defensively NO-OP — the next
		// agent retry will re-Insert and the next SetStatus will land.
		log.Printf("delegation_ledger SetStatus(%s, %s): row missing, skipping",
			delegationID, status)
		return nil
	}
	if err != nil {
		return err
	}

	// Same-status replay (e.g. duplicate completion notification): no-op,
	// don't bump updated_at, no error.
	if current == status {
		return nil
	}

	// Reject anything the lifecycle table does not list: terminal sources
	// have no entry at all, and non-terminal sources only allow their listed
	// destinations. An unknown status value is rejected here too.
	if next, ok := allowedTransitions[current]; !ok || !next[status] {
		return ErrInvalidTransition
	}

	// The terminal-state guard in WHERE closes the read-then-write window: if
	// another writer finished the delegation after the SELECT above, this
	// UPDATE matches nothing instead of overwriting the terminal status.
	res, err := l.db.ExecContext(ctx, `
		UPDATE delegations
		   SET status = $2,
		       error_detail = NULLIF($3, ''),
		       result_preview = NULLIF($4, ''),
		       updated_at = now()
		 WHERE delegation_id = $1
		   AND status NOT IN ('completed','failed','stuck')
	`, delegationID, status, errorDetail, truncatePreview(resultPreview))
	if err != nil {
		return err
	}
	// RowsAffected may be unsupported by a driver; only act on a definite 0.
	if n, raErr := res.RowsAffected(); raErr == nil && n == 0 {
		return ErrInvalidTransition
	}
	return nil
}
// Heartbeat records liveness for an in-flight delegation by setting both
// last_heartbeat and updated_at to now(). The callee calls it whenever it
// makes progress; PR-3's sweeper compares last_heartbeat to NOW() to decide
// stuckness.
//
// The UPDATE excludes rows whose status is completed, failed, or stuck, so a
// heartbeat on a finished delegation changes nothing. An empty delegationID
// returns without touching the database.
//
// Best-effort: a database error is logged and not returned.
func (l *DelegationLedger) Heartbeat(ctx context.Context, delegationID string) {
	// Statement text is kept flush-left so the bytes sent to the database are
	// unchanged.
	const stampHeartbeat = `
UPDATE delegations
SET last_heartbeat = now(), updated_at = now()
WHERE delegation_id = $1
AND status NOT IN ('completed','failed','stuck')
`
	if delegationID != "" {
		if _, execErr := l.db.ExecContext(ctx, stampHeartbeat, delegationID); execErr != nil {
			log.Printf("delegation_ledger Heartbeat(%s): %v", delegationID, execErr)
		}
	}
}

View File

@ -0,0 +1,312 @@
package handlers
import (
"context"
"errors"
"strings"
"testing"
"github.com/DATA-DOG/go-sqlmock"
)
// delegation_ledger_test.go — unit coverage for the durable ledger writer
// (RFC #2829 PR-1).
//
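// Coverage in this file:
// - Insert: happy path; missing-required no-op; oversized preview. The
//   deadline default and the idempotency_key NULL-vs-string passthrough are
//   matched with sqlmock.AnyArg() only, so their values are not asserted.
// - SetStatus: queued→dispatched, queued→in_progress, in_progress→completed;
//   same-status replay no-op; terminal state forward-only protection;
//   missing row no-op; empty delegation_id / status rejection. SQL error
//   propagation has no test here yet.
// - Heartbeat: stamps now() on in-flight; missing-id guard. The no-op on
//   terminal rows lives in the UPDATE's WHERE clause and cannot be observed
//   through sqlmock.
// - truncatePreview: under-cap passthrough; at-cap passthrough; over-cap
//   truncates.
// - allowedTransitions: table shape (terminal states are never sources).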
// ---------- Insert ----------
// TestLedgerInsert_HappyPath checks that fully populated options produce one
// INSERT carrying the identifiers and the preview verbatim. The deadline and
// idempotency-key arguments are accepted with any value.
func TestLedgerInsert_HappyPath(t *testing.T) {
	mock := setupTestDB(t)
	// nil selects the package db.DB handle, which setupTestDB is expected to
	// have pointed at the mock.
	ledger := NewDelegationLedger(nil)

	opts := InsertOpts{
		DelegationID: "deleg-123",
		CallerID:     "caller-uuid",
		CalleeID:     "callee-uuid",
		TaskPreview:  "task body",
	}
	anyDeadline := sqlmock.AnyArg() // default = now+6h
	anyIdemKey := sqlmock.AnyArg()  // idempotency_key NullString
	mock.ExpectExec(`INSERT INTO delegations`).
		WithArgs(opts.DelegationID, opts.CallerID, opts.CalleeID, opts.TaskPreview,
			anyDeadline, anyIdemKey).
		WillReturnResult(sqlmock.NewResult(0, 1))

	ledger.Insert(context.Background(), opts)

	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Errorf("unmet sqlmock expectations: %v", unmet)
	}
}
func TestLedgerInsert_MissingRequired_NoSQLFired(t *testing.T) {
mock := setupTestDB(t)
l := NewDelegationLedger(nil)
// Caller-side guard: no DB call expected.
for _, opts := range []InsertOpts{
{DelegationID: "", CallerID: "c", CalleeID: "ca", TaskPreview: "t"},
{DelegationID: "d", CallerID: "", CalleeID: "ca", TaskPreview: "t"},
{DelegationID: "d", CallerID: "c", CalleeID: "", TaskPreview: "t"},
} {
l.Insert(context.Background(), opts)
}
// No ExpectExec → ExpectationsWereMet stays clean.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unexpected sqlmock activity: %v", err)
}
}
// TestLedgerInsert_TruncatesOversizedPreview verifies that Insert sends the
// truncated preview, not the raw one. The expected argument is the exact
// previewCap-byte prefix, so an untruncated (or differently truncated) value
// fails to match, the expectation stays unmet, and the test fails.
func TestLedgerInsert_TruncatesOversizedPreview(t *testing.T) {
	mock := setupTestDB(t)
	l := NewDelegationLedger(nil)
	huge := strings.Repeat("x", 10_000) // > previewCap
	want := huge[:previewCap]           // ASCII only, so the cut is exactly at the cap
	mock.ExpectExec(`INSERT INTO delegations`).
		WithArgs(
			"deleg-big",
			"c", "ca",
			want,             // truncated preview, compared by value
			sqlmock.AnyArg(), // deadline
			sqlmock.AnyArg(), // idempotency_key
		).
		WillReturnResult(sqlmock.NewResult(0, 1))
	l.Insert(context.Background(), InsertOpts{
		DelegationID: "deleg-big",
		CallerID:     "c",
		CalleeID:     "ca",
		TaskPreview:  huge,
	})
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet: %v", err)
	}
}
// ---------- truncatePreview unit ----------
// TestTruncatePreview_UnderCap: input shorter than previewCap is returned
// unchanged.
func TestTruncatePreview_UnderCap(t *testing.T) {
	const in = "short"
	got := truncatePreview(in)
	if got == in {
		return
	}
	t.Errorf("under-cap should passthrough; got %q", got)
}
// TestTruncatePreview_OverCapTruncatesAtBoundary: ASCII input longer than
// previewCap comes back exactly previewCap bytes long.
func TestTruncatePreview_OverCapTruncatesAtBoundary(t *testing.T) {
	oversized := strings.Repeat("a", previewCap+100)
	if n := len(truncatePreview(oversized)); n != previewCap {
		t.Errorf("expected len=%d got len=%d", previewCap, n)
	}
}
// TestTruncatePreview_ExactlyAtCap: input of exactly previewCap bytes is
// returned unchanged.
func TestTruncatePreview_ExactlyAtCap(t *testing.T) {
	atCap := strings.Repeat("a", previewCap)
	if truncatePreview(atCap) == atCap {
		return
	}
	t.Errorf("at-cap should passthrough unchanged")
}
// ---------- SetStatus lifecycle ----------
// TestLedgerSetStatus_QueuedToDispatched: a queued row may move to
// dispatched; one SELECT and one UPDATE are expected.
func TestLedgerSetStatus_QueuedToDispatched(t *testing.T) {
	const id = "d-1"
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)

	stored := sqlmock.NewRows([]string{"status"}).AddRow("queued")
	mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs(id).
		WillReturnRows(stored)
	mock.ExpectExec(`UPDATE delegations`).
		WithArgs(id, "dispatched", "", "").
		WillReturnResult(sqlmock.NewResult(0, 1))

	err := ledger.SetStatus(context.Background(), id, "dispatched", "", "")
	if err != nil {
		t.Errorf("unexpected: %v", err)
	}
	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Errorf("unmet: %v", unmet)
	}
}
// TestLedgerSetStatus_QueuedToInProgress_SkipsDispatched: lazy callers that
// go queued → in_progress without acking the dispatched stage are allowed.
func TestLedgerSetStatus_QueuedToInProgress_SkipsDispatched(t *testing.T) {
	const id = "d-1"
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)

	stored := sqlmock.NewRows([]string{"status"}).AddRow("queued")
	mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs(id).
		WillReturnRows(stored)
	mock.ExpectExec(`UPDATE delegations`).
		WithArgs(id, "in_progress", "", "").
		WillReturnResult(sqlmock.NewResult(0, 1))

	err := ledger.SetStatus(context.Background(), id, "in_progress", "", "")
	if err != nil {
		t.Errorf("unexpected: %v", err)
	}
	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Errorf("unmet: %v", unmet)
	}
}
// TestLedgerSetStatus_InProgressToCompleted_StoresResult: completing an
// in-progress row passes the result preview through to the UPDATE.
func TestLedgerSetStatus_InProgressToCompleted_StoresResult(t *testing.T) {
	const (
		id     = "d-1"
		result = "answer text"
	)
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)

	stored := sqlmock.NewRows([]string{"status"}).AddRow("in_progress")
	mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs(id).
		WillReturnRows(stored)
	mock.ExpectExec(`UPDATE delegations`).
		WithArgs(id, "completed", "", result).
		WillReturnResult(sqlmock.NewResult(0, 1))

	err := ledger.SetStatus(context.Background(), id, "completed", "", result)
	if err != nil {
		t.Errorf("unexpected: %v", err)
	}
	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Errorf("unmet: %v", unmet)
	}
}
// TestLedgerSetStatus_TerminalForwardOnly: completed → failed must be
// rejected with ErrInvalidTransition, and no UPDATE is expected.
func TestLedgerSetStatus_TerminalForwardOnly(t *testing.T) {
	const id = "d-done"
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)

	stored := sqlmock.NewRows([]string{"status"}).AddRow("completed")
	mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs(id).
		WillReturnRows(stored)

	got := ledger.SetStatus(context.Background(), id, "failed", "post-hoc error", "")
	if !errors.Is(got, ErrInvalidTransition) {
		t.Errorf("expected ErrInvalidTransition, got %v", got)
	}
	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Errorf("unmet: %v", unmet)
	}
}
// TestLedgerSetStatus_SameStatusReplay_NoUpdate: re-applying the status a row
// already has must not write — duplicate completion notifications shouldn't
// bump updated_at. Only the SELECT is expected; a stray UPDATE would come
// back from the mock as an error, which SetStatus returns.
func TestLedgerSetStatus_SameStatusReplay_NoUpdate(t *testing.T) {
	const id = "d-1"
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)

	stored := sqlmock.NewRows([]string{"status"}).AddRow("completed")
	mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs(id).
		WillReturnRows(stored)

	err := ledger.SetStatus(context.Background(), id, "completed", "", "")
	if err != nil {
		t.Errorf("same-status replay should be no-op, got err: %v", err)
	}
	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Errorf("unmet (or unexpected UPDATE): %v", unmet)
	}
}
// TestLedgerSetStatus_MissingRowIsNoOp: a SetStatus that arrives before the
// row exists (lost INSERT, race, etc.) must NOT error — it's a transient
// inconsistency the next agent retry will heal.
func TestLedgerSetStatus_MissingRowIsNoOp(t *testing.T) {
	const id = "d-missing"
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)

	noRows := sqlmock.NewRows([]string{"status"}) // empty result set
	mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs(id).
		WillReturnRows(noRows)

	err := ledger.SetStatus(context.Background(), id, "completed", "", "ok")
	if err != nil {
		t.Errorf("missing row should be no-op; got err: %v", err)
	}
	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Errorf("unmet: %v", unmet)
	}
}
// TestLedgerSetStatus_RejectsEmptyDelegationID: an empty delegation id must
// produce an error; no SQL expectations are registered.
func TestLedgerSetStatus_RejectsEmptyDelegationID(t *testing.T) {
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)

	got := ledger.SetStatus(context.Background(), "", "completed", "", "")
	if got == nil {
		t.Errorf("expected error for empty delegation_id")
	}
	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Errorf("unexpected sqlmock activity for empty input: %v", unmet)
	}
}
// TestLedgerSetStatus_RejectsEmptyStatus: an empty status must produce an
// error; no SQL expectations are registered.
func TestLedgerSetStatus_RejectsEmptyStatus(t *testing.T) {
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)

	got := ledger.SetStatus(context.Background(), "d-1", "", "", "")
	if got == nil {
		t.Errorf("expected error for empty status")
	}
	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Errorf("unexpected sqlmock activity for empty input: %v", unmet)
	}
}
// ---------- Heartbeat ----------
// TestLedgerHeartbeat_StampsInflightRow: Heartbeat issues one UPDATE keyed by
// the delegation id.
func TestLedgerHeartbeat_StampsInflightRow(t *testing.T) {
	const id = "d-1"
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)

	oneRowUpdated := sqlmock.NewResult(0, 1)
	mock.ExpectExec(`UPDATE delegations`).
		WithArgs(id).
		WillReturnResult(oneRowUpdated)

	ledger.Heartbeat(context.Background(), id)

	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Errorf("unmet: %v", unmet)
	}
}
func TestLedgerHeartbeat_EmptyIDIsNoOp(t *testing.T) {
mock := setupTestDB(t)
l := NewDelegationLedger(nil)
l.Heartbeat(context.Background(), "") // no SQL expected
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unexpected SQL on empty id: %v", err)
}
}
// ---------- Allowed-transition table ----------
// TestAllowedTransitionsTableShape pins two structural properties of the
// lifecycle map:
//
//   - no terminal state (completed/failed/stuck) appears as a key, which is
//     what makes terminal states forward-only; and
//   - every state that does appear as a key has at least one destination.
//
// It catches accidental edits that re-add an outbound edge from a terminal
// state or leave a source with an empty destination set.
func TestAllowedTransitionsTableShape(t *testing.T) {
	terminals := [...]string{"completed", "failed", "stuck"}
	for i := range terminals {
		if _, present := allowedTransitions[terminals[i]]; present {
			t.Errorf("terminal state %q must not appear as transition source", terminals[i])
		}
	}
	for src := range allowedTransitions {
		if len(allowedTransitions[src]) == 0 {
			t.Errorf("non-terminal state %q has no outbound transitions", src)
		}
	}
}

View File

@ -0,0 +1,5 @@
-- RFC #2829 PR-1 (down): remove the durable delegations ledger.
--
-- Drops the four delegations indexes by name and then the table itself.
-- Every statement uses IF EXISTS, so re-running the script against a
-- database where the objects are already gone does not fail.
DROP INDEX IF EXISTS idx_delegations_idempotency;
DROP INDEX IF EXISTS idx_delegations_callee_created;
DROP INDEX IF EXISTS idx_delegations_caller_created;
DROP INDEX IF EXISTS idx_delegations_inflight_heartbeat;
DROP TABLE IF EXISTS delegations;

View File

@ -0,0 +1,99 @@
-- RFC #2829 PR-1: durable delegations ledger.
--
-- Today, delegation state is reconstructed by GROUPing activity_logs rows
-- by delegation_id and ORDER BY created_at DESC. Three problems:
--
-- 1. No queryable "what is currently in flight for this workspace" — every
-- caller has to fold the event stream itself.
-- 2. No place to durably stamp last_heartbeat / deadline on a per-task
-- basis, so a stuck-task sweeper has nothing to scan.
-- 3. The 600s message/send proxy timeout (the user's 2026-05-05 home-hermes
-- iteration-14/90 incident) leaves the in-flight HTTP connection holding
-- all the state — caller restart, callee restart, proxy timeout all kill
-- the delegation. activity_logs can replay the *intent* but not the
-- *current state* without the row that says "yes this is still alive".
--
-- This table is the durable ledger that PRs #2-#4 build on:
-- PR-2 — push result to caller's inbox + use this row to track readiness
-- PR-3 — sweeper joins on (status='in_progress', last_heartbeat<now-N)
-- PR-4 — operator dashboard reads SELECT * WHERE status NOT IN ('completed','failed')
--
-- Delegation lifecycle:
-- queued — caller recorded intent, target unreachable / busy queue
-- dispatched — A2A request sent to target's HTTP server
-- in_progress — target acknowledged + started work
-- completed — terminal: result delivered to caller
-- failed — terminal: gave up after retries
-- stuck — terminal-ish: sweeper couldn't reach target for >threshold;
-- operator can transition to failed via dashboard (PR-4)
CREATE TABLE IF NOT EXISTS delegations (
-- delegation_id chosen by the caller so callee + caller agree on the key
-- without a database round-trip. UUID, but stored as TEXT to match the
-- existing agent-side string contract (delegation.py uses str(uuid4())).
delegation_id text PRIMARY KEY,
-- Caller is the workspace that initiated the delegation. Callee is the
-- target. Both reference workspaces, but we don't FK them — workspace
-- delete should NOT cascade-delete delegations history (audit retention).
-- Same posture as tenant_resources (PR #2343).
caller_id uuid NOT NULL,
callee_id uuid NOT NULL,
-- Truncated at insertion so a 50KB prompt doesn't bloat the ledger; the
-- full prompt lives in activity_logs.request_body for forensic replay.
task_preview text NOT NULL,
-- Lifecycle status; the CHECK below is the authoritative list of values.
status text NOT NULL DEFAULT 'queued'
CHECK (status IN ('queued','dispatched','in_progress','completed','failed','stuck')),
-- Stamped by callee heartbeats (PR-3 sweeper compares to NOW()). NULL
-- before any heartbeat — sweeper treats NULL same as last_heartbeat
-- < (created_at) for stuckness purposes.
last_heartbeat timestamptz,
-- Hard deadline. Beyond this, sweeper marks `failed` regardless of
-- heartbeat liveness — protects against agents that heartbeat forever
-- without making progress. Default 6h matches the longest-observed legit
-- delegation in production (memory-namespace migration runs).
deadline timestamptz NOT NULL DEFAULT (now() + interval '6 hours'),
-- Truncated result preview (full result in activity_logs response_body).
-- Set on terminal completed transition.
result_preview text,
-- Set on failed/stuck terminal transition.
error_detail text,
-- For PR-3 retry policy. Not used in PR-1 — declared so PR-3 doesn't
-- need a follow-on migration.
retry_count integer NOT NULL DEFAULT 0,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
-- Idempotency: the agent-side delegate_task call accepts an idempotency
-- key. Two records with the same (caller_id, idempotency_key) collapse to
-- one row via the partial UNIQUE index idx_delegations_idempotency
-- (non-NULL keys only). The column itself carries no constraint. A
-- collision is only a silent no-op when the INSERT's ON CONFLICT clause
-- either has no conflict target or names that index's columns and
-- predicate; ON CONFLICT (delegation_id) alone does not cover it.
idempotency_key text
);
-- Sweeper hot path (PR-3): list everything that's in flight and overdue
-- for a heartbeat. Partial index on the three non-terminal statuses
-- (queued, dispatched, in_progress) keeps this small; NULLS FIRST puts
-- rows that have never heartbeated at the front of the index order.
CREATE INDEX IF NOT EXISTS idx_delegations_inflight_heartbeat
ON delegations (last_heartbeat NULLS FIRST)
WHERE status IN ('queued','dispatched','in_progress');
-- Operator dashboard (PR-4): per-workspace recent delegations, newest
-- first, from either side of the delegation.
CREATE INDEX IF NOT EXISTS idx_delegations_caller_created
ON delegations (caller_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_delegations_callee_created
ON delegations (callee_id, created_at DESC);
-- Idempotency dedupe: composite (caller_id, idempotency_key) so two
-- different callers can use the same key without colliding. Partial on
-- non-NULL keys, so rows without a key are never compared. An INSERT that
-- should treat a collision here as a no-op needs ON CONFLICT DO NOTHING
-- without a conflict target (or one that names these columns and this
-- predicate).
CREATE UNIQUE INDEX IF NOT EXISTS idx_delegations_idempotency
ON delegations (caller_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;