fix(handlers): write delegation_id to response_body in activity_logs INSERT #985

Closed
fullstack-engineer wants to merge 2 commits from fix/activity-logs-delegation-id-response-body into staging
2 changed files with 90 additions and 14 deletions

View File

@ -262,14 +262,24 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
"task": body.Task,
"delegation_id": delegationID,
})
// Write delegation_id to response_body as well as request_body. The
// ListDelegations API query reads delegation_id from BOTH fields so either
// path works. But consistency here prevents a subtle gap: if a future
// status-update path ever strips delegation_id from request_body, the
// API would still find it in response_body. (updateDelegationStatus only
// writes status — it never touches request_body — so this is belt-and-
// suspenders for the initial row. See also the Record handler below.)
respJSON, _ := json.Marshal(map[string]interface{}{
"delegation_id": delegationID,
})
var idemArg interface{}
if body.IdempotencyKey != "" {
idemArg = body.IdempotencyKey
}
_, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status, idempotency_key)
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending', $6)
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), idemArg)
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status, idempotency_key)
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, $6::jsonb, 'pending', $7)
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), string(respJSON), idemArg)
if err == nil {
// RFC #2829 #318 — mirror to the durable delegations ledger
// (gated by DELEGATION_LEDGER_WRITE; default off → no-op).
@ -544,10 +554,16 @@ func (h *DelegationHandler) Record(c *gin.Context) {
"task": body.Task,
"delegation_id": body.DelegationID,
})
// Mirror delegation_id into response_body so ListDelegations (which reads
// COALESCE(request_body->>'delegation_id', response_body->>'delegation_id'))
// can locate this row regardless of which field the caller checks.
respJSON, _ := json.Marshal(map[string]interface{}{
"delegation_id": body.DelegationID,
})
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status)
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'dispatched')
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON)); err != nil {
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status)
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, $6::jsonb, 'dispatched')
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), string(respJSON)); err != nil {
log.Printf("Delegation Record: insert failed for %s: %v", body.DelegationID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to record delegation"})
return

View File

@ -3,6 +3,7 @@ package handlers
import (
"bytes"
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"net"
@ -133,9 +134,10 @@ func TestDelegate_Success(t *testing.T) {
targetID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
// Expect INSERT into activity_logs for delegation tracking
// (6th arg is idempotency_key — nil here since the request omits it)
// Args: $1=workspace_id, $2=source_id, $3=target_id, $4=summary,
// $5=request_body(jsonb), $6=response_body(jsonb), $7=idempotency_key(nil).
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), nil).
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), nil).
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect RecordAndBroadcast INSERT into structure_events
@ -189,9 +191,9 @@ func TestDelegate_DBInsertFails_Still202WithWarning(t *testing.T) {
targetID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
// DB insert fails (6th arg = idempotency_key, nil for this test)
// DB insert fails (7 args: $7=idempotency_key is nil for this test)
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), nil).
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), nil).
WillReturnError(fmt.Errorf("database connection lost"))
// RecordAndBroadcast still fires
@ -484,6 +486,7 @@ func TestDelegationRecord_InsertsActivityLogRow(t *testing.T) {
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
h := NewDelegationHandler(wh, broadcaster)
// Record handler now passes 6 args (added response_body jsonb).
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs(
"550e8400-e29b-41d4-a716-446655440000", // workspace_id
@ -491,6 +494,7 @@ func TestDelegationRecord_InsertsActivityLogRow(t *testing.T) {
"550e8400-e29b-41d4-a716-446655440001", // target_id
"Delegating to 550e8400-e29b-41d4-a716-446655440001", // summary
sqlmock.AnyArg(), // request_body (jsonb)
sqlmock.AnyArg(), // response_body (jsonb) — added
).
WillReturnResult(sqlmock.NewResult(0, 1))
// RecordAndBroadcast INSERT for DELEGATION_SENT
@ -699,9 +703,9 @@ func TestDelegate_IdempotentFailedRowIsReleasedAndReplaced(t *testing.T) {
mock.ExpectExec("DELETE FROM activity_logs").
WithArgs("ws-source", "retry-key").
WillReturnResult(sqlmock.NewResult(0, 1))
// Fresh insert with the same idempotency key.
// Fresh insert with the same idempotency key (7 args: added response_body).
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "retry-key").
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), "retry-key").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
@ -745,9 +749,9 @@ func TestDelegate_IdempotentRaceUniqueViolationReturnsExisting(t *testing.T) {
mock.ExpectQuery("SELECT request_body->>'delegation_id', status, target_id").
WithArgs("ws-source", "race-key").
WillReturnError(fmt.Errorf("sql: no rows in result set"))
// Insert loses the race against a concurrent caller.
// Insert loses the race against a concurrent caller (7 args: added response_body).
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "race-key").
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), "race-key").
WillReturnError(fmt.Errorf("pq: duplicate key value violates unique constraint \"activity_logs_idempotency_uniq\""))
// Re-query returns the winner.
mock.ExpectQuery("SELECT request_body->>'delegation_id', status").
@ -900,13 +904,49 @@ func TestLookupIdempotentDelegation_ExistingPending(t *testing.T) {
// --- insertDelegationRow ---
// delegationIDResponseBodyMatcher verifies that the INSERT's response_body argument
// (the 6th bound parameter) contains the delegation_id in its JSON object.
// This is the fix for the activity_logs delegation_id parity bug: the initial
// delegation row now writes delegation_id to response_body (not just request_body)
// so both the Activity logs API and the agent's check_delegation_status can find
// the row by looking at either field.
type delegationIDResponseBodyMatcher struct {
delegationID string
}
func (m delegationIDResponseBodyMatcher) Match(v driver.Value) bool {
s, ok := v.(string)
if !ok {
return false
}
var parsed map[string]interface{}
if json.Unmarshal([]byte(s), &parsed) != nil {
return false
}
got, ok := parsed["delegation_id"].(string)
return ok && got == m.delegationID
}
func TestInsertDelegationRow_Success(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
// 7 args: sourceID($1), sourceID($2), targetID($3), summary($4),
// request_body JSON($5), response_body JSON($6), idempotency_key($7).
// response_body must contain the delegation_id.
taskJSON, _ := json.Marshal(map[string]interface{}{
"task": "hi",
"delegation_id": "del-1",
})
respJSON, _ := json.Marshal(map[string]interface{}{
"delegation_id": "del-1",
})
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs("ws-src", "ws-src", "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
"Delegating to aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
string(taskJSON), delegationIDResponseBodyMatcher{delegationID: "del-1"}, nil).
WillReturnResult(sqlmock.NewResult(0, 1))
out := insertDelegationRow(context.Background(), c,
@ -924,7 +964,17 @@ func TestInsertDelegationRow_IdempotentConflict(t *testing.T) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
idemTaskJSON, _ := json.Marshal(map[string]interface{}{
"task": "hi",
"delegation_id": "loser-del",
})
idemRespJSON, _ := json.Marshal(map[string]interface{}{
"delegation_id": "loser-del",
})
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs("ws-src", "ws-src", "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
"Delegating to aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
string(idemTaskJSON), delegationIDResponseBodyMatcher{delegationID: "loser-del"}, "k1").
WillReturnError(fmt.Errorf("pq: duplicate key value violates unique constraint"))
mock.ExpectQuery("SELECT request_body->>'delegation_id', status").
WithArgs("ws-src", "k1").
@ -951,7 +1001,17 @@ func TestInsertDelegationRow_OtherDBError(t *testing.T) {
// Without IdempotencyKey, the follow-up SELECT is skipped — any insert
// error falls straight to insertTrackingUnavailable.
errTaskJSON, _ := json.Marshal(map[string]interface{}{
"task": "hi",
"delegation_id": "del-x",
})
errRespJSON, _ := json.Marshal(map[string]interface{}{
"delegation_id": "del-x",
})
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs("ws-src", "ws-src", "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
"Delegating to aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
string(errTaskJSON), delegationIDResponseBodyMatcher{delegationID: "del-x"}, nil).
WillReturnError(fmt.Errorf("connection refused"))
out := insertDelegationRow(context.Background(), c,