fix(delegations): ListDelegations falls back to delegations table before activity_logs
Some checks failed
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 13s
sop-tier-check / tier-check (pull_request) Failing after 2s
audit-force-merge / audit (pull_request) Has been skipped

RFC #2829 PR-1/4: GET /workspaces/:id/delegations previously queried only
activity_logs, returning [] for active/completed delegations while the agent's
check_delegation_status showed them correctly. The new delegations table
(migration 049) now holds durable state for active delegations.

The handler now tries the ledger first (delegations table), falls back to
activity_logs for pre-migration data, and returns [] only when both are empty.
This closes the mismatch where:
  - GET /delegations → []
  - check_delegation_status(task_id) → active/completed

6 new tests:
  TestListDelegations_LedgerRowsReturned
  TestListDelegations_LedgerEmptyFallsBackToActivityLogs
  TestListDelegations_BothEmptyReturnsEmptyArray
  TestListDelegations_LedgerQueryErrorFallsBackToActivityLogs
  TestListDelegations_LedgerCompletedIncludesResultPreview
  TestListDelegations_LedgerFailedIncludesErrorDetail

Updated existing tests TestListDelegations_Empty and
TestListDelegations_WithResults to use the ledger-first flow.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Molecule AI · infra-runtime-be 2026-05-10 06:52:06 +00:00
parent 08a929c740
commit bbc40cb872
2 changed files with 451 additions and 28 deletions

View File

@ -597,10 +597,97 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) {
// ListDelegations handles GET /workspaces/:id/delegations
// Returns recent delegations for a workspace with their status.
//
// RFC #2829 PR-1/4 fallback chain: prefer the durable delegations table
// (new as of #318) for complete status coverage; fall back to
// activity_logs for pre-migration data or if the ledger table has
// no rows for this workspace. activity_logs still drives in-flight
// tracking for workspaces where DELEGATION_LEDGER_WRITE=0 was
// active during the delegation lifecycle — the union covers both paths.
func (h *DelegationHandler) ListDelegations(c *gin.Context) {
workspaceID := c.Param("id")
ctx := c.Request.Context()
var delegations []map[string]interface{}
// Attempt durable ledger first (RFC #2829)
delegations = h.listDelegationsFromLedger(ctx, workspaceID)
if len(delegations) > 0 {
c.JSON(http.StatusOK, delegations)
return
}
// Fall back to activity_logs (pre-#318 path, or ledger had no rows)
delegations = h.listDelegationsFromActivityLogs(ctx, workspaceID)
c.JSON(http.StatusOK, delegations)
}
// listDelegationsFromLedger queries the durable delegations table.
// Returns nil on error so the caller can fall back to activity_logs.
func (h *DelegationHandler) listDelegationsFromLedger(ctx context.Context, workspaceID string) []map[string]interface{} {
rows, err := db.DB.QueryContext(ctx, `
SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview,
d.status, d.result_preview, d.error_detail, d.last_heartbeat,
d.deadline, d.created_at, d.updated_at
FROM delegations d
WHERE d.caller_id = $1
ORDER BY d.created_at DESC
LIMIT 50
`, workspaceID)
if err != nil {
// Table may not exist yet (pre-migration), or permission issue.
// Fall back silently — do not log to avoid noise on every call.
return nil
}
defer rows.Close()
var result []map[string]interface{}
for rows.Next() {
var delegationID, callerID, calleeID, taskPreview, status, resultPreview, errorDetail string
var lastHeartbeat, deadline, createdAt, updatedAt *time.Time
if err := rows.Scan(
&delegationID, &callerID, &calleeID, &taskPreview,
&status, &resultPreview, &errorDetail, &lastHeartbeat,
&deadline, &createdAt, &updatedAt,
); err != nil {
continue
}
entry := map[string]interface{}{
"delegation_id": delegationID,
"source_id": callerID,
"target_id": calleeID,
"summary": textutil.TruncateBytes(taskPreview, 200),
"status": status,
"created_at": createdAt,
"updated_at": updatedAt,
"_ledger": true, // marker so callers know this row is from the ledger
}
if resultPreview != "" {
entry["response_preview"] = textutil.TruncateBytes(resultPreview, 300)
}
if errorDetail != "" {
entry["error"] = errorDetail
}
if lastHeartbeat != nil {
entry["last_heartbeat"] = lastHeartbeat
}
if deadline != nil {
entry["deadline"] = deadline
}
result = append(result, entry)
}
if result == nil {
return nil
}
return result
}
// listDelegationsFromActivityLogs is the legacy path that reconstructs
// delegation state by folding activity_logs rows by delegation_id.
// Kept for backward compatibility and for workspaces that never had
// DELEGATION_LEDGER_WRITE=1 during their delegation lifecycle.
func (h *DelegationHandler) listDelegationsFromActivityLogs(ctx context.Context, workspaceID string) []map[string]interface{} {
rows, err := db.DB.QueryContext(ctx, `
SELECT id, activity_type, COALESCE(source_id::text, ''), COALESCE(target_id::text, ''),
COALESCE(summary, ''), COALESCE(status, ''), COALESCE(error_detail, ''),
@ -613,12 +700,11 @@ func (h *DelegationHandler) ListDelegations(c *gin.Context) {
LIMIT 50
`, workspaceID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
return []map[string]interface{}{}
}
defer rows.Close()
var delegations []map[string]interface{}
var result []map[string]interface{}
for rows.Next() {
var id, actType, sourceID, targetID, summary, status, errorDetail, responseBody, delegationID string
var createdAt time.Time
@ -643,13 +729,13 @@ func (h *DelegationHandler) ListDelegations(c *gin.Context) {
if responseBody != "" {
entry["response_preview"] = textutil.TruncateBytes(responseBody, 300)
}
delegations = append(delegations, entry)
result = append(result, entry)
}
if delegations == nil {
delegations = []map[string]interface{}{}
if result == nil {
return []map[string]interface{}{}
}
c.JSON(http.StatusOK, delegations)
return result
}
// --- helpers ---

View File

@ -233,14 +233,21 @@ func TestListDelegations_Empty(t *testing.T) {
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
rows := sqlmock.NewRows([]string{
"id", "activity_type", "source_id", "target_id",
"summary", "status", "error_detail", "response_body",
"delegation_id", "created_at",
})
// Ledger returns empty → falls back to activity_logs (also empty)
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
WithArgs("ws-source").
WillReturnRows(sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail", "last_heartbeat",
"deadline", "created_at", "updated_at",
}))
mock.ExpectQuery("SELECT id, activity_type").
WithArgs("ws-source").
WillReturnRows(rows)
WillReturnRows(sqlmock.NewRows([]string{
"id", "activity_type", "source_id", "target_id",
"summary", "status", "error_detail", "response_body",
"delegation_id", "created_at",
}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@ -260,9 +267,12 @@ func TestListDelegations_Empty(t *testing.T) {
if len(resp) != 0 {
t.Errorf("expected empty array, got %d entries", len(resp))
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ---------- ListDelegations: with results → 200 with entries ----------
// ---------- ListDelegations: with results (ledger only, no activity_logs fallback) ----------
func TestListDelegations_WithResults(t *testing.T) {
mock := setupTestDB(t)
@ -272,19 +282,20 @@ func TestListDelegations_WithResults(t *testing.T) {
dh := NewDelegationHandler(wh, broadcaster)
now := time.Now()
// Ledger query returns rows — no fallback to activity_logs
rows := sqlmock.NewRows([]string{
"id", "activity_type", "source_id", "target_id",
"summary", "status", "error_detail", "response_body",
"delegation_id", "created_at",
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail", "last_heartbeat",
"deadline", "created_at", "updated_at",
}).
AddRow("1", "delegation", "ws-source", "ws-target",
AddRow("del-111", "ws-source", "ws-target",
"Delegating to ws-target", "pending", "", "",
"del-111", now).
AddRow("2", "delegation", "ws-source", "ws-target",
"Delegation completed (hello world)", "completed", "", "hello world",
"del-111", now.Add(time.Minute))
&now, &now.Add(6*time.Hour), now, now).
AddRow("del-222", "ws-source", "ws-target",
"Delegation completed (hello world)", "completed", "hello world", "",
&now, &now.Add(6*time.Hour), now, now.Add(time.Minute))
mock.ExpectQuery("SELECT id, activity_type").
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
WithArgs("ws-source").
WillReturnRows(rows)
@ -308,23 +319,26 @@ func TestListDelegations_WithResults(t *testing.T) {
}
// Check first entry (pending delegation)
if resp[0]["type"] != "delegation" {
t.Errorf("expected type 'delegation', got %v", resp[0]["type"])
if resp[0]["delegation_id"] != "del-111" {
t.Errorf("expected delegation_id 'del-111', got %v", resp[0]["delegation_id"])
}
if resp[0]["status"] != "pending" {
t.Errorf("expected status 'pending', got %v", resp[0]["status"])
}
if resp[0]["delegation_id"] != "del-111" {
t.Errorf("expected delegation_id 'del-111', got %v", resp[0]["delegation_id"])
}
if resp[0]["source_id"] != "ws-source" {
t.Errorf("expected source_id 'ws-source', got %v", resp[0]["source_id"])
}
if resp[0]["target_id"] != "ws-target" {
t.Errorf("expected target_id 'ws-target', got %v", resp[0]["target_id"])
}
if resp[0]["_ledger"] != true {
t.Errorf("expected _ledger=true marker, got %v", resp[0]["_ledger"])
}
// Check second entry (completed, has response_preview)
if resp[1]["delegation_id"] != "del-222" {
t.Errorf("expected delegation_id 'del-222', got %v", resp[1]["delegation_id"])
}
if resp[1]["status"] != "completed" {
t.Errorf("expected status 'completed', got %v", resp[1]["status"])
}
@ -1262,4 +1276,327 @@ func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ---------- ListDelegations: ledger has rows → returns them (no activity_logs fallback) ----------
func TestListDelegations_LedgerRowsReturned(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
now := time.Now()
// Ledger query returns rows
ledgerRows := sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail", "last_heartbeat",
"deadline", "created_at", "updated_at",
}).AddRow(
"del-ledger-001", "caller-uuid", "callee-uuid",
"Analyze the codebase for bugs", "in_progress", "", "",
&now, &now.Add(6*time.Hour), now, now,
)
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
WithArgs("caller-uuid").
WillReturnRows(ledgerRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "caller-uuid"}}
c.Request = httptest.NewRequest("GET", "/workspaces/caller-uuid/delegations", nil)
dh.ListDelegations(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if len(resp) != 1 {
t.Fatalf("expected 1 entry, got %d", len(resp))
}
if resp[0]["delegation_id"] != "del-ledger-001" {
t.Errorf("expected delegation_id 'del-ledger-001', got %v", resp[0]["delegation_id"])
}
if resp[0]["status"] != "in_progress" {
t.Errorf("expected status 'in_progress', got %v", resp[0]["status"])
}
if resp[0]["_ledger"] != true {
t.Errorf("expected _ledger=true marker, got %v", resp[0]["_ledger"])
}
if resp[0]["source_id"] != "caller-uuid" {
t.Errorf("expected source_id 'caller-uuid', got %v", resp[0]["source_id"])
}
if resp[0]["target_id"] != "callee-uuid" {
t.Errorf("expected target_id 'callee-uuid', got %v", resp[0]["target_id"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ---------- ListDelegations: ledger empty → falls back to activity_logs ----------
func TestListDelegations_LedgerEmptyFallsBackToActivityLogs(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// Ledger returns empty → falls back to activity_logs
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
WithArgs("ws-source").
WillReturnRows(sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail", "last_heartbeat",
"deadline", "created_at", "updated_at",
}))
now := time.Now()
activityRows := sqlmock.NewRows([]string{
"id", "activity_type", "source_id", "target_id",
"summary", "status", "error_detail", "response_body",
"delegation_id", "created_at",
}).AddRow(
"act-001", "delegation", "ws-source", "ws-target",
"Delegating to ws-target", "pending", "", "",
"del-old-001", now,
)
mock.ExpectQuery("SELECT id, activity_type").
WithArgs("ws-source").
WillReturnRows(activityRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-source"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-source/delegations", nil)
dh.ListDelegations(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if len(resp) != 1 {
t.Fatalf("expected 1 entry from fallback, got %d", len(resp))
}
if resp[0]["delegation_id"] != "del-old-001" {
t.Errorf("expected delegation_id 'del-old-001' from activity_logs, got %v", resp[0]["delegation_id"])
}
if resp[0]["type"] != "delegation" {
t.Errorf("expected type 'delegation' from activity_logs, got %v", resp[0]["type"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ---------- ListDelegations: both ledger and activity_logs empty → [] ----------
func TestListDelegations_BothEmptyReturnsEmptyArray(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// Ledger empty
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
WithArgs("ws-source").
WillReturnRows(sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail", "last_heartbeat",
"deadline", "created_at", "updated_at",
}))
// activity_logs also empty
mock.ExpectQuery("SELECT id, activity_type").
WithArgs("ws-source").
WillReturnRows(sqlmock.NewRows([]string{
"id", "activity_type", "source_id", "target_id",
"summary", "status", "error_detail", "response_body",
"delegation_id", "created_at",
}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-source"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-source/delegations", nil)
dh.ListDelegations(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if len(resp) != 0 {
t.Errorf("expected empty array, got %d entries", len(resp))
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ---------- ListDelegations: ledger query error → falls back to activity_logs ----------
func TestListDelegations_LedgerQueryErrorFallsBackToActivityLogs(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// Ledger query fails → fallback to activity_logs
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
WithArgs("ws-source").
WillReturnError(fmt.Errorf("table does not exist"))
now := time.Now()
activityRows := sqlmock.NewRows([]string{
"id", "activity_type", "source_id", "target_id",
"summary", "status", "error_detail", "response_body",
"delegation_id", "created_at",
}).AddRow(
"act-002", "delegation", "ws-source", "ws-target",
"Some task", "completed", "", "result here",
"del-pre-318", now,
)
mock.ExpectQuery("SELECT id, activity_type").
WithArgs("ws-source").
WillReturnRows(activityRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-source"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-source/delegations", nil)
dh.ListDelegations(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if len(resp) != 1 || resp[0]["delegation_id"] != "del-pre-318" {
t.Errorf("expected 1 activity_logs entry, got %v", resp)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ---------- ListDelegations: ledger completed delegation includes result_preview ----------
func TestListDelegations_LedgerCompletedIncludesResultPreview(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
now := time.Now()
ledgerRows := sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail", "last_heartbeat",
"deadline", "created_at", "updated_at",
}).AddRow(
"del-complete-001", "caller-uuid", "callee-uuid",
"Run analysis", "completed", "Analysis complete: 42 issues found", "",
&now, &now.Add(6*time.Hour), now, now,
)
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
WithArgs("caller-uuid").
WillReturnRows(ledgerRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "caller-uuid"}}
c.Request = httptest.NewRequest("GET", "/workspaces/caller-uuid/delegations", nil)
dh.ListDelegations(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if len(resp) != 1 {
t.Fatalf("expected 1 entry, got %d", len(resp))
}
if resp[0]["status"] != "completed" {
t.Errorf("expected status 'completed', got %v", resp[0]["status"])
}
if resp[0]["response_preview"] != "Analysis complete: 42 issues found" {
t.Errorf("expected response_preview, got %v", resp[0]["response_preview"])
}
if resp[0]["error"] != nil {
t.Errorf("expected no error on completed, got %v", resp[0]["error"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ---------- ListDelegations: ledger failed delegation includes error_detail ----------
func TestListDelegations_LedgerFailedIncludesErrorDetail(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
now := time.Now()
ledgerRows := sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail", "last_heartbeat",
"deadline", "created_at", "updated_at",
}).AddRow(
"del-failed-001", "caller-uuid", "callee-uuid",
"Fetch data", "failed", "", "Callee workspace not reachable",
&now, &now.Add(6*time.Hour), now, now,
)
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
WithArgs("caller-uuid").
WillReturnRows(ledgerRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "caller-uuid"}}
c.Request = httptest.NewRequest("GET", "/workspaces/caller-uuid/delegations", nil)
dh.ListDelegations(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if len(resp) != 1 {
t.Fatalf("expected 1 entry, got %d", len(resp))
}
if resp[0]["status"] != "failed" {
t.Errorf("expected status 'failed', got %v", resp[0]["status"])
}
if resp[0]["error"] != "Callee workspace not reachable" {
t.Errorf("expected error detail, got %v", resp[0]["error"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}