fix(handlers): write delegation_id to response_body in activity_logs INSERT #985
@ -262,14 +262,24 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
|
||||
"task": body.Task,
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
// Write delegation_id to response_body as well as request_body. The
|
||||
// ListDelegations API query reads delegation_id from BOTH fields so either
|
||||
// path works. But consistency here prevents a subtle gap: if a future
|
||||
// status-update path ever strips delegation_id from request_body, the
|
||||
// API would still find it in response_body. (updateDelegationStatus only
|
||||
// writes status — it never touches request_body — so this is belt-and-
|
||||
// suspenders for the initial row. See also the Record handler below.)
|
||||
respJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
var idemArg interface{}
|
||||
if body.IdempotencyKey != "" {
|
||||
idemArg = body.IdempotencyKey
|
||||
}
|
||||
_, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status, idempotency_key)
|
||||
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending', $6)
|
||||
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), idemArg)
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status, idempotency_key)
|
||||
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, $6::jsonb, 'pending', $7)
|
||||
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), string(respJSON), idemArg)
|
||||
if err == nil {
|
||||
// RFC #2829 #318 — mirror to the durable delegations ledger
|
||||
// (gated by DELEGATION_LEDGER_WRITE; default off → no-op).
|
||||
@ -544,10 +554,16 @@ func (h *DelegationHandler) Record(c *gin.Context) {
|
||||
"task": body.Task,
|
||||
"delegation_id": body.DelegationID,
|
||||
})
|
||||
// Mirror delegation_id into response_body so ListDelegations (which reads
|
||||
// COALESCE(request_body->>'delegation_id', response_body->>'delegation_id'))
|
||||
// can locate this row regardless of which field the caller checks.
|
||||
respJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"delegation_id": body.DelegationID,
|
||||
})
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'dispatched')
|
||||
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON)); err != nil {
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, $6::jsonb, 'dispatched')
|
||||
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), string(respJSON)); err != nil {
|
||||
log.Printf("Delegation Record: insert failed for %s: %v", body.DelegationID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to record delegation"})
|
||||
return
|
||||
|
||||
@ -3,6 +3,7 @@ package handlers
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
@ -133,9 +134,10 @@ func TestDelegate_Success(t *testing.T) {
|
||||
targetID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
|
||||
|
||||
// Expect INSERT into activity_logs for delegation tracking
|
||||
// (6th arg is idempotency_key — nil here since the request omits it)
|
||||
// Args: $1=workspace_id, $2=source_id, $3=target_id, $4=summary,
|
||||
// $5=request_body(jsonb), $6=response_body(jsonb), $7=idempotency_key(nil).
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), nil).
|
||||
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), nil).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Expect RecordAndBroadcast INSERT into structure_events
|
||||
@ -189,9 +191,9 @@ func TestDelegate_DBInsertFails_Still202WithWarning(t *testing.T) {
|
||||
|
||||
targetID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
|
||||
|
||||
// DB insert fails (6th arg = idempotency_key, nil for this test)
|
||||
// DB insert fails (7 args: $7=idempotency_key is nil for this test)
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), nil).
|
||||
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), nil).
|
||||
WillReturnError(fmt.Errorf("database connection lost"))
|
||||
|
||||
// RecordAndBroadcast still fires
|
||||
@ -484,6 +486,7 @@ func TestDelegationRecord_InsertsActivityLogRow(t *testing.T) {
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
h := NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
// Record handler now passes 6 args (added response_body jsonb).
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs(
|
||||
"550e8400-e29b-41d4-a716-446655440000", // workspace_id
|
||||
@ -491,6 +494,7 @@ func TestDelegationRecord_InsertsActivityLogRow(t *testing.T) {
|
||||
"550e8400-e29b-41d4-a716-446655440001", // target_id
|
||||
"Delegating to 550e8400-e29b-41d4-a716-446655440001", // summary
|
||||
sqlmock.AnyArg(), // request_body (jsonb)
|
||||
sqlmock.AnyArg(), // response_body (jsonb) — added
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// RecordAndBroadcast INSERT for DELEGATION_SENT
|
||||
@ -699,9 +703,9 @@ func TestDelegate_IdempotentFailedRowIsReleasedAndReplaced(t *testing.T) {
|
||||
mock.ExpectExec("DELETE FROM activity_logs").
|
||||
WithArgs("ws-source", "retry-key").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// Fresh insert with the same idempotency key.
|
||||
// Fresh insert with the same idempotency key (7 args: added response_body).
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "retry-key").
|
||||
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), "retry-key").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
@ -745,9 +749,9 @@ func TestDelegate_IdempotentRaceUniqueViolationReturnsExisting(t *testing.T) {
|
||||
mock.ExpectQuery("SELECT request_body->>'delegation_id', status, target_id").
|
||||
WithArgs("ws-source", "race-key").
|
||||
WillReturnError(fmt.Errorf("sql: no rows in result set"))
|
||||
// Insert loses the race against a concurrent caller.
|
||||
// Insert loses the race against a concurrent caller (7 args: added response_body).
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "race-key").
|
||||
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), "race-key").
|
||||
WillReturnError(fmt.Errorf("pq: duplicate key value violates unique constraint \"activity_logs_idempotency_uniq\""))
|
||||
// Re-query returns the winner.
|
||||
mock.ExpectQuery("SELECT request_body->>'delegation_id', status").
|
||||
@ -900,13 +904,49 @@ func TestLookupIdempotentDelegation_ExistingPending(t *testing.T) {
|
||||
|
||||
// --- insertDelegationRow ---
|
||||
|
||||
// delegationIDResponseBodyMatcher verifies that the INSERT's response_body argument
|
||||
// (the 6th bound parameter) contains the delegation_id in its JSON object.
|
||||
// This is the fix for the activity_logs delegation_id parity bug: the initial
|
||||
// delegation row now writes delegation_id to response_body (not just request_body)
|
||||
// so both the Activity logs API and the agent's check_delegation_status can find
|
||||
// the row by looking at either field.
|
||||
type delegationIDResponseBodyMatcher struct {
|
||||
delegationID string
|
||||
}
|
||||
|
||||
func (m delegationIDResponseBodyMatcher) Match(v driver.Value) bool {
|
||||
s, ok := v.(string)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
var parsed map[string]interface{}
|
||||
if json.Unmarshal([]byte(s), &parsed) != nil {
|
||||
return false
|
||||
}
|
||||
got, ok := parsed["delegation_id"].(string)
|
||||
return ok && got == m.delegationID
|
||||
}
|
||||
|
||||
func TestInsertDelegationRow_Success(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
// 7 args: sourceID($1), sourceID($2), targetID($3), summary($4),
|
||||
// request_body JSON($5), response_body JSON($6), idempotency_key($7).
|
||||
// response_body must contain the delegation_id.
|
||||
taskJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"task": "hi",
|
||||
"delegation_id": "del-1",
|
||||
})
|
||||
respJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"delegation_id": "del-1",
|
||||
})
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs("ws-src", "ws-src", "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
"Delegating to aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
string(taskJSON), delegationIDResponseBodyMatcher{delegationID: "del-1"}, nil).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
out := insertDelegationRow(context.Background(), c,
|
||||
@ -924,7 +964,17 @@ func TestInsertDelegationRow_IdempotentConflict(t *testing.T) {
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
idemTaskJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"task": "hi",
|
||||
"delegation_id": "loser-del",
|
||||
})
|
||||
idemRespJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"delegation_id": "loser-del",
|
||||
})
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs("ws-src", "ws-src", "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
"Delegating to aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
string(idemTaskJSON), delegationIDResponseBodyMatcher{delegationID: "loser-del"}, "k1").
|
||||
WillReturnError(fmt.Errorf("pq: duplicate key value violates unique constraint"))
|
||||
mock.ExpectQuery("SELECT request_body->>'delegation_id', status").
|
||||
WithArgs("ws-src", "k1").
|
||||
@ -951,7 +1001,17 @@ func TestInsertDelegationRow_OtherDBError(t *testing.T) {
|
||||
|
||||
// Without IdempotencyKey, the follow-up SELECT is skipped — any insert
|
||||
// error falls straight to insertTrackingUnavailable.
|
||||
errTaskJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"task": "hi",
|
||||
"delegation_id": "del-x",
|
||||
})
|
||||
errRespJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"delegation_id": "del-x",
|
||||
})
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs("ws-src", "ws-src", "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
"Delegating to aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
string(errTaskJSON), delegationIDResponseBodyMatcher{delegationID: "del-x"}, nil).
|
||||
WillReturnError(fmt.Errorf("connection refused"))
|
||||
|
||||
out := insertDelegationRow(context.Background(), c,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user