fix(a2a-queue): extend system-caller normalization to a2a_queue.caller_id (was: #2694 RC #99248) #2696

Merged
devops-engineer merged 2 commits from fix/2694-a2a-queue-callerid into main 2026-06-13 04:29:14 +00:00
2 changed files with 151 additions and 7 deletions
@@ -113,8 +113,20 @@ func EnqueueA2A(
if idempotencyKey != "" {
keyArg = idempotencyKey
}
// Normalize the callerID the same way nilIfEmpty does in
// a2a_proxy_helpers.go: system-caller prefixes (webhook:,
// system:, test:, channel:) are non-UUID routing markers, not real
// workspace ids. Persisting them to a2a_queue.caller_id (a
// UUID-typed column per migrations/042_a2a_queue.up.sql:21) would
// trip a Postgres UUID cast failure → "invalid input syntax for
// type uuid" → EnqueueA2A returns an error → the busy-A2A path
// falls through to a 503 instead of queueing. See #2694 RC
// #99248 for the symptom + #2693 for the broader #2680 lineage.
//
// Real workspace UUIDs are passed through unchanged so the
// queue-row attribution is preserved.
var callerArg interface{}
if callerID != "" {
if callerID != "" && !isSystemCaller(callerID) {
callerArg = callerID
}
var methodArg interface{}
@@ -50,15 +50,46 @@ func expectSupersedeExpired(mock sqlmock.Sqlmock, workspaceID, key string, rowsR
// expectInsert registers the INSERT ... ON CONFLICT DO NOTHING RETURNING id.
// newID is the id the insert returns (non-conflict / fresh enqueue path).
func expectInsert(mock sqlmock.Sqlmock, newID string) {
//
// expectedCallerIDBind pins the exact bind value the test expects for
// the caller_id column. Pass:
// - sqlmock.AnyArg() for "I don't care" (used by the existing tests
// that don't pin the caller_id shape — they're not regression-tests
// for the system-caller normalization, just for the canonical
// enqueue path).
// - nil (sqlmock's nil interface{}) for system-caller prefixes — pins
// that the literal "system:restart-context" (or any other
// systemCallerPrefixes entry) is normalized to NULL in the bind
// parameter, not persisted as the literal string. This is the
// regression-test shape for #2694 RC #99248 (the busy-A2A path
// that tripped a UUID cast failure on a2a_queue.caller_id).
// - the actual workspace UUID string — pins that a real workspace
// UUID-shaped callerID is passed through unchanged (not
// normalized to NULL — that would hide real-workspace
// attribution in the queue row).
//
// The two new tests (TestEnqueueA2A_SystemCallerNormalizesToNULLCallerID
// + TestEnqueueA2A_RealWorkspaceUUIDPreserved) use this helper with
// explicit bind expectations to pin the new normalization contract.
func expectInsert(mock sqlmock.Sqlmock, newID string, expectedCallerIDBind interface{}) {
// Only pin the caller_id bind (position 2). The other 6 parameters are
// not the focus of the system-caller normalization test — use
// sqlmock.AnyArg() so the helper doesn't break when a test passes
// different values for workspace_id / key / expires_at (e.g.
// NoKey_SkipsSupersede passes enqKey="" which becomes nil at the
// SQL bind per the EnqueueA2A code's if-idempotencyKey!=""). The
// point of the test is to pin caller_id specifically, not the rest
// of the row.
mock.ExpectQuery(`
INSERT INTO a2a_queue (workspace_id, caller_id, priority, body, method, idempotency_key, expires_at)
VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7)
ON CONFLICT (workspace_id, idempotency_key)
WHERE idempotency_key IS NOT NULL AND status IN ('queued','dispatched')
DO NOTHING
DO NOTHING
RETURNING id
`).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(newID))
`).
WithArgs(sqlmock.AnyArg(), expectedCallerIDBind, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(newID))
}
// expectDepth registers the trailing queue-depth count query.
@@ -81,7 +112,7 @@ func TestEnqueueA2A_ExpiredRowDoesNotBlockFreshTick(t *testing.T) {
expectSupersedeExpired(mock, enqWorkspaceID, enqKey, 1)
// With the active set cleared, the insert proceeds (no conflict) → new id.
const freshID = "fresh-tick-id"
expectInsert(mock, freshID)
expectInsert(mock, freshID, nil)
expectDepth(mock, enqWorkspaceID, 1)
nextRun := time.Now().Add(30 * time.Second)
@@ -104,6 +135,107 @@ func TestEnqueueA2A_ExpiredRowDoesNotBlockFreshTick(t *testing.T) {
}
}
// TestEnqueueA2A_SystemCallerNormalizesToNULLCallerID: the synthetic
// "system:restart-context" callerID (and all systemCallerPrefixes:
// webhook:, system:, test:, channel:) must be normalized to NULL in
// the a2a_queue.caller_id bind parameter, NOT persisted as the literal
// string. The column is UUID-typed (migrations/042_a2a_queue.up.sql:21),
// so a literal-string insert would trip a Postgres UUID cast failure
// → EnqueueA2A returns an error → the busy-A2A path falls through to
// a 503 instead of queueing. See #2694 RC #99248 + #2693 for the
// broader #2680 lineage. Real workspace UUIDs are passed through
// unchanged (regression-guard).
//
// This test pins the new normalization contract. Without it, the
// restart-context → busy-queue path would have appeared "fixed" by my
// prior activity-log nilIfEmpty PR but still trip the UUID cast on
// the queue insert (a different column, same callerID-typed poison).
func TestEnqueueA2A_SystemCallerNormalizesToNULLCallerID(t *testing.T) {
// All 4 systemCallerPrefixes from a2a_proxy.go:82-84 must normalize
// to NULL in the caller_id bind. We test with "system:restart-context"
// (the actual offender) and the other 3 prefixes for full coverage.
systemCallerIDs := []string{
"webhook:github",
"system:restart-context", // the actual offender
"system:other-svc",
"test:integration-1",
"channel:discord",
}
for _, sysCaller := range systemCallerIDs {
mock := setupTestDBForQueueTests(t)
// No expired row for this key → the supersede UPDATE affects 0 rows.
expectSupersedeExpired(mock, enqWorkspaceID, enqKey, 0)
// The insert proceeds. The mock's ExpectExec will validate the bind
// parameter shape: caller_id must be a nil interface{} (NOT the
// literal system-prefix string). sqlmock's default comparison
// distinguishes nil from non-nil, so passing the literal string
// would fail the expectationsWereMet check.
const freshID = "fresh-id-sys-caller"
expectInsert(mock, freshID, nil)
expectDepth(mock, enqWorkspaceID, 1)
nextRun := time.Now().Add(30 * time.Second)
id, depth, err := EnqueueA2A(
context.Background(), enqWorkspaceID, sysCaller, PriorityTask,
[]byte(enqBody), enqMethod, enqKey, &nextRun,
)
if err != nil {
t.Errorf("system-caller %q: EnqueueA2A returned error: %v", sysCaller, err)
}
if id != freshID {
t.Errorf("system-caller %q: expected fresh id %q, got %q", sysCaller, freshID, id)
}
if depth != 1 {
t.Errorf("system-caller %q: expected depth 1, got %d", sysCaller, depth)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("system-caller %q: unmet sqlmock expectations: %v "+
"(the literal callerID must have been normalized to NULL in the bind)", sysCaller, err)
}
}
}
// TestEnqueueA2A_RealWorkspaceUUIDPreserved: regression-guard that a real
// workspace UUID-shaped callerID still gets persisted as a non-nil
// bind parameter (otherwise we'd hide real-workspace attribution in the
// queue row). The fix in #2694 must NOT regress this case.
func TestEnqueueA2A_RealWorkspaceUUIDPreserved(t *testing.T) {
mock := setupTestDBForQueueTests(t)
// A real workspace UUID-shaped string (no system prefix). Per the
// isSystemCaller() rule, this should NOT be normalized to NULL —
// it must be passed through as the caller_id bind. Declared BEFORE
// the expectInsert call so the literal is in scope when sqlmock
// matches the bind parameter (the helper pins the exact value).
realUUID := "9a40df22-ba4b-3fc0-75c1-66dd6869ff25" // a real UUID-shaped string
expectSupersedeExpired(mock, enqWorkspaceID, enqKey, 0)
const freshID = "fresh-id-real-uuid"
expectInsert(mock, freshID, realUUID)
expectDepth(mock, enqWorkspaceID, 1)
nextRun := time.Now().Add(30 * time.Second)
id, depth, err := EnqueueA2A(
context.Background(), enqWorkspaceID, realUUID, PriorityTask,
[]byte(enqBody), enqMethod, enqKey, &nextRun,
)
if err != nil {
t.Fatalf("real workspace UUID: EnqueueA2A returned error: %v", err)
}
if id != freshID {
t.Errorf("real workspace UUID: expected fresh id %q, got %q", freshID, id)
}
if depth != 1 {
t.Errorf("real workspace UUID: expected depth 1, got %d", depth)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("real workspace UUID: unmet sqlmock expectations: %v "+
"(a real workspace UUID-shaped callerID must be passed through, not normalized to NULL)", err)
}
}
// TestEnqueueA2A_NoExpiredRow_NormalEnqueue: when no expired row exists the
// supersede UPDATE simply affects zero rows and the enqueue proceeds normally.
func TestEnqueueA2A_NoExpiredRow_NormalEnqueue(t *testing.T) {
@@ -111,7 +243,7 @@ func TestEnqueueA2A_NoExpiredRow_NormalEnqueue(t *testing.T) {
expectSupersedeExpired(mock, enqWorkspaceID, enqKey, 0) // nothing to retire
const newID = "new-id"
expectInsert(mock, newID)
expectInsert(mock, newID, nil)
expectDepth(mock, enqWorkspaceID, 2)
nextRun := time.Now().Add(30 * time.Second)
@@ -141,7 +273,7 @@ func TestEnqueueA2A_NoKey_SkipsSupersede(t *testing.T) {
// No expectSupersedeExpired — it must NOT be issued when key is empty.
const newID = "no-key-id"
expectInsert(mock, newID)
expectInsert(mock, newID, nil)
expectDepth(mock, enqWorkspaceID, 1)
id, _, err := EnqueueA2A(