diff --git a/workspace-server/internal/handlers/delegation.go b/workspace-server/internal/handlers/delegation.go index ac110093..8ef915d3 100644 --- a/workspace-server/internal/handlers/delegation.go +++ b/workspace-server/internal/handlers/delegation.go @@ -262,14 +262,24 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b "task": body.Task, "delegation_id": delegationID, }) + // Write delegation_id to response_body as well as request_body. The + // ListDelegations API query reads delegation_id from BOTH fields so either + // path works. But consistency here prevents a subtle gap: if a future + // status-update path ever strips delegation_id from request_body, the + // API would still find it in response_body. (updateDelegationStatus only + // writes status — it never touches request_body — so this is belt-and- + // suspenders for the initial row. See also the Record handler below.) + respJSON, _ := json.Marshal(map[string]interface{}{ + "delegation_id": delegationID, + }) var idemArg interface{} if body.IdempotencyKey != "" { idemArg = body.IdempotencyKey } _, err := db.DB.ExecContext(ctx, ` - INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status, idempotency_key) - VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending', $6) - `, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), idemArg) + INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status, idempotency_key) + VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, $6::jsonb, 'pending', $7) + `, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), string(respJSON), idemArg) if err == nil { // RFC #2829 #318 — mirror to the durable delegations ledger // (gated by DELEGATION_LEDGER_WRITE; default off → no-op). @@ -544,10 +554,16 @@ func (h *DelegationHandler) Record(c *gin.Context) { "task": body.Task, "delegation_id": body.DelegationID, }) + // Mirror delegation_id into response_body so ListDelegations (which reads + // COALESCE(request_body->>'delegation_id', response_body->>'delegation_id')) + // can locate this row regardless of which field the caller checks. + respJSON, _ := json.Marshal(map[string]interface{}{ + "delegation_id": body.DelegationID, + }) if _, err := db.DB.ExecContext(ctx, ` - INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status) - VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'dispatched') - `, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON)); err != nil { + INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status) + VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, $6::jsonb, 'dispatched') + `, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), string(respJSON)); err != nil { log.Printf("Delegation Record: insert failed for %s: %v", body.DelegationID, err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to record delegation"}) return diff --git a/workspace-server/internal/handlers/delegation_test.go b/workspace-server/internal/handlers/delegation_test.go index 2f560972..ed574072 100644 --- a/workspace-server/internal/handlers/delegation_test.go +++ b/workspace-server/internal/handlers/delegation_test.go @@ -3,6 +3,7 @@ package handlers import ( "bytes" "context" + "database/sql/driver" "encoding/json" "fmt" "net" @@ -133,9 +134,10 @@ func TestDelegate_Success(t *testing.T) { targetID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" // Expect INSERT into activity_logs for delegation tracking - // (6th arg is idempotency_key — nil here since the request omits it) + // Args: $1=workspace_id, $2=source_id, $3=target_id, $4=summary, + // $5=request_body(jsonb), $6=response_body(jsonb), $7=idempotency_key(nil). mock.ExpectExec("INSERT INTO activity_logs"). - WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), nil). + WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), nil). WillReturnResult(sqlmock.NewResult(0, 1)) // Expect RecordAndBroadcast INSERT into structure_events @@ -189,9 +191,9 @@ func TestDelegate_DBInsertFails_Still202WithWarning(t *testing.T) { targetID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" - // DB insert fails (6th arg = idempotency_key, nil for this test) + // DB insert fails (7 args: $7=idempotency_key is nil for this test) mock.ExpectExec("INSERT INTO activity_logs"). - WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), nil). + WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), nil). WillReturnError(fmt.Errorf("database connection lost")) // RecordAndBroadcast still fires @@ -484,6 +486,7 @@ func TestDelegationRecord_InsertsActivityLogRow(t *testing.T) { wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) h := NewDelegationHandler(wh, broadcaster) + // Record handler now passes 6 args (added response_body jsonb). mock.ExpectExec("INSERT INTO activity_logs"). WithArgs( "550e8400-e29b-41d4-a716-446655440000", // workspace_id @@ -491,6 +494,7 @@ func TestDelegationRecord_InsertsActivityLogRow(t *testing.T) { "550e8400-e29b-41d4-a716-446655440001", // target_id "Delegating to 550e8400-e29b-41d4-a716-446655440001", // summary sqlmock.AnyArg(), // request_body (jsonb) + sqlmock.AnyArg(), // response_body (jsonb) — added ). WillReturnResult(sqlmock.NewResult(0, 1)) // RecordAndBroadcast INSERT for DELEGATION_SENT @@ -699,9 +703,9 @@ func TestDelegate_IdempotentFailedRowIsReleasedAndReplaced(t *testing.T) { mock.ExpectExec("DELETE FROM activity_logs"). WithArgs("ws-source", "retry-key"). WillReturnResult(sqlmock.NewResult(0, 1)) - // Fresh insert with the same idempotency key. + // Fresh insert with the same idempotency key (7 args: added response_body). mock.ExpectExec("INSERT INTO activity_logs"). - WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "retry-key"). + WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), "retry-key"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectExec("INSERT INTO structure_events"). WillReturnResult(sqlmock.NewResult(0, 1)) @@ -745,9 +749,9 @@ func TestDelegate_IdempotentRaceUniqueViolationReturnsExisting(t *testing.T) { mock.ExpectQuery("SELECT request_body->>'delegation_id', status, target_id"). WithArgs("ws-source", "race-key"). WillReturnError(fmt.Errorf("sql: no rows in result set")) - // Insert loses the race against a concurrent caller. + // Insert loses the race against a concurrent caller (7 args: added response_body). mock.ExpectExec("INSERT INTO activity_logs"). - WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "race-key"). + WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), "race-key"). WillReturnError(fmt.Errorf("pq: duplicate key value violates unique constraint \"activity_logs_idempotency_uniq\"")) // Re-query returns the winner. mock.ExpectQuery("SELECT request_body->>'delegation_id', status"). @@ -900,13 +904,49 @@ func TestLookupIdempotentDelegation_ExistingPending(t *testing.T) { // --- insertDelegationRow --- +// delegationIDResponseBodyMatcher verifies that the INSERT's response_body argument +// (the 6th bound parameter) contains the delegation_id in its JSON object. +// This is the fix for the activity_logs delegation_id parity bug: the initial +// delegation row now writes delegation_id to response_body (not just request_body) +// so both the Activity logs API and the agent's check_delegation_status can find +// the row by looking at either field. +type delegationIDResponseBodyMatcher struct { + delegationID string +} + +func (m delegationIDResponseBodyMatcher) Match(v driver.Value) bool { + s, ok := v.(string) + if !ok { + return false + } + var parsed map[string]interface{} + if json.Unmarshal([]byte(s), &parsed) != nil { + return false + } + got, ok := parsed["delegation_id"].(string) + return ok && got == m.delegationID +} + func TestInsertDelegationRow_Success(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) + // 7 args: sourceID($1), sourceID($2), targetID($3), summary($4), + // request_body JSON($5), response_body JSON($6), idempotency_key($7). + // response_body must contain the delegation_id. + taskJSON, _ := json.Marshal(map[string]interface{}{ + "task": "hi", + "delegation_id": "del-1", + }) + respJSON, _ := json.Marshal(map[string]interface{}{ + "delegation_id": "del-1", + }) mock.ExpectExec("INSERT INTO activity_logs"). + WithArgs("ws-src", "ws-src", "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "Delegating to aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + string(taskJSON), delegationIDResponseBodyMatcher{delegationID: "del-1"}, nil). WillReturnResult(sqlmock.NewResult(0, 1)) out := insertDelegationRow(context.Background(), c, @@ -924,7 +964,17 @@ func TestInsertDelegationRow_IdempotentConflict(t *testing.T) { w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) + idemTaskJSON, _ := json.Marshal(map[string]interface{}{ + "task": "hi", + "delegation_id": "loser-del", + }) + idemRespJSON, _ := json.Marshal(map[string]interface{}{ + "delegation_id": "loser-del", + }) mock.ExpectExec("INSERT INTO activity_logs"). + WithArgs("ws-src", "ws-src", "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "Delegating to aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + string(idemTaskJSON), delegationIDResponseBodyMatcher{delegationID: "loser-del"}, "k1"). WillReturnError(fmt.Errorf("pq: duplicate key value violates unique constraint")) mock.ExpectQuery("SELECT request_body->>'delegation_id', status"). WithArgs("ws-src", "k1"). @@ -951,7 +1001,17 @@ func TestInsertDelegationRow_OtherDBError(t *testing.T) { // Without IdempotencyKey, the follow-up SELECT is skipped — any insert // error falls straight to insertTrackingUnavailable. + errTaskJSON, _ := json.Marshal(map[string]interface{}{ + "task": "hi", + "delegation_id": "del-x", + }) + errRespJSON, _ := json.Marshal(map[string]interface{}{ + "delegation_id": "del-x", + }) mock.ExpectExec("INSERT INTO activity_logs"). + WithArgs("ws-src", "ws-src", "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "Delegating to aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + string(errTaskJSON), delegationIDResponseBodyMatcher{delegationID: "del-x"}, nil). WillReturnError(fmt.Errorf("connection refused")) out := insertDelegationRow(context.Background(), c,